第九章:消息过滤

深入了解 RocketMQ 的消息过滤机制,包括 Tag 过滤和 SQL92 过滤。

最后更新: 2024-01-15
页面目录

RocketMQ 消息过滤

消息过滤用于在服务端或客户端筛选感兴趣的消息,提高消费效率。

过滤机制概述

┌─────────────────────────────────────────────────────────────────┐
│                      消息过滤机制                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   Producer ──► [msg1, msg2, msg3, msg4] ──► Broker              │
│                                                        │         │
│                                                        ▼         │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                    消息过滤                               │   │
│   │                                                         │   │
│   │   ┌───────────┐    ┌───────────┐    ┌───────────┐     │   │
│   │   │ Tag 过滤  │    │SQL92 过滤 │    │ 类过滤    │     │   │
│   │   └───────────┘    └───────────┘    └───────────┘     │   │
│   │                                                         │   │
│   └─────────────────────────────────────────────────────────┘   │
│                          │                                       │
│                          ▼                                       │
│   Consumer ◄─── 只接收过滤后的消息                               │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

过滤类型对比

类型 过滤位置 过滤能力 性能
Tag 过滤 Broker 简单匹配
SQL92 过滤 Broker 复杂条件
类过滤 Client 任意逻辑

Tag 过滤

基础用法

/**
 * Tag 过滤
 */
public class TagFilterProducer {

    public void sendWithTags() {
        // 发送不同 Tag 的消息
        producer.send(new Message("ORDER_TOPIC", "order:create", 
            "订单1创建".getBytes()));
        
        producer.send(new Message("ORDER_TOPIC", "order:pay", 
            "订单2支付".getBytes()));
        
        producer.send(new Message("ORDER_TOPIC", "order:ship", 
            "订单3发货".getBytes()));
    }
}

public class TagFilterConsumer {

    public void subscribeByTags() {
        // 单个 Tag
        consumer.subscribe("ORDER_TOPIC", "order:create");
        
        // 多个 Tag (OR)
        consumer.subscribe("ORDER_TOPIC", "order:create || order:pay");
        
        // 所有 Tag
        consumer.subscribe("ORDER_TOPIC", "*");
    }
}

SQL92 过滤

/**
 * SQL92 过滤表达式
 */
public class SQLFilterExpression {

    // 比较运算符: =, !=, >, <, >=, <=
    // 逻辑运算符: AND, OR, NOT
    // IN, BETWEEN, LIKE, IS NULL

    // 示例: amount > 100 AND city = '北京'
}

// 消费者订阅
@RocketMQMessageListener(
    topic = "ORDER_TOPIC",
    selectorType = SelectorType.SQL92,
    selectorExpression = "amount > 100 AND level = 'vip'"
)
public class SQLFilterConsumer implements RocketMQListener<Order> {
}

Broker 配置

# broker.conf
enablePropertyFilter = true
waitTimeMomentInPropertyFilterQueue = 5000
maxMsgsInBatch = 64

常见问题

过滤不生效

  • Broker 是否启用 SQL 过滤
  • 选择器类型是否正确
  • 属性类型是否匹配

下一步

接下来让我们学习延迟消息。

👉 延迟消息