DEV Community

Peter Saktor

Handling Concurrency in CosmosDB with ETags

When working with distributed systems and databases like Azure CosmosDB, concurrency can be a tricky problem to handle effectively. Without proper safeguards, simultaneous updates to the same data can lead to conflicts and data corruption. Fortunately, CosmosDB provides a built-in mechanism to manage concurrency through ETags (Entity Tags).

In this post, we'll explore what ETags are, how they work in CosmosDB, and how you can leverage them to solve concurrency issues in your applications.

What is an ETag?

An ETag is a string value that CosmosDB maintains automatically for every document and exposes as the _etag system property. It identifies the version of the document at a given point in time: every time the document is modified, its ETag value changes.

Think of it as a fingerprint of the document's current state:

  • Before an update: The ETag represents the state of the document before the modification.
  • After an update: A new ETag is generated, reflecting the updated state.

The Concurrency Challenge

In a typical scenario, two or more clients may attempt to read and update the same document at the same time. Without a mechanism to manage these simultaneous updates, one client's changes may overwrite the other's, leading to lost updates.

Here’s an example:

  1. Client A reads a document with ETag abc123.
  2. Client B reads the same document with ETag abc123.
  3. Both clients modify the document and send their updates back to CosmosDB.

Without an ETag check, both writes succeed, and whichever arrives last silently overwrites the other, so the first client's changes are lost.

How ETags Solve Concurrency

To address this, CosmosDB allows you to implement optimistic concurrency control using ETags. When updating a document, you can specify the ETag value you expect the document to have. If the document’s current ETag doesn't match the one provided in your request, CosmosDB rejects the update with HTTP 412 (Precondition Failed) and leaves the stored document unchanged.

Workflow:

  1. Read the document: Fetch the document along with its current ETag.
  2. Modify the document: Make your changes locally.
  3. Update with ETag: Include the ETag in the update request as a precondition.
  4. CosmosDB validates:
    • If the ETag matches: The update succeeds.
    • If the ETag doesn’t match: The update fails, signaling a conflict.
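The workflow above can be illustrated without a database at all. The following is a minimal sketch, assuming a hypothetical in-memory store (InMemoryStore, TryReplace, and EtagDemo are illustrative names, not part of the CosmosDB SDK) that applies the same rule: a write goes through only if the caller's ETag is still the current one.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a container; NOT part of the CosmosDB SDK.
public sealed class InMemoryStore
{
    private readonly Dictionary<string, (string Value, string ETag)> _items = new();

    // Unconditional write, used here only to seed the store.
    public string Put(string id, string value)
    {
        string etag = Guid.NewGuid().ToString();
        _items[id] = (value, etag);
        return etag;
    }

    // Returns the stored value together with its current ETag.
    public (string Value, string ETag) Read(string id) => _items[id];

    // Conditional write: succeeds only if the caller's ETag is still current.
    public bool TryReplace(string id, string newValue, string expectedETag)
    {
        if (_items[id].ETag != expectedETag)
        {
            return false; // the analogue of a precondition failure
        }

        // Every successful write produces a fresh ETag.
        _items[id] = (newValue, Guid.NewGuid().ToString());
        return true;
    }
}

public static class EtagDemo
{
    public static void Run()
    {
        var store = new InMemoryStore();
        store.Put("doc1", "original");

        // Both clients read the same version of the document.
        var readByA = store.Read("doc1");
        var readByB = store.Read("doc1");

        // A writes first and succeeds; B's ETag is now stale, so B is rejected.
        bool aSucceeded = store.TryReplace("doc1", "change from A", readByA.ETag);
        bool bSucceeded = store.TryReplace("doc1", "change from B", readByB.ETag);

        string stored = store.Read("doc1").Value;
        Console.WriteLine($"A: {aSucceeded}, B: {bSucceeded}, stored: {stored}");
    }
}
```

Calling EtagDemo.Run() prints "A: True, B: False, stored: change from A". Client B's update is rejected instead of silently replacing Client A's change.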

Implementation in Code

Let’s look at a practical example using the Azure CosmosDB SDK for .NET:

Fetching the Document

var container = cosmosClient.GetContainer("databaseId", "containerId");

// Read the document
var response = await container.ReadItemAsync<dynamic>("documentId", new PartitionKey("partitionKeyValue"));
var document = response.Resource;

// Store the ETag
string etag = response.ETag;

Updating with ETag

// Modify the document
document["propertyToUpdate"] = "newValue";

// Prepare the request options with ETag
var requestOptions = new ItemRequestOptions
{
    IfMatchEtag = etag // Specify the expected ETag
};

try
{
    // Attempt the update
    var updateResponse = await container.ReplaceItemAsync(document, "documentId", new PartitionKey("partitionKeyValue"), requestOptions);
    Console.WriteLine("Update succeeded!");
}
catch (CosmosException ex) when (ex.StatusCode == System.Net.HttpStatusCode.PreconditionFailed)
{
    Console.WriteLine("Concurrency conflict detected. Update failed.");
}

Key Scenarios Where ETags Shine

  1. Preventing Lost Updates: By validating the ETag before applying updates, you ensure no intermediate changes are overwritten.
  2. Conflict Resolution: If a conflict occurs, you can re-fetch the document, merge changes, and retry the update.
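  3. Detecting Changes: Comparing a previously stored ETag with the current one tells you whether a document has changed since you last read it. The ETag alone does not record when the change happened or who made it; for that, rely on other metadata such as the _ts timestamp or your own audit fields.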
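The conflict-resolution scenario (re-fetch, re-apply, retry) might look roughly like the sketch below. It uses the same SDK calls as the snippets above (ReadItemAsync, ReplaceItemAsync, IfMatchEtag); the helper name UpdateWithRetryAsync, the applyChange callback, and the maxAttempts default are assumptions made for illustration. It simply re-applies the change to the freshly read document, which is only appropriate when the change does not depend on stale data; a real merge strategy is application-specific.

```csharp
using System;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public static class EtagRetryExample
{
    // Hypothetical helper: reads the item, applies a change, and writes it back
    // with an If-Match precondition, retrying when another writer got there first.
    // Returns true if the write succeeded, false if every attempt hit a conflict.
    public static async Task<bool> UpdateWithRetryAsync<T>(
        Container container,
        string id,
        PartitionKey partitionKey,
        Action<T> applyChange,
        int maxAttempts = 3)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            // Re-read on every attempt so the change is applied to the latest version.
            // A missing item surfaces as a CosmosException (NotFound) and is not handled here.
            ItemResponse<T> read = await container.ReadItemAsync<T>(id, partitionKey);
            T document = read.Resource;

            applyChange(document);

            try
            {
                await container.ReplaceItemAsync(
                    document,
                    id,
                    partitionKey,
                    new ItemRequestOptions { IfMatchEtag = read.ETag });
                return true;
            }
            catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.PreconditionFailed)
            {
                // The document changed between the read and the write; loop and try again.
            }
        }

        return false;
    }
}
```

Capping the number of attempts keeps a heavily contended document from causing an unbounded loop; when the helper returns false, the caller can surface the conflict to the user instead.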

Best Practices

  1. Use Partition Keys: Always pass the correct partition key when reading or replacing a document, so the request is routed directly to the partition that holds it.
  2. Handle Precondition Failures Gracefully: Design your application to retry updates or prompt users to resolve conflicts.
  3. Log ETag Values: Maintain a history of ETags for debugging and auditing purposes.

Conclusion

Concurrency is a common challenge in distributed databases, but CosmosDB’s ETag mechanism provides a simple and effective way to manage it. By adopting optimistic concurrency control with ETags, you can safeguard your data integrity and create more resilient applications.

Practical Example

using System;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

namespace Infrastructure.CosmosDB
{
    public static class CosmosDBConcurrencyWrapper
    {
        // Returns the current ETag of the item, or null if the id is empty or the item does not exist.
        public static async Task<string?> GetItemEtagAsync(string? id, PartitionKey partitionKey, Container container)
        {
            ArgumentNullException.ThrowIfNull(container, nameof(container));

            if (string.IsNullOrEmpty(id))
            {
                return null;
            }

            try
            {
                ItemResponse<dynamic> response = await container.ReadItemAsync<dynamic>(id, partitionKey);
                return response.ETag;
            }
            catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
            {
                return null;
            }
        }

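        // Upserts the item, sending etag as an If-Match precondition when it is non-null.
        // The bool in the result is true when an existing item was replaced.
        // Throws ConcurrencyException if the stored item changed since that ETag was read.
        public static async Task<Tuple<ItemResponse<T>, bool>> TryUpsertItemAsync<T>(PartitionKey partitionKey, T item, Container container, string? etag)
        {
            ArgumentNullException.ThrowIfNull(container, nameof(container));
            ArgumentNullException.ThrowIfNull(item, nameof(item));

            try
            {
                ItemResponse<T> response = await container.UpsertItemAsync(item, partitionKey, new ItemRequestOptions { IfMatchEtag = etag });
                // 200 OK means an existing item was replaced; 201 Created means a new item was inserted.
                return new(response, response.StatusCode == HttpStatusCode.OK);
            }
            catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.PreconditionFailed)
            {
                // Translate the HTTP 412 into a domain-level exception for callers.
                throw new ConcurrencyException(ex);
            }
        }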
    }

    public class ConcurrencyException : Exception
    {
        public ConcurrencyException()
        {
        }

        public ConcurrencyException(string message)
            : base(message)
        {
        }

        public ConcurrencyException(string message, Exception inner)
            : base(message, inner)
        {
        }

        public ConcurrencyException(Exception inner)
            : base("Concurrency problem detected", inner)
        {
        }
    }
}
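A caller might use the wrapper along these lines. This is a sketch under stated assumptions: the Order type, the MarkShippedAsync method, and a container partitioned by customerId are all hypothetical and serve only to show the call sequence. The ETag is captured before the new state is prepared, so a write by anyone else in between surfaces as a ConcurrencyException.

```csharp
using System;
using System.Threading.Tasks;
using Infrastructure.CosmosDB;
using Microsoft.Azure.Cosmos;

// Hypothetical document type for illustration; CosmosDB requires a lowercase "id" property.
public sealed class Order
{
    public string id { get; set; } = "";
    public string customerId { get; set; } = "";
    public string status { get; set; } = "";
}

public static class WrapperUsage
{
    // Returns true when the write went through, false when another writer got there first.
    public static async Task<bool> MarkShippedAsync(Container container, string orderId, string customerId)
    {
        // Assumption: the container is partitioned by customerId.
        var partitionKey = new PartitionKey(customerId);

        // Capture the version we are about to overwrite (null if the item does not exist yet).
        string? etag = await CosmosDBConcurrencyWrapper.GetItemEtagAsync(orderId, partitionKey, container);

        var order = new Order { id = orderId, customerId = customerId, status = "shipped" };

        try
        {
            Tuple<ItemResponse<Order>, bool> result =
                await CosmosDBConcurrencyWrapper.TryUpsertItemAsync(partitionKey, order, container, etag);

            Console.WriteLine(result.Item2 ? "Replaced existing order." : "Created new order.");
            return true;
        }
        catch (ConcurrencyException)
        {
            // The stored order changed after its ETag was read; nothing was written.
            return false;
        }
    }
}
```

Note that when GetItemEtagAsync returns null, the upsert carries no precondition, so two callers creating the same item at the same time are not protected against each other by this path.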