Scaling Elasticsearch Part 3: Queries

See part 1 and part 2 for an overview of our system and how we scale our indexing. Originally I was planning a separate post for global queries and related posts queries, but it was hard to break into two posts and contributed to me taking forever to write them.

Two types of queries run on the Elasticsearch index: global queries across all posts and local queries that search posts within a single blog. Over 90% of our queries (23 million/day) are local, and the remainder are global (2 million/day).

In this post we’ll show some performance data for global and local queries, and discuss the tradeoffs we made to improve their performance.

An Aside about Query Testing

We used JMeter for most of our query testing. Since we are comparing global and local queries, it was important to get a variety of different queries to run and to run them on mostly full indices (though sometimes ongoing development made that impractical).

Generally we ran real user queries randomly sampled from our search logs. For local queries we were running related posts type queries with the posts pseudo-randomly selected from the index. There had to be a fair bit of hand curation of the queries due to errors in our sampling techniques and the occasional bad query type that would get through. Generating a good mix of queries is a combination of art and deciding that things are good enough.

A Few Mapping Details

I’ve already written a post on how we map WordPress posts to ES documents (and wpes-lib is on github), and there’s a description of how we handle multi-lingual data. Mostly those details are irrelevant for scaling our queries, but there are two important details:

  1. All post documents are child documents of blog documents. The blog documents record meta information about the blog.
  2. Child documents are always stored on the same shard as their parent document. They all use the same routing value. This provides us with a huge optimization. Local searches can use routing on the query so it gets executed on a single node and accesses only a single shard.
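To make the routing point concrete, here is a minimal Python sketch of why a shared routing value keeps a blog and all of its posts on one shard. The `shard_for` helper and the CRC32 hash are stand-ins chosen for illustration (Elasticsearch uses its own internal hash function), and the 25 shards per index is the figure we settle on later in this post.

```python
import zlib


def shard_for(routing_value: str, num_shards: int) -> int:
    """Pick a shard from a routing value.

    CRC32 is only a deterministic stand-in here; Elasticsearch uses its
    own internal hash function.
    """
    return zlib.crc32(routing_value.encode("utf-8")) % num_shards


NUM_SHARDS = 25   # shards per index
blog_id = "12345"  # used as the routing value for the blog and all its posts

# The blog document and every post on that blog are indexed with the same
# routing value, so they all hash to the same shard.
blog_shard = shard_for(blog_id, NUM_SHARDS)
post_shards = {shard_for(blog_id, NUM_SHARDS) for _post_id in range(1000)}

print(blog_shard, post_shards)
```

A routed local search only needs to visit `blog_shard`, which is the whole point of the optimization.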

The Global Query

The original plan with our global queries was to use the parent blog documents to keep track of some meta information about the blogs to determine whether a blog’s posts were globally searchable (whether they were publicly accessible, had mature content, spam, etc). By using the blog document we’d be able to update a single document and not have to reindex all the posts on the blog. Then we’d use a has_parent filter on all global queries.

Unfortunately we found that parent/child filters did not scale well for our purposes. All parent document ids had to be loaded into memory (about 5.3 GB of data per node for our 60 million blog documents). We had plenty of RAM available for ES (30 GB) and the id cache seemed to hold steady, but we still saw much slower queries and higher server load when using the has_parent filter. Here’s the data for the final parent/child performance test we ran (note this was tested on a 16 node cluster):

Using has_parent   JMeter threads   Reqs/Sec   Median Latency (ms)   Mean Latency (ms)   Max Latency (ms)   CPU %   Load
yes                2                5.3        538                   365                 2400               45      12
no                 2                10         338                   190                 1500               30      8
yes                4                7.6        503                   514                 2500               75      22
no                 4                17.5       207                   222                 2100               50      15

Avoiding the has_parent filter saved us a lot of servers and gave us a lot more headroom, even though it means we have to bulk reindex posts more often.

In the end our global query looks something like:

POST global-*/post/_search
{
   "query": {
      "filtered": {
         "query": {
            "multi_match": {
               "fields": ["content", "title"],
               "query": "Can I haz query?"
            }
         },
         "filter": {
            "and": [
               { "term": { "lang": "en" } },
               {
                  "and": [
                     { "term": { "spam": false } },
                     { "term": { "private": false } },
                     { "term": { "mature": false } }
                  ]
               }
            ]
         }
      }
   }
}

One last side note on ES global queries because it cannot be said strongly enough: FILTERS ARE EPIC. If you don’t understand why I am shouting this at you, then you need to go read about and understand Bitsets and how filters are cached.

Sidenote: after re-reading that post I realized we may be able to improve our performance a bit by switching from the AND filters shown above to bool filters. This is why blogging is good. This change cut our median query time in half.
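For reference, a bool version of the filter could be built like the sketch below. This is an illustration in Python rather than our production query: the `global_query` helper is hypothetical, and it assumes the nested and filters simply collapse into one bool filter with four must clauses.

```python
import json


def global_query(text: str, lang: str = "en") -> dict:
    """Build a global search body using a bool filter instead of nested and filters."""
    return {
        "query": {
            "filtered": {
                "query": {
                    "multi_match": {
                        "fields": ["content", "title"],
                        "query": text,
                    }
                },
                "filter": {
                    "bool": {
                        # Every clause must match; each term filter can be
                        # cached as its own bitset and then combined.
                        "must": [
                            {"term": {"lang": lang}},
                            {"term": {"spam": False}},
                            {"term": {"private": False}},
                            {"term": {"mature": False}},
                        ]
                    }
                },
            }
        }
    }


body = global_query("Can I haz query?")
print(json.dumps(body, indent=2))
```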

Global Query Performance With Increasing Numbers of Shards

Global queries require gathering results from all shards and then combining the results to give the final result. A search for 10 results across 10 shards requires running the query on each of the shards, and then combining those 100 results to get the final 10 results. More shards means more processing across the cluster, but also more results that need to be combined. This gets even more interesting when you start paging results. Getting results 90-100 of a search across ten shards requires combining 1000 results to satisfy the search.
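The arithmetic above can be sketched in a few lines of Python. Both helpers are hypothetical and only model the merge step: each shard is assumed to hand back its top `offset + size` hits as `(score, doc_id)` tuples already sorted by descending score.

```python
import heapq


def results_to_merge(num_shards: int, offset: int, size: int) -> int:
    """Each shard returns its top (offset + size) hits to be combined."""
    return num_shards * (offset + size)


def merge_page(shard_hits, offset, size):
    """Merge per-shard hit lists (sorted by descending score) into one page."""
    merged = heapq.merge(*shard_hits, key=lambda hit: -hit[0])
    return list(merged)[offset:offset + size]


print(results_to_merge(10, 0, 10))   # first page of 10 across 10 shards
print(results_to_merge(10, 90, 10))  # results 90-100 across 10 shards

shard_hits = [
    [(0.9, "a"), (0.4, "b")],  # hits from shard 0
    [(0.8, "c"), (0.7, "d")],  # hits from shard 1
]
print(merge_page(shard_hits, 0, 2))
```

The cost grows with both the shard count and the page depth, which is why deep paging on a heavily sharded index hurts twice.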

We did some testing of our query performance across a few million blogs as we varied the number of shards using a fixed number of JMeter threads.

Shards/Index   Requests/sec   Median (ms)   Mean (ms)
5              320            191           328
10             320            216           291
25             289            345           332
50             183            467           531

There’s a pretty clear relationship between query latency and number of shards. This result pushed us to try to minimize the total number of shards in our cluster. We can probably also improve query performance by increasing our replication.

The Local Query

Most of our local queries are used for finding related posts. We run a combination of mlt_query queries and multi_match queries to send the text of the current post and find posts that are similar. For a post with the title “The Best”, and content of “This is the best post in the world” the query would look like:

POST global-0m-10m/post/_search?routing=12345
{
  "query": {
    "filtered": {
      "query": {
        "multi_match": {
          "query": "The Best This is the best post in the world.",
          "fields": ["mlt_content"]
        }
      },
      "filter": {
        "and": [
          { "term": { "blog_id": 12345 } },
          { "not": { "term": { "post_id": 3 } } }
        ]
      }
    }
  }
}

Looks simple, but there are a lot of interesting optimizations to discuss.


Routing

All of our local queries use the search routing parameter to limit the search to a single shard. Organizing our indices and shards so that an entire blog will always be in a single shard is one of the most important optimizations in our system. Without it we would not be able to scale and handle millions of queries because we would be wasting a lot of cycles searching shards that had few or no documents that were actually related to our search.

Query Type

In the above example, we use the multi_match query. If the content was longer (more than 100 words) we would use the mlt_query. We originally started out with all queries using mlt_query to make the queries faster and ensure good relevancy. However, using mlt_query does not guarantee that the resulting query will actually have any terms in the final search query. A lot will depend on the frequency of the terms in the post and their frequency in the index. Changing to using multi_match for short content gave us a big improvement in the relevancy of our results (as measured by click through rate).
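The switch between the two query types can be sketched as below. This is a hedged illustration, not our client code: the `related_posts_query` helper is hypothetical, it assumes the 100 word threshold is applied to the post content alone, and it assumes mlt_query means the `more_like_this` query with a `like_text` parameter.

```python
def related_posts_query(title: str, content: str, max_short_words: int = 100) -> dict:
    """Pick multi_match for short posts and more_like_this for long ones."""
    text = f"{title} {content}".strip()
    if len(content.split()) > max_short_words:
        # Long content: let ES select the most interesting terms.
        return {"more_like_this": {"fields": ["mlt_content"], "like_text": text}}
    # Short content: use every term so the final query is never empty.
    return {"multi_match": {"query": text, "fields": ["mlt_content"]}}


short_query = related_posts_query("The Best", "This is the best post in the world.")
long_query = related_posts_query("The Best", " ".join(["word"] * 150))
print(short_query)
print(sorted(long_query))
```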


MLT API

We started building related posts by using the MLT API for running the query. In that case we would only send the document id to ES and trust ES to get the post, analyze it, and construct the appropriate mlt_query for it. This worked pretty well, but did not scale as well as building our own query on the client. Switching to mlt_query gave us roughly an 8x improvement in the number of requests we could handle and reduced the query response time.

Operation   Requests/sec   Median (ms)   Mean (ms)
MLT API     150            270           306
mlt_query   1200           77            167

From what I could tell the big bottleneck was getting the original document. Of course this change moved a lot of processing off of the ES cluster and onto the web hosts, but web hosts are easier to scale than ES nodes (or at least that is a problem our systems team is very good at solving).

mlt_content Field

The query is going to a single field called mlt_content rather than to separate title and content fields. Searching fewer fields gives us a significant performance boost, and helps us search for words that occur in different fields in different posts. The fairly new multi_match cross_fields option could probably help here, but I assume it would not be as performant as a single field.

For a while we were also storing the mlt_content field, but recent work has determined that storing the field did not speed up the mlt_query queries.

The history of how we ended up using mlt_content is also instructive. We started using the mlt_content field and storing it while we were still using the MLT API. Originally we were using the post title and content fields which were getting extracted from the document’s _source. Switching to a stored mlt_content field reduced the average time to get a document before building the query from about 500ms to 100ms. In the end this turned out to not be enough of a performance boost for our application, but is worth looking into for anyone using the MLT API.

Improving Relevancy with Rescoring

We’ve run a couple of tests to improve the relevancy of our related posts. Our strategy here has mostly been to use the mlt_query/multi_match queries as our basic query and then use rescoring to adjust the query results. For instance we build a query that slightly reranks our top 50 results based on the commonality between who has liked the current post and whether they liked the posts that were similar. Running this using the rescoring option had almost no impact on query performance and yet gave a nice bump to the click through rate of our related posts.
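A rescore request of this kind might look like the sketch below. The shape of the `rescore` section follows the Elasticsearch rescoring API, but everything specific is an assumption for illustration: the `with_like_rescore` helper, the `liker_ids` field name, and the two weights are all hypothetical.

```python
def with_like_rescore(base_query: dict, liker_ids: list, window_size: int = 50) -> dict:
    """Wrap a base query with a rescore phase over the top window_size hits."""
    return {
        "query": base_query,
        "rescore": {
            "window_size": window_size,  # only the top hits per shard are rescored
            "query": {
                # Hypothetical: boost posts liked by people who liked this post.
                "rescore_query": {"terms": {"liker_ids": liker_ids}},
                "query_weight": 1.0,          # assumed weight for the original score
                "rescore_query_weight": 0.2,  # assumed small nudge from the likes
            },
        },
    }


base = {"multi_match": {"query": "The Best", "fields": ["mlt_content"]}}
body = with_like_rescore(base, [101, 202, 303])
print(body["rescore"]["window_size"])
```

Because the rescore query only runs against a small window of hits that already matched, it stays cheap even when the rescoring logic is comparatively expensive.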

Shard Size and Local Query performance

While global queries perform best with a small number of larger shards, local queries are fastest when the shard is smaller. Here’s some data we collected comparing the number of shards to routed query speed when running mlt_query searches:

Shards/Index   Requests/sec   Median (ms)   Mean (ms)   Max (ms)
10             1200           77            167         5900
30             1600           67            112         1100

Less data in each shard (more total shards) has a very significant impact on the number of slow queries and how slow they are.

Final Trade Off of Shard Size and Query Performance

Based on the global and local query results above, we decided to go with 25 shards per index as a compromise that gives decent performance for both query types. This was one of the trickier decisions, but it worked reasonably well for us for a while. About 6 months after making this decision though we decided that we were ending up with too many slow queries (queries taking longer than 1 second).

I guess that’s my cue to tease the next post in this (possibly) never-ending series: rebuilding our indices to add 6-7x as many documents while reducing slow queries and speeding up all queries in general. We went from the 99th percentile of our queries taking 1.7 seconds down to 800ms and the median time dropping from 180ms to 50ms. All these improvements while going from 800 million docs in our cluster to 5.5 billion docs. Details coming up in my next post.

Scaling Elasticsearch Part 2: Indexing

In part 1 I gave an overview of our cluster configuration. In this part we’ll dig into:

  • How our data is partitioned into indices to scale over time
  • Optimizing bulk indexing
  • Scaling real time indexing
  • How we manage indexing failures and downtime.

The details of our document mappings are mostly irrelevant for our indexing scaling discussion, so we’ll skip them until part 3.

Data Partitioning

Since data is constantly growing we need an indexing structure that can grow over time as well. A well-known limitation of ES is that once an index is created you cannot change the number of shards. The common solution to this problem is to recognize that searching across an index with 10 shards is identical to searching across 10 indices with 1 shard each, and indices can be created at will.

In our case we create one index for every 10 million blogs, with 25 shards per index. We use index templates so that as our system tries to index to a non-existent index the index is created dynamically.
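As a rough sketch of that partitioning, the Python below maps a blog id to an index name. The `global-0m-10m` name is taken from the local query example in part 3; the naming of the later indices and the handling of the exact 10 million boundary are assumptions, and `index_for_blog` is a hypothetical helper.

```python
BLOGS_PER_INDEX = 10_000_000


def index_for_blog(blog_id: int) -> str:
    """Map a blog id to the index holding its 10 million blog range."""
    start_millions = (blog_id // BLOGS_PER_INDEX) * 10
    return f"global-{start_millions}m-{start_millions + 10}m"


print(index_for_blog(12345))
print(index_for_blog(23_456_789))
```

Indexing a document for a blog in a new range then simply targets an index name that does not exist yet, and the index template takes care of creating it with the right settings.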

There are a few factors that led to our index and sharding sizes:

  1. Uniform shard sizes: Shards should be of similar sizes so that you get mostly uniform response times. Larger shards take longer to query. We tried one index per 1 million blogs and found too much variation. For instance, when we migrated Microsoft’s LiveSpaces blogs, about a million blogs were created in a row in our DBs, and they have remained pretty active. This variation drove us to put many blogs into each index. We rely on the hashing algorithm to spread the blogs across all the shards in the index.
  2. Limit number of shards per index: New shards are not instantly created when a new index is created. Technically, I guess, the cluster state is red for a very short period. At one point we tested 200 shards per index. In those cases we sometimes saw a few document indexing failures in our real time indexing because the primary shards were still being allocated across the cluster. There are probably other ways around this, but it’s something to look out for.
  3. Upper limit of shard size: Early on we tried indexing 10 million blogs per index with only 5 shards per index. Our initial testing went well, but then we found that the indices with the larger shards (the older blogs) were experiencing much longer query latencies. This was us starting to hit the upper limits of our shard sizes. The upper limit on shard size varies by what kind of data you have and is difficult to predict so it’s not surprising that we hit it.
  4. Minimize total number of shards: We’ll discuss this further in our next post on global queries, but as the number of shards increases the efficiency of the search decreases, so reducing the number of shards helps make global queries faster.

Like all fun engineering problems, there is no easy or obvious answer and the solution comes by guessing, testing, and eventually deciding that things are good enough. In our case we figured out that our maximum shard size was around 30 GB. We then created shards that were fairly large but which we don’t think would be able to grow to that maximum for many years.

As I’m writing this, and after a few months in production, we’re actually wondering if our shards are still too large. We didn’t take into account that deleted documents would also negatively affect shard size, and every time we reindex or update a document we effectively delete the old version. Investigation into this is still ongoing, so I’m not going to try to go into the details. The number of deleted documents in your shards is related to how much real time indexing you are doing, and the merge policy settings.

Bulk Indexing Practicalities

Bulk indexing speed is a major limit in how quickly we can iterate during development, and indexing will probably be one of our limiting factors in launching new features in the future. Faster bulk indexing means faster iteration time, more testing of different shard/index configurations, and more testing of query scaling.

For these reasons we ended up putting a lot of effort into speeding up our bulk indexing. We went from bulk indexing taking about two months (estimated, we never actually ran this) to taking less than a week. Improving bulk indexing speed is very iterative and application specific. There are some obvious things to pay attention to like using the bulk indexing API, and testing different numbers of docs in each bulk API request. There were also a few things that surprised me:

  • Infrastructure Load: ES Indexing puts a heavy load on certain parts of the infrastructure because it pulls data from so many places. In the end, our indexing bottleneck is not ES itself, but actually other pieces of our infrastructure. I suppose we could throw more infrastructure at the problem, but that’s a trade off between how often you are going to bulk reindex vs how much that infrastructure will cost.
  • Extreme Corner Cases: For instance, one blog has millions of followers and likers, and lots of commenters. Building a list of these (and keeping it up to date with real time indexing) can be very costly – like, “oh $%#@ are we really trying to index a list of 2.3 million ids” costly (and then update it every few seconds).
  • Selective Bulk Indexing: Adding fields to the index requires updating the mappings and bulk reindexing all of our data. Having a way to selectively bulk index (find all blogs of a particular type) can speed up bulk indexing a lot.
  • Cluster Restarts: After bulk indexing we need to do a full rolling restart of the cluster.
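Testing different numbers of docs per bulk request is easier with a small batching helper. The Python sketch below builds newline-delimited bulk API bodies; the `bulk_payloads` helper and the `{"id": ..., "source": ...}` input shape are hypothetical, and the parent/routing metadata our post documents need is left out to keep it short.

```python
import json


def bulk_payloads(docs, index, doc_type, batch_size):
    """Yield newline-delimited bulk API bodies of at most batch_size docs each."""
    lines = []
    for count, doc in enumerate(docs, start=1):
        action = {"index": {"_index": index, "_type": doc_type, "_id": doc["id"]}}
        lines.append(json.dumps(action))         # action/metadata line
        lines.append(json.dumps(doc["source"]))  # document source line
        if count % batch_size == 0:
            yield "\n".join(lines) + "\n"        # bulk bodies end with a newline
            lines = []
    if lines:
        yield "\n".join(lines) + "\n"


docs = [{"id": i, "source": {"title": f"post {i}"}} for i in range(5)]
payloads = list(bulk_payloads(docs, "global-0m-10m", "post", batch_size=2))
print(len(payloads))
print(payloads[0])
```

Timing the same data set with a few different `batch_size` values is usually the quickest way to find a reasonable setting for a given document size.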

I wish we had spent more time finding and implementing bulk indexing optimizations earlier in the project.

Scaling Real Time Indexing

During normal operation, our rate of indexing (20m+ document changes a day) has never really been a problem for our Elasticsearch cluster. Our real time indexing problems have mostly stemmed from combining so many pieces of information into each document that gathering the data can be a high load on our database tables.

Creating the correct triggers to detect when a user has changed a field is often non-trivial to implement in a way that won’t over index documents. There are posts that get new comments or likes every second. Rebuilding the entire document in those cases is a complete waste of time. We make heavy use of the Update API mostly to reduce the load of recreating ES documents from scratch.

The other times real time indexing became a problem were when something went wrong with the cluster. A few examples:

  • Occasionally when shards are getting relocated or initialized on a number of nodes the network can get swamped which backs up the indexing jobs or can cause a high proportion of them to start failing.
  • Query load can become too high if a non-performant query is released into production.
  • We make mistakes (shocking!) and break a few servers.
  • Occasionally we find ES bugs in production. Particularly these have been around deleteByQuery, probably because we run a lot of them.
  • A router or server fails.

Real time indexing couples other portions of our infrastructure to our indexing. If indexing gets slowed down for some reason we can create a heavy load on our DBs, or on our jobs system that runs the ES indexing (and a lot of other, more important things).

In my opinion, scaling real time indexing comes down to two pieces:

  1. How do we manage downtime and performance problems in Elasticsearch and decouple it from our other systems?
  2. When indexing fails (which it will), how do we recover and avoid bulk indexing the whole data set?

Managing Downtime

We mentioned in the first post of this series that we mark ES nodes as down if we receive particular errors from them (such as a connection error). Naturally, if a node is down, then the system has less capacity for handling indexing operations. The more nodes that are down the less indexing we can handle.

We implemented some simple heuristics to reduce the indexing load when we start to detect that a server has been down for more than a few minutes. Once triggered, we queue certain indexing jobs for later processing by just storing the blog IDs in a DB table. The longer any nodes are down, the fewer types of jobs we allow. As soon as any problems are found we disable bulk indexing of entire blogs. If problems persist for more than 5 minutes we start to disable reindexing of entire posts, and eventually we also turn off any updating of documents or deletions.
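The escalation can be sketched as a small function. This is a simplified Python illustration, not our jobs system: the job type names are hypothetical, and the 15 minute cutoff for turning off updates and deletions is an assumed value (the text above only says “eventually”).

```python
ALL_JOBS = frozenset({"bulk_blog", "reindex_post", "update", "delete"})


def allowed_jobs(minutes_down: float, disable_all_after: float = 15.0) -> set:
    """Return the indexing job types still allowed given how long nodes have been down."""
    jobs = set(ALL_JOBS)
    if minutes_down <= 0:
        return jobs                   # healthy cluster: everything runs
    jobs.discard("bulk_blog")         # first to go as soon as problems are seen
    if minutes_down > 5:
        jobs.discard("reindex_post")  # problems persisted for more than 5 minutes
    if minutes_down > disable_all_after:
        jobs -= {"update", "delete"}  # assumed cutoff for the final stage
    return jobs


for minutes in (0, 1, 6, 20):
    print(minutes, sorted(allowed_jobs(minutes)))
```

Any job type that is not allowed gets queued by blog ID for later processing instead of being dropped.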

Before implementing this indexing delay mechanism we had some cases where the real time indexing overwhelmed our system. Since implementing it we haven’t seen any, and we actually smoothly weathered a failure of one of the ES network routers while maintaining our full query load.

We of course also have some switches we can throw if we want to completely disable indexing and push all blog ids that need to be reindexed into our indexing queue.

Managing Indexing Failures

Managing failures means you need to define your goals for indexing:

  • Eventually ES will have the same data as the canonical data.
  • Minimize needing to bulk re-index everything.
  • Under normal operation the index is up to date within a minute.

There are a couple of different ways our indexing can fail:

  1. An individual indexing job crashes (eg an out of memory error when we try to index 2.3 million ids 🙂 ).
  2. An individual indexing job gets an error trying to index to ES.
  3. Our heuristics delay indexing by adding it to a queue.
  4. We introduce a bug to our indexing that affects a small subset of posts.
  5. We turn off real time indexing.

We have a few mechanisms that deal with these different problems.

  • Problems 1, 3, and 5: The previously mentioned indexing queue. This lets us pick up the pieces after any failure and prevent bulk reindexing everything.
  • Problem 2: When indexing jobs fail they are retried using an exponential back off mechanism (5 min, 10 min, 20 min, 40 min, …).
  • Problem 4: We run scrolling queries against the index to find the set of blogs that would have been affected by a bug, and bulk index only those blogs.
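The back off schedule for failed jobs doubles on every attempt. A minimal Python sketch, where `retry_delay_minutes` is a hypothetical helper and attempts are counted from zero:

```python
def retry_delay_minutes(attempt: int, base_minutes: int = 5) -> int:
    """Delay before retry number `attempt` (0-based), doubling each time."""
    return base_minutes * 2 ** attempt


print([retry_delay_minutes(attempt) for attempt in range(4)])
```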

It’s All About the Failures and Iteration

Looking back on what I wish we had done better, I think a recognition that indexing is all about handling the error conditions would have been the best advice I could have gotten. Getting systems for handling failures and for faster bulk indexing in place sooner would have helped a lot.

In the next part of the series I’ll talk about query performance and balancing global and local queries.

Scaling Elasticsearch Part 1: Overview

We recently launched Related Posts, so it’s time to pop the hood and take a look at what ended up in our engine.

There’s a lot of good information spread across the web on how to use Elasticsearch, but I haven’t seen too many detailed discussions of what it looks like to scale an ES cluster for a large application. Being an open source company means we get to talk about these details. Keep in mind though that scaling is very dependent on the end application so not all of our methods will be applicable for you.

I’m going to spread our scaling story out over four posts.

Scale

Our site is in the top 20 sites on the Internet. We host very big clients all the way down the long tail where this blog resides. The primary project goal was to provide related posts on all of those page views (14 billion a month and growing). We also needed to replace our aging search engine that was running for global searches, and we have plans for many more applications in the future.

I’m only going to focus on the related posts queries (searches within a single blog) and the global queries (searches across all blogs). They illustrate some really nice features of ES.

Currently every day we average:

  • 23m queries for related posts within a single shard
  • 2m global queries across all shards
  • 13m docs indexed
  • 10m docs updated
  • 2.5m docs deleted
  • 250k delete by query operations

Our index has about 600 million documents in it, with 1.5 million added every day. Including replication there is about 9 TB of data.

System Overview

We mostly rely on the default ES settings, but there are a few key places where we override them. I’m going to leave the details of how we partition data to the post on indexing.

  • The system is running 0.90.8.
  • We have a single cluster spread across 3 data centers. There are risks with doing this. You need low latency links, and some longer timeouts to prevent communication problems within the cluster. Even with very good links between DCs local nodes are still 10x faster to reach than remote nodes:
discovery.zen.fd.ping_interval: 15s
discovery.zen.fd.ping_timeout: 60s
discovery.zen.fd.ping_retries: 5

This also helps prevent nodes from being booted if they experience long GC cycles.

  • 10 data nodes and one dedicated master node in each DC (30 data nodes total)
  • Currently 175 shards with 3 copies of each shard.
  • Disable multicast, and only list the master nodes in the unicast list. Don’t waste time pinging servers that can’t be masters.
  • Dedicated hardware (96GB RAM, SSDs, fast CPUs) – CPUs definitely seem to be the bottleneck that drives us to add more nodes.
  • ES is configured to use 30GB of the RAM with indices.fielddata.cache.size set to 20GB. This is still not ideal, but better than our previous setting of index.fielddata.cache: soft. 1.0 is going to have an interesting new feature that applies a “circuit breaker” to try and detect and squash out of memory problems. Elasticsearch has come a long way with preventing OutOfMemory exceptions, but I still have painful memories from when we were running into them fairly often in 0.18 and 0.19.
  • Use shard allocation awareness to spread replicas across data centers and within data centers.
cluster.routing.allocation.awareness.attributes: dc, parity

dc is the name of the data center, parity we set to 0 or 1 based on the es host number. Even hosts are on one router and odd on another.

  • We use fixed size thread pools, very deep for indexing because we don’t want to lose index operations, much shorter for search and get since if an operation has been queued that deeply the client will probably time out by the time it gets a response.
threadpool:
  index:
    type: fixed
    size: 30
    queue_size: 1000
  bulk:
    type: fixed
    size: 30
    queue_size: 1000
  search:
    type: fixed
    size: 100
    queue_size: 200
  get:
    type: fixed
    size: 100
    queue_size: 200
  • Elastica PHP Client. I’ll add the disclaimer that I think this client is slightly over object-oriented which makes it pretty big. We use it mostly because it has great error handling, and we mostly just set raw queries rather than building a sequence of objects. This limits how much of the code we have to load because we use some PHP autoloading magic:
//Autoloader for Elastica interface client to ElasticSearch
function elastica_autoload( $class_name ) {
	// Only handle classes in the Elastica namespace
	if ( 'Elastica' === substr( $class_name, 0, strlen( 'Elastica' ) ) ) {
		// Convert namespace separators into directory separators
		$path = str_replace( '\\', '/', $class_name );
		include_once dirname( __FILE__ ) . '/Elastica/lib/' . $path . '.php';
	}
}
spl_autoload_register( 'elastica_autoload' );
  • Custom round robin ES node selection built by extending the Elastica client. We track stats on errors, number of operations, and mark nodes as down for a few minutes if certain errors occur (eg connection errors).
  • Some customizations for node recovery. See my post on speeding up restart time.
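The round robin node selection can be sketched as below. This is a Python illustration of the idea rather than our PHP extension of the Elastica client: the `NodePool` class, the 180 second down period, and the injectable clock are all assumptions made to keep the example self-contained.

```python
import itertools
import time


class NodePool:
    """Round robin over nodes, skipping any node recently marked as down."""

    def __init__(self, nodes, down_seconds=180, clock=time.monotonic):
        self._nodes = list(nodes)
        self._cycle = itertools.cycle(self._nodes)
        self._down_until = {}  # node -> time when it may be used again
        self._down_seconds = down_seconds
        self._clock = clock

    def mark_down(self, node):
        """Call this after e.g. a connection error from the node."""
        self._down_until[node] = self._clock() + self._down_seconds

    def next_node(self):
        now = self._clock()
        # Look at each node at most once per call.
        for _ in range(len(self._nodes)):
            node = next(self._cycle)
            if self._down_until.get(node, float("-inf")) <= now:
                return node
        raise RuntimeError("all nodes are marked down")


pool = NodePool(["es1", "es2", "es3"])
pool.mark_down("es2")
print([pool.next_node() for _ in range(4)])
```

A node that was marked down rejoins the rotation on its own once the down period has passed, so no separate recovery step is needed.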

Well, that’s not so hard…

If you’ve worked with Elasticsearch a bit then most of those settings probably seem fairly expected.

Elasticsearch scaling is deceptively easy when you first start out. Before building this cluster we ran a two node cluster for over a year and were indexing a few hundred very active sites. We threw more and more data at that small cluster, and more and more queries, and the cluster hardly blinked. It has about 50 million documents on it now, and gets 1-2 million queries a day.

It is still pretty shocking to me how much harder it was to scale a 10-100x larger cluster. There is a substantial and humbling difference between running a small cluster with 10s of GB vs a cluster with 9 TB of data.

In the next part in this series I’ll talk about how we overcame the indexing problems we ran into: how our index structure scales over time, our optimizations for real time indexing, and how we handle indexing failures.