The inherent complexity of a business process spanning multiple microservices, such as video processing and publishing, lies not in any single service's implementation but in guaranteeing atomicity across the whole flow. A typical pipeline might include: receive upload, start transcoding, extract metadata, review content, distribute to the CDN. These five steps live in different services, and if any one of them fails, all completed work must be rolled back reliably to avoid data silos or "ghost data". In real projects, relying on business code to hand-roll compensation logic inside every service quickly turns into a maintenance nightmare.
The first idea was a choreography-based Saga built on a message queue. Each service, after finishing its own work, emits an event; the next service listens for that event and starts its work. On failure, a failure event is emitted, and the preceding services listen for it and run their compensations. The pattern looks simple but quickly exposed fatal flaws:
- A black hole of state visibility: the Saga's global state is implicit, scattered across each service's logs and the message queue's delivery records. When a flow gets stuck, pinpointing the problem is extremely difficult.
- Circular dependencies: services subscribe to each other's events, which easily produces complex, hard-to-trace dependency graphs, even cycles.
- Fragile compensation logic: which service is responsible for triggering compensation? What happens if a service crashes while executing a compensation? The mechanism has no reliable "transaction coordinator" of its own.
We need a strong Orchestrator, and it must be stateful, durable, and highly available. A plain Node.js service as the coordinator? When the instance crashes, every Saga's in-memory state is lost. Persist state in Redis or a database? Then we must handle concurrency control, atomic state transitions, and crash-recovery logic ourselves. That is essentially reinventing a state-machine wheel.
This is why we brought in Apache Flink. Flink is not just a data processing engine; at its core it is a powerful framework for distributed, stateful stream processing. Its checkpoint mechanism provides exactly-once state guarantees, its built-in timer service handles Saga timeouts cleanly, and its high-availability architecture removes the single point of failure. We can treat each Saga instance as an "event" in a data stream: the Saga coordinator is a Flink job that manages the lifecycle state machine of every Saga instance.
Architecture: Flink as the Saga Coordination Hub
The heart of the system is a Saga Orchestrator implemented as a Flink job. All participants (Node.js microservices) communicate with the Orchestrator through a central message queue (Kafka).
```mermaid
sequenceDiagram
    participant Client
    participant SagaInitiator as SagaInitiator (Node.js)
    participant Kafka
    participant FlinkOrchestrator
    participant TranscodingSvc as TranscodingSvc (Node.js)
    participant MetadataSvc as MetadataSvc (Node.js)
    participant NotificationSvc as NotificationSvc (Node.js)
    Client->>SagaInitiator: POST /startVideoProcess
    SagaInitiator->>Kafka: Topic: saga-commands | Msg: { type: 'START', sagaId: '...', payload: {...} }
    FlinkOrchestrator->>Kafka: Consume saga-commands
    FlinkOrchestrator-->>FlinkOrchestrator: Create new SagaState
    FlinkOrchestrator->>Kafka: Topic: transcoding-commands | Msg: { type: 'START_TRANSCODE', sagaId: '...', ... }
    FlinkOrchestrator->>Kafka: Topic: saga-state-updates | Msg: { sagaId: '...', status: 'RUNNING', step: 'TRANSCODING' }
    TranscodingSvc->>Kafka: Consume transcoding-commands
    TranscodingSvc-->>TranscodingSvc: Process video...
    TranscodingSvc->>Kafka: Topic: saga-events | Msg: { type: 'TRANSCODE_SUCCESS', sagaId: '...', ... }
    FlinkOrchestrator->>Kafka: Consume saga-events
    FlinkOrchestrator-->>FlinkOrchestrator: Update SagaState (transcoding complete)
    FlinkOrchestrator->>Kafka: Topic: metadata-commands | Msg: { type: 'EXTRACT_METADATA', sagaId: '...', ... }
    FlinkOrchestrator->>Kafka: Topic: saga-state-updates | Msg: { sagaId: '...', status: 'RUNNING', step: 'METADATA_EXTRACTION' }
    MetadataSvc->>Kafka: Consume metadata-commands
    MetadataSvc-->>MetadataSvc: Extract metadata...
    MetadataSvc->>Kafka: Topic: saga-events | Msg: { type: 'METADATA_FAILURE', sagaId: '...', reason: '...' }
    FlinkOrchestrator->>Kafka: Consume saga-events (failure)
    FlinkOrchestrator-->>FlinkOrchestrator: Start compensation logic
    FlinkOrchestrator->>Kafka: Topic: transcoding-commands | Msg: { type: 'CANCEL_TRANSCODE', sagaId: '...', ... }
    FlinkOrchestrator->>Kafka: Topic: saga-state-updates | Msg: { sagaId: '...', status: 'COMPENSATING', step: 'CANCEL_TRANSCODING' }
    TranscodingSvc->>Kafka: Consume transcoding-commands
    TranscodingSvc-->>TranscodingSvc: Delete transcoded artifacts...
    TranscodingSvc->>Kafka: Topic: saga-events | Msg: { type: 'CANCEL_TRANSCODE_SUCCESS', sagaId: '...' }
    FlinkOrchestrator->>Kafka: Consume saga-events
    FlinkOrchestrator-->>FlinkOrchestrator: Finalize SagaState as FAILED
    FlinkOrchestrator->>Kafka: Topic: saga-state-updates | Msg: { sagaId: '...', status: 'FAILED' }
    NotificationSvc->>Kafka: Consume saga-state-updates
    NotificationSvc-->>Client: Push real-time status via WebSocket
```
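The message contracts implied by the diagram can be captured as a small set of TypeScript types. The field names below mirror the sequence diagram; the exact envelope shape (optional `payload`, `reason` only on failures) is an assumption to be adjusted to your actual schema:

```typescript
// Hypothetical envelopes for the Kafka topics in the diagram above.
// Field names follow the diagram; treat the shapes as illustrative.
type SagaCommand = {
  type: 'START' | 'START_TRANSCODE' | 'CANCEL_TRANSCODE' | 'EXTRACT_METADATA';
  sagaId: string;
  payload?: Record<string, unknown>;
};

type SagaEvent = {
  type: 'TRANSCODE_SUCCESS' | 'TRANSCODE_FAILURE' | 'METADATA_FAILURE' | 'CANCEL_TRANSCODE_SUCCESS';
  sagaId: string;
  reason?: string; // present only on failure events
  payload?: Record<string, unknown>;
};

// Convention assumed here: failure events share a common suffix, so the
// orchestrator can branch on success vs. failure without a type-by-type map.
function isFailureEvent(e: SagaEvent): boolean {
  return e.type.endsWith('_FAILURE');
}
```

Keeping this convention consistent across services is what lets the orchestrator's state machine stay generic.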
Core Implementation of the Flink Saga Orchestrator
We use Flink's KeyedProcessFunction to maintain an independent state machine for each sagaId.
1. Saga State Definition (POJO)
// SagaState.java
import java.io.Serializable;
import java.util.Map;
import java.util.Stack;

public class SagaState implements Serializable {
    private static final long serialVersionUID = 1L;

    public String sagaId;
    public SagaStatus status;
    public String currentStep;
    public Map<String, Object> payload;
    // Stack of compensation actions, pushed as each step is launched
    public Stack<CompensationStep> compensationStack;
    public long lastUpdated;
    public String failureReason;

    public enum SagaStatus {
        STARTED, RUNNING, COMMITTED, FAILED, COMPENSATING
    }

    public static class CompensationStep implements Serializable {
        private static final long serialVersionUID = 1L;
        public String service;
        public String action;
        public Map<String, Object> payload;
    }
}
2. KeyedProcessFunction Implementation

This function is the heart of the coordinator. It consumes the merged stream of the saga-commands and saga-events topics, partitioned by sagaId.
// SagaOrchestrator.java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// InputSagaEvent is a wrapper for commands and events
public class SagaOrchestrator extends KeyedProcessFunction<String, InputSagaEvent, SagaStateUpdate> {

    private static final long SAGA_TIMEOUT_MS = 60 * 60 * 1000; // 1-hour timeout

    // The state of every Saga instance is managed by Flink, which gives us
    // persistence and fault tolerance for free.
    private transient ValueState<SagaState> state;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<SagaState> descriptor = new ValueStateDescriptor<>(
                "saga-state", SagaState.class);
        state = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(InputSagaEvent event, Context ctx, Collector<SagaStateUpdate> out) throws Exception {
        SagaState currentState = state.value();

        if (event.getType().equals("START")) {
            if (currentState != null) {
                // Idempotency: if the Saga already exists, ignore the START command
                return;
            }
            // Register a global timeout timer for this Saga. A processing-time
            // timer is used so it fires even when no further events arrive.
            long timerTimestamp = ctx.timerService().currentProcessingTime() + SAGA_TIMEOUT_MS;
            ctx.timerService().registerProcessingTimeTimer(timerTimestamp);

            // Initialize the Saga
            SagaState newState = SagaFactory.createFromStartCommand(event);
            state.update(newState);
            // Launch the first step
            executeNextStep(newState, ctx, out);
        } else {
            if (currentState == null) {
                // Orphan event, probably a delayed or duplicated message; ignore it.
                // In production this should be logged as a serious warning.
                return;
            }
            // Handle subsequent events
            processSagaEvent(currentState, event, ctx, out);
        }
    }

    private void processSagaEvent(SagaState currentState, InputSagaEvent event, Context ctx, Collector<SagaStateUpdate> out) throws Exception {
        currentState.lastUpdated = ctx.timerService().currentProcessingTime();
        // Core state-machine logic
        switch (currentState.status) {
            case RUNNING:
                if (event.isSuccess()) {
                    SagaLogic.onStepSuccess(currentState, event);
                    executeNextStep(currentState, ctx, out);
                } else {
                    currentState.status = SagaState.SagaStatus.COMPENSATING;
                    currentState.failureReason = event.getReason();
                    startCompensation(currentState, ctx, out);
                }
                break;
            case COMPENSATING:
                if (event.isCompensationSuccess()) {
                    SagaLogic.onCompensationSuccess(currentState, event);
                    compensateNextStep(currentState, ctx, out);
                } else {
                    // A failed compensation is a severe problem that needs a human.
                    // Mark the Saga as failed and raise an alert.
                    currentState.status = SagaState.SagaStatus.FAILED;
                    // Log a critical error, possibly push to a dead-letter queue
                    state.update(currentState);
                    out.collect(SagaStateUpdate.from(currentState));
                }
                break;
            default:
                // COMMITTED and FAILED are terminal states; further events are ignored
                break;
        }
    }

    private void executeNextStep(SagaState currentState, Context ctx, Collector<SagaStateUpdate> out) throws Exception {
        // Look up the next step in the Saga definition
        SagaStep nextStep = SagaDefinition.getNextStep(currentState.currentStep);
        if (nextStep != null) {
            // Emit the command via a side output (or a direct Kafka sink)
            // ctx.output(commandOutputTag, nextStep.buildCommand(currentState));
            currentState.currentStep = nextStep.getName();
            currentState.status = SagaState.SagaStatus.RUNNING;
            // Push this step's compensation onto the stack
            currentState.compensationStack.push(nextStep.getCompensationStep());
        } else {
            // All steps completed: the Saga commits
            currentState.status = SagaState.SagaStatus.COMMITTED;
        }
        state.update(currentState);
        out.collect(SagaStateUpdate.from(currentState));
    }

    private void startCompensation(SagaState currentState, Context ctx, Collector<SagaStateUpdate> out) throws Exception {
        compensateNextStep(currentState, ctx, out);
    }

    private void compensateNextStep(SagaState currentState, Context ctx, Collector<SagaStateUpdate> out) throws Exception {
        if (!currentState.compensationStack.isEmpty()) {
            SagaState.CompensationStep stepToCompensate = currentState.compensationStack.pop();
            // Emit the compensation command
            // ctx.output(commandOutputTag, buildCompensationCommand(stepToCompensate));
        } else {
            // All compensations done: the Saga ends as failed
            currentState.status = SagaState.SagaStatus.FAILED;
        }
        state.update(currentState);
        out.collect(SagaStateUpdate.from(currentState));
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<SagaStateUpdate> out) throws Exception {
        SagaState currentState = state.value();
        if (currentState != null
                && currentState.status != SagaState.SagaStatus.COMMITTED
                && currentState.status != SagaState.SagaStatus.FAILED) {
            // Saga timed out: start compensating. startCompensation updates
            // the state and emits the status change, so nothing more to do here.
            currentState.status = SagaState.SagaStatus.COMPENSATING;
            currentState.failureReason = "Saga timed out";
            startCompensation(currentState, ctx, out);
        }
    }
}
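Stripped of the Flink APIs, the orchestrator above reduces to a pure transition function over the saga state. The following TypeScript sketch makes that logic testable in isolation; the step list and the `CANCEL_*` naming are illustrative assumptions, not taken from the article's SagaDefinition:

```typescript
// Pure sketch of the orchestrator's transitions: launching a step pushes its
// compensation, a failure unwinds the stack last-in-first-out.
type Status = 'RUNNING' | 'COMMITTED' | 'COMPENSATING' | 'FAILED';

interface Saga {
  status: Status;
  currentStep: string | null;
  compensationStack: string[]; // most recently launched step's compensation last
}

// Illustrative step order; a real system would read this from SagaDefinition.
const STEPS = ['TRANSCODING', 'METADATA_EXTRACTION', 'CDN_DISTRIBUTION'];

// Called on START and after each step's success event: launch the next step
// (pushing its compensation), or commit when no steps remain.
function advance(saga: Saga): Saga {
  const idx = saga.currentStep === null ? -1 : STEPS.indexOf(saga.currentStep);
  const next: string | undefined = STEPS[idx + 1];
  if (next === undefined) {
    return { ...saga, status: 'COMMITTED' };
  }
  return {
    status: 'RUNNING',
    currentStep: next,
    compensationStack: [...saga.compensationStack, `CANCEL_${next}`],
  };
}

// Pop the next compensation command to send; an empty stack means the
// unwinding is complete and the saga terminates as FAILED.
function compensateNext(saga: Saga): { saga: Saga; command: string | null } {
  if (saga.compensationStack.length === 0) {
    return { saga: { ...saga, status: 'FAILED' }, command: null };
  }
  const command = saga.compensationStack[saga.compensationStack.length - 1];
  return {
    saga: {
      ...saga,
      status: 'COMPENSATING',
      compensationStack: saga.compensationStack.slice(0, -1),
    },
    command,
  };
}
```

Factoring the transitions out like this is also what makes the state machine unit-testable without a Flink cluster.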
The key point is that every state mutation is committed to Flink via state.update(). When a checkpoint fires, this state is snapshotted to durable storage (such as HDFS or S3). If the job fails and restarts, Flink automatically restores state from the last successful checkpoint, so every Saga instance resumes exactly where it left off, no more and no less.
Node.js Microservice Implementation
The Node.js services that take part in the Saga are deliberately simple. They are stateless: they listen for commands, execute business logic, and publish result events.
// transcoding-service/index.js
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'transcoding-service',
  brokers: ['kafka:9092'],
});

const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'transcoding-group' });

// Simulated business logic
const startTranscode = async (sagaId, payload) => {
  console.log(`[${sagaId}] Starting transcode for: ${payload.sourceUri}`);
  // Simulate a long-running operation
  await new Promise(resolve => setTimeout(resolve, 5000));
  // Simulate random failures
  if (Math.random() < 0.2) {
    console.error(`[${sagaId}] Transcoding failed`);
    throw new Error('FFMPEG process failed');
  }
  console.log(`[${sagaId}] Transcoding successful`);
  return { transcodedUri: `s3://bucket/processed/${payload.id}.mp4` };
};

const cancelTranscode = async (sagaId, payload) => {
  console.log(`[${sagaId}] Compensating: Deleting transcoded artifacts for: ${payload.sourceUri}`);
  // Compensations must be idempotent and should succeed whenever possible
  await new Promise(resolve => setTimeout(resolve, 1000));
  console.log(`[${sagaId}] Compensation successful`);
};

const run = async () => {
  await producer.connect();
  await consumer.connect();
  await consumer.subscribe({ topic: 'transcoding-commands', fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const command = JSON.parse(message.value.toString());
      const { sagaId, type, payload } = command;
      try {
        if (type === 'START_TRANSCODE') {
          const result = await startTranscode(sagaId, payload);
          await producer.send({
            topic: 'saga-events',
            messages: [{ value: JSON.stringify({ sagaId, type: 'TRANSCODE_SUCCESS', payload: result }) }],
          });
        } else if (type === 'CANCEL_TRANSCODE') {
          await cancelTranscode(sagaId, payload);
          await producer.send({
            topic: 'saga-events',
            messages: [{ value: JSON.stringify({ sagaId, type: 'CANCEL_TRANSCODE_SUCCESS' }) }],
          });
        }
      } catch (error) {
        // Any business failure results in a failure event. Distinguish a failed
        // compensation from a failed forward step so the orchestrator can react.
        const failureType = type === 'CANCEL_TRANSCODE' ? 'CANCEL_TRANSCODE_FAILURE' : 'TRANSCODE_FAILURE';
        await producer.send({
          topic: 'saga-events',
          messages: [{ value: JSON.stringify({ sagaId, type: failureType, reason: error.message }) }],
        });
      }
    },
  });
};

run().catch(e => console.error('[transcoding-service] Error', e));
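Because Kafka delivers at-least-once, the handlers above must also tolerate duplicate commands. A minimal dedupe sketch follows; the in-memory `Set` is purely illustrative, and in production the processed-set would live in Redis or a database so it survives restarts:

```typescript
// Sketch: drop duplicate deliveries by remembering (sagaId, type) pairs.
// An in-memory Set only illustrates the idea; persist this in production.
const processed = new Set<string>();

async function handleOnce(
  sagaId: string,
  type: string,
  handler: () => Promise<void>,
): Promise<boolean> {
  const key = `${sagaId}:${type}`;
  if (processed.has(key)) return false; // duplicate delivery: skip, but still ack
  await handler();
  processed.add(key); // mark done only after the handler succeeds
  return true;
}
```

Note the remaining window where the handler succeeds but the mark is lost before a redelivery: duplicates can still slip through, which is exactly why the business operation itself (deleting artifacts, for instance) must be idempotent, as the comment in cancelTranscode says.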
Real-Time Status on the Frontend with Valtio
When the backend flow is complex and asynchronous, frontend state management faces its own challenge: users need to see progress in real time. We use a small Node.js service (NotificationSvc) that consumes the saga-state-updates Kafka topic and pushes status to the frontend over WebSocket.
On the frontend, Valtio is a minimal yet powerful state-management library. Its Proxy-based mechanism makes state updates and the resulting component re-renders feel completely natural.
1. Valtio Store
// src/store/sagaStore.ts
import { proxy } from 'valtio';

export type SagaStatus = 'PENDING' | 'RUNNING' | 'COMMITTED' | 'FAILED' | 'COMPENSATING';

export interface SagaState {
  sagaId: string;
  status: SagaStatus;
  currentStep: string;
  failureReason?: string;
  lastUpdated: number;
}

// A map keyed by sagaId, holding the state of every Saga instance
const sagaStates = proxy<Record<string, SagaState>>({});

// Mock WebSocket connection.
// In a real application this would be an actual WebSocket client.
const mockWebSocket = {
  onmessage: (handler: (event: { data: string }) => void) => {
    // Simulate pushes from the server
    setInterval(() => {
      const mockUpdate = { sagaId: 'video-123', status: 'RUNNING', currentStep: 'METADATA_EXTRACTION', lastUpdated: Date.now() };
      handler({ data: JSON.stringify(mockUpdate) });
    }, 3000);
  }
};

mockWebSocket.onmessage((event) => {
  const update = JSON.parse(event.data) as SagaState;
  // Mutate the proxy directly; every component reading this state updates automatically
  sagaStates[update.sagaId] = update;
});

export default sagaStates;
2. React Component
// src/components/SagaStatusTracker.tsx
import React from 'react';
import { useSnapshot } from 'valtio';
import sagaStates from '../store/sagaStore';

interface Props {
  sagaId: string;
}

const SagaStatusTracker: React.FC<Props> = ({ sagaId }) => {
  // useSnapshot creates an immutable snapshot of the state;
  // whenever the store changes, the component re-renders automatically
  const states = useSnapshot(sagaStates);
  const saga = states[sagaId];

  if (!saga) {
    return <div data-testid="saga-status">Status for {sagaId}: PENDING...</div>;
  }

  return (
    <div data-testid="saga-status">
      <h2>Saga Status: {saga.sagaId}</h2>
      <p><strong>Status:</strong> <span data-testid="status-value">{saga.status}</span></p>
      <p><strong>Current Step:</strong> {saga.currentStep}</p>
      {saga.status === 'FAILED' && (
        <p style={{ color: 'red' }}>
          <strong>Failure Reason:</strong> {saga.failureReason}
        </p>
      )}
      <p><small>Last Updated: {new Date(saga.lastUpdated).toLocaleTimeString()}</small></p>
    </div>
  );
};

export default SagaStatusTracker;
The elegance of Valtio is that we write no actions, reducers, or selectors. We simply mutate the proxied state object, and React components update reactively.
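The mechanism behind this can be illustrated in a few lines of plain TypeScript: a Proxy traps writes and notifies subscribers, which is what ultimately triggers the re-render. This is a simplified sketch of the idea, not Valtio's actual implementation (the real library also handles nested objects, immutable snapshots, and batching):

```typescript
// Simplified illustration of a proxy-based store: every write to the state
// object notifies subscribers, the way Valtio drives React re-renders.
// Not Valtio's real implementation (no nesting, snapshots, or batching).
function createStore<T extends object>(initial: T) {
  const listeners = new Set<() => void>();
  const state = new Proxy(initial, {
    set(target, prop, value) {
      (target as Record<string | symbol, unknown>)[prop] = value;
      listeners.forEach((fn) => fn()); // notify after every write
      return true;
    },
  });
  const subscribe = (fn: () => void) => {
    listeners.add(fn);
    return () => listeners.delete(fn); // unsubscribe
  };
  return { state, subscribe };
}
```

In React terms, `useSnapshot` is essentially such a subscription wired to the component's render cycle.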
Testing Complex State Flows with React Testing Library
For a UI that updates in real time, testing matters all the more: we must be sure the UI responds correctly to every state change coming from the backend. React Testing Library (RTL) encourages testing what the user sees rather than implementation details.
// src/components/SagaStatusTracker.test.tsx
import React from 'react';
import { render, screen, act } from '@testing-library/react';
import SagaStatusTracker from './SagaStatusTracker';
import sagaStates from '../store/sagaStore';

// We drive the Valtio store by hand to simulate WebSocket pushes
describe('SagaStatusTracker', () => {
  const SAGA_ID = 'test-saga-001';

  beforeEach(() => {
    // Reset the store before each test case
    for (const key in sagaStates) {
      delete sagaStates[key];
    }
  });

  test('renders initial pending state if saga does not exist', () => {
    render(<SagaStatusTracker sagaId={SAGA_ID} />);
    expect(screen.getByTestId('saga-status')).toHaveTextContent('Status for test-saga-001: PENDING...');
  });

  test('updates status and current step when store changes', () => {
    render(<SagaStatusTracker sagaId={SAGA_ID} />);
    // Simulate the first state update (RUNNING)
    act(() => {
      sagaStates[SAGA_ID] = {
        sagaId: SAGA_ID,
        status: 'RUNNING',
        currentStep: 'TRANSCODING',
        lastUpdated: Date.now(),
      };
    });
    expect(screen.getByTestId('status-value')).toHaveTextContent('RUNNING');
    expect(screen.getByText(/Current Step:/i)).toHaveTextContent('TRANSCODING');
  });

  test('displays failure reason when saga fails', () => {
    render(<SagaStatusTracker sagaId={SAGA_ID} />);
    // Simulate a failure update
    act(() => {
      sagaStates[SAGA_ID] = {
        sagaId: SAGA_ID,
        status: 'FAILED',
        currentStep: 'METADATA_EXTRACTION',
        failureReason: 'Metadata service unreachable',
        lastUpdated: Date.now(),
      };
    });
    expect(screen.getByTestId('status-value')).toHaveTextContent('FAILED');
    expect(screen.getByText(/Failure Reason:/i)).toBeInTheDocument();
    expect(screen.getByText('Metadata service unreachable')).toBeInTheDocument();
  });

  test('does not show failure reason for non-failed states', () => {
    render(<SagaStatusTracker sagaId={SAGA_ID} />);
    act(() => {
      sagaStates[SAGA_ID] = {
        sagaId: SAGA_ID,
        status: 'COMMITTED',
        currentStep: 'DONE',
        lastUpdated: Date.now(),
      };
    });
    expect(screen.getByTestId('status-value')).toHaveTextContent('COMMITTED');
    expect(screen.queryByText(/Failure Reason:/i)).not.toBeInTheDocument();
  });
});
The act() call here is the key: it guarantees that every React render triggered by the state update has completed before the assertions run. This lets us verify with confidence that the UI behaves correctly throughout the entire Saga lifecycle.
Limitations and Outlook
Powerful as this architecture is, it introduces new complexity. Operating a Flink cluster requires expertise of its own, and upgrading a Flink job, especially when the state schema changes, must be handled carefully, using savepoints for a smooth migration. Business developers also now need to understand the Saga pattern and implement service interfaces and compensation logic according to the conventions, which adds cognitive load.
A future direction is to build a more general Saga definition-and-execution platform on top of Flink: declare a Saga's flow graph, per-step service calls, and compensations in JSON or YAML, and have the platform parse and execute the definition dynamically. Business developers would then be freed from the low-level Saga machinery and could focus on business logic. Integrating a distributed tracing system (such as OpenTelemetry) is equally essential: carrying the Saga ID as part of the trace ID cleanly links the full call chain from the Flink coordinator down through each Node.js microservice, greatly simplifying debugging.
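Such a declarative definition could look like the following sketch: an ordered list of steps that a platform would parse, executing forward in order and compensating in reverse. The structure, topic names, and action names below are hypothetical:

```typescript
// Hypothetical declarative saga definition, as proposed in the outlook above.
// A generic platform would parse this and drive the Flink state machine.
interface StepDef {
  name: string;
  commandTopic: string;
  action: string;
  compensation: string;
}

const videoSagaDef: StepDef[] = [
  { name: 'TRANSCODING', commandTopic: 'transcoding-commands', action: 'START_TRANSCODE', compensation: 'CANCEL_TRANSCODE' },
  { name: 'METADATA_EXTRACTION', commandTopic: 'metadata-commands', action: 'EXTRACT_METADATA', compensation: 'DELETE_METADATA' },
];

// Given the step at which the saga failed, compensation runs over every step
// launched so far, in reverse order of execution.
function compensationPlan(def: StepDef[], failedAt: string): string[] {
  const idx = def.findIndex((s) => s.name === failedAt);
  return def.slice(0, idx + 1).map((s) => s.compensation).reverse();
}
```

With a structure like this, the orchestrator no longer hardcodes step order or compensation pairing; both fall out of the definition.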