团队的开发流程一直有个瓶颈:代码审查(Code Review)严重依赖于开发者的本地环境。前端工程师需要拉取后端分支,后端工程师需要理解前端的构建过程,这导致每次 PR 的审查周期都很长,反馈回路极慢。我们的目标是实现自动化预览环境——任何一个 Pull Request 被创建或更新,系统都能自动为其构建一个独立的、可访问的、全功能的运行环境。
最初的构想是一个简单的基于数据库轮询的系统。API 服务器接收到 webhook 请求后,在 MongoDB 中创建一条记录,标记状态为 PENDING
。后台有一组 Worker 进程,每隔几秒轮询数据库,查找 PENDING
的任务,然后尝试抢占并执行。
这种方案在真实项目中很快就暴露了问题:
- 轮询效率低下: Worker 对数据库造成了不必要的周期性压力,且状态更新有延迟。
- 抢占逻辑复杂: 需要实现一套悲观锁或乐观锁机制来避免多个 Worker 处理同一个任务,这增加了业务逻辑的复杂性和出错的可能。
- 状态同步困难: Worker 崩溃后,任务状态可能永久停留在
PROCESSING
,需要额外的“巡检”或“心跳”机制来处理异常,这又是一个复杂度的叠加。
我们需要一个更可靠的机制来处理分布式环境下的任务协调、锁和状态通知。这正是 etcd
的用武之地。最终,我们敲定的架构是:使用 MongoDB 作为环境元数据的持久化存储,而使用 etcd
作为分布式任务协调的核心组件,负责锁、租约和实时状态通知。
sequenceDiagram participant User as 开发者 participant GitHub participant API_Server as API 服务器 (Node.js) participant Worker as 预配工作节点 (Go) participant ETCD as etcd 集群 participant MONGO as MongoDB User->>GitHub: 推送代码, 创建/更新 PR GitHub->>API_Server: 发送 Webhook 事件 API_Server->>MONGO: 创建环境记录 (status: PENDING) API_Server->>ETCD: 写入任务 key (e.g., /tasks/pr-123) Note right of Worker: Worker 通过 Watch 机制监听 /tasks/ Worker->>ETCD: 发现新任务 /tasks/pr-123 Worker->>ETCD: 尝试获取分布式锁 /locks/pr-123 (带租约) ETCD-->>Worker: 成功获取锁 Worker->>ETCD: 更新状态 key /status/pr-123 -> BUILDING Worker->>MONGO: 更新持久化状态 (status: BUILDING) Note right of Worker: 执行环境预配脚本 (e.g., kubectl apply) Worker->>ETCD: 更新状态 key /status/pr-123 -> READY, value: {url: "..."} Worker->>MONGO: 更新持久化状态 (status: READY, url: "...") Worker->>ETCD: 释放锁 /locks/pr-123 participant FE as 前端界面 (Chakra UI) API_Server->>FE: (WebSocket) 实时推送状态变更 Note left of FE: API Server Watch /status/*, 将变更推送到前端
数据模型与持久化层:MongoDB
MongoDB 的角色相对简单直接:作为预览环境元数据的最终事实来源。它的文档模型非常适合存储结构多变的环境信息。
一个典型的环境文档结构如下:
// src/types/environment.ts
export interface Environment {
_id: string; // MongoDB ObjectId
prId: number;
repo: string;
branch: string;
status: 'PENDING' | 'BUILDING' | 'READY' | 'FAILED' | 'DELETED';
url?: string;
commitHash: string;
createdAt: Date;
updatedAt: Date;
logs?: string[]; // 用于存储构建日志的摘要
}
在后端服务中,我们使用 mongoose
来定义和操作这个模型。
// src/models/Environment.js
import mongoose from 'mongoose';
const environmentSchema = new mongoose.Schema({
prId: { type: Number, required: true, index: true },
repo: { type: String, required: true },
branch: { type: String, required: true },
status: {
type: String,
enum: ['PENDING', 'BUILDING', 'READY', 'FAILED', 'DELETED'],
default: 'PENDING',
},
url: { type: String },
commitHash: { type: String, required: true },
logs: [String],
}, { timestamps: true });
// 复合索引确保每个仓库的PR是唯一的
environmentSchema.index({ repo: 1, prId: 1 }, { unique: true });
export const EnvironmentModel = mongoose.model('Environment', environmentSchema);
这里的核心是状态 status
字段和索引。我们为 prId
和复合键 (repo, prId)
创建了索引,以保证快速查询和数据唯一性。
分布式协调核心:etcd 的角色
etcd
在这个架构中扮演了三个关键角色:任务分发、分布式锁和实时状态总线。我们选择使用 Go 语言来编写 Worker 节点,因为它有官方且成熟的 etcd
客户端,并且非常适合执行系统级的预配任务。
1. 任务分发与 Watch 机制
当 API 服务器接收到新的预配请求时,它除了向 MongoDB 写入数据外,还会向 etcd
中一个特定的前缀下写入一个 key,例如 /tasks/my-project/pr-123
。这个 key 的 value 可以包含一些元数据,但它的存在本身就是一个信号。
Worker 进程在启动时,会 Watch
/tasks/
这个前缀。
// worker/main.go
package main
import (
"context"
"log"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatalf("Failed to connect to etcd: %v", err)
}
defer cli.Close()
log.Println("Worker started, watching for tasks under /tasks/...")
// 从当前时间点开始监听
watchChan := cli.Watch(context.Background(), "/tasks/", clientv3.WithPrefix())
for watchResp := range watchChan {
for _, event := range watchResp.Events {
// 我们只关心新创建的任务
if event.Type == clientv3.EventTypePut {
taskKey := string(event.Kv.Key)
log.Printf("New task detected: %s\n", taskKey)
// 异步处理,避免阻塞 Watch 循环
go processTask(cli, taskKey)
}
}
}
}
func processTask(cli *clientv3.Client, taskKey string) {
// 具体的任务处理逻辑,包含锁的获取
// ...
}
这种基于 Watch
的推送模型,远比数据库轮询要高效和实时。Worker 能够立即响应新任务,而不会有延迟。
2. 分布式锁与租约 (Lease)
为了防止多个 Worker 同时处理同一个任务,我们需要一个可靠的分布式锁。etcd
的并发扩展库提供了简单的锁实现,并且与租约机制紧密结合,这是其鲁棒性的关键。
当一个 Worker 决定处理任务 /tasks/my-project/pr-123
时,它会尝试获取一个对应的锁,例如 /locks/my-project/pr-123
。
// worker/processor.go
package main
import (
"context"
"log"
"time"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
)
func processTask(cli *clientv3.Client, taskKey string) {
// 从任务key中解析出 PR ID 等信息
// ...
// 1. 创建一个租约,TTL 为 60 秒
// 这意味着如果 Worker 崩溃,租约将在 60 秒后过期,锁会自动释放
lease, err := cli.Grant(context.Background(), 60)
if err != nil {
log.Printf("Failed to grant lease for task %s: %v", taskKey, err)
return
}
// 保持租约活性
keepAliveChan, err := cli.KeepAlive(context.Background(), lease.ID)
if err != nil {
log.Printf("Failed to keep lease alive for task %s: %v", taskKey, err)
return
}
// 确保在函数退出时,租约被撤销
defer func() {
cli.Revoke(context.Background(), lease.ID)
log.Printf("Lease revoked for task %s", taskKey)
}()
// 2. 使用租约创建 Session
s, err := concurrency.NewSession(cli, concurrency.WithLease(lease.ID))
if err != nil {
log.Printf("Failed to create session for task %s: %v", taskKey, err)
return
}
defer s.Close()
lockKey := "/locks/" + taskKey[len("/tasks/"):] // e.g., /locks/my-project/pr-123
m := concurrency.NewMutex(s, lockKey)
// 3. 尝试获取锁,设置一个较短的超时时间
// 如果其他 Worker 正在处理,我们会快速失败并放弃,而不是无限等待
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := m.TryLock(ctx); err != nil {
log.Printf("Could not acquire lock for task %s, another worker is likely processing it: %v", taskKey, err)
return
}
defer func() {
if err := m.Unlock(context.Background()); err != nil {
log.Printf("Failed to unlock mutex for task %s: %v", taskKey, err)
} else {
log.Printf("Successfully unlocked mutex for task %s", taskKey)
}
}()
log.Printf("Acquired lock for task: %s", taskKey)
// 在函数运行期间,需要消费 keepAliveChan 以保持租约
go func() {
for range keepAliveChan {
// log.Printf("Lease keep-alive signal received for %s", taskKey)
}
}()
// 4. 执行实际的预配逻辑
executeProvisioning(cli, taskKey)
// 5. 任务完成后,删除 etcd 中的任务 key
if _, err := cli.Delete(context.Background(), taskKey); err != nil {
log.Printf("Failed to delete task key %s from etcd: %v", taskKey, err)
}
}
func executeProvisioning(cli *clientv3.Client, taskKey string) {
statusKey := "/status/" + taskKey[len("/tasks/"):]
// 更新实时状态到 etcd
cli.Put(context.Background(), statusKey, "BUILDING")
// ... 更新 MongoDB ...
log.Printf("Starting provisioning for %s...", taskKey)
time.Sleep(30 * time.Second) // 模拟耗时的构建过程
log.Printf("Provisioning finished for %s.", taskKey)
// 假设预配成功,生成一个URL
previewURL := "https://" + taskKey[len("/tasks/"):] + ".preview.example.com"
cli.Put(context.Background(), statusKey, "READY:" + previewURL)
// ... 更新 MongoDB ...
}
租约(Lease)机制是关键。它解决了 Worker 崩溃后锁无法释放的问题。我们为锁关联一个有 TTL 的租约,Worker 负责在租约到期前不断“续约”。如果 Worker 宕机,无法续约,etcd
会自动让租约过期,并删除所有与该租约关联的 key,锁也就自然被释放了。其他 Worker 就可以安全地接管这个任务。
前端实时反馈:Chakra UI 与 WebSocket
用户最关心的是能够实时看到环境的构建状态。我们的 API 服务器会 Watch
etcd
的 /status/
前缀,一旦有任何状态变更,就通过 WebSocket 将更新推送到前端。
前端使用 React 和 Chakra UI 构建。Chakra UI 的组件,如 Badge
、Spinner
和 Table
,非常适合展示这类状态信息。
// src/components/EnvironmentTable.jsx
import React, { useState, useEffect } from 'react';
import {
Table, Thead, Tbody, Tr, Th, Td, Badge, Spinner, Link, Code
} from '@chakra-ui/react';
import io from 'socket.io-client';
// 状态到 Chakra UI 颜色方案的映射
const statusColorScheme = {
PENDING: 'gray',
BUILDING: 'blue',
READY: 'green',
FAILED: 'red',
};
function EnvironmentTable() {
const [environments, setEnvironments] = useState([]);
useEffect(() => {
// 获取初始数据
fetch('/api/environments')
.then(res => res.json())
.then(data => setEnvironments(data));
// 建立 WebSocket 连接
const socket = io();
// 监听状态更新事件
socket.on('environment_update', (updatedEnv) => {
setEnvironments(prevEnvs =>
prevEnvs.map(env =>
env.prId === updatedEnv.prId ? { ...env, ...updatedEnv } : env
)
);
});
return () => {
socket.disconnect();
};
}, []);
return (
<Table variant="simple">
<Thead>
<Tr>
<Th>PR ID</Th>
<Th>Branch</Th>
<Th>Status</Th>
<Th>URL</Th>
<Th>Last Commit</Th>
</Tr>
</Thead>
<Tbody>
{environments.map(env => (
<Tr key={env.prId}>
<Td>{env.prId}</Td>
<Td><Code>{env.branch}</Code></Td>
<Td>
<Badge colorScheme={statusColorScheme[env.status]}>
{env.status}
</Badge>
{env.status === 'BUILDING' && <Spinner size="xs" ml={2} />}
</Td>
<Td>
{env.status === 'READY' ? (
<Link href={env.url} isExternal color="teal.500">
{env.url}
</Link>
) : (
'N/A'
)}
</Td>
<Td><Code>{env.commitHash.substring(0, 7)}</Code></Td>
</Tr>
))}
</Tbody>
</Table>
);
}
API 服务器端的 WebSocket 逻辑大致如下(使用 socket.io
和 node-etcd
):
// src/server.js (部分)
import { Etcd3 } from 'etcd3';
// ... express 和 socket.io 设置 ...
const etcdClient = new Etcd3();
const io = new Server(httpServer);
io.on('connection', (socket) => {
console.log('A user connected to WebSocket');
});
// 监听 etcd 中所有状态的变化
const watcher = await etcdClient.watch().prefix('/status/').create();
watcher.on('put', (kv) => {
const key = kv.key.toString(); // e.g., /status/my-project/pr-123
const value = kv.value.toString(); // e.g., READY:https://...
// 解析 key 和 value
const parts = key.split('/');
const prId = parseInt(parts[parts.length - 1].replace('pr-', ''), 10);
const [status, url] = value.split(':', 2);
// 广播事件到所有连接的前端客户端
io.emit('environment_update', { prId, status, url });
});
这种推模型(Push Model)架构,使得前端 UI 几乎是瞬时响应后端状态变化的,用户体验非常好。
开发工具链:Rome 的整合
整个项目(前端 React + 后端 Node.js API)采用 Monorepo 结构管理。为了统一代码风格和质量标准,我们放弃了 ESLint + Prettier 的组合,转而使用 Rome
。它的优势在于单一工具、单一配置文件、极快的速度。
我们的 rome.json
配置非常简洁:
{
"$schema": "https://docs.rome.tools/schemas/12.1.3/schema.json",
"organizeImports": {
"enabled": true
},
"linter": {
"enabled": true,
"rules": {
"recommended": true,
"suspicious": {
"noDoubleEquals": "error"
}
}
},
"formatter": {
"enabled": true,
"indentStyle": "space",
"indentSize": 2,
"lineWidth": 80
},
"javascript": {
"formatter": {
"quoteStyle": "single"
}
}
}
在 CI 流程中,我们只需要一条命令 npx rome ci .
就可以同时完成对整个代码库的格式化检查和 Lint 检查,极大地简化了配置和维护成本。
方案的局限性与未来迭代
这套架构虽然解决了我们最初的核心痛点,但它并非银弹。
首先,etcd
不应该被用作通用的消息队列。如果任务的创建频率极高(例如每秒上百个),etcd
可能会成为瓶颈。它的设计目标是一致性而非吞吐量。对于超大规模的场景,可能需要引入 Kafka 或 NATS 等专门的消息队列系统来解耦任务的生产和消费。
其次,当前的 Worker 预配逻辑是模拟的。在实际生产中,executeProvisioning
函数会包含复杂的与 Kubernetes API、云厂商 SDK 或 Terraform 的交互。这部分的错误处理、重试和状态上报需要更加精细化,例如,将详细的构建日志流式传输到 S3 或类似的对象存储中,并在 MongoDB 里只保留其链接。
最后,系统的可观测性尚有不足。我们需要为 Worker 节点、API 服务器和 etcd
集群本身添加详细的 Prometheus 指标监控和分布式链路追踪,以便在出现性能问题或故障时能快速定位根源。例如,监控锁的等待时间、租约续约的成功率、etcd
的 watch 事件延迟等。这些都是将该系统推向更大使命关键场景前必须完成的工作。