采用 gRPC 桥接 Dart 与 Quarkus 实现面向 ScyllaDB 的高吞吐实时数据架构


我们面临的技术痛点非常明确:一个物联网平台需要处理来自数百万设备的实时遥测数据流。核心指标要求写入路径的 p99 延迟必须低于10毫秒,同时,一个面向运营人员的监控仪表盘需要以近乎实时的方式展示设备状态的聚合视图。单一技术栈难以同时满足这两个看似矛盾的需求——极致的写入性能与高效的实时推送能力。

初步的技术构想很快就排除了单体应用。数据摄入和处理是典型的CPU与IO密集型任务,而实时推送则需要管理海量的长连接,这是IO密集但对计算资源要求不高的场景。将两者强行耦合在一起,只会导致资源争抢和运维上的噩梦。因此,一个异构的、职责分离的微服务架构成为必然选择。

技术选型决策过程充满了权衡。

对于数据存储层,ScyllaDB 成为首选。它的 Seastar 框架和 shard-per-core 架构能最大化利用硬件资源,避免了传统 JVM 数据库(如 Cassandra)在GC上的性能抖动,这对于我们严苛的低延迟写入要求至关重要。

对于数据摄入与处理服务,Quarkus 框架脱颖而出。它基于 GraalVM 的 AOT 编译能力可以生成启动飞快、内存占用极低的本地可执行文件。更重要的是,Quarkus 拥抱响应式编程模型,其底层的 Vert.x 事件循环与 ScyllaDB 的异步驱动能完美结合,构建出一条从网络IO到数据库IO完全无阻塞的处理链路。Java 生态的成熟度也为我们提供了稳定可靠的 ScyllaDB 驱动和各类库。

最具争议的是实时推送网关。最初团队考虑过使用 Node.js 或 Go,但最终我们选择了一个不那么主流的方案:Dart Server。原因有三:首先,前端监控仪表盘已经确定使用 Flutter Web,服务端使用 Dart 可以复用数据模型(DTOs),减少跨语言维护成本;其次,Dart 的单线程事件循环模型与 isolates 机制非常适合管理大量 WebSocket 长连接,其资源开销远低于基于线程模型的传统框架;最后,Dart 社区提供了稳定高效的 gRPC 和 WebSocket 库。

这两个异构服务之间的通信,我们选择了 gRPC。基于 Protobuf 的二进制序列化性能远超 JSON,而 HTTP/2 的多路复用特性也减少了连接开销,为内部服务间的高频调用提供了坚实基础。

最终的架构图如下:

graph TD
    subgraph Clients
        A[IoT Devices] -->|MQTT/TCP| B(Ingestion Endpoint);
        C[Flutter Web Dashboard] <-->|WebSocket| D[Dart Real-time Gateway];
    end

    subgraph Backend Infrastructure
        B --> E{Quarkus Data Service};
        D -- gRPC --> E;
        E -- CQL --> F[(ScyllaDB Cluster)];
    end

    style F fill:#f9f,stroke:#333,stroke-width:2px

第一步:定义通信契约 - Protobuf

所有协作都始于一份清晰的契约。我们定义了一个 telemetry.proto 文件来规范 Dart 网关和 Quarkus 服务之间的数据交换。

// a/src/main/proto/telemetry.proto
syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.example.telemetry.grpc";
option java_outer_classname = "TelemetryProto";

package telemetry;

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

// 遥测服务定义
service TelemetryService {
  // 上报单条遥测数据
  rpc Report (TelemetryRequest) returns (google.protobuf.Empty) {}

  // 订阅指定设备的实时状态流
  // 客户端发起一次请求,服务端会持续推送状态更新
  rpc SubscribeDeviceStatus (DeviceSubscriptionRequest) returns (stream DeviceStatusResponse) {}
}

// 遥测数据上报请求
message TelemetryRequest {
  string device_id = 1;
  double temperature = 2;
  double humidity = 3;
  int64 battery_level = 4;
  google.protobuf.Timestamp timestamp = 5;
}

// 设备状态订阅请求
message DeviceSubscriptionRequest {
  string device_id = 1;
}

// 设备实时状态响应
message DeviceStatusResponse {
  string device_id = 1;
  enum Status {
    UNKNOWN = 0;
    ONLINE = 1;
    OFFLINE = 2;
    WARNING = 3;
  }
  Status status = 2;
  double last_temperature = 3;
  google.protobuf.Timestamp last_seen = 4;
}

这份契约定义了两个核心 RPC 方法:Report 用于数据写入,SubscribeDeviceStatus 是一个服务端流式 RPC,用于实时状态推送。这种设计将写入和读取路径在接口层面就分离开。

第二步:构建坚实的后盾 - Quarkus 数据服务

Quarkus 服务是整个系统的引擎,负责与 ScyllaDB 高效交互。

项目依赖与配置

pom.xml 中必须包含 Quarkus gRPC 服务端、ScyllaDB Java 驱动和 Mutiny 响应式库。

<!-- pom.xml -->
<dependencies>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-grpc</artifactId>
    </dependency>
    <dependency>
        <groupId>com.datastax.oss.quarkus</groupId>
        <artifactId>cassandra-quarkus-client</artifactId>
        <version>1.2.0</version> <!-- 请使用最新版本 -->
    </dependency>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-mutiny</artifactId>
    </dependency>
    <!-- for testing -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-junit5</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

application.properties 配置是连接数据库的关键。在真实项目中,这些配置应当外部化管理。

# application.properties

# gRPC Server configuration
quarkus.grpc.server.port=9001
quarkus.grpc.server.enable-reflection=true # for development and debugging

# ScyllaDB/Cassandra Driver configuration
quarkus.cassandra.contact-points=scylla-node1:9042,scylla-node2:9042
quarkus.cassandra.local-datacenter=dc1
quarkus.cassandra.keyspace=iot_platform
quarkus.cassandra.request.consistency=LOCAL_QUORUM

# Fine-tuning the driver for performance
quarkus.cassandra.advanced.control-connection.contact-points=scylla-node1:9042
quarkus.cassandra.advanced.protocol.version=V4
quarkus.cassandra.advanced.pooling.local.connections-per-host=8
quarkus.cassandra.advanced.socket.connect-timeout=5s

这里的配置细节很关键。connections-per-host 需要根据业务负载和 ScyllaDB 节点核心数进行压测调优。LOCAL_QUORUM 提供了写入路径上的强一致性保障。

ScyllaDB 数据模型

数据模型的设计直接影响性能。我们为遥测数据设计了一个时间序列模型,使用设备ID作为分区键,时间戳作为聚类键,确保同一设备的数据在物理上存储在一起,并按时间排序。

// Schema definition in CQL
CREATE KEYSPACE IF NOT EXISTS iot_platform
WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 3};

USE iot_platform;

CREATE TABLE IF NOT EXISTS telemetry_data (
    device_id text,
    event_time timestamp,
    temperature double,
    humidity double,
    battery_level bigint,
    PRIMARY KEY ((device_id), event_time)
) WITH CLUSTERING ORDER BY (event_time DESC);

CREATE TABLE IF NOT EXISTS device_status (
    device_id text PRIMARY KEY,
    last_seen timestamp,
    last_temperature double,
    status text // e.g., 'ONLINE', 'WARNING'
);

device_status 表是一个典型的实体视图,用于快速查询设备的最新状态,避免了对 telemetry_data 表进行全分区扫描。

数据访问层 (DAO)

DAO 封装了所有与 ScyllaDB 的异步交互。使用 CqlSession 的异步方法和 Uni 是实现非阻塞的关键。

// src/main/java/com/example/telemetry/data/TelemetryDao.java
package com.example.telemetry.data;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.example.telemetry.grpc.TelemetryRequest;
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.time.Instant;

@ApplicationScoped
public class TelemetryDao {

    @Inject
    CqlSession session;

    private PreparedStatement insertTelemetryPs;
    private PreparedStatement updateStatusPs;

    // A common pattern is to prepare statements at initialization
    // This is more efficient than preparing them on each request
    public Uni<Void> initialize() {
        return Uni.createFrom().completionStage(session.prepareAsync("INSERT INTO telemetry_data (device_id, event_time, temperature, humidity, battery_level) VALUES (?, ?, ?, ?, ?)"))
                .onItem().invoke(ps -> this.insertTelemetryPs = ps)
                .chain(() -> Uni.createFrom().completionStage(session.prepareAsync("UPDATE device_status SET last_seen = ?, last_temperature = ?, status = ? WHERE device_id = ?")))
                .onItem().invoke(ps -> this.updateStatusPs = ps)
                .replaceWithVoid();
    }

    public Uni<Void> saveTelemetry(TelemetryRequest request) {
        Instant eventTime = Instant.ofEpochSecond(
            request.getTimestamp().getSeconds(),
            request.getTimestamp().getNanos()
        );

        // A real implementation would run these two queries as a BATCH for atomicity.
        // For simplicity, we execute them sequentially here.
        BoundStatement telemetryBs = insertTelemetryPs.bind(
                request.getDeviceId(),
                eventTime,
                request.getTemperature(),
                request.getHumidity(),
                request.getBatteryLevel()
        );
        
        String status = determineStatus(request); // Business logic to determine status
        BoundStatement statusBs = updateStatusPs.bind(
                eventTime,
                request.getTemperature(),
                status,
                request.getDeviceId()
        );

        // Chain the async executions
        return Uni.createFrom().completionStage(session.executeAsync(telemetryBs))
                .chain(() -> Uni.createFrom().completionStage(session.executeAsync(statusBs)))
                .replaceWithVoid();
    }
    
    private String determineStatus(TelemetryRequest request) {
        if (request.getTemperature() > 85.0) {
            return "WARNING";
        }
        return "ONLINE";
    }
}

gRPC 服务实现

TelemetryServiceImpl 实现了 .proto 文件中定义的服务。report 方法是单向写入,而 subscribeDeviceStatus 则复杂得多,它需要一种机制来将数据库的变更推送给流订阅者。在真实项目中,这通常会通过一个消息总线(如 Kafka)实现。为了简化示例,我们使用一个内存中的 Multi 来模拟这个事件流。

// src/main/java/com/example/telemetry/service/TelemetryServiceImpl.java
package com.example.telemetry.service;

import com.example.telemetry.data.TelemetryDao;
import com.example.telemetry.grpc.*;
import com.google.protobuf.Empty;
import com.google.protobuf.Timestamp;
import io.quarkus.grpc.GrpcService;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import jakarta.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;

@GrpcService
public class TelemetryServiceImpl implements TelemetryService {

    private static final Logger LOGGER = LoggerFactory.getLogger(TelemetryServiceImpl.class);

    @Inject
    TelemetryDao telemetryDao;

    @Override
    public Uni<Empty> report(TelemetryRequest request) {
        // Logging with context is crucial for observability
        LOGGER.info("Received telemetry report for device: {}", request.getDeviceId());
        return telemetryDao.saveTelemetry(request)
                .onFailure().invoke(e -> LOGGER.error("Failed to save telemetry for device: " + request.getDeviceId(), e))
                .replaceWith(Empty.newBuilder().build());
    }

    @Override
    public Multi<DeviceStatusResponse> subscribeDeviceStatus(DeviceSubscriptionRequest request) {
        String deviceId = request.getDeviceId();
        LOGGER.info("New subscription for device status: {}", deviceId);

        // In a real-world scenario, this stream would be fed by a message broker (e.g., Kafka)
        // or a Change Data Capture (CDC) stream from the database.
        // Here, we simulate it with a ticking clock for demonstration purposes.
        return Multi.createFrom().ticks().every(Duration.ofSeconds(2))
                .onOverflow().drop() // Backpressure strategy
                .map(tick -> {
                    // In a real app, you would fetch the latest status from the 'device_status' table.
                    LOGGER.debug("Pushing simulated status for device: {}", deviceId);
                    return DeviceStatusResponse.newBuilder()
                            .setDeviceId(deviceId)
                            .setStatus(DeviceStatusResponse.Status.ONLINE)
                            .setLastTemperature(25.5 + (Math.random() * 2))
                            .setLastSeen(Timestamp.newBuilder().setSeconds(Instant.now().getEpochSecond()).build())
                            .build();
                })
                .onCancellation().invoke(() -> LOGGER.info("Subscription cancelled for device: {}", deviceId));
    }
}

第三步:构建轻量级前哨 - Dart 实时网关

Dart 服务是连接前端和后端的桥梁。它极其轻量,只做两件事:管理 WebSocket 连接和代理 gRPC 请求。

项目依赖

pubspec.yaml 文件定义了项目依赖。

# pubspec.yaml
name: real_time_gateway
description: A WebSocket to gRPC gateway.
version: 1.0.0
environment:
  sdk: '>=3.0.0 <4.0.0'

dependencies:
  grpc: ^3.2.4
  protobuf: ^3.1.0
  shelf: ^1.4.1
  shelf_web_socket: ^1.0.4
  web_socket_channel: ^2.4.0

dev_dependencies:
  lints: ^2.1.0
  test: ^1.24.0
  # Tool for generating proto files
  grpc_tools: ^3.2.4

首先,需要使用 protocprotoc-gen-dart 工具从 .proto 文件生成 Dart 代码。

gRPC 客户端封装

创建一个服务类来封装与 Quarkus 后端的 gRPC 通信细节,使其易于在 WebSocket 处理器中使用。

// lib/grpc/telemetry_client.dart
import 'package:grpc/grpc.dart';
import 'generated/telemetry.pbgrpc.dart';

class TelemetryGrpcClient {
  late TelemetryServiceClient _client;
  late ClientChannel _channel;

  TelemetryGrpcClient() {
    // Configuration should be loaded from an external file/env var
    _channel = ClientChannel(
      'localhost', // Quarkus service host
      port: 9001,   // Quarkus service port
      options: const ChannelOptions(
        credentials: ChannelCredentials.insecure(),
        // Add keep-alive options for production
        // keepAlive: ClientKeepAliveOptions(...)
      ),
    );
    _client = TelemetryServiceClient(_channel);
    print('gRPC client connected to localhost:9001');
  }

  Stream<DeviceStatusResponse> subscribeToDeviceStatus(String deviceId) {
    final request = DeviceSubscriptionRequest()..deviceId = deviceId;
    // This returns a stream of responses directly from the gRPC call
    return _client.subscribeDeviceStatus(request);
  }

  Future<void> shutdown() async {
    await _channel.shutdown();
  }
}

这里的错误处理和连接重试逻辑在生产环境中是必不可少的。

WebSocket 服务实现

我们使用 shelfshelf_web_socket 包来构建一个简单的 HTTP 服务器,并将特定路径的请求升级为 WebSocket 连接。

// bin/server.dart
import 'dart:async';
import 'dart:convert';
import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart' as io;
import 'package:shelf_web_socket/shelf_web_socket.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

import 'packagepackage:real_time_gateway/grpc/telemetry_client.dart';
import 'packagepackage:real_time_gateway/grpc/generated/telemetry.pb.dart';

// A simple in-memory store for active subscriptions
// Key: WebSocketChannel, Value: StreamSubscription to gRPC stream
final Map<WebSocketChannel, StreamSubscription> _subscriptions = {};

void main() async {
  final telemetryClient = TelemetryGrpcClient();

  final handler = webSocketHandler((WebSocketChannel webSocket) {
    print('New WebSocket connection established.');

    webSocket.stream.listen(
      (message) {
        try {
          final decoded = jsonDecode(message as String);
          final command = decoded['command'];
          final deviceId = decoded['deviceId'];

          if (command == 'subscribe' && deviceId is String) {
            print('Received subscribe command for device: $deviceId');
            // Cancel any existing subscription for this channel
            _subscriptions[webSocket]?.cancel();
            
            final grpcStream = telemetryClient.subscribeToDeviceStatus(deviceId);

            final subscription = grpcStream.listen(
              (DeviceStatusResponse status) {
                // Convert proto message to a map/JSON for the WebSocket client
                final payload = {
                  'deviceId': status.deviceId,
                  'status': status.status.name,
                  'lastTemperature': status.lastTemperature,
                  'lastSeen': status.lastSeen.toDateTime().toIso8601String(),
                };
                // A key part of production code is handling potential sink closures
                if (webSocket.sink.closeCode == null) {
                    webSocket.sink.add(jsonEncode(payload));
                }
              },
              onError: (err) {
                print('gRPC stream error for device $deviceId: $err');
                // Propagate error to the client
                if (webSocket.sink.closeCode == null) {
                    webSocket.sink.add(jsonEncode({'error': 'Failed to get device status.'}));
                }
              },
              onDone: () {
                print('gRPC stream for device $deviceId completed.');
                _subscriptions.remove(webSocket);
              },
            );
            _subscriptions[webSocket] = subscription;
          } else if (command == 'unsubscribe') {
             print('Received unsubscribe command for device: $deviceId');
             _subscriptions[webSocket]?.cancel();
             _subscriptions.remove(webSocket);
          }
        } catch (e) {
          print('Error processing WebSocket message: $e');
        }
      },
      onDone: () {
        print('WebSocket connection closed.');
        // Clean up resources when client disconnects
        _subscriptions[webSocket]?.cancel();
        _subscriptions.remove(webSocket);
      },
      onError: (err) {
        print('WebSocket stream error: $err');
        _subscriptions[webSocket]?.cancel();
        _subscriptions.remove(webSocket);
      }
    );
  });

  final server = await io.serve(handler, '0.0.0.0', 8080);
  print('WebSocket server listening on port ${server.port}');
}

这段代码的核心逻辑是:当 WebSocket 收到一个 subscribe 消息时,它会调用 gRPC 客户端的 subscribeToDeviceStatus 方法,这会与 Quarkus 服务建立一个长期的流式连接。每当 Quarkus 服务从流中推送一条新数据,Dart 服务就会将其转发给相应的 WebSocket 客户端。连接关闭时的资源清理(取消 gRPC 订阅)至关重要,否则会导致严重的资源泄漏。

架构的局限性与未来迭代路径

这套架构并非没有缺点。最显著的是增加了运维复杂性,我们需要维护两个独立部署的服务,以及它们之间的网络策略。gRPC 桥接层本身也可能成为瓶颈,尽管可以通过水平扩展 Dart 网关实例来缓解,但这需要一个支持会话粘性或一个外部状态存储来处理 WebSocket 连接。

此外,Quarkus 服务中的状态推送机制过于简化。一个更健壮的实现应该引入消息队列(如 Apache Pulsar 或 Kafka)。当遥测数据被处理并更新 device_status 表后,一个变更事件会被发布到消息队列。Quarkus 服务可以订阅这个队列,并将事件推送给所有活跃的 gRPC 订阅者。这种方式解耦了写入路径和通知路径,提供了更好的弹性和背压处理能力。

另一个可行的优化是探索 Quarkus 的 GraalVM native image。这将进一步降低 Quarkus 服务的内存占用和启动时间,使其在 Serverless 或容器化环境中更具成本效益。对于 Dart 网关,可以考虑使用 isolates 来处理潜在的 CPU 密集型任务(例如复杂的 JSON 序列化),以避免阻塞主事件循环。


  目录