第五章:消费者
深入了解 RocketMQ 消费者的使用,包括消费模式、拉取策略和消息处理。
最后更新: 2024-01-15
页面目录
RocketMQ 消费者
本章详细介绍 RocketMQ 消费者的使用和配置。
消费者概述
┌─────────────────────────────────────────────────────────────────┐
│ 消费者工作流程 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Consumer │
│ │ │
│ │ 1. 启动并注册到 ConsumerGroup │
│ │ ───────────────────────────────────────► Nameserver │
│ │ ◄─────────────────────────────────────── Broker │
│ │ │
│ │ 2. 定时拉取消息 │
│ │ ───────────────────────────────────────► Broker │
│ │ ◄─────────────────────────────────────── PullResult │
│ │ │
│ │ 3. 处理消息 │
│ │ └─► 执行业务逻辑 │
│ │ │
│ │ 4. 提交消费进度 │
│ │ ───────────────────────────────────────► Broker │
│ │ │
│ │ 5. ACK 确认 │
│ │ └─► 消费成功/失败 │
│ │ │
└─────────────────────────────────────────────────────────────────┘
消费模式
1. 集群消费
┌─────────────────────────────────────────────────────────────────┐
│ 集群消费模式 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Topic: ORDER_TOPIC │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Queue 0 │ Queue 1 │ Queue 2 │ Queue 3 │ Queue 4 │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │ │ │ │ │
│ │ │ │ │ │ │
│ ┌───▼───┐ ┌───▼───┐ ┌───▼───┐ ┌───▼───┐ ┌───▼───┐ │
│ │Consumer│ │Consumer│ │Consumer│ │Consumer│ │Consumer│ │
│ │ 1 │ │ 2 │ │ 1 │ │ 2 │ │ 3 │ │
│ │ Group1 │ │ Group1 │ │ Group2 │ │ Group2 │ │ Group2 │ │
│ └────────┘ └────────┘ └────────┘ └────────┘ └────────┘ │
│ │
│ 特点: │
│ ✅ 消息负载均衡到多个消费者 │
│ ✅ 每个消息只被一个消费者消费 │
│ ✅ 适用于大部分业务场景 │
│ │
└─────────────────────────────────────────────────────────────────┘
2. 广播消费
┌─────────────────────────────────────────────────────────────────┐
│ 广播消费模式 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Topic: ORDER_TOPIC │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Queue 0 │ Queue 1 │ Queue 2 │ Queue 3 │ Queue 4 │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │ │ │ │ │
│ │ │ │ │ │ │
│ ┌───▼───┐ ┌───▼───┐ ┌───▼───┐ ┌───▼───┐ ┌───▼───┐ │
│ │Consumer│ │Consumer│ │Consumer│ │Consumer│ │Consumer│ │
│ │ 1 │ │ 2 │ │ 3 │ │ 4 │ │ 5 │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ └────┬───┘ └────┬───┘ └────┬───┘ └────┬───┘ └────┬───┘ │
│ │ │ │ │ │ │
│ ▼ ▼ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 所有消费者都会收到消息 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ 特点: │
│ ✅ 每个消费者实例都会收到消息 │
│ ✅ 适用于消息通知、配置同步等场景 │
│ │
└─────────────────────────────────────────────────────────────────┘
消费者配置
Spring Boot 配置
# application.yml
rocketmq:
consumer:
# 消费者组
group: my-consumer-group
# Nameserver 地址
namesrvAddr: 127.0.0.1:9876
# 订阅关系表达式
topic: "ORDER_TOPIC"
tags: "order:*"
# 消费线程数
consumeThreadMin: 10
consumeThreadMax: 20
# 消息拉取数量
pullBatchSize: 32
# 消息消费批量大小
consumeMessageBatchMaxSize: 1
# 最大重试次数
maxReconsumeTimes: 3
# 消费超时时间
consumeTimeout: 15
Java 注解方式
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(
topic = "ORDER_TOPIC",
consumerGroup = "order-consumer-group",
topic = "ORDER_TOPIC",
selectorExpression = "order:*", // Tag 过滤
consumeMode = ConsumeMode.CONCURRENTLY, // 并发消费
messageModel = MessageModel.CLUSTERING, // 集群消费
maxReconsumeTimes = 3, // 最大重试次数
consumeThreadMin = 10, // 最小线程数
consumeThreadMax = 20 // 最大线程数
)
public class OrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
try {
// 处理订单消息
System.out.println("收到订单消息: " + order);
orderService.processOrder(order);
} catch (Exception e) {
// 抛出异常触发重试
throw e;
}
}
}
消息处理
并发消费
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class MyMessageListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
String topic = msg.getTopic();
String tags = msg.getTags();
String keys = msg.getKeys();
String body = new String(msg.getBody(), "UTF-8");
System.out.println("收到消息: " + body);
// 业务处理
processMessage(body);
} catch (Exception e) {
// 记录错误日志
log.error("处理消息失败", e);
// 返回重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
顺序消费
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class MyOrderlyMessageListener implements MessageListenerOrderly {
@Override
public ConsumeOrderlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeOrderlyContext context) {
context.setAutoCommit(true); // 自动提交
for (MessageExt msg : msgs) {
try {
String body = new String(msg.getBody(), "UTF-8");
String queueId = String.valueOf(msg.getQueueId());
System.out.println("队列 " + queueId + " 收到消息: " + body);
// 业务处理
processMessage(body);
} catch (Exception e) {
// 发生异常时暂停队列消费
context.setSuspendCurrentQueueTimeMillis(1000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
}
消费进度管理
消费进度存储
/**
* 消费进度管理
*/
public class ConsumeProgressManager {
// 获取消费进度
public long getConsumeOffset(String group, String topic, int queueId) {
// 从 Broker 获取消费进度
// 存储在 Broker 的 consumerOffset.json 中
String offsetKey = group + "@" + topic + "@" + queueId;
return brokerView.getConsumerOffset(offsetKey);
}
// 更新消费进度
public void updateConsumeOffset(String group, String topic,
int queueId, long offset) {
// 更新到 Broker
brokerView.updateConsumerOffset(group, topic, queueId, offset);
}
// 重置消费进度
public void resetConsumeOffset(String group, String topic, long timestamp) {
// 根据时间戳重置到指定位置
}
}
手动提交进度
/**
* 手动提交消费进度
*/
public class ManualCommitListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
ConsumeRequest request = new ConsumeRequest();
for (MessageExt msg : msgs) {
try {
// 处理消息
processMessage(msg);
request.addSuccessMsg(msg);
} catch (Exception e) {
request.addFailedMsg(msg);
}
}
// 手动处理成功和失败的消息
if (!request.getFailedMsgs().isEmpty()) {
// 记录失败消息
handleFailedMessages(request.getFailedMsgs());
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
消息过滤
Tag 过滤
/**
* Tag 过滤 - 消费者端过滤
*/
@RocketMQMessageListener(
topic = "ORDER_TOPIC",
selectorExpression = "order:create || order:pay" // 多个 Tag 用 || 分隔
)
public class OrderConsumer implements RocketMQListener<Order> {
// 只消费 order:create 和 order:pay 消息
}
SQL92 过滤
/**
* SQL 过滤 - 服务端过滤
*/
// Broker 开启 SQL 过滤
// broker.conf 中添加: enablePropertyFilter = true
@RocketMQMessageListener(
topic = "ORDER_TOPIC",
selectorType = SelectorType.SQL92, // 使用 SQL 过滤
selectorExpression = "amount > 100 AND city = '北京'"
)
public class FilteredConsumer implements RocketMQListener<Order> {
// 只消费 amount > 100 且 city = 北京的消息
}
消息重试
重试机制
┌─────────────────────────────────────────────────────────────────┐
│ 消息重试机制 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 第一次消费失败 ──► 重试 ──► 第二次消费 │
│ │ │ │
│ │ ▼ │
│ │ 第二次失败 ──► 重试 │
│ │ │ │
│ │ ▼ │
│ │ 第三次失败 │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐
│ │ 重试队列 │ │ 死信队列 │
│ │ %RETRY% │ │ %DLQ% │
│ └─────────────┘ └─────────────┘
│ │
└─────────────────────────────────────────────────────────────────┘
重试配置
/**
* 消息重试配置
*/
@RocketMQMessageListener(
topic = "ORDER_TOPIC",
maxReconsumeTimes = 3 // 最大重试次数
)
public class OrderConsumer implements RocketMQListener<Order> {
// 当重试 3 次后仍然失败,消息会进入死信队列
}
死信队列处理
/**
* 死信队列处理
*/
@RocketMQMessageListener(
topic = "ORDER_TOPIC%DLQ", // 死信队列 Topic
consumerGroup = "dlq-consumer-group"
)
public class DLQConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt msg) {
try {
// 处理死信消息
String originalTopic = msg.getKeys();
String body = new String(msg.getBody());
log.error("处理死信消息: topic={}, body={}", originalTopic, body);
// 人工处理或记录
handleDeadLetter(originalTopic, body);
} catch (Exception e) {
log.error("处理死信消息失败", e);
}
}
}
最佳实践
1. 消息处理幂等
/**
* 消息处理幂等性
*/
public class IdempotentConsumer {
@Autowired
private RedisTemplate redisTemplate;
public void consumeMessage(MessageExt msg) {
String msgId = msg.getMsgId();
String key = "msg:consumed:" + msgId;
// 检查是否已处理
if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
System.out.println("消息已处理,跳过: " + msgId);
return;
}
try {
// 业务处理
processMessage(msg);
// 标记已处理
redisTemplate.opsForValue().set(key, "1", 7, TimeUnit.DAYS);
} catch (Exception e) {
log.error("处理消息失败", e);
throw e;
}
}
}
2. 消费限流
/**
* 消费限流
*/
public class RateLimitConsumer {
private static final int MAX_QPS = 100;
private static final RateLimiter rateLimiter = RateLimiter.create(MAX_QPS);
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 获取令牌
rateLimiter.acquire();
try {
processMessage(msg);
} catch (Exception e) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
3. 慢消费处理
/**
* 处理慢消费
*/
public class SlowConsumerOptimize {
// 1. 增加消费线程
// 2. 使用消息并行处理
// 3. 批量消费优化
@RocketMQMessageListener(
topic = "ORDER_TOPIC",
consumeThreadMin = 20,
consumeThreadMax = 50,
consumeMessageBatchMaxSize = 32 // 批量消费
)
public void batchConsume(List<Order> orders) {
// 批量处理订单
orderService.batchProcessOrders(orders);
}
}
下一步
接下来让我们学习 RocketMQ 消息类型与特性。
👉 消息类型与特性