RabbitMQ-Pika

Python

RabbitMQ lets you manage message queues between senders and receivers. In this post we use Pika, a Python client library, to implement a small example.

Romina Mendez https://example.com/norajones
2023-12-02


Introduction: What is RabbitMQ?

RabbitMQ is a message broker: an intermediary that transfers messages from producers to consumers through queues.

This component, common in distributed systems architectures, rests on a few key concepts, including producers that send messages, queues that hold them, and consumers that receive them.

This introduction aims to provide a clear and concise overview of the fundamental elements of RabbitMQ, paving the way for a deeper understanding of its functioning in messaging environments.
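To make the producer, queue, and consumer roles concrete before involving a broker, here is a minimal sketch that uses Python's standard `queue.Queue` as an in-memory stand-in for a RabbitMQ queue. The function names `produce` and `consume` are illustrative only and are not part of RabbitMQ or Pika.

```python
import queue


def produce(q, items):
    """Producer: put each message on the queue."""
    for item in items:
        q.put(item)


def consume(q):
    """Consumer: take messages off the queue in arrival order."""
    received = []
    while not q.empty():
        received.append(q.get())
    return received


# In-memory stand-in for a broker queue (illustration only, no RabbitMQ involved)
orders = queue.Queue()
produce(orders, ["pizza", "burger", "cake"])
print(consume(orders))
```

The difference with RabbitMQ is that the queue lives in a separate broker process, so the producer and the consumer can be separate programs, possibly on different machines.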


Implementation with Pika in Python 🐍

Implementing this in Python with the Pika library involves writing two programs: the producer and the consumer.

Pika is a Python client for RabbitMQ. It exposes the broker through a small set of objects, mainly connections and channels, which both programs use.

In our practical example, envision the producer as an application designed to manage food delivery orders 🛵. This application, geared towards optimizing the delivery process, is responsible for sending multiple messages📝 related to user 📱 food orders.

To achieve this implementation, we will undertake the following steps:

Steps and descriptions:

Producer: Develop a program that, like an efficient order-taker, generates and sends messages 📝 to the RabbitMQ queue. These messages will contain valuable information about food orders.

Consumer: Create a program that acts as the receiver of these messages in the queue. The consumer will be responsible for processing these messages according to the system’s needs, performing relevant actions, such as managing the delivery of orders.

Splitting the work this way keeps the two programs independent: the producer only needs to know the name of the queue, and the consumer can process orders at its own pace.
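The example in this post sends plain emoji strings, but a message body can carry structured order data as well. The following is a sketch of one possible encoding, assuming JSON as the format; the field names `order_id`, `items`, and `created_at` and the helper names are hypothetical and do not come from the original code.

```python
import json
from datetime import datetime, timezone


def encode_order(order_id, items, created_at=None):
    """Serialize an order to UTF-8 JSON bytes, suitable as a message body."""
    created_at = created_at or datetime.now(timezone.utc)
    payload = {
        "order_id": order_id,                    # hypothetical field
        "items": items,                          # hypothetical field
        "created_at": created_at.isoformat(),    # hypothetical field
    }
    # ensure_ascii=False keeps emoji readable instead of escaping them
    return json.dumps(payload, ensure_ascii=False).encode("utf-8")


def decode_order(body):
    """Inverse of encode_order: turn the received bytes back into a dict."""
    return json.loads(body.decode("utf-8"))


body = encode_order(1, ["🍕", "🍺"])
print(decode_order(body)["items"])
```

The producer would pass the result of `encode_order` as the message body, and the consumer would call `decode_order` on the bytes it receives.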


1️⃣ . Install pika

!pip install pika


2️⃣ . Create send.py 📄 file

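import pika

# Open a blocking connection to a RabbitMQ broker running on this machine
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Make sure the queue exists before publishing to it
channel.queue_declare(queue='delivery')

# Orders ("pedidos") to send, one message per order
pedidos = ['🍕🍕🍕', '🍔🍔🍔', '🍰🍰🍰', '🍺🍺🍺']

for pedido in pedidos:
    # The default exchange ('') routes the message to the queue named by routing_key
    channel.basic_publish(exchange='', routing_key='delivery', body=pedido)
    print(" [x] Order sent: " + pedido)

connection.close()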
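The publishing loop above can also be wrapped in a function that receives the channel as an argument, which makes it possible to try the logic without a running broker. This is only a sketch: `publish_orders` and `RecordingChannel` are hypothetical names, and the fake channel imitates just the two Pika channel methods used in the script (`queue_declare` and `basic_publish`).

```python
def publish_orders(channel, orders, queue="delivery"):
    """Declare `queue` and publish one message per order; return the number sent."""
    channel.queue_declare(queue=queue)
    sent = 0
    for order in orders:
        # The default exchange ("") delivers to the queue named by routing_key
        channel.basic_publish(exchange="", routing_key=queue, body=order)
        sent += 1
    return sent


class RecordingChannel:
    """Hypothetical stand-in for a Pika channel that records calls instead of sending."""

    def __init__(self):
        self.declared = []
        self.published = []

    def queue_declare(self, queue):
        self.declared.append(queue)

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((exchange, routing_key, body))


fake = RecordingChannel()
print(publish_orders(fake, ["🍕🍕🍕", "🍔🍔🍔"]))
print(fake.published)
```

With a real broker, the same function would be called with the object returned by `connection.channel()` in place of `fake`.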

3️⃣ . Create receive.py 📄 file

import pika, sys, os


def main(queue='delivery'):
    # Connect to the RabbitMQ broker running on this machine
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    # Declare the queue here as well, in case the consumer starts before the producer
    channel.queue_declare(queue=queue)

    def callback(ch, method, properties, body):
        # body arrives as bytes; decode it to text before printing
        print(" [x] Received %r" % body.decode())

    # auto_ack=True: a message counts as acknowledged as soon as it is delivered
    channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main(queue='delivery')
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
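The consumer above uses `auto_ack=True`, so a message is considered handled the moment it is delivered, even if the callback later fails. A possible variation, shown here as a sketch rather than as part of the original script, is to acknowledge manually with the channel's `basic_ack` and `basic_nack` methods. The `make_callback` helper and the `FakeChannel` class are hypothetical and exist only so the logic can be tried without a broker.

```python
from types import SimpleNamespace


def make_callback(process):
    """Build a Pika-style callback that acknowledges only after `process` succeeds."""
    def callback(ch, method, properties, body):
        try:
            process(body.decode())
        except Exception:
            # Reject without requeueing so a message that always fails is not
            # redelivered forever; the broker drops it unless dead-lettering is set up
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        else:
            ch.basic_ack(delivery_tag=method.delivery_tag)
    return callback


class FakeChannel:
    """Hypothetical stand-in for a Pika channel that records ack and nack calls."""

    def __init__(self):
        self.acked = []
        self.nacked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)

    def basic_nack(self, delivery_tag, requeue):
        self.nacked.append((delivery_tag, requeue))


channel = FakeChannel()
callback = make_callback(print)
callback(channel, SimpleNamespace(delivery_tag=1), None, "🍕🍕🍕".encode())
print(channel.acked)
```

In the real consumer this callback would be registered with `channel.basic_consume(queue=queue, on_message_callback=make_callback(print), auto_ack=False)`.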

4️⃣. MongoDB + Pika

Next, we modify the consumer script so that it connects to a MongoDB Atlas cluster and inserts each received message into a collection.

import pymongo
import pika, sys, os
from datetime import datetime

# Create a connection with MongoClient (replace the placeholder user, password and cluster name)
client = pymongo.MongoClient("mongodb+srv://NombreUser:PasswordUser@clusterName.moczg.mongodb.net/rabbit?retryWrites=true&w=majority")

# Database
db = client["rabbit"]

# Collection
collection = db["mensajes"]

def main(queue='delivery'):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue=queue)

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body.decode())
        # Store the reception time ('fecha'), the source queue and the message text
        body_insert = {'fecha': datetime.now(), 'queue': queue, 'message': body.decode()}
        collection.insert_one(body_insert)

    # Consume from the same queue that was declared above
    channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main(queue='delivery')
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
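The script above embeds the Atlas user and password directly in the connection string. One common alternative, sketched below, is to read the string from an environment variable and to build the stored document in a small function that can be checked on its own. The variable name `MONGODB_URI` and the helpers `get_mongo_uri` and `build_document` are assumptions made for illustration; the document fields match those used in the script.

```python
import os
from datetime import datetime


def get_mongo_uri(env=os.environ):
    """Return the connection string from MONGODB_URI (a hypothetical variable name)."""
    uri = env.get("MONGODB_URI")
    if not uri:
        raise RuntimeError("Set the MONGODB_URI environment variable first")
    return uri


def build_document(queue, body, now=None):
    """Build the document stored for each message, with the same fields as the script."""
    return {
        "fecha": now or datetime.now(),  # reception time
        "queue": queue,                  # queue the message came from
        "message": body.decode(),        # message text decoded from bytes
    }


print(build_document("delivery", "🍕🍕🍕".encode())["message"])
```

In the consumer, `pymongo.MongoClient(get_mongo_uri())` would replace the hard-coded string, and the callback would call `collection.insert_one(build_document(queue, body))`.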

To download the code for these two files, you can do so from the following link.


To learn more about RabbitMQ, you can visit the following sites:


Reuse

Text and figures are licensed under Creative Commons Attribution CC BY 4.0. The figures that have been reused from other sources don't fall under this license and can be recognized by a note in their caption: "Figure from ...".

Citation

For attribution, please cite this work as

Mendez (2023, Dec. 2). Romina Mendez: RabbitMQ-Pika. Retrieved from https://r0mymendez.github.io/posts_en/2023-12-02-rabbitmq-pika/

BibTeX citation

@misc{mendez2023rabbitmq-pika,
  author = {Mendez, Romina},
  title = {Romina Mendez: RabbitMQ-Pika},
  url = {https://r0mymendez.github.io/posts_en/2023-12-02-rabbitmq-pika/},
  year = {2023}
}