Chapter 05 - Building Blocks

Synchronised collections

The classes created by the Collections.synchronizedXxx factory methods are mostly thread safe. For example:

Collections.synchronizedList()

returns a list whose individual operations are thread safe no matter how many threads use it at the same time, because every method is synchronized and therefore locks on the whole object.

However, compound operations on such a list can still cause problems. Consider the following example:

public Integer getLast(List<Integer> syncList) {
    int lastIndex = syncList.size() - 1;
    return syncList.get(lastIndex);
}

public Integer removeLast(List<Integer> syncList) {
    int lastIndex = syncList.size() - 1;
    return syncList.remove(lastIndex);
}

This might throw an IndexOutOfBoundsException if removeLast runs in another thread between getLast's size() and get() calls:

Thread A --- lastIndex = 10 -----------------------> get(10)  // throws IndexOutOfBoundsException
Thread B --- lastIndex = 10 ---> remove(10)

To solve this, we synchronise the compound action on the list itself (client-side locking):

public Integer getLast(List<Integer> syncList) {
    synchronized(syncList) {
        int lastIndex = syncList.size() - 1;
        return syncList.get(lastIndex);
    }
}

public Integer removeLast(List<Integer> syncList) {
    synchronized(syncList) {
        int lastIndex = syncList.size() - 1;
        return syncList.remove(lastIndex);
    }
}

Note that if someone iterates through this list like this:

@NotThreadSafe
for (int i = 0; i < syncList.size(); i++) {
    syncList.get(i);
}

This is not the fault of the two methods above; they are thread safe on their own. Iteration is the caller's responsibility: the caller can sacrifice some performance and lock on syncList while iterating.

synchronized(syncList) {
    for (int i = 0; i < syncList.size(); i++) {
        syncList.get(i);
    }
}

An alternative to locking the list during iteration is to clone the list and loop through the copy instead. Of course, the copying itself comes with a performance cost.
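A minimal sketch of the clone-and-iterate approach (doSomething stands for whatever per-element work we need, as in the loops above):

List<Integer> copy;
synchronized (syncList) {
    copy = new ArrayList<>(syncList); // copy while holding the list's lock
}
for (Integer i : copy) {
    doSomething(i); // iterate the private copy: no lock held, no ConcurrentModificationException
}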

Iterator and concurrent modification

The iterators returned by Collections.synchronizedList (and the other synchronized wrappers) are not thread safe; they are fail-fast.

List<Integer> list = Collections.synchronizedList(new ArrayList<>());

for (Integer i : list) {
    doSomething(i); // May throw ConcurrentModificationException
}

Sometimes the reason for the failure is hidden. For example, consider the code below:

class HiddenIterator {
    @GuardedBy("this") private final Set<Integer> integerSet = new HashSet<Integer>();

    public synchronized void add(Integer integer) { integerSet.add(integer); }
    public synchronized void remove(Integer integer) { integerSet.remove(integer); }


    public void addTen() {
        var r = new Random();
        for (int i = 0; i < 10; i++) {
            add(r.nextInt());
        }
        System.out.println("DEBUG: Added 10 elements " + integerSet); // May throw ConcurrentModificationException
    }
}

The reason is that string concatenation is compiled into calls to

  • StringBuilder.append(Object)
    • which calls toString() on the set; toString() iterates over every element and may throw ConcurrentModificationException if another thread calls addTen() at the same time

Of course, the fix would be to hold the same (client-side) lock around the debug statement as well; but because it is "just" debugging output, it is easy to not pay attention and leave the unguarded line there.

Similarly, calling methods such as containsAll, retainAll, or removeAll, which iterate internally, can also cause ConcurrentModificationException.
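A small sketch of guarding one of these calls with client-side locking (otherList is just a placeholder collection):

boolean hasAll;
synchronized (syncList) {
    hasAll = syncList.containsAll(otherList); // containsAll iterates syncList internally, so hold its lock
}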

ConcurrentHashMap

ConcurrentHashMap is an improvement over the synchronized map implementations (Hashtable, Collections.synchronizedMap); see ConcurrentHashMap vs HashTable. Instead of one map-wide lock, it uses a concept called lock striping: the implementation described in JCIP has 16 lock stripes and uses volatile reads, which allows

  • 16 concurrent writers (one per stripe).
  • Effectively unlimited readers, since reads rely on volatile semantics and need no locking.

The iterators from ConcurrentHashMap are weakly consistent instead of fail-fast. Likewise, methods like size() and isEmpty() may be slightly wrong and only give an approximation while the map is being modified concurrently. In a multi-threaded environment these methods are less useful anyway, since the answer is stale the moment it is returned.
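A small sketch of the weakly consistent iterator; the same loop over a synchronized HashMap would throw ConcurrentModificationException:

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("a", 1);
map.put("b", 2);

for (String key : map.keySet()) {
    // Removing entries mid-iteration is tolerated: the iterator is weakly
    // consistent and never throws ConcurrentModificationException.
    map.remove(key);
}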

CopyOnWriteArrayList

CopyOnWriteArrayList is an improvement over the synchronized list. Every time a writer modifies the list, it makes a fresh copy of the backing array and publishes that copy. An iterator keeps a reference to the array that was current when the iterator was created, so it only needs to synchronize briefly to ensure visibility of that array's contents.

CopyOnWriteArrayList uses a mutual-exclusion (reentrant) lock for writes (see Mutex vs Semaphore), allowing one writer at a time; if two writers arrive at the same time, the second waits until the first finishes.

Readers are never blocked, but they may see a slightly stale snapshot (eventually consistent).
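A small sketch of the snapshot behaviour (the iterator sees the array as it was when the iterator was created):

CopyOnWriteArrayList<String> names = new CopyOnWriteArrayList<>(List.of("a", "b"));

Iterator<String> it = names.iterator(); // keeps a reference to the current backing array
names.add("c");                         // the writer copies the array; the iterator's snapshot is untouched

while (it.hasNext()) {
    System.out.println(it.next());      // prints only "a" and "b"
}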

BlockingQueue and the producer-consumer pattern

BlockingQueue is great for building a producer-consumer pattern. There are 2 types:

  • Unbounded queue: we can keep adding to the queue; there is no limit.
  • Bounded queue: the queue has a capacity limit; when the queue is full, put blocks until there is space.

flowchart LR
Producer --> |Publish| Q[blocking queue]
Q --> |Take| Consumer

Some sample classes: LinkedBlockingQueue and ArrayBlockingQueue (FIFO queues), PriorityBlockingQueue (priority-ordered), and SynchronousQueue (see below).

Special queue:

The use case for SynchronousQueue is urgent work that must be handed off directly to a waiting consumer, rather than being put into a queue and sitting there until a consumer picks it up.
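A minimal sketch of the direct hand-off (assuming the surrounding method declares throws InterruptedException):

SynchronousQueue<String> handoff = new SynchronousQueue<>();

Thread consumer = new Thread(() -> {
    try {
        String task = handoff.take();          // waits for a producer to hand something over
        System.out.println("Handling " + task);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});
consumer.start();

handoff.put("urgent-task"); // blocks until the consumer above is ready to take it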

An example of producer/consumer is file searching: one component crawls the directories and produces file names, while another consumes them and indexes the files.
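A hedged sketch of that split (someDirectory and indexFile are placeholders, not from the text; the queue capacity is arbitrary):

BlockingQueue<File> queue = new LinkedBlockingQueue<>(100);

Runnable crawler = () -> {                 // producer: walks the directory and publishes files
    try {
        for (File file : someDirectory.listFiles()) {
            queue.put(file);               // blocks when the bounded queue is full
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
};

Runnable indexer = () -> {                 // consumer: takes files and indexes them
    try {
        while (true) {
            indexFile(queue.take());       // blocks while the queue is empty
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
};

new Thread(crawler).start();
new Thread(indexer).start();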

BlockingQueue also makes it safe to publish an object from producer to consumer: after the hand-off, the object is effectively thread confined again (it belongs to one thread), because only one consumer can take it and the producer should no longer touch it.

Work-stealing pattern with deques

BlockingDeque is a double-ended queue, which is perfect for work stealing.

(Excalidraw diagram, work stealing: a producer publishes tasks into a blocking queue; consumer1 and consumer2 each have a personal task deque holding Task1-Task3. Consumer2 takes tasks from the head of its own deque; when consumer1 has finished all of its own tasks, it starts stealing from the end of consumer2's deque.)

The concept is that each worker has its own deque. A worker takes the task to do from the head of its own deque. Once its own tasks are finished, it can steal from the end (tail) of another worker's deque, which avoids conflicting with that worker.

A typical worker will try to steal from other workers first before attempting to poll from the central queue. When it does poll the central queue, it polls in batches: one poll grabs N tasks and stores them in its local deque.

Another option is to skip the central blocking queue and push tasks straight into the worker deques. However, this requires a more complex producer algorithm for distributing the tasks; round-robin or hashing are possibilities.
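A rough sketch of the order described above (Worker is an illustrative class, not from any library; tasks are plain Runnables and the batch size of 10 is arbitrary):

class Worker implements Runnable {
    private final Deque<Runnable> localDeque = new ConcurrentLinkedDeque<>();
    private final List<Worker> others;                 // the other workers we may steal from
    private final BlockingQueue<Runnable> centralQueue;

    Worker(List<Worker> others, BlockingQueue<Runnable> centralQueue) {
        this.others = others;
        this.centralQueue = centralQueue;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            Runnable task = localDeque.pollFirst();    // 1. own work, taken from the head
            if (task == null) {
                task = steal();                        // 2. steal from another worker's tail
            }
            if (task == null) {
                centralQueue.drainTo(localDeque, 10);  // 3. batch-poll N tasks from the central queue
                task = localDeque.pollFirst();
            }
            if (task != null) {
                task.run();
            }
        }
    }

    private Runnable steal() {
        for (Worker other : others) {
            Runnable stolen = other.localDeque.pollLast(); // take from the tail to avoid contention
            if (stolen != null) {
                return stolen;
            }
        }
        return null;
    }
}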

Blocking and interrupts

A thread may block or pause for several reasons:

  1. Waiting to acquire a lock.
  2. Waiting for Thread.sleep to elapse.
  3. Waiting for the result of another thread.

While blocked, the thread will be in one of the following states:

  • BLOCKED
  • WAITING
  • TIMED_WAITING

Most blocking methods throw InterruptedException, which we need to handle. There are two main ways to handle it:

  • Propagate it to the caller: don't handle InterruptedException yourself; declare it and let it propagate. In most cases this is fine.
  • Handle the interrupt signal: for critical work that we don't want to abandon half-way, we need a proper way to handle it (typically finish the work, then restore the interrupt status).

[!danger]
Do not catch and ignore InterruptedException: swallowing it deprives code higher up the call stack of the chance to respond to the interruption. If you cannot rethrow it, restore the status with Thread.currentThread().interrupt().

We can also interrupt other threads by calling interrupt() on them. When it makes sense to interrupt is really up to the application.
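A sketch of the two strategies (putTask and processNext are illustrative helper methods, not from the text):

// Strategy 1: propagate — declare the exception and let the caller decide what to do
void putTask(BlockingQueue<String> queue, String task) throws InterruptedException {
    queue.put(task);
}

// Strategy 2: handle it — finish (or abandon) the current work, then restore the interrupt status
void processNext(BlockingQueue<String> queue) {
    try {
        String task = queue.take();
        System.out.println("Processing " + task);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // never swallow: let code higher up see the interruption
    }
}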

Synchronizers

This refers to mechanisms that coordinate (synchronise) threads with each other based on their state. Some of them are:

Latches

A latch blocks threads until it reaches its terminal state (for a countdown latch, when the count reaches 0).

Example: CountDownLatch — the terminal state is when the count reaches zero.

A latch is like a gate: we can use it to delay threads. A latch eventually reaches its terminal state, and once it does, it stays open forever.

Use case:

  • Ensure something does not proceed until some condition has been triggered

Example:

  1. Ensure a computation does not proceed until resource R has been initialised
    • We can use a simple two-state latch: CountDownLatch(1)
  2. Ensure that N resources are initialised or have started
    • CountDownLatch(n)
  3. Wait until all parties involved in an activity are ready to proceed

For example, in this case we use CountDownLatch for two gates:

  • startGate: a binary gate that releases all the threads at once
  • endGate: waits for all n threads to complete; this is needed to measure the time it takes n threads to finish
public class TestHarness {  
    static Logger logger = Logger.getAnonymousLogger();  
  
    public long timeTask(int nThreads, final Runnable task) throws InterruptedException {  
        final CountDownLatch startGate = new CountDownLatch(1);  
        final CountDownLatch endGate = new CountDownLatch(nThreads);  
  
        for (int i = 0; i < nThreads; i++) {  
            Thread thread = new Thread() {  
                @Override  
                public void run() {  
                    try {  
                        startGate.await();  
                        try {  
                            task.run();  
                        } finally {  
                            endGate.countDown();  
                        }  
                    } catch (InterruptedException e) {  
                        throw new RuntimeException(e);  
                    }  
                }  
            };  
  
            thread.start();  
        }  
  
        var startTime = Instant.now();  
        startGate.countDown();  
        endGate.await();  
  
        return Duration.between(startTime, Instant.now()).toNanos();  
    };  
  
    public static void main(String[] args) throws InterruptedException {  
        var testHarness = new TestHarness();  
        var result = testHarness.timeTask(5, () -> {  
            try {  
                Thread.sleep(1);  
            } catch (InterruptedException e) {  
                throw new RuntimeException(e);  
            }  
        });  
  
        logger.info("Result: " + result);  
    }  
}

When unwrapping the cause gets complicated (for example, the cause of an ExecutionException), we can create a small utility to translate each kind of exception. For example:

private RuntimeException exceptionWrapper(Throwable cause) {  
    if (cause instanceof RuntimeException) {  
        System.out.println("Caught runtime");  
        return (RuntimeException) cause;  
    }  
    return new UndeclaredThrowableException(cause);
}
public String getInfo() throws ExecutionException, InterruptedException {  
    try {  
        return loadInfo.get(); // loadInfo is assumed to be a Future<String> computed elsewhere
    } catch (ExecutionException e) {  
        Throwable cause = e.getCause();  
        throw exceptionWrapper(cause);  
    }  
}

Semaphore

A Semaphore allows a fixed number of permits to be acquired at the same time. See Mutex vs Semaphore > Semaphore.

A binary semaphore (new Semaphore(1)) behaves the same as a mutex.

A semaphore is useful for implementing resource pools, such as database connection pools, where acquire() blocks while the pool is empty and unblocks once a resource is returned. To do this, initialise new Semaphore(poolSize).

Use case

  • Turn any data structure into a bounded one (block until a permit/resource is available).
    • Suitable for implementing a rate limiter
public class SemaphoreTest {  
    public static void main(String[] args) throws InterruptedException {  
        Semaphore binarySemaphore = new Semaphore(1);  
  
        Runnable task = () -> {  
            try {  
                binarySemaphore.acquire();  
                System.out.println("Thread " + Thread.currentThread().getName() + " acquired lock.");  
                binarySemaphore.release();  
                System.out.println("Thread " + Thread.currentThread().getName() + " released lock.");  
            } catch (InterruptedException e) {  
                throw new RuntimeException(e);  
            }  
        };  
  
        ExecutorService executorService = Executors.newCachedThreadPool();  
  
        for (var i = 0; i < 2; i++) {  
            executorService.submit(task);  
        }  
  
        executorService.shutdown();  
        executorService.awaitTermination(1, TimeUnit.MINUTES);  
    }  
}

In this example, binarySemaphore can only be acquired by one thread at a time, which is similar to a mutex. As a result, the second thread has to wait until the first one releases the permit. This is basically the same as using a mutex:

ReentrantLock lock = new ReentrantLock();
lock.lock();
try { /* critical section */ } finally { lock.unlock(); }

Similarly, here is an example of using a semaphore as a resource pool:

public class ResourcePool {
    private Logger logger = Logger.getAnonymousLogger();
    private final Semaphore semaphore;

    public ResourcePool(int maxResource) {
        this.semaphore = new Semaphore(maxResource);
    }

    public boolean allocate() {
        try {
            semaphore.acquire();
            logger.info("Allocated resource to " + Thread.currentThread().getName());
            Thread.sleep(1000);
            semaphore.release();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt status instead of swallowing it
            logger.warning(e.getMessage());
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        ResourcePool resourcePool = new ResourcePool(2);

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (var i = 0; i < 6; i++ )
            executorService.submit(resourcePool::allocate);

        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
    }
}

We can also create any bounded data structure using a Semaphore. For example (despite its name, this one wraps a map):

public class BoundedHashSet<K, V> {  
    private final Logger logger = Logger.getAnonymousLogger();  
    private final Semaphore semaphore;  
    private final Map<K, V> map;  
  
    public BoundedHashSet(int maxItems) {  
        this.semaphore = new Semaphore(maxItems);  
        this.map = Collections.synchronizedMap(new HashMap<>());  
    }  
  
    public V put(K key, V value) throws InterruptedException {  
        semaphore.acquire();  
        logger.info("Putting pair (%s, %s)".formatted(key, value));  
        V previous = this.map.put(key, value);  
        if (previous != null) {  
            semaphore.release(); // the key was already present, so the map did not grow  
        }  
        return value;  
    }  
  
    public V remove(K key) {  
        V oldValue = this.map.remove(key);  
  
        if (oldValue != null) {  
            semaphore.release();  
        }  
        return oldValue;  
    }  
  
    public static void main(String[] args) throws InterruptedException {  
        var boundedHashSet = new BoundedHashSet<Integer, String>(2);  
        boundedHashSet.put(1, "hello");  
        boundedHashSet.put(2, "hello");  
        boundedHashSet.remove(1);  
        boundedHashSet.put(3, "hello");  
    }  
}

Barrier

If a latch counts down, a barrier counts up: parties arrive until the count reaches a target. For details see CyclicBarrier.

Basically, we set up a new CyclicBarrier(n); every time a thread calls cyclicBarrier.await(), the arrival count goes up by 1 and the thread blocks until the barrier reaches n.

var cyclicBarrier = new CyclicBarrier(5, () -> {
    // A Runnable executed exactly once, by the last thread to call await(), when the barrier trips at 5
});

// In some thread (await() throws InterruptedException and BrokenBarrierException)
cyclicBarrier.await();
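A minimal runnable sketch (three workers each finish one unit of work and then wait at the barrier; the barrier action runs once when the third arrives):

public class BarrierDemo {
    public static void main(String[] args) {
        var barrier = new CyclicBarrier(3, () -> System.out.println("All workers reached the barrier"));

        for (int i = 0; i < 3; i++) {
            final int id = i;
            new Thread(() -> {
                System.out.println("Worker " + id + " finished its part");
                try {
                    barrier.await(); // blocks until 3 parties have called await()
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new RuntimeException(e);
                }
            }).start();
        }
    }
}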

Use case:

  • Distribute tasks to workers
    • Example: a multi-threaded Game of Life where each worker manages a small area: https://github.com/auspham/learn/blob/master/java/java-concurrency-practice/app/src/main/java/org/example/chapter5/GameOfLifeMultiThread.java

Exchanger

Exchanger can be used for two threads to communicate by handing an object to each other. What is passed is a reference, so after the exchange both threads have access to the same object.

With an Exchanger, each thread waits at the exchange point (the call to exchanger.exchange(info)); it only continues once the other thread has also reached its exchange() call.

// Thread 1
buffer.add("Some data");
var thread2Result = exchanger.exchange(buffer); // waits until the other thread reaches its exchange() call
System.out.println("Done");


// Thread 2 (with its own buffer)
buffer.add("Some data 2");
var thread1Result = exchanger.exchange(buffer); // waits until the other thread reaches its exchange() call
System.out.println("Done");

A full runnable demo:
public class ExchangerDemo {  
    static Logger log = Logger.getAnonymousLogger();  
  
    static class BufferFiller extends Thread {  
        private final int capacity;  
        private final AtomicInteger currentCapacity;  
        private final CopyOnWriteArrayList<String> buffer;  
        private final Exchanger<CopyOnWriteArrayList<String>> exchanger;  
        private final Consumer<CopyOnWriteArrayList<String>> populateBufferCb;  
  
        BufferFiller(int capacity, Exchanger<CopyOnWriteArrayList<String>> exchanger, Consumer<CopyOnWriteArrayList<String>> populateBufferCb) {  
            this.currentCapacity = new AtomicInteger(0);  
            this.capacity = capacity;  
            this.exchanger = exchanger;  
            this.buffer = new CopyOnWriteArrayList<>();  
            this.populateBufferCb = populateBufferCb;  
        }  
  
        @Override  
        public void run() {  
            while(this.currentCapacity.addAndGet(1) < this.capacity) {  
                try {  
                    populateBufferCb.accept(this.buffer);  
                    Thread.sleep(1000);  
                } catch (InterruptedException e) {  
                    throw new RuntimeException(e);  
                }  
            }  
  
            try {  
                CopyOnWriteArrayList<String> result = this.exchanger.exchange(this.buffer);  
                log.info("%s Got result: %s".formatted(Thread.currentThread().getName(), result.stream().reduce((a, b) -> a + b)));  
                result.add("ko-%s".formatted(Thread.currentThread().getName())); // this will modify the value of another thread as well. Passed here is a reference  
                log.info("%s current buffer: %s".formatted(Thread.currentThread().getName(), this.buffer.stream().reduce((a, b) -> a + b)));  
            } catch (InterruptedException e) {  
                throw new RuntimeException(e);  
            }  
  
        }  
    }  
  
    public static void main(String[] args) throws InterruptedException {  
        Exchanger<CopyOnWriteArrayList<String>> exchanger = new Exchanger<>();  
  
        BufferFiller bufferFiller1 = new BufferFiller(5, exchanger, (buffer) -> {  
            log.info("Filled from thread %s a".formatted(Thread.currentThread().getName()));  
            buffer.add("a");  
        });  
  
        BufferFiller bufferFiller2 = new BufferFiller(5, exchanger, (buffer) -> {  
            log.info("Filled from thread %s b".formatted(Thread.currentThread().getName()));  
            buffer.add("b");  
        });  
  
        ExecutorService executorService = Executors.newCachedThreadPool();  
        executorService.submit(bufferFiller1);  
        Thread.sleep(5000);  
        executorService.submit(bufferFiller2);  
        executorService.shutdown();  
        executorService.awaitTermination(1, TimeUnit.MINUTES);  
    }  
}
Sep 21, 2025 5:29:37 PM org.example.chapter5.ExchangerDemo lambda$main$0
INFO: Filled from thread pool-1-thread-1 a
Sep 21, 2025 5:29:38 PM org.example.chapter5.ExchangerDemo lambda$main$0
INFO: Filled from thread pool-1-thread-1 a
Sep 21, 2025 5:29:39 PM org.example.chapter5.ExchangerDemo lambda$main$0
INFO: Filled from thread pool-1-thread-1 a
Sep 21, 2025 5:29:40 PM org.example.chapter5.ExchangerDemo lambda$main$0
INFO: Filled from thread pool-1-thread-1 a
Sep 21, 2025 5:29:42 PM org.example.chapter5.ExchangerDemo lambda$main$1
INFO: Filled from thread pool-1-thread-2 b
Sep 21, 2025 5:29:43 PM org.example.chapter5.ExchangerDemo lambda$main$1
INFO: Filled from thread pool-1-thread-2 b
Sep 21, 2025 5:29:44 PM org.example.chapter5.ExchangerDemo lambda$main$1
INFO: Filled from thread pool-1-thread-2 b
Sep 21, 2025 5:29:45 PM org.example.chapter5.ExchangerDemo lambda$main$1
INFO: Filled from thread pool-1-thread-2 b
Sep 21, 2025 5:29:46 PM org.example.chapter5.ExchangerDemo$BufferFiller run
INFO: pool-1-thread-1 Got result: Optional[bbbb]
Sep 21, 2025 5:29:46 PM org.example.chapter5.ExchangerDemo$BufferFiller run
INFO: pool-1-thread-2 Got result: Optional[aaaa]
Sep 21, 2025 5:29:46 PM org.example.chapter5.ExchangerDemo$BufferFiller run
INFO: pool-1-thread-1 current buffer: Optional[aaaa]
Sep 21, 2025 5:29:46 PM org.example.chapter5.ExchangerDemo$BufferFiller run
INFO: pool-1-thread-2 current buffer: Optional[bbbbko-pool-1-thread-1]

Building a cache function

To build a caching function, we can start simple and then gradually address the concurrency problems.

For example, build one that is able to wrap a Function<T, R> and return cached values:

public static void main(String[] args) {  
    Function<Integer, BigInteger> expensiveFunction = new Function<Integer, BigInteger>() {  
        @Override  
        public BigInteger apply(Integer integer) {  
            return BigInteger.valueOf(new Random().nextInt(10000));  
        }  
    };  
  
    Function<Integer, BigInteger> cachedFunction = new CachedFunction<>(expensiveFunction);  
    System.out.println(cachedFunction.apply(1));  
    System.out.println(cachedFunction.apply(1));
}

1. Get a general idea.

The general idea is to use a ConcurrentHashMap with the argument as the key and the computed result as the value. The next time the same argument arrives, we first check whether a cached value already exists.

@NotThreadSafe
public static class CachedFunction<T, R> implements Function<T, R> {  
    private final Function<T, R> expensiveFunction;  
    private final ConcurrentHashMap<T, R> concurrentHashMap;
      
    public CachedFunction(Function<T, R> expensiveFunction) {  
        this.expensiveFunction = expensiveFunction;  
        this.concurrentHashMap = new ConcurrentHashMap<>();  
    }
    
    @Override  
    public R apply(T t) {
        R result = this.concurrentHashMap.get(t);
        if (result == null) {
            result = this.expensiveFunction.apply(t);
            this.concurrentHashMap.put(t, result);
        }
        return result;
    }
}

Now imagine two threads calling apply with the same argument at the same time:

| Thread 1 | Thread 2 |
| --- | --- |
| R result = this.concurrentHashMap.get(t); | R result = this.concurrentHashMap.get(t); |
| if (result == null) { | if (result == null) { |
| result = this.expensiveFunction.apply(t); | result = this.expensiveFunction.apply(t); |

As a result, both threads execute expensiveFunction.apply() for the same argument, which is wasteful. The goal is to have only one thread calling expensiveFunction.apply() for a given argument.

2. Use future

Here we can use FutureTask, which has two operations:

  • run() to run the task
  • get() to get the result of a task

The idea is that when another thread comes along, it should just call get() (and wait for the result) if an earlier thread has already started the task.

@NotThreadSafe
public static class CachedFunction<T, R> implements Function<T, R> {  
    private final Function<T, R> expensiveFunction;  
    private final ConcurrentHashMap<T, FutureTask<R>> concurrentHashMap;
      
    public CachedFunction(Function<T, R> expensiveFunction) {  
        this.expensiveFunction = expensiveFunction;  
        this.concurrentHashMap = new ConcurrentHashMap<>();  
    }
    
    @Override  
    public R apply(T t) {
        FutureTask<R> futureTask = this.concurrentHashMap.get(t);
        
        if (futureTask == null) {
            FutureTask<R> ft = new FutureTask<>(() -> this.expensiveFunction.apply(t));
            futureTask = this.concurrentHashMap.put(t, ft); // put() returns the previous value (null if there was none)
            if (futureTask == null) {
                futureTask = ft;
                futureTask.run();
            }
        }
        
        try {
            return futureTask.get();
        } catch (ExecutionException | InterruptedException e) {  
            throw new RuntimeException(e);  
        }
    }
}

Now a later thread can reuse the running computation. However, there is still a window in which two threads both see null and both put their own future:

| Thread 1 | Thread 2 |
| --- | --- |
| futureTask = this.concurrentHashMap.put(t, ft); | futureTask = this.concurrentHashMap.put(t, ft); |

3. Use atomic

As a result, we need to use the atomic putIfAbsent() method from ConcurrentHashMap, which only inserts when no mapping exists and otherwise returns the existing value:

| Thread 1 | Thread 2 |
| --- | --- |
| futureTask = this.concurrentHashMap.putIfAbsent(t, ft); | (blocked: the atomic method cannot run yet) |
|  | futureTask = this.concurrentHashMap.putIfAbsent(t, ft); // returns thread 1's future |

As a result, we have the final code:

@ThreadSafe
public class CachedFunction<T, R> implements Function<T, R> {  
    private final Function<T, R> expensiveFunction;  
    private final ConcurrentHashMap<T, FutureTask<R>> concurrentHashMap;  
  
    public CachedFunction(Function<T, R> expensiveFunction) {  
        this.expensiveFunction = expensiveFunction;  
        this.concurrentHashMap = new ConcurrentHashMap<>();  
    }  
  
    @Override  
    public R apply(T t) {  
        FutureTask<R> futureTask = concurrentHashMap.get(t);  
  
        if (futureTask == null) {  
            FutureTask<R> ft = new FutureTask<>(() -> expensiveFunction.apply(t));  
            futureTask = concurrentHashMap.putIfAbsent(t, ft);  
  
            if (futureTask == null) {  
                futureTask = ft;  
                futureTask.run();  
            }  
        }  
  
        try {  
            return futureTask.get();  
        } catch (ExecutionException | InterruptedException e) {  
            throw new RuntimeException(e);  
        }  
    }  
}