Chapter 07 - Cancellation And Shutdown
7.1 Task cancellation
Bad example: cancelling with a flag
Consider this example:
package org.example.chapter7;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class Producer extends Thread {
final BlockingQueue<Integer> taskQueue;
volatile boolean cancelled = false;
public Producer(BlockingQueue<Integer> taskQueue) {
this.taskQueue = taskQueue;
}
public void run() {
while (!cancelled) {
try {
this.taskQueue.put((new Random()).nextInt());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public void cancel() {
this.cancelled = true;
}
}
public class ClientCheckWorse {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(2);
Producer producer = new Producer(blockingQueue);
List<Integer> finished = new ArrayList<>();
producer.start();
Thread.sleep(1000);
while (finished.size() < 2) {
finished.add(blockingQueue.take());
}
System.out.println(finished);
producer.cancel(); // Likely has no effect: the producer is probably blocked in put() on the full queue
producer.join();
}
}
This approach is client-side cancellation: the cancellation mechanism is coded into the thread itself. It is unreliable, though, because it only works when the producer gets back to the while (!cancelled) { condition check.
In this example, if the producer is stuck in this.taskQueue.put(...) because the queue is full, it never re-checks the flag, so cancellation never happens.
To do this properly, we can use Thread.interrupt(), which makes the blocked put() throw InterruptedException right away.
[!important]
Nothing in the API ties interruption to cancellation, but using interrupt for any other purpose is too fragile.
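The difference between the two approaches can be sketched in a small experiment. This is a minimal illustration, not the original example: the class names FlagVersusInterrupt and StuckProducer and the timings are made up for the sketch. The producer fills a one-slot queue and blocks in put(); setting the flag leaves it blocked, while interrupt() releases it.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class FlagVersusInterrupt {
    // Hypothetical producer for this sketch: it honours both a flag and interruption.
    static class StuckProducer extends Thread {
        private final BlockingQueue<Integer> queue;
        private volatile boolean cancelled = false;

        StuckProducer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (!cancelled) {
                    queue.put(1); // blocks once the queue is full and nobody consumes
                }
            } catch (InterruptedException e) {
                // Interrupted while blocked: fall through and let the thread exit
            }
        }

        void cancelWithFlag() {
            cancelled = true;
        }
    }

    // Returns {alive after setting the flag, alive after interrupt()}.
    static boolean[] demo() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        StuckProducer producer = new StuckProducer(queue);
        producer.start();
        try {
            Thread.sleep(200);           // give the producer time to fill the queue and block
            producer.cancelWithFlag();   // the flag is set, but the producer never re-checks it
            producer.join(200);          // wait a little; the join is expected to time out
            boolean aliveAfterFlag = producer.isAlive();
            producer.interrupt();        // the blocked put() throws InterruptedException
            producer.join(2000);
            boolean aliveAfterInterrupt = producer.isAlive();
            return new boolean[] {aliveAfterFlag, aliveAfterInterrupt};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo was interrupted", e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("alive after flag: " + result[0]);
        System.out.println("alive after interrupt: " + result[1]);
    }
}
```

The result depends on thread scheduling, so the sleeps are only a best-effort way to get the producer into the blocked state before cancelling.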
Using interrupt to cancel a thread
[!Danger]
Unlike SIGTERM from the OS, Java's Thread.interrupt() does not kill the thread. The OS can simply kill the process because it owns all of the process's resources (locks and so on), so cleaning up is safe. interrupt() in Java only raises the interrupt flag; the code itself has to handle it, for example with while (!Thread.currentThread().isInterrupted()) {...}. Blocking methods such as BlockingQueue.put() also react to the flag by throwing InterruptedException.
package org.example.chapter7;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class InterruptProducer extends Thread {
final BlockingQueue<Integer> taskQueue;
public InterruptProducer(BlockingQueue<Integer> taskQueue) {
this.taskQueue = taskQueue;
}
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) { // Strictly optional here: put() itself throws InterruptedException once the thread is interrupted
this.taskQueue.put((new Random()).nextInt());
}
} catch (InterruptedException e) {
/* Allow thread to exit */
}
}
public void cancel() {
this.interrupt();
}
}
public class UsingInterruptForCancellation {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
InterruptProducer interruptProducer = new InterruptProducer(queue);
List<Integer> finished = new ArrayList<>();
interruptProducer.start();
Thread.sleep(1000);
while (finished.size() < 2) {
finished.add(queue.take());
}
System.out.println(finished);
interruptProducer.cancel(); // Interrupts the producer; its blocked put() throws InterruptedException
interruptProducer.join();
Thread.sleep(1000);
}
}
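The "interrupt only raises a flag" behaviour from the callout above can be observed directly. The following is a small sketch (the class name InterruptFlagDemo is made up for illustration): the current thread interrupts itself, nothing happens until a blocking call is reached, and the flag is cleared again when InterruptedException is thrown.

```java
class InterruptFlagDemo {
    // Returns {flag right after interrupt(), flag inside the catch block}.
    static boolean[] demo() {
        Thread.currentThread().interrupt(); // only sets the flag, the thread keeps running
        boolean flagAfterInterrupt = Thread.currentThread().isInterrupted(); // reads without clearing
        boolean flagAfterException;
        try {
            Thread.sleep(10); // a blocking method: throws because the flag is already set
            flagAfterException = Thread.currentThread().isInterrupted(); // not expected to be reached
        } catch (InterruptedException e) {
            // Throwing InterruptedException clears the interrupt status
            flagAfterException = Thread.currentThread().isInterrupted();
        }
        return new boolean[] {flagAfterInterrupt, flagAfterException};
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("flag after interrupt(): " + result[0]);
        System.out.println("flag after InterruptedException: " + result[1]);
    }
}
```

This is why code that catches InterruptedException and still wants callers to see the interrupt has to set the flag again itself.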
7.1.2 Interruption Policies
An interruption policy determines how a thread should react when an interrupt is detected. Typically we do the following:
- Exit as quickly as possible
- Clean up services
- Notify whoever needs to know, if necessary
If your code does not run in a thread you own (for example, it is called from someone else's thread pool), the most reasonable interruption policy is simply to propagate the interrupt to the caller.
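For a thread that we do own, the three steps above might look like the following sketch. The names PolicyWorker, cleanedUp and stopped are hypothetical, and Thread.sleep stands in for real blocking work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical worker that owns its thread, so it may decide what an interrupt means.
class PolicyWorker extends Thread {
    final AtomicBoolean cleanedUp = new AtomicBoolean(false);
    final CountDownLatch stopped = new CountDownLatch(1);

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(50); // stand-in for blocking work
            }
        } catch (InterruptedException e) {
            // Policy step 1: exit as quickly as possible (fall through to finally)
        } finally {
            cleanedUp.set(true);  // Policy step 2: clean up
            stopped.countDown();  // Policy step 3: notify whoever is waiting
        }
    }

    // Starts a worker, interrupts it, and reports whether it cleaned up and notified.
    static boolean demo() {
        PolicyWorker worker = new PolicyWorker();
        worker.start();
        worker.interrupt();
        try {
            return worker.stopped.await(2, TimeUnit.SECONDS) && worker.cleanedUp.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // we do not own the calling thread: restore the status
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("worker cleaned up and notified: " + demo());
    }
}
```

Note that demo() runs on a thread it does not own, so it restores the interrupt status instead of deciding what the interrupt means.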
7.1.3 Responding to interruption
There are two practical ways to do this:
- Propagate the exception
- Restore the interruption status so that the code higher up can deal with it.
In the previous example we swallowed the interruption request. Most of the time that is not acceptable. The only reason we could do it there is that the thread knows exactly how it is going to be used.
Propagate the exception
Propagating the exception is easy: just add throws InterruptedException to the method signature.
class InterruptRunnable { // A plain class: Runnable is an interface, and run() cannot declare InterruptedException anyway
BlockingQueue<Integer> queue;
public void propagateNumber() throws InterruptedException {
queue.put(new Random().nextInt());
}
}
Restore interruption status
When we cannot add throws InterruptedException (for example, because the interface method does not allow it), we can catch the exception and call interrupt() again to restore the status.
class InterruptRunnable implements Runnable {
BlockingQueue<Integer> queue;
@Override
public void run() {
try {
queue.put(new Random().nextInt());
} catch (InterruptedException e) {
// throw e; // Not possible when implementing run() directly: its signature does not declare InterruptedException
Thread.currentThread().interrupt();
}
}
}
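To see why restoring the status matters, the sketch below runs two tasks on the calling thread with an interrupt already pending and checks what the caller can observe afterwards. The class and method names are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class RestoreStatusDemo {
    // Restores the interrupt status, as in the example above.
    static Runnable restoringTask(BlockingQueue<Integer> queue) {
        return () -> {
            try {
                queue.put(42);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    // Swallows the interrupt: the caller never learns about it.
    static Runnable swallowingTask(BlockingQueue<Integer> queue) {
        return () -> {
            try {
                queue.put(42);
            } catch (InterruptedException e) {
                // ignored
            }
        };
    }

    // Runs the task on the current thread with an interrupt pending and
    // returns whether the caller can still see the interrupt afterwards.
    static boolean callerSeesInterrupt(Runnable task) {
        Thread.currentThread().interrupt(); // simulate an interrupt arriving before the blocking call
        task.run();                         // put() throws InterruptedException and clears the flag
        return Thread.interrupted();        // reads AND clears the flag
    }

    public static void main(String[] args) {
        System.out.println("restoring task: "
                + callerSeesInterrupt(restoringTask(new ArrayBlockingQueue<>(1))));
        System.out.println("swallowing task: "
                + callerSeesInterrupt(swallowingTask(new ArrayBlockingQueue<>(1))));
    }
}
```

With the restoring task the code higher up the stack still sees the interrupt; with the swallowing task the request is lost.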
Non-cancellable interrupt example
package org.example.chapter7;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class NonCancellable implements Runnable {
BlockingQueue<Integer> queue;
boolean isInterrupted = false;
public NonCancellable(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) { // Loop so that we can retry if we get interrupted
try {
Thread.sleep(1000);
queue.put(new Random().nextInt());
return;
} catch (InterruptedException e) {
isInterrupted = true; // We don't act on the interrupt now, but we remember it
}
}
} finally { // try/finally so the interrupt status is always restored before run() returns
if (isInterrupted) {
Thread.currentThread().interrupt();
}
}
}
}
public class NonCancellableTask {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
var t = new Thread(new NonCancellable(queue));
t.start();
t.interrupt();
t.join();
System.out.println(queue); // e.g. [1183241578]
}
}
If the task is meant to be non-cancellable but uses an interruptible blocking method (here ArrayBlockingQueue.put), we need to:
- Run the critical action (in this case queue.put) in a loop and return once it is done. The loop lets us resume the critical action even if the thread gets interrupted.
- Record the interrupt somewhere and use finally to restore the interrupt status before returning.
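The two steps above can be packaged into a reusable helper. This is only a sketch under that assumption: RetryingPut and putUninterruptibly are hypothetical names, not part of the JDK.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class RetryingPut {
    // Hypothetical helper: keeps retrying put() and restores the interrupt status on the way out.
    static <T> void putUninterruptibly(BlockingQueue<T> queue, T item) {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    queue.put(item);
                    return; // critical action done
                } catch (InterruptedException e) {
                    interrupted = true; // remember the interrupt and retry
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // let the caller see the interrupt
            }
        }
    }

    // Calls the helper with an interrupt pending; returns true if the item was
    // stored AND the interrupt status was restored for the caller.
    static boolean demo() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        Thread.currentThread().interrupt();   // simulate a pending interrupt
        putUninterruptibly(queue, 7);         // first put() throws, the retry succeeds
        boolean stored = queue.contains(7);
        boolean restored = Thread.interrupted(); // reads and clears the flag
        return stored && restored;
    }

    public static void main(String[] args) {
        System.out.println("stored and interrupt restored: " + demo());
    }
}
```

The interrupt must be restored only after the loop: setting it earlier would make the next put() throw immediately and spin forever.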
7.1.4 Example: timed run
Sometimes we want a mechanism that only allows a task to run within a certain time frame.
Bad example
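import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TimedRunExample {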
private static final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
/**
* 2 problems:
* - timedRun executes on the caller's thread (here main()), so Thread.currentThread() is the main thread.
* If the Runnable finishes before the scheduled interrupt fires, the interrupt hits whatever
* the main thread is doing next.
* - If the code inside while (!Thread.currentThread().isInterrupted()) is busy and rarely checks the flag,
* timedRun can take a very long time to return.
*/
public static void timedRun(Runnable r, long timeout, TimeUnit unit) {
final Thread taskThread = Thread.currentThread();
scheduledExecutorService.schedule(taskThread::interrupt, timeout, unit);
r.run();
}
public static void main(String[] args) {
Runnable task = new Runnable() {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
// Some code here
}
throw new RuntimeException();
}
};
// The main advantage: main() sees the exception thrown by the task, because the task runs on main's own thread
timedRun(task, 1, TimeUnit.SECONDS);
System.out.println("Should not continue");
scheduledExecutorService.shutdown();
}
}
This example runs the task synchronously inside timedRun and schedules an interrupt on a ScheduledExecutorService to cancel it after a timeout. The reason for running it synchronously is to preserve the exception from the Runnable task. See: Thread won't propagate exception
This causes 2 problems:
- timedRun executes on main. What if the Runnable task finishes before the scheduled taskThread::interrupt fires? The main thread (or whoever called timedRun) then receives the interrupt. This is bad because we don't know how the caller handles interrupts: are they going to shut down the server? Who knows. If we are not sure, we should never interrupt a thread we don't own.
- The task may run for a very long time before it checks Thread.currentThread().isInterrupted() again. During that time timedRun is blocked on the main thread and cannot return.
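The first problem can be reproduced with a task that finishes immediately. In this sketch (StrayInterruptDemo and the timings are made up for illustration) the scheduled interrupt fires after the task is done and hits unrelated work on the calling thread.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class StrayInterruptDemo {
    // Returns true if the caller's later, unrelated work was hit by the leftover interrupt.
    static boolean callerGetsStrayInterrupt() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            // Same shape as the flawed timedRun: interrupt the calling thread after the timeout
            Thread caller = Thread.currentThread();
            scheduler.schedule(caller::interrupt, 100, TimeUnit.MILLISECONDS);
            Runnable quickTask = () -> { };
            quickTask.run(); // finishes long before the timeout

            // The caller moves on to unrelated work...
            try {
                Thread.sleep(1000);
                return false; // no stray interrupt arrived
            } catch (InterruptedException e) {
                return true;  // ...and is interrupted by the timer meant for the task
            }
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller was interrupted: " + callerGetsStrayInterrupt());
    }
}
```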
Better example
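import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TimedRunExample {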
private static final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
public static void betterTimedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
class RethrowRunnable implements Runnable {
public volatile Throwable throwable;
public void run() {
try {
r.run();
} catch (Throwable throwable) {
this.throwable = throwable;
}
}
public void rethrow() {
if (throwable != null) {
throw laundryThrowable(throwable);
}
}
}
RethrowRunnable rethrow = new RethrowRunnable();
Thread thread = new Thread(rethrow);
thread.start();
// Fixes problem 1: thread::interrupt only interrupts the new thread, whether or not it has already finished
scheduledExecutorService.schedule(thread::interrupt, timeout, unit);
// Fixes problem 2: even if the task busy-loops, betterTimedRun is not blocked from returning
thread.join(unit.toMillis(timeout));
rethrow.rethrow();
}
public static RuntimeException laundryThrowable(Throwable throwable) {
if (throwable instanceof RuntimeException) {
return (RuntimeException) throwable;
} else if (throwable instanceof Error) {
throw (Error) throwable;
}
throw new IllegalStateException("Not unchecked", throwable);
}
public static void main(String[] args) {
Runnable task = new Runnable() {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
// Some code here
}
throw new RuntimeException();
}
};
try {
betterTimedRun(task, 1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// Force user to handle exception
}
System.out.println("Should not continue");
scheduledExecutorService.shutdown();
}
}
This example addresses both problems:
- betterTimedRun uses a separate Thread to execute the Runnable task, so the interrupt only hits that thread and does not affect the main thread.
- betterTimedRun will not block forever. It waits at most the timeout (1 second here) and then returns whether or not the task has finished.
- It implements RethrowRunnable so that we still have visibility when an exception is thrown. See: Thread won't propagate exception
Problem:
- Because we use join(timeout), we don't know whether the thread exited normally or the join timed out.
[!NOTE]
This is a limitation of the Java Thread API: join does not return a status indicating whether it completed or timed out.
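One way to tell the two cases apart is to run the task through an ExecutorService and wait on the returned Future, because Future.get(timeout, unit) throws TimeoutException when the time runs out. The sketch below is an alternative technique, not the example above: FutureTimedRun and the Outcome enum are hypothetical names, and creating an executor per call is done only to keep the sketch self-contained.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTimedRun {
    enum Outcome { COMPLETED, TIMED_OUT, FAILED }

    static Outcome timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> future = executor.submit(r);
        try {
            future.get(timeout, unit);
            return Outcome.COMPLETED;   // the task finished within the timeout
        } catch (TimeoutException e) {
            return Outcome.TIMED_OUT;   // the task is still running
        } catch (ExecutionException e) {
            return Outcome.FAILED;      // the task threw; e.getCause() holds the exception
        } finally {
            future.cancel(true);        // interrupts the task if it is still running, no-op otherwise
            executor.shutdown();
        }
    }

    // Runs a quick, a slow and a failing task; returns their outcomes in that order.
    static Outcome[] demo() {
        Runnable quick = () -> { };
        Runnable slow = () -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // cancelled: exit promptly
            }
        };
        Runnable failing = () -> { throw new IllegalStateException("boom"); };
        try {
            return new Outcome[] {
                timedRun(quick, 1, TimeUnit.SECONDS),
                timedRun(slow, 100, TimeUnit.MILLISECONDS),
                timedRun(failing, 1, TimeUnit.SECONDS)
            };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo was interrupted", e);
        }
    }

    public static void main(String[] args) {
        for (Outcome outcome : demo()) {
            System.out.println(outcome);
        }
    }
}
```

A real implementation would more likely share one executor across calls and rethrow the cause of the ExecutionException instead of collapsing it into an enum value.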