DEV Community

Matt Martz
Using DynamoDB Streams to sync back to your old database

In my continuing saga of database and API modernization, I'm going to explore using DynamoDB Streams to back-propagate data to the old MySQL database. I'm hoping it won't come to this in practice...

For our "legacy" system, we're going to assume we have an API Gateway that invokes lambda functions that talk to an Aurora Serverless (MySQL) database.

We're then going to update the API Gateway to read from and write to DynamoDB instead... and use DynamoDB Streams to reuse the legacy lambdas to write BACK to Aurora.

For the demo I'm going to create separate endpoints so I can continue using both, but in practice you'd do the cutover.

Pre-reqs

On my local machine I have the following installed:

$ node --version
v12.13.1
$ npm --version
6.12.1
$ aws --version
aws-cli/2.0.23 Python/3.7.4 Darwin/19.5.0 botocore/2.0.0dev27
$ cdk --version
1.45.0 (build 0cfab15)

Once you have those things... let's bootstrap a project using Amazon's CDK

mkdir blog-cdk-streams && cd blog-cdk-streams
cdk init --language typescript
npm i @aws-cdk/aws-apigateway @aws-cdk/aws-lambda @aws-cdk/aws-lambda-nodejs @aws-cdk/aws-dynamodb @aws-cdk/aws-rds @aws-cdk/aws-secretsmanager @aws-cdk/aws-ssm @aws-cdk/aws-ec2 @aws-cdk/aws-lambda-event-sources aws-sdk --save

If, instead, you want to clone the project... have at it: https://github.com/martzcodes/blog-cdk-streams

Mocking the "Legacy"

I'm going to provide links to the code for setting up the "legacy" configuration, but I'm not going to go through it in detail since it's a less important aspect of this post.

Create an Aurora Serverless MySQL table

AWS and the CDK make it surprisingly difficult to spin up a simple RDS implementation... you have to pre-configure a lot of things. I'm hoping one day they'll bake that into the CDK... Until then, I found this post useful as inspiration. I'm going to build on their Aurora Serverless class to get going a bit faster.

CDK not making Aurora very easy...

My version of this code is here: https://github.com/martzcodes/blog-cdk-streams/blob/master/lib/auroraserverless.ts

That blog post probably worked when it was written, but AWS seems to have made some changes since. I had to add one VERY IMPORTANT line to the cluster creation method:

const dbcluster = new CfnDBCluster(this, "apidbcluster", {
  engine: "aurora",
  engineMode: "serverless",
  masterUsername: secret.secretValueFromJson("username").toString(),
  masterUserPassword: secret.secretValueFromJson("password").toString(),
  deletionProtection: false,
  enableHttpEndpoint: true, // this is important!
  scalingConfiguration: {
    autoPause: true,
    minCapacity: 1,
    maxCapacity: 16,
    secondsUntilAutoPause: 300,
  },
  dbSubnetGroupName: new CfnDBSubnetGroup(this, "db-subnet-group", {
    dbSubnetGroupDescription: `${props.clusterName} database cluster subnet group`,
    subnetIds: props.vpc.selectSubnets({
      subnetType: ec2.SubnetType.PRIVATE,
    }).subnetIds,
  }).ref,
});

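The enableHttpEndpoint flag turns on the RDS Data API for the cluster, which is half of what allows the lambdas to talk to the Aurora Serverless cluster (the other half is the IAM policy statements the lambdas get later on). Without it, you'd end up getting this error: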

Exception BadRequestException: HttpEndpoint is not enabled for cluster blogcdkstreamsstack-auroraserverlesschefapidbclus-1j3rs66zfzfkc. Please refer to https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/data-api.html#data-api.troubleshooting

Aurora Lambdas

For this demo to work, the Aurora side needs three lambdas with permission to read from and write to Aurora: one to initialize the database, one to get all chefs, and one to insert a chef.

All the files end up following the same basic pattern:

import * as AWS from "aws-sdk";

// Update the global SDK config BEFORE creating the client;
// service objects created earlier do not pick up later changes.
AWS.config.update({
  maxRetries: 10,
  httpOptions: {
    timeout: 60000,
    connectTimeout: 60000,
  },
});

const RDSDATA = new AWS.RDSDataService();

export const handler = async (event: any): Promise<any> => {
  try {
    console.log("START");
    console.log(event);
    console.log("ENV SECRETARN: " + process.env.SECRETARN);
    console.log("ENV DBCLUSTERARN: " + process.env.DBCLUSTERARN);
    console.log("ENV DBCLUSTERID: " + process.env.DBCLUSTERID);

    // TRY AND DO STUFF HERE

  } catch (e) {
    console.log(e);
    return {
      // Report the failure as a server error instead of a 200
      statusCode: 500,
      headers: { "Content-Type": "text/html; charset=utf-8" },
      body: `<html><body>Exception ${e} You've hit ${event.path} </body></html>\n`,
    };
  }
};
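
The `// TRY AND DO STUFF HERE` placeholder is where each lambda talks to Aurora through the Data API. As a minimal sketch of what the insert case might build, assuming a hypothetical `chefs` table and a `chefdb` database (neither name comes from this post), the request for `executeStatement` could look like this:

```typescript
// Only the fields of the Data API ExecuteStatement request used in this sketch.
interface SqlParameter {
  name: string;
  value: { stringValue: string };
}

interface ExecuteStatementRequest {
  resourceArn: string; // ARN of the Aurora Serverless cluster
  secretArn: string; // ARN of the Secrets Manager secret with the DB credentials
  database: string;
  sql: string;
  parameters: SqlParameter[];
}

interface ChefRow {
  firstName: string;
  lastName: string;
}

// ASSUMPTION: "chefdb", "chefs" and the column names are placeholders for illustration.
const buildInsertChefRequest = (
  chef: ChefRow,
  clusterArn: string,
  secretArn: string
): ExecuteStatementRequest => ({
  resourceArn: clusterArn,
  secretArn,
  database: "chefdb",
  // Named parameters (:firstName) keep user input out of the SQL string.
  sql: "INSERT INTO chefs (firstName, lastName) VALUES (:firstName, :lastName)",
  parameters: [
    { name: "firstName", value: { stringValue: chef.firstName } },
    { name: "lastName", value: { stringValue: chef.lastName } },
  ],
});
```

Inside the handler, the result would be passed along with something like `await RDSDATA.executeStatement(request).promise()`, using `process.env.DBCLUSTERARN` and `process.env.SECRETARN` for the two ARNs.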

For the INSERT lambda I have some actually interesting code:

interface Chef {
  firstName: string;
  lastName: string;
}

const processEvent = (event: any): Chef[] => {
  // API Gateway: the chef arrives as a JSON string in the request body
  if (event.body) {
    const body = JSON.parse(event.body);
    return [
      {
        firstName: body.firstName || "Gordon",
        lastName: body.lastName || "Ramsay",
      },
    ];
  }
  // DynamoDB Stream: only newly inserted items get written back to Aurora
  if (event.Records) {
    const inserts = event.Records.filter(
      (evt: any) => evt.eventName === "INSERT"
    );
    const items = inserts.map((ins: any) => {
      // Optional chaining lets the defaults apply when an attribute is missing
      const image = ins.dynamodb?.NewImage;
      return {
        firstName: image?.firstName?.S || "Gordon",
        lastName: image?.lastName?.S || "Ramsay",
      };
    });
    return items;
  }
  return [];
};

That code checks the lambda event to see if it's coming from the API Gateway or from the DynamoDB Stream trigger (spoilers). If it has a body, it came from the API Gateway; if it has Records, it came from the stream. I then process it down into a common interface so the same lambda can be used in BOTH cases.
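
To make the two event shapes concrete, here is a typed restatement of that idea with one sample event of each kind. The interfaces and sample payloads are trimmed-down assumptions (real API Gateway and stream events carry many more fields), so treat this as a sketch rather than the exact code in the repo:

```typescript
interface ChefName {
  firstName: string;
  lastName: string;
}

// Trimmed-down event shapes: only the fields this sketch reads.
interface ApiGatewayLikeEvent {
  body: string;
}

interface StreamLikeEvent {
  Records: {
    eventName: "INSERT" | "MODIFY" | "REMOVE";
    dynamodb: { NewImage?: { [attr: string]: { S?: string } } };
  }[];
}

// Type guard: a stream event is recognized by its Records array.
const isStreamEvent = (
  e: ApiGatewayLikeEvent | StreamLikeEvent
): e is StreamLikeEvent => Array.isArray((e as StreamLikeEvent).Records);

const toChefs = (e: ApiGatewayLikeEvent | StreamLikeEvent): ChefName[] => {
  if (isStreamEvent(e)) {
    // Stream values arrive in DynamoDB's typed format, e.g. { S: "Bobby" }.
    return e.Records.filter((r) => r.eventName === "INSERT").map((r) => ({
      firstName: r.dynamodb.NewImage?.firstName?.S || "Gordon",
      lastName: r.dynamodb.NewImage?.lastName?.S || "Ramsay",
    }));
  }
  const parsed = JSON.parse(e.body);
  return [
    {
      firstName: parsed.firstName || "Gordon",
      lastName: parsed.lastName || "Ramsay",
    },
  ];
};

// Sample payloads (hypothetical values).
const apiSample: ApiGatewayLikeEvent = {
  body: JSON.stringify({ firstName: "Alton", lastName: "Brown" }),
};

const streamSample: StreamLikeEvent = {
  Records: [
    {
      eventName: "INSERT",
      dynamodb: {
        NewImage: {
          ChefId: { S: "1" },
          firstName: { S: "Bobby" },
          lastName: { S: "Flay" },
        },
      },
    },
    // REMOVE events carry no new image and are skipped by the filter.
    { eventName: "REMOVE", dynamodb: {} },
  ],
};
```

Both `toChefs(apiSample)` and `toChefs(streamSample)` come back as a one-element array of `{ firstName, lastName }`, which is what lets a single insert path serve both triggers.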

It works

Alternatively, you could turn the original lambda into a Step Function with the DynamoDB stream trigger and pre-process the data before sending it to the "original" / "legacy" lambda.

The three lambdas get created in the main blog-cdk-streams-stack.ts file using the experimental aws-lambda-nodejs module for CDK.

They then get assigned the policy statements needed to talk to Aurora.
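
The policy statements themselves aren't listed in this post. As a rough sketch of what a lambda using the Data API typically needs, written as plain IAM statement objects (the helper name and the exact set of actions are my assumptions; the repo may grant more or fewer):

```typescript
interface IamStatement {
  Effect: "Allow";
  Action: string[];
  Resource: string[];
}

// ASSUMPTION: hypothetical helper; the action list is a typical minimum, not the repo's exact policy.
const auroraDataApiStatements = (
  clusterArn: string,
  secretArn: string
): IamStatement[] => [
  {
    // Lets the lambda run SQL through the Data API on this cluster.
    Effect: "Allow",
    Action: ["rds-data:ExecuteStatement", "rds-data:BatchExecuteStatement"],
    Resource: [clusterArn],
  },
  {
    // The Data API reads the DB credentials from Secrets Manager on the caller's behalf.
    Effect: "Allow",
    Action: ["secretsmanager:GetSecretValue"],
    Resource: [secretArn],
  },
];
```

In CDK the same actions and resources would go into `new PolicyStatement({ actions, resources })` from `@aws-cdk/aws-iam`, attached with `addToRolePolicy` on each function.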

Legacy APIGateway

Finally to tie the "legacy" side together we create the resources and endpoints to use the lambdas...

GET <apigatewayurl>/aurora/ # init endpoint
GET <apigatewayurl>/aurora/chefs # get all chefs
POST <apigatewayurl>/aurora/chefs # insert a chef

Deploy "Legacy"

At this point we'd have everything up to Line 111... if you use my code you can comment out the stuff below and deploy and you'd have a working version of the "Legacy System".

To deploy the code to AWS it's as easy as running:

cdk bootstrap --profile personal
cdk deploy --profile personal

Let it bake...

Run those and then let it bake.

The size of the change set is pretty impressive, but that's because so much infrastructure has to be set up to get Aurora up and running.

Create the DynamoDB Table

Now that we've established our mock "legacy" system... we can get to the fun stuff.

Create the DynamoDB table with streams enabled:

    const dynamoTable = new Table(this, serviceName, {
      billingMode: BillingMode.PAY_PER_REQUEST,
      partitionKey: {
        name: `ChefId`,
        type: AttributeType.STRING,
      },
      removalPolicy: RemovalPolicy.DESTROY,
      tableName: serviceName,
      stream: StreamViewType.NEW_IMAGE, // this enables the streams
    });

That's it!

DynamoDB Lambdas

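There are only two "modernized" lambdas: one that writes a chef to DynamoDB (updateDynamoChef) and one that reads all chefs (getDynamoChefs).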

And connecting them to DynamoDB is much simpler using grantReadData and grantReadWriteData...

    dynamoTable.grantReadWriteData(updateDynamoChef);
    dynamoTable.grantReadData(getDynamoChefs);
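
For a sense of what the write lambda does with that permission, here is a minimal sketch of building the `put` parameters for a chef. The helper name, the way the `ChefId` is supplied, and the Gordon Ramsay defaults (borrowed from the legacy lambda) are assumptions for illustration; only the `ChefId` partition key comes from the table definition above:

```typescript
interface ChefItem {
  ChefId: string; // partition key of the DynamoDB table
  firstName: string;
  lastName: string;
}

interface PutChefParams {
  TableName: string;
  Item: ChefItem;
}

// ASSUMPTION: hypothetical helper; the caller decides how chefId is generated.
const buildPutChefParams = (
  tableName: string,
  chefId: string,
  body: string | null
): PutChefParams => {
  // API Gateway passes the request body as a JSON string (or null when empty).
  const parsed = body ? JSON.parse(body) : {};
  return {
    TableName: tableName,
    Item: {
      ChefId: chefId,
      firstName: parsed.firstName || "Gordon",
      lastName: parsed.lastName || "Ramsay",
    },
  };
};
```

The lambda would then hand the result to `new AWS.DynamoDB.DocumentClient().put(params).promise()`. Because the table has streams enabled, that single write is also what produces the INSERT record the legacy lambda later receives.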

Update the API Gateway

For the demo's sake I'm going to keep the old endpoints. In practice I would update which functions the original endpoints were calling.

GET <apigatewayurl>/dynamo/chefs # get all chefs
POST <apigatewayurl>/dynamo/chefs # insert a chef

DynamoDB Stream to trigger the legacy lambda

Finally... add an event source to the legacy lambda to trigger on the DynamoDB stream event...

    const eventSourceProps: DynamoEventSourceProps = {
      startingPosition: StartingPosition.LATEST,
      batchSize: 1,
    };
    writeAuroraChefs.addEventSource(new DynamoEventSource(dynamoTable as any, eventSourceProps) as any);

Yeah... it's *that* simple...

Pièce de résistance

After re-running cdk deploy --profile personal and waiting for the updates to happen... let's walk through our endpoints.

I stored all of the requests here, so I'm just going to highlight the interesting bits.

1. GET https://nlqemzi885.execute-api.us-east-1.amazonaws.com/prod/aurora/ initializes the DB
2. GET https://nlqemzi885.execute-api.us-east-1.amazonaws.com/prod/aurora/chefs returns an empty list of chefs
3. POST https://nlqemzi885.execute-api.us-east-1.amazonaws.com/prod/aurora/chefs {"firstName":"Alton","lastName":"Brown"} inserts `Alton Brown` in the table
4. GET https://nlqemzi885.execute-api.us-east-1.amazonaws.com/prod/aurora/chefs (same endpoint as 2) returns "Alton Brown" row in the list
5. GET https://nlqemzi885.execute-api.us-east-1.amazonaws.com/prod/dynamo/chefs returns an empty table from dynamo
6. POST https://nlqemzi885.execute-api.us-east-1.amazonaws.com/prod/dynamo/chefs {"firstName":"Bobby","lastName":"Flay"} adds Bobby Flay to dynamo
7. GET https://nlqemzi885.execute-api.us-east-1.amazonaws.com/prod/dynamo/chefs confirms Bobby Flay is in dynamo
8. GET https://nlqemzi885.execute-api.us-east-1.amazonaws.com/prod/aurora/chefs (same endpoint as 2 and 4) returns Alton Brown and Bobby Flay in Aurora!

Well said Chef Ramsay...

Top comments (5)

sambasivaravuri

DynamoDB Stream to trigger the legacy lambda:

Could you please help me regarding this topic in Typescript.

I would like to enable a stream of new-image records only, plus a trigger, for a DynamoDB table in CDK.

Thank you.

Matt Martz

The examples above are in typescript. This part of the dynamodb table construct stream: StreamViewType.NEW_IMAGE is what enables streams specifically for new images (which fits your use). I'd need more detail from you to figure out what trouble you're running into.

sambasivaravuri • Edited

Thank you Matt for the quick response. I understand that these examples are written in TypeScript.

My requirement is that I have 15 existing tables, and for all 15 I need to enable streams as well as triggers.

I need your guidance on how to code this in TypeScript for all 15 existing tables.
Do I need to code it for one existing table and replicate that for the rest? Or should I write it generically and pass the tables in through environment variables?
Kindly guide me and also share the TypeScript code.
I'd appreciate a quick response.

Thank you.

sambasivaravuri

One more point I forgot to mention: I'm only looking for new items in the DynamoDB table (new image), and the Lambda needs to be triggered for them.

Matt Martz

If your tables were created by CDK to begin with, you should just be able to add that line to each table and re-deploy. If not, you'd need to import the table by ARN and apply the change. I haven't done that personally but you should be able to piece it together from the docs: docs.aws.amazon.com/cdk/api/latest...