the quest for search
In our previous post about our ongoing switch from Mongo to Cassandra for the new Atlas 4.0, we mentioned our need for complex queries over our data model. These were easy to express with Mongo, thanks to its indexing capabilities, but are much harder with Cassandra, whose secondary indexes are currently limited to equality comparisons on single column values.
what metabroadcats want
Functionally speaking, our first requirement was the ability to index programme schedules per broadcaster and to run complex queries over those schedules: in other words, queries like "Which programmes does a given broadcaster air on a given channel?" or "Which programmes air between two given dates?"
On the non-functional side, we had two strong requirements:
- Scalability: once again, we plan to ingest more and more data from broadcasters in the UK and around the world, and querying that content efficiently is a crucial capability.
- Ease of operation: once again, we love simple things that can be easily operated and automated, so that we can focus on the real business.
So here comes ElasticSearch.
ok, so why ElasticSearch?
ElasticSearch supports indexing nested documents and querying them, returning as a result the top-level documents whose nested documents match the query: this is a perfect fit for our schedule-based queries, as we will see in a moment.
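To make the nested semantics concrete, here is a minimal in-memory sketch in plain Java (not the ElasticSearch API); the Item and Broadcast records, the channel names and the integer timestamps are all hypothetical. A nested query matches an item only if a single broadcast satisfies every condition. The "flattened" variant shows what happens when an array of objects is indexed as plain objects, which is ElasticSearch's default: each field becomes a multi-valued field and the link between fields of the same broadcast is lost.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class NestedQuerySketch {

    // Hypothetical, simplified shapes: an item and its list of broadcasts.
    record Broadcast(String channel, long transmissionTime) {}
    record Item(String uri, List<Broadcast> broadcasts) {}

    // Nested semantics: an item matches if ONE broadcast satisfies the whole query.
    static List<String> nestedMatch(List<Item> items, Predicate<Broadcast> query) {
        return items.stream()
                .filter(item -> item.broadcasts().stream().anyMatch(query))
                .map(Item::uri)
                .collect(Collectors.toList());
    }

    // Flattened semantics: each condition is checked independently across all
    // broadcasts, so conditions met by different broadcasts combine into a false match.
    static List<String> flattenedMatch(List<Item> items, String channel, long from) {
        return items.stream()
                .filter(item -> item.broadcasts().stream().anyMatch(b -> b.channel().equals(channel))
                        && item.broadcasts().stream().anyMatch(b -> b.transmissionTime() >= from))
                .map(Item::uri)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Item> items = List.of(
                new Item("item/a", List.of(new Broadcast("bbcone", 200))),
                new Item("item/b", List.of(new Broadcast("bbcone", 50), new Broadcast("bbctwo", 300))));
        // Query: a broadcast on "bbcone" at or after time 100.
        System.out.println(nestedMatch(items, b -> b.channel().equals("bbcone") && b.transmissionTime() >= 100));
        // prints [item/a]
        System.out.println(flattenedMatch(items, "bbcone", 100));
        // prints [item/a, item/b]: item/b is a false match
    }
}
```

Only the nested variant answers "which items have a broadcast on this channel after this time" correctly, which is why schedules need the nested mapping.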
Under the hood, all that indexing is based on Lucene, the de facto standard for full-text search, and ElasticSearch provides a very efficient way to cluster its indexes, allowing for fast and scalable indexing and querying. For the technically minded: the main bottleneck of a single Lucene index is write throughput, as Lucene allows only one writer per index. ElasticSearch overcomes this limitation by splitting a single index into shards spread over multiple hosts, so that independent shards can be written to independently; making an ElasticSearch index perform and scale is then mostly a matter of sharding it and adding hosts to the cluster. Sharding is not the only feature ElasticSearch offers, though: it also provides index replication, hence fault tolerance and higher availability.
All of those handy features come with no extra operational burden: ElasticSearch has a peer-to-peer distributed architecture, which makes it very easy to set up and operate. It also provides a very nice HTTP-based REST interface, which you can use not just to interact with indexes, but also to change aspects of the cluster's behavior (e.g., replication), as well as to query cluster status and related statistics.
ElasticSearch fully met the requirements above, so it became our choice.
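ElasticSearch routes each document to a primary shard by hashing its routing value (the document id by default) modulo the number of primary shards. The sketch below illustrates that idea only: it uses String.hashCode as a stand-in for ElasticSearch's own hash function, and the ids and shard count are hypothetical. Because the shard count is part of the formula, the number of primary shards has to be chosen when the index is created.

```java
import java.util.ArrayList;
import java.util.List;

public class ShardRoutingSketch {

    // Picks a shard by hashing the document id. Math.floorMod keeps the result
    // non-negative even when hashCode() is negative.
    static int shardFor(String id, int numberOfShards) {
        return Math.floorMod(id.hashCode(), numberOfShards);
    }

    public static void main(String[] args) {
        int numberOfShards = 3;
        // One list per shard, standing in for one independent Lucene index (and writer) each.
        List<List<String>> shards = new ArrayList<>();
        for (int i = 0; i < numberOfShards; i++) {
            shards.add(new ArrayList<>());
        }
        for (String id : List.of("item/1", "item/2", "item/3", "item/4", "item/5")) {
            shards.get(shardFor(id, numberOfShards)).add(id);
        }
        for (int i = 0; i < numberOfShards; i++) {
            System.out.println("shard " + i + ": " + shards.get(i));
        }
    }
}
```

Since the same id always lands on the same shard, writes to different shards never contend for the same Lucene writer, and each shard can live on a different host.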
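As an illustration of that REST interface, here is a minimal sketch using the JDK's java.net.http client to change the replica count of an index and to ask for cluster health. The node address and the index name "atlas" are assumptions made for this example; main only prints the requests, so it runs without a cluster.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ClusterRestSketch {

    // Assumed node address and index name, for illustration only.
    static final String NODE = "http://localhost:9200";
    static final String INDEX = "atlas";

    // Changes the replica count of a live index through its settings endpoint.
    static HttpRequest setReplicas(int replicas) {
        String body = "{\"index\": {\"number_of_replicas\": " + replicas + "}}";
        return HttpRequest.newBuilder(URI.create(NODE + "/" + INDEX + "/_settings"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    // Asks the cluster for its overall health (status, node and shard counts).
    static HttpRequest clusterHealth() {
        return HttpRequest.newBuilder(URI.create(NODE + "/_cluster/health")).GET().build();
    }

    // Sends a request to a running node and returns the response body.
    static String send(HttpRequest request) throws IOException, InterruptedException {
        return HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
    }

    public static void main(String[] args) {
        // Only prints the requests; call send(...) against a live node to execute them.
        for (HttpRequest request : new HttpRequest[] {setReplicas(2), clusterHealth()}) {
            System.out.println(request.method() + " " + request.uri());
        }
    }
}
```

With a node running, send(clusterHealth()) returns a small JSON document describing the cluster status.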
ElasticSearch in action
As we already mentioned in a previous blog post, our data model is pretty rich, and we don’t want ElasticSearch to carry the burden of indexing all of it, so we derived a simple "index schema" from our more complex model.
Here is an example of how we modeled our main requirement from above: nested documents for schedule-based (nested) queries.
We have a base schema class:
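import java.util.HashMap;
import java.util.Map;

import com.google.common.base.Function;

// Base class of our "index schema": every object is just a map of properties.
public class ESObject {

    // Guava function turning any ESObject into its property map (used with Iterables.transform).
    public final static Function<ESObject, Map<String, Object>> TO_MAP = new FromEsObjectToMap();

    protected Map<String, Object> properties = new HashMap<String, Object>();

    public Map<String, Object> toMap() {
        return properties;
    }

    private static class FromEsObjectToMap implements Function<ESObject, Map<String, Object>> {
        @Override
        public Map<String, Object> apply(ESObject input) {
            return input.toMap();
        }
    }
}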
Then an "Item" class and a nested "Broadcast" class, both inheriting from it:
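import java.util.Collection;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

public class ESItem extends ESObject {
    public final static String BROADCASTS = "broadcasts";
    // … more stuff
    // Stores each broadcast as a map; copying the lazy transform view into a List
    // keeps a plain list of maps in the properties.
    public ESItem broadcasts(Collection<ESBroadcast> broadcasts) {
        properties.put(BROADCASTS, Lists.newArrayList(Iterables.transform(broadcasts, TO_MAP)));
        return this;
    }
}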
import java.util.Date;

public class ESBroadcast extends ESObject {
    public final static String TRANSMISSION_TIME = "transmissionTime";
    // … more stuff
    public ESBroadcast transmissionTime(Date transmissionTime) {
        properties.put(TRANSMISSION_TIME, transmissionTime);
        return this;
    }
}
As you may have noticed, our model is essentially built on maps, so it can be indexed directly and efficiently via the ElasticSearch Java API:
// Index one item, using its property map as the document source.
Requests.indexRequest(INDEX_NAME).type(ESItem.TYPE).id(esItem.getUri()).source(esItem.toMap());
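To make the shape of that source concrete, here is a dependency-free sketch of the map handed to source(...) for an item with two broadcasts. It mirrors ESItem and ESBroadcast with plain HashMaps; the "uri" field and all values are hypothetical, since the post elides the rest of ESItem.

```java
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SourceMapSketch {

    // Mirrors ESBroadcast: one property map per broadcast.
    static Map<String, Object> broadcast(Date transmissionTime) {
        Map<String, Object> properties = new HashMap<>();
        properties.put("transmissionTime", transmissionTime);
        return properties;
    }

    // Mirrors ESItem: the "broadcasts" property holds a list of broadcast maps.
    static Map<String, Object> item(String uri, List<Map<String, Object>> broadcasts) {
        Map<String, Object> properties = new HashMap<>();
        properties.put("uri", uri); // hypothetical field, not shown in the post
        properties.put("broadcasts", broadcasts);
        return properties;
    }

    public static void main(String[] args) {
        Map<String, Object> source = item("http://example.com/item/1",
                List.of(broadcast(new Date(0L)), broadcast(new Date(3_600_000L))));
        System.out.println(source.get("broadcasts"));
    }
}
```

Serialized to JSON, "broadcasts" becomes an array of objects, which is exactly the field that needs the "nested" mapping.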
As a bonus, queries can refer to fields unambiguously through the same constants; here is an example of a nested, schedule-based query:
// Match items having at least one broadcast transmitted from SOME_DATE onwards.
QueryBuilders.nestedQuery(ESItem.BROADCASTS,
    QueryBuilders.rangeQuery(ESBroadcast.TRANSMISSION_TIME).from(SOME_DATE));
The final piece needed to make it work is to configure the field above as, guess what, "nested" in the ElasticSearch mapping; we do that dynamically through the Java API:
// Map "broadcasts" as "nested", so each broadcast is indexed as its own hidden document.
// Note: jsonBuilder() throws IOException, which the caller must handle.
Requests.putMappingRequest(INDEX_NAME).type(ESItem.TYPE).source(
    XContentFactory.jsonBuilder()
        .startObject()
            .startObject(ESItem.TYPE)
                .startObject("properties")
                    .startObject(ESItem.BROADCASTS)
                        .field("type").value("nested")
                    .endObject()
                .endObject()
            .endObject()
        .endObject());
feeding the baby
So we have Cassandra, and we have ElasticSearch: how do we keep the two aligned? How do we feed data into ElasticSearch?
We do it in three different ways, depending on the operational mode:
- Bootstrap: used when starting up a new ElasticSearch cluster; it involves reading a snapshot of all the data and ingesting it into ElasticSearch.
- Online: used during normal operations; it relies on our brand new messaging infrastructure, based on ActiveMQ virtual topics, to deliver "update messages" to the ElasticSearch indexing component.
- Catch-up: used when ElasticSearch needs to be fed with data from a given point in time (for example, after a crash); it involves a special component we’re building right now to replay update messages over different time windows.
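The catch-up component is still being built, so what follows is only a hypothetical sketch of the idea, not its implementation. It assumes each update message carries a timestamp and an item URI (the UpdateMessage record is invented here), selects the messages at or after a given point in time, and groups them into fixed-size time windows that could be replayed one at a time to the indexing component.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

public class CatchUpSketch {

    // Hypothetical update message: which item changed, and when (epoch millis).
    record UpdateMessage(long timestamp, String itemUri) {}

    // Selects messages at or after `since` and groups them into consecutive
    // windows of `windowSize` millis; each returned batch is one window, in
    // chronological order, ready to be re-sent to the indexing component.
    static List<List<String>> replayBatches(List<UpdateMessage> log, long since, long windowSize) {
        // TreeMap keyed by window index keeps the windows in chronological order.
        TreeMap<Long, List<UpdateMessage>> windows = new TreeMap<>();
        for (UpdateMessage message : log) {
            if (message.timestamp() >= since) {
                long windowIndex = (message.timestamp() - since) / windowSize;
                windows.computeIfAbsent(windowIndex, k -> new ArrayList<>()).add(message);
            }
        }
        List<List<String>> batches = new ArrayList<>();
        for (List<UpdateMessage> window : windows.values()) {
            // Messages may arrive out of order; replay them by timestamp.
            window.sort(Comparator.comparingLong(UpdateMessage::timestamp));
            List<String> uris = new ArrayList<>();
            for (UpdateMessage message : window) {
                uris.add(message.itemUri());
            }
            batches.add(uris);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<UpdateMessage> log = List.of(
                new UpdateMessage(50, "item/a"), // before the catch-up point: skipped
                new UpdateMessage(130, "item/c"),
                new UpdateMessage(110, "item/b"),
                new UpdateMessage(260, "item/d"));
        // Replay everything since t=100 in windows of 100 ms.
        System.out.println(replayBatches(log, 100, 100));
        // prints [[item/b, item/c], [item/d]]
    }
}
```

Replaying window by window keeps each batch bounded; how the real component will choose its windows is not described here.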
If you’re interested in any of those… just wait for the next blog posts! In the meantime, we hope you enjoyed this one, and don’t forget to send us your feedback!
