第十四章:性能优化
深入了解 RocketMQ 的性能优化技巧和最佳实践。
最后更新: 2024-01-15
页面目录
RocketMQ 性能优化
本章介绍 RocketMQ 的性能优化技巧,帮助提升系统的吞吐量和降低延迟。
性能指标
┌─────────────────────────────────────────────────────────────────┐
│ 性能优化指标 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 吞吐量 │ │ 延迟 │ │ 可靠性 │ │
│ │ Messages/sec│ │ Latency │ │ Reliability │ │
│ │ 10万+ │ │ < 5ms │ │ 99.99% │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Broker 优化
JVM 参数调优
# bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
# GC 参数
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC"
JAVA_OPT="${JAVA_OPT} -XX:MaxGCPauseMillis=20"
JAVA_OPT="${JAVA_OPT} -XX:+ParallelRefProcEnabled"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
# 内存参数
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
存储优化
# broker.conf
# 存储路径
storePathRootDir=/home/rocketmq/store
storePathCommitLog=/home/rocketmq/store/commitlog
# 内存映射优化
# CommitLog 文件大小
mapedFileSizeCommitLog=1073741824 # 1GB
# ConsumeQueue 文件大小
mapedFileSizeConsumeQueue=30000000
# 索引文件大小
indexFileSize=104857600 # 100MB
# 刷盘策略
# ASYNC_FLUSH: 异步刷盘,性能高
# SYNC_FLUSH: 同步刷盘,可靠性高
flushDiskType=ASYNC_FLUSH
# 刷盘间隔
flushIntervalCommitLog=500
flushConsumeQueueInterval=1000
网络优化
# 网络线程数
# 默认: CPU 核心数
serverWorkerThreads=8
# 客户端线程池大小
sendMessageThreadPoolNums=16
pullMessageThreadPoolNums=16
# Socket 缓冲区
socketChannelPoolSize=4
socket backlog=1024
生产者优化
发送模式选择
/**
* 生产者发送模式选择
*/
// 1. 同步发送 - 可靠性要求高
public void syncSend(Order order) throws Exception {
Message message = new Message(topic, tag, order.toString().getBytes());
SendResult result = producer.send(message);
// 等待发送结果
}
// 2. 异步发送 - 延迟敏感
public void asyncSend(Order order) throws Exception {
Message message = new Message(topic, tag, order.toString().getBytes());
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
// 处理成功
}
@Override
public void onException(Throwable e) {
// 处理失败
}
});
}
// 3. 单向发送 - 日志等高吞吐场景
public void onewaySend(String log) {
Message message = new Message(topic, tag, log.getBytes());
producer.sendOneway(message);
}
批量发送优化
/**
* 批量发送优化
*/
public class BatchSendOptimization {
private DefaultMQProducer producer;
private List<Message> batch = new ArrayList<>();
private static final int BATCH_SIZE = 100;
private static final long FLUSH_INTERVAL_MS = 10;
public void sendOptimized(Message message) throws Exception {
batch.add(message);
if (batch.size() >= BATCH_SIZE) {
flush();
}
}
private synchronized void flush() throws Exception {
if (!batch.isEmpty()) {
producer.send(batch);
batch.clear();
}
}
}
连接优化
/**
* Producer 连接优化
*/
public ProducerConfig createOptimizedProducer() {
DefaultMQProducer producer = new DefaultMQProducer("optimized-group");
// Nameserver 地址
producer.setNamesrvAddr("192.168.1.101:9876;192.168.1.102:9876");
// 发送超时时间
producer.setSendMsgTimeout(3000);
// 失败重试次数
producer.setRetryTimesWhenSendFailed(2);
producer.setRetryTimesWhenSendAsyncFailed(2);
// 最大消息大小
producer.setMaxMessageSize(1024 * 1024); // 4MB
// 压缩消息阈值
producer.setCompressMsgBodyOverHowmuch(4096);
// 一批消息最大条数
producer.setMaxBatchSize(100);
return producer;
}
消费者优化
并发消费
/**
* 并发消费配置
*/
@RocketMQMessageListener(
topic = "ORDER_TOPIC",
consumerGroup = "concurrent-consumer-group",
consumeThreadMin = 20, // 最小线程数
consumeThreadMax = 50, // 最大线程数
consumeMessageBatchMaxSize = 32 // 批量消费大小
)
public class ConcurrentConsumer implements RocketMQListener<Message> {
@Override
public void onMessage(Message message) {
// 处理消息
process(message);
}
}
批量消费
/**
* 批量消费实现
*/
@RocketMQMessageListener(
topic = "BATCH_TOPIC",
consumerGroup = "batch-consumer-group",
consumeMessageBatchMaxSize = 100
)
public class BatchConsumer implements RocketMQListener<List<Order>> {
@Override
public void onMessage(List<Order> orders) {
// 批量处理
orderService.batchProcess(orders);
}
}
消费线程优化
/**
* 消费线程优化
*/
public void optimizeConsumer() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("optimized-group");
// 核心线程数
consumer.setConsumeThreadMin(20);
// 最大线程数
consumer.setConsumeThreadMax(50);
// 批量消费大小
consumer.setConsumeMessageBatchMaxSize(32);
// 消息拉取大小
consumer.setPullBatchSize(32);
// 拉取间隔
consumer.setPullInterval(0);
// 消费超时
consumer.setConsumeTimeout(15);
}
消息优化
消息大小控制
/**
* 消息大小控制
*/
// 建议消息大小: 1KB - 100KB
// 最大消息大小: 4MB
public void sendWithSizeControl(Message message) {
byte[] body = message.getBody();
// 超过限制则分片发送
if (body.length > 100 * 1024) {
// 分片发送
sendInChunks(message);
} else {
producer.send(message);
}
}
private void sendInChunks(Message message) {
byte[] body = message.getBody();
int chunkSize = 50 * 1024; // 50KB 每片
for (int i = 0; i < body.length; i += chunkSize) {
int end = Math.min(i + chunkSize, body.length);
byte[] chunk = Arrays.copyOfRange(body, i, end);
Message chunkMsg = new Message(
message.getTopic(),
message.getTags(),
message.getKeys() + "_" + i,
chunk
);
producer.send(chunkMsg);
}
}
消息压缩
/**
* 消息压缩
*/
public void sendCompressed(List<Message> messages) throws Exception {
// RocketMQ 支持 LZ4 压缩
// 默认压缩阈值: 4KB
List<Message> compressedBatch = new ArrayList<>();
for (Message msg : messages) {
// 自动压缩大于阈值的消息
compressedBatch.add(msg);
}
producer.send(compressedBatch);
}
集群优化
Topic 设计
/**
* Topic 设计优化
*/
// 1. 合理设置队列数
// 队列数 = 消费者数 * 并发倍数
// 2. 避免热点 Topic
// 分散到多个队列
// 3. 队列数与消费者数匹配
@RocketMQMessageListener(
topic = "OPTIMIZED_TOPIC",
consumerGroup = "consumer-group-1",
consumeThreadMin = 20,
consumeThreadMax = 50
)
// 需要至少 4 个队列支持 50 个消费线程
负载均衡
/**
* 消费负载均衡
*/
public void configureLoadBalance() {
// 消费者数量变化时自动重新分配队列
// CLUSTERING 模式下自动负载均衡
// 调整重新平衡间隔
// 默认: 5秒
// 某些场景可以调大,减少频繁重平衡
consumer.setAllocateMessageQueueStrategy(
new AllocateMessageQueueAveragely() // 平均分配
// new AllocateMessageQueueConsistentHash() // 一致性哈希
);
}
性能测试
Benchmark 测试
#!/bin/bash
# benchmark-producer.sh
./tools.sh org.apache.rocketmq.tools.benchmark.ProducerBenchmark \
-n 192.168.1.101:9876 \
-t ORDER_TOPIC \
-s 1024 \ # 消息大小
-c 1000000 \ # 消息数量
-T 10 # 线程数
测试指标
| 指标 | 优秀 | 良好 | 一般 |
|---|---|---|---|
| 发送 TPS | > 10万 | 5-10万 | < 5万 |
| 发送延迟 P99 | < 5ms | 5-20ms | > 20ms |
| 消费 TPS | > 8万 | 3-8万 | < 3万 |
| 延迟 P99 | < 10ms | 10-50ms | > 50ms |
最佳实践
配置检查清单
# JVM 配置
-server
-Xms8g
-Xmx8g
-Xmn4g
-XX:+UseG1GC
# Broker 配置
flushDiskType=ASYNC_FLUSH
sendMessageThreadPoolNums=16
pullMessageThreadPoolNums=16
# Producer 配置
sendMsgTimeout=3000
retryTimesWhenSendFailed=2
maxMessageSize=4194304
监控指标
// 需要监控的关键指标
public class MonitorMetrics {
// Producer
// - 发送成功率
// - 发送延迟
// - 发送 TPS
// Consumer
// - 消费延迟
// - 消费 TPS
// - 堆积数量
// Broker
// - CPU 使用率
// - 内存使用率
// - 磁盘 IO
// - 网络带宽
}
下一步
接下来让我们学习最佳实践总结。
👉 最佳实践