Syncing Postgres to Elasticsearch: lessons learned
At a high level, the problem is that you have your data in one place (for us, that's Postgres), and you want to keep a copy of it in Elasticsearch. This means every write you make (INSERT, UPDATE and DELETE statements) needs to be replicated to Elasticsearch. At first this sounds easy: just add some code which pushes a document to Elasticsearch after updating Postgres, and you're done.
But what happens if Elasticsearch is slow to acknowledge the update? What if Elasticsearch processes those updates out of order? How do you know Elasticsearch processed every update correctly?
We thought those issues through, and decided our indexes had to be:
- Updated asynchronously - The user's request should be delayed as little as possible.
- Eventually consistent - While it can lag behind slightly, serving stale results indefinitely isn't an option.
- Easy to rebuild - Updates can be lost before reaching Elasticsearch, and Elasticsearch itself is known to lose data under network partitions.
Updating asynchronously is the easy part. Rather than generating and indexing the Elasticsearch document inside the request cycle, we enqueue a job to resync it asynchronously. Those jobs are processed by a pool of workers, either individually or in batches. As you start processing higher volumes, batching makes more and more sense.
Leaving the JSON generation and Elasticsearch API call out of the request cycle helps keep our API response times low and predictable.
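To make the split between the request cycle and the workers concrete, here is a minimal sketch. It assumes an in-process queue standing in for a real job queue, and the names enqueue_resync and drain_batch are hypothetical, not taken from our codebase. The request only records which record changed; the worker pulls pending IDs in batches and drops duplicates, since only the newest state of each record needs to reach Elasticsearch.

```python
import queue
from typing import Dict, List

# Hypothetical in-process queue standing in for a real job queue.
sync_queue = queue.Queue()


def enqueue_resync(record_id: int) -> None:
    """Called inside the request cycle: no JSON generation, no Elasticsearch call."""
    sync_queue.put(record_id)


def drain_batch(max_size: int = 100) -> List[int]:
    """Called by a worker: collect up to max_size distinct pending IDs.

    Duplicates are collapsed because one resync per record is enough.
    IDs are returned in first-seen order.
    """
    seen: Dict[int, None] = {}
    while len(seen) < max_size:
        try:
            seen[sync_queue.get_nowait()] = None
        except queue.Empty:
            break
    return list(seen)
```

A worker would then build and send one document per ID in the returned batch.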
The easiest way to get data into Elasticsearch is via the update API, setting any fields which were changed. Unfortunately, this offers no safety when it comes to concurrent updates, so you can end up with old or corrupt data in your index.
To handle this, Elasticsearch offers a versioning system with optimistic locking. Every write to a document causes its version to increment by 1. When posting an update, you read the current version of the document and supply it with your update. If someone else has written to the document in the meantime, the versions no longer match and the update fails.
Unfortunately, it's still possible to have an older update win under this scheme. Consider a situation where users Alice and Bob make requests which update some data at the same time:
| Alice | Bob |
|---|---|
| Postgres update commits | - |
| Elasticsearch request delayed | - |
| - | Postgres update commits |
| - | Reads v2 from Elasticsearch |
| - | Writes v3 to Elasticsearch |
| Reads v3 from Elasticsearch | - |
| Writes v4 to Elasticsearch | Changes lost |
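The interleaving in the table can be replayed against a small in-memory model of optimistic locking. This is a sketch only: the Doc class and replay_race function are hypothetical and merely imitate the "fail if someone wrote since I read" rule, they do not call Elasticsearch. It shows that neither write hits a conflict, yet the older data ends up stored.

```python
class VersionConflict(Exception):
    """Raised when the document changed since its version was read."""


class Doc:
    """Toy document with internally generated version numbers."""

    def __init__(self, body: str, version: int) -> None:
        self.body = body
        self.version = version

    def read_version(self) -> int:
        return self.version

    def write(self, body: str, read_version: int) -> None:
        # Optimistic lock: reject the write if anyone wrote since the read.
        if read_version != self.version:
            raise VersionConflict()
        self.body = body
        self.version += 1


def replay_race() -> Doc:
    doc = Doc("original", version=2)
    alice_payload = "alice's older data"  # built after her commit, then delayed
    bob_payload = "bob's newer data"      # built after his later commit

    doc.write(bob_payload, doc.read_version())    # reads v2, writes v3
    doc.write(alice_payload, doc.read_version())  # reads v3, writes v4
    return doc
```

The lock only protects against writes between a read and its matching write. It knows nothing about which payload reflects the newer Postgres state.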
This may seem unlikely, but it isn't. If you're making a lot of updates, especially if you're doing them asynchronously, you will end up with bad data in your search cluster. Fortunately, Elasticsearch provides another way of doing versioning. Rather than letting it generate version numbers, you can set version_type to external in your requests, and provide your own version numbers. Elasticsearch will always keep the highest version of a document you send it.
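As a rough illustration of why externally supplied versions fix the ordering problem, the sketch below models the rule in memory. The ExternalVersionedIndex class is hypothetical and only mimics the behaviour we rely on: a write is accepted when its version is strictly higher than the stored one and rejected otherwise, so arrival order stops mattering. Against a real cluster, the equivalent request would pass version and version_type=external as parameters to the index API.

```python
from typing import Dict, Tuple


class ExternalVersionedIndex:
    """Toy model of an index using caller-supplied version numbers."""

    def __init__(self) -> None:
        self._docs: Dict[str, Tuple[int, str]] = {}

    def index(self, doc_id: str, body: str, version: int) -> bool:
        """Store body if version is higher than what is stored.

        Returns False where a real cluster would report a version conflict.
        """
        current = self._docs.get(doc_id)
        if current is not None and version <= current[0]:
            return False
        self._docs[doc_id] = (version, body)
        return True

    def get(self, doc_id: str) -> str:
        return self._docs[doc_id][1]
```

For example, if the write carrying version 10 arrives before the delayed write carrying version 9, the second call returns False and the newer body stays in place.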
Since we're using Postgres, we already have a great version number available to us: transaction IDs. They're 64-bit integers, and they always increase on new transactions. Getting hold of the current one is as simple as running SELECT txid_current().
The asynchronous job simply selects the current transaction ID, loads the relevant data from Postgres, and sends it to Elasticsearch with that ID set as the version. Since this all happens after the data is committed in Postgres, the document we send to Elasticsearch is at least as up to date as when we enqueued the asynchronous job. It can be newer (if another transaction has committed in the meantime), but that's fine. We don't need every version of every record to make it to Elasticsearch. All we care about is ending up with the newest one once all our asynchronous jobs have run.
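The job described above might look roughly like the following sketch. Several things here are assumptions for illustration: conn is any DB-API style connection using the %s parameter style (as psycopg2 does), the records table with id and name columns is made up, and send is a hypothetical callable that performs the actual Elasticsearch request with the given external version.

```python
import json
from typing import Any, Callable


def resync_record(
    conn: Any,
    record_id: int,
    send: Callable[[int, int, str], None],
) -> int:
    """Load one record and hand it to send() with a transaction ID as its version."""
    cur = conn.cursor()

    # Take the version first, so the data loaded next is at least as new as it.
    cur.execute("SELECT txid_current()")
    version = cur.fetchone()[0]

    # Hypothetical table and columns, for illustration only.
    cur.execute("SELECT id, name FROM records WHERE id = %s", (record_id,))
    row = cur.fetchone()
    if row is None:
        # A deleted row needs a delete request instead; not covered in this sketch.
        raise LookupError("record %d not found" % record_id)

    document = json.dumps({"id": row[0], "name": row[1]})
    send(record_id, version, document)
    return version
```

Passing send in as a parameter keeps the sketch independent of any particular Elasticsearch client.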
Rebuilding from scratch
The last thing to take care of is to handle any inconsistencies from lost updates. We do so by periodically resyncing all recently written Postgres records, and the same code allows us to easily rebuild our indexes from scratch without downtime.
With the asynchronous approach above, and without a transactional, Postgres-backed queue, it's possible to lose updates. If an app server dies after committing the transaction in Postgres, but before enqueueing the sync job, that update won't make it to Elasticsearch. Even with a transactional, Postgres-backed queue there is a chance of losing updates for other reasons (such as the data loss under network partitions mentioned earlier).
To handle the above, we decided to periodically resync all recently updated records. To do this we use Elasticsearch's Bulk API, and reindex anything which was updated after the last resync (with a small overlap to make sure no records get missed by this catch-up process).
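The catch-up step can be sketched in two parts: working out the start of the resync window, and building a Bulk API request body. The helper names, the five-minute default overlap and the row shape are assumptions for illustration. The body follows the Bulk API's newline-delimited JSON format (an action line followed by a source line, ending in a newline), and the version and version_type keys in the action line are what recent Elasticsearch versions accept, so check them against the version you run.

```python
import json
from datetime import datetime, timedelta
from typing import Any, Dict, List


def resync_window_start(
    last_resync: datetime,
    overlap: timedelta = timedelta(minutes=5),
) -> datetime:
    """Start slightly before the last resync so no record slips through the gap."""
    return last_resync - overlap


def build_bulk_body(index: str, version: int, rows: List[Dict[str, Any]]) -> str:
    """Build a newline-delimited bulk body that indexes rows with an external version."""
    if not rows:
        return ""
    lines = []
    for row in rows:
        action = {
            "index": {
                "_index": index,
                "_id": str(row["id"]),
                "version": version,
                "version_type": "external",
            }
        }
        lines.append(json.dumps(action))
        lines.append(json.dumps(row))
    # The Bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n"
```

The rows would come from a query selecting everything updated since resync_window_start(...), and version from a transaction ID read before that query. Rebuilding the whole index is the same call with every row selected.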
The great thing about this approach is you can use the same code to rebuild the entire index. You'll need to do this routinely when you change your mappings, and it's always nice to know you can recover from disaster.
On the point of rebuilding indexes from scratch, you'll want to do that without downtime. It's worth taking a look at how to do this with aliases right from the start. You'll avoid a bunch of pain later on.
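One common way to do this, sketched below under assumed names, is to have the application always query an alias, build the new index alongside the old one, and then repoint the alias in a single request to the _aliases endpoint. The function name and the example index names are hypothetical. The function only builds the request body, which you would POST to /_aliases with whichever client you use.

```python
from typing import Any, Dict


def alias_swap_actions(alias: str, old_index: str, new_index: str) -> Dict[str, Any]:
    """Build an _aliases request body that moves alias from old_index to new_index.

    Both actions are sent in one request, so searches against the alias
    never see a moment where it points at nothing.
    """
    return {
        "actions": [
            {"remove": {"index": old_index, "alias": alias}},
            {"add": {"index": new_index, "alias": alias}},
        ]
    }
```

For example, alias_swap_actions("records", "records_v1", "records_v2") produces the body for switching searches over once records_v2 has been fully populated.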
There's a lot more to building a great search experience than you can fit in one blog post. Different applications have different constraints, and it's worth thinking yours through before you start writing production code. That said, hopefully you'll find some of the techniques in this post useful.