Anthony Wat for AWS Community Builders

Originally published at blog.avangards.io

Building a Data Ingestion Solution for Amazon Bedrock Knowledge Bases

Introduction

2025 started off busy, and only recently have I had the chance to catch up on all the new Amazon Bedrock features that were launched during re:Invent 2024. These new capabilities make it easier than ever to build a comprehensive RAG solution using a low-code approach. As I explored them, I realized that most, if not all, are functional features, and there isn't much guidance on the operational aspects of the Bedrock services, so I decided to write more about this topic.

A key component of any RAG system is the data ingestion pipeline. Amazon Bedrock Knowledge Bases has built-in data ingestion that does the heavy lifting, and synchronizations can be triggered on demand. From an operational perspective, the end-to-end process should ideally be automated and aligned with either the update cadence of the data source or a designated maintenance window. This is an ideal use case for a Lambda-based automation solution, for which I have built a basic version. In this blog post, we’ll walk through its design and implementation.

Design Overview

The overall design of the solution is depicted in the following diagram:

Solution architecture

The solution works as follows:

  1. A Lambda function, triggered by an EventBridge schedule rule, periodically starts an ingestion (a.k.a. sync) job for each specified knowledge base and data source. The function also sends a message with job ID information to an SQS queue.

  2. Another Lambda function, triggered by an EventBridge schedule rule, frequently fetches messages from the SQS queue. For each message, the function uses the job ID information to get details about the ingestion job. The message is removed from the queue if the job has completed, and a notification is sent to one of two SNS topics depending on whether the job succeeded or failed.

  3. (Optional) Additional downstream tasks can be performed by subscribing to the SNS topics.

With the high-level architecture in mind, let’s now dive into the detailed design of each major component.

Component Design: Starting Ingestion Jobs

Amazon Bedrock Knowledge Bases simplifies ingestion for data sources it natively supports, such as Amazon S3 and Web Crawler. Its default parsing logic covers most cases, while its chunking options allow the selection of different strategies that could improve data retrieval quality.

For Amazon S3, while it is possible to start ingesting data as the S3 bucket is updated (using S3 event notifications, for instance), doing so is not recommended while the knowledge base is in use, especially during high-usage periods. For Web Crawler, detecting updates to a website - especially one you don't own - is often difficult. A more reliable approach in both cases is to schedule ingestion during a maintenance window (for example, after midnight). This can easily be implemented using an EventBridge rule that runs on a schedule.
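For illustration, here is a minimal Boto3 sketch of such a schedule rule; the rule name and the after-midnight cron expression are arbitrary examples, and in this solution the rule is actually created by the Terraform configuration shown later:

import boto3

events = boto3.client('events')

# Hypothetical rule name; the cron expression fires daily at 00:30 UTC
events.put_rule(
    Name='start-kb-ingestion-jobs-nightly',
    ScheduleExpression='cron(30 0 * * ? *)',
    State='ENABLED'
)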

When using default settings, you simply need to synchronize the data source whenever it is updated. This is done programmatically via the StartIngestionJob action in the Agents for Amazon Bedrock API, which is available as the start_ingestion_job method in Boto3 and is used in our Python-based Lambda function.

Since ingestion is asynchronous, you must check job statuses separately. In our event-based setup, each job's details (knowledge base ID, data source ID, ingestion job ID) are passed to the other Lambda function that checks ingestion job statuses. To facilitate communication between the decoupled components, we can use an SNS topic, an SQS queue, or a DynamoDB table. Since long-running jobs require multiple status checks, an SQS standard queue seems like the better fit.

Lastly, we need to configure which knowledge bases, data sources, and SQS queues the Lambda function should manage. AWS Systems Manager Parameter Store is an obvious choice. To structure the knowledge base and data source information, we'll use a JSON list of objects, each containing a knowledge base ID and a list of data source IDs, for example:

[
  {
    "knowledge_base_id" = "YO4R9AYHQZ",
    "data_source_ids"   = ["5IHZ5YAIBY"]
  }
]

Now that we've outlined the design, let's look at the implementation of the Lambda function:

import boto3
import json
from botocore.exceptions import ClientError

bedrock_agent = boto3.client('bedrock-agent')
sqs = boto3.client('sqs')
ssm = boto3.client('ssm')


def lambda_handler(event, context):
    try:
        # Retrieve the JSON config and the SQS queue URL from Parameter Store
        response = ssm.get_parameter(Name='/start-kb-ingestion-jobs/config-json')
        config_json = response['Parameter']['Value']
        config = json.loads(config_json)

        response = ssm.get_parameter(Name='/start-kb-ingestion-jobs/sqs-queue-url')
        sqs_queue_url = response['Parameter']['Value']

        for record in config:
            knowledge_base_id = record.get('knowledge_base_id')
            for data_source_id in record.get('data_source_ids'):
                # Start the ingestion job
                print(f'Starting ingestion job for data source {data_source_id} of knowledge base {knowledge_base_id}')
                response = bedrock_agent.start_ingestion_job(
                    knowledgeBaseId=knowledge_base_id,
                    dataSourceId=data_source_id
                )
                ingestion_job_id = response['ingestionJob']['ingestionJobId']

                # Send a message with the job details to the SQS queue
                message = {
                    'knowledge_base_id': knowledge_base_id,
                    'data_source_id': data_source_id,
                    'ingestion_job_id': ingestion_job_id
                }
                sqs.send_message(
                    QueueUrl=sqs_queue_url,
                    MessageBody=json.dumps(message)
                )
        return {
            'statusCode': 200,
            'body': 'Success'
        }
    except ClientError as e:
        return {
            'statusCode': 500,
            'body': f'Client error: {str(e)}'
        }
    except Exception as e:
        return {
            'statusCode': 500,
            'body': f'Unexpected error: {str(e)}'
        }

Component Design: Checking Ingestion Job Statuses

The second piece of the puzzle is checking the status of any ingestion jobs initiated by the solution and sending notifications when a job completes, fails, or is canceled. Notifications should also include any warnings to highlight issues ingesting specific documents that could impact the quality of the RAG solution.

To retrieve ingestion job details, the GetIngestionJob action in the Agents for Amazon Bedrock API can be used. The Boto3 equivalent would be the get_ingestion_job method. The required parameters are included in each SQS message sent by the Lambda function that starts ingestion jobs as described earlier. The approach is to poll and process SQS messages on a schedule (for example, every five minutes).

SQS polling can be tricky, especially if you're unfamiliar with it. Short and long polling determine how long a ReceiveMessage API request (or its Boto3 equivalent, the receive_message method) waits for at least one message to show up in the queue. Here, long polling is not ideal since the best practice is to minimize Lambda function runtime when idle; it is also unnecessary given that the Lambda function runs on a schedule anyway. Additionally, SQS's distributed nature affects how many messages the ReceiveMessage API returns: even with a higher MaxNumberOfMessages value, receiving multiple messages isn't guaranteed in each call. Consequently, you must call the API repeatedly until it returns an empty result.

For notifications, the publish-subscribe pattern fits the scenario well. We’ll use two Amazon SNS topics: one for successful job completions and another for failures and cancellations. Administrators can handle subscriptions downstream as needed, whether via email or additional Lambda processing.
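As an illustration of such downstream processing, a subscriber Lambda function might look like the following minimal sketch; the follow-up action is hypothetical, and the message format matches what our status-checking function publishes:

import json


def lambda_handler(event, context):
    # Each SNS notification arrives as a record in the event payload
    for record in event['Records']:
        subject = record['Sns']['Subject']
        # The message body is the ingestion job details serialized by the status-checking function
        ingestion_job = json.loads(record['Sns']['Message'])
        status = ingestion_job['status']
        statistics = ingestion_job.get('statistics', {})
        print(f'{subject}: status={status}, statistics={statistics}')
        # Hypothetical follow-up: post to a chat channel, open a ticket, trigger re-ingestion, etc.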

Finally, SSM Parameter Store will store configuration details, including the SQS queue URL and SNS topic ARNs, ensuring consistency. The resulting Lambda function could look like this:

import boto3
import json
from botocore.exceptions import ClientError

bedrock_agent = boto3.client('bedrock-agent')
ssm = boto3.client('ssm')
sns = boto3.client('sns')
sqs = boto3.client('sqs')


def get_ssm_parameter(name):
    response = ssm.get_parameter(Name=name, WithDecryption=True)
    return response['Parameter']['Value']


def get_ingestion_job(knowledge_base_id, data_source_id, ingestion_job_id):
    response = bedrock_agent.get_ingestion_job(
        knowledgeBaseId=knowledge_base_id,
        dataSourceId=data_source_id,
        ingestionJobId=ingestion_job_id
    )
    return response['ingestionJob']


def lambda_handler(event, context):
    try:
        sqs_queue_url = get_ssm_parameter('/check-kb-ingestion-job-statuses/sqs-queue-url')
        success_sns_topic_arn = get_ssm_parameter('/check-kb-ingestion-job-statuses/success-sns-topic-arn')
        failure_sns_topic_arn = get_ssm_parameter('/check-kb-ingestion-job-statuses/failure-sns-topic-arn')

        response = sqs.receive_message(
            QueueUrl=sqs_queue_url,
            MaxNumberOfMessages=10
        )
        while 'Messages' in response:
            messages = response['Messages']
            for message in messages:
                body = json.loads(message['Body'])
                knowledge_base_id = body['knowledge_base_id']
                data_source_id = body['data_source_id']
                ingestion_job_id = body['ingestion_job_id']

                print(
                    f'Checking ingestion job status for knowledge base {knowledge_base_id} data source {data_source_id} job {ingestion_job_id}')
                ingestion_job = get_ingestion_job(knowledge_base_id, data_source_id, ingestion_job_id)
                print(
                    f'Ingestion job summary: \n\n{json.dumps(ingestion_job, indent=2, sort_keys=True, default=str)}')
                job_status = ingestion_job['status']
                if job_status == 'COMPLETE':
                    sns.publish(
                        TopicArn=success_sns_topic_arn,
                        Subject=f'Ingestion job for knowledge base {knowledge_base_id} data source {data_source_id} job {ingestion_job_id} Completed',
                        Message=json.dumps(ingestion_job, indent=2, sort_keys=True, default=str)
                    )
                elif job_status == 'FAILED':
                    sns.publish(
                        TopicArn=failure_sns_topic_arn,
                        Subject=f'Ingestion job for knowledge base {knowledge_base_id} data source {data_source_id} job {ingestion_job_id} FAILED',
                        Message=json.dumps(ingestion_job, indent=2, sort_keys=True, default=str)
                    )
                elif job_status == 'STOPPED':
                    sns.publish(
                        TopicArn=failure_sns_topic_arn,
                        Subject=f'Ingestion job for knowledge base {knowledge_base_id} data source {data_source_id} job {ingestion_job_id} STOPPED',
                        Message=json.dumps(ingestion_job, indent=2, sort_keys=True, default=str)
                    )

                if job_status in ['COMPLETE', 'FAILED', 'STOPPED']:
                    sqs.delete_message(
                        QueueUrl=sqs_queue_url,
                        ReceiptHandle=message['ReceiptHandle']
                    )
            response = sqs.receive_message(
                QueueUrl=sqs_queue_url,
                MaxNumberOfMessages=10
            )
        return {
            'statusCode': 200,
            'body': 'Success'
        }
    except ClientError as e:
        return {
            'statusCode': 500,
            'body': f'Client error: {str(e)}'
        }
    except Exception as e:
        return {
            'statusCode': 500,
            'body': f'Unexpected error: {str(e)}'
        }

Terraform Configuration Design

No solution is complete without an easy way to deploy it. Terraform is an obvious choice given my advocacy for it; however, any IaC solution or even a framework such as AWS SAM would do. Since the event-based automation architecture is quite typical, I won't go into too much detail on how the Terraform configuration is developed, but here is a snippet related to the first Lambda function:

# Data sources and local values omitted for brevity

resource "aws_sqs_queue" "check_kb_ingestion_job_statuses" {
  name = "check-kb-ingestion-job-statuses"
}

resource "aws_ssm_parameter" "start_kb_ingestion_jobs_config_json" {
  name  = "/start-kb-ingestion-jobs/config-json"
  type  = "String"
  value = jsonencode(var.start_kb_ingestion_jobs_config_json)
}

resource "aws_ssm_parameter" "start_kb_ingestion_jobs_sqs_queue_url" {
  name  = "/start-kb-ingestion-jobs/sqs-queue-url"
  type  = "String"
  value = aws_sqs_queue.check_kb_ingestion_job_statuses.id
}

resource "aws_iam_role" "lambda_start_kb_ingestion_jobs" {
  name = "FunctionExecutionRoleForLambda-start-kb-ingestion-jobs"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
        Condition = {
          StringEquals = {
            "aws:SourceAccount" = "${local.account_id}"
          }
        }
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_start_kb_ingestion_jobs_lambda_basic_execution" {
  role       = aws_iam_role.lambda_start_kb_ingestion_jobs.name
  policy_arn = data.aws_iam_policy.lambda_basic_execution.arn
}

resource "aws_iam_role_policy" "lambda_start_kb_ingestion_jobs" {
  name = "FunctionExecutionRolePolicyForLambda-start-kb-ingestion-jobs"
  role = aws_iam_role.lambda_start_kb_ingestion_jobs.name
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = [
          "ssm:GetParameter",
          "ssm:GetParameters",
          "ssm:GetParametersByPath"
        ]
        Effect   = "Allow"
        Resource = "arn:${local.partition}:ssm:${local.region}:${local.account_id}:parameter/*"
      },
      {
        Action   = "sqs:SendMessage"
        Effect   = "Allow"
        Resource = "arn:${local.partition}:sqs:${local.region}:${local.account_id}:*"
      },
      {
        Action = [
          "bedrock:StartIngestionJob"
        ]
        Effect   = "Allow"
        Resource = "arn:${local.partition}:bedrock:${local.region}:${local.account_id}:knowledge-base/*"
      }
    ]
  })
}

resource "aws_lambda_function" "start_kb_ingestion_jobs" {
  function_name = "start-kb-ingestion-jobs"
  role          = aws_iam_role.lambda_start_kb_ingestion_jobs.arn
  description   = "Lambda function that starts ingestion jobs for Bedrock Knowledge Bases"
  filename      = data.archive_file.start_kb_ingestion_jobs_zip.output_path
  handler       = "index.lambda_handler"
  runtime       = "python3.13"
  architectures = ["arm64"]
  timeout       = 60
  # source_code_hash is required to detect changes to Lambda code/zip
  source_code_hash = data.archive_file.start_kb_ingestion_jobs_zip.output_base64sha256
}

resource "aws_cloudwatch_event_rule" "start_kb_ingestion_jobs" {
  name                = "lambda-start-kb-ingestion-jobs"
  schedule_expression = var.start_kb_ingestion_jobs_schedule
}

resource "aws_cloudwatch_event_target" "start_kb_ingestion_jobs" {
  rule = aws_cloudwatch_event_rule.start_kb_ingestion_jobs.name
  arn  = aws_lambda_function.start_kb_ingestion_jobs.arn
}

resource "aws_lambda_permission" "start_kb_ingestion_jobs" {
  statement_id  = "AllowExecutionFromCloudWatch"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.start_kb_ingestion_jobs.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.start_kb_ingestion_jobs.arn
}

# Remaining resources omitted for brevity

What you will notice is that:

  • The IAM and resource policies follow the least privilege principle.

  • The SSM parameters use a simple parameter hierarchy based on the Lambda function name. If deploying multiple copies of the solution, consider using a variable to set the function name and related resource names.

  • The Lambda function uses the arm64 architecture, which costs roughly 20% less per GB-second than x86_64, though the savings in this solution are negligible.

  • The default Lambda execution timeout of three seconds is insufficient. Increasing it to 60 seconds provides a good safety net.

Deploying and Testing the Solution

✅ You can find the complete Terraform configuration and source code in the 3_kb_data_ingestion directory in this GitHub repository.

To deploy and test the solution, you need a knowledge base with at least one data source that has content to ingest either in an S3 bucket or a crawlable website. You can set this up in the Bedrock console using the vector database quick start options. Alternatively, deploy a sample knowledge base using the Terraform configuration from my blog post How To Manage an Amazon Bedrock Knowledge Base Using Terraform. This configuration is also available in the same GitHub repository under the 2_knowledge_base directory.
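If you need to look up the knowledge base and data source IDs for the configuration, a quick Boto3 sketch like the following can list them (pagination is omitted for brevity):

import boto3

bedrock_agent = boto3.client('bedrock-agent')

# Print each knowledge base and its data sources to find the IDs to configure
for kb in bedrock_agent.list_knowledge_bases()['knowledgeBaseSummaries']:
    print(f"Knowledge base: {kb['name']} ({kb['knowledgeBaseId']})")
    data_sources = bedrock_agent.list_data_sources(knowledgeBaseId=kb['knowledgeBaseId'])
    for ds in data_sources['dataSourceSummaries']:
        print(f"  Data source: {ds['name']} ({ds['dataSourceId']})")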

With the prerequisites in place, deploy the solution as follows:

  1. From the root of the cloned GitHub repository, navigate to 3_kb_data_ingestion.

  2. Copy terraform.tfvars.example as terraform.tfvars and update the variables to match your configuration.

     • By default, the schedule for the `start-kb-ingestion-jobs` Lambda function is daily at 0:00 UTC, while the schedule for the `check-kb-ingestion-job-statuses` Lambda function is every five minutes.

  3. Configure your AWS credentials.

  4. Run terraform init and terraform apply -var-file terraform.tfvars.

Once deployed, test the solution by adding an email subscription for your address to the SNS topics check-kb-ingestion-job-statuses-success and check-kb-ingestion-job-statuses-failure so that you can receive email notifications. Confirm your subscriptions using the links in the verification emails.

Adding an email subscription to the SNS topics
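If you prefer to script the subscriptions rather than use the console, the equivalent Boto3 calls could look roughly like this; the topic ARNs and email address below are placeholders:

import boto3

sns = boto3.client('sns')

# Placeholder ARNs and address; use the topic ARNs created by the Terraform configuration
for topic_arn in [
    'arn:aws:sns:us-east-1:123456789012:check-kb-ingestion-job-statuses-success',
    'arn:aws:sns:us-east-1:123456789012:check-kb-ingestion-job-statuses-failure',
]:
    sns.subscribe(TopicArn=topic_arn, Protocol='email', Endpoint='you@example.com')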

Next, manually invoke the start-kb-ingestion-jobs Lambda function in the Lambda console between scheduled runs of check-kb-ingestion-job-statuses (which occur every five minutes). Invoke it a few times, pausing for a few seconds between calls, to create multiple jobs for the status-checking function to process.

Invoking the start-kb-ingestion-jobs Lambda function manually
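You can also trigger the function from a script instead of the console; a rough Boto3 equivalent is shown below (an empty event payload is sufficient since the function reads its configuration from Parameter Store):

import boto3

lambda_client = boto3.client('lambda')

# Asynchronous invocation of the deployed function
lambda_client.invoke(
    FunctionName='start-kb-ingestion-jobs',
    InvocationType='Event',
    Payload=b'{}'
)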

After the next scheduled check-kb-ingestion-job-statuses invocation completes, check the CloudWatch logs to confirm that it ran successfully. You should also receive a few emails from SNS with the job statuses. Here is one example:

Success email notification

The message shows several warnings about files that couldn't be ingested. In my case, I used an S3 bucket that contained tar.gz files, which are unsupported. There was also one file exceeding the 50 MB limit, which was likewise skipped. With this information, you can remediate these issues to ensure good data input to the knowledge base.

Once you've verified that the solution works, remove the email subscriptions and replace them with ones that better fit your needs. If you don't plan to keep the knowledge base, delete it along with the vector store (for example, the OpenSearch Serverless index) to avoid unnecessary costs.

Summary

In this blog post, we examined the development of a Lambda-based data ingestion solution for Bedrock knowledge bases. The design follows a familiar event-based pattern, leveraging AWS APIs to perform the required tasks and Terraform for deployment. That said, while checking ingestion job statuses on a schedule works, it is not as efficient as a true event-driven system, which Amazon Bedrock does not seem to support at the moment. Perhaps I can update the solution once support for data ingestion job events is added in the future.

Another interesting aspect of writing this blog post is that I used GitHub Copilot to develop both the Lambda functions and the Terraform configuration. While it didn't produce ready-to-use code, it saved me significant time. I'll share more about that experience in my next blog post.

In the meantime, please check out other helpful content on the Avangards Blog. Thanks for reading!
