Oliver Hall

storm data partitioning

While working with Storm over the past few days, I came across an interesting bug. We’d noticed that the output of our Storm-based statistics system would shift between a couple of different values, seemingly at random. The cause was not obvious, but when we found it, we felt it was interesting enough to be worth sharing.

splitting hairs

A week or so ago we’d added a number of repartitioning operations to our Storm topology, in order to split it into better-defined sections and make use of varying levels of parallelism. Essentially, a Storm topology is made up of a number of input sources (spouts) and various processing sections (bolts), linked together however you choose. We’re using Trident, an abstraction on top of Storm that deals with streams of data flowing through Functions, which perform the actual processing. Trident then combines these Functions into bolts, working out the most appropriate places to split the streams.
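
To make the shape of this concrete, here’s a minimal Trident sketch. The spout, field and Function names are illustrative rather than our real ones, and the package names assume a pre-Apache, Storm 0.9-era build:

```java
import storm.trident.TridentTopology;
import storm.trident.Stream;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;
import storm.trident.tuple.TridentTuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class StatsTopology {

    // A Trident Function: a small piece of per-tuple processing. Trident packs
    // consecutive Functions into a single bolt unless a repartitioning
    // operation sits between them.
    public static class ParseEvent extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String raw = tuple.getString(0);
            collector.emit(new Values(raw.trim())); // stand-in for real parsing
        }
    }

    public static TridentTopology build(IBatchSpout eventSpout) {
        TridentTopology topology = new TridentTopology();

        // One input source (spout) feeding a stream of tuples through a Function.
        Stream events = topology.newStream("events", eventSpout)
                .each(new Fields("raw"), new ParseEvent(), new Fields("event"));

        // ...further processing, aggregation and persistence hang off this stream.
        return topology;
    }
}
```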

To go deeper: the points where Trident actually splits one bolt from another are the repartitioning operations. A repartitioning operation determines which tuples are sent to which of the bolts processing the next stage of the pipeline. For instance, the shuffle operation uses a round-robin algorithm to distribute incoming tuples evenly across all of the processing partitions. So if you have two Trident Functions on the same stream, they will be combined into the same bolt unless there is a partitioning operation of some kind between them.

We decided to start using these operations to split our biggest bolts into a number of smaller, more manageable sections, allowing us to scale the parallelism (the number of partitions executing simultaneously) of each section up or down depending on how quickly it ran. shuffle() operations at the end of each input source stream ensured not only that each input source could be scaled independently, but also that the data they produced would be distributed as evenly as possible.
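
Continuing the sketch above, this is the kind of change we made. EnrichEvent is another made-up Function like ParseEvent; the shuffle() forces Trident to end one bolt and start another, deals tuples out round-robin to the next section, and lets each section carry its own parallelism hint:

```java
Stream events = topology.newStream("events", eventSpout)
        .each(new Fields("raw"), new ParseEvent(), new Fields("event"))
        .parallelismHint(2)   // parallelism for the spout-side section
        // Without this shuffle(), ParseEvent and EnrichEvent would be packed
        // into the same bolt; with it, they become separate bolts and tuples
        // are spread evenly across the downstream partitions.
        .shuffle()
        .each(new Fields("event"), new EnrichEvent(), new Fields("statKey", "value"))
        .parallelismHint(8);  // this section can be scaled independently
```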

The next part of the topology performed aggregation and storage. This involved a groupBy() after various processing operations, before finally aggregating our data and storing it. The groupBy() was performed on the key by which our data was persisted, which (we thought) would put data with the same key on the same partition, ensuring that each partition contained a distinct subset of the overall keyspace.
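
In sketch form, and continuing with the same illustrative names, the aggregation stage looked something like this. The Sum aggregator and the in-memory state factory are stand-ins for whatever you actually aggregate and persist with; statKey is the key the data is stored under:

```java
import storm.trident.TridentState;
import storm.trident.operation.builtin.Sum;
import storm.trident.testing.MemoryMapState;

// Group on the persistence key, then aggregate each group into the state store.
// We expected this grouping alone to bring every tuple for a key to one place.
TridentState counts = events
        .groupBy(new Fields("statKey"))
        .persistentAggregate(new MemoryMapState.Factory(),
                             new Fields("value"),
                             new Sum(),
                             new Fields("total"));
```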

However, groupBy() actually works rather differently. It groups by the field(s) passed to it, but only within a partition. This meant that if two pieces of data with the same key ended up on different partitions, each of those partitions would end up with its own record for that key.
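
A toy illustration of the effect (plain Java, not Storm code): if shuffle() has dealt tuples for key "A" onto two partitions, grouping within each partition produces two separate records for "A".

```java
import java.util.*;
import java.util.stream.*;

public class GroupWithinPartitionDemo {
    public static void main(String[] args) {
        // Keys as they landed on two partitions after a round-robin shuffle().
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("A", "B"),   // partition 0
                Arrays.asList("A", "C"));  // partition 1

        // Each partition groups and counts its own tuples independently...
        List<Map<String, Long>> perPartition = partitions.stream()
                .map(p -> p.stream().collect(
                        Collectors.groupingBy(k -> k, Collectors.counting())))
                .collect(Collectors.toList());

        // ...so key "A" gets one record per partition it landed on.
        System.out.println(perPartition);  // prints something like [{A=1, B=1}, {A=1, C=1}]
    }
}
```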


This caused some interesting FUN when we performed DRPC queries on the persisted state, which would then return partition_count records per key. The code that makes the DRPC queries assumed that a single record would be returned per key. This seemed reasonable enough, given that we were aggregating by key, and any aggregation of values for a key would be done within Storm itself. On that assumption, we read the values returned by DRPC into result objects, which were then placed into a SortedSet. *KLAXON* This meant that the value that ended up in the set for a given key could be any one of the returned duplicates, which is why our data kept shifting. Nasty…
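
The client side of that assumption looked roughly like this. Result and its comparator are simplified stand-ins for our real result objects:

```java
import java.util.*;

// Simplified stand-in for the objects we build from each DRPC row.
class Result implements Comparable<Result> {
    final String key;
    final long total;

    Result(String key, long total) {
        this.key = key;
        this.total = total;
    }

    // Sorted (and deduplicated!) purely by key.
    @Override
    public int compareTo(Result other) {
        return key.compareTo(other.key);
    }
}

class DrpcResultReader {
    static SortedSet<Result> toResultSet(List<Result> drpcRows) {
        // *KLAXON*: if drpcRows contains several rows for the same key (one per
        // partition), the TreeSet keeps whichever it happens to see first and
        // silently drops the rest -- hence the shifting values we observed.
        return new TreeSet<>(drpcRows);
    }
}
```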


one horse race

The solution involved going back to the descriptions of the partitioning operations. We had assumed that groupBy() was a partitioning operation, when actually it merely groups within a partition. The fix proved to be as simple as adding a partitionBy() on the same key that we group by, ensuring that all records for a given key are on the same partition by the time groupBy() is called. Magic! The duplicates are gone, and our data is stable again.
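
In sketch form (same illustrative names as before), the fix is one extra line before the groupBy():

```java
TridentState counts = events
        // Repartition on the persistence key first, so every tuple for a given
        // key reaches the same partition...
        .partitionBy(new Fields("statKey"))
        // ...and the per-partition grouping now yields exactly one record per key.
        .groupBy(new Fields("statKey"))
        .persistentAggregate(new MemoryMapState.Factory(),
                             new Fields("value"),
                             new Sum(),
                             new Fields("total"));
```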
