
Maciej Kucharski


Streaming data from RDBMS with jOOQ

Introduction

Sometimes, it's necessary to process a large number of rows fetched from the database. When the row count reaches millions, things can get complicated, especially regarding memory usage. While implementing paging and processing rows page by page is a common approach, there are alternative solutions.

Relational databases provide components such as cursors, which can be used to process large datasets while fetching only a limited number of rows at a time. With the help of the jOOQ framework, we can take it a step further—leveraging cursors under the hood while seamlessly integrating with Java's Stream API. Let’s see it in action!

Project setup

For the sake of a clean demonstration, I’ll keep things as simple as possible. The database will be set up using SQL scripts to avoid introducing any migration tools. It will contain 5 million rows, each with a few columns mimicking real-life data. I won’t use jOOQ’s type-safe query generation features (though they’re excellent—I highly recommend trying jOOQ) to keep the focus on one specific aspect. I’m using Spring Boot, but its usage is kept to a minimum. I couldn’t resist Lombok, though :)

Database

The database for this demo is initialized using a simple SQL script:

create table sample_data
(
    id             serial primary key,
    start_date     date      not null,
    type_id        integer   not null,
    description    text      not null,
    category_id    integer   not null,
    subcategory_id integer   not null,
    created_at     timestamp not null default current_timestamp
);

insert into sample_data
select id,
       date '2024-01-01' + (random() * 365)::int,
       round((random() * 15)::numeric, 2),
       md5(random()::text),
       round((random() * 30)::numeric, 2),
       round((random() * 200)::numeric, 2)
from generate_series(1, 5000000) as id;

A row from this database table is mapped to a Java record with the following structure:

@Builder
public record SampleDataEntity(int id, String description, LocalDate startDate, LocalDateTime createdAt, int typeId,
                               int categoryId, int subcategoryId) {
}

Standard way

Let’s start by evaluating the memory footprint in a scenario where all the data is loaded into memory. The repository implementation is straightforward:

@Repository
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
@RequiredArgsConstructor
public class SomeRepository {
    DSLContext dslContext;

    Collection<SampleDataEntity> fetchAll() {
        return dslContext.fetch("select * from sample_data")
                .stream()
                .map(SomeRepository::mapRow)
                .toList();
    }

    private static SampleDataEntity mapRow(org.jooq.Record row) {
        return SampleDataEntity.builder()
                .id(row.get("id", int.class))
                .description(row.get("description", String.class))
                .createdAt(row.get("created_at", LocalDateTime.class))
                .startDate(row.get("start_date", LocalDate.class))
                .typeId(row.get("type_id", int.class))
                .categoryId(row.get("category_id", int.class))
                .subcategoryId(row.get("subcategory_id", int.class))
                .build();
    }
}

For the purposes of this demo, let’s simply count the number of retrieved rows. The Thread.sleep() is included to give me time to open VisualVM and check the metrics:

@Component
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
@RequiredArgsConstructor
@Slf4j
public class SimpleRunner implements CommandLineRunner {

    SomeRepository someRepository;

    @Override
    public void run(String... args) throws Exception {
        log.info("Runner started with args={}", args);
        Thread.sleep(Duration.ofSeconds(10));
        var count = someRepository.fetchAll()
                .size();
        log.info("Fetched {} rows", count);
    }
}

What are the results? The program runs smoothly but consumes around 5 GB of memory:

Memory usage view in VisualVM for approach fetching all the rows at once

Cursor way

Let’s modify our repository slightly to use a variant of the jOOQ API that returns a stream:

@Repository
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
@RequiredArgsConstructor
public class SomeRepository {
    DSLContext dslContext;

    @SneakyThrows
    Stream<SampleDataEntity> streamAll() {
        return dslContext.fetchStream("select * from sample_data")
                .map(SomeRepository::mapRow);
    }

    private static SampleDataEntity mapRow(org.jooq.Record row) {
        return SampleDataEntity.builder()
                .id(row.get("id", int.class))
                .description(row.get("description", String.class))
                .createdAt(row.get("created_at", LocalDateTime.class))
                .startDate(row.get("start_date", LocalDate.class))
                .typeId(row.get("type_id", int.class))
                .categoryId(row.get("category_id", int.class))
                .subcategoryId(row.get("subcategory_id", int.class))
                .build();
    }
}

I’ll skip pasting the runner implementation since the only difference is in how the number of items is counted. So, how does the memory graph look this time? A significant improvement is immediately noticeable:

VisualVM's memory usage view for approach using the stream

The numbers look impressive, but there’s some bad news—the entire result set is still loaded into memory. The improvement comes from streaming over the mapped results, which makes them eligible for garbage collection earlier. How do we know this? It will become clear once we try a different API that allows us to control the number of fetched rows:

    @SneakyThrows
    Stream<SampleDataEntity> streamAll() {
        dslContext.settings().setFetchSize(100);
        var cursor = dslContext.fetchLazy("select * from sample_data");
        return cursor.stream()
                .map(SomeRepository::mapRow);
    }
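
As a side note, jOOQ also lets you set the fetch size per query through ResultQuery.fetchSize(), instead of mutating the shared Settings object. A minimal sketch of that variant, reusing the same query and row mapper:

    Stream<SampleDataEntity> streamAll() {
        // the fetch size applies only to this query, not to the whole DSLContext configuration
        return dslContext.resultQuery("select * from sample_data")
                .fetchSize(100)
                .fetchLazy()
                .stream()
                .map(SomeRepository::mapRow);
    }

Either way, the auto-commit caveat discussed next applies just the same.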

Running the program will produce the following log entry:
org.jooq.impl.AbstractResultQuery : Fetch Size : A fetch size of 100 was set on a auto-commit PostgreSQL connection, which is not recommended. See https://jdbc.postgresql.org/documentation/query/#getting-results-based-on-a-cursor

The documentation page linked above outlines several scenarios where the JDBC driver ignores fetchSize and loads the entire result set into memory. In this case, here’s what happens:

The Connection must not be in autocommit mode. The backend closes cursors at the end of transactions, so in autocommit mode the backend will have closed the cursor before anything can be fetched from it.
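
For context, here is roughly what cursor-based fetching looks like at the plain JDBC level, which is what jOOQ drives under the hood. This is a minimal sketch, assuming a javax.sql.DataSource named dataSource is available:

try (Connection connection = dataSource.getConnection()) {
    connection.setAutoCommit(false);                 // cursors only survive inside a transaction
    try (PreparedStatement statement =
                 connection.prepareStatement("select * from sample_data")) {
        statement.setFetchSize(100);                 // rows fetched per database round trip
        try (ResultSet resultSet = statement.executeQuery()) {
            while (resultSet.next()) {
                // process one row at a time
            }
        }
    }
}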

Cursor - the proper way

Now that we know what’s wrong, let’s fix it! First, we need to disable auto-commit. This can be done by setting a property on the datasource:

spring.datasource.hikari.auto-commit=false

Even though we’re only reading data and not writing, a transaction is now required. For the sake of this demo, let’s use a well-known annotation to handle it. Our modified runner code now looks like this:

@Component
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
@RequiredArgsConstructor
@Slf4j
public class StreamingRunner implements CommandLineRunner {

    SomeRepository repository;

    @Override
    @Transactional
    public void run(String... args) throws Exception {
        log.info("Streaming started with args={}", args);
        Thread.sleep(Duration.ofSeconds(10));

        var count = repository.streamAll()
                .count();
        log.info("Found {} rows", count);
    }
}

What’s the result? I’d say the application now consumes a reasonable amount of memory relative to its complexity:

VisualVM's memory view for the properly used stream
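
One detail worth calling out: the stream returned by fetchStream() or fetchLazy() is backed by an open cursor and its JDBC resources, so it is safest to close it explicitly rather than rely on the terminal operation consuming every row. A minimal sketch of the counting step wrapped in try-with-resources, reusing the runner’s repository and log fields:

try (Stream<SampleDataEntity> rows = repository.streamAll()) {
    // closing the stream releases the cursor even if processing stops early
    var count = rows.count();
    log.info("Found {} rows", count);
}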

Conclusion

We’ve just learned another solution for situations where our application needs to process large amounts of data. I see this approach as more useful for batch processes that handle and aggregate vast datasets—for example, for reporting—rather than for serving data directly to the UI. With the significant reduction in memory consumption, it may even be possible to use the saved resources to spin up more instances, improving overall process throughput.
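
To make that reporting scenario concrete, here is a hypothetical aggregation that counts rows per category with java.util.stream.Collectors; the rows flow through in small batches while only the per-category counters are kept in memory:

try (Stream<SampleDataEntity> rows = repository.streamAll()) {
    Map<Integer, Long> rowsPerCategory = rows.collect(
            Collectors.groupingBy(SampleDataEntity::categoryId, Collectors.counting()));
    log.info("Rows per category: {}", rowsPerCategory);
}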

The final source code can be found on GitHub.
