DEV Community

JICA98

I tried making DynamoDB Streams easier to use in Java

DynamoDB offers DynamoDB Streams, which captures changes to items in a table. Although these streams give valuable insight into data changes, managing and processing them can be challenging when you are not using Lambda or Kinesis.
Anyway, I just wanted to use DynamoDB Streams with Flux in Java, and that inspired me to write this wrapper around the DynamoDB Streams low-level API (which you can find here).

First of all, what's difficult about the existing API, you ask? Here are a few things you need to do to listen to changes in your DynamoDB table:

  1. First, call the DescribeStream API, which takes your stream ARN as input and returns the stream's shards.
  2. Next, get a ShardIterator for each of these shards using the GetShardIterator API.
  3. Now that you have an iterator, call the GetRecords API to get the actual DynamoDB records (which carry the new and old images).

On top of all this, you need to handle expired shards and keep repeating these steps in a polling loop, since the API does not provide a continuous stream.
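To make the steps above concrete, here is a minimal sketch of one polling pass in plain Java. It does not use the AWS SDK: `StreamApi`, `RecordsPage`, `ShardPoller` and `InMemoryStreamApi` are hypothetical names made up for this illustration, records are plain strings, and the in-memory fake only mimics the describe, get-iterator, get-records flow. It is not how dynamo-streams is implemented internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the three low-level calls (NOT the AWS SDK types).
interface StreamApi {
    List<String> describeStream(String streamArn);              // shard ids
    String getShardIterator(String streamArn, String shardId);  // first iterator
    RecordsPage getRecords(String shardIterator);               // one page of records
}

// One page of records; nextIterator == null means the shard is closed and drained.
final class RecordsPage {
    final List<String> records;
    final String nextIterator;

    RecordsPage(List<String> records, String nextIterator) {
        this.records = records;
        this.nextIterator = nextIterator;
    }
}

final class ShardPoller {
    private final StreamApi api;
    private final String streamArn;
    private final Map<String, String> iterators = new HashMap<>(); // shardId -> current iterator
    private final Set<String> finished = new HashSet<>();          // shards with nothing left

    ShardPoller(StreamApi api, String streamArn) {
        this.api = api;
        this.streamArn = streamArn;
    }

    // One polling pass over all shards; call this repeatedly on a timer.
    List<String> pollOnce() {
        List<String> out = new ArrayList<>();
        // Step 1: list the shards again on every pass, since new ones can appear.
        for (String shardId : api.describeStream(streamArn)) {
            if (finished.contains(shardId)) {
                continue;
            }
            // Step 2: ask for an iterator the first time a shard is seen.
            String iterator = iterators.get(shardId);
            if (iterator == null) {
                iterator = api.getShardIterator(streamArn, shardId);
            }
            // Step 3: read one page and remember where to continue next time.
            RecordsPage page = api.getRecords(iterator);
            out.addAll(page.records);
            if (page.nextIterator == null) {
                finished.add(shardId);
                iterators.remove(shardId);
            } else {
                iterators.put(shardId, page.nextIterator);
            }
        }
        return out;
    }
}

// In-memory fake for trying the loop out: closed shards, two records per page.
final class InMemoryStreamApi implements StreamApi {
    private final Map<String, List<String>> shards = new LinkedHashMap<>();

    InMemoryStreamApi add(String shardId, String... records) {
        shards.put(shardId, Arrays.asList(records));
        return this;
    }

    public List<String> describeStream(String streamArn) {
        return new ArrayList<>(shards.keySet());
    }

    public String getShardIterator(String streamArn, String shardId) {
        return shardId + "#0"; // iterator format here is "shardId#position"
    }

    public RecordsPage getRecords(String shardIterator) {
        int sep = shardIterator.lastIndexOf('#');
        String shardId = shardIterator.substring(0, sep);
        int from = Integer.parseInt(shardIterator.substring(sep + 1));
        List<String> all = shards.get(shardId);
        int to = Math.min(from + 2, all.size());
        String next = to < all.size() ? shardId + "#" + to : null;
        return new RecordsPage(new ArrayList<>(all.subList(from, to)), next);
    }
}
```

With a fake holding `shard-1` (three records) and `shard-2` (one record), repeated calls to `pollOnce()` drain both shards page by page and then return empty lists. A real client would additionally have to request a fresh iterator when the old one has expired, which this sketch leaves out.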

So I created dynamo-streams, which takes care of all these steps for you. You just need to listen for the changes, which arrive through a Flux.

So, where do we get started?

  • First, add the Maven or Gradle dependency for dynamo-streams:
 <dependency>
     <groupId>io.github.jica98</groupId>
     <artifactId>aws-java-dynamo-streams</artifactId>
     <version>0.0.5</version>
 </dependency>   
implementation group: 'io.github.jica98', name: 'aws-java-dynamo-streams', version: '0.0.5'
  • If you are using Spring, you can declare a bean for DynamoStreams in one of your config classes.

    private static final String STREAM_ARN = "arn:aws:dynamodb:us-east-1:your-dynamo-db-stream";

    @Bean(destroyMethod = "shutdown")
    protected AmazonDynamoDBStreams streamsClient() {
        return AmazonDynamoDBStreamsClientBuilder
                .standard()
                .withRegion(Regions.US_EAST_1)
                .withCredentials(new DefaultAWSCredentialsProviderChain())
                .build();
    }

    @Bean(destroyMethod = "shutdown")
    protected DynamoStreams<DataRoot> dynamoStreams(AmazonDynamoDBStreams dynamoDBStreams) {
        return new DynamoStreams<>(
                StreamConfig.<DataRoot>builder()
                        .clazz(DataRoot.class)
                        .dynamoDBStreams(dynamoDBStreams)
                        .streamARN(STREAM_ARN)
                        .build());
    }

  • Finally, autowire it in your controller and subscribe to the changes using a Flux. Utility methods such as newImages and oldImages return a Flux of only the new images or only the old images, respectively.
    @Autowired
    private DynamoStreams<DataRoot> dynamoStreams;

    @PostConstruct
    void postConstruct() {
        // Initialize here to start streaming events
        dynamoStreams.initialize();
    }

    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<DataRoot>> streamData() {
        return dynamoStreams.stream()
                .newImages()
                .map(data -> ServerSentEvent.<DataRoot>builder()
                        .data(data)
                        .id(UUID.randomUUID().toString())
                        .build());
    }

I hope this makes DynamoDB Streams easier to use in your code. I acknowledge that the correct way to subscribe to these events will always be through a Lambda.

You can find the project over here.

Changes I wish to make in the future:

  1. Poll only when there is at least one subscriber. (Right now it polls at a configurable interval regardless of the subscriber count.)
  2. Add more utility methods.
  3. Add support for something other than Flux.
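For the first item in this list, one possible shape is plain reference counting: start the polling task when the first subscriber arrives and cancel it when the last one leaves. The sketch below uses only `java.util.concurrent`. The class name `RefCountedPoller` and its methods are hypothetical and not part of dynamo-streams; in Reactor, operators such as `publish().refCount()` are built around a similar idea.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: polls only while at least one subscriber is registered.
final class RefCountedPoller {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "stream-poller");
                t.setDaemon(true); // do not keep the JVM alive just for polling
                return t;
            });
    private final Runnable pollTask;
    private final long intervalMillis;
    private int subscribers = 0;
    private ScheduledFuture<?> running; // null while nobody is subscribed

    RefCountedPoller(Runnable pollTask, long intervalMillis) {
        this.pollTask = pollTask;
        this.intervalMillis = intervalMillis;
    }

    // The first subscriber starts the polling task.
    synchronized void subscribe() {
        if (subscribers++ == 0) {
            running = scheduler.scheduleWithFixedDelay(
                    pollTask, 0, intervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    // The last subscriber to leave stops the polling task.
    synchronized void unsubscribe() {
        if (subscribers == 0) {
            return; // nothing to undo
        }
        if (--subscribers == 0) {
            running.cancel(false);
            running = null;
        }
    }

    synchronized boolean isPolling() {
        return running != null;
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

The poll task passed in would be whatever performs one pass over the shards. The counter keeps the scheduler idle between the last unsubscribe and the next subscribe, so no requests are made while nobody is listening.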
