我们团队面临一个棘手的现实:一个核心的、承载着巨大流量的遗留Java应用集群,其关键配置,包括功能开关(Feature Flags)、服务降级策略和动态路由规则,全部硬依赖于一个庞大的ZooKeeper集群。长久以来,对这些配置的任何变更都意味着一次高风险的手动操作:工程师通过命令行工具连接到生产环境的ZooKeeper,小心翼翼地执行set
或create
命令。这个过程不仅效率低下,而且缺乏审计、版本控制和回滚机制,每一次线上变更都像是在走钢丝。
随着团队全面拥抱云原生和GitOps,这个手动流程成了我们工作流中的一块顽疾。我们所有的现代化服务都通过Flux CD进行管理,任何变更都始于一个Git提交,经过代码审查(Code Review),最终自动同步到Kubernetes集群。我们迫切需要一种方法,将对ZooKeeper的管理也纳入到这个声明式的、可追溯的体系中来。
最初的构想是在CI/CD流水线中加入一个脚本,在部署时读取Git仓库中的配置文件并写入ZooKeeper。但这很快被否决。这种方式只在部署时执行一次,无法保证状态的最终一致性。如果有人绕过GitOps流程手动修改了ZooKeeper中的节点(ZNode),系统将对此一无所知,配置漂移在所难免。我们需要的是一个持续运行的控制器,一个能够不断地将“期望状态”(Git中的定义)与“实际状态”(ZooKeeper中的数据)进行比对和调节的“调谐循环”(Reconciliation Loop)。这正是Kubernetes Operator模式的核心思想。
技术选型决策:为何是TypeScript Operator?
确定了Operator模式后,接下来的问题是技术栈的选择。Go语言配合Kubebuilder或Operator SDK是社区的主流选择,生态成熟,性能卓越。但在我们的特定场景下,我们最终选择了TypeScript。
原因有三:
- 团队技能匹配:我们的平台工程团队对Node.js和TypeScript的掌握程度远超Go。采用TypeScript能最大化开发效率,降低维护成本。
- 生态系统整合:我们已经有大量基于Node.js的内部工具和库,包括用于监控和告警的模块。使用TypeScript可以无缝复用这些资产。
- 非性能瓶颈:这个Operator的核心任务是同步配置,其QPS(每秒查询率)极低,通常与代码发布的频率一致。它不会成为性能瓶颈,因此Go语言带来的极致性能优势在这里并不具备决定性。
最终的架构方案清晰起来:
- Git Repository: 作为配置的唯一真实来源 (Single Source of Truth)。
- Flux CD: 监听Git仓库,将我们定义的Kubernetes自定义资源(CRD)同步到集群中。
- Kubernetes CRD: 我们将定义一个名为
ZooKeeperConfig
的CRD,用来以声明式的方式描述一个ZNode的状态(路径、数据、ACL等)。 - TypeScript Operator: 部署在Kubernetes集群中,持续监听
ZooKeeperConfig
资源的变化。一旦有创建、更新或删除事件,Operator就会执行相应的逻辑,与目标ZooKeeper集群进行交互,确保ZNode的实际状态与CR中定义的期望状态一致。
graph TD A[Developer] -- git push --> B(Git Repository: config.yaml); B -- GitOps Sync --> C{Flux CD}; C -- Applies CR --> D[Kubernetes API Server]; D -- Watches ZooKeeperConfig CR --> E(TypeScript ZooKeeper Operator); E -- Reconciles State --> F((ZooKeeper Cluster)); F -- Provides Config --> G[Legacy Java Application Fleet];
步骤化实现:从CRD到调谐循环
1. 定义ZooKeeperConfig
CRD
首先,我们需要设计并应用CRD。这是我们声明式API的基石。一个好的CRD设计应该清晰、简洁且能覆盖所有必要场景。
crd.yaml
:
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: zookeeperconfigs.platform.example.com
spec:
group: platform.example.com
names:
kind: ZooKeeperConfig
plural: zookeeperconfigs
singular: zookeeperconfig
shortNames:
- zkc
scope: Namespaced
versions:
- name: v1alpha1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
path:
type: string
description: "The full path of the ZNode in ZooKeeper."
data:
type: string
description: "The data to be stored in the ZNode, base64 encoded."
encoding:
type: string
description: "The encoding of the original data before base64. Defaults to 'utf-8'."
enum: ["utf-8", "binary"]
default: "utf-8"
syncPolicy:
type: string
description: "Policy for syncing. 'createOrUpdate' is default. 'createOnly' will not update if node exists."
enum: ["createOrUpdate", "createOnly"]
default: "createOrUpdate"
required:
- path
- data
status:
type: object
properties:
lastSyncedTime:
type: string
format: date-time
state:
type: string
enum: ["Pending", "Synced", "Error", "OutOfSync"]
message:
type: string
observedGeneration:
type: integer
format: int64
这个CRD定义了spec
(期望状态)和status
(实际状态)。spec
中包含了ZNode的路径path
和经过Base64编码的data
。我们还加入了一个syncPolicy
字段,提供了更灵活的同步策略。status
字段将用于反馈同步结果,这对于可观测性和调试至关重要。
2. 搭建TypeScript Operator项目骨架
我们不使用任何现成的Operator框架,而是直接基于@kubernetes/client-node
库来构建,这样可以对控制器逻辑有最精细的掌控。
项目结构:
.
├── src/
│ ├── client/
│ │ └── zookeeperClient.ts # ZooKeeper连接和操作的封装
│ ├── controller.ts # 核心调谐逻辑
│ ├── crd.ts # ZooKeeperConfig的TypeScript类型定义
│ └── main.ts # 程序入口,启动Informer和控制器
├── package.json
└── tsconfig.json
关键依赖:
-
@kubernetes/client-node
: 用于与Kubernetes API Server交互。 -
node-zookeeper-client
: 一个成熟的ZooKeeper Node.js客户端。
3. 实现核心调谐逻辑 (Reconciliation Loop)
这是整个Operator的心脏。controller.ts
文件将包含我们的主调谐函数。基本逻辑是:
- 使用
@kubernetes/client-node
的Watch
或Informer
机制来监听ZooKeeperConfig
资源的ADDED
,MODIFIED
,DELETED
事件。 - 为每个事件触发一个调谐函数
reconcile(namespace, name)
。 - 在
reconcile
函数中,获取最新的CR对象。 - 根据CR的
spec
与ZooKeeper中的实际状态进行比较和操作。 - 操作完成后,更新CR的
status
字段。
以下是controller.ts
中经过简化的核心实现,包含了详尽的注释和生产级的考量。
src/controller.ts
:
import * as k8s from '@kubernetes/client-node';
import { ZooKeeperClient } from './client/zookeeperClient';
import { ZooKeeperConfig } from './crd';
import { logger } from './logger'; // 假设我们有一个结构化的日志记录器
const kc = new k8s.KubeConfig();
kc.loadFromDefault();
const k8sApi = kc.makeApiClient(k8s.CustomObjectsApi);
const k8sAppsApi = kc.makeApiClient(k8s.AppsV1Api);
const GROUP = 'platform.example.com';
const VERSION = 'v1alpha1';
const PLURAL = 'zookeeperconfigs';
// 在真实项目中,zkConnectionString应来自环境变量或ConfigMap
const zkClient = new ZooKeeperClient('zookeeper-1:2181,zookeeper-2:2181');
/**
* The main reconciliation function. This function is triggered for every change
* to a ZooKeeperConfig object.
* @param namespace The namespace of the object.
* @param name The name of the object.
*/
export async function reconcile(namespace: string, name: string): Promise<void> {
logger.info(`Reconciliation started for ${namespace}/${name}`);
// 1. Fetch the latest version of the ZooKeeperConfig object.
let cr: ZooKeeperConfig;
try {
const res = await k8sApi.getNamespacedCustomObject(GROUP, VERSION, namespace, PLURAL, name);
cr = res.body as ZooKeeperConfig;
} catch (err: any) {
if (err.statusCode === 404) {
logger.warn(`ZooKeeperConfig ${namespace}/${name} not found. Assuming it was deleted.`);
// The object is gone, nothing to do. The deletion logic is handled by the finalizer.
return;
}
logger.error(`Failed to fetch ZooKeeperConfig ${namespace}/${name}`, { error: err.message });
throw err;
}
// 2. Handle deletion (via finalizers).
const finalizerName = `${PLURAL}.${GROUP}/finalizer`;
if (cr.metadata?.deletionTimestamp && cr.metadata?.finalizers?.includes(finalizerName)) {
await handleDeletion(cr);
// Remove the finalizer to allow Kubernetes to delete the object.
try {
const finalizers = cr.metadata.finalizers.filter(f => f !== finalizerName);
const patch = [{ op: 'replace', path: '/metadata/finalizers', value: finalizers }];
await k8sApi.patchNamespacedCustomObject(GROUP, VERSION, namespace, PLURAL, name, patch, undefined, undefined, undefined, { headers: { 'Content-Type': 'application/json-patch+json' } });
logger.info(`Finalizer removed for ${namespace}/${name}`);
} catch(err) {
logger.error(`Failed to remove finalizer for ${namespace}/${name}`, { error: err });
// Re-throw to trigger a retry.
throw err;
}
return;
}
// Add finalizer if it doesn't exist. This ensures we can handle deletion gracefully.
if (!cr.metadata?.finalizers?.includes(finalizerName)) {
try {
const finalizers = cr.metadata?.finalizers || [];
finalizers.push(finalizerName);
const patch = [{ op: 'replace', path: '/metadata/finalizers', value: finalizers }];
await k8sApi.patchNamespacedCustomObject(GROUP, VERSION, namespace, PLURAL, name, patch, undefined, undefined, undefined, { headers: { 'Content-Type': 'application/json-patch+json' } });
logger.info(`Finalizer added for ${namespace}/${name}`);
} catch(err) {
logger.error(`Failed to add finalizer for ${namespace}/${name}`, { error: err });
throw err;
}
}
// 3. Main reconciliation logic.
try {
await zkClient.ensureConnected();
const { path, data, encoding, syncPolicy } = cr.spec;
const nodeExists = await zkClient.exists(path);
const bufferData = Buffer.from(data, 'base64');
if (nodeExists) {
if (syncPolicy === 'createOnly') {
logger.info(`Node ${path} exists and syncPolicy is 'createOnly'. Skipping update.`);
await updateStatus(cr, 'Synced', 'Node exists and syncPolicy is createOnly.');
return;
}
// Node exists, check if data needs update.
const currentData = await zkClient.getData(path);
if (currentData && Buffer.compare(currentData, bufferData) !== 0) {
logger.info(`Data mismatch for node ${path}. Updating.`);
await zkClient.setData(path, bufferData);
await updateStatus(cr, 'Synced', `Node ${path} updated successfully.`);
} else {
logger.info(`Node ${path} is already in the desired state.`);
// Even if no change, update status to refresh timestamp
await updateStatus(cr, 'Synced', 'Node is already in desired state.');
}
} else {
// Node does not exist, create it.
logger.info(`Node ${path} does not exist. Creating.`);
await zkClient.create(path, bufferData);
await updateStatus(cr, 'Synced', `Node ${path} created successfully.`);
}
} catch (error: any) {
logger.error(`Reconciliation failed for ${namespace}/${name}`, { error: error.message, path: cr.spec.path });
await updateStatus(cr, 'Error', `Failed to sync: ${error.message}`);
// Re-throw error to signal to the controller runtime that this reconciliation failed and should be retried.
throw error;
}
}
async function handleDeletion(cr: ZooKeeperConfig): Promise<void> {
const { path } = cr.spec;
logger.info(`Handling deletion for ZooKeeperConfig ${cr.metadata?.namespace}/${cr.metadata?.name}, removing ZNode ${path}`);
try {
await zkClient.ensureConnected();
const nodeExists = await zkClient.exists(path);
if (nodeExists) {
await zkClient.remove(path);
logger.info(`Successfully removed ZNode ${path}`);
} else {
logger.warn(`ZNode ${path} not found during deletion. Skipping.`);
}
} catch (error: any) {
logger.error(`Failed to remove ZNode ${path}`, { error: error.message });
// In a production system, you might want to implement a retry mechanism or alert here.
// For now, we re-throw to block finalizer removal and retry.
throw error;
}
}
/**
* Updates the status subresource of the ZooKeeperConfig object.
*/
async function updateStatus(cr: ZooKeeperConfig, state: 'Synced' | 'Error' | 'Pending', message: string): Promise<void> {
const { namespace, name, generation } = cr.metadata!;
const status = {
observedGeneration: generation,
state,
message,
lastSyncedTime: new Date().toISOString(),
};
// The status patch must be a strategic merge patch or JSON patch.
const patch = [{ op: 'replace', path: '/status', value: status }];
try {
await k8sApi.patchNamespacedCustomObjectStatus(
GROUP, VERSION, namespace!, PLURAL, name!,
patch,
undefined, undefined, undefined,
{ headers: { 'Content-Type': 'application/json-patch+json' } }
);
} catch (err: any) {
logger.error(`Failed to update status for ${namespace}/${name}`, { error: err.body.message });
}
}
这段代码的核心考量点:
- 幂等性:每次调谐都从读取最新状态开始,无论执行多少次,对于同一个
spec
,结果都应该是一致的。 - 错误处理与重试:当与ZooKeeper或K8s API Server通信失败时,函数会抛出异常。控制器的主循环(未在代码中展示,通常是一个简单的
while(true)
加try/catch
)会捕获它,并经过一段退避延时后重试,确保最终一致性。 - 状态反馈:
updateStatus
函数至关重要。它将调谐结果写回CR的status
字段,使得我们可以通过kubectl get zkc -o yaml
清晰地看到每个配置的同步状态。 - 优雅删除 (Finalizers): 这是Operator开发中的一个关键模式。我们给每个CR对象添加一个
finalizer
。当用户删除CR时,Kubernetes不会立即清理它,而是设置一个deletionTimestamp
。我们的Operator检测到这个时间戳,执行清理逻辑(删除对应的ZNode),清理成功后再移除finalizer
。此时,Kubernetes才会真正删除这个CR对象。这保证了外部资源(ZNode)的生命周期与Kubernetes资源同步。
4. 与Flux CD集成
集成是整个流程中最简单的部分。我们只需要在Git仓库中创建一个指向ZooKeeperConfig
清单目录的Kustomization
对象。
infrastructure/flux/zk-configs-kustomization.yaml
:
apiVersion: kustomize.toolkit.fluxcd.io/v1beta2
kind: Kustomization
metadata:
name: zookeeper-configs
namespace: flux-system
spec:
interval: 5m
path: "./zookeeper-configs" # Git仓库中存放ZooKeeperConfig CR的目录
prune: true
sourceRef:
kind: GitRepository
name: platform-config-repo
validation: client
现在,开发人员的工作流程变得极其顺畅。要修改一个功能开关,他们只需:
在
zookeeper-configs
目录下修改或创建一个YAML文件。zookeeper-configs/feature-new-recommendation-engine.yaml
:apiVersion: platform.example.com/v1alpha1 kind: ZooKeeperConfig metadata: name: feature-new-recommendation-engine namespace: default spec: path: /app/features/new-recommendation-engine/enabled # The base64 representation of the string "true" data: "dHJ1ZQ==" encoding: "utf-8"
提交并推送这个变更到Git仓库。
Flux CD检测到变更,将
ZooKeeperConfig
应用到集群。我们的TypeScript Operator收到通知,执行调谐逻辑,在ZooKeeper中创建或更新
/app/features/new-recommendation-engine/enabled
节点,将其值设为true
。
整个过程完全自动化,有版本记录,可审计,可回滚(只需在Git中revert
一个提交)。
局限性与未来迭代路径
这个方案虽然解决了核心痛点,但并非完美。在真实生产环境中,它还存在一些局限和可以优化的方向。
首先,目前的Operator是单线程的调谐模型。如果短时间内有大量的ZooKeeperConfig
变更,可能会导致处理延迟。一个可行的优化是引入工作队列模式,将调谐任务放入队列,由一组worker并发处理,以提高吞吐量。
其次,对ZooKeeper集群的连接管理可以做得更健壮。当前实现依赖node-zookeeper-client
库的内部重连逻辑,但我们可以增加更上层的健康检查和熔断机制。例如,如果Operator连续多次无法连接到ZooKeeper,应主动更新所有相关CR的状态为Error
或OutOfSync
,并触发告警,而不是无限期地重试。
最后,可以进一步丰富CRD的能力,比如支持设置ZNode的ACL(访问控制列表),或者支持从ConfigMap或Secret中引用data
字段,避免将敏感信息直接存储在Git中。这需要Operator增加对ConfigMap和Secret资源的Watch
逻辑,并在调谐时进行关联读取。