Rapid intake and output of data, with control over the scale of the failures

(written by lawrence krubner, however indented passages are often quotes). You can contact lawrence at: lawrence@krubner.com, or follow me on Twitter.

Two new and interesting libraries:

s3-journal:

At first, we used the Hadoop S3 client, which we had used in some lightweight use cases before. Unfortunately, it ended up coping poorly. Each server had a sustained throughput of about 10 megabytes of compressed data per second, and every so often S3 would hiccup and stop accepting data for a few minutes. When this happened, Hadoop’s client would simply buffer the data in-memory until all the memory was exhausted, at which point the process would become non-responsive. That server’s traffic would then spill over onto the other servers, and the process would be repeated.

…So in addition to minimizing overhead, s3-journal writes entries to disk in batches constrained by the number of entries, the maximum time allowed to elapse between the first and last entry, or both. These entries are written and then read out as a single binary blob, allowing us a high effective throughput measured in entries/sec without saturating the throughput of the disk controller. The downside to this, obviously, is the same as with the previous client: data buffered in memory may be lost forever. However, the issue with the other client was less that any data could be lost, but rather that the amount of data that could be lost was unbounded. By exposing parameters for how often s3-journal flushes to disk, people using the library can find their own golden mean between throughput and durability.
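
To make the batching idea concrete, here is a minimal sketch in Java. This is my own illustration of the pattern described above, not s3-journal's actual API (s3-journal is a Clojure library); the class and parameter names are invented. Entries accumulate in memory and are flushed to an append-only file once either a count threshold or a time threshold is hit:

```java
import java.io.*;
import java.nio.file.*;
import java.util.*;

// Hypothetical sketch of the batching idea, not s3-journal's API.
public class BatchingJournal implements Closeable {
    private final Path file;
    private final int maxEntries;        // flush when this many entries are buffered...
    private final long maxDelayMillis;   // ...or when the first buffered entry is this old
    private final List<byte[]> buffer = new ArrayList<>();
    private long firstEntryAt = -1;

    public BatchingJournal(Path file, int maxEntries, long maxDelayMillis) {
        this.file = file;
        this.maxEntries = maxEntries;
        this.maxDelayMillis = maxDelayMillis;
    }

    public synchronized void append(byte[] entry) throws IOException {
        if (buffer.isEmpty()) firstEntryAt = System.currentTimeMillis();
        buffer.add(entry);
        boolean full = buffer.size() >= maxEntries;
        boolean stale = System.currentTimeMillis() - firstEntryAt >= maxDelayMillis;
        // These two knobs are the throughput-vs-durability trade-off:
        // bigger/older batches mean fewer, larger writes, but more data
        // at risk if the process dies before the flush.
        if (full || stale) flush();
    }

    // Write the whole batch as one contiguous blob, so the disk sees one
    // large sequential write instead of one small write per entry.
    public synchronized void flush() throws IOException {
        if (buffer.isEmpty()) return;
        ByteArrayOutputStream blob = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(blob);
        for (byte[] entry : buffer) {
            out.writeInt(entry.length);  // length-prefix each entry for read-out
            out.write(entry);
        }
        Files.write(file, blob.toByteArray(),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        buffer.clear();
        firstEntryAt = -1;
    }

    @Override
    public synchronized void close() throws IOException { flush(); }
}
```

A real implementation would also flush from a background timer, so that a stream that goes quiet still gets its last partial batch to disk; that is omitted here for brevity.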

riffle:

There exist today a surprising number of in-process key/value stores, including LevelDB, LMDB, RocksDB, and many others. Each of these provides some variation on the “database as library” use case, where a process needs to persist and look up certain values by key. Our first version took LevelDB, which had decent Java bindings, added some background syncing logic, and wrapped an HTTP server around it.
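
The shape of that first version is simple enough to sketch. Here is a toy illustration in Java, using the JDK's built-in HttpServer, with a ConcurrentHashMap standing in for the embedded store (their real version used the actual LevelDB bindings, plus background syncing logic that is omitted here):

```java
import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Toy "database as library, HTTP on top" sketch -- not Factual's code.
public class KvHttpServer {
    public static void main(String[] args) throws Exception {
        // Stand-in for an embedded key/value store like LevelDB.
        ConcurrentMap<String, byte[]> store = new ConcurrentHashMap<>();
        HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
        server.createContext("/", exchange -> {
            // The request path (minus the leading slash) is the key.
            String key = exchange.getRequestURI().getPath().substring(1);
            if ("PUT".equals(exchange.getRequestMethod())) {
                store.put(key, exchange.getRequestBody().readAllBytes());
                exchange.sendResponseHeaders(204, -1);  // stored, no body
            } else {
                byte[] value = store.get(key);
                if (value == null) {
                    exchange.sendResponseHeaders(404, -1);
                } else {
                    exchange.sendResponseHeaders(200, value.length);
                    try (OutputStream out = exchange.getResponseBody()) {
                        out.write(value);
                    }
                }
            }
            exchange.close();
        });
        server.setExecutor(null);  // default executor
        server.start();
    }
}
```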

At first, this worked quite well. Starting from an empty initial state, we were able to quickly populate the database, all the while maintaining an impressively low latency on reads. But then the size of the database exceeded the available memory.

LevelDB uses memory-mapping to keep its index files in memory regions that can be quickly accessed by the process. However, once the size of the memory-mapped regions exceeds the available memory, some will be evicted from memory, and only refetched if the region is accessed again. This works well if only some of the regions are “hot” – they can stay in memory and the others can be lazily loaded on demand. Unfortunately, our data had a uniform access pattern, which meant that regions were being continuously evicted and reloaded.
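
If memory-mapping is unfamiliar, the mechanism looks like this in Java. The file is mapped into the process's address space, and the operating system decides which pages stay resident; the same read is fast when the page is in memory and quietly becomes disk I/O when it has been evicted:

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Minimal illustration of memory-mapped reads ("index.db" is a placeholder).
public class MmapExample {
    public static void main(String[] args) throws IOException {
        try (FileChannel ch = FileChannel.open(Path.of("index.db"),
                                               StandardOpenOption.READ)) {
            MappedByteBuffer region = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
            // Fast if the page is resident; a page fault (disk read) if not.
            byte first = region.get(0);
            System.out.println("first byte: " + first);
        }
    }
}
```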

Even worse, once the database grew past a threshold, write throughput plummeted. This is because of write amplification, which is a measure of how many bytes need to be written to disk for each byte written to the database. Since most databases need to keep their entries ordered, a write to the middle of an index will require at least half the index to be rewritten. Most databases will offset this cost by keeping a write-ahead log, or WAL, which persists the recent updates in order of update, and then periodically merges these changes into the main index. This amortizes the write amplification, but the overhead can still be significant. In our case, with a 100GB database comprising 100 million entries, it appeared to be as high as 50x.
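
To put that 50x figure in perspective: 100GB across 100 million entries works out to roughly 1KB per entry, so at 50x amplification each 1KB logical write costs something like 50KB of physical disk I/O. At that ratio, a mere 10MB/sec of incoming updates would demand on the order of 500MB/sec from the disk, which is enough to saturate a typical SATA SSD all by itself.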

…Our implementation cherry-picked the elements we wanted from each of these sources: fixed memory overhead per key, linear-time merging, and block compression. While memory-mapping is used for the hashtable, values are read directly from disk, decoupling our I/O throughput from how much memory is available. The keys are consistently ordered to allow for linear merges, but not lexicographically, so, unlike SSTables, range queries are not possible. The resulting library, Riffle, is far from novel, but it is only ~600 lines of code and is something we can understand inside and out. That allowed us to write a simple set of Hadoop jobs that, given an arbitrary set of updated keys and values, would construct a set of sharded Riffle indices, which could then be downloaded by our on-premise database servers and efficiently merged into the current set of values. A server which is new or has fallen behind can simultaneously download many such updates, merging them all together in constant space and linear time.
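
The merge they describe is essentially a k-way merge over streams that are already sorted by the same key ordering. A rough Java sketch of the idea (my own illustration, not Riffle's code; all names are invented) shows why it runs in linear time and constant space:

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

// Hypothetical sketch of a constant-space, linear-time index merge.
public class IndexMerge {

    // One entry in a pre-sorted index shard. sortKey is the shared ordering --
    // in Riffle's case a hash of the key rather than lexicographic order,
    // which is why linear merges work but range queries do not.
    record Entry(long sortKey, byte[] key, byte[] value, int generation) {}

    private record Head(Entry entry, Iterator<Entry> source) {}

    // K-way merge of pre-sorted inputs: linear in the total number of
    // entries, and constant space -- one buffered entry per input shard,
    // no matter how large the shards are. When the same key appears in
    // several shards, the highest generation (the newest update) wins.
    // For simplicity this treats equal sort keys as equal keys; a real
    // implementation would compare the key bytes on hash collisions.
    static void merge(List<Iterator<Entry>> inputs, Consumer<Entry> output) {
        PriorityQueue<Head> heads =
                new PriorityQueue<>(Comparator.comparingLong((Head h) -> h.entry().sortKey()));
        for (Iterator<Entry> in : inputs) {
            if (in.hasNext()) heads.add(new Head(in.next(), in));
        }
        while (!heads.isEmpty()) {
            Head head = heads.poll();
            Entry winner = head.entry();
            if (head.source().hasNext())
                heads.add(new Head(head.source().next(), head.source()));
            // Drain every other occurrence of the same key, keeping the newest.
            while (!heads.isEmpty() && heads.peek().entry().sortKey() == winner.sortKey()) {
                Head dup = heads.poll();
                if (dup.entry().generation() > winner.generation()) winner = dup.entry();
                if (dup.source().hasNext())
                    heads.add(new Head(dup.source().next(), dup.source()));
            }
            output.accept(winner);  // entries stream out already in sorted order
        }
    }
}
```

Because the output is itself sorted, the merged result can be streamed straight to disk, which is what lets a new or lagging server fold many downloaded updates together in one pass.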

Post external references

  1. http://blog.factual.com/how-factual-uses-persistent-storage-for-its-real-time-services