Max Tomassi

lambda or not lambda? that is the question

The concept of the lambda architecture was first introduced by Nathan Marz a couple of years ago, and since then it has gained popularity as a solution for real-time processing use cases. The basic idea is to have two main components: the “speed layer” (e.g. Storm), which processes data in real time with low latency, and the “batch layer” (e.g. Hadoop), capable of (re)processing historical data, but in a slower fashion. Combining the best of the two components allows you to serve high-quality data with a very low response time.

As you might know, we gave the lambda architecture a chance: we implemented the speed layer using Storm and started implementing the batch layer using Hadoop. Having to implement (almost) the same processing logic twice and to maintain two different systems is complex and time-consuming. And a question was raised, inspired by an interesting article by Jay Kreps from LinkedIn: do we actually need a lambda architecture, or can we achieve similar behaviour using just the real-time layer? In this post I’ll talk about how we are re-thinking the architecture of our real-time analytics system, Helios.

kafka for feeding the system

Helios is designed to be as generic (i.e. input agnostic) as possible. It tracks and calculates statistics against various types of events generated by our data ingest systems. For example, our Twitter ingest system, Canary, generates an event every time it sees a new tweet or a new favourite action. All these events are delivered to Helios by an intermediate messaging system, in order to favour low coupling. Our messaging system of choice is Apache Kafka: we came to it after a less than amazing experience with ActiveMQ, and its features are a really good fit for this particular use case.

As I already wrote in a previous post, Kafka is implemented as a distributed commit log. This means that it keeps messages on disk for a configurable amount of time. This also allows us to replay all the messages that are on disk, whenever necessary. Why is this great? The role of the batch layer in the lambda architecture is to reprocess historical data, in order to fix potential data issues caused by bugs discovered in the processing logic. Who will do this if we get rid of the batch layer? Thanks to Kafka we can:

  1. spin up a new real-time job alongside the existing one (a rough consumer sketch follows this list)
  2. store the processed data in a new database table
  3. switch the front-end to read from the new table as soon as the new job has caught up with the event stream
  4. turn off the original job
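As a rough illustration of step 1, this is roughly what pointing a new consumer at the start of the log could look like. It’s a minimal sketch in Scala: the broker address, group id and topic name are made up, and the exact consumer API depends on the Kafka client version in use.

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

val props = new Properties()
props.put("bootstrap.servers", "kafka-broker:9092")  // hypothetical broker address
props.put("group.id", "helios-reprocessing-v2")      // fresh group id, so no committed offsets exist yet
props.put("auto.offset.reset", "earliest")           // with no committed offsets, start from the oldest retained message
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Arrays.asList("events"))          // hypothetical topic name

while (true) {
  for (record <- consumer.poll(1000).asScala) {
    // re-run the processing logic and write the result into the new table
  }
}

Using a brand new consumer group is what makes the replay possible: since the group has no committed offsets, the consumer falls back to the earliest message still retained on disk and re-reads the whole log rather than only new messages.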

Also, Kafka is great performance-wise: it’s really fast at processing and serving messages, even when dealing with massive commit logs.

spark for processing data

Right, so Kafka is now ready to serve us all the events we need; how are we going to process them? There are several real-time processing frameworks out there, one of the most popular being Storm. We’ve worked with Storm for quite a while now, but we’ve not been totally convinced: it’s not as mature as we’d like, and we found the upgrade to the latest version rather painful. Also, it’s quite hard to debug, with its logs not being very informative.

We therefore decided to move on and try another promising piece of technology: Apache Spark. I already wrote a post talking about Spark and its main features, so I invite you to have a look at it if you’re interested. Spark is a fault-tolerant and highly scalable real-time processing system. Besides its pretty impressive feature set, it suits our architecture nicely, with built-in Kafka integration and a spark-cassandra-connector that makes storing data in Cassandra straightforward. This introduces the last bit of our architecture: the persistence layer.
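To give a feel for how these pieces could hang together, here is a minimal sketch (in Scala) of a Spark Streaming job that reads events from Kafka and bumps hourly counters in the counts table described in the next section, via the spark-cassandra-connector. The topic, keyspace, table and host names are made up, and the exact calls vary between Spark and connector versions; it’s an illustration rather than our actual job.

import java.util.Date
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._

object HeliosCounter {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("helios-counter")
      .set("spark.cassandra.connection.host", "cassandra-host")  // hypothetical Cassandra host

    val ssc = new StreamingContext(conf, Seconds(10))

    // Each Kafka message value is assumed to carry the event type ("tweet", "favourite", ...)
    val events = KafkaUtils
      .createStream(ssc, "zookeeper-host:2181", "helios-counter", Map("events" -> 1))
      .map(_._2)

    val hourMs = 3600 * 1000L
    events
      .map(eventType => ((eventType, new Date(System.currentTimeMillis / hourMs * hourMs)), 1L))
      .reduceByKey(_ + _)  // per-batch counts keyed by (event_type, hour)
      .map { case ((eventType, hour), count) => (eventType, hour, count) }
      .saveToCassandra("helios", "counts", SomeColumns("event_type", "hour", "count"))

    ssc.start()
    ssc.awaitTermination()
  }
}

Because count is a Cassandra counter column, the connector turns these per-batch writes into increments rather than overwrites, so several batches (or workers) can add to the same hourly total without having to read it back first.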

cassandra for storing the outcome

Having received and processed the data, we now need to store the results of this computation. As I mentioned, Helios’ job is to provide statistics around events occurring over a large period of time. Cassandra fits this use case particularly well: thanks to the way it physically stores data, storing time series with Cassandra is very efficient. Also, it provides “distributed counters”, a feature that allows you to perform increments of a numeric value atomically. Let’s consider, for example, this Cassandra table:

CREATE TABLE counts (
	event_type text,
	hour timestamp,
	count counter,
	PRIMARY KEY (event_type, hour)
);

The first element of the primary key is the “partition key”: it defines how data is distributed across the cluster. All the rows with the same event_type will end up on the same node. The second element of the primary key, in this case hour, defines how data is clustered. Specifically, the rows are physically stored ordered by that column. That makes it really efficient to retrieve hourly counts for the same event_type over a large period of time.
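To make that concrete, here is a small sketch using the DataStax Java driver from Scala; the contact point, keyspace and timestamps are made up. Counter columns are never inserted, only incremented, and the range query at the end is exactly the access pattern the clustering order makes cheap.

import com.datastax.driver.core.Cluster

val cluster = Cluster.builder().addContactPoint("cassandra-host").build()  // hypothetical contact point
val session = cluster.connect("helios")                                    // hypothetical keyspace

// Counters are incremented in place, so several workers can bump the same cell concurrently.
session.execute(
  "UPDATE counts SET count = count + 1 " +
  "WHERE event_type = 'tweet' AND hour = '2015-03-01 14:00:00'")

// Hourly counts for one event_type over a whole day: a single partition, read in clustering order.
val rows = session.execute(
  "SELECT hour, count FROM counts WHERE event_type = 'tweet' " +
  "AND hour >= '2015-03-01 00:00:00' AND hour < '2015-03-02 00:00:00'")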

Relational databases allow you to perform arbitrary queries on a given table. Cassandra, however, being a form of key/value store, is much less flexible. Usually with a relational database you think about your data model first and then about the queries. With Cassandra it’s the opposite: you need to think about the queries you want to be able to perform, and then define your data model accordingly. This means that you can end up storing the same data multiple times, with each variant allowing a different query. This is the price you need to pay for having both efficiency and high scalability.
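For instance, if we also wanted to ask “which event types were most active in a given hour?”, the counts table above couldn’t answer that efficiently, and we would typically add a second table with the key order reversed and write every increment twice. Again a sketch with made-up names, using the same hypothetical driver setup as above:

import com.datastax.driver.core.Cluster

val session = Cluster.builder().addContactPoint("cassandra-host").build().connect("helios")

// Same counts, keyed the other way round: one partition per hour, clustered by event type.
session.execute(
  """CREATE TABLE IF NOT EXISTS counts_by_hour (
    |  hour timestamp,
    |  event_type text,
    |  count counter,
    |  PRIMARY KEY (hour, event_type)
    |)""".stripMargin)

// Each incoming event now costs two counter updates, one per query pattern.
session.execute("UPDATE counts SET count = count + 1 WHERE event_type = 'tweet' AND hour = '2015-03-01 14:00:00'")
session.execute("UPDATE counts_by_hour SET count = count + 1 WHERE hour = '2015-03-01 14:00:00' AND event_type = 'tweet'")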

work in progress

We’ve just started our migration towards a “just real-time” architecture and we’re pretty excited to go ahead with this. It will be quite a challenging period and we will obviously keep you updated on our progress. Do you have experience with real-time processing? Do you have any feedback or suggestions? We’d love to hear your opinion, so feel free to leave a comment below or to tweet us!
