Message Brokers - Producer/Consumer
On the preview text we were playing a bit with Go and RabbitMQ. There we had
one producer sending messages and one or multiple consumers. All one of them
had something in common. That something was a queue. The producer was producing
and publishing messages to that queue and the consumers were consuming and
processing any message coming from that queue. We also remember RabbitMQ will
try to distribute those messages in between the consumers and that distribution
is equal. It there are 6 messages and 3 consumers each will receive and process
2 messages.
Message Brokers - Publish/Subscribers - Broadcasting
Now we want to play a little with another concept behind Message Brokers which
is “Publish/Subscribers”. It works like a broadcast. One produce will produce
a message and publish it to “something”. Consumers will later on create their
own queues and subscribe their queues to this “something” so they can receive
all the messages delivered there. RabbitMQ call this “something” as “exchanger”.
In a Publish/Subscriber scenario, a producer will create an exchange in which
consumers will bind (subscribe) their own queues to it. Later the producer will
publish messages to that exchange and it is the exchanger duty to broadcast that
message to all the subscribed queues.
How can we use producer/subscriber on our example
The idea is that later I’m going to develop the same consumer in different
languages so I can compare the time each language will take to perform the
same task. To have it clear lets say I have four consumers:
- Written in GoLang
- Written in Rust
- Written in Python
- Written in C#
They will all performe the same task and subscribe to the same exchange so
when the user sends a new data to process. They will all run and they will
all produce an output of how long it took to finish that task.
A scenario to work on
- Our producer will create a new exchange by the name of “MathExchange”
- We are going to launch 3 instances of our consumers. Each one of them will
create their own queues and bind them (subscribe) to the “MathExchange”
exchanger
- Producer will receive and HTTP request to execute something
- Producer will create and publish a message to “MathExchange” exchanger
- All of our consumers will get a copy of that message in their private queues
- Consumers will now receive the message and process it
Consumer Channels
Those you see are the 3 consumer instances we have running.
MathExchange Exchange and Bindings
As described on RabbitMQ we now have one Exchange named as “MathExchange” and 3
subscribers binded to it
Bindings to MathExchange
Now we see one producer running on the bottom right of the screen and 3
consumers. When I took that screen shot the message was already produced by the
producer and consumed by all of the 3 consumers as we expected.
Code
Consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
package main
import (
"encoding/json"
"fmt"
"log"
"math/big"
"time"
"github.com/streadway/amqp"
)
type isPrime struct {
X int64 `json:"x"`
}
func processMsg(msg []byte) {
var data isPrime
errM := json.Unmarshal(msg, &data)
if errM != nil {
fmt.Println(errM)
}
start := time.Now()
n := data.X
if big.NewInt(n).ProbablyPrime(0) {
fmt.Println(n, "is prime")
} else {
fmt.Println(n, "is not prime")
}
elapsed := time.Since(start)
log.Printf("DoTheMath checking if %d is prime took %s", n, elapsed)
}
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, errC := amqp.Dial("amqp://unicorn:yellow@localhost:5672/")
failOnError(errC, "Faild to connect to RabbitMQ")
defer conn.Close()
ch, errCh := conn.Channel()
failOnError(errCh, "Failed to Open a channel")
defer ch.Close()
errEx := ch.ExchangeDeclare(
"MathExchange", // name
"fanout", // type
true, // durable
false, // auto-delete
false, // internal
false, // no-wait
nil) // arguments
failOnError(errEx, "Fail to declare an exchange")
q, errQ := ch.QueueDeclare(
"", // name
true, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil) // arguments
failOnError(errQ, "Fail to declare a queue")
errB := ch.QueueBind(
q.Name, // queue name
"", // routing key
"MathExchange", // exchange
false,
nil,
)
failOnError(errB, "Fail to bild a queue to MathExchange")
msgs, errP := ch.Consume(
q.Name, // queue
"GoLang", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // arguments
)
failOnError(errP, "Fail to register a consumer")
listening := make(chan bool)
go func() {
for d := range msgs {
fmt.Printf("Recieved Message: %s\n", d.Body)
processMsg(d.Body)
}
}()
fmt.Println(" - Waiting for messages")
<-listening
defer conn.Close()
}
|
The Consumer will
- Connect to RabitMQ
- Declare a channel
- Declare an exchange (same exchange from the producer)
- Declare a queue (no name to get a unique name for the running instance)
- Bind the declared queue to the exchanhe (subscribe to it)
- Listen for messages
- Process the message and go back to listen for messages
Producer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
package main
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"github.com/gorilla/mux"
"github.com/streadway/amqp"
)
func scheadulePrimeChecker(w http.ResponseWriter, r *http.Request) {
fmt.Println("new addition request")
reqBody, errB := ioutil.ReadAll(r.Body)
if errB != nil {
fmt.Println(w, "Ops! A disaster just hapenned. I can't read body content.")
}
defer r.Body.Close()
queueMsgRabbitMQ(reqBody)
}
func queueMsgRabbitMQ(data []byte) {
fmt.Println("Queue a msg to RabbitMQ Resquest")
conn, errC := amqp.Dial("amqp://unicorn:yellow@localhost:5672/")
failOnError(errC, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, errCh := conn.Channel()
failOnError(errCh, "Failed to Open a Channel")
defer ch.Close()
errEx := ch.ExchangeDeclare(
"MathExchange", // name
"fanout", // type
true, // durable
false, // auto-delete
false, // internal
false, // no-wait
nil) // arguments
failOnError(errEx, "Fail to declare an exchange")
errP := ch.Publish(
"MathExchange", // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "application/json",
Body: data,
},
)
failOnError(errP, "Fail to publish a message to the queue")
fmt.Println("Successfully Published Message to RMQ Queue")
defer conn.Close()
}
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
router := mux.NewRouter().StrictSlash(true)
router.HandleFunc("/isprime", scheadulePrimeChecker).Methods("PUT")
log.Fatal(http.ListenAndServe(":8081", router))
}
|
The Producer will:
- Connect to RabitMQ
- Declare a channel
- Declare an exchange
- Publish a message to that exchange
Inspirations
RabbitMQ tutorial