Beginners new to AWS CDK, please go through the previous articles in this series one by one.
If you missed my previous article, you can find it through the links below.
Original previous post at Dev Post
Reposted previous post at dev.to @aravindvcyber
In this article, let us refactor our previous step function, which invokes our lambda function directly, by adding a queue that triggers the lambda indirectly with a batch size limit. This helps us keep lambda concurrency under control, much like the approach in our previous article on scalable event-driven processing using EventBridge and SQS.
Current flow
Proposed flow
Benefits achieved in this approach
There are multiple benefits associated with this approach, as follows.
We can detach the direct invocation of our lambda from the step function, so that the lambda is instead invoked indirectly, in a buffered manner, from a standard SQS queue.
With this approach, we can keep lambda concurrency to a minimum by batching the lambda triggers.
Also, remember that our lambda performs a DynamoDB write operation; at high concurrency this would need more write capacity units on our DynamoDB table. With this approach, we can maintain a consistent use of write units on our table.
Besides this, we will be able to scale this easily in higher environments.
We can also make use of dead letter queues for debugging and inspection.
Refining the queue used in our previous construct
Let us edit our previous construct construct/sfn-simple.ts by adding definitions for the dead letter queue and the working queue as follows.
Dead letter queue
The dead letter queue collects the messages that repeatedly fail processing, so the working queue only holds records that can still be processed.
const sfnCommonEventProcessorQueueDLQ: DeadLetterQueue = {
  queue: new Queue(this, "sfnCommonEventProcessorQueueDLQ", {
    retentionPeriod: Duration.days(14),
    removalPolicy: RemovalPolicy.DESTROY,
    queueName: "sfnCommonEventProcessorQueueDLQ",
  }),
  // number of receives allowed before a message is moved to this queue
  maxReceiveCount: 100,
};
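To make the role of maxReceiveCount more concrete, here is a minimal sketch in plain TypeScript that makes no AWS calls. The simulateRedrive helper and its parameters are hypothetical names used only for illustration. It assumes the usual SQS redrive rule, where a message is moved to the dead letter queue once its receive count would exceed maxReceiveCount.

```typescript
// Hypothetical in-memory model of an SQS redrive policy; this is not an AWS API.
type Outcome = { receives: number; destination: "processed" | "dlq" };

// handler returns true when processing succeeds for the given attempt number.
function simulateRedrive(
  maxReceiveCount: number,
  handler: (attempt: number) => boolean
): Outcome {
  let receives = 0;
  while (receives < maxReceiveCount) {
    receives += 1; // every poll by the consumer increments the receive count
    if (handler(receives)) {
      // the consumer deletes the message after successful processing
      return { receives, destination: "processed" };
    }
    // on failure the message becomes visible again after the visibility timeout
  }
  // one more receive would exceed maxReceiveCount, so the message is set aside
  return { receives, destination: "dlq" };
}

// A handler that succeeds on its third attempt
const recovered = simulateRedrive(100, (attempt) => attempt === 3);
// A handler that never succeeds (a "poison" message)
const poisoned = simulateRedrive(100, () => false);
console.log(recovered, poisoned);
```

With the value used in this article, a message that always fails could be received up to 100 times before it reaches the dead letter queue. If each failed attempt waits out the full visibility timeout of the working queue, that adds up quickly, so it may be worth comparing this total against the retention period of the working queue and considering a lower maxReceiveCount.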
Standard queue
The working queue is defined as a standard queue, as shown below. It could also be a FIFO queue, depending on cost and business requirements.
const sfnCommonEventProcessorQueue = new Queue(
  this,
  "sfnCommonEventProcessorQueue",
  {
    retentionPeriod: Duration.days(5),
    removalPolicy: RemovalPolicy.DESTROY,
    deliveryDelay: Duration.seconds(3),
    queueName: "sfnCommonEventProcessorQueue",
    visibilityTimeout: Duration.minutes(100),
    // failed messages are redirected to the dead letter queue defined earlier
    deadLetterQueue: sfnCommonEventProcessorQueueDLQ,
  }
);
New job to push messages into our queue
Just like we defined a lambda invoke job earlier, we can define a new job that pushes a message into the queue, which our existing lambda will later poll asynchronously.
You will also find that we have fully reused the payload, inputPath, resultPath, resultSelector, etc.
const recordingQueue = new tasks.SqsSendMessage(
  this,
  "Record using Queue",
  {
    inputPath: "$",
    messageBody: sfnTaskPayload,
    queue: sfnCommonEventProcessorQueue,
    comment: "Record message into dynamodb using SQS queue buffered lambda",
    // pause the execution until the lambda reports back with the task token
    integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
    resultSelector: {
      "Payload.$": "$",
      "StatusCode.$": "$.statusCode",
    },
    resultPath: "$.recordResult",
  }
);
Adding event source for the existing lambda
Now we will connect our new queue to our existing lambda inside the same construct.
As you can see, we have set a batch size on the event source, so the lambda receives at most that many messages per invocation, no matter how many state machine executions are running concurrently. You may also use a FIFO queue if you find that the order of messages is important while recording and processing.
triggerFunction.addEventSource(
  new SqsEventSource(sfnCommonEventProcessorQueue, {
    batchSize: 2,
    maxBatchingWindow: Duration.seconds(10),
  })
);
Here batchSize is the maximum number of records the lambda receives per invocation, and maxBatchingWindow defines the maximum time spent gathering records before the lambda is invoked. If it is not defined, the lambda is invoked immediately with as many records as are available, up to batchSize.
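The interplay between the two settings can be pictured with a small, self-contained sketch. This is a simplified model rather than the actual Lambda poller, which applies further rules such as payload size limits. The groupIntoBatches function and its parameters are hypothetical names, and arrival times are given in milliseconds.

```typescript
// Simplified, hypothetical model of batch collection; the real poller has
// additional rules that are ignored here.
function groupIntoBatches(
  arrivalsMs: number[], // message arrival times in milliseconds, ascending
  batchSize: number,
  maxBatchingWindowMs: number
): number[][] {
  const batches: number[][] = [];
  let current: number[] = [];
  for (const arrival of arrivalsMs) {
    // close the open batch if this message arrives after its window has elapsed
    if (current.length > 0 && arrival - current[0] > maxBatchingWindowMs) {
      batches.push(current);
      current = [];
    }
    current.push(arrival);
    // close the batch as soon as it is full
    if (current.length === batchSize) {
      batches.push(current);
      current = [];
    }
  }
  // flush whatever is left once the input ends
  if (current.length > 0) batches.push(current);
  return batches;
}

// batchSize 2 and a 10 second window, as in the event source above
const batches = groupIntoBatches([0, 1000, 2000, 15000, 40000], 2, 10000);
console.log(JSON.stringify(batches)); // [[0,1000],[2000],[15000],[40000]]
```

In this model, the first two messages fill a batch immediately, while the later ones arrive too far apart and are each delivered alone once their window runs out.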
Callback pattern
Notice also that we have implemented the callback pattern: we pause the execution of the step function after pushing the message to SQS and wait for the lambda to report back to the state machine with a success or failure once it has finished processing.
With this approach, the state machine does not have to keep polling for updates again and again until the result is ready, and we can see where the actual work is happening.
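As a rough mental model of the callback pattern, the sketch below keeps the waiting tasks in memory. Every name here (CallbackRegistry, startTask, stateOf) is hypothetical; only the method names sendTaskSuccess and sendTaskFailure mirror the Step Functions calls used later in the lambda, and nothing in this block talks to AWS.

```typescript
// Hypothetical in-memory stand-in for the state machine side of the callback
// pattern; it only illustrates the token handshake.
type TaskState =
  | { status: "WAITING" }
  | { status: "SUCCEEDED"; output: string }
  | { status: "FAILED"; error: string };

class CallbackRegistry {
  private tasks: { [token: string]: TaskState } = {};
  private counter = 0;

  // The workflow pauses here and hands out a token along with the message
  startTask(): string {
    this.counter += 1;
    const token = `token-${this.counter}`;
    this.tasks[token] = { status: "WAITING" };
    return token;
  }

  // The worker reports back using the token it received in the message
  sendTaskSuccess(token: string, output: string): void {
    this.complete(token, { status: "SUCCEEDED", output });
  }

  sendTaskFailure(token: string, error: string): void {
    this.complete(token, { status: "FAILED", error });
  }

  stateOf(token: string): TaskState | undefined {
    return this.tasks[token];
  }

  private complete(token: string, next: TaskState): void {
    const current = this.tasks[token];
    // A token can be completed only once, and only while the task is waiting
    if (!current || current.status !== "WAITING") {
      throw new Error("Provided task does not exist anymore");
    }
    this.tasks[token] = next;
  }
}

const registry = new CallbackRegistry();
const token = registry.startTask();
const before = registry.stateOf(token)?.status;
registry.sendTaskSuccess(token, JSON.stringify({ statusCode: 200 }));
const after = registry.stateOf(token)?.status;
console.log(before, after);
```

The waiting side does no polling at all; the task simply changes state when the worker presents the token.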
Callback pattern illustration
Callback pattern illustration on success
Changes in the lambda function
The changes we have made so far look sufficient at first sight, but they will fail, because the payload the lambda now receives is different from that of the old direct invocation. When the lambda polls SQS, the actual message is wrapped inside an object containing an array of event records. Inside each event record, the body property holds the actual message that we passed as the payload in our previous article, serialized as a JSON string. So we need to tweak our lambda a little, based on the sample event below.
{
  "Records": [
    {
      "messageId": "ff11de7f-795a-4c85-8612-9d376e01df0d",
      "receiptHandle": "**********",
      "body": "{\"Record\":{\"createdAt\":\"2022-04-18T07:51:06Z\",\"messageId\":\"538a8436-9987-1f65-478d-815c77f00d0f\",\"event\":{\"message\":\"A secret message\"}},\"MyTaskToken\":\"**********\"}",
      "attributes": {
        "ApproximateReceiveCount": "3",
        "SentTimestamp": "1650268267274",
        "SenderId": "AROAYLZFFS6HZ3ESZEEBH",
        "ApproximateFirstReceiveTimestamp": "1650268270274"
      },
      "messageAttributes": {},
      "md5OfBody": "edddae585d76eb14439f5804dcef24a4",
      "eventSource": "aws:sqs",
      "eventSourceARN": "**********",
      "awsRegion": "ap-south-1"
    }
  ]
}
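Unwrapping this envelope can be sketched in a few lines of plain TypeScript. The interfaces below cover only the fields this sketch reads and are inferred from the sample event above; extractTaskMessages is a hypothetical helper name, not part of any AWS library.

```typescript
// Minimal shapes for the fields used here; a real SQS event carries more.
interface SqsRecord {
  messageId: string;
  body: string; // JSON string produced by the state machine task
}
interface SqsEvent {
  Records: SqsRecord[];
}
// Shape of the parsed body, inferred from the sample event above
interface TaskMessage {
  Record: { createdAt: string; messageId: string; event: unknown };
  MyTaskToken: string;
}

// Hypothetical helper: parse each record body and return typed messages
function extractTaskMessages(sqsEvent: SqsEvent): TaskMessage[] {
  return sqsEvent.Records.map(
    (record) => JSON.parse(record.body) as TaskMessage
  );
}

const sampleEvent: SqsEvent = {
  Records: [
    {
      messageId: "ff11de7f-795a-4c85-8612-9d376e01df0d",
      body: JSON.stringify({
        Record: {
          createdAt: "2022-04-18T07:51:06Z",
          messageId: "538a8436-9987-1f65-478d-815c77f00d0f",
          event: { message: "A secret message" },
        },
        MyTaskToken: "**********",
      }),
    },
  ],
};

const messages = extractTaskMessages(sampleEvent);
console.log(messages[0].Record.messageId, messages[0].MyTaskToken);
```

Note that the messageId of the SQS record and the messageId inside the body are two different identifiers; the lambda records the one inside the body.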
Rewriting the lambda code
So, let us rewrite the message recorder lambda function as shown below, taking note of the above payload changes.
Do not think of this as just an additional step; it lets the lambda process a batch of message records per invocation as it polls the queue.
import { PutItemInput } from "aws-sdk/clients/dynamodb";
import { DynamoDB, StepFunctions } from "aws-sdk";

const sfn = new StepFunctions({ apiVersion: "2016-11-23" });

exports.processor = async function (event: any) {
  const dynamo = new DynamoDB();
  let result: any | undefined = undefined;
  await Promise.all(
    event.Records.map(async (Record: any) => {
      // The original payload is now a JSON string inside the SQS record body
      const msg = JSON.parse(Record.body).Record;
      const crt_time: number = new Date(msg.createdAt).getTime();
      const putData: PutItemInput = {
        TableName: process.env.MESSAGES_TABLE_NAME || "",
        Item: {
          messageId: { S: msg.messageId },
          createdAt: { N: `${crt_time}` },
          event: { S: JSON.stringify(msg.event) },
        },
        ReturnConsumedCapacity: "TOTAL",
      };
      try {
        result = await dynamo.putItem(putData).promise();
      } catch (err) {
        const sendFailure: StepFunctions.SendTaskFailureInput = {
          error: JSON.stringify(err),
          cause: JSON.stringify({
            statusCode: 500,
            headers: { "Content-Type": "text/json" },
            putStatus: {
              messageId: msg.messageId,
              ProcessorResult: err,
            },
          }),
          taskToken: JSON.parse(Record.body).MyTaskToken,
        };
        console.log(sendFailure);
        // Await the promise so the lambda does not return before the failure is reported
        await sfn
          .sendTaskFailure(sendFailure)
          .promise()
          .then((data) => console.log(data))
          .catch((e) => console.log(e, e.stack));
        return sendFailure;
      }
      const sendSuccess: StepFunctions.SendTaskSuccessInput = {
        output: JSON.stringify({
          statusCode: 200,
          headers: { "Content-Type": "text/json" },
          putStatus: {
            messageId: msg.messageId,
            ProcessorResult: result,
          },
        }),
        taskToken: JSON.parse(Record.body).MyTaskToken,
      };
      console.log(sendSuccess);
      // Use .promise() alone; combining it with a callback can send the request twice
      await sfn
        .sendTaskSuccess(sendSuccess)
        .promise()
        .then((data) => console.log(data))
        .catch((e) => console.log(e, e.stack));
      return sendSuccess;
    })
  );
};
Lambda changes summary
exports.processor = async function (event: any) {
  const dynamo = new DynamoDB();
  let result: any | undefined = undefined;
  await Promise.all(
    event.Records.map(async (Record: any) => {
      const msg = JSON.parse(Record.body).Record;
      // the rest is the same as in the previous article
    })
  );
};
TaskToken changes
Besides this, the taskToken inside the lambda has also been changed from the previous example to the value below.
Before
taskToken: Record.MyTaskToken,
After
taskToken: JSON.parse(Record.body).MyTaskToken,
Step function timeout
Note that if the step function waits longer than its defined timeout value, the execution is cancelled and exits. A callback that the lambda sends after that point fails with an error like the one below.
b799d9d4-5c32-5f42-89bb-830315b023f7 INFO TaskTimedOut: Task Timed Out: 'Provided task does not exist anymore'
at Request.extractError (/var/runtime/node_modules/aws-sdk/lib/protocol/json.js:52:27)
at Request.callListeners (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:106:20)
at Request.emit (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:78:10)
at Request.emit (/var/runtime/node_modules/aws-sdk/lib/request.js:686:14)
at Request.transition (/var/runtime/node_modules/aws-sdk/lib/request.js:22:10)
at AcceptorStateMachine.runTo (/var/runtime/node_modules/aws-sdk/lib/state_machine.js:14:12)
at /var/runtime/node_modules/aws-sdk/lib/state_machine.js:26:10
at Request.<anonymous> (/var/runtime/node_modules/aws-sdk/lib/request.js:38:9)
at Request.<anonymous> (/var/runtime/node_modules/aws-sdk/lib/request.js:688:12)
at Request.callListeners (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:116:18) {
code: 'TaskTimedOut',
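One way to handle this case in the lambda is to recognise the error by its code and skip retrying, since the token can no longer be used. The sketch below is a minimal illustration under the assumption that the error object carries a code property, as in the log above; isTaskTimedOut and classifyReportError are hypothetical helper names.

```typescript
// Hypothetical helper: recognise the error from the log above by its code.
// Assumes the error exposes a `code` property, as seen in that log.
function isTaskTimedOut(err: unknown): boolean {
  return (
    typeof err === "object" &&
    err !== null &&
    (err as { code?: unknown }).code === "TaskTimedOut"
  );
}

// Hypothetical helper: decide what to do when reporting back has failed
function classifyReportError(err: unknown): "expired" | "retry" {
  // The execution already gave up on this token, so retrying cannot help
  if (isTaskTimedOut(err)) return "expired";
  // Any other error may be transient, so let the message be retried
  return "retry";
}

const timedOut = classifyReportError({
  code: "TaskTimedOut",
  message: "Task Timed Out: 'Provided task does not exist anymore'",
});
const other = classifyReportError(new Error("network issue"));
console.log(timedOut, other);
```

Treating an expired token as a final outcome avoids reprocessing a message whose execution has already ended.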
Deploying our changes
When you deploy your changes, you may get a prompt to accept the IAM changes that grant the step function the privileges needed to send messages to SQS, similar to what we explicitly granted to the lambda in our last article.
Sample execution log
Thus we have achieved the purpose of this article: the step function pushes messages as records into a queue, which the lambda later polls and processes before returning the status to the state machine. This architecture is more scalable than the previous one, which invoked the lambda function directly.
We will be adding more connections to our stack and making it more usable in the upcoming articles by creating new constructs, so do consider following and subscribing to my newsletter.
Our next article in this serverless series is available here, do check it out:
https://dev.to/aravindvcyber/aws-cdk-101-using-layers-in-lambda-functions-and-saving-json-to-s3-46fg
Thanks for supporting!
It would be great if you would like to Buy Me a Coffee, to help boost my efforts.
Original post at Dev Post
Reposted at dev.to @aravindvcyber