In this post I give a practical comparison of different Java concurrency tools by using each of them to solve the same toy problem: finding the sum of the integers in a given range.

1. Toy problem

Suppose we want to calculate the sum of numbers in some closed range [start, end]. To make the task interesting, let’s do it concurrently using N threads.
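As a reference point for the concurrent versions, here is a minimal single-threaded sketch. The class and method names are made up for illustration. It accumulates into a long (an assumption on my side, since the implementations in this post use int), because the sum of a large int range can exceed Integer.MAX_VALUE. The closed-form variant is only there as a cross-check.

```java
class SequentialSum {

    // Straightforward single-threaded sum of the closed range [start, end].
    static long sumByLoop(int start, int end) {
        long total = 0;
        // The counter is a long so that the loop also terminates when end == Integer.MAX_VALUE.
        for (long n = start; n <= end; n++) {
            total += n;
        }
        return total;
    }

    // Closed-form cross-check: (first + last) * count / 2.
    static long sumByFormula(int start, int end) {
        long count = (long) end - start + 1;
        return ((long) start + end) * count / 2;
    }

    public static void main(String[] args) {
        System.out.println(sumByLoop(1, 100));    // 5050
        System.out.println(sumByFormula(1, 100)); // 5050
    }
}
```

Any of the concurrent implementations can be compared against these two methods for ranges whose sum fits into an int.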

2. Divide and conquer

In general terms, this problem is a good fit for the divide and conquer paradigm. Applied to summing a range, the plan looks as follows:

  1. Divide the range into sub-ranges

  2. Delegate summing each sub-range to an individual thread in the pool

  3. Aggregate the sub-range sums by collecting the results from the individual threads
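The first step of the plan can be sketched on its own. The helper below is hypothetical (RangeSplitter and split are names I made up) and it spreads the remainder over the first sub-ranges, which is a slightly different scheme from the one used in the implementations in this post, where the last sub-range takes whatever is left. It assumes start <= end, parts >= 1, and a range size that fits into an int.

```java
import java.util.ArrayList;
import java.util.List;

class RangeSplitter {

    // Splits the closed range [start, end] into at most `parts` contiguous
    // closed sub-ranges, each returned as a two-element array {lower, upper}.
    static List<int[]> split(int start, int end, int parts) {
        int size = end - start + 1;
        int count = Math.min(parts, size); // never more sub-ranges than numbers
        int base = size / count;           // minimum length of a sub-range
        int remainder = size % count;      // the first `remainder` sub-ranges get one extra number

        List<int[]> subRanges = new ArrayList<>();
        int lower = start;
        for (int i = 0; i < count; i++) {
            int length = base + (i < remainder ? 1 : 0);
            int upper = lower + length - 1;
            subRanges.add(new int[] {lower, upper});
            lower = upper + 1;
        }
        return subRanges;
    }

    public static void main(String[] args) {
        // Prints [1, 4], [5, 7] and [8, 10], one per line.
        for (int[] range : split(1, 10, 3)) {
            System.out.println("[" + range[0] + ", " + range[1] + "]");
        }
    }
}
```

Steps 2 and 3 then amount to summing each {lower, upper} pair on its own thread and adding up the partial results.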

3. Implementation alternatives

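The approach described above can be implemented using different tools:

  • ForkJoinPool and Stream.parallel

  • ExecutorService and CompletableFuture

  • ExecutorService and Future

  • Runnable and Thread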

Since I’m doing this as an exercise and just to have fun, I will do an implementation for each alternative. So, let’s start.

3.1. ForkJoinPool and Stream.parallel

For our toy problem, the resulting code is concise and declarative (apart from creating the ForkJoinPool).

Note: the ForkJoinPool is created manually in order to control the number of threads explicitly. Most often you would probably use the common pool, which the JVM creates and manages implicitly.

import java.util.Scanner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class DivideAndConquerSum {

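    private static int sum(int rangeStart, int rangeEnd, int numberOfThreads) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(numberOfThreads);
        try {
            // Submit the pipeline as a task so that the parallel stream runs in this pool
            return forkJoinPool.submit(() ->
                    IntStream.rangeClosed(rangeStart, rangeEnd)
                            .parallel()
                            .sum()
            ).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            // Release the worker threads whether or not the computation succeeded
            forkJoinPool.shutdown();
        }
    }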

    public static void main(String[] args) throws InterruptedException {
        try (Scanner input = new Scanner(System.in)) {
            int rangeStart, rangeEnd, numberOfThreads;
            do {
                System.out.print("Enter the range start: ");
                rangeStart = input.nextInt();

                System.out.print("Enter the range end: ");
                rangeEnd = input.nextInt();

                System.out.print("Enter the number of threads: ");
                numberOfThreads = input.nextInt();

                if (rangeStart >= rangeEnd || numberOfThreads < 1) {
                    System.out.println("Warning: range start should be less than range end. Also number of threads should not be less than 1.");
                }
            } while (rangeStart >= rangeEnd || numberOfThreads < 1);


            int sum = DivideAndConquerSum.sum(rangeStart, rangeEnd, numberOfThreads);

            System.out.println(String.format("Sum of numbers in the range [%s, %s] found in %s threads is %s",
                    rangeStart, rangeEnd, numberOfThreads, sum));
        }
    }
}
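For comparison with the note about the common pool, this is roughly what the same computation looks like without a dedicated pool. It is a minimal sketch: CommonPoolSum is a made-up name, and the number of threads is no longer a parameter but whatever parallelism the common pool was started with.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CommonPoolSum {

    // Runs the parallel stream on the common ForkJoinPool instead of a dedicated pool.
    static int sum(int rangeStart, int rangeEnd) {
        return IntStream.rangeClosed(rangeStart, rangeEnd)
                .parallel()
                .sum();
    }

    public static void main(String[] args) {
        // The value depends on the machine and on JVM settings, so no output is shown here.
        System.out.println("Common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
        System.out.println(sum(1, 100)); // 5050
    }
}
```

There is nothing to create or shut down here, which is why this form is the one you would usually reach for. The parallelism of the common pool can be adjusted through the java.util.concurrent.ForkJoinPool.common.parallelism system property, but that setting applies to the whole JVM rather than to a single computation.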

3.2. ExecutorService and CompletableFuture

Since Java 8 we have a powerful, general-purpose tool in our toolbox: CompletableFuture. It makes asynchronous operations much easier to write than before, and the code overall becomes more declarative and composable (though sometimes cumbersome).

import java.util.List;
import java.util.Scanner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class DivideAndConquerSum {

    private static int sum(int rangeStart, int rangeEnd, int numberOfThreads) {
        int numberOfSubRanges = Math.min(numberOfThreads, rangeEnd - rangeStart + 1);
        int numbersPerSubRange = findNumbersPerSubRanges(rangeStart, rangeEnd, numberOfThreads);

        ExecutorService executorPool = Executors.newFixedThreadPool(numberOfSubRanges);

        List<CompletableFuture<Integer>> subRanges = IntStream.range(0, numberOfSubRanges)
                .mapToObj(subRangeIndex -> {
                    int lower = rangeStart + (subRangeIndex * numbersPerSubRange);
                    int upper = (subRangeIndex == numberOfThreads - 1) ? rangeEnd : lower + numbersPerSubRange - 1;
                    return CompletableFuture.supplyAsync(() -> IntStream.rangeClosed(lower, upper).sum(), executorPool);
                })
                .collect(Collectors.toList());

        try {
            // allOf completes once every sub-range future has completed
            return CompletableFuture.allOf(subRanges.toArray(new CompletableFuture[0]))
                    .thenApply(v -> subRanges.stream()
                            .map(CompletableFuture::join)
                            .reduce(0, Integer::sum))
                    .join();
        } finally {
            // Release the worker threads even if one of the tasks failed
            executorPool.shutdown();
        }
    }

    private static int findNumbersPerSubRanges(int rangeStart, int rangeEnd, int numberOfThreads) {
        if (numberOfThreads >= rangeEnd - rangeStart + 1) {
            return 1;
        } else {
            return (rangeEnd - rangeStart + 1) / numberOfThreads;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        try (Scanner input = new Scanner(System.in)) {
            int rangeStart, rangeEnd, numberOfThreads;
            do {
                System.out.print("Enter the range start: ");
                rangeStart = input.nextInt();

                System.out.print("Enter the range end: ");
                rangeEnd = input.nextInt();

                System.out.print("Enter the number of threads: ");
                numberOfThreads = input.nextInt();

                if (rangeStart >= rangeEnd || numberOfThreads < 1) {
                    System.out.println("Warning: range start should be less than range end. Also number of threads should not be less than 1.");
                }
            } while (rangeStart >= rangeEnd || numberOfThreads < 1);


            int sum = DivideAndConquerSum.sum(rangeStart, rangeEnd, numberOfThreads);

            System.out.println(String.format("Sum of numbers in the range [%s, %s] found in %s threads is %s",
                    rangeStart, rangeEnd, numberOfThreads, sum));
        }
    }
}

As we can see, we have to manage the sub-ranges explicitly. For our toy problem that is overkill, but in more complex situations it is a small price to pay for asynchronous, composable execution.
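To give an idea of what "composable" means here, the sketch below sums two halves of the range asynchronously and chains further steps onto the result without blocking in between. ComposedSum and sumAsync are hypothetical names, the tasks run on the default executor of supplyAsync rather than on an explicit pool, and the code assumes start < end (as the input validation in this post enforces) and a range size that fits into an int.

```java
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;

class ComposedSum {

    // Sums [start, end] as two halves computed asynchronously and combined
    // into a single future; nothing blocks until the caller asks for the value.
    static CompletableFuture<Integer> sumAsync(int start, int end) {
        int middle = start + (end - start) / 2;
        CompletableFuture<Integer> left =
                CompletableFuture.supplyAsync(() -> IntStream.rangeClosed(start, middle).sum());
        CompletableFuture<Integer> right =
                CompletableFuture.supplyAsync(() -> IntStream.rangeClosed(middle + 1, end).sum());
        return left.thenCombine(right, Integer::sum);
    }

    public static void main(String[] args) {
        String message = sumAsync(1, 100)
                .thenApply(total -> "Sum is " + total) // another step chained onto the result
                .join();                               // the only blocking call
        System.out.println(message); // Sum is 5050
    }
}
```

The caller gets a CompletableFuture back and decides where, or whether, to block. That is the part that is hard to express with a plain Future.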

3.3. ExecutorService and Future

The combination of ExecutorService and Future is also a powerful tool. However, it has the drawbacks that led to the introduction of CompletableFuture: explicit blocking and difficulty composing multiple futures in a declarative way.

import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class DivideAndConquerSum {

    private static int sum(int rangeStart, int rangeEnd, int numberOfThreads) {
        int totalSum = 0;
        try {
            int numberOfSubRanges = Math.min(numberOfThreads, rangeEnd - rangeStart + 1);

            int numbersPerSubRange = findNumbersPerSubRanges(rangeStart, rangeEnd, numberOfThreads);
            List<Callable<Integer>> subRanges = new ArrayList<>();
            for (int subRangeIndex = 0; subRangeIndex < numberOfSubRanges; subRangeIndex++) {
                int lower = rangeStart + (subRangeIndex * numbersPerSubRange);
                int upper = (subRangeIndex == numberOfThreads - 1) ? rangeEnd : lower + numbersPerSubRange - 1;
                subRanges.add(() -> IntStream.rangeClosed(lower, upper).sum());
            }

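            ExecutorService executorPool = Executors.newFixedThreadPool(numberOfSubRanges);
            List<Future<Integer>> resultFromParts;
            try {
                // Blocks until all tasks complete or the 10-second timeout expires
                resultFromParts = executorPool.invokeAll(subRanges, 10, TimeUnit.SECONDS);
            } finally {
                // Release the worker threads even if invokeAll was interrupted
                executorPool.shutdown();
            }

            for (Future<Integer> result : resultFromParts) {
                // get() throws CancellationException for a task cancelled by the timeout
                totalSum += result.get();
            }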

        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }

        return totalSum;
    }

    private static int findNumbersPerSubRanges(int rangeStart, int rangeEnd, int numberOfThreads) {
        if (numberOfThreads >= rangeEnd - rangeStart + 1) {
            return 1;
        } else {
            return (rangeEnd - rangeStart + 1) / numberOfThreads;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        try (Scanner input = new Scanner(System.in)) {
            int rangeStart, rangeEnd, numberOfThreads;
            do {
                System.out.print("Enter the range start: ");
                rangeStart = input.nextInt();

                System.out.print("Enter the range end: ");
                rangeEnd = input.nextInt();

                System.out.print("Enter the number of threads: ");
                numberOfThreads = input.nextInt();

                if (rangeStart >= rangeEnd || numberOfThreads < 1) {
                    System.out.println("Warning: range start should be less than range end. Also number of threads should not be less than 1.");
                }
            } while (rangeStart >= rangeEnd || numberOfThreads < 1);


            int sum = DivideAndConquerSum.sum(rangeStart, rangeEnd, numberOfThreads);

            System.out.println(String.format("Sum of numbers in the range [%s, %s] found in %s threads is %s",
                    rangeStart, rangeEnd, numberOfThreads, sum));
        }
    }
}
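The "explicit blocking" drawback is easiest to see around the timeout. When invokeAll returns because the timeout expired, the unfinished tasks are cancelled, and calling get() on such a future throws CancellationException. Below is a minimal sketch of handling these cases one by one. TimedInvokeAll and sumAll are made-up names, and turning every failure into an IllegalStateException is just one possible choice. The task list is assumed to be non-empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class TimedInvokeAll {

    // Runs all tasks, waits at most `timeout`, and adds up their results.
    static int sumAll(List<Callable<Integer>> tasks, long timeout, TimeUnit unit) {
        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        try {
            int total = 0;
            // invokeAll blocks the caller until all tasks finish or the timeout expires
            for (Future<Integer> future : pool.invokeAll(tasks, timeout, unit)) {
                if (future.isCancelled()) {
                    throw new IllegalStateException("A task did not finish within the timeout");
                }
                total += future.get(); // does not wait: the future is already completed
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw new IllegalStateException("Interrupted while waiting for results", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add(() -> 1 + 2);
        tasks.add(() -> 3 + 4);
        System.out.println(sumAll(tasks, 10, TimeUnit.SECONDS)); // 10
    }
}
```

All the waiting and error handling sits in the calling thread, which is the kind of boilerplate that CompletableFuture lets you express as chained steps instead.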

3.4. Runnable and Thread

These are the most basic tools available. Their main advantage is that they have been part of Java since its very first version.

import java.util.Scanner;

public class DivideAndConquerSum {

    private static class Sum implements Runnable {
        private final int lower;
        private final int upper;
        int sum;

        Sum(int lower, int upper) {
            this.lower = lower;
            this.upper = upper;
        }

        @Override
        public void run() {
            for (int number = lower; number <= upper; number++) {
                sum += number;
            }
        }
    }

    private static int sum(int rangeStart, int rangeEnd, int numberOfThreads) throws InterruptedException {
        int numberOfSubRanges = Math.min(numberOfThreads, rangeEnd - rangeStart + 1);
        int numbersPerSubRange = findNumbersPerSubRanges(rangeStart, rangeEnd, numberOfThreads);

        Sum[] sums = new Sum[numberOfSubRanges];
        Thread[] pool = new Thread[numberOfSubRanges];

        for (int index = 0; index < numberOfSubRanges; index++) {
            int lower = rangeStart + (index * numbersPerSubRange);
            int upper = (index == numberOfThreads - 1) ? rangeEnd : lower + numbersPerSubRange - 1;

            Sum task = new Sum(lower, upper);
            sums[index] = task;

            Thread thread = new Thread(task);
            pool[index] = thread;

            thread.start();
        }

        for (Thread thread : pool) {
            thread.join();
        }

        int totalSum = 0;
        for (Sum sum : sums) {
            totalSum += sum.sum;
        }

        return totalSum;
    }

    private static int findNumbersPerSubRanges(int rangeStart, int rangeEnd, int numberOfThreads) {
        if (numberOfThreads >= rangeEnd - rangeStart + 1) {
            return 1;
        } else {
            return (rangeEnd - rangeStart + 1) / numberOfThreads;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        try (Scanner input = new Scanner(System.in)) {
            int rangeStart, rangeEnd, numberOfThreads;
            do {
                System.out.print("Enter the range start: ");
                rangeStart = input.nextInt();

                System.out.print("Enter the range end: ");
                rangeEnd = input.nextInt();

                System.out.print("Enter the number of threads: ");
                numberOfThreads = input.nextInt();

                if (rangeStart >= rangeEnd || numberOfThreads < 1) {
                    System.out.println("Warning: range start should be less than range end. Also number of threads should not be less than 1.");
                }
            } while (rangeStart >= rangeEnd || numberOfThreads < 1);


            int sum = DivideAndConquerSum.sum(rangeStart, rangeEnd, numberOfThreads);

            System.out.println(String.format("Sum of numbers in the range [%s, %s] found in %s threads is %s",
                    rangeStart, rangeEnd, numberOfThreads, sum));
        }
    }
}

4. Conclusion

If you ever need to find the sum of the integers in a given range, you should choose ForkJoinPool and Stream.parallel, for two basic reasons:

  • the resulting code is the most concise

  • the resulting code relies on the standard Java library, which is heavily tested and widely used

However, for situations a bit more complex than the described toy problem, the other approaches become relevant.

Oleksii Zghurskyi