DEV Community

Carlos Armando Marcano Vargas
Carlos Armando Marcano Vargas

Posted on • Originally published at carlosmv.hashnode.dev on

Go RabbitMQ: Integrating RabbitMQ into Your Go Applications

In this article, we are going to learn how to integrate RabbitMQ with a Go server.

We are going to the net/http package in this article. We are going to build two servers, one that will publish messages to the RabbitMQ queue and another to consume from it.

Also, we are just using the code examples from the RabbitMQ Go tutorial and integrating them into a Go server.

I will use the same disclaimer from the RabbitMQ tutorial:

Please keep in mind that this and other tutorials are, well, tutorials. They demonstrate one new concept at a time and may intentionally oversimplify some things and leave out others. For example, topics such as connection management, error handling, connection recovery, concurrency and metric collection are largely omitted for the sake of brevity. Such simplified code should not be considered production ready.

Requirements

  • Go installed

  • RabbitMQ installed

Publisher

Here, we are going to develop a server that will publish messages to the queue. First, we have to install the RabbitMQ Go client.

go get github.com/rabbitmq/amqp091-go

Enter fullscreen mode Exit fullscreen mode

Then, we create the main.go file.

package main

import (
    "context"
    "log"
    "log/slog"
    "net/http"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

func main() {

    mux := http.NewServeMux()
    port := ":8000"

    mux.Handle("/", http.HandlerFunc(homeHandler))

    slog.Info("Listening on ", "port", port)

    err := http.ListenAndServe(port, mux)
    if err != nil {
        slog.Warn("Problem starting the server", "error", err)
    }

}

func homeHandler(w http.ResponseWriter, r *http.Request) {

    w.Write([]byte("Hello this is a home page"))
    rabbitmqConnection()

}

func failOnError(err error, msg string) {
    if err != nil {
        log.Panicf("%s: %s", msg, err)
    }
}

func rabbitmqConnection() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "hello", // name
        false, // durable
        false, // delete when unused
        false, // exclusive
        false, // no-wait
        nil, // arguments
    )
    failOnError(err, "Failed to declare a queue")

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    body := "Hello World!"
    err = ch.PublishWithContext(ctx,
        "", // exchange
        q.Name, // routing key
        false, // mandatory
        false, // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body: []byte(body),
        })
    failOnError(err, "Failed to publish a message")
    log.Printf(" [x] Sent %s\n", body)

}

Enter fullscreen mode Exit fullscreen mode

Here we create a new HTTP multiplexer (mux) and sets the listening port to 8000. It defines a route handler (homeHandler) for the root path /.

The homeHandler() function writes a simple message ("Hello this is a home page") to the response writer and then calls the rabbitmqConnection function to connect to RabbitMQ and publish a message. So, every time the route "/" is hit, it will publish the message defined in the rabbitmqConnection() function.

The failOnError() function checks for an error and logs a panic message if the error is not nil.

The rabbitmqConnection() function establishes a connection to the RabbitMQ server using the amqp.Dial function. It creates a channel (ch) and declares a queue named "hello". Then, it publishes a message with the body "Hello World!" to the "hello" queue.

The main() function creates the web server using the http.ListenAndServe function and starts listening on port 8000.

receiver.go

Now, we create a program that consumes the messages from the queue and prints them in the console.

This is the same code from the RabbitMQ Go tutorial.

package main

import (
    "log"

    amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Panicf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "hello", // name
        false, // durable
        false, // delete when unused
        false, // exclusive
        false, // no-wait
        nil, // arguments
    )
    failOnError(err, "Failed to declare a queue")
    msgs, err := ch.Consume(
        q.Name, // queue
        "", // consumer
        true, // auto-ack
        false, // exclusive
        false, // no-local
        false, // no-wait
        nil, // args
    )
    failOnError(err, "Failed to register a consumer")

    var forever chan struct{}

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

Enter fullscreen mode Exit fullscreen mode

Here we declared the name of the queue("hello") from this program will consume the messages, using the ch.QueueDeclare method. The parameters specify that the queue is not durable (won't persist across server restarts), not exclusive (can be consumed by multiple consumers), and not auto-delete (won't be deleted when the last consumer unsubscribes).

The code registers a consumer using the ch.Consume method. It specifies the queue name (q.Name), an empty consumer tag (allowing RabbitMQ to generate a unique tag), true for automatic acknowledgment (messages are automatically acknowledged after processing), and false for other parameters. The method returns a channel (msgs) that receive incoming messages.

A goroutine is started to consume messages from the msgs channel. It iterates over the messages, printing their bodies to the console.

The main function logs a message indicating that it's waiting for messages and starts a forever channel to block the main thread. It prevents the program from exiting immediately and allows the consumer loop to continue running.

The <-forever statement waits for a signal to be sent to the forever channel, which will happen when the user presses CTRL+C. This allows the program to exit gracefully when interrupted.

Consumer

Here, we create the server that consumes from the queue. We have to install the RabbitMQ client too.

package main

import (
    "log"
    "log/slog"
    "net/http"

    amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Panicf("%s: %s", msg, err)
    }
}

func main() {
    mux := http.NewServeMux()
    port := ":8080"

    mux.Handle("/", http.HandlerFunc(homeHandler))

    slog.Info("Listening on ", "port", port)

    err := http.ListenAndServe(port, mux)
    if err != nil {
        slog.Warn("Problem starting the server", "error", err)
    }

}

func homeHandler(w http.ResponseWriter, r *http.Request) {

    body := rabbitmqConsumer()
    w.Write([]byte(body))

}

func rabbitmqConsumer() string {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "hello", // name
        false, // durable
        false, // delete when unused
        false, // exclusive
        false, // no-wait
        nil, // arguments
    )
    failOnError(err, "Failed to declare a queue")
    msgs, err := ch.Consume(
        q.Name, // queue
        "", // consumer
        true, // auto-ack
        false, // exclusive
        false, // no-local
        false, // no-wait
        nil, // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan string)
    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            forever <- string(d.Body)

        }
    }()

    bodyMessage := <-forever

    return bodyMessage

}

Enter fullscreen mode Exit fullscreen mode

Here, we just copy the same code from the receiver.go and wrap it in the rabbitmqConnection() function. This function will be called everytime the route "/" is called and will show the body of the message in the browser or HTTP Client.

Conclusion

In this article, we learn how to integrate RabbitMQ into a Go server. RabbitMQ is a powerful messaging broker that can be seamlessly integrated with Go applications to handle asynchronous message communication.

By utilizing RabbitMQ, Go servers can effectively decouple components, enhance scalability, and ensure reliable message delivery.

The Publisher's source code is here.

The Consumer's source code is here.

Thank you for taking the time to read this article.

If you have any recommendations about other packages, architectures, how to improve my code, my English, or anything; please leave a comment or contact me through Twitter, or LinkedIn.

Resources

RabbitMQ Go tutorial

AMQP091 README

Top comments (0)