RabbitMQ

RabbitMQ is suitable for a message queue where worker subscribe to a queue to work on a task.

RabbitMQ is push model where the queue push the task to the worker

flowchart LR
 subgraph Exchange["Exchange"]
    direction TB
        Direct["Direct Exchange"]
        Fanout["Fanout Exchange"]
        Topic["Topic Exchange"]
  end
 subgraph Queue["Queues"]
    direction TB
        QueueA["Queue A"]
        QueueB["Queue B"]
        QueueC["Queue C"]
  end
    Producer1["Producer 1"] -- Publish --> Direct & Topic
    Producer2["Producer 2"] -- Publish --> Fanout
    Direct -- Route --> QueueA & QueueB
    Fanout -- Broadcast --> QueueA & QueueB & QueueC
    Topic -- Pattern Match --> QueueC
    QueueA -- Consume --> Consumer1["Consumer 1"]
    QueueB -- Consume --> Consumer1["Consumer 1"]
    QueueB -- Consume --> Consumer2["Consumer 2"]
    QueueC -- Consume --> Consumer3["Consumer 3"]

There are 3 types of exchanges

ExchangeDescription
Direct ExchangeRoute message to a specific queue
Fanout exchangeBroadcast message to all queues
Topic exchangeRoute basec on specific pattern

Publisher:

  • can publish to multiple exchanges.
    • Exchange can choose to publish to multiple queue

Consumer:

  • Can consume from multiple queues.
  • After consuming, consumer will delete message

[!important]
In constrast to Kafka, if 2 workers consume the same queue, they cannot receive the same message. See Kafka vs RabbitMQ

Topic exchange example

For topic exchange, we can use

  • * (star): Subtitutes for one word
  • # (hash): Substitutes for zero or more word
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 1. Declare the Topic Exchange
channel.exchange_declare(exchange='vehicle_logs', exchange_type='topic')

# 2. Publish messages with different routing keys
messages = {
    "tesla.red.fast": "A red Tesla driving fast",
    "ford.blue.slow": "A blue Ford driving slow",
    "tesla.white.parked": "A white Tesla parked"
}

for key, body in messages.items():
    channel.basic_publish(exchange='vehicle_logs', routing_key=key, body=body)
    print(f" [x] Sent {key}: {body}")

connection.close()
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='vehicle_logs', exchange_type='topic')

# Create a temporary random queue for this consumer
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# BINDING: Match any Tesla (* matches color, # matches everything after)
channel.queue_bind(exchange='vehicle_logs', queue=queue_name, routing_key="tesla.#")

def callback(ch, method, properties, body):
    print(f" [Tesla Fan] Received: {body}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

Scaling

Loading...

We simply spawn more nodes of RabbitMQ to create a cluster