DEV Community

Guille Ojeda for AWS Community Builders

Posted on • Originally published at blog.guilleojeda.com

Building Reliable Messaging Patterns in AWS with SQS and SNS

Building distributed systems requires putting a lot of attention on communication between components. These components often need to exchange information asynchronously, and that's where message queues and pub/sub systems are the go-to solution. AWS provides two core services for this purpose: Simple Queue Service (SQS) and Simple Notification Service (SNS). While these managed services handle the fundamental mechanics of message delivery, you need to understand how to configure them to build reliable distributed systems.

This article explores those configuration details, as well as practical patterns for implementing reliable messaging using SQS and SNS. We'll examine how these services work together, talk about error handling strategies, and learn how to scale messaging infrastructure effectively.

The examples in this article use Node.js, but the patterns apply to any programming language with an AWS SDK.

Understanding AWS Messaging Services

AWS messaging services solve different aspects of the distributed communication problem. SQS provides managed message queues that enable point-to-point communication between components. When a producer sends a message to an SQS queue, that message will be delivered to exactly one consumer. This guarantee makes SQS ideal for workload distribution and task processing.

Here's how to create a basic SQS queue:

const standardQueueConfig = {
    QueueName: 'order-processing-queue',
    Attributes: {
        MessageRetentionPeriod: '1209600',
        ReceiveMessageWaitTimeSeconds: '20',
        VisibilityTimeout: '30'
    }
};
Enter fullscreen mode Exit fullscreen mode

The configuration above defines how your queue is going to behave. Message retention period determines how long messages remain available if not processed, in this case 14 days. The receive message wait time enables long polling, reducing empty responses and unnecessary API calls. Visibility timeout specifies how long a message remains hidden during processing, preventing multiple consumers from processing the same message simultaneously.

SQS offers two queue types: Standard and FIFO (First-In-First-Out). Standard queues provide "at-least-once" delivery and support nearly unlimited throughput, but messages may occasionally be delivered out of order or more than once. FIFO queues, on the other hand, guarantee exactly-once processing and strict message ordering, but with limited throughput - 3,000 messages per second with batching, or 300 without.

FIFO queues require additional configuration:

const fifoQueueConfig = {
    QueueName: 'order-processing-queue.fifo',
    Attributes: {
        FifoQueue: 'true',
        ContentBasedDeduplication: 'true',
        MessageRetentionPeriod: '1209600',
        ReceiveMessageWaitTimeSeconds: '20',
        VisibilityTimeout: '30'
    }
};
Enter fullscreen mode Exit fullscreen mode

The .fifo suffix in the queue name is mandatory for FIFO queues. Content-based deduplication automatically detects and removes duplicate messages based on their content, though you can also provide explicit deduplication IDs if needed.

SNS, meanwhile, implements the publish-subscribe pattern. Messages sent to an SNS topic are delivered to multiple subscribers simultaneously. This makes SNS ideal for broadcasting notifications, implementing event-driven architectures, and decoupling services. When a message arrives at an SNS topic, it fans out to all subscribed endpoints immediately.

Creating an SNS topic involves specifying its basic attributes and any desired message filtering capabilities:

const topicConfig = {
    Name: 'order-events',
    Attributes: {
        KmsMasterKeyId: 'alias/aws/sns',
        FilterPolicy: JSON.stringify({
            eventType: ['order_created', 'order_updated', 'order_cancelled']
        })
    }
};
Enter fullscreen mode Exit fullscreen mode

Message filtering in SNS deserves special attention because it can significantly reduce unnecessary processing. Rather than forcing every subscriber to receive and filter messages themselves, SNS can filter messages at the service level based on message attributes:

// Message filtering configuration
const filterPolicy = {
    eventType: ['order_created'],
    priority: ['HIGH'],
    region: ['us-east-1', 'us-west-2']
};

const subscriptionAttributes = {
    FilterPolicy: JSON.stringify(filterPolicy)
};
Enter fullscreen mode Exit fullscreen mode

When applied to a subscription, this filter ensures the subscriber only receives messages matching specific criteria. This filtering happens before message delivery, reducing both processing overhead and potential costs.

Implementing Reliable Queue Processing

Processing messages reliably requires paying special attention to several aspects of the messaging lifecycle.

First, let's look at a basic but reliable message processor:

const processQueue = async (queueUrl) => {
    const receiveParams = {
        QueueUrl: queueUrl,
        MaxNumberOfMessages: 10,
        WaitTimeSeconds: 20,
        MessageAttributeNames: ['All']
    };

    try {
        const data = await sqs.receiveMessage(receiveParams).promise();

        if (!data.Messages) {
            return;
        }

        for (const message of data.Messages) {
            try {
                const body = JSON.parse(message.Body);
                await processMessageByType(body);
                await deleteMessage(queueUrl, message.ReceiptHandle);

                console.log(`Successfully processed message ${message.MessageId}`);
            } catch (error) {
                console.error(`Error processing message ${message.MessageId}:`, error);
            }
        }
    } catch (error) {
        console.error('Error receiving messages:', error);
    }
};
Enter fullscreen mode Exit fullscreen mode

This implementation includes several important reliability features. Long polling reduces unnecessary API calls while ensuring timely message processing. Batch message processing improves throughput and reduces costs. Error handling at both the receive and process levels ensures that failures don't crash the processor. Messages are only deleted after successful processing, ensuring no message is lost due to processing failures.

However, reliable message processing requires more than just careful implementation. We need to handle messages that consistently fail processing, implement proper monitoring, and ensure our system scales appropriately.

Handling Failed Messages with Dead Letter Queues

Messages that can't be processed successfully after multiple attempts need special handling. Dead Letter Queues (DLQs) provide a way to isolate these problematic messages for analysis and potential reprocessing. Here's how to implement a good DLQ strategy:

const dlqConfig = {
    QueueName: 'order-processing-dlq',
    Attributes: {
        MessageRetentionPeriod: '1209600'
    }
};

const mainQueueConfig = {
    QueueName: 'order-processing',
    Attributes: {
        RedrivePolicy: JSON.stringify({
            deadLetterTargetArn: dlqArn,
            maxReceiveCount: 3
        })
    }
};
Enter fullscreen mode Exit fullscreen mode

The redrive policy automatically moves messages to the DLQ after multiple failed processing attempts. This prevents infinite processing loops while preserving failed messages for analysis. The maxReceiveCount parameter determines how many processing attempts are allowed before a message moves to the DLQ.

Processing messages from a DLQ requires a couple of changes:

const processDLQ = async (dlqUrl) => {
    const params = {
        QueueUrl: dlqUrl,
        MaxNumberOfMessages: 10,
        WaitTimeSeconds: 20,
        MessageAttributeNames: ['All']
    };

    try {
        const data = await sqs.receiveMessage(params).promise();

        if (!data.Messages) {
            return;
        }

        for (const message of data.Messages) {
            try {
                const failureAnalysis = await analyzeFailure(message);

                if (failureAnalysis.isRecoverable) {
                    await returnToMainQueue(message);
                } else {
                    await storeFailedMessage(message);
                }

                await deleteMessage(dlqUrl, message.ReceiptHandle);
            } catch (error) {
                console.error('Error processing DLQ message:', error);
            }
        }
    } catch (error) {
        console.error('Error receiving DLQ messages:', error);
    }
};

const analyzeFailure = async (message) => {
    const attributes = message.MessageAttributes;
    const messageAge = Date.now() - attributes.SentTimestamp;
    const failureCount = parseInt(attributes.ApproximateReceiveCount);

    return {
        isRecoverable: messageAge < 86400000 && failureCount < 5,
        failureReason: determineFailureReason(message)
    };
};
Enter fullscreen mode Exit fullscreen mode

This implementation analyzes failed messages to determine if they're recoverable based on their age and failure count. Recoverable messages can be returned to the main queue for reprocessing, while permanently failed messages are stored for further analysis.

Monitoring and Observability

A reliable messaging system requires good monitoring to detect and respond to issues before they impact your applications. Amazon CloudWatch provides basic metrics for both SQS and SNS, but effective monitoring requires understanding which metrics actually matter and how to interpret them.

For SQS queues, the ApproximateNumberOfMessages metric indicates how many messages are available for retrieval. However, this number alone doesn't tell the whole story. You also need to monitor ApproximateNumberOfMessagesNotVisible, which shows messages currently being processed, and ApproximateAgeOfOldestMessage, which can indicate processing backlogs or stalled consumers.

Here's how to set up basic queue monitoring:

const setupQueueMonitoring = async (queueUrl) => {
    const alarmConfig = {
        AlarmName: 'QueueMessageAge',
        AlarmDescription: 'Alert when messages are getting old',
        MetricName: 'ApproximateAgeOfOldestMessage',
        Namespace: 'AWS/SQS',
        Dimensions: [{
            Name: 'QueueName',
            Value: getQueueNameFromUrl(queueUrl)
        }],
        Period: 300,
        EvaluationPeriods: 2,
        Threshold: 3600,
        ComparisonOperator: 'GreaterThanThreshold',
        Statistic: 'Maximum'
    };

    await cloudwatch.putMetricAlarm(alarmConfig).promise();
};
Enter fullscreen mode Exit fullscreen mode

This configuration alerts you when messages remain unprocessed for more than an hour, which might indicate processing issues. However, CloudWatch metrics alone often don't provide enough visibility into message processing. Custom metrics can provide deeper insights into your system's behavior:

const recordCustomMetrics = async (message, processingResult) => {
    const metrics = [
        {
            MetricName: 'MessageProcessingTime',
            Value: processingResult.duration,
            Unit: 'Milliseconds',
            Dimensions: [
                {
                    Name: 'MessageType',
                    Value: message.attributes.messageType
                },
                {
                    Name: 'Environment',
                    Value: process.env.ENVIRONMENT
                }
            ],
            Timestamp: new Date()
        }
    ];

    await cloudwatch.putMetricData({
        Namespace: 'CustomMessageProcessing',
        MetricData: metrics
    }).promise();
};
Enter fullscreen mode Exit fullscreen mode

These custom metrics track processing time by message type, helping you identify performance patterns and potential bottlenecks. You might discover that certain message types consistently take longer to process or fail more frequently than others.


Stop copying cloud solutions, start understanding them. Join over 45,000 devs, tech leads, and experts learning how to architect cloud solutions, not pass exams, with the Simple AWS newsletter.


Security and Access Control

Security in messaging systems isn't just authentication and authorization. It also involves encryption, access control, and secure cross-account communication. Both SQS and SNS support server-side encryption using AWS KMS, which should be enabled for sensitive data (or for any data, really):

const setupQueueEncryption = async (queueUrl) => {
    const attributes = {
        QueueUrl: queueUrl,
        Attributes: {
            KmsMasterKeyId: 'alias/aws/sqs',
            Policy: JSON.stringify({
                Version: '2012-10-17',
                Statement: [{
                    Effect: 'Deny',
                    Principal: '*',
                    Action: 'SQS:*',
                    Resource: queueArn,
                    Condition: {
                        Bool: {
                            'aws:SecureTransport': false
                        }
                    }
                }]
            })
        }
    };

    await sqs.setQueueAttributes(attributes).promise();
};
Enter fullscreen mode Exit fullscreen mode

Always remember the principle of least privilege. Producer services should only have permission to send messages, while consumer services should only have permission to receive and delete messages:

const producerPolicy = {
    Version: '2012-10-17',
    Statement: [{
        Effect: 'Allow',
        Action: [
            'sqs:SendMessage',
            'sqs:GetQueueUrl'
        ],
        Resource: queueArn,
        Condition: {
            ArnLike: {
                'aws:SourceArn': producerServiceArn
            }
        }
    }]
};
Enter fullscreen mode Exit fullscreen mode

Cross-account messaging adds a bit of complexity. When services in different AWS accounts need to communicate, you must configure both the sender's IAM permissions and the receiving queue's resource policy:

const crossAccountQueuePolicy = {
    Version: '2012-10-17',
    Statement: [{
        Effect: 'Allow',
        Principal: {
            AWS: sourceAccountArn
        },
        Action: 'sqs:SendMessage',
        Resource: queueArn,
        Condition: {
            StringEquals: {
                'aws:SourceAccount': sourceAccountId
            }
        }
    }]
};
Enter fullscreen mode Exit fullscreen mode

Advanced Messaging Patterns

There will come a time when what I've showed above isn't enough for your system. Let's explore some advanced patterns that address common distributed system challenges.

Message batching can significantly improve throughput and reduce costs. However, implementing batching requires you to be mindful of how you handle failures and timeouts:

const batchProcessor = async (messages, processor) => {
    const messageGroups = messages.reduce((groups, message) => {
        const type = message.MessageAttributes.Type.StringValue;
        groups[type] = groups[type] || [];
        groups[type].push(message);
        return groups;
    }, {});

    for (const [type, groupMessages] of Object.entries(messageGroups)) {
        try {
            await processor(groupMessages, type);
            await batchDeleteMessages(queueUrl, groupMessages);
        } catch (error) {
            console.error(`Error processing message group ${type}:`, error);

            // Handle partial batch failures by deleting successful messages
            if (error.partialSuccess) {
                await batchDeleteMessages(queueUrl, error.successfulMessages);
            }
        }
    }
};
Enter fullscreen mode Exit fullscreen mode

When messages must be processed in order, such as in event sourcing systems, you need to implement ordering guarantees even with standard queues:

const orderDependentProcessor = async (queueUrl) => {
    const messageCache = new Map();
    const processingOrder = [];

    const processMessageIfReady = async (message) => {
        const sequenceNumber = parseInt(
            message.MessageAttributes.SequenceNumber.StringValue
        );

        if (sequenceNumber !== processingOrder.length + 1) {
            messageCache.set(sequenceNumber, message);
            return;
        }

        await processMessage(message);
        processingOrder.push(sequenceNumber);

        let nextSequence = sequenceNumber + 1;
        while (messageCache.has(nextSequence)) {
            const nextMessage = messageCache.get(nextSequence);
            messageCache.delete(nextSequence);
            await processMessage(nextMessage);
            processingOrder.push(nextSequence);
            nextSequence++;
        }
    };
};
Enter fullscreen mode Exit fullscreen mode

Circuit breakers protect downstream services from cascade failures. In messaging systems, circuit breakers can prevent queue processors from overwhelming struggling dependencies, and will isolate a failure preventing it from bringing down the entire system:

class MessageProcessorCircuitBreaker {
    constructor(failureThreshold = 5, resetTimeout = 60000) {
        this.failureCount = 0;
        this.failureThreshold = failureThreshold;
        this.resetTimeout = resetTimeout;
        this.lastFailureTime = null;
        this.state = 'CLOSED';
    }

    async processMessage(message, processor) {
        if (this.state === 'OPEN') {
            if (Date.now() - this.lastFailureTime >= this.resetTimeout) {
                this.state = 'HALF_OPEN';
            } else {
                throw new Error('Circuit breaker is OPEN');
            }
        }

        try {
            const result = await processor(message);

            if (this.state === 'HALF_OPEN') {
                this.state = 'CLOSED';
                this.failureCount = 0;
            }

            return result;
        } catch (error) {
            this.handleFailure();
            throw error;
        }
    }

    handleFailure() {
        this.failureCount++;
        this.lastFailureTime = Date.now();

        if (this.failureCount >= this.failureThreshold) {
            this.state = 'OPEN';
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Performance and Cost Optimization

Here's where we talk about service limits, implementing efficient processing patterns, and managing costs effectively. Standard SQS queues offer virtually unlimited throughput, while FIFO queues have specific limits that you need to be mindful of:

const scalingConfig = {
    standardQueue: {
        batchSize: 10,
        concurrentExecutions: 1000,
        processingTimeout: 30
    },
    fifoQueue: {
        maxThroughput: 3000,
        batchSize: 10,
        messageGroupId: 'orderProcessing',
        deduplicationId: uuid.v4(),
        processingTimeout: 30
    }
};
Enter fullscreen mode Exit fullscreen mode

Cost optimization mostly involves balancing message retention, polling frequency, and batch processing. Long polling reduces API calls and associated costs:

const costOptimizedReceive = async (queueUrl) => {
    const params = {
        QueueUrl: queueUrl,
        MaxNumberOfMessages: 10,
        WaitTimeSeconds: 20,
        AttributeNames: ['SentTimestamp'],
        MessageAttributeNames: ['MessageType']
    };

    return await sqs.receiveMessage(params).promise();
};
Enter fullscreen mode Exit fullscreen mode

Conclusion

Building reliable messaging systems isn't just creating SQS queues and SNS topics and calling it a day. It requires understanding how the services work, how to configure them, and how to use them effectively in distributed systems. Proper error handling, monitoring, and security are just a few of the things you need to be mindful of. The patterns and practices discussed here serve as a foundation for building robust messaging systems, but it's left as an exercise to the reader to adapt them to your specific requirements and constraints.

Remember that reliability in distributed systems isn't about preventing all failures. It's about handling failures gracefully when they occur. Testing your messaging patterns under different failure conditions will help ensure your system remains reliable even when components fail or become overloaded.

As with any system, how your components communicate should evolve with your requirements. Start with simple patterns and add complexity only when required. Monitor your system's behavior, understand your traffic patterns, and adjust your implementation accordingly.


Stop copying cloud solutions, start understanding them. Join over 45,000 devs, tech leads, and experts learning how to architect cloud solutions, not pass exams, with the Simple AWS newsletter.

  • Real scenarios and solutions

  • The why behind the solutions

  • Best practices to improve them

Subscribe for free

If you'd like to know more about me, you can find me on LinkedIn or at www.guilleojeda.com

Top comments (0)