Sunday, June 26, 2016

Java 8: CompletableFuture vs Parallel Stream

This post shows how Java 8's CompletableFuture compares with parallel streams when peforming asynchronous computations.

We will use the following class to model a long-running task:

class MyTask {
  private final int duration;
  public MyTask(int duration) {
    this.duration = duration;
  }
  public int calculate() {
    System.out.println(Thread.currentThread().getName());
    try {
      Thread.sleep(duration * 1000);
    } catch (final InterruptedException e) {
      throw new RuntimeException(e);
    }
    return duration;
  }
}

Let's create ten tasks, each with a duration of 1 second:

List<MyTask> tasks = IntStream.range(0, 10)
                                    .mapToObj(i -> new MyTask(1))
                                    .collect(toList());

How can we calculate the list of tasks efficiently?

Approach 1: Sequentially

Your first thought might be to calculate the tasks sequentially, as follows:

public static void runSequentially(List<MyTask> tasks) {
  long start = System.nanoTime();
  List<Integer> result = tasks.stream()
                              .map(MyTask::calculate)
                              .collect(toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
}

As you might expect, this takes 10 seconds to run, because each task is run one after the other on the main thread.

Approach 2: Using a parallel stream

A quick improvement is to convert your code to use a parallel stream, as shown below:

public static void useParallelStream(List<MyTask> tasks) {
  long start = System.nanoTime();
  List<Integer> result = tasks.parallelStream()
                              .map(MyTask::calculate)
                              .collect(toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
}

The output is

main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
main
Processed 10 tasks in 3043 millis

This time it took 3 seconds because 4 tasks were run in parallel (using three threads from the ForkJoinPool, plus the main thread).

Approach 3: Using CompletableFutures

Let's see if CompletableFutures perform any better:

public static void useCompletableFuture(List<MyTask> tasks) {
  long start = System.nanoTime();
  List<CompletableFuture<Integer>> futures =
      tasks.stream()
           .map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
           .collect(Collectors.toList());

  List<Integer> result =
      futures.stream()
             .map(CompletableFuture::join)
             .collect(Collectors.toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
}

In the code above, we first obtain a list of CompletableFutures and then invoke the join method on each future to wait for them to complete one by one. Note that join is the same as get, with the only difference being that the former doesn't throw any checked exception, so it's more convenient in a lambda expression.

Also, you must use two separate stream pipelines, as opposed to putting the two map operations after each other, because intermediate stream operations are lazy and you would have ended up processing your tasks sequentially! That's why you first need to collect your CompletableFutures in a list to allow them to start before waiting for their completion.

The output is

ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
Processed 10 tasks in 4010 millis

It took 4 seconds to process 10 tasks. You will notice that only 3 ForkJoinPool threads were used and that, unlike the parallel stream, the main thread was not used.

Approach 4: Using CompletableFutures with a custom Executor

One of the advantages of CompletableFutures over parallel streams is that they allow you to specify a different Executor to submit their tasks to. This means that you can choose a more suitable number of threads based on your application. Since my example is not very CPU-intensive, I can choose to increase the number of threads to be greater than Runtime.getRuntime().getAvailableProcessors(), as shown below:

public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
  long start = System.nanoTime();
  ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
  List<CompletableFuture<Integer>> futures =
      tasks.stream()
           .map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
           .collect(Collectors.toList());

  List<Integer> result =
      futures.stream()
             .map(CompletableFuture::join)
             .collect(Collectors.toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
  executor.shutdown();
}

The output is

pool-1-thread-2
pool-1-thread-4
pool-1-thread-3
pool-1-thread-1
pool-1-thread-5
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
Processed 10 tasks in 1009 millis

After this improvement, it now takes only 1 second to process 10 tasks.

As you can see, CompletableFutures provide more control over the size of the thread pool and should be used if your tasks involve I/O. However, if you're doing CPU-intensive operations, there's no point in having more threads than processors, so go for a parallel stream, as it is easier to use.

No comments:

Post a Comment

Note: Only a member of this blog may post a comment.