Chapter 06 - Executor Framework

Without executor framework

Without an executor, the simplest way to run tasks concurrently is to start a new thread for each one:

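import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

class ThreadPerTaskWebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);

        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };

            // One brand-new thread per incoming connection
            new Thread(task).start();
        }
    }

    private static void handleRequest(Socket connection) {
        // Request handling is elided in these notes
    }
}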

A few problems here:

  • There is no bound on how many threads get created. Creating more threads than the server can handle exhausts its resources: the JVM may throw OutOfMemoryError, which is hard to recover from
  • Creating and destroying a thread is costly

Using executor framework

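import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

class TaskExecutionWebServer {
    private static final int NThreads = 100;
    private static final Executor exec = Executors.newFixedThreadPool(NThreads);

    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            exec.execute(task); // hand the task to the pool instead of creating a thread
        }
    }

    private static void handleRequest(Socket connection) {
        // Request handling is elided in these notes
    }
}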

The pool now reuses at most NThreads worker threads across all requests instead of creating a new thread for each one.
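
The reuse can be observed directly. The following is a minimal sketch, not part of the original notes: the class name ThreadReuseDemo and the helper workerNames are made up for illustration. It runs many small tasks on a small fixed pool and collects the names of the threads that executed them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, named for illustration only.
class ThreadReuseDemo {
    // Runs taskCount tasks on a fixed pool and returns the names of the threads that ran them.
    static Set<String> workerNames(int poolSize, int taskCount) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();                            // no more submissions
        pool.awaitTermination(5, TimeUnit.SECONDS); // wait for the queued tasks to finish
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> names = workerNames(3, 50);
        System.out.println(names.size() + " distinct worker threads ran 50 tasks");
    }
}
```

However many tasks are submitted, the set never holds more names than the pool size, which is the bound the thread-per-task version lacks.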

Of course, if you still want one thread per request, you can write your own Executor:

public class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        new Thread(r).start();
    }
}

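Types of thread pool

  1. newFixedThreadPool: a pool with a fixed number of threads. If a thread dies because of an unexpected exception, the pool replaces it with a new one
  2. newCachedThreadPool: no upper bound on the pool size. It reuses idle threads when they are available, adds new threads when demand increases, and reaps threads that stay idle
  3. newSingleThreadExecutor: a single worker thread. Tasks are guaranteed to be processed sequentially, in the order imposed by the task queue (FIFO, LIFO, or priority order)
  4. newScheduledThreadPool: a fixed-size thread pool that supports delayed and periodic tasks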
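
Two of these guarantees are easy to check in a few lines. This is a small sketch under assumed names (ThreadPoolTypesDemo, runInOrder, and runDelayed are illustrative, not from the notes): a single-thread executor runs tasks in submission order, and a scheduled pool runs a one-shot task after a delay.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, named for illustration only.
class ThreadPoolTypesDemo {
    // Submits n tasks to a single-thread executor and records the order in which they ran.
    static List<Integer> runInOrder(int n) throws InterruptedException {
        ExecutorService single = Executors.newSingleThreadExecutor();
        List<Integer> order = new CopyOnWriteArrayList<>();
        for (int i = 0; i < n; i++) {
            final int id = i;
            single.execute(() -> order.add(id));
        }
        single.shutdown();
        single.awaitTermination(5, TimeUnit.SECONDS);
        return order;
    }

    // Schedules a one-shot task on a scheduled pool and waits for its result.
    static String runDelayed(long delayMillis) throws Exception {
        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
        try {
            ScheduledFuture<String> future =
                scheduled.schedule(() -> "delayed result", delayMillis, TimeUnit.MILLISECONDS);
            return future.get();
        } finally {
            scheduled.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runInOrder(5));   // [0, 1, 2, 3, 4]
        System.out.println(runDelayed(100)); // delayed result
    }
}
```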

Executor lifecycle

  1. Running
  2. Shutting down
  3. Terminated

ExecutorService supports the following lifecycle methods:

  1. void shutdown(): starts a graceful shutdown. No new tasks are accepted, but tasks that were already submitted (including queued ones) run to completion. The call itself does not block; once all tasks have finished, the executor moves to the terminated state
  2. List<Runnable> shutdownNow(): abrupt shutdown. Attempts to cancel running tasks and returns the list of tasks that never started
  3. boolean isShutdown()
  4. boolean isTerminated()
  5. boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
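
These methods are usually combined: ask for a graceful shutdown, wait a bounded time, and fall back to a forced shutdown. The sketch below shows one way to do that; the class ShutdownDemo and the helper shutdownAndAwait are illustrative names, not something the notes define.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, named for illustration only.
class ShutdownDemo {
    // Graceful shutdown with a forced fallback; returns true if the pool terminated.
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks; already-submitted tasks still run
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                List<Runnable> neverStarted = pool.shutdownNow(); // interrupt running tasks
                System.out.println("Tasks that never started: " + neverStarted.size());
                return pool.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("working"));
        boolean terminated = shutdownAndAwait(pool, 1, TimeUnit.SECONDS);
        System.out.println(pool.isShutdown() + " " + terminated + " " + pool.isTerminated());
    }
}
```

isShutdown() becomes true as soon as shutdown() is called, while isTerminated() becomes true only after every task has finished.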

Schedule delayed or periodic tasks

Avoid using Timer to schedule tasks

Timer only has 1 thread

Timer is the old way to schedule tasks in Java. It launches a single background thread to run all of its scheduled tasks. This causes a problem:

  • If taskA and taskB are both scheduled and taskB takes longer than taskA, taskB occupies the thread and delays the next run of taskA.

Consider following example:

public class TimerExample {  
    static AtomicInteger count = new AtomicInteger(0);  
  
    public static void main(String[] args) throws InterruptedException {  
        Timer timer = new Timer();  
        timer.schedule(new ExampleTask("Task 1", 5000), 1, 1000);  
        timer.schedule(new ExampleTask("Task 2", 1000), 1, 1000);  
    }  
  
    static class ExampleTask extends TimerTask {  
        private final String name;  
        private final int sleepTime;  
  
        public ExampleTask(String name, int sleepTime) {  
            this.name = name;  
            this.sleepTime = sleepTime;  
        }  
  
        public void run() {  
            System.out.printf(  
                "[%s] Executing in: %s, count: %d\n",  
                this.name,  
                Thread.currentThread().getName(),  
                count.addAndGet(1));  
            try {  
                Thread.sleep(this.sleepTime);  
            } catch (InterruptedException e) {  
                throw new RuntimeException(e);  
            }  
        }  
    }  
}
> Task :app:org.example.chapter6.TimerExample.main()
[Task 1] Executing in: Timer-0, count: 1
[Task 2] Executing in: Timer-0, count: 2
[Task 1] Executing in: Timer-0, count: 3

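Task 1 takes 5 seconds and Task 2 takes 1 second, and both are scheduled with a 1-second period, so we would expect roughly five runs of Task 2 between two consecutive runs of Task 1. Instead Task 2 runs only once, because Task 1 occupies the single timer thread.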

Timer thread is fail-fast

If a TimerTask throws an unchecked exception, the timer thread terminates and the Timer is cancelled, so every other task scheduled on that Timer stops running as well:

  
public class TimerExample {  
    static AtomicInteger count = new AtomicInteger(0);  
  
    public static void main(String[] args) throws InterruptedException {  
        Timer timer = new Timer();  
        timer.schedule(new ThrowTask(), 1);  
        timer.schedule(new ExampleTask("Task 1", 1000), 1);  
    }  
  
    static class ThrowTask extends TimerTask {  
        public void run() {  
            System.out.printf("Executing in: %s\n", Thread.currentThread().getName());  
            throw new RuntimeException("Something is wrong");  
        }  
    }
    ...
}
> Task :app:org.example.chapter6.TimerExample.main()
Executing in: Timer-0

BUILD SUCCESSFUL in 1s
2 actionable tasks: 2 executed
Configuration cache entry reused.
Exception in thread "Timer-0" java.lang.RuntimeException: Something is wrong
	at org.example.chapter6.TimerExample$ThrowTask.run(TimerExample.java:23)
	at java.base/java.util.TimerThread.mainLoop(Timer.java:572)
	at java.base/java.util.TimerThread.run(Timer.java:522)
10:03:57 PM: Execution finished ':app:org.example.chapter6.TimerExample.main()'.

As a result, the ExampleTask scheduled right after ThrowTask never runs at all.

Use ScheduledThreadPoolExecutor instead

For delayed or periodic tasks, we should use a ScheduledExecutorService instead:

public class ScheduledThreadPoolExample {  
    static AtomicInteger count = new AtomicInteger(0);  
  
    public static void main(String[] args) throws InterruptedException {  
        try (ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2)) {  
            executorService.scheduleAtFixedRate(new ExampleTask("Task 1", 5000),1, 1, TimeUnit.SECONDS);  
            executorService.scheduleAtFixedRate(new ExampleTask("Task 2", 1000),1, 1, TimeUnit.SECONDS);  
            executorService.awaitTermination(7, TimeUnit.SECONDS);  
        }  
    }  
  
    static class ExampleTask implements Runnable {  
        private final String name;  
        private final int sleepTime;  
  
        public ExampleTask(String name, int sleepTime) {  
            this.name = name;  
            this.sleepTime = sleepTime;  
        }  
  
        public void run() {  
            System.out.printf(  
                "[%s] Executing in: %s, count: %d\n",  
                this.name,  
                Thread.currentThread().getName(),  
                count.addAndGet(1));  
            try {  
                Thread.sleep(this.sleepTime);  
            } catch (InterruptedException e) {  
                throw new RuntimeException(e);  
            }  
        }  
    }  
}

[!note]
new ScheduledThreadPoolExecutor(poolSize) creates the same kind of pool as Executors.newScheduledThreadPool(poolSize). Holding the concrete ScheduledThreadPoolExecutor type exposes its additional configuration methods, whereas Executors.newScheduledThreadPool returns only the ScheduledExecutorService interface.

> Task :app:org.example.chapter6.ScheduledThreadPoolExample.main()
[Task 2] Executing in: pool-1-thread-2, count: 1
[Task 1] Executing in: pool-1-thread-1, count: 2
[Task 2] Executing in: pool-1-thread-2, count: 3
[Task 2] Executing in: pool-1-thread-2, count: 4
[Task 2] Executing in: pool-1-thread-2, count: 5
[Task 2] Executing in: pool-1-thread-2, count: 6
[Task 1] Executing in: pool-1-thread-1, count: 7
[Task 2] Executing in: pool-1-thread-2, count: 8

This time Task 2 keeps running about once per second on its own thread while Task 1 is still sleeping.

Likewise, a task that throws no longer affects the second task, because the failure is contained in the failing task instead of killing the scheduler thread:

public class ScheduledThreadPoolExample {  
    static AtomicInteger count = new AtomicInteger(0);  
  
    public static void main(String[] args) throws InterruptedException {  
        try (ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2)) {  
            executorService.scheduleAtFixedRate(new ThrowTask(),1, 1, TimeUnit.SECONDS);  
            executorService.scheduleAtFixedRate(new ExampleTask("Task 2", 1000),1, 1, TimeUnit.SECONDS);  
            executorService.awaitTermination(7, TimeUnit.SECONDS);  
        }  
    }  
  
    static class ThrowTask implements Runnable {  
        public void run() {  
            System.out.printf("Executing in: %s\n", Thread.currentThread().getName());  
            throw new RuntimeException("Something is wrong");  
        }  
    }
    ...
}
> Task :app:org.example.chapter6.ScheduledThreadPoolExample.main()
Executing in: pool-1-thread-1
[Task 2] Executing in: pool-1-thread-2, count: 1
[Task 2] Executing in: pool-1-thread-2, count: 2
[Task 2] Executing in: pool-1-thread-2, count: 3
[Task 2] Executing in: pool-1-thread-2, count: 4
[Task 2] Executing in: pool-1-thread-2, count: 5
[Task 2] Executing in: pool-1-thread-2, count: 6

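The exception does not reach the other task. It is captured in the ScheduledFuture returned by scheduleAtFixedRate, and further executions of ThrowTask are suppressed, which is why it prints only once. Since this code never inspects that future, the exception is silently dropped.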
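
If the failure matters, it can be recovered from the ScheduledFuture. The following is a minimal sketch; ScheduledFailureDemo and captureFailure are illustrative names introduced here, not part of the example above.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, named for illustration only.
class ScheduledFailureDemo {
    // Schedules a periodic task that always throws and returns the captured exception message.
    static String captureFailure() throws InterruptedException {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        try {
            ScheduledFuture<?> future = pool.scheduleAtFixedRate(
                () -> { throw new RuntimeException("Something is wrong"); },
                0, 100, TimeUnit.MILLISECONDS);
            // A periodic task never completes normally, so get() only returns by throwing.
            future.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause().getMessage(); // the original exception is the cause
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(captureFailure()); // Something is wrong
    }
}
```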

Callable and Future

A task executed by Executor has 4 phases:

  1. Created
  2. Submitted
  3. Started
  4. Completed

executor.submit returns a Future that represents the pending result of the task:

public class CallableAndFutureExample {  
    public static void main(String[] args) throws ExecutionException, InterruptedException {  
        ExecutorService executorService = Executors.newCachedThreadPool();  
        Future<String> result = executorService.submit(() -> "Hello world");  
        System.out.println(result.get());  
        executorService.shutdown();  
    }  
}
Hello world

A task can be either a Callable or a Runnable; submit returns a Future in both cases.
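
What the Future yields depends on which overload of submit is used. As a small sketch (SubmitVariantsDemo and submitAll are illustrative names), the three overloads of ExecutorService.submit behave as follows:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, named for illustration only.
class SubmitVariantsDemo {
    // Submits one task per overload and returns what each Future yields.
    static List<Object> submitAll() throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable runnable = () -> { };              // produces no value
            Callable<String> callable = () -> "from callable";

            Future<?> a = pool.submit(runnable);                // get() yields null
            Future<String> b = pool.submit(runnable, "preset"); // get() yields the given value
            Future<String> c = pool.submit(callable);           // get() yields the call() result
            return Arrays.asList(a.get(), b.get(), c.get());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(submitAll()); // [null, preset, from callable]
    }
}
```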

Future cancel behavior:

If the task has not started yet, cancel prevents it from ever running. If the task is already running, cancel(true) interrupts its thread, which only stops the task if it is responsive to interruption.
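
Both cases can be sketched as follows. The class CancelDemo and its two helpers are illustrative names, and the latches exist only to make the timing deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, named for illustration only.
class CancelDemo {
    // Cancels a task that is still queued; returns true if it was cancelled and never ran.
    static boolean cancelQueuedTask() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean ran = new AtomicBoolean(false);

        pool.submit(() -> { release.await(); return null; }); // keeps the only worker busy
        Future<?> queued = pool.submit(() -> ran.set(true));  // waits in the queue
        boolean cancelled = queued.cancel(false);              // cancelled before it starts
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return cancelled && !ran.get();
    }

    // Cancels a running task; returns true if the task observed the interrupt.
    static boolean cancelRunningTask() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        try {
            Future<?> future = pool.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000);       // sleep is responsive to interruption
                } catch (InterruptedException e) {
                    interrupted.countDown();
                }
            });
            started.await();                    // make sure the task is actually running
            future.cancel(true);                // true = interrupt the worker thread
            return future.isCancelled() && interrupted.await(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(cancelQueuedTask());  // true
        System.out.println(cancelRunningTask()); // true
    }
}
```

A task that ignores interruption (for example a busy loop that never checks the interrupt flag) would keep running after cancel(true), even though the Future already reports isCancelled().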

Future on its own:

A future can also be run by itself, without an ExecutorService:

RunnableFuture<String> future = new FutureTask<>(() -> "Some tasks");
future.run();

This runs the task on the thread that calls run().

[!danger]
future.run() is a blocking call: it returns only after the task has finished

[!note]
We need to use RunnableFuture here since Future itself is an interface that doesn't provide a method for running the task.
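
Because FutureTask implements Runnable, it can also be handed to a plain Thread so the caller is not blocked by run(). This is a small sketch with illustrative names (FutureTaskDemo, runOnCaller, runOnOwnThread, and the thread name "worker-1" are all assumptions made for the example):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical demo class, named for illustration only.
class FutureTaskDemo {
    // Runs the task on the calling thread and returns the name of the thread that ran it.
    static String runOnCaller() throws ExecutionException, InterruptedException {
        FutureTask<String> future = new FutureTask<>(() -> Thread.currentThread().getName());
        future.run();        // blocks until the task is done
        return future.get(); // result is already available
    }

    // Runs the task on a dedicated thread and returns the name of the thread that ran it.
    static String runOnOwnThread() throws ExecutionException, InterruptedException {
        FutureTask<String> future = new FutureTask<>(() -> Thread.currentThread().getName());
        new Thread(future, "worker-1").start(); // returns immediately
        return future.get();                    // blocks here until the worker finishes
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runOnCaller());
        System.out.println(runOnOwnThread()); // worker-1
    }
}
```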

Example with I/O-heavy tasks in parallel

It's good practice to separate I/O-heavy work from compute-heavy work so that the two can run in parallel. Consider the following example, where we render a markdown file. We need to

  1. Download images given urls (I/O heavy)
  2. Render images after download (CPU heavy)
  3. Render text content (CPU heavy)

From this list, the work falls into 2 groups:

  1. I/O task
  2. CPU task

Consider this following example:

Parallel download example

  
record Page (String content, List<String> imageUrls) {}  
  
@ThreadSafe  
class MarkdownRender {  
    void renderContent(String content) throws InterruptedException {  
        System.out.printf("Rendering page content: %s\n", content);  
        Thread.sleep(1000);  
    }  
  
    void renderImage(String url) throws InterruptedException {  
        System.out.printf("Rendering page image urls %s\n", url);  
        Thread.sleep(1000);  
    }  
  
    String downloadImage(String url) throws InterruptedException {  
        System.out.printf("Downloading page image urls %s\n", url);  
        Thread.sleep(5000);  
        return url;  
    }  
}  
  
public class PageRenderingExample {  
    static MarkdownRender markdownRender = new MarkdownRender();  
    static ExecutorService executorService = Executors.newCachedThreadPool();  
  
    public static void main(String[] args) throws InterruptedException {  
        Page homePage = new Page("Some content", List.of("1.png", "2.png", "3.png"));  
        serveRequest(homePage);  
        executorService.shutdown();  
    }  
  
    static void serveRequest(Page page) {  
        Callable<List<String>> downloadImageTask = () -> {  
            try {  
                List<String> downloadedUrls = new ArrayList<>();  
                for (String url : page.imageUrls())  
                    downloadedUrls.add(markdownRender.downloadImage(url));  
                return downloadedUrls;  
            } catch (InterruptedException e) {  
                throw new RuntimeException(e);  
            }  
        };  
  
        Future<List<String>> imageDownloadingResult = executorService.submit(downloadImageTask);  
  
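        try {
            markdownRender.renderContent(page.content());
            for (String url : imageDownloadingResult.get()) {
                markdownRender.renderImage(url);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop the download we no longer need
            Thread.currentThread().interrupt();
            imageDownloadingResult.cancel(true);
        } catch (ExecutionException e) {
            // The download task itself failed; surface its cause
            throw new RuntimeException(e.getCause());
        }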
    }  
}
Rendering page content: Some content
Downloading page image urls 1.png
Downloading page image urls 2.png
Downloading page image urls 3.png
Rendering page image urls 1.png
Rendering page image urls 2.png
Rendering page image urls 3.png

BUILD SUCCESSFUL in 19s

The work is split as planned between the CPU tasks and the I/O task:

  • Main thread can handle the rendering task
  • Downloading is sent to background thread

However, each download takes 5 seconds while each rendering step takes only 1 second, and the three downloads run one after another on a single worker. One worker is not enough to meet our performance needs, so we want to run the downloads in parallel.

Better parallel download example

static void serveRequestScale(Page page) {  
    CompletionService<String> completionService = new ExecutorCompletionService<>(executorService);  
    Runnable downloadImageTask = () -> {  
        for (String url : page.imageUrls()) {  
            completionService.submit(() -> markdownRender.downloadImage(url));  
        }  
    };  
  
    downloadImageTask.run(); // Execute on the current thread
  
    try {  
        markdownRender.renderContent(page.content());  
        for (int i = 0; i < page.imageUrls().size(); i++) {  
            String url = completionService.take().get();  
            markdownRender.renderImage(url);  
        }  
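    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // restore the interrupt flag
    } catch (ExecutionException e) {
        throw new RuntimeException(e.getCause());  // a download failed
    }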
}

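CompletionService combines an Executor with a BlockingQueue. Each call to take() blocks until some task has finished and returns its Future, so we can call get() on it and consume results in completion order.

This version cuts the total run time by about 10 seconds (from 19s to 9s), because the three downloads now run concurrently: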

Rendering page content: Some content
Downloading page image urls 3.png
Downloading page image urls 1.png
Downloading page image urls 2.png
Rendering page image urls 1.png
Rendering page image urls 2.png
Rendering page image urls 3.png

BUILD SUCCESSFUL in 9s
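
The completion-order behavior of CompletionService is easier to see in isolation. In this sketch (CompletionOrderDemo and completionOrder are illustrative names, and the sleep times are arbitrary), tasks are submitted in one order and come back from take() in the order they finish:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, named for illustration only.
class CompletionOrderDemo {
    // Submits one sleeping task per entry and returns the sleep times in completion order.
    static List<Integer> completionOrder(List<Integer> sleepMillis) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(sleepMillis.size());
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(pool);
        try {
            for (int ms : sleepMillis) {
                completionService.submit(() -> { Thread.sleep(ms); return ms; });
            }
            List<Integer> finished = new ArrayList<>();
            for (int i = 0; i < sleepMillis.size(); i++) {
                finished.add(completionService.take().get()); // next task to finish
            }
            return finished;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Submitted as 900, 100, 500; the shortest sleep finishes first.
        System.out.println(completionOrder(List.of(900, 100, 500))); // [100, 500, 900]
    }
}
```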

Timing out an execution

We can bound the wait with future.get(timeout, unit). Note that this is still a blocking call, but it blocks for at most the given timeout:

public class ThreadTimeoutExample {  
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {  
        ExecutorService executorService = Executors.newCachedThreadPool();  
        RunnableFuture<String> runnableFuture = new FutureTask<>(() -> {  
            try {  
                Thread.sleep(5000);  
                return "Hello world";  
            } catch (InterruptedException e) {  
                throw new RuntimeException(e);  
            }  
        });  
        executorService.execute(runnableFuture);  // Calling runnableFuture.run() here instead would block this thread until the task finishes
  
        System.out.println(runnableFuture.get(1, TimeUnit.SECONDS));  
    }  
}

If the timeout elapses first, future.get(timeout, unit) throws a TimeoutException. The task itself keeps running unless it is cancelled.

> Task :app:org.example.chapter6.ThreadTimeoutExample.main()
Exception in thread "main" java.util.concurrent.TimeoutException
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:206)
	at org.example.chapter6.ThreadTimeoutExample.main(ThreadTimeoutExample.java:19)

[!danger]
future.get(…) is a blocking call
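
In practice the TimeoutException is usually caught, and the task is cancelled so it does not keep a worker busy. A minimal sketch, assuming a fallback string is an acceptable result (TimeoutDemo, getOrFallback, and the "timed out" value are illustrative):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, named for illustration only.
class TimeoutDemo {
    // Waits up to timeoutMillis for a task that sleeps taskMillis; falls back on timeout.
    static String getOrFallback(long taskMillis, long timeoutMillis)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> future = pool.submit(() -> {
            Thread.sleep(taskMillis);
            return "Hello world";
        });
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the task; otherwise it would keep running
            return "timed out";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(getOrFallback(5000, 1000)); // timed out
        System.out.println(getOrFallback(10, 1000));   // Hello world
    }
}
```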

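invokeAll example

If we have a collection of Callable<T> tasks with the same result type, we can use the invokeAll method of ExecutorService. It blocks until all tasks have completed or the timeout expires, and returns one Future per task in the same order as the input: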

public class ExecutorInvokeAllExample {  
    public static void main(String[] args) throws InterruptedException, ExecutionException {  
        ExecutorService executorService = Executors.newCachedThreadPool();  
  
        Callable<String> task1 = () -> "Task1";  
        Callable<String> task2 = () -> {  
            Thread.sleep(5000);  
            return "task2";  
        };  
  
        List<Future<String>> result = executorService.invokeAll(List.of(task1, task2), 6, TimeUnit.SECONDS);  
  
        executorService.shutdown();  // Don't accept new tasks
  
        for (Future<String> each : result) {  
            System.out.println(each.get());  
        }  
    }  
}
> Task :app:org.example.chapter6.ExecutorInvokeAllExample.main()
Task1
task2
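
When the timeout expires before a task finishes, invokeAll cancels that task, and calling get() on its Future would throw CancellationException. The sketch below checks isCancelled() first; InvokeAllTimeoutDemo, invokeWithTimeout, and the placeholder strings are illustrative names, not part of the example above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, named for illustration only.
class InvokeAllTimeoutDemo {
    // Runs a fast and a slow task with a shared timeout and reports each outcome in input order.
    static List<String> invokeWithTimeout(long timeoutMillis) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        Callable<String> fast = () -> "fast";
        Callable<String> slow = () -> {
            Thread.sleep(5000);
            return "slow";
        };
        try {
            List<Future<String>> futures =
                pool.invokeAll(List.of(fast, slow), timeoutMillis, TimeUnit.MILLISECONDS);
            List<String> outcomes = new ArrayList<>();
            for (Future<String> future : futures) {
                if (future.isCancelled()) {
                    outcomes.add("cancelled");  // did not finish before the timeout
                } else {
                    try {
                        outcomes.add(future.get());
                    } catch (ExecutionException e) {
                        outcomes.add("failed: " + e.getCause());
                    }
                }
            }
            return outcomes;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(invokeWithTimeout(1000)); // [fast, cancelled]
    }
}
```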