RabbitMQ supports several exchange types, which determine how messages are routed to queues. The main exchange types are described below, each with an example, followed by the RPC messaging pattern:
### 1. Direct Exchange
A **Direct Exchange** routes a message to the queues whose binding key exactly matches the message's routing key.
#### Example
**Producer**:
```python
import pika

# Connect to a broker on localhost and open a channel.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare the exchange the messages are published to.
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = "info"
message = "Hello World!"

# The severity is used as the routing key.
channel.basic_publish(
    exchange='direct_logs',
    routing_key=severity,
    body=message,
)
print(f" [x] Sent {severity}:{message}")
connection.close()
```
**Consumer**:
```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# An empty name asks the broker to generate one; the exclusive queue
# is deleted when this connection closes.
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# Only messages published with this exact routing key reach the queue.
severity = "info"
channel.queue_bind(
    exchange='direct_logs',
    queue=queue_name,
    routing_key=severity,
)


def callback(ch, method, properties, body):
    print(f" [x] {method.routing_key}:{body}")


channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True,
)

print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()
```
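
To make the exact-match rule concrete without a running broker, here is a minimal in-memory sketch of how a direct exchange selects its destination queues. The `DirectExchange` class, its `bind` and `route` methods, and the queue names are hypothetical and exist only for illustration; they are not part of pika or RabbitMQ.

```python
from collections import defaultdict


class DirectExchange:
    """Toy model of direct routing (illustrative only, not a pika API)."""

    def __init__(self):
        # binding key -> set of queue names bound with that key
        self.bindings = defaultdict(set)

    def bind(self, queue, binding_key):
        self.bindings[binding_key].add(queue)

    def route(self, routing_key):
        # A queue receives the message only if binding key == routing key.
        return sorted(self.bindings.get(routing_key, set()))


exchange = DirectExchange()
exchange.bind("errors_only", "error")
exchange.bind("all_logs", "info")
exchange.bind("all_logs", "error")

print(exchange.route("error"))    # ['all_logs', 'errors_only']
print(exchange.route("info"))     # ['all_logs']
print(exchange.route("warning"))  # [] (no binding matches this key)
```

The sketch also shows that one queue can be bound several times with different keys, which is how a consumer would subscribe to more than one severity.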
### 2. Fanout Exchange
A **Fanout Exchange** routes every message to all the queues bound to it, regardless of the routing key.
#### Example
**Producer**:
```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = "Hello World!"

# A fanout exchange ignores the routing key, so it is left empty.
channel.basic_publish(
    exchange='logs',
    routing_key='',
    body=message,
)
print(f" [x] Sent {message}")
connection.close()
```
**Consumer**:
```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

# Temporary, broker-named queue that is private to this consumer.
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# No routing key is needed when binding to a fanout exchange.
channel.queue_bind(exchange='logs', queue=queue_name)


def callback(ch, method, properties, body):
    print(f" [x] {body}")


channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True,
)

print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()
```
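
The broadcast behaviour can also be sketched in memory, under the assumption that a binding is just a registered queue. The `FanoutExchange` class and the queue names `console` and `file` are hypothetical and are not part of pika or RabbitMQ.

```python
class FanoutExchange:
    """Toy model of fanout routing (illustrative only, not a pika API)."""

    def __init__(self):
        self.queues = {}  # queue name -> list of delivered message bodies

    def bind(self, queue):
        self.queues.setdefault(queue, [])

    def publish(self, routing_key, body):
        # The routing key is accepted but deliberately ignored.
        for messages in self.queues.values():
            messages.append(body)


exchange = FanoutExchange()
exchange.bind("console")
exchange.bind("file")

exchange.publish("", "Hello World!")
exchange.publish("ignored.key", "Second message")

print(exchange.queues["console"])  # ['Hello World!', 'Second message']
print(exchange.queues["file"])     # ['Hello World!', 'Second message']
```

Each bound queue gets its own copy of every message, whatever routing key the producer supplied.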
### 3. Topic Exchange
A **Topic Exchange** routes a message to the queues whose binding key is a pattern that matches the message's routing key. Routing keys are lists of words separated by dots (for example `kern.critical`). Binding keys may contain wildcards: `*` stands for exactly one word and `#` for zero or more words.
#### Example
**Producer**:
```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# Dot-separated routing key of the form <facility>.<severity>.
routing_key = "kern.critical"
message = "A critical kernel error"

channel.basic_publish(
    exchange='topic_logs',
    routing_key=routing_key,
    body=message,
)
print(f" [x] Sent {routing_key}:{message}")
connection.close()
```
**Consumer**:
```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# Receive everything from the kernel, plus critical messages from any source.
binding_keys = ["kern.*", "*.critical"]
for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs',
        queue=queue_name,
        routing_key=binding_key,
    )


def callback(ch, method, properties, body):
    print(f" [x] {method.routing_key}:{body}")


channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True,
)

print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()
```
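
The wildcard rules are easier to check with a small matcher. The function below is a simplified sketch of the matching semantics, not RabbitMQ's actual implementation; the name `topic_matches` is hypothetical.

```python
def topic_matches(binding_key, routing_key):
    """Return True if a dot-separated routing key matches a binding pattern.

    Simplified sketch of topic-exchange semantics:
    '*' stands for exactly one word, '#' for zero or more words.
    """
    def match(pattern, words):
        if not pattern:
            # The pattern is exhausted: match only if no words remain.
            return not words
        head, rest = pattern[0], pattern[1:]
        if head == "#":
            # '#' may absorb 0, 1, 2, ... of the remaining words.
            return any(match(rest, words[i:]) for i in range(len(words) + 1))
        if not words:
            return False
        if head == "*" or head == words[0]:
            return match(rest, words[1:])
        return False

    return match(binding_key.split("."), routing_key.split("."))


print(topic_matches("kern.*", "kern.critical"))       # True
print(topic_matches("*.critical", "kern.critical"))   # True
print(topic_matches("kern.*", "kern.disk.critical"))  # False ('*' is one word only)
print(topic_matches("kern.#", "kern.disk.critical"))  # True
print(topic_matches("kern.#", "kern"))                # True ('#' can match zero words)
```

With the two binding keys used by the consumer above, a message published as `kern.critical` matches both patterns, but the queue still receives it only once.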
### 4. RPC (Remote Procedure Call)
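The RPC pattern lets a client call a function on a server and get a result back. Unlike the three sections above, it is a messaging pattern rather than an exchange type: the client sends its request with a `reply_to` queue and a `correlation_id`, and the server publishes the result to that queue with the same `correlation_id`. It is typically used for synchronous operations where the client waits for the response before continuing.
#### Example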
**Client (RPC Client)**:
```python
import pika
import uuid


class FibonacciRpcClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()

        # Exclusive, broker-named queue on which replies are received.
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

        self.response = None
        self.corr_id = None

    def on_response(self, ch, method, props, body):
        # Keep only the reply that belongs to the pending request.
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        # Block on I/O until on_response() has stored the reply.
        while self.response is None:
            self.connection.process_data_events(time_limit=None)
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(f" [.] Got {response!r}")
```
**Server (RPC Server)**:
```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    # Naive recursive Fibonacci, deliberately slow for demonstration.
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(f" [.] fib({n})")
    response = fib(n)

    # Reply on the queue named by the client, echoing its correlation id.
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Hand each worker at most one unacknowledged request at a time.
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()
```
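
The role of `reply_to` and `correlation_id` can be illustrated without a broker. The sketch below is an in-process analogy that assumes Python's standard `queue` and `threading` modules stand in for RabbitMQ queues; the names `request_queue`, `reply_queue`, `server` and `call` are hypothetical, and the server squares its input instead of computing Fibonacci numbers.

```python
import queue
import threading
import uuid

request_queue = queue.Queue()  # plays the role of 'rpc_queue'
reply_queue = queue.Queue()    # plays the role of the client's callback queue


def server():
    """Answer requests until a None sentinel arrives."""
    while True:
        request = request_queue.get()
        if request is None:
            break
        # Echo the correlation id so the client can pair reply and request.
        request["reply_to"].put({
            "correlation_id": request["correlation_id"],
            "body": request["body"] ** 2,
        })


def call(n):
    """Send n to the server and block until the matching reply arrives."""
    corr_id = str(uuid.uuid4())
    request_queue.put(
        {"reply_to": reply_queue, "correlation_id": corr_id, "body": n})
    while True:
        reply = reply_queue.get(timeout=5)
        if reply["correlation_id"] == corr_id:  # skip stale or foreign replies
            return reply["body"]


worker = threading.Thread(target=server, daemon=True)
worker.start()

# A leftover reply with an unknown correlation id is skipped by call().
reply_queue.put({"correlation_id": "stale", "body": -1})
result = call(12)
print(result)  # 144

request_queue.put(None)  # stop the server thread
worker.join()
```

Checking the correlation id is what allows a client to reuse a single reply queue for many requests instead of creating one queue per call.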
### Explanation
- **Direct Exchange**: Used for direct routing, where the binding key must exactly match the routing key.
- **Fanout Exchange**: Used to broadcast messages to all bound queues.
- **Topic Exchange**: Used for routing based on routing-key patterns.
- **RPC**: Used for remote procedure calls, where the client waits for a response from the server.
These exchange types and the RPC pattern cover different styles of communication and message routing in RabbitMQ, which makes it flexible enough for a variety of microservice architectures.