Contents

Messages Brokers - Publish/Subscribers - Broadcasting

Message Brokers - Producer/Consumer

On the preview text we were playing a bit with Go and RabbitMQ. There we had one producer sending messages and one or multiple consumers. All one of them had something in common. That something was a queue. The producer was producing and publishing messages to that queue and the consumers were consuming and processing any message coming from that queue. We also remember RabbitMQ will try to distribute those messages in between the consumers and that distribution is equal. It there are 6 messages and 3 consumers each will receive and process 2 messages.

Message Brokers - Publish/Subscribers - Broadcasting

Now we want to play a little with another concept behind Message Brokers which is “Publish/Subscribers”. It works like a broadcast. One produce will produce a message and publish it to “something”. Consumers will later on create their own queues and subscribe their queues to this “something” so they can receive all the messages delivered there. RabbitMQ call this “something” as “exchanger”.

In a Publish/Subscriber scenario, a producer will create an exchange in which consumers will bind (subscribe) their own queues to it. Later the producer will publish messages to that exchange and it is the exchanger duty to broadcast that message to all the subscribed queues.

How can we use producer/subscriber on our example

The idea is that later I’m going to develop the same consumer in different languages so I can compare the time each language will take to perform the same task. To have it clear lets say I have four consumers:

  1. Written in GoLang
  2. Written in Rust
  3. Written in Python
  4. Written in C#

They will all performe the same task and subscribe to the same exchange so when the user sends a new data to process. They will all run and they will all produce an output of how long it took to finish that task.

A scenario to work on

  1. Our producer will create a new exchange by the name of “MathExchange”
  2. We are going to launch 3 instances of our consumers. Each one of them will create their own queues and bind them (subscribe) to the “MathExchange” exchanger
  3. Producer will receive and HTTP request to execute something
  4. Producer will create and publish a message to “MathExchange” exchanger
  5. All of our consumers will get a copy of that message in their private queues
  6. Consumers will now receive the message and process it

Consumer Channels

Those you see are the 3 consumer instances we have running.

MathExchange Exchange and Bindings

As described on RabbitMQ we now have one Exchange named as “MathExchange” and 3 subscribers binded to it

Bindings to MathExchange

Now we see one producer running on the bottom right of the screen and 3 consumers. When I took that screen shot the message was already produced by the producer and consumed by all of the 3 consumers as we expected.

Code

Consumer

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"math/big"
	"time"

	"github.com/streadway/amqp"
)

type isPrime struct {
	X int64 `json:"x"`
}

func processMsg(msg []byte) {
	var data isPrime
	errM := json.Unmarshal(msg, &data)
	if errM != nil {
		fmt.Println(errM)
	}
	start := time.Now()

	n := data.X
	if big.NewInt(n).ProbablyPrime(0) {
		fmt.Println(n, "is prime")
	} else {
		fmt.Println(n, "is not prime")
	}

	elapsed := time.Since(start)
	log.Printf("DoTheMath checking if %d is prime took %s", n, elapsed)
}

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	conn, errC := amqp.Dial("amqp://unicorn:yellow@localhost:5672/")
	failOnError(errC, "Faild to connect to RabbitMQ")
	defer conn.Close()

	ch, errCh := conn.Channel()
	failOnError(errCh, "Failed to Open a channel")
	defer ch.Close()

	errEx := ch.ExchangeDeclare(
		"MathExchange", // name
		"fanout",       // type
		true,           // durable
		false,          // auto-delete
		false,          // internal
		false,          // no-wait
		nil)            // arguments
	failOnError(errEx, "Fail to declare an exchange")

	q, errQ := ch.QueueDeclare(
		"",    // name
		true,  // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil)   // arguments
	failOnError(errQ, "Fail to declare a queue")

	errB := ch.QueueBind(
		q.Name,         // queue name
		"",             // routing key
		"MathExchange", // exchange
		false,
		nil,
	)
	failOnError(errB, "Fail to bild a queue to MathExchange")

	msgs, errP := ch.Consume(
		q.Name,   // queue
		"GoLang", // consumer
		true,     // auto-ack
		false,    // exclusive
		false,    // no-local
		false,    // no-wait
		nil,      // arguments
	)
	failOnError(errP, "Fail to register a consumer")

	listening := make(chan bool)
	go func() {
		for d := range msgs {
			fmt.Printf("Recieved Message: %s\n", d.Body)
			processMsg(d.Body)
		}
	}()

	fmt.Println(" - Waiting for messages")
	<-listening

	defer conn.Close()
}

The Consumer will

  1. Connect to RabitMQ
  2. Declare a channel
  3. Declare an exchange (same exchange from the producer)
  4. Declare a queue (no name to get a unique name for the running instance)
  5. Bind the declared queue to the exchanhe (subscribe to it)
  6. Listen for messages
  7. Process the message and go back to listen for messages

Producer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"

	"github.com/gorilla/mux"
	"github.com/streadway/amqp"
)

func scheadulePrimeChecker(w http.ResponseWriter, r *http.Request) {
	fmt.Println("new addition request")
	reqBody, errB := ioutil.ReadAll(r.Body)
	if errB != nil {
		fmt.Println(w, "Ops! A disaster just hapenned. I can't read body content.")
	}
	defer r.Body.Close()
	queueMsgRabbitMQ(reqBody)
}

func queueMsgRabbitMQ(data []byte) {
	fmt.Println("Queue a msg to RabbitMQ Resquest")

	conn, errC := amqp.Dial("amqp://unicorn:yellow@localhost:5672/")
	failOnError(errC, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, errCh := conn.Channel()
	failOnError(errCh, "Failed to Open a Channel")
	defer ch.Close()

	errEx := ch.ExchangeDeclare(
		"MathExchange", // name
		"fanout",       // type
		true,           // durable
		false,          // auto-delete
		false,          // internal
		false,          // no-wait
		nil)            // arguments
	failOnError(errEx, "Fail to declare an exchange")

	errP := ch.Publish(
		"MathExchange", // exchange
		"",             // routing key
		false,          // mandatory
		false,          // immediate
		amqp.Publishing{
			DeliveryMode: amqp.Persistent,
			ContentType:  "application/json",
			Body:         data,
		},
	)
	failOnError(errP, "Fail to publish a message to the queue")

	fmt.Println("Successfully Published Message to RMQ Queue")
	defer conn.Close()
}

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	router := mux.NewRouter().StrictSlash(true)
	router.HandleFunc("/isprime", scheadulePrimeChecker).Methods("PUT")
	log.Fatal(http.ListenAndServe(":8081", router))
}

The Producer will:

  1. Connect to RabitMQ
  2. Declare a channel
  3. Declare an exchange
  4. Publish a message to that exchange

Inspirations

RabbitMQ tutorial