Dynamically mapping requests to work processes

(written by lawrence krubner, however indented passages are often quotes). You can contact lawrence at: lawrence@krubner.com

Damn, this is way over my head:

Blender is a Thrift and HTTP service built on Netty, a highly-scalable NIO client server library written in Java that enables the development of a variety of protocol servers and clients quickly and easily. We chose Netty over some of its other competitors, like Mina and Jetty, because it has a cleaner API, better documentation and, more importantly, because several other projects at Twitter are using this framework. To make Netty work with Thrift, we wrote a simple Thrift codec that decodes the incoming Thrift request from Netty’s channel buffer, when it is read from the socket and encodes the outgoing Thrift response, when it is written to the socket.

Netty defines a key abstraction, called a Channel, to encapsulate a connection to a network socket that provides an interface to do a set of I/O operations like read, write, connect, and bind. All channel I/O operations are asynchronous in nature. This means any I/O call returns immediately with a ChannelFuture instance that notifies whether the requested I/O operations succeed, fail, or are canceled.

When a Netty server accepts a new connection, it creates a new channel pipeline to process it. A channel pipeline is nothing but a sequence of channel handlers that implements the business logic needed to process the request. In the next section, we show how Blender maps these pipelines to query processing workflows.

In Blender, a workflow is a set of back-end services with dependencies between them, which must be processed to serve an incoming request. Blender automatically resolves dependencies between services, for example, if service A depends on service B, A is queried first and its results are passed to B. It is convenient to represent workflows as directed acyclic graphs (see below).

Sample Blender Workflow with 6 Back-end Services

In the sample workflow above, we have 6 services {s1, s2, s3, s4, s5, s6} with dependencies between them. The directed edge from s3 to s1 means that s3 must be called before calling s1 because s1 needs the results from s3. Given such a workflow, the Blender framework performs a topological sort on the DAG to determine the total ordering of services, which is the order in which they must be called. The execution order of the above workflow would be {(s3, s4), (s1, s5, s6), (s2)}. This means s3 and s4 can be called in parallel in the first batch, and once their responses are returned, s1, s5, and s6 can be called in parallel in the next batch, before finally calling s2.

Once Blender determines the execution order of a workflow, it is mapped to a Netty pipeline. This pipeline is a sequence of handlers that the request needs to pass through for processing.

Because workflows are mapped to Netty pipelines in Blender, we needed to route incoming client requests to the appropriate pipeline. For this, we built a proxy layer that multiplexes and routes client requests to pipelines as follows:

When a remote Thrift client opens a persistent connection to Blender, the proxy layer creates a map of local clients, one for each of the local workflow servers. Note that all local workflow servers are running inside Blender’s JVM process and are instantiated when the Blender process starts.
When the request arrives at the socket, the proxy layer reads it, figures out which workflow is requested, and routes it to the appropriate workflow server.
Similarly, when the response arrives from the local workflow server, the proxy reads it and writes the response back to the remote client.
We made use of Netty’s event-driven model to accomplish all the above tasks asynchronously so that no thread waits on I/O.

Once the query arrives at a workflow pipeline, it passes through the sequence of service handlers as defined by the workflow. Each service handler constructs the appropriate back-end request for that query and issues it to the remote server. For example, the real-time service handler constructs a realtime search request and issues it to one or more realtime index servers asynchronously. We are using the twitter commons library (recently open-sourced!) to provide connection-pool management, load-balancing, and dead host detection.

The I/O thread that is processing the query is freed when all the back-end requests have been dispatched. A timer thread checks every few milliseconds to see if any of the back-end responses have returned from remote servers and sets a flag indicating if the request succeeded, timed out, or failed. We maintain one object over the lifetime of the search query to manage this type of data.

Successful responses are aggregated and passed to the next batch of service handlers in the workflow pipeline. When all responses from the first batch have arrived, the second batch of asynchronous requests are made. This process is repeated until we have completed the workflow or the workflow’s timeout has elapsed.

As you can see, throughout the execution of a workflow, no thread busy-waits on I/O. This allows us to efficiently use the CPU on our Blender machines and handle a large number of concurrent requests. We also save on latency as we can execute most requests to back-end services in parallel.

To ensure a high quality of service while introducing Blender into our system, we are using the old Ruby on Rails front-end servers as proxies for routing thrift requests to our Blender cluster. Using the old front-end servers as proxies allows us to provide a consistent user experience while making significant changes to the underlying technology. In the next phase of our deploy, we will eliminate Ruby on Rails entirely from the search stack, connecting users directly to Blender and potentially reducing latencies even further.

Really advanced stuff. Exciting, though I’m not sure I’ll ever work on anything at the scale that such dynamic mappings are needed.