第十一章:批量消息

深入了解 RocketMQ 批量消息的使用方法和最佳实践。

最后更新: 2024-01-15
页面目录

RocketMQ 批量消息

批量消息用于提高消息发送效率,适用于大量消息发送场景。

批量消息概述

┌─────────────────────────────────────────────────────────────────┐
│                      批量消息原理                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   普通发送:                                                      │
│   Producer                                                      │
│      │                                                         │
│      │ send(msg1) ────────────► Broker                        │
│      │ send(msg2) ────────────► Broker                        │
│      │ send(msg3) ────────────► Broker                        │
│      │                                                         │
│      │ 3 次网络往返                                            │
│                                                                  │
│   ────────────────────────────────────────────────────────────  │
│                                                                  │
│   批量发送:                                                      │
│   Producer                                                      │
│      │                                                         │
│      │ batchSend(msg1, msg2, msg3) ────► Broker               │
│      │                                                         │
│      │ 1 次网络往返                                            │
│      │ 效率提升 3 倍                                           │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

批量发送

简单批量发送

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;

/**
 * 批量消息发送者
 */
public class BatchProducer {

    public void sendBatchMessages() throws Exception {
        // 创建消息列表
        List<Message> messages = new ArrayList<>();

        // 添加消息
        for (int i = 0; i < 100; i++) {
            Message message = new Message(
                "BATCH_TOPIC",
                "batch:item",
                "key_" + i,
                ("批量消息 " + i).getBytes()
            );
            messages.add(message);
        }

        // 批量发送
        // 注意: 批量消息的 Topic 必须相同
        // Tag 可以不同
        SendResult result = producer.send(messages);

        System.out.println("批量发送成功, 数量: " + messages.size());
    }
}

带相同 Topic 和 Tag

/**
 * 相同 Topic 和 Tag 的批量发送
 */
public class SimpleBatchProducer {

    public void sendSimpleBatch() throws Exception {
        String topic = "ORDER_TOPIC";
        String tag = "order:create";

        List<Message> messages = new ArrayList<>();

        // 批量创建订单消息
        for (int i = 0; i < 1000; i++) {
            Order order = new Order();
            order.setOrderId("ORDER_" + i);
            order.setAmount(100.0 + i);

            Message message = new Message(
                topic,
                tag,
                order.getOrderId(),
                JSON.toJSONString(order).getBytes()
            );
            messages.add(message);
        }

        // 批量发送
        producer.send(messages);
    }
}

不同 Tag 的批量发送

/**
 * 不同 Topic 或 Tag 的批量发送
 */
public class MixedBatchProducer {

    public void sendMixedBatch() throws Exception {
        // RocketMQ 5.x 支持不同 Tag
        // RocketMQ 4.x 需要按 Topic 和 Tag 分组

        List<Message> batch1 = new ArrayList<>();
        List<Message> batch2 = new ArrayList<>();

        for (int i = 0; i < 100; i++) {
            // 订单创建消息
            Message msg1 = new Message(
                "ORDER_TOPIC",
                "order:create",
                "order_" + i,
                ("创建订单: " + i).getBytes()
            );
            batch1.add(msg1);

            // 库存扣减消息
            Message msg2 = new Message(
                "STOCK_TOPIC",
                "stock:deduct",
                "product_" + i,
                ("扣减库存: " + i).getBytes()
            );
            batch2.add(msg2);
        }

        // 分别发送
        producer.send(batch1);
        producer.send(batch2);
    }
}

Spring Boot 实现

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;

/**
 * 批量消息服务
 */
@Service
public class BatchMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 批量发送订单消息
     */
    public void sendBatchOrders(List<Order> orders) {
        List<org.springframework.messaging.Message> messages = 
            new ArrayList<>();

        for (Order order : orders) {
            org.springframework.messaging.Message message = 
                org.springframework.messaging.support.MessageBuilder
                    .withPayload(order)
                    .setHeader("orderId", order.getOrderId())
                    .build();
            messages.add(message);
        }

        // 批量发送
        rocketMQTemplate.convertAndSend("ORDER_TOPIC:order:create", messages);
    }

    /**
     * 批量发送异步优化
     */
    public void sendBatchAsync(List<Order> orders) {
        // 分批发送,每批 100 条
        int batchSize = 100;
        int total = orders.size();

        for (int i = 0; i < total; i += batchSize) {
            int end = Math.min(i + batchSize, total);
            List<Order> batch = orders.subList(i, end);

            // 异步发送
            rocketMQTemplate.asyncSend("ORDER_TOPIC:order:create", 
                batch, 
                new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println("批量发送成功: " + batch.size());
                    }

                    @Override
                    public void onException(Throwable e) {
                        System.err.println("批量发送失败: " + e.getMessage());
                    }
                });
        }
    }
}

批量消费

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

/**
 * 批量消息消费
 */
public class BatchMessageListener implements MessageListenerConcurrently {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {

        System.out.println("收到批量消息, 数量: " + msgs.size());

        try {
            for (MessageExt msg : msgs) {
                String body = new String(msg.getBody(), "UTF-8");
                processMessage(body);
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

        } catch (Exception e) {
            // 批量失败,返回重试
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }

    private void processMessage(String message) {
        // 处理单条消息
    }
}

Spring Boot 批量消费配置

import org.apache.rocketmq.spring.annotation.ConsumeMessageMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

@Service
@RocketMQMessageListener(
    topic = "ORDER_TOPIC",
    consumerGroup = "batch-consumer-group",
    consumeMessageBatchMaxSize = 32  // 批量消费最大数量
)
public class BatchOrderConsumer implements RocketMQListener<List<Order>> {

    @Override
    public void onMessage(List<Order> orders) {
        System.out.println("收到批量订单, 数量: " + orders.size());

        try {
            // 批量处理
            for (Order order : orders) {
                processOrder(order);
            }

        } catch (Exception e) {
            // 抛出异常触发重试
            throw e;
        }
    }

    private void processOrder(Order order) {
        // 处理订单
    }
}

性能优化

批量大小设置

/**
 * 批量大小优化
 */
public class BatchSizeOptimization {

    // 1. 生产者批量大小
    // 默认无限制,但建议控制在合理范围

    // 2. 消费者批量大小
    @RocketMQMessageListener(
        consumerGroup = "my-consumer-group",
        consumeMessageBatchMaxSize = 32  // 批量消费最大数量
    )

    // 3. 单批消息大小
    // 建议单批不超过 1MB
    // 超过会发送失败

    public void sendBatches(List<Message> allMessages) throws Exception {
        // 分批发送,避免单批过大
        int maxBatchSize = 100;  // 每批100条
        int total = allMessages.size();

        for (int i = 0; i < total; i += maxBatchSize) {
            int end = Math.min(i + maxBatchSize, total);
            List<Message> batch = allMessages.subList(i, end);

            producer.send(batch);
        }
    }
}

批量发送优化

/**
 * 批量发送优化策略
 */
public class BatchOptimization {

    /**
     * 策略1: 定时批量
     */
    private List<Message> messageBuffer = new ArrayList<>();
    private static final int BUFFER_SIZE = 100;
    private static final long FLUSH_INTERVAL = 100;  // 100ms

    public void sendWithBuffer(Message message) throws Exception {
        messageBuffer.add(message);

        // 达到批量大小或超时,发送
        if (messageBuffer.size() >= BUFFER_SIZE) {
            flush();
        }
    }

    private void flush() throws Exception {
        if (!messageBuffer.isEmpty()) {
            producer.send(messageBuffer);
            messageBuffer.clear();
        }
    }

    /**
     * 策略2: 异步批量
     */
    public void sendAsyncBatch(List<Message> messages) throws Exception {
        CountDownLatch latch = new CountDownLatch(messages.size());

        for (Message msg : messages) {
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult result) {
                    latch.countDown();
                }

                @Override
                public void onException(Throwable e) {
                    latch.countDown();
                }
            });
        }

        latch.await(30, TimeUnit.SECONDS);
    }
}

消息压缩

/**
 * 批量消息压缩
 */
public class CompressedBatchProducer {

    /**
     * RocketMQ 支持消息压缩
     */
    public void sendCompressedBatch() throws Exception {
        // 批量消息自动压缩
        // 压缩级别可配置

        List<Message> messages = new ArrayList<>();

        for (int i = 0; i < 1000; i++) {
            Message message = new Message(
                "COMPRESS_TOPIC",
                "compress:item",
                "key_" + i,
                generateLargeContent().getBytes()  // 大内容
            );
            messages.add(message);
        }

        // 批量发送(自动压缩)
        producer.send(messages);
    }

    /**
     * 自定义压缩
     */
    public void sendManuallyCompressed() throws Exception {
        // 使用 GZIP 压缩
        List<String> contents = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            contents.add("内容 " + i);
        }

        // 序列化和压缩
        String json = JSON.toJSONString(contents);
        byte[] compressed = compress(json.getBytes());

        // 发送单条压缩消息
        Message message = new Message(
            "COMPRESS_TOPIC",
            "compress:batch",
            "batch_key",
            compressed
        );

        producer.send(message);
    }

    private byte[] compress(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(bos);
        gzip.write(data);
        gzip.close();
        return bos.toByteArray();
    }
}

应用场景

1. 日志收集

/**
 * 日志批量收集
 */
public class LogCollector {

    private List<Message> logBuffer = new ConcurrentLinkedQueue<>();

    public void collectLog(String log) {
        Message message = new Message(
            "LOG_TOPIC",
            "log:info",
            UUID.randomUUID().toString(),
            log.getBytes()
        );
        logBuffer.add(message);

        // 达到一定数量批量发送
        if (logBuffer.size() >= 100) {
            flush();
        }
    }

    private void flush() {
        List<Message> batch = new ArrayList<>();
        logBuffer.drainTo(batch, 100);

        if (!batch.isEmpty()) {
            try {
                producer.send(batch);
            } catch (Exception e) {
                log.error("日志发送失败", e);
            }
        }
    }
}

2. 数据同步

/**
 * 数据批量同步
 */
public class DataSyncService {

    public void syncProducts(List<Product> products) {
        // 按批次同步
        int batchSize = 50;

        for (int i = 0; i < products.size(); i += batchSize) {
            int end = Math.min(i + batchSize, products.size());
            List<Product> batch = products.subList(i, end);

            List<Message> messages = batch.stream()
                .map(p -> new Message(
                    "SYNC_TOPIC",
                    "sync:product",
                    p.getId(),
                    JSON.toJSONString(p).getBytes()
                ))
                .collect(Collectors.toList());

            try {
                producer.send(messages);
                System.out.println("同步成功: " + batch.size());
            } catch (Exception e) {
                handleSyncError(batch, e);
            }
        }
    }
}

最佳实践

/**
 * 批量消息最佳实践
 */
public class BestPractices {

    /**
     * 1. 合理设置批量大小
     */
    // 生产者: 每批 100-500 条
    // 消费者: 每批 16-32 条
    // 单批大小不超过 1MB

    /**
     * 2. 做好错误处理
     */
    public void handleBatchError(List<Message> batch, Exception e) {
        // 记录失败
        log.error("批量发送失败, 数量: " + batch.size(), e);

        // 逐条重试
        for (Message msg : batch) {
            try {
                producer.send(msg);
            } catch (Exception ex) {
                log.error("单条重试失败", ex);
            }
        }
    }

    /**
     * 3. 幂等处理
     */
    public void processBatchWithIdempotency(List<Message> msgs) {
        for (Message msg : msgs) {
            String key = msg.getKeys();

            // 检查是否已处理
            if (!isProcessed(key)) {
                processMessage(msg);
                markProcessed(key);
            }
        }
    }

    /**
     * 4. 监控批量发送
     */
    public void monitorBatchSend(List<Message> batch, SendResult result) {
        // 记录批量发送指标
        metrics.record("batch_send_size", batch.size());
        metrics.record("batch_send_time", result.getSendElapsedTime());
    }
}

下一步

接下来让我们学习集群部署。

👉 集群部署