从零构建一个支持崩溃恢复的SQL两阶段提交事务协调器


在分布式系统中,跨多个独立数据库执行原子性操作是一个无法回避的难题。一个常见的场景是,订单服务需要在订单库中创建记录,同时库存服务必须在库存库中扣减相应数量。如果其中一个操作失败,整个业务活动必须回滚,就像从未发生过一样。一个天真的实现可能如下:

// 这是一个错误且危险的示例
public void createOrderUnsafe(Connection orderDbConn, Connection inventoryDbConn, Order order, Sku sku) {
    try {
        // 1. 关闭自动提交
        orderDbConn.setAutoCommit(false);
        inventoryDbConn.setAutoCommit(false);

        // 2. 执行各自的本地事务
        orderDao.create(orderDbConn, order);
        inventoryDao.decreaseStock(inventoryDbConn, sku);

        // 3. 尝试提交两个事务
        orderDbConn.commit();
        inventoryDbConn.commit(); // 如果这里失败怎么办?订单库已经提交了!

    } catch (SQLException e) {
        // 尝试回滚,但无法保证两个都成功,或者一个已经提交了
        orderDbConn.rollback();
        inventoryDbConn.rollback();
    }
}

这段代码的致命缺陷在于,orderDbConn.commit() 成功后,如果系统崩溃或 inventoryDbConn.commit() 失败,订单数据将永久存在,而库存并未扣减,造成数据不一致。这正是两阶段提交(Two-Phase Commit, 2PC)协议要解决的核心问题:确保一组分布式事务参与者(在这里是两个SQL数据库)最终达成一致的原子性决策——要么全部提交,要么全部回滚。

两阶段提交的本质

2PC引入了一个中心化的“事务协调器”(Transaction Coordinator, TC),而各个数据库则扮演“资源管理器”(Resource Manager, RM)的角色。整个过程分为两个明确的阶段:

  1. 准备阶段 (Prepare Phase):

    • TC向所有RM发送PREPARE请求。
    • RM收到请求后,执行本地事务的所有操作,将必要的回滚日志(undo log)和重做日志(redo log)持久化到磁盘,然后锁定相关资源。此时,事务已准备好提交,但尚未真正提交。
    • RM向TC回应VOTE_COMMIT(同意提交)或VOTE_ABORT(拒绝提交)。
  2. 提交/回滚阶段 (Commit/Rollback Phase):

    • 如果所有RM都回应VOTE_COMMIT: TC做出“全局提交”的决策,将该决策持久化到自己的日志中,然后向所有RM发送COMMIT请求。RM收到后,释放锁并完成本地事务提交。
    • 如果任何一个RM回应VOTE_ABORT或超时: TC做出“全局回滚”的决策,记录日志,并向所有RM发送ROLLBACK请求。RM根据预先写好的回滚日志撤销所有操作。

其核心思想在于,一旦RM在准备阶段投票同意,它就失去了单方面回滚的权利,必须等待TC的最终指令。这个“承诺”是保证原子性的关键。

sequenceDiagram
    participant TC as Transaction Coordinator
    participant RM1 as SQL Database 1
    participant RM2 as SQL Database 2

    TC->>RM1: PREPARE(txId)
    TC->>RM2: PREPARE(txId)
    
    Note right of RM1: Executes SQL, writes redo/undo logs, locks resources
    RM1-->>TC: VOTE_COMMIT
    
    Note right of RM2: Executes SQL, writes redo/undo logs, locks resources
    RM2-->>TC: VOTE_COMMIT
    
    Note left of TC: All votes are 'COMMIT'. Decision is global commit.
    TC->>TC: Log 'COMMIT' for txId
    
    TC->>RM1: COMMIT(txId)
    TC->>RM2: COMMIT(txId)

    RM1-->>TC: ACK
    RM2-->>TC: ACK

实战:构建一个支持崩溃恢复的协调器

理论很简单,但魔鬼在细节中。一个生产级的2PC协调器必须能处理自身和参与者的崩溃。我们将使用Java和标准的JTA/XA接口从零开始构建一个这样的协调器。XA规范是2PC协议在数据库层面的标准实现。

1. 项目设计与核心组件

我们的协调器需要以下几个部分:

  • TransactionCoordinator: 核心调度器,负责驱动2PC流程。
  • PersistentTransactionLogger: TC的持久化日志,用于在崩溃后恢复事务状态。这是TC无状态、可恢复的关键。
  • SQLResourceManager: 对javax.sql.XADataSourcejavax.transaction.xa.XAResource的封装,代表一个参与事务的SQL数据库。
  • TransactionState: 一个枚举,定义事务的生命周期状态 (INITIAL, PREPARING, PREPARED, COMMITTING, ROLLING_BACK, DONE)。

2. 事务日志:协调器的记忆

协调器的最大挑战是在自身崩溃后如何继续未完成的事务。答案是日志先行(Write-Ahead Logging)。在做出任何全局决策(提交或回滚)并通知RM之前,TC必须将该决策持久化。

我们将实现一个简单的基于文件的日志。

TransactionState.java

public enum TransactionState {
    INITIAL,       // 初始状态
    PREPARING,     // 正在准备
    PREPARED,      // 所有参与者准备就绪,已决定全局提交
    COMMITTING,    // 正在通知提交
    ABORTED,       // 已决定全局回滚
    ROLLING_BACK,  // 正在通知回滚
    COMMITTED,     // 事务已成功提交
    ROLLED_BACK;   // 事务已成功回滚

    public boolean isFinished() {
        return this == COMMITTED || this == ROLLED_BACK;
    }
}

PersistentTransactionLogger.java

import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// 这是一个简化的、非生产级的日志实现,仅用于演示原理
// 真实项目中会使用更高效、更可靠的日志系统,如预写日志文件
public class PersistentTransactionLogger implements Closeable {

    private final Path logFile;
    private final BufferedWriter writer;

    public PersistentTransactionLogger(Path logFile) throws IOException {
        this.logFile = logFile;
        // 使用追加模式打开文件,并确保每次写入都刷新到磁盘
        this.writer = Files.newBufferedWriter(logFile, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    /**
     * 记录事务状态的变更。这是最关键的操作,必须保证原子性和持久性。
     * @param txId 事务ID
     * @param state 新的状态
     */
    public synchronized void log(String txId, TransactionState state) throws IOException {
        String logEntry = String.format("%s,%s,%d%n", txId, state.name(), System.currentTimeMillis());
        writer.write(logEntry);
        // 在生产环境中,flush()可能不够,需要fsync来确保数据落盘
        writer.flush(); 
    }

    /**
     * 在协调器启动时调用,用于恢复未完成的事务。
     * @return 一个Map,包含所有未完成事务的最终状态
     */
    public Map<String, TransactionState> recover() throws IOException {
        Map<String, TransactionState> transactionStates = new ConcurrentHashMap<>();
        if (!Files.exists(logFile)) {
            return transactionStates;
        }

        try (BufferedReader reader = Files.newBufferedReader(logFile)) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split(",");
                if (parts.length >= 2) {
                    transactionStates.put(parts[0], TransactionState.valueOf(parts[1]));
                }
            }
        }
        
        // 过滤掉已经完成的事务
        transactionStates.entrySet().removeIf(entry -> entry.getValue().isFinished());
        
        System.out.printf("Recovered %d unfinished transactions from log.%n", transactionStates.size());
        return transactionStates;
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
        }
    }
}

这里的关键是 log 方法。在TC决定是 COMMIT 还是 ROLLBACK 之后,它必须先调用 logger.log(txId, TransactionState.PREPARED)logger.log(txId, TransactionState.ABORTED),然后才能向RM发送指令。

3. 资源管理器:与XA接口交互

我们需要一个通用的方式来与任何支持XA的数据库交互。

SQLResourceManager.java

import javax.sql.XAConnection;
import javax.sql.XADataSource;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.sql.SQLException;

public class SQLResourceManager {

    private final String name;
    private final XADataSource xaDataSource;
    private XAConnection xaConnection;
    private XAResource xaResource;
    private Xid xid;

    public SQLResourceManager(String name, XADataSource xaDataSource) {
        this.name = name;
        this.xaDataSource = xaDataSource;
    }

    public void start(Xid xid) throws SQLException, XAException {
        this.xid = xid;
        this.xaConnection = xaDataSource.getXAConnection();
        this.xaResource = xaConnection.getXAResource();
        // 将此资源与全局事务ID关联
        this.xaResource.start(xid, XAResource.TMNOFLAGS);
    }
    
    public XAConnection getXaConnection() {
        return xaConnection;
    }

    public void end() throws XAException {
        this.xaResource.end(xid, XAResource.TMSUCCESS);
    }

    /**
     * 执行PREPARE阶段
     * @return XAResource.XA_OK表示成功准备
     */
    public int prepare() throws XAException {
        System.out.printf("RM [%s] preparing transaction %s...%n", name, xid);
        return xaResource.prepare(xid);
    }

    public void commit() throws XAException {
        System.out.printf("RM [%s] committing transaction %s...%n", name, xid);
        // 第二个参数为false表示这是一个单阶段提交优化,我们这里是标准的2PC
        xaResource.commit(xid, false);
    }

    public void rollback() throws XAException {
        System.out.printf("RM [%s] rolling back transaction %s...%n", name, xid);
        xaResource.rollback(xid);
    }

    public void close() {
        if (xaConnection != null) {
            try {
                xaConnection.close();
            } catch (SQLException e) {
                // 在真实项目中,这里需要详细的日志记录
                e.printStackTrace();
            }
        }
    }
    
    public String getName() {
        return name;
    }
}

4. 协调器核心逻辑

现在是时候将所有部分组合在一起了。协调器负责整个流程的驱动和错误处理。

TransactionCoordinator.java

import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.io.IOException;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class TransactionCoordinator {

    private final PersistentTransactionLogger logger;
    private final Map<String, TransactionState> recoveredTransactions;
    private final ExecutorService recoveryExecutor = Executors.newSingleThreadExecutor();

    public TransactionCoordinator() throws IOException {
        this.logger = new PersistentTransactionLogger(Paths.get("transaction.log"));
        this.recoveredTransactions = logger.recover();
    }
    
    // 启动时恢复未完成的事务
    public void recover(List<SQLResourceManager> allPossibleRMs) {
        if (recoveredTransactions.isEmpty()) {
            return;
        }
        System.out.println("Starting recovery process...");
        recoveryExecutor.submit(() -> {
            recoveredTransactions.forEach((txId, state) -> {
                System.out.printf("Recovering transaction %s with state %s%n", txId, state);
                // 这里需要一个更复杂的机制来查找哪些RM参与了特定事务
                // 为简化,我们假设所有RM都参与了
                if (state == TransactionState.PREPARED || state == TransactionState.COMMITTING) {
                    System.out.printf("Retrying COMMIT for transaction %s%n", txId);
                    doCommit(txId, allPossibleRMs);
                } else if (state == TransactionState.ABORTED || state == TransactionState.ROLLING_BACK) {
                    System.out.printf("Retrying ROLLBACK for transaction %s%n", txId);
                    doRollback(txId, allPossibleRMs);
                }
            });
        });
    }

    public void execute(Consumer<Map<String, Connection>> businessLogic, SQLResourceManager... managers) {
        String txIdStr = UUID.randomUUID().toString();
        Xid xid = new GlobalXid(txIdStr.getBytes(), "my-branch".getBytes());
        List<SQLResourceManager> enlistedManagers = new ArrayList<>();

        System.out.printf("Starting global transaction: %s%n", txIdStr);

        boolean voteOk = true;
        try {
            logger.log(txIdStr, TransactionState.INITIAL);

            // 1. 将业务逻辑与事务关联
            for (SQLResourceManager manager : managers) {
                manager.start(xid);
                enlistedManagers.add(manager);
            }
            
            // 这是一个简化的注入业务逻辑的方式
            // Key为RM的名字,Value为可用的Connection
            Map<String, Connection> connections = new ConcurrentHashMap<>();
            for (SQLResourceManager manager : enlistedManagers) {
                connections.put(manager.getName(), manager.getXaConnection().getConnection());
            }
            businessLogic.accept(connections);
            
            for (SQLResourceManager manager : enlistedManagers) {
                manager.end();
            }

            // --- 阶段一: 准备 ---
            logger.log(txIdStr, TransactionState.PREPARING);
            for (SQLResourceManager manager : enlistedManagers) {
                if (manager.prepare() != XAResource.XA_OK) {
                    voteOk = false;
                    System.err.printf("RM [%s] voted to abort.%n", manager.getName());
                    break;
                }
            }

        } catch (Exception e) {
            System.err.printf("Exception during prepare phase for tx %s: %s%n", txIdStr, e.getMessage());
            voteOk = false;
        }

        // --- 阶段二: 提交或回滚 ---
        if (voteOk) {
            // 关键点:先写日志,再执行操作
            try {
                logger.log(txIdStr, TransactionState.PREPARED);
                doCommit(txIdStr, enlistedManagers);
            } catch (IOException e) {
                // 日志写入失败是致命的。此时系统处于不确定状态。
                // 生产系统需要有重试或报警机制。
                System.err.println("FATAL: Failed to log PREPARED state. Manual intervention required.");
            }
        } else {
             try {
                logger.log(txIdStr, TransactionState.ABORTED);
                doRollback(txIdStr, enlistedManagers);
            } catch (IOException e) {
                 System.err.println("FATAL: Failed to log ABORTED state. Manual intervention required.");
            }
        }
    }

    private void doCommit(String txId, List<SQLResourceManager> managers) {
        try {
            logger.log(txId, TransactionState.COMMITTING);
            for (SQLResourceManager manager : managers) {
                try {
                    manager.commit();
                } catch (XAException e) {
                    // 提交失败是一个严重问题。可能需要人工介入。
                    // 协调器需要不断重试,因为RM已经承诺提交。
                    System.err.printf("Failed to commit RM [%s] for tx %s. Will retry. Error: %s%n", manager.getName(), txId, e.getMessage());
                    // TODO: 添加重试逻辑
                }
            }
            logger.log(txId, TransactionState.COMMITTED);
            System.out.printf("Global transaction %s committed successfully.%n", txId);
        } catch (IOException e) {
            System.err.println("FATAL: Failed to log COMMITTING/COMMITTED state.");
        } finally {
            managers.forEach(SQLResourceManager::close);
        }
    }
    
    private void doRollback(String txId, List<SQLResourceManager> managers) {
        try {
            logger.log(txId, TransactionState.ROLLING_BACK);
            for (SQLResourceManager manager : managers) {
                try {
                    manager.rollback();
                } catch (XAException e) {
                    // 回滚失败也需要重试
                    System.err.printf("Failed to rollback RM [%s] for tx %s. Will retry. Error: %s%n", manager.getName(), txId, e.getMessage());
                    // TODO: 添加重试逻辑
                }
            }
            logger.log(txId, TransactionState.ROLLED_BACK);
            System.out.printf("Global transaction %s rolled back.%n", txId);
        } catch (IOException e) {
            System.err.println("FATAL: Failed to log ROLLING_BACK/ROLLED_BACK state.");
        } finally {
            managers.forEach(SQLResourceManager::close);
        }
    }
    
    // 简单的Xid实现
    static class GlobalXid implements Xid {
        private final byte[] gtrid;
        private final byte[] bqual;

        public GlobalXid(byte[] gtrid, byte[] bqual) {
            this.gtrid = gtrid;
            this.bqual = bqual;
        }

        @Override
        public int getFormatId() {
            return 0; // 使用OSI CCR格式
        }

        @Override
        public byte[] getGlobalTransactionId() {
            return gtrid;
        }

        @Override
        public byte[] getBranchQualifier() {
            return bqual;
        }
    }
}

常见误区与生产环境的挑战

这个实现虽然展示了核心原理,但在真实项目中,它还远远不够。

  1. 阻塞问题: 这是2PC最著名的缺陷。如果在第二阶段,TC向RM发送COMMIT指令后,RM崩溃了,那么在它恢复之前,它所持有的数据库锁将一直存在,阻塞其他事务。更糟的是,如果TC在写完PREPARED日志后永久宕机,所有RM将永远等待指令,数据库资源被永久锁定。这就是为什么2PC通常只适用于内部、高可用的网络环境。

  2. 启发式决策 (Heuristic Decisions): 如果一个RM由于网络分区长时间联系不上TC,它可能会单方面做出“启发式”决策(例如,超时后自动回滚)。当网络恢复后,TC的指令(例如COMMIT)可能与RM的启发式决策冲突,导致数据不一致。XA规范定义了 XA_HEURCOM (启发式提交), XA_HEURRB (启发式回滚) 等错误码。处理这些情况非常复杂,通常需要人工介入。

  3. 性能开销: 2PC引入了多轮网络通信,增加了事务的延迟。准备阶段的资源锁定也会降低系统的并发能力。对于需要高吞吐量的系统,2PC往往不是最佳选择。

  4. 协调器单点问题: 尽管我们的协调器可以从崩溃中恢复,但在它恢复期间,所有进行中的分布式事务都会停滞。生产级的协调器本身需要做高可用集群(例如,使用Paxos或Raft协议在多个副本间同步事务日志),这极大地增加了系统复杂度。

技术的适用边界

两阶段提交提供的是强一致性(ACID中的A,原子性),这是它的核心价值。在金融、电信等对数据一致性要求极高的场景,尤其是在事务参与者数量少、网络延迟低且可靠的环境中,2PC仍然是一个可行的方案。

然而,在当今大规模、高并发的互联网架构中,可用性往往比强一致性更受青ডাক্ট。2PC的阻塞和性能问题使其难以适应这类场景。因此,业界发展出了许多基于最终一致性的替代方案,如Saga、TCC(Try-Confirm-Cancel)模式,它们通过补偿事务来达到最终的数据一致,虽然牺牲了ACID的隔离性,但换来了更高的系统可用性和吞吐量。选择哪种方案,本质上是在CAP理论中进行权衡,不存在银弹。理解2PC的原理和它的局限性,是做出正确架构决策的基石。


  目录