You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
270 lines
7.1 KiB
Markdown
270 lines
7.1 KiB
Markdown
Bien sûr ! RabbitMQ supporte plusieurs modèles d'échange (exchange types) qui déterminent comment les messages sont routés vers les files d'attente. Voici les principaux modèles d'échange et des exemples pour chacun :
|
|
|
|
### 1. Direct Exchange
|
|
|
|
Un **Direct Exchange** route les messages vers les files d'attente dont la clé de routage (routing key) correspond exactement à la clé de routage du message.
|
|
|
|
#### Exemple
|
|
|
|
**Producer**:
|
|
```python
|
|
import pika
|
|
|
|
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
|
|
channel = connection.channel()
|
|
|
|
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
|
|
|
|
severity = "info"
|
|
message = "Hello World!"
|
|
channel.basic_publish(
|
|
exchange='direct_logs',
|
|
routing_key=severity,
|
|
body=message,
|
|
)
|
|
print(f" [x] Sent {severity}:{message}")
|
|
|
|
connection.close()
|
|
```
|
|
|
|
**Consumer**:
|
|
```python
|
|
import pika
|
|
|
|
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
|
|
channel = connection.channel()
|
|
|
|
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
|
|
|
|
result = channel.queue_declare(queue='', exclusive=True)
|
|
queue_name = result.method.queue
|
|
|
|
severity = "info"
|
|
channel.queue_bind(
|
|
exchange='direct_logs',
|
|
queue=queue_name,
|
|
routing_key=severity
|
|
)
|
|
|
|
def callback(ch, method, properties, body):
|
|
print(f" [x] {method.routing_key}:{body}")
|
|
|
|
channel.basic_consume(
|
|
queue=queue_name,
|
|
on_message_callback=callback,
|
|
auto_ack=True,
|
|
)
|
|
|
|
print(' [*] Waiting for logs. To exit press CTRL+C')
|
|
channel.start_consuming()
|
|
```
|
|
|
|
### 2. Fanout Exchange
|
|
|
|
Un **Fanout Exchange** route les messages à toutes les files d'attente qui sont liées à cet échange, indépendamment de la clé de routage.
|
|
|
|
#### Exemple
|
|
|
|
**Producer**:
|
|
```python
|
|
import pika
|
|
|
|
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
|
|
channel = connection.channel()
|
|
|
|
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
|
|
|
message = "Hello World!"
|
|
channel.basic_publish(
|
|
exchange='logs',
|
|
routing_key='',
|
|
body=message,
|
|
)
|
|
print(f" [x] Sent {message}")
|
|
|
|
connection.close()
|
|
```
|
|
|
|
**Consumer**:
|
|
```python
|
|
import pika
|
|
|
|
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
|
|
channel = connection.channel()
|
|
|
|
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
|
|
|
result = channel.queue_declare(queue='', exclusive=True)
|
|
queue_name = result.method.queue
|
|
|
|
channel.queue_bind(exchange='logs', queue=queue_name)
|
|
|
|
def callback(ch, method, properties, body):
|
|
print(f" [x] {body}")
|
|
|
|
channel.basic_consume(
|
|
queue=queue_name,
|
|
on_message_callback=callback,
|
|
auto_ack=True,
|
|
)
|
|
|
|
print(' [*] Waiting for logs. To exit press CTRL+C')
|
|
channel.start_consuming()
|
|
```
|
|
|
|
### 3. Topic Exchange
|
|
|
|
Un **Topic Exchange** route les messages vers les files d'attente dont la clé de routage correspond à un motif (pattern) spécifique. Les clés de routage peuvent contenir des caractères génériques (`*` pour un seul mot et `#` pour zéro ou plusieurs mots).
|
|
|
|
#### Exemple
|
|
|
|
**Producer**:
|
|
```python
|
|
import pika
|
|
|
|
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
|
|
channel = connection.channel()
|
|
|
|
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
|
|
|
|
routing_key = "kern.critical"
|
|
message = "A critical kernel error"
|
|
channel.basic_publish(
|
|
exchange='topic_logs',
|
|
routing_key=routing_key,
|
|
body=message,
|
|
)
|
|
print(f" [x] Sent {routing_key}:{message}")
|
|
|
|
connection.close()
|
|
```
|
|
|
|
**Consumer**:
|
|
```python
|
|
import pika
|
|
|
|
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
|
|
channel = connection.channel()
|
|
|
|
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
|
|
|
|
result = channel.queue_declare(queue='', exclusive=True)
|
|
queue_name = result.method.queue
|
|
|
|
binding_keys = ["kern.*", "*.critical"]
|
|
for binding_key in binding_keys:
|
|
channel.queue_bind(
|
|
exchange='topic_logs',
|
|
queue=queue_name,
|
|
routing_key=binding_key
|
|
)
|
|
|
|
def callback(ch, method, properties, body):
|
|
print(f" [x] {method.routing_key}:{body}")
|
|
|
|
channel.basic_consume(
|
|
queue=queue_name,
|
|
on_message_callback=callback,
|
|
auto_ack=True,
|
|
)
|
|
|
|
print(' [*] Waiting for logs. To exit press CTRL+C')
|
|
channel.start_consuming()
|
|
```
|
|
|
|
### 4. RPC (Remote Procedure Call)
|
|
|
|
Le modèle RPC permet à un client d'appeler une fonction sur un serveur et d'obtenir une réponse. Ce modèle est souvent utilisé pour des opérations synchrones où le client attend une réponse avant de continuer.
|
|
|
|
#### Exemple
|
|
|
|
**Client (RPC Client)**:
|
|
```python
|
|
import pika
|
|
import uuid
|
|
|
|
class FibonacciRpcClient(object):
|
|
def __init__(self):
|
|
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
|
self.channel = self.connection.channel()
|
|
|
|
result = self.channel.queue_declare(queue='', exclusive=True)
|
|
self.callback_queue = result.method.queue
|
|
|
|
self.channel.basic_consume(
|
|
queue=self.callback_queue,
|
|
on_message_callback=self.on_response,
|
|
auto_ack=True)
|
|
|
|
def on_response(self, ch, method, props, body):
|
|
if self.corr_id == props.correlation_id:
|
|
self.response = body
|
|
|
|
def call(self, n):
|
|
self.response = None
|
|
self.corr_id = str(uuid.uuid4())
|
|
self.channel.basic_publish(
|
|
exchange='',
|
|
routing_key='rpc_queue',
|
|
properties=pika.BasicProperties(
|
|
reply_to=self.callback_queue,
|
|
correlation_id=self.corr_id,
|
|
),
|
|
body=str(n))
|
|
while self.response is None:
|
|
self.connection.process_data_events()
|
|
return int(self.response)
|
|
|
|
fibonacci_rpc = FibonacciRpcClient()
|
|
|
|
print(" [x] Requesting fib(30)")
|
|
response = fibonacci_rpc.call(30)
|
|
print(" [.] Got %r" % response)
|
|
```
|
|
|
|
**Server (RPC Server)**:
|
|
```python
|
|
import pika
|
|
|
|
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
|
channel = connection.channel()
|
|
|
|
channel.queue_declare(queue='rpc_queue')
|
|
|
|
def fib(n):
|
|
if n == 0:
|
|
return 0
|
|
elif n == 1:
|
|
return 1
|
|
else:
|
|
return fib(n-1) + fib(n-2)
|
|
|
|
def on_request(ch, method, props, body):
|
|
n = int(body)
|
|
|
|
print(f" [.] fib({n})")
|
|
response = fib(n)
|
|
|
|
ch.basic_publish(
|
|
exchange='',
|
|
routing_key=props.reply_to,
|
|
properties=pika.BasicProperties(correlation_id=props.correlation_id),
|
|
body=str(response))
|
|
ch.basic_ack(delivery_tag=method.delivery_tag)
|
|
|
|
channel.basic_qos(prefetch_count=1)
|
|
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
|
|
|
|
print(" [x] Awaiting RPC requests")
|
|
channel.start_consuming()
|
|
```
|
|
|
|
### Explication
|
|
|
|
- **Direct Exchange**: Utilisé pour le routage direct où la clé de routage doit correspondre exactement.
|
|
- **Fanout Exchange**: Utilisé pour diffuser des messages à toutes les files d'attente liées.
|
|
- **Topic Exchange**: Utilisé pour le routage basé sur des motifs de clé de routage.
|
|
- **RPC**: Utilisé pour des appels de procédure à distance où le client attend une réponse du serveur.
|
|
|
|
Ces modèles d'échange permettent de gérer différents types de communication et de routage de messages dans RabbitMQ, offrant ainsi une grande flexibilité pour diverses architectures de micro-services.
|