ForkJoinPool
ForkJoinPool is designed for recursive tasks that are divided into subtasks (fork) whose results are combined at the end (join).
First we fork() the main task into subtasks. This action is asynchronous.
After that we join() to wait for each subtask to complete. This action is synchronous.
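The fork and join steps above can be sketched with a small RecursiveTask. This is a minimal illustration, not a tuned implementation: the class names ForkJoinSumSketch and SumTask, the helper sum, and the THRESHOLD value are all assumptions made for this example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ForkJoinSumSketch {

    // Hypothetical task: sums numbers[from, to) by splitting the range in half.
    static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 4; // arbitrary cut-off for this sketch
        private final long[] numbers;
        private final int from;
        private final int to;

        SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            // Small enough: sum directly instead of splitting further.
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += numbers[i];
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(numbers, from, mid);
            SumTask right = new SumTask(numbers, mid, to);
            left.fork();                     // asynchronous: schedule the left half
            long rightSum = right.compute(); // work on the right half in this thread
            long leftSum = left.join();      // synchronous: wait for the left half
            return leftSum + rightSum;
        }
    }

    static long sum(long[] numbers) {
        return ForkJoinPool.commonPool().invoke(new SumTask(numbers, 0, numbers.length));
    }

    public static void main(String[] args) {
        long[] numbers = new long[100];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i + 1;
        }
        System.out.println(sum(numbers)); // 1 + 2 + ... + 100 = 5050
    }
}
```

Forking one half and computing the other half directly keeps the current thread busy instead of leaving it idle until join() returns.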
How it works internally
ForkJoinPool uses a special type of worker thread: each one has its own WorkQueue and follows a work-stealing algorithm.
Each thread in ForkJoinPool has its own deque (double-ended queue).
When a thread is free, it will try to steal work from another thread's queue.
To avoid contention between the main worker (the thread that owns the queue) and the support worker (the thread that steals from it), the main worker takes jobs in LIFO order, whereas the support thread takes jobs in FIFO order from the opposite end of the deque.
[!Note]
Taking tasks in LIFO order performs better than FIFO because of CPU caching: the tasks most recently added to the queue are likely to still be in the CPU cache.
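The two access orders can be illustrated with a plain java.util.ArrayDeque. This is only a single-threaded toy model of the idea: the real WorkQueue inside ForkJoinPool is a specialised concurrent structure, and the names WorkStealingDequeSketch, ownerTake and steal are made up for this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class WorkStealingDequeSketch {

    // The owning worker takes the task that was added most recently (LIFO).
    static String ownerTake(Deque<String> queue) {
        return queue.pollFirst();
    }

    // A stealing worker takes the oldest task from the opposite end (FIFO).
    static String steal(Deque<String> queue) {
        return queue.pollLast();
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>();
        queue.addFirst("task-1"); // oldest
        queue.addFirst("task-2");
        queue.addFirst("task-3"); // newest

        System.out.println("owner takes: " + ownerTake(queue)); // task-3
        System.out.println("thief steals: " + steal(queue));    // task-1
        System.out.println("left in queue: " + queue);          // [task-2]
    }
}
```

Because the owner and the thief work on opposite ends, they only compete for the same element when the queue is nearly empty.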
Usages
ForkJoinTask
It's recommended to use ForkJoinPool.commonPool() instead of creating a new ForkJoinPool, to save resources.
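As a rough comparison, the sketch below runs the same placeholder job on the common pool and on a dedicated pool. The class name PoolChoiceSketch, the JOB constant and both helper methods are assumptions for illustration. The job is a plain Callable, which ForkJoinPool accepts because it is also an ExecutorService.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;

class PoolChoiceSketch {

    // Placeholder work for this sketch.
    static final Callable<Integer> JOB = () -> 6 * 7;

    // Preferred: reuse the shared pool, which we never have to shut down.
    static int runOnCommonPool() {
        return ForkJoinPool.commonPool().submit(JOB).join();
    }

    // Alternative: a dedicated pool starts its own threads and should be shut down.
    static int runOnOwnPool(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(JOB).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("common pool: " + runOnCommonPool()); // 42
        System.out.println("own pool:    " + runOnOwnPool(2));   // 42
    }
}
```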
A task submitted to ForkJoinPool is normally a subclass of ForkJoinTask (a plain Runnable or Callable is also accepted and gets wrapped in one).
There are mainly 2 types:
RecursiveAction: a recursive function that does not return a value
RecursiveTask<V>: a recursive function that returns a value
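For the case without a return value, here is a minimal RecursiveAction sketch that doubles every element of an array in place. The names RecursiveActionSketch, DoubleInPlace and doubled, and the THRESHOLD value, are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class RecursiveActionSketch {

    // Hypothetical action: doubles values[from, to) in place, returning nothing.
    static class DoubleInPlace extends RecursiveAction {
        private static final int THRESHOLD = 2; // arbitrary cut-off for this sketch
        private final int[] values;
        private final int from;
        private final int to;

        DoubleInPlace(int[] values, int from, int to) {
            this.values = values;
            this.from = from;
            this.to = to;
        }

        @Override
        protected void compute() {
            if (to - from <= THRESHOLD) {
                for (int i = from; i < to; i++) {
                    values[i] *= 2;
                }
                return;
            }
            int mid = (from + to) >>> 1;
            // Run both halves and wait until both are done.
            invokeAll(new DoubleInPlace(values, from, mid),
                      new DoubleInPlace(values, mid, to));
        }
    }

    static int[] doubled(int[] input) {
        int[] copy = Arrays.copyOf(input, input.length);
        ForkJoinPool.commonPool().invoke(new DoubleInPlace(copy, 0, copy.length));
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(doubled(new int[]{1, 2, 3, 4, 5})));
        // prints [2, 4, 6, 8, 10]
    }
}
```

A RecursiveTask<V> has the same shape, except that compute() returns a V and the parent combines the values it gets back from its subtasks.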
API Usages
forkJoinTask.invoke(): synchronous, runs the task and waits for its result; the outcome is equivalent to forkJoinTask.fork().join()
forkJoinTask.fork(): asynchronous, schedules compute() to run in a ForkJoinPool (the pool the current task is running in, or the common pool when called from outside a pool)
forkJoinTask.join(): synchronous, waits for the ForkJoinTask to complete and returns its result. Throws an unchecked exception if the task failed.
forkJoinTask.get(): synchronous, similar to join(), waits for the task to complete. Throws checked exceptions that clients need to handle; an exception thrown by the task is wrapped in an ExecutionException.
ForkJoinTask.invokeAll(Collection<T> tasks): static method, forks the tasks asynchronously and returns only once all of them are done. Recommended way to run several ForkJoinTask subtasks, since the returned collection keeps the tasks in the order they were passed in.
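The differences between these calls are easiest to see on a task that can fail. The sketch below is an illustration only: ForkJoinApiSketch, LengthTask and the helper methods are hypothetical names, and the null-text failure is an arbitrary way to trigger an exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class ForkJoinApiSketch {

    // Hypothetical task: returns the length of its text, or fails when the text is null.
    static class LengthTask extends RecursiveTask<Integer> {
        private final String text;

        LengthTask(String text) {
            this.text = text;
        }

        @Override
        protected Integer compute() {
            if (text == null) {
                throw new IllegalStateException("no text");
            }
            return text.length();
        }
    }

    // invoke(): runs the task and returns its result in one synchronous call.
    static int lengthViaInvoke(String text) {
        return new LengthTask(text).invoke();
    }

    // join(): a failure inside compute() surfaces as an unchecked exception.
    static boolean joinThrowsUnchecked() {
        ForkJoinTask<Integer> task = ForkJoinPool.commonPool().submit(new LengthTask(null));
        try {
            task.join();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // get(): the same failure is wrapped in a checked ExecutionException.
    static boolean getWrapsInExecutionException() {
        ForkJoinTask<Integer> task = ForkJoinPool.commonPool().submit(new LengthTask(null));
        try {
            task.get();
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof IllegalStateException;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    // invokeAll(): forks every task and returns once all of them are done.
    static int totalLength(List<String> words) {
        List<LengthTask> tasks = new ArrayList<>();
        for (String word : words) {
            tasks.add(new LengthTask(word));
        }
        ForkJoinTask.invokeAll(tasks);
        int total = 0;
        for (LengthTask task : tasks) {
            total += task.join(); // already complete, so this returns immediately
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(lengthViaInvoke("hello"));               // 5
        System.out.println(joinThrowsUnchecked());                  // true
        System.out.println(getWrapsInExecutionException());         // true
        System.out.println(totalLength(List.of("fork", "join")));   // 8
    }
}
```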
Example
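import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;

public class Main {
    private static final ForkJoinPool threadPool = ForkJoinPool.commonPool();

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // submit() schedules the task; get() only waits for its result
        var task = threadPool.submit(new RecursiveCapitalise("hello world"));
        System.out.println("finished: " + task.get());
    }

    static class RecursiveCapitalise extends RecursiveTask<String> {
        private final String word;

        public RecursiveCapitalise(String word) {
            this.word = word;
        }

        @Override
        protected String compute() {
            // Base case: nothing left to split (<= 1 also stops on an empty string)
            if (word.length() <= 1) {
                System.out.printf("%s %s%n", word, Thread.currentThread().getName());
                return word.toUpperCase();
            }
            try {
                Thread.sleep(1000); // simulate slow work
                var wordLength = word.length();
                var firstHalf = new RecursiveCapitalise(word.substring(0, wordLength / 2));
                var secondHalf = new RecursiveCapitalise(word.substring(wordLength / 2, wordLength));
                // Fork both halves, wait for them, then concatenate the results in order
                return RecursiveTask.invokeAll(
                        List.of(firstHalf, secondHalf)
                ).stream().map(ForkJoinTask::join).collect(Collectors.joining());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                throw new RuntimeException(e);
            }
        }
    }
}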
> Task :concurrency-recursion:Main.main()
r ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-1
e ForkJoinPool.commonPool-worker-6
h main
l ForkJoinPool.commonPool-worker-3
l ForkJoinPool.commonPool-worker-4
d ForkJoinPool.commonPool-worker-4
l ForkJoinPool.commonPool-worker-5
o ForkJoinPool.commonPool-worker-4
w ForkJoinPool.commonPool-worker-7
o ForkJoinPool.commonPool-worker-9
finished: HELLO WORLD
Note: to read the result here, we should call task.get() instead of task.invoke(). So the following is wrong:
public static void main(String[] args) throws ExecutionException, InterruptedException {
    var task = threadPool.submit(new RecursiveCapitalise("hello world"));
    System.out.println("finished: " + task.invoke());
}
The reason is that .submit() already schedules the task for execution. Calling invoke() on it can execute it a second time.
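Two ways to start a task exactly once are sketched below: submit() followed by get(), or a single pool.invoke(task) call. The class SubmitVersusInvokeSketch and the CountingTask with its run counter are hypothetical helpers, added only to make the number of executions visible.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;

class SubmitVersusInvokeSketch {

    // Hypothetical task that records how many times compute() has run.
    static class CountingTask extends RecursiveTask<String> {
        private final AtomicInteger runs;

        CountingTask(AtomicInteger runs) {
            this.runs = runs;
        }

        @Override
        protected String compute() {
            runs.incrementAndGet();
            return "done";
        }
    }

    // Option 1: submit() starts the task, get() only waits for the result.
    static int runsWithSubmitAndGet() {
        AtomicInteger runs = new AtomicInteger();
        ForkJoinTask<String> task = ForkJoinPool.commonPool().submit(new CountingTask(runs));
        try {
            task.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        return runs.get();
    }

    // Option 2: pool.invoke(task) starts the task and waits for it in one call.
    static int runsWithPoolInvoke() {
        AtomicInteger runs = new AtomicInteger();
        ForkJoinPool.commonPool().invoke(new CountingTask(runs));
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("submit + get: " + runsWithSubmitAndGet()); // 1
        System.out.println("pool.invoke:  " + runsWithPoolInvoke());   // 1
    }
}
```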