Building a Distributed Saga Transaction Orchestrator for Cassandra with Go on EKS


Early in the project, a seemingly simple user-registration flow had to create a record in the user service, initialize a balance in the account service, and grant a default role in the permission service. Those three steps had to behave as one atomic operation: if any one of them failed, the whole registration had to roll back without leaving dirty data behind. In the relational-database world, a single BEGIN TRANSACTION solves this. But our backend stack is built on Cassandra, a NoSQL database designed for scalability and high availability that gives up the convenience of multi-key transactions.

The most direct, and most error-prone, implementation is to call the three services sequentially from the business logic:

// anti-pattern: a naive and dangerous implementation
func RegisterUser(ctx context.Context, user User, account Account, role Role) error {
	// Step 1: Create user
	if err := userService.Create(ctx, user); err != nil {
		// Failure here is simple, nothing to roll back
		return err
	}

	// Step 2: Initialize account
	if err := accountService.Initialize(ctx, account); err != nil {
		// CRITICAL PROBLEM: User is created, but account is not.
		// How to reliably delete the user? What if the delete call also fails?
		// The system is now in an inconsistent state.
		userService.Delete(ctx, user.ID) // This is a best-effort, not guaranteed
		return err
	}

	// Step 3: Grant role
	if err := permissionService.Grant(ctx, role); err != nil {
		// EVEN WORSE: User and account are created, but role is not.
		// Now we have to roll back two separate operations.
		accountService.Delete(ctx, account.ID)
		userService.Delete(ctx, user.ID)
		return err
	}

	return nil
}

In a real production environment this code is disastrous. Network jitter, service restarts, and node failures can all cause the compensating calls (Delete) to fail, leaving the data inconsistent. This pain forced us to find an application-level distributed transaction solution. The Saga pattern is the classic model for this class of problem: it splits one long transaction into a series of local transactions, each with a corresponding compensation. When any local transaction fails, the Saga runs the compensations of all completed transactions in reverse order, guaranteeing eventual consistency.

Architecture Decision: Orchestrated Saga (Orchestration) vs. Choreographed Saga (Choreography)

There are two main ways to implement the Saga pattern:

  1. Choreography: each service publishes an event after completing its local transaction; other services subscribe to those events and trigger their own local transactions. This approach is decentralized and keeps coupling between services low, but in complex flows the event chain becomes hard to trace and debug, and it is difficult to tell how far the distributed transaction has progressed.
  2. Orchestration: a central coordinator (orchestrator) is introduced. The orchestrator calls each participating service and, based on the result, decides whether to continue to the next step or run compensations. The logic is centralized, the state is explicit, and it is easy to monitor and troubleshoot.

Given that our registration flow may eventually grow to a dozen or more steps (sending a welcome email, initializing the user profile, syncing to the CRM, and so on), a clear, controllable execution flow is critical. We therefore chose the orchestrated Saga: a dedicated Go service acting as the Saga orchestrator, deployed on our AWS EKS cluster.

Core Design: the Saga Orchestrator and State Persistence

The orchestrator's core responsibility is to drive the Saga flow and record its state. So that the orchestrator itself can recover after a crash, the Saga execution log must be persisted. A common mistake is keeping this state in memory, or in a cache like Redis that can lose data.

In our situation, the most pragmatic choice is to store the Saga state log directly in Cassandra. This avoids introducing a new storage dependency (such as ZooKeeper or etcd) and takes advantage of Cassandra's high-availability characteristics.

First, define the data model for the Saga log.

CQL Schema for Saga Log:

CREATE TABLE IF NOT EXISTS saga_logs.sagas (
    saga_id uuid,
    saga_name text,
    current_step int,
    status text, // PENDING, COMMITTING, COMPENSATING, FAILED, COMPLETED
    payload blob, // JSON encoded initial data
    step_states map<int, text>, // { step_index: status (e.g., "COMPLETED", "FAILED") }
    created_at timestamp,
    updated_at timestamp,
    PRIMARY KEY (saga_id)
);

This table is the heart of the orchestrator. saga_id uniquely identifies one transaction, status records the macro state of the whole Saga, and current_step together with step_states tracks the fine-grained execution progress.

Next come the orchestrator's core Go type definitions.

// orchestrator/saga.go
package orchestrator

import (
	"context"
	"time"
	"github.com/gocql/gocql"
)

// SagaStatus represents the overall status of a saga instance.
type SagaStatus string

const (
	StatusPending      SagaStatus = "PENDING"
	StatusCommitting   SagaStatus = "COMMITTING"
	StatusCompensating SagaStatus = "COMPENSATING"
	StatusCompleted    SagaStatus = "COMPLETED"
	StatusFailed       SagaStatus = "FAILED"
)

// Step represents a single action and its compensation in a saga.
type Step struct {
	Name          string
	Action        func(ctx context.Context, payload []byte) error
	Compensation  func(ctx context.Context, payload []byte) error
}

// Definition defines the structure of a saga.
type Definition struct {
	Name  string
	Steps []Step
}

// Instance represents a running instance of a saga.
type Instance struct {
	ID           gocql.UUID
	Name         string
	CurrentStep  int
	Status       SagaStatus
	Payload      []byte
	StepStates   map[int]string
	CreatedAt    time.Time
	UpdatedAt    time.Time
}

// SagaRepository handles persistence of saga instances.
type SagaRepository interface {
	Create(ctx context.Context, instance *Instance) error
	Update(ctx context.Context, instance *Instance) error
	Get(ctx context.Context, sagaID gocql.UUID) (*Instance, error)
	FindInFlight(ctx context.Context) ([]*Instance, error)
}
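
The article defines the SagaRepository interface but never shows an implementation. Below is a minimal gocql-backed sketch against the saga_logs.sagas table above; the type name CassandraSagaRepository, the ALLOW FILTERING query in FindInFlight, and the error handling are illustrative assumptions, not the project's actual code.

// orchestrator/cassandra_repo.go — illustrative sketch only.
package orchestrator

import (
	"context"
	"time"

	"github.com/gocql/gocql"
)

// CassandraSagaRepository persists saga instances in the saga_logs.sagas table.
type CassandraSagaRepository struct {
	session *gocql.Session
}

func NewCassandraSagaRepository(session *gocql.Session) *CassandraSagaRepository {
	return &CassandraSagaRepository{session: session}
}

func (r *CassandraSagaRepository) Create(ctx context.Context, in *Instance) error {
	return r.session.Query(
		`INSERT INTO saga_logs.sagas
		   (saga_id, saga_name, current_step, status, payload, step_states, created_at, updated_at)
		 VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
		in.ID, in.Name, in.CurrentStep, string(in.Status), in.Payload, in.StepStates, in.CreatedAt, in.UpdatedAt,
	).WithContext(ctx).Exec()
}

func (r *CassandraSagaRepository) Update(ctx context.Context, in *Instance) error {
	in.UpdatedAt = time.Now().UTC()
	return r.session.Query(
		`UPDATE saga_logs.sagas
		    SET current_step = ?, status = ?, step_states = ?, updated_at = ?
		  WHERE saga_id = ?`,
		in.CurrentStep, string(in.Status), in.StepStates, in.UpdatedAt, in.ID,
	).WithContext(ctx).Exec()
}

func (r *CassandraSagaRepository) Get(ctx context.Context, sagaID gocql.UUID) (*Instance, error) {
	in := &Instance{ID: sagaID, StepStates: map[int]string{}}
	var status string
	if err := r.session.Query(
		`SELECT saga_name, current_step, status, payload, step_states, created_at, updated_at
		   FROM saga_logs.sagas WHERE saga_id = ?`, sagaID,
	).WithContext(ctx).Scan(&in.Name, &in.CurrentStep, &status, &in.Payload, &in.StepStates, &in.CreatedAt, &in.UpdatedAt); err != nil {
		return nil, err
	}
	in.Status = SagaStatus(status)
	return in, nil
}

func (r *CassandraSagaRepository) FindInFlight(ctx context.Context) ([]*Instance, error) {
	// Filtering on a non-key column needs a secondary index on status or ALLOW FILTERING;
	// tolerable here only because the number of in-flight sagas is expected to stay small.
	var out []*Instance
	for _, s := range []SagaStatus{StatusCommitting, StatusCompensating} {
		iter := r.session.Query(
			`SELECT saga_id, saga_name, current_step, payload, step_states, created_at, updated_at
			   FROM saga_logs.sagas WHERE status = ? ALLOW FILTERING`, string(s),
		).WithContext(ctx).Iter()
		for {
			in := &Instance{Status: s, StepStates: map[int]string{}}
			if !iter.Scan(&in.ID, &in.Name, &in.CurrentStep, &in.Payload, &in.StepStates, &in.CreatedAt, &in.UpdatedAt) {
				break
			}
			out = append(out, in)
		}
		if err := iter.Close(); err != nil {
			return nil, err
		}
	}
	return out, nil
}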

Step-by-Step Implementation: Building a Robust Orchestrator

1. The Saga Execution Engine

The execution engine is the brain of the orchestrator: it runs the Saga's steps in order and triggers the compensation flow on failure.

// orchestrator/engine.go
package orchestrator

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/gocql/gocql"
)

// Engine drives the saga execution.
type Engine struct {
	repo        SagaRepository
	definitions map[string]*Definition
}

func NewEngine(repo SagaRepository, definitions []*Definition) *Engine {
	defMap := make(map[string]*Definition)
	for _, def := range definitions {
		defMap[def.Name] = def
	}
	return &Engine{repo: repo, definitions: defMap}
}

// Start a new saga instance.
func (e *Engine) Start(ctx context.Context, sagaName string, payload []byte) (gocql.UUID, error) {
	def, ok := e.definitions[sagaName]
	if !ok {
		return gocql.UUID{}, fmt.Errorf("saga definition '%s' not found", sagaName)
	}

	instance := &Instance{
		ID:          gocql.TimeUUID(),
		Name:        sagaName,
		CurrentStep: 0,
		Status:      StatusCommitting,
		Payload:     payload,
		StepStates:  make(map[int]string),
		CreatedAt:   time.Now().UTC(),
		UpdatedAt:   time.Now().UTC(),
	}

	if err := e.repo.Create(ctx, instance); err != nil {
		return gocql.UUID{}, fmt.Errorf("failed to create saga instance: %w", err)
	}

	// Asynchronously execute the saga.
	go e.execute(context.Background(), instance)

	return instance.ID, nil
}

// The core execution loop.
func (e *Engine) execute(ctx context.Context, instance *Instance) {
	def := e.definitions[instance.Name]

	for i := instance.CurrentStep; i < len(def.Steps); i++ {
		// Skip steps already recorded as COMPLETED (relevant when resuming after a crash).
		if instance.StepStates[i] == "COMPLETED" {
			continue
		}
		step := def.Steps[i]
		log.Printf("Saga[%s]: Executing step %d: %s", instance.ID, i, step.Name)

		instance.CurrentStep = i
		instance.StepStates[i] = "PENDING"
		if err := e.repo.Update(ctx, instance); err != nil {
			log.Printf("SAGA_FATAL: Saga[%s] failed to update state before step %d: %v", instance.ID, i, err)
			// This is a critical failure. The system needs a recovery mechanism.
			// We can't proceed. The recovery process will pick this up later.
			return
		}

		// In a real system, use a robust HTTP client with retries and timeouts.
		err := step.Action(ctx, instance.Payload)
		if err != nil {
			log.Printf("Saga[%s]: Step %d (%s) failed: %v. Starting compensation.", instance.ID, i, step.Name, err)
			instance.Status = StatusCompensating
			instance.StepStates[i] = "FAILED"
			if updateErr := e.repo.Update(ctx, instance); updateErr != nil {
				log.Printf("SAGA_FATAL: Saga[%s] failed to update state to COMPENSATING: %v", instance.ID, updateErr)
				return
			}
			e.compensate(ctx, instance)
			return
		}

		instance.StepStates[i] = "COMPLETED"
		log.Printf("Saga[%s]: Step %d (%s) completed.", instance.ID, i, step.Name)
	}

	instance.Status = StatusCompleted
	instance.UpdatedAt = time.Now().UTC()
	if err := e.repo.Update(ctx, instance); err != nil {
		log.Printf("SAGA_FATAL: Saga[%s] failed to update final status to COMPLETED: %v", instance.ID, err)
		return
	}
	log.Printf("Saga[%s]: Execution completed successfully.", instance.ID)
}

// The compensation loop.
func (e *Engine) compensate(ctx context.Context, instance *Instance) {
	def := e.definitions[instance.Name]

	// Compensate backwards from the last successfully completed or the failed step.
	for i := instance.CurrentStep; i >= 0; i-- {
		// Steps with no recorded state never started; PENDING means the outcome is unknown,
		// and COMPENSATED steps were already rolled back during an earlier run.
		if status, ok := instance.StepStates[i]; !ok || status == "PENDING" || status == "COMPENSATED" {
			continue
		}
		
		step := def.Steps[i]
		log.Printf("Saga[%s]: Compensating step %d: %s", instance.ID, i, step.Name)

		// Compensation actions must be idempotent and should rarely fail.
		// If they do fail, a human operator needs to be alerted.
		err := step.Compensation(ctx, instance.Payload)
		if err != nil {
			log.Printf("SAGA_CRITICAL: Saga[%s] compensation for step %d (%s) failed: %v. Manual intervention required.", instance.ID, i, step.Name, err)
			instance.Status = StatusFailed
			if updateErr := e.repo.Update(ctx, instance); updateErr != nil {
				log.Printf("SAGA_FATAL: Saga[%s] failed to update status to FAILED: %v", instance.ID, updateErr)
			}
			return
		}
		instance.StepStates[i] = "COMPENSATED"
		if updateErr := e.repo.Update(ctx, instance); updateErr != nil {
			log.Printf("SAGA_FATAL: Saga[%s] failed to update state after compensation: %v", instance.ID, updateErr)
			return
		}
	}

	instance.Status = StatusFailed // Overall status is failed, even if compensation succeeds
	instance.UpdatedAt = time.Now().UTC()
	if err := e.repo.Update(ctx, instance); err != nil {
		log.Printf("SAGA_FATAL: Saga[%s] failed to update final status to FAILED: %v", instance.ID, err)
	}
	log.Printf("Saga[%s]: Compensation completed.", instance.ID)
}

The trap here is the state update. The Saga instance's state must be written to the database before any Action or Compensation is executed, following the write-ahead-log principle. If the orchestrator crashes after invoking an Action but before recording its result, on restart it will assume the Action never ran and execute it again; this possibility of duplicate execution is exactly why every Action (and Compensation) must be idempotent.
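
That replay window is why participant Actions have to tolerate being called more than once. One way a participant that also runs on Cassandra can achieve this is a lightweight transaction; the sketch below is illustrative only — the UserService type and users table are assumptions, not code from the real services.

// Hypothetical participant-side implementation: INSERT ... IF NOT EXISTS makes a
// replayed CreateUser Action a harmless no-op instead of a duplicate write.
package userservice

import (
	"context"
	"log"

	"github.com/gocql/gocql"
)

type UserService struct {
	session *gocql.Session
}

func (s *UserService) Create(ctx context.Context, userID, email string) error {
	applied, err := s.session.Query(
		`INSERT INTO users (user_id, email) VALUES (?, ?) IF NOT EXISTS`,
		userID, email,
	).WithContext(ctx).MapScanCAS(map[string]interface{}{})
	if err != nil {
		return err
	}
	if !applied {
		// The row already exists: the Action was replayed, so report success.
		log.Printf("user %s already exists, treating Create as an idempotent replay", userID)
	}
	return nil
}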

2. Orchestrator Crash Recovery

This is where the system's robustness shows. If the Pod running the orchestrator is restarted on EKS, we must be able to resume every interrupted Saga. The approach: when the orchestrator service starts, launch a background goroutine that scans the database for all Sagas in an intermediate state (COMMITTING or COMPENSATING) and feeds them back into the execution engine.

// orchestrator/recovery.go
package orchestrator

import (
	"context"
	"log"
	"time"
)

func (e *Engine) StartRecoveryProcess(ctx context.Context) {
	log.Println("Starting Saga recovery process...")
	ticker := time.NewTicker(1 * time.Minute) // Periodically scan for stranded sagas
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			log.Println("Stopping Saga recovery process.")
			return
		case <-ticker.C:
			e.recoverInFlightSagas(ctx)
		}
	}
}

func (e *Engine) recoverInFlightSagas(ctx context.Context) {
	inFlight, err := e.repo.FindInFlight(ctx)
	if err != nil {
		log.Printf("Recovery Error: Failed to find in-flight sagas: %v", err)
		return
	}

	if len(inFlight) == 0 {
		return
	}

	log.Printf("Recovery: Found %d in-flight sagas to recover.", len(inFlight))
	for _, instance := range inFlight {
		// Capture a per-iteration variable so each goroutine works on its own instance pointer.
		instToRecover := instance
		log.Printf("Recovery: Resuming saga %s from step %d with status %s", instToRecover.ID, instToRecover.CurrentStep, instToRecover.Status)
		
		go func() {
			if instToRecover.Status == StatusCommitting {
				e.execute(context.Background(), instToRecover)
			} else if instToRecover.Status == StatusCompensating {
				e.compensate(context.Background(), instToRecover)
			}
		}()
	}
}

Start the recovery process from main.go:

// main.go
func main() {
	// ... (repository, definitions setup)
	
	engine := orchestrator.NewEngine(repo, definitions)
	
	// Start the background recovery process
	recoveryCtx, cancelRecovery := context.WithCancel(context.Background())
	defer cancelRecovery()
	go engine.StartRecoveryProcess(recoveryCtx)

	// ... (start http server for API)
}
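
For reference, the parts elided above might be wired roughly as follows, assuming the CassandraSagaRepository sketched earlier (an illustration, not the project's real type) and the registration saga defined in the next section:

// main.go — illustrative wiring only.
package main

import (
	"context"
	"log"

	"github.com/gocql/gocql"

	"your-module/orchestrator" // adjust to this project's module path
	"your-module/sagas"
)

func main() {
	cluster := gocql.NewCluster("cassandra.internal") // Cassandra contact point, environment-specific
	cluster.Keyspace = "saga_logs"
	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatalf("failed to connect to Cassandra: %v", err)
	}
	defer session.Close()

	repo := orchestrator.NewCassandraSagaRepository(session)
	definitions := []*orchestrator.Definition{sagas.GetRegistrationSaga()}
	engine := orchestrator.NewEngine(repo, definitions)

	recoveryCtx, cancelRecovery := context.WithCancel(context.Background())
	defer cancelRecovery()
	go engine.StartRecoveryProcess(recoveryCtx)

	// ... (start http server for API)
}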

3. Defining and Using a Saga

We can now define a Saga for the user-registration flow. The Action and Compensation implementations are typically HTTP clients that call the other microservices.

// sagas/registration.go
package sagas

import (
	"context"
	"encoding/json"

	"your-module/orchestrator" // adjust to this project's module path
)

type RegistrationPayload struct {
	UserID    string `json:"user_id"`
	Email     string `json:"email"`
	AccountID string `json:"account_id"`
	Role      string `json:"role"`
}

// Assume userService, accountService, permissionService are clients for other microservices.
func GetRegistrationSaga() *orchestrator.Definition {
	return &orchestrator.Definition{
		Name: "UserRegistration",
		Steps: []orchestrator.Step{
			{
				Name: "CreateUser",
				Action: func(ctx context.Context, payload []byte) error {
					var p RegistrationPayload
					if err := json.Unmarshal(payload, &p); err != nil {
						return err
					}
					return userService.Create(ctx, p.UserID, p.Email)
				},
				Compensation: func(ctx context.Context, payload []byte) error {
					var p RegistrationPayload
					if err := json.Unmarshal(payload, &p); err != nil {
						return err
					}
					// Compensation actions MUST be idempotent.
					return userService.Delete(ctx, p.UserID)
				},
			},
			{
				Name: "InitializeAccount",
				Action: func(ctx context.Context, payload []byte) error {
					var p RegistrationPayload
					if err := json.Unmarshal(payload, &p); err != nil {
						return err
					}
					return accountService.Initialize(ctx, p.AccountID, p.UserID)
				},
				Compensation: func(ctx context.Context, payload []byte) error {
					var p RegistrationPayload
					if err := json.Unmarshal(payload, &p); err != nil {
						return err
					}
					return accountService.Delete(ctx, p.AccountID)
				},
			},
			// ... more steps
		},
	}
}

4. Frontend Interaction and API Design

The frontend cannot wait synchronously for this long-running flow to finish; the API should be asynchronous.

  1. POST /api/v1/registrations: the frontend initiates the registration request.

    • The request body carries all the data the flow needs.
    • The orchestrator immediately calls engine.Start(), which creates the Saga record and begins executing it asynchronously.
    • The API returns 202 Accepted right away, with the saga_id in the response body.
  2. GET /api/v1/registrations/status/{saga_id}: the frontend polls for the final state of the transaction using the saga_id obtained above.

    • The API reads the Saga log table and returns the status field (e.g. COMMITTING, COMPLETED, FAILED).
    • Once the frontend sees COMPLETED or FAILED, it stops polling and shows the user the final result.

This asynchronous API design decouples the frontend experience from the complex, long-running backend flow and improves the user experience.
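
A minimal sketch of these two endpoints on top of net/http could look like the following. The Handler type and routing details are assumptions; the saga name, payload, and status values follow the definitions above.

// api/handlers.go — illustrative sketch of the async registration API.
package api

import (
	"encoding/json"
	"io"
	"net/http"
	"strings"

	"github.com/gocql/gocql"

	"your-module/orchestrator" // adjust to this project's module path
)

type Handler struct {
	Engine *orchestrator.Engine
	Repo   orchestrator.SagaRepository
}

// POST /api/v1/registrations: start the saga and return 202 with its id.
func (h *Handler) StartRegistration(w http.ResponseWriter, r *http.Request) {
	payload, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "invalid request body", http.StatusBadRequest)
		return
	}
	sagaID, err := h.Engine.Start(r.Context(), "UserRegistration", payload)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusAccepted)
	json.NewEncoder(w).Encode(map[string]string{"saga_id": sagaID.String()})
}

// GET /api/v1/registrations/status/{saga_id}: report the saga's current status.
func (h *Handler) RegistrationStatus(w http.ResponseWriter, r *http.Request) {
	idStr := strings.TrimPrefix(r.URL.Path, "/api/v1/registrations/status/")
	sagaID, err := gocql.ParseUUID(idStr)
	if err != nil {
		http.Error(w, "invalid saga_id", http.StatusBadRequest)
		return
	}
	instance, err := h.Repo.Get(r.Context(), sagaID)
	if err != nil {
		http.Error(w, "saga not found", http.StatusNotFound)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]string{"status": string(instance.Status)})
}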

5. EKS Deployment Considerations

A few configurations matter when deploying the orchestrator to EKS.

Kubernetes Deployment YAML (simplified):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: saga-orchestrator
spec:
  replicas: 2 # At least two for high availability
  selector:
    matchLabels:
      app: saga-orchestrator
  template:
    metadata:
      labels:
        app: saga-orchestrator
    spec:
      containers:
      - name: orchestrator
        image: your-repo/saga-orchestrator:v1.0.0
        ports:
        - containerPort: 8080
        envFrom:
        - configMapRef:
            name: saga-config
        livenessProbe:
          httpGet:
            path: /healthz
            port: 8080
          initialDelaySeconds: 15
          periodSeconds: 20
        readinessProbe:
          httpGet:
            path: /readyz
            port: 8080
          initialDelaySeconds: 20 # Give time for initial recovery scan
          periodSeconds: 10

  • High availability: run at least two replicas. That, however, introduces a new problem: two Pods may try to recover the same Saga instance at the same time, so a distributed locking mechanism is needed. A simple implementation adds locked_by and lock_expires_at columns to the Saga log table, and the recovery process acquires the lock before touching an instance (see the sketch after this list).
  • Probes: the readinessProbe is important. After startup, the service should only report ready once the initial recovery scan has completed; otherwise EKS may start routing traffic before the service is ready to handle new requests (and resume old ones), causing problems.
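
One lightweight way to implement that lock without leaving Cassandra is a conditional UPDATE (lightweight transaction) on the locked_by and lock_expires_at columns mentioned above. The helper below is a sketch under those assumptions; the columns are not part of the schema shown earlier and would have to be added.

// orchestrator/lock.go — sketch of per-saga locking for the recovery process.
package orchestrator

import (
	"context"
	"time"

	"github.com/gocql/gocql"
)

// tryLock claims a saga instance for this orchestrator pod. It succeeds only if the
// row is currently unlocked, or if the previous holder's lock has expired.
func tryLock(ctx context.Context, session *gocql.Session, sagaID gocql.UUID, owner string, ttl time.Duration) (bool, error) {
	now := time.Now().UTC()
	expires := now.Add(ttl)

	// First attempt: the saga has never been locked.
	applied, err := session.Query(
		`UPDATE saga_logs.sagas SET locked_by = ?, lock_expires_at = ?
		  WHERE saga_id = ? IF locked_by = null`,
		owner, expires, sagaID,
	).WithContext(ctx).MapScanCAS(map[string]interface{}{})
	if err != nil || applied {
		return applied, err
	}

	// Second attempt: steal a lock whose expiry has passed (e.g. held by a crashed pod).
	return session.Query(
		`UPDATE saga_logs.sagas SET locked_by = ?, lock_expires_at = ?
		  WHERE saga_id = ? IF lock_expires_at < ?`,
		owner, expires, sagaID, now,
	).WithContext(ctx).MapScanCAS(map[string]interface{}{})
}

// The recovery loop would call tryLock before resuming an instance and skip it when the
// lock is not acquired; the lock is released by letting lock_expires_at lapse or by
// clearing locked_by once the saga reaches COMPLETED or FAILED.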

The sequence diagram below walks through a complete run, including a failure in the second step and the resulting compensation:

sequenceDiagram
    participant Frontend
    participant Saga Orchestrator API
    participant Saga Engine
    participant Cassandra (Saga Log)
    participant Service A
    participant Service B

    Frontend->>+Saga Orchestrator API: POST /start_saga (payload)
    Saga Orchestrator API->>Saga Engine: Start("SagaName", payload)
    Saga Engine->>+Cassandra (Saga Log): CREATE Saga record (status: COMMITTING)
    Cassandra (Saga Log)-->>-Saga Engine: Saga created, ID: xyz
    Saga Engine-->>Saga Orchestrator API: return saga_id: xyz
    Saga Orchestrator API-->>-Frontend: 202 Accepted, { "saga_id": "xyz" }

    Note right of Saga Engine: Execution happens async
    Saga Engine->>+Cassandra (Saga Log): UPDATE step 0 state: PENDING
    Cassandra (Saga Log)-->>-Saga Engine: OK
    Saga Engine->>+Service A: Execute Action 1
    Service A-->>-Saga Engine: Success
    Saga Engine->>+Cassandra (Saga Log): UPDATE step 0 state: COMPLETED
    Cassandra (Saga Log)-->>-Saga Engine: OK

    Saga Engine->>+Cassandra (Saga Log): UPDATE step 1 state: PENDING
    Cassandra (Saga Log)-->>-Saga Engine: OK
    Saga Engine->>+Service B: Execute Action 2
    Service B-->>-Saga Engine: Error (e.g., 500)

    Note right of Saga Engine: Failure detected, start compensation
    Saga Engine->>+Cassandra (Saga Log): UPDATE Saga status: COMPENSATING, step 1 state: FAILED
    Cassandra (Saga Log)-->>-Saga Engine: OK

    Saga Engine->>+Service A: Execute Compensation 1
    Service A-->>-Saga Engine: Success
    Saga Engine->>+Cassandra (Saga Log): UPDATE step 0 state: COMPENSATED
    Cassandra (Saga Log)-->>-Saga Engine: OK
    Saga Engine->>+Cassandra (Saga Log): UPDATE Saga status: FAILED
    Cassandra (Saga Log)-->>-Saga Engine: OK

    loop Poll Status
        Frontend->>+Saga Orchestrator API: GET /status/xyz
        Saga Orchestrator API->>+Cassandra (Saga Log): SELECT status FROM sagas WHERE saga_id=xyz
        Cassandra (Saga Log)-->>-Saga Orchestrator API: status: FAILED
        Saga Orchestrator API-->>-Frontend: 200 OK, { "status": "FAILED" }
    end

Limitations of the Current Approach and Future Directions

The Saga orchestrator we built on Go and Cassandra solves the core pain point of cross-service data consistency and gains robustness from crash recovery. It is not a silver bullet, though; it has limitations and room to evolve.

First, while running multiple orchestrator instances avoids a single point of failure, the current implementation lacks a mature distributed lock or leader-election mechanism to keep several instances from processing the same Saga at once, which under high load can lead to duplicate operations. Introducing a distributed lock based on etcd or Consul is the next optimization.

Second, Saga definitions are currently hard-coded. For more dynamic business flows, the orchestrator could evolve into a configuration-driven engine that loads step definitions from a database or configuration file, so flows can change without redeploying the service.

Finally, as business complexity grows, end-to-end monitoring and observability become critical. OpenTelemetry should be integrated into the orchestrator and every participating service, with the saga_id carried as part of the trace context, so that a distributed transaction's full lifecycle, bottlenecks, and failure points are visible in systems such as Jaeger or Zipkin. This is indispensable for diagnosing complex production issues.

