Ingesting data into an OpenSearch cluster looks easy if you read the documentation. The truth is it is easy, but it all depends on how much you care about the data you are ingesting. Let me go one step back. Why do we even use OpenSearch? With the rise of AI, you also need a knowledge base. These knowledge bases can be hosted in OpenSearch. However, to use the OpenSearcg database, you must also fill it out with data.
I recently was challenged to create a database for GenAI purposes. We had a dataset that needed to be loaded into OpenSearch. Amazon OpenSearch Ingestion sounds like a service that can help you with that.
Preparing the data
First of all, the data is never in the format that you want it to be. First, we will need to transform it into something that we can ingest. For this use-case, I used StepFunctions (but avoid costly loops) each document coming in will trigger an execution. In my case, I wanted to be able to search the document with semantic meaning. In order to do this efficiently, we will need to chunk the documents into smaller bits. This way, we can generate embeddings for each chunk, which can be used for semantic search. You can create a single file with all chunks, and each chunk will have the same metadata set; the only difference is the chunk text and chunk identification. Once we have this file, we will place it on S3, S3 will send an S3 Event Trigger to an SQS Queue, and the OpenSearch Ingestion pipeline will ingest it into the OpenSearch database.
So far so good
There is nothing wrong with the approach I just described. This is a textbook example coming from the AWS documentation pages. But an ingestion pipeline will cost you money even when you don’t ingest anything. For this reason, I developed a Lambda function that would stop the pipeline when no messages are in the queue. It will also start the pipeline when there are messages in the queue. This was a nice optimization from a cost perspective.
After some testing, we expected 10k documents in the system, but we only had around 5k. Where did the other documents go? The dead-letter queues were empty, and we did not have any log traces of any failures whatsoever… It turned out that when you stop the ingestion pipeline, all messages in the buffer are lost.
Acknowledgments to the rescue
You can argue a lot about the fact that when you stop a pipeline, the documents in the buffer are lost. But that is the reality that I needed to deal with. You can enable persistent buffering, which would at least help with the data loss part. But you still need to start the pipeline to continue the job. I looked more in the direction of SQS. The whole purpose of SQS is to queue messages and ensure they are processed. After some investigation, I noticed two options I wanted to share.
version: '2'
s3_pipeline:
source:
s3:
workers: 10
notification_type: sqs
notification_source: s3
codec:
json: {}
compression: none
acknowledgments: true
records_to_accumulate: 100
on_error: retain_messages
sqs:
queue_url: https://sqs.eu-west-1.amazonaws.com/000000000000/MyQueue
maximum_messages: 10
visibility_timeout: 60s
poll_delay: 0s
visibility_duplication_protection: true
aws:
region: eu-west-1
sts_role_arn: arn:aws:iam::000000000000:role/MyRole
sink:
- opensearch:
serverless: false
hosts:
- https://vpcxxxxxx.eu-west-1.es.amazonaws.com
index: documents-${/index}
bulk_size: 5
max_retries: 3
dlq:
s3:
bucket: MyBucketName
key_path_prefix: dlq/
region: eu-west-1
sts_role_arn: arn:aws:iam::000000000000:role/MyRole
aws:
region: eu-west-1
sts_role_arn: arn:aws:iam::000000000000:role/MyRole
document_id: ${/chunk_id}
actions:
- type: delete
when: /operation == "delete"
- type: update
when: /operation == "update"
- type: index
when: /operation == "index"
The configuration options are acknowledgments
and visibility_duplication_protection
. The former will keep the message in the queue until it has been ingested into the sink, and the latter will ensure that the message stays in flight as long as it is in the buffer. These options will ensure that the SQS queue is used as intended. When a failure occurs, the message will reoccur in the queue and be retried. After x number of attempts, it will go to the dead-letter queue. These settings ensured that all 10k documents in the batch were ingested as expected.
The pipeline would still be stopped once the QSQ Queue is empty. To prevent this from happening I also included the ApproximateNumberOfMessagesNotVisible
next to the ApproximateNumberOfMessages
metric.
Dealing with updates and deletes
I also wanted to share how we deal with updates and deletions. Ingestion of a new document is easy, but updates and deletions are different. When you have ingested a document with 15 chunks, and the updated version only has 12, you need to delete 3 and potentially update some chunks. For example, this might occur when you delete a paragraph in the middle of a text.
First, you need to know if the document has already been ingested. A search action on the OpenSearch database can do this. This search action returns all the chunks already present in the database. You can use logic to determine what chunk needs to be updated and which needs to be deleted. We add three additional metadata fields in each chunk: index
, chunk_id
, and operation
.
The index
field determines the index to which the document is being ingested. The chunck_id
will become the internal document id, and the operation
field will determine if the chunk needs to be indexed, updated, or deleted.
Conclusion
Ingesting data into OpenSearch sounds straightforward, but the real challenge lies in ensuring reliability, efficiency, and cost-effectiveness. While the textbook approach works, real-world scenarios often demand optimizations—like dynamically stopping the ingestion pipeline to save costs or leveraging SQS acknowledgments to guarantee message processing.
By implementing these adjustments, we achieved a more resilient ingestion process that ensures all documents make it into OpenSearch as expected. The combination of Step Functions for document transformation, SQS for reliable queuing, and metadata tagging for updates and deletions provided a scalable and maintainable solution. At the end of the day, ingestion isn’t just about getting data into the system—it’s about making sure the right data gets there, stays there, and can be efficiently queried when needed.
Photo by Andrea Piacquadio
The post Optimizing OpenSearch Ingestion: Ensuring Reliability, Efficiency, and Cost Savings appeared first on Xebia.
Top comments (0)