João Esperancinha
CQRS — Command Query Responsibility Segregation — A Java, Spring, SpringBoot, and Axon Example

Introduction

The term first appeared in the Software Engineering lexicon back in 1997, in the book “Object-Oriented Software Construction” by Bertrand Meyer. At that time it was known as CQS (Command Query Separation). CQRS itself was introduced by Greg Young in 2010. This software architecture is designed to mitigate known caveats of Object-Oriented architectures. Specifically, these caveats are:

  • Writes and reads almost always generate very different loads against the system.
  • Write operations are often much more complicated than read operations, and they affect different scopes of the application. Write operations need to guarantee that correct, validated, and consistent data reaches the store.
  • Security concerns also differ between write and read operations.
  • Data is normally stored in the database in third normal form (3NF) or another write-optimized form. When we read data, we often use it to build views that provide the user with readable data. The data presented to the user is hardly ever normalized; this is also referred to as denormalized data. In other words, we present the data to the user with the single concern that the user can read it.

Even from this short list, you can see that reads and writes operate in different ways: they have different concerns, different performance profiles, and they generate loads that can strain the system in very different ways.

CQRS is also a form of DDD (Domain-Driven Design), although it wasn’t initially conceived as such. Every application has at least one bounded context. Bounded contexts are difficult to define, but essentially they isolate one responsibility of the application, for example the handling of debit cards, a library book archive, or patient data. The latter can be divided into multiple subdomains. There could be one domain to keep track of chronic illnesses like HIV and a different one to keep track of the common flu. Both have different data concerns. Being a chronic disease, HIV requires keeping track of much more data, such as T-cell counts, viral load, and other blood values, for a lifetime. Patients with the flu don’t need so much monitoring. There are also far more privacy concerns in the first domain than in the second. Assessing domains is a bit of an art, and it requires the analytical skills of the engineer.
Once you have defined your domain, it’s time to start designing CQRS for it. As you may have already seen, this design has as its main concern the separation of read operations from write operations. Write operations cannot be read operations, and, in the same way, read operations cannot be write operations.

We will look into the source code I’m providing on GitHub.
So let's start!


Commands

Commands are defined as any operation that can mutate the data without returning a value. In essence, these are all write operations. In CRUD terms, these are the Create, Update, and Delete operations. You may also refer to them as CUD. The Read operation is implicitly not a part of the operations we need for the Command side of our application.

Queries

Queries are defined as any operation that will never mutate data and will always return values. Ultimately, these are all read operations: the R in CRUD terms.
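Taken together, these two definitions are just the original CQS principle applied at method level. Here is a minimal plain-Java sketch of that split; the class and method names are illustrative and not part of the example project:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Minimal CQS sketch: commands mutate state and return nothing,
// queries return data and never mutate state.
class VideoSeriesCatalog {

    private final List<String> series = new ArrayList<>();

    // Command: a write operation, returns void
    void addSeries(String name) {
        series.add(name);
    }

    // Query: a read operation, free of side effects
    List<String> findAllSeries() {
        return Collections.unmodifiableList(series);
    }
}
```

CQRS takes this method-level rule and promotes it to the architecture level, which is what the rest of this article builds.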

Models

There are many ways to implement CQRS. The point is always to keep read operations apart from write operations as much as possible. In our implementation, we are also going to separate operations and use Event Sourcing. This will allow us to further separate the medium where we are going to keep our data. We will use two different databases. One database will be a part of the command flows and the other database will be part of the read flows.

Implementation

Let’s first have a look at how all the moving parts will work:

(Architecture diagram: the command service stores events in the MongoDB event store, and the query service picks those events up and serves reads from its own database.)

In this example, I’m going to try to make this as simple as possible. There are far more elaborate, dynamic, and scalable options out there. One of them would be to use RabbitMQ or another message queueing system to further decouple all components. However, that would lead us out of the scope of this tutorial. The point here is to present a solution with all the fundamental points of CQRS at its core.
We are going to need a lot of dependencies, but for our example, we will focus on the Axon dependencies. You can find a complete listing of all the dependencies used in this example in the root pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0">
    <modelVersion>4.0.0</modelVersion>
    ...
    <dependencyManagement>
        <dependencies>
            ...
            <dependency>
                <groupId>org.axonframework</groupId>
                <artifactId>axon-spring-boot-starter</artifactId>
                <version>${axon.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.axonframework</groupId>
                        <artifactId>axon-server-connector</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.axonframework.extensions.mongo</groupId>
                <artifactId>axon-mongo</artifactId>
                <version>${axon-mongo.version}</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>${lombok.version}</version>
                <optional>true</optional>
            </dependency>
            ...
        </dependencies>
    </dependencyManagement>
    ...
</project>

Notice that I am using the Axon Framework. This is one of the most popular frameworks for implementing the CQRS design. Namely, we are going to see how EventHandlers and Aggregates work, how the EventBus works, and how the CommandBus works. We are also going to see how this works with MongoDB in order to, ultimately, get our database updated with new data.

Core

In this module, I’m going to place everything that is common to the application. I could also have named this module common. For our application to run, we need to consider a few important things. Given the complexity involved, I am only going to implement a read-all operation and a save operation. These will be my query and my command respectively. We’ll need a DTO to get our data into our system:

public record VideoSeriesDto(
        String name,
        Integer volumes,
        BigDecimal cashValue,
        Genre genre
) {
    @Builder
    public VideoSeriesDto {
    }
}

Sending data is an operation that needs to be understood by both the writer and the reader. Our only common event should be located here:

@Builder
public class AddSeriesEvent {
    String id;
    String name;
    Integer volumes;
    BigDecimal cashValue;
    Genre genre;
    // getters and setters omitted
}

Finally, we know from the schema that both the “write service” and the “read service” will need access to the EventStore. This is, ultimately, our MongoDB database. Axon has very nice out-of-the-box libraries that allow us to easily implement this Event Sourcing mechanism. This is part of the reason why I chose it; it makes for a very simple implementation:

@Configuration
public class AxonConfig {

    @Value("${spring.data.mongodb.host:127.0.0.1}")
    private String mongoHost;

    @Value("${spring.data.mongodb.port:27017}")
    private int mongoPort;

    @Value("${spring.data.mongodb.database:test}")
    private String mongoDatabase;

    @Bean
    public TokenStore tokenStore(Serializer serializer) {
        return MongoTokenStore.builder().mongoTemplate(axonMongoTemplate()).serializer(serializer).build();
    }

    @Bean
    public EventStorageEngine eventStorageEngine(MongoClient client) {
        return MongoEventStorageEngine.builder().mongoTemplate(DefaultMongoTemplate.builder().mongoDatabase(client).build()).build();
    }

    @Bean
    public MongoTemplate axonMongoTemplate() {
        return DefaultMongoTemplate.builder().mongoDatabase(mongo(), mongoDatabase).build();
    }

    @Bean
    public MongoClient mongo() {
        return MongoClients.create(
                MongoClientSettings.builder()
                        .applyToClusterSettings(builder ->
                                builder.hosts(List.of(
                                        new ServerAddress(mongoHost, mongoPort))))
                        .build());
    }
}

Command Service

First, we need to implement a representation of our command. In the case of our command service, we only have one command, used to add new video series. Therefore, our command has the same properties as the actual series. Note the id field:

@Data
@Builder
@EqualsAndHashCode
@ToString
public class AddVideoSeriesCommand {

    @TargetAggregateIdentifier
    private String id;

    private String name;

    private Integer volumes;

    private BigDecimal cashValue;

    private Genre genre;

}

The id field is indeed a String. This is essentially our operation ID. It can be implemented in several ways. We just need to make sure that it is always unique, whether a string, a number, or whatever else we choose.
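One common way to satisfy that uniqueness requirement is a random UUID, which is also what the aggregate below generates for the event id. A tiny sketch; the helper name is mine, not part of the project:

```java
import java.util.UUID;

// Illustrative helper: a random UUID string is unique for all practical
// purposes, which is all the operation id needs to be.
class OperationIds {
    static String newOperationId() {
        return UUID.randomUUID().toString();
    }
}
```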
Now it’s time to implement the aggregate, whose constructor acts as the command handler that receives our command from the command bus:

@Slf4j  
@NoArgsConstructor  
@Aggregate  
@Data  
public class VideoSeriesAggregate {

    @AggregateIdentifier  
    private String id;  

    @CommandHandler  
    public VideoSeriesAggregate(AddVideoSeriesCommand command) {  
        apply(AddSeriesEvent.builder()  
                .id(UUID.randomUUID().toString())  
                .cashValue(command.getCashValue())  
                .genre(command.getGenre())  
                .name(command.getName())  
                .volumes(command.getVolumes()).build()  
        );  
    }  

    @EventSourcingHandler  
    public void on(AddSeriesEvent event) {  
        this.id = event.getId();  
    }  

}  

Notice the EventSourcingHandler. It doesn’t seem to be doing much, but remember that in this code section you are looking at the contents of the Aggregate. If you look at the MongoDB database, you will find something like this:

{
    "_id" : ObjectId("5df8ac587a0bba4960afce68"),
    "aggregateIdentifier" : "ed313d16-8d94-480a-85a0-b6897bcca4f5",
    "type" : "SeriesAggregate",
    "sequenceNumber" : NumberLong(0),
    "serializedPayload" : "<org.jesperancinha.video.core.events.AddSeriesEvent><id>ed313d16-8d94-480a-85a0-b6897bcca4f5</id><name>wosssda</name><volumes>10</volumes><cashValue>123.2</cashValue><genre>woo</genre></org.jesperancinha.video.core.events.AddSeriesEvent>",
    "timestamp" : "2019-12-17T10:22:16.640261Z",
    "payloadType" : "org.jesperancinha.video.core.events.AddSeriesEvent",
    "payloadRevision" : null,
    "serializedMetaData" : "<meta-data><entry><string>traceId</string><string>398a250f-8086-40e7-a767-1aa793231f62</string></entry><entry><string>correlationId</string><string>398a250f-8086-40e7-a767-1aa793231f62</string></entry></meta-data>",
    "eventIdentifier" : "2ac1a49f-0124-4f6e-b13f-140c8f36979a"
}

Notice the aggregateIdentifier. This is our id. You need the EventSourcingHandler in order to complete the request and have your event sourced to the EventStore.
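Why does sourcing need that handler? In event sourcing, an aggregate's current state is not stored directly; it is rebuilt by replaying the stored events, and the event sourcing handler is what applies each event to the state. The replay idea can be illustrated with a toy sketch, without any Axon types:

```java
import java.util.List;

// Toy replay sketch: the aggregate's state is whatever the replayed events left behind.
class ToyAggregate {

    String id;

    // Stands in for the @EventSourcingHandler: applies one event to the state
    void on(String eventId) {
        this.id = eventId;
    }

    // Rebuilds the aggregate by replaying all stored events in order
    static ToyAggregate replay(List<String> eventIds) {
        ToyAggregate aggregate = new ToyAggregate();
        eventIds.forEach(aggregate::on);
        return aggregate;
    }
}
```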
Now we only need to complete our application by implementing a Controller:

@RestController
@RequestMapping("/video-series")
public class VideoSeriesController {

    private final CommandGateway commandGateway;

    public VideoSeriesController(CommandGateway commandGateway) {
        this.commandGateway = commandGateway;
    }

    @PostMapping
    public void postNewVideoSeries(
            @RequestBody
                    VideoSeriesDto videoSeriesDto) {
        commandGateway.send(
                AddVideoSeriesCommand.builder()
                        .name(videoSeriesDto.name())
                        .volumes(videoSeriesDto.volumes())
                        .genre(videoSeriesDto.genre())
                        .cashValue(videoSeriesDto.cashValue())
                        .build());
    }
}

Notice that we are injecting a CommandGateway. This is precisely the gateway that allows us to send commands into our system.
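Note that sending a command this way is asynchronous: Axon's CommandGateway.send returns a CompletableFuture, and there is also a blocking sendAndWait variant for when the caller needs the handler's result. The shape of that contract can be sketched without any Axon types; the stub below is purely illustrative:

```java
import java.util.concurrent.CompletableFuture;

// Illustrative stub mimicking the asynchronous gateway contract:
// send returns a future; the caller may ignore it (fire-and-forget) or join it.
class GatewaySketch {

    static CompletableFuture<String> send(Object command) {
        // A real gateway would dispatch to a command handler and complete
        // the future with the handler's result (here, a fake aggregate id)
        return CompletableFuture.supplyAsync(() -> "aggregate-id-for-" + command);
    }

    static String sendAndWait(Object command) {
        // The blocking variant simply waits for the future to complete
        return send(command).join();
    }
}
```

In the controller above, the result of send is simply ignored, which makes the POST a fire-and-forget operation.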
Finally, the Spring Boot Launcher:

@SpringBootApplication
@Import(AxonConfig.class)
public class VideoAppCommandLauncher {
    public static void main(String[] args) {
        SpringApplication.run(VideoAppCommandLauncher.class);
    }
}

To complete our application we still need to configure our Spring Boot Launcher:

# spring
server.port=8080
# mongodb
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=cqrs

Query Service

The query service is essentially a reader of the EventStore and will act upon it without user intervention. The query service needs to perform queries. To that end, I implemented a query class to do just that:
public class FindAllVideoSeriesQuery {

}

Notice that this query ended up being just an empty class. That is done on purpose. We do not need parameters for a read-all operation, but we do need its representation.
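For comparison, a query that does need parameters would simply carry them as fields. A hypothetical filtered query, not part of this example project, might look like this:

```java
// Hypothetical parameterized query: the criteria travel with the query object,
// and a matching @QueryHandler would filter on them.
class FindVideoSeriesByGenreQuery {

    private final String genre;

    FindVideoSeriesByGenreQuery(String genre) {
        this.genre = genre;
    }

    String getGenre() {
        return genre;
    }
}
```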
Because we are accessing a database and storing records, we now need to implement the Entity responsible for this data:

@Getter
@Setter
@ToString
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Entity
@Table(name = "VIDEO_SERIES")
public class VideoSeries {

    @Id
    @GeneratedValue(strategy = IDENTITY)
    @Column
    private Long id;

    @Column
    private String name;

    @Column
    private Integer volumes;

    @Column
    private BigDecimal cashValue;

    @Column
    @Enumerated(EnumType.STRING)
    private Genre genre;

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || Hibernate.getClass(this) != Hibernate.getClass(o)) return false;
        VideoSeries that = (VideoSeries) o;

        return Objects.equals(id, that.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(getId(), getName(), getVolumes(), getCashValue(), getGenre());
    }
}

As you may already have guessed, in this implementation we are going to use JPA repositories:
public interface VideoSeriesRepository extends JpaRepository<VideoSeries, Long> {

}

On the query side, we have EventHandlers, which are very similar in shape to the Aggregate. The difference, of course, is that they process an event immediately once they receive it:

@Service  
@ProcessingGroup("video-series")  
public class VideoSeriesEventHandler {

    private final VideoSeriesRepository videoSeriesRepository;  

    public VideoSeriesEventHandler(VideoSeriesRepository videoSeriesRepository) {  
        this.videoSeriesRepository = videoSeriesRepository;  
    }  

    @EventHandler  
    public void on(AddSeriesEvent event) {  
        videoSeriesRepository.save(VideoSeries  
                .builder()  
                .name(event.getName())  
                .volumes(event.getVolumes())  
                .genre(event.getGenre())  
                .cashValue(event.getCashValue())  
                .build());  
    }  

    @QueryHandler  
    public List<VideoSeriesDto> handle(FindAllVideoSeriesQuery query) {  
        return videoSeriesRepository.findAll().stream().map(  
                videoSeries -> VideoSeriesDto.builder()  
                        .name(videoSeries.getName())  
                        .volumes(videoSeries.getVolumes())  
                        .cashValue(videoSeries.getCashValue())  
                        .genre(videoSeries.getGenre())  
                        .build()).collect(Collectors.toList());  
    }  

}

Notice that instead of CommandHandler we now have QueryHandler, and instead of EventSourcingHandler we now have EventHandler. These annotations distinguish what happens in the command service from what happens in the query service. Also, the id isn’t there. The id isn’t important here because no data goes to the event store from this side; all the data is handled directly via the JPA repositories.
We can now focus our attention on the query service’s Controller:

@RestController  
@RequestMapping("/video-series")  
public class VideoSeriesController {

    @Autowired  
    private QueryGateway queryGateway;  

    @GetMapping  
    public List<VideoSeriesDto> getAllVideoSeries() {
        return queryGateway.query(new FindAllVideoSeriesQuery(), ResponseTypes.multipleInstancesOf(VideoSeriesDto.class))  
                .join();  
    }  
}  

And finally our Query Spring Boot Launcher:


@SpringBootApplication
@Import(AxonConfig.class)
public class VideoAppQueryLauncher {
    public static void main(String[] args) {
        SpringApplication.run(VideoAppQueryLauncher.class);
    }
}  

To complete our application we need to configure it:

# spring
server.port=8090
# h2
spring.h2.console.path=/spring-h2-video-series-query-console
spring.h2.console.enabled=true
# datasource
spring.datasource.url=jdbc:h2:file:~/spring-datasource-video-series-query-file;auto_server=true
spring.datasource.driver-class-name=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=sa
spring.sql.init.schema-locations=classpath*:schema-h2.sql
spring.sql.init.mode=always
# hibernate
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.H2Dialect
spring.jpa.hibernate.ddl-auto=none
spring.jpa.show-sql=true
# mongodb
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=cqrs

Give it some structure:

drop table if exists VIDEO_SERIES;

create table VIDEO_SERIES  
(  
ID         bigint auto_increment primary key not null,  
NAME       varchar(100)                      not null,  
VOLUMES    int                               not null,  
CASH_VALUE decimal                           not null,  
GENRE      varchar(100)                      not null  
);

And finally some data:

insert into VIDEO_SERIES (NAME, VOLUMES, CASH_VALUE, GENRE) values ('Modern Family', 12, 12.3, 'SITCOM');  
insert into VIDEO_SERIES (NAME, VOLUMES, CASH_VALUE, GENRE) values ('Six Feet Under', 10, 34.3, 'DRAMA');  
insert into VIDEO_SERIES (NAME, VOLUMES, CASH_VALUE, GENRE) values ('Queer as Folk', 24, 55.3, 'DRAMA');

We are almost ready to run some tests. We now need to consider how we are going to launch our persistence databases. In our example, we mention two important ones: MongoDB and H2. In the code, you’ll also find Postgres in other configurations. There are multiple ways to start this application combo. In the default profile, there only needs to be a MongoDB database running on port 27017. We can do this by installing MongoDB locally or by using containers. This is why you’ll find three profiles in both the command and the query SpringBoot applications:

  • default — Runs with MongoDB and embedded H2 — run with make docker-mongo or with a local running MongoDB
  • local — Runs with MongoDB and Postgres — run with make docker-databases or with local running MongoDB and Postgres databases
  • prod — Runs with MongoDB and Postgres in a common network with docker-compose — Just run make build-docker and the system will be set up for you

Check the Readme.md file for further details on how to run this on your machine.
What I did for testing is very simple. First I performed a request to see all my current data:

$ curl localhost:8090/video-series
[
  {
    "name": "Modern Family",
    "volumes": 12,
    "cashValue": 12.3,
    "genre": "SITCOM"
  },
  {
    "name": "Six Feet Under",
    "volumes": 10,
    "cashValue": 34.3,
    "genre": "DRAMA"
  },
  {
    "name": "Queer as Folk",
    "volumes": 24,
    "cashValue": 55.3,
    "genre": "DRAMA"
  }
]

As you can see, we get three series. Let’s add a new one:

curl -d '{ "name":"True Blood", "volumes":30, "cashValue": 1323.2, "genre": "DRAMA"}' -H "Content-Type: application/json" -X POST http://localhost:8080/video-series

You should now see:

curl localhost:8090/video-series
[
  {
    "name": "Modern Family",
    "volumes": 12,
    "cashValue": 12.3,
    "genre": "SITCOM"
  },
  {
    "name": "Six Feet Under",
    "volumes": 10,
    "cashValue": 34.3,
    "genre": "DRAMA"
  },
  {
    "name": "Queer as Folk",
    "volumes": 24,
    "cashValue": 55.3,
    "genre": "DRAMA"
  },
  {
    "name": "True Blood",
    "volumes": 30,
    "cashValue": 1323.2,
    "genre": "DRAMA"
  }
]

Conclusion

Note that although we can see that this works, it’s very important that you understand what happened behind the curtain in this application. The separation between “write” and “read”, referred to as the segregation of command and query, is what forms the foundation of this architecture. The more decoupled the design, the better. There are countless corner cases and special situations in the landscape of DDD and CQRS, and event sourcing is just one of the ways to implement it. In our example, we used Spring, SpringBoot, and Axon to get our commands and events across our network. We didn’t use any message queueing system. I do intend to write another article on that, but that will be for later. For the time being, I hope you have enjoyed this tutorial about this very simple example.
You’ve probably noticed that I’m not using Java records everywhere. This is because in some instances I could not get the Axon converters to work with records, namely with the AddSeriesEvent. This issue is being resolved. Have a look at the issues document for more details on this.
Have a look at the unit and integration tests. There I’ve used Kotlin, Kotest, Testcontainers, and Mockk, which are modern alternatives to Java, JUnit 5 (Jupiter), and embedded databases. There is an example that goes through the whole architecture and tests the flow from issuing a command, to handling the event, to storing it and querying it back from the database.

I have placed all the source code of this application on GitHub
I hope that you have enjoyed this article as much as I enjoyed writing it.
Thanks in advance for your help, and thank you for reading!
