Chapter 05 - Building Blocks
Synchronised collections
The classes created by the Collections.synchronizedXxx factory methods are thread-safe for individual operations. For example:
Collections.synchronizedList()
returns a list whose individual methods are thread-safe no matter how many threads use it at the same time. This is because every method is synchronised and therefore locks the whole object.
However, compound operations (several calls that must act as one) can still go wrong. Consider the following example:
public Integer getLast(List<Integer> syncList) {
int lastIndex = syncList.size() - 1;
return syncList.get(lastIndex);
}
public Integer removeLast(List<Integer> syncList) {
int lastIndex = syncList.size() - 1;
return syncList.remove(lastIndex);
}
This might throw an IndexOutOfBoundsException if removeLast runs between the size() and get() calls of getLast:
Thread1---lastIndex=10------------------->getLast() <throws index out of bounds>
Thread2---lastIndex=10--->removeLast()
To solve this, we make each compound action atomic by locking on the list itself (client-side locking):
public Integer getLast(List<Integer> syncList) {
synchronized(syncList) {
int lastIndex = syncList.size() - 1;
return syncList.get(lastIndex);
}
}
public Integer removeLast(List<Integer> syncList) {
synchronized(syncList) {
int lastIndex = syncList.size() - 1;
return syncList.remove(lastIndex);
}
}
Note that someone may still iterate over this list like this:
@NotThreadSafe
for (int i = 0; i < syncList.size(); i++) {
syncList.get(i);
}
This is not our problem: the two methods above are thread-safe. The caller can sacrifice some performance and hold the lock on syncList for the whole traversal.
@ThreadSafe
synchronized(syncList) {
for (int i = 0; i < syncList.size(); i++) {
syncList.get(i);
}
}
An alternative to locking the list during the traversal is to clone the list and loop through the copy. Of course, this comes with a performance cost, and the copy itself must be made while holding the lock.
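The clone-then-iterate idea can be sketched as follows. This is a minimal sketch, not the only way to do it; the class name SnapshotIteration and the method sumOfSnapshot are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SnapshotIteration {
    // Copies the list while holding its lock, then iterates the copy without the lock.
    public static int sumOfSnapshot(List<Integer> syncList) {
        List<Integer> snapshot;
        synchronized (syncList) {
            // The copy constructor iterates over syncList, so the copy must be made under the lock.
            snapshot = new ArrayList<>(syncList);
        }
        int sum = 0;
        for (int value : snapshot) {
            // Other threads may modify syncList here; the snapshot is unaffected.
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> syncList = Collections.synchronizedList(new ArrayList<>(List.of(1, 2, 3)));
        System.out.println(sumOfSnapshot(syncList)); // prints 6
    }
}
```

The lock is held only for the duration of the copy, so long-running work on each element does not block writers.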
Iterator and concurrent modification
The iterators returned by Collections.synchronizedList are not thread-safe and are fail-fast:
List<Integer> list = Collections.synchronizedList(new ArrayList<Integer>());
for (Integer i : list) {
doSomething(i); // May throw ConcurrentModificationException
}
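To see the fail-fast check without needing a second thread, the sketch below modifies a list from inside its own for-each loop. The class and method names (FailFastDemo, modifyWhileIterating, sumUnderLock) are hypothetical, and the three-element list is chosen so that the check is reached on the second call to next().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;

public class FailFastDemo {
    // Returns true if modifying the list during iteration trips the fail-fast check.
    public static boolean modifyWhileIterating(List<Integer> list) {
        try {
            for (Integer i : list) {
                list.remove(i); // structural change behind the iterator's back
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Safe variant: hold the list's lock for the whole traversal.
    public static int sumUnderLock(List<Integer> list) {
        synchronized (list) {
            int sum = 0;
            for (Integer i : list) {
                sum += i;
            }
            return sum;
        }
    }

    public static void main(String[] args) {
        List<Integer> list = Collections.synchronizedList(new ArrayList<>(List.of(1, 2, 3)));
        System.out.println(sumUnderLock(list));
        System.out.println(modifyWhileIterating(list));
    }
}
```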
Sometimes the reason the code fails is hidden. For example, consider the code below:
class HiddenIterator {
@GuardedBy("this") private final Set<Integer> integerSet = new HashSet<Integer>();
public synchronized void add(Integer integer) { integerSet.add(integer); }
public synchronized void remove(Integer integer) { integerSet.remove(integer); }
public void addTen() {
var r = new Random();
for (int i = 0; i < 10; i++) {
add(r.nextInt());
}
System.out.println("DEBUG: Added 10 elements " + integerSet); // May throw ConcurrentModificationException
}
}
The reason is that string concatenation is compiled into a call to integerSet.toString() (for example through StringBuilder.append(Object)). That method:
- Loops through each element and calls toString() on it
- May throw ConcurrentModificationException if another thread modifies the set at the same time, for example by calling addTen()
Of course, the fix is to hold the lock that guards integerSet (here, this) while printing it as well. However, because this is debugging code, it is easy to overlook.
Similarly, methods such as containsAll, removeAll, retainAll, hashCode and equals iterate internally, so they can also throw ConcurrentModificationException.
ConcurrentHashMap
ConcurrentHashMap is an improvement on synchronised maps such as Hashtable (see ConcurrentHashMap vs HashTable). Instead of one global lock, it uses a technique called lock striping. In the implementation the book describes, the map has 16 lock stripes by default and relies on volatile reads (since Java 8 the implementation locks individual bins instead, but the idea is the same). This allows:
- Up to 16 concurrent writers.
- Unlimited concurrent readers, since reads rely on volatile fields and need no locking.
Iterators from ConcurrentHashMap are weakly consistent instead of fail-fast. Likewise, methods such as size and isEmpty may be out of date by the time they return and only give an approximation. In a multi-threaded environment, however, these methods are rarely important.
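A small sketch of both properties, under the assumption that a shared hit counter is a reasonable stand-in for real work: merge() gives an atomic read-modify-write, and iterating while another entry is inserted does not throw. The names ConcurrentCounter, countHits and iterateWhileModifying are made up for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentCounter {
    // Each of nThreads increments the same key perThread times using the atomic merge().
    public static int countHits(int nThreads, int perThread) {
        ConcurrentHashMap<String, Integer> hits = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        for (int t = 0; t < nThreads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    hits.merge("page", 1, Integer::sum); // atomic, no client-side lock needed
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return hits.getOrDefault("page", 0);
    }

    // Inserting during iteration does not throw: the iterator is weakly consistent.
    public static int iterateWhileModifying() {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>(Map.of(1, 1, 2, 2));
        int visited = 0;
        for (Integer key : map.keySet()) {
            map.put(-1, 0); // may or may not be seen by this iterator
            visited++;
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(countHits(4, 1000));
        System.out.println(iterateWhileModifying());
    }
}
```

With a plain get() followed by put() instead of merge(), increments could be lost, which is the same compound-action problem as with the synchronised collections.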
CopyOnWriteArrayList
CopyOnWriteArrayList is an improvement on the synchronised list for read-heavy use. Every time a writer modifies the list, it makes a fresh copy of the backing array. An iterator keeps a reference to the array as it was when the iterator was created, so it needs no locking and never throws ConcurrentModificationException.
CopyOnWriteArrayList uses a mutual-exclusion (reentrant) lock for writes (see Mutex vs Semaphore), allowing one writer at a time. If two writers arrive together, the second waits until the first finishes.
Readers are never blocked, but they may see a slightly stale snapshot.
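The snapshot behaviour can be observed in a single thread. In this minimal sketch (class and method names are illustrative only), an iterator is created, the list is modified, and the iterator still sees the old contents.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class SnapshotIteratorDemo {
    // Counts what an iterator sees when the list is modified after the iterator was created.
    public static int elementsSeenByOldIterator() {
        CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>(List.of(1, 2, 3));
        Iterator<Integer> it = list.iterator(); // pinned to the current backing array
        list.add(4);                            // the writer copies the array; the iterator is unaffected
        int seen = 0;
        while (it.hasNext()) {
            it.next();
            seen++;
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(elementsSeenByOldIterator()); // prints 3, although the list now has 4 elements
    }
}
```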
Blocking Queue and producer-consumer pattern
BlockingQueue is great for building the producer-consumer pattern. There are two types:
- Unbounded queue: we can keep adding to the queue, with no limit
- Bounded queue: the queue has a limit. When the queue is full, we need to wait before putting another item into it
flowchart LR
Producer --> |Publish| Q[blocking queue]
Consumer --> |Take| Q
Some implementations:
- LinkedBlockingQueue (unbounded by default)
- ArrayBlockingQueue (bounded)
- PriorityBlockingQueue (priority order)
Special queue:
- SynchronousQueue (no capacity: each put has to wait until there is a matching take)
The use case for SynchronousQueue is an urgent task that needs to be handed off immediately, rather than sitting in a queue waiting for a consumer to pick it up.
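A minimal hand-off sketch, assuming a single consumer thread; HandOffDemo and handOff are hypothetical names. It shows that a non-blocking offer is rejected when nobody is waiting, while put blocks until a consumer takes the item.

```java
import java.util.concurrent.SynchronousQueue;

public class HandOffDemo {
    // Hands one task directly to a consumer thread and reports what happened.
    public static String handOff(String task) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        // No consumer is waiting yet, so a non-blocking offer has nowhere to go.
        boolean acceptedWithoutConsumer = queue.offer("dropped");
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.put(task);  // blocks until the consumer takes the task
            consumer.join();  // join() makes received[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acceptedWithoutConsumer + ":" + received[0];
    }

    public static void main(String[] args) {
        System.out.println(handOff("urgent")); // prints false:urgent
    }
}
```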
A typical producer-consumer example is file searching, where one thread crawls the directories and another indexes the files it finds.
BlockingQueue makes it safe to publish an object from one thread to another so that it stays thread-confined (owned by only one thread at a time). Only one consumer can take a given object, and from then on it is owned exclusively by that consumer.
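The pattern can be sketched with one producer and one consumer around a bounded queue. This is only an illustration: the POISON_PILL end marker and the names ProducerConsumerDemo and produceAndConsume are assumptions of this sketch, not part of the BlockingQueue API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerDemo {
    private static final int POISON_PILL = -1; // hypothetical end-of-stream marker

    // Produces 1..items on one thread and sums them on another.
    public static int produceAndConsume(int items) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2); // bounded: put() blocks when full
        int[] sum = new int[1];
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= items; i++) {
                    queue.put(i);
                }
                queue.put(POISON_PILL); // tell the consumer to stop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int item = queue.take(); item != POISON_PILL; item = queue.take()) {
                    sum[0] += item; // each item is owned by this thread once taken
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // join() makes sum[0] visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(10)); // prints 55
    }
}
```

With a capacity of 2, the producer is throttled to the consumer's pace, which is the main reason to prefer a bounded queue.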
Work stealing pattern with deque
A BlockingDeque is a double-ended queue, which is perfect for work stealing.
[Diagram: a Producer publishes Task1, Task2 and Task3 to the Blocking Queue. consumer1 and consumer2 each have a personal task queue. Consumer2 takes tasks from the head; consumer1, having finished all its tasks, starts stealing from the end.]
The concept is that each worker has its own deque. It takes some tasks, puts them into its local deque and works on them from the head. Once its tasks are finished, it can steal from the end of another worker's deque, which avoids contending with that worker at the head.
A typical worker tries to steal from other workers first, before polling the central queue. It polls in batches: each time it takes N tasks and stores them in its own deque.
Another way is to skip the central blocking queue and push straight to the worker deques. However, this requires a more complex producer algorithm, for example round-robin or hashing.
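The head/tail split can be sketched with two deques. This is a single-threaded illustration of the access pattern only (no central queue, no batching); WorkStealingDemo and nextTask are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class WorkStealingDemo {
    // A worker first takes from the head of its own deque; if that is empty,
    // it steals from the tail of the victim's deque. Returns null if both are empty.
    public static String nextTask(BlockingDeque<String> own, BlockingDeque<String> victim) {
        String task = own.pollFirst();
        return task != null ? task : victim.pollLast();
    }

    public static void main(String[] args) {
        BlockingDeque<String> busy = new LinkedBlockingDeque<>(List.of("Task1", "Task2", "Task3"));
        BlockingDeque<String> idle = new LinkedBlockingDeque<>();
        System.out.println(nextTask(busy, idle)); // busy worker takes Task1 from its own head
        System.out.println(nextTask(idle, busy)); // idle worker steals Task3 from the other end
    }
}
```

Because the owner and the thief work at opposite ends, they only meet when the deque is nearly empty.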
Blocking and interrupt
A thread may block or pause. It could be for several reasons:
- Waiting to acquire a lock
- Sleeping in Thread.sleep until it wakes up
- Waiting for the result of another thread
It could be in one of the following states:
- BLOCKED
- WAITING
- TIMED_WAITING
Some blocking methods throw InterruptedException, which we need to handle. There are two main ways to handle it:
- Propagate it to the caller: don't handle the InterruptedException, just declare it and let the caller deal with it. In most cases this is fine.
- Handle the interrupt signal: for critical work that we don't want to abandon midway, we need a proper way to handle it ourselves
[!danger]
Do not catch and ignore InterruptedException, because code higher up the call stack then never learns that the thread was interrupted. If you cannot rethrow it, restore the flag with Thread.currentThread().interrupt().
We can also interrupt other threads by calling interrupt(). When to interrupt is really up to the application.
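A sketch of the "restore the flag" option, assuming a worker that simply sleeps; InterruptDemo and interruptSleepingWorker are made-up names. Catching InterruptedException clears the thread's interrupt flag, so the handler sets it again for whoever checks it later.

```java
public class InterruptDemo {
    // Interrupts a sleeping worker and reports whether the worker restored its interrupt flag.
    public static boolean interruptSleepingWorker() {
        boolean[] flagRestored = new boolean[1];
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000); // blocking call that responds to interruption
            } catch (InterruptedException e) {
                // The flag was cleared when the exception was thrown; set it again
                // so that code further up the stack can still see the interruption.
                Thread.currentThread().interrupt();
            }
            flagRestored[0] = Thread.currentThread().isInterrupted();
        });
        worker.start();
        worker.interrupt(); // ask the worker to stop what it is doing
        try {
            worker.join();  // join() makes flagRestored[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flagRestored[0];
    }

    public static void main(String[] args) {
        System.out.println(interruptSleepingWorker()); // prints true
    }
}
```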
Synchronizers
These are mechanisms for coordinating threads with each other. Some of them are:
Latches
Blocks threads until the latch reaches its terminal state.
Example: CountDownLatch, whose terminal state is when the count reaches zero (0)
A latch is like a gate. We can use a latch to delay threads. A latch eventually reaches a terminal state, and once it is there it stays there forever.
Use case:
- Ensure something does not proceed until some condition is triggered
Example:
- Ensure a computation does not proceed until resource R has been initialised
  - We can use a simple two-state latch: CountDownLatch(1)
- Ensure that N resources are initialised or have started: CountDownLatch(n)
- Wait until all parties involved in an activity are ready
For example, here we use CountDownLatch for two gates:
- StartGate: a binary gate that starts all the threads at once
- EndGate: waits until n threads complete. This is necessary to measure the time taken by n threads
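import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Logger;
public class TestHarness {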
static Logger logger = Logger.getAnonymousLogger();
public long timeTask(int nThreads, final Runnable task) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread thread = new Thread() {
@Override
public void run() {
try {
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
thread.start();
}
var startTime = Instant.now();
startGate.countDown();
endGate.await();
return Duration.between(startTime, Instant.now()).toNanos();
}
public static void main(String[] args) throws InterruptedException {
var testHarness = new TestHarness();
var result = testHarness.timeTask(5, () -> {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
logger.info("Result: " + result);
}
}
When exception handling gets complicated, we can create a utility that unwraps the cause. For example:
private RuntimeException exceptionWrapper(Throwable cause) {
if (cause instanceof RuntimeException) {
System.out.println("Caught runtime");
return (RuntimeException) cause;
}
return new UndeclaredThrowableException(cause);
}
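public String getInfo() throws InterruptedException {
try {
return loadInfo.get(); // loadInfo is assumed to be a Future<String>
} catch (ExecutionException e) {
// Only ExecutionException carries the task's real failure as its cause;
// InterruptedException has no cause and is propagated to the caller as-is.
throw exceptionWrapper(e.getCause());
}
}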
Semaphore
A semaphore allows a fixed number of permits to be held at the same time. See Mutex vs Semaphore > Semaphore.
A binary semaphore (new Semaphore(1)) behaves much like a mutex, except that it is not reentrant and a permit can be released by a thread other than the one that acquired it.
A semaphore is useful for implementing resource pools such as database connections, where a caller blocks while the pool is empty and unblocks once a resource is returned. To do this, initialise new Semaphore(poolSize).
Use case
- Create any data structure that needs to be bounded (callers wait until a slot is available).
- Suitable for implementing a rate limiter
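import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreTest {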
public static void main(String[] args) throws InterruptedException {
Semaphore binarySemaphore = new Semaphore(1);
Runnable task = () -> {
try {
binarySemaphore.acquire();
System.out.println("Thread " + Thread.currentThread().getName() + " acquired lock.");
binarySemaphore.release();
System.out.println("Thread " + Thread.currentThread().getName() + " released lock.");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
ExecutorService executorService = Executors.newCachedThreadPool();
for (var i = 0; i < 2; i++) {
executorService.submit(task);
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
}
}
In this example, binarySemaphore can be held by only one thread at a time, which is similar to a mutex. As a result, the second thread needs to wait until the first one releases it. This is basically the same as using a mutex:
ReentrantLock lock = new ReentrantLock();
lock.lock();
try { /* critical section */ } finally { lock.unlock(); }
Similarly, this is an example of a resource pool:
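import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
public class ResourcePool {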
private Logger logger = Logger.getAnonymousLogger();
private final Semaphore semaphore;
public ResourcePool(int maxResource) {
this.semaphore = new Semaphore(maxResource);
}
public boolean allocate() {
try {
semaphore.acquire();
logger.info("Allocated resource to " + Thread.currentThread().getName());
Thread.sleep(1000);
semaphore.release();
return true;
} catch (InterruptedException e) {
logger.warning(e.getMessage());
}
return false;
}
public static void main(String[] args) throws InterruptedException {
ResourcePool resourcePool = new ResourcePool(2);
ExecutorService executorService = Executors.newCachedThreadPool();
for (var i = 0; i < 6; i++ )
executorService.submit(resourcePool::allocate);
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
}
}
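We can also build any bounded data structure using a Semaphore. For example, a bounded map (named BoundedHashSet here):
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.logging.Logger;
public class BoundedHashSet<K, V> {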
private final Logger logger = Logger.getAnonymousLogger();
private final Semaphore semaphore;
private final Map<K, V> map;
public BoundedHashSet(int maxItems) {
this.semaphore = new Semaphore(maxItems);
this.map = Collections.synchronizedMap(new HashMap<>());
}
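public V put(K key, V value) throws InterruptedException {
semaphore.acquire();
logger.info("Putting pair (%s, %s)".formatted(key, value));
V previous = this.map.put(key, value);
if (previous != null) {
// The key already existed, so the map did not grow: give the permit back
semaphore.release();
}
return value;
}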
public V remove(K key) {
V oldValue = this.map.remove(key);
if (oldValue != null) {
semaphore.release();
}
return oldValue;
}
public static void main(String[] args) throws InterruptedException {
var boundedHashSet = new BoundedHashSet<Integer, String>(2);
boundedHashSet.put(1, "hello");
boundedHashSet.put(2, "hello");
boundedHashSet.remove(1);
boundedHashSet.put(3, "hello");
}
}
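For the rate-limiter use case listed earlier, a minimal sketch could use tryAcquire() to reject callers instead of blocking them. Strictly speaking this limits the number of in-flight calls rather than calls per second; a time-based limiter would also need to refill permits on a schedule, which is not shown. The class ConcurrencyLimiter and its methods are hypothetical.

```java
import java.util.concurrent.Semaphore;

public class ConcurrencyLimiter {
    private final Semaphore permits;

    public ConcurrencyLimiter(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    // Non-blocking: returns false instead of waiting when the limit is reached.
    public boolean tryEnter() {
        return permits.tryAcquire();
    }

    // Must be called once for every successful tryEnter().
    public void leave() {
        permits.release();
    }

    public static void main(String[] args) {
        ConcurrencyLimiter limiter = new ConcurrencyLimiter(2);
        System.out.println(limiter.tryEnter()); // true
        System.out.println(limiter.tryEnter()); // true
        System.out.println(limiter.tryEnter()); // false, limit reached
        limiter.leave();
        System.out.println(limiter.tryEnter()); // true again
    }
}
```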
Barrier
If a latch counts down, a barrier counts up until it reaches a target. For details see CyclicBarrier.
Basically, we set up a new CyclicBarrier(n). Every call to cyclicBarrier.await() increments the number of waiting threads by 1 and blocks the caller until n threads have arrived. All of them are then released and the barrier resets, so it can be reused.
var cyclicBarrier = new CyclicBarrier(5, () -> {
// A Runnable that is executed once, by the last worker to call await(), when the barrier count reaches 5
});
// In some thread
cyclicBarrier.await();
Use case:
- Distribute tasks to workers
- Example: Multi-threaded game of life where each worker manage a small area https://github.com/auspham/learn/blob/master/java/java-concurrency-practice/app/src/main/java/org/example/chapter5/GameOfLifeMultiThread.java
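A smaller sketch than the linked game of life: several workers run through a number of phases and wait for each other at the end of each one. The barrier action counts completed phases. BarrierDemo and runPhases are made-up names, and the per-phase work is left as a comment.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierDemo {
    // Runs `workers` threads through `phases` rounds and returns how often the barrier tripped.
    public static int runPhases(int workers, int phases) {
        AtomicInteger completedPhases = new AtomicInteger();
        // The barrier action runs once per trip, in the last thread to arrive.
        CyclicBarrier barrier = new CyclicBarrier(workers, completedPhases::incrementAndGet);
        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            threads[w] = new Thread(() -> {
                try {
                    for (int p = 0; p < phases; p++) {
                        // ... compute this worker's share of phase p here ...
                        barrier.await(); // wait for everyone, then the barrier resets
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            threads[w].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return completedPhases.get();
    }

    public static void main(String[] args) {
        System.out.println(runPhases(4, 3)); // prints 3
    }
}
```

Unlike a latch, the same barrier object is reused for every phase.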
Exchanger
Exchanger can be used to communicate between two threads by passing an object from one to the other. What is passed is a reference to the object, not a copy, so both threads can end up with access to the same object.
With an exchanger, a thread waits at the exchange point (where it calls exchanger.exchange(info)) and only continues once another thread arrives with its own exchange.
// Thread 1
buffer.add("Some data");
var thread2_result = exchanger.exchange(buffer); // Waits until another thread reaches its exchange call
System.out.println("Done");
// Thread 2
buffer.add("Some data 2");
var thread1_result = exchanger.exchange(buffer); // Waits until another thread reaches its exchange call
System.out.println("Done");
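import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.logging.Logger;
public class ExchangerDemo {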
static Logger log = Logger.getAnonymousLogger();
static class BufferFiller extends Thread {
private final int capacity;
private final AtomicInteger currentCapacity;
private final CopyOnWriteArrayList<String> buffer;
private final Exchanger<CopyOnWriteArrayList<String>> exchanger;
private final Consumer<CopyOnWriteArrayList<String>> populateBufferCb;
BufferFiller(int capacity, Exchanger<CopyOnWriteArrayList<String>> exchanger, Consumer<CopyOnWriteArrayList<String>> populateBufferCb) {
this.currentCapacity = new AtomicInteger(0);
this.capacity = capacity;
this.exchanger = exchanger;
this.buffer = new CopyOnWriteArrayList<>();
this.populateBufferCb = populateBufferCb;
}
@Override
public void run() {
while(this.currentCapacity.addAndGet(1) < this.capacity) {
try {
populateBufferCb.accept(this.buffer);
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
try {
CopyOnWriteArrayList<String> result = this.exchanger.exchange(this.buffer);
log.info("%s Got result: %s".formatted(Thread.currentThread().getName(), result.stream().reduce((a, b) -> a + b)));
result.add("ko-%s".formatted(Thread.currentThread().getName())); // this will modify the value of another thread as well. Passed here is a reference
log.info("%s current buffer: %s".formatted(Thread.currentThread().getName(), this.buffer.stream().reduce((a, b) -> a + b)));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public static void main(String[] args) throws InterruptedException {
Exchanger<CopyOnWriteArrayList<String>> exchanger = new Exchanger<>();
BufferFiller bufferFiller1 = new BufferFiller(5, exchanger, (buffer) -> {
log.info("Filled from thread %s a".formatted(Thread.currentThread().getName()));
buffer.add("a");
});
BufferFiller bufferFiller2 = new BufferFiller(5, exchanger, (buffer) -> {
log.info("Filled from thread %s b".formatted(Thread.currentThread().getName()));
buffer.add("b");
});
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(bufferFiller1);
Thread.sleep(5000);
executorService.submit(bufferFiller2);
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
}
}
Sep 21, 2025 5:29:37 PM org.example.chapter5.ExchangerDemo lambda$main$0
INFO: Filled from thread pool-1-thread-1 a
Sep 21, 2025 5:29:38 PM org.example.chapter5.ExchangerDemo lambda$main$0
INFO: Filled from thread pool-1-thread-1 a
Sep 21, 2025 5:29:39 PM org.example.chapter5.ExchangerDemo lambda$main$0
INFO: Filled from thread pool-1-thread-1 a
Sep 21, 2025 5:29:40 PM org.example.chapter5.ExchangerDemo lambda$main$0
INFO: Filled from thread pool-1-thread-1 a
Sep 21, 2025 5:29:42 PM org.example.chapter5.ExchangerDemo lambda$main$1
INFO: Filled from thread pool-1-thread-2 b
Sep 21, 2025 5:29:43 PM org.example.chapter5.ExchangerDemo lambda$main$1
INFO: Filled from thread pool-1-thread-2 b
Sep 21, 2025 5:29:44 PM org.example.chapter5.ExchangerDemo lambda$main$1
INFO: Filled from thread pool-1-thread-2 b
Sep 21, 2025 5:29:45 PM org.example.chapter5.ExchangerDemo lambda$main$1
INFO: Filled from thread pool-1-thread-2 b
Sep 21, 2025 5:29:46 PM org.example.chapter5.ExchangerDemo$BufferFiller run
INFO: pool-1-thread-1 Got result: Optional[bbbb]
Sep 21, 2025 5:29:46 PM org.example.chapter5.ExchangerDemo$BufferFiller run
INFO: pool-1-thread-2 Got result: Optional[aaaa]
Sep 21, 2025 5:29:46 PM org.example.chapter5.ExchangerDemo$BufferFiller run
INFO: pool-1-thread-1 current buffer: Optional[aaaa]
Sep 21, 2025 5:29:46 PM org.example.chapter5.ExchangerDemo$BufferFiller run
INFO: pool-1-thread-2 current buffer: Optional[bbbbko-pool-1-thread-1]
Building a cache function
To build a cache function, we can start simple and then gradually address the concurrency problems.
For example, build one that wraps a Function<T, R> and returns the cached value:
public static void main(String[] args) {
Function<Integer, BigInteger> expensiveFunction = new Function<Integer, BigInteger>() {
@Override
public BigInteger apply(Integer integer) {
return BigInteger.valueOf(new Random().nextInt(10000));
}
};
Function<Integer, BigInteger> cachedFunction = new CachedFunction<>(expensiveFunction);
System.out.println(cachedFunction.apply(1));
System.out.println(cachedFunction.apply(1));
}
1. Get a general idea.
The general idea is to use a ConcurrentHashMap with the argument as the key and the result as the value. On later calls we just look the result up if it is already cached.
@NotThreadSafe
public static class CachedFunction<T, R> implements Function<T, R> {
private final Function<T, R> expensiveFunction;
private final ConcurrentHashMap<T, R> concurrentHashMap;
public CachedFunction(Function<T, R> expensiveFunction) {
this.expensiveFunction = expensiveFunction;
this.concurrentHashMap = new ConcurrentHashMap<>();
}
@Override
public R apply(T t) {
R result = this.concurrentHashMap.get(t);
if (result == null) {
result = this.expensiveFunction.apply(t);
this.concurrentHashMap.put(t, result);
}
return result;
}
}
Now imagine two threads entering apply with the same argument at the same time:
| Thread 1 | Thread 2 |
|---|---|
| R result = this.concurrentHashMap.get(t); | R result = this.concurrentHashMap.get(t); |
| if (result == null) { | if (result == null) { |
| result = this.expensiveFunction.apply(t); | result = this.expensiveFunction.apply(t); |
| … | … |
As a result, both threads execute expensiveFunction.apply() at the same time, which is wasteful. Ideally only one thread should call expensiveFunction.apply() for a given argument.
2. Use future
Here we can use FutureTask, which has two relevant methods:
- run() to run the task
- get() to get the result of the task, blocking until it is ready
The idea is that when another thread arrives, it only calls get() if an earlier thread has already started the computation.
@NotThreadSafe
public static class CachedFunction<T, R> implements Function<T, R> {
private final Function<T, R> expensiveFunction;
private final ConcurrentHashMap<T, FutureTask<R>> concurrentHashMap;
public CachedFunction(Function<T, R> expensiveFunction) {
this.expensiveFunction = expensiveFunction;
this.concurrentHashMap = new ConcurrentHashMap<>();
}
@Override
public R apply(T t) {
FutureTask<R> futureTask = this.concurrentHashMap.get(t);
if (futureTask == null) {
FutureTask<R> ft = new FutureTask<>(() -> this.expensiveFunction.apply(t));
futureTask = this.concurrentHashMap.put(t, ft);
if (futureTask == null) {
futureTask = ft;
futureTask.run();
}
}
try {
return futureTask.get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}
This solves the issue, as the next thread can reuse the future. However, there is still a window where two threads both see null and both put their own future for the same key:
| Thread 1 | Thread 2 |
|---|---|
| … | … |
| futureTask = this.concurrentHashMap.put(t, ft); | futureTask = this.concurrentHashMap.put(t, ft); |
| … | … |
3. Use atomic
As a result, we need the atomic putIfAbsent() method from ConcurrentHashMap:
| Thread 1 | Thread 2 |
|---|---|
| … | … |
| futureTask = this.concurrentHashMap.putIfAbsent(t, ft); | blocked while the atomic method runs |
| … | futureTask = this.concurrentHashMap.putIfAbsent(t, ft); (gets Thread 1's future back) |
This gives the final code:
@ThreadSafe
public class CachedFunction<T, R> implements Function<T, R> {
private final Function<T, R> expensiveFunction;
private final ConcurrentHashMap<T, FutureTask<R>> concurrentHashMap;
public CachedFunction(Function<T, R> expensiveFunction) {
this.expensiveFunction = expensiveFunction;
this.concurrentHashMap = new ConcurrentHashMap<>();
}
@Override
public R apply(T t) {
FutureTask<R> futureTask = concurrentHashMap.get(t);
if (futureTask == null) {
FutureTask<R> ft = new FutureTask<>(() -> expensiveFunction.apply(t));
futureTask = concurrentHashMap.putIfAbsent(t, ft);
if (futureTask == null) {
futureTask = ft;
futureTask.run();
}
}
try {
return futureTask.get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}
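To check the claim that the expensive function runs only once, the sketch below releases several threads at the same moment and counts the calls. It includes a compact copy of the putIfAbsent idea (named Memoizer here, a hypothetical name) purely so that the example compiles on its own; the counter and the start latch are additions for the test.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class CacheOnceDemo {
    // Compact copy of the putIfAbsent + FutureTask approach.
    static class Memoizer<T, R> implements Function<T, R> {
        private final Function<T, R> fn;
        private final ConcurrentHashMap<T, FutureTask<R>> cache = new ConcurrentHashMap<>();

        Memoizer(Function<T, R> fn) {
            this.fn = fn;
        }

        @Override
        public R apply(T arg) {
            FutureTask<R> future = cache.get(arg);
            if (future == null) {
                FutureTask<R> created = new FutureTask<>(() -> fn.apply(arg));
                future = cache.putIfAbsent(arg, created);
                if (future == null) { // this thread won the race, so it runs the task
                    future = created;
                    created.run();
                }
            }
            try {
                return future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        }
    }

    // Starts nThreads that all ask for the same key and returns how often the function ran.
    public static int callsToExpensiveFunction(int nThreads) {
        AtomicInteger calls = new AtomicInteger();
        Function<Integer, Integer> cached = new Memoizer<Integer, Integer>(x -> {
            calls.incrementAndGet();
            return x * x;
        });
        CountDownLatch start = new CountDownLatch(1);
        Thread[] threads = new Thread[nThreads];
        for (int i = 0; i < nThreads; i++) {
            threads[i] = new Thread(() -> {
                try {
                    start.await(); // line everyone up at the gate
                    cached.apply(7);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        start.countDown();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(callsToExpensiveFunction(8)); // prints 1
    }
}
```

One caveat worth keeping in mind: a future that failed stays in the map, so a real cache would probably remove the entry when the computation throws.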