You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

7.1 KiB

Bien sûr ! RabbitMQ supporte plusieurs modèles d'échange (exchange types) qui déterminent comment les messages sont routés vers les files d'attente. Voici les principaux modèles d'échange et des exemples pour chacun :

1. Direct Exchange

Un Direct Exchange route les messages vers les files d'attente dont la clé de routage (routing key) correspond exactement à la clé de routage du message.

Exemple

Producer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = "info"
message = "Hello World!"
channel.basic_publish(
    exchange='direct_logs',
    routing_key=severity,
    body=message,
)
print(f" [x] Sent {severity}:{message}")

connection.close()

Consumer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severity = "info"
channel.queue_bind(
    exchange='direct_logs',
    queue=queue_name,
    routing_key=severity
)

def callback(ch, method, properties, body):
    print(f" [x] {method.routing_key}:{body}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True,
)

print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()

2. Fanout Exchange

Un Fanout Exchange route les messages à toutes les files d'attente qui sont liées à cet échange, indépendamment de la clé de routage.

Exemple

Producer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = "Hello World!"
channel.basic_publish(
    exchange='logs',
    routing_key='',
    body=message,
)
print(f" [x] Sent {message}")

connection.close()

Consumer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

def callback(ch, method, properties, body):
    print(f" [x] {body}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True,
)

print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()

3. Topic Exchange

Un Topic Exchange route les messages vers les files d'attente dont la clé de routage correspond à un motif (pattern) spécifique. Les clés de routage peuvent contenir des caractères génériques (* pour un seul mot et # pour zéro ou plusieurs mots).

Exemple

Producer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = "kern.critical"
message = "A critical kernel error"
channel.basic_publish(
    exchange='topic_logs',
    routing_key=routing_key,
    body=message,
)
print(f" [x] Sent {routing_key}:{message}")

connection.close()

Consumer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

binding_keys = ["kern.*", "*.critical"]
for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs',
        queue=queue_name,
        routing_key=binding_key
    )

def callback(ch, method, properties, body):
    print(f" [x] {method.routing_key}:{body}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True,
)

print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()

4. RPC (Remote Procedure Call)

Le modèle RPC permet à un client d'appeler une fonction sur un serveur et d'obtenir une réponse. Ce modèle est souvent utilisé pour des opérations synchrones où le client attend une réponse avant de continuer.

Exemple

Client (RPC Client):

import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

Server (RPC Server):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)

    print(f" [.] fib({n})")
    response = fib(n)

    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

Explication

  • Direct Exchange: Utilisé pour le routage direct où la clé de routage doit correspondre exactement.
  • Fanout Exchange: Utilisé pour diffuser des messages à toutes les files d'attente liées.
  • Topic Exchange: Utilisé pour le routage basé sur des motifs de clé de routage.
  • RPC: Utilisé pour des appels de procédure à distance où le client attend une réponse du serveur.

Ces modèles d'échange permettent de gérer différents types de communication et de routage de messages dans RabbitMQ, offrant ainsi une grande flexibilité pour diverses architectures de micro-services.