ForkJoinPool

ForkJoinPool is best suited for recursive, divide-and-conquer tasks: a task is split into subtasks (fork), and their results are combined at the end (join).

First, we split the main task into subtasks and call fork() on each one. fork() is asynchronous: it schedules the subtask and returns immediately.

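Then we call join() on each subtask to wait for it to complete and collect its result. join() is synchronous: the caller does not continue until the result is ready (a pool worker may run other queued tasks while it waits).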

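A minimal sketch of the fork/join pattern, assuming a simple problem (summing an array); SumTask, THRESHOLD and sum() are illustrative names, not JDK API. A common idiom is to fork() one half, compute the other half directly in the current thread, then join() the forked half:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSumExample {

    // Sums numbers[from, to) by splitting the range in half until it is small enough.
    static class SumTask extends RecursiveTask<Long> {
        // Illustrative cut-off below which the slice is summed directly.
        private static final int THRESHOLD = 1_000;

        private final long[] numbers;
        private final int from; // inclusive
        private final int to;   // exclusive

        SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += numbers[i];
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(numbers, from, mid);
            SumTask right = new SumTask(numbers, mid, to);

            left.fork();                        // asynchronous: schedule the left half
            long rightResult = right.compute(); // work on the right half in this thread
            long leftResult = left.join();      // synchronous: wait for the left half
            return leftResult + rightResult;
        }
    }

    static long sum(long[] numbers) {
        return ForkJoinPool.commonPool().invoke(new SumTask(numbers, 0, numbers.length));
    }

    public static void main(String[] args) {
        long[] numbers = new long[10_000];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i + 1;
        }
        System.out.println(sum(numbers)); // 50005000
    }
}
```

Computing one half directly, instead of forking both halves, keeps the current worker busy rather than leaving it only waiting. The threshold here is arbitrary; in practice it is tuned so each leaf does enough work to outweigh the cost of creating a task.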

How it works internally

Each ForkJoinPool worker thread (ForkJoinWorkerThread) has its own WorkQueue, and the pool balances work between the workers with a work-stealing algorithm.

Each worker's WorkQueue is a deque (double-ended queue), so tasks can be taken from either end.


When a worker's own deque is empty, it tries to steal a task from another worker's deque.


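To reduce contention between a deque's owner and the workers stealing from it, they use opposite ends: the owner pushes and pops its own tasks in LIFO order, while thieves take tasks from the other end in FIFO order. In a recursive decomposition the oldest tasks are usually the largest, so a single steal tends to hand over a sizeable chunk of work.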

[!Note]
Processing its own tasks in LIFO order usually performs better than FIFO because of CPU caching: the most recently pushed task, and the data it works on, is more likely to still be in the CPU cache.
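The ordering rules above can be illustrated with a toy, single-threaded model of one worker's deque. This is only an illustration under that simplifying assumption: the real ForkJoinPool WorkQueue is a concurrent array-based structure, and WorkStealingDequeModel, push(), popByOwner() and stealByThief() are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of one worker's deque. ArrayDeque is not thread-safe, so this
// only shows which end the owner and a thief use, not real concurrency.
public class WorkStealingDequeModel {

    private final Deque<String> deque = new ArrayDeque<>();

    // The owner pushes newly forked tasks onto the tail.
    void push(String task) {
        deque.addLast(task);
    }

    // The owner takes from the same end it pushes to (LIFO): the newest task.
    String popByOwner() {
        return deque.pollLast();
    }

    // A thief takes from the opposite end (FIFO): the oldest task.
    String stealByThief() {
        return deque.pollFirst();
    }

    public static void main(String[] args) {
        WorkStealingDequeModel queue = new WorkStealingDequeModel();
        queue.push("task-1");
        queue.push("task-2");
        queue.push("task-3");

        System.out.println("owner pops:   " + queue.popByOwner());   // task-3
        System.out.println("thief steals: " + queue.stealByThief()); // task-1
    }
}
```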

Usage

ForkJoinTask

To use a ForkJoinPool, it's recommended to use the shared ForkJoinPool.commonPool() instead of creating a new pool, to save resources (every pool has its own worker threads).
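A sketch contrasting the shared common pool with a dedicated pool (the class and method names and the parallelism value are illustrative). The common pool needs no setup or teardown, whereas a pool you create yourself owns its threads and should be shut down:

```java
import java.util.concurrent.ForkJoinPool;

public class PoolChoiceExample {

    // commonPool() returns the same shared, JVM-wide instance on every call.
    static boolean commonPoolIsShared() {
        return ForkJoinPool.commonPool() == ForkJoinPool.commonPool();
    }

    // Runs a small Callable on the common pool: nothing to create or shut down.
    static int computeOnCommonPool() {
        return ForkJoinPool.commonPool().submit(() -> 21 * 2).join();
    }

    // Runs the same Callable on a dedicated pool with a fixed parallelism.
    // This pool owns its worker threads, so it is shut down afterwards.
    static int computeOnDedicatedPool(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(() -> 21 * 2).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(commonPoolIsShared());        // true
        System.out.println(computeOnCommonPool());       // 42
        System.out.println(computeOnDedicatedPool(2));   // 42
    }
}
```

A dedicated pool still makes sense when some work must be isolated from the common pool, for example to give it its own parallelism limit.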

Work submitted to a ForkJoinPool is represented as a ForkJoinTask. The pool also accepts Runnable and Callable, but it wraps them in a ForkJoinTask internally.
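To make the wrapping visible, ForkJoinTask.adapt() turns a plain Callable into a ForkJoinTask explicitly. This is a small sketch; AdaptExample and runCallable() are hypothetical names:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class AdaptExample {

    static int runCallable() {
        Callable<Integer> callable = () -> 6 * 7;
        // Explicitly wrap the Callable as a ForkJoinTask.
        ForkJoinTask<Integer> task = ForkJoinTask.adapt(callable);
        // ForkJoinPool.invoke() runs the task in the pool and waits for its result.
        return ForkJoinPool.commonPool().invoke(task);
    }

    public static void main(String[] args) {
        System.out.println(runCallable()); // 42
    }
}
```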

There are two main subclasses to extend:

  • RecursiveAction: a recursive task that does not return a value (compute() returns void)
  • RecursiveTask<V>: a recursive task that returns a value of type V
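A minimal RecursiveAction sketch, assuming a hypothetical task that doubles every element of an int array in place (RecursiveActionExample, DoubleInPlace and THRESHOLD are illustrative names). Because compute() returns void, the subtasks write their results directly into the shared array:

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class RecursiveActionExample {

    // Doubles values[from, to) in place; there is no result to return or combine.
    static class DoubleInPlace extends RecursiveAction {
        // Illustrative cut-off below which the slice is processed directly.
        private static final int THRESHOLD = 4;

        private final int[] values;
        private final int from; // inclusive
        private final int to;   // exclusive

        DoubleInPlace(int[] values, int from, int to) {
            this.values = values;
            this.from = from;
            this.to = to;
        }

        @Override
        protected void compute() {
            if (to - from <= THRESHOLD) {
                for (int i = from; i < to; i++) {
                    values[i] *= 2;
                }
                return;
            }
            int mid = (from + to) >>> 1;
            // Runs both halves and waits until both are done.
            invokeAll(new DoubleInPlace(values, from, mid), new DoubleInPlace(values, mid, to));
        }
    }

    static void doubleAll(int[] values) {
        ForkJoinPool.commonPool().invoke(new DoubleInPlace(values, 0, values.length));
    }

    public static void main(String[] args) {
        int[] values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        doubleAll(values);
        System.out.println(Arrays.toString(values)); // [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
    }
}
```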

API usage

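  • forkJoinTask.invoke(): synchronous. Starts executing compute() in the current thread and waits for the result; semantically equivalent to forkJoinTask.fork().join().

  • forkJoinTask.fork(): asynchronous. Schedules compute() to run in a ForkJoinPool (the pool of the current worker, or ForkJoinPool.commonPool() when called from outside a pool) and returns immediately.

  • forkJoinTask.join(): synchronous. Waits for the task to complete and returns its result. If compute() failed, join() rethrows the failure as an unchecked exception.

  • forkJoinTask.get(): synchronous. Like join(), it waits for the task to complete, but it throws checked exceptions that callers must handle: a failure in compute() is wrapped in an ExecutionException, and the wait can be interrupted (InterruptedException).

  • ForkJoinTask.invokeAll(Collection<T> tasks): static method. Forks the given tasks and waits until all of them are done. It returns the same collection that was passed in, so the results can be read back in the original order, which is why it is the recommended way to run several subtasks. There are also invokeAll(t1, t2) and invokeAll(ForkJoinTask<?>... tasks) overloads.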
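A sketch comparing how join() and get() report a failure, assuming a hypothetical FailingTask whose compute() always throws IllegalStateException. When the exception was thrown in another thread, the JDK may rethrow a new exception of the same type with the original as its cause, so the sketch checks the exception type rather than the exact instance:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class JoinVsGetExample {

    // A task that always fails, used to compare error reporting.
    static class FailingTask extends RecursiveTask<String> {
        @Override
        protected String compute() {
            throw new IllegalStateException("boom");
        }
    }

    // join(): the failure surfaces as an unchecked exception, not wrapped.
    static String describeJoinFailure() {
        ForkJoinTask<String> task = ForkJoinPool.commonPool().submit(new FailingTask());
        try {
            task.join();
            return "no exception";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    // get(): the failure is wrapped in a checked ExecutionException.
    static String describeGetFailure() {
        ForkJoinTask<String> task = ForkJoinPool.commonPool().submit(new FailingTask());
        try {
            task.get();
            return "no exception";
        } catch (ExecutionException e) {
            return "ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(describeJoinFailure()); // IllegalStateException
        System.out.println(describeGetFailure());  // ExecutionException caused by IllegalStateException
    }
}
```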

Example

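import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;

public class Main {

  private static final ForkJoinPool threadPool = ForkJoinPool.commonPool();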
  
  public static void main(String[] args) throws ExecutionException, InterruptedException {  
    var task = threadPool.submit(new RecursiveCapitalise("hello world"));  
    System.out.println("finished: " + task.get());  
  }  
  
  
  static class RecursiveCapitalise extends RecursiveTask<String> {  
  
    private final String word;  
  
    public RecursiveCapitalise(String word) {  
      this.word = word;  
    }  
  
    @Override  
    protected String compute() {  
  
  
      if (word.length() == 1) {  
        System.out.printf("%s %s%n", word, Thread.currentThread().getName());  
        return word.toUpperCase();  
      }  
  
      try {
        // Simulate slow work so that several workers end up picking up subtasks.
        Thread.sleep(1000);
        var wordLength = word.length();  
  
        var firstHalf = new RecursiveCapitalise(word.substring(0, wordLength / 2));  
        var secondHalf = new RecursiveCapitalise(word.substring(wordLength / 2, wordLength));  
  
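        // invokeAll() is a static method of ForkJoinTask: it runs both halves, waits for
        // both, and returns them in the order given, so joining keeps the word order.
        return ForkJoinTask.invokeAll(
          List.of(firstHalf, secondHalf)
        ).stream().map(ForkJoinTask::join).collect(Collectors.joining());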
  
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag before rethrowing
        throw new RuntimeException(e);
      }  
    }  
  }  
}
Sample output (which worker handles which letter varies between runs):

> Task :concurrency-recursion:Main.main()
r ForkJoinPool.commonPool-worker-2
  ForkJoinPool.commonPool-worker-1
e ForkJoinPool.commonPool-worker-6
h main
l ForkJoinPool.commonPool-worker-3
l ForkJoinPool.commonPool-worker-4
d ForkJoinPool.commonPool-worker-4
l ForkJoinPool.commonPool-worker-5
o ForkJoinPool.commonPool-worker-4
w ForkJoinPool.commonPool-worker-7
o ForkJoinPool.commonPool-worker-9
finished: HELLO WORLD

Note: once the task has been submitted, wait for its result with task.get() (or task.join()) instead of task.invoke(). So the following is wrong:

public static void main(String[] args) throws ExecutionException, InterruptedException {  
  var task = threadPool.submit(new RecursiveCapitalise("hello world"));  
  System.out.println("finished: " + task.invoke());  
}

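The reason is that submit() has already scheduled the task to run in the pool. invoke() does not just wait for that run: if the task has not completed yet, invoke() starts executing compute() itself, so the work can end up being done a second time.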
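Two ways to wait for a task correctly, sketched with a hypothetical UpperCaseTask standing in for RecursiveCapitalise: keep submit() and wait with join() (or get()), or let ForkJoinPool.invoke() submit the task and wait for it in one call:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class SubmitThenWaitExample {

    // Hypothetical stand-in for RecursiveCapitalise: upper-cases its input.
    static class UpperCaseTask extends RecursiveTask<String> {
        private final String word;

        UpperCaseTask(String word) {
            this.word = word;
        }

        @Override
        protected String compute() {
            return word.toUpperCase();
        }
    }

    // Option 1: submit() schedules the task; join() only waits for its result.
    static String submitThenJoin(String word) {
        ForkJoinTask<String> task = ForkJoinPool.commonPool().submit(new UpperCaseTask(word));
        return task.join();
    }

    // Option 2: pool.invoke() submits and waits in one call.
    static String invokeOnPool(String word) {
        return ForkJoinPool.commonPool().invoke(new UpperCaseTask(word));
    }

    public static void main(String[] args) {
        System.out.println(submitThenJoin("hello world")); // HELLO WORLD
        System.out.println(invokeOnPool("hello world"));   // HELLO WORLD
    }
}
```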