第八章:事务消息

深入了解 RocketMQ 事务消息的实现原理和使用方法,实现分布式事务。

最后更新: 2024-01-15
页面目录

RocketMQ 事务消息

事务消息用于实现分布式事务,确保本地业务和消息发送的原子性。

事务消息概述

什么是事务消息

┌─────────────────────────────────────────────────────────────────┐
│                      事务消息原理                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                   RocketMQ 事务消息                       │   │
│   │                                                         │   │
│   │   ┌─────────┐     ┌─────────┐     ┌─────────┐         │   │
│   │   │  Phase1 │ ──► │  Phase2 │ ──► │  Phase3 │         │   │
│   │   │ Prepare │     │ Commit  │     │ Confirm │         │   │
│   │   └─────────┘     └─────────┘     └─────────┘         │   │
│   │                                                         │   │
│   └─────────────────────────────────────────────────────────┘   │
│                                                                  │
│   Phase1: 发送半消息(Prepare),本地事务未提交                    │
│   Phase2: 执行本地事务,根据结果提交或回滚                       │
│   Phase3: 如果本地事务超时/失败,Broker 回查事务状态             │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

事务消息流程

┌─────────────────────────────────────────────────────────────────┐
│                      事务消息完整流程                             │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   Producer           Broker              Consumer                │
│      │                 │                   │                    │
│      │ 1. 发送半消息    │                   │                    │
│      │ ───────────────►│                   │                    │
│      │                 │                   │                    │
│      │ 2. 返回成功      │                   │                    │
│      │ ◄───────────────│                   │                    │
│      │                 │                   │                    │
│      │ 3. 执行本地事务  │                   │                    │
│      │ ◄───────────────│                   │                    │
│      │                 │                   │                    │
│      │ 4. 提交/回滚     │                   │                    │
│      │ ────────────────►│                   │                    │
│      │                 │                   │                    │
│      │ 5. 提交成功      │                   │                    │
│      │                 │ 6. 投递消息        │                    │
│      │                 │───────────────────►│                    │
│      │                 │                   │                    │
│      │     (超时/失败)    │                   │                    │
│      │                 │ 7. 回查事务状态     │                    │
│      │ ◄────────────────│                   │                    │
│      │                 │                   │                    │
│      │ 8. 返回事务状态   │                   │                    │
│      │ ────────────────►│                   │                    │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

实现原理

两阶段提交

┌─────────────────────────────────────────────────────────────────┐
│                      两阶段提交                                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   第一阶段: Prepare (准备阶段)                                  │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │ 1. 发送半消息到 Broker                                  │   │
│   │ 2. 半消息标记为"暂不可投递"                              │   │
│   │ 3. 返回发送成功                                          │   │
│   │ 4. 执行本地业务                                           │   │
│   └─────────────────────────────────────────────────────────┘   │
│                              │                                   │
│                              ▼                                   │
│   第二阶段: Commit/Rollback (提交/回滚)                          │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │ 本地事务成功:                                            │   │
│   │    • 提交半消息                                          │   │
│   │    • Broker 投递消息给消费者                            │   │
│   │                                                         │   │
│   │ 本地事务失败:                                            │   │
│   │    • 回滚半消息                                          │   │
│   │    • 消息被丢弃                                          │   │
│   └─────────────────────────────────────────────────────────┘   │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

事务状态回查

┌─────────────────────────────────────────────────────────────────┐
│                      事务状态回查                                │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   场景: 本地事务执行后,提交/回滚消息丢失                        │
│                                                                  │
│   Broker                                                        │
│      │                                                         │
│      │ 1. 等待提交/回滚超时 (默认 6s)                          │
│      │ ▼                                                         │
│      │ 2. 调用 TransactionListener.checkLocalTransaction      │
│      │ ▼                                                         │
│      │    Producer                                              │
│      │    ├── 本地事务已提交 ──► 返回 COMMIT_MESSAGE          │
│      │    ├── 本地事务已回滚 ──► 返回 ROLLBACK_MESSAGE        │
│      │    └── 未知状态     ──► 返回 UNKNOW (再次回查)         │
│      │                                                         │
│      │ 3. 根据返回状态决定消息命运                              │
│      │                                                         │
└─────────────────────────────────────────────────────────────────┘

Java 实现

事务监听器

import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * 事务消息监听器实现
 */
public class OrderTransactionListener implements TransactionListener {

    @Autowired
    private OrderService orderService;
    
    @Autowired
    private AccountService accountService;

    /**
     * 执行本地事务
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String orderId = (String) arg;
        
        try {
            // 1. 创建订单(本地事务)
            Order order = new Order();
            order.setOrderId(orderId);
            order.setStatus("PENDING");
            orderService.createOrder(order);
            
            // 2. 扣减账户余额(本地事务)
            accountService.deductBalance(order.getUserId(), order.getAmount());
            
            // 3. 提交本地事务成功
            return LocalTransactionState.COMMIT_MESSAGE;
            
        } catch (Exception e) {
            // 本地事务失败,回滚
            log.error("执行本地事务失败, orderId: {}", orderId, e);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    /**
     * 回查事务状态
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String orderId = msg.getUserProperty("orderId");
        
        try {
            // 查询订单状态
            Order order = orderService.getOrderById(orderId);
            
            if (order == null) {
                // 订单不存在,回滚
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            
            switch (order.getStatus()) {
                case "COMPLETED":
                    // 订单已完成,提交消息
                    return LocalTransactionState.COMMIT_MESSAGE;
                case "PENDING":
                    // 订单处理中,回查未知
                    return LocalTransactionState.UNKNOW;
                case "CANCELLED":
                    // 订单已取消,回滚
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.UNKNOW;
            }
            
        } catch (Exception e) {
            log.error("回查事务状态失败, orderId: {}", orderId, e);
            return LocalTransactionState.UNKNOW;
        }
    }
}

事务生产者

import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;

/**
 * 事务消息生产者
 */
public class TransactionProducer {

    private TransactionMQProducer producer;

    public void init() {
        producer = new TransactionMQProducer("transaction-producer-group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        
        // 设置事务监听器
        producer.setTransactionListener(new OrderTransactionListener());
        
        // 设置线程池(用于执行本地事务)
        ExecutorService executor = Executors.newFixedThreadPool(10);
        producer.setExecutorServiceExecutors(executor);
        
        producer.start();
    }

    /**
     * 发送事务消息
     */
    public void sendTransactionMessage(Order order) throws Exception {
        String orderId = order.getOrderId();
        
        // 创建消息
        Message message = new Message(
            "ORDER_TOPIC",
            "order:create",
            orderId,
            JSON.toJSONString(order).getBytes()
        );
        
        // 设置事务ID
        message.putUserProperty("orderId", orderId);
        message.putUserProperty("userId", order.getUserId());
        message.putUserProperty("amount", String.valueOf(order.getAmount()));
        
        // 发送事务消息
        // arg 参数会传递给 executeLocalTransaction
        TransactionSendResult result = producer.sendMessageInTransaction(
            message,
            orderId  // 作为本地事务的标识
        );
        
        System.out.println("发送事务消息结果: " + result.getSendStatus());
        
        // 处理发送结果
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            System.out.println("事务消息发送成功, transactionId: " + result.getTransactionId());
        }
    }
}

Spring Boot 集成

import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

/**
 * 事务消息服务
 */
@Service
public class TransactionalOrderService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送事务消息
     */
    public void sendTransactionalMessage(Order order) {
        // 设置事务ID
        order.setTransactionId(UUID.randomUUID().toString());
        
        // 使用 convertAndSend 发送
        rocketMQTemplate.asyncSend("ORDER_TOPIC:order:create",
            order,
            new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("发送成功");
                }

                @Override
                public void onException(Throwable e) {
                    System.out.println("发送失败: " + e.getMessage());
                }
            });
    }
}

/**
 * 事务监听器
 */
@Component
@RocketMQTransactionListener
public class OrderTransactionListenerImpl implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderService orderService;

    /**
     * 执行本地事务
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(
            Message msg, Object arg) {
        
        try {
            Order order = (Order) arg;
            
            // 创建订单
            orderService.createOrder(order);
            
            // 扣减余额
            accountService.deduct(order.getUserId(), order.getAmount());
            
            return RocketMQLocalTransactionState.COMMIT;
            
        } catch (Exception e) {
            log.error("本地事务执行失败", e);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * 回查事务状态
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(
            Message msg) {
        
        String orderId = msg.getHeaders().get("orderId", String.class);
        
        Order order = orderService.getOrderById(orderId);
        
        if (order != null && "COMPLETED".equals(order.getStatus())) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        
        return RocketMQLocalTransactionState.UNKNOWN;
    }
}

应用场景

1. 订单支付

┌─────────────────────────────────────────────────────────────────┐
│                      订单支付场景                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   用户支付订单:                                                  │
│                                                                  │
│   1. 发送事务消息(创建订单)                                     │
│   2. 本地事务:                                                  │
│      ├── 创建订单记录 (status=PENDING)                          │
│      └── 冻结用户余额                                           │
│   3. 提交事务:                                                  │
│      └── 如果本地成功,提交消息,扣款                            │
│                                                                  │
│   消息投递后:                                                    │
│   ├── 商家系统收到消息 → 准备发货                               │
│   └── 积分系统收到消息 → 增加积分                                │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

2. 数据同步

/**
 * 数据同步场景
 */
public void syncDataWithTransaction(Data data) {
    Message message = new Message(
        "SYNC_TOPIC",
        "data:sync",
        data.getId(),
        JSON.toJSONString(data).getBytes()
    );
    
    // 发送事务消息
    producer.sendMessageInTransaction(message, data, new SendCallback() {
        @Override
        public void onSuccess(SendResult result) {
            // 记录同步成功
            syncService.recordSyncSuccess(data.getId());
        }

        @Override
        public void onException(Throwable e) {
            // 记录同步失败
            syncService.recordSyncFailed(data.getId());
        }
    });
}

最佳实践

1. 幂等处理

/**
 * 事务消息幂等性
 */
public class IdempotentTransactionListener implements TransactionListener {

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String orderId = (String) arg;
        
        // 幂等检查
        if (orderService.isOrderExists(orderId)) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        
        // 执行本地事务
        orderService.createOrder(orderId);
        
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 查询本地事务状态
        String orderId = msg.getUserProperty("orderId");
        
        if (orderService.isOrderCompleted(orderId)) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        
        return LocalTransactionState.UNKNOW;
    }
}

2. 异常处理

/**
 * 事务消息异常处理
 */
public class TransactionExceptionHandler {

    public LocalTransactionState handleException(
            Message msg, 
            Object arg, 
            Exception e) {
        
        log.error("执行本地事务异常, orderId: {}", arg, e);
        
        // 记录异常
        alertService.sendAlert("事务消息处理失败: " + e.getMessage());
        
        // 根据异常类型决定回滚还是未知
        if (e instanceof BizException) {
            // 业务异常,回滚
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else if (e instanceof TimeoutException) {
            // 超时,返回未知让Broker回查
            return LocalTransactionState.UNKNOW;
        } else {
            // 其他异常,返回未知
            return LocalTransactionState.UNKNOW;
        }
    }
}

3. 配置优化

/**
 * 事务消息配置
 */
public class TransactionProducerConfig {

    public TransactionMQProducer createProducer() {
        TransactionMQProducer producer = new TransactionMQProducer(
            "transaction-producer-group"
        );
        
        producer.setNamesrvAddr("127.0.0.1:9876");
        
        // 事务回查线程数
        producer.setCheckThreadPoolMinSize(5);
        producer.setCheckThreadPoolMaxSize(20);
        
        // 队列数
        producer.setCheckRequestHoldMax(2000);
        
        // 事务超时时间 (ms)
        producer.setTransactionTimeout(6000);
        
        producer.setTransactionListener(new OrderTransactionListener());
        
        return producer;
    }
}

常见问题

1. 消息重复消费

// 事务消息可能重复投递,消费者需要做幂等处理
@RocketMQMessageListener(
    topic = "ORDER_TOPIC",
    consumerGroup = "order-consumer-group"
)
public class IdempotentConsumer implements RocketMQListener<Order> {

    @Override
    public void onMessage(Order order) {
        // 幂等检查
        if (orderService.isProcessed(order.getOrderId())) {
            return;
        }
        
        // 处理订单
        orderService.processOrder(order);
        
        // 标记已处理
        orderService.markProcessed(order.getOrderId());
    }
}

2. 事务超时

// 配置事务超时时间
// broker.conf
transactionTimeout = 6000  // 6秒

// 或在代码中设置
producer.setTransactionTimeout(6000);

下一步

接下来让我们学习消息过滤。

👉 消息过滤