Implementing a Durable Saga Pattern with Flink Stateful Stream Processing, with Frontend State Synchronization


The inherent complexity of a business process that spans multiple microservices, such as video processing and publishing, lies not in any single service's implementation but in guaranteeing atomicity for the flow as a whole. A typical pipeline might include: receive the upload, start transcoding, extract metadata, run content moderation, and distribute to the CDN. These five steps live in different services, and if any one of them fails, the already-completed operations must be rolled back reliably to avoid data silos or "ghost data". In real projects, relying on business code to hand-roll compensation logic inside every service quickly turns into a maintenance nightmare.

The initial idea was a choreography-based Saga over a message queue: each service finishes its own work and emits an event; the next service listens for that event and starts working. On failure, a failure event is emitted, and the preceding services listen for it and run their compensations. The pattern looks simple, but it quickly exposed fatal flaws:

  1. State-visibility black hole: the Saga's global state is implicit, scattered across each service's logs and the message queue's delivery records. When a flow gets stuck, pinpointing the problem is extremely difficult.
  2. Cyclic dependencies: services subscribe to each other's events, which quickly produces complex, hard-to-trace dependency graphs, and sometimes outright circular dependencies.
  3. Fragile compensation logic: which service is responsible for triggering compensation? What happens if a service crashes while executing a compensation? The mechanism itself lacks a reliable "transaction coordinator".

What we need is a strong coordinator (orchestrator), and it must be stateful, durable, and highly available. A plain Node.js service as the coordinator? When an instance crashes, every Saga state held in memory is lost. Persist the state in Redis or a database? Then we must handle concurrency control, atomic state transitions, and crash recovery ourselves. That is essentially reinventing the state-machine wheel.
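These requirements boil down to a small, explicit state machine per Saga instance. As a minimal, hypothetical sketch (the step names mirror the video pipeline above; none of these identifiers come from a real library), the coordinator only needs to track completed steps and derive the compensation order from them:

```typescript
// Hypothetical sketch of the explicit Saga state an orchestrator maintains.
const STEPS = ["TRANSCODING", "METADATA_EXTRACTION", "MODERATION", "CDN_DISTRIBUTION"] as const;
type Step = (typeof STEPS)[number];

interface Saga {
  status: "RUNNING" | "COMPENSATING" | "COMMITTED" | "FAILED";
  completed: Step[]; // steps that succeeded, in order
  pending: Step[];   // steps still to run
}

function newSaga(): Saga {
  return { status: "RUNNING", completed: [], pending: [...STEPS] };
}

// Fold in the result of the current step. On failure, the Saga flips to
// COMPENSATING and the completed list becomes the rollback work list.
function onStepResult(saga: Saga, success: boolean): Saga {
  const [current, ...rest] = saga.pending;
  if (!success) {
    return { ...saga, status: "COMPENSATING" };
  }
  const completed = [...saga.completed, current];
  return rest.length === 0
    ? { status: "COMMITTED", completed, pending: [] }
    : { status: "RUNNING", completed, pending: rest };
}

// Compensation order is simply the completed steps in reverse.
function compensationOrder(saga: Saga): Step[] {
  return [...saga.completed].reverse();
}
```

The point of the sketch: the compensation order is just the completed steps in reverse, and there are no inter-service event subscriptions to untangle.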

This is why we brought in Apache Flink. Flink is not merely a data processing engine; at its core it is a powerful framework for distributed stateful stream processing. Its checkpoint mechanism gives state exactly-once guarantees, its built-in timer service handles Saga timeouts perfectly, and its high-availability architecture removes the single point of failure. We can treat each Saga instance as an "event" in a data stream: the Saga coordinator is a Flink job that manages the lifecycle state machine of every Saga instance.

Architecture: Flink as the Saga Coordination Hub

The core of the system is a Saga orchestrator implemented as a Flink job. All participants (Node.js microservices) communicate with the orchestrator through a central message queue such as Kafka.

sequenceDiagram
    participant Client
    participant SagaInitiator as SagaInitiator (Node.js)
    participant Kafka
    participant FlinkOrchestrator
    participant TranscodingSvc as TranscodingSvc (Node.js)
    participant MetadataSvc as MetadataSvc (Node.js)
    participant NotificationSvc as NotificationSvc (Node.js)

    Client->>SagaInitiator: POST /startVideoProcess
    SagaInitiator->>Kafka: Topic: saga-commands | Msg: { type: 'START', sagaId: '...', payload: {...} }
    
    FlinkOrchestrator->>Kafka: Consume saga-commands
    FlinkOrchestrator-->>FlinkOrchestrator: Create new SagaState
    FlinkOrchestrator->>Kafka: Topic: transcoding-commands | Msg: { type: 'START_TRANSCODE', sagaId: '...', ... }
    FlinkOrchestrator->>Kafka: Topic: saga-state-updates | Msg: { sagaId: '...', status: 'RUNNING', step: 'TRANSCODING' }

    TranscodingSvc->>Kafka: Consume transcoding-commands
    TranscodingSvc-->>TranscodingSvc: Process video...
    TranscodingSvc->>Kafka: Topic: saga-events | Msg: { type: 'TRANSCODE_SUCCESS', sagaId: '...', ... }
    
    FlinkOrchestrator->>Kafka: Consume saga-events
    FlinkOrchestrator-->>FlinkOrchestrator: Update SagaState (transcoding complete)
    FlinkOrchestrator->>Kafka: Topic: metadata-commands | Msg: { type: 'EXTRACT_METADATA', sagaId: '...', ... }
    FlinkOrchestrator->>Kafka: Topic: saga-state-updates | Msg: { sagaId: '...', status: 'RUNNING', step: 'METADATA_EXTRACTION' }

    MetadataSvc->>Kafka: Consume metadata-commands
    MetadataSvc-->>MetadataSvc: Extract metadata...
    MetadataSvc->>Kafka: Topic: saga-events | Msg: { type: 'METADATA_FAILURE', sagaId: '...', reason: '...' }

    FlinkOrchestrator->>Kafka: Consume saga-events (failure)
    FlinkOrchestrator-->>FlinkOrchestrator: Start compensation logic
    FlinkOrchestrator->>Kafka: Topic: transcoding-commands | Msg: { type: 'CANCEL_TRANSCODE', sagaId: '...', ... }
    FlinkOrchestrator->>Kafka: Topic: saga-state-updates | Msg: { sagaId: '...', status: 'COMPENSATING', step: 'CANCEL_TRANSCODING' }
    
    TranscodingSvc->>Kafka: Consume transcoding-commands
    TranscodingSvc-->>TranscodingSvc: Delete transcoded artifacts...
    TranscodingSvc->>Kafka: Topic: saga-events | Msg: { type: 'CANCEL_TRANSCODE_SUCCESS', sagaId: '...' }

    FlinkOrchestrator->>Kafka: Consume saga-events
    FlinkOrchestrator-->>FlinkOrchestrator: Finalize SagaState as FAILED
    FlinkOrchestrator->>Kafka: Topic: saga-state-updates | Msg: { sagaId: '...', status: 'FAILED' }
    
    NotificationSvc->>Kafka: Consume saga-state-updates
    NotificationSvc-->>Client: Push real-time status via WebSocket
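The messages in the diagram can be pinned down as a small set of shared TypeScript contracts. The field names below are assumptions read off the diagram, not a published schema; the defensive parser reflects the rule that a malformed message should be rejected at the consumer boundary, never fed into the state machine:

```typescript
// Hypothetical message contracts for the Kafka topics in the diagram above.
interface SagaCommand {
  type: string; // e.g. "START", "START_TRANSCODE", "CANCEL_TRANSCODE"
  sagaId: string;
  payload?: Record<string, unknown>;
}

interface SagaEvent {
  type: string; // e.g. "TRANSCODE_SUCCESS", "METADATA_FAILURE"
  sagaId: string;
  reason?: string; // present on failure events
  payload?: Record<string, unknown>;
}

interface SagaStateUpdate {
  sagaId: string;
  status: "RUNNING" | "COMPENSATING" | "COMMITTED" | "FAILED";
  step?: string;
}

// Defensive parse at the consumer boundary: malformed messages are
// rejected (e.g. dead-lettered), never handed to the state machine.
function parseStateUpdate(raw: string): SagaStateUpdate | null {
  try {
    const msg = JSON.parse(raw);
    if (typeof msg.sagaId !== "string" || typeof msg.status !== "string") return null;
    return msg as SagaStateUpdate;
  } catch {
    return null;
  }
}
```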

We will use Flink's KeyedProcessFunction to maintain an independent state machine for each sagaId.

1. Saga State Definition (POJO)

// SagaState.java
import java.io.Serializable;
import java.util.Map;
import java.util.Stack;

public class SagaState implements Serializable {
    private static final long serialVersionUID = 1L;

    public String sagaId;
    public SagaStatus status;
    public String currentStep;
    public Map<String, Object> payload;
    // Stores the compensation operations to run on rollback
    public Stack<CompensationStep> compensationStack; 
    public long lastUpdated;
    public String failureReason;

    public enum SagaStatus {
        STARTED, RUNNING, COMMITTED, FAILED, COMPENSATING
    }

    public static class CompensationStep implements Serializable {
        private static final long serialVersionUID = 1L;
        public String service;
        public String action;
        public Map<String, Object> payload;
    }
}

2. The KeyedProcessFunction Implementation

This function is the heart of the coordinator. It consumes a merged stream from the saga-commands and saga-events topics, partitioned by sagaId.

// SagaOrchestrator.java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// InputSagaEvent is a wrapper for commands and events
public class SagaOrchestrator extends KeyedProcessFunction<String, InputSagaEvent, SagaStateUpdate> {

    private static final long SAGA_TIMEOUT_MS = 60 * 60 * 1000; // 1 hour timeout

    // Flink manages the state of every Saga instance, giving us durability and fault tolerance
    private transient ValueState<SagaState> state;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<SagaState> descriptor = new ValueStateDescriptor<>(
                "saga-state", SagaState.class);
        state = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(InputSagaEvent event, Context ctx, Collector<SagaStateUpdate> out) throws Exception {
        SagaState currentState = state.value();
        
        // Register a global timeout timer for this Saga. A processing-time
        // timer is used: event-time timers only fire as watermarks advance,
        // and ctx.timestamp() can be null without event-time semantics.
        long timerTimestamp = ctx.timerService().currentProcessingTime() + SAGA_TIMEOUT_MS;
        ctx.timerService().registerProcessingTimeTimer(timerTimestamp);

        if (event.getType().equals("START")) {
            if (currentState != null) {
                // Idempotency: if this Saga already exists, ignore the START command
                return;
            }
            // Initialize the Saga
            SagaState newState = SagaFactory.createFromStartCommand(event);
            state.update(newState);
            
            // Kick off the first step
            executeNextStep(newState, ctx, out);
        } else {
            if (currentState == null) {
                // Orphan event, likely a delayed or duplicated message; ignore it.
                // In production, log a severe warning here.
                return;
            }
            // Handle subsequent events
            processSagaEvent(currentState, event, ctx, out);
        }
    }

    private void processSagaEvent(SagaState currentState, InputSagaEvent event, Context ctx, Collector<SagaStateUpdate> out) throws Exception {
        // Core state machine logic
        switch (currentState.status) {
            case RUNNING:
                if (event.isSuccess()) {
                    SagaLogic.onStepSuccess(currentState, event);
                    executeNextStep(currentState, ctx, out);
                } else {
                    currentState.status = SagaState.SagaStatus.COMPENSATING;
                    currentState.failureReason = event.getReason();
                    startCompensation(currentState, ctx, out);
                }
                break;
            case COMPENSATING:
                if (event.isCompensationSuccess()) {
                    SagaLogic.onCompensationSuccess(currentState, event);
                    compensateNextStep(currentState, ctx, out);
                } else {
                    // A failed compensation is a serious problem requiring manual intervention.
                    // Mark the Saga as failed and raise an alert.
                    currentState.status = SagaState.SagaStatus.FAILED;
                    // Log critical error, maybe push to a dead-letter queue
                }
                break;
            // COMMITTED and FAILED are terminal states; no further events are processed
        }
        
        currentState.lastUpdated = ctx.timerService().currentProcessingTime();
        state.update(currentState);
        out.collect(SagaStateUpdate.from(currentState));
    }
    
    private void executeNextStep(SagaState currentState, Context ctx, Collector<SagaStateUpdate> out) {
        // Look up the next step from the Saga definition
        SagaStep nextStep = SagaDefinition.getNextStep(currentState.currentStep);
        if (nextStep != null) {
            // Send the command via a side output (or write directly to Kafka)
            // ctx.output(commandOutputTag, nextStep.buildCommand(currentState));
            currentState.currentStep = nextStep.getName();
            currentState.status = SagaState.SagaStatus.RUNNING;
            
            // Push the compensation operation onto the stack
            currentState.compensationStack.push(nextStep.getCompensationStep());
        } else {
            // All steps completed; the Saga commits
            currentState.status = SagaState.SagaStatus.COMMITTED;
        }
        
        state.update(currentState);
        out.collect(SagaStateUpdate.from(currentState));
    }

    private void startCompensation(SagaState currentState, Context ctx, Collector<SagaStateUpdate> out) {
        compensateNextStep(currentState, ctx, out);
    }

    private void compensateNextStep(SagaState currentState, Context ctx, Collector<SagaStateUpdate> out) {
        if (!currentState.compensationStack.isEmpty()) {
            SagaState.CompensationStep stepToCompensate = currentState.compensationStack.pop();
            // Send the compensation command
            // ctx.output(commandOutputTag, buildCompensationCommand(stepToCompensate));
        } else {
            // All compensations done; the Saga ends as FAILED
            currentState.status = SagaState.SagaStatus.FAILED;
        }
        
        state.update(currentState);
        out.collect(SagaStateUpdate.from(currentState));
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<SagaStateUpdate> out) throws Exception {
        SagaState currentState = state.value();
        if (currentState != null && currentState.status != SagaState.SagaStatus.COMMITTED && currentState.status != SagaState.SagaStatus.FAILED) {
            // The Saga timed out
            currentState.status = SagaState.SagaStatus.COMPENSATING;
            currentState.failureReason = "Saga timed out";
            startCompensation(currentState, ctx, out);
            state.update(currentState);
            out.collect(SagaStateUpdate.from(currentState));
        }
    }
}

The key point is that every state change is committed to Flink via state.update(). When a checkpoint fires, this state is snapshotted to durable storage (such as HDFS or S3). If the job fails and restarts, Flink automatically restores state from the last successful checkpoint, so every Saga instance resumes exactly where it left off, no more and no less.
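What checkpointing buys can be shown with a toy model. This is only an illustration of the recovery semantics, not Flink's actual mechanism: the essential property is that the state snapshot and the input offset are committed together, so a restart replays exactly the inputs that arrived after the snapshot.

```typescript
// Toy model of checkpoint/restore semantics (not Flink's implementation).
// The snapshot and the input offset are persisted atomically, so a restart
// re-reads exactly the inputs that arrived after the snapshot.
interface Checkpoint<S> {
  offset: number; // how many inputs had been folded into the state
  state: S;
}

function runWithRecovery<S>(
  inputs: string[],
  apply: (state: S, input: string) => S,
  initial: S,
  crashAt: number,        // simulate a crash before applying input #crashAt
  checkpointEvery: number,
): S {
  let cp: Checkpoint<S> = { offset: 0, state: initial };
  let state = initial;
  for (let i = 0; i < inputs.length; i++) {
    if (i === crashAt) {
      // Crash: in-memory state is lost. Restore the last checkpoint and
      // replay from its offset; nothing is skipped or double-counted.
      state = cp.state;
      i = cp.offset - 1;  // the loop increment moves i to cp.offset
      crashAt = -1;       // crash only once
      continue;
    }
    state = apply(state, inputs[i]);
    if ((i + 1) % checkpointEvery === 0) cp = { offset: i + 1, state };
  }
  return state;
}
```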

Node.js Microservice Implementation

The Node.js services participating in the Saga are deliberately simple. They are stateless: they listen for commands, execute business logic, and publish result events.

// transcoding-service/index.js
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'transcoding-service',
  brokers: ['kafka:9092'],
});

const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'transcoding-group' });

// Simulated business logic
const startTranscode = async (sagaId, payload) => {
  console.log(`[${sagaId}] Starting transcode for: ${payload.sourceUri}`);
  // Simulate a long-running operation
  await new Promise(resolve => setTimeout(resolve, 5000));
  
  // Simulate random failure
  if (Math.random() < 0.2) {
      console.error(`[${sagaId}] Transcoding failed`);
      throw new Error('FFMPEG process failed');
  }

  console.log(`[${sagaId}] Transcoding successful`);
  return { transcodedUri: `s3://bucket/processed/${payload.id}.mp4` };
};

const cancelTranscode = async (sagaId, payload) => {
    console.log(`[${sagaId}] Compensating: Deleting transcoded artifacts for: ${payload.sourceUri}`);
    // Compensation must be idempotent and should succeed whenever possible
    await new Promise(resolve => setTimeout(resolve, 1000));
    console.log(`[${sagaId}] Compensation successful`);
};


const run = async () => {
  await producer.connect();
  await consumer.connect();

  await consumer.subscribe({ topic: 'transcoding-commands', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const command = JSON.parse(message.value.toString());
      const { sagaId, type, payload } = command;

      try {
        if (type === 'START_TRANSCODE') {
          const result = await startTranscode(sagaId, payload);
          await producer.send({
            topic: 'saga-events',
            messages: [{ value: JSON.stringify({ sagaId, type: 'TRANSCODE_SUCCESS', payload: result }) }],
          });
        } else if (type === 'CANCEL_TRANSCODE') {
          await cancelTranscode(sagaId, payload);
          await producer.send({
            topic: 'saga-events',
            messages: [{ value: JSON.stringify({ sagaId, type: 'CANCEL_TRANSCODE_SUCCESS' }) }],
          });
        }
      } catch (error) {
        // Any business-logic failure results in a failure event
        await producer.send({
            topic: 'saga-events',
            messages: [{ value: JSON.stringify({ sagaId, type: 'TRANSCODE_FAILURE', reason: error.message }) }],
        });
      }
    },
  });
};

run().catch(e => console.error('[transcoding-service] Error', e));
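The comment in cancelTranscode hides a real requirement: the orchestrator may redeliver a compensation command (after a timeout, or after a crash between processing and acknowledging), so compensation handlers must tolerate duplicates. A hypothetical dedup wrapper sketches one approach; in production the processed set would live in a durable store rather than in memory:

```typescript
// Hypothetical idempotency wrapper for compensation handlers. An in-memory
// Set is used for brevity; production would need a durable key-value store.
type Handler = (sagaId: string) => Promise<void>;

function idempotent(handler: Handler): Handler {
  const processed = new Set<string>();
  return async (sagaId) => {
    if (processed.has(sagaId)) return; // duplicate delivery: no-op
    await handler(sagaId);
    // Mark as done only after success: a crash mid-handler causes a retry,
    // never a silently skipped compensation.
    processed.add(sagaId);
  };
}
```

Wrapping cancelTranscode this way makes redelivery of CANCEL_TRANSCODE harmless.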

Real-Time Frontend Status with Valtio

When the backend flow is complex and asynchronous, frontend state management becomes a challenge of its own: users need to see the task's progress in real time. A small Node.js service (NotificationSvc) consumes the saga-state-updates Kafka topic and pushes state to the frontend over WebSocket.
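The NotificationSvc itself is mostly glue (a kafkajs consumer feeding a WebSocket server); the one piece of real logic is routing each update to the clients watching that sagaId. A hypothetical sketch of that routing core, with the Kafka and WebSocket wiring left out:

```typescript
// Hypothetical routing core of NotificationSvc. The kafkajs consumer and
// the WebSocket server are omitted; anything with a send() method works.
interface Client { send(data: string): void; }

class SagaSubscriptions {
  private bySaga = new Map<string, Set<Client>>();

  subscribe(sagaId: string, client: Client): void {
    if (!this.bySaga.has(sagaId)) this.bySaga.set(sagaId, new Set());
    this.bySaga.get(sagaId)!.add(client);
  }

  unsubscribe(sagaId: string, client: Client): void {
    this.bySaga.get(sagaId)?.delete(client);
  }

  // Called for every message consumed from saga-state-updates.
  publish(update: { sagaId: string; status: string }): number {
    const clients = this.bySaga.get(update.sagaId) ?? new Set<Client>();
    const frame = JSON.stringify(update);
    for (const c of clients) c.send(frame);
    return clients.size; // number of clients notified
  }
}
```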

On the frontend, Valtio is a minimal yet powerful state management library. Its proxy-based mechanism makes state updates and component re-renders feel completely natural.

1. Valtio Store

// src/store/sagaStore.ts
import { proxy } from 'valtio';

export type SagaStatus = 'PENDING' | 'RUNNING' | 'COMMITTED' | 'FAILED' | 'COMPENSATING';

export interface SagaState {
  sagaId: string;
  status: SagaStatus;
  currentStep: string;
  failureReason?: string;
  lastUpdated: number;
}

// A map keyed by sagaId stores the state of every Saga instance
const sagaStates = proxy<Record<string, SagaState>>({});

// Simulated WebSocket connection
// In a real application this would be a WebSocket client
const mockWebSocket = {
  onmessage: (handler: (event: { data: string }) => void) => {
    // Simulate a push from the server
    setInterval(() => {
        const mockUpdate = { sagaId: 'video-123', status: 'RUNNING', currentStep: 'METADATA_EXTRACTION', lastUpdated: Date.now() };
        handler({ data: JSON.stringify(mockUpdate) });
    }, 3000);
  }
};

mockWebSocket.onmessage((event) => {
  const update = JSON.parse(event.data) as SagaState;
  // Mutating the proxy directly re-renders every component that uses this state
  sagaStates[update.sagaId] = update;
});

export default sagaStates;

2. React Component

// src/components/SagaStatusTracker.tsx
import React from 'react';
import { useSnapshot } from 'valtio';
import sagaStates from '../store/sagaStore';

interface Props {
  sagaId: string;
}

const SagaStatusTracker: React.FC<Props> = ({ sagaId }) => {
  // useSnapshot creates an immutable snapshot of the state
  // When the store changes, the component re-renders automatically
  const states = useSnapshot(sagaStates);
  const saga = states[sagaId];

  if (!saga) {
    return <div data-testid="saga-status">Status for {sagaId}: PENDING...</div>;
  }

  return (
    <div data-testid="saga-status">
      <h2>Saga Status: {saga.sagaId}</h2>
      <p><strong>Status:</strong> <span data-testid="status-value">{saga.status}</span></p>
      <p><strong>Current Step:</strong> {saga.currentStep}</p>
      {saga.status === 'FAILED' && (
        <p style={{ color: 'red' }}>
          <strong>Failure Reason:</strong> {saga.failureReason}
        </p>
      )}
      <p><small>Last Updated: {new Date(saga.lastUpdated).toLocaleTimeString()}</small></p>
    </div>
  );
};

export default SagaStatusTracker;

The elegance of Valtio is that we write no actions, reducers, or selectors. Just mutate the proxied state object, and React components update reactively.
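As a toy illustration of the idea (this is not Valtio's actual implementation, which also handles nesting, snapshots, and batching), a Proxy that intercepts property writes and notifies a subscriber is all the "mutate and the UI follows" model fundamentally requires:

```typescript
// Toy proxy-based store illustrating the mechanism behind Valtio.
function toyProxy<T extends object>(target: T, onChange: (key: string) => void): T {
  return new Proxy(target, {
    set(obj, key, value) {
      Reflect.set(obj, key, value);
      onChange(String(key)); // a framework would schedule a re-render here
      return true;
    },
  });
}
```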

Testing Complex State Flows with React Testing Library

For a UI that updates in real time like this, testing is especially important. We need to be confident the UI responds correctly to every state change coming from the backend. React Testing Library (RTL) encourages testing the behavior users see rather than implementation details.

// src/components/SagaStatusTracker.test.tsx
import React from 'react';
import { render, screen, act } from '@testing-library/react';
import SagaStatusTracker from './SagaStatusTracker';
import sagaStates from '../store/sagaStore';

// We drive the Valtio store by hand to simulate WebSocket pushes
describe('SagaStatusTracker', () => {
  const SAGA_ID = 'test-saga-001';

  beforeEach(() => {
    // Clear the state before each test case
    for (const key in sagaStates) {
      delete sagaStates[key];
    }
  });

  test('renders initial pending state if saga does not exist', () => {
    render(<SagaStatusTracker sagaId={SAGA_ID} />);
    expect(screen.getByTestId('saga-status')).toHaveTextContent('Status for test-saga-001: PENDING...');
  });

  test('updates status and current step when store changes', () => {
    render(<SagaStatusTracker sagaId={SAGA_ID} />);

    // Simulate the first state update (RUNNING)
    act(() => {
      sagaStates[SAGA_ID] = {
        sagaId: SAGA_ID,
        status: 'RUNNING',
        currentStep: 'TRANSCODING',
        lastUpdated: Date.now(),
      };
    });
    
    expect(screen.getByTestId('status-value')).toHaveTextContent('RUNNING');
    expect(screen.getByText(/Current Step:/i)).toHaveTextContent('TRANSCODING');
  });

  test('displays failure reason when saga fails', () => {
    render(<SagaStatusTracker sagaId={SAGA_ID} />);

    // Simulate a failure state update
    act(() => {
      sagaStates[SAGA_ID] = {
        sagaId: SAGA_ID,
        status: 'FAILED',
        currentStep: 'METADATA_EXTRACTION',
        failureReason: 'Metadata service unreachable',
        lastUpdated: Date.now(),
      };
    });

    expect(screen.getByTestId('status-value')).toHaveTextContent('FAILED');
    expect(screen.getByText(/Failure Reason:/i)).toBeInTheDocument();
    expect(screen.getByText('Metadata service unreachable')).toBeInTheDocument();
  });

  test('does not show failure reason for non-failed states', () => {
    render(<SagaStatusTracker sagaId={SAGA_ID} />);

    act(() => {
      sagaStates[SAGA_ID] = {
        sagaId: SAGA_ID,
        status: 'COMMITTED',
        currentStep: 'DONE',
        lastUpdated: Date.now(),
      };
    });

    expect(screen.getByTestId('status-value')).toHaveTextContent('COMMITTED');
    expect(screen.queryByText(/Failure Reason:/i)).not.toBeInTheDocument();
  });
});

act() is the key here: it guarantees that all React renders triggered by the state update have completed before the assertions run. This lets us verify with confidence that the UI behaves correctly across the entire Saga lifecycle.

Limitations and Future Directions

Powerful as this architecture is, it introduces new complexity. Operating a Flink cluster requires expertise in its own right. Upgrading a Flink job, especially when the state schema changes, must be handled carefully, using savepoints for smooth migration. Business developers now have to understand the Saga pattern and implement service interfaces and compensation logic according to the conventions, which raises the cognitive load.

A future direction is to build a more general Saga definition and execution platform on top of Flink: declare a Saga's flow graph, per-step service calls, and compensation operations in JSON or YAML, and have the platform parse and execute that definition dynamically. Business developers would then be freed from the low-level Saga machinery and could focus on business logic. Integrating distributed tracing (such as OpenTelemetry) is also essential: carrying the Saga ID as part of the trace ID cleanly links the full call chain from the Flink coordinator to each Node.js microservice and greatly simplifies debugging.
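The declarative direction is straightforward to prototype. Assuming a simple linear flow and a hypothetical schema, a definition like the following can be interpreted generically, so adding a pipeline step means editing data instead of coordinator code:

```typescript
// Hypothetical declarative Saga definition and a minimal interpreter.
interface StepDef {
  name: string;
  commandTopic: string;       // where the "do it" command is sent
  compensateCommand?: string; // command type that undoes this step
}

const videoSaga: StepDef[] = [
  { name: "TRANSCODING", commandTopic: "transcoding-commands", compensateCommand: "CANCEL_TRANSCODE" },
  { name: "METADATA_EXTRACTION", commandTopic: "metadata-commands" },
  { name: "CDN_DISTRIBUTION", commandTopic: "cdn-commands", compensateCommand: "CDN_PURGE" },
];

// Next step after the given one, or null when the Saga should commit.
function nextStep(def: StepDef[], current: string | null): StepDef | null {
  if (current === null) return def[0] ?? null;
  const idx = def.findIndex((s) => s.name === current);
  return idx >= 0 && idx + 1 < def.length ? def[idx + 1] : null;
}

// Compensation commands for everything completed before `failedAt`, in reverse.
function compensationPlan(def: StepDef[], failedAt: string): string[] {
  const idx = def.findIndex((s) => s.name === failedAt);
  return def
    .slice(0, idx) // steps completed before the failure
    .reverse()
    .flatMap((s) => (s.compensateCommand ? [s.compensateCommand] : []));
}
```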

