Chapter 06 - Executor Framework
Without executor framework
Normally we can execute task like
class ThreadPerTaskWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
}
new Thread(task).start();
}
}
}
A few problem here:
- There is no bounded on how many threads you create. If you create too many threads than the server can handle it will be bad. Will run into
OutOfMemoryIssueand hard to recover - Creating and destroying a thread is costly
Using executor framework
class TaskExecutionWebServer {
private static final int NThreads = 100;
private static final Executor exec = Executors.newFixedThreadPool(NThreads);
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
}
exec.execute(task);
}
}
Now we can reuse the NThreads for creation.
Of course, if you want to create 1 thread per request still, you can create your own executor
public class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
Type of thread pool
newFixedThreadPool: provide a fixed thread pool. If a thread die, it will add new threadnewCachedThreadPool: no upper limit but have the ability to expand on demandsnewSingleThreadExecutor: tasks are guarantee to process sequentially either FIFO, LIFO or some priority ordernewScheduledThreadPool: fixed thread pool that support delay and periodic task
Executor lifecycle
- Running
- Shutting down
- Terminate
It support the following methods:
void shutdown(): gracefully shutdown, wait for all the task to complete and stop taking more task. After it shutdown, it transit to terminated stateList<Runnable> shutdownNow(): force shutdown, return the list of tasks that never get to runboolean isShutdown()boolean isTerminated()boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
Schedule delayed or periodic tasks
Avoid to use Timer to schedule a task
Timer only have 1 thread
Timer is the old way to schedule task in Java. It launches only 1 thread to manage the background scheduled task. This has a problem:
- If both
taskAtaskBbeing schedule,taskBtakes longer time thentaskA.TaskBwill occupy the thread and blocktaskAfrom being scheduled.
Consider following example:
public class TimerExample {
static AtomicInteger count = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
Timer timer = new Timer();
timer.schedule(new ExampleTask("Task 1", 5000), 1, 1000);
timer.schedule(new ExampleTask("Task 2", 1000), 1, 1000);
}
static class ExampleTask extends TimerTask {
private final String name;
private final int sleepTime;
public ExampleTask(String name, int sleepTime) {
this.name = name;
this.sleepTime = sleepTime;
}
public void run() {
System.out.printf(
"[%s] Executing in: %s, count: %d\n",
this.name,
Thread.currentThread().getName(),
count.addAndGet(1));
try {
Thread.sleep(this.sleepTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
> Task :app:org.example.chapter6.TimerExample.main()
[Task 1] Executing in: Timer-0, count: 1
[Task 2] Executing in: Timer-0, count: 2
[Task 1] Executing in: Timer-0, count: 3
In here, since Task1 takes 5 seconds and Task2 only takes 1 second, we're expecting 5 Task2 between 2 Task1. As a result, Task1 is blocking the thread.
Timer thread is fail-fast
If there is a problem in a timer thread, it will stop the whole program — which affect other scheduled task as well:
public class TimerExample {
static AtomicInteger count = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
Timer timer = new Timer();
timer.schedule(new ThrowTask(), 1);
timer.schedule(new ExampleTask("Task 1", 1000), 1);
}
static class ThrowTask extends TimerTask {
public void run() {
System.out.printf("Executing in: %s\n", Thread.currentThread().getName());
throw new RuntimeException("Something is wrong");
}
}
...
}
> Task :app:org.example.chapter6.TimerExample.main()
Executing in: Timer-0
BUILD SUCCESSFUL in 1s
2 actionable tasks: 2 executed
Configuration cache entry reused.
Exception in thread "Timer-0" java.lang.RuntimeException: Something is wrong
at org.example.chapter6.TimerExample$ThrowTask.run(TimerExample.java:23)
at java.base/java.util.TimerThread.mainLoop(Timer.java:572)
at java.base/java.util.TimerThread.run(Timer.java:522)
10:03:57 PM: Execution finished ':app:org.example.chapter6.TimerExample.main()'.
As a result, the example task is not even executing at all.
Use ScheduledThreadPoolExecutor instead
When dealing with schedule or delay task, we should use ScheduledExecutorService
public class ScheduledThreadPoolExample {
static AtomicInteger count = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
try (ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2)) {
executorService.scheduleAtFixedRate(new ExampleTask("Task 1", 5000),1, 1, TimeUnit.SECONDS);
executorService.scheduleAtFixedRate(new ExampleTask("Task 2", 1000),1, 1, TimeUnit.SECONDS);
executorService.awaitTermination(7, TimeUnit.SECONDS);
}
}
static class ExampleTask implements Runnable {
private final String name;
private final int sleepTime;
public ExampleTask(String name, int sleepTime) {
this.name = name;
this.sleepTime = sleepTime;
}
public void run() {
System.out.printf(
"[%s] Executing in: %s, count: %d\n",
this.name,
Thread.currentThread().getName(),
count.addAndGet(1));
try {
Thread.sleep(this.sleepTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
[!note]
The usenew ScheduledThreadPoolExecutor(poolSize)is the same thing as callingExecutors.newScheduledThreadPool(num_thread).ScheduledThreadPoolExecutorwill allow us to use more APIs whereasExcecutors.newScheduledThreadPoolabstract it
> Task :app:org.example.chapter6.ScheduledThreadPoolExample.main()
[Task 2] Executing in: pool-1-thread-2, count: 1
[Task 1] Executing in: pool-1-thread-1, count: 2
[Task 2] Executing in: pool-1-thread-2, count: 3
[Task 2] Executing in: pool-1-thread-2, count: 4
[Task 2] Executing in: pool-1-thread-2, count: 5
[Task 2] Executing in: pool-1-thread-2, count: 6
[Task 1] Executing in: pool-1-thread-1, count: 7
[Task 2] Executing in: pool-1-thread-2, count: 8
As a result, we do see 5 tasks 2 between a task 1.
Similarly in here, this code still work without affecting the second task since now this is a fail-safe method
public class ScheduledThreadPoolExample {
static AtomicInteger count = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
try (ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2)) {
executorService.scheduleAtFixedRate(new ThrowTask(),1, 1, TimeUnit.SECONDS);
executorService.scheduleAtFixedRate(new ExampleTask("Task 2", 1000),1, 1, TimeUnit.SECONDS);
executorService.awaitTermination(7, TimeUnit.SECONDS);
}
}
static class ThrowTask implements Runnable {
public void run() {
System.out.printf("Executing in: %s\n", Thread.currentThread().getName());
throw new RuntimeException("Something is wrong");
}
}
...
}
> Task :app:org.example.chapter6.ScheduledThreadPoolExample.main()
Executing in: pool-1-thread-1
[Task 2] Executing in: pool-1-thread-2, count: 1
[Task 2] Executing in: pool-1-thread-2, count: 2
[Task 2] Executing in: pool-1-thread-2, count: 3
[Task 2] Executing in: pool-1-thread-2, count: 4
[Task 2] Executing in: pool-1-thread-2, count: 5
[Task 2] Executing in: pool-1-thread-2, count: 6
We simply just ignore the exception
Callable and Future
A task executed by Executor has 4 phases:
- Created
- Submitted
- Started
- Completed
Result of executor.submit would be a Future
public class CallableAndFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> result = executorService.submit(() -> "Hello world");
System.out.println(result.get());
executorService.shutdown();
}
}
Hello world
A task could be either Callable or Runnable which will return a Future
Future cancel behavior:
If task has not run, cancel. If task is running, it can be cancel if they're responsive to interruption
Future on its own:
A future can also run by itself without ExecutorService
RunnableFuture<String> future = new FutureTask<>(() -> "Some tasks");
future.run()
This will run on the thread that's execute this code.
[!danger]
future.run()is a blocking call
[!note]
We need to useRunnableFuturehere sinceFutureitself is interface and doesn't provide method for running.
Example with i/o heavy task in parallel
It's a good practice to separate the I/O heavy with the compute heavy task out so that we can run them in parallel. Considering the following example where we render markdown file. We need to
- Download images given urls (I/O heavy)
- Render images after download (CPU heavy)
- Render text content (CPU heavy)
From the look of this, we need to separate into 2 group:
- I/O task
- CPU task
Consider this following example:
Parallel download example
record Page (String content, List<String> imageUrls) {}
@ThreadSafe
class MarkdownRender {
void renderContent(String content) throws InterruptedException {
System.out.printf("Rendering page content: %s\n", content);
Thread.sleep(1000);
}
void renderImage(String url) throws InterruptedException {
System.out.printf("Rendering page image urls %s\n", url);
Thread.sleep(1000);
}
String downloadImage(String url) throws InterruptedException {
System.out.printf("Downloading page image urls %s\n", url);
Thread.sleep(5000);
return url;
}
}
public class PageRenderingExample {
static MarkdownRender markdownRender = new MarkdownRender();
static ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) throws InterruptedException {
Page homePage = new Page("Some content", List.of("1.png", "2.png", "3.png"));
serveRequest(homePage);
executorService.shutdown();
}
static void serveRequest(Page page) {
Callable<List<String>> downloadImageTask = () -> {
try {
List<String> downloadedUrls = new ArrayList<>();
for (String url : page.imageUrls())
downloadedUrls.add(markdownRender.downloadImage(url));
return downloadedUrls;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
Future<List<String>> imageDownloadingResult = executorService.submit(downloadImageTask);
try {
markdownRender.renderContent(page.content());
for (String url : imageDownloadingResult.get()) {
markdownRender.renderImage(url);
}
} catch (ExecutionException | InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Rendering page content: Some content
Downloading page image urls 1.png
Downloading page image urls 2.png
Downloading page image urls 3.png
Rendering page image urls 1.png
Rendering page image urls 2.png
Rendering page image urls 3.png
BUILD SUCCESSFUL in 19s
We execute as our order above by separating between CPU task and I/O task:
- Main thread can handle the rendering task
- Downloading is sent to background thread
However if you look at this, the downloading task is 5 times slower than the main thread. Scheduling to only 1 worker is not good enough to meet performance. As a result we want to scale this one up
Better parallel download example
static void serveRequestScale(Page page) {
CompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
Runnable downloadImageTask = () -> {
for (String url : page.imageUrls()) {
completionService.submit(() -> markdownRender.downloadImage(url));
}
};
downloadImageTask.run(); // Execute on the current thread
try {
markdownRender.renderContent(page.content());
for (int i = 0; i < page.imageUrls().size(); i++) {
String url = completionService.take().get();
markdownRender.renderImage(url);
}
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt();
}
}
CompletionService is a combination of ExecutorService and BlockingQueue. Whenever we call .take() it will give us the first finished Future and we can get() for result.
This method improves our performance by 10s
Rendering page content: Some content
Downloading page image urls 3.png
Downloading page image urls 1.png
Downloading page image urls 2.png
Rendering page image urls 1.png
Rendering page image urls 2.png
Rendering page image urls 3.png
BUILD SUCCESSFUL in 9s
Timeout an execution
We can timeout using future.get(timeout, unit). Note that this is a blocking call
public class ThreadTimeoutExample {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
ExecutorService executorService = Executors.newCachedThreadPool();
RunnableFuture<String> runnableFuture = new FutureTask<>(() -> {
try {
Thread.sleep(5000);
return "Hello world";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
executorService.execute(runnableFuture); // We cant execute future.run() directly here since it's blocking call
System.out.println(runnableFuture.get(1, TimeUnit.SECONDS));
}
}
If it's timeout, future.get() will throw exception
> Task :app:org.example.chapter6.ThreadTimeoutExample.main()
Exception in thread "main" java.util.concurrent.TimeoutException
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:206)
at org.example.chapter6.ThreadTimeoutExample.main(ThreadTimeoutExample.java:19)
[!danger]
future.get(…)is blocking call
InvokeAll Example
If we have a list of Callable<T> of the same type, we can use the InvokeAll method from executors
public class ExecutorInvokeAllExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task1 = () -> "Task1";
Callable<String> task2 = () -> {
Thread.sleep(5000);
return "task2";
};
List<Future<String>> result = executorService.invokeAll(List.of(task1, task2), 6, TimeUnit.SECONDS);
executorService.shutdown(); // Dont accept new tasks
for (Future<String> each : result) {
System.out.println(each.get());
}
}
}
> Task :app:org.example.chapter6.ExecutorInvokeAllExample.main()
Task1
task2