第四章:生产者
深入了解 RocketMQ 生产者的使用,包括发送模式、消息配置和最佳实践。
最后更新: 2024-01-15
页面目录
RocketMQ 生产者
本章详细介绍 RocketMQ 生产者的使用和配置。
生产者概述
┌─────────────────────────────────────────────────────────────────┐
│ 生产者工作流程 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Producer │
│ │ │
│ │ 1. 获取 Topic 路由信息 │
│ │ ───────────────────────────────────────► Nameserver │
│ │ ◄─────────────────────────────────────── RouteInfo │
│ │ │
│ │ 2. 选择 Queue │
│ │ └─► SelectQueue() │
│ │ │
│ │ 3. 发送消息 │
│ │ ───────────────────────────────────────► Broker │
│ │ ◄─────────────────────────────────────── SendResult │
│ │ │
│ │ 4. 失败重试(可选) │
│ │ └─► 重试其他 Queue │
│ │ │
└─────────────────────────────────────────────────────────────────┘
发送模式
1. 同步发送
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class OrderProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 同步发送 - 阻塞等待发送结果
*/
public void sendSyncMessage() {
// 创建消息
org.apache.rocketmq.common.message.Message message =
new org.apache.rocketmq.common.message.Message(
"ORDER_TOPIC", // Topic
"order:create", // Tag
"order_12345", // Key
"订单内容...".getBytes() // Body
);
// 发送消息
SendResult sendResult = rocketMQTemplate.getProducer()
.send(message);
System.out.println("发送状态: " + sendResult.getSendStatus());
System.out.println("消息ID: " + sendResult.getMsgId());
System.out.println("队列: " + sendResult.getMessageQueue().getQueueId());
}
/**
* 简化同步发送
*/
public void sendSimpleSync() {
// 直接发送对象
Order order = new Order("12345", "商品A", 100.0);
rocketMQTemplate.convertAndSend("ORDER_TOPIC:order:create", order);
}
}
2. 异步发送
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
/**
* 异步发送 - 不阻塞,立即返回
*/
public void sendAsyncMessage() {
Message message = new Message(
"ORDER_TOPIC",
"order:pay",
"order_12345",
"支付内容...".getBytes()
);
// 设置异步回调
rocketMQTemplate.getProducer().send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功: " + sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.err.println("发送失败: " + e.getMessage());
// 处理失败逻辑,如记录日志、重试等
}
});
// 异步发送不会阻塞,消息会在后台发送
System.out.println("消息已提交发送");
}
3. 单向发送
/**
* 单向发送 - 只管发送,不关心结果
* 性能最高,可靠性最低
*/
public void sendOneWayMessage() {
Message message = new Message(
"LOG_TOPIC",
"info",
"log_key",
"日志内容...".getBytes()
);
// 单向发送,无回调
rocketMQTemplate.getProducer().sendOneway(message);
System.out.println("消息已发送(单向)");
}
发送模式对比
| 模式 | 可靠性 | 性能 | 使用场景 |
|---|---|---|---|
| 同步发送 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | 重要消息 |
| 异步发送 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 高并发请求 |
| 单向发送 | ⭐⭐ | ⭐⭐⭐⭐⭐ | 日志收集 |
消息配置
消息属性
/**
* 消息属性配置
*/
public void sendWithProperties() {
Message message = new Message(
"ORDER_TOPIC",
"order:create",
"order_12345",
"订单内容".getBytes()
);
// 设置消息属性
message.putUserProperty("orderId", "12345");
message.putUserProperty("userId", "1001");
message.putUserProperty("amount", "99.9");
message.putUserProperty("city", "北京");
// 设置延迟级别(消息级别)
// message.setDelayTimeLevel(3); // 1s, 5s, 10s, 30s, 1m, 2m...
SendResult result = rocketMQTemplate.getProducer().send(message);
}
生产者配置
# application.yml
rocketmq:
producer:
# 生产者组名
group: my-producer-group
# Nameserver 地址
namesrvAddr: 127.0.0.1:9876
# 发送超时时间(毫秒)
sendMsgTimeout: 3000
# 异步发送线程数
asyncSenderThreadCount: 5
# 消息压缩阈值
compressMsgBodyOverHowmuch: 4096
# 失败重试次数
retryTimesWhenSendFailed: 2
# 异步失败重试次数
retryTimesWhenSendAsyncFailed: 2
# 最大消息大小
maxMessageSize: 131072
# 是否允许发送事务消息
transactionEnabled: true
自定义 Producer
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RocketMQConfig {
@Bean
public DefaultMQProducer defaultMQProducer() {
DefaultMQProducer producer = new DefaultMQProducer("my-producer-group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setSendMsgTimeout(5000);
producer.setRetryTimesWhenSendFailed(3);
producer.setMaxMessageSize(4 * 1024 * 1024); // 4MB
try {
producer.start();
} catch (MQClientException e) {
throw new RuntimeException("启动 Producer 失败", e);
}
return producer;
}
@Bean
public TransactionMQProducer transactionMQProducer() {
TransactionMQProducer producer = new TransactionMQProducer("my-transaction-group");
producer.setNamesrvAddr("127.0.0.1:9876");
// 设置事务回查监听器
producer.setTransactionListener(new MyTransactionListener());
try {
producer.start();
} catch (MQClientException e) {
throw new RuntimeException("启动事务 Producer 失败", e);
}
return producer;
}
}
消息发送参数
消息选择策略
/**
* 自定义消息选择策略
*/
public void sendWithSelector() throws MQClientException {
// 消息
Message message = new Message(
"ORDER_TOPIC",
"order:create",
"order_12345",
"订单内容".getBytes()
);
// 自定义选择器 - 按订单金额选择队列
SendResult result = rocketMQTemplate.getProducer().send(
message,
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs,
Message msg,
Object arg) {
// arg 是 send 时传入的参数
Double amount = (Double) arg;
// 金额大于1000的走队列0-1
if (amount > 1000) {
return mqs.get(0);
}
// 金额在500-1000的走队列2-3
else if (amount > 500) {
return mqs.get(2);
}
// 其他走队列4+
else {
return mqs.get(mqs.size() - 1);
}
}
},
999.9 // 选择器参数
);
}
批量发送
import org.apache.rocketmq.client.producer.SendBatch;
import java.util.ArrayList;
import java.util.List;
/**
* 批量发送消息
*/
public void sendBatchMessage() throws MQClientException {
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 100; i++) {
Message message = new Message(
"BATCH_TOPIC",
"batch:item",
"batch_key_" + i,
("批量消息 " + i).getBytes()
);
messages.add(message);
}
// 批量发送
SendResult result = rocketMQTemplate.getProducer().send(messages);
System.out.println("批量发送成功,数量: " + messages.size());
}
发送结果处理
SendResult
/**
* 发送结果解析
*/
public void handleSendResult(SendResult result) {
// 发送状态
SendStatus status = result.getSendStatus();
switch (status) {
case FLUSH_DISK_TIMEOUT:
System.out.println("刷新磁盘超时");
break;
case FLUSH_SLAVE_TIMEOUT:
System.out.println("同步从节点超时");
break;
case SLAVE_NOT_AVAILABLE:
System.out.println("从节点不可用");
break;
case SEND_OK:
System.out.println("发送成功");
break;
}
// 消息ID
String msgId = result.getMsgId();
System.out.println("消息ID: " + msgId);
// 消息队列信息
MessageQueue queue = result.getMessageQueue();
System.out.println("队列: " + queue.getQueueId());
System.out.println("偏移量: " + result.getQueueOffset());
}
错误处理
重试机制
/**
* 消息发送重试
*/
public void sendWithRetry() {
int maxRetry = 3;
int retryCount = 0;
SendResult result = null;
while (retryCount < maxRetry) {
try {
Message message = new Message(
"ORDER_TOPIC",
"order:create",
"order_12345",
"订单内容".getBytes()
);
result = rocketMQTemplate.getProducer().send(message);
if (result.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("发送成功");
break;
}
} catch (Exception e) {
retryCount++;
System.out.println("发送失败,重试第 " + retryCount + " 次");
if (retryCount >= maxRetry) {
// 发送失败告警
sendAlert("RocketMQ发送失败: " + e.getMessage());
}
}
}
}
本地事务处理
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
/**
* 本地事务处理示例
*/
public class MyTransactionListener implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地业务(如数据库操作)
String orderId = (String) arg;
boolean success = orderService.createOrder(orderId);
if (success) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
return LocalTransactionState.UNKNOW;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 事务状态回查
String orderId = msg.getUserProperty("orderId");
boolean committed = orderService.isOrderCommitted(orderId);
if (committed) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
最佳实践
1. 消息标识
/**
* 消息标识最佳实践
*/
public void sendWithProperKey() {
// Key 应该具有业务含义,便于排查问题
Message message = new Message(
"ORDER_TOPIC",
"order:create",
order.getOrderId(), // 订单ID作为Key
order.toString().getBytes()
);
// 不要使用随机UUID作为Key
// message.setKeys(UUID.randomUUID().toString()); // ❌
// 设置Tag
message.setTags("order:create");
// 设置延迟级别
// message.setDelayTimeLevel(1); // 1秒后投递
}
2. 批量发送优化
/**
* 批量发送优化
*/
public void batchSendOptimized() throws MQClientException {
List<Message> batch = new ArrayList<>();
for (Order order : orders) {
Message message = new Message(
"ORDER_TOPIC",
order.getTag(),
order.getOrderId(),
order.toString().getBytes()
);
batch.add(message);
// 每100条发送一次
if (batch.size() >= 100) {
producer.send(batch);
batch.clear();
}
}
// 发送剩余消息
if (!batch.isEmpty()) {
producer.send(batch);
}
}
3. 发送监控
import io.opentelemetry.api.trace.Tracer;
/**
* 发送监控
*/
public SendResult sendWithTracing(Message message) {
// 添加链路追踪
Span span = tracer.spanBuilder("rocketmq-send")
.setAttribute("messaging.system", "rocketmq")
.setAttribute("messaging.destination", message.getTopic())
.setAttribute("messaging.operation", "send")
.startSpan();
try {
SendResult result = producer.send(message);
span.setAttribute("messaging.message_id", result.getMsgId());
span.setAttribute("messaging.rocketmq.queue_id",
result.getMessageQueue().getQueueId());
return result;
} catch (Exception e) {
span.recordException(e);
throw e;
} finally {
span.end();
}
}
下一步
接下来让我们学习 RocketMQ 消费者。
👉 消费者