DEV Community

James Eastham
James Eastham

Posted on • Edited on

How to use Amazon SQS and SNS for inter-service communication - Part 1

It's been a busy week this week, so further development on the league manager application has been slow.

That said, it felt like time to add the groundings for inter-service communication.

It's event bus time.

Inter-Service Communication

In a microservice-based architecture, there is always going to be some element of communication between your separate services.

What was once a simple method call within the same library, now crosses processes, networks and sometimes even geographical locations.

Microservices are, in my opinion, the best way to write software in 2020. That said, getting these services to communicate is a pain in the arse.

There are two types of communication to consider:

1. Synchronous (request/response)

A synchronous communication is used when the calling service needs a response right away.

In our football league management application, let's consider the transfer service is going to move Player A from one team to another. For that to happen, index data about the two teams need to be pulled from the team service.

The transfer service could make two separate HTTP calls to https://team-service/team/{id} to return the data and then parse the response.

That's all well and good, but that's extremely chatty and also relies on https://team-service resolving to the correct location.

An alternative option (and the one I prefer, albeit slightly more complicated) is for the transfer service to hold a cache of the required team data it needs.

On startup, it runs one HTTP request to return ALL of the current HTTP data. Whenever a new team is created thereafter, the transfer service can simply listen out for an 'info.newteamcreated' event.

Whenever team data is required, the transfer service can first query its local cache (in mem, Redis) and failing that then make the outbound request to the team-service.

This is a concept called eventual consistency.

Eventual consistency can be described as each replica of the data receiving the entire dataset eventually!

Dropbox and similar storage technologies are a good example of eventual consistency. You edit a file locally on your PC, eventually, both the dropbox servers and your other connected smartphone will get that same update. However, if you hit save and instantly check your phone, the update may not be there right away.

This keeps the requirement for an instant response in the transfer service intact, whilst also further decoupling our services. Win!

It also leads quite nicely to our second type of communication.

2. Asyncronous (publish/subscribe)

Asynchronous communication is my favourite type of communication.

It's the act of a service raising a new event to some kind of centralised location and not caring what happens next.

The team-service in its current form already has a couple of examples of this.

When a new team is persisted to the databases a notification is sent to an event handler containing the details of the newly created team. Similar things happen when players are added or removed.

// CreateTeam creates a new team in the database.
func (interactor *TeamInteractor) CreateTeam(team *CreateTeamRequest) (*CreateTeamResponse, error) {
    if len(team.Name) == 0 {
        interactor.Logger.Log("Team name cannot be empty")
        var response = &CreateTeamResponse{
            ID:     "",
            Name:   team.Name,
            Errors: make([]string, 1),
        }

        response.Errors[0] = "Team name cannot be empty"

        return response, errors.New("Team name cannot be empty")
    }

    newTeam := &domain.Team{
        Name: team.Name,
    }

    createdTeamID := interactor.TeamRepository.Store(newTeam)

    interactor.EventHandler.Publish("leaguemanager-newteam", TeamCreatedEvent{
        TeamID:   createdTeamID,
        TeamName: team.Name,
    })

    return &CreateTeamResponse{
        ID:   createdTeamID,
        Name: newTeam.Name,
    }, nil
}
Enter fullscreen mode Exit fullscreen mode

The team service handles the storage of the team data, and then just tells the world about it. There's no need to wait for anything else to happen, no worrying about anything else. Just a simple publish and that is that.

I prefer this kind of communication, mainly for the ease of inserting new functionality.

Sticking with the team/transfer interaction. Let's say we have 4 instances of the transfer service running to handle a spike in load. When a new team is created ALL 4 instances can listen for the same event. No complications, no additional code. Just four services all listening for the same event.

Now, the fixture management team have started working on their scheduling service. They also care about when a new team is created so it can be used by their scheduling service.

In the world of microservices, this kind of functionality is simple to add.

The fixture service just needs to start listening out for the same 'info.newteamcreated' event. It then receives the same event at the same time as the 4 transfer instances.

Alt Text

AWS Implementation

There is no single AWS service that gives the complete set of functionality we need for the above implementation. To get it working exactly as we need to, a combination of Amazon Simple Queue and Simple Notification services are required.

Conceptually, this is how the implementation will work.

Alt Text

As with all of the AWS SDKs, the details for making this work with Go Lang is incredibly easy.

Our implementation of the SNS publish looks a little something like this.

package infrastructure

import (
    "errors"
    "fmt"
    "strings"
    "team-service/domain"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/credentials"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/sns"
)

// ErrTopicNotFound is returned when the requested topic is not found.
var ErrTopicNotFound = errors.New("Specified topic not found")

// AmazonSnsEventBus is an event bus implementation using Amaazon SQS.
type AmazonSnsEventBus struct {
    svc       *sns.SNS
    availableTopics []string
}

// NewAmazonSnsEventBus creates a instance of the AmazonSnsEventBus.
func NewAmazonSnsEventBus(requiredTopics []string) *AmazonSnsEventBus {
    // Initialize a session that the SDK will use to load
    // credentials from the shared credentials file ~/.aws/credentials
    // and region from the shared configuration file ~/.aws/config.
    sess := session.Must(session.NewSession(&aws.Config{
        Region:      aws.String("eu-west-1"),
        Credentials: credentials.NewSharedCredentials("", "league-manager-sqs"),
    }))

    svc := sns.New(sess)

    availableTopics, _ := svc.ListTopics(nil)

    availableTopicArns := make([]string, len(availableTopics.Topics))

    for i, t := range availableTopics.Topics {
        availableTopicArns[i] = *t.TopicArn
    }

    return &AmazonSnsEventBus{
        svc:       svc,
        availableTopics: availableTopicArns,
    }
}

// Publish sends a new message to the event bus.
func (ev AmazonSnsEventBus) Publish(publishTo string, evt domain.Event) error {
    requiredTopicArn := ""

    for _, t := range ev.availableTopics {
        if strings.Contains(t, publishTo) {
            requiredTopicArn = t
        }
    }

    if len(requiredTopicArn) > 0 {

        result, err := ev.svc.Publish(&sns.PublishInput{
            Message: aws.String(string(evt.AsEvent())),
            TopicArn:    aws.String(requiredTopicArn),
        })

        if err != nil {
            fmt.Println("Error", err)
        }

        fmt.Println("Event published: ", *result.MessageId)

        return err
    }

    return ErrTopicNotFound
}

Enter fullscreen mode Exit fullscreen mode

And then the usage in main.go

teamInteractor := new(usecases.TeamInteractor)
// teamInteractor.TeamRepository = infrastructure.NewInMemTeamRepo()
teamInteractor.TeamRepository = infrastructure.NewDynamoDbRepo()
teamInteractor.Logger = new(infrastructure.Logger)
// teamInteractor.EventHandler = new(infrastructure.MockEventBus)
teamInteractor.EventHandler = infrastructure.NewAmazonSnsEventBus()
Enter fullscreen mode Exit fullscreen mode

One of the important things I've learned about both SNS and SQS is that you are charged per request.

For that reason, on the creation of the AmazonSqsEventBus, I load into memory all of the available topics.

availableTopics, _ := svc.ListTopics(nil)

availableTopicArns := make([]string, len(availableTopics.Topics))

for i, t := range availableTopics.Topics {
    availableTopicArns[i] = *t.TopicArn
}

return &AmazonSnsEventBus{
    svc:       svc,
    availableTopics: availableTopicArns,
}
Enter fullscreen mode Exit fullscreen mode

When the publish request is actually made, I loop through the available topic ARN's and pick out the one that contains the topic name I'm trying to publish to.

requiredTopicArn := ""

for _, t := range ev.availableTopics {
    if strings.Contains(t, publishTo) {
        requiredTopicArn = t
    }
}

if len(requiredTopicArn) > 0 {
    ...
}
Enter fullscreen mode Exit fullscreen mode

With that, I can run the application and make a POST request to create a new team. Low and behold...

Alt Text

Part one of the solution complete, albeit the much much simpler side.

In my next post, I'm going to dive deeper into the subscribe side of this model. This is a much more complicated side.

For each instance of a service running, it will need to create its own queue and then subscribe that queue to the correct SNS topic. Ideally, it also needs to sever that connection on shut down.

That could all be done manually within the UI, but in the world of auto-scaling and load balancing that would just be no fun.

As always, any comments questions or feedback is greatly appreciated. This is a learning journey for me as well.

Top comments (2)

Collapse
 
capnofdship profile image
capnOfdShip

Just curious, in the sample, why do all 4 transfer services have their own queue? Wouldn't 1 queue for the same service (per topic) be enough? Since they would all be writing to the same database and only 1 of them would need to process the message.

Is SNS+ SQS a better approach to microservice communication vs using Kinesis?

We are considering Kinesis thiking it is faster. And we can replay the messages if needed (but we can't think of a scenario for our use case to use it right now).

Collapse
 
jeastham1993 profile image
James Eastham

Hey, thanks for the message. In the specific example, the transfer services would all hold their own internal cache of the team data. So if they shared a queue, once one service had picked up the new queue message the other 3 would never see it.

I don't think that would be the 'best' way to do it n production, I was just providing an example.

I hadn't really looked at Kinesis until recently, but yes I think Kinesis would be a much better provider for this model. It suppors a much more typical pub/sub setup.