You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
montpelliermaalsi2024/rabbit/workflow.md

9.8 KiB

L'orchestration de workflows complexes dans une architecture orientée micro-services peut être réalisée en utilisant RabbitMQ pour coordonner les différentes étapes du workflow. Voici un exemple de workflow complexe où plusieurs étapes doivent être exécutées dans un ordre spécifique, et où chaque étape est gérée par un micro-service différent.

Exemple de Workflow : Traitement de Commande

  1. Réception de la Commande
  2. Validation de la Commande
  3. Traitement du Paiement
  4. Préparation de la Commande
  5. Expédition de la Commande
  6. Notification du Client

Micro-service 1 : Réception de la Commande

import pika
import json

def send_order(order):
    """Publish a new order on the 'orders' exchange.

    Connects to the RabbitMQ broker on localhost, declares the direct
    exchange 'orders', publishes *order* as a persistent JSON message with
    routing key 'new_order', then closes the connection.

    Args:
        order: JSON-serialisable object describing the order.

    Raises:
        TypeError: if *order* cannot be serialised by ``json.dumps``.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    try:
        channel = connection.channel()

        channel.exchange_declare(exchange='orders', exchange_type='direct')

        channel.basic_publish(
            exchange='orders',
            routing_key='new_order',
            body=json.dumps(order),
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
            ))
        print(f" [x] Sent order: {order}")
    finally:
        # Release the broker connection even when declaring, serialising or
        # publishing raised; previously it was leaked on any error.
        if connection.is_open:
            connection.close()

if __name__ == '__main__':
    # Demo entry point: publish a single hard-coded sample order.
    order = {
        "order_id": 1,
        "customer_id": 123,
        "items": ["item1", "item2"],
        "amount": 100.0,
    }
    send_order(order)

Micro-service 2 : Validation de la Commande

import pika
import json

def callback(ch, method, properties, body):
    """Validate one incoming order message and forward it when valid.

    The JSON body is decoded, passed to ``validate_order`` and, when that
    returns a truthy value, handed to ``send_to_next_step``. An order that
    fails validation is only logged; it is not forwarded.

    Args:
        ch: channel object passed by the consumer (unused here).
        method: delivery metadata passed by the consumer (unused here).
        properties: message properties passed by the consumer (unused here).
        body: raw message payload, expected to be a JSON document.
    """
    try:
        order = json.loads(body)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # A malformed payload can never be processed. Log and skip it rather
        # than let the exception escape and stop the whole consumer.
        print(f" [!] Discarding malformed order message ({exc}): {body!r}")
        return

    print(f" [x] Received order for validation: {order}")
    if validate_order(order):
        send_to_next_step(order)
    else:
        print(f" [x] Order validation failed: {order}")

def validate_order(order):
    """Tell whether *order* may proceed to the next step.

    Placeholder implementation: no validation rule is applied yet, so every
    order is accepted.

    Returns:
        bool: always ``True`` for now.
    """
    # Order validation logic goes here.
    order_is_valid = True
    return order_is_valid

def send_to_next_step(order):
    """Publish a validated order on the 'orders' exchange.

    Opens a fresh connection to the broker on localhost, declares the direct
    exchange 'orders', publishes *order* as a persistent JSON message with
    routing key 'validated_order', then closes the connection.

    Args:
        order: JSON-serialisable object describing the order.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    try:
        channel = connection.channel()

        channel.exchange_declare(exchange='orders', exchange_type='direct')

        channel.basic_publish(
            exchange='orders',
            routing_key='validated_order',
            body=json.dumps(order),
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
            ))
        print(f" [x] Sent validated order: {order}")
    finally:
        # Always release the connection; it was leaked whenever declaring or
        # publishing raised.
        if connection.is_open:
            connection.close()

def main():
    """Consume 'new_order' messages and dispatch them to ``callback``.

    Connects to the broker on localhost, declares the direct exchange
    'orders', binds a durable queue to routing key 'new_order' and blocks in
    the consume loop until interrupted with CTRL+C. The connection is closed
    on every exit path.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    try:
        channel = connection.channel()

        # Same arguments as the publishing side: the broker rejects a
        # redeclaration of an existing exchange with different properties.
        channel.exchange_declare(exchange='orders', exchange_type='direct')

        # A named, durable queue keeps orders that arrive while this service
        # is down. The former server-named exclusive queue disappeared with
        # the connection, so persistent messages were silently dropped.
        queue_name = 'new_order'
        channel.queue_declare(queue=queue_name, durable=True)

        channel.queue_bind(exchange='orders', queue=queue_name, routing_key='new_order')

        # NOTE(review): auto_ack=True is kept to preserve the existing
        # delivery semantics; a message is lost if the process dies while
        # handling it. Consider manual acknowledgements.
        channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

        print(' [*] Waiting for orders to validate. To exit press CTRL+C')
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            # CTRL+C is the advertised way to exit: stop cleanly instead of
            # surfacing a traceback.
            channel.stop_consuming()
    finally:
        if connection.is_open:
            connection.close()

if __name__ == '__main__':
    main()

Micro-service 3 : Traitement du Paiement

import pika
import json

def callback(ch, method, properties, body):
    """Process the payment for one incoming order and forward it on success.

    The JSON body is decoded, passed to ``process_payment`` and, when that
    returns a truthy value, handed to ``send_to_next_step``. An order whose
    payment fails is only logged; it is not forwarded.

    Args:
        ch: channel object passed by the consumer (unused here).
        method: delivery metadata passed by the consumer (unused here).
        properties: message properties passed by the consumer (unused here).
        body: raw message payload, expected to be a JSON document.
    """
    try:
        order = json.loads(body)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # A malformed payload can never be processed. Log and skip it rather
        # than let the exception escape and stop the whole consumer.
        print(f" [!] Discarding malformed order message ({exc}): {body!r}")
        return

    print(f" [x] Received order for payment processing: {order}")
    if process_payment(order):
        send_to_next_step(order)
    else:
        print(f" [x] Payment processing failed: {order}")

def process_payment(order):
    """Tell whether the payment for *order* succeeded.

    Placeholder implementation: no payment is actually attempted yet, so
    every order is reported as paid.

    Returns:
        bool: always ``True`` for now.
    """
    # Payment processing logic goes here.
    payment_succeeded = True
    return payment_succeeded

def send_to_next_step(order):
    """Publish a paid order on the 'orders' exchange.

    Opens a fresh connection to the broker on localhost, declares the direct
    exchange 'orders', publishes *order* as a persistent JSON message with
    routing key 'paid_order', then closes the connection.

    Args:
        order: JSON-serialisable object describing the order.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    try:
        channel = connection.channel()

        channel.exchange_declare(exchange='orders', exchange_type='direct')

        channel.basic_publish(
            exchange='orders',
            routing_key='paid_order',
            body=json.dumps(order),
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
            ))
        print(f" [x] Sent paid order: {order}")
    finally:
        # Always release the connection; it was leaked whenever declaring or
        # publishing raised.
        if connection.is_open:
            connection.close()

def main():
    """Consume 'validated_order' messages and dispatch them to ``callback``.

    Connects to the broker on localhost, declares the direct exchange
    'orders', binds a durable queue to routing key 'validated_order' and
    blocks in the consume loop until interrupted with CTRL+C. The connection
    is closed on every exit path.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    try:
        channel = connection.channel()

        # Same arguments as the publishing side: the broker rejects a
        # redeclaration of an existing exchange with different properties.
        channel.exchange_declare(exchange='orders', exchange_type='direct')

        # A named, durable queue keeps orders that arrive while this service
        # is down. The former server-named exclusive queue disappeared with
        # the connection, so persistent messages were silently dropped.
        queue_name = 'validated_order'
        channel.queue_declare(queue=queue_name, durable=True)

        channel.queue_bind(exchange='orders', queue=queue_name, routing_key='validated_order')

        # NOTE(review): auto_ack=True is kept to preserve the existing
        # delivery semantics; a message is lost if the process dies while
        # handling it. Consider manual acknowledgements.
        channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

        print(' [*] Waiting for orders to process payment. To exit press CTRL+C')
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            # CTRL+C is the advertised way to exit: stop cleanly instead of
            # surfacing a traceback.
            channel.stop_consuming()
    finally:
        if connection.is_open:
            connection.close()

if __name__ == '__main__':
    main()

Micro-service 4 : Préparation de la Commande

import pika
import json

def callback(ch, method, properties, body):
    """Prepare one incoming order and forward it on success.

    The JSON body is decoded, passed to ``prepare_order`` and, when that
    returns a truthy value, handed to ``send_to_next_step``. An order whose
    preparation fails is only logged; it is not forwarded.

    Args:
        ch: channel object passed by the consumer (unused here).
        method: delivery metadata passed by the consumer (unused here).
        properties: message properties passed by the consumer (unused here).
        body: raw message payload, expected to be a JSON document.
    """
    try:
        order = json.loads(body)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # A malformed payload can never be processed. Log and skip it rather
        # than let the exception escape and stop the whole consumer.
        print(f" [!] Discarding malformed order message ({exc}): {body!r}")
        return

    print(f" [x] Received order for preparation: {order}")
    if prepare_order(order):
        send_to_next_step(order)
    else:
        print(f" [x] Order preparation failed: {order}")

def prepare_order(order):
    """Tell whether *order* was prepared successfully.

    Placeholder implementation: no preparation work is done yet, so every
    order is reported as prepared.

    Returns:
        bool: always ``True`` for now.
    """
    # Order preparation logic goes here.
    preparation_succeeded = True
    return preparation_succeeded

def send_to_next_step(order):
    """Publish a prepared order on the 'orders' exchange.

    Opens a fresh connection to the broker on localhost, declares the direct
    exchange 'orders', publishes *order* as a persistent JSON message with
    routing key 'prepared_order', then closes the connection.

    Args:
        order: JSON-serialisable object describing the order.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    try:
        channel = connection.channel()

        channel.exchange_declare(exchange='orders', exchange_type='direct')

        channel.basic_publish(
            exchange='orders',
            routing_key='prepared_order',
            body=json.dumps(order),
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
            ))
        print(f" [x] Sent prepared order: {order}")
    finally:
        # Always release the connection; it was leaked whenever declaring or
        # publishing raised.
        if connection.is_open:
            connection.close()

def main():
    """Consume 'paid_order' messages and dispatch them to ``callback``.

    Connects to the broker on localhost, declares the direct exchange
    'orders', binds a durable queue to routing key 'paid_order' and blocks in
    the consume loop until interrupted with CTRL+C. The connection is closed
    on every exit path.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    try:
        channel = connection.channel()

        # Same arguments as the publishing side: the broker rejects a
        # redeclaration of an existing exchange with different properties.
        channel.exchange_declare(exchange='orders', exchange_type='direct')

        # A named, durable queue keeps orders that arrive while this service
        # is down. The former server-named exclusive queue disappeared with
        # the connection, so persistent messages were silently dropped.
        queue_name = 'paid_order'
        channel.queue_declare(queue=queue_name, durable=True)

        channel.queue_bind(exchange='orders', queue=queue_name, routing_key='paid_order')

        # NOTE(review): auto_ack=True is kept to preserve the existing
        # delivery semantics; a message is lost if the process dies while
        # handling it. Consider manual acknowledgements.
        channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

        print(' [*] Waiting for orders to prepare. To exit press CTRL+C')
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            # CTRL+C is the advertised way to exit: stop cleanly instead of
            # surfacing a traceback.
            channel.stop_consuming()
    finally:
        if connection.is_open:
            connection.close()

if __name__ == '__main__':
    main()

Micro-service 5 : Expédition de la Commande

import pika
import json

def callback(ch, method, properties, body):
    """Ship one incoming order and forward it on success.

    The JSON body is decoded, passed to ``ship_order`` and, when that returns
    a truthy value, handed to ``send_to_next_step``. An order whose shipping
    fails is only logged; it is not forwarded.

    Args:
        ch: channel object passed by the consumer (unused here).
        method: delivery metadata passed by the consumer (unused here).
        properties: message properties passed by the consumer (unused here).
        body: raw message payload, expected to be a JSON document.
    """
    try:
        order = json.loads(body)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # A malformed payload can never be processed. Log and skip it rather
        # than let the exception escape and stop the whole consumer.
        print(f" [!] Discarding malformed order message ({exc}): {body!r}")
        return

    print(f" [x] Received order for shipping: {order}")
    if ship_order(order):
        send_to_next_step(order)
    else:
        print(f" [x] Order shipping failed: {order}")

def ship_order(order):
    """Tell whether *order* was shipped successfully.

    Placeholder implementation: no shipment is actually triggered yet, so
    every order is reported as shipped.

    Returns:
        bool: always ``True`` for now.
    """
    # Order shipping logic goes here.
    shipping_succeeded = True
    return shipping_succeeded

def send_to_next_step(order):
    """Publish a shipped order on the 'orders' exchange.

    Opens a fresh connection to the broker on localhost, declares the direct
    exchange 'orders', publishes *order* as a persistent JSON message with
    routing key 'shipped_order', then closes the connection.

    Args:
        order: JSON-serialisable object describing the order.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    try:
        channel = connection.channel()

        channel.exchange_declare(exchange='orders', exchange_type='direct')

        channel.basic_publish(
            exchange='orders',
            routing_key='shipped_order',
            body=json.dumps(order),
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
            ))
        print(f" [x] Sent shipped order: {order}")
    finally:
        # Always release the connection; it was leaked whenever declaring or
        # publishing raised.
        if connection.is_open:
            connection.close()

def main():
    """Consume 'prepared_order' messages and dispatch them to ``callback``.

    Connects to the broker on localhost, declares the direct exchange
    'orders', binds a durable queue to routing key 'prepared_order' and
    blocks in the consume loop until interrupted with CTRL+C. The connection
    is closed on every exit path.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    try:
        channel = connection.channel()

        # Same arguments as the publishing side: the broker rejects a
        # redeclaration of an existing exchange with different properties.
        channel.exchange_declare(exchange='orders', exchange_type='direct')

        # A named, durable queue keeps orders that arrive while this service
        # is down. The former server-named exclusive queue disappeared with
        # the connection, so persistent messages were silently dropped.
        queue_name = 'prepared_order'
        channel.queue_declare(queue=queue_name, durable=True)

        channel.queue_bind(exchange='orders', queue=queue_name, routing_key='prepared_order')

        # NOTE(review): auto_ack=True is kept to preserve the existing
        # delivery semantics; a message is lost if the process dies while
        # handling it. Consider manual acknowledgements.
        channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

        print(' [*] Waiting for orders to ship. To exit press CTRL+C')
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            # CTRL+C is the advertised way to exit: stop cleanly instead of
            # surfacing a traceback.
            channel.stop_consuming()
    finally:
        if connection.is_open:
            connection.close()

if __name__ == '__main__':
    main()

Micro-service 6 : Notification du Client

import pika
import json

def callback(ch, method, properties, body):
    """Notify the customer about one incoming order message.

    The JSON body is decoded, logged and passed to ``notify_customer``.

    Args:
        ch: channel object passed by the consumer (unused here).
        method: delivery metadata passed by the consumer (unused here).
        properties: message properties passed by the consumer (unused here).
        body: raw message payload, expected to be a JSON document.
    """
    try:
        order = json.loads(body)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # A malformed payload can never be processed. Log and skip it rather
        # than let the exception escape and stop the whole consumer.
        print(f" [!] Discarding malformed order message ({exc}): {body!r}")
        return

    print(f" [x] Received order for notification: {order}")
    notify_customer(order)

def notify_customer(order):
    """Announce that the customer is being notified about *order*.

    Placeholder implementation: the notification is only written to standard
    output.
    """
    # Customer notification logic goes here.
    message = f"Notifying customer for order: {order}"
    print(message)

def main():
    """Consume 'shipped_order' messages and dispatch them to ``callback``.

    Connects to the broker on localhost, declares the direct exchange
    'orders', binds a durable queue to routing key 'shipped_order' and blocks
    in the consume loop until interrupted with CTRL+C. The connection is
    closed on every exit path.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    try:
        channel = connection.channel()

        # Same arguments as the publishing side: the broker rejects a
        # redeclaration of an existing exchange with different properties.
        channel.exchange_declare(exchange='orders', exchange_type='direct')

        # A named, durable queue keeps orders that arrive while this service
        # is down. The former server-named exclusive queue disappeared with
        # the connection, so persistent messages were silently dropped.
        queue_name = 'shipped_order'
        channel.queue_declare(queue=queue_name, durable=True)

        channel.queue_bind(exchange='orders', queue=queue_name, routing_key='shipped_order')

        # NOTE(review): auto_ack=True is kept to preserve the existing
        # delivery semantics; a message is lost if the process dies while
        # handling it. Consider manual acknowledgements.
        channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

        print(' [*] Waiting for orders to notify. To exit press CTRL+C')
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            # CTRL+C is the advertised way to exit: stop cleanly instead of
            # surfacing a traceback.
            channel.stop_consuming()
    finally:
        if connection.is_open:
            connection.close()

if __name__ == '__main__':
    main()

Explication

  1. Réception de la Commande : Le premier micro-service publie une nouvelle commande sur l'exchange orders avec la clé de routage new_order.
  2. Validation de la Commande : Le deuxième micro-service consomme, via une file d'attente liée à l'exchange orders par la clé de routage new_order, les nouvelles commandes, les valide, puis publie chaque commande validée avec la clé de routage validated_order.
  3. Traitement du Paiement : Le troisième micro-service consomme, via une file d'attente liée à la clé de routage validated_order, les commandes validées, traite le paiement, puis publie chaque commande payée avec la clé de routage paid_order.
  4. Préparation de la Commande : Le quatrième micro-service consomme, via une file d'attente liée à la clé de routage paid_order, les commandes payées, prépare la commande, puis publie chaque commande préparée avec la clé de routage prepared_order.
  5. Expédition de la Commande : Le cinquième micro-service consomme, via une file d'attente liée à la clé de routage prepared_order, les commandes préparées, expédie la commande, puis publie chaque commande expédiée avec la clé de routage shipped_order.
  6. Notification du Client : Le sixième micro-service consomme, via une file d'attente liée à la clé de routage shipped_order, les commandes expédiées et notifie le client.

Ce workflow complexe est orchestré en utilisant RabbitMQ pour coordonner les différentes étapes du processus de traitement de la commande. Chaque micro-service est responsable d'une étape spécifique et communique avec les autres services via des messages RabbitMQ.