main
gwen 1 year ago
parent 299f508858
commit e84a7da44d

@ -0,0 +1,107 @@
Dans un environnement à architecture orientée micro-services, RabbitMQ est souvent utilisé comme un bus de messages pour faciliter la communication asynchrone entre les différents services. Voici quelques manières courantes d'utiliser RabbitMQ dans ce contexte :
### 1. **Communication Asynchrone**
- **Découplage des Services** : RabbitMQ permet de découpler les services en leur permettant de communiquer via des messages sans avoir besoin de connaître les détails d'implémentation des autres services.
- **Scalabilité** : Les services peuvent émettre des messages sans se soucier de la disponibilité des consommateurs, ce qui permet de mieux gérer les pics de charge.
### 2. **Traitement de Tâches en Arrière-Plan**
- **Files d'Attente de Travail** : Les services peuvent enfiler des tâches à exécuter en arrière-plan, comme le traitement d'images, l'envoi d'emails, ou la génération de rapports.
- **Répartition de Charge** : Les tâches peuvent être réparties entre plusieurs instances de consommateurs pour améliorer les performances et la résilience.
### 3. **Gestion des Événements**
- **Événements de Domaine** : Les services peuvent publier des événements de domaine (comme "UtilisateurCréé" ou "CommandeExpédiée") qui peuvent être consommés par d'autres services intéressés.
- **Réactivité** : Les services peuvent réagir rapidement aux événements sans avoir besoin de polling ou de sondage constant.
### 4. **Intégration de Systèmes Hétérogènes**
- **Interopérabilité** : RabbitMQ supporte plusieurs protocoles (AMQP, MQTT, STOMP), ce qui permet d'intégrer des systèmes hétérogènes et des technologies différentes.
- **Passerelles** : RabbitMQ peut servir de passerelle entre des systèmes legacy et des systèmes modernes.
### 5. **Tolérance aux Pannes et Résilience**
- **Durabilité** : Les messages peuvent être persistés sur le disque pour survivre aux redémarrages du broker.
- **Réplication** : Les files d'attente peuvent être répliquées sur plusieurs nœuds pour améliorer la tolérance aux pannes.
- **DLX et TTL** : Les messages non traités ou expirés peuvent être envoyés à des files d'attente de secours (Dead Letter Exchange) pour une gestion ultérieure.
### 6. **Orchestration de Workflows**
- **Workflows Complexes** : RabbitMQ peut être utilisé pour orchestrer des workflows complexes où plusieurs étapes doivent être exécutées dans un ordre spécifique.
- **Coordination** : Les services peuvent coordonner leurs actions via des messages pour garantir que les dépendances sont respectées.
### 7. **Surveillance et Gestion**
- **Monitoring** : RabbitMQ fournit des outils et des API pour surveiller les performances et l'état du broker.
- **Alertes** : Vous pouvez configurer des alertes pour être notifié en cas de problèmes ou de conditions spécifiques.
- **Management Plugin** : Le plugin de gestion fournit une interface web pour surveiller et gérer RabbitMQ.
### Exemple d'Architecture
#### Micro-service A (Producteur)
```python
import pika
import json
def send_message(message):
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='events', exchange_type='fanout')
channel.basic_publish(
exchange='events',
routing_key='',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(f" [x] Sent {message}")
connection.close()
if __name__ == '__main__':
message = {"event": "UserCreated", "data": {"user_id": 123, "username": "john_doe"}}
send_message(message)
```
#### Micro-service B (Consommateur)
```python
import pika
import json
def callback(ch, method, properties, body):
message = json.loads(body)
print(f" [x] Received {message}")
process_event(message)
def process_event(message):
# Traitement de l'événement
print(f"Processing event: {message}")
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='events', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='events', queue=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for events. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
main()
```
### Conclusion
RabbitMQ est un outil puissant pour la communication asynchrone et la gestion des messages dans une architecture orientée micro-services. Il permet de découpler les services, d'améliorer la scalabilité et la résilience, et de faciliter l'intégration de systèmes hétérogènes. En utilisant RabbitMQ, les micro-services peuvent communiquer de manière fiable et efficace, tout en bénéficiant des nombreuses fonctionnalités offertes par le broker de messages.

@ -0,0 +1,26 @@
#!/usr/bin/env python
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

@ -0,0 +1,12 @@
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

@ -0,0 +1,106 @@
Pour créer deux micro-services Python qui communiquent via un bus de messages (message broker) comme RabbitMQ, nous allons structurer les services de manière à ce qu'ils ne communiquent pas directement entre eux, mais plutôt via des files d'attente RabbitMQ.
### Prérequis
1. **Installer RabbitMQ**: Assurez-vous que RabbitMQ est installé et en cours d'exécution sur votre machine. Vous pouvez télécharger RabbitMQ depuis [le site officiel](https://www.rabbitmq.com/download.html).
2. **Installer les bibliothèques Python nécessaires**: Vous aurez besoin de la bibliothèque `pika` pour interagir avec RabbitMQ. Vous pouvez l'installer via pip:
```sh
pip install pika
```
### Micro-service 1: Producteur (Producer)
Créez un fichier nommé `producer_service.py` avec le contenu suivant:
```python
import pika
import json
def send_message(message):
# Connexion à RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Déclaration de la file d'attente
channel.queue_declare(queue='task_queue', durable=True)
# Envoi du message
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(f" [x] Sent {message}")
# Fermeture de la connexion
connection.close()
if __name__ == '__main__':
for i in range(10):
message = {"task_id": i, "data": f"Task data {i}"}
send_message(message)
```
### Micro-service 2: Consommateur (Consumer)
Créez un fichier nommé `consumer_service.py` avec le contenu suivant:
```python
import pika
import json
def callback(ch, method, properties, body):
message = json.loads(body)
print(f" [x] Received {message}")
# Traitement du message
process_message(message)
print(f" [x] Done processing {message}")
ch.basic_ack(delivery_tag=method.delivery_tag)
def process_message(message):
# Simuler le traitement du message
print(f"Processing message: {message}")
def main():
# Connexion à RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Déclaration de la file d'attente
channel.queue_declare(queue='task_queue', durable=True)
# Configuration du consommateur
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
main()
```
### Exécution
1. **Démarrer RabbitMQ**: Assurez-vous que le serveur RabbitMQ est en cours d'exécution.
2. **Exécuter le consommateur**: Ouvrez un terminal et exécutez le consommateur:
```sh
python consumer_service.py
```
3. **Exécuter le producteur**: Ouvrez un autre terminal et exécutez le producteur:
```sh
python producer_service.py
```
### Explication
- **Producer Service**: Le producteur se connecte à RabbitMQ, déclare une file d'attente nommée `task_queue`, et envoie 10 messages à cette file d'attente. Chaque message est un dictionnaire JSON contenant un identifiant de tâche et des données de tâche.
- **Consumer Service**: Le consommateur se connecte à RabbitMQ, déclare la même file d'attente `task_queue`, et commence à consommer les messages de cette file d'attente. Chaque message reçu est traité par la fonction `process_message`, et un accusé de réception est envoyé à RabbitMQ pour confirmer que le message a été traité.
Ces deux micro-services communiquent via RabbitMQ, ce qui permet une communication asynchrone et fiable entre eux sans qu'ils aient besoin de se connaître directement.

@ -0,0 +1,269 @@
Bien sûr ! RabbitMQ supporte plusieurs modèles d'échange (exchange types) qui déterminent comment les messages sont routés vers les files d'attente. Voici les principaux modèles d'échange et des exemples pour chacun :
### 1. Direct Exchange
Un **Direct Exchange** route les messages vers les files d'attente dont la clé de routage (routing key) correspond exactement à la clé de routage du message.
#### Exemple
**Producer**:
```python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = "info"
message = "Hello World!"
channel.basic_publish(
exchange='direct_logs',
routing_key=severity,
body=message,
)
print(f" [x] Sent {severity}:{message}")
connection.close()
```
**Consumer**:
```python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
severity = "info"
channel.queue_bind(
exchange='direct_logs',
queue=queue_name,
routing_key=severity
)
def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body}")
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True,
)
print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()
```
### 2. Fanout Exchange
Un **Fanout Exchange** route les messages à toutes les files d'attente qui sont liées à cet échange, indépendamment de la clé de routage.
#### Exemple
**Producer**:
```python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = "Hello World!"
channel.basic_publish(
exchange='logs',
routing_key='',
body=message,
)
print(f" [x] Sent {message}")
connection.close()
```
**Consumer**:
```python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True,
)
print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()
```
### 3. Topic Exchange
Un **Topic Exchange** route les messages vers les files d'attente dont la clé de routage correspond à un motif (pattern) spécifique. Les clés de routage peuvent contenir des caractères génériques (`*` pour un seul mot et `#` pour zéro ou plusieurs mots).
#### Exemple
**Producer**:
```python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_key = "kern.critical"
message = "A critical kernel error"
channel.basic_publish(
exchange='topic_logs',
routing_key=routing_key,
body=message,
)
print(f" [x] Sent {routing_key}:{message}")
connection.close()
```
**Consumer**:
```python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
binding_keys = ["kern.*", "*.critical"]
for binding_key in binding_keys:
channel.queue_bind(
exchange='topic_logs',
queue=queue_name,
routing_key=binding_key
)
def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body}")
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True,
)
print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()
```
### 4. RPC (Remote Procedure Call)
Le modèle RPC permet à un client d'appeler une fonction sur un serveur et d'obtenir une réponse. Ce modèle est souvent utilisé pour des opérations synchrones où le client attend une réponse avant de continuer.
#### Exemple
**Client (RPC Client)**:
```python
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
```
**Server (RPC Server)**:
```python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body):
n = int(body)
print(f" [.] fib({n})")
response = fib(n)
ch.basic_publish(
exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print(" [x] Awaiting RPC requests")
channel.start_consuming()
```
### Explication
- **Direct Exchange**: Utilisé pour le routage direct où la clé de routage doit correspondre exactement.
- **Fanout Exchange**: Utilisé pour diffuser des messages à toutes les files d'attente liées.
- **Topic Exchange**: Utilisé pour le routage basé sur des motifs de clé de routage.
- **RPC**: Utilisé pour des appels de procédure à distance où le client attend une réponse du serveur.
Ces modèles d'échange permettent de gérer différents types de communication et de routage de messages dans RabbitMQ, offrant ainsi une grande flexibilité pour diverses architectures de micro-services.

@ -0,0 +1,93 @@
Pour créer deux micro-services Python qui communiquent via RabbitMQ, vous pouvez suivre les étapes suivantes. Nous allons créer un producteur (producer) et un consommateur (consumer). Le producteur enverra des messages à une file d'attente RabbitMQ, et le consommateur lira ces messages.
### Prérequis
1. **Installer RabbitMQ**: Assurez-vous que RabbitMQ est installé et en cours d'exécution sur votre machine. Vous pouvez télécharger RabbitMQ depuis [le site officiel](https://www.rabbitmq.com/download.html).
2. **Installer les bibliothèques Python nécessaires**: Vous aurez besoin de la bibliothèque `pika` pour interagir avec RabbitMQ. Vous pouvez l'installer via pip:
```sh
pip install pika
```
### Producteur (Producer)
Créez un fichier nommé `producer.py` avec le contenu suivant:
```python
import pika
def main():
# Connexion à RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Déclaration de la file d'attente
channel.queue_declare(queue='hello')
# Envoi de messages
for i in range(10):
message = f"Hello World {i}"
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
print(f" [x] Sent {message}")
# Fermeture de la connexion
connection.close()
if __name__ == '__main__':
main()
```
### Consommateur (Consumer)
Créez un fichier nommé `consumer.py` avec le contenu suivant:
```python
import pika
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
def main():
# Connexion à RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Déclaration de la file d'attente
channel.queue_declare(queue='hello')
# Configuration du consommateur
channel.basic_consume(queue='hello',
on_message_callback=callback,
auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
main()
```
### Exécution
1. **Démarrer RabbitMQ**: Assurez-vous que le serveur RabbitMQ est en cours d'exécution.
2. **Exécuter le consommateur**: Ouvrez un terminal et exécutez le consommateur:
```sh
python consumer.py
```
3. **Exécuter le producteur**: Ouvrez un autre terminal et exécutez le producteur:
```sh
python producer.py
```
Vous devriez voir les messages envoyés par le producteur être reçus et affichés par le consommateur.
### Explication
- **Producer**: Le producteur se connecte à RabbitMQ, déclare une file d'attente nommée `hello`, et envoie 10 messages à cette file d'attente.
- **Consumer**: Le consommateur se connecte à RabbitMQ, déclare la même file d'attente `hello`, et commence à consommer les messages de cette file d'attente. Chaque message reçu est affiché dans la console.
Ces deux micro-services communiquent via RabbitMQ, ce qui permet une communication asynchrone et fiable entre eux.

@ -0,0 +1,304 @@
L'orchestration de workflows complexes dans une architecture orientée micro-services peut être réalisée en utilisant RabbitMQ pour coordonner les différentes étapes du workflow. Voici un exemple de workflow complexe où plusieurs étapes doivent être exécutées dans un ordre spécifique, et où chaque étape est gérée par un micro-service différent.
### Exemple de Workflow : Traitement de Commande
1. **Réception de la Commande**
2. **Validation de la Commande**
3. **Traitement du Paiement**
4. **Préparation de la Commande**
5. **Expédition de la Commande**
6. **Notification du Client**
### Micro-service 1 : Réception de la Commande
```python
import pika
import json
def send_order(order):
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='orders', exchange_type='direct')
channel.basic_publish(
exchange='orders',
routing_key='new_order',
body=json.dumps(order),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(f" [x] Sent order: {order}")
connection.close()
if __name__ == '__main__':
order = {"order_id": 1, "customer_id": 123, "items": ["item1", "item2"], "amount": 100.0}
send_order(order)
```
### Micro-service 2 : Validation de la Commande
```python
import pika
import json
def callback(ch, method, properties, body):
order = json.loads(body)
print(f" [x] Received order for validation: {order}")
if validate_order(order):
send_to_next_step(order)
else:
print(f" [x] Order validation failed: {order}")
def validate_order(order):
# Logique de validation de la commande
return True
def send_to_next_step(order):
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='orders', exchange_type='direct')
channel.basic_publish(
exchange='orders',
routing_key='validated_order',
body=json.dumps(order),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(f" [x] Sent validated order: {order}")
connection.close()
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='orders', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='orders', queue=queue_name, routing_key='new_order')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for orders to validate. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
main()
```
### Micro-service 3 : Traitement du Paiement
```python
import pika
import json
def callback(ch, method, properties, body):
order = json.loads(body)
print(f" [x] Received order for payment processing: {order}")
if process_payment(order):
send_to_next_step(order)
else:
print(f" [x] Payment processing failed: {order}")
def process_payment(order):
# Logique de traitement du paiement
return True
def send_to_next_step(order):
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='orders', exchange_type='direct')
channel.basic_publish(
exchange='orders',
routing_key='paid_order',
body=json.dumps(order),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(f" [x] Sent paid order: {order}")
connection.close()
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='orders', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='orders', queue=queue_name, routing_key='validated_order')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for orders to process payment. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
main()
```
### Micro-service 4 : Préparation de la Commande
```python
import pika
import json
def callback(ch, method, properties, body):
order = json.loads(body)
print(f" [x] Received order for preparation: {order}")
if prepare_order(order):
send_to_next_step(order)
else:
print(f" [x] Order preparation failed: {order}")
def prepare_order(order):
# Logique de préparation de la commande
return True
def send_to_next_step(order):
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='orders', exchange_type='direct')
channel.basic_publish(
exchange='orders',
routing_key='prepared_order',
body=json.dumps(order),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(f" [x] Sent prepared order: {order}")
connection.close()
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='orders', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='orders', queue=queue_name, routing_key='paid_order')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for orders to prepare. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
main()
```
### Micro-service 5 : Expédition de la Commande
```python
import pika
import json
def callback(ch, method, properties, body):
order = json.loads(body)
print(f" [x] Received order for shipping: {order}")
if ship_order(order):
send_to_next_step(order)
else:
print(f" [x] Order shipping failed: {order}")
def ship_order(order):
# Logique d'expédition de la commande
return True
def send_to_next_step(order):
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='orders', exchange_type='direct')
channel.basic_publish(
exchange='orders',
routing_key='shipped_order',
body=json.dumps(order),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(f" [x] Sent shipped order: {order}")
connection.close()
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='orders', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='orders', queue=queue_name, routing_key='prepared_order')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for orders to ship. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
main()
```
### Micro-service 6 : Notification du Client
```python
import pika
import json
def callback(ch, method, properties, body):
order = json.loads(body)
print(f" [x] Received order for notification: {order}")
notify_customer(order)
def notify_customer(order):
# Logique de notification du client
print(f"Notifying customer for order: {order}")
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='orders', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='orders', queue=queue_name, routing_key='shipped_order')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for orders to notify. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
main()
```
### Explication
1. **Réception de la Commande** : Le premier micro-service envoie une nouvelle commande à la file d'attente `new_order`.
2. **Validation de la Commande** : Le deuxième micro-service consomme les messages de la file d'attente `new_order`, valide la commande, et envoie la commande validée à la file d'attente `validated_order`.
3. **Traitement du Paiement** : Le troisième micro-service consomme les messages de la file d'attente `validated_order`, traite le paiement, et envoie la commande payée à la file d'attente `paid_order`.
4. **Préparation de la Commande** : Le quatrième micro-service consomme les messages de la file d'attente `paid_order`, prépare la commande, et envoie la commande préparée à la file d'attente `prepared_order`.
5. **Expédition de la Commande** : Le cinquième micro-service consomme les messages de la file d'attente `prepared_order`, expédie la commande, et envoie la commande expédiée à la file d'attente `shipped_order`.
6. **Notification du Client** : Le sixième micro-service consomme les messages de la file d'attente `shipped_order` et notifie le client.
Ce workflow complexe est orchestré en utilisant RabbitMQ pour coordonner les différentes étapes du processus de traitement de la commande. Chaque micro-service est responsable d'une étape spécifique et communique avec les autres services via des messages RabbitMQ.
Loading…
Cancel
Save