基于 etcd 与 MongoDB 构建动态预览环境的分布式协调与状态持久化实践


团队的开发流程一直有个瓶颈:代码审查(Code Review)严重依赖于开发者的本地环境。前端工程师需要拉取后端分支,后端工程师需要理解前端的构建过程,这导致每次 PR 的审查周期都很长,反馈回路极慢。我们的目标是实现自动化预览环境——任何一个 Pull Request 被创建或更新,系统都能自动为其构建一个独立的、可访问的、全功能的运行环境。

最初的构想是一个简单的基于数据库轮询的系统。API 服务器接收到 webhook 请求后,在 MongoDB 中创建一条记录,标记状态为 PENDING。后台有一组 Worker 进程,每隔几秒轮询数据库,查找 PENDING 的任务,然后尝试抢占并执行。

这种方案在真实项目中很快就暴露了问题:

  1. 轮询效率低下: Worker 对数据库造成了不必要的周期性压力,且状态更新有延迟。
  2. 抢占逻辑复杂: 需要实现一套悲观锁或乐观锁机制来避免多个 Worker 处理同一个任务,这增加了业务逻辑的复杂性和出错的可能。
  3. 状态同步困难: Worker 崩溃后,任务状态可能永久停留在 PROCESSING,需要额外的“巡检”或“心跳”机制来处理异常,这又是一个复杂度的叠加。

我们需要一个更可靠的机制来处理分布式环境下的任务协调、锁和状态通知。这正是 etcd 的用武之地。最终,我们敲定的架构是:使用 MongoDB 作为环境元数据的持久化存储,而使用 etcd 作为分布式任务协调的核心组件,负责锁、租约和实时状态通知。

sequenceDiagram
    participant User as 开发者
    participant GitHub
    participant API_Server as API 服务器 (Node.js)
    participant Worker as 预配工作节点 (Go)
    participant ETCD as etcd 集群
    participant MONGO as MongoDB

    User->>GitHub: 推送代码, 创建/更新 PR
    GitHub->>API_Server: 发送 Webhook 事件
    API_Server->>MONGO: 创建环境记录 (status: PENDING)
    API_Server->>ETCD: 写入任务 key (e.g., /tasks/pr-123)
    Note right of Worker: Worker 通过 Watch 机制监听 /tasks/
    Worker->>ETCD: 发现新任务 /tasks/pr-123
    Worker->>ETCD: 尝试获取分布式锁 /locks/pr-123 (带租约)
    ETCD-->>Worker: 成功获取锁
    Worker->>ETCD: 更新状态 key /status/pr-123 -> BUILDING
    Worker->>MONGO: 更新持久化状态 (status: BUILDING)
    Note right of Worker: 执行环境预配脚本 (e.g., kubectl apply)
    Worker->>ETCD: 更新状态 key /status/pr-123 -> READY, value: {url: "..."}
    Worker->>MONGO: 更新持久化状态 (status: READY, url: "...")
    Worker->>ETCD: 释放锁 /locks/pr-123
    
    participant FE as 前端界面 (Chakra UI)
    API_Server->>FE: (WebSocket) 实时推送状态变更
    Note left of FE: API Server Watch /status/*, 将变更推送到前端

数据模型与持久化层:MongoDB

MongoDB 的角色相对简单直接:作为预览环境元数据的最终事实来源。它的文档模型非常适合存储结构多变的环境信息。

一个典型的环境文档结构如下:

// src/types/environment.ts
export interface Environment {
  _id: string; // MongoDB ObjectId
  prId: number;
  repo: string;
  branch: string;
  status: 'PENDING' | 'BUILDING' | 'READY' | 'FAILED' | 'DELETED';
  url?: string;
  commitHash: string;
  createdAt: Date;
  updatedAt: Date;
  logs?: string[]; // 用于存储构建日志的摘要
}

在后端服务中,我们使用 mongoose 来定义和操作这个模型。

// src/models/Environment.js
import mongoose from 'mongoose';

const environmentSchema = new mongoose.Schema({
  prId: { type: Number, required: true, index: true },
  repo: { type: String, required: true },
  branch: { type: String, required: true },
  status: {
    type: String,
    enum: ['PENDING', 'BUILDING', 'READY', 'FAILED', 'DELETED'],
    default: 'PENDING',
  },
  url: { type: String },
  commitHash: { type: String, required: true },
  logs: [String],
}, { timestamps: true });

// 复合索引确保每个仓库的PR是唯一的
environmentSchema.index({ repo: 1, prId: 1 }, { unique: true });

export const EnvironmentModel = mongoose.model('Environment', environmentSchema);

这里的核心是状态 status 字段和索引。我们为 prId 和复合键 (repo, prId) 创建了索引,以保证快速查询和数据唯一性。

分布式协调核心:etcd 的角色

etcd 在这个架构中扮演了三个关键角色:任务分发、分布式锁和实时状态总线。我们选择使用 Go 语言来编写 Worker 节点,因为它有官方且成熟的 etcd 客户端,并且非常适合执行系统级的预配任务。

1. 任务分发与 Watch 机制

当 API 服务器接收到新的预配请求时,它除了向 MongoDB 写入数据外,还会向 etcd 中一个特定的前缀下写入一个 key,例如 /tasks/my-project/pr-123。这个 key 的 value 可以包含一些元数据,但它的存在本身就是一个信号。

Worker 进程在启动时,会 Watch /tasks/ 这个前缀。

// worker/main.go
package main

import (
	"context"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatalf("Failed to connect to etcd: %v", err)
	}
	defer cli.Close()

	log.Println("Worker started, watching for tasks under /tasks/...")
	
    // 从当前时间点开始监听
	watchChan := cli.Watch(context.Background(), "/tasks/", clientv3.WithPrefix())

	for watchResp := range watchChan {
		for _, event := range watchResp.Events {
			// 我们只关心新创建的任务
			if event.Type == clientv3.EventTypePut {
				taskKey := string(event.Kv.Key)
				log.Printf("New task detected: %s\n", taskKey)
				// 异步处理,避免阻塞 Watch 循环
				go processTask(cli, taskKey)
			}
		}
	}
}

func processTask(cli *clientv3.Client, taskKey string) {
    // 具体的任务处理逻辑,包含锁的获取
    // ...
}

这种基于 Watch 的推送模型,远比数据库轮询要高效和实时。Worker 能够立即响应新任务,而不会有延迟。

2. 分布式锁与租约 (Lease)

为了防止多个 Worker 同时处理同一个任务,我们需要一个可靠的分布式锁。etcd 的并发扩展库提供了简单的锁实现,并且与租约机制紧密结合,这是其鲁棒性的关键。

当一个 Worker 决定处理任务 /tasks/my-project/pr-123 时,它会尝试获取一个对应的锁,例如 /locks/my-project/pr-123

// worker/processor.go
package main

import (
	"context"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/clientv3/concurrency"
)

func processTask(cli *clientv3.Client, taskKey string) {
	// 从任务key中解析出 PR ID 等信息
	// ...

	// 1. 创建一个租约,TTL 为 60 秒
	// 这意味着如果 Worker 崩溃,租约将在 60 秒后过期,锁会自动释放
	lease, err := cli.Grant(context.Background(), 60)
	if err != nil {
		log.Printf("Failed to grant lease for task %s: %v", taskKey, err)
		return
	}
	// 保持租约活性
	keepAliveChan, err := cli.KeepAlive(context.Background(), lease.ID)
	if err != nil {
		log.Printf("Failed to keep lease alive for task %s: %v", taskKey, err)
		return
	}
    // 确保在函数退出时,租约被撤销
    defer func() {
        cli.Revoke(context.Background(), lease.ID)
        log.Printf("Lease revoked for task %s", taskKey)
    }()

	// 2. 使用租约创建 Session
	s, err := concurrency.NewSession(cli, concurrency.WithLease(lease.ID))
	if err != nil {
		log.Printf("Failed to create session for task %s: %v", taskKey, err)
		return
	}
	defer s.Close()

	lockKey := "/locks/" + taskKey[len("/tasks/"):] // e.g., /locks/my-project/pr-123
	m := concurrency.NewMutex(s, lockKey)

	// 3. 尝试获取锁,设置一个较短的超时时间
	// 如果其他 Worker 正在处理,我们会快速失败并放弃,而不是无限等待
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if err := m.TryLock(ctx); err != nil {
		log.Printf("Could not acquire lock for task %s, another worker is likely processing it: %v", taskKey, err)
		return
	}
	defer func() {
        if err := m.Unlock(context.Background()); err != nil {
            log.Printf("Failed to unlock mutex for task %s: %v", taskKey, err)
        } else {
            log.Printf("Successfully unlocked mutex for task %s", taskKey)
        }
    }()

	log.Printf("Acquired lock for task: %s", taskKey)
	
	// 在函数运行期间,需要消费 keepAliveChan 以保持租约
	go func() {
		for range keepAliveChan {
			// log.Printf("Lease keep-alive signal received for %s", taskKey)
		}
	}()

	// 4. 执行实际的预配逻辑
	executeProvisioning(cli, taskKey)

    // 5. 任务完成后,删除 etcd 中的任务 key
    if _, err := cli.Delete(context.Background(), taskKey); err != nil {
        log.Printf("Failed to delete task key %s from etcd: %v", taskKey, err)
    }
}

func executeProvisioning(cli *clientv3.Client, taskKey string) {
    statusKey := "/status/" + taskKey[len("/tasks/"):]

    // 更新实时状态到 etcd
    cli.Put(context.Background(), statusKey, "BUILDING")
    // ... 更新 MongoDB ...

    log.Printf("Starting provisioning for %s...", taskKey)
    time.Sleep(30 * time.Second) // 模拟耗时的构建过程
    log.Printf("Provisioning finished for %s.", taskKey)

    // 假设预配成功,生成一个URL
    previewURL := "https://" + taskKey[len("/tasks/"):] + ".preview.example.com"
    cli.Put(context.Background(), statusKey, "READY:" + previewURL)
    // ... 更新 MongoDB ...
}

租约(Lease)机制是关键。它解决了 Worker 崩溃后锁无法释放的问题。我们为锁关联一个有 TTL 的租约,Worker 负责在租约到期前不断“续约”。如果 Worker 宕机,无法续约,etcd 会自动让租约过期,并删除所有与该租约关联的 key,锁也就自然被释放了。其他 Worker 就可以安全地接管这个任务。

前端实时反馈:Chakra UI 与 WebSocket

用户最关心的是能够实时看到环境的构建状态。我们的 API 服务器会 Watch etcd/status/ 前缀,一旦有任何状态变更,就通过 WebSocket 将更新推送到前端。

前端使用 React 和 Chakra UI 构建。Chakra UI 的组件,如 BadgeSpinnerTable,非常适合展示这类状态信息。

// src/components/EnvironmentTable.jsx
import React, { useState, useEffect } from 'react';
import {
  Table, Thead, Tbody, Tr, Th, Td, Badge, Spinner, Link, Code
} from '@chakra-ui/react';
import io from 'socket.io-client';

// 状态到 Chakra UI 颜色方案的映射
const statusColorScheme = {
  PENDING: 'gray',
  BUILDING: 'blue',
  READY: 'green',
  FAILED: 'red',
};

function EnvironmentTable() {
  const [environments, setEnvironments] = useState([]);

  useEffect(() => {
    // 获取初始数据
    fetch('/api/environments')
      .then(res => res.json())
      .then(data => setEnvironments(data));

    // 建立 WebSocket 连接
    const socket = io();

    // 监听状态更新事件
    socket.on('environment_update', (updatedEnv) => {
      setEnvironments(prevEnvs => 
        prevEnvs.map(env => 
          env.prId === updatedEnv.prId ? { ...env, ...updatedEnv } : env
        )
      );
    });

    return () => {
      socket.disconnect();
    };
  }, []);

  return (
    <Table variant="simple">
      <Thead>
        <Tr>
          <Th>PR ID</Th>
          <Th>Branch</Th>
          <Th>Status</Th>
          <Th>URL</Th>
          <Th>Last Commit</Th>
        </Tr>
      </Thead>
      <Tbody>
        {environments.map(env => (
          <Tr key={env.prId}>
            <Td>{env.prId}</Td>
            <Td><Code>{env.branch}</Code></Td>
            <Td>
              <Badge colorScheme={statusColorScheme[env.status]}>
                {env.status}
              </Badge>
              {env.status === 'BUILDING' && <Spinner size="xs" ml={2} />}
            </Td>
            <Td>
              {env.status === 'READY' ? (
                <Link href={env.url} isExternal color="teal.500">
                  {env.url}
                </Link>
              ) : (
                'N/A'
              )}
            </Td>
            <Td><Code>{env.commitHash.substring(0, 7)}</Code></Td>
          </Tr>
        ))}
      </Tbody>
    </Table>
  );
}

API 服务器端的 WebSocket 逻辑大致如下(使用 socket.ionode-etcd):

// src/server.js (部分)
import { Etcd3 } from 'etcd3';
// ... express 和 socket.io 设置 ...

const etcdClient = new Etcd3();
const io = new Server(httpServer);

io.on('connection', (socket) => {
  console.log('A user connected to WebSocket');
});

// 监听 etcd 中所有状态的变化
const watcher = await etcdClient.watch().prefix('/status/').create();

watcher.on('put', (kv) => {
  const key = kv.key.toString(); // e.g., /status/my-project/pr-123
  const value = kv.value.toString(); // e.g., READY:https://...
  
  // 解析 key 和 value
  const parts = key.split('/');
  const prId = parseInt(parts[parts.length - 1].replace('pr-', ''), 10);
  const [status, url] = value.split(':', 2);
  
  // 广播事件到所有连接的前端客户端
  io.emit('environment_update', { prId, status, url });
});

这种推模型(Push Model)架构,使得前端 UI 几乎是瞬时响应后端状态变化的,用户体验非常好。

开发工具链:Rome 的整合

整个项目(前端 React + 后端 Node.js API)采用 Monorepo 结构管理。为了统一代码风格和质量标准,我们放弃了 ESLint + Prettier 的组合,转而使用 Rome。它的优势在于单一工具、单一配置文件、极快的速度。

我们的 rome.json 配置非常简洁:

{
  "$schema": "https://docs.rome.tools/schemas/12.1.3/schema.json",
  "organizeImports": {
    "enabled": true
  },
  "linter": {
    "enabled": true,
    "rules": {
      "recommended": true,
      "suspicious": {
        "noDoubleEquals": "error"
      }
    }
  },
  "formatter": {
    "enabled": true,
    "indentStyle": "space",
    "indentSize": 2,
    "lineWidth": 80
  },
  "javascript": {
    "formatter": {
      "quoteStyle": "single"
    }
  }
}

在 CI 流程中,我们只需要一条命令 npx rome ci . 就可以同时完成对整个代码库的格式化检查和 Lint 检查,极大地简化了配置和维护成本。

方案的局限性与未来迭代

这套架构虽然解决了我们最初的核心痛点,但它并非银弹。

首先,etcd 不应该被用作通用的消息队列。如果任务的创建频率极高(例如每秒上百个),etcd 可能会成为瓶颈。它的设计目标是一致性而非吞吐量。对于超大规模的场景,可能需要引入 Kafka 或 NATS 等专门的消息队列系统来解耦任务的生产和消费。

其次,当前的 Worker 预配逻辑是模拟的。在实际生产中,executeProvisioning 函数会包含复杂的与 Kubernetes API、云厂商 SDK 或 Terraform 的交互。这部分的错误处理、重试和状态上报需要更加精细化,例如,将详细的构建日志流式传输到 S3 或类似的对象存储中,并在 MongoDB 里只保留其链接。

最后,系统的可观测性尚有不足。我们需要为 Worker 节点、API 服务器和 etcd 集群本身添加详细的 Prometheus 指标监控和分布式链路追踪,以便在出现性能问题或故障时能快速定位根源。例如,监控锁的等待时间、租约续约的成功率、etcd 的 watch 事件延迟等。这些都是将该系统推向更大使命关键场景前必须完成的工作。


  目录