Feature to be tested
I have a Lambda that will consume DynamoDB Stream events, transform them, then send them to EventBridge.
Testing approach
Many articles suggest routing the events to an SQS queue and validating the messages that arrive there. Other suggestions include using a Pub/Sub setup, or using aws-testing-library to check for the presence of CloudWatch log statements.
I chose the SQS queue option because it lets me assert on the actual event payload, which seemed more thorough and reliable. Also, I might swap out the Lambda for EventBridge Pipes in the future, and these tests should not need to change when I do. Tests that watch for the Lambda's CloudWatch logs would.
Testing EventBridge Events with SQS
The code for the test is below, with comments describing each piece in more detail.
The test setup
The test requires three resources to be created:
- An SQS Queue to consume the EventBridge Events
- An EventBridge Rule that targets specific EventBridge events. I match against a record's ID so that the rule ignores any non-test EB events.
- An EventBridge Target that sets the SQS queue as the target of the EventBridge Rule, so that the queue only receives events matching that rule.
Once these are created, each test will insert, modify, or delete a record in a DynamoDB table. This will trigger the Lambda which handles DynamoDB Stream Events and sends them to EventBridge.
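
One detail worth checking in your own setup: when a rule target is added through the API rather than the console, the SQS queue generally needs a resource policy that allows EventBridge to send messages to it. The sketch below builds such a policy as a plain object. `buildQueuePolicy` is a hypothetical helper, and the ARNs are placeholders rather than values from this project.

```typescript
// Hypothetical helper: builds an SQS access policy that allows a single
// EventBridge rule to send messages to the test queue.
interface QueuePolicy {
  Version: string
  Statement: Array<{
    Sid: string
    Effect: 'Allow'
    Principal: { Service: string }
    Action: string
    Resource: string
    Condition: { ArnEquals: { 'aws:SourceArn': string } }
  }>
}

const buildQueuePolicy = (queueArn: string, ruleArn: string): QueuePolicy => ({
  Version: '2012-10-17',
  Statement: [
    {
      Sid: 'AllowEventBridgeRuleToSendMessages',
      Effect: 'Allow',
      Principal: { Service: 'events.amazonaws.com' },
      Action: 'sqs:SendMessage',
      Resource: queueArn,
      // Only the test rule is allowed to deliver to this queue
      Condition: { ArnEquals: { 'aws:SourceArn': ruleArn } },
    },
  ],
})

// Placeholder ARNs, for illustration only
const policy = buildQueuePolicy(
  'arn:aws:sqs:eu-west-1:123456789012:integration-test-queue',
  'arn:aws:events:eu-west-1:123456789012:rule/event-bridge-events/integration-test-rule',
)

// SQS expects the policy as a JSON string in the queue's `Policy` attribute
const policyAttribute = JSON.stringify(policy)
```

The resulting string could be passed as `Attributes: { Policy: policyAttribute }` to `SetQueueAttributesCommand` once the queue ARN is known. The rule ARN is returned as `RuleArn` in the `PutRuleCommand` response.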
Verifying SQS messages
Once a DynamoDB record has been updated, I use long polling to check the SQS queue for any messages. When messages are received, I verify the message body.
Since the EventBridge rule is specific to the test case, I can trust that the message received is only meant for the test SQS queue.
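
A single long poll is usually enough, but the DynamoDB Stream, the Lambda, and EventBridge each add some delay. As a minimal sketch, the poll can be repeated a few times before giving up. `waitForMessages` and its `receive` parameter are hypothetical names; in the tests, `receive` would wrap the `ReceiveMessageCommand` call.

```typescript
// Minimal shape of the SQS response that this sketch relies on
interface QueueMessage {
  Body?: string
  ReceiptHandle?: string
}
type ReceiveFn = () => Promise<{ Messages?: QueueMessage[] }>

// Hypothetical helper: repeats a (long-polling) receive call until at least
// one message arrives or the attempts run out.
const waitForMessages = async (
  receive: ReceiveFn,
  maxAttempts = 3,
): Promise<QueueMessage[]> => {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const { Messages } = await receive()
    if (Messages && Messages.length > 0) return Messages
  }
  // No messages arrived within the allowed attempts
  return []
}
```

With long polling, each attempt already waits on the SQS side, so the helper does not need its own sleep between attempts.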
Cleanup after each test
The test scenario will contain several tests.
As part of the setup for each test, a record will be inserted so that it can be modified. Then, the test will delete the same record as part of its cleanup. Both steps create additional events that I'm not interested in testing, so I make sure that these messages are cleared from the queue before asserting on the intended SQS message.
Once all tests have been executed, I delete the test resources and destroy the clients.
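
The `clearMessagesOnQueue` helper in the code below performs a single receive of up to 10 messages. If more setup and cleanup events can pile up, the queue could instead be drained in a loop. The following is a sketch only: `drainQueue`, `receive`, and `remove` are hypothetical names, with the two callbacks standing in for the receive and delete calls against SQS.

```typescript
type ReceiveBatch = () => Promise<{ Messages?: { ReceiptHandle?: string }[] }>
type RemoveMessage = (receiptHandle: string) => Promise<void>

// Hypothetical helper: keeps receiving and deleting messages until a receive
// call comes back empty, or until `maxBatches` is reached as a safety net.
const drainQueue = async (
  receive: ReceiveBatch,
  remove: RemoveMessage,
  maxBatches = 10,
): Promise<number> => {
  let deleted = 0
  for (let batch = 0; batch < maxBatches; batch++) {
    const { Messages } = await receive()
    if (!Messages || Messages.length === 0) break
    for (const { ReceiptHandle } of Messages) {
      if (ReceiptHandle === undefined) continue
      await remove(ReceiptHandle)
      deleted++
    }
  }
  // Number of messages that were deleted from the queue
  return deleted
}
```

The last receive call returns nothing, so with long polling it waits for the full wait time. A shorter wait time for the drain step keeps the cleanup quick.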
Final notes before viewing the code
In the code below I create the test resources before each test run. If you are using infrastructure as code, you might prefer to create and destroy the resources via Terraform, for example.
The code
// Imports assume the AWS SDK for JavaScript v3. `PutCommand` and `DeleteCommand` are taken from
// `@aws-sdk/lib-dynamodb` because the code passes plain JavaScript objects as `Item` and `Key`.
// `DDBRecord` and `EventBridgeEventType` are project-specific types that are not shown here.
import { DynamoDBClient } from '@aws-sdk/client-dynamodb'
import {
  DeleteRuleCommand,
  EventBridgeClient,
  PutRuleCommand,
  PutTargetsCommand,
  RemoveTargetsCommand,
} from '@aws-sdk/client-eventbridge'
import {
  CreateQueueCommand,
  DeleteMessageCommand,
  DeleteQueueCommand,
  GetQueueAttributesCommand,
  ReceiveMessageCommand,
  ReceiveMessageCommandOutput,
  SQSClient,
} from '@aws-sdk/client-sqs'
import { DeleteCommand, PutCommand } from '@aws-sdk/lib-dynamodb'
const dynamoDBTableName = 'dynamo-db-table'
const mockRecordId = 'integration-test-emit-ddb-stream-change-event'
const sqsQueueName = 'integration-test-queue'
const eventBridgeRuleName = `${mockRecordId}-eb-rule-name`
const eventBridgeTargetID = `${mockRecordId}-eb-target-id`
const eventBridgeName = 'event-bridge-events'
const eventSource = 'event-bridge-source-service'
let eventBridgeClient: EventBridgeClient
let sqsClient: SQSClient
let dynamoDbClient: DynamoDBClient
let queueURL: string | undefined
const testRecord: DDBRecord = {
  recordType: 'someType',
  user: 'developer',
  created: '2024-01-01T00:00:00.000Z',
  recordId: mockRecordId,
  recordName: `${mockRecordId}-record-name`,
  version: '0',
  status: 'initialStatus',
}
const setupSQSQueue = async () => {
  // Create the test queue for consuming EventBridge events
  const createQueueResponse = await sqsClient.send(
    new CreateQueueCommand({
      QueueName: sqsQueueName,
    }),
  )
  // Set the queue URL so that it can be used throughout the tests
  // Alternatively, if the queue is created via IaC, you can use the `GetQueueUrlCommand` command instead.
  queueURL = createQueueResponse.QueueUrl
  // Create an EB rule for the `testRecord` defined above
  await eventBridgeClient.send(
    new PutRuleCommand({
      Name: eventBridgeRuleName,
      EventPattern: JSON.stringify({
        source: [eventSource],
        detail: {
          data: {
            recordId: [mockRecordId],
          },
        },
      }),
      State: 'ENABLED',
      EventBusName: eventBridgeName,
    }),
  )
  // Get the newly created queue's ARN
  const queueAttributes = await sqsClient.send(
    new GetQueueAttributesCommand({
      QueueUrl: queueURL,
      AttributeNames: ['QueueArn'],
    }),
  )
  // Set the queue as a target for EventBridge events
  await eventBridgeClient.send(
    new PutTargetsCommand({
      Rule: eventBridgeRuleName,
      EventBusName: eventBridgeName,
      Targets: [
        {
          Arn: queueAttributes.Attributes?.QueueArn,
          Id: eventBridgeTargetID,
        },
      ],
    }),
  )
}
// Utility function to delete a single message on the queue
const deleteMessageOnQueue = async (receiptHandle: string) => {
  await sqsClient.send(
    new DeleteMessageCommand({
      QueueUrl: queueURL,
      ReceiptHandle: receiptHandle,
    }),
  )
}
// Utility function to clear a queue of all messages
const clearMessagesOnQueue = async () => {
  const sqsMessages = await getMessageFromQueue()
  const messagesToDeleteOnCleanup = getSQSMessageReceiptHandles(sqsMessages)
  for (const receiptHandle of messagesToDeleteOnCleanup) {
    await deleteMessageOnQueue(receiptHandle)
  }
  // Rather than using the above method, the `PurgeQueueCommand` command might work for you.
  // I didn't use this because it could take up to 60 seconds to delete the messages on the queue.
}
const deleteTestRecordFromTable = async () => {
  await dynamoDbClient.send(
    new DeleteCommand({
      TableName: dynamoDBTableName,
      Key: {
        recordId: mockRecordId,
      },
    }),
  )
}
// Delete the SQS resource
const deleteQueue = async () => {
  await sqsClient.send(new DeleteQueueCommand({ QueueUrl: queueURL }))
}
// Delete EB Target resource
const deleteEventBridgeTarget = async () => {
  await eventBridgeClient.send(
    new RemoveTargetsCommand({
      Ids: [eventBridgeTargetID],
      Rule: eventBridgeRuleName,
      Force: true,
      EventBusName: eventBridgeName,
    }),
  )
}
// Delete EB Rule resource
const deleteEventBridgeRule = async () => {
  await eventBridgeClient.send(
    new DeleteRuleCommand({
      Name: eventBridgeRuleName,
      Force: true,
      EventBusName: eventBridgeName,
    }),
  )
}
// Long poll the queue for up to 10 messages
const getMessageFromQueue = async () =>
  await sqsClient.send(
    new ReceiveMessageCommand({
      MaxNumberOfMessages: 10,
      QueueUrl: queueURL,
      AttributeNames: ['All'],
      // 20 seconds is the maximum long-polling wait time that SQS accepts
      WaitTimeSeconds: 20,
      VisibilityTimeout: 0,
    }),
  )
// Collect the receipt handles of the received messages, skipping any that are undefined
const getSQSMessageReceiptHandles = (
  sqsMessageCommandOutput: ReceiveMessageCommandOutput,
): string[] => {
  if (!sqsMessageCommandOutput.Messages) return []
  return sqsMessageCommandOutput.Messages.map((message) => message.ReceiptHandle).filter(
    (receiptHandle): receiptHandle is string => receiptHandle !== undefined,
  )
}
// The test block
describe('Events are sent to EventBridge WHEN records are modified in a DynamoDB table', () => {
  beforeAll(async () => {
    // Create AWS SDK Clients
    const awsConfig = {} // Any AWS Config
    eventBridgeClient = new EventBridgeClient(awsConfig)
    sqsClient = new SQSClient(awsConfig)
    dynamoDbClient = new DynamoDBClient(awsConfig)
    // Setup test resources
    await setupSQSQueue()
  })
  afterEach(async () => {
    await deleteTestRecordFromTable()
    await clearMessagesOnQueue()
  })
  afterAll(async () => {
    await deleteQueue()
    await deleteEventBridgeTarget()
    await deleteEventBridgeRule()
    dynamoDbClient.destroy()
    eventBridgeClient.destroy()
    sqsClient.destroy()
  })
  it(`SHOULD create an event with the recordType 'X' WHEN a record is inserted into the DynamoDB table`, async () => {
    // Keeping track of messages on the queue so they can be deleted later
    const messagesToDeleteOnCleanup: string[] = []
    // Insert a new record into the table
    await dynamoDbClient.send(
      new PutCommand({
        TableName: dynamoDBTableName,
        Item: testRecord,
      }),
    )
    const sqsMessages = await getMessageFromQueue()
    expect(sqsMessages.Messages).toBeDefined()
    expect(sqsMessages.Messages).toHaveLength(1)
    // Add the messages to the list for deletion on cleanup
    messagesToDeleteOnCleanup.push(...getSQSMessageReceiptHandles(sqsMessages))
    const parsedMessageBody: EventBridgeEventType<'X', DDBRecord> = JSON.parse(
      sqsMessages.Messages?.[0].Body as string,
    )
    expect(parsedMessageBody['record-type']).toEqual('X')
    expect(parsedMessageBody.detail.data).toEqual({
      ...testRecord,
      status: 'ANewStatus',
    })
    // Cleanup messages created as part of this test
    for (const receiptHandle of messagesToDeleteOnCleanup) {
      await deleteMessageOnQueue(receiptHandle)
    }
  })
  describe('AND a record already exists in the database', () => {
    beforeEach(async () => {
      // Create the record in DynamoDB
      await dynamoDbClient.send(
        new PutCommand({
          TableName: dynamoDBTableName,
          Item: testRecord,
        }),
      )
      // Clear the event created by the insert above so it doesn't interfere with the test
      await clearMessagesOnQueue()
    })
    it(`SHOULD create an event with the recordType 'Y' WHEN a record is modified and the status is set to 'Z'`, async () => {
      const messagesToDeleteOnCleanup: string[] = []
      // Update the record in the table to have a failed status
      await dynamoDbClient.send(
        new PutCommand({
          TableName: dynamoDBTableName,
          Item: {
            ...testRecord,
            status: 'Z',
          } as DDBRecord,
        }),
      )
      const sqsMessages = await getMessageFromQueue()
      expect(sqsMessages.Messages).toBeDefined()
      expect(sqsMessages.Messages).toHaveLength(1)
      // Add the messages to the list for deletion on cleanup
      messagesToDeleteOnCleanup.push(...getSQSMessageReceiptHandles(sqsMessages))
      const parsedMessageBody: EventBridgeEventType<'Y', DDBRecord> = JSON.parse(
        sqsMessages.Messages?.[0].Body as string,
      )
      expect(parsedMessageBody['record-type']).toEqual('Y')
      expect(parsedMessageBody.detail.data).toEqual({
        ...testRecord,
        status: 'Z',
      })
      // Cleanup messages created as part of this test
      for (const receiptHandle of messagesToDeleteOnCleanup) {
        await deleteMessageOnQueue(receiptHandle)
      }
    })
    // ... more tests using the same approach as above
  })
})