第八章:事务消息
深入了解 RocketMQ 事务消息的实现原理和使用方法,实现分布式事务。
最后更新: 2024-01-15
页面目录
RocketMQ 事务消息
事务消息用于实现分布式事务,确保本地业务和消息发送的原子性。
事务消息概述
什么是事务消息
┌─────────────────────────────────────────────────────────────────┐
│ 事务消息原理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ RocketMQ 事务消息 │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Phase1 │ ──► │ Phase2 │ ──► │ Phase3 │ │ │
│ │ │ Prepare │ │ Commit │ │ Confirm │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Phase1: 发送半消息(Prepare),本地事务未提交 │
│ Phase2: 执行本地事务,根据结果提交或回滚 │
│ Phase3: 如果本地事务超时/失败,Broker 回查事务状态 │
│ │
└─────────────────────────────────────────────────────────────────┘
事务消息流程
┌─────────────────────────────────────────────────────────────────┐
│ 事务消息完整流程 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Producer Broker Consumer │
│ │ │ │ │
│ │ 1. 发送半消息 │ │ │
│ │ ───────────────►│ │ │
│ │ │ │ │
│ │ 2. 返回成功 │ │ │
│ │ ◄───────────────│ │ │
│ │ │ │ │
│ │ 3. 执行本地事务 │ │ │
│ │ ◄───────────────│ │ │
│ │ │ │ │
│ │ 4. 提交/回滚 │ │ │
│ │ ────────────────►│ │ │
│ │ │ │ │
│ │ 5. 提交成功 │ │ │
│ │ │ 6. 投递消息 │ │
│ │ │───────────────────►│ │
│ │ │ │ │
│ │ (超时/失败) │ │ │
│ │ │ 7. 回查事务状态 │ │
│ │ ◄────────────────│ │ │
│ │ │ │ │
│ │ 8. 返回事务状态 │ │ │
│ │ ────────────────►│ │ │
│ │
└─────────────────────────────────────────────────────────────────┘
实现原理
两阶段提交
┌─────────────────────────────────────────────────────────────────┐
│ 两阶段提交 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 第一阶段: Prepare (准备阶段) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 1. 发送半消息到 Broker │ │
│ │ 2. 半消息标记为"暂不可投递" │ │
│ │ 3. 返回发送成功 │ │
│ │ 4. 执行本地业务 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ 第二阶段: Commit/Rollback (提交/回滚) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 本地事务成功: │ │
│ │ • 提交半消息 │ │
│ │ • Broker 投递消息给消费者 │ │
│ │ │ │
│ │ 本地事务失败: │ │
│ │ • 回滚半消息 │ │
│ │ • 消息被丢弃 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
事务状态回查
┌─────────────────────────────────────────────────────────────────┐
│ 事务状态回查 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 场景: 本地事务执行后,提交/回滚消息丢失 │
│ │
│ Broker │
│ │ │
│ │ 1. 等待提交/回滚超时 (默认 6s) │
│ │ ▼ │
│ │ 2. 调用 TransactionListener.checkLocalTransaction │
│ │ ▼ │
│ │ Producer │
│ │ ├── 本地事务已提交 ──► 返回 COMMIT_MESSAGE │
│ │ ├── 本地事务已回滚 ──► 返回 ROLLBACK_MESSAGE │
│ │ └── 未知状态 ──► 返回 UNKNOW (再次回查) │
│ │ │
│ │ 3. 根据返回状态决定消息命运 │
│ │ │
└─────────────────────────────────────────────────────────────────┘
Java 实现
事务监听器
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
/**
* 事务消息监听器实现
*/
public class OrderTransactionListener implements TransactionListener {
@Autowired
private OrderService orderService;
@Autowired
private AccountService accountService;
/**
* 执行本地事务
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String orderId = (String) arg;
try {
// 1. 创建订单(本地事务)
Order order = new Order();
order.setOrderId(orderId);
order.setStatus("PENDING");
orderService.createOrder(order);
// 2. 扣减账户余额(本地事务)
accountService.deductBalance(order.getUserId(), order.getAmount());
// 3. 提交本地事务成功
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 本地事务失败,回滚
log.error("执行本地事务失败, orderId: {}", orderId, e);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
/**
* 回查事务状态
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String orderId = msg.getUserProperty("orderId");
try {
// 查询订单状态
Order order = orderService.getOrderById(orderId);
if (order == null) {
// 订单不存在,回滚
return LocalTransactionState.ROLLBACK_MESSAGE;
}
switch (order.getStatus()) {
case "COMPLETED":
// 订单已完成,提交消息
return LocalTransactionState.COMMIT_MESSAGE;
case "PENDING":
// 订单处理中,回查未知
return LocalTransactionState.UNKNOW;
case "CANCELLED":
// 订单已取消,回滚
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.UNKNOW;
}
} catch (Exception e) {
log.error("回查事务状态失败, orderId: {}", orderId, e);
return LocalTransactionState.UNKNOW;
}
}
}
事务生产者
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
/**
* 事务消息生产者
*/
public class TransactionProducer {
private TransactionMQProducer producer;
public void init() {
producer = new TransactionMQProducer("transaction-producer-group");
producer.setNamesrvAddr("127.0.0.1:9876");
// 设置事务监听器
producer.setTransactionListener(new OrderTransactionListener());
// 设置线程池(用于执行本地事务)
ExecutorService executor = Executors.newFixedThreadPool(10);
producer.setExecutorServiceExecutors(executor);
producer.start();
}
/**
* 发送事务消息
*/
public void sendTransactionMessage(Order order) throws Exception {
String orderId = order.getOrderId();
// 创建消息
Message message = new Message(
"ORDER_TOPIC",
"order:create",
orderId,
JSON.toJSONString(order).getBytes()
);
// 设置事务ID
message.putUserProperty("orderId", orderId);
message.putUserProperty("userId", order.getUserId());
message.putUserProperty("amount", String.valueOf(order.getAmount()));
// 发送事务消息
// arg 参数会传递给 executeLocalTransaction
TransactionSendResult result = producer.sendMessageInTransaction(
message,
orderId // 作为本地事务的标识
);
System.out.println("发送事务消息结果: " + result.getSendStatus());
// 处理发送结果
if (result.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("事务消息发送成功, transactionId: " + result.getTransactionId());
}
}
}
Spring Boot 集成
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
/**
* 事务消息服务
*/
@Service
public class TransactionalOrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送事务消息
*/
public void sendTransactionalMessage(Order order) {
// 设置事务ID
order.setTransactionId(UUID.randomUUID().toString());
// 使用 convertAndSend 发送
rocketMQTemplate.asyncSend("ORDER_TOPIC:order:create",
order,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}
@Override
public void onException(Throwable e) {
System.out.println("发送失败: " + e.getMessage());
}
});
}
}
/**
* 事务监听器
*/
@Component
@RocketMQTransactionListener
public class OrderTransactionListenerImpl implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;
/**
* 执行本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(
Message msg, Object arg) {
try {
Order order = (Order) arg;
// 创建订单
orderService.createOrder(order);
// 扣减余额
accountService.deduct(order.getUserId(), order.getAmount());
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("本地事务执行失败", e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 回查事务状态
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(
Message msg) {
String orderId = msg.getHeaders().get("orderId", String.class);
Order order = orderService.getOrderById(orderId);
if (order != null && "COMPLETED".equals(order.getStatus())) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.UNKNOWN;
}
}
应用场景
1. 订单支付
┌─────────────────────────────────────────────────────────────────┐
│ 订单支付场景 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 用户支付订单: │
│ │
│ 1. 发送事务消息(创建订单) │
│ 2. 本地事务: │
│ ├── 创建订单记录 (status=PENDING) │
│ └── 冻结用户余额 │
│ 3. 提交事务: │
│ └── 如果本地成功,提交消息,扣款 │
│ │
│ 消息投递后: │
│ ├── 商家系统收到消息 → 准备发货 │
│ └── 积分系统收到消息 → 增加积分 │
│ │
└─────────────────────────────────────────────────────────────────┘
2. 数据同步
/**
* 数据同步场景
*/
public void syncDataWithTransaction(Data data) {
Message message = new Message(
"SYNC_TOPIC",
"data:sync",
data.getId(),
JSON.toJSONString(data).getBytes()
);
// 发送事务消息
producer.sendMessageInTransaction(message, data, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
// 记录同步成功
syncService.recordSyncSuccess(data.getId());
}
@Override
public void onException(Throwable e) {
// 记录同步失败
syncService.recordSyncFailed(data.getId());
}
});
}
最佳实践
1. 幂等处理
/**
* 事务消息幂等性
*/
public class IdempotentTransactionListener implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String orderId = (String) arg;
// 幂等检查
if (orderService.isOrderExists(orderId)) {
return LocalTransactionState.COMMIT_MESSAGE;
}
// 执行本地事务
orderService.createOrder(orderId);
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 查询本地事务状态
String orderId = msg.getUserProperty("orderId");
if (orderService.isOrderCompleted(orderId)) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}
2. 异常处理
/**
* 事务消息异常处理
*/
public class TransactionExceptionHandler {
public LocalTransactionState handleException(
Message msg,
Object arg,
Exception e) {
log.error("执行本地事务异常, orderId: {}", arg, e);
// 记录异常
alertService.sendAlert("事务消息处理失败: " + e.getMessage());
// 根据异常类型决定回滚还是未知
if (e instanceof BizException) {
// 业务异常,回滚
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (e instanceof TimeoutException) {
// 超时,返回未知让Broker回查
return LocalTransactionState.UNKNOW;
} else {
// 其他异常,返回未知
return LocalTransactionState.UNKNOW;
}
}
}
3. 配置优化
/**
* 事务消息配置
*/
public class TransactionProducerConfig {
public TransactionMQProducer createProducer() {
TransactionMQProducer producer = new TransactionMQProducer(
"transaction-producer-group"
);
producer.setNamesrvAddr("127.0.0.1:9876");
// 事务回查线程数
producer.setCheckThreadPoolMinSize(5);
producer.setCheckThreadPoolMaxSize(20);
// 队列数
producer.setCheckRequestHoldMax(2000);
// 事务超时时间 (ms)
producer.setTransactionTimeout(6000);
producer.setTransactionListener(new OrderTransactionListener());
return producer;
}
}
常见问题
1. 消息重复消费
// 事务消息可能重复投递,消费者需要做幂等处理
@RocketMQMessageListener(
topic = "ORDER_TOPIC",
consumerGroup = "order-consumer-group"
)
public class IdempotentConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
// 幂等检查
if (orderService.isProcessed(order.getOrderId())) {
return;
}
// 处理订单
orderService.processOrder(order);
// 标记已处理
orderService.markProcessed(order.getOrderId());
}
}
2. 事务超时
// 配置事务超时时间
// broker.conf
transactionTimeout = 6000 // 6秒
// 或在代码中设置
producer.setTransactionTimeout(6000);
下一步
接下来让我们学习消息过滤。
👉 消息过滤