I am using the functions below, but my @KafkaListener keeps consuming messages even when my consumer is in a paused state.
import org.apache.kafka.clients.consumer.Consumer;
// Consumer handle captured from the most recent listener invocation (assigned in consume());
// it stays null until the listener has been called at least once. Read by the pause/resume helpers.
private Consumer<String, String> kafkaConsumer;
/**
 * Pauses fetching on every partition currently assigned to the captured consumer.
 *
 * <p>Does nothing when no consumer has been captured yet (the listener has not run).
 *
 * <p>NOTE(review): pausing only affects records returned by <em>future</em> polls. Records that
 * the listener container already fetched in its previous poll may still be handed to the
 * listener, which can look like the pause being ignored. The Spring for Apache Kafka reference
 * describes pausing the listener container itself (via the endpoint registry) as the supported
 * way to pause a {@code @KafkaListener} - confirm against the version in use.
 */
public void pauseKafkaConsumer() {
    if (kafkaConsumer == null) {
        // No listener invocation has supplied a consumer yet, so there is nothing to pause.
        return;
    }
    // Always pass the full assignment: pausing an already-paused partition is harmless, and
    // this also covers partitions that were assigned after an earlier pause.
    // Deliberately no poll() here: any records a poll returned would be discarded and skipped,
    // and poll(long) is deprecated in favour of poll(Duration).
    kafkaConsumer.pause(kafkaConsumer.assignment());
}
// Resume the Kafka consumer
/**
 * Resumes fetching on every partition that is currently paused on the captured consumer.
 *
 * <p>Does nothing when no consumer has been captured yet (the listener has not run), since in
 * that case nothing can have been paused through this class.
 *
 * <p>NOTE(review): the Kafka consumer is documented as not safe for multi-threaded access;
 * verify that this is invoked on the same thread that runs the listener.
 */
public void resumeKafkaConsumer() {
    if (kafkaConsumer == null) {
        // The field is only assigned inside consume(); guard against an early call.
        return;
    }
    kafkaConsumer.resume(kafkaConsumer.paused());
}
/**
 * Listener entry point: remembers the consumer that delivered the record and pauses it once the
 * event count exceeds 10.
 *
 * @param stream the record payload (not used here)
 * @param consumer the consumer that delivered the record; stored for the pause/resume helpers
 * @param acknowledgment manual acknowledgment handle (not used here)
 */
@KafkaListener(
    topics = "#{'${spring.kafka.consumer.topic}'}",
    groupId = "#{'${spring.kafka.consumer.groupId}'}",
    containerFactory = "kafkaListenerContainerFactory")
public void consume(String stream, Consumer<?, ?> consumer, Acknowledgment acknowledgment) {
    // Unchecked narrowing of the wildcard consumer; assumes the container factory is configured
    // for String keys and values - TODO confirm.
    @SuppressWarnings("unchecked")
    Consumer<String, String> typedConsumer = (Consumer<String, String>) consumer;
    this.kafkaConsumer = typedConsumer;

    boolean overLimit = getEventCount() > 10;
    if (overLimit) {
        pauseKafkaConsumer();
    }
}
Top comments (0)