Celery
Celery is a distributed task queue framework for Python.
sequenceDiagram
participant App as Application
participant Broker as Redis (Broker)
participant Worker as Celery Worker (Server)
App->>Broker: Send task message<br/>(task name + args)
Note right of Broker: Stores task in a list/queue<br/>per Celery routing
Worker->>Broker: Poll / fetch task message
Broker-->>Worker: Deliver task payload
Worker->>Worker: Execute task function
Note right of Worker: e.g., run `add(x, y)`<br/>or `busy_wait(10)`
Worker-->>Broker: (Optional) Send result to result backend
App->>Broker: Query result backend
Broker-->>App: Return task result
- Application: the app that interacts with the Celery worker. Typically another Python program.
- Broker: could be RabbitMQ, Redis, or a similar queue framework.
- Server: hosts the Celery worker. Typically initialised (run the command) as celery --app APP_NAME worker
The worker started by celery --app APP_NAME worker runs in one Python main thread. Each submitted task then executes on the pool (defined by --pool).
The available pools are:
- prefork: the default option; uses a pool of worker processes (via billiard, Celery's fork of multiprocessing). Not affected by the GIL
- eventlet and gevent: coroutine pools (single thread), affected by the GIL
- solo: executes tasks on the main thread
- threads: runs tasks on Python threads, affected by the GIL
- custom: enables specifying a custom worker pool implementation through environment variables.
For example, to launch the worker using the thread pool, we can do something like
celery --app tasks worker -P threads --loglevel=INFO
Given the following example
# tasks.py
import sys
import time

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/')

# The GIL will hold a thread for up to 10 seconds before forcing a context switch
sys.setswitchinterval(10)

def busy_wait(seconds):
    end_time = time.time() + seconds
    # Pure Python loop - no I/O, no GIL release
    while time.time() < end_time:
        # Do some meaningless CPU work
        x = 123456789 ** 5

@app.task
def add(x, y):
    print(f"Received task: {x} {y} {sys.getswitchinterval()}")
    busy_wait(10)
    return x + y
We can launch this like
celery --app tasks worker -P threads --loglevel=INFO
Note that because our filename is tasks.py, we need to use tasks when specifying --app
And then we can have another main.py
from tasks import add
print(add.apply_async((1,3)))
Different pool types
When you execute python main.py, the task runs on our Celery worker and you can see the result there. Note that when you execute python main.py multiple times, the tasks appear blocked by the GIL because we set the switch interval to 10 seconds: the GIL will only context-switch between threads after 10 seconds. With the default of 0.005 seconds, you would instead see a new task start executing after 0.005 seconds. However, that is still not true parallelism (the CPU executes one task for 0.005 seconds, then jumps to another).
This behavior is the same for eventlet and gevent as well, since they are coroutine-based: there is only one event-loop thread.
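The effect of sys.setswitchinterval can be observed with plain threads, no Celery required. A minimal sketch (the thread names and the 0.2-second duration are illustrative, not from the Celery example):

```python
import sys
import threading
import time

# With a 10-second switch interval, a CPU-bound thread keeps the GIL
# until it finishes (or until 10 seconds pass), so threads run serially.
sys.setswitchinterval(10)

order = []

def spin(name, seconds=0.2):
    # Pure Python busy loop: no I/O, so the GIL is never released voluntarily
    end = time.time() + seconds
    while time.time() < end:
        pass
    order.append(name)

t1 = threading.Thread(target=spin, args=("first",))
t2 = threading.Thread(target=spin, args=("second",))
t1.start()
t2.start()
t1.join()
t2.join()

# "first" runs to completion before "second" gets the GIL
print(order)
```

With the default switch interval of 0.005 seconds, the two threads would instead interleave their loops, finishing at roughly the same time.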
To obtain parallelism, we need to use prefork, which in turn uses a pool of worker processes (via billiard, Celery's fork of multiprocessing)
celery --app tasks worker --loglevel=INFO
Leaving the -P pool option unset defaults to prefork.
Now when you submit python main.py multiple times, each task runs in a separate worker process, hence bypassing the GIL. This achieves true parallelism.
Get the result back
To get the result back, you need to provide a result backend
...
app = Celery(broker='redis://localhost:6379/', backend='redis://localhost')
...
And in main.py you can get the result back like
from tasks import add
result = add.apply_async((1, 3))
# block until the task finishes; specify timeout to avoid hanging indefinitely
print(result.get()) # 4
How it works: the result is stored in Redis under a celery-task-meta-<task_id> key, and the client simply fetches it back from Redis
127.0.0.1:6379> keys *
1) "_kombu.binding.celery"
2) "_kombu.binding.celery.pidbox"
3) "_kombu.binding.celeryev"
4) "celery-task-meta-129fb62f-93f0-46b8-8cf1-3a1d7d917bc5"
5) "celery-task-meta-9b3e5b82-5c9f-4139-972e-e2fc5331c986"
127.0.0.1:6379> get celery-task-meta-9b3e5b82-5c9f-4139-972e-e2fc5331c986
"{\"status\": \"SUCCESS\", \"result\": 4, \"traceback\": null, \"children\": [], \"date_done\": \"2025-09-14T06:07:42.851778+00:00\", \"task_id\": \"9b3e5b82-5c9f-4139-972e-e2fc5331c986\"}"
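The stored value is plain JSON, so any client that can reach Redis can decode a result directly. A minimal sketch parsing the payload shown above (the payload is inlined here instead of being fetched from Redis, so the snippet is self-contained):

```python
import json

# Example value as stored under a celery-task-meta-<task_id> key
payload = (
    '{"status": "SUCCESS", "result": 4, "traceback": null, '
    '"children": [], "date_done": "2025-09-14T06:07:42.851778+00:00", '
    '"task_id": "9b3e5b82-5c9f-4139-972e-e2fc5331c986"}'
)

meta = json.loads(payload)
print(meta["status"], meta["result"])  # SUCCESS 4
```

This is essentially what result.get() does under the hood: look up the key named after the task id in the result backend and deserialize the stored metadata.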