第十五章:最佳实践
汇总 RocketMQ 使用中的最佳实践,涵盖设计、开发、运维等方面。
最后更新: 2024-01-15
页面目录
RocketMQ 最佳实践
本章汇总 RocketMQ 使用中的最佳实践,帮助构建可靠、高效的消息系统。
开发规范
1. Topic 命名规范
/**
* Topic 命名规范
*/
public class TopicNamingConvention {
// 推荐格式: {业务线}_{系统}_{模块}_{环境}
// 示例:
// order_service_order_create_dev // 开发环境
// order_service_order_create_test // 测试环境
// order_service_order_create_prod // 生产环境
// 常见 Topic 命名
// order_service_create // 订单创建
// order_service_pay // 订单支付
// payment_service_refund // 退款
// stock_service_deduct // 库存扣减
// user_service_register // 用户注册
// notification_service_sms // 短信通知
}
2. Tag 命名规范
/**
* Tag 命名规范
*/
public class TagNamingConvention {
// 格式: {操作}:{对象}
// 示例:
// create:order // 创建订单
// update:order // 更新订单
// cancel:order // 取消订单
// pay:order // 支付订单
// 避免过度使用 Tag
// 建议: 同一 Topic 下不超过 10 个 Tag
}
3. 消息 Key 设计
/**
* 消息 Key 设计
*/
public class MessageKeyDesign {
// Key 应该具有业务意义,便于问题排查
public void sendOrderMessage(Order order) {
Message message = new Message(
"ORDER_TOPIC",
"order:create",
order.getOrderId(), // ✅ 订单ID
order.toString().getBytes()
);
}
// 避免使用随机 Key
public void badExample() {
Message message = new Message(
"ORDER_TOPIC",
"order:create",
UUID.randomUUID().toString(), // ❌ 无业务意义
content.getBytes()
);
}
}
生产者最佳实践
发送可靠性
/**
* 发送可靠性最佳实践
*/
public class ProducerReliability {
private DefaultMQProducer producer;
public void init() {
producer = new DefaultMQProducer("reliable-producer-group");
// 1. 配置多个 Nameserver
producer.setNamesrvAddr("ns1:9876;ns2:9876;ns3:9876");
// 2. 发送失败重试
producer.setRetryTimesWhenSendFailed(3);
// 3. 异步发送失败重试
producer.setRetryTimesWhenSendAsyncFailed(3);
// 4. 合理超时时间
producer.setSendMsgTimeout(5000);
}
/**
* 同步发送 - 重要消息
*/
public SendResult sendReliableMessage(Message message) throws Exception {
try {
return producer.send(message);
} catch (Exception e) {
// 记录失败日志
log.error("消息发送失败: {}", message.getKeys(), e);
// 告警通知
alertService.sendAlert("消息发送失败: " + e.getMessage());
throw e;
}
}
/**
* 异步发送 - 高并发场景
*/
public void sendAsyncMessage(Message message) {
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
// 成功处理
}
@Override
public void onException(Throwable e) {
// 失败处理 - 记录、重试、告警
handleSendFailure(message, e);
}
});
}
}
发送性能
/**
* 发送性能最佳实践
*/
public class ProducerPerformance {
/**
* 1. 批量发送
*/
public void batchSend(List<Message> messages) throws Exception {
// 按 Topic 和 Tag 分组
Map<String, List<Message>> grouped = messages.stream()
.collect(Collectors.groupingBy(
m -> m.getTopic() + ":" + m.getTags()
));
for (List<Message> batch : grouped.values()) {
producer.send(batch);
}
}
/**
* 2. 并发发送
*/
public void concurrentSend(List<Message> messages) {
ExecutorService executor = Executors.newFixedThreadPool(10);
for (Message msg : messages) {
executor.submit(() -> {
try {
producer.send(msg);
} catch (Exception e) {
log.error("发送失败", e);
}
});
}
executor.shutdown();
}
/**
* 3. 合理使用单向发送
*/
public void sendOneWay(String log) {
// 适用于日志等不需要可靠性的场景
Message message = new Message("LOG_TOPIC", "log:info", log.getBytes());
producer.sendOneway(message);
}
}
消费者最佳实践
消费可靠性
/**
* 消费可靠性最佳实践
*/
@Service
@RocketMQMessageListener(
topic = "ORDER_TOPIC",
consumerGroup = "reliable-consumer-group",
maxReconsumeTimes = 3
)
public class ReliableConsumer implements RocketMQListener<Message> {
@Override
public void onMessage(Message message) {
try {
// 1. 解析消息
String body = new String(message.getBody(), "UTF-8");
Order order = JSON.parseObject(body, Order.class);
// 2. 幂等检查
if (isProcessed(order.getOrderId())) {
return;
}
// 3. 业务处理
processOrder(order);
// 4. 标记已处理
markProcessed(order.getOrderId());
} catch (Exception e) {
// 记录错误日志
log.error("处理消息失败, msgId: {}", message.getMsgId(), e);
// 抛出异常触发重试
throw e;
}
}
private boolean isProcessed(String orderId) {
return redisTemplate.hasKey("processed:order:" + orderId);
}
private void markProcessed(String orderId) {
redisTemplate.opsForValue().set(
"processed:order:" + orderId,
"1",
24,
TimeUnit.HOURS
);
}
}
消费性能
/**
* 消费性能最佳实践
*/
@Service
@RocketMQMessageListener(
topic = "PERF_TOPIC",
consumerGroup = "perf-consumer-group",
consumeThreadMin = 20,
consumeThreadMax = 50,
consumeMessageBatchMaxSize = 32
)
public class PerformanceConsumer implements RocketMQListener<List<Order>> {
@Override
public void onMessage(List<Order> orders) {
try {
// 1. 批量处理
batchProcess(orders);
} catch (Exception e) {
// 抛出异常触发重试
throw e;
}
}
private void batchProcess(List<Order> orders) {
// 使用批量处理提高性能
orderService.batchProcess(orders);
}
}
消息幂等
/**
* 消息幂等性实现
*/
public class IdempotentHandler {
private RedisTemplate redisTemplate;
/**
* Redis 幂等检查
*/
public boolean checkAndMark(String msgId, int expireSeconds) {
String key = "msg:processed:" + msgId;
Boolean success = redisTemplate.opsForValue()
.setIfAbsent(key, "1", expireSeconds, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
/**
* 数据库幂等检查
*/
public boolean checkIdempotent(String businessId) {
return orderService.existsOrder(businessId);
}
/**
* 消息处理
*/
public void processWithIdempotency(Message message) {
String msgId = message.getMsgId();
// 幂等检查
if (!checkAndMark(msgId, 7 * 24 * 3600)) {
log.info("消息已处理, 跳过: {}", msgId);
return;
}
// 处理消息
doProcess(message);
}
}
运维最佳实践
集群部署
/**
* 集群部署最佳实践
*/
public class ClusterDeployment {
// 1. NameServer 部署
// 至少 2 个,推荐 3 个
// 2. Broker 部署
// 生产环境使用 DLedger 集群
// 配置: 3 主 3 从
// 3. 配置示例
public static final String NAMESEVER_ADDR =
"192.168.1.101:9876;192.168.1.102:9876;192.168.1.103:9876";
}
监控告警
/**
* 监控告警最佳实践
*/
public class MonitoringBestPractices {
// 需要监控的关键指标
// Producer:
// - 发送失败率
// - 发送延迟 P99
// Consumer:
// - 消费延迟
// - 消息堆积量
// - 消费失败率
// Broker:
// - CPU 使用率
// - 内存使用率
// - 磁盘使用率
// - 网络带宽
}
容量规划
/**
* 容量规划最佳实践
*/
public class CapacityPlanning {
// 1. 消息大小
// 建议: 1KB - 100KB
// 最大: 4MB
// 2. Topic 队列数
// 队列数 >= 消费者数 * 并发倍数
// 3. 存储容量
// 预估公式:
// 每日消息量 * 平均消息大小 * 保留天数 * 安全系数
// 4. 带宽
// 发送带宽 = TPS * 消息大小
// 网络带宽需要考虑峰值
}
安全最佳实践
访问控制
# ACL 配置
# broker.conf
aclEnable = true
# plain_acl.yml
accounts:
- accessKey: producer_key
secretKey: producer_secret
whiteRemoteAddress:
admin: false
defaultGroupPerms:
- group: producer-group
permission: PUB
- accessKey: consumer_key
secretKey: consumer_secret
whiteRemoteAddress:
admin: false
defaultGroupPerms:
- group: consumer-group
permission: SUB
故障排查清单
┌─────────────────────────────────────────────────────────────────┐
│ RocketMQ 故障排查清单 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 消息发送失败: │
│ □ 检查 Broker 状态 │
│ □ 检查 Nameserver 连接 │
│ □ 检查网络连通性 │
│ □ 查看 Broker 日志 │
│ │
│ 消息消费失败: │
│ □ 检查消费者配置 │
│ □ 检查消息处理逻辑 │
│ □ 查看消费日志 │
│ □ 检查重试队列 │
│ │
│ 消息堆积: │
│ □ 检查消费者数量 │
│ □ 检查消费速度 │
│ □ 扩容消费者 │
│ │
└─────────────────────────────────────────────────────────────────┘
总结
核心要点
| 环节 | 要点 |
|---|---|
| 设计 | Topic/Tag 命名规范、消息 Key 有意义 |
| 生产 | 可靠性配置、失败重试、幂等发送 |
| 消费 | 幂等处理、异常重试、顺序保证 |
| 运维 | 集群部署、监控告警、容量规划 |
推荐配置
# 生产环境推荐配置
# Broker
-Xms8g -Xmx8g -Xmn4g
flushDiskType=ASYNC_FLUSH
sendMessageThreadPoolNums=16
# Producer
namesrvAddr: 多个地址
retryTimesWhenSendFailed: 3
sendMsgTimeout: 3000
# Consumer
consumeThreadMin: 20
consumeThreadMax: 50
maxReconsumeTimes: 3
恭喜您完成了 RocketMQ 权威教程的全部内容!
👉 返回教程首页