Hello again to this series, where I try to build a Message Bus in the simplest way possible, to understand the architecture and how it works.
In the first episode of this series, we talked about what a Message Bus is, why it's useful, and started off with the producer part of the code.
If you haven't read that one, please do as we're going to build on top of it.
Here's a simple diagram to help us see the components (Broker, Consumer, Producer) and how they interact with each other.
In this episode, I'm going to implement the broker part responsible for handling a new message and storing it somewhere.
In other words, we're going to fully implement the green lines in the diagram above.
In the next episode, however, we're going to tackle the consumers, and that's when (hopefully; remember, I don't know exactly how things will go... just going with the flow) all the pieces come together.
Enough talk, let's create our broker:
// internal/broker/broker.go
package broker

import "mbus/internal/shared/decoder"

type Broker struct {
    config *BrokerConfig

    // utils
    decoder decoder.Decoder

    // core
    topics map[string]*Topic
}
- The `config` field is used to group all configuration in one place instead of having the `Broker` struct bloated with fields.
- Our broker needs to be able to "decode" messages sent by the producers, so a `Decoder` interface is needed; we'll implement it next.
- The `topics` field is a map between topic names and `Topic` objects. We'll get back to this shortly.
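The snippets below will also rely on a constructor. Here's a minimal sketch of what `New` could look like, assuming it simply wires in the msgpack-based decoder we'll implement in the next section and an empty topics map:

// internal/broker/broker.go
// a minimal sketch of the constructor; it just fills in the fields
func New(config *BrokerConfig) (*Broker, error) {
    return &Broker{
        config:  config,
        decoder: &decoder.MsgpackDecoder{},
        topics:  make(map[string]*Topic),
    }, nil
}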
1. Broker config
Here's what the `BrokerConfig` looks like:
// internal/broker/config.go
package broker

type BrokerConfig struct {
    // fields relevant to producers
    ProducerHost string
    ProducerPort string
}
For the time being, it only contains fields relevant to the producer part, but we'll add to it along the way.
2. Decoder
The interface of the decoder is:
// internal/shared/decoder/decoder.go
package decoder

import "mbus/internal/apiv1"

type Decoder interface {
    Decode([]byte) (*apiv1.Message, error)
}
And we implement it using `msgpack`:
// internal/shared/decoder/msgpack.go
package decoder

import (
    "mbus/internal/apiv1"

    "github.com/vmihailenco/msgpack"
)

type MsgpackDecoder struct{}

func (*MsgpackDecoder) Decode(data []byte) (*apiv1.Message, error) {
    var msg apiv1.Message
    err := msgpack.Unmarshal(data, &msg)
    if err != nil {
        return nil, err
    }
    return &msg, nil
}
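For context, the producer from the first episode does the mirror image of this before writing to the connection. Here's a rough sketch of that encoding side, assuming it uses the same msgpack library:

// a sketch of the producer-side counterpart (assumed, not shown in this episode):
// msgpack.Marshal is the inverse of the Unmarshal call above
func encode(msg *apiv1.Message) ([]byte, error) {
    return msgpack.Marshal(msg)
}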
3. Topics
Let's zoom in on the broker's internals to understand what the `topics` map looks like.
- We receive a message from a producer on the "orders" topic (in real life, the message might be a JSON object describing the order).
- We route it to the appropriate topic object using our `topics` map.
- We add the message to the topic and dispatch it.
Note: Dispatch is a term used a lot in both event-based and messaging systems, and it just means delivering the event/message to all its listeners/consumers.
Because we don't have the consumer part of the system built yet, I wanted to just save the messages on the topic object itself until we build the consumer part.
To do that, we need the topic object to have a queue (First In, First Out) that we add messages to.
We can build a simple queue using a Go slice, but it's usually best to use the tools and mechanisms the language provides in its standard library. We can use the `container/list` package, which implements a doubly-linked list, as a queue.
Implementing a queue as a doubly-linked list is better in terms of memory reuse compared to a slice... We're not building Kafka, so we won't worry about memory optimizations, but it's worth mentioning.
Alright. I can hear you say "Enough talk, show me the code."
Let's first implement a queue using `container/list`:
// internal/shared/queue/queue.go
package queue

import (
    "container/list"

    "mbus/internal/apiv1"
)

type MessageQueue struct {
    list *list.List
}

func New() *MessageQueue {
    return &MessageQueue{
        list: list.New(),
    }
}

// Enqueue adds a message to the back of the queue.
func (q *MessageQueue) Enqueue(message *apiv1.Message) {
    q.list.PushBack(message)
}

// Dequeue removes and returns the message at the front of the queue,
// or nil if the queue is empty.
func (q *MessageQueue) Dequeue() *apiv1.Message {
    if q.list.Len() == 0 {
        return nil
    }
    message := q.list.Front()
    q.list.Remove(message)
    return message.Value.(*apiv1.Message)
}

// Len reports the number of messages currently in the queue.
func (q *MessageQueue) Len() int {
    return q.list.Len()
}
Simple and effective.
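To make the FIFO behavior concrete, here's a quick usage sketch (assuming `apiv1.Message` has the `Topic` and `Data` fields we use later in this episode):

// a quick sketch of the queue's first-in, first-out behavior
q := queue.New()
q.Enqueue(&apiv1.Message{Topic: "orders", Data: []byte("first")})
q.Enqueue(&apiv1.Message{Topic: "orders", Data: []byte("second")})
log.Println(string(q.Dequeue().Data)) // prints "first"
log.Println(string(q.Dequeue().Data)) // prints "second"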
Alright, with that done, here's our `Topic`:
// internal/broker/topic.go
package broker

import (
    "mbus/internal/apiv1"
    "mbus/internal/shared/queue"
)

type Topic struct {
    Name  string
    Queue *queue.MessageQueue
}

func NewTopic(name string) *Topic {
    return &Topic{
        Name:  name,
        Queue: queue.New(),
    }
}

func (topic *Topic) Dispatch(message *apiv1.Message) {
    // for now, we only save the message in a queue
    topic.Queue.Enqueue(message)
}
Again, quite simple stuff here.
Now that we have everything ready, let's implement the broker part responsible for handling a new message from producers. This goes in the same broker.go file, and uses the standard library's `net` and `io` packages (add them to the file's imports).
// internal/broker/broker.go
func (broker *Broker) ProducerListen() error {
    listener, err := net.Listen("tcp", net.JoinHostPort(broker.config.ProducerHost, broker.config.ProducerPort))
    if err != nil {
        return err
    }

    for {
        // accept a new connection
        conn, err := listener.Accept()
        if err != nil {
            return err
        }

        // read all data from it
        data, err := io.ReadAll(conn)
        if err != nil {
            return err
        }

        // decode the raw bytes into a message
        message, err := broker.decoder.Decode(data)
        if err != nil {
            return err
        }

        // hand the message off to the broker
        broker.HandleNewMessage(message)

        // close the connection off
        conn.Close()
    }
}
Alright, let's see what `ProducerListen` does:

- It creates a listener on the host/port combination provided in the config (we'll get them from command-line arguments once we implement the `cmd` for the broker).
- We start by accepting a connection on the TCP listener we created. Mind you, this call will block (i.e. put our program to sleep) until a new connection is established.
- After that, we read all data from that connection.
- We decode the raw data into an `apiv1.Message` object.
- We call `HandleNewMessage` with the message (we'll work on that function shortly).
- We close off the connection.
- And we keep doing steps 2-6 over and over again.
Our `ProducerListen` call is blocking in two ways:

- The call to `listener.Accept` will block the main thread (the main application path), so our broker won't be able to do anything else but wait. That's kind of bad, because we can't send/receive data to/from consumers.
- After we get a connection, you can see that we do a bunch of operations on it: read it, decode it, and handle it. During this time, we can't accept new messages from producers because our main thread is busy handling the message we got. So if all those steps take 1 second to finish, we can only receive around 1 message per second. Our broker's throughput is not its selling point.
Both of these points can be solved by introducing goroutines, which allow us to move some of the work to the "background" and not halt the entire application when doing a blocking operation.
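For illustration only, here's a rough sketch of what a goroutine-per-connection version of the accept loop might look like (this is not what we'll use):

// a sketch, for illustration only: each connection is handled in its
// own goroutine, so the accept loop never waits on a slow producer
for {
    conn, err := listener.Accept()
    if err != nil {
        return err
    }

    go func(conn net.Conn) {
        defer conn.Close()

        data, err := io.ReadAll(conn)
        if err != nil {
            return
        }

        message, err := broker.decoder.Decode(data)
        if err != nil {
            return
        }

        // careful: broker.topics is now accessed concurrently, so
        // HandleNewMessage would need a mutex around the map
        broker.HandleNewMessage(message)
    }(conn)
}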
Introducing goroutines, however, will complicate things a bit, because we will need to protect ourselves from race conditions using synchronization primitives such as a mutex or a semaphore.
But I said in the beginning of this series: everything will be as simple as it can be.
And I meant it. I want to avoid goroutines as much as possible, unless we really need them. Our code stays trivial and easy to understand that way, which is the goal of this series anyway.
Once we finalize everything we can gradually start making things better and better.
Alright, `HandleNewMessage` is quite simple:
// internal/broker/broker.go
func (broker *Broker) HandleNewMessage(message *apiv1.Message) {
    // get the topic
    topic, exists := broker.topics[message.Topic]

    // if it does not exist, create it
    if !exists || topic == nil {
        log.Printf("creating topic %s", message.Topic)
        broker.topics[message.Topic] = NewTopic(message.Topic)
    }

    // add the message to the topic
    log.Printf("new message dispatched on topic %s", message.Topic)
    broker.topics[message.Topic].Dispatch(message)
}
- We check whether we have a topic registered under the message's topic name.
- If not, we create one using `NewTopic`.
- We call `Dispatch` on the topic.
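As a side note, a slightly more compact variant of that lookup-or-create pattern keeps a reference to the topic instead of indexing the map twice. A sketch:

// a sketch: hold on to the topic instead of indexing the map twice
topic, exists := broker.topics[message.Topic]
if !exists {
    log.Printf("creating topic %s", message.Topic)
    topic = NewTopic(message.Topic)
    broker.topics[message.Topic] = topic
}
topic.Dispatch(message)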
If you recall from earlier, our `Dispatch` function on the `Topic` type only adds the message to a queue.
Putting it all together
We've got all the pieces; now we just need to glue them together with a command-line program:
// cmd/broker/broker.go
package main

import (
    "flag"
    "log"

    "mbus/internal/broker"
)

var (
    produceHost string
    producePort string
)

func main() {
    parseFlags()

    config := &broker.BrokerConfig{
        ProducerHost: produceHost,
        ProducerPort: producePort,
    }

    broker, err := broker.New(config)
    if err != nil {
        log.Fatal(err)
    }

    // Listen for producers sending in messages
    log.Println("Broker: start listening for incoming producer messages...")
    err = broker.ProducerListen()
    if err != nil {
        log.Fatal(err)
    }
}

func parseFlags() {
    flag.StringVar(&produceHost, "phost", "127.0.0.1", "The host to listen on for producer messages")
    flag.StringVar(&producePort, "pport", "9990", "The port to listen on for producer messages")
    flag.Parse()
}
- We parse the command-line flags (the host and port that producers send messages to).
- We create a `BrokerConfig` object with the config we got.
- We create a broker instance with our config object.
- Then we start listening for new messages!
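The flag defaults match what we'll use below, but you could, for example, point the broker at a different host/port (a hypothetical invocation, assuming the build layout from earlier episodes):

./build/broker -phost 0.0.0.0 -pport 9000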
Testing it out
At the end of each episode, we try to test our code to see if it works.
We do manual testing here. Nice.
Let's add this piece of code to our broker:
// internal/broker/broker.go
func (broker *Broker) TestingThisOut() {
    for _, topic := range broker.topics {
        if topic == nil {
            continue
        }

        // drain the topic's queue, printing each message
        for topic.Queue.Len() > 0 {
            message := topic.Queue.Dequeue()
            log.Printf("Received new message from topic '%s': '%s'", topic.Name, string(message.Data))
        }
    }
}
This will print any new messages waiting in the queue of any topic that has some.
And let's add this to our broker's command-line program, between creating the broker and the call to `ProducerListen`:
// cmd/broker/broker.go
// (remember to add "time" to this file's imports)

// ... the broker.New call from above ...

ticker := time.Tick(3 * time.Second)
go func(ticker <-chan time.Time) {
    for range ticker {
        broker.TestingThisOut()
    }
}(ticker)

// ... the call to ProducerListen from above ...
This code will run forever in the background and, every 3 seconds, call the `TestingThisOut` function (a well-thought-out name), which prints the messages stored in the topics' queues.
Now we're ready to test.
Open two terminals. Make sure you run `make` to rebuild the project.
Run the broker in one of them:
./build/broker
And send a message in the second terminal:
./build/producer -topic orders -message "look ma! a new order"
Get back to the terminal where you ran your broker, and you should see something like this:
2024/06/02 10:39:43 Broker: start listening for incoming producer messages...
2024/06/02 10:39:52 creating topic orders
2024/06/02 10:39:52 new message dispatched on topic orders
2024/06/02 10:39:55 Received new message from topic 'orders': 'look ma! a new order'
And that concludes this episode.
See you in the next one!