1. Adding Dependencies
First, add the necessary dependencies to your pom.xml
file:
<!-- RocketMQ Spring Boot dependency for Spring Boot 3 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Dependency compatible with MQ cluster version 5.3.0 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.3.0</version>
</dependency>
2. Configuration File bootstrap.yaml
Configure your RocketMQ settings in the bootstrap.yaml
file:
rocketmq:
name-server: 192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876 # Replace with actual NameServer addresses
consumer:
group: consume-group-test
access-key: access # Configure if ACL is used
secret-key: secret
consume-message-batch-max-size: 50 # Max messages per batch
pull-batch-size: 100 # Max messages pulled from Broker
topics:
project: "group-topic-1"
groups:
project: "consume-group-1" # Use different groups for different business processes
3. Configuration Class MqConfigProperties
Create the configuration class MqConfigProperties
:
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;
import java.io.Serializable;
/**
* RocketMQ Configuration Class
*/
@Data
@Component
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfigProperties implements Serializable {
private static final long serialVersionUID = 1L;
@Autowired
private RocketMQProperties rocketMQProperties;
private TopicProperties topics;
private GroupProperties groups;
/**
* Topic Configuration Class
*/
@Data
public static class TopicProperties implements Serializable {
private static final long serialVersionUID = 1L;
private String project;
}
/**
* Consumer Group Configuration Class
*/
@Data
public static class GroupProperties implements Serializable {
private static final long serialVersionUID = 1L;
private String project;
}
}
4. Implementing the Consumer Code
Create the consumer class UserConsumer
:
import com.alibaba.fastjson2.JSONObject;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource;
import java.util.List;
/**
* Batch Consumer Implementation
*/
@Component
@Slf4j
public class UserConsumer implements SmartLifecycle {
@Resource
private MqConfigProperties mqConfigProperties;
@Resource
private ApplicationContext applicationContext;
private volatile boolean running;
private DefaultMQPushConsumer consumer;
@Override
public void start() {
if (isRunning()) {
throw new IllegalStateException("Consumer is already running");
}
initConsumer();
setRunning(true);
log.info("UserConsumer started successfully.");
}
@Override
public void stop() {
if (isRunning() && consumer != null) {
consumer.shutdown();
setRunning(false);
log.info("UserConsumer stopped.");
}
}
@Override
public boolean isRunning() {
return running;
}
private void setRunning(boolean running) {
this.running = running;
}
private void initConsumer() {
String topic = mqConfigProperties.getTopics().getProject();
String group = mqConfigProperties.getGroups().getProject();
String nameServer = mqConfigProperties.getRocketMQProperties().getNameServer();
String accessKey = mqConfigProperties.getRocketMQProperties().getConsumer().getAccessKey();
String secretKey = mqConfigProperties.getRocketMQProperties().getConsumer().getSecretKey();
RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), accessKey, secretKey);
consumer = rpcHook != null
? new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely())
: new DefaultMQPushConsumer(group);
consumer.setNamesrvAddr(nameServer);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setConsumeMessageBatchMaxSize(100); // Set the batch size for consumption
consumer.subscribe(topic, "*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
log.info("Received {} messages", msgs.size());
for (MessageExt message : msgs) {
String body = new String(message.getBody());
log.info("Processing message: {}", body);
User user = JSONObject.parseObject(body, User.class);
processUser(user);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
log.info("UserConsumer initialized with topic [{}] and group [{}].", topic, group);
}
private void processUser(User user) {
log.info("Processing user with ID: {}", user.getId());
// Handle user-related business logic
}
}
5. Producer Example Code
To produce batch messages, you can use the following UserProducer
class:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;
public class UserProducer {
private DefaultMQProducer producer;
public void sendBatchMessages(List<User> users, String topic) {
List<Message> messages = new ArrayList<>();
for (User user : users) {
messages.add(new Message(topic, JSONObject.toJSONString(user).getBytes()));
}
try {
producer.send(messages);
} catch (Exception e) {
log.error("Error sending batch messages", e);
}
}
}
6. Additional Optimization Suggestions
Performance Optimization: You can adjust the size of the consumer thread pool. By default, it's set to
consumeThreadMin=20
andconsumeThreadMax=20
. In high-concurrency scenarios, increasing the thread pool size can enhance performance.Error Handling: When consumption fails, be cautious with
RECONSUME_LATER
to avoid infinite retry loops. Set a maximum retry count based on your business requirements.Tenant Isolation: Use different groups for different business modules to avoid consuming data from the wrong group. This is especially crucial in production environments.
Top comments (0)