第九章:消息过滤
深入了解 RocketMQ 的消息过滤机制,包括 Tag 过滤和 SQL92 过滤。
最后更新: 2024-01-15
页面目录
RocketMQ 消息过滤
消息过滤用于在服务端或客户端筛选感兴趣的消息,提高消费效率。
过滤机制概述
┌─────────────────────────────────────────────────────────────────┐
│ 消息过滤机制 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Producer ──► [msg1, msg2, msg3, msg4] ──► Broker │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 消息过滤 │ │
│ │ │ │
│ │ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ │
│ │ │ Tag 过滤 │ │SQL92 过滤 │ │ 类过滤 │ │ │
│ │ └───────────┘ └───────────┘ └───────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Consumer ◄─── 只接收过滤后的消息 │
│ │
└─────────────────────────────────────────────────────────────────┘
过滤类型对比
| 类型 | 过滤位置 | 过滤能力 | 性能 |
|---|---|---|---|
| Tag 过滤 | Broker | 简单匹配 | 高 |
| SQL92 过滤 | Broker | 复杂条件 | 中 |
| 类过滤 | Client | 任意逻辑 | 低 |
Tag 过滤
基础用法
/**
* Tag 过滤
*/
public class TagFilterProducer {
public void sendWithTags() {
// 发送不同 Tag 的消息
producer.send(new Message("ORDER_TOPIC", "order:create",
"订单1创建".getBytes()));
producer.send(new Message("ORDER_TOPIC", "order:pay",
"订单2支付".getBytes()));
producer.send(new Message("ORDER_TOPIC", "order:ship",
"订单3发货".getBytes()));
}
}
public class TagFilterConsumer {
public void subscribeByTags() {
// 单个 Tag
consumer.subscribe("ORDER_TOPIC", "order:create");
// 多个 Tag (OR)
consumer.subscribe("ORDER_TOPIC", "order:create || order:pay");
// 所有 Tag
consumer.subscribe("ORDER_TOPIC", "*");
}
}
SQL92 过滤
/**
* SQL92 过滤表达式
*/
public class SQLFilterExpression {
// 比较运算符: =, !=, >, <, >=, <=
// 逻辑运算符: AND, OR, NOT
// IN, BETWEEN, LIKE, IS NULL
// 示例: amount > 100 AND city = '北京'
}
// 消费者订阅
@RocketMQMessageListener(
topic = "ORDER_TOPIC",
selectorType = SelectorType.SQL92,
selectorExpression = "amount > 100 AND level = 'vip'"
)
public class SQLFilterConsumer implements RocketMQListener<Order> {
}
Broker 配置
# broker.conf
enablePropertyFilter = true
waitTimeMomentInPropertyFilterQueue = 5000
maxMsgsInBatch = 64
常见问题
过滤不生效
- Broker 是否启用 SQL 过滤
- 选择器类型是否正确
- 属性类型是否匹配
下一步
接下来让我们学习延迟消息。
👉 延迟消息