第十一章:批量消息
深入了解 RocketMQ 批量消息的使用方法和最佳实践。
最后更新: 2024-01-15
页面目录
RocketMQ 批量消息
批量消息用于提高消息发送效率,适用于大量消息发送场景。
批量消息概述
┌─────────────────────────────────────────────────────────────────┐
│ 批量消息原理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 普通发送: │
│ Producer │
│ │ │
│ │ send(msg1) ────────────► Broker │
│ │ send(msg2) ────────────► Broker │
│ │ send(msg3) ────────────► Broker │
│ │ │
│ │ 3 次网络往返 │
│ │
│ ──────────────────────────────────────────────────────────── │
│ │
│ 批量发送: │
│ Producer │
│ │ │
│ │ batchSend(msg1, msg2, msg3) ────► Broker │
│ │ │
│ │ 1 次网络往返 │
│ │ 效率提升 3 倍 │
│ │
└─────────────────────────────────────────────────────────────────┘
批量发送
简单批量发送
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;
/**
* 批量消息发送者
*/
public class BatchProducer {
public void sendBatchMessages() throws Exception {
// 创建消息列表
List<Message> messages = new ArrayList<>();
// 添加消息
for (int i = 0; i < 100; i++) {
Message message = new Message(
"BATCH_TOPIC",
"batch:item",
"key_" + i,
("批量消息 " + i).getBytes()
);
messages.add(message);
}
// 批量发送
// 注意: 批量消息的 Topic 必须相同
// Tag 可以不同
SendResult result = producer.send(messages);
System.out.println("批量发送成功, 数量: " + messages.size());
}
}
带相同 Topic 和 Tag
/**
* 相同 Topic 和 Tag 的批量发送
*/
public class SimpleBatchProducer {
public void sendSimpleBatch() throws Exception {
String topic = "ORDER_TOPIC";
String tag = "order:create";
List<Message> messages = new ArrayList<>();
// 批量创建订单消息
for (int i = 0; i < 1000; i++) {
Order order = new Order();
order.setOrderId("ORDER_" + i);
order.setAmount(100.0 + i);
Message message = new Message(
topic,
tag,
order.getOrderId(),
JSON.toJSONString(order).getBytes()
);
messages.add(message);
}
// 批量发送
producer.send(messages);
}
}
不同 Tag 的批量发送
/**
* 不同 Topic 或 Tag 的批量发送
*/
public class MixedBatchProducer {
public void sendMixedBatch() throws Exception {
// RocketMQ 5.x 支持不同 Tag
// RocketMQ 4.x 需要按 Topic 和 Tag 分组
List<Message> batch1 = new ArrayList<>();
List<Message> batch2 = new ArrayList<>();
for (int i = 0; i < 100; i++) {
// 订单创建消息
Message msg1 = new Message(
"ORDER_TOPIC",
"order:create",
"order_" + i,
("创建订单: " + i).getBytes()
);
batch1.add(msg1);
// 库存扣减消息
Message msg2 = new Message(
"STOCK_TOPIC",
"stock:deduct",
"product_" + i,
("扣减库存: " + i).getBytes()
);
batch2.add(msg2);
}
// 分别发送
producer.send(batch1);
producer.send(batch2);
}
}
Spring Boot 实现
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
* 批量消息服务
*/
@Service
public class BatchMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 批量发送订单消息
*/
public void sendBatchOrders(List<Order> orders) {
List<org.springframework.messaging.Message> messages =
new ArrayList<>();
for (Order order : orders) {
org.springframework.messaging.Message message =
org.springframework.messaging.support.MessageBuilder
.withPayload(order)
.setHeader("orderId", order.getOrderId())
.build();
messages.add(message);
}
// 批量发送
rocketMQTemplate.convertAndSend("ORDER_TOPIC:order:create", messages);
}
/**
* 批量发送异步优化
*/
public void sendBatchAsync(List<Order> orders) {
// 分批发送,每批 100 条
int batchSize = 100;
int total = orders.size();
for (int i = 0; i < total; i += batchSize) {
int end = Math.min(i + batchSize, total);
List<Order> batch = orders.subList(i, end);
// 异步发送
rocketMQTemplate.asyncSend("ORDER_TOPIC:order:create",
batch,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("批量发送成功: " + batch.size());
}
@Override
public void onException(Throwable e) {
System.err.println("批量发送失败: " + e.getMessage());
}
});
}
}
}
批量消费
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 批量消息消费
*/
public class BatchMessageListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println("收到批量消息, 数量: " + msgs.size());
try {
for (MessageExt msg : msgs) {
String body = new String(msg.getBody(), "UTF-8");
processMessage(body);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 批量失败,返回重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
private void processMessage(String message) {
// 处理单条消息
}
}
Spring Boot 批量消费配置
import org.apache.rocketmq.spring.annotation.ConsumeMessageMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
@Service
@RocketMQMessageListener(
topic = "ORDER_TOPIC",
consumerGroup = "batch-consumer-group",
consumeMessageBatchMaxSize = 32 // 批量消费最大数量
)
public class BatchOrderConsumer implements RocketMQListener<List<Order>> {
@Override
public void onMessage(List<Order> orders) {
System.out.println("收到批量订单, 数量: " + orders.size());
try {
// 批量处理
for (Order order : orders) {
processOrder(order);
}
} catch (Exception e) {
// 抛出异常触发重试
throw e;
}
}
private void processOrder(Order order) {
// 处理订单
}
}
性能优化
批量大小设置
/**
* 批量大小优化
*/
public class BatchSizeOptimization {
// 1. 生产者批量大小
// 默认无限制,但建议控制在合理范围
// 2. 消费者批量大小
@RocketMQMessageListener(
consumerGroup = "my-consumer-group",
consumeMessageBatchMaxSize = 32 // 批量消费最大数量
)
// 3. 单批消息大小
// 建议单批不超过 1MB
// 超过会发送失败
public void sendBatches(List<Message> allMessages) throws Exception {
// 分批发送,避免单批过大
int maxBatchSize = 100; // 每批100条
int total = allMessages.size();
for (int i = 0; i < total; i += maxBatchSize) {
int end = Math.min(i + maxBatchSize, total);
List<Message> batch = allMessages.subList(i, end);
producer.send(batch);
}
}
}
批量发送优化
/**
* 批量发送优化策略
*/
public class BatchOptimization {
/**
* 策略1: 定时批量
*/
private List<Message> messageBuffer = new ArrayList<>();
private static final int BUFFER_SIZE = 100;
private static final long FLUSH_INTERVAL = 100; // 100ms
public void sendWithBuffer(Message message) throws Exception {
messageBuffer.add(message);
// 达到批量大小或超时,发送
if (messageBuffer.size() >= BUFFER_SIZE) {
flush();
}
}
private void flush() throws Exception {
if (!messageBuffer.isEmpty()) {
producer.send(messageBuffer);
messageBuffer.clear();
}
}
/**
* 策略2: 异步批量
*/
public void sendAsyncBatch(List<Message> messages) throws Exception {
CountDownLatch latch = new CountDownLatch(messages.size());
for (Message msg : messages) {
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
latch.countDown();
}
@Override
public void onException(Throwable e) {
latch.countDown();
}
});
}
latch.await(30, TimeUnit.SECONDS);
}
}
消息压缩
/**
* 批量消息压缩
*/
public class CompressedBatchProducer {
/**
* RocketMQ 支持消息压缩
*/
public void sendCompressedBatch() throws Exception {
// 批量消息自动压缩
// 压缩级别可配置
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
Message message = new Message(
"COMPRESS_TOPIC",
"compress:item",
"key_" + i,
generateLargeContent().getBytes() // 大内容
);
messages.add(message);
}
// 批量发送(自动压缩)
producer.send(messages);
}
/**
* 自定义压缩
*/
public void sendManuallyCompressed() throws Exception {
// 使用 GZIP 压缩
List<String> contents = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
contents.add("内容 " + i);
}
// 序列化和压缩
String json = JSON.toJSONString(contents);
byte[] compressed = compress(json.getBytes());
// 发送单条压缩消息
Message message = new Message(
"COMPRESS_TOPIC",
"compress:batch",
"batch_key",
compressed
);
producer.send(message);
}
private byte[] compress(byte[] data) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(bos);
gzip.write(data);
gzip.close();
return bos.toByteArray();
}
}
应用场景
1. 日志收集
/**
* 日志批量收集
*/
public class LogCollector {
private List<Message> logBuffer = new ConcurrentLinkedQueue<>();
public void collectLog(String log) {
Message message = new Message(
"LOG_TOPIC",
"log:info",
UUID.randomUUID().toString(),
log.getBytes()
);
logBuffer.add(message);
// 达到一定数量批量发送
if (logBuffer.size() >= 100) {
flush();
}
}
private void flush() {
List<Message> batch = new ArrayList<>();
logBuffer.drainTo(batch, 100);
if (!batch.isEmpty()) {
try {
producer.send(batch);
} catch (Exception e) {
log.error("日志发送失败", e);
}
}
}
}
2. 数据同步
/**
* 数据批量同步
*/
public class DataSyncService {
public void syncProducts(List<Product> products) {
// 按批次同步
int batchSize = 50;
for (int i = 0; i < products.size(); i += batchSize) {
int end = Math.min(i + batchSize, products.size());
List<Product> batch = products.subList(i, end);
List<Message> messages = batch.stream()
.map(p -> new Message(
"SYNC_TOPIC",
"sync:product",
p.getId(),
JSON.toJSONString(p).getBytes()
))
.collect(Collectors.toList());
try {
producer.send(messages);
System.out.println("同步成功: " + batch.size());
} catch (Exception e) {
handleSyncError(batch, e);
}
}
}
}
最佳实践
/**
* 批量消息最佳实践
*/
public class BestPractices {
/**
* 1. 合理设置批量大小
*/
// 生产者: 每批 100-500 条
// 消费者: 每批 16-32 条
// 单批大小不超过 1MB
/**
* 2. 做好错误处理
*/
public void handleBatchError(List<Message> batch, Exception e) {
// 记录失败
log.error("批量发送失败, 数量: " + batch.size(), e);
// 逐条重试
for (Message msg : batch) {
try {
producer.send(msg);
} catch (Exception ex) {
log.error("单条重试失败", ex);
}
}
}
/**
* 3. 幂等处理
*/
public void processBatchWithIdempotency(List<Message> msgs) {
for (Message msg : msgs) {
String key = msg.getKeys();
// 检查是否已处理
if (!isProcessed(key)) {
processMessage(msg);
markProcessed(key);
}
}
}
/**
* 4. 监控批量发送
*/
public void monitorBatchSend(List<Message> batch, SendResult result) {
// 记录批量发送指标
metrics.record("batch_send_size", batch.size());
metrics.record("batch_send_time", result.getSendElapsedTime());
}
}
下一步
接下来让我们学习集群部署。
👉 集群部署