第六章:消息类型与特性
深入了解 RocketMQ 的消息类型、消息属性和消息特性。
最后更新: 2024-01-15
页面目录
RocketMQ 消息类型与特性
本章详细介绍 RocketMQ 的消息结构和各种特性。
消息结构
消息组成
┌─────────────────────────────────────────────────────────────────┐
│ Message 结构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Message { │
│ topic: String, // 主题 │
│ body: byte[], // 消息体 │
│ tags: String, // 标签 │
│ keys: String, // 业务键 │
│ flag: int, // 标识位 │
│ properties: Properties // 扩展属性 │
│ } │
│ │
└─────────────────────────────────────────────────────────────────┘
消息属性
| 属性 | 必填 | 说明 |
|---|---|---|
| topic | ✅ | 消息所属主题 |
| body | ✅ | 消息体内容 |
| tags | ❌ | 消息标签,用于过滤 |
| keys | ❌ | 消息业务键 |
| flag | ❌ | 消息标识位 |
| properties | ❌ | 扩展属性 |
消息属性详解
Topic(主题)
/**
* Topic 配置
*/
public class TopicConfig {
// Topic 名称规则
// 1. 不能为空
// 2. 不能包含非法字符: |,;,\n,\r
// 3. 建议格式: {业务名}_{模块}_{环境}
// 创建 Topic
// admin.createTopic("127.0.0.1:10911",
// new TopicTopic("ORDER_TOPIC", 8, 8, 0));
// Topic 配置参数
// readQueueNums: 读队列数量
// writeQueueNums: 写队列数量
// perm: 读写权限
}
Tag(标签)
/**
* Tag 使用示例
*/
public class TagUsage {
// 发送消息时设置 Tag
public void sendWithTag() {
Message message = new Message(
"ORDER_TOPIC", // Topic
"order:create", // Tag
"order_12345", // Key
"订单内容".getBytes() // Body
);
}
// 常用 Tag 命名规范
// 格式: {操作}:{对象}
// 示例:
// order:create - 订单创建
// order:pay - 订单支付
// order:ship - 订单发货
// order:complete - 订单完成
// order:cancel - 订单取消
// order:refund - 订单退款
}
Keys(业务键)
/**
* Keys 使用示例
*/
public class KeysUsage {
public void sendWithKeys() {
Message message = new Message(
"ORDER_TOPIC",
"order:create",
order.getOrderId(), // 使用订单ID作为Key
order.toString().getBytes()
);
// Keys 的作用:
// 1. 用于消息查询
// 2. 便于问题排查
// 3. 建立索引加快查询
}
// 查询消息
public void queryByKey() {
// 根据 Key 查询消息
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("my-consumer-group");
// 使用 admin 查询
MessageExt msg = admin.viewMessage("ORDER_TOPIC", offset);
// 根据 Key 模糊查询
// consumer.searchMessages("ORDER_TOPIC", "order_12345*");
}
}
消息特性
消息持久化
┌─────────────────────────────────────────────────────────────────┐
│ 消息持久化流程 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Producer ──► Broker ──► CommitLog ──► 磁盘 │
│ │ │
│ ▼ │
│ ConsumeQueue │
│ │ │
│ ▼ │
│ Consumer │
│ │
│ 持久化机制: │
│ ✅ 同步刷盘: 消息写入磁盘后返回 │
│ ✅ 异步刷盘: 消息写入内存,定时刷盘 │
│ │
└─────────────────────────────────────────────────────────────────┘
消息重试
/**
* 消息重试机制
*/
public class MessageRetry {
// 重试间隔
// RocketMQ 默认重试间隔
// 1st retry: 1s
// 2nd retry: 5s
// 3rd retry: 10s
// 4th retry: 30s
// 5th retry: 1m
// ... 以此类推,最大到2h
// 自定义重试间隔
// 设置消息属性
// message.putUserProperty("DELAY", "3"); // 自定义延迟
}
消息过滤
/**
* 消息过滤机制
*/
// Broker 端过滤
// 消费者订阅时指定过滤表达式
// Tag 过滤
consumer.subscribe("ORDER_TOPIC", "order:create || order:pay");
// SQL 过滤
consumer.subscribe("ORDER_TOPIC",
MessageSelector.bySql("amount > 100 and city = '北京'"));
// 过滤原理
// 1. Tag 过滤: Broker 根据 tagCode 过滤
// 2. SQL 过滤: Broker 执行 SQL 表达式过滤
消息扩展属性
内置属性
| 属性名 | 说明 |
|---|---|
| TAGS | 消息标签 |
| KEYS | 业务键 |
| WAIT | 是否等待存储完成 |
| DELAY | 延迟级别 |
| RETRY_NUM | 重试次数 |
自定义属性
/**
* 自定义消息属性
*/
public class CustomProperties {
public void sendWithCustomProperties() {
Message message = new Message(
"ORDER_TOPIC",
"order:create",
"order_12345",
"订单内容".getBytes()
);
// 添加自定义属性
message.putUserProperty("orderId", "12345");
message.putUserProperty("userId", "1001");
message.putUserProperty("amount", "99.9");
message.putUserProperty("city", "北京");
message.putUserProperty("channel", "APP");
message.putUserProperty("version", "v1.0");
// 发送消息
producer.send(message);
}
public void queryWithCustomProperties() {
// 根据自定义属性查询
// admin 查询消息后获取 properties
MessageExt msg = admin.viewMessage("ORDER_TOPIC", offset);
String orderId = msg.getUserProperty("orderId");
String city = msg.getUserProperty("city");
}
}
消息查询
按 Message ID 查询
/**
* 按 Message ID 查询
*/
public MessageExt queryByMessageId(String topic, String msgId) {
// Message ID 格式: rocketmq+MAC 地址 + 偏移量
// 例如: 0A123456789ABCDEF00001CD20000000
try {
// 使用 DefaultMQAdminExt 查询
MessageExt msg = admin.viewMessage(msgId);
return msg;
} catch (Exception e) {
log.error("查询消息失败", e);
return null;
}
}
按 Topic 和 Key 查询
/**
* 按 Topic 和 Key 查询
*/
public List<MessageExt> queryByTopicAndKey(String topic, String key) {
try {
// 查询消息
QueryResult result = admin.queryMessage(topic, key, 100, 0, System.currentTimeMillis());
return result.getMessageList();
} catch (Exception e) {
log.error("查询消息失败", e);
return Collections.emptyList();
}
}
按时间查询
/**
* 按时间范围查询
*/
public List<MessageExt> queryByTimeRange(String topic, long startTime, long endTime) {
try {
// 根据时间戳查询
QueryResult result = admin.queryMessage(
topic,
"*", // 不指定 key
100, // 最大消息数
startTime,
endTime
);
return result.getMessageList();
} catch (Exception e) {
log.error("查询消息失败", e);
return Collections.emptyList();
}
}
消息过滤
Tag 过滤
/**
* Tag 过滤
*/
public class TagFilter {
// 单个 Tag
consumer.subscribe("ORDER_TOPIC", "order:create");
// 多个 Tag (OR)
consumer.subscribe("ORDER_TOPIC", "order:create || order:pay");
// 所有 Tag (*)
consumer.subscribe("ORDER_TOPIC", "*");
// 不推荐 (!=)
consumer.subscribe("ORDER_TOPIC", "order:create || order:pay || order:ship");
}
SQL92 过滤
/**
* SQL92 过滤表达式
*/
public class SQLFilter {
// 比较运算符
// =, !=, >, <, >=, <=
// 逻辑运算符
// AND, OR, NOT
// IN
// city IN ('北京', '上海', '广州')
// BETWEEN
// amount BETWEEN 100 AND 1000
// IS NULL / IS NOT NULL
// city IS NOT NULL
// LIKE
// name LIKE '张%'
// 数字类型
consumer.subscribe("ORDER_TOPIC",
MessageSelector.bySql("amount > 100"));
// 字符串类型
consumer.subscribe("ORDER_TOPIC",
MessageSelector.bySql("city = '北京'"));
// 组合条件
consumer.subscribe("ORDER_TOPIC",
MessageSelector.bySql("amount > 100 AND city IN ('北京', '上海')"));
}
消息优先级
/**
* 消息优先级实现
*/
// RocketMQ 不支持严格优先级,但可以通过以下方式实现
public class PriorityImplementation {
// 方案一:多个队列模拟优先级
// 高优先级 -> Queue 0
// 中优先级 -> Queue 1
// 低优先级 -> Queue 2
public void sendWithPriority(Order order) {
Message message = new Message("ORDER_TOPIC",
order.getTag(),
order.toString().getBytes());
int priority = order.getPriority();
if (priority == 1) {
message.setTopic("ORDER_TOPIC_HIGH");
} else if (priority == 2) {
message.setTopic("ORDER_TOPIC_MEDIUM");
} else {
message.setTopic("ORDER_TOPIC_LOW");
}
producer.send(message);
}
// 方案二:延迟消息模拟优先级
public void sendWithDelayPriority(Order order) {
Message message = new Message("ORDER_TOPIC",
order.getTag(),
order.toString().getBytes());
// 高优先级无延迟
// 低优先级增加延迟
int delayLevel = getDelayLevel(order.getPriority());
message.setDelayTimeLevel(delayLevel);
producer.send(message);
}
}
下一步
接下来让我们学习顺序消息。
👉 顺序消息