Chapter 07 - Cancellation And Shutdown

7.1 Task cancellation

Bad example: cancelling with a flag

Consider this example:

package org.example.chapter7;  
  
import java.util.ArrayList;  
import java.util.List;  
import java.util.Random;  
import java.util.concurrent.ArrayBlockingQueue;  
import java.util.concurrent.BlockingQueue;  
  
class Producer extends Thread {  
    final BlockingQueue<Integer> taskQueue;  
    volatile boolean cancelled = false;  
  
    public Producer(BlockingQueue<Integer> taskQueue) {  
        this.taskQueue = taskQueue;  
    }  
  
    public void run() {  
        while (!cancelled) {  
            try {  
                this.taskQueue.put((new Random()).nextInt());  
            } catch (InterruptedException e) {  
                throw new RuntimeException(e);  
            }  
        }  
  
    }  
  
    public void cancel() {  
        this.cancelled = true;  
    }  
}  
  
public class ClientCheckWorse {  
    public static void main(String[] args) throws InterruptedException {  
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(2);  
        Producer producer = new Producer(blockingQueue);  
        List<Integer> finished = new ArrayList<>();  
  
        producer.start();  
  
        Thread.sleep(1000);  
  
        while (finished.size() < 2) {  
            finished.add(blockingQueue.take());  
        }  
  
        System.out.println(finished);  
  
        producer.cancel(); // Sets the flag, but the producer is blocked in put() and never re-checks it
        producer.join();   // So this join never returns
    }  
}

This approach is client-side cancellation: the cancellation mechanism is coded into the thread itself. It is unreliable, because it only takes effect when the producer gets back to the while (!cancelled) { condition check.

In this example, if the producer is stuck in this.taskQueue.put(...) because the queue is full, the flag is never re-checked and cancellation never happens.
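The stuck producer can be observed directly. The following is a minimal sketch, not part of the original example: FlagCancelDemo, FlagProducer and stillAliveAfterCancel are hypothetical names, the sleep and join durations are arbitrary, and the thread is marked as a daemon only so that the demo can exit.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class FlagCancelDemo {
    /** Same idea as Producer above: cancellation relies only on a volatile flag. */
    static class FlagProducer extends Thread {
        private final BlockingQueue<Integer> queue;
        private volatile boolean cancelled = false;

        FlagProducer(BlockingQueue<Integer> queue) {
            this.queue = queue;
            setDaemon(true); // lets the JVM exit even though this thread stays stuck
        }

        @Override
        public void run() {
            try {
                while (!cancelled) {
                    queue.put(1); // blocks once the queue is full
                }
            } catch (InterruptedException e) {
                // interrupted while blocked: let the thread end
            }
        }

        void cancel() {
            cancelled = true;
        }
    }

    /** Returns true if the producer is still alive 500 ms after cancel(). */
    static boolean stillAliveAfterCancel() {
        FlagProducer producer = new FlagProducer(new ArrayBlockingQueue<>(2));
        producer.start();
        try {
            Thread.sleep(200);  // give the producer time to fill the queue and block
            producer.cancel();  // sets the flag, but nobody is checking it
            producer.join(500); // wait a bounded time instead of forever
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return producer.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("alive after cancel: " + stillAliveAfterCancel());
    }
}
```

Nothing ever drains the queue here, so the producer stays parked inside put() and the method should report that it is still alive after cancel().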

To do this properly, we can use Thread.interrupt(), which makes a blocking call such as put() throw InterruptedException right away.

[!important]
Nothing in the API ties interruption to cancellation, but using interruption for any other purpose is too fragile.

Using interrupt to cancel a thread

[!Danger]
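Unlike the OS terminating a process on a signal (SIGTERM by default, SIGKILL always), Java's Thread.interrupt() does not kill the thread. The OS can tear down a whole process because it owns all of the process's resources (memory, locks and so on), so cleaning up is safe. interrupt() in Java only sets the thread's interrupt flag, and blocking calls such as sleep() or put() react to it by throwing InterruptedException. The code itself has to respond, for example with while (!Thread.currentThread().isInterrupted()) {...}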

package org.example.chapter7;  
  
  
import java.util.ArrayList;  
import java.util.List;  
import java.util.Random;  
import java.util.concurrent.ArrayBlockingQueue;  
import java.util.concurrent.BlockingQueue;  
  
class InterruptProducer extends Thread {  
    final BlockingQueue<Integer> taskQueue;  
  
    public InterruptProducer(BlockingQueue<Integer> taskQueue) {  
        this.taskQueue = taskQueue;  
    }  
  
    public void run() {  
        try {  
            // The explicit check is optional here: put() also throws InterruptedException
            // when the interrupt flag is set, but the check makes the intent clear
            while (!Thread.currentThread().isInterrupted()) {
                this.taskQueue.put((new Random()).nextInt());  
            }  
        } catch (InterruptedException e) {  
            /* Allow thread to exit */  
        }  
    }  
  
    public void cancel() {  
        this.interrupt();  
    }  
}  
  
public class UsingInterruptForCancellation {  
    public static void main(String[] args) throws InterruptedException {  
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);  
        InterruptProducer interruptProducer = new InterruptProducer(queue);  
  
        List<Integer> finished = new ArrayList<>();  
  
        interruptProducer.start();  
        Thread.sleep(1000);  
  
        while (finished.size() < 2) {  
            finished.add(queue.take());  
        }  
  
        System.out.println(finished);  
  
        interruptProducer.cancel(); // interrupt() makes the blocked put() throw InterruptedException
        interruptProducer.join();  
        Thread.sleep(1000);  
  
    }  
}
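The note above says that interrupt() only raises a flag. A minimal sketch of that behaviour follows. It is not from the original notes: InterruptFlagDemo and interruptThreadThatIgnoresIt are hypothetical names, and the worker deliberately never checks its interrupt status.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptFlagDemo {
    /**
     * Interrupts a thread that never looks at its interrupt status.
     * Returns { still alive after interrupt(), interrupt flag set }.
     */
    static boolean[] interruptThreadThatIgnoresIt() {
        AtomicBoolean stop = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            started.countDown();
            while (!stop.get()) {
                Thread.onSpinWait(); // busy work that never checks isInterrupted()
            }
        });
        worker.setDaemon(true);
        worker.start();
        try {
            started.await();
            worker.interrupt();                       // only sets the flag
            worker.join(200);                         // the worker does not stop on its own
            boolean alive = worker.isAlive();
            boolean flagSet = worker.isInterrupted(); // the flag stays set until someone clears it
            stop.set(true);                           // cooperative stop so the demo can finish
            worker.join();
            return new boolean[] { alive, flagSet };
        } catch (InterruptedException e) {
            stop.set(true);
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = interruptThreadThatIgnoresIt();
        System.out.println("alive=" + result[0] + " flagSet=" + result[1]);
    }
}
```

The worker keeps running with the flag set, which is why cancellation in Java is cooperative: the target code has to check the flag or call a blocking method that does.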

7.1.2 Interruption Policies

An interruption policy determines how a thread reacts when an interrupt is detected. Usually it does the following:

  1. Exit as quickly as possible
  2. Clean up any resources or services in use
  3. Notify the owner if necessary

If your code does not run on a thread you own (for example, it is called from someone else's thread pool), the most reasonable policy is to simply propagate the interrupt to the caller.
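The split between code that owns the thread and code that does not can be sketched as follows. This is an illustration under assumptions, not a prescribed pattern: InterruptionPolicyDemo, nextTask and runAndInterruptWorker are hypothetical names, and the AtomicBoolean stands in for real cleanup work.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptionPolicyDemo {
    /** Code that does not own its thread: no policy of its own, it just propagates. */
    static Integer nextTask(BlockingQueue<Integer> queue) throws InterruptedException {
        return queue.take();
    }

    /** The thread owner applies the policy. Returns true if cleanup ran and the notification arrived. */
    static boolean runAndInterruptWorker() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicBoolean cleanedUp = new AtomicBoolean(false);
        CountDownLatch stopped = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    nextTask(queue); // blocks; throws once the worker is interrupted
                }
            } catch (InterruptedException e) {
                // 1. exit quickly: leave the loop immediately
            } finally {
                cleanedUp.set(true); // 2. clean up (stand-in for closing resources)
                stopped.countDown(); // 3. notify whoever is waiting
            }
        });
        worker.start();
        worker.interrupt();
        try {
            boolean notified = stopped.await(2, TimeUnit.SECONDS);
            return notified && cleanedUp.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("policy applied: " + runAndInterruptWorker());
    }
}
```

nextTask never decides what an interrupt means. Only the lambda that runs on the owned thread does.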

7.1.3 Responding to interruption

There are two practical ways to do this:

  1. Propagate the exception
  2. Restore the interruption status so that code higher up the call stack can deal with it.

In the previous example we swallowed the interruption request. Most of the time that is not acceptable. We could do it there only because the thread knows exactly how it is going to be used.

Propagate the exception

Propagating the exception is easy: just add throws InterruptedException to the method signature.

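// A plain class: Runnable is an interface, so a class cannot extend it
class InterruptRunnable {
    BlockingQueue<Integer> queue;

    // The caller now has to handle or propagate InterruptedException itself
    public void propagateNumber() throws InterruptedException {
        queue.put(new Random().nextInt());
    }
}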

Restore interruption status

When we cannot add throws InterruptedException (for example, because the interface method does not declare it), we can catch the exception and call interrupt() on the current thread to restore the status.

class InterruptRunnable implements Runnable {
    BlockingQueue<Integer> queue;

    @Override
    public void run() {
        try {
            queue.put(new Random().nextInt());
        } catch (InterruptedException e) {
            // throw e; // Not possible here: Runnable.run() does not declare InterruptedException
            Thread.currentThread().interrupt(); // Restore the status for code higher up the stack
        }
    }
}
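To see why restoring matters, the sketch below compares a task that restores the status with one that swallows it. The names RestoreStatusDemo, blockedPut and ownerSeesInterrupt are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class RestoreStatusDemo {
    /** A task that cannot throw InterruptedException; 'restore' decides whether it re-asserts the flag. */
    static Runnable blockedPut(BlockingQueue<Integer> fullQueue, boolean restore) {
        return () -> {
            try {
                fullQueue.put(42); // the queue is full, so this blocks until interrupted
            } catch (InterruptedException e) {
                // Throwing InterruptedException cleared the flag
                if (restore) {
                    Thread.currentThread().interrupt(); // put the flag back for the caller
                }
                // otherwise the interrupt is swallowed and lost
            }
        };
    }

    /** Runs the task on a thread we own, interrupts it, and reports whether the owner still sees the flag. */
    static boolean ownerSeesInterrupt(boolean restore) {
        BlockingQueue<Integer> full = new ArrayBlockingQueue<>(1);
        full.add(0);
        AtomicBoolean seen = new AtomicBoolean(false);
        Thread owner = new Thread(() -> {
            blockedPut(full, restore).run();                  // code we call
            seen.set(Thread.currentThread().isInterrupted()); // code higher up the stack
        });
        owner.start();
        owner.interrupt();
        try {
            owner.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("restored:  " + ownerSeesInterrupt(true));
        System.out.println("swallowed: " + ownerSeesInterrupt(false));
    }
}
```

With restore set to true the owner still sees the interrupt after the task returns. With false the request disappears and the owner has no way to know it was asked to stop.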

Non-cancellable interrupt example

package org.example.chapter7;


import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class NonCancellable implements Runnable {
    BlockingQueue<Integer> queue;
    boolean isInterrupted = false;

    public NonCancellable(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) { // Loop so that we can retry if we get interrupted
                try {
                    Thread.sleep(1000);
                    queue.put(new Random().nextInt());
                    return;
                } catch (InterruptedException e) {
                    isInterrupted = true; // We do not act on the interrupt now, but we remember it
                }
            }
        } finally { // try/finally guarantees the interrupt status is restored on the way out
            if (isInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

public class NonCancellableTask {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        var t = new Thread(new NonCancellable(queue));
        t.start();
        t.interrupt();
        t.join();
        System.out.println(queue); // [1183241578]
    }
}

If the task is meant to be non-cancellable and uses an interruptible blocking method (here, put() on an ArrayBlockingQueue), we need to:

  1. Wrap the task in a loop and return once the critical action is done (in this case queue.put)
    • The loop is there so that even if the thread gets interrupted, we can retry and still complete the critical action
  2. Record the interrupt somewhere and use a finally block to restore the interrupt status before returning
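The two steps can also be packaged as a reusable helper. The sketch below is one possible shape: UninterruptiblePut, putUninterruptibly and demo are hypothetical names, and the flag is kept in a local variable instead of a field so that concurrent callers do not share it.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class UninterruptiblePut {
    /** Retries put() until it succeeds, then restores the interrupt status if one was seen. */
    static <T> void putUninterruptibly(BlockingQueue<T> queue, T item) {
        boolean interrupted = false; // local, so it is never shared between callers
        try {
            while (true) {
                try {
                    queue.put(item);
                    return;
                } catch (InterruptedException e) {
                    interrupted = true; // remember it and retry
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // hand the request back to the caller
            }
        }
    }

    /** Interrupts the current thread first, then shows that the put still completes. */
    static String demo() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        Thread.currentThread().interrupt(); // pretend a cancel request arrived early
        putUninterruptibly(queue, 7);
        boolean flag = Thread.interrupted(); // reads the flag and clears it again
        return queue + " interrupted=" + flag;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[7] interrupted=true"
    }
}
```

The first put() attempt fails because the flag is already set, the retry succeeds, and the finally block restores the status so the caller can still react to it.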

7.1.4 Example: timed run

Sometimes we want a mechanism that only allows a task to run for a limited amount of time.

Bad example

public class TimedRunExample {
    private static final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    
    /**
     * 2 problems:
     * - timedRun executes on the caller's thread, so Thread.currentThread() here is the main thread.
     *   If the Runnable finishes before the scheduled interrupt fires, the interrupt hits
     *   whatever the main thread is doing afterwards.
     * - If the task rarely gets back to its while (!Thread.currentThread().isInterrupted()) check,
     *   timedRun may take a very long time to return.
     */
    public static void timedRun(Runnable r, long timeout, TimeUnit unit) {
        final Thread taskThread = Thread.currentThread();
        scheduledExecutorService.schedule(taskThread::interrupt, timeout, unit);
        r.run();
    }
    
    public static void main(String[] args) {
        Runnable task = new Runnable() {
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    // Some code here
                }
                throw new RuntimeException();
            }
        };
        
        // The main advantage of this approach: main() sees the exception thrown by the task
        timedRun(task, 1, TimeUnit.SECONDS);
        System.out.println("Should not continue");
        scheduledExecutorService.shutdown();
    }
}

This example runs the task synchronously inside timedRun and uses a ScheduledExecutorService to interrupt it after a timeout. The reason for running it synchronously on the caller's thread is to preserve any exception thrown by the Runnable task. See: Thread won't propagate exception

This causes two problems:

  1. timedRun executes on main. What if the Runnable task finishes before the scheduled taskThread::interrupt fires? The main thread (or whoever called timedRun) then receives the interrupt. This is bad because we do not know how the caller handles interrupts: will it shut down the server? Who knows. If we do not know a thread's interruption policy, we should never interrupt it.
  2. The task may run for a very long time before it checks Thread.currentThread().isInterrupted() again. During that time timedRun is blocked on the main thread and cannot return.
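Problem 1 can be reproduced with a task that finishes immediately. The following is a minimal sketch: StrayInterruptDemo and callerGetsStrayInterrupt are hypothetical names, the timings are arbitrary, and a separate caller thread plays the role of main so that the demo does not interrupt the thread that runs it.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class StrayInterruptDemo {
    /** Same shape as timedRun above: the interrupt targets whichever thread calls this method. */
    static void timedRun(ScheduledExecutorService scheduler, Runnable r, long timeout, TimeUnit unit) {
        Thread taskThread = Thread.currentThread();
        scheduler.schedule(taskThread::interrupt, timeout, unit);
        r.run();
    }

    /** Returns true if unrelated caller code was interrupted after the task had already finished. */
    static boolean callerGetsStrayInterrupt() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        AtomicBoolean stray = new AtomicBoolean(false);
        Thread caller = new Thread(() -> {
            // The task returns at once, long before the 100 ms timeout
            timedRun(scheduler, () -> { }, 100, TimeUnit.MILLISECONDS);
            try {
                Thread.sleep(1000); // unrelated work the caller does afterwards
            } catch (InterruptedException e) {
                stray.set(true);    // the leftover timeout interrupt landed here
            }
        });
        caller.start();
        try {
            caller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
        return stray.get();
    }

    public static void main(String[] args) {
        System.out.println("stray interrupt: " + callerGetsStrayInterrupt());
    }
}
```

The sleep stands in for whatever the caller does next. It is cut short by an interrupt that was meant for a task that no longer exists.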

Better example

public class TimedRunExample {
    private static final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    
    public static void betterTimedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
        class RethrowRunnable implements Runnable {
            public volatile Throwable throwable;
            public void run() {
                try {
                    r.run();
                } catch (Throwable throwable) {
                    this.throwable = throwable;
                }
            }

            public void rethrow() {
                if (throwable != null) {
                    throw laundryThrowable(throwable);
                }
            }
        }
        RethrowRunnable rethrow = new RethrowRunnable();
        Thread thread = new Thread(rethrow);
        thread.start();
        // Fixes problem 1: thread::interrupt only interrupts the new thread, whether or not it has already finished
        scheduledExecutorService.schedule(thread::interrupt, timeout, unit);
        // Fixes problem 2: even if the task is stuck in a busy loop, betterTimedRun is not blocked past the timeout
        thread.join(unit.toMillis(timeout));
        rethrow.rethrow();
    }

    public static RuntimeException laundryThrowable(Throwable throwable) {
        if (throwable instanceof RuntimeException) {
            return (RuntimeException) throwable;
        } else if (throwable instanceof Error) {
            throw (Error) throwable;
        }
        throw new IllegalStateException("Not unchecked", throwable);
    }
    
    public static void main(String[] args) {
        Runnable task = new Runnable() {
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    // Some code here
                }
                throw new RuntimeException();
            }
        };
        
        try {
            betterTimedRun(task, 1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // The caller is forced to handle the interrupt; here we restore the status
            Thread.currentThread().interrupt();
        }
        
        System.out.println("Should not continue");
        scheduledExecutorService.shutdown();
    }
}

This example addresses both problems:

  1. betterTimedRun uses a separate Thread to execute the Runnable task, so the scheduled interrupt only hits that thread and never the main thread
  2. betterTimedRun does not block forever: join waits at most for the timeout (1 second here) and then returns whether or not the task has finished
  3. It wraps the task in RethrowRunnable so that an exception thrown by the task is still visible to the caller. See: Thread won't propagate exception

Problem:

  • Because we use join(timeout), we do not know whether the thread exited normally or the join timed out

[!NOTE]
This is a limitation of the Java Thread API: join does not return a status indicating whether the thread finished or the wait timed out
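One way around this limitation is to wait on a Future instead of calling join, because Future.get(timeout, unit) throws TimeoutException when the wait expires. This is an alternative sketch and not the approach used in the examples above: FutureTimedRun, the Outcome enum and demo are hypothetical names introduced for the illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTimedRun {
    enum Outcome { COMPLETED, TIMED_OUT, FAILED }

    /** Runs r on the executor and reports how the wait ended. */
    static Outcome timedRun(ExecutorService executor, Runnable r, long timeout, TimeUnit unit)
            throws InterruptedException {
        Future<?> task = executor.submit(r);
        try {
            task.get(timeout, unit);
            return Outcome.COMPLETED;
        } catch (TimeoutException e) {
            return Outcome.TIMED_OUT;
        } catch (ExecutionException e) {
            return Outcome.FAILED; // e.getCause() holds the exception thrown by the task
        } finally {
            task.cancel(true); // interrupts the task if it is still running; no effect once it is done
        }
    }

    static String demo() {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            Outcome quick = timedRun(executor, () -> { }, 1, TimeUnit.SECONDS);
            Outcome slow = timedRun(executor, () -> {
                while (!Thread.currentThread().isInterrupted()) {
                    Thread.onSpinWait(); // busy until cancel(true) interrupts us
                }
            }, 200, TimeUnit.MILLISECONDS);
            Outcome broken = timedRun(executor, () -> {
                throw new IllegalStateException("boom");
            }, 1, TimeUnit.SECONDS);
            return quick + " " + slow + " " + broken;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "COMPLETED TIMED_OUT FAILED"
    }
}
```

The three outcomes are now distinguishable, and the task's exception is carried by ExecutionException, so a RethrowRunnable wrapper is not needed in this variant.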