Andy Toone

the state we’re in

The Storm Trident tutorial demonstrates the use of an in-memory map state to count the occurrences of each word in incoming sentences. In essence, each sentence is split into words and the words are sent to nodes in the cluster to be counted. The choice of which node each word is sent to is based on the word itself – so the word ‘dog’ will always be sent to the same node. As each word arrives, it is counted by a Count aggregator and the running total is stored in the local state.
TridentState wordCounts = topology.newStream("spout1", spout)
       .each(new Fields("sentence"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
In this code snippet, the groupBy method performs two functions: it tells Trident how to distribute the incoming values across the cluster, and it tells the aggregation the ‘key’ under which counts are accumulated.

Retrieving the counts for a set of words is then a matter of sending those key values to the appropriate nodes and reading back the stored count for each.

topology.newDRPCStream("words")
       .each(new Fields("args"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
       .each(new Fields("count"), new FilterNull())
       .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
Notice that once again we groupBy the word to be queried – ensuring it is routed to the correct node. Then we explicitly get the count value from our state object, using the word value as a key.
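
To see the whole round trip, here is a rough sketch of exercising that query from a local cluster. The class names (LocalDRPC, LocalCluster, Config) are standard Storm ones, but the topology name, the example sentence and the wiring described in the comments are assumptions for illustration; in local mode the DRPC stream is created as topology.newDRPCStream("words", drpc).
LocalDRPC drpc = new LocalDRPC();
TridentTopology topology = new TridentTopology();
// ... define wordCounts and the "words" stream exactly as above, passing
// drpc into newDRPCStream("words", drpc) since we are running locally ...

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wordCounter", new Config(), topology.build());

// Each call splits the argument into words, routes them to the right
// nodes and sums their counts, printing something like [[5]].
System.out.println(drpc.execute("words", "cat dog the man jumped"));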

we can do better

In practice, working systems have to do much more than this. It’s rare that we want to count by a single value. Imagine counting Tweets per user and tracking how the subject matter changes over time. In that case we would want to track user, word and period together, grouping each word by the user that Tweeted it and the time period in which it was mentioned.

Happily, Trident still works well with this example. The first code snippet might change to look a little like this.
TridentState wordCounts = topology.newStream("spout1", spout)
       .each(new Fields("tweet"), new TweetSplitter(), new Fields("user","word","period"))
       .groupBy(new Fields("user","word","period"))
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
Here, a TweetSplitter object emits a series of user/word/period tuples for each word in a tweet. For tracking these three values, we only need to add the extra fields to the groupBy method, and Trident handles the rest.
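
TweetSplitter itself isn’t part of Trident; it stands in for whatever turns a raw tweet into tuples. A minimal sketch, assuming purely for illustration that the "tweet" field carries a hypothetical Tweet object exposing its author, text and timestamp, might look like this:
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

// Sketch of the TweetSplitter used above. The Tweet type and its accessors
// are hypothetical; the only real requirement is that we emit one
// (user, word, period) tuple per word in the tweet.
public class TweetSplitter extends BaseFunction {
    private static final long PERIOD_MILLIS = 60 * 1000; // one-minute periods

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        Tweet tweet = (Tweet) tuple.getValue(0);            // hypothetical Tweet object
        long period = tweet.getTimestamp() / PERIOD_MILLIS; // bucket the timestamp by minute

        for (String word : tweet.getText().split("\\s+")) {
            collector.emit(new Values(tweet.getUser(), word, period));
        }
    }
}
Because the period is emitted with every word, it becomes part of every key in the state, which is something we will lean on later.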

We can query this state in a similar manner, providing the extra fields to the stateQuery method:

...
   .stateQuery(wordCounts, new Fields("user","word","period"), new MapGet(), new Fields("count"))
...
And so we can easily find out how many times a user mentioned ‘Andy Murray’ in a given period.
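
For completeness, the query stream changes in the same way. ArgsParser here is hypothetical; it stands for whatever turns the DRPC argument string into user, word and period values, so that queries are grouped, and therefore routed, exactly as the counts were when they were stored:
topology.newDRPCStream("userWords")
       .each(new Fields("args"), new ArgsParser(), new Fields("user", "word", "period"))
       .groupBy(new Fields("user", "word", "period"))
       .stateQuery(wordCounts, new Fields("user", "word", "period"), new MapGet(), new Fields("count"))
       .each(new Fields("count"), new FilterNull())
       .aggregate(new Fields("count"), new Sum(), new Fields("sum"));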

getting complicated

There are a couple of problems with this example though. The first is that, left connected to Twitter, our in-memory state object would grow boundlessly until Java goes pop. The second is that it becomes quite expensive to query the state. Imagine, for instance, that you wished to track mentions of the 2013 Wimbledon Seeds over the week leading up to the finals. If you store those mentions in one-minute periods, you are creating a key space of 64 Seeds and 60 * 24 * 7 ≈ 10,000 periods.

Though much of this keyspace may be sparsely populated, to query it fully you have to issue 64 * 10,000 = 640,000 state queries. This is where the cost of distribution makes itself felt. Passing tuples around takes time, so even if the latency of the cluster is low and a tuple can be handled in a hundredth of a millisecond, a full query would still take over six seconds. Even our short, simple case is too slow, and we can imagine far more intense queries that would be dramatically worse.

state the nature of your emergency

All is not lost though. The important thing to realise is that the MapState and MapGet objects are defined by interfaces, and can be replaced with our own, smarter versions. At its core, a state object must provide a backing store that has simple map-like get and put methods:
public T get(List<Object> key);

public void put(List<Object> key, T value);
Each takes a List of Object keys – and these match up to the fields specified in the groupBy clause. Because we know we are storing values under a composite key that includes a period, we can evict older values once they reach a certain age. Depending on our use case, eviction might mean deletion, or it might mean archiving the value to a slower store. In this way we solve the problem of endlessly growing state.
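
As a concrete, if simplified, illustration, here is one shape a period-aware backing store could take. In Trident the backing store is expressed through the IBackingMap interface, which batches the get and put above into multiGet and multiPut. The eviction threshold, the assumption that the period is the last element of each key, and the class name are all illustrative:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import storm.trident.state.map.IBackingMap;

// A simplified in-memory backing store that evicts entries once their period
// falls too far behind the newest period seen. Real systems might archive the
// evicted values to a slower store instead of dropping them.
public class ExpiringMemoryMap<T> implements IBackingMap<T> {
    private static final long MAX_AGE = 60 * 24 * 7; // keep a week of one-minute periods

    private final Map<List<Object>, T> store = new HashMap<List<Object>, T>();
    private long latestPeriod = 0;

    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        List<T> values = new ArrayList<T>(keys.size());
        for (List<Object> key : keys) {
            values.add(store.get(key)); // null for unknown keys; FilterNull copes downstream
        }
        return values;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        for (int i = 0; i < keys.size(); i++) {
            List<Object> key = keys.get(i);
            store.put(key, vals.get(i));
            latestPeriod = Math.max(latestPeriod, periodOf(key));
        }
        evictOldEntries();
    }

    // Drop entries whose period is more than MAX_AGE behind the latest one.
    private void evictOldEntries() {
        for (Iterator<List<Object>> it = store.keySet().iterator(); it.hasNext(); ) {
            if (latestPeriod - periodOf(it.next()) > MAX_AGE) {
                it.remove();
            }
        }
    }

    private long periodOf(List<Object> key) {
        return ((Number) key.get(key.size() - 1)).longValue();
    }
}
To be used by persistentAggregate this would still need to be wrapped in a StateFactory, for example via NonTransactionalMap.build, but that plumbing is omitted here.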

state of the nation

But what about our slow queries? Our naive implementation required that we iterate over a potentially enormous key space to retrieve all of the possible counts. Can we improve this too?

The fact that the state query method also takes an interface allows us to make dramatic improvements. The key here is that we can supply our own query function, typically by extending BaseQueryFunction, to extract exactly the information we need. Trident batches the query tuples together and hands them to our function along with the state object, which in turn wraps our backing store:
public List<T> batchRetrieve(ReadOnlyMapState state, List<TridentTuple> args);
As we know the shape of what is being stored, and the nature of the key, we can perform smarter queries. Rather than querying the entire key space with a vast number of tuples, we can extract only the data that is relevant. We can also perform other queries – such as finding the most recent value, aggregating over a range of periods, or grouping similar terms together. All of these can be carried out in-process, without the cost of transmitting tuples around the cluster – and as a consequence they are orders of magnitude faster than our naive approach.
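
To make that concrete, here is a sketch of a query function that sums a user’s mentions of a word across a whole range of periods in a single state call, rather than routing one tuple per period around the cluster. The (user, word, fromPeriod, toPeriod) layout of the query tuple is an assumption, as is the idea that the keys it touches live in the partition handling the query; that last point is exactly the data-locality question raised below.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import backtype.storm.tuple.Values;
import storm.trident.operation.TridentCollector;
import storm.trident.state.BaseQueryFunction;
import storm.trident.state.map.ReadOnlyMapState;
import storm.trident.tuple.TridentTuple;

// Sketch: sum the counts for (user, word) over a range of periods without
// sending one query tuple per period across the cluster.
public class RangeCountQuery extends BaseQueryFunction<ReadOnlyMapState<Long>, Long> {

    @Override
    public List<Long> batchRetrieve(ReadOnlyMapState<Long> state, List<TridentTuple> args) {
        List<Long> results = new ArrayList<Long>(args.size());
        for (TridentTuple tuple : args) {
            String user = tuple.getString(0);
            String word = tuple.getString(1);
            long from = tuple.getLong(2);
            long to = tuple.getLong(3);

            // Build every key in the range and fetch them in one batched,
            // in-process call against the backing store.
            List<List<Object>> keys = new ArrayList<List<Object>>();
            for (long period = from; period <= to; period++) {
                keys.add(Arrays.<Object>asList(user, word, period));
            }

            long total = 0;
            for (Long count : state.multiGet(keys)) {
                if (count != null) {
                    total += count;
                }
            }
            results.add(total);
        }
        return results;
    }

    @Override
    public void execute(TridentTuple tuple, Long result, TridentCollector collector) {
        collector.emit(new Values(result));
    }
}
Wired into a DRPC stream with something like stateQuery(wordCounts, new Fields("user", "word", "fromPeriod", "toPeriod"), new RangeCountQuery(), new Fields("count")), a whole week of periods collapses into a single query per user and word.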

Once we have a smart state object and can perform fast queries against that, we can also consider how we distribute our data set across the cluster to ensure useful data locality. I’ll discuss that in another post.
