第五章:消费者

深入了解 RocketMQ 消费者的使用,包括消费模式、拉取策略和消息处理。

最后更新: 2024-01-15
页面目录

RocketMQ 消费者

本章详细介绍 RocketMQ 消费者的使用和配置。

消费者概述

┌─────────────────────────────────────────────────────────────────┐
│                      消费者工作流程                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   Consumer                                                      │
│      │                                                         │
│      │ 1. 启动并注册到 ConsumerGroup                            │
│      │ ───────────────────────────────────────► Nameserver      │
│      │ ◄─────────────────────────────────────── Broker          │
│      │                                                            │
│      │ 2. 定时拉取消息                                           │
│      │ ───────────────────────────────────────► Broker          │
│      │ ◄─────────────────────────────────────── PullResult      │
│      │                                                            │
│      │ 3. 处理消息                                               │
│      │     └─► 执行业务逻辑                                      │
│      │                                                            │
│      │ 4. 提交消费进度                                           │
│      │ ───────────────────────────────────────► Broker          │
│      │                                                            │
│      │ 5. ACK 确认                                              │
│      │     └─► 消费成功/失败                                     │
│      │                                                            │
└─────────────────────────────────────────────────────────────────┘

消费模式

1. 集群消费

┌─────────────────────────────────────────────────────────────────┐
│                      集群消费模式                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   Topic: ORDER_TOPIC                                             │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │ Queue 0 │ Queue 1 │ Queue 2 │ Queue 3 │ Queue 4 │      │   │
│   └─────────────────────────────────────────────────────────┘   │
│       │           │           │           │           │         │
│       │           │           │           │           │         │
│   ┌───▼───┐   ┌───▼───┐   ┌───▼───┐   ┌───▼───┐   ┌───▼───┐   │
│   │Consumer│   │Consumer│   │Consumer│   │Consumer│   │Consumer│   │
│   │  1     │   │  2     │   │  1     │   │  2     │   │  3     │   │
│   │ Group1 │   │  Group1 │   │ Group2 │   │ Group2 │   │ Group2 │   │
│   └────────┘   └────────┘   └────────┘   └────────┘   └────────┘   │
│                                                                  │
│   特点:                                                         │
│   ✅ 消息负载均衡到多个消费者                                     │
│   ✅ 每个消息只被一个消费者消费                                   │
│   ✅ 适用于大部分业务场景                                         │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

2. 广播消费

┌─────────────────────────────────────────────────────────────────┐
│                      广播消费模式                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   Topic: ORDER_TOPIC                                             │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │ Queue 0 │ Queue 1 │ Queue 2 │ Queue 3 │ Queue 4 │      │   │
│   └─────────────────────────────────────────────────────────┘   │
│       │           │           │           │           │         │
│       │           │           │           │           │         │
│   ┌───▼───┐   ┌───▼───┐   ┌───▼───┐   ┌───▼───┐   ┌───▼───┐   │
│   │Consumer│   │Consumer│   │Consumer│   │Consumer│   │Consumer│   │
│   │  1     │   │  2     │   │  3     │   │  4     │   │  5     │   │
│   │        │   │        │   │        │   │        │   │        │   │
│   └────┬───┘   └────┬───┘   └────┬───┘   └────┬───┘   └────┬───┘   │
│        │            │            │            │            │        │
│        ▼            ▼            ▼            ▼            ▼        │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │              所有消费者都会收到消息                        │   │
│   └─────────────────────────────────────────────────────────┘   │
│                                                                  │
│   特点:                                                         │
│   ✅ 每个消费者实例都会收到消息                                   │
│   ✅ 适用于消息通知、配置同步等场景                              │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

消费者配置

Spring Boot 配置

# application.yml
rocketmq:
  consumer:
    # 消费者组
    group: my-consumer-group
    # Nameserver 地址
    namesrvAddr: 127.0.0.1:9876
    # 订阅关系表达式
    topic: "ORDER_TOPIC"
    tags: "order:*"
    # 消费线程数
    consumeThreadMin: 10
    consumeThreadMax: 20
    # 消息拉取数量
    pullBatchSize: 32
    # 消息消费批量大小
    consumeMessageBatchMaxSize: 1
    # 最大重试次数
    maxReconsumeTimes: 3
    # 消费超时时间
    consumeTimeout: 15

Java 注解方式

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(
    topic = "ORDER_TOPIC",
    consumerGroup = "order-consumer-group",
    topic = "ORDER_TOPIC",
    selectorExpression = "order:*",       // Tag 过滤
    consumeMode = ConsumeMode.CONCURRENTLY,  // 并发消费
    messageModel = MessageModel.CLUSTERING,  // 集群消费
    maxReconsumeTimes = 3,               // 最大重试次数
    consumeThreadMin = 10,               // 最小线程数
    consumeThreadMax = 20                // 最大线程数
)
public class OrderConsumer implements RocketMQListener<Order> {

    @Override
    public void onMessage(Order order) {
        try {
            // 处理订单消息
            System.out.println("收到订单消息: " + order);
            orderService.processOrder(order);
        } catch (Exception e) {
            // 抛出异常触发重试
            throw e;
        }
    }
}

消息处理

并发消费

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class MyMessageListener implements MessageListenerConcurrently {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        
        for (MessageExt msg : msgs) {
            try {
                String topic = msg.getTopic();
                String tags = msg.getTags();
                String keys = msg.getKeys();
                String body = new String(msg.getBody(), "UTF-8");
                
                System.out.println("收到消息: " + body);
                
                // 业务处理
                processMessage(body);
                
            } catch (Exception e) {
                // 记录错误日志
                log.error("处理消息失败", e);
                
                // 返回重试
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        
        // 消费成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

顺序消费

import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class MyOrderlyMessageListener implements MessageListenerOrderly {

    @Override
    public ConsumeOrderlyStatus consumeMessage(
            List<MessageExt> msgs,
            ConsumeOrderlyContext context) {
        
        context.setAutoCommit(true);  // 自动提交
        
        for (MessageExt msg : msgs) {
            try {
                String body = new String(msg.getBody(), "UTF-8");
                String queueId = String.valueOf(msg.getQueueId());
                
                System.out.println("队列 " + queueId + " 收到消息: " + body);
                
                // 业务处理
                processMessage(body);
                
            } catch (Exception e) {
                // 发生异常时暂停队列消费
                context.setSuspendCurrentQueueTimeMillis(1000);
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
        }
        
        return ConsumeOrderlyStatus.SUCCESS;
    }
}

消费进度管理

消费进度存储

/**
 * 消费进度管理
 */
public class ConsumeProgressManager {
    
    // 获取消费进度
    public long getConsumeOffset(String group, String topic, int queueId) {
        // 从 Broker 获取消费进度
        // 存储在 Broker 的 consumerOffset.json 中
        String offsetKey = group + "@" + topic + "@" + queueId;
        return brokerView.getConsumerOffset(offsetKey);
    }
    
    // 更新消费进度
    public void updateConsumeOffset(String group, String topic, 
                                   int queueId, long offset) {
        // 更新到 Broker
        brokerView.updateConsumerOffset(group, topic, queueId, offset);
    }
    
    // 重置消费进度
    public void resetConsumeOffset(String group, String topic, long timestamp) {
        // 根据时间戳重置到指定位置
    }
}

手动提交进度

/**
 * 手动提交消费进度
 */
public class ManualCommitListener implements MessageListenerConcurrently {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        
        ConsumeRequest request = new ConsumeRequest();
        
        for (MessageExt msg : msgs) {
            try {
                // 处理消息
                processMessage(msg);
                request.addSuccessMsg(msg);
            } catch (Exception e) {
                request.addFailedMsg(msg);
            }
        }
        
        // 手动处理成功和失败的消息
        if (!request.getFailedMsgs().isEmpty()) {
            // 记录失败消息
            handleFailedMessages(request.getFailedMsgs());
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

消息过滤

Tag 过滤

/**
 * Tag 过滤 - 消费者端过滤
 */
@RocketMQMessageListener(
    topic = "ORDER_TOPIC",
    selectorExpression = "order:create || order:pay"  // 多个 Tag 用 || 分隔
)
public class OrderConsumer implements RocketMQListener<Order> {
    // 只消费 order:create 和 order:pay 消息
}

SQL92 过滤

/**
 * SQL 过滤 - 服务端过滤
 */

// Broker 开启 SQL 过滤
// broker.conf 中添加: enablePropertyFilter = true

@RocketMQMessageListener(
    topic = "ORDER_TOPIC",
    selectorType = SelectorType.SQL92,  // 使用 SQL 过滤
    selectorExpression = "amount > 100 AND city = '北京'"
)
public class FilteredConsumer implements RocketMQListener<Order> {
    // 只消费 amount > 100 且 city = 北京的消息
}

消息重试

重试机制

┌─────────────────────────────────────────────────────────────────┐
│                      消息重试机制                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   第一次消费失败 ──► 重试 ──► 第二次消费                         │
│        │                                   │                     │
│        │                                   ▼                     │
│        │                              第二次失败 ──► 重试        │
│        │                                                     │   │
│        │                                                     ▼   │
│        │                                                第三次失败 │
│        │                                                     │   │
│        ▼                                                     ▼   │
│   ┌─────────────┐                                     ┌─────────────┐
│   │  重试队列    │                                     │  死信队列    │
│   │ %RETRY%     │                                     │  %DLQ%      │
│   └─────────────┘                                     └─────────────┘
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

重试配置

/**
 * 消息重试配置
 */
@RocketMQMessageListener(
    topic = "ORDER_TOPIC",
    maxReconsumeTimes = 3  // 最大重试次数
)
public class OrderConsumer implements RocketMQListener<Order> {
    // 当重试 3 次后仍然失败,消息会进入死信队列
}

死信队列处理

/**
 * 死信队列处理
 */
@RocketMQMessageListener(
    topic = "ORDER_TOPIC%DLQ",  // 死信队列 Topic
    consumerGroup = "dlq-consumer-group"
)
public class DLQConsumer implements RocketMQListener<MessageExt> {
    
    @Override
    public void onMessage(MessageExt msg) {
        try {
            // 处理死信消息
            String originalTopic = msg.getKeys();
            String body = new String(msg.getBody());
            
            log.error("处理死信消息: topic={}, body={}", originalTopic, body);
            
            // 人工处理或记录
            handleDeadLetter(originalTopic, body);
            
        } catch (Exception e) {
            log.error("处理死信消息失败", e);
        }
    }
}

最佳实践

1. 消息处理幂等

/**
 * 消息处理幂等性
 */
public class IdempotentConsumer {
    
    @Autowired
    private RedisTemplate redisTemplate;
    
    public void consumeMessage(MessageExt msg) {
        String msgId = msg.getMsgId();
        String key = "msg:consumed:" + msgId;
        
        // 检查是否已处理
        if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
            System.out.println("消息已处理,跳过: " + msgId);
            return;
        }
        
        try {
            // 业务处理
            processMessage(msg);
            
            // 标记已处理
            redisTemplate.opsForValue().set(key, "1", 7, TimeUnit.DAYS);
            
        } catch (Exception e) {
            log.error("处理消息失败", e);
            throw e;
        }
    }
}

2. 消费限流

/**
 * 消费限流
 */
public class RateLimitConsumer {
    
    private static final int MAX_QPS = 100;
    private static final RateLimiter rateLimiter = RateLimiter.create(MAX_QPS);
    
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        
        for (MessageExt msg : msgs) {
            // 获取令牌
            rateLimiter.acquire();
            
            try {
                processMessage(msg);
            } catch (Exception e) {
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

3. 慢消费处理

/**
 * 处理慢消费
 */
public class SlowConsumerOptimize {
    
    // 1. 增加消费线程
    // 2. 使用消息并行处理
    // 3. 批量消费优化
    
    @RocketMQMessageListener(
        topic = "ORDER_TOPIC",
        consumeThreadMin = 20,
        consumeThreadMax = 50,
        consumeMessageBatchMaxSize = 32  // 批量消费
    )
    public void batchConsume(List<Order> orders) {
        // 批量处理订单
        orderService.batchProcessOrders(orders);
    }
}

下一步

接下来让我们学习 RocketMQ 消息类型与特性。

👉 消息类型与特性