基于 TypeScript 与 Kubebuilder 构建 ZooKeeper Operator 实现声明式配置管理


我们团队面临一个棘手的现实:一个核心的、承载着巨大流量的遗留Java应用集群,其关键配置,包括功能开关(Feature Flags)、服务降级策略和动态路由规则,全部硬依赖于一个庞大的ZooKeeper集群。长久以来,对这些配置的任何变更都意味着一次高风险的手动操作:工程师通过命令行工具连接到生产环境的ZooKeeper,小心翼翼地执行setcreate命令。这个过程不仅效率低下,而且缺乏审计、版本控制和回滚机制,每一次线上变更都像是在走钢丝。

随着团队全面拥抱云原生和GitOps,这个手动流程成了我们工作流中的一块顽疾。我们所有的现代化服务都通过Flux CD进行管理,任何变更都始于一个Git提交,经过代码审查(Code Review),最终自动同步到Kubernetes集群。我们迫切需要一种方法,将对ZooKeeper的管理也纳入到这个声明式的、可追溯的体系中来。

最初的构想是在CI/CD流水线中加入一个脚本,在部署时读取Git仓库中的配置文件并写入ZooKeeper。但这很快被否决。这种方式只在部署时执行一次,无法保证状态的最终一致性。如果有人绕过GitOps流程手动修改了ZooKeeper中的节点(ZNode),系统将对此一无所知,配置漂移在所难免。我们需要的是一个持续运行的控制器,一个能够不断地将“期望状态”(Git中的定义)与“实际状态”(ZooKeeper中的数据)进行比对和调节的“调谐循环”(Reconciliation Loop)。这正是Kubernetes Operator模式的核心思想。

技术选型决策:为何是TypeScript Operator?

确定了Operator模式后,接下来的问题是技术栈的选择。Go语言配合Kubebuilder或Operator SDK是社区的主流选择,生态成熟,性能卓越。但在我们的特定场景下,我们最终选择了TypeScript。

原因有三:

  1. 团队技能匹配:我们的平台工程团队对Node.js和TypeScript的掌握程度远超Go。采用TypeScript能最大化开发效率,降低维护成本。
  2. 生态系统整合:我们已经有大量基于Node.js的内部工具和库,包括用于监控和告警的模块。使用TypeScript可以无缝复用这些资产。
  3. 非性能瓶颈:这个Operator的核心任务是同步配置,其QPS(每秒查询率)极低,通常与代码发布的频率一致。它不会成为性能瓶颈,因此Go语言带来的极致性能优势在这里并不具备决定性。

最终的架构方案清晰起来:

  • Git Repository: 作为配置的唯一真实来源 (Single Source of Truth)。
  • Flux CD: 监听Git仓库,将我们定义的Kubernetes自定义资源(CRD)同步到集群中。
  • Kubernetes CRD: 我们将定义一个名为ZooKeeperConfig的CRD,用来以声明式的方式描述一个ZNode的状态(路径、数据、ACL等)。
  • TypeScript Operator: 部署在Kubernetes集群中,持续监听ZooKeeperConfig资源的变化。一旦有创建、更新或删除事件,Operator就会执行相应的逻辑,与目标ZooKeeper集群进行交互,确保ZNode的实际状态与CR中定义的期望状态一致。
graph TD
    A[Developer] -- git push --> B(Git Repository: config.yaml);
    B -- GitOps Sync --> C{Flux CD};
    C -- Applies CR --> D[Kubernetes API Server];
    D -- Watches ZooKeeperConfig CR --> E(TypeScript ZooKeeper Operator);
    E -- Reconciles State --> F((ZooKeeper Cluster));
    F -- Provides Config --> G[Legacy Java Application Fleet];

步骤化实现:从CRD到调谐循环

1. 定义ZooKeeperConfig CRD

首先,我们需要设计并应用CRD。这是我们声明式API的基石。一个好的CRD设计应该清晰、简洁且能覆盖所有必要场景。

crd.yaml:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: zookeeperconfigs.platform.example.com
spec:
  group: platform.example.com
  names:
    kind: ZooKeeperConfig
    plural: zookeeperconfigs
    singular: zookeeperconfig
    shortNames:
      - zkc
  scope: Namespaced
  versions:
    - name: v1alpha1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                path:
                  type: string
                  description: "The full path of the ZNode in ZooKeeper."
                data:
                  type: string
                  description: "The data to be stored in the ZNode, base64 encoded."
                encoding:
                  type: string
                  description: "The encoding of the original data before base64. Defaults to 'utf-8'."
                  enum: ["utf-8", "binary"]
                  default: "utf-8"
                syncPolicy:
                  type: string
                  description: "Policy for syncing. 'createOrUpdate' is default. 'createOnly' will not update if node exists."
                  enum: ["createOrUpdate", "createOnly"]
                  default: "createOrUpdate"
              required:
                - path
                - data
            status:
              type: object
              properties:
                lastSyncedTime:
                  type: string
                  format: date-time
                state:
                  type: string
                  enum: ["Pending", "Synced", "Error", "OutOfSync"]
                message:
                  type: string
                observedGeneration:
                  type: integer
                  format: int64

这个CRD定义了spec(期望状态)和status(实际状态)。spec中包含了ZNode的路径path和经过Base64编码的data。我们还加入了一个syncPolicy字段,提供了更灵活的同步策略。status字段将用于反馈同步结果,这对于可观测性和调试至关重要。

2. 搭建TypeScript Operator项目骨架

我们不使用任何现成的Operator框架,而是直接基于@kubernetes/client-node库来构建,这样可以对控制器逻辑有最精细的掌控。

项目结构:

.
├── src/
│   ├── client/
│   │   └── zookeeperClient.ts  # ZooKeeper连接和操作的封装
│   ├── controller.ts           # 核心调谐逻辑
│   ├── crd.ts                  # ZooKeeperConfig的TypeScript类型定义
│   └── main.ts                 # 程序入口,启动Informer和控制器
├── package.json
└── tsconfig.json

关键依赖:

  • @kubernetes/client-node: 用于与Kubernetes API Server交互。
  • node-zookeeper-client: 一个成熟的ZooKeeper Node.js客户端。

3. 实现核心调谐逻辑 (Reconciliation Loop)

这是整个Operator的心脏。controller.ts文件将包含我们的主调谐函数。基本逻辑是:

  1. 使用@kubernetes/client-nodeWatchInformer机制来监听ZooKeeperConfig资源的ADDED, MODIFIED, DELETED事件。
  2. 为每个事件触发一个调谐函数reconcile(namespace, name)
  3. reconcile函数中,获取最新的CR对象。
  4. 根据CR的spec与ZooKeeper中的实际状态进行比较和操作。
  5. 操作完成后,更新CR的status字段。

以下是controller.ts中经过简化的核心实现,包含了详尽的注释和生产级的考量。

src/controller.ts:

import * as k8s from '@kubernetes/client-node';
import { ZooKeeperClient } from './client/zookeeperClient';
import { ZooKeeperConfig } from './crd';
import { logger } from './logger'; // 假设我们有一个结构化的日志记录器

const kc = new k8s.KubeConfig();
kc.loadFromDefault();

const k8sApi = kc.makeApiClient(k8s.CustomObjectsApi);
const k8sAppsApi = kc.makeApiClient(k8s.AppsV1Api);

const GROUP = 'platform.example.com';
const VERSION = 'v1alpha1';
const PLURAL = 'zookeeperconfigs';

// 在真实项目中,zkConnectionString应来自环境变量或ConfigMap
const zkClient = new ZooKeeperClient('zookeeper-1:2181,zookeeper-2:2181');

/**
 * The main reconciliation function. This function is triggered for every change
 * to a ZooKeeperConfig object.
 * @param namespace The namespace of the object.
 * @param name The name of the object.
 */
export async function reconcile(namespace: string, name: string): Promise<void> {
  logger.info(`Reconciliation started for ${namespace}/${name}`);

  // 1. Fetch the latest version of the ZooKeeperConfig object.
  let cr: ZooKeeperConfig;
  try {
    const res = await k8sApi.getNamespacedCustomObject(GROUP, VERSION, namespace, PLURAL, name);
    cr = res.body as ZooKeeperConfig;
  } catch (err: any) {
    if (err.statusCode === 404) {
      logger.warn(`ZooKeeperConfig ${namespace}/${name} not found. Assuming it was deleted.`);
      // The object is gone, nothing to do. The deletion logic is handled by the finalizer.
      return;
    }
    logger.error(`Failed to fetch ZooKeeperConfig ${namespace}/${name}`, { error: err.message });
    throw err;
  }
  
  // 2. Handle deletion (via finalizers).
  const finalizerName = `${PLURAL}.${GROUP}/finalizer`;
  if (cr.metadata?.deletionTimestamp && cr.metadata?.finalizers?.includes(finalizerName)) {
    await handleDeletion(cr);
    // Remove the finalizer to allow Kubernetes to delete the object.
    try {
        const finalizers = cr.metadata.finalizers.filter(f => f !== finalizerName);
        const patch = [{ op: 'replace', path: '/metadata/finalizers', value: finalizers }];
        await k8sApi.patchNamespacedCustomObject(GROUP, VERSION, namespace, PLURAL, name, patch, undefined, undefined, undefined, { headers: { 'Content-Type': 'application/json-patch+json' } });
        logger.info(`Finalizer removed for ${namespace}/${name}`);
    } catch(err) {
        logger.error(`Failed to remove finalizer for ${namespace}/${name}`, { error: err });
        // Re-throw to trigger a retry.
        throw err;
    }
    return;
  }

  // Add finalizer if it doesn't exist. This ensures we can handle deletion gracefully.
  if (!cr.metadata?.finalizers?.includes(finalizerName)) {
    try {
        const finalizers = cr.metadata?.finalizers || [];
        finalizers.push(finalizerName);
        const patch = [{ op: 'replace', path: '/metadata/finalizers', value: finalizers }];
        await k8sApi.patchNamespacedCustomObject(GROUP, VERSION, namespace, PLURAL, name, patch, undefined, undefined, undefined, { headers: { 'Content-Type': 'application/json-patch+json' } });
        logger.info(`Finalizer added for ${namespace}/${name}`);
    } catch(err) {
        logger.error(`Failed to add finalizer for ${namespace}/${name}`, { error: err });
        throw err;
    }
  }


  // 3. Main reconciliation logic.
  try {
    await zkClient.ensureConnected();
    const { path, data, encoding, syncPolicy } = cr.spec;
    const nodeExists = await zkClient.exists(path);

    const bufferData = Buffer.from(data, 'base64');

    if (nodeExists) {
      if (syncPolicy === 'createOnly') {
        logger.info(`Node ${path} exists and syncPolicy is 'createOnly'. Skipping update.`);
        await updateStatus(cr, 'Synced', 'Node exists and syncPolicy is createOnly.');
        return;
      }
      
      // Node exists, check if data needs update.
      const currentData = await zkClient.getData(path);
      if (currentData && Buffer.compare(currentData, bufferData) !== 0) {
        logger.info(`Data mismatch for node ${path}. Updating.`);
        await zkClient.setData(path, bufferData);
        await updateStatus(cr, 'Synced', `Node ${path} updated successfully.`);
      } else {
        logger.info(`Node ${path} is already in the desired state.`);
        // Even if no change, update status to refresh timestamp
        await updateStatus(cr, 'Synced', 'Node is already in desired state.');
      }
    } else {
      // Node does not exist, create it.
      logger.info(`Node ${path} does not exist. Creating.`);
      await zkClient.create(path, bufferData);
      await updateStatus(cr, 'Synced', `Node ${path} created successfully.`);
    }

  } catch (error: any) {
    logger.error(`Reconciliation failed for ${namespace}/${name}`, { error: error.message, path: cr.spec.path });
    await updateStatus(cr, 'Error', `Failed to sync: ${error.message}`);
    // Re-throw error to signal to the controller runtime that this reconciliation failed and should be retried.
    throw error;
  }
}

async function handleDeletion(cr: ZooKeeperConfig): Promise<void> {
    const { path } = cr.spec;
    logger.info(`Handling deletion for ZooKeeperConfig ${cr.metadata?.namespace}/${cr.metadata?.name}, removing ZNode ${path}`);
    try {
        await zkClient.ensureConnected();
        const nodeExists = await zkClient.exists(path);
        if (nodeExists) {
            await zkClient.remove(path);
            logger.info(`Successfully removed ZNode ${path}`);
        } else {
            logger.warn(`ZNode ${path} not found during deletion. Skipping.`);
        }
    } catch (error: any) {
        logger.error(`Failed to remove ZNode ${path}`, { error: error.message });
        // In a production system, you might want to implement a retry mechanism or alert here.
        // For now, we re-throw to block finalizer removal and retry.
        throw error;
    }
}


/**
 * Updates the status subresource of the ZooKeeperConfig object.
 */
async function updateStatus(cr: ZooKeeperConfig, state: 'Synced' | 'Error' | 'Pending', message: string): Promise<void> {
    const { namespace, name, generation } = cr.metadata!;
    const status = {
        observedGeneration: generation,
        state,
        message,
        lastSyncedTime: new Date().toISOString(),
    };

    // The status patch must be a strategic merge patch or JSON patch.
    const patch = [{ op: 'replace', path: '/status', value: status }];

    try {
        await k8sApi.patchNamespacedCustomObjectStatus(
            GROUP, VERSION, namespace!, PLURAL, name!,
            patch,
            undefined, undefined, undefined,
            { headers: { 'Content-Type': 'application/json-patch+json' } }
        );
    } catch (err: any) {
        logger.error(`Failed to update status for ${namespace}/${name}`, { error: err.body.message });
    }
}

这段代码的核心考量点:

  • 幂等性:每次调谐都从读取最新状态开始,无论执行多少次,对于同一个spec,结果都应该是一致的。
  • 错误处理与重试:当与ZooKeeper或K8s API Server通信失败时,函数会抛出异常。控制器的主循环(未在代码中展示,通常是一个简单的while(true)try/catch)会捕获它,并经过一段退避延时后重试,确保最终一致性。
  • 状态反馈updateStatus函数至关重要。它将调谐结果写回CR的status字段,使得我们可以通过kubectl get zkc -o yaml清晰地看到每个配置的同步状态。
  • 优雅删除 (Finalizers): 这是Operator开发中的一个关键模式。我们给每个CR对象添加一个finalizer。当用户删除CR时,Kubernetes不会立即清理它,而是设置一个deletionTimestamp。我们的Operator检测到这个时间戳,执行清理逻辑(删除对应的ZNode),清理成功后再移除finalizer。此时,Kubernetes才会真正删除这个CR对象。这保证了外部资源(ZNode)的生命周期与Kubernetes资源同步。

4. 与Flux CD集成

集成是整个流程中最简单的部分。我们只需要在Git仓库中创建一个指向ZooKeeperConfig清单目录的Kustomization对象。

infrastructure/flux/zk-configs-kustomization.yaml:

apiVersion: kustomize.toolkit.fluxcd.io/v1beta2
kind: Kustomization
metadata:
  name: zookeeper-configs
  namespace: flux-system
spec:
  interval: 5m
  path: "./zookeeper-configs" # Git仓库中存放ZooKeeperConfig CR的目录
  prune: true
  sourceRef:
    kind: GitRepository
    name: platform-config-repo
  validation: client

现在,开发人员的工作流程变得极其顺畅。要修改一个功能开关,他们只需:

  1. zookeeper-configs目录下修改或创建一个YAML文件。

    zookeeper-configs/feature-new-recommendation-engine.yaml:

    apiVersion: platform.example.com/v1alpha1
    kind: ZooKeeperConfig
    metadata:
      name: feature-new-recommendation-engine
      namespace: default
    spec:
      path: /app/features/new-recommendation-engine/enabled
      # The base64 representation of the string "true"
      data: "dHJ1ZQ=="
      encoding: "utf-8"
  2. 提交并推送这个变更到Git仓库。

  3. Flux CD检测到变更,将ZooKeeperConfig应用到集群。

  4. 我们的TypeScript Operator收到通知,执行调谐逻辑,在ZooKeeper中创建或更新/app/features/new-recommendation-engine/enabled节点,将其值设为true

整个过程完全自动化,有版本记录,可审计,可回滚(只需在Git中revert一个提交)。

局限性与未来迭代路径

这个方案虽然解决了核心痛点,但并非完美。在真实生产环境中,它还存在一些局限和可以优化的方向。

首先,目前的Operator是单线程的调谐模型。如果短时间内有大量的ZooKeeperConfig变更,可能会导致处理延迟。一个可行的优化是引入工作队列模式,将调谐任务放入队列,由一组worker并发处理,以提高吞吐量。

其次,对ZooKeeper集群的连接管理可以做得更健壮。当前实现依赖node-zookeeper-client库的内部重连逻辑,但我们可以增加更上层的健康检查和熔断机制。例如,如果Operator连续多次无法连接到ZooKeeper,应主动更新所有相关CR的状态为ErrorOutOfSync,并触发告警,而不是无限期地重试。

最后,可以进一步丰富CRD的能力,比如支持设置ZNode的ACL(访问控制列表),或者支持从ConfigMap或Secret中引用data字段,避免将敏感信息直接存储在Git中。这需要Operator增加对ConfigMap和Secret资源的Watch逻辑,并在调谐时进行关联读取。


  目录