Rabbit MQ

RabbitMQ: Smart Decoupling Rabbits!

I am a big fan of asynchronous software architectures, hence love the words like micro-services, data-streaming, events, decoupling, and many more. It can add a new dimension to scalable, reliable systems.

Backend development is pretty much easy until the scaling part comes into the picture, and suddenly it feels like a creepy system. Considering the various services connecting and depending on each other for scaling the system needs to have a good messaging service. To handle the long-running API load decoupling the system with micro-services is the most preferable solution. Message brokers like RabbitMQ and Kafka are leading and making it easy for all of us.

Then the major question comes up is, “Is decoupling necessary?” Is it not adding latency and extra overhead and managing part to the server. Yes definitely. But it solves all the major scaling problems. And the best example I can give is the ‘Conveyer Belt’ at the airport. It makes everyone’s job easy. If there isn’t a belt airport manager needs to allocate the persons to distribute the luggage, and also the queue of persons would make a mess which might be a headache unnecessarily.  Decoupling a system with micro-services is the same key to a complex system.

There are so many good solutions in the market like Kafka, RabbitMQ, ActiveMQ, ZeroMQ, JMS but for this article, we will stick to RabbitMQ as it’s easy to get it started and can handle a huge load. Will cover all the basic aspects of RabbitMQ with a sample Simple Logging System.

Comparison between RabbitMQ and Kafka

– RabbitMQ is a smart broker and dumb consumer. Scalable, fast, and has a message size limit of 2GB(128MB preferred). It can handle 50 thousand messages/sec and is very easy to configure.

– Kafka is much more powerful than a rabbit with scaling and clustering but not built for huge messages, it has a preferable message size limit of only 1MB to 10MB. It’s a dumb broker and smart consumer and can handle millions of messages per second easily.

Advanced Message Queuing Protocol (AMQP)

HTTP is a synchronous protocol, but as I said to decouple a system we need asynchronous protocol i.e. AMQP, which provides us a way to handle multiple lightweight connections over a single TCP connection by creating channels. AMQP uses the Pub-Sub model.

AMQP 0-9-1

Usually, the publisher sends a message and the consumer consumes it and works on a task, but rabbit uses the 0-9-1 model where there is a middleware that handles the message routing and acknowledgments with the help of exchanges.

Channels

We can connect the services using multiple channels and with one TCP connection and transfer the data efficiently. It reduces the connection overhead, handshake as well as network cost.

RabbitMQ

Publishers and Consumers

You can add any number of publishers and consumers as per your requirement, to scale your system. But the preferable solution is you can add a single publisher and multiple consumers as volume increases.

Let’s consider, in our system, we need to handle information, warning, and error logs. So we can add a single logging service, later as load increases you can again divide this service into info, warn, error services. So currently we have one publisher for adding new log, and one subscriber for handling logging logic(worker).

Exchanges, Bindings, and Queues

Sending a message from Publisher to Subscriber can be achieved using various ways. In RabbitMQ it is achieved using Exchanges, Queues with the help of Bindings(routing keys).

Bindings are the glue between Exchanges and Queues which helps to route a message to a destination with some set of rules.

Queues are used to store the data and make the system stateful, although we can make Queues stateless it’s not recommended. The incoming data is passed to queues and then the consumer does work on them. Queues are at the consumer-facing so with the positive acknowledgment we inform the exchange that the message has been delivered.

Exchanges are of major four types Direct, Topic, Fan-out, and Headers. There is one more type that is used for removing the message is Dead Letter Exchange.

RabbitMQ

Using the above image we will understand how each type of exchange works.

1. Direct Exchange

Direct Exchange is the default exchange type, where the data is directly sent to the queue with the help of matching routing keys(bindings). Suppose the key for the current message is matching with “log.error.user then it will send a message to Error Queue.

2. Topic Exchange

This is similar to Direct Exchange but here data can be sent to multiple queues by matching routing keys and patterns as per the chosen topic. Patterns include [*, #] where * means match specific position Eg: “log.*.user” and # means matching all characters Eg: “log.info.#”. 

Suppose we have a topic given as “log.*.user” it will match the key and pattern and send a message to both Error and Information Queue but not to Warning Queue as it’s not in the same topic.

3. Fan-out Exchange

This exchange sends messages to all the present queues directly and the provided keys will be ignored.

4. Headers Exchange

In this exchange, the header value is checked and routing is done on that basis. The header may contain values like[all, any]. As a special argument, this key-value pair is added and matched with headers.

5. Dead letter Exchanges

Whenever the key is not matched or the exchange fails to identify the destination for the message, then the undelivered messages will be dropped into Dead Letter Exchange.

Advantages and Usage

  1. Communication between Micro-Services(shared-nothing model) done with the rabbits.
  2. Decoupling helps in improving the performance of time-consuming APIs.
  3. Fire and forget. When a rabbit is introduced we don’t have to rethink that part after decoupling.
  4. Reduces the dependency of shared resources by queue management.
  5. By using the stateful model we can make the system durable and redo the things we want.
  6. Channels reduce costly TCP connections and make data transfer very easy.
  7. We can add Dead Letter Queues to handle undelivered messages.

Conclusion

For managing the API load of your system nicely and scaling on-demand, use asynchronous protocols, decouple the system with micro-services. Don’t hesitate to add a new small service at any time. Use message brokers like RabbitMQ to communicate between various services and make rabbits run faster.

Happy Coding!

We are using a factory pattern to create a blueprint of all queues, so that don’t need to duplicate the rabbit code and at the time of change in rabbit structure just need to apply the change in a single place.

// rabbit class
class RabbitMQ {
    constructor(connection, name) {
        this.connection = connection;
        this.name = name; this.channel;
    };


    ////////////////////////
    //     init queue     //
    ////////////////////////

    initQueue = async () => {

        // create channel
        const channel = await this.connection.createChannel();

        // assign to this.channel
        this.channel = channel;

        // assert the queue
        await this.channel.assertQueue(this.name, { durable: true });
    };


    /////////////////////////
    //       publish       //
    ///////////////////////// 

    publishMessage = async (message) => {

        // create buffer data
        const messageBuffer = Buffer.from(JSON.stringify(message));

        // publish a message
        const resultSend = this.channel.sendToQueue(
            this.name, messageBuffer, { persistent: true }
        );

        // throw error
        if (!resultSend) throw Error(`Error in publishing!`);
    };


    /////////////////////////
    //       consume       //
    /////////////////////////

    consumeMessage = async (worker) => {
        return this.channel.consume(this.name, async (messageBuffer) => {

           // received object
           const messageObject = JSON.parse(messageBuffer.content);

           try {
               // send websocket message
               if (messageObject) await worker(messageObject);
           } catch (err) {
               console.error("Error while consuming message" + err.stack);
           };

           // acknowledge the message
           this.channel.ack(messageBuffer);

        }, {
            // no automatic ack
            noAck: false
        });
     };
 };

After creating the class template initialize and create queue messages.

////////////////////////
//     initialize     //
////////////////////////

const amqplib = require('amqplib');


let rabbits = {};


const initQueuesAndConsumers = async () => {
    const connection = await amqplib.connect(amqp://localhost);

    // create instances 
    rabbits.logError(connection, 'error_handling_queue');
    rabbits.logWarning(connection, 'warning_handling_queue');
    rabbits.logInfo(connection, 'information_handling_queue');

    // init queues
    rabbits.logError.initQueue();
    rabbits.logWarning.initQueue();
    rabbits.logInfo.initQueue();

    // handle received message
    rabbits.logError.consumeMessage(worker(messageObject));
    rabbits.logWarning.consumeMessage(worker(messageObject));
    rabbits.logInfo.consumeMessage(worker(messageObject));
};

initQueuesAndConsumers();


////////////////////////
//  publish messages  //
////////////////////////

rabbits.logError.publishMessage({
    code: 401,
    file: "authentication.js",
    message: "JWT token expired"
});

Note

If you want to get an in-depth idea about RabbitMQ Architecture, I will suggest the below books.

  1. RabbitMQ Essentials, a book by Lovisa Johansson.
  2. RabbitMQ in Action, a book by Jason J.W. Williams.

Visit medium to read more: https://medium.com/ever-blogs

You Might Also Like
%d bloggers like this: