第七章:顺序消息
深入了解 RocketMQ 顺序消息的实现原理和使用方法。
最后更新: 2024-01-15
页面目录
RocketMQ 顺序消息
顺序消息是指消息的消费顺序与发送顺序一致。本章详细介绍顺序消息的实现原理和使用方法。
顺序消息概述
什么是顺序消息
┌─────────────────────────────────────────────────────────────────┐
│ 顺序消息 vs 普通消息 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 普通消息: │
│ Producer ──► [msg1, msg2, msg3] ──► Consumer (乱序) │
│ │ │
│ ▼ │
│ msg3, msg1, msg2 (随机顺序消费) │
│ │
│ ──────────────────────────────────────────────────────────── │
│ │
│ 顺序消息: │
│ Producer ──► [msg1, msg2, msg3] ──► Consumer (有序) │
│ │ │
│ ▼ │
│ msg1, msg2, msg3 (严格按序消费) │
│ │
└─────────────────────────────────────────────────────────────────┘
顺序消息类型
| 类型 | 说明 | 特点 |
|---|---|---|
| 全局顺序 | 全局严格有序 | 性能低,吞吐量受限 |
| 分区有序 | 同一分区有序 | 性能较高,常用方案 |
实现原理
分区有序原理
┌─────────────────────────────────────────────────────────────────┐
│ 分区有序原理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Producer │
│ │ │
│ │ 根据 Key 计算 Sharding Key │
│ │ ▼ │
│ │ hash(Key) % QueueNum │
│ │ ▼ │
│ ┌──▼──┐ │
│ │ Q0 │ ──► Message 1 ──► Message 3 ──► Message 5 │
│ └──┬──┘ (同一分区) │
│ │ │
│ │ hash(Key1) % 4 = 0 │
│ ▼ │
│ ┌──▼──┐ │
│ │ Q1 │ ──► Message 2 ──► Message 4 ──► Message 6 │
│ └──┬──┘ (同一分区) │
│ │ │
│ │ hash(Key2) % 4 = 1 │
│ ▼ │
│ ┌──▼──┐ │
│ │ Q2 │ ──► Message 7 │
│ └──┬──┘ (同一分区) │
│ │ │
│ │ hash(Key3) % 4 = 2 │
│ ▼ │
│ ┌──▼──┐ │
│ │ Q3 │ ──► Message 8 │
│ └─────┘ (同一分区) │
│ │
│ 结论: 同一 Sharding Key 的消息会发送到同一个队列 │
│ │
└─────────────────────────────────────────────────────────────────┘
生产者实现
Java 生产者
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
/**
* 顺序消息生产者
*/
public class OrderedProducer {
private DefaultMQProducer producer;
public void sendOrderedMessages() throws Exception {
// 订单创建
Message msg1 = new Message(
"ORDER_TOPIC",
"order:create",
"order_1001", // 订单ID作为Key
"订单1创建".getBytes()
);
// 订单支付
Message msg2 = new Message(
"ORDER_TOPIC",
"order:pay",
"order_1001", // 相同Key确保同一队列
"订单1支付".getBytes()
);
// 订单发货
Message msg3 = new Message(
"ORDER_TOPIC",
"order:ship",
"order_1001",
"订单1发货".getBytes()
);
// 订单完成
Message msg4 = new Message(
"ORDER_TOPIC",
"order:complete",
"order_1001",
"订单1完成".getBytes()
);
// 使用 MessageQueueSelector 指定队列
producer.send(msg1, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 使用订单ID作为选择依据
String orderId = (String) msg.getKeys();
int index = Math.abs(orderId.hashCode()) % mqs.size();
return mqs.get(index);
}
}, msg1.getKeys());
producer.send(msg2, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
String orderId = (String) msg.getKeys();
int index = Math.abs(orderId.hashCode()) % mqs.size();
return mqs.get(index);
}
}, msg2.getKeys());
// 发送 msg3, msg4 ...
}
}
简化实现
/**
* 使用 RocketMQTemplate 发送顺序消息
*/
@Service
public class OrderMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendOrderProcess(String orderId) {
// 按顺序发送消息
sendCreateOrder(orderId);
sendPayOrder(orderId);
sendShipOrder(orderId);
sendCompleteOrder(orderId);
}
private void sendCreateOrder(String orderId) {
Message message = new Message(
"ORDER_TOPIC",
"order:create",
orderId,
("创建订单: " + orderId).getBytes()
);
// 使用同步发送确保顺序
rocketMQTemplate.send("ORDER_TOPIC:" + "order:create", message);
}
private void sendPayOrder(String orderId) {
Message message = new Message(
"ORDER_TOPIC",
"order:pay",
orderId,
("支付订单: " + orderId).getBytes()
);
rocketMQTemplate.send("ORDER_TOPIC:" + "order:pay", message);
}
// ...
}
消费者实现
顺序消费监听器
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 顺序消息消费者
*/
public class OrderedMessageListener implements MessageListenerOrderly {
@Override
public ConsumeOrderlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeOrderlyContext context) {
// 自动提交模式
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
try {
String topic = msg.getTopic();
String tags = msg.getTags();
String body = new String(msg.getBody(), "UTF-8");
String orderId = msg.getKeys();
System.out.println("消费消息: " + body + ", 订单ID: " + orderId);
// 按业务逻辑处理
processMessage(topic, tags, body);
} catch (Exception e) {
// 发生异常时暂停消费,稍后重试
context.setSuspendCurrentQueueTimeMillis(1000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
private void processMessage(String topic, String tags, String body) {
switch (tags) {
case "order:create":
// 处理订单创建
handleCreateOrder(body);
break;
case "order:pay":
// 处理订单支付
handlePayOrder(body);
break;
case "order:ship":
// 处理订单发货
handleShipOrder(body);
break;
case "order:complete":
// 处理订单完成
handleCompleteOrder(body);
break;
}
}
}
Spring Boot 注解方式
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
@Service
@RocketMQMessageListener(
topic = "ORDER_TOPIC",
consumerGroup = "ordered-consumer-group",
selectorExpression = "order:*", // 监听所有订单消息
consumeMode = ConsumeMode.ORDERLY // 顺序消费模式
)
public class OrderlyConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
try {
// 解析消息
System.out.println("收到消息: " + message);
// 业务处理
processMessage(message);
} catch (Exception e) {
// 抛出异常触发重试
throw e;
}
}
private void processMessage(String message) {
// 处理消息
}
}
最佳实践
1. 合理设计 Sharding Key
/**
* Sharding Key 设计原则
*/
public class ShardingKeyDesign {
// ✅ 推荐: 使用具有业务意义的字段作为 Sharding Key
// - 订单ID: 保证同一订单的消息有序
// - 用户ID: 保证同一用户的消息有序
// - 设备ID: 保证同一设备的消息有序
public void sendOrderMessage(String orderId) {
// 正确: 使用订单ID作为Sharding Key
Message message = new Message(
"ORDER_TOPIC",
"order:create",
orderId, // 订单ID
content.getBytes()
);
}
// ❌ 不推荐: 使用随机Key或无业务意义字段
public void sendBadMessage(String orderId) {
Message message = new Message(
"ORDER_TOPIC",
"order:create",
UUID.randomUUID().toString(), // 错误: 随机Key无法保证顺序
content.getBytes()
);
}
}
2. 异常处理
/**
* 顺序消息异常处理
*/
public class OrderlyExceptionHandler {
public ConsumeOrderlyStatus handleException(
MessageExt msg,
ConsumeOrderlyContext context,
Exception e) {
// 记录错误日志
log.error("处理顺序消息失败: {}", msg.getKeys(), e);
// 获取重试次数
int reconsumeTimes = msg.getReconsumeTimes();
if (reconsumeTimes >= 3) {
// 超过最大重试次数,记录死信
handleDeadLetter(msg);
return ConsumeOrderlyStatus.SUCCESS;
}
// 暂停当前队列消费,1秒后继续
context.setSuspendCurrentQueueTimeMillis(1000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
3. 性能优化
/**
* 顺序消息性能优化
*/
public class OrderlyPerformanceOptimize {
// 1. 增加队列数量
// 多个独立的顺序流可以分配到不同的队列
// 2. 合理设置消费线程
// 队列数 = 消费线程数 * 并发倍数
// 3. 避免单队列瓶颈
// 如果只有一个消费者监听一个队列,可以考虑拆分
@RocketMQMessageListener(
topic = "ORDER_TOPIC",
consumerGroup = "ordered-consumer-group",
consumeThreadMin = 20,
consumeThreadMax = 50
)
public class OptimizedConsumer implements RocketMQListener<Message> {
// 配置合理的消费线程数
}
}
应用场景
订单处理
订单流程:
创建订单 ──► 支付订单 ──► 商家发货 ──► 确认收货 ──► 订单完成
每个步骤必须按顺序执行,不能跳过或乱序。
数据同步
数据同步流程:
新增数据 ──► 更新索引 ──► 发送通知 ──► 更新缓存
每个实体的变更必须按顺序应用,避免数据不一致。
金融交易
交易流程:
下单 ──► 冻结资金 ──► 扣款 ──► 商家结算 ──► 完成交易
资金操作必须严格有序,避免资损。
常见问题
1. 消息乱序
// 问题: 消息消费乱序
// 原因:
// 1. 消费者使用并发消费模式
// 2. Sharding Key 设计不合理
// 解决:
@RocketMQMessageListener(
consumeMode = ConsumeMode.ORDERLY // 使用顺序消费模式
)
public class FixedConsumer implements RocketMQListener<String> {
// ...
}
2. 消费阻塞
// 问题: 单条消息处理慢导致队列阻塞
// 原因:
// 1. 单条消息处理时间过长
// 2. 网络延迟、数据库慢查询
// 解决:
// 1. 优化单条消息处理速度
// 2. 设置合理的暂停时间
context.setSuspendCurrentQueueTimeMillis(3000); // 暂停3秒后重试
下一步
接下来让我们学习事务消息。
👉 事务消息