第十四章:性能优化

深入了解 RocketMQ 的性能优化技巧和最佳实践。

最后更新: 2024-01-15
页面目录

RocketMQ 性能优化

本章介绍 RocketMQ 的性能优化技巧,帮助提升系统的吞吐量和降低延迟。

性能指标

┌─────────────────────────────────────────────────────────────────┐
│                      性能优化指标                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   ┌─────────────┐  ┌─────────────┐  ┌─────────────┐            │
│   │   吞吐量     │  │   延迟      │  │   可靠性     │            │
│   │  Messages/sec│  │   Latency  │  │  Reliability │            │
│   │   10万+     │  │   < 5ms    │  │    99.99%   │            │
│   └─────────────┘  └─────────────┘  └─────────────┘            │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Broker 优化

JVM 参数调优

# bin/runbroker.sh

JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"

# GC 参数
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC"
JAVA_OPT="${JAVA_OPT} -XX:MaxGCPauseMillis=20"
JAVA_OPT="${JAVA_OPT} -XX:+ParallelRefProcEnabled"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"

# 内存参数
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"

存储优化

# broker.conf

# 存储路径
storePathRootDir=/home/rocketmq/store
storePathCommitLog=/home/rocketmq/store/commitlog

# 内存映射优化
# CommitLog 文件大小
mapedFileSizeCommitLog=1073741824  # 1GB

# ConsumeQueue 文件大小
mapedFileSizeConsumeQueue=30000000

# 索引文件大小
indexFileSize=104857600  # 100MB

# 刷盘策略
# ASYNC_FLUSH: 异步刷盘,性能高
# SYNC_FLUSH: 同步刷盘,可靠性高
flushDiskType=ASYNC_FLUSH

# 刷盘间隔
flushIntervalCommitLog=500
flushConsumeQueueInterval=1000

网络优化

# 网络线程数
# 默认: CPU 核心数
serverWorkerThreads=8

# 客户端线程池大小
sendMessageThreadPoolNums=16
pullMessageThreadPoolNums=16

# Socket 缓冲区
socketChannelPoolSize=4
socket backlog=1024

生产者优化

发送模式选择

/**
 * 生产者发送模式选择
 */

// 1. 同步发送 - 可靠性要求高
public void syncSend(Order order) throws Exception {
    Message message = new Message(topic, tag, order.toString().getBytes());
    SendResult result = producer.send(message);
    // 等待发送结果
}

// 2. 异步发送 - 延迟敏感
public void asyncSend(Order order) throws Exception {
    Message message = new Message(topic, tag, order.toString().getBytes());
    producer.send(message, new SendCallback() {
        @Override
        public void onSuccess(SendResult result) {
            // 处理成功
        }

        @Override
        public void onException(Throwable e) {
            // 处理失败
        }
    });
}

// 3. 单向发送 - 日志等高吞吐场景
public void onewaySend(String log) {
    Message message = new Message(topic, tag, log.getBytes());
    producer.sendOneway(message);
}

批量发送优化

/**
 * 批量发送优化
 */
public class BatchSendOptimization {

    private DefaultMQProducer producer;
    private List<Message> batch = new ArrayList<>();
    private static final int BATCH_SIZE = 100;
    private static final long FLUSH_INTERVAL_MS = 10;

    public void sendOptimized(Message message) throws Exception {
        batch.add(message);

        if (batch.size() >= BATCH_SIZE) {
            flush();
        }
    }

    private synchronized void flush() throws Exception {
        if (!batch.isEmpty()) {
            producer.send(batch);
            batch.clear();
        }
    }
}

连接优化

/**
 * Producer 连接优化
 */
public ProducerConfig createOptimizedProducer() {
    DefaultMQProducer producer = new DefaultMQProducer("optimized-group");

    // Nameserver 地址
    producer.setNamesrvAddr("192.168.1.101:9876;192.168.1.102:9876");

    // 发送超时时间
    producer.setSendMsgTimeout(3000);

    // 失败重试次数
    producer.setRetryTimesWhenSendFailed(2);
    producer.setRetryTimesWhenSendAsyncFailed(2);

    // 最大消息大小
    producer.setMaxMessageSize(1024 * 1024);  // 4MB

    // 压缩消息阈值
    producer.setCompressMsgBodyOverHowmuch(4096);

    // 一批消息最大条数
    producer.setMaxBatchSize(100);

    return producer;
}

消费者优化

并发消费

/**
 * 并发消费配置
 */
@RocketMQMessageListener(
    topic = "ORDER_TOPIC",
    consumerGroup = "concurrent-consumer-group",
    consumeThreadMin = 20,      // 最小线程数
    consumeThreadMax = 50,      // 最大线程数
    consumeMessageBatchMaxSize = 32  // 批量消费大小
)
public class ConcurrentConsumer implements RocketMQListener<Message> {

    @Override
    public void onMessage(Message message) {
        // 处理消息
        process(message);
    }
}

批量消费

/**
 * 批量消费实现
 */
@RocketMQMessageListener(
    topic = "BATCH_TOPIC",
    consumerGroup = "batch-consumer-group",
    consumeMessageBatchMaxSize = 100
)
public class BatchConsumer implements RocketMQListener<List<Order>> {

    @Override
    public void onMessage(List<Order> orders) {
        // 批量处理
        orderService.batchProcess(orders);
    }
}

消费线程优化

/**
 * 消费线程优化
 */
public void optimizeConsumer() {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("optimized-group");

    // 核心线程数
    consumer.setConsumeThreadMin(20);

    // 最大线程数
    consumer.setConsumeThreadMax(50);

    // 批量消费大小
    consumer.setConsumeMessageBatchMaxSize(32);

    // 消息拉取大小
    consumer.setPullBatchSize(32);

    // 拉取间隔
    consumer.setPullInterval(0);

    // 消费超时
    consumer.setConsumeTimeout(15);
}

消息优化

消息大小控制

/**
 * 消息大小控制
 */

// 建议消息大小: 1KB - 100KB
// 最大消息大小: 4MB

public void sendWithSizeControl(Message message) {
    byte[] body = message.getBody();

    // 超过限制则分片发送
    if (body.length > 100 * 1024) {
        // 分片发送
        sendInChunks(message);
    } else {
        producer.send(message);
    }
}

private void sendInChunks(Message message) {
    byte[] body = message.getBody();
    int chunkSize = 50 * 1024;  // 50KB 每片

    for (int i = 0; i < body.length; i += chunkSize) {
        int end = Math.min(i + chunkSize, body.length);
        byte[] chunk = Arrays.copyOfRange(body, i, end);

        Message chunkMsg = new Message(
            message.getTopic(),
            message.getTags(),
            message.getKeys() + "_" + i,
            chunk
        );
        producer.send(chunkMsg);
    }
}

消息压缩

/**
 * 消息压缩
 */
public void sendCompressed(List<Message> messages) throws Exception {
    // RocketMQ 支持 LZ4 压缩
    // 默认压缩阈值: 4KB

    List<Message> compressedBatch = new ArrayList<>();

    for (Message msg : messages) {
        // 自动压缩大于阈值的消息
        compressedBatch.add(msg);
    }

    producer.send(compressedBatch);
}

集群优化

Topic 设计

/**
 * Topic 设计优化
 */

// 1. 合理设置队列数
// 队列数 = 消费者数 * 并发倍数

// 2. 避免热点 Topic
// 分散到多个队列

// 3. 队列数与消费者数匹配
@RocketMQMessageListener(
    topic = "OPTIMIZED_TOPIC",
    consumerGroup = "consumer-group-1",
    consumeThreadMin = 20,
    consumeThreadMax = 50
)
// 需要至少 4 个队列支持 50 个消费线程

负载均衡

/**
 * 消费负载均衡
 */
public void configureLoadBalance() {
    // 消费者数量变化时自动重新分配队列
    // CLUSTERING 模式下自动负载均衡

    // 调整重新平衡间隔
    // 默认: 5秒
    // 某些场景可以调大,减少频繁重平衡
    consumer.setAllocateMessageQueueStrategy(
        new AllocateMessageQueueAveragely()  // 平均分配
        // new AllocateMessageQueueConsistentHash()  // 一致性哈希
    );
}

性能测试

Benchmark 测试

#!/bin/bash
# benchmark-producer.sh

./tools.sh org.apache.rocketmq.tools.benchmark.ProducerBenchmark \
    -n 192.168.1.101:9876 \
    -t ORDER_TOPIC \
    -s 1024 \          # 消息大小
    -c 1000000 \       # 消息数量
    -T 10              # 线程数

测试指标

指标 优秀 良好 一般
发送 TPS > 10万 5-10万 < 5万
发送延迟 P99 < 5ms 5-20ms > 20ms
消费 TPS > 8万 3-8万 < 3万
延迟 P99 < 10ms 10-50ms > 50ms

最佳实践

配置检查清单

# JVM 配置
-server
-Xms8g
-Xmx8g
-Xmn4g
-XX:+UseG1GC

# Broker 配置
flushDiskType=ASYNC_FLUSH
sendMessageThreadPoolNums=16
pullMessageThreadPoolNums=16

# Producer 配置
sendMsgTimeout=3000
retryTimesWhenSendFailed=2
maxMessageSize=4194304

监控指标

// 需要监控的关键指标
public class MonitorMetrics {
    // Producer
    // - 发送成功率
    // - 发送延迟
    // - 发送 TPS

    // Consumer
    // - 消费延迟
    // - 消费 TPS
    // - 堆积数量

    // Broker
    // - CPU 使用率
    // - 内存使用率
    // - 磁盘 IO
    // - 网络带宽
}

下一步

接下来让我们学习最佳实践总结。

👉 最佳实践