Indexing BigData with ElasticSearch

Our goal: @BugSense has always strived to evolve fast in order to keep providing state-of-the-art features while analysing our stored data as efficiently as possible. Despite our pretty short run, we have already come to analyze 12M data points per day, while our dataset keeps growing exponentially into the TBs. That led to a real challenge: to act ahead and figure out a way to store and analyze huge amounts of data as efficiently as possible, and to make that data easily searchable for our users by providing them with advanced queries.

We wanted something better than mining such amounts of data in batches in order to extract valuable information. We wanted a solution that could provide not only fast results, even over huge data sets, but also access to both single and aggregated pieces of information, all while keeping operational costs and complexity to a minimum. We searched through various solutions, but none of them matched our ideal. BigTable, for instance, was perfect for reliably storing large amounts of information, but extracting metrics over the entire data set could take several days, depending on the size of the data set and the complexity of the processing.

The solution finally appeared in the form of ElasticSearch, an open-source, Java-based full-text indexing system built on the equally open-source Apache Lucene engine, which allows you to query and explore your data set as you collect it. It was the ideal solution for us, since BigData analysis requires a distributed architecture.



How we used it: ElasticSearch supports multiple indices (databases) and multiple mappings (tables) per index. Mapping is the process of defining how a document should be mapped to the search engine, including characteristics such as which fields are searchable and whether/how they are tokenized. For starters, we configured explicit mappings. Here's an example:
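(The field names below are hypothetical, for illustration; the point is the explicit, mostly "not_analyzed" string fields.)

    # Sketch: an explicit mapping for a hypothetical "error" document type
    curl -XPUT 'http://localhost:9200/errors/error/_mapping' -d '{
      "error": {
        "properties": {
          "apikey":      { "type": "string", "index": "not_analyzed" },
          "app_version": { "type": "string", "index": "not_analyzed" },
          "os_version":  { "type": "string", "index": "not_analyzed" },
          "timestamp":   { "type": "date" }
        }
      }
    }'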

Most of our data is stored in a "not analyzed" form. This means that the field in question is searchable but isn't processed in any other way, so there is no overhead of loading tokenized field data into memory. However, we also have ngrammed fields, for which we've constructed a custom analyzer like this:
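(Again a sketch rather than our exact settings; the index name, analyzer name and ngram sizes are illustrative.)

    # Sketch: create an index whose "message" field is tokenized into ngrams
    curl -XPUT 'http://localhost:9200/errors' -d '{
      "settings": {
        "analysis": {
          "filter": {
            "message_ngram": { "type": "nGram", "min_gram": 3, "max_gram": 20 }
          },
          "analyzer": {
            "message_analyzer": {
              "type": "custom",
              "tokenizer": "whitespace",
              "filter": ["lowercase", "message_ngram"]
            }
          }
        }
      },
      "mappings": {
        "error": {
          "properties": {
            "message": { "type": "string", "analyzer": "message_analyzer" }
          }
        }
      }
    }'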

The message fields are pre-tokenized into chunks (ngrams) to give our users the ability to search all error messages quickly and efficiently by typing some part (substring) of an error message into a search box.

How we scaled it: Ever since we released our application into the tubes, we had relied on Google's AppEngine infrastructure for availability, replication and load balancing. With the integration of ElasticSearch, though, moving that part of the infrastructure out of GAE was inevitable. Fortunately, the decision was easy. Amazon's cloud offering (AWS) was the perfect match for ElasticSearch, as ElasticSearch supports automatic node discovery via the AWS EC2 API and automatic gateway backups to S3. Combined with the fact that we were already using AWS EC2 for our proprietary in-house in-memory BigData analytics database, LDB, the choice was almost obvious. (Nevertheless, we are also benchmarking running part of the cluster on Azure.)

After selecting our infrastructure provider, our Operations team initially deployed a staging cluster of ElasticSearch nodes in order to estimate the actual CPU and memory requirements. At the same time, our Development team implemented a tee in the app code so that data was transparently sent to both GAE's Datastore and the ElasticSearch cluster.

Considering we were already handling error reports at a rate of 6,000/minute, the staging cluster quickly crumbled under the load, so we did some necessary tweaking, mainly in the elasticsearch.yml configuration file (a combined excerpt follows the list), such as:

  • index.translog.flush_threshold_period: An extremely low flushing period for the transaction log would increase I/O, which is a scarce commodity on EC2. Setting it to 5s provided us with a proper balance of operational safety and load coalescence.
  • index.merge.policy.use_compound_file and index.compound_format: Since we didn’t use a large number of indices, by design, open files would not be an issue and that allowed us to disable the Lucene compound format for better performance.
  • index.refresh_interval: As with the transaction log flushing threshold, setting the index refresh interval to 5s provided a proper balance of operational safety and load coalescence.
  • indices.memory.index_buffer_size: By increasing the index buffer size from the default 10% to 20% of the total JVM heap memory, we allowed more memory for indexing operations which is crucial for the high write factor of our loads.
  • index.cache.field.type: Converting the index field cache to soft allows the JVM to perform garbage collection on the field cache (instead of only obeying expiration settings), lowering the actual memory usage.
  • index.gateway.snapshot_interval: Increasing the snapshot interval of the index data to the gateway works together with the other two settings concerning sync operations (the transaction log flush interval and the index refresh interval) in our effort to minimize I/O operations. Since we're running a replicated cluster, we can increase this interval with no fear of data desynchronization whatsoever.
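
Put together, the relevant excerpt of elasticsearch.yml looks roughly like this (the snapshot interval is a placeholder, since we only note above that we raised it beyond the default):

    # elasticsearch.yml (excerpt) -- values as discussed above
    index.translog.flush_threshold_period: 5s
    index.refresh_interval: 5s
    index.merge.policy.use_compound_file: false
    index.compound_format: false
    indices.memory.index_buffer_size: 20%
    index.cache.field.type: soft
    index.gateway.snapshot_interval: 30s   # placeholder value; raised above the default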

We should mention that in our staging environment we used the "local" gateway for data snapshotting, whereas in production we eventually used the shared "fs" gateway over a local NFS share. This provided us with several nice features, such as centralized management and immediate block-device snapshotting. We also took the following steps further down the optimization road:

  1. As per best practices, we set the minimum and maximum JVM heap size to the same number (-Xms and -Xmx) and added -XX:+UseTLAB, -XX:+CMSClassUnloadingEnabled and -XX:+CMSPermGenSweepingEnabled to ElasticSearch's JVM options, courtesy of Java 7 (a sketch of these system-level settings follows the list). We cannot stress enough how crucial it is to run ElasticSearch on Oracle's official Java 7 distribution (not OpenJDK or Java 6). The performance is orders of magnitude better!
  2. We switched from the default CFQ scheduler of our custom-compiled kernel (read this post to see why) to DEADLINE and noticed improved throughput, since ElasticSearch was the only major process running on each server of the cluster and the competition for I/O was therefore lower.
  3. We increased the open file limits, both in sysctl via fs.file-max and in /etc/security/limits.conf, to allow for more headroom in ElasticSearch operations.
  4. Finally, we turned off dynamic mapping and index creation, defining these operations in code instead, and disabled storing the source (_source) along with each document, lowering the memory and storage requirements per query.
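
A rough sketch of how the system-level tweaks above can be applied, assuming the stock startup script that reads ES_HEAP_SIZE and ES_JAVA_OPTS (the heap size, device name, user name and limits are illustrative, not our exact values):

    # JVM: the ElasticSearch startup script picks these up from the environment
    export ES_HEAP_SIZE=4g     # sets -Xms and -Xmx to the same value
    export ES_JAVA_OPTS="-XX:+UseTLAB -XX:+CMSClassUnloadingEnabled -XX:+CMSPermGenSweepingEnabled"

    # I/O scheduler: switch the data disks from CFQ to DEADLINE
    echo deadline > /sys/block/xvdb/queue/scheduler

    # Open file limits, both kernel-wide and per user
    sysctl -w fs.file-max=262144
    cat >> /etc/security/limits.conf <<'EOF'
    elasticsearch soft nofile 65536
    elasticsearch hard nofile 65536
    EOF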

After performing the above optimizations, we managed to push the performance of the same cluster, on the same instances, to more than 10,000 errors/minute. Complementary to these optimizations, we took advantage of the feature set of the ElasticSearch EC2 plugin, which includes the following (a short configuration excerpt follows the list):

  • automatic cluster node discovery via EC2 tags (e.g. discovery.ec2.tag.environment: 'prod') that allowed us to segregate our staging and production clusters, even when they belonged to the same security group, and
  • region segmentation (e.g. cloud.aws.region: us-east-1) that allowed us to configure ElasticSearch to join a cluster belonging to the same EC2 region (but a different AZ).
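
In elasticsearch.yml this part of the setup can look roughly like the following (the credentials are placeholders, and the tag value matches the example above):

    # elasticsearch.yml (excerpt) -- EC2 discovery via the cloud-aws plugin
    cloud.aws.access_key: YOUR_AWS_ACCESS_KEY
    cloud.aws.secret_key: YOUR_AWS_SECRET_KEY
    cloud.aws.region: us-east-1
    discovery.type: ec2
    discovery.ec2.tag.environment: 'prod'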

Of course, all these features are fine and dandy when testing, but when you move into production it's a whole new game. Thankfully, the ElasticSearch community offers amazing tools for cluster monitoring (such as Paramedic and elasticsearch-head), which made our lives much easier. Combining these tools with an already established toolchain utilizing:

  • configuration automation via Opscode Chef
  • monitoring and alerting via Sonian Sensu and Twilio
  • metric gathering via Ganglia and eStatsD/Graphite and
  • remote job execution via Rundeck

we were confident that we would continue to offer the same highly available level of service we always had. In the end, the move from the staging cluster to production was meticulously planned and executed with no unforeseen consequences whatsoever. We ended up using a multi-AZ, 3-node cluster with 12 shards and 2 replicas behind an Elastic Load Balancer, allowing us to scale out if and when we needed to, without further complicating maintenance.
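
For reference, the shard/replica part of that layout boils down to two index settings (shown here as elasticsearch.yml defaults for newly created indices):

    # elasticsearch.yml (excerpt) -- default layout for new indices
    index.number_of_shards: 12
    index.number_of_replicas: 2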

Bottom Line: Using ElasticSearch for our BigData allowed us to further exploit our dataset’s information in ways that, until recently, were either not possible or extremely expensive. And our users didn’t have to suffer any service interruptions at all. 

If you have any questions on our solution just send us a tweet and we promise to elaborate further!