
Building event-driven workflows with DynamoDB Streams

Event-based architectures allow us to break down complex, hard-to-read code into more manageable components. Using DynamoDB Streams and lightweight Lambda functions, we can create loosely coupled resources that automatically respond to events, streamlining workflows and improving system clarity.

1. The scenario

Bob is developing a gaming application where users earn scores for completing tasks. Each task is categorized by difficulty, allowing players to earn different scores based on the challenge level.

The backend logic adds these scores to a DynamoDB table. Bob decided to implement a single-table design for the entire application.

Scores are submitted to an API Gateway REST API endpoint via a POST request. A Lambda function processes the payload and saves it to the database. Alternatively, if the payload doesn’t require validation or restructuring, the API Gateway can connect directly to DynamoDB - offering a faster, but not always viable solution.
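
For concreteness, the request body might carry something like the following. The field names are illustrative assumptions - the original post doesn't fix the payload shape:

// A possible shape for the score submission payload (field names are assumptions)
interface SubmitScoreRequest {
  userId: string;
  taskType: 'EASY' | 'MEDIUM' | 'HARD'; // difficulty category that determines the score
}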

In both cases, the new score must be added to the user's total score.

2. Approaches

Bob has two main options to tackle this challenge.

2.1. Synchronous solution

In this approach, Bob adds a Lambda integration to the API Gateway endpoint. The function calculates the new score based on the task type provided in the request. It may also need to fetch the relevant task-score mapping from the database. Then, the function retrieves the user's existing total score from the database, adds the new score, and saves the updated total score back to the table.

This pattern is commonly used. One benefit is its simplicity: a single function handles all necessary operations, keeping the architecture straightforward. All that’s required is to write and deploy the code to Lambda.

One function performs the business logic
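
Here's a minimal sketch of what such a handler could look like. It assumes a hard-coded task-score mapping and a USER#/TOTALSCORE key layout, and it uses DynamoDB's atomic ADD update as one way to implement the read-add-save step described above:

import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, UpdateCommand } from '@aws-sdk/lib-dynamodb';

const client = DynamoDBDocumentClient.from(new DynamoDBClient({}));

// Hypothetical task-type -> score mapping; Bob's app might fetch this from the table instead
const SCORES: Record<string, number> = { EASY: 10, MEDIUM: 25, HARD: 50 };

export async function handler(event: { body: string }) {
  const { userId, taskType } = JSON.parse(event.body);

  // ADD increments the total atomically, so we don't need a separate read
  const result = await client.send(
    new UpdateCommand({
      TableName: process.env.TABLE_NAME,
      Key: { PK: `USER#${userId}`, SK: 'TOTALSCORE' }, // assumed key layout
      UpdateExpression: 'ADD TotalScore :points',
      ExpressionAttributeValues: { ':points': SCORES[taskType] ?? 0 },
      ReturnValues: 'UPDATED_NEW',
    }),
  );

  // The client only gets a response after all processing has finished
  return {
    statusCode: 200,
    body: JSON.stringify({ totalScore: result.Attributes?.TotalScore }),
  };
}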

However, what simplifies the architecture increases code complexity. The Lambda function can quickly become hard to maintain as it takes on multiple responsibilities. Synchronous processing also delays the response to the client, since the business logic must complete before a result is returned. Finally, in some scenarios the client doesn't even need the result - here, the updated total score - in the response.

2.2. Asynchronous solution

Alternatively, Bob can design an asynchronous solution. Since the client doesn’t need the new total score in the response, the endpoint can simply return an acknowledgement that the request was received. All further processing - calculations and database updates - can then occur in the background.

This approach allows the client to receive an immediate response without waiting for the entire process to finish. The endpoint handler function’s only responsibility is to add the new score to the database. In some cases, API Gateway can even write directly to DynamoDB, bypassing the need for Lambda.

The business logic can then be split across separate functions hidden from the client. This keeps each Lambda function’s code short and focused, as each handles a specific, well-defined task, creating loose coupling between different parts of the logic.

Processing is split into smaller chunks
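
A sketch of the slimmed-down endpoint handler might look like this. It only writes the raw score item and acknowledges the request; the key layout and score mapping are the same assumptions as in the synchronous sketch:

import { randomUUID } from 'node:crypto';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, PutCommand } from '@aws-sdk/lib-dynamodb';

const client = DynamoDBDocumentClient.from(new DynamoDBClient({}));

// Same hypothetical task-type -> score mapping as in the synchronous sketch
const SCORES: Record<string, number> = { EASY: 10, MEDIUM: 25, HARD: 50 };

export async function handler(event: { body: string }) {
  const { userId, taskType } = JSON.parse(event.body);

  // Only persist the new score item; updating the total happens downstream
  await client.send(
    new PutCommand({
      TableName: process.env.TABLE_NAME,
      Item: {
        PK: `SCORE#${randomUUID()}`, // assumed key layout, see section 3.3
        SK: `USER#${userId}`,
        Score: SCORES[taskType] ?? 0,
      },
    }),
  );

  // Immediate acknowledgement - no total score in the response
  return { statusCode: 202, body: JSON.stringify({ status: 'accepted' }) };
}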

The main drawback of the asynchronous approach is the added architectural complexity. Bob will need to set up and manage additional resources, which requires more time, design consideration, and maintenance.

Bob eventually decided to go with the asynchronous solution for this application.

3. The chosen asynchronous solution

Let’s explore how to set up an asynchronous workflow for score processing.

3.1. DynamoDB Streams

DynamoDB Streams captures item-level changes (creates, updates, and deletes) in a DynamoDB table.

This feature is exactly what we need: it lets us detect when a new score item is added to the table, triggering a function to handle score processing.

Enabling Streams in DynamoDB using the CDK is straightforward:

import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';

const table = new dynamodb.Table(scope, 'SOME_ID', {
  // ... other properties
  stream: dynamodb.StreamViewType.NEW_IMAGE,
});

With this configuration, the Lambda function consuming the stream receives the full item as it appears after modification.

3.2. Avoiding recursion

However, there’s a challenge: since we’re using a single-table design, the new score item is saved in the same table as the total score.

When a new score is written, it’s added to the stream, triggering the Lambda function to process the score and update the total score in the same table. This update could trigger the function again, potentially causing a loop or function error.

To avoid this, we configure the event source mapping to trigger only for specific events, like when a new score item is created. We exclude events like total score updates to prevent recursion.

3.3. Filter-friendly table design

Filters can be applied to various attributes, and using one of the index keys in the filter configuration makes setup easier. Careful table design can simplify this filtering process.

In single-table design, we use generic partition and sort keys, often labelled PK and SK. Here, the partition key (PK) can specify the item type. For example, each score item’s partition key can start with SCORE#.... The part after # is not relevant to this example.
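
To make the key layout concrete, here are two illustrative items. Only the SCORE# prefix is fixed by the design above; the rest is an assumption:

// A score item - its partition key starts with SCORE#, so it will match the filter below
const scoreItem = {
  PK: 'SCORE#3f2a...', // unique id after the '#'
  SK: 'USER#bob-123',
  Score: 50,
};

// The total score item - its partition key does NOT start with SCORE#,
// so updating it won't re-trigger the processing function
const totalScoreItem = {
  PK: 'USER#bob-123',
  SK: 'TOTALSCORE',
  TotalScore: 1250,
};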

We can configure the filter like this in the event source mapping using TypeScript CDK:

import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as lambdaEventSources from 'aws-cdk-lib/aws-lambda-event-sources';
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';

const scoreProcessorFn = new NodejsFunction(scope, 'PROCESSOR_ID', {
  // function properties here
});

scoreProcessorFn.addEventSource(
  new lambdaEventSources.DynamoEventSource(table, { // Table construct from above
    startingPosition: lambda.StartingPosition.LATEST, // required by DynamoEventSource
    filters: [
      lambda.FilterCriteria.filter({
        eventName: lambda.FilterRule.isEqual('INSERT'),
        dynamodb: {
          Keys: {
            PK: { S: lambda.FilterRule.beginsWith('SCORE') },
          },
        },
      }),
    ],
    // other configuration properties here
  }),
);

This setup ensures that every time a new item with a partition key (PK) beginning with SCORE is added (INSERT), the processing function runs. Items whose partition key doesn't start with SCORE won’t trigger the function.

If you need the function to run on both INSERT and MODIFY events, you can add a second filter - multiple entries in the filters array are combined with a logical OR:

scoreProcessorFn.addEventSource(
  new lambdaEventSources.DynamoEventSource(table, { // Table construct from above
    startingPosition: lambda.StartingPosition.LATEST,
    filters: [
      lambda.FilterCriteria.filter({
        eventName: lambda.FilterRule.isEqual('INSERT'),
        dynamodb: {
          Keys: {
            PK: { S: lambda.FilterRule.beginsWith('SCORE') },
          },
        },
      }),
      lambda.FilterCriteria.filter({
        eventName: lambda.FilterRule.isEqual('MODIFY'),
        dynamodb: {
          Keys: {
            PK: { S: lambda.FilterRule.beginsWith('SCORE') },
          },
        },
      }),
    ],
    // other configuration properties here
  }),
);

Using similar filters, we can configure other Lambda functions to run only when specific items are created or updated, preventing recursive loops in our architecture.
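
For example, a hypothetical leaderboard function could subscribe only to modifications of the total score items - assuming the USER#/TOTALSCORE key layout sketched earlier:

leaderboardFn.addEventSource(
  new lambdaEventSources.DynamoEventSource(table, {
    startingPosition: lambda.StartingPosition.LATEST,
    filters: [
      lambda.FilterCriteria.filter({
        // Only react when an existing total score item is updated
        eventName: lambda.FilterRule.isEqual('MODIFY'),
        dynamodb: {
          Keys: {
            SK: { S: lambda.FilterRule.isEqual('TOTALSCORE') },
          },
        },
      }),
    ],
  }),
);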

3.4. Processing function code

When Lambda detects new records in the stream, it invokes the function with an event object as the first argument. You can view an example event format in the documentation.
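
For orientation, an abridged record for our score item might look roughly like this (see the documentation for the full format):

// Abridged stream record as the handler receives it - note the type
// descriptors (S, N) wrapped around every attribute value
const exampleRecord = {
  eventName: 'INSERT',
  dynamodb: {
    Keys: {
      PK: { S: 'SCORE#3f2a...' },
      SK: { S: 'USER#bob-123' },
    },
    NewImage: {
      PK: { S: 'SCORE#3f2a...' },
      SK: { S: 'USER#bob-123' },
      Score: { N: '50' }, // numbers arrive as strings in stream records
    },
    StreamViewType: 'NEW_IMAGE',
  },
};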

Here’s an example of the processor function code in TypeScript:

import type { Context, DynamoDBStreamEvent } from 'aws-lambda';

export async function handler(event: DynamoDBStreamEvent, context: Context) {
  // Assuming a batch size of 1 in the event source mapping, we can access the
  // only record in the batch by its index. With a larger batch size, iterate
  // over event.Records to process them all.
  const score = event.Records[0].dynamodb?.NewImage;
  if (!score) return;

  const [, uniqueId] = score.PK.S?.split('#') ?? [];
  const [, otherId] = score.SK.S?.split('#') ?? [];
  const scoreValue = score.Score.N;
  // other necessary attributes here

  // processing logic here
}

One important note: even if you use the Document Client from the AWS SDK in your own code to avoid dealing with data type descriptors (S, N, BOOL, etc.), the stream record will still contain them. Be sure to include these descriptors in the access path when retrieving attribute values, as in score.Score.N above.
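
If you'd rather work with plain objects inside the handler, the unmarshall helper from @aws-sdk/util-dynamodb can strip the descriptors for you (the cast is needed because the aws-lambda and SDK type definitions for attribute values differ slightly):

import { unmarshall } from '@aws-sdk/util-dynamodb';
import type { AttributeValue } from '@aws-sdk/client-dynamodb';

const image = event.Records[0].dynamodb?.NewImage;
if (image) {
  // Turns { PK: { S: '...' }, Score: { N: '50' } } into { PK: '...', Score: 50 }
  const plainScore = unmarshall(image as unknown as Record<string, AttributeValue>);
  console.log(plainScore.PK, plainScore.Score);
}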

4. Summary

Using DynamoDB Streams and Lambda functions, we can build loosely coupled, event-driven workflows. This approach requires additional resources to be created and managed, but it leads to cleaner, more maintainable code in each function.

To prevent loops in the architecture, we can set up filters in the event source mapping. These filters ensure that the function only triggers when the stream record meets specific conditions, streamlining the event processing flow.

5. References, further reading

Creating a single-table design with Amazon DynamoDB - AWS blog post about single-table design

Create your first Lambda function - How to create a Lambda function

Creating tables and loading data for code examples in DynamoDB - How to create and populate DynamoDB tables
