RabbitMQ lets you manage message queues between senders and receivers. In this post, we will use Python's Pika library to implement it.
RabbitMQ is an intermediary system designed to facilitate the transfer of messages between producers and consumers through the implementation of queues.
This component, essential in distributed systems architecture, is grounded in key concepts:
1️⃣. Producer: The entity responsible for originating and dispatching messages.
2️⃣. Queue: A reservoir where messages are temporarily stored.
3️⃣. Consumer: The receiving instance that processes messages according to the system’s needs.
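The three roles above can be illustrated without a broker at all. The following is a minimal in-memory sketch that uses Python's standard `queue.Queue` as a stand-in for a RabbitMQ queue. The function names `producer` and `consumer` are illustrative and are not part of RabbitMQ or Pika.

```python
import queue


def producer(q, orders):
    """Originate messages and place them on the queue."""
    for order in orders:
        q.put(order)


def consumer(q):
    """Take messages off the queue in arrival order and process them."""
    processed = []
    while not q.empty():
        processed.append("processed " + q.get())
    return processed


# In-memory stand-in for a RabbitMQ queue (assumption: first in, first out).
orders_queue = queue.Queue()
producer(orders_queue, ["pizza", "burger"])
results = consumer(orders_queue)
print(results)
```

In a real deployment the queue lives inside the RabbitMQ broker, so the producer and the consumer can run as separate processes.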
This introduction aims to provide a clear and concise overview of the fundamental elements of RabbitMQ, paving the way for a deeper understanding of its functioning in messaging environments.
Implementing in Python with the Pika library involves creating two essential programs: the producer and the consumer.
Pika provides an effective interface for communication with RabbitMQ, leveraging a set of carefully designed objects for this purpose.
In our practical example, envision the producer as an application designed to manage food delivery orders 🛵. This application is responsible for sending multiple messages 📝, one for each food order placed by a user 📱.
To achieve this implementation, we will undertake the following steps:
| Step | Description |
|---|---|
| Producer | Develop a program that, like an efficient order-taker, generates and sends messages 📝 to the RabbitMQ queue. These messages will contain valuable information about food orders. |
| Consumer | Create a program that acts as the receiver of these messages in the queue. The consumer will be responsible for processing these messages according to the system's needs, performing relevant actions, such as managing the delivery of orders. |
This structured and efficient approach ensures a clear and functional implementation, providing a robust foundation for systems managing information flows in dynamic environments.
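The example in this post sends each order as a plain string. If an order needs to carry more detail, one common approach is to serialize it as JSON before publishing and decode it in the consumer. The sketch below assumes hypothetical field names (`order_id`, `items`) and helper names (`encode_order`, `decode_order`); none of them come from the original example.

```python
import json


def encode_order(order_id, items):
    """Serialize an order to UTF-8 bytes suitable for a message body."""
    payload = {"order_id": order_id, "items": items}
    return json.dumps(payload, ensure_ascii=False).encode("utf-8")


def decode_order(body):
    """Rebuild the order dictionary from a received message body."""
    return json.loads(body.decode("utf-8"))


body = encode_order(1, ["🍕", "🍺"])
order = decode_order(body)
print(order)
```

The bytes returned by `encode_order` could be passed as the `body` argument of `channel.basic_publish`, and `decode_order` could be called inside the consumer callback.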
!pip install pika
📄 File: send.py
import pika

# Open a connection to the RabbitMQ broker running on localhost
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Make sure the queue exists before publishing to it
channel.queue_declare(queue='delivery')

# Orders to send
pedidos = ['🍕🍕🍕', '🍔🍔🍔', '🍰🍰🍰', '🍺🍺🍺']

for i in pedidos:
    # Publish through the default exchange, routed by queue name
    channel.basic_publish(exchange='', routing_key='delivery', body=i)
    print(" [x] Order sent! " + i)

connection.close()
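One possible refactor, shown as a sketch rather than the author's code, moves the publishing loop into a function that receives the channel as an argument. This makes the loop testable without a running broker. `publish_orders` and `FakeChannel` are hypothetical names; the only Pika calls assumed are `queue_declare` and `basic_publish`, with the same arguments used in `send.py`.

```python
def publish_orders(channel, orders, queue="delivery"):
    """Declare the queue and publish each order through the default exchange."""
    channel.queue_declare(queue=queue)
    for order in orders:
        channel.basic_publish(exchange="", routing_key=queue, body=order)
    return len(orders)


class FakeChannel:
    """Hypothetical stand-in for a Pika channel that only records calls."""

    def __init__(self):
        self.declared = []
        self.published = []

    def queue_declare(self, queue):
        self.declared.append(queue)

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((exchange, routing_key, body))


fake = FakeChannel()
sent = publish_orders(fake, ["🍕🍕🍕", "🍔🍔🍔"])
print(sent, fake.published)
```

With a real broker, the same function would be called with the channel returned by `connection.channel()`.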
# Consumer script
import pika, sys, os

def main(queue='delivery'):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # Declaring the queue again is safe: it is only created if it does not exist
    channel.queue_declare(queue=queue)

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body.decode())

    channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main(queue='delivery')
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
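The consumer uses `auto_ack=True`, so the broker treats each message as acknowledged as soon as it is delivered. A possible alternative, sketched here under the assumption that messages should be acknowledged only after processing succeeds, is to pass `auto_ack=False` to `basic_consume` and call `basic_ack` from the callback. `FakeChannel` and `FakeMethod` are hypothetical stand-ins so the callback can be exercised without a broker.

```python
def callback(ch, method, properties, body):
    """Process the message, then acknowledge it explicitly."""
    print(" [x] Received %r" % body.decode())
    # Tell the broker this specific delivery was handled
    ch.basic_ack(delivery_tag=method.delivery_tag)


class FakeMethod:
    """Hypothetical stand-in for the delivery metadata Pika passes in."""
    delivery_tag = 1


class FakeChannel:
    """Hypothetical stand-in for a Pika channel that records acknowledgments."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


fake_channel = FakeChannel()
callback(fake_channel, FakeMethod(), None, "🍕🍕🍕".encode())
```

With Pika, this callback would be registered with `channel.basic_consume(queue='delivery', on_message_callback=callback, auto_ack=False)`.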
Next, we will modify the consumer script so that it connects to a MongoDB Atlas cluster and inserts each received message into a collection.
import pymongo
import pika, sys, os
from datetime import datetime

# Create a connection with MongoClient
client = pymongo.MongoClient("mongodb+srv://NombreUser:PasswordUser@clusterName.moczg.mongodb.net/rabbit?retryWrites=true&w=majority")

# Database
db = client["rabbit"]

# Collection
collection = db["mensajes"]

def main(queue='delivery'):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue=queue)

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body.decode())
        # Store the message together with its reception time and source queue
        body_insert = {'fecha': datetime.now(), 'queue': queue, 'message': body.decode()}
        collection.insert_one(body_insert)

    # Consume from the same queue that was declared above
    channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main(queue='delivery')
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
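As a sketch of how the insertion logic could be tested without connecting to Atlas, the callback can be built by a factory that receives the collection as a parameter. `make_callback` and `FakeCollection` are hypothetical names; the only PyMongo method assumed is `insert_one`, which the consumer script also uses.

```python
from datetime import datetime


def make_callback(collection, queue):
    """Return a Pika-style callback that stores each message in `collection`."""
    def callback(ch, method, properties, body):
        document = {
            "fecha": datetime.now(),  # reception timestamp
            "queue": queue,
            "message": body.decode(),
        }
        collection.insert_one(document)
    return callback


class FakeCollection:
    """Hypothetical stand-in for a PyMongo collection that keeps documents in a list."""

    def __init__(self):
        self.documents = []

    def insert_one(self, document):
        self.documents.append(document)


fake_collection = FakeCollection()
on_message = make_callback(fake_collection, "delivery")
on_message(None, None, None, "🍰🍰🍰".encode())
print(fake_collection.documents[0]["message"])
```

In the real consumer, `make_callback(db["mensajes"], queue)` would be passed as `on_message_callback` to `channel.basic_consume`.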
To download the code for these two files, you can do so from the following link.
To learn more about RabbitMQ, you can visit the following sites:
Text and figures are licensed under Creative Commons Attribution CC BY 4.0. The figures that have been reused from other sources don't fall under this license and can be recognized by a note in their caption: "Figure from ...".
For attribution, please cite this work as
Mendez (2023, Dec. 2). Romina Mendez: RabbitMQ-Pika. Retrieved from https://r0mymendez.github.io/posts_en/2023-12-02-rabbitmq-pika/
BibTeX citation
@misc{mendez2023rabbitmq-pika,
  author = {Mendez, Romina},
  title = {Romina Mendez: RabbitMQ-Pika},
  url = {https://r0mymendez.github.io/posts_en/2023-12-02-rabbitmq-pika/},
  year = {2023}
}