第四章:生产者

深入了解 RocketMQ 生产者的使用,包括发送模式、消息配置和最佳实践。

最后更新: 2024-01-15
页面目录

RocketMQ 生产者

本章详细介绍 RocketMQ 生产者的使用和配置。

生产者概述

┌─────────────────────────────────────────────────────────────────┐
│                      生产者工作流程                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   Producer                                                      │
│      │                                                         │
│      │ 1. 获取 Topic 路由信息                                   │
│      │ ───────────────────────────────────────► Nameserver     │
│      │ ◄─────────────────────────────────────── RouteInfo       │
│      │                                                            │
│      │ 2. 选择 Queue                                             │
│      │     └─► SelectQueue()                                    │
│      │                                                            │
│      │ 3. 发送消息                                               │
│      │ ───────────────────────────────────────► Broker          │
│      │ ◄─────────────────────────────────────── SendResult     │
│      │                                                            │
│      │ 4. 失败重试(可选)                                        │
│      │     └─► 重试其他 Queue                                    │
│      │                                                            │
└─────────────────────────────────────────────────────────────────┘

发送模式

1. 同步发送

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 同步发送 - 阻塞等待发送结果
     */
    public void sendSyncMessage() {
        // 创建消息
        org.apache.rocketmq.common.message.Message message = 
            new org.apache.rocketmq.common.message.Message(
                "ORDER_TOPIC",      // Topic
                "order:create",      // Tag
                "order_12345",       // Key
                "订单内容...".getBytes()  // Body
            );

        // 发送消息
        SendResult sendResult = rocketMQTemplate.getProducer()
            .send(message);

        System.out.println("发送状态: " + sendResult.getSendStatus());
        System.out.println("消息ID: " + sendResult.getMsgId());
        System.out.println("队列: " + sendResult.getMessageQueue().getQueueId());
    }

    /**
     * 简化同步发送
     */
    public void sendSimpleSync() {
        // 直接发送对象
        Order order = new Order("12345", "商品A", 100.0);
        rocketMQTemplate.convertAndSend("ORDER_TOPIC:order:create", order);
    }
}

2. 异步发送

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

/**
 * 异步发送 - 不阻塞,立即返回
 */
public void sendAsyncMessage() {
    Message message = new Message(
        "ORDER_TOPIC",
        "order:pay",
        "order_12345",
        "支付内容...".getBytes()
    );

    // 设置异步回调
    rocketMQTemplate.getProducer().send(message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("发送成功: " + sendResult.getMsgId());
        }

        @Override
        public void onException(Throwable e) {
            System.err.println("发送失败: " + e.getMessage());
            // 处理失败逻辑,如记录日志、重试等
        }
    });

    // 异步发送不会阻塞,消息会在后台发送
    System.out.println("消息已提交发送");
}

3. 单向发送

/**
 * 单向发送 - 只管发送,不关心结果
 * 性能最高,可靠性最低
 */
public void sendOneWayMessage() {
    Message message = new Message(
        "LOG_TOPIC",
        "info",
        "log_key",
        "日志内容...".getBytes()
    );

    // 单向发送,无回调
    rocketMQTemplate.getProducer().sendOneway(message);
    
    System.out.println("消息已发送(单向)");
}

发送模式对比

模式 可靠性 性能 使用场景
同步发送 ⭐⭐⭐⭐⭐ ⭐⭐⭐ 重要消息
异步发送 ⭐⭐⭐⭐ ⭐⭐⭐⭐ 高并发请求
单向发送 ⭐⭐ ⭐⭐⭐⭐⭐ 日志收集

消息配置

消息属性

/**
 * 消息属性配置
 */
public void sendWithProperties() {
    Message message = new Message(
        "ORDER_TOPIC",
        "order:create",
        "order_12345",
        "订单内容".getBytes()
    );

    // 设置消息属性
    message.putUserProperty("orderId", "12345");
    message.putUserProperty("userId", "1001");
    message.putUserProperty("amount", "99.9");
    message.putUserProperty("city", "北京");

    // 设置延迟级别(消息级别)
    // message.setDelayTimeLevel(3); // 1s, 5s, 10s, 30s, 1m, 2m...

    SendResult result = rocketMQTemplate.getProducer().send(message);
}

生产者配置

# application.yml
rocketmq:
  producer:
    # 生产者组名
    group: my-producer-group
    # Nameserver 地址
    namesrvAddr: 127.0.0.1:9876
    # 发送超时时间(毫秒)
    sendMsgTimeout: 3000
    # 异步发送线程数
    asyncSenderThreadCount: 5
    # 消息压缩阈值
    compressMsgBodyOverHowmuch: 4096
    # 失败重试次数
    retryTimesWhenSendFailed: 2
    # 异步失败重试次数
    retryTimesWhenSendAsyncFailed: 2
    # 最大消息大小
    maxMessageSize: 131072
    # 是否允许发送事务消息
    transactionEnabled: true

自定义 Producer

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RocketMQConfig {

    @Bean
    public DefaultMQProducer defaultMQProducer() {
        DefaultMQProducer producer = new DefaultMQProducer("my-producer-group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setSendMsgTimeout(5000);
        producer.setRetryTimesWhenSendFailed(3);
        producer.setMaxMessageSize(4 * 1024 * 1024); // 4MB
        
        try {
            producer.start();
        } catch (MQClientException e) {
            throw new RuntimeException("启动 Producer 失败", e);
        }
        
        return producer;
    }

    @Bean
    public TransactionMQProducer transactionMQProducer() {
        TransactionMQProducer producer = new TransactionMQProducer("my-transaction-group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 设置事务回查监听器
        producer.setTransactionListener(new MyTransactionListener());
        
        try {
            producer.start();
        } catch (MQClientException e) {
            throw new RuntimeException("启动事务 Producer 失败", e);
        }
        
        return producer;
    }
}

消息发送参数

消息选择策略

/**
 * 自定义消息选择策略
 */
public void sendWithSelector() throws MQClientException {
    // 消息
    Message message = new Message(
        "ORDER_TOPIC",
        "order:create",
        "order_12345",
        "订单内容".getBytes()
    );

    // 自定义选择器 - 按订单金额选择队列
    SendResult result = rocketMQTemplate.getProducer().send(
        message,
        new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, 
                                       Message msg, 
                                       Object arg) {
                // arg 是 send 时传入的参数
                Double amount = (Double) arg;
                
                // 金额大于1000的走队列0-1
                if (amount > 1000) {
                    return mqs.get(0);
                }
                // 金额在500-1000的走队列2-3
                else if (amount > 500) {
                    return mqs.get(2);
                }
                // 其他走队列4+
                else {
                    return mqs.get(mqs.size() - 1);
                }
            }
        },
        999.9  // 选择器参数
    );
}

批量发送

import org.apache.rocketmq.client.producer.SendBatch;
import java.util.ArrayList;
import java.util.List;

/**
 * 批量发送消息
 */
public void sendBatchMessage() throws MQClientException {
    List<Message> messages = new ArrayList<>();
    
    for (int i = 0; i < 100; i++) {
        Message message = new Message(
            "BATCH_TOPIC",
            "batch:item",
            "batch_key_" + i,
            ("批量消息 " + i).getBytes()
        );
        messages.add(message);
    }

    // 批量发送
    SendResult result = rocketMQTemplate.getProducer().send(messages);
    
    System.out.println("批量发送成功,数量: " + messages.size());
}

发送结果处理

SendResult

/**
 * 发送结果解析
 */
public void handleSendResult(SendResult result) {
    // 发送状态
    SendStatus status = result.getSendStatus();
    switch (status) {
        case FLUSH_DISK_TIMEOUT:
            System.out.println("刷新磁盘超时");
            break;
        case FLUSH_SLAVE_TIMEOUT:
            System.out.println("同步从节点超时");
            break;
        case SLAVE_NOT_AVAILABLE:
            System.out.println("从节点不可用");
            break;
        case SEND_OK:
            System.out.println("发送成功");
            break;
    }

    // 消息ID
    String msgId = result.getMsgId();
    System.out.println("消息ID: " + msgId);

    // 消息队列信息
    MessageQueue queue = result.getMessageQueue();
    System.out.println("队列: " + queue.getQueueId());
    System.out.println("偏移量: " + result.getQueueOffset());
}

错误处理

重试机制

/**
 * 消息发送重试
 */
public void sendWithRetry() {
    int maxRetry = 3;
    int retryCount = 0;
    SendResult result = null;
    
    while (retryCount < maxRetry) {
        try {
            Message message = new Message(
                "ORDER_TOPIC",
                "order:create",
                "order_12345",
                "订单内容".getBytes()
            );
            
            result = rocketMQTemplate.getProducer().send(message);
            
            if (result.getSendStatus() == SendStatus.SEND_OK) {
                System.out.println("发送成功");
                break;
            }
        } catch (Exception e) {
            retryCount++;
            System.out.println("发送失败,重试第 " + retryCount + " 次");
            
            if (retryCount >= maxRetry) {
                // 发送失败告警
                sendAlert("RocketMQ发送失败: " + e.getMessage());
            }
        }
    }
}

本地事务处理

import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * 本地事务处理示例
 */
public class MyTransactionListener implements TransactionListener {

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 执行本地业务(如数据库操作)
            String orderId = (String) arg;
            boolean success = orderService.createOrder(orderId);
            
            if (success) {
                return LocalTransactionState.COMMIT_MESSAGE;
            } else {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        } catch (Exception e) {
            return LocalTransactionState.UNKNOW;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 事务状态回查
        String orderId = msg.getUserProperty("orderId");
        boolean committed = orderService.isOrderCommitted(orderId);
        
        if (committed) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
}

最佳实践

1. 消息标识

/**
 * 消息标识最佳实践
 */
public void sendWithProperKey() {
    // Key 应该具有业务含义,便于排查问题
    Message message = new Message(
        "ORDER_TOPIC",
        "order:create",
        order.getOrderId(),  // 订单ID作为Key
        order.toString().getBytes()
    );
    
    // 不要使用随机UUID作为Key
    // message.setKeys(UUID.randomUUID().toString()); // ❌
    
    // 设置Tag
    message.setTags("order:create");
    
    // 设置延迟级别
    // message.setDelayTimeLevel(1); // 1秒后投递
}

2. 批量发送优化

/**
 * 批量发送优化
 */
public void batchSendOptimized() throws MQClientException {
    List<Message> batch = new ArrayList<>();
    
    for (Order order : orders) {
        Message message = new Message(
            "ORDER_TOPIC",
            order.getTag(),
            order.getOrderId(),
            order.toString().getBytes()
        );
        batch.add(message);
        
        // 每100条发送一次
        if (batch.size() >= 100) {
            producer.send(batch);
            batch.clear();
        }
    }
    
    // 发送剩余消息
    if (!batch.isEmpty()) {
        producer.send(batch);
    }
}

3. 发送监控

import io.opentelemetry.api.trace.Tracer;

/**
 * 发送监控
 */
public SendResult sendWithTracing(Message message) {
    // 添加链路追踪
    Span span = tracer.spanBuilder("rocketmq-send")
        .setAttribute("messaging.system", "rocketmq")
        .setAttribute("messaging.destination", message.getTopic())
        .setAttribute("messaging.operation", "send")
        .startSpan();
    
    try {
        SendResult result = producer.send(message);
        
        span.setAttribute("messaging.message_id", result.getMsgId());
        span.setAttribute("messaging.rocketmq.queue_id", 
            result.getMessageQueue().getQueueId());
        
        return result;
    } catch (Exception e) {
        span.recordException(e);
        throw e;
    } finally {
        span.end();
    }
}

下一步

接下来让我们学习 RocketMQ 消费者。

👉 消费者