ExecutorCompletionService

(Written by Lawrence Krubner; however, indented passages are often quotes.) You can contact Lawrence at lawrence@krubner.com, or follow me on Twitter.

How very much easier it is to use core.async in Clojure! My one concern is that core.async has a thread pool that is set to the number of CPUs + 2, so the thread pool is small and rigid.

In the straightforward approach, the Futures are read back in the order they were submitted, so each call to contentFuture.get() waits until downloading the given web site (remember that each Future represents one site) is finished. This works, but it has a major bottleneck. Imagine you have as many threads in the pool as tasks (20 sites in this case). I think it’s understandable that you want to start processing the contents of web sites as soon as they arrive, no matter which one is first. Response times vary greatly, so don’t be surprised to find some web sites responding within a second while others need as long as 20 seconds.

But here’s the problem: after submitting all the tasks, we block on an arbitrary Future. There is no guarantee that this Future will complete first. It is very likely that other Future objects have already completed and are ready for processing, but we keep hanging on that arbitrary first Future. In the worst-case scenario, if the first submitted page is slower than all the others by an order of magnitude, every result except the first one sits ready and idle while we keep waiting for the first one.
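To make the bottleneck concrete, here is a small self-contained sketch (Java 8+, no network access). It replaces each HTTP download with a Thread.sleep() of a made-up duration and then reads the Futures back in submission order. The class name NaiveOrderDemo and the delays are hypothetical illustrations, not code from the original article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class NaiveOrderDemo {

    // Submits one simulated "download" per delay and reads the results back in
    // SUBMISSION order. Returns how many ms had elapsed when each result reached
    // the processing loop.
    static List<Long> naiveProcessingTimes(long[] delaysMs) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(delaysMs.length);
        try {
            List<Future<String>> futures = new ArrayList<>();
            long start = System.nanoTime();
            for (int i = 0; i < delaysMs.length; i++) {
                final int id = i;
                final long delay = delaysMs[i];
                futures.add(pool.submit(() -> {
                    Thread.sleep(delay); // stands in for a slow HTTP request
                    return "site-" + id;
                }));
            }
            List<Long> elapsed = new ArrayList<>();
            for (Future<String> f : futures) {
                f.get(); // blocks on this particular Future, however slow it is
                elapsed.add((System.nanoTime() - start) / 1_000_000);
            }
            return elapsed;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical delays: the first submitted task is 10x slower than the rest
        long[] delaysMs = {1000, 100, 100, 100};
        List<Long> times = naiveProcessingTimes(delaysMs);
        for (int i = 0; i < times.size(); i++) {
            System.out.println("site-" + i + " processed at ~" + times.get(i) + " ms");
        }
    }
}
```

Every site should be reported as processed at roughly one second. Three of the four results were ready after about 100 ms, but none of them was processed until the slow first one finished.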

The obvious solution would be to sort the web sites from fastest to slowest and submit them in that order. Then we would be guaranteed that the Futures complete in the order in which we submitted them. But this is impractical and almost impossible in real life due to the dynamic nature of the web.

This is where ExecutorCompletionService steps in. It is a thin wrapper around an Executor (such as an ExecutorService) that “remembers” all submitted tasks and allows you to wait for the first completed task, as opposed to the first submitted one. In a way, ExecutorCompletionService keeps a handle to all intermediate Future objects, and as soon as any of them finishes, it is returned. The crucial API method is CompletionService.take(), which blocks and waits for any underlying Future to complete. Here is the submit step with ExecutorCompletionService:

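import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import org.apache.commons.io.IOUtils; // Apache Commons IO

// topSites is assumed to be a List<String> of host names defined elsewhere
final ExecutorService pool = Executors.newFixedThreadPool(5);
final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(pool);
for (final String site : topSites) {
    // Each task downloads one site's front page and returns its body as a String
    completionService.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return IOUtils.toString(new URL("http://" + site), StandardCharsets.UTF_8);
        }
    });
}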
Notice how we seamlessly switched to completionService. Now the retrieval step:

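// take() and get() both throw InterruptedException; the enclosing method must declare or handle it
for (int i = 0; i < topSites.size(); ++i) {
    final Future<String> future = completionService.take(); // blocks until ANY task is done
    try {
        final String content = future.get(); // does not block: this task has already completed
        //...process contents
    } catch (ExecutionException e) {
        // The download itself failed; log is assumed to be a logger defined elsewhere
        log.warn("Error while downloading", e.getCause());
    }
}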
You might be wondering why we need an extra counter. Unfortunately, ExecutorCompletionService doesn’t tell you how many Future objects are still pending, so you must remember how many times to call take(). One call too many and take() blocks forever, because no further task will ever complete.
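The missing count follows from how such a service can be built. As far as I can tell from the JDK source, ExecutorCompletionService wraps each task in a FutureTask subclass whose done() hook pushes it onto an internal BlockingQueue, and take() simply takes from that queue. The hypothetical MiniCompletionService below is a simplified sketch of that idea, not the JDK class. Because it only ever holds futures that have already completed, it has no idea how many tasks are still in flight, so the caller has to count.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified stand-in for ExecutorCompletionService (illustration only)
public class MiniCompletionService<V> {
    private final Executor executor;
    // Holds only futures that have already COMPLETED; in-flight tasks are not tracked
    private final BlockingQueue<Future<V>> completed = new LinkedBlockingQueue<>();

    public MiniCompletionService(Executor executor) {
        this.executor = executor;
    }

    public Future<V> submit(Callable<V> task) {
        FutureTask<V> f = new FutureTask<V>(task) {
            @Override
            protected void done() {
                // Invoked once the task is done: normally, exceptionally, or by cancellation
                completed.add(this);
            }
        };
        executor.execute(f);
        return f;
    }

    // Blocks until some submitted task has completed; blocks forever if none remain
    public Future<V> take() throws InterruptedException {
        return completed.take();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        MiniCompletionService<String> cs = new MiniCompletionService<>(pool);
        int submitted = 0;
        for (long delay : new long[]{300, 100, 200}) {
            cs.submit(() -> { Thread.sleep(delay); return delay + " ms task"; });
            submitted++; // the caller, not the service, keeps the count
        }
        for (int i = 0; i < submitted; i++) {
            System.out.println(cs.take().get());
        }
        pool.shutdown();
    }
}
```

A fourth take() in main() would block forever. That is why the retrieval loop is bounded by the number of submissions.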

This solution feels much more robust. We process responses as soon as they are ready. take() blocks until the fastest task still running finishes. If processing takes a little longer and several other responses finish in the meantime, subsequent calls to take() return immediately. It’s fun to observe the program when the number of pool threads equals the number of tasks, so that we begin downloading every site at the same time. You can easily see which websites have the shortest response times and which respond very slowly.
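You can approximate that experiment without touching the network. This sketch uses the real ExecutorCompletionService, but it replaces each download with a Thread.sleep() of a made-up duration. The class name CompletionOrderDemo and the site-N names are hypothetical. The pool has as many threads as tasks, so all of them start at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CompletionOrderDemo {

    // Submits one simulated download per delay (pool size == task count, so all
    // start together) and returns the names in the order take() handed them back.
    static List<String> completionOrder(long[] delaysMs) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(delaysMs.length);
        ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(pool);
        try {
            for (int i = 0; i < delaysMs.length; i++) {
                final String name = "site-" + i; // hypothetical site name
                final long delay = delaysMs[i];
                completionService.submit(() -> {
                    Thread.sleep(delay); // stands in for the HTTP request
                    return name;
                });
            }
            List<String> order = new ArrayList<>();
            // The extra counter: exactly one take() per submitted task
            for (int i = 0; i < delaysMs.length; i++) {
                Future<String> future = completionService.take();
                try {
                    order.add(future.get());
                } catch (ExecutionException e) {
                    order.add("failed: " + e.getCause());
                }
            }
            return order;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long[] delaysMs = {900, 100, 500, 300};
        for (String name : completionOrder(delaysMs)) {
            System.out.println(name);
        }
    }
}
```

With these delays, the names should come back fastest first (site-1, site-3, site-2, site-0), whatever the submission order was.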

Post external references

  1. http://www.nurkiewicz.com/2013/02/executorcompletionservice-in-practice.html