Message Brokers

You can think of a message broker as a post office. Alice writes a letter to Ana and takes it to the post office. Later, the post office checks Ana’s address and delivers the letter to her. Ana can also acknowledge that she received it by signing a document and returning it to the postman, who represents the post office.

Message brokers work in a similar way. They have a central server (the post office, represented by a queue) whose address is known to the producers (Alice). A producer sends a message to the broker, which later delivers it to a consumer (Ana). As in the post office, the consumer can acknowledge that the message was delivered correctly by sending the broker an “ACK” message.

A real post office has limitations such as the number of cars, the number of employees, the size of the storage room, etc. Some of those limitations can make the service slow or overloaded. The same happens with message brokers. Their limitations come from the hardware they run on: CPU, memory, storage size, network bandwidth, etc.

But what is a Message Broker?

It is software that enables services to communicate with each other and exchange information. It does this by implementing a queue to which one or more producers can post messages and from which one or more consumers can read them, possibly competing with each other for each message.
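To make the idea of producers and competing consumers a bit more concrete, here is a minimal sketch that uses a plain Go channel as a stand-in for the queue. It is only a toy model and not how any real broker is implemented; the function name `runQueue` and the message naming are made up for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// runQueue is a toy, in-memory stand-in for a broker queue (not RabbitMQ).
// Each producer posts perProducer messages and the consumers compete for them.
// It returns how many times each message was consumed.
func runQueue(producers, perProducer, consumers int) map[string]int {
	queue := make(chan string, 16) // the "queue" owned by the broker

	var prodWG sync.WaitGroup
	for p := 0; p < producers; p++ {
		prodWG.Add(1)
		go func(id int) {
			defer prodWG.Done()
			for i := 0; i < perProducer; i++ {
				queue <- fmt.Sprintf("producer-%d/msg-%d", id, i)
			}
		}(p)
	}

	var mu sync.Mutex
	seen := make(map[string]int)
	var consWG sync.WaitGroup
	for c := 0; c < consumers; c++ {
		consWG.Add(1)
		go func() {
			defer consWG.Done()
			// A receive removes the message, so no other consumer sees it.
			for msg := range queue {
				mu.Lock()
				seen[msg]++
				mu.Unlock()
			}
		}()
	}

	prodWG.Wait()
	close(queue) // no more messages: lets the consumers finish
	consWG.Wait()
	return seen
}

func main() {
	seen := runQueue(2, 3, 3)
	fmt.Println("distinct messages consumed:", len(seen))
}
```

Every message ends up with exactly one consumer, which is the "compete" behaviour described above. Producers and consumers never reference each other, only the queue.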

The beauty is that the producer and the consumer do not need to know each other. They can be written in different languages, run on different operating systems, and even be located in different countries. The only things they share are the message format and the broker’s address.
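The shared message format can be as small as one struct. The sketch below assumes a JSON message with a single numeric field `x`; the type name `primeRequest` and the helpers `encode` and `decode` are hypothetical names chosen for this example. A producer in another language would only need to emit the same JSON.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// primeRequest is the shared message contract (hypothetical name).
// Producer and consumer only need to agree on this shape.
type primeRequest struct {
	X int64 `json:"x"`
}

// encode is what a producer would do before publishing.
func encode(x int64) ([]byte, error) {
	return json.Marshal(primeRequest{X: x})
}

// decode is what a consumer would do after receiving a message body.
func decode(body []byte) (primeRequest, error) {
	var req primeRequest
	err := json.Unmarshal(body, &req)
	return req, err
}

func main() {
	body, err := encode(7)
	if err != nil {
		panic(err)
	}
	fmt.Printf("on the wire: %s\n", body)

	req, err := decode(body)
	if err != nil {
		panic(err)
	}
	fmt.Println("decoded x =", req.X)
}
```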

When to use

There are many scenarios where you can use a Message Broker. Some are:

  • Message Brokers are commonly used whenever we have some asynchronous task to execute.

Let’s say a user asks the system to perform some heavy math. We don’t want our service to sit waiting for that calculation to finish and, depending on the kind of calculation, we may want it to run on a specific machine with a GPU. In that case we, as a producer, queue a message on the message broker. Later, another service listening to that same queue receives the message and carries out the calculation.

  • Routing messages to one or more destinations

Later we will see that message brokers have different ways to distribute a producer’s messages. One is to deliver the same message to multiple consumers.
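Delivering the same message to multiple consumers can be pictured with a small in-memory sketch. This is a toy model built on Go channels, not the API of any real broker; the `fanout` type and its methods are invented for the illustration.

```go
package main

import "fmt"

// fanout is a toy, in-memory model of one-to-many delivery.
type fanout struct {
	subscribers []chan string
}

// subscribe registers a new consumer and returns its private queue.
func (f *fanout) subscribe() <-chan string {
	ch := make(chan string, 8) // small buffer; publish blocks once it is full
	f.subscribers = append(f.subscribers, ch)
	return ch
}

// publish copies the message into every subscriber's queue.
func (f *fanout) publish(msg string) {
	for _, ch := range f.subscribers {
		ch <- msg
	}
}

func main() {
	f := &fanout{}
	audit := f.subscribe()
	worker := f.subscribe()

	f.publish("is 7 prime?")

	fmt.Println("audit got:", <-audit)
	fmt.Println("worker got:", <-worker)
}
```

Each subscriber has its own queue, so both receive a copy. This is the opposite of the competing case, where a single queue is shared and each message goes to only one consumer.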

BE AWARE

Message brokers are not secure by default, so don’t put too much trust in them. They won’t encrypt your message contents, and any consumer with access to the queue can read the messages produced. Depending on what your consumers do with the messages, any producer that can publish to the queue may be able to take control of them. If you use a message broker to send sensitive information or commands to consumers, you should encrypt the messages before sending them to the queue. Also check your consumers for command injection.
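As a rough idea of what encrypting before queueing could look like, here is a sketch using AES-GCM from the Go standard library. It assumes the producer and the consumer already share a secret key, and how they obtain it is out of scope here. The helper names `seal` and `open` are made up for this example.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
	"fmt"
)

// seal encrypts plaintext with AES-GCM and prepends the random nonce,
// so the result can be used directly as a message body.
func seal(key, plaintext []byte) ([]byte, error) {
	block, err := aes.NewCipher(key) // key must be 16, 24 or 32 bytes
	if err != nil {
		return nil, err
	}
	gcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	return gcm.Seal(nonce, nonce, plaintext, nil), nil
}

// open reverses seal. It fails if the key is wrong or the body was modified.
func open(key, sealed []byte) ([]byte, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	gcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	if len(sealed) < gcm.NonceSize() {
		return nil, errors.New("message too short")
	}
	nonce, ciphertext := sealed[:gcm.NonceSize()], sealed[gcm.NonceSize():]
	return gcm.Open(nil, nonce, ciphertext, nil)
}

func main() {
	key := make([]byte, 32) // AES-256; a real key would come from a secret store
	if _, err := rand.Read(key); err != nil {
		panic(err)
	}
	sealed, err := seal(key, []byte(`{"x": 2}`))
	if err != nil {
		panic(err)
	}
	plain, err := open(key, sealed)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d encrypted bytes, decrypted back to %s\n", len(sealed), plain)
}
```

Because GCM also authenticates the data, a consumer that calls `open` rejects messages from anyone who does not hold the key. That helps with the "any producer can take control" problem, but it does not replace validating the decrypted content.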

Some Message Brokers

RabbitMQ

Kafka

Azure Service Bus

Amazon MQ

Our experiment

I’m going to start with a very simple experiment using RabbitMQ, because it is free and you can try it on your local machine. Since the point of these experiments is to learn more about Azure, I will later extend the example to work with Azure Service Bus.

The main idea is to use Golang to develop some of the producers and consumers but I might use other languages as well.

There are many client libraries you can use to communicate with the message broker you choose. Since we are working with Go and RabbitMQ, I’m going to use the Go RabbitMQ Client Library.

Some containers to run our experiment

We are going to need a RabbitMQ instance running on our local machine

# docker-compose.yml
---
version: '3'
services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: unicorn
      RABBITMQ_DEFAULT_PASS: yellow
    ports:
      - 5672:5672
      - 15672:15672

Producer

  1. It launches a web server that waits for a JSON payload sent with PUT to http://localhost:8081/isprime
  2. It connects to RabbitMQ and queues a message so that another process can consume the data and check whether the number is prime

# JSON Object content
{
	"x": 2
}

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"

	"github.com/gorilla/mux"
	"github.com/streadway/amqp"
)

func scheadulePrimeChecker(w http.ResponseWriter, r *http.Request) {
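	fmt.Println("new prime check request")
	defer r.Body.Close()
	reqBody, errB := ioutil.ReadAll(r.Body)
	if errB != nil {
		// Report the failure to the HTTP client and stop: there is nothing to queue.
		http.Error(w, "Oops! A disaster just happened. I can't read the body content.", http.StatusBadRequest)
		return
	}
	queueMsgRabbitMQ(reqBody)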
}

func queueMsgRabbitMQ(data []byte) {
	fmt.Println("Queueing a message to RabbitMQ")

	conn, errC := amqp.Dial("amqp://unicorn:yellow@localhost:5672/")
	failOnError(errC, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, errCh := conn.Channel()
	failOnError(errCh, "Failed to Open a Channel")
	defer ch.Close()

	_, errQ := ch.QueueDeclare(
		"DoTheMath",
		true,  // durable
		false, // delete when unused
		false, // exclusive
		false, // no-wait
		nil)   // arguments
	failOnError(errQ, "Fail to declare a queue")

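	errP := ch.Publish(
		"",          // exchange: the default exchange routes by queue name
		"DoTheMath", // routing key: the queue declared above
		false,       // mandatory
		false,       // immediate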
		amqp.Publishing{
			DeliveryMode: amqp.Persistent,
			ContentType:  "application/json",
			Body:         data,
		},
	)
	failOnError(errP, "Fail to publish a message to the queue")

	fmt.Println("Successfully Published Message to RMQ Queue")
}

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	router := mux.NewRouter().StrictSlash(true)
	router.HandleFunc("/isprime", scheadulePrimeChecker).Methods("PUT")
	log.Fatal(http.ListenAndServe(":8081", router))
}

Consumer

  1. It connects to a RabbitMQ queue and waits for a new message
  2. When a new message is delivered, it checks whether the number is prime
  3. It then goes back to waiting for the next message

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"math/big"
	"time"

	"github.com/streadway/amqp"
)

type isPrime struct {
	X int64 `json:"x"`
}

func processMsg(msg []byte) {
	var data isPrime
	errM := json.Unmarshal(msg, &data)
	if errM != nil {
		// Skip malformed messages instead of testing the zero value.
		fmt.Println(errM)
		return
	}
	start := time.Now()

	n := data.X
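	// With 0 rounds only the Baillie-PSW test runs; the math/big docs state
	// that ProbablyPrime is 100% accurate for inputs below 2^64.
	if big.NewInt(n).ProbablyPrime(0) {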
		fmt.Println(n, "is prime")
	} else {
		fmt.Println(n, "is not prime")
	}

	elapsed := time.Since(start)
	log.Printf("DoTheMath checking if %d is prime took %s", n, elapsed)
}

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	conn, errC := amqp.Dial("amqp://unicorn:yellow@localhost:5672/")
	failOnError(errC, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, errCh := conn.Channel()
	failOnError(errCh, "Failed to Open a channel")
	defer ch.Close()

	_, errQ := ch.QueueDeclare(
		"DoTheMath",
		true,  // durable
		false, // delete when unused
		false, // exclusive
		false, // no-wait
		nil)   // arguments
	failOnError(errQ, "Fail to declare a queue")

	msgs, errP := ch.Consume(
		"DoTheMath", // queue
		"GoLang",    // consumer tag
		true,        // auto-ack
		false,       // exclusive
		false,       // no-local
		false,       // no-wait
		nil,         // arguments
	)
	failOnError(errP, "Fail to register a consumer")

	listening := make(chan bool)
	go func() {
		for d := range msgs {
			fmt.Printf("Received Message: %s\n", d.Body)
			processMsg(d.Body)
		}
	}()

	fmt.Println(" - Waiting for messages")
	// Block forever: nothing ever sends on this channel.
	<-listening
}

Running multiple consumers at once

By default, RabbitMQ sends each message to the next consumer in sequence, using round-robin dispatch. On average, every consumer gets the same number of messages.
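The dispatch order can be pictured with a few lines of Go. This is only a toy model of the round-robin idea, not RabbitMQ’s actual implementation, and `roundRobin` is a name made up for this sketch.

```go
package main

import "fmt"

// roundRobin hands message i to consumer i % consumers, which mimics
// "send each message to the next consumer, in sequence".
// consumers must be greater than zero.
func roundRobin(messages []string, consumers int) [][]string {
	assigned := make([][]string, consumers)
	for i, msg := range messages {
		c := i % consumers
		assigned[c] = append(assigned[c], msg)
	}
	return assigned
}

func main() {
	msgs := []string{"m1", "m2", "m3", "m4", "m5", "m6"}
	for c, got := range roundRobin(msgs, 3) {
		fmt.Printf("consumer %d: %v\n", c, got)
	}
}
```

Note that the split is by message count, not by how long each message takes to process, so one consumer can still end up with all the heavy ones.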

Message acknowledgment

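Acknowledgment is something a consumer chooses when it subscribes to a queue. An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it. If the consumer’s connection is closed before the ACK is sent, RabbitMQ will re-queue the message. The consumer above passes true for auto-ack, so its messages count as acknowledged as soon as they are delivered.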
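Here is a sketch of what "ack only after the work is done" could look like. To keep it runnable without a broker, it depends on a small interface instead of the client library. The `Delivery` type of the streadway/amqp client has `Ack` and `Nack` methods with these signatures, so a real delivery should fit, assuming the consumer was registered with auto-ack set to false. The names `acknowledger`, `handle`, `fakeDelivery` and `errBoom` are invented for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// acknowledger lists the two calls used below (hypothetical interface name).
type acknowledger interface {
	Ack(multiple bool) error
	Nack(multiple, requeue bool) error
}

// errBoom stands in for a temporary processing failure.
var errBoom = errors.New("processing failed")

// handle processes one message body and tells the broker what happened.
func handle(body []byte, d acknowledger, process func([]byte) error) error {
	if err := process(body); err != nil {
		// Processing failed: hand the message back so it can be redelivered.
		// For a message that can never succeed this would loop forever.
		return d.Nack(false, true)
	}
	// Acknowledge only after the work is done.
	return d.Ack(false)
}

// fakeDelivery records the calls so the sketch runs without a broker.
type fakeDelivery struct{ acked, requeued bool }

func (f *fakeDelivery) Ack(multiple bool) error { f.acked = true; return nil }

func (f *fakeDelivery) Nack(multiple, requeue bool) error { f.requeued = requeue; return nil }

func main() {
	ok := &fakeDelivery{}
	_ = handle([]byte(`{"x":7}`), ok, func([]byte) error { return nil })
	fmt.Println("acked:", ok.acked)

	bad := &fakeDelivery{}
	_ = handle([]byte(`{"x":7}`), bad, func([]byte) error { return errBoom })
	fmt.Println("requeued:", bad.requeued)
}
```

If the process dies between receiving the message and calling `Ack`, the broker never hears back and can give the message to another consumer, which is the re-queue behaviour described above.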

Message durability

If enabled, durability saves us from losing messages when the RabbitMQ server dies. But declaring a queue as durable is not enough: we also need to mark the messages we send as persistent. Not every message needs to be persistent, only those you can’t afford to lose if the RabbitMQ server restarts. Persistent messages are saved to disk, but they can still be lost if the server crashes before it writes them to disk.

Inspirations

Breaking out of message brokers

RabbitMQ tutorial