我们在构建新一代 MLOps 平台时,遇到的第一个棘手问题并非模型训练或部署,而是服务发现。MLOps 环境中的服务与传统的 Web 服务有本质区别:它们数量众多、生命周期短暂且状态复杂。一个典型的 MLOps 工作流可能包含特征工程服务、数据标注服务、多个并行的模型训练作业(作为服务运行)、模型评估服务以及最终的在线推理服务。
graph TD subgraph MLOps Platform A[API Gateway] --> B{Orchestrator}; B -- trigger --> C[Training Job Service]; B -- deploy --> D[Inference Service]; C -- reads --> E[Feature Store]; D -- reads --> E; F[Client App] --> D; end subgraph Service Discovery Problem C -- "How to find E?" --> G((?)); D -- "How to find E?" --> G; A -- "How to find D?" --> G; B -- "How to find C & D?" --> G; end style G fill:#f9f,stroke:#333,stroke-width:2px
这里的挑战在于:
- 短暂性 (Ephemerality): 训练作业服务可能只存活几小时甚至几分钟,它们启动、上报自己的状态、完成任务然后销毁。传统的 DNS 或基于静态配置的服务发现完全不适用。
- 状态丰富性 (Rich State): 一个推理服务不仅仅是一个
IP:Port
。我们需要知道它正在提供哪个模型的哪个版本、它是否加载了特定的数据批次、它是否绑定到特定的 GPU 设备。这些元数据是动态的,并且是路由决策的关键。 - 事件驱动本质 (Event-Driven Nature): 整个 MLOps 流程是事件驱动的。例如,“训练数据准备就绪”事件触发训练作业,“模型评估通过”事件触发部署流程。我们的服务发现机制最好能与这种范式天然契合。
方案A:沿用传统KV存储方案 (Consul / Etcd)
最直接的思路是引入一个成熟的服务发现组件,比如 Consul 或 Etcd。这是业界的标准做法,方案清晰。
优势分析:
- 成熟稳定: 经过了大规模生产环境的严苛考验。
- 强一致性: 基于 Raft 协议,能保证服务注册信息的强一致性,这对于服务发现至关重要。
- 丰富生态: 提供了 HTTP API、DNS 接口,并有大量的客户端库支持。
劣势与不匹配之处:
- 引入新的技术栈: 在我们的平台中,已经决定使用 Apache Pulsar 作为核心的消息与事件总线,用于传递数据集更新、模型训练状态、推理日志等。再引入一套 Consul/Etcd 集群意味着额外的部署、运维和监控成本。在真实项目中,每增加一个中间件都需要严肃的成本评估。
- 拉模型 (Pull Model) 的局限: Consul 的健康检查是典型的拉模型。Consul Agent 会定期轮询服务的健康检查端点。对于成百上千个短暂的训练作业,这种模式会给 Consul Server 带来巨大的周期性负载,并且在作业生命周期极短的情况下,健康检查的发现延迟可能无法接受。
- 阻抗不匹配: 我们的 MLOps 平台是事件驱动的,而 Consul/Etcd 的核心是状态存储。虽然它们都提供了 Watch 机制来模拟事件通知,但这与我们系统中已经存在的 Pulsar 事件流是割裂的。这意味着服务需要同时与 Pulsar(业务事件)和 Consul(服务发现事件)交互,增加了复杂性。
方案B:利用 Apache Pulsar 自身特性统一平台
一个更大胆的想法是:我们能否利用已经作为平台核心的 Pulsar 来实现服务发现?如果可行,将极大简化整体架构。
Pulsar 不仅仅是一个消息队列,它构建在 Apache BookKeeper 之上,本质是一个持久化的、可复制的日志存储系统。这为我们提供了实现服务发现所需的“状态存储”基础。
核心思路:
利用 Pulsar 的 Compacted Topic (压缩主题) 和 TableView 特性。
- 注册 (Registration): 创建一个专门的 Pulsar Topic,例如
service-registry
。每个服务实例在启动时,向这个 Topic 发送一条包含其元数据(服务名、实例ID、地址、模型版本、GPU信息等)的消息。消息的 Key 是该服务实例的唯一标识符(如inference-service:instance-abc-123
)。 - 心跳 (Heartbeat): 服务实例周期性地向该 Topic 发送同样 Key 的消息,以更新其状态或表示自己仍然存活。
- 注销 (Deregistration): 服务实例在正常关闭前,发送一条特殊的消息,例如 Value 为空的 “墓碑消息” (Tombstone Message)。
- 发现 (Discovery): 服务消费者创建一个 Pulsar TableView 来消费
service-registry
这个 Topic。TableView 是 Pulsar 2.7.0 引入的一个强大抽象,它会在客户端内存中维护一个 Topic 的“最终状态视图”。对于每个 Key,TableView 只会保留最新的那条消息。它会自动处理 Topic Compaction 带来的消息变更,并以一个Map<Key, Value>
的形式暴露给应用。
优势分析:
- 架构统一: 无需引入额外的 Consul/Etcd 集群。服务发现与业务事件流共享同一个基础设施,降低了运维复杂度和成本。
- 天然的推模型 (Push Model): 服务实例主动上报状态。客户端通过 TableView 的监听器可以近乎实时地接收到服务上线、下线或状态变更的通知。这与 MLOps 的事件驱动特性完美契合。
- 持久化与可追溯:
service-registry
主题中的所有历史注册、心跳和注销消息都可以被持久化。这为事后审计、故障排查和分析服务生命周期提供了极大的便利,这是 Consul 难以做到的。 - 丰富的元数据: Pulsar 消息体可以承载复杂的结构化数据(如 Avro, Protobuf),非常适合 MLOps 服务丰富的元数据需求。
潜在风险与权衡:
- 非标准用法: 这并非 Pulsar 的首要设计目标。相关的最佳实践和社区文档较少,我们需要自己趟过一些坑。
- 最终一致性: TableView 的状态更新存在一定的延迟(从消息发布到客户端 TableView 更新)。虽然这个延迟通常在毫秒级,但在极端场景下,它并非像 Etcd 那样提供线性一致性的读。对于我们的 MLOps 场景,这种最终一致性是完全可以接受的。
- 客户端复杂性: 需要在客户端实现服务注册和心跳逻辑,而使用 Consul 则有现成的 Agent。
最终决策与理由
我们最终选择了方案 B。核心决策依据是架构的简约性和范式的一致性。在 MLOps 这种高度动态和事件驱动的系统中,将服务发现能力内建于核心事件总线之上,所带来的长期收益(更低的运维成本、更契合的系统模型)超过了其作为非标准用法所带来的短期风险。
核心实现概览
我们将使用 Python 来实现服务注册端 (ServiceRegistrar
) 和服务发现端 (ServiceDiscoveryClient
)。
1. 定义服务元数据 Schema
首先,定义一个清晰的 JSON Schema 来描述服务实例。在生产项目中,推荐使用 Avro 或 Protobuf 以获得更强的类型约束和性能。
{
"serviceName": "string",
"instanceId": "string",
"address": {
"host": "string",
"port": "integer"
},
"status": "enum('UP', 'DOWN', 'STARTING')",
"timestamp": "long (unix epoch millis)",
"metadata": {
"modelName": "string",
"modelVersion": "string",
"gpuType": "string",
"batchSize": "integer"
}
}
2. 服务注册与心跳 (ServiceRegistrar
)
这是一个在服务实例内部运行的组件,负责自身的注册和生命周期管理。
import pulsar
import time
import json
import uuid
import threading
import logging
import atexit
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class ServiceRegistrar:
"""
负责服务实例的注册、心跳维持和优雅注销。
"""
def __init__(self, pulsar_url: str, topic_name: str, service_info: dict, heartbeat_interval_sec: int = 15):
if not all(k in service_info for k in ['serviceName', 'address']):
raise ValueError("service_info must contain 'serviceName' and 'address'")
self.pulsar_url = pulsar_url
self.topic_name = topic_name
self.service_info = service_info
self.heartbeat_interval_sec = heartbeat_interval_sec
# 为每个实例生成唯一ID
self.instance_id = f"{self.service_info['serviceName']}-{uuid.uuid4()}"
self.service_info['instanceId'] = self.instance_id
self.client = None
self.producer = None
self.heartbeat_thread = None
self.stop_event = threading.Event()
# 确保在程序退出时执行注销逻辑
atexit.register(self.stop)
def start(self):
"""连接Pulsar并启动心跳线程"""
try:
self.client = pulsar.Client(self.pulsar_url)
self.producer = self.client.create_producer(
self.topic_name,
producer_name=f"registrar-{self.instance_id}",
# 关键:设置消息路由模式,确保相同 key 的消息进入同一分区
# 这对于 Topic Compaction 至关重要
message_routing_mode=pulsar.MessageRoutingMode.SinglePartition
)
logging.info(f"Service registrar for instance {self.instance_id} connected to Pulsar.")
# 立即注册
self._register()
# 启动后台心跳线程
self.heartbeat_thread = threading.Thread(target=self._heartbeat_loop, daemon=True)
self.heartbeat_thread.start()
logging.info(f"Heartbeat thread for {self.instance_id} started.")
except Exception as e:
logging.error(f"Failed to start ServiceRegistrar for {self.instance_id}: {e}")
self.stop()
raise
def stop(self):
"""停止心跳并发送注销消息"""
if self.stop_event.is_set():
return
logging.info(f"Stopping service registrar for {self.instance_id}...")
self.stop_event.set()
if self.heartbeat_thread and self.heartbeat_thread.is_alive():
self.heartbeat_thread.join()
try:
self._deregister()
finally:
if self.producer:
self.producer.close()
if self.client:
self.client.close()
logging.info(f"Service registrar for {self.instance_id} stopped and cleaned up.")
def update_metadata(self, new_metadata: dict):
"""允许服务在运行时更新其元数据"""
self.service_info['metadata'] = new_metadata
# 立即发送一次更新,而不是等待下一次心跳
self._send_heartbeat('UP')
logging.info(f"Metadata updated for {self.instance_id}. New metadata: {new_metadata}")
def _register(self):
"""发送初始注册消息"""
logging.info(f"Registering service instance {self.instance_id}...")
self._send_heartbeat('STARTING')
# 模拟服务启动过程
time.sleep(2)
self._send_heartbeat('UP')
logging.info(f"Service instance {self.instance_id} registered successfully with status UP.")
def _deregister(self):
"""发送注销消息 (Tombstone)"""
if not self.producer:
return
try:
# 一个常见的错误是忘记发送墓碑消息。没有它,服务下线后,其记录会永远留在TableView中直到被新的心跳覆盖。
# 发送一个 value 为 None (or empty) 的消息,Pulsar Compaction 会将其视为对该 key 的删除。
logging.info(f"Deregistering service instance {self.instance_id} by sending tombstone message.")
self.producer.send(
content=None,
partition_key=self.instance_id
)
self.producer.flush()
except Exception as e:
logging.error(f"Failed to send deregister message for {self.instance_id}: {e}")
def _heartbeat_loop(self):
"""周期性发送心跳"""
while not self.stop_event.wait(self.heartbeat_interval_sec):
self._send_heartbeat('UP')
def _send_heartbeat(self, status: str):
"""构建并发送心跳消息"""
self.service_info['status'] = status
self.service_info['timestamp'] = int(time.time() * 1000)
try:
payload = json.dumps(self.service_info).encode('utf-8')
self.producer.send_async(
content=payload,
partition_key=self.instance_id,
callback=self._send_callback
)
except Exception as e:
logging.error(f"Failed to send heartbeat for {self.instance_id}: {e}")
@staticmethod
def _send_callback(result, msg_id):
if result != pulsar.Result.Ok:
logging.warning(f"Failed to send message: {result}, msg_id: {msg_id}")
else:
logging.debug(f"Heartbeat sent successfully, msg_id: {msg_id}")
# 示例:一个模拟的推理服务
if __name__ == '__main__':
service_details = {
"serviceName": "model-inference-server",
"address": {
"host": "10.0.1.100",
"port": 8080
},
"metadata": {
"modelName": "resnet50",
"modelVersion": "v1.2.0",
"gpuType": "NVIDIA_A100",
"batchSize": 32
}
}
registrar = ServiceRegistrar(
pulsar_url='pulsar://localhost:6650',
topic_name='persistent://public/default/service-registry',
service_info=service_details,
heartbeat_interval_sec=10
)
try:
registrar.start()
# 模拟服务运行
logging.info("Service is now 'running'. Press Ctrl+C to stop.")
# 模拟运行时元数据更新
time.sleep(25)
registrar.update_metadata({
"modelName": "resnet50",
"modelVersion": "v1.2.1", # 模型版本更新
"gpuType": "NVIDIA_A100",
"batchSize": 64 # 动态调整批大小
})
while True:
time.sleep(1)
except KeyboardInterrupt:
logging.info("Shutdown signal received.")
finally:
# atexit 会自动调用 registrar.stop(),但显式调用是好习惯
registrar.stop()
3. 服务发现客户端 (ServiceDiscoveryClient
)
这个客户端供需要调用其他服务的应用使用,它维护了一个本地的服务实例缓存。
import pulsar
import json
import logging
from typing import List, Dict, Callable, Optional
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class ServiceDiscoveryClient:
"""
使用 Pulsar TableView 发现和监控服务实例。
"""
def __init__(self, pulsar_url: str, topic_name: str):
self.pulsar_url = pulsar_url
self.topic_name = topic_name
self.client = None
self.table_view = None
self.is_started = False
def start(self):
"""连接 Pulsar 并初始化 TableView"""
if self.is_started:
return
try:
self.client = pulsar.Client(self.pulsar_url)
self.table_view = self.client.create_table_view(
self.topic_name,
schema=pulsar.schema.BytesSchema() # 以字节流接收,然后在应用层解码
)
self.is_started = True
logging.info(f"Service discovery client connected and TableView for topic '{self.topic_name}' is active.")
except Exception as e:
logging.error(f"Failed to start ServiceDiscoveryClient: {e}")
self.stop()
raise
def stop(self):
"""关闭 TableView 和客户端连接"""
if not self.is_started:
return
if self.table_view:
self.table_view.close()
if self.client:
self.client.close()
self.is_started = False
logging.info("Service discovery client stopped.")
def get_all_services(self) -> Dict[str, Dict]:
"""
获取当前所有活跃服务的快照。
返回一个字典,key是instanceId,value是服务详情。
这里的坑在于:必须对返回的值进行解码。
"""
if not self.is_started:
raise RuntimeError("Client not started. Call start() first.")
snapshot = self.table_view.items()
services = {}
for instance_id, value_bytes in snapshot:
if value_bytes: # 过滤掉墓碑消息 (value is None or empty)
try:
services[instance_id] = json.loads(value_bytes.decode('utf-8'))
except (json.JSONDecodeError, UnicodeDecodeError) as e:
logging.warning(f"Failed to decode message for instance {instance_id}: {e}")
return services
def get_service_instances(self, service_name: str) -> List[Dict]:
"""根据服务名获取所有健康 ('UP') 的实例列表"""
all_services = self.get_all_services().values()
return [
s for s in all_services
if s.get('serviceName') == service_name and s.get('status') == 'UP'
]
def listen_for_changes(self, callback: Callable[[str, Optional[Dict]], None]):
"""
注册一个回调函数来监听 TableView 的变化。
回调函数接收 (instance_id, service_data)。
如果 service_data 为 None,表示服务已注销。
注意:Pulsar Python 客户端的 TableView 尚不直接支持 add/remove listener 模式。
这里我们通过定期轮询 TableView 来模拟这个功能。
在 Java 客户端中,有更原生的 listener 支持。
这是一个当前 Python 客户端实现的局限性,需要注意。
"""
# 此处仅为示例,生产级实现需要更健壮的轮询或等待 Java Client 的功能对齐
# 或者使用 Reader API 来实现更底层的事件监听。
# 但 TableView 的核心优势在于状态压缩,所以我们接受这个权衡。
logging.warning("Python client TableView listener is simulated via polling.")
known_state = {}
while self.is_started:
current_state = {
k: json.loads(v.decode('utf-8')) if v else None
for k, v in self.table_view.items()
}
# 检查新增或更新
for instance_id, data in current_state.items():
if instance_id not in known_state or known_state[instance_id] != data:
callback(instance_id, data)
# 检查删除
for instance_id in list(known_state.keys()):
if instance_id not in current_state:
callback(instance_id, None)
known_state = current_state
time.sleep(5)
# 示例:一个需要发现推理服务的客户端应用
if __name__ == '__main__':
discovery_client = ServiceDiscoveryClient(
pulsar_url='pulsar://localhost:6650',
topic_name='persistent://public/default/service-registry'
)
try:
discovery_client.start()
# 轮询查询服务
for i in range(5):
logging.info(f"--- Polling services (Attempt {i+1}) ---")
inference_services = discovery_client.get_service_instances('model-inference-server')
if inference_services:
logging.info(f"Found {len(inference_services)} active inference instances:")
for service in inference_services:
logging.info(f" - Instance: {service['instanceId']}, "
f"Address: {service['address']['host']}:{service['address']['port']}, "
f"Model: {service['metadata']['modelVersion']}")
else:
logging.info("No active inference instances found.")
time.sleep(10)
except KeyboardInterrupt:
pass
finally:
discovery_client.stop()
架构的扩展性与局限性
这个基于 Pulsar 的服务发现机制,虽然巧妙,但并非万能。
扩展性:
- 客户端负载均衡: 服务发现客户端获取到所有健康实例列表后,可以轻松实现各种客户端负载均衡策略,如轮询、随机或基于元数据(例如,优先路由到处理小批次的实例)。
- 服务治理:
service-registry
Topic 可以被其他治理平台消费,用于生成全局服务拓扑、监控服务健康状况、进行容量分析等。 - 多区域部署: 利用 Pulsar 的 Geo-Replication 特性,可以将
service-registry
Topic 在多个数据中心之间同步,从而实现跨区域的服务发现,这是构建多活 MLOps 平台的基础。
局限性与适用边界:
- Pulsar 依赖: 整个方案强依赖于 Pulsar 集群的健康。Pulsar 的故障会直接导致服务发现失效。这要求我们必须具备强大的 Pulsar 运维能力。
- 最终一致性的影响: 对于需要强一致性读的场景(例如,分布式锁的实现),这个方案并不适用。但在服务发现领域,秒级的最终一致性延迟通常是可以接受的。
- Topic Compaction 配置:
service-registry
Topic 的压缩策略需要精心配置。如果压缩触发不频繁,Topic 的积压会很大;如果过于频繁,又会增加 Bookie 的负载。此外,还需要配置消息保留策略和 TTL,以自动清理那些未能优雅下线的“僵尸”服务实例留下的记录。 - 客户端库的成熟度: 如上所示,Python 客户端的 TableView 功能相较于 Java 客户端还有待完善。在真实项目中,可能需要评估是否需要自行封装或使用更底层的 Reader API 来获得更精细的控制。
这个方案本质上是用 Pulsar 的日志抽象能力,模拟了一个带历史追溯功能的 KV 存储。它在 MLOps 这种特定领域的表现优于传统方案,因为它完美地拥抱了事件驱动的本质,并简化了技术栈。但它也清晰地界定了自己的边界,提醒我们在技术选型时,没有银弹,只有最适合特定场景的权衡。