When implementing asynchronous communication between services using Kafka in a Spring Boot application, where a service produces a request and waits for a response, it is best to encapsulate the Kafka communication logic into a reusable abstraction layer or library. Below is the best approach to achieve this:
Best Approach
1. Use Kafka Request-Reply Pattern
Kafka is inherently asynchronous and not designed for direct request-response communication. However, you can implement a request-reply pattern by:
- Producing a message to a Kafka topic (e.g.,
request-topic
). - The consumer processes the message and produces a response to a
response-topic
. - The producer listens for the response on the
response-topic
using a unique correlation ID.
2. Spring Kafka Library
Spring Boot's Kafka integration (spring-kafka
) provides tools to build this pattern effectively. The library supports producing, consuming, and implementing correlation IDs for request-reply communication.
3. Leverage Correlation ID
Use a correlation ID to match the request with the response. This ID is sent with the request message and used to filter the response.
Steps to Implement
1. Dependency Setup
Add the following dependencies in your pom.xml
:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
2. Kafka Configuration
Define the producer and consumer factories, as well as templates for Kafka:
@Configuration
public class KafkaConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "request-reply-group");
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
3. Implement Request-Reply Logic
Producer Service
The producer sends a request message and listens for the response with a CompletableFuture
:
@Service
public class KafkaRequestReplyService {
private final KafkaTemplate<String, String> kafkaTemplate;
private final Map<String, CompletableFuture<String>> pendingRequests = new ConcurrentHashMap<>();
@Value("${kafka.request.topic}")
private String requestTopic;
@Value("${kafka.response.topic}")
private String responseTopic;
public KafkaRequestReplyService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public CompletableFuture<String> sendRequest(String request) {
String correlationId = UUID.randomUUID().toString();
CompletableFuture<String> future = new CompletableFuture<>();
pendingRequests.put(correlationId, future);
// Send request with correlation ID as a header
kafkaTemplate.send(requestTopic, request)
.addCallback(
success -> System.out.println("Request sent successfully"),
failure -> future.completeExceptionally(failure)
);
return future;
}
@KafkaListener(topics = "${kafka.response.topic}", groupId = "request-reply-group")
public void listenForResponse(ConsumerRecord<String, String> record) {
String correlationId = record.headers().lastHeader("correlationId").value().toString();
String response = record.value();
CompletableFuture<String> future = pendingRequests.remove(correlationId);
if (future != null) {
future.complete(response);
}
}
}
Key Points:
- A unique correlation ID is generated for each request.
- Responses are matched with the corresponding future by the correlation ID.
- The
KafkaListener
listens for responses and resolves the future.
Consumer Service
The consumer processes the request and sends a response with the same correlation ID:
@Service
public class KafkaConsumerService {
private final KafkaTemplate<String, String> kafkaTemplate;
@Value("${kafka.response.topic}")
private String responseTopic;
public KafkaConsumerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@KafkaListener(topics = "${kafka.request.topic}", groupId = "request-reply-group")
public void processRequest(ConsumerRecord<String, String> record) {
String request = record.value();
String correlationId = record.headers().lastHeader("correlationId").value().toString();
// Process the request (business logic)
String response = "Processed: " + request;
// Send the response back with the same correlation ID
kafkaTemplate.send(responseTopic, response)
.addCallback(
success -> System.out.println("Response sent successfully"),
failure -> System.err.println("Failed to send response")
);
}
}
4. Properties Configuration
Add the Kafka configuration properties to your application.properties
:
spring.kafka.bootstrap-servers=localhost:9092
kafka.request.topic=request-topic
kafka.response.topic=response-topic
5. Testing the Request-Reply
You can initiate a request and wait for the response as follows:
@RestController
@RequestMapping("/api")
public class TestController {
private final KafkaRequestReplyService kafkaRequestReplyService;
public TestController(KafkaRequestReplyService kafkaRequestReplyService) {
this.kafkaRequestReplyService = kafkaRequestReplyService;
}
@PostMapping("/send")
public ResponseEntity<String> sendRequest(@RequestBody String request) {
try {
CompletableFuture<String> responseFuture = kafkaRequestReplyService.sendRequest(request);
String response = responseFuture.get(10, TimeUnit.SECONDS); // Wait for response
return ResponseEntity.ok(response);
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Error: " + e.getMessage());
}
}
}
Conclusion
The request-reply pattern with a custom implementation using spring-kafka
is the most flexible and effective approach for asynchronous communication in a Spring Boot application. You can encapsulate this logic in a service or library for reusability.
Top comments (0)