第十章:延迟消息
深入了解 RocketMQ 延迟消息的实现原理和使用方法。
最后更新: 2024-01-15
页面目录
RocketMQ 延迟消息
延迟消息是指消息发送后,不能立即被消费者消费,需要等待指定时间后才能被消费。
延迟消息概述
Producer Broker Consumer
│ │ │
│ 1. 发送延迟消息 │ │
│ ───────────────►│ │
│ │ │
│ │ 2. 进入延迟队列 │
│ │ (暂时不可消费) │
│ │ │
│ │ 3. 等待延迟时间 │
│ │ │
│ │ 4. 延迟时间到达 │
│ │ │
│ │ 5. 投递消息 │
│ │─────────────────────────►│
延迟等级
RocketMQ 4.x 延迟等级
// level 1: 1s
// level 2: 5s
// level 3: 10s
// level 4: 30s
// level 5: 1m
// level 6: 2m
// ...
// level 18: 2h
Java 实现
发送延迟消息
public class DelayProducer {
public void sendDelayMessage() throws Exception {
Message message = new Message(
"ORDER_TOPIC",
"order:remind",
"order_12345",
"订单提醒".getBytes()
);
// 设置延迟等级
message.setDelayTimeLevel(5); // 1分钟后投递
SendResult result = producer.send(message);
}
}
Spring Boot 实现
@Service
public class DelayMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendOrderTimeoutMessage(String orderId, int timeoutMinutes) {
Message message = MessageBuilder
.withPayload("订单超时提醒: " + orderId)
.setHeader("orderId", orderId)
.build();
int delayLevel = getDelayLevel(timeoutMinutes);
rocketMQTemplate.asyncSend("ORDER_TIMEOUT_TOPIC:order:timeout",
message, null, 3000, delayLevel);
}
private int getDelayLevel(int timeoutMinutes) {
if (timeoutMinutes <= 1) return 1;
if (timeoutMinutes <= 5) return 2;
if (timeoutMinutes <= 30) return 4;
if (timeoutMinutes <= 60) return 5;
return 5;
}
}
消费者实现
@Service
@RocketMQMessageListener(
topic = "ORDER_TIMEOUT_TOPIC",
consumerGroup = "timeout-consumer-group",
selectorExpression = "order:timeout"
)
public class OrderTimeoutConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("收到订单超时消息: " + message);
// 处理超时逻辑
String orderId = extractOrderId(message);
orderService.cancelOrder(orderId);
}
}
应用场景
订单超时处理
public class OrderTimeoutWorkflow {
public void createOrder(Order order) {
// 创建订单
orderService.createOrder(order);
// 发送超时延迟消息
Message message = new Message(
"ORDER_TIMEOUT_TOPIC",
"order:timeout",
order.getOrderId(),
("订单超时提醒: " + order.getOrderId()).getBytes()
);
message.setDelayTimeLevel(4); // 30秒后投递
producer.send(message);
}
}
最佳实践
幂等处理
@Service
@RocketMQMessageListener(
topic = "ORDER_TIMEOUT_TOPIC",
consumerGroup = "timeout-consumer-group"
)
public class IdempotentTimeoutConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
String orderId = extractOrderId(message);
String key = "processed:timeout:" + orderId;
if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
return;
}
processOrderTimeout(orderId);
redisTemplate.opsForValue().set(key, "1", 24, TimeUnit.HOURS);
}
}
下一步
接下来让我们学习批量消息。
👉 批量消息