在分布式系统中,跨多个独立数据库执行原子性操作是一个无法回避的难题。一个常见的场景是,订单服务需要在订单库中创建记录,同时库存服务必须在库存库中扣减相应数量。如果其中一个操作失败,整个业务活动必须回滚,就像从未发生过一样。一个天真的实现可能如下:
// 这是一个错误且危险的示例
public void createOrderUnsafe(Connection orderDbConn, Connection inventoryDbConn, Order order, Sku sku) {
try {
// 1. 关闭自动提交
orderDbConn.setAutoCommit(false);
inventoryDbConn.setAutoCommit(false);
// 2. 执行各自的本地事务
orderDao.create(orderDbConn, order);
inventoryDao.decreaseStock(inventoryDbConn, sku);
// 3. 尝试提交两个事务
orderDbConn.commit();
inventoryDbConn.commit(); // 如果这里失败怎么办?订单库已经提交了!
} catch (SQLException e) {
// 尝试回滚,但无法保证两个都成功,或者一个已经提交了
orderDbConn.rollback();
inventoryDbConn.rollback();
}
}
这段代码的致命缺陷在于,orderDbConn.commit()
成功后,如果系统崩溃或 inventoryDbConn.commit()
失败,订单数据将永久存在,而库存并未扣减,造成数据不一致。这正是两阶段提交(Two-Phase Commit, 2PC)协议要解决的核心问题:确保一组分布式事务参与者(在这里是两个SQL数据库)最终达成一致的原子性决策——要么全部提交,要么全部回滚。
两阶段提交的本质
2PC引入了一个中心化的“事务协调器”(Transaction Coordinator, TC),而各个数据库则扮演“资源管理器”(Resource Manager, RM)的角色。整个过程分为两个明确的阶段:
准备阶段 (Prepare Phase):
- TC向所有RM发送
PREPARE
请求。 - RM收到请求后,执行本地事务的所有操作,将必要的回滚日志(undo log)和重做日志(redo log)持久化到磁盘,然后锁定相关资源。此时,事务已准备好提交,但尚未真正提交。
- RM向TC回应
VOTE_COMMIT
(同意提交)或VOTE_ABORT
(拒绝提交)。
- TC向所有RM发送
提交/回滚阶段 (Commit/Rollback Phase):
- 如果所有RM都回应
VOTE_COMMIT
: TC做出“全局提交”的决策,将该决策持久化到自己的日志中,然后向所有RM发送COMMIT
请求。RM收到后,释放锁并完成本地事务提交。 - 如果任何一个RM回应
VOTE_ABORT
或超时: TC做出“全局回滚”的决策,记录日志,并向所有RM发送ROLLBACK
请求。RM根据预先写好的回滚日志撤销所有操作。
- 如果所有RM都回应
其核心思想在于,一旦RM在准备阶段投票同意,它就失去了单方面回滚的权利,必须等待TC的最终指令。这个“承诺”是保证原子性的关键。
sequenceDiagram participant TC as Transaction Coordinator participant RM1 as SQL Database 1 participant RM2 as SQL Database 2 TC->>RM1: PREPARE(txId) TC->>RM2: PREPARE(txId) Note right of RM1: Executes SQL, writes redo/undo logs, locks resources RM1-->>TC: VOTE_COMMIT Note right of RM2: Executes SQL, writes redo/undo logs, locks resources RM2-->>TC: VOTE_COMMIT Note left of TC: All votes are 'COMMIT'. Decision is global commit. TC->>TC: Log 'COMMIT' for txId TC->>RM1: COMMIT(txId) TC->>RM2: COMMIT(txId) RM1-->>TC: ACK RM2-->>TC: ACK
实战:构建一个支持崩溃恢复的协调器
理论很简单,但魔鬼在细节中。一个生产级的2PC协调器必须能处理自身和参与者的崩溃。我们将使用Java和标准的JTA/XA接口从零开始构建一个这样的协调器。XA规范是2PC协议在数据库层面的标准实现。
1. 项目设计与核心组件
我们的协调器需要以下几个部分:
-
TransactionCoordinator
: 核心调度器,负责驱动2PC流程。 -
PersistentTransactionLogger
: TC的持久化日志,用于在崩溃后恢复事务状态。这是TC无状态、可恢复的关键。 -
SQLResourceManager
: 对javax.sql.XADataSource
和javax.transaction.xa.XAResource
的封装,代表一个参与事务的SQL数据库。 -
TransactionState
: 一个枚举,定义事务的生命周期状态 (INITIAL
,PREPARING
,PREPARED
,COMMITTING
,ROLLING_BACK
,DONE
)。
2. 事务日志:协调器的记忆
协调器的最大挑战是在自身崩溃后如何继续未完成的事务。答案是日志先行(Write-Ahead Logging)。在做出任何全局决策(提交或回滚)并通知RM之前,TC必须将该决策持久化。
我们将实现一个简单的基于文件的日志。
TransactionState.java
public enum TransactionState {
INITIAL, // 初始状态
PREPARING, // 正在准备
PREPARED, // 所有参与者准备就绪,已决定全局提交
COMMITTING, // 正在通知提交
ABORTED, // 已决定全局回滚
ROLLING_BACK, // 正在通知回滚
COMMITTED, // 事务已成功提交
ROLLED_BACK; // 事务已成功回滚
public boolean isFinished() {
return this == COMMITTED || this == ROLLED_BACK;
}
}
PersistentTransactionLogger.java
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
// 这是一个简化的、非生产级的日志实现,仅用于演示原理
// 真实项目中会使用更高效、更可靠的日志系统,如预写日志文件
public class PersistentTransactionLogger implements Closeable {
private final Path logFile;
private final BufferedWriter writer;
public PersistentTransactionLogger(Path logFile) throws IOException {
this.logFile = logFile;
// 使用追加模式打开文件,并确保每次写入都刷新到磁盘
this.writer = Files.newBufferedWriter(logFile, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
}
/**
* 记录事务状态的变更。这是最关键的操作,必须保证原子性和持久性。
* @param txId 事务ID
* @param state 新的状态
*/
public synchronized void log(String txId, TransactionState state) throws IOException {
String logEntry = String.format("%s,%s,%d%n", txId, state.name(), System.currentTimeMillis());
writer.write(logEntry);
// 在生产环境中,flush()可能不够,需要fsync来确保数据落盘
writer.flush();
}
/**
* 在协调器启动时调用,用于恢复未完成的事务。
* @return 一个Map,包含所有未完成事务的最终状态
*/
public Map<String, TransactionState> recover() throws IOException {
Map<String, TransactionState> transactionStates = new ConcurrentHashMap<>();
if (!Files.exists(logFile)) {
return transactionStates;
}
try (BufferedReader reader = Files.newBufferedReader(logFile)) {
String line;
while ((line = reader.readLine()) != null) {
String[] parts = line.split(",");
if (parts.length >= 2) {
transactionStates.put(parts[0], TransactionState.valueOf(parts[1]));
}
}
}
// 过滤掉已经完成的事务
transactionStates.entrySet().removeIf(entry -> entry.getValue().isFinished());
System.out.printf("Recovered %d unfinished transactions from log.%n", transactionStates.size());
return transactionStates;
}
@Override
public void close() throws IOException {
if (writer != null) {
writer.close();
}
}
}
这里的关键是 log
方法。在TC决定是 COMMIT
还是 ROLLBACK
之后,它必须先调用 logger.log(txId, TransactionState.PREPARED)
或 logger.log(txId, TransactionState.ABORTED)
,然后才能向RM发送指令。
3. 资源管理器:与XA接口交互
我们需要一个通用的方式来与任何支持XA的数据库交互。
SQLResourceManager.java
import javax.sql.XAConnection;
import javax.sql.XADataSource;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.sql.SQLException;
public class SQLResourceManager {
private final String name;
private final XADataSource xaDataSource;
private XAConnection xaConnection;
private XAResource xaResource;
private Xid xid;
public SQLResourceManager(String name, XADataSource xaDataSource) {
this.name = name;
this.xaDataSource = xaDataSource;
}
public void start(Xid xid) throws SQLException, XAException {
this.xid = xid;
this.xaConnection = xaDataSource.getXAConnection();
this.xaResource = xaConnection.getXAResource();
// 将此资源与全局事务ID关联
this.xaResource.start(xid, XAResource.TMNOFLAGS);
}
public XAConnection getXaConnection() {
return xaConnection;
}
public void end() throws XAException {
this.xaResource.end(xid, XAResource.TMSUCCESS);
}
/**
* 执行PREPARE阶段
* @return XAResource.XA_OK表示成功准备
*/
public int prepare() throws XAException {
System.out.printf("RM [%s] preparing transaction %s...%n", name, xid);
return xaResource.prepare(xid);
}
public void commit() throws XAException {
System.out.printf("RM [%s] committing transaction %s...%n", name, xid);
// 第二个参数为false表示这是一个单阶段提交优化,我们这里是标准的2PC
xaResource.commit(xid, false);
}
public void rollback() throws XAException {
System.out.printf("RM [%s] rolling back transaction %s...%n", name, xid);
xaResource.rollback(xid);
}
public void close() {
if (xaConnection != null) {
try {
xaConnection.close();
} catch (SQLException e) {
// 在真实项目中,这里需要详细的日志记录
e.printStackTrace();
}
}
}
public String getName() {
return name;
}
}
4. 协调器核心逻辑
现在是时候将所有部分组合在一起了。协调器负责整个流程的驱动和错误处理。
TransactionCoordinator.java
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.io.IOException;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
public class TransactionCoordinator {
private final PersistentTransactionLogger logger;
private final Map<String, TransactionState> recoveredTransactions;
private final ExecutorService recoveryExecutor = Executors.newSingleThreadExecutor();
public TransactionCoordinator() throws IOException {
this.logger = new PersistentTransactionLogger(Paths.get("transaction.log"));
this.recoveredTransactions = logger.recover();
}
// 启动时恢复未完成的事务
public void recover(List<SQLResourceManager> allPossibleRMs) {
if (recoveredTransactions.isEmpty()) {
return;
}
System.out.println("Starting recovery process...");
recoveryExecutor.submit(() -> {
recoveredTransactions.forEach((txId, state) -> {
System.out.printf("Recovering transaction %s with state %s%n", txId, state);
// 这里需要一个更复杂的机制来查找哪些RM参与了特定事务
// 为简化,我们假设所有RM都参与了
if (state == TransactionState.PREPARED || state == TransactionState.COMMITTING) {
System.out.printf("Retrying COMMIT for transaction %s%n", txId);
doCommit(txId, allPossibleRMs);
} else if (state == TransactionState.ABORTED || state == TransactionState.ROLLING_BACK) {
System.out.printf("Retrying ROLLBACK for transaction %s%n", txId);
doRollback(txId, allPossibleRMs);
}
});
});
}
public void execute(Consumer<Map<String, Connection>> businessLogic, SQLResourceManager... managers) {
String txIdStr = UUID.randomUUID().toString();
Xid xid = new GlobalXid(txIdStr.getBytes(), "my-branch".getBytes());
List<SQLResourceManager> enlistedManagers = new ArrayList<>();
System.out.printf("Starting global transaction: %s%n", txIdStr);
boolean voteOk = true;
try {
logger.log(txIdStr, TransactionState.INITIAL);
// 1. 将业务逻辑与事务关联
for (SQLResourceManager manager : managers) {
manager.start(xid);
enlistedManagers.add(manager);
}
// 这是一个简化的注入业务逻辑的方式
// Key为RM的名字,Value为可用的Connection
Map<String, Connection> connections = new ConcurrentHashMap<>();
for (SQLResourceManager manager : enlistedManagers) {
connections.put(manager.getName(), manager.getXaConnection().getConnection());
}
businessLogic.accept(connections);
for (SQLResourceManager manager : enlistedManagers) {
manager.end();
}
// --- 阶段一: 准备 ---
logger.log(txIdStr, TransactionState.PREPARING);
for (SQLResourceManager manager : enlistedManagers) {
if (manager.prepare() != XAResource.XA_OK) {
voteOk = false;
System.err.printf("RM [%s] voted to abort.%n", manager.getName());
break;
}
}
} catch (Exception e) {
System.err.printf("Exception during prepare phase for tx %s: %s%n", txIdStr, e.getMessage());
voteOk = false;
}
// --- 阶段二: 提交或回滚 ---
if (voteOk) {
// 关键点:先写日志,再执行操作
try {
logger.log(txIdStr, TransactionState.PREPARED);
doCommit(txIdStr, enlistedManagers);
} catch (IOException e) {
// 日志写入失败是致命的。此时系统处于不确定状态。
// 生产系统需要有重试或报警机制。
System.err.println("FATAL: Failed to log PREPARED state. Manual intervention required.");
}
} else {
try {
logger.log(txIdStr, TransactionState.ABORTED);
doRollback(txIdStr, enlistedManagers);
} catch (IOException e) {
System.err.println("FATAL: Failed to log ABORTED state. Manual intervention required.");
}
}
}
private void doCommit(String txId, List<SQLResourceManager> managers) {
try {
logger.log(txId, TransactionState.COMMITTING);
for (SQLResourceManager manager : managers) {
try {
manager.commit();
} catch (XAException e) {
// 提交失败是一个严重问题。可能需要人工介入。
// 协调器需要不断重试,因为RM已经承诺提交。
System.err.printf("Failed to commit RM [%s] for tx %s. Will retry. Error: %s%n", manager.getName(), txId, e.getMessage());
// TODO: 添加重试逻辑
}
}
logger.log(txId, TransactionState.COMMITTED);
System.out.printf("Global transaction %s committed successfully.%n", txId);
} catch (IOException e) {
System.err.println("FATAL: Failed to log COMMITTING/COMMITTED state.");
} finally {
managers.forEach(SQLResourceManager::close);
}
}
private void doRollback(String txId, List<SQLResourceManager> managers) {
try {
logger.log(txId, TransactionState.ROLLING_BACK);
for (SQLResourceManager manager : managers) {
try {
manager.rollback();
} catch (XAException e) {
// 回滚失败也需要重试
System.err.printf("Failed to rollback RM [%s] for tx %s. Will retry. Error: %s%n", manager.getName(), txId, e.getMessage());
// TODO: 添加重试逻辑
}
}
logger.log(txId, TransactionState.ROLLED_BACK);
System.out.printf("Global transaction %s rolled back.%n", txId);
} catch (IOException e) {
System.err.println("FATAL: Failed to log ROLLING_BACK/ROLLED_BACK state.");
} finally {
managers.forEach(SQLResourceManager::close);
}
}
// 简单的Xid实现
static class GlobalXid implements Xid {
private final byte[] gtrid;
private final byte[] bqual;
public GlobalXid(byte[] gtrid, byte[] bqual) {
this.gtrid = gtrid;
this.bqual = bqual;
}
@Override
public int getFormatId() {
return 0; // 使用OSI CCR格式
}
@Override
public byte[] getGlobalTransactionId() {
return gtrid;
}
@Override
public byte[] getBranchQualifier() {
return bqual;
}
}
}
常见误区与生产环境的挑战
这个实现虽然展示了核心原理,但在真实项目中,它还远远不够。
阻塞问题: 这是2PC最著名的缺陷。如果在第二阶段,TC向RM发送
COMMIT
指令后,RM崩溃了,那么在它恢复之前,它所持有的数据库锁将一直存在,阻塞其他事务。更糟的是,如果TC在写完PREPARED
日志后永久宕机,所有RM将永远等待指令,数据库资源被永久锁定。这就是为什么2PC通常只适用于内部、高可用的网络环境。启发式决策 (Heuristic Decisions): 如果一个RM由于网络分区长时间联系不上TC,它可能会单方面做出“启发式”决策(例如,超时后自动回滚)。当网络恢复后,TC的指令(例如
COMMIT
)可能与RM的启发式决策冲突,导致数据不一致。XA规范定义了XA_HEURCOM
(启发式提交),XA_HEURRB
(启发式回滚) 等错误码。处理这些情况非常复杂,通常需要人工介入。性能开销: 2PC引入了多轮网络通信,增加了事务的延迟。准备阶段的资源锁定也会降低系统的并发能力。对于需要高吞吐量的系统,2PC往往不是最佳选择。
协调器单点问题: 尽管我们的协调器可以从崩溃中恢复,但在它恢复期间,所有进行中的分布式事务都会停滞。生产级的协调器本身需要做高可用集群(例如,使用Paxos或Raft协议在多个副本间同步事务日志),这极大地增加了系统复杂度。
技术的适用边界
两阶段提交提供的是强一致性(ACID中的A,原子性),这是它的核心价值。在金融、电信等对数据一致性要求极高的场景,尤其是在事务参与者数量少、网络延迟低且可靠的环境中,2PC仍然是一个可行的方案。
然而,在当今大规模、高并发的互联网架构中,可用性往往比强一致性更受青ডাক্ট。2PC的阻塞和性能问题使其难以适应这类场景。因此,业界发展出了许多基于最终一致性的替代方案,如Saga、TCC(Try-Confirm-Cancel)模式,它们通过补偿事务来达到最终的数据一致,虽然牺牲了ACID的隔离性,但换来了更高的系统可用性和吞吐量。选择哪种方案,本质上是在CAP理论中进行权衡,不存在银弹。理解2PC的原理和它的局限性,是做出正确架构决策的基石。