第十章:延迟消息

深入了解 RocketMQ 延迟消息的实现原理和使用方法。

最后更新: 2024-01-15
页面目录

RocketMQ 延迟消息

延迟消息是指消息发送后,不能立即被消费者消费,需要等待指定时间后才能被消费。

延迟消息概述

Producer          Broker                    Consumer
   │                │                         │
   │ 1. 发送延迟消息  │                         │
   │ ───────────────►│                         │
   │                │                         │
   │                │ 2. 进入延迟队列           │
   │                │    (暂时不可消费)         │
   │                │                         │
   │                │ 3. 等待延迟时间           │
   │                │                         │
   │                │ 4. 延迟时间到达           │
   │                │                         │
   │                │ 5. 投递消息              │
   │                │─────────────────────────►│

延迟等级

RocketMQ 4.x 延迟等级

// level 1: 1s
// level 2: 5s
// level 3: 10s
// level 4: 30s
// level 5: 1m
// level 6: 2m
// ...
// level 18: 2h

Java 实现

发送延迟消息

public class DelayProducer {

    public void sendDelayMessage() throws Exception {
        Message message = new Message(
            "ORDER_TOPIC",
            "order:remind",
            "order_12345",
            "订单提醒".getBytes()
        );

        // 设置延迟等级
        message.setDelayTimeLevel(5);  // 1分钟后投递

        SendResult result = producer.send(message);
    }
}

Spring Boot 实现

@Service
public class DelayMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendOrderTimeoutMessage(String orderId, int timeoutMinutes) {
        Message message = MessageBuilder
            .withPayload("订单超时提醒: " + orderId)
            .setHeader("orderId", orderId)
            .build();

        int delayLevel = getDelayLevel(timeoutMinutes);
        rocketMQTemplate.asyncSend("ORDER_TIMEOUT_TOPIC:order:timeout", 
            message, null, 3000, delayLevel);
    }

    private int getDelayLevel(int timeoutMinutes) {
        if (timeoutMinutes <= 1) return 1;
        if (timeoutMinutes <= 5) return 2;
        if (timeoutMinutes <= 30) return 4;
        if (timeoutMinutes <= 60) return 5;
        return 5;
    }
}

消费者实现

@Service
@RocketMQMessageListener(
    topic = "ORDER_TIMEOUT_TOPIC",
    consumerGroup = "timeout-consumer-group",
    selectorExpression = "order:timeout"
)
public class OrderTimeoutConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("收到订单超时消息: " + message);
        
        // 处理超时逻辑
        String orderId = extractOrderId(message);
        orderService.cancelOrder(orderId);
    }
}

应用场景

订单超时处理

public class OrderTimeoutWorkflow {

    public void createOrder(Order order) {
        // 创建订单
        orderService.createOrder(order);

        // 发送超时延迟消息
        Message message = new Message(
            "ORDER_TIMEOUT_TOPIC",
            "order:timeout",
            order.getOrderId(),
            ("订单超时提醒: " + order.getOrderId()).getBytes()
        );
        message.setDelayTimeLevel(4);  // 30秒后投递

        producer.send(message);
    }
}

最佳实践

幂等处理

@Service
@RocketMQMessageListener(
    topic = "ORDER_TIMEOUT_TOPIC",
    consumerGroup = "timeout-consumer-group"
)
public class IdempotentTimeoutConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        String orderId = extractOrderId(message);
        String key = "processed:timeout:" + orderId;

        if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
            return;
        }

        processOrderTimeout(orderId);
        redisTemplate.opsForValue().set(key, "1", 24, TimeUnit.HOURS);
    }
}

下一步

接下来让我们学习批量消息。

👉 批量消息