RabbitMQ
RabbitMQ is suitable for a message queue where worker subscribe to a queue to work on a task.
RabbitMQ is push model where the queue push the task to the worker
flowchart LR
subgraph Exchange["Exchange"]
direction TB
Direct["Direct Exchange"]
Fanout["Fanout Exchange"]
Topic["Topic Exchange"]
end
subgraph Queue["Queues"]
direction TB
QueueA["Queue A"]
QueueB["Queue B"]
QueueC["Queue C"]
end
Producer1["Producer 1"] -- Publish --> Direct & Topic
Producer2["Producer 2"] -- Publish --> Fanout
Direct -- Route --> QueueA & QueueB
Fanout -- Broadcast --> QueueA & QueueB & QueueC
Topic -- Pattern Match --> QueueC
QueueA -- Consume --> Consumer1["Consumer 1"]
QueueB -- Consume --> Consumer1["Consumer 1"]
QueueB -- Consume --> Consumer2["Consumer 2"]
QueueC -- Consume --> Consumer3["Consumer 3"]
There are 3 types of exchanges
| Exchange | Description |
|---|---|
| Direct Exchange | Route message to a specific queue |
| Fanout exchange | Broadcast message to all queues |
| Topic exchange | Route basec on specific pattern |
Publisher:
- can publish to multiple exchanges.
- Exchange can choose to publish to multiple queue
Consumer:
- Can consume from multiple queues.
- After consuming, consumer will delete message
[!important]
In constrast to Kafka, if 2 workers consume the same queue, they cannot receive the same message. See Kafka vs RabbitMQ
Topic exchange example
For topic exchange, we can use
*(star): Subtitutes for one word#(hash): Substitutes for zero or more word
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 1. Declare the Topic Exchange
channel.exchange_declare(exchange='vehicle_logs', exchange_type='topic')
# 2. Publish messages with different routing keys
messages = {
"tesla.red.fast": "A red Tesla driving fast",
"ford.blue.slow": "A blue Ford driving slow",
"tesla.white.parked": "A white Tesla parked"
}
for key, body in messages.items():
channel.basic_publish(exchange='vehicle_logs', routing_key=key, body=body)
print(f" [x] Sent {key}: {body}")
connection.close()
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='vehicle_logs', exchange_type='topic')
# Create a temporary random queue for this consumer
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# BINDING: Match any Tesla (* matches color, # matches everything after)
channel.queue_bind(exchange='vehicle_logs', queue=queue_name, routing_key="tesla.#")
def callback(ch, method, properties, body):
print(f" [Tesla Fan] Received: {body}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
Scaling
Loading...
We simply spawn more nodes of RabbitMQ to create a cluster