第七章:顺序消息

深入了解 RocketMQ 顺序消息的实现原理和使用方法。

最后更新: 2024-01-15
页面目录

RocketMQ 顺序消息

顺序消息是指消息的消费顺序与发送顺序一致。本章详细介绍顺序消息的实现原理和使用方法。

顺序消息概述

什么是顺序消息

┌─────────────────────────────────────────────────────────────────┐
│                      顺序消息 vs 普通消息                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   普通消息:                                                      │
│   Producer ──► [msg1, msg2, msg3] ──► Consumer (乱序)          │
│                      │                                           │
│                      ▼                                           │
│                  msg3, msg1, msg2 (随机顺序消费)                │
│                                                                  │
│   ────────────────────────────────────────────────────────────  │
│                                                                  │
│   顺序消息:                                                      │
│   Producer ──► [msg1, msg2, msg3] ──► Consumer (有序)          │
│                      │                                           │
│                      ▼                                           │
│                  msg1, msg2, msg3 (严格按序消费)                 │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

顺序消息类型

类型 说明 特点
全局顺序 全局严格有序 性能低,吞吐量受限
分区有序 同一分区有序 性能较高,常用方案

实现原理

分区有序原理

┌─────────────────────────────────────────────────────────────────┐
│                      分区有序原理                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   Producer                                                      │
│      │                                                         │
│      │ 根据 Key 计算 Sharding Key                               │
│      │ ▼                                                         │
│      │ hash(Key) % QueueNum                                     │
│      │ ▼                                                         │
│   ┌──▼──┐                                                      │
│   │ Q0  │  ──► Message 1 ──► Message 3 ──► Message 5           │
│   └──┬──┘     (同一分区)                                          │
│      │                                                             │
│      │ hash(Key1) % 4 = 0                                       │
│      ▼                                                             │
│   ┌──▼──┐                                                      │
│   │ Q1  │  ──► Message 2 ──► Message 4 ──► Message 6           │
│   └──┬──┘     (同一分区)                                          │
│      │                                                             │
│      │ hash(Key2) % 4 = 1                                        │
│      ▼                                                             │
│   ┌──▼──┐                                                      │
│   │ Q2  │  ──► Message 7                                        │
│   └──┬──┘     (同一分区)                                          │
│      │                                                             │
│      │ hash(Key3) % 4 = 2                                        │
│      ▼                                                             │
│   ┌──▼──┐                                                      │
│   │ Q3  │  ──► Message 8                                        │
│   └─────┘     (同一分区)                                          │
│                                                                  │
│   结论: 同一 Sharding Key 的消息会发送到同一个队列                 │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

生产者实现

Java 生产者

import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;

/**
 * 顺序消息生产者
 */
public class OrderedProducer {

    private DefaultMQProducer producer;

    public void sendOrderedMessages() throws Exception {
        
        // 订单创建
        Message msg1 = new Message(
            "ORDER_TOPIC",
            "order:create",
            "order_1001",      // 订单ID作为Key
            "订单1创建".getBytes()
        );

        // 订单支付
        Message msg2 = new Message(
            "ORDER_TOPIC",
            "order:pay",
            "order_1001",      // 相同Key确保同一队列
            "订单1支付".getBytes()
        );

        // 订单发货
        Message msg3 = new Message(
            "ORDER_TOPIC",
            "order:ship",
            "order_1001",
            "订单1发货".getBytes()
        );

        // 订单完成
        Message msg4 = new Message(
            "ORDER_TOPIC",
            "order:complete",
            "order_1001",
            "订单1完成".getBytes()
        );

        // 使用 MessageQueueSelector 指定队列
        producer.send(msg1, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                // 使用订单ID作为选择依据
                String orderId = (String) msg.getKeys();
                int index = Math.abs(orderId.hashCode()) % mqs.size();
                return mqs.get(index);
            }
        }, msg1.getKeys());

        producer.send(msg2, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                String orderId = (String) msg.getKeys();
                int index = Math.abs(orderId.hashCode()) % mqs.size();
                return mqs.get(index);
            }
        }, msg2.getKeys());

        // 发送 msg3, msg4 ...
    }
}

简化实现

/**
 * 使用 RocketMQTemplate 发送顺序消息
 */
@Service
public class OrderMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendOrderProcess(String orderId) {
        // 按顺序发送消息
        sendCreateOrder(orderId);
        sendPayOrder(orderId);
        sendShipOrder(orderId);
        sendCompleteOrder(orderId);
    }

    private void sendCreateOrder(String orderId) {
        Message message = new Message(
            "ORDER_TOPIC",
            "order:create",
            orderId,
            ("创建订单: " + orderId).getBytes()
        );
        
        // 使用同步发送确保顺序
        rocketMQTemplate.send("ORDER_TOPIC:" + "order:create", message);
    }

    private void sendPayOrder(String orderId) {
        Message message = new Message(
            "ORDER_TOPIC",
            "order:pay",
            orderId,
            ("支付订单: " + orderId).getBytes()
        );
        rocketMQTemplate.send("ORDER_TOPIC:" + "order:pay", message);
    }

    // ...
}

消费者实现

顺序消费监听器

import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

/**
 * 顺序消息消费者
 */
public class OrderedMessageListener implements MessageListenerOrderly {

    @Override
    public ConsumeOrderlyStatus consumeMessage(
            List<MessageExt> msgs,
            ConsumeOrderlyContext context) {
        
        // 自动提交模式
        context.setAutoCommit(true);
        
        for (MessageExt msg : msgs) {
            try {
                String topic = msg.getTopic();
                String tags = msg.getTags();
                String body = new String(msg.getBody(), "UTF-8");
                String orderId = msg.getKeys();
                
                System.out.println("消费消息: " + body + ", 订单ID: " + orderId);
                
                // 按业务逻辑处理
                processMessage(topic, tags, body);
                
            } catch (Exception e) {
                // 发生异常时暂停消费,稍后重试
                context.setSuspendCurrentQueueTimeMillis(1000);
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
        }
        
        return ConsumeOrderlyStatus.SUCCESS;
    }

    private void processMessage(String topic, String tags, String body) {
        switch (tags) {
            case "order:create":
                // 处理订单创建
                handleCreateOrder(body);
                break;
            case "order:pay":
                // 处理订单支付
                handlePayOrder(body);
                break;
            case "order:ship":
                // 处理订单发货
                handleShipOrder(body);
                break;
            case "order:complete":
                // 处理订单完成
                handleCompleteOrder(body);
                break;
        }
    }
}

Spring Boot 注解方式

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

@Service
@RocketMQMessageListener(
    topic = "ORDER_TOPIC",
    consumerGroup = "ordered-consumer-group",
    selectorExpression = "order:*",  // 监听所有订单消息
    consumeMode = ConsumeMode.ORDERLY  // 顺序消费模式
)
public class OrderlyConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        try {
            // 解析消息
            System.out.println("收到消息: " + message);
            
            // 业务处理
            processMessage(message);
            
        } catch (Exception e) {
            // 抛出异常触发重试
            throw e;
        }
    }

    private void processMessage(String message) {
        // 处理消息
    }
}

最佳实践

1. 合理设计 Sharding Key

/**
 * Sharding Key 设计原则
 */
public class ShardingKeyDesign {
    
    // ✅ 推荐: 使用具有业务意义的字段作为 Sharding Key
    // - 订单ID: 保证同一订单的消息有序
    // - 用户ID: 保证同一用户的消息有序
    // - 设备ID: 保证同一设备的消息有序
    
    public void sendOrderMessage(String orderId) {
        // 正确: 使用订单ID作为Sharding Key
        Message message = new Message(
            "ORDER_TOPIC",
            "order:create",
            orderId,  // 订单ID
            content.getBytes()
        );
    }
    
    // ❌ 不推荐: 使用随机Key或无业务意义字段
    public void sendBadMessage(String orderId) {
        Message message = new Message(
            "ORDER_TOPIC",
            "order:create",
            UUID.randomUUID().toString(),  // 错误: 随机Key无法保证顺序
            content.getBytes()
        );
    }
}

2. 异常处理

/**
 * 顺序消息异常处理
 */
public class OrderlyExceptionHandler {

    public ConsumeOrderlyStatus handleException(
            MessageExt msg,
            ConsumeOrderlyContext context,
            Exception e) {
        
        // 记录错误日志
        log.error("处理顺序消息失败: {}", msg.getKeys(), e);
        
        // 获取重试次数
        int reconsumeTimes = msg.getReconsumeTimes();
        
        if (reconsumeTimes >= 3) {
            // 超过最大重试次数,记录死信
            handleDeadLetter(msg);
            return ConsumeOrderlyStatus.SUCCESS;
        }
        
        // 暂停当前队列消费,1秒后继续
        context.setSuspendCurrentQueueTimeMillis(1000);
        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
    }
}

3. 性能优化

/**
 * 顺序消息性能优化
 */
public class OrderlyPerformanceOptimize {
    
    // 1. 增加队列数量
    // 多个独立的顺序流可以分配到不同的队列
    
    // 2. 合理设置消费线程
    // 队列数 = 消费线程数 * 并发倍数
    
    // 3. 避免单队列瓶颈
    // 如果只有一个消费者监听一个队列,可以考虑拆分
    
    @RocketMQMessageListener(
        topic = "ORDER_TOPIC",
        consumerGroup = "ordered-consumer-group",
        consumeThreadMin = 20,
        consumeThreadMax = 50
    )
    public class OptimizedConsumer implements RocketMQListener<Message> {
        // 配置合理的消费线程数
    }
}

应用场景

订单处理

订单流程:
创建订单 ──► 支付订单 ──► 商家发货 ──► 确认收货 ──► 订单完成

每个步骤必须按顺序执行,不能跳过或乱序。

数据同步

数据同步流程:
新增数据 ──► 更新索引 ──► 发送通知 ──► 更新缓存

每个实体的变更必须按顺序应用,避免数据不一致。

金融交易

交易流程:
下单 ──► 冻结资金 ──► 扣款 ──► 商家结算 ──► 完成交易

资金操作必须严格有序,避免资损。

常见问题

1. 消息乱序

// 问题: 消息消费乱序
// 原因: 
// 1. 消费者使用并发消费模式
// 2. Sharding Key 设计不合理

// 解决:
@RocketMQMessageListener(
    consumeMode = ConsumeMode.ORDERLY  // 使用顺序消费模式
)
public class FixedConsumer implements RocketMQListener<String> {
    // ...
}

2. 消费阻塞

// 问题: 单条消息处理慢导致队列阻塞
// 原因: 
// 1. 单条消息处理时间过长
// 2. 网络延迟、数据库慢查询

// 解决:
// 1. 优化单条消息处理速度
// 2. 设置合理的暂停时间
context.setSuspendCurrentQueueTimeMillis(3000);  // 暂停3秒后重试

下一步

接下来让我们学习事务消息。

👉 事务消息