Chapter 07 - Cancellation And Shutdown
7.1 Task cancellation
Bad example: cancelling with a flag
Consider this example:
package org.example.chapter7;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class Producer extends Thread {
final BlockingQueue<Integer> taskQueue;
volatile boolean cancelled = false;
public Producer(BlockingQueue<Integer> taskQueue) {
this.taskQueue = taskQueue;
}
public void run() {
while (!cancelled) {
try {
this.taskQueue.put((new Random()).nextInt());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public void cancel() {
this.cancelled = true;
}
}
public class ClientCheckWorse {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(2);
Producer producer = new Producer(blockingQueue);
List<Integer> finished = new ArrayList<>();
producer.start();
Thread.sleep(1000);
while (finished.size() < 2) {
finished.add(blockingQueue.take());
}
System.out.println(finished);
producer.cancel(); // Likely has no effect: the producer is blocked in put() and never rechecks the flag
producer.join(); // ...so this join() may never return
}
}
This approach uses cooperative cancellation through a flag: the cancellation check is coded into the thread's own loop. However, it is unreliable, because it only takes effect when the producer reaches the while (!cancelled) condition check.
In this example, if the producer is stuck in this.taskQueue.put(...) because the queue is full, it never rechecks the flag and the cancellation never happens.
To do this properly, we can use Thread.interrupt(), which makes a blocking call such as put() throw InterruptedException right away.
[!important]
Nothing in the API ties interruption to cancellation, but using interruption for any other purpose is too fragile.
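To make the difference concrete, here is a minimal sketch that blocks a producer on a full queue and then tries both cancellation styles. The names FlagVsInterruptDemo, DemoProducer and stillAliveAfterCancel are hypothetical and exist only for this illustration; the 500 ms wait is an arbitrary assumption.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class FlagVsInterruptDemo {
    // Hypothetical producer that can be cancelled by flag or by interrupt.
    static class DemoProducer extends Thread {
        private final BlockingQueue<Integer> queue;
        private volatile boolean cancelled = false;

        DemoProducer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (!cancelled) {
                    queue.put(1); // blocks as soon as the queue is full
                }
            } catch (InterruptedException e) {
                // Interrupted while blocked in put(): let the thread exit.
            }
        }

        void cancel() {
            cancelled = true;
        }
    }

    // Returns true if the producer is still alive shortly after the cancel request.
    static boolean stillAliveAfterCancel(boolean useInterrupt) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        DemoProducer producer = new DemoProducer(queue);
        producer.start();
        try {
            // Wait until the queue is full and the producer is parked inside put().
            while (queue.remainingCapacity() > 0
                    || producer.getState() != Thread.State.WAITING) {
                Thread.sleep(10);
            }
            if (useInterrupt) {
                producer.interrupt();
            } else {
                producer.cancel();
            }
            producer.join(500);          // give the producer time to react
            boolean alive = producer.isAlive();
            producer.interrupt();        // always clean up the demo thread
            producer.join();
            return alive;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo was interrupted", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("alive after flag cancel: " + stillAliveAfterCancel(false));
        System.out.println("alive after interrupt:   " + stillAliveAfterCancel(true));
    }
}
```

With the flag, the blocked producer never gets back to the loop condition, so it should still be alive after the wait. With interrupt(), the blocked put() throws and the thread exits.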
Using interrupt to cancel a thread
package org.example.chapter7;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class InterruptProducer extends Thread {
final BlockingQueue<Integer> taskQueue;
volatile boolean cancelled = false;
public InterruptProducer(BlockingQueue<Integer> taskQueue) {
this.taskQueue = taskQueue;
}
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) { // Optional check: put() already throws InterruptedException when the thread is interrupted; this only lets the loop exit a little sooner
this.taskQueue.put((new Random()).nextInt());
}
} catch (InterruptedException e) {
/* Allow thread to exit */
}
}
public void cancel() {
this.interrupt();
}
}
public class UsingInterruptForCancellation {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
InterruptProducer interruptProducer = new InterruptProducer(queue);
List<Integer> finished = new ArrayList<>();
interruptProducer.start();
Thread.sleep(1000);
while (finished.size() < 2) {
finished.add(queue.take());
}
System.out.println(finished);
interruptProducer.cancel(); // Interrupts the producer; its blocked put() throws InterruptedException
interruptProducer.join();
Thread.sleep(1000);
}
}
7.1.2 Interruption Policies
An interruption policy determines how a thread should react when an interrupt is detected. Typically it does the following:
- Exit as quickly as possible
- Clean up as necessary
- Notify the owner if necessary
If your code does not run in a thread you own (for example, it is called from someone else's thread pool), the most reasonable policy is to simply propagate the InterruptedException to the caller.
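The three actions listed above can be sketched for a thread that owns its own interruption policy. This is only an illustration under assumptions: PolicyWorker, its inbox, the resourceOpen flag standing in for a real resource, and the latch used for notification are all hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class PolicyWorker extends Thread {
    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
    private final CountDownLatch exited = new CountDownLatch(1); // lets the owner observe the exit
    private volatile boolean resourceOpen = false;               // stand-in for a real resource

    @Override
    public void run() {
        resourceOpen = true;
        try {
            while (true) {
                inbox.take(); // a real worker would process the job here
            }
        } catch (InterruptedException e) {
            // 1. Exit quickly: leave the loop instead of retrying.
        } finally {
            resourceOpen = false; // 2. Clean up.
            exited.countDown();   // 3. Notify whoever is waiting for the exit.
        }
    }

    boolean isResourceOpen() {
        return resourceOpen;
    }

    // Waits up to the given time for the worker to report that it has exited.
    boolean awaitExit(long millis) {
        try {
            return exited.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Starts a worker, interrupts it, and reports whether it exited cleanly.
    static boolean demo() {
        PolicyWorker worker = new PolicyWorker();
        worker.start();
        worker.interrupt();
        return worker.awaitExit(2000) && !worker.isResourceOpen();
    }

    public static void main(String[] args) {
        System.out.println("clean exit: " + demo());
    }
}
```

Swallowing the exception is acceptable here only because the class owns the thread and the thread ends right after the finally block.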
7.1.3 Responding to interruption
There are two practical ways to do this:
- Propagate the exception
- Restore the interruption status so that the code higher up can deal with it.
In the previous example we swallowed the interruption request. Most of the time that is not acceptable. The only reason we could do it there is that the thread class knows how it is going to be used: it owns the thread, and the thread exits right after the catch block.
Propagate the exception
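Propagating the exception is easy: just add throws InterruptedException to the method signature.
import java.util.Random;
import java.util.concurrent.BlockingQueue;
class InterruptRunnable { // A plain class: Runnable is an interface and cannot be extended by a class
BlockingQueue<Integer> queue;
// Callers must either handle the InterruptedException or declare it themselves
public void propagateNumber() throws InterruptedException {
queue.put(new Random().nextInt());
}
}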
Restore interruption status
For scenarios where we cannot add throws InterruptedException (for example, the interface method does not allow it), we can call interrupt() again to restore the interrupted status.
class InterruptRunnable implements Runnable {
BlockingQueue<Integer> queue;
@Override
public void run() {
try {
queue.put(new Random().nextInt());
} catch (InterruptedException e) {
// throw e; // Not possible here: Runnable.run() does not declare InterruptedException
Thread.currentThread().interrupt(); // Restore the status so code higher up the stack can see it
}
}
}
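Restoring matters because a blocking method clears the interrupted status when it throws InterruptedException. The following single-threaded sketch shows this; InterruptStatusDemo and observe are hypothetical names used only for illustration.

```java
class InterruptStatusDemo {
    // Returns { status seen inside the catch block, status after restoring it }.
    static boolean[] observe() {
        Thread.currentThread().interrupt(); // simulate an interrupt request on this thread
        try {
            Thread.sleep(10); // throws at once because the status is already set
        } catch (InterruptedException e) {
            // Throwing InterruptedException cleared the interrupted status.
            boolean inCatch = Thread.currentThread().isInterrupted();
            // Restore it so callers higher up can still detect the request.
            Thread.currentThread().interrupt();
            boolean afterRestore = Thread.currentThread().isInterrupted();
            // Clear the status again so this demo does not leak it to the caller.
            Thread.interrupted();
            return new boolean[] { inCatch, afterRestore };
        }
        throw new IllegalStateException("sleep was expected to throw InterruptedException");
    }

    public static void main(String[] args) {
        boolean[] result = observe();
        System.out.println("inside catch: " + result[0] + ", after restore: " + result[1]);
    }
}
```

If the catch block did nothing, the interrupt request would be lost and code further up the call stack could not react to it.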
Non-cancellable interrupt example
package org.example.chapter7;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class NonCancellable implements Runnable {
BlockingQueue<Integer> queue;
boolean isInterrupted = false;
public NonCancellable(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) { // Loop so that we can retry if we get interrupted
try {
Thread.sleep(1000);
queue.put(new Random().nextInt());
return;
} catch (InterruptedException e) {
isInterrupted = true; // We do not act on the interrupt now, but we remember that it happened
}
}
} finally { // The finally block restores the interrupted status before run() returns
if (isInterrupted) {
Thread.currentThread().interrupt();
}
}
}
}
public class NonCancellableTask {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
var t = new Thread(new NonCancellable(queue));
t.start();
t.interrupt();
t.join();
System.out.println(queue); // e.g. [1183241578] (the value is random)
}
}
If the task is meant to be non-cancellable and uses an interruptible blocking method (such as ArrayBlockingQueue.put), we need to:
- Create a loop for the task and return once the critical action is done (in this case queue.put)
  - The reason for the loop is that even if the thread gets interrupted, we can still resume our critical action
- Store the interrupt somewhere and use the finally block to restore the interrupted status
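The two steps above can also be packaged as a small helper that keeps the remembered interrupt in a local variable instead of a field. This is a minimal sketch; UninterruptiblePut and putUninterruptibly are hypothetical names chosen for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class UninterruptiblePut {
    // Retries put() until it succeeds, then restores any interrupt seen on the way.
    static <T> void putUninterruptibly(BlockingQueue<T> queue, T item) {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    queue.put(item);
                    return; // critical action done
                } catch (InterruptedException e) {
                    interrupted = true; // remember the request and retry
                }
            }
        } finally {
            if (interrupted) {
                // Restore the status so the caller can still react to it.
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        Thread.currentThread().interrupt(); // simulate an interrupt before the put
        putUninterruptibly(queue, 42);
        System.out.println(queue + " interrupted=" + Thread.interrupted());
    }
}
```

The interrupt must be restored only after the loop finishes. Restoring it inside the catch block would make the next put() throw immediately, and the loop would spin forever.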