使用 OpenFaaS 与 Tekton 在 GKE 上构建事件驱动的异步数据处理管道


一个棘手的生产问题摆在面前:用户上传视频文件到 GCS,我们需要对其进行一系列处理——生成多种分辨率的转码版本、提取关键帧作为封面、分析元数据。这些操作耗时且资源密集,从几秒到几十分钟不等。直接在 API 服务中同步处理会造成请求超时,而简单的后台任务队列又难以管理复杂的多阶段依赖和并发。

最初的构想是使用一个长时间运行的 Pod 消费消息队列,但这套方案在资源利用率上并不理想。大部分时间,这个 Pod 可能是空闲的,却依然占用着 GKE 节点的 CPU 和内存。我们需要一个更云原生的解决方案:仅在有工作时才消耗资源,并且能够清晰地编排和观测复杂的工作流。

技术选型聚焦在了 OpenFaaS 和 Tekton 的组合上。OpenFaaS 擅长作为事件网关,它能以极低的资源占用率持续监听事件,并快速响应。但它的函数执行时间有硬性限制,不适合长时间运行的转码任务。Tekton 恰好弥补了这一点,它是一个为 Kubernetes 设计的强大 CI/CD 流水线引擎,但我们完全可以将其用于通用的工作流编排。它的 TaskPipeline 概念能够完美地描述我们的视频处理流程。

我们的架构思路是:

  1. 事件入口: 一个轻量级的 OpenFaaS Node.js 函数接收 GCS 的上传完成事件。
  2. 职责分离: 该函数不执行任何实际的数据处理。它的唯一职责是验证事件,然后以编程方式创建一个 Tekton PipelineRun 资源。
  3. 工作流执行: Tekton Controller 感知到新的 PipelineRun 后,开始调度执行定义好的、包含多个 Task(转码、截图等)的 Pipeline
  4. 资源隔离: 每个 Task 都在一个独立的 Pod 中运行,任务完成后 Pod 销毁,资源被释放。

这种模式将快速响应的事件接收层与高资源消耗的批处理层彻底解耦,实现了真正的按需计算。

环境准备与 Tekton 组件安装

假设我们已经有一个运行中的 GKE 集群,并且 kubectlgcloud 命令行工具已配置妥当。首先,安装 Tekton Pipelines。在真实项目中,我们会使用 GitOps 工具如 ArgoCD 来管理这些清单,但这里为了演示,直接使用 kubectl

# 安装 Tekton Pipelines 最新版
kubectl apply --filename https://storage.googleapis.com/tekton-releases/pipeline/latest/release.yaml

# (可选) 安装 Tekton CLI (tkn) 用于简化交互
# brew install tektoncd-cli

安装完成后,可以通过检查 Pod 状态确认 Tekton Controller 和 Webhook 是否正常运行。

kubectl get pods --namespace tekton-pipelines

设计 Tekton 数据处理流水线

我们的视频处理流水线包含四个核心步骤:

  1. clone-and-prepare: 从模拟的代码仓库(或 GCS)拉取处理脚本和视频文件。
  2. transcode-1080p: 转码为 1080p 分辨率。
  3. transcode-720p: 转码为 720p 分辨率。
  4. extract-thumbnail: 提取视频第5秒的帧作为缩略图。

这其中,两个转码任务可以并行执行以缩短总处理时间。

1. 定义 WorkspaceTasks

Workspace 是 Tekton 中用于在不同 Task 之间共享文件的关键机制。我们将使用一个 PersistentVolumeClaim 类型的 Workspace 来存储原始视频和处理产物。

首先,定义各个独立的 Task。这里的关键在于将每个操作封装成一个可复用的单元。

task-transcode.yaml:

apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: ffmpeg-transcode
spec:
  description: >-
    Transcodes a video file to a specified resolution using ffmpeg.
  params:
    - name: sourceFile
      description: The path to the source video file within the workspace.
      type: string
    - name: targetResolution
      description: The target resolution (e.g., '1920x1080').
      type: string
    - name: targetFilename
      description: The name of the output file.
      type: string
  workspaces:
    - name: data
      description: The workspace containing the source video and for storing the output.
  steps:
    - name: transcode
      image: jrottenberg/ffmpeg # 使用一个包含 ffmpeg 的公共镜像
      workingDir: $(workspaces.data.path)
      script: |
        #!/bin/bash
        set -e # 任何命令失败则立即退出
        
        echo "Starting transcoding for $(params.sourceFile) to $(params.targetResolution)..."
        
        # 检查源文件是否存在
        if [ ! -f "$(params.sourceFile)" ]; then
          echo "Error: Source file $(params.sourceFile) not found!"
          exit 1
        fi
        
        # -y: 覆盖输出文件, -i: 输入文件, -vf: 视频滤镜 (scale), -c:v: 视频编码器, -preset: 编码速度与质量权衡
        ffmpeg -y -i "$(params.sourceFile)" -vf scale=$(params.targetResolution) -c:v libx264 -preset medium "$(params.targetFilename)"
        
        echo "Transcoding complete. Output: $(params.targetFilename)"

task-extract-thumbnail.yaml:

apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: ffmpeg-extract-thumbnail
spec:
  description: >-
    Extracts a single frame from a video file as a thumbnail.
  params:
    - name: sourceFile
      description: The path to the source video file.
      type: string
    - name: timestamp
      description: The timestamp to capture the frame from (e.g., '00:00:05').
      type: string
      default: "00:00:05"
    - name: targetFilename
      description: The name of the output thumbnail file.
      type: string
      default: "thumbnail.jpg"
  workspaces:
    - name: data
      description: The workspace containing the video.
  steps:
    - name: extract
      image: jrottenberg/ffmpeg
      workingDir: $(workspaces.data.path)
      script: |
        #!/bin/bash
        set -e
        
        echo "Extracting thumbnail from $(params.sourceFile) at $(params.timestamp)..."
        
        if [ ! -f "$(params.sourceFile)" ]; then
          echo "Error: Source file $(params.sourceFile) not found!"
          exit 1
        fi
        
        # -ss: seek to position, -vframes 1: 只提取一帧
        ffmpeg -y -i "$(params.sourceFile)" -ss "$(params.timestamp)" -vframes 1 "$(params.targetFilename)"
        
        echo "Thumbnail extraction complete. Output: $(params.targetFilename)"

注意这些 Task 的设计:它们是参数化的、独立的,并且只做一件事。这是保证流水线可维护性的关键。

2. 编排 Pipeline

现在,我们将这些 Task 组合成一个完整的 Pipeline

pipeline-video-processing.yaml:

apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
  name: video-processing-pipeline
spec:
  description: >-
    A pipeline to transcode a video into multiple resolutions and extract a thumbnail.
  params:
    - name: sourceVideo
      description: The name of the source video file.
      type: string
  workspaces:
    - name: shared-data
      description: PVC for sharing video files between tasks.
  tasks:
    # 模拟从GCS下载视频的步骤,实际项目中会替换为真实下载逻辑
    - name: fetch-video
      taskSpec:
        workspaces:
          - name: data
        params:
          - name: videoName
            type: string
        steps:
          - name: get-video
            image: curlimages/curl
            workingDir: $(workspaces.data.path)
            script: |
              #!/bin/sh
              # 在真实场景中,这里会使用 gsutil 等工具从 GCS 下载
              # 为了演示,我们创建一个伪视频文件
              echo "Simulating download for $(params.videoName)..."
              dd if=/dev/urandom of=$(params.videoName) bs=1M count=10
              echo "File $(params.videoName) created."
      workspaces:
        - name: data
          workspace: shared-data
      params:
        - name: videoName
          value: $(params.sourceVideo)

    - name: transcode-1080p
      taskRef:
        name: ffmpeg-transcode
      runAfter: [ "fetch-video" ] # 必须在 fetch-video 之后运行
      workspaces:
        - name: data
          workspace: shared-data
      params:
        - name: sourceFile
          value: $(params.sourceVideo)
        - name: targetResolution
          value: "1920x1080"
        - name: targetFilename
          value: "output-1080p.mp4"

    - name: transcode-720p
      taskRef:
        name: ffmpeg-transcode
      runAfter: [ "fetch-video" ] # 与 1080p 转码任务并行
      workspaces:
        - name: data
          workspace: shared-data
      params:
        - name: sourceFile
          value: $(params.sourceVideo)
        - name: targetResolution
          value: "1280x720"
        - name: targetFilename
          value: "output-720p.mp4"

    - name: extract-thumbnail
      taskRef:
        name: ffmpeg-extract-thumbnail
      runAfter: [ "fetch-video" ] # 缩略图提取可以与转码并行
      workspaces:
        - name: data
          workspace: shared-data
      params:
        - name: sourceFile
          value: $(params.sourceVideo)
        - name: targetFilename
          value: "thumbnail.jpg"

这个 Pipeline 定义了任务间的依赖关系 (runAfter),允许多个任务并行执行,并通过 workspaces 实现了文件共享。

我们可以用 Mermaid.js 可视化这个工作流:

graph TD
    A[fetch-video] --> B[transcode-1080p];
    A --> C[transcode-720p];
    A --> D[extract-thumbnail];

构建 OpenFaaS 触发器

现在,我们需要一个 Node.js 函数来接收外部事件,并启动这个 Pipeline

1. RBAC 权限配置

这是最容易被忽略但至关重要的一步。OpenFaaS 函数运行在一个 Pod 中,它需要明确的权限才能在 Kubernetes API Server 中创建 PipelineRun 资源。我们将创建一个 ServiceAccount,并通过 RoleRoleBinding 授予它必要的权限。

rbac-for-faas.yaml:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: tekton-trigger-sa
  namespace: openfaas-fn # 假设函数部署在这个命名空间

---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: tekton-pipelinerun-creator
  namespace: default # PipelineRun 将被创建在 default 命名空间
rules:
- apiGroups: ["tekton.dev"]
  resources: ["pipelineruns"]
  verbs: ["create", "get", "list"]

---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: bind-sa-to-pipelinerun-creator
  namespace: default # RoleBinding 必须在 Role 所在的命名空间
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: tekton-pipelinerun-creator
subjects:
- kind: ServiceAccount
  name: tekton-trigger-sa
  namespace: openfaas-fn # 引用函数所在的 ServiceAccount

将这些资源应用到集群后,我们就可以在部署 OpenFaaS 函数时指定使用 tekton-trigger-sa 这个 ServiceAccount

2. Node.js 函数实现

我们将使用官方的 OpenFaaS Node.js 模板 (node18-express),并引入 @kubernetes/client-node 库来与 Kubernetes API 交互。

tekton-trigger/handler.js:

'use strict'

const k8s = require('@kubernetes/client-node');

// Kubernetes client configuration
const kc = new k8s.KubeConfig();
// 在集群内部署时,loadFromCluster 会自动使用 ServiceAccount 的 token
// 在本地开发时,它会使用 ~/.kube/config
process.env.NODE_ENV === 'production' ? kc.loadFromCluster() : kc.loadFromDefault();

const k8sApi = kc.makeApiClient(k8s.CustomObjectsApi);

const TEKTON_GROUP = 'tekton.dev';
const TEKTON_VERSION = 'v1beta1';
const TEKTON_PLURAL = 'pipelineruns';
const TARGET_NAMESPACE = 'default'; // PipelineRun 将被创建在此命名空间

/**
 * Generates a unique name for the PipelineRun.
 * @param {string} baseName 
 * @returns {string} A unique name with a timestamp and random suffix.
 */
const generatePipelineRunName = (baseName) => {
    const timestamp = new Date().getTime();
    const randomSuffix = Math.random().toString(36).substring(2, 7);
    return `${baseName}-${timestamp}-${randomSuffix}`;
};


module.exports = async (event, context) => {
    const { body } = event;

    // --- 1. 输入验证 ---
    // 在真实项目中,这里会有严格的验证逻辑
    // 例如,验证 GCS 事件的签名、解析事件体等
    if (!body || !body.videoFile) {
        return context
            .status(400)
            .fail({ error: "Bad Request: 'videoFile' is required in the request body." });
    }
    const videoFile = body.videoFile;
    console.log(`Received request to process video: ${videoFile}`);

    // --- 2. 构建 PipelineRun 对象 ---
    const pipelineRunName = generatePipelineRunName('video-process');

    const pipelineRunManifest = {
        apiVersion: `${TEKTON_GROUP}/${TEKTON_VERSION}`,
        kind: 'PipelineRun',
        metadata: {
            name: pipelineRunName,
            namespace: TARGET_NAMESPACE,
            labels: {
                "trigger.openfaas.io/source": "video-upload",
                "source-file": videoFile.replace(/[^a-zA-Z0-9-]/g, '-').toLowerCase()
            }
        },
        spec: {
            pipelineRef: {
                name: 'video-processing-pipeline' // 引用我们之前创建的 Pipeline
            },
            params: [
                {
                    name: 'sourceVideo',
                    value: videoFile
                }
            ],
            workspaces: [
                {
                    name: 'shared-data',
                    // 在生产环境中,应该使用一个动态创建的 PVC 或一个预先存在的、
                    // 容量足够的 PVC。这里为了简单,我们假设一个名为 'tekton-pvc' 的 PVC 存在。
                    persistentVolumeClaim: {
                        claimName: 'tekton-pvc' 
                    }
                }
            ],
            // 生产级考量:为 PipelineRun 设置超时
            timeout: "1h0m0s" 
        }
    };
    
    // --- 3. 调用 Kubernetes API 创建 PipelineRun ---
    try {
        console.log(`Creating PipelineRun '${pipelineRunName}' in namespace '${TARGET_NAMESPACE}'...`);
        const result = await k8sApi.createNamespacedCustomObject(
            TEKTON_GROUP,
            TEKTON_VERSION,
            TARGET_NAMESPACE,
            TEKTON_PLURAL,
            pipelineRunManifest
        );
        console.log("Successfully created PipelineRun.");

        return context
            .status(202) // 202 Accepted 表示请求已被接受,正在异步处理
            .succeed({
                message: "Video processing pipeline started successfully.",
                pipelineRunName: pipelineRunName,
                namespace: TARGET_NAMESPACE
            });

    } catch (err) {
        console.error("Error creating PipelineRun:", err.body ? JSON.stringify(err.body) : err.message);
        // 这里的错误处理至关重要,需要记录详细日志以供排查
        // 可能是 RBAC 权限问题、Pipeline 不存在、或 API Server 连接问题
        return context
            .status(500)
            .fail({ 
                error: "Internal Server Error: Failed to trigger the processing pipeline.",
                details: err.body ? err.body.message : "Unknown error"
            });
    }
}

这个函数包含了关键的生产级实践:

  • 环境判断: process.env.NODE_ENV 用于区分本地开发和集群内运行的 KubeConfig 加载方式。
  • 唯一命名: generatePipelineRunName 确保每次触发都创建一个新的、不冲突的 PipelineRun
  • 错误处理: try...catch 块捕获创建 PipelineRun 时可能发生的任何错误,并返回有意义的错误信息。
  • 正确的 HTTP 状态码: 返回 202 Accepted 明确告知客户端这是一个异步操作。

3. 部署函数

最后,配置 stack.yml 文件来部署这个函数到 OpenFaaS。

tekton-trigger/stack.yml:

version: 1.0
provider:
  name: openfaas
  gateway: http://127.0.0.1:8080 # 本地测试时指向 port-forward 的网关

functions:
  tekton-trigger:
    lang: node18-express
    handler: ./tekton-trigger
    image: your-docker-registry/tekton-trigger:latest
    build_args:
      NPM_CONFIG_LOGLEVEL: warn
    annotations:
      # 将函数与我们创建的 ServiceAccount 关联
      com.openfaas.serviceaccount: "tekton-trigger-sa"
    environment:
      NODE_ENV: production
      # 其他可能需要的环境变量

package.json 中添加 @kubernetes/client-node 依赖后,使用 faas-cli up -f tekton-trigger/stack.yml 命令即可构建并部署函数。

触发与观测

部署完成后,我们可以通过调用 OpenFaaS 函数的端点来触发整个流程:

curl -X POST http://<OPENFAAS_GATEWAY_IP>:8080/function/tekton-trigger \
  -H "Content-Type: application/json" \
  -d '{"videoFile": "my-awesome-video.mov"}'

如果一切正常,会收到 202 Accepted 的响应。此时,我们可以使用 tkn CLI 来观测流水线的执行情况:

# 列出最近的 PipelineRun
tkn pipelinerun list

# 查看特定 PipelineRun 的日志,-f 表示实时跟踪
tkn pipelinerun logs <pipelinerun-name> -f

你将看到 fetch-video 任务首先启动,然后 transcode-1080p, transcode-720pextract-thumbnail 并行开始执行,最后所有任务完成,PipelineRun 状态变为 Succeeded

当前方案的局限性与未来迭代

这套架构虽然解决了核心问题,但在生产环境中仍有几个方面需要完善。

首先,状态通知机制是缺失的。当前的实现是“即发即忘”,调用方无法得知处理何时完成、是成功还是失败。一个健壮的系统需要在 Pipeline 的末尾增加一个 finally 任务,无论流水线成功或失败,这个任务都会被执行。它可以向一个回调 URL 发送 HTTP 请求,或向消息队列写入一条状态更新消息,从而闭合整个异步调用的环路。

其次,错误处理和重试策略还比较初级。如果某个转码 Task 因为临时性问题(如节点资源不足)失败,整个 PipelineRun 就会失败。Tekton 本身支持 Task 级别的重试,我们应该为可能出现网络或资源抖动的 Task 配置合理的重试次数。对于无法自动恢复的失败,finally 任务也应捕获错误信息并上报给告警系统。

最后,工作区 (Workspace) 的管理可以更精细化。我们使用了一个共享的 PVC,但在高并发场景下,为每个 PipelineRun 动态创建独立的 PVC 会提供更好的隔离性,避免不同任务间的文件冲突。这可以通过 Tekton 的 VolumeClaimTemplate 来实现,但这会增加存储管理的复杂性。在成本和隔离性之间需要做出权衡。


  目录