第十五章:最佳实践

汇总 RocketMQ 使用中的最佳实践,涵盖设计、开发、运维等方面。

最后更新: 2024-01-15
页面目录

RocketMQ 最佳实践

本章汇总 RocketMQ 使用中的最佳实践,帮助构建可靠、高效的消息系统。

开发规范

1. Topic 命名规范

/**
 * Topic 命名规范
 */
public class TopicNamingConvention {

    // 推荐格式: {业务线}_{系统}_{模块}_{环境}
    // 示例:
    // order_service_order_create_dev      // 开发环境
    // order_service_order_create_test    // 测试环境
    // order_service_order_create_prod     // 生产环境

    // 常见 Topic 命名
    // order_service_create      // 订单创建
    // order_service_pay          // 订单支付
    // payment_service_refund     // 退款
    // stock_service_deduct       // 库存扣减
    // user_service_register      // 用户注册
    // notification_service_sms    // 短信通知
}

2. Tag 命名规范

/**
 * Tag 命名规范
 */
public class TagNamingConvention {

    // 格式: {操作}:{对象}
    // 示例:
    // create:order       // 创建订单
    // update:order       // 更新订单
    // cancel:order        // 取消订单
    // pay:order           // 支付订单

    // 避免过度使用 Tag
    // 建议: 同一 Topic 下不超过 10 个 Tag
}

3. 消息 Key 设计

/**
 * 消息 Key 设计
 */
public class MessageKeyDesign {

    // Key 应该具有业务意义,便于问题排查
    public void sendOrderMessage(Order order) {
        Message message = new Message(
            "ORDER_TOPIC",
            "order:create",
            order.getOrderId(),  // ✅ 订单ID
            order.toString().getBytes()
        );
    }

    // 避免使用随机 Key
    public void badExample() {
        Message message = new Message(
            "ORDER_TOPIC",
            "order:create",
            UUID.randomUUID().toString(),  // ❌ 无业务意义
            content.getBytes()
        );
    }
}

生产者最佳实践

发送可靠性

/**
 * 发送可靠性最佳实践
 */
public class ProducerReliability {

    private DefaultMQProducer producer;

    public void init() {
        producer = new DefaultMQProducer("reliable-producer-group");

        // 1. 配置多个 Nameserver
        producer.setNamesrvAddr("ns1:9876;ns2:9876;ns3:9876");

        // 2. 发送失败重试
        producer.setRetryTimesWhenSendFailed(3);

        // 3. 异步发送失败重试
        producer.setRetryTimesWhenSendAsyncFailed(3);

        // 4. 合理超时时间
        producer.setSendMsgTimeout(5000);
    }

    /**
     * 同步发送 - 重要消息
     */
    public SendResult sendReliableMessage(Message message) throws Exception {
        try {
            return producer.send(message);
        } catch (Exception e) {
            // 记录失败日志
            log.error("消息发送失败: {}", message.getKeys(), e);

            // 告警通知
            alertService.sendAlert("消息发送失败: " + e.getMessage());

            throw e;
        }
    }

    /**
     * 异步发送 - 高并发场景
     */
    public void sendAsyncMessage(Message message) {
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult result) {
                // 成功处理
            }

            @Override
            public void onException(Throwable e) {
                // 失败处理 - 记录、重试、告警
                handleSendFailure(message, e);
            }
        });
    }
}

发送性能

/**
 * 发送性能最佳实践
 */
public class ProducerPerformance {

    /**
     * 1. 批量发送
     */
    public void batchSend(List<Message> messages) throws Exception {
        // 按 Topic 和 Tag 分组
        Map<String, List<Message>> grouped = messages.stream()
            .collect(Collectors.groupingBy(
                m -> m.getTopic() + ":" + m.getTags()
            ));

        for (List<Message> batch : grouped.values()) {
            producer.send(batch);
        }
    }

    /**
     * 2. 并发发送
     */
    public void concurrentSend(List<Message> messages) {
        ExecutorService executor = Executors.newFixedThreadPool(10);

        for (Message msg : messages) {
            executor.submit(() -> {
                try {
                    producer.send(msg);
                } catch (Exception e) {
                    log.error("发送失败", e);
                }
            });
        }

        executor.shutdown();
    }

    /**
     * 3. 合理使用单向发送
     */
    public void sendOneWay(String log) {
        // 适用于日志等不需要可靠性的场景
        Message message = new Message("LOG_TOPIC", "log:info", log.getBytes());
        producer.sendOneway(message);
    }
}

消费者最佳实践

消费可靠性

/**
 * 消费可靠性最佳实践
 */
@Service
@RocketMQMessageListener(
    topic = "ORDER_TOPIC",
    consumerGroup = "reliable-consumer-group",
    maxReconsumeTimes = 3
)
public class ReliableConsumer implements RocketMQListener<Message> {

    @Override
    public void onMessage(Message message) {
        try {
            // 1. 解析消息
            String body = new String(message.getBody(), "UTF-8");
            Order order = JSON.parseObject(body, Order.class);

            // 2. 幂等检查
            if (isProcessed(order.getOrderId())) {
                return;
            }

            // 3. 业务处理
            processOrder(order);

            // 4. 标记已处理
            markProcessed(order.getOrderId());

        } catch (Exception e) {
            // 记录错误日志
            log.error("处理消息失败, msgId: {}", message.getMsgId(), e);

            // 抛出异常触发重试
            throw e;
        }
    }

    private boolean isProcessed(String orderId) {
        return redisTemplate.hasKey("processed:order:" + orderId);
    }

    private void markProcessed(String orderId) {
        redisTemplate.opsForValue().set(
            "processed:order:" + orderId,
            "1",
            24,
            TimeUnit.HOURS
        );
    }
}

消费性能

/**
 * 消费性能最佳实践
 */
@Service
@RocketMQMessageListener(
    topic = "PERF_TOPIC",
    consumerGroup = "perf-consumer-group",
    consumeThreadMin = 20,
    consumeThreadMax = 50,
    consumeMessageBatchMaxSize = 32
)
public class PerformanceConsumer implements RocketMQListener<List<Order>> {

    @Override
    public void onMessage(List<Order> orders) {
        try {
            // 1. 批量处理
            batchProcess(orders);

        } catch (Exception e) {
            // 抛出异常触发重试
            throw e;
        }
    }

    private void batchProcess(List<Order> orders) {
        // 使用批量处理提高性能
        orderService.batchProcess(orders);
    }
}

消息幂等

/**
 * 消息幂等性实现
 */
public class IdempotentHandler {

    private RedisTemplate redisTemplate;

    /**
     * Redis 幂等检查
     */
    public boolean checkAndMark(String msgId, int expireSeconds) {
        String key = "msg:processed:" + msgId;
        Boolean success = redisTemplate.opsForValue()
            .setIfAbsent(key, "1", expireSeconds, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    /**
     * 数据库幂等检查
     */
    public boolean checkIdempotent(String businessId) {
        return orderService.existsOrder(businessId);
    }

    /**
     * 消息处理
     */
    public void processWithIdempotency(Message message) {
        String msgId = message.getMsgId();

        // 幂等检查
        if (!checkAndMark(msgId, 7 * 24 * 3600)) {
            log.info("消息已处理, 跳过: {}", msgId);
            return;
        }

        // 处理消息
        doProcess(message);
    }
}

运维最佳实践

集群部署

/**
 * 集群部署最佳实践
 */
public class ClusterDeployment {

    // 1. NameServer 部署
    // 至少 2 个,推荐 3 个

    // 2. Broker 部署
    // 生产环境使用 DLedger 集群
    // 配置: 3 主 3 从

    // 3. 配置示例
    public static final String NAMESEVER_ADDR =
        "192.168.1.101:9876;192.168.1.102:9876;192.168.1.103:9876";
}

监控告警

/**
 * 监控告警最佳实践
 */
public class MonitoringBestPractices {

    // 需要监控的关键指标

    // Producer:
    // - 发送失败率
    // - 发送延迟 P99

    // Consumer:
    // - 消费延迟
    // - 消息堆积量
    // - 消费失败率

    // Broker:
    // - CPU 使用率
    // - 内存使用率
    // - 磁盘使用率
    // - 网络带宽
}

容量规划

/**
 * 容量规划最佳实践
 */
public class CapacityPlanning {

    // 1. 消息大小
    // 建议: 1KB - 100KB
    // 最大: 4MB

    // 2. Topic 队列数
    // 队列数 >= 消费者数 * 并发倍数

    // 3. 存储容量
    // 预估公式:
    // 每日消息量 * 平均消息大小 * 保留天数 * 安全系数

    // 4. 带宽
    // 发送带宽 = TPS * 消息大小
    // 网络带宽需要考虑峰值
}

安全最佳实践

访问控制

# ACL 配置
# broker.conf
aclEnable = true

# plain_acl.yml
accounts:
  - accessKey: producer_key
    secretKey: producer_secret
    whiteRemoteAddress:
    admin: false
    defaultGroupPerms:
      - group: producer-group
        permission: PUB
  - accessKey: consumer_key
    secretKey: consumer_secret
    whiteRemoteAddress:
    admin: false
    defaultGroupPerms:
      - group: consumer-group
        permission: SUB

故障排查清单

┌─────────────────────────────────────────────────────────────────┐
│                   RocketMQ 故障排查清单                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  消息发送失败:                                                   │
│  □ 检查 Broker 状态                                             │
│  □ 检查 Nameserver 连接                                         │
│  □ 检查网络连通性                                               │
│  □ 查看 Broker 日志                                              │
│                                                                  │
│  消息消费失败:                                                   │
│  □ 检查消费者配置                                                │
│  □ 检查消息处理逻辑                                             │
│  □ 查看消费日志                                                 │
│  □ 检查重试队列                                                 │
│                                                                  │
│  消息堆积:                                                       │
│  □ 检查消费者数量                                               │
│  □ 检查消费速度                                                 │
│  □ 扩容消费者                                                   │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

总结

核心要点

环节 要点
设计 Topic/Tag 命名规范、消息 Key 有意义
生产 可靠性配置、失败重试、幂等发送
消费 幂等处理、异常重试、顺序保证
运维 集群部署、监控告警、容量规划

推荐配置

# 生产环境推荐配置

# Broker
-Xms8g -Xmx8g -Xmn4g
flushDiskType=ASYNC_FLUSH
sendMessageThreadPoolNums=16

# Producer
namesrvAddr: 多个地址
retryTimesWhenSendFailed: 3
sendMsgTimeout: 3000

# Consumer
consumeThreadMin: 20
consumeThreadMax: 50
maxReconsumeTimes: 3

恭喜您完成了 RocketMQ 权威教程的全部内容!

👉 返回教程首页