Building an Event-Driven Architecture for Content Embedding Generation with AWS Bedrock, DynamoDB, and AWS Batch

Hello everyone,

In this blog, I’ll walk you through building an event-driven pipeline that converts the content stored in DynamoDB into embeddings, making them searchable via OpenSearch for vector search. The goal of this pipeline is to automatically handle the entire process whenever new content is added or existing content is modified in DynamoDB.

This event-driven architecture triggers each step in the process seamlessly, converting newly added or updated items into embeddings and storing them in OpenSearch.

One of my key design goals is to minimize the amount of code needed to connect services and to reduce the reliance on Lambda functions. Instead, I focused on leveraging AWS services and EventBridge to connect and automate the workflow.

This is what the workflow looks like:

Before diving into the implementation, let’s prepare the scripts for inserting data into DynamoDB and for converting the content into embeddings and indexing them in OpenSearch.

For this POC, I am using this dataset from Kaggle: https://www.kaggle.com/datasets/fernandogarciah24/top-1000-imdb-dataset

Code for inserting data into DynamoDB:

import csv
import boto3
import uuid

# DynamoDB table name
DYNAMODB_TABLE_NAME = "content"

# Initialize DynamoDB client
dynamodb = boto3.resource('dynamodb', region_name="us-east-1")
table = dynamodb.Table(DYNAMODB_TABLE_NAME)

def process_csv_and_insert_to_dynamodb(csv_file_path):
    try:
        # Open and read the CSV file
        with open(csv_file_path, mode='r', encoding='utf-8') as file:
            csv_reader = csv.DictReader(file)
            content_id = 0  # Start content_id from 0

            # Iterate over each row in the CSV
            for row in csv_reader:
                # Prepare item for DynamoDB
                item = {
                    'content_id': content_id,
                    'content_title': row['Series_Title'],
                    'genre': row['Genre'],
                    'overview': row['Overview']
                }

                # Insert item into DynamoDB
                table.put_item(Item=item)

                print(f"Inserted: {item}")
                content_id += 1  # Increment content_id
                if content_id == 65:  # stop after the first 65 records for testing
                    break
    except Exception as e:
        print(f"Error: {e}")

# Provide the path to your CSV file
csv_file_path = "movies.csv"

# Call the function
process_csv_and_insert_to_dynamodb(csv_file_path)

Code for converting content into Embeddings:

import boto3
import json
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth


credentials = boto3.Session().get_credentials()
aws_auth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    'us-east-1',
    'aoss',  # Service name for OpenSearch Serverless
    session_token=credentials.token
)


QUEUE_URL = "queue url"
OPENSEARCH_ENDPOINT = "open search serverless endpoint"
INDEX_NAME = "contents"
AWS_REGION = 'us-east-1'

# AWS Clients
sqs = boto3.client('sqs', region_name=AWS_REGION)
bedrock_runtime = boto3.client('bedrock-runtime', region_name=AWS_REGION)

# OpenSearch Client
def get_opensearch_client():
    return OpenSearch(
        hosts=[{'host': OPENSEARCH_ENDPOINT, 'port': 443}],
        http_auth=aws_auth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection
    )

# Function to poll messages from SQS
def poll_sqs_messages():
    response = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=10,  # Fetch up to 10 messages
        WaitTimeSeconds=10
    )
    return response.get('Messages', [])

# Function to call Amazon Titan for embedding generation
def generate_embeddings(text):
    payload = {
        "inputText": text
    }
    response = bedrock_runtime.invoke_model(
        modelId="amazon.titan-embed-text-v1",
        contentType="application/json",
        accept="application/json",
        body=json.dumps(payload)
    )
    response_body = json.loads(response['body'].read())
    return response_body.get('embedding')

# Function to store embeddings in OpenSearch
def store_embeddings_in_opensearch(content_id, embedding, content_title, genre):
    client = get_opensearch_client()
    print("got the client")
    document = {
        "title": content_title,
        "overview": embedding,
        "genre": genre,
        "content_id": content_id
    }
    print("got the document")
    response = client.index(
        index=INDEX_NAME,
        body=document
    )
    print("got the response")
    return response

# Main Processing Function
def main():
    print("Starting Batch Job to process SQS messages...")

    messages = poll_sqs_messages()
    if not messages:
        print("No messages found in the queue. Exiting.")
        return

    for message in messages:
        try:
            body = json.loads(message['Body'])
            db_record = body['dynamodb']['NewImage']
            content_title = db_record['content_title']['S']
            overview = db_record['overview']['S']
            content_id = db_record['content_id']['N']
            genre = db_record['genre']['S']

            # Generate Embedding
            embedding = generate_embeddings(overview)
            print(f"Generated embedding for content: {content_title}")

            # Store in OpenSearch
            store_embeddings_in_opensearch(content_id, embedding, content_title, genre)

            # Delete the message from SQS after successful processing
            # sqs.delete_message(
            #     QueueUrl=QUEUE_URL,
            #     ReceiptHandle=message['ReceiptHandle']
            # )
            # print(f"Deleted message {message['MessageId']} from SQS.")

        except Exception as e:
            print(f"Error processing message: {str(e)}")
            continue

    print("Batch Job completed successfully.")

if __name__ == "__main__":
    main()
  • Create a Dockerfile and build a Docker image to push to ECR
FROM python:3.9-slim
USER root

# Install dependencies
RUN pip install boto3 opensearch-py requests requests-aws4auth

# Copy the script into the container
COPY process_embeddings_batch.py /app/process_embeddings_batch.py

# Default command
CMD ["python", "/app/process_embeddings_batch.py"]
  • Visit the ECR section and click on Create repository. Provide a name for the repository and, once it is created, click on the Push commands button

  • Use the push commands to build and push the Docker image

Step-by-step implementation:

  • Create a DynamoDB table and enable streams

  • Create an SQS queue to hold the DB records

  • Create an EventBridge Pipe to connect DynamoDB and SQS

  • Create a CloudWatch alarm for when messages in the queue exceed 50

  • Create an AWS Batch job definition to run jobs

  • Create a Step Functions state machine to submit a job to AWS Batch

  • Create an EventBridge rule to listen for the alarm and trigger the state machine

  • Create an OpenSearch Serverless collection and index

Create a DynamoDB table and enable streams:

  • Visit the DynamoDB service, click on the Create table button, provide a name for the table, make content_id the partition key, and set its type to Number

  • Visit the Exports and streams tab and enable DynamoDB Streams (a boto3 sketch of this step is shown below)
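
If you would rather script this step, here is a minimal boto3 sketch. The table name and key match the insert script above; the billing mode is an assumption you can adjust:

import boto3

dynamodb = boto3.client('dynamodb', region_name='us-east-1')

dynamodb.create_table(
    TableName='content',
    AttributeDefinitions=[{'AttributeName': 'content_id', 'AttributeType': 'N'}],
    KeySchema=[{'AttributeName': 'content_id', 'KeyType': 'HASH'}],
    BillingMode='PAY_PER_REQUEST',  # assumption; on-demand keeps the POC simple
    # Enable DynamoDB Streams; NEW_IMAGE is what the batch script reads from the SQS message
    StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'NEW_IMAGE'}
)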

Create an SQS queue to hold the DB records:

  • Visit the SQS service from the AWS console and click on Create queue. Since message ordering isn’t required, go with a standard queue. Provide a name for the queue and click on Create queue (see the sketch below)
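
For reference, the same queue can be created with boto3. The queue name and attribute values below are assumptions, not requirements:

import boto3

sqs = boto3.client('sqs', region_name='us-east-1')

# A standard queue is enough since ordering is not required
response = sqs.create_queue(
    QueueName='content-embedding-queue',  # hypothetical name
    Attributes={
        'VisibilityTimeout': '900',        # give the Batch job time to finish a message
        'MessageRetentionPeriod': '86400'  # keep unprocessed messages for a day
    }
)
print(response['QueueUrl'])  # use this value as QUEUE_URL in the batch script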

Create an EventBridge Pipe to connect DynamoDB and SQS:

  • Visit the EventBridge service and click on Create pipe

  • Select DynamoDB as the source and SQS as the target

  • Click on Create pipe. This will push the DynamoDB stream records to SQS (the equivalent boto3 call is sketched below)
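
The console handles the wiring for you, but here is a hedged boto3 sketch of the same pipe. The ARNs and role name are placeholders, and the pipe role must be allowed to read the DynamoDB stream and send messages to the SQS queue:

import boto3

pipes = boto3.client('pipes', region_name='us-east-1')

# All ARNs below are placeholders for your own stream, queue, and pipe role
pipes.create_pipe(
    Name='content-stream-to-sqs',
    RoleArn='arn:aws:iam::123456789012:role/content-pipe-role',
    Source='arn:aws:dynamodb:us-east-1:123456789012:table/content/stream/2024-01-01T00:00:00.000',
    SourceParameters={
        'DynamoDBStreamParameters': {'StartingPosition': 'LATEST', 'BatchSize': 10}
    },
    Target='arn:aws:sqs:us-east-1:123456789012:content-embedding-queue'
)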

Create a CloudWatch alarm for when messages in the queue exceed 50:

  • Visit the CloudWatch service in the AWS console. From the side panel, click on All alarms and then Create alarm

  • Select the SQS ApproximateNumberOfMessagesVisible metric

  • Set the condition to greater than 50 messages

  • Skip the actions, provide a name for the alarm, and click on Create alarm (a boto3 sketch follows below)
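
Here is roughly what that alarm looks like in boto3. The alarm name matches the EventBridge rule pattern used later in this post, while the queue name, statistic, and period are assumptions:

import boto3

cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')

cloudwatch.put_metric_alarm(
    AlarmName='sqs-threshold-alarm',  # referenced by the EventBridge rule later
    Namespace='AWS/SQS',
    MetricName='ApproximateNumberOfMessagesVisible',
    Dimensions=[{'Name': 'QueueName', 'Value': 'content-embedding-queue'}],  # hypothetical queue name
    Statistic='Maximum',
    Period=60,
    EvaluationPeriods=1,
    Threshold=50,
    ComparisonOperator='GreaterThanThreshold',
    TreatMissingData='notBreaching'
)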

Create an AWS Batch job definition to run jobs:

  • Click on Create a job definition, select Fargate as the orchestration type, set the ephemeral storage to 21 GB, keep the rest of the fields as they are, and move to the next section

  • Copy the URI of the ECR image we created at the start of the blog and paste it here, keeping the rest of the fields as they are

  • Keep the user as root and select AWS Logs as the logging driver

  • Review everything and create the job definition

  • We also need to create a job queue and a compute environment. Provide the basic details and create a job queue with a compute environment

  • Here is how to create a compute environment

  • Visit the Job queues section and create a job queue with the compute environment we created

  • Now we have a job definition and a job queue (a boto3 sketch of the key job definition settings is shown below). Let’s create a state machine to trigger jobs
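
If you want to see the key settings in code form, this is a rough boto3 equivalent of the job definition above. The image URI, role ARNs, and vCPU/memory sizing are placeholders to adjust for your account:

import boto3

batch = boto3.client('batch', region_name='us-east-1')

batch.register_job_definition(
    jobDefinitionName='embedding-batch-job',  # hypothetical name, reused in the state machine
    type='container',
    platformCapabilities=['FARGATE'],
    containerProperties={
        'image': '123456789012.dkr.ecr.us-east-1.amazonaws.com/embedding-batch:latest',  # your ECR image URI
        'executionRoleArn': 'arn:aws:iam::123456789012:role/batch-execution-role',
        'jobRoleArn': 'arn:aws:iam::123456789012:role/batch-job-role',  # needs SQS, Bedrock, and OpenSearch access
        'resourceRequirements': [
            {'type': 'VCPU', 'value': '1'},
            {'type': 'MEMORY', 'value': '2048'}
        ],
        'ephemeralStorage': {'sizeInGiB': 21},
        'networkConfiguration': {'assignPublicIp': 'ENABLED'},
        'logConfiguration': {'logDriver': 'awslogs'}
    }
)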

Create a Step Functions state machine to submit a job to AWS Batch:

  • Visit the Step Functions service and click on Create state machine

  • Select the Submit Job state and choose the job definition and job queue we created above

  • For this sample, I am using only the single required state; keep adding more states based on your needs

  • Click on the Submit Job state and select the job queue and job definition from the right-side panel

  • Now save the state machine (a boto3 version of the same state machine is sketched below), and let’s connect SQS and this state machine based on the CloudWatch alarm we created above
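
For reference, the single Submit Job state boils down to the Amazon States Language definition below. The job definition, queue, and role names are placeholders matching the earlier sketches:

import boto3
import json

sfn = boto3.client('stepfunctions', region_name='us-east-1')

# One Task state that submits the Batch job and waits for it to finish (.sync)
definition = {
    "StartAt": "SubmitEmbeddingJob",
    "States": {
        "SubmitEmbeddingJob": {
            "Type": "Task",
            "Resource": "arn:aws:states:::batch:submitJob.sync",
            "Parameters": {
                "JobName": "process-embeddings",
                "JobDefinition": "embedding-batch-job",  # hypothetical names from the Batch step
                "JobQueue": "embedding-job-queue"
            },
            "End": True
        }
    }
}

sfn.create_state_machine(
    name='submit-embedding-batch-job',
    definition=json.dumps(definition),
    roleArn='arn:aws:iam::123456789012:role/sfn-batch-submit-role'  # placeholder; needs batch:SubmitJob
)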

Create an EventBridge rule to listen for the alarm and trigger the Step Functions state machine:

  • Visit the EventBridge service and select Rules from the side menu

  • Provide a name for the rule and select Rule with an event pattern

  • In the event pattern, paste the following JSON
{
  "source": ["aws.cloudwatch"],
  "detail-type": ["CloudWatch Alarm State Change"],
  "detail": {
    "state": {
      "value": ["ALARM"]
    },
    "alarmName": ["sqs-threshold-alarm"]
  }
}

  • Select the state machine we created as the target and click on Create rule (the equivalent boto3 calls are sketched below)
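
The same rule and target can be wired up with boto3 like this. The rule name, state machine ARN, and role ARN are placeholders:

import boto3
import json

events = boto3.client('events', region_name='us-east-1')

# Fire when the SQS alarm created earlier goes into the ALARM state
events.put_rule(
    Name='sqs-alarm-to-state-machine',
    EventPattern=json.dumps({
        "source": ["aws.cloudwatch"],
        "detail-type": ["CloudWatch Alarm State Change"],
        "detail": {"state": {"value": ["ALARM"]}, "alarmName": ["sqs-threshold-alarm"]}
    }),
    State='ENABLED'
)

# Target the state machine; the role must allow states:StartExecution
events.put_targets(
    Rule='sqs-alarm-to-state-machine',
    Targets=[{
        'Id': 'trigger-embedding-state-machine',
        'Arn': 'arn:aws:states:us-east-1:123456789012:stateMachine:submit-embedding-batch-job',
        'RoleArn': 'arn:aws:iam::123456789012:role/eventbridge-sfn-role'
    }]
)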

Create an OpenSearch Serverless collection and index:

  • Visit the Serverless section of the OpenSearch Service, click on Create collection, provide the collection details, and click on Create collection

  • Once the collection is created, click on Create index and configure it with a knn_vector field for the embeddings, as sketched below
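
Here is a sketch of the index mapping created through opensearch-py. The field names match what the batch script writes, and the 1536 dimension matches the output of amazon.titan-embed-text-v1; the endpoint placeholder is the same as in the batch script, and the exact settings may need adjusting for your collection:

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

OPENSEARCH_ENDPOINT = "open search serverless endpoint"  # same placeholder as the batch script

credentials = boto3.Session().get_credentials()
aws_auth = AWS4Auth(credentials.access_key, credentials.secret_key,
                    'us-east-1', 'aoss', session_token=credentials.token)

client = OpenSearch(
    hosts=[{'host': OPENSEARCH_ENDPOINT, 'port': 443}],
    http_auth=aws_auth, use_ssl=True, verify_certs=True,
    connection_class=RequestsHttpConnection
)

# The batch script stores the embedding in the "overview" field, so it is mapped as knn_vector
client.indices.create(
    index='contents',
    body={
        'settings': {'index': {'knn': True}},
        'mappings': {
            'properties': {
                'overview': {'type': 'knn_vector', 'dimension': 1536},
                'title': {'type': 'text'},
                'genre': {'type': 'text'},
                'content_id': {'type': 'integer'}
            }
        }
    }
)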

That’s it! Everything is ready and the services are connected. Let’s run the insert Python script to load data into DynamoDB and check whether everything is triggered correctly.

Note: Once the threshold is reached, it will take some time for the state machine to be triggered.

These are the jobs that got triggered whenever I inserted data into DynamoDB.

Verify whether the records were inserted into the index. You can check the record count or the dashboard to see the records (a quick opensearch-py check is shown below).
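
A quick way to check from code, reusing the same auth setup and placeholder endpoint as the batch script:

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

credentials = boto3.Session().get_credentials()
aws_auth = AWS4Auth(credentials.access_key, credentials.secret_key,
                    'us-east-1', 'aoss', session_token=credentials.token)

client = OpenSearch(
    hosts=[{'host': "open search serverless endpoint", 'port': 443}],  # same placeholder as before
    http_auth=aws_auth, use_ssl=True, verify_certs=True,
    connection_class=RequestsHttpConnection
)

# Fetch a few documents and print how many matched; the count should track
# the number of items the Batch jobs have processed so far
response = client.search(index='contents', body={'query': {'match_all': {}}, 'size': 3})
print("indexed documents:", response['hits']['total']['value'])
for hit in response['hits']['hits']:
    print(hit['_source']['title'], '-', hit['_source']['genre'])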

Once the data is ready, you can build a vector search engine by following my previous article here:

Building a vector search Engine

Please let me know through the comments section if you are stuck anywhere and need any assistance. Thanks, and have a great day!
