Scaling MongoDB at Mailbox

Posted by Cuong Do on September 12, 2013

Mailbox has grown unbelievably quickly. During that growth, one performance issue that impacted us was MongoDB’s database-level write lock. The amount of time Mailbox’s backends were waiting for the write lock was resulting in user-perceived latency.

While MongoDB allows you to add shards to a MongoDB cluster easily, we wanted to spare ourselves potential long-term pain by moving one of the most frequently updated MongoDB collections, which stores email-related data, to its own cluster. We theorized that this would, at a minimum, cut the amount of write lock contention in half. While we could have chosen to scale by adding more shards, we wanted to be able to independently optimize and administer the different types of data separately.

I started by poring through the MongoDB documentation. I quickly found the cloneCollection command. However, to quote the MongoDB 2.2 documentation: “cloneCollection cannot clone a collection through a mongos: you must connect directly to the mongod instance.” In other words, you can’t use this command with a sharded collection. You can’t use renameCollection on sharded collections either, closing off other possibilities. There were other possible solutions, but they all would’ve impacted performance for Mailbox users or would have simply failed to work at Mailbox’s scale.

So, I wrote a quick Python script to copy the data, and another to compare the original versus the copy to ensure data integrity. Along the way, I encountered many surprises. For example, a single Python process using gevent and pymongo can copy a large MongoDB collection in half the time that mongodump (written in C++) takes, even when the MongoDB client and server are on the same machine.

Our experiences have culminated in Hydra, the newly open-sourced set of tools we developed for MongoDB collection migration.

Creating the initial snapshot of a MongoDB collection

To copy all documents in a collection, I started with an intentionally naive implementation that didn’t have much more code than this:

for email_data in source_email_data.find():
    destination_email_data.insert(email_data)

Issue #1: Slowness

It was obvious that such a naive approach wouldn’t perform well for larger amounts of data, so I quickly experimented with different means of achieving faster copy performance. I implemented various micro-optimizations, like adjusting how many documents the MongoDB driver fetched at once. However, those yielded only marginal performance improvements. My goal was to finish the data migration in about a day, and I was still far from that goal.
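To illustrate the kind of batching tweak involved, here’s a minimal sketch; the helper names are hypothetical, and a plain Python list stands in for the destination collection so the batching logic is visible on its own:

```python
def copy_in_batches(source_docs, insert_batch, batch_size=500):
    """Accumulate documents and hand them to the destination in batches
    rather than round-tripping one document at a time."""
    batch = []
    for doc in source_docs:
        batch.append(doc)
        if len(batch) >= batch_size:
            insert_batch(batch)
            batch = []
    if batch:
        insert_batch(batch)  # flush the final partial batch

# usage: a plain list stands in for the destination collection
destination = []
copy_in_batches(({'_id': i} for i in range(1200)), destination.extend, batch_size=500)
```

With pymongo, the driver-side knobs for this idea are roughly the cursor’s batch size on the read side and passing a list of documents to insert on the write side.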

An early experiment I did was to measure the “speed of light” for MongoDB API operations – the speed of a simple C++ implementation using the MongoDB C++ SDK. Being rusty at C++ and wanting my mostly Python-proficient colleagues to easily be able to use/adapt the code for other uses, I didn’t pursue the C++ implementation too far but found that for simple cases, a naive C++ implementation was typically 5–10 times as fast as a naive Python implementation for the same task.

So, I returned to Python, which is the default language of choice for Dropbox. Moreover, when performing a series of remote network requests, such as queries to mongod, the client often spends much of its time waiting for the server to respond; there didn’t seem to be very many CPU-intensive parts for copy_collection.py (my MongoDB collection copying tool). This was corroborated by the very low CPU usage of the initial copy_collection.py.

I then experimented with adding concurrent MongoDB requests to copy_collection.py. Initial experiments with worker threads were disappointing. Next, I tried worker processes communicating through a Python Queue object. The performance still wasn’t much better, because IPC overhead was overwhelming any potential concurrency benefits. Using Pipes and other IPC mechanisms didn’t help much either.

Next, I decided to see how much performance I could squeeze out of a single Python process using asynchronous MongoDB queries. One of the more popular libraries for this is gevent, so I decided to give it a try. gevent patches standard Python modules, such as socket, to execute asynchronously. The beauty of gevent is that you can write asynchronous code that reads simply, like synchronous code.

Traditionally, asynchronous code to copy documents between two collections might have looked like this:

import asynclib

def copy_documents(source_collection, destination_collection, _ids, callback):
    """
    Given a list of _id's (MongoDB's unique identifier field for each document),
    copies the corresponding documents from the source collection to the destination
    collection
    """

    def _copy_documents_callback(result, error):
        if error_detected():
            callback(error)
            return

    # copy documents, passing a callback function that will handle errors and
    # other notifications
    for _id in _ids:
        copy_document(source_collection, destination_collection, _id,
                      _copy_documents_callback)

    # more error handling omitted for brevity
    callback(None)

def copy_document(source_collection, destination_collection, _id, callback):
    """
    Copies document corresponding to the given _id from the source to the
    destination.
    """
    def _insert_doc(doc):
        """
        callback that takes the document read from the source collection
        and inserts it into destination collection
        """
        if error_detected():
            callback(error)
            return
        destination_collection.insert(doc, callback) # another MongoDB operation

    # find the specified document asynchronously, passing a callback to receive
    # the retrieved data
    source_collection.find_one({'_id': _id}, callback=_insert_doc)

With gevent, the code uses no callbacks and reads sequentially:

import gevent
import gevent.monkey
gevent.monkey.patch_all()

def copy_documents(source_collection, destination_collection, _ids):
    """
    Given a list of _id's (MongoDB's unique identifier field for each document),
    copies the corresponding documents from the source collection to the destination
    collection
    """

    # copies each document using a separate greenlet; optimizations are certainly
    # possible but omitted in this example
    for _id in _ids:
        gevent.spawn(copy_document, source_collection, destination_collection, _id)

def copy_document(source_collection, destination_collection, _id):
    """
    Copies document corresponding to the given _id from the source to the
    destination.
    """
    # both of the following function calls block without gevent; with gevent they
    # simply cede control to another greenlet while waiting for Mongo to respond
    source_doc = source_collection.find_one({'_id': _id})
    destination_collection.insert(source_doc) # another MongoDB operation

This simple code will copy documents from a source MongoDB collection to a destination, based on their _id fields, which are the unique identifiers for each MongoDB document. copy_documents delegates the work of copying documents to greenlets (which are like threads but are cooperatively scheduled) that run copy_document(). When a greenlet performs a blocking operation, such as any request to MongoDB, it yields control to any other greenlet that is ready to execute. Since greenlets all execute in the same thread and process, you generally don’t need any kind of inter-greenlet locking.

With gevent, I was able to achieve much faster performance than either the thread worker pool or process worker pool approaches. Here’s a summary of the performance of each approach:

Approach                        Performance (higher is better)
single process, no gevent       520 documents/sec
thread worker pool              652 documents/sec
process worker pool             670 documents/sec
single process, with gevent     2,381 documents/sec

Combining gevent with worker processes – one for each shard – yielded a linear increase in performance. The key to using worker processes efficiently was to eliminate as much IPC as possible.
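That arrangement can be sketched as follows; the worker function simulates the copy work, the names are hypothetical, and the fork start method is used just to keep the sketch self-contained (in the real tool each worker would open its own connection to one shard’s mongod and run a gevent-based copy loop):

```python
import multiprocessing

def copy_shard(shard_number, doc_count, results):
    # Stand-in for the per-shard gevent copy loop; reports back exactly once.
    results.put((shard_number, doc_count))

def copy_all_shards(shard_doc_counts):
    """One worker process per shard. Each worker sends a single message when
    it finishes, so there is essentially no IPC during the copy itself."""
    ctx = multiprocessing.get_context("fork")
    results = ctx.Queue()
    workers = [ctx.Process(target=copy_shard, args=(num, count, results))
               for num, count in enumerate(shard_doc_counts)]
    for worker in workers:
        worker.start()
    copied = dict(results.get() for _ in workers)  # one result per worker
    for worker in workers:
        worker.join()
    return copied
```

The design choice is the same one described above: all the chatty communication happens between each worker and its own shard, not between the workers themselves.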

Somewhat surprisingly, using gevent in just a single process could produce a full copy of a collection in just under half the time as the mongodump tool, which is written in C++ but queries synchronously and is single-process/thread.

Issue #2: Replicating updates after the snapshot

Because MongoDB is not transactional, when you try to read a large MongoDB collection while updates are being performed on it, you will receive a result set that reflects MongoDB’s state at different points in time. For example, suppose you start reading a whole collection using a MongoDB find() query. Your result set could look like this:

included: document saved before your find()
included: document saved before your find()
included: document saved before your find()
excluded: document deleted just after your find() began
included: document inserted after your find() began

Moreover, to minimize the downtime required to point the Mailbox backend to the new copy of the collection, it was necessary to figure out a way to stream changes from the source MongoDB cluster to the new MongoDB cluster with as little latency as possible.

Like most asynchronously replicating data stores, MongoDB uses a log of operations – its oplog – to record and distribute a record of the insert/update/remove operations executed on a mongod instance to other mongod replicas. Given a snapshot of the data, the oplog can be used to apply all changes performed since the snapshot was taken.
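To make the entry format concrete, here is a minimal replay dispatcher. The field names ('op', 'o', 'o2') follow the oplog’s internal, version-dependent format, so treat this as a sketch; an in-memory dict keyed by _id stands in for the destination collection:

```python
def apply_oplog_entry(destination, entry):
    """Replay a single oplog entry against a destination "collection"
    (here, a dict keyed by _id).

    op codes: 'i' = insert, 'd' = delete, 'u' = update.
    """
    op = entry['op']
    if op == 'i':
        destination[entry['o']['_id']] = entry['o']
    elif op == 'd':
        destination.pop(entry['o']['_id'], None)
    elif op == 'u':
        doc_id = entry['o2']['_id']   # 'o2' selects the document...
        modification = entry['o']     # ...and 'o' describes the change
        if any(key.startswith('$') for key in modification):
            # Modifier-style update; only $set is handled in this sketch.
            # Decoding updates in general is where the complexity lives.
            destination[doc_id].update(modification.get('$set', {}))
        else:
            destination[doc_id] = modification  # whole-document replacement
```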

So, I decided to stream oplog entries from the source cluster and apply those changes at the destination cluster. Thanks to an informative post on Kristina Chodorow’s blog, I was quickly able to grasp the basics of the oplog format. Replicating inserts and removes was trivial, because their serialization format is straightforward. On the other hand, updates took more work.

The structure of update oplog entries was not immediately obvious, and in MongoDB 2.2.x, they use duplicate keys that can’t be displayed by the Mongo shell, let alone most MongoDB drivers. After some thought, I devised a workaround that simply used the _id embedded in the update to trigger another copy of the document from the source. While this doesn’t have identical semantics to applying just the specified update, it guarantees that the copied data is at least as recent as the op we’ve received. Here is a diagram showing how intermediate versions of documents (in this case, v2) are not necessarily copied, but the source and destination are still eventually consistent:

[diagram: applying update ops]
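That workaround can be sketched as code; the function and parameter names are hypothetical, and dict-backed callables stand in for the collections:

```python
def replay_update_op(fetch_from_source, write_to_destination, update_entry):
    """Rather than decoding the update's modifier document, re-fetch the
    current version of the document from the source and overwrite the copy.
    The destination may skip intermediate versions, but it always ends up
    at least as recent as the op being replayed."""
    doc_id = update_entry['o2']['_id']
    current_doc = fetch_from_source(doc_id)
    if current_doc is None:
        # The document was removed after this update; a later delete op covers it.
        return
    write_to_destination(doc_id, current_doc)
```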

I also ran into a performance issue replaying ops on the destination cluster. Though I had a separate process to replay ops for each shard, applying ops serially (my initial approach for prototyping and ensuring correctness) was far too slow to keep up with the onslaught of Mailbox queries.

Applying ops concurrently seemed to be the way to go, but the question was how to preserve correctness. Specifically, two operations affecting the same _id cannot execute out of order. A simple workaround I devised was to maintain, in a Python set, the set of _ids being modified by in-progress operations. When copy_collection.py encounters another update to an _id that is currently being updated, we block the later update and any other ops that come after it from being applied. We start applying new ops only when the older operation on the _id has finished. Here’s a diagram to illustrate op blocking:

[diagram: concurrent op replay]
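The bookkeeping behind that blocking rule can be sketched with a hypothetical class; the actual concurrency mechanics are omitted so only the ordering logic shows:

```python
import collections

class OpReplayer:
    """Concurrency bookkeeping for op replay: ops run concurrently, but when
    an op arrives for an _id that is already being modified, it and every op
    after it wait until that _id is free again."""

    def __init__(self):
        self.in_flight = set()              # _ids with an op currently running
        self.blocked = collections.deque()  # ops waiting behind a conflict

    def start(self, op):
        """Try to start an op; returns True if it may run now."""
        if self.blocked or op['_id'] in self.in_flight:
            self.blocked.append(op)  # preserve order behind the conflict
            return False
        self.in_flight.add(op['_id'])
        return True

    def complete(self, doc_id):
        """An op on doc_id finished; return the ops that may now start, in order."""
        self.in_flight.discard(doc_id)
        runnable = []
        while self.blocked and self.blocked[0]['_id'] not in self.in_flight:
            op = self.blocked.popleft()
            self.in_flight.add(op['_id'])
            runnable.append(op)
        return runnable
```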

Verifying copied data

Comparing the copied data to the original is normally a straightforward operation. Doing it efficiently also isn’t particularly challenging when you use multiple processes and gevent.

However, doing it when the source and the copy are both being updated requires some thought. At first, I tried just logging warnings whenever compare_collections.py (the tool I wrote to compare two collections) found a data inconsistency in a document that had been recently updated. Later, I could repeat verification for those documents. However, that doesn’t work for deleted documents, for which there remains no last modified timestamp.

I started thinking about the term “eventual consistency,” which is often used when talking about asynchronously replicating systems such as MongoDB’s replica sets and MySQL’s master/slave replication. Given enough time (i.e., after some number of retries) and barring catastrophe, the source and the copy will eventually become consistent. So, I added retry comparisons with an increasing backoff between successive retries. There are potential issues with certain cases, such as data that oscillates between two values. However, the data being migrated didn’t have any problematic update patterns.
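A sketch of that compare-with-backoff loop, using hypothetical fetch callables; having them return None for missing documents lets deletions be compared the same way:

```python
import time

def documents_match(fetch_source, fetch_destination, doc_id,
                    retries=5, initial_backoff=1.0):
    """Compare one document in both collections; on mismatch, wait with an
    increasing backoff and retry, giving replication time to catch up."""
    backoff = initial_backoff
    for attempt in range(retries):
        if fetch_source(doc_id) == fetch_destination(doc_id):
            return True
        if attempt < retries - 1:
            time.sleep(backoff)
            backoff *= 2  # back off further before each successive retry
    return False
```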

Before performing the final cutover from the original MongoDB cluster to the new MongoDB cluster, I wanted the ability to verify that the most recent ops had been applied. So, I added a command-line option to compare_collections.py to compare the documents modified by the most recent N ops. Running this for a sufficiently large set of ops during downtime would provide additional confidence that there weren’t undetected data inconsistencies. Running it for even hundreds of thousands of ops per shard only takes a few minutes. This also mitigates concerns regarding undetected data inconsistencies resulting from the compare/retry approach.
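Gathering the documents to re-check for such an option might look like this hypothetical helper; the entries use the oplog field names ('o', 'o2') described earlier:

```python
def ids_from_recent_ops(oplog_entries, n):
    """Collect the distinct _ids touched by the most recent n ops; these are
    the documents to re-verify before cutover."""
    touched = set()
    for entry in list(oplog_entries)[-n:]:
        # updates carry their selector in 'o2'; inserts and deletes in 'o'
        selector = entry.get('o2') or entry.get('o') or {}
        if '_id' in selector:
            touched.add(selector['_id'])
    return touched
```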

Handling the unexpected

Despite taking various precautions to handle errors (retries, catching possible exceptions, logging), there were still an uncomfortable number of issues arising during my final test runs leading up to the production migration. There were sporadic network issues, a specific set of documents that was consistently causing mongos to sever its connection from copy_collection.py, and occasional connection resets from mongod.

Soon, I realized that I couldn’t identify all the relevant failure scenarios, so I shifted my focus to quickly recovering from failures. I added logging of the _ids of documents for which compare_collections.py had detected inconsistencies. Then, I created another tool whose sole job was to re-copy the documents with those _ids.

Migration time!

During the production migration, copy_collection.py created an initial snapshot of hundreds of millions of emails and replayed more than a hundred million MongoDB operations. Performing the initial snapshot, building indices, and catching up on replication took about 9 hours – well within the 24-hour goal I had set. I continued to let copy_collection.py replay ops from the source cluster’s oplogs for another day while I used compare_collections.py to verify all copied data three times (for additional safety).

The actual cutover to the new MongoDB cluster happened recently. The MongoDB-related work was very short (a few minutes). During a brief maintenance window, I ran compare_collections.py to compare documents modified by the last 500,000 operations in each shard. After detecting no inconsistencies in the most recently updated data, we ran some smoke tests, pointed the Mailbox backend code to the new cluster, and brought the Mailbox service back up for the public. Our users haven’t reported any issues caused by the cutover. This was a success in my mind, as the best backend migrations are invisible to our users.

In contrast, our backend monitoring showed us the true benefits of the migration:

[graph: write lock percentage, before and after the migration]

The decrease in the percentage of time the write lock was held was far better than the linear (50%) improvement we had expected based on our MongoDB profiling. Great success!

Hello, world

We’re open-sourcing Hydra, the suite of tools we developed to perform the aforementioned MongoDB collection migration. We hope this code will be useful for anyone who needs to perform a live re-partitioning of their MongoDB data.

  • David Butler

    I’m curious if you tried bulk inserting multiple documents with each call, instead of inserting one document at a time?

    • Cuong Do

      Yes! The open-sourced code performs batch inserts.

      • David Butler

        The reason I ask is because the article only mentions trying single inserts. We get similar performance by simply bulk inserting without any other code optimizations. If one didn’t know anything about MongoDB, they might get the impression from your article that it doesn’t support bulk inserts.

        • Opsy

          It doesn’t. You can send a “batch” of documents from the client but they will still be inserted basically one at a time.

          • David Butler

            I think you’re splitting hairs. Whether you call them “batch” or “bulk” inserts is not relevant. (Incidentally, MongoDB calls them “bulk inserts” http://docs.mongodb.org/manual/core/bulk-inserts/ ). What’s relevant is whether it’s faster than inserting one at a time (it is).

            What is relevant is that the article only mentions sending one document at a time to MongoDB, and the proposed solution is parallelizing the inserts application-side. This gives the impression that MongoDB has no “batch” or “bulk” insert functionality built in, when in fact it does.

            Based on the numbers in this article and personal experience, parallelizing inserts from the app offers no advantage whatsoever to simply using MongoDB’s built-in “bulk insert” functionality. Either way, you are rate-limited by MongoDB’s global database write lock.

          • Opsy

            You are confusing the wire protocol (which allows you to send a list of documents) for any sort of multiple insert functionality on the server (which would allow you to have a single insert operation on the server which inserts multiple documents). On the server, the documents are inserted one-at-a-time no matter how they got shipped over (even though the server has the capacity to accept a “batch” of documents). If you send enough single documents in parallel you will get the same throughput as if you send a “batch” via a single network trip. Note the wording in documentation – only each document insert is atomic, all you are amortizing is the cost of GLE call (plus avoiding the overhead of extra network trips).

            And as far as the claim about being rate-limited by MongoDB’s db write lock, that is highly unlikely. Majority of cases where applications are assumed to be limited by the “write lock” there is simply not enough work being sent to the DB to max out its capacity. Sending documents one at a time instead of in batches, as well as not using enough parallel worker threads to generate and send the work to the server is most frequently the culprit for low throughput, after that the disk I/O throughput will be the next limiting factor.

          • David Butler

            Your point about the true bottleneck for inserts in MongoDB is well
            taken. Honestly, I have not attempted to max out the rate of inserts
            into MongoDB and hence cannot say for certain what the true bottleneck would be.

            However, to reiterate what I said earlier, simply using batch inserts with no parallelization at all gave me similar inserts/sec as what was listed in this article.

            I’m not sure why you think I’m confused about how “batch” inserts work. I am simply using the same terminology that the MongoDB documentation itself uses. Nothing I said contradicts anything you have said. I know MongoDB *actually* inserts one document at a time. That is not the issue.

            The only point I want to address is one which I keep bringing up, and which you brought up yourself:

            > If you send enough single documents in parallel you will get the same throughput as if you send a “batch” via a single network trip.

            This is the whole reason I keep replying to this thread at all. Why bother massively parallelizing single inserts, when simply using the built-in batch insert functionality will give you the exact same performance? At best this solution seems over-engineered. At worst, it seems to indicate that the author didn’t know about batch inserts.

            (I was told in an earlier reply that this tool supports batch inserts. If so, why bother mentioning massively parallelized single inserts at all?)

  • http://blog.serverdensity.com/ David Mytton

    Great post – I’ve done some similar migrations and moving use cases to different clusters really helps with the locking, as you found. Can you elaborate on the size of the sharded clusters and what kind of networking/hardware/etc they are? Is this on AWS?

  • http://twitter.com/kareemk kareemk

    We moved to TokuMX – http://www.tokutek.com/products/tokumx-for-mongodb/ – recently and saw a 2x improvement in response times. They’ve swapped out mongo’s backend with their fractal tree storage engine which is MVCC. Definitely worth checking out.

    • NPR

      TokuMX is still far from production ready even though they are trying really hard.

      • http://twitter.com/kareemk kareemk

        In that case that would apply to MongoDB as well. Toku’s storage engine has been running in production systems longer than Mongo, and very few changes had to be made to adapt it for Mongo. Have you had a bad experience with them? If so, please elaborate.

  • InsFocus

    Are you using MongoDB 2.2.x?

    Another thing to consider is the stability of MongoDB.

    MongoDB 2.4.x sharded clusters have been very unstable for us when inserting documents at a high rate; it’s frustrating.

    • Cuong Do

      Yes, we are currently on MongoDB 2.2.x

  • ning

    We have just implemented the same thing, using mongoexport, mongoimport, and an oplog replayer (we can use the exported data file as a backup).

    https://github.com/idning/mongomigrate

    We do a pre-split before importing data into the destination collection, and we use safe writes in mongoimport and in the oplog replay. Our performance was 1.8 billion documents in 7 days; that’s too long for an online service!

    we will try using gevent, thanks for your post.

    • Opsy

      Why in the world would you pick to use mongoexport and mongoimport for this? You are wasting time waiting for bson to json serialization (and back) and you are losing information about data types. Why not just use mongodump and mongorestore (with oplog replay or without)?

      Mongoexport is terrible for backup purposes too. You’re basically converting your whole data to json/text for no good reason.

  • Aaron Wang

    Great post!

    Which monitoring tool are you using?

  • Opsy

    “High” write lock %age just means you are doing writes. Did you have any actual indications that this was your bottleneck/preventing higher throughput? What were the queues like?