Asyncio

Asyncio is built on Python coroutines: generator-style coroutines use yield and send for two-way communication, and asyncio adds an event loop on top to multiplex I/O.

Asyncio launches an event loop on a thread; a given thread can run at most one event loop at a time. Whenever execution hits an await on a pending operation, the event loop suspends that task, and the thread is free to execute other tasks in the meantime.
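The yield/send mechanics that predate async/await can be sketched with a plain generator. This is a minimal illustration of the two-way communication idea, not asyncio itself:

```python
def echo_coroutine():
    # A generator-based coroutine: `yield` sends a value out,
    # and `send()` pushes a value back in -- two-way communication.
    received = yield "ready"
    while True:
        received = yield f"got: {received}"

coro = echo_coroutine()
print(next(coro))          # advance to the first yield -> "ready"
print(coro.send("hello"))  # resume the coroutine -> "got: hello"
print(coro.send("world"))  # -> "got: world"
```

async/await is essentially this suspend/resume mechanism, with the event loop acting as the driver that calls send on whichever coroutine is ready.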

Asyncio example

import asyncio
import threading

async def fetch_database(db_name):
    print(f"Fetching database {db_name}", threading.current_thread().name)
    await asyncio.sleep(3)
    print(f"Done fetching database {db_name}", threading.current_thread().name)

async def main():
    print(f"This is executing on {threading.current_thread().name}")
    tasks = [
        fetch_database("dog"),
        fetch_database("cat")
    ]

    print(f"This is still running in {threading.current_thread().name}")
    await asyncio.gather(*tasks)

asyncio.run(main())
This is executing on MainThread
This is still running in MainThread
Fetching database dog MainThread
Fetching database cat MainThread
Done fetching database dog MainThread
Done fetching database cat MainThread
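Both fetches overlap, so the total wall time is ~3 s rather than 6 s. A quick self-contained check of that claim (a sketch using shorter sleeps to keep the run fast):

```python
import asyncio
import time

async def fetch_database(db_name, delay=0.2):
    # Simulate I/O: await suspends this coroutine, freeing the event loop.
    await asyncio.sleep(delay)
    return db_name

async def main():
    start = time.monotonic()
    results = await asyncio.gather(fetch_database("dog"), fetch_database("cat"))
    elapsed = time.monotonic() - start
    print(results)                     # ['dog', 'cat']
    print(f"elapsed: {elapsed:.2f}s")  # ~0.2 s, not 0.4 s: the sleeps overlap
    return results, elapsed

results, elapsed = asyncio.run(main())
```

gather preserves argument order in its result list, regardless of which coroutine finishes first.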
sequenceDiagram
    participant MT as MainThread (Event Loop)
    participant C1 as main() coroutine
    participant C2 as fetch_database("dog")
    participant C3 as fetch_database("cat")

    autonumber

    MT->>C1: start main()
    C1->>MT: print "This is executing on MainThread"
    C1->>MT: print "This is still running in MainThread"

    C1-->>MT: await asyncio.gather(C2, C3)
    MT->>C2: schedule & run
    C2->>MT: print "Fetching database dog"
    C2-->>MT: await asyncio.sleep(3) (suspend)

    MT->>C3: schedule & run
    C3->>MT: print "Fetching database cat"
    C3-->>MT: await asyncio.sleep(3) (suspend)

    Note over MT: event loop idle waiting for timers

    MT->>C2: resume after sleep
    C2->>MT: print "Done fetching database dog"
    C2-->>MT: finish

    MT->>C3: resume after sleep
    C3->>MT: print "Done fetching database cat"
    C3-->>MT: finish

    MT->>C1: gather() completed, resume main()
    C1-->>MT: main() returns, program exits

Threadpool example

from concurrent.futures import Future, ThreadPoolExecutor
import threading
import time

def fetch_database(db_name):
    print(f"Fetching database {db_name}", threading.current_thread().name)
    time.sleep(3)
    print(f"Done fetching database {db_name}", threading.current_thread().name)

def main():
    with ThreadPoolExecutor() as executor:
        futures: list[Future] = []
        print(f"This is executing on {threading.current_thread().name}")
        futures.append(executor.submit(fetch_database, "dog"))
        futures.append(executor.submit(fetch_database, "cat"))
        print(f"This is still running in {threading.current_thread().name}")
        
        for future in futures:
            future.result()

main()
This is executing on MainThread
Fetching database dog ThreadPoolExecutor-0_0
Fetching database cat ThreadPoolExecutor-0_1
This is still running in MainThread
Done fetching database dog ThreadPoolExecutor-0_0
Done fetching database cat ThreadPoolExecutor-0_1
sequenceDiagram
    participant MT as MainThread
    participant T1 as Worker Thread 1
    participant T2 as Worker Thread 2

    autonumber

    MT->>MT: start main()
    MT->>MT: print "This is executing on MainThread"

    MT-->>T1: submit fetch_database("dog")
    MT-->>T2: submit fetch_database("cat")

    T1->>T1: print "Fetching database dog"
    T1-->>T1: time.sleep(3) (blocks thread)

    T2->>T2: print "Fetching database cat"
    T2-->>T2: time.sleep(3) (blocks thread)

    MT->>MT: print "This is still running in MainThread"

    T1->>T1: print "Done fetching database dog"
    T1-->>MT: future completes

    T2->>T2: print "Done fetching database cat"
    T2-->>MT: future completes

    MT->>MT: future1.result()
    MT->>MT: future2.result()
    MT->>MT: main() returns

Performance comparison

Let's run both of these and compare performance:

/usr/bin/time -v python3 thread-example.py
/usr/bin/time -v python3 asyncio-example.py

We have the following output:

| Metric | Threads | Asyncio |
| --- | --- | --- |
| User CPU | 0.04 s | 0.09 s |
| System CPU | 0.00 s | 0.01 s |
| Total CPU | 0.04 s | 0.10 s |
| CPU % | 1% | 3% |
| Wall time | 3.05 s | 3.11 s |
| Max RAM | 11.7 MB | 19.9 MB |
| Voluntary ctx switches | 21 | 2 |
| Involuntary ctx switches | 7 | 7 |
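The context-switch counters reported by /usr/bin/time come from getrusage; you can read the same counters in-process too (a Unix-only sketch using the stdlib resource module):

```python
import resource

# getrusage exposes the same counters /usr/bin/time -v reports (Unix only).
usage = resource.getrusage(resource.RUSAGE_SELF)
print(f"voluntary ctx switches:   {usage.ru_nvcsw}")
print(f"involuntary ctx switches: {usage.ru_nivcsw}")
```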

In this small example, threads slightly outperform asyncio in both CPU time and wall time. However, asyncio incurs far fewer voluntary context switches. Let's scale the example up:

Scale example

Let's scale both versions up to fetch 1,000 databases concurrently.

Thread-based

from concurrent.futures import Future, ThreadPoolExecutor
import threading
import time
import os
from contextlib import redirect_stdout

def fetch_database(db_name):
    print(f"Fetching database {db_name}", threading.current_thread().name)
    time.sleep(3)
    print(f"Done fetching database {db_name}", threading.current_thread().name)

def main():
    with open(os.devnull, "w") as fnull, redirect_stdout(fnull):
        with ThreadPoolExecutor() as executor:
            futures: list[Future] = []
            print(f"This is executing on {threading.current_thread().name}")
            
            for i in range(0, 1000):
                futures.append(executor.submit(fetch_database, i))

            print(f"This is still running in {threading.current_thread().name}")
            
            for future in futures:
                future.result()

main()

Event-loop-based

import asyncio
import threading
import os
from contextlib import redirect_stdout

async def fetch_database(db_name):
    print(f"Fetching database {db_name}", threading.current_thread().name)
    await asyncio.sleep(3)
    print(f"Done fetching database {db_name}", threading.current_thread().name)

async def main():
    with open(os.devnull, "w") as fnull, redirect_stdout(fnull):
        print(f"This is executing on {threading.current_thread().name}")
        tasks = []
        
        for i in range(0, 1000):
            tasks.append(fetch_database(i))

        print(f"This is still running in {threading.current_thread().name}")
        await asyncio.gather(*tasks)

asyncio.run(main())

[!note]
Here we redirect stdout to /dev/null so the measurements reflect the scheduling work itself rather than terminal output.

| Metric | Asyncio | ThreadPoolExecutor |
| --- | --- | --- |
| Wall time | 3.08 s | 150.06 s (2 m 30 s) |
| User CPU time | 0.06 s | 0.09 s |
| System CPU time | 0.02 s | 0.05 s |
| Total CPU time | 0.08 s | 0.14 s |
| Average CPU usage | 2% | 0% |
| Max RAM usage | 21.8 MB | 13.9 MB |
| Voluntary context switches | 2 | 2683 |
| Involuntary context switches | 5 | 74 |
| Minor page faults | 3296 | 2140 |
| Major page faults | 0 | 0 |
| Total tasks | 1000 | 1000 |
| Effective concurrency | 1000 | ~20 |
| Scheduler type | User-space (event loop) | Kernel (OS threads) |
| Question | Winner | Why |
| --- | --- | --- |
| Which finished faster? | 🟢 Asyncio | ~50× lower wall time |
| Which scales better? | 🟢 Asyncio | No thread limit |
| Which is more CPU-efficient? | 🟢 Asyncio | Fewer switches, less overhead |
| Which uses less RAM? | 🟡 Threads | But only because far fewer tasks were in flight at once |
| Which is the right model for massive I/O? | 🟢 Asyncio | Designed exactly for this |

Why is the ThreadPool the bottleneck?

By default, ThreadPoolExecutor's max_workers (since Python 3.8) is:

max_workers = min(32, os.cpu_count() + 4)

This cap depends on your hardware. Here I'm using an AMD Ryzen 7 PRO 5875U (16 logical CPUs) @ 2.0 GHz, so os.cpu_count() returns 16 and ThreadPoolExecutor creates min(32, 16 + 4) = 20 OS threads. That matches the measured wall time: 1000 tasks / 20 workers × 3 s ≈ 150 s.
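You can check the default on your own machine (a sketch; the min(32, cpu + 4) formula is the documented default for Python 3.8–3.12, and `_max_workers` is a private attribute inspected here purely for illustration):

```python
import os
from concurrent.futures import ThreadPoolExecutor

cpu = os.cpu_count()
default_workers = min(32, cpu + 4)  # the documented default since Python 3.8
print(f"os.cpu_count() = {cpu}, expected default max_workers = {default_workers}")

# The executor records the worker count it actually uses:
with ThreadPoolExecutor() as executor:
    actual = executor._max_workers
print(f"executor._max_workers = {actual}")
```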

What if I specify a larger max_workers?

Sure, you can, but asyncio's user-space event loop has no such thread limit. Moreover, every extra OS thread is one more thing the kernel scheduler must track, so context switches increase and, past a point, performance plateaus or even degrades.
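Raising max_workers does restore concurrency for this kind of sleep-bound workload. A sketch with 100 short tasks (fake_fetch and timed_run are illustrative helpers, and the timings are approximate):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_fetch(i):
    time.sleep(0.1)  # a blocking sleep pins an OS thread for its whole duration
    return i

def timed_run(max_workers, n_tasks=100):
    # Run n_tasks blocking calls through a pool of the given size
    # and return the elapsed wall time.
    start = time.monotonic()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        list(executor.map(fake_fetch, range(n_tasks)))
    return time.monotonic() - start

# With one worker per task all sleeps overlap (~0.1 s);
# with 10 workers the pool needs ~10 rounds (~1.0 s).
wide = timed_run(100)
narrow = timed_run(10)
print(f"100 workers: {wide:.2f}s, 10 workers: {narrow:.2f}s")
```

The cost is that each of those 100 workers is a real OS thread with its own stack, which is exactly the overhead the event loop avoids.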

Conclusion

For simple jobs with low concurrency, ThreadPoolExecutor is a great option for its simplicity and performance. For highly concurrent I/O-bound workloads, however, asyncio is the model that scales.
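The two models also combine: when async code must call a blocking function (e.g. a driver with no async interface), asyncio.to_thread (Python 3.9+) runs the blocking call on a worker thread without stalling the event loop. A minimal sketch:

```python
import asyncio
import time

def blocking_fetch(db_name):
    # A legacy blocking call that cannot be awaited directly.
    time.sleep(0.2)
    return db_name

async def main():
    # asyncio.to_thread hands each blocking call to a worker thread,
    # so the event loop stays responsive and the two calls overlap.
    return await asyncio.gather(
        asyncio.to_thread(blocking_fetch, "dog"),
        asyncio.to_thread(blocking_fetch, "cat"),
    )

results = asyncio.run(main())
print(results)  # ['dog', 'cat']
```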