RabbitMQ-Pika

Python

RabbitMQ permite gestionar colas de mensajes entre emisores y destinatarios, en el siguiente post vamos a utilizar en python Pika para su implementación.

Romina Mendez https://example.com/norajones
09-10-2021

Introducción: ¿Que es RabbitMQ?

RabbitMQ es un sistema intermediario diseñado para facilitar la transferencia de mensajes entre productores y consumidores a través de la implementación de colas.

Este componente, esencial en la arquitectura de sistemas distribuidos, se basa en conceptos clave:

  1. Productor: La entidad encargada de originar y enviar mensajes.
  2. Cola: Un reservorio donde los mensajes se almacenan temporalmente.
  3. Consumidor: La instancia receptora que procesa los mensajes según la necesidad del sistema.

Esta introducción busca proporcionar una visión clara y concisa de los elementos fundamentales de RabbitMQ, allanando el camino para una comprensión más profunda de su funcionamiento en entornos de mensajería.


Implementación con Pika en Python 🐍

La implementación en Python con la librería Pika implica la creación de dos programas esenciales: el productor y el consumidor.

Pika proporciona una interfaz eficaz para la comunicación con RabbitMQ, aprovechando un conjunto de objetos cuidadosamente diseñados para este propósito.

En nuestro ejemplo práctico, imaginemos el productor como una aplicación diseñada para gestionar pedidos de entrega de alimentos 🛵. Esta aplicación, cuyo objetivo es optimizar el proceso de entrega, se encarga de enviar múltiples mensajes📝 relacionados con los pedidos realizados por los usuarios 📱.

Para lograr esta implementación, abordaremos los siguientes pasos:

Pasos Descripción
Productor: Desarrollar un programa que, como un eficiente tomador de pedidos, genere y envíe mensajes📝 a la cola de RabbitMQ. Estos mensajes contendrán información valiosa sobre los pedidos de comida.
Consumidor: Crear un programa que actúe como el receptor de estos mensajes en la cola. El consumidor estará encargado de procesar estos mensajes según las necesidades del sistema, realizando las acciones pertinentes, como gestionar la entrega de los pedidos.

Este enfoque estructurado y eficiente garantiza una implementación clara y funcional, proporcionando una base sólida para sistemas que gestionan flujos de información en entornos dinámicos.


1. Instalar pika

!pip install pika


2. Generar el script send.py 📄

import pika
from datetime import datetime

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='delivery')

pedidos=['🍕🍕🍕','🍔🍔🍔','🍰🍰🍰','🍺🍺🍺']

for i in pedidos:
    channel.basic_publish(exchange='', routing_key='delivery', body=i)
    print(" [x] Se envia pedido!'"+ i)

connection.close()

3. Generar el script receive.py 📄

import pika, sys, os
from datetime import datetime


def main(queue='delivery'):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue=queue)

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body.decode())

    channel.basic_consume(queue='delivery', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main(queue=queue)
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

Image description

Mongodb + Pika

A continuación vamos a modificar el script para que pueda conectarse a un mongodb atlas y realice el insert de los mensajes recibidos.

import pymongo
import pika, sys, os
from datetime import datetime

# Crear una conexion con MongoClient
client = pymongo.MongoClient("mongodb+srv://NombreUser:PasswordUser@clusterName.moczg.mongodb.net/rabbit?retryWrites=true&w=majority")

# Database
db = client["rabbit"]

# Collection
collection= db["mensajes"]

def main(queue='delivery'):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue=queue)

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body.decode())
        body_indsert={'fecha':datetime.now(),'queue':queue,'message':body.decode()}
        db["mensajes"].insert_one(body_indsert)

    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main(queue=queue)
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

Para descargar el código de estos dos archivos puedes hacerlo desde el siguiente link


Para conocer más de Rabbitmq puedes ver los siguientes sitios:


Reuse

Text and figures are licensed under Creative Commons Attribution CC BY 4.0. The figures that have been reused from other sources don't fall under this license and can be recognized by a note in their caption: "Figure from ...".

Citation

For attribution, please cite this work as

Mendez (2021, Sept. 10). Romina Mendez: RabbitMQ-Pika. Retrieved from https://r0mymendez.github.io/posts/2021-09-10-rabbitmq-pika/

BibTeX citation

@misc{mendez2021rabbitmq-pika,
  author = {Mendez, Romina},
  title = {Romina Mendez: RabbitMQ-Pika},
  url = {https://r0mymendez.github.io/posts/2021-09-10-rabbitmq-pika/},
  year = {2021}
}