构建与 React Native 通信的 C++ 高性能 Paxos 共识引擎


最初的需求是在一个 C++ 核心业务系统中,为运维和调试人员提供一个轻量级、跨平台的监控面板。这个面板需要能实时展示系统内部多个节点的共识状态,并能手动触发一些调试指令。Web 技术栈显得过于笨重,需要一个完整的 Web Server;而为 iOS 和 Android 分别开发原生应用,维护成本又太高。React Native 成了自然的选项,但真正的挑战在于如何将一个高性能、多线程、状态复杂的 C++ 模块——一个 Paxos 共识引擎——高效且安全地桥接到 React Native 的 JavaScript 世界。

传统的 React Native Bridge 是基于异步消息传递的,对于高频的状态查询和实时数据同步来说,性能损耗和延迟都无法接受。我们需要的是一种更直接、更底层的通信方式。这正是 React Native 的新架构——Turbo Modules 和 JSI (JavaScript Interface)——所要解决的问题。它允许 JavaScript 直接、同步地调用 C++ 函数,甚至持有 C++ 对象的引用,这为我们的架构设想打开了大门。

我们的目标是:

  1. 在 C++ 中实现一个简化的、但在概念上完备的多节点 Paxos 共识模拟器。
  2. 通过 JSI 将这个 C++ 模块封装成一个 Turbo Native Module。
  3. 实现从 C++ 到 JavaScript 的事件推送,以便实时更新 UI 状态。
  4. 在 React Native 端构建一个可以与 C++ 核心交互的监控界面。

Paxos 核心模块的 C++ 实现

在生产环境中,完整的 Paxos 实现极其复杂,通常会使用 Multi-Paxos 或其变种 Raft。为了在本文中聚焦于 C++ 与 RN 的通信,我将实现一个经典的 Basic Paxos 模型,它用于对单个值达成共識。我们将模拟一个由3个节点组成的集群,这些节点将在同一个进程内的多个线程中运行,通过一个共享的“消息总线”进行通信。

首先,定义核心数据结构和节点接口。

// src/core/PaxosNode.h

#ifndef PAXOS_NODE_H
#define PAXOS_NODE_H

#include <iostream>
#include <string>
#include <vector>
#include <mutex>
#include <thread>
#include <optional>
#include <functional>
#include <map>
#include <atomic>

// 定义提案ID,通常由轮次编号和提议者ID组成,以保证唯一性
struct ProposalID {
    int number;
    int node_id;

    bool operator<(const ProposalID& other) const {
        if (number != other.number) {
            return number < other.number;
        }
        return node_id < other.node_id;
    }

    bool operator==(const ProposalID& other) const {
        return number == other.number && node_id == other.node_id;
    }
};

// 定义不同角色的状态
struct ProposerState {
    ProposalID proposal_id;
    std::string proposed_value;
    int promise_count = 0;
    int accepted_count = 0;
    bool proposal_active = false;
};

struct AcceptorState {
    ProposalID promised_id = {-1, -1};
    std::optional<ProposalID> accepted_id;
    std::optional<std::string> accepted_value;
};

// 节点状态的聚合视图
struct NodeStateView {
    int id;
    std::string role; // "Proposer", "Acceptor", "Learner"
    int proposal_round;
    std::string proposed_value;
    int promise_count;
    int accepted_count;
    int promised_round;
    std::optional<std::string> accepted_value;
    std::optional<std::string> learned_value;
};

class PaxosNode {
public:
    // JS侧的回调函数,用于将状态更新推送给UI
    using StateUpdateCallback = std::function<void(int nodeId, const NodeStateView&)>;

    PaxosNode(int id, int total_nodes, StateUpdateCallback cb);
    ~PaxosNode();

    // 外部触发提案
    void Propose(const std::string& value);

    // 接收消息的入口
    void ReceiveMessage(const std::string& message_type, const std::string& payload);

    // 获取当前节点状态,供UI查询
    NodeStateView GetState();

private:
    const int id_;
    const int total_nodes_;
    const int majority_;
    StateUpdateCallback state_update_callback_;

    std::mutex mtx_;
    ProposerState proposer_state_;
    AcceptorState acceptor_state_;
    std::optional<std::string> learned_value_;
    
    // Proposer 逻辑
    void Prepare();

    // Acceptor 逻辑
    void OnPrepare(const ProposalID& pid);
    void OnAcceptRequest(const ProposalID& pid, const std::string& value);

    // Learner 逻辑
    void OnAccepted(const ProposalID& pid, const std::string& value);

    // 广播消息
    void Broadcast(const std::string& message_type, const std::string& payload);

    // 用于定期通知UI更新
    void NotifyStateUpdate();
};

#endif // PAXOS_NODE_H

这份头文件定义了 Paxos 算法中最重要的几个概念:ProposalID 用于保证提案的唯一性和可比较性,ProposerStateAcceptorState 分别存储了提议者和接受者的内部状态。NodeStateView 是一个关键的设计,它是一个只读的数据结构,专门用于将 C++ 内部复杂的状态暴露给外部(在这里是 JSI Bridge),避免了直接暴露内部锁和复杂对象。

StateUpdateCallback 是实现从 C++ 主动推送到 JS 的核心。JS 端会传入一个函数,C++ 在内部状态发生关键变化时调用它。

接下来是具体的实现。这里的难点在于线程安全,因为 ProposeReceiveMessage 可能会被不同的线程调用。所有对内部状态的修改都必须由互斥锁 mtx_ 保护。

// src/core/PaxosNode.cpp (部分核心逻辑)
#include "PaxosNode.h"
#include "MessageBus.h" // 假设存在一个消息总线来模拟网络通信

// 为了简化,我们使用一个全局的消息总线
extern MessageBus g_message_bus;

PaxosNode::PaxosNode(int id, int total_nodes, StateUpdateCallback cb)
    : id_(id),
      total_nodes_(total_nodes),
      majority_((total_nodes / 2) + 1),
      state_update_callback_(cb) {
    proposer_state_.proposal_id = {0, id_};
}

PaxosNode::~PaxosNode() {}

void PaxosNode::Propose(const std::string& value) {
    std::lock_guard<std::mutex> lock(mtx_);
    // 每次新提案都增加轮次号
    proposer_state_.proposal_id.number++;
    proposer_state_.proposed_value = value;
    proposer_state_.promise_count = 0;
    proposer_state_.accepted_count = 0;
    proposer_state_.proposal_active = true;
    
    std::cout << "[Node " << id_ << "] Starting proposal round " << proposer_state_.proposal_id.number << " with value '" << value << "'" << std::endl;
    Prepare();
    NotifyStateUpdate();
}

void PaxosNode::Prepare() {
    // 广播 Prepare 请求
    // 在真实项目中,这里是网络调用
    std::string payload = std::to_string(proposer_state_.proposal_id.number) + ":" + std::to_string(proposer_state_.proposal_id.node_id);
    Broadcast("prepare", payload);
}

void PaxosNode::ReceiveMessage(const std::string& message_type, const std::string& payload) {
    std::lock_guard<std::mutex> lock(mtx_);

    if (message_type == "prepare") {
        // ... 解析 payload 获取 proposal_id
        // OnPrepare(parsed_pid);
    } else if (message_type == "promise") {
        // ...
        // Proposer 收到 Promise 响应
        proposer_state_.promise_count++;
        if (proposer_state_.proposal_active && proposer_state_.promise_count >= majority_) {
            proposer_state_.proposal_active = false; // 防止重复发送 Accept
            std::cout << "[Node " << id_ << "] Got majority promises. Sending Accept request." << std::endl;
            // 广播 Accept 请求
            std::string accept_payload = "...";
            Broadcast("accept_request", accept_payload);
        }
    } else if (message_type == "accept_request") {
        // ...
    } else if (message_type == "accepted") {
        // ...
        // Learner 逻辑,收到过半数 accepted 消息后,学习到最终值
        // 简单实现:我们假设收到一个 accepted 就代表学习到了
        // 在实际系统中,learner需要计数
        if (!learned_value_.has_value()) {
            learned_value_ = payload; // payload is the accepted value
            std::cout << "[Node " << id_ << "] Learned value: '" << *learned_value_ << "'" << std::endl;
        }
    }
    NotifyStateUpdate();
}

void PaxosNode::OnPrepare(const ProposalID& pid) {
    if (pid < acceptor_state_.promised_id) {
        // 忽略旧的提案
        return;
    }
    acceptor_state_.promised_id = pid;
    std::cout << "[Node " << id_ << "] Promising for round " << pid.number << std::endl;

    // 回复 Promise,附带之前已接受的提案(如果有)
    std::string promise_payload = "..."; // 包含 promised_id 和 accepted_value
    // Send("promise", promise_payload, pid.node_id); // 单点发送
}

void PaxosNode::Broadcast(const std::string& message_type, const std::string& payload) {
    // 模拟广播
    for (int i = 0; i < total_nodes_; ++i) {
        g_message_bus.PostMessage(i, message_type, payload);
    }
}

void PaxosNode::NotifyStateUpdate() {
    if (state_update_callback_) {
        state_update_callback_(id_, GetState());
    }
}

NodeStateView PaxosNode::GetState() {
    // 注意:这个函数没有加锁,因为它应该被持有锁的函数调用,或者在设计上保证其访问的数据是线程安全的快照。
    // 为了暴露给JSI,最好在这里创建一个安全的快照。
    std::lock_guard<std::mutex> lock(mtx_);
    return {
        id_,
        proposer_state_.proposal_active ? "Proposer" : "Acceptor/Learner",
        proposer_state_.proposal_id.number,
        proposer_state_.proposed_value,
        proposer_state_.promise_count,
        proposer_state_.accepted_count,
        acceptor_state_.promised_id.number,
        acceptor_state_.accepted_value,
        learned_value_
    };
}

这个实现只是一个骨架,但它展示了核心的逻辑:状态的维护、角色的转换以及线程安全的考量。MessageBus 是一个模拟的网络层,负责将一个节点发出的消息投递到所有节点的“邮箱”里。

桥接 C++ 到 React Native:Turbo Module 与 JSI

现在,我们将这个 PaxosNode 封装成一个 React Native 可以直接调用的模块。这需要几个步骤:

  1. 定义 Turbo Module 的 JS 规范。
  2. 编写 C++ 模块实现,使用 JSI 来暴露 PaxosNode 的功能。

1. 定义 JS 规范 (NativePaxos.ts)

这个文件告诉 React Native 的 Codegen 工具,我们的原生模块有哪些方法和属性。

import type { TurboModule } from 'react-native/Libraries/TurboModule/RCTExport';
import { TurboModuleRegistry } from 'react-native';

export interface NodeState {
  id: number;
  role: string;
  proposal_round: number;
  proposed_value: string;
  promise_count: number;
  accepted_count: number;
  promised_round: number;
  accepted_value?: string;
  learned_value?: string;
}

export interface Spec extends TurboModule {
  // 初始化并启动一个拥有N个节点的Paxos集群模拟
  // 返回 true 表示成功
  initialize(nodeCount: number): Promise<boolean>;

  // 让指定ID的节点发起一个提案
  propose(nodeId: number, value: string): void;

  // 获取指定ID节点的当前状态
  getNodeState(nodeId: number): NodeState;

  // 注册一个监听器,当任何节点状态更新时,C++会调用这个函数
  addStateUpdateListener(listener: (states: NodeState[]) => void): void;
  
  // 清理资源
  cleanup(): void;
}

export default TurboModuleRegistry.get<Spec>('PaxosModule');

2. 编写 C++ 模块实现

这是整个架构的核心。我们将创建一个 C++ 类,它继承自 JSI 的 HostObject,并把我们的 Paxos 集群管理起来。

// cpp/PaxosModule.cpp

#include "PaxosModule.h"
#include "core/PaxosNode.h"
#include "core/MessageBus.h"
#include <vector>
#include <memory>
#include <jsi/jsi.h>

using namespace facebook;

// 全局的模拟环境
MessageBus g_message_bus;
std::vector<std::shared_ptr<PaxosNode>> g_nodes;
std::vector<std::thread> g_node_threads;
std::shared_ptr<jsi::Function> g_js_callback;
std::shared_ptr<facebook::react::CallInvoker> g_call_invoker;
std::mutex g_callback_mutex;

// JSI HostObject,我们的 C++ Paxos 管理器
class PaxosManager : public jsi::HostObject {
private:
    jsi::Runtime& runtime_;

public:
    PaxosManager(jsi::Runtime& rt) : runtime_(rt) {}

    // 暴露给 JS 的方法列表
    jsi::Value get(jsi::Runtime& rt, const jsi::PropNameID& name) override {
        std::string propName = name.utf8(rt);

        if (propName == "initialize") {
            return jsi::Function::createFromHostFunction(rt, name, 1, 
                [this](jsi::Runtime& rt, const jsi::Value& thisVal, const jsi::Value* args, size_t count) -> jsi::Value {
                    // ... 实现 initialize 逻辑
                    // 创建 PaxosNode 实例和线程
                    return jsi::Value(true);
                });
        }
        if (propName == "propose") {
            // ... 实现 propose 逻辑
        }
        if (propName == "getNodeState") {
            // ... 实现 getNodeState 逻辑
        }
        if (propName == "addStateUpdateListener") {
            return jsi::Function::createFromHostFunction(rt, name, 1,
                [this](jsi::Runtime& rt, const jsi::Value& thisVal, const jsi::Value* args, size_t count) -> jsi::Value {
                    if (count > 0 && args[0].isObject() && args[0].asObject(rt).isFunction(rt)) {
                        std::lock_guard<std::mutex> lock(g_callback_mutex);
                        g_js_callback = std::make_shared<jsi::Function>(args[0].asObject(rt).asFunction(rt));
                    }
                    return jsi::Value::undefined();
                });
        }
        if (propName == "cleanup") {
            // ... 实现 cleanup 逻辑
        }

        return jsi::Value::undefined();
    }
};

// 模块安装函数
void PaxosModule::install(jsi::Runtime& jsiRuntime) {
    auto moduleName = "PaxosModule";
    auto paxosManager = std::make_shared<PaxosManager>(jsiRuntime);
    jsiRuntime.global().setProperty(jsiRuntime, moduleName, jsi::Object::createFromHostObject(jsiRuntime, paxosManager));
}

// C++ -> JS 回调的实现
void CppStateUpdateCallback(int nodeId, const NodeStateView& state) {
    // 这个函数在 C++ 的 Paxos 线程被调用,不能直接操作 JSI Runtime
    // 必须通过 CallInvoker 调度到 JS 线程执行
    g_call_invoker->invokeAsync([]{
        std::lock_guard<std::mutex> lock(g_callback_mutex);
        if (!g_js_callback) return;

        // 准备 JS 对象数组
        // auto runtime = ...
        // jsi::Array jsStates = ...
        // for (const auto& node : g_nodes) {
        //     NodeStateView s = node->GetState();
        //     jsi::Object jsState(runtime);
        //     jsState.setProperty(runtime, "id", s.id);
        //     ...
        //     jsStates.setValueAtIndex(runtime, s.id, jsState);
        // }
        // g_js_callback->call(runtime, jsStates);
    });
}

这段代码的复杂性在于线程模型的处理。PaxosNode 运行在自己的后台线程中,而 JSI 调用来自 React Native 的 JS 线程。当后台线程需要更新 UI 时,它不能直接调用 g_js_callback,因为 JSI Runtime 不是线程安全的。正确的做法是使用 React Native 提供的 CallInvoker,它能将一个 lambda 函数安全地调度到 JS 线程去执行。CppStateUpdateCallback 就是这个机制的核心实现。

React Native UI 实现

UI 部分相对直接。我们使用 React Hooks 来管理状态,并通过调用原生模块的方法来与 C++ 后端交互。

// PaxosMonitorScreen.tsx

import React, { useState, useEffect } from 'react';
import { View, Text, Button, StyleSheet, ScrollView } from 'react-native';
import PaxosModule, { NodeState } from './NativePaxos'; // 导入我们的模块

const NODE_COUNT = 3;

const PaxosMonitorScreen = () => {
  const [isInitialized, setIsInitialized] = useState(false);
  const [nodeStates, setNodeStates] = useState<NodeState[]>([]);
  const [proposalValue, setProposalValue] = useState('');

  useEffect(() => {
    const initializePaxos = async () => {
      // 模块初始化
      const success = await PaxosModule.initialize(NODE_COUNT);
      if (success) {
        setIsInitialized(true);
        
        // 注册回调,这是从 C++ 接收实时更新的关键
        PaxosModule.addStateUpdateListener((states: NodeState[]) => {
          // 这个回调由 C++ 触发,在 JS 线程执行
          setNodeStates(states);
        });

        // 获取初始状态
        const initialStates = [];
        for (let i = 0; i < NODE_COUNT; i++) {
          initialStates.push(PaxosModule.getNodeState(i));
        }
        setNodeStates(initialStates);
      }
    };

    initializePaxos();

    return () => {
      // 组件卸载时清理 C++ 资源
      PaxosModule.cleanup();
    };
  }, []);

  const handlePropose = (nodeId: number) => {
    const value = `Value from Node ${nodeId} at ${new Date().toLocaleTimeString()}`;
    // 调用 C++ 方法发起提案
    PaxosModule.propose(nodeId, value);
  };

  return (
    <ScrollView style={styles.container}>
      <Text style={styles.title}>Paxos Cluster Monitor</Text>
      {isInitialized ? (
        <View>
          {nodeStates.map(state => (
            <View key={state.id} style={styles.nodeCard}>
              <Text style={styles.nodeTitle}>Node {state.id}</Text>
              <Text>Role: {state.role}</Text>
              <Text>Learned Value: {state.learned_value ?? 'N/A'}</Text>
              <Text>Proposal Round: {state.proposal_round}</Text>
              <Text>Promised Round: {state.promised_round}</Text>
              <Text>Promises Received: {state.promise_count}</Text>
              <Button title="Propose New Value" onPress={() => handlePropose(state.id)} />
            </View>
          ))}
        </View>
      ) : (
        <Text>Initializing Paxos Cluster...</Text>
      )}
    </ScrollView>
  );
};

const styles = StyleSheet.create({
    // ... 样式定义
});

export default PaxosMonitorScreen;

UI 代码的核心在 useEffect hook 中。它负责初始化 C++ 模块,并且注册一个监听函数 addStateUpdateListener。当 C++ 端的 Paxos 状态发生变化并通过 CallInvoker 调用这个 JS 函数时,setNodeStates 会被触发,从而以响应式的方式更新整个界面。用户点击按钮时,handlePropose 直接调用 PaxosModule.propose,这是一个同步的 JSI 调用,立即在 C++ 端触发提案流程。

架构图与总结

整个系统的交互流程可以用一个 Mermaid 图清晰地表示出来:

graph TD
    subgraph React Native Realm
        A[UI Component] -- 1. Propose(value) --> B(PaxosModule.ts);
        B -- 2. JSI Call --> C{JSI Runtime};
        A -- 7. Update State --> D[React State];
    end

    subgraph C++ Native Realm
        C -- 3. Forward Call --> E[PaxosManager HostObject];
        E -- 4. Invoke --> F[PaxosNode::Propose];
        F -- 5. State Change & Broadcast --> G[Other PaxosNode Threads];
        G -- 6. NotifyStateUpdate --> H{CallInvoker};
    end
    
    H -- 6.1 Schedule on JS Thread --> C;
    C -- 6.2 Execute JS Callback --> B;

这个流程展示了从用户交互触发 C++ 逻辑,再由 C++ 后台线程的计算结果反向驱动 UI 更新的完整闭环。

遗留问题与未来迭代

我们构建的这个系统虽然验证了架构的可行性,但在生产环境中还有很多工作要做。

  1. 网络层实现: 当前的 MessageBus 是进程内模拟。一个真实的实现需要替换为一个健壮的网络层,例如使用 gRPC 或者自定义的 TCP/UDP 协议,并处理网络分区、延迟和丢包等问题。
  2. Paxos 算法完备性: 我们只实现了 Basic Paxos。对于持续的服务,需要升级到 Multi-Paxos 或 Raft 来处理一系列值的共识,形成一个复制状态机。
  3. 错误处理与健壮性: C++ 代码中的异常需要被捕获并优雅地传递到 JS 层,而不是让应用崩溃。JSI 提供了相应的机制来处理 C++ 异常。
  4. 内存管理: 在 JSI 中传递对象,特别是 HostObject,需要非常小心地管理生命周期。C++ 对象必须在 JS 对象被垃圾回收后正确地析构,避免内存泄漏。
  5. 性能优化: 对于状态更新极为频繁的场景,每次都序列化整个 NodeStateView 可能会成为瓶颈。可以考虑增量更新,或者使用 JSI 的 ArrayBuffer 功能来传递二进制数据,以获得更高的性能。

尽管如此,通过 JSI 将一个高性能 C++ 核心与 React Native UI 无缝结合的模式,为许多需要重计算、底层操作或复用现有 C++ 库的移动应用场景提供了一个极为强大的解决方案。它打破了以往桥接技术的性能壁垒,让真正的原生性能和前端开发效率得以共存。


  目录