Chapter 05 - Building Blocks

Synchronized collections

The classes returned by the Collections.synchronizedXxx factory methods are thread-safe for individual operations. For example:

Collections.synchronizedList()

returns a list whose every method synchronizes on the wrapper object itself, so each single operation is atomic no matter how many threads use the list at the same time.

However, compound actions on such a collection are still not atomic, which can cause problems. Consider the following example:

public Integer getLast(List<Integer> syncList) {
    int lastIndex = syncList.size() - 1;
    return syncList.get(lastIndex);
}

public Integer removeLast(List<Integer> syncList) {
    int lastIndex = syncList.size() - 1;
    return syncList.remove(lastIndex);
}

This might throw an IndexOutOfBoundsException if removeLast runs between another thread's size() and get() calls:

Thread 1 --- lastIndex = 10 -------------------------> getLast()  <throws IndexOutOfBoundsException>
Thread 2 --- lastIndex = 10 ---> removeLast()

To solve this, we synchronize the whole compound action on the list itself, which is the same lock the synchronized wrapper uses internally:

public Integer getLast(List<Integer> syncList) {
    synchronized(syncList) {
        int lastIndex = syncList.size() - 1;
        return syncList.get(lastIndex);
    }
}

public Integer removeLast(List<Integer> syncList) {
    synchronized(syncList) {
        int lastIndex = syncList.size() - 1;
        return syncList.remove(lastIndex);
    }
}

Note that iteration has the same problem. If someone iterates through the list like this:

@NotThreadSafe
for (int i = 0; i < syncList.size(); i++) {
    syncList.get(i);
}

This can still fail, but it is not the wrapper's fault; the two methods above are individually thread-safe. The caller can sacrifice some performance and lock on syncList for the whole traversal:

synchronized(syncList) {
    for (int i = 0; i < syncList.size(); i++) {
        syncList.get(i);
    }
}

An alternative to locking the list during traversal is to clone the list and loop over the copy instead. Of course, the copy itself comes with a performance cost.
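A minimal sketch of the copy-then-iterate approach (class and method names are illustrative). The copy must still be made atomically under the list's lock, but the copy itself is thread-confined, so it can be iterated without any locking:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CloneAndIterate {
    // Copy the list under the list's own lock, then iterate the private copy
    // without holding any lock. The copy is thread-confined.
    static int sum(List<Integer> syncList) {
        List<Integer> snapshot;
        synchronized (syncList) {                 // cloning must itself be atomic
            snapshot = new ArrayList<>(syncList);
        }
        int total = 0;
        for (int i : snapshot) total += i;        // no lock held here
        return total;
    }

    public static void main(String[] args) {
        List<Integer> list = Collections.synchronizedList(new ArrayList<>(List.of(1, 2, 3)));
        System.out.println(sum(list)); // 6
    }
}
```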

Iterators and concurrent modification

The iterators returned by Collections.synchronizedList are not protected by the wrapper's lock; they are fail-fast instead:

List<Integer> list = Collections.synchronizedList(new ArrayList<Integer>());

for (Integer i : list) {
    doSomething(i); // May throw ConcurrentModificationException if another thread modifies the list
}

Sometimes the reason the code fails is hidden. For example, consider the class below:

class HiddenIterator {
    @GuardedBy("this") private final Set<Integer> integerSet = new HashSet<Integer>();

    public synchronized void add(Integer integer) { integerSet.add(integer); }
    public synchronized void remove(Integer integer) { integerSet.remove(integer); }


    public void addTen() {
        var r = new Random();
        for (int i = 0; i < 10; i++) {
            add(r.nextInt());
        }
        System.out.println("DEBUG: Added 10 elements " + integerSet); // May throw ConcurrentModificationException
    }
}

The reason is that string concatenation is compiled into calls to:

  • StringBuilder.append(Object)
    • which calls toString() on the set; toString() iterates over the elements and may throw ConcurrentModificationException if another thread calls add() or remove() at the same time

Of course, the fix is to acquire the same lock (here, the intrinsic lock on this) around the debug statement as well. Since it is "just" a debug line, it is easy not to pay attention and leave it unguarded.
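A sketch of the guarded version (it mirrors HiddenIterator; SafeDebugSet is an illustrative name, and the size() accessor is added only to make the class testable):

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

public class SafeDebugSet {
    private final Set<Integer> integerSet = new HashSet<>(); // guarded by "this"

    public synchronized void add(Integer integer) { integerSet.add(integer); }
    public synchronized void remove(Integer integer) { integerSet.remove(integer); }
    public synchronized int size() { return integerSet.size(); }

    public void addTen() {
        var r = new Random();
        for (int i = 0; i < 10; i++) {
            add(r.nextInt());
        }
        // Hold the same lock that guards integerSet while toString() iterates it,
        // so a concurrent add()/remove() cannot trigger ConcurrentModificationException.
        synchronized (this) {
            System.out.println("DEBUG: Added 10 elements " + integerSet);
        }
    }
}
```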

Similarly, methods such as containsAll, removeAll, retainAll, and the copy constructors iterate internally, and so can throw ConcurrentModificationException too.

ConcurrentHashMap

ConcurrentHashMap is an improvement over the synchronized collections and Hashtable (see ConcurrentHashMap vs HashTable). Instead of a single global lock, it uses a concept called lock striping: the classic implementation divided the table into 16 stripes, each guarded by its own lock, while reads rely on volatile semantics. This allows:

  • Up to 16 concurrent writers (one per stripe).
  • Any number of concurrent readers, since reads need no locking.

Iterators from ConcurrentHashMap are weakly consistent instead of fail-fast. Likewise, methods like size() or isEmpty() may return only an approximation; in a heavily concurrent environment their exact values are of limited use anyway.
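For compound check-then-act operations, ConcurrentHashMap also provides atomic methods such as putIfAbsent, so no client-side locking is needed. A small sketch:

```java
import java.util.concurrent.ConcurrentHashMap;

public class AtomicMapOps {
    static String demo() {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.putIfAbsent("a", 1);                 // installs the mapping "a" -> 1
        Integer prev = map.putIfAbsent("a", 2);  // atomic check-then-act: "a" stays 1,
                                                 // and the existing value is returned
        return map.get("a") + " " + prev;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 1 1
    }
}
```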

CopyOnWriteArrayList

CopyOnWriteArrayList is an improvement over the synchronized list. The way it works is that every time a writer modifies the list, it creates a fresh copy of the backing array. Iterators keep a reference to the array that was current when the iterator was created, so they never observe concurrent changes and never throw ConcurrentModificationException.

CopyOnWriteArrayList uses a mutual-exclusion (reentrant) lock for writes (see Mutex vs Semaphore), allowing one writer at a time; if two writers arrive at the same time, the second waits until the first finishes.

Readers are never blocked, but they see a snapshot, so reads are only eventually consistent with respect to writes.
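A small sketch of the snapshot semantics: an iterator created before a write keeps seeing the old array, while the list itself reflects the write:

```java
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;

public class SnapshotIteration {
    static String demo() {
        CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>(new Integer[]{1, 2, 3});
        Iterator<Integer> it = list.iterator(); // pins the backing array as of this moment
        list.add(4);                            // writer copies the array; the live
                                                // iterator still sees [1, 2, 3]
        int sum = 0;
        while (it.hasNext()) sum += it.next();
        return sum + " " + list.size();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 6 4
    }
}
```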

BlockingQueue and the producer-consumer pattern

BlockingQueue is great for building the producer-consumer pattern. There are two kinds:

  • Unbounded queue: we can keep adding to the queue; there is no limit.
  • Bounded queue: the queue has a capacity; when it is full, put() blocks until there is room.
flowchart LR
Producer --> |put| Q[blocking queue]
Q --> |take| Consumer

Some sample classes: LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue, SynchronousQueue.

Special queue:

SynchronousQueue has no storage at all; each put() must wait for a matching take(). Its use case is urgent work that must be handed off to a consumer immediately, rather than parked in a queue waiting for a consumer to pick it up.

An example of producer/consumer is file searching, where one task crawls the directory tree and another indexes the files it finds.

BlockingQueue also makes it safe to publish objects from producer to consumer: exactly one consumer takes each object off the queue, so from that point the object is effectively thread-confined (it belongs to one thread) and uniquely owned by that consumer.
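A minimal bounded producer-consumer sketch (class name, capacity, and element count are arbitrary). The producer blocks on put() whenever the queue is full, and the consumer blocks on take() whenever it is empty:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumer {
    // Producer puts 1..5 into a bounded queue; the caller consumes and sums them.
    static int produceAndConsume() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2); // bounded: put() blocks when full

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        for (int i = 0; i < 5; i++) {
            sum += queue.take(); // blocks while the queue is empty
        }
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Consumed sum: " + produceAndConsume()); // Consumed sum: 15
    }
}
```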

Work stealing with deques

BlockingDeque is a double-ended queue, which is perfect for work stealing.


The concept: each worker has its own deque. It takes tasks from the shared source and puts them into its local deque, working from the head. Once its own work is finished, it steals from the tail of another worker's deque, which minimizes conflict with that worker.

A typical worker will try to steal from other workers before polling the central queue again. Polling is batched: one poll fetches N tasks and stores them in the local deque.

Another option is to skip the central blocking queue and push work straight into the worker deques. However, this requires a more complex producer-side distribution algorithm; we can consider round-robin, hashing, etc.
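A toy illustration of the two ends of the deque (single-threaded here, purely to show the head/tail roles; names are illustrative). The owner works from the head while a thief steals from the tail, so the two rarely contend on the same end:

```java
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class WorkStealingSketch {
    static String headAndTail() {
        BlockingDeque<String> ownerQueue = new LinkedBlockingDeque<>();
        ownerQueue.addAll(List.of("t1", "t2", "t3", "t4"));

        String ownerTask = ownerQueue.pollFirst();  // owner takes from the head
        String stolenTask = ownerQueue.pollLast();  // thief steals from the tail
        return ownerTask + " " + stolenTask;
    }

    public static void main(String[] args) {
        System.out.println(headAndTail()); // t1 t4
    }
}
```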

Blocking and interruption

A thread may block or pause for several reasons:

  1. Waiting to acquire a lock.
  2. Sleeping in Thread.sleep().
  3. Waiting for the result of another thread.

It could have the following states:

  • BLOCKED
  • WAITING
  • TIMED_WAITING

Many blocking methods throw InterruptedException, which we need to handle. There are two main strategies:

  • Propagate it to the caller: don't catch InterruptedException at all and let it propagate. In most cases this is fine.
  • Handle the interrupt signal ourselves: for critical work we don't want to abandon, we need a proper way to respond to the interrupt.
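When propagation is impossible (for example inside Runnable.run, which declares no checked exceptions), a common pattern is to restore the interrupt status so code higher up the stack can still observe it. A sketch (TaskRunner is an illustrative name):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TaskRunner implements Runnable {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    @Override
    public void run() {
        try {
            queue.take().run(); // take() throws if this thread is interrupted
        } catch (InterruptedException e) {
            // Runnable.run declares no checked exceptions, so we cannot rethrow.
            // Restore the interrupt status so callers can see it.
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        TaskRunner runner = new TaskRunner();
        Thread.currentThread().interrupt(); // simulate a pending interrupt
        runner.run();                       // take() throws at once; handler restores the flag
        System.out.println("interrupted=" + Thread.interrupted()); // interrupted=true
    }
}
```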

[!danger]
Do not catch and ignore InterruptedException: swallowing it erases the evidence that an interrupt ever happened, so code higher up the call stack loses its chance to respond to it.

We can also interrupt other threads by calling interrupt(). When to interrupt is entirely up to the application.

Synchronizers

A synchronizer is any object that coordinates the control flow of threads. Some examples:

Latches

A latch blocks threads until it reaches its terminal state.

Example: CountDownLatch — the terminal state is when the count reaches zero (0).

A latch is like a gate: we can use it to delay threads. A latch eventually reaches its terminal state, and once it does, it stays there forever.

Use case:

  • Ensure an activity does not proceed until some condition has been triggered.

Examples:

  1. Ensure a computation does not proceed until resource R has been initialized.
    • Use a simple two-state latch: CountDownLatch(1).
  2. Ensure that N resources have been initialized or have started.
    • CountDownLatch(n).
  3. Wait until all parties involved in an activity are ready before any of them proceed.

For example, the class below uses CountDownLatch for two gates:

  • startGate: a binary gate that releases all the threads at once.
  • endGate: waits for n threads to complete, which is what lets us measure the time for all n threads to finish.
public class TestHarness {  
    static Logger logger = Logger.getAnonymousLogger();  
  
    public long timeTask(int nThreads, final Runnable task) throws InterruptedException {  
        final CountDownLatch startGate = new CountDownLatch(1);  
        final CountDownLatch endGate = new CountDownLatch(nThreads);  
  
        for (int i = 0; i < nThreads; i++) {  
            Thread thread = new Thread() {  
                @Override  
                public void run() {  
                    try {  
                        startGate.await();  
                        try {  
                            task.run();  
                        } finally {  
                            endGate.countDown();  
                        }  
                    } catch (InterruptedException e) {  
                        throw new RuntimeException(e);  
                    }  
                }  
            };  
  
            thread.start();  
        }  
  
        var startTime = Instant.now();  
        startGate.countDown();  
        endGate.await();  
  
        return Duration.between(startTime, Instant.now()).toNanos();  
    }  
  
    public static void main(String[] args) throws InterruptedException {  
        var testHarness = new TestHarness();  
        var result = testHarness.timeTask(5, () -> {  
            try {  
                Thread.sleep(1);  
            } catch (InterruptedException e) {  
                throw new RuntimeException(e);  
            }  
        });  
  
        logger.info("Result: " + result);  
    }  
}

When the exception handling gets complicated, we can create a utility that launders each cause. For example:

private RuntimeException exceptionWrapper(Throwable cause) {  
    if (cause instanceof RuntimeException) {  
        System.out.println("Caught runtime");  
        return (RuntimeException) cause;  
    }  
    return new UndeclaredThrowableException(cause);
}
public String getInfo() throws InterruptedException {  
    try {  
        return loadInfo.get();  
    } catch (ExecutionException e) {  
        throw exceptionWrapper(e.getCause());  
    }  
}

Semaphore

A Semaphore maintains a set of permits and allows up to that many acquisitions to be held at the same time. See Mutex vs Semaphore > Semaphore.

A binary semaphore (new Semaphore(1)) behaves much like a mutex, with one difference: a semaphore has no notion of ownership, so any thread may release a permit, not only the one that acquired it.

Semaphores are useful for implementing resource pools such as database connection pools, where acquire() blocks while the pool is empty and release() unblocks a waiter. To do this, initialize new Semaphore(poolSize).

Use cases:

  • Turn any data structure into a bounded one (block until a slot is available).
    • Suitable for implementing a rate limiter.
public class SemaphoreTest {  
    public static void main(String[] args) throws InterruptedException {  
        Semaphore binarySemaphore = new Semaphore(1);  
  
        Runnable task = () -> {  
            try {  
                binarySemaphore.acquire();  
                System.out.println("Thread " + Thread.currentThread().getName() + " acquired lock.");  
                binarySemaphore.release();  
                System.out.println("Thread " + Thread.currentThread().getName() + " released lock.");  
            } catch (InterruptedException e) {  
                throw new RuntimeException(e);  
            }  
        };  
  
        ExecutorService executorService = Executors.newCachedThreadPool();  
  
        for (var i = 0; i < 2; i++) {  
            executorService.submit(task);  
        }  
  
        executorService.shutdown();  
        executorService.awaitTermination(1, TimeUnit.MINUTES);  
    }  
}

In this example, binarySemaphore can only be acquired by one thread at a time, which is similar to a mutex: the second thread must wait until the first releases the permit. This is essentially the same as using a mutex:

ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.unlock();

Similarly, here is an example of a resource pool:

public class ResourcePool {
    private Logger logger = Logger.getAnonymousLogger();
    private final Semaphore semaphore;

    public ResourcePool(int maxResource) {
        this.semaphore = new Semaphore(maxResource);
    }

    public boolean allocate() {
        try {
            semaphore.acquire();
            logger.info("Allocated resource to " + Thread.currentThread().getName());
            Thread.sleep(1000);
            semaphore.release();
            return true;
        } catch (InterruptedException e) {
            logger.warning(e.getMessage());
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        ResourcePool resourcePool = new ResourcePool(2);

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (var i = 0; i < 6; i++ )
            executorService.submit(resourcePool::allocate);

        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
    }
}

We can also bound any data structure using a Semaphore. For example:

public class BoundedHashSet<K, V> {  
    private final Logger logger = Logger.getAnonymousLogger();  
    private final Semaphore semaphore;  
    private final Map<K, V> map;  
  
    public BoundedHashSet(int maxItems) {  
        this.semaphore = new Semaphore(maxItems);  
        this.map = Collections.synchronizedMap(new HashMap<>());  
    }  
  
    public V put(K key, V value) throws InterruptedException {  
        semaphore.acquire();  
        logger.info("Putting pair (%s, %s)".formatted(key, value));  
        this.map.put(key, value);  
        return value;  
    }  
  
    public V remove(K key) {  
        V oldValue = this.map.remove(key);  
  
        if (oldValue != null) {  
            semaphore.release();  
        }  
        return oldValue;  
    }  
  
    public static void main(String[] args) throws InterruptedException {  
        var boundedHashSet = new BoundedHashSet<Integer, String>(2);  
        boundedHashSet.put(1, "hello");  
        boundedHashSet.put(2, "hello");  
        boundedHashSet.remove(1);  
        boundedHashSet.put(3, "hello");  
    }  
}

Barrier

Where a latch counts down to a terminal state, a barrier waits for a target number of parties to arrive. For detail see CyclicBarrier.

Basically, we set up new CyclicBarrier(n), and every thread that calls cyclicBarrier.await() increments the arrival count by 1 and then blocks until n parties have arrived; at that point all of them are released and the barrier resets for reuse.

var cyclicBarrier = new CyclicBarrier(5, () -> {
    // A Runnable executed exactly once, by the last worker to call await(),
    // when the barrier trips at 5 parties
});

// In some thread
cyclicBarrier.await();
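A runnable sketch (the party count is arbitrary; the trip counter exists only to make the barrier action observable):

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierDemo {
    // Starts `parties` workers; each awaits at the barrier. The barrier action
    // runs exactly once, in the last-arriving thread, when the barrier trips.
    static int runOnce(int parties) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    barrier.await(); // blocks until all parties have arrived
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) w.join();
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Barrier tripped " + runOnce(3) + " time(s)"); // 1 time(s)
    }
}
```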

Use case:

  • Split a problem into independent phases: all workers must finish phase k before any of them starts phase k+1 (e.g. the time-steps of a simulation).

Exchanger

An Exchanger lets two threads communicate by passing an object from one to the other. What is passed is a reference, so after the exchange both threads have access to the same object.

Each thread waits at the exchange point (the call to exchanger.exchange(info)) and only continues once the other thread has also reached its exchange call.

// Thread 1
buffer.add("Some data");
var thread2Result = exchanger.exchange(buffer); // Waits until the other thread reaches its exchange() call
System.out.println("Done");


// Thread 2
buffer.add("Some data 2");
var thread1Result = exchanger.exchange(buffer); // Waits until the other thread reaches its exchange() call
System.out.println("Done");

A fuller demo:
public class ExchangerDemo {  
    static Logger log = Logger.getAnonymousLogger();  
  
    static class BufferFiller extends Thread {  
        private final int capacity;  
        private final AtomicInteger currentCapacity;  
        private final CopyOnWriteArrayList<String> buffer;  
        private final Exchanger<CopyOnWriteArrayList<String>> exchanger;  
        private final Consumer<CopyOnWriteArrayList<String>> populateBufferCb;  
  
        BufferFiller(int capacity, Exchanger<CopyOnWriteArrayList<String>> exchanger, Consumer<CopyOnWriteArrayList<String>> populateBufferCb) {  
            this.currentCapacity = new AtomicInteger(0);  
            this.capacity = capacity;  
            this.exchanger = exchanger;  
            this.buffer = new CopyOnWriteArrayList<>();  
            this.populateBufferCb = populateBufferCb;  
        }  
  
        @Override  
        public void run() {  
            while(this.currentCapacity.addAndGet(1) < this.capacity) {  
                try {  
                    populateBufferCb.accept(this.buffer);  
                    Thread.sleep(1000);  
                } catch (InterruptedException e) {  
                    throw new RuntimeException(e);  
                }  
            }  
  
            try {  
                CopyOnWriteArrayList<String> result = this.exchanger.exchange(this.buffer);  
                log.info("%s Got result: %s".formatted(Thread.currentThread().getName(), result.stream().reduce((a, b) -> a + b)));  
                result.add("ko-%s".formatted(Thread.currentThread().getName())); // this will modify the value of another thread as well. Passed here is a reference  
                log.info("%s current buffer: %s".formatted(Thread.currentThread().getName(), this.buffer.stream().reduce((a, b) -> a + b)));  
            } catch (InterruptedException e) {  
                throw new RuntimeException(e);  
            }  
  
        }  
    }  
  
    public static void main(String[] args) throws InterruptedException {  
        Exchanger<CopyOnWriteArrayList<String>> exchanger = new Exchanger<>();  
  
        BufferFiller bufferFiller1 = new BufferFiller(5, exchanger, (buffer) -> {  
            log.info("Filled from thread %s a".formatted(Thread.currentThread().getName()));  
            buffer.add("a");  
        });  
  
        BufferFiller bufferFiller2 = new BufferFiller(5, exchanger, (buffer) -> {  
            log.info("Filled from thread %s b".formatted(Thread.currentThread().getName()));  
            buffer.add("b");  
        });  
  
        ExecutorService executorService = Executors.newCachedThreadPool();  
        executorService.submit(bufferFiller1);  
        Thread.sleep(5000);  
        executorService.submit(bufferFiller2);  
        executorService.shutdown();  
        executorService.awaitTermination(1, TimeUnit.MINUTES);  
    }  
}
Sep 21, 2025 5:29:37 PM org.example.chapter5.ExchangerDemo lambda$main$0
INFO: Filled from thread pool-1-thread-1 a
Sep 21, 2025 5:29:38 PM org.example.chapter5.ExchangerDemo lambda$main$0
INFO: Filled from thread pool-1-thread-1 a
Sep 21, 2025 5:29:39 PM org.example.chapter5.ExchangerDemo lambda$main$0
INFO: Filled from thread pool-1-thread-1 a
Sep 21, 2025 5:29:40 PM org.example.chapter5.ExchangerDemo lambda$main$0
INFO: Filled from thread pool-1-thread-1 a
Sep 21, 2025 5:29:42 PM org.example.chapter5.ExchangerDemo lambda$main$1
INFO: Filled from thread pool-1-thread-2 b
Sep 21, 2025 5:29:43 PM org.example.chapter5.ExchangerDemo lambda$main$1
INFO: Filled from thread pool-1-thread-2 b
Sep 21, 2025 5:29:44 PM org.example.chapter5.ExchangerDemo lambda$main$1
INFO: Filled from thread pool-1-thread-2 b
Sep 21, 2025 5:29:45 PM org.example.chapter5.ExchangerDemo lambda$main$1
INFO: Filled from thread pool-1-thread-2 b
Sep 21, 2025 5:29:46 PM org.example.chapter5.ExchangerDemo$BufferFiller run
INFO: pool-1-thread-1 Got result: Optional[bbbb]
Sep 21, 2025 5:29:46 PM org.example.chapter5.ExchangerDemo$BufferFiller run
INFO: pool-1-thread-2 Got result: Optional[aaaa]
Sep 21, 2025 5:29:46 PM org.example.chapter5.ExchangerDemo$BufferFiller run
INFO: pool-1-thread-1 current buffer: Optional[aaaa]
Sep 21, 2025 5:29:46 PM org.example.chapter5.ExchangerDemo$BufferFiller run
INFO: pool-1-thread-2 current buffer: Optional[bbbbko-pool-1-thread-1]

Building a cache function

To build a caching function, we can start simple and then gradually address the concurrency problems.

For example, let's build one that takes a Function<T, R> and memoizes its return values:

public static void main(String[] args) {  
    Function<Integer, BigInteger> expensiveFunction = new Function<Integer, BigInteger>() {  
        @Override  
        public BigInteger apply(Integer integer) {  
            return BigInteger.valueOf(new Random().nextInt(10000));  
        }  
    };  
  
    Function<Integer, BigInteger> cachedFunction = new CachedFunction<>(expensiveFunction);  
    System.out.println(cachedFunction.apply(1));  
    System.out.println(cachedFunction.apply(1));
}

1. The general idea

The general idea: use a ConcurrentHashMap with the function argument as the key and the result as the value. On the next call, we can simply return the cached value if it exists.

@NotThreadSafe
public static class CachedFunction<T, R> implements Function<T, R> {  
    private final Function<T, R> expensiveFunction;  
    private final ConcurrentHashMap<T, R> concurrentHashMap;
      
    public CachedFunction(Function<T, R> expensiveFunction) {  
        this.expensiveFunction = expensiveFunction;  
        this.concurrentHashMap = new ConcurrentHashMap<>();  
    }
    
    @Override  
    public R apply(T t) {
        R result = this.concurrentHashMap.get(t);
        if (result == null) {
            result = this.expensiveFunction.apply(t);
            this.concurrentHashMap.put(t, result);
        }
        return result;
    }
}

Now imagine two threads entering apply at the same time:

| Thread 1 | Thread 2 |
| --- | --- |
| `R result = this.concurrentHashMap.get(t);` | `R result = this.concurrentHashMap.get(t);` |
| `if (result == null) {` | `if (result == null) {` |
| `result = this.expensiveFunction.apply(t);` | `result = this.expensiveFunction.apply(t);` |

As a result, both threads execute expensiveFunction.apply() at the same time, which is exactly what we want to avoid: only one thread should ever compute the value for a given key.

2. Use a Future

Here we can use FutureTask, which offers two relevant operations:

  • run() to execute the task
  • get() to block until the result is available

The idea: when another thread arrives, it should only call get() if an earlier thread has already started the computation.

@NotThreadSafe
public static class CachedFunction<T, R> implements Function<T, R> {  
    private final Function<T, R> expensiveFunction;  
    private final ConcurrentHashMap<T, FutureTask<R>> concurrentHashMap;
      
    public CachedFunction(Function<T, R> expensiveFunction) {  
        this.expensiveFunction = expensiveFunction;  
        this.concurrentHashMap = new ConcurrentHashMap<>();  
    }
    
    @Override  
    public R apply(T t) {
        FutureTask<R> futureTask = this.concurrentHashMap.get(t);
        
        if (futureTask == null) {
            FutureTask<R> ft = new FutureTask<>(() -> this.expensiveFunction.apply(t));
            futureTask = this.concurrentHashMap.put(t, ft);
            if (futureTask == null) {
                futureTask = ft;
                futureTask.run();
            }
        }
        
        try {
            return futureTask.get();
        } catch (ExecutionException | InterruptedException e) {  
            throw new RuntimeException(e);  
        }
    }
}

This solves reuse: later threads wait on the same FutureTask instead of recomputing. However, it can still happen that two threads put distinct futures for the same key at the same time:

| Thread 1 | Thread 2 |
| --- | --- |
| `futureTask = this.concurrentHashMap.put(t, ft);` | `futureTask = this.concurrentHashMap.put(t, ft);` |

3. Use an atomic operation

As a result, we need the atomic putIfAbsent() method of ConcurrentHashMap:

| Thread 1 | Thread 2 |
| --- | --- |
| `futureTask = this.concurrentHashMap.putIfAbsent(t, ft);` | (waits: the atomic method ensures only one mapping wins) |
| | `futureTask = this.concurrentHashMap.putIfAbsent(t, ft); // not absent anymore, returns thread 1's task` |

As a result, we have the final code:

@ThreadSafe
public class CachedFunction<T, R> implements Function<T, R> {  
    private final Function<T, R> expensiveFunction;  
    private final ConcurrentHashMap<T, FutureTask<R>> concurrentHashMap;  
  
    public CachedFunction(Function<T, R> expensiveFunction) {  
        this.expensiveFunction = expensiveFunction;  
        this.concurrentHashMap = new ConcurrentHashMap<>();  
    }  
  
    @Override  
    public R apply(T t) {  
        FutureTask<R> futureTask = concurrentHashMap.get(t);  
  
        if (futureTask == null) {  
            FutureTask<R> ft = new FutureTask<>(() -> expensiveFunction.apply(t));  
            futureTask = concurrentHashMap.putIfAbsent(t, ft);  
  
            if (futureTask == null) {  
                futureTask = ft;  
                futureTask.run();  
            }  
        }  
  
        try {  
            return futureTask.get();  
        } catch (ExecutionException | InterruptedException e) {  
            throw new RuntimeException(e);  
        }  
    }  
}
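As an aside, on Java 8+ the same idea can arguably be written more compactly with ConcurrentHashMap.computeIfAbsent, which installs exactly one FutureTask per key atomically. FutureTask.run() is a no-op once the task has started or finished, so losing threads simply wait in get(). A sketch (the class name is illustrative, not from the book):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.function.Function;

public class ComputeIfAbsentCache<T, R> implements Function<T, R> {
    private final Function<T, R> expensiveFunction;
    private final ConcurrentHashMap<T, FutureTask<R>> cache = new ConcurrentHashMap<>();

    public ComputeIfAbsentCache(Function<T, R> expensiveFunction) {
        this.expensiveFunction = expensiveFunction;
    }

    @Override
    public R apply(T t) {
        // computeIfAbsent atomically installs exactly one task per key.
        FutureTask<R> task = cache.computeIfAbsent(t,
                key -> new FutureTask<>(() -> expensiveFunction.apply(key)));
        task.run(); // no-op if the task already ran or is running
        try {
            return task.get();
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        var cached = new ComputeIfAbsentCache<Integer, Integer>(x -> { calls[0]++; return x * x; });
        System.out.println(cached.apply(4)); // 16
        System.out.println(cached.apply(4)); // 16 (cached)
        System.out.println("calls=" + calls[0]); // calls=1
    }
}
```

One caveat: computeIfAbsent may briefly block other updates that hash to the same bin while the mapping function runs, which is why the function here only constructs the (cheap) FutureTask and the expensive work happens outside, in run().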