利用 Solid.js 与 WebSockets 构建 Nomad 实时监控面板并以 ESLint 插件保障架构一致性


调试一个在 Nomad 集群中频繁失败和重启的 alloc 是一件棘手的事。虽然 Nomad 的 UI 提供了任务状态的最终快照,但它并不擅长展示事件的实时流。nomad event stream 命令在终端中非常有用,但信息是短暂的,难以共享和协作分析。我们需要一个持久化、可访问的Web界面,实时展示集群中发生的每一个事件——特别是 AllocationUpdatedJobRegistration 这类关键事件,以便快速定位问题根源。

这个需求催生了一个内部工具的构建:一个自托管的、实时的 Nomad 事件流监控面板。

初步构想与技术栈抉择

整个系统的架构思路很直接:一个后端服务订阅 Nomad 的 HTTP 事件流 API,处理这些事件,然后通过 WebSockets 将其广播给所有连接的前端客户端。前端则是一个纯粹的展示层,负责高效地渲染高频更新的数据流。

graph TD
    subgraph Nomad Cluster
        NomadAPI["Nomad Server API (/v1/event/stream)"]
    end

    subgraph "Nomad Job: 'nomad-monitor'"
        A[Backend: Go Service] -- "HTTP Long Polling" --> NomadAPI
        A -- "Processes & Filters Events" --> A
        A -- "Broadcasts via WebSocket" --> B[Frontend: Solid.js App]
    end

    C[Developer/SRE] -- "Views in Browser" --> B

技术选型决策如下:

  1. 后端服务: 选择 Go。它的并发模型(goroutines 和 channels)非常适合处理 I/O 密集型任务,如管理多个 WebSocket 连接和处理一个持续的 HTTP 流。标准库也足够强大,无需庞大的框架。
  2. 前端框架: Solid.js。对于一个需要渲染高频数据流的界面,Solid.js 的细粒度响应式模型是理想选择。它没有虚拟 DOM 的开销,状态更新直接作用于相关的 DOM 节点,这在处理每秒可能数十条新事件的场景下,性能优势极为明显。
  3. 实时通信: WebSockets。这是实现服务器向客户端实时推送数据的标准方案,相比轮询,它延迟更低,资源消耗也更少。
  4. 部署: Nomad。最直接的方式就是将这个监控工具本身作为 Nomad Job 部署在被监控的集群上,实现“自监控”。这要求我们编写一个包含后端服务和前端静态文件服务的 Nomad 作业文件。
  5. 代码质量与架构约束: ESLint。这不仅仅是用于代码风格检查。在本项目中,我们将利用其 AST(抽象语法树)能力编写一个自定义插件,以在代码层面强制执行前端架构的最佳实践,防止潜在的性能问题和维护噩梦。

后端实现:Go 事件代理与 WebSocket Hub

后端的职责是单一且明确的:连接 Nomad,管理客户端,广播消息。

1. 主程序入口与配置

我们通过环境变量来配置 Nomad API 地址和服务的监听端口,这是云原生应用的基本实践。

// main.go
package main

import (
	"log"
	"net/http"
	"os"
	"time"
)

func main() {
	// 从环境变量获取配置,提供默认值
	nomadAddr := getEnv("NOMAD_ADDR", "http://localhost:4646")
	listenAddr := getEnv("LISTEN_ADDR", ":8080")
	log.Println("Nomad Event Stream Proxy")
	log.Printf("Connecting to Nomad at: %s", nomadAddr)
	log.Printf("Listening on: %s", listenAddr)

	// 初始化 WebSocket hub
	hub := newHub()
	go hub.run()

	// 启动 Nomad 事件流监听器
	eventStream := NewNomadEventStream(nomadAddr)
	go eventStream.Start(hub.broadcast)

	// 设置 WebSocket 路由
	http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
		serveWs(hub, w, r)
	})

	// 启动 HTTP 服务器
	err := http.ListenAndServe(listenAddr, nil)
	if err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}

func getEnv(key, fallback string) string {
	if value, ok := os.LookupEnv(key); ok {
		return value
	}
	return fallback
}

2. 连接并处理 Nomad 事件流

这部分代码负责与 Nomad 的 /v1/event/stream 端点建立一个长连接。Nomad 会以 chunked 方式持续发送 JSON 事件。我们需要一个健壮的循环来读取、解析并处理这些事件,同时包含断线重连逻辑。

// nomad_stream.go
package main

import (
	"bufio"
	"encoding/json"
	"log"
	"net/http"
	"time"
)

// 定义我们关心的事件结构子集
type NomadEvent struct {
	Topic string          `json:"Topic"`
	Type  string          `json:"Type"`
	Payload json.RawMessage `json:"Payload"`
}

type NomadEventStream struct {
	addr      string
	client    *http.Client
}

func NewNomadEventStream(addr string) *NomadEventStream {
	return &NomadEventStream{
		addr: addr,
		client: &http.Client{
			Timeout: 0, // No timeout for long polling
		},
	}
}

// Start 方法持续监听事件流,并在失败时重试
func (s *NomadEventStream) Start(handler func([]byte)) {
	eventURL := s.addr + "/v1/event/stream"
	
	for {
		req, err := http.NewRequest("GET", eventURL, nil)
		if err != nil {
			log.Printf("Error creating request: %v. Retrying in 5s.", err)
			time.Sleep(5 * time.Second)
			continue
		}
		// 可以通过 Topic 查询参数过滤事件,这里我们获取所有事件
		// q := req.URL.Query()
		// q.Add("topic", "Allocation")
		// req.URL.RawQuery = q.Encode()

		resp, err := s.client.Do(req)
		if err != nil {
			log.Printf("Error connecting to Nomad event stream: %v. Retrying in 5s.", err)
			time.Sleep(5 * time.Second)
			continue
		}

		log.Println("Successfully connected to Nomad event stream.")
		reader := bufio.NewReader(resp.Body)

		for {
			line, err := reader.ReadBytes('\n')
			if err != nil {
				log.Printf("Error reading from stream: %v. Reconnecting...", err)
				resp.Body.Close()
				break // 跳出内层循环以重连
			}

            // Nomad 事件流有时会发送空行作为心跳
			if len(line) < 2 { 
				continue
			}

			// 我们只关心事件,直接将原始 JSON 字节转发给 hub
			// 避免在后端进行不必要的反序列化和序列化
			handler(line)
		}
	}
}

这里的关键点在于 handler(line)。我们直接将从 Nomad API 收到的原始 JSON 字节数组传递给 WebSocket Hub,由 Hub 负责广播。这避免了在后端进行json.Unmarshaljson.Marshal的往返开销,将解析工作完全交给了前端。

3. WebSocket Hub 实现

Hub 的设计是经典的并发模式:使用 channels 来安全地处理客户端的注册、注销和消息广播,避免了使用互斥锁。

// hub.go
package main

import "log"

// Hub 维护一组活跃的客户端并向它们广播消息
type Hub struct {
	clients    map[*Client]bool
	broadcast  chan []byte
	register   chan *Client
	unregister chan *Client
}

func newHub() *Hub {
	return &Hub{
		broadcast:  make(chan []byte),
		register:   make(chan *Client),
		unregister: make(chan *Client),
		clients:    make(map[*Client]bool),
	}
}

func (h *Hub) run() {
	for {
		select {
		case client := <-h.register:
			h.clients[client] = true
			log.Printf("Client connected. Total clients: %d", len(h.clients))
		case client := <-h.unregister:
			if _, ok := h.clients[client]; ok {
				delete(h.clients, client)
				close(client.send)
				log.Printf("Client disconnected. Total clients: %d", len(h.clients))
			}
		case message := <-h.broadcast:
			for client := range h.clients {
				select {
				case client.send <- message:
				default:
					// 如果客户端的发送缓冲区已满,则关闭连接
					close(client.send)
					delete(h.clients, client)
				}
			}
		}
	}
}

Client 结构体和相关的读写 goroutine 代码遵循了 gorilla/websocket 的标准示例,这里不再赘述。

部署:Nomad 作业文件

现在,我们需要将 Go 后端和编译后的 Solid.js 前端打包成一个 Nomad 作业。我们使用 artifact stanza 下载预编译的 Go 二进制文件和前端静态资源,并使用 execcaddy 驱动来运行它们。

// nomad-monitor.nomad.hcl
job "nomad-monitor" {
  datacenters = ["dc1"]
  type        = "service"

  group "monitor" {
    count = 1

    network {
      port "http" {
        to = 8080
      }
      port "caddy" {
        to = 80
      }
    }

    service {
      name     = "nomad-monitor-backend"
      tags     = ["ws", "api"]
      port     = "http"
      
      check {
        type     = "tcp"
        interval = "10s"
        timeout  = "2s"
      }
    }

    service {
      name = "nomad-monitor-ui"
      tags = ["http", "ui"]
      port = "caddy"
    }

    # 后端 Go 服务
    task "backend" {
      driver = "exec"
      
      // 在真实项目中,这里应指向 CI/CD 构建产物的 URL
      artifact {
        source      = "https://example.com/artifacts/nomad-monitor-backend"
        destination = "local/"
        options {
            checksum = "sha256:..."
        }
      }

      config {
        command = "local/nomad-monitor-backend"
      }

      env {
        // Nomad 会自动注入 NOMAD_ADDR
        // LISTEN_ADDR = ":${NOMAD_PORT_http}"
      }

      resources {
        cpu    = 100 # MHz
        memory = 64  # MB
      }
    }

    # 前端静态文件服务
    task "frontend" {
      driver = "docker"

      config {
        image = "caddy:2-alpine"
        ports = ["caddy"]
        
        // 将 Caddyfile 和静态资源挂载到容器中
        volumes = [
          "local/Caddyfile:/etc/caddy/Caddyfile",
          "local/dist:/var/www/html"
        ]
      }
      
      artifact {
        source      = "https://example.com/artifacts/frontend-dist.tar.gz"
        destination = "local/"
      }

      // Caddyfile 用于配置反向代理,将 /ws 请求转发给后端
      template {
        data = <<EOH
:80 {
    root * /var/www/html
    file_server

    reverse_proxy /ws* http://${NOMAD_ADDR_nomad_monitor_backend_http} {
        header_up Host {http.request.host}
        header_up X-Real-IP {http.request.remote}
        header_up X-Forwarded-For {http.request.remote}
        header_up X-Forwarded-Proto {http.request.scheme}
    }
}
EOH
        destination = "local/Caddyfile"
        change_mode = "signal"
        change_signal = "SIGHUP"
      }

      resources {
        cpu    = 50
        memory = 32
      }
    }
  }
}

这个作业文件是整个系统的粘合剂。它定义了两个任务如何协同工作,并通过 template 功能动态生成 Caddy 的配置,将 WebSocket 请求 /ws 代理到后端 Go 服务。这是在 Nomad 中部署多服务应用的典型模式。

前端实现:Solid.js 的高效渲染

前端的核心是创建一个可复用的 primitive 来管理 WebSocket 连接和状态。

1. createWebSocketStore Primitive

这个自定义 primitive 封装了 WebSocket 的连接、消息接收和重连逻辑,并对外暴露一个 Solid.js 的 store

// src/lib/createWebSocketStore.ts
import { createStore, SetStoreFunction } from "solid-js/store";
import { onCleanup } from "solid-js";

// 定义我们关心的事件负载结构
export interface Allocation {
  ID: string;
  Name: string;
  JobID: string;
  ClientStatus: string;
  DesiredStatus: string;
  NodeName: string;
}

export interface NomadEventPayload {
  Allocation?: Allocation;
  // ...可以添加其他事件类型的负载定义
}

export interface NomadEvent {
  Topic: string;
  Type: string;
  Payload: NomadEventPayload;
  Timestamp: number; // 我们在前端添加时间戳
}

const WS_URL = `ws://${window.location.host}/ws`;

// 这是一个关键的架构决策:不直接导出 ws 实例
// 而是导出 store 和操作函数,强制消费方通过响应式系统交互
let ws: WebSocket | null = null;
const [events, setEvents] = createStore<NomadEvent[]>([]);

const connect = () => {
  ws = new WebSocket(WS_URL);

  ws.onopen = () => {
    console.log("WebSocket connection established.");
  };

  ws.onmessage = (event) => {
    try {
      const rawEvent = JSON.parse(event.data);
      // 在这里做一些基本的转换和数据清理
      const newEvent: NomadEvent = {
        ...rawEvent,
        Timestamp: Date.now(),
      };
      
      // 在 store 数组的开头插入新事件
      setEvents((prev) => [newEvent, ...prev.slice(0, 199)]); // 只保留最新的200条
    } catch (error) {
      console.error("Failed to parse WebSocket message:", error);
    }
  };

  ws.onclose = () => {
    console.warn("WebSocket connection closed. Reconnecting in 3s...");
    ws = null;
    setTimeout(connect, 3000);
  };

  ws.onerror = (error) => {
    console.error("WebSocket error:", error);
    ws?.close();
  };
};

connect();

export const useEventStore = () => events;

// 测试用的 mock 函数
// export const __test__sendMockEvent = (event: NomadEvent) => {
//   setEvents((prev) => [event, ...prev]);
// };

2. UI 组件

UI 组件通过 useEventStore 获取响应式数据,并使用 Solid 的 <For> 组件进行渲染。Solid 的编译器会将其优化为高效的 DOM 操作,只会为新增的事件创建新的 DOM 元素,而不会重新渲染整个列表。

// src/components/EventStream.tsx
import { For } from "solid-js";
import { useEventStore } from "../lib/createWebSocketStore";
import EventCard from "./EventCard";

const EventStream = () => {
  const events = useEventStore();

  return (
    <div class="p-4 space-y-3">
      <h1 class="text-2xl font-bold">Nomad Real-time Event Stream</h1>
      <div class="border rounded-md p-2 h-[80vh] overflow-y-auto">
        <For each={events}>
          {(event, i) => <EventCard event={event} />}
        </For>
      </div>
    </div>
  );
};

export default EventStream;

项目进行到这里,一切看起来都很好。但随着团队成员的加入,一个架构问题浮现了。

架构之痛与 ESLint 插件的介入

一位新同事为了实现一个“暂停/继续”接收事件的功能,直接在 UI 组件中导入了 createWebSocketStore.ts 文件中的 ws 实例,并调用 ws.close()new WebSocket()。这破坏了 createWebSocketStore 的封装,导致了多个 WebSocket 连接和状态不一致的 bug。

这是一个典型的架构问题:我们希望模块内部的状态(如ws实例)是私有的,只通过受控的、响应式的接口(useEventStore)对外暴露。TypeScript 的 private 关键字无法阻止模块间的导入。此时,我们需要一个更强大的工具来捍卫架构边界——自定义 ESLint 规则。

我们的目标是创建一个规则,禁止任何模块从 createWebSocketStore.ts 中导入除 useEventStore 之外的任何变量

1. 编写 ESLint 规则

ESLint 规则本质上是一个访问 AST(抽象语法树)节点的函数。我们需要访问 ImportDeclaration 节点,检查其来源是否是我们的目标文件,然后检查导入的 specifiers 是否合法。

// .eslint/rules/enforce-event-store-boundary.js

'use strict';

const path = require('path');

module.exports = {
  meta: {
    type: 'problem',
    docs: {
      description: 'Enforce architecture boundary for the event store module.',
      category: 'Best Practices',
      recommended: true,
    },
    fixable: null,
    schema: [],
    messages: {
      illegalImport: "Do not import '{{identifier}}' directly from event store. Use the exported primitive 'useEventStore' instead.",
    },
  },

  create(context) {
    const restrictedPath = 'src/lib/createWebSocketStore.ts';
    const allowedImport = 'useEventStore';

    return {
      // 访问 AST 中的 'ImportDeclaration' 节点
      ImportDeclaration(node) {
        // context.getFilename() 在 v9+ 中被废弃, 实际项目中需要适配
        // 但这里为了演示核心逻辑
        const fromFilename = context.getFilename();
        const importSource = node.source.value;

        // 解析导入来源的绝对路径
        const resolvedImportPath = path.resolve(path.dirname(fromFilename), importSource);
        const targetPath = path.resolve(process.cwd(), restrictedPath);
        
        // 如果文件扩展名不匹配,则尝试添加
        const targetPathWithJs = targetPath.replace('.ts', '.js');

        const isRestricted = resolvedImportPath === targetPath || resolvedImportPath === targetPathWithJs;

        if (isRestricted) {
          node.specifiers.forEach(specifier => {
            if (specifier.type === 'ImportSpecifier' && specifier.imported.name !== allowedImport) {
              context.report({
                node: specifier,
                messageId: 'illegalImport',
                data: {
                  identifier: specifier.imported.name,
                },
              });
            }
          });
        }
      },
    };
  },
};

2. 集成规则

首先,我们需要一个 ESLint 插件来加载我们的规则。

// .eslint/plugins/custom.js
'use strict';

module.exports = {
  rules: {
    'enforce-event-store-boundary': require('./rules/enforce-event-store-boundary'),
  },
};

然后,在 .eslintrc.js 中启用这个插件和规则。

// .eslintrc.js
module.exports = {
  // ... other configs (parser, extends, etc.)
  plugins: [
    'solid', 
    'custom' // 添加我们的自定义插件
  ],
  rules: {
    // ... other rules
    'custom/enforce-event-store-boundary': 'error' // 将规则设为错误级别
  },
  // 告知 ESLint 去哪里找我们的插件
  settings: {
    'eslint-plugin-resolver': {
      paths: [__dirname],
      patterns: ['.eslint/plugins/*.js'],
    },
  },
};

配置完成后,任何试图从 createWebSocketStore.ts 导入非法变量的代码,都会在开发阶段被 ESLint 立即标记为错误,从而在代码提交前就阻止了对架构的破坏。这比代码审查更可靠、更即时。

当前方案的局限性与未来展望

这个监控面板已经能在我们的日常工作中发挥巨大作用,但它远非完美。

  1. 单点故障: 后端 Go 服务是单实例的。如果它崩溃,所有客户端都会断开连接。未来的版本可以考虑部署多个实例,并通过 Consul 等服务发现机制让前端随机连接,或者在后端之间引入 NATS 这样的消息队列来分发事件,实现高可用。
  2. 前端性能边界: 虽然 Solid.js 非常高效,但如果事件流的速率达到每秒数百条,并且不做任何聚合或节流,浏览器最终还是会达到性能极限。可以引入虚拟滚动,或者根据事件类型进行聚合显示,以应对极端情况。
  3. 信息密度与可操作性: 目前它只是一个事件的瀑布流。一个更成熟的工具需要提供搜索、过滤、按 Job ID 或 Alloc ID 聚合视图等功能。这需要对前端状态管理和后端能力进行扩展,比如后端可以提供一个查询历史事件的 API。

尽管存在这些局限,但这个项目成功地将 Solid.js 的响应式能力、WebSocket 的实时性以及 Nomad 的部署能力结合在一起,解决了一个真实的工程痛点。而自定义 ESLint 规则的引入,更是将项目从一个单纯的“应用”提升到了一个具有健壮架构保障的“工程”层面。它证明了,在现代 Web 开发中,工具链的定制化能力和架构思维同样是项目的核心竞争力。


  目录