利用 Apache Pulsar 的状态化特性为 MLOps 平台构建动态服务发现


我们在构建新一代 MLOps 平台时,遇到的第一个棘手问题并非模型训练或部署,而是服务发现。MLOps 环境中的服务与传统的 Web 服务有本质区别:它们数量众多、生命周期短暂且状态复杂。一个典型的 MLOps 工作流可能包含特征工程服务、数据标注服务、多个并行的模型训练作业(作为服务运行)、模型评估服务以及最终的在线推理服务。

graph TD
    subgraph MLOps Platform
        A[API Gateway] --> B{Orchestrator};
        B -- trigger --> C[Training Job Service];
        B -- deploy --> D[Inference Service];
        C -- reads --> E[Feature Store];
        D -- reads --> E;
        F[Client App] --> D;
    end

    subgraph Service Discovery Problem
        C -- "How to find E?" --> G((?));
        D -- "How to find E?" --> G;
        A -- "How to find D?" --> G;
        B -- "How to find C & D?" --> G;
    end

    style G fill:#f9f,stroke:#333,stroke-width:2px

这里的挑战在于:

  1. 短暂性 (Ephemerality): 训练作业服务可能只存活几小时甚至几分钟,它们启动、上报自己的状态、完成任务然后销毁。传统的 DNS 或基于静态配置的服务发现完全不适用。
  2. 状态丰富性 (Rich State): 一个推理服务不仅仅是一个 IP:Port。我们需要知道它正在提供哪个模型的哪个版本、它是否加载了特定的数据批次、它是否绑定到特定的 GPU 设备。这些元数据是动态的,并且是路由决策的关键。
  3. 事件驱动本质 (Event-Driven Nature): 整个 MLOps 流程是事件驱动的。例如,“训练数据准备就绪”事件触发训练作业,“模型评估通过”事件触发部署流程。我们的服务发现机制最好能与这种范式天然契合。

方案A:沿用传统KV存储方案 (Consul / Etcd)

最直接的思路是引入一个成熟的服务发现组件,比如 Consul 或 Etcd。这是业界的标准做法,方案清晰。

优势分析:

  • 成熟稳定: 经过了大规模生产环境的严苛考验。
  • 强一致性: 基于 Raft 协议,能保证服务注册信息的强一致性,这对于服务发现至关重要。
  • 丰富生态: 提供了 HTTP API、DNS 接口,并有大量的客户端库支持。

劣势与不匹配之处:

  • 引入新的技术栈: 在我们的平台中,已经决定使用 Apache Pulsar 作为核心的消息与事件总线,用于传递数据集更新、模型训练状态、推理日志等。再引入一套 Consul/Etcd 集群意味着额外的部署、运维和监控成本。在真实项目中,每增加一个中间件都需要严肃的成本评估。
  • 拉模型 (Pull Model) 的局限: Consul 的健康检查是典型的拉模型。Consul Agent 会定期轮询服务的健康检查端点。对于成百上千个短暂的训练作业,这种模式会给 Consul Server 带来巨大的周期性负载,并且在作业生命周期极短的情况下,健康检查的发现延迟可能无法接受。
  • 阻抗不匹配: 我们的 MLOps 平台是事件驱动的,而 Consul/Etcd 的核心是状态存储。虽然它们都提供了 Watch 机制来模拟事件通知,但这与我们系统中已经存在的 Pulsar 事件流是割裂的。这意味着服务需要同时与 Pulsar(业务事件)和 Consul(服务发现事件)交互,增加了复杂性。

方案B:利用 Apache Pulsar 自身特性统一平台

一个更大胆的想法是:我们能否利用已经作为平台核心的 Pulsar 来实现服务发现?如果可行,将极大简化整体架构。

Pulsar 不仅仅是一个消息队列,它构建在 Apache BookKeeper 之上,本质是一个持久化的、可复制的日志存储系统。这为我们提供了实现服务发现所需的“状态存储”基础。

核心思路:
利用 Pulsar 的 Compacted Topic (压缩主题)TableView 特性。

  1. 注册 (Registration): 创建一个专门的 Pulsar Topic,例如 service-registry。每个服务实例在启动时,向这个 Topic 发送一条包含其元数据(服务名、实例ID、地址、模型版本、GPU信息等)的消息。消息的 Key 是该服务实例的唯一标识符(如 inference-service:instance-abc-123)。
  2. 心跳 (Heartbeat): 服务实例周期性地向该 Topic 发送同样 Key 的消息,以更新其状态或表示自己仍然存活。
  3. 注销 (Deregistration): 服务实例在正常关闭前,发送一条特殊的消息,例如 Value 为空的 “墓碑消息” (Tombstone Message)。
  4. 发现 (Discovery): 服务消费者创建一个 Pulsar TableView 来消费 service-registry 这个 Topic。TableView 是 Pulsar 2.7.0 引入的一个强大抽象,它会在客户端内存中维护一个 Topic 的“最终状态视图”。对于每个 Key,TableView 只会保留最新的那条消息。它会自动处理 Topic Compaction 带来的消息变更,并以一个 Map<Key, Value> 的形式暴露给应用。

优势分析:

  • 架构统一: 无需引入额外的 Consul/Etcd 集群。服务发现与业务事件流共享同一个基础设施,降低了运维复杂度和成本。
  • 天然的推模型 (Push Model): 服务实例主动上报状态。客户端通过 TableView 的监听器可以近乎实时地接收到服务上线、下线或状态变更的通知。这与 MLOps 的事件驱动特性完美契合。
  • 持久化与可追溯: service-registry 主题中的所有历史注册、心跳和注销消息都可以被持久化。这为事后审计、故障排查和分析服务生命周期提供了极大的便利,这是 Consul 难以做到的。
  • 丰富的元数据: Pulsar 消息体可以承载复杂的结构化数据(如 Avro, Protobuf),非常适合 MLOps 服务丰富的元数据需求。

潜在风险与权衡:

  • 非标准用法: 这并非 Pulsar 的首要设计目标。相关的最佳实践和社区文档较少,我们需要自己趟过一些坑。
  • 最终一致性: TableView 的状态更新存在一定的延迟(从消息发布到客户端 TableView 更新)。虽然这个延迟通常在毫秒级,但在极端场景下,它并非像 Etcd 那样提供线性一致性的读。对于我们的 MLOps 场景,这种最终一致性是完全可以接受的。
  • 客户端复杂性: 需要在客户端实现服务注册和心跳逻辑,而使用 Consul 则有现成的 Agent。

最终决策与理由

我们最终选择了方案 B。核心决策依据是架构的简约性范式的一致性。在 MLOps 这种高度动态和事件驱动的系统中,将服务发现能力内建于核心事件总线之上,所带来的长期收益(更低的运维成本、更契合的系统模型)超过了其作为非标准用法所带来的短期风险。

核心实现概览

我们将使用 Python 来实现服务注册端 (ServiceRegistrar) 和服务发现端 (ServiceDiscoveryClient)。

1. 定义服务元数据 Schema

首先,定义一个清晰的 JSON Schema 来描述服务实例。在生产项目中,推荐使用 Avro 或 Protobuf 以获得更强的类型约束和性能。

{
  "serviceName": "string",
  "instanceId": "string",
  "address": {
    "host": "string",
    "port": "integer"
  },
  "status": "enum('UP', 'DOWN', 'STARTING')",
  "timestamp": "long (unix epoch millis)",
  "metadata": {
    "modelName": "string",
    "modelVersion": "string",
    "gpuType": "string",
    "batchSize": "integer"
  }
}

2. 服务注册与心跳 (ServiceRegistrar)

这是一个在服务实例内部运行的组件,负责自身的注册和生命周期管理。

import pulsar
import time
import json
import uuid
import threading
import logging
import atexit

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class ServiceRegistrar:
    """
    负责服务实例的注册、心跳维持和优雅注销。
    """
    def __init__(self, pulsar_url: str, topic_name: str, service_info: dict, heartbeat_interval_sec: int = 15):
        if not all(k in service_info for k in ['serviceName', 'address']):
            raise ValueError("service_info must contain 'serviceName' and 'address'")

        self.pulsar_url = pulsar_url
        self.topic_name = topic_name
        self.service_info = service_info
        self.heartbeat_interval_sec = heartbeat_interval_sec
        
        # 为每个实例生成唯一ID
        self.instance_id = f"{self.service_info['serviceName']}-{uuid.uuid4()}"
        self.service_info['instanceId'] = self.instance_id
        
        self.client = None
        self.producer = None
        self.heartbeat_thread = None
        self.stop_event = threading.Event()

        # 确保在程序退出时执行注销逻辑
        atexit.register(self.stop)

    def start(self):
        """连接Pulsar并启动心跳线程"""
        try:
            self.client = pulsar.Client(self.pulsar_url)
            self.producer = self.client.create_producer(
                self.topic_name,
                producer_name=f"registrar-{self.instance_id}",
                # 关键:设置消息路由模式,确保相同 key 的消息进入同一分区
                # 这对于 Topic Compaction 至关重要
                message_routing_mode=pulsar.MessageRoutingMode.SinglePartition
            )
            logging.info(f"Service registrar for instance {self.instance_id} connected to Pulsar.")
            
            # 立即注册
            self._register()

            # 启动后台心跳线程
            self.heartbeat_thread = threading.Thread(target=self._heartbeat_loop, daemon=True)
            self.heartbeat_thread.start()
            logging.info(f"Heartbeat thread for {self.instance_id} started.")

        except Exception as e:
            logging.error(f"Failed to start ServiceRegistrar for {self.instance_id}: {e}")
            self.stop()
            raise

    def stop(self):
        """停止心跳并发送注销消息"""
        if self.stop_event.is_set():
            return
            
        logging.info(f"Stopping service registrar for {self.instance_id}...")
        self.stop_event.set()
        
        if self.heartbeat_thread and self.heartbeat_thread.is_alive():
            self.heartbeat_thread.join()

        try:
            self._deregister()
        finally:
            if self.producer:
                self.producer.close()
            if self.client:
                self.client.close()
            logging.info(f"Service registrar for {self.instance_id} stopped and cleaned up.")

    def update_metadata(self, new_metadata: dict):
        """允许服务在运行时更新其元数据"""
        self.service_info['metadata'] = new_metadata
        # 立即发送一次更新,而不是等待下一次心跳
        self._send_heartbeat('UP')
        logging.info(f"Metadata updated for {self.instance_id}. New metadata: {new_metadata}")

    def _register(self):
        """发送初始注册消息"""
        logging.info(f"Registering service instance {self.instance_id}...")
        self._send_heartbeat('STARTING')
        # 模拟服务启动过程
        time.sleep(2)
        self._send_heartbeat('UP')
        logging.info(f"Service instance {self.instance_id} registered successfully with status UP.")

    def _deregister(self):
        """发送注销消息 (Tombstone)"""
        if not self.producer:
            return
        try:
            # 一个常见的错误是忘记发送墓碑消息。没有它,服务下线后,其记录会永远留在TableView中直到被新的心跳覆盖。
            # 发送一个 value 为 None (or empty) 的消息,Pulsar Compaction 会将其视为对该 key 的删除。
            logging.info(f"Deregistering service instance {self.instance_id} by sending tombstone message.")
            self.producer.send(
                content=None,
                partition_key=self.instance_id
            )
            self.producer.flush()
        except Exception as e:
            logging.error(f"Failed to send deregister message for {self.instance_id}: {e}")

    def _heartbeat_loop(self):
        """周期性发送心跳"""
        while not self.stop_event.wait(self.heartbeat_interval_sec):
            self._send_heartbeat('UP')

    def _send_heartbeat(self, status: str):
        """构建并发送心跳消息"""
        self.service_info['status'] = status
        self.service_info['timestamp'] = int(time.time() * 1000)
        
        try:
            payload = json.dumps(self.service_info).encode('utf-8')
            self.producer.send_async(
                content=payload,
                partition_key=self.instance_id,
                callback=self._send_callback
            )
        except Exception as e:
            logging.error(f"Failed to send heartbeat for {self.instance_id}: {e}")

    @staticmethod
    def _send_callback(result, msg_id):
        if result != pulsar.Result.Ok:
            logging.warning(f"Failed to send message: {result}, msg_id: {msg_id}")
        else:
            logging.debug(f"Heartbeat sent successfully, msg_id: {msg_id}")


# 示例:一个模拟的推理服务
if __name__ == '__main__':
    service_details = {
        "serviceName": "model-inference-server",
        "address": {
            "host": "10.0.1.100",
            "port": 8080
        },
        "metadata": {
            "modelName": "resnet50",
            "modelVersion": "v1.2.0",
            "gpuType": "NVIDIA_A100",
            "batchSize": 32
        }
    }
    
    registrar = ServiceRegistrar(
        pulsar_url='pulsar://localhost:6650',
        topic_name='persistent://public/default/service-registry',
        service_info=service_details,
        heartbeat_interval_sec=10
    )
    
    try:
        registrar.start()
        # 模拟服务运行
        logging.info("Service is now 'running'. Press Ctrl+C to stop.")
        # 模拟运行时元数据更新
        time.sleep(25)
        registrar.update_metadata({
            "modelName": "resnet50",
            "modelVersion": "v1.2.1", # 模型版本更新
            "gpuType": "NVIDIA_A100",
            "batchSize": 64 # 动态调整批大小
        })
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logging.info("Shutdown signal received.")
    finally:
        # atexit 会自动调用 registrar.stop(),但显式调用是好习惯
        registrar.stop()

3. 服务发现客户端 (ServiceDiscoveryClient)

这个客户端供需要调用其他服务的应用使用,它维护了一个本地的服务实例缓存。

import pulsar
import json
import logging
from typing import List, Dict, Callable, Optional

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class ServiceDiscoveryClient:
    """
    使用 Pulsar TableView 发现和监控服务实例。
    """
    def __init__(self, pulsar_url: str, topic_name: str):
        self.pulsar_url = pulsar_url
        self.topic_name = topic_name
        self.client = None
        self.table_view = None
        self.is_started = False

    def start(self):
        """连接 Pulsar 并初始化 TableView"""
        if self.is_started:
            return
        try:
            self.client = pulsar.Client(self.pulsar_url)
            self.table_view = self.client.create_table_view(
                self.topic_name,
                schema=pulsar.schema.BytesSchema() # 以字节流接收,然后在应用层解码
            )
            self.is_started = True
            logging.info(f"Service discovery client connected and TableView for topic '{self.topic_name}' is active.")
        except Exception as e:
            logging.error(f"Failed to start ServiceDiscoveryClient: {e}")
            self.stop()
            raise

    def stop(self):
        """关闭 TableView 和客户端连接"""
        if not self.is_started:
            return
        if self.table_view:
            self.table_view.close()
        if self.client:
            self.client.close()
        self.is_started = False
        logging.info("Service discovery client stopped.")

    def get_all_services(self) -> Dict[str, Dict]:
        """
        获取当前所有活跃服务的快照。
        
        返回一个字典,key是instanceId,value是服务详情。
        这里的坑在于:必须对返回的值进行解码。
        """
        if not self.is_started:
            raise RuntimeError("Client not started. Call start() first.")
        
        snapshot = self.table_view.items()
        services = {}
        for instance_id, value_bytes in snapshot:
            if value_bytes: # 过滤掉墓碑消息 (value is None or empty)
                try:
                    services[instance_id] = json.loads(value_bytes.decode('utf-8'))
                except (json.JSONDecodeError, UnicodeDecodeError) as e:
                    logging.warning(f"Failed to decode message for instance {instance_id}: {e}")
        return services

    def get_service_instances(self, service_name: str) -> List[Dict]:
        """根据服务名获取所有健康 ('UP') 的实例列表"""
        all_services = self.get_all_services().values()
        return [
            s for s in all_services 
            if s.get('serviceName') == service_name and s.get('status') == 'UP'
        ]

    def listen_for_changes(self, callback: Callable[[str, Optional[Dict]], None]):
        """
        注册一个回调函数来监听 TableView 的变化。
        回调函数接收 (instance_id, service_data)。
        如果 service_data 为 None,表示服务已注销。
        
        注意:Pulsar Python 客户端的 TableView 尚不直接支持 add/remove listener 模式。
        这里我们通过定期轮询 TableView 来模拟这个功能。
        在 Java 客户端中,有更原生的 listener 支持。
        这是一个当前 Python 客户端实现的局限性,需要注意。
        """
        # 此处仅为示例,生产级实现需要更健壮的轮询或等待 Java Client 的功能对齐
        # 或者使用 Reader API 来实现更底层的事件监听。
        # 但 TableView 的核心优势在于状态压缩,所以我们接受这个权衡。
        logging.warning("Python client TableView listener is simulated via polling.")
        
        known_state = {}
        while self.is_started:
            current_state = {
                k: json.loads(v.decode('utf-8')) if v else None 
                for k, v in self.table_view.items()
            }
            
            # 检查新增或更新
            for instance_id, data in current_state.items():
                if instance_id not in known_state or known_state[instance_id] != data:
                    callback(instance_id, data)
            
            # 检查删除
            for instance_id in list(known_state.keys()):
                if instance_id not in current_state:
                    callback(instance_id, None)

            known_state = current_state
            time.sleep(5)


# 示例:一个需要发现推理服务的客户端应用
if __name__ == '__main__':
    discovery_client = ServiceDiscoveryClient(
        pulsar_url='pulsar://localhost:6650',
        topic_name='persistent://public/default/service-registry'
    )
    
    try:
        discovery_client.start()
        
        # 轮询查询服务
        for i in range(5):
            logging.info(f"--- Polling services (Attempt {i+1}) ---")
            inference_services = discovery_client.get_service_instances('model-inference-server')
            if inference_services:
                logging.info(f"Found {len(inference_services)} active inference instances:")
                for service in inference_services:
                    logging.info(f"  - Instance: {service['instanceId']}, "
                                 f"Address: {service['address']['host']}:{service['address']['port']}, "
                                 f"Model: {service['metadata']['modelVersion']}")
            else:
                logging.info("No active inference instances found.")
            time.sleep(10)

    except KeyboardInterrupt:
        pass
    finally:
        discovery_client.stop()

架构的扩展性与局限性

这个基于 Pulsar 的服务发现机制,虽然巧妙,但并非万能。

扩展性:

  • 客户端负载均衡: 服务发现客户端获取到所有健康实例列表后,可以轻松实现各种客户端负载均衡策略,如轮询、随机或基于元数据(例如,优先路由到处理小批次的实例)。
  • 服务治理: service-registry Topic 可以被其他治理平台消费,用于生成全局服务拓扑、监控服务健康状况、进行容量分析等。
  • 多区域部署: 利用 Pulsar 的 Geo-Replication 特性,可以将 service-registry Topic 在多个数据中心之间同步,从而实现跨区域的服务发现,这是构建多活 MLOps 平台的基础。

局限性与适用边界:

  • Pulsar 依赖: 整个方案强依赖于 Pulsar 集群的健康。Pulsar 的故障会直接导致服务发现失效。这要求我们必须具备强大的 Pulsar 运维能力。
  • 最终一致性的影响: 对于需要强一致性读的场景(例如,分布式锁的实现),这个方案并不适用。但在服务发现领域,秒级的最终一致性延迟通常是可以接受的。
  • Topic Compaction 配置: service-registry Topic 的压缩策略需要精心配置。如果压缩触发不频繁,Topic 的积压会很大;如果过于频繁,又会增加 Bookie 的负载。此外,还需要配置消息保留策略和 TTL,以自动清理那些未能优雅下线的“僵尸”服务实例留下的记录。
  • 客户端库的成熟度: 如上所示,Python 客户端的 TableView 功能相较于 Java 客户端还有待完善。在真实项目中,可能需要评估是否需要自行封装或使用更底层的 Reader API 来获得更精细的控制。

这个方案本质上是用 Pulsar 的日志抽象能力,模拟了一个带历史追溯功能的 KV 存储。它在 MLOps 这种特定领域的表现优于传统方案,因为它完美地拥抱了事件驱动的本质,并简化了技术栈。但它也清晰地界定了自己的边界,提醒我们在技术选型时,没有银弹,只有最适合特定场景的权衡。


  目录