第六章:消息类型与特性

深入了解 RocketMQ 的消息类型、消息属性和消息特性。

最后更新: 2024-01-15
页面目录

RocketMQ 消息类型与特性

本章详细介绍 RocketMQ 的消息结构和各种特性。

消息结构

消息组成

┌─────────────────────────────────────────────────────────────────┐
│                       Message 结构                                │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   Message {                                                     │
│       topic: String,           // 主题                           │
│       body: byte[],            // 消息体                         │
│       tags: String,            // 标签                           │
│       keys: String,            // 业务键                         │
│       flag: int,               // 标识位                         │
│       properties: Properties    // 扩展属性                       │
│   }                                                             │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

消息属性

属性 必填 说明
topic 消息所属主题
body 消息体内容
tags 消息标签,用于过滤
keys 消息业务键
flag 消息标识位
properties 扩展属性

消息属性详解

Topic(主题)

/**
 * Topic 配置
 */
public class TopicConfig {
    
    // Topic 名称规则
    // 1. 不能为空
    // 2. 不能包含非法字符: |,;,\n,\r
    // 3. 建议格式: {业务名}_{模块}_{环境}
    
    // 创建 Topic
    // admin.createTopic("127.0.0.1:10911", 
    //     new TopicTopic("ORDER_TOPIC", 8, 8, 0));
    
    // Topic 配置参数
    // readQueueNums: 读队列数量
    // writeQueueNums: 写队列数量
    // perm: 读写权限
}

Tag(标签)

/**
 * Tag 使用示例
 */
public class TagUsage {
    
    // 发送消息时设置 Tag
    public void sendWithTag() {
        Message message = new Message(
            "ORDER_TOPIC",           // Topic
            "order:create",          // Tag
            "order_12345",           // Key
            "订单内容".getBytes()     // Body
        );
    }
    
    // 常用 Tag 命名规范
    // 格式: {操作}:{对象}
    // 示例:
    // order:create     - 订单创建
    // order:pay        - 订单支付
    // order:ship       - 订单发货
    // order:complete   - 订单完成
    // order:cancel     - 订单取消
    // order:refund     - 订单退款
}

Keys(业务键)

/**
 * Keys 使用示例
 */
public class KeysUsage {
    
    public void sendWithKeys() {
        Message message = new Message(
            "ORDER_TOPIC",
            "order:create",
            order.getOrderId(),      // 使用订单ID作为Key
            order.toString().getBytes()
        );
        
        // Keys 的作用:
        // 1. 用于消息查询
        // 2. 便于问题排查
        // 3. 建立索引加快查询
    }
    
    // 查询消息
    public void queryByKey() {
        // 根据 Key 查询消息
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("my-consumer-group");
        
        // 使用 admin 查询
        MessageExt msg = admin.viewMessage("ORDER_TOPIC", offset);
        
        // 根据 Key 模糊查询
        // consumer.searchMessages("ORDER_TOPIC", "order_12345*");
    }
}

消息特性

消息持久化

┌─────────────────────────────────────────────────────────────────┐
│                      消息持久化流程                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   Producer ──► Broker ──► CommitLog ──► 磁盘                     │
│                              │                                  │
│                              ▼                                  │
│                         ConsumeQueue                            │
│                              │                                  │
│                              ▼                                  │
│                         Consumer                                │
│                                                                  │
│   持久化机制:                                                    │
│   ✅ 同步刷盘: 消息写入磁盘后返回                               │
│   ✅ 异步刷盘: 消息写入内存,定时刷盘                            │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

消息重试

/**
 * 消息重试机制
 */
public class MessageRetry {
    
    // 重试间隔
    // RocketMQ 默认重试间隔
    // 1st retry: 1s
    // 2nd retry: 5s
    // 3rd retry: 10s
    // 4th retry: 30s
    // 5th retry: 1m
    // ... 以此类推,最大到2h
    
    // 自定义重试间隔
    // 设置消息属性
    // message.putUserProperty("DELAY", "3"); // 自定义延迟
}

消息过滤

/**
 * 消息过滤机制
 */

// Broker 端过滤
// 消费者订阅时指定过滤表达式

// Tag 过滤
consumer.subscribe("ORDER_TOPIC", "order:create || order:pay");

// SQL 过滤
consumer.subscribe("ORDER_TOPIC", 
    MessageSelector.bySql("amount > 100 and city = '北京'"));

// 过滤原理
// 1. Tag 过滤: Broker 根据 tagCode 过滤
// 2. SQL 过滤: Broker 执行 SQL 表达式过滤

消息扩展属性

内置属性

属性名 说明
TAGS 消息标签
KEYS 业务键
WAIT 是否等待存储完成
DELAY 延迟级别
RETRY_NUM 重试次数

自定义属性

/**
 * 自定义消息属性
 */
public class CustomProperties {
    
    public void sendWithCustomProperties() {
        Message message = new Message(
            "ORDER_TOPIC",
            "order:create",
            "order_12345",
            "订单内容".getBytes()
        );
        
        // 添加自定义属性
        message.putUserProperty("orderId", "12345");
        message.putUserProperty("userId", "1001");
        message.putUserProperty("amount", "99.9");
        message.putUserProperty("city", "北京");
        message.putUserProperty("channel", "APP");
        message.putUserProperty("version", "v1.0");
        
        // 发送消息
        producer.send(message);
    }
    
    public void queryWithCustomProperties() {
        // 根据自定义属性查询
        // admin 查询消息后获取 properties
        MessageExt msg = admin.viewMessage("ORDER_TOPIC", offset);
        
        String orderId = msg.getUserProperty("orderId");
        String city = msg.getUserProperty("city");
    }
}

消息查询

按 Message ID 查询

/**
 * 按 Message ID 查询
 */
public MessageExt queryByMessageId(String topic, String msgId) {
    // Message ID 格式: rocketmq+MAC 地址 + 偏移量
    // 例如: 0A123456789ABCDEF00001CD20000000
    
    try {
        // 使用 DefaultMQAdminExt 查询
        MessageExt msg = admin.viewMessage(msgId);
        return msg;
    } catch (Exception e) {
        log.error("查询消息失败", e);
        return null;
    }
}

按 Topic 和 Key 查询

/**
 * 按 Topic 和 Key 查询
 */
public List<MessageExt> queryByTopicAndKey(String topic, String key) {
    try {
        // 查询消息
        QueryResult result = admin.queryMessage(topic, key, 100, 0, System.currentTimeMillis());
        
        return result.getMessageList();
    } catch (Exception e) {
        log.error("查询消息失败", e);
        return Collections.emptyList();
    }
}

按时间查询

/**
 * 按时间范围查询
 */
public List<MessageExt> queryByTimeRange(String topic, long startTime, long endTime) {
    try {
        // 根据时间戳查询
        QueryResult result = admin.queryMessage(
            topic,
            "*",              // 不指定 key
            100,              // 最大消息数
            startTime,
            endTime
        );
        
        return result.getMessageList();
    } catch (Exception e) {
        log.error("查询消息失败", e);
        return Collections.emptyList();
    }
}

消息过滤

Tag 过滤

/**
 * Tag 过滤
 */
public class TagFilter {
    
    // 单个 Tag
    consumer.subscribe("ORDER_TOPIC", "order:create");
    
    // 多个 Tag (OR)
    consumer.subscribe("ORDER_TOPIC", "order:create || order:pay");
    
    // 所有 Tag (*)
    consumer.subscribe("ORDER_TOPIC", "*");
    
    // 不推荐 (!=)
    consumer.subscribe("ORDER_TOPIC", "order:create || order:pay || order:ship");
}

SQL92 过滤

/**
 * SQL92 过滤表达式
 */
public class SQLFilter {
    
    // 比较运算符
    // =, !=, >, <, >=, <=
    
    // 逻辑运算符
    // AND, OR, NOT
    
    // IN
    // city IN ('北京', '上海', '广州')
    
    // BETWEEN
    // amount BETWEEN 100 AND 1000
    
    // IS NULL / IS NOT NULL
    // city IS NOT NULL
    
    // LIKE
    // name LIKE '张%'
    
    // 数字类型
    consumer.subscribe("ORDER_TOPIC", 
        MessageSelector.bySql("amount > 100"));
    
    // 字符串类型
    consumer.subscribe("ORDER_TOPIC", 
        MessageSelector.bySql("city = '北京'"));
    
    // 组合条件
    consumer.subscribe("ORDER_TOPIC", 
        MessageSelector.bySql("amount > 100 AND city IN ('北京', '上海')"));
}

消息优先级

/**
 * 消息优先级实现
 */
// RocketMQ 不支持严格优先级,但可以通过以下方式实现

public class PriorityImplementation {
    
    // 方案一:多个队列模拟优先级
    // 高优先级 -> Queue 0
    // 中优先级 -> Queue 1
    // 低优先级 -> Queue 2
    
    public void sendWithPriority(Order order) {
        Message message = new Message("ORDER_TOPIC", 
            order.getTag(), 
            order.toString().getBytes());
        
        int priority = order.getPriority();
        if (priority == 1) {
            message.setTopic("ORDER_TOPIC_HIGH");
        } else if (priority == 2) {
            message.setTopic("ORDER_TOPIC_MEDIUM");
        } else {
            message.setTopic("ORDER_TOPIC_LOW");
        }
        
        producer.send(message);
    }
    
    // 方案二:延迟消息模拟优先级
    public void sendWithDelayPriority(Order order) {
        Message message = new Message("ORDER_TOPIC", 
            order.getTag(), 
            order.toString().getBytes());
        
        // 高优先级无延迟
        // 低优先级增加延迟
        int delayLevel = getDelayLevel(order.getPriority());
        message.setDelayTimeLevel(delayLevel);
        
        producer.send(message);
    }
}

下一步

接下来让我们学习顺序消息。

👉 顺序消息