最初的需求是在一个 C++ 核心业务系统中,为运维和调试人员提供一个轻量级、跨平台的监控面板。这个面板需要能实时展示系统内部多个节点的共识状态,并能手动触发一些调试指令。Web 技术栈显得过于笨重,需要一个完整的 Web Server;而为 iOS 和 Android 分别开发原生应用,维护成本又太高。React Native 成了自然的选项,但真正的挑战在于如何将一个高性能、多线程、状态复杂的 C++ 模块——一个 Paxos 共识引擎——高效且安全地桥接到 React Native 的 JavaScript 世界。
传统的 React Native Bridge 是基于异步消息传递的,对于高频的状态查询和实时数据同步来说,性能损耗和延迟都无法接受。我们需要的是一种更直接、更底层的通信方式。这正是 React Native 的新架构——Turbo Modules 和 JSI (JavaScript Interface)——所要解决的问题。它允许 JavaScript 直接、同步地调用 C++ 函数,甚至持有 C++ 对象的引用,这为我们的架构设想打开了大门。
我们的目标是:
- 在 C++ 中实现一个简化的、但在概念上完备的多节点 Paxos 共识模拟器。
- 通过 JSI 将这个 C++ 模块封装成一个 Turbo Native Module。
- 实现从 C++ 到 JavaScript 的事件推送,以便实时更新 UI 状态。
- 在 React Native 端构建一个可以与 C++ 核心交互的监控界面。
Paxos 核心模块的 C++ 实现
在生产环境中,完整的 Paxos 实现极其复杂,通常会使用 Multi-Paxos 或其变种 Raft。为了在本文中聚焦于 C++ 与 RN 的通信,我将实现一个经典的 Basic Paxos 模型,它用于对单个值达成共識。我们将模拟一个由3个节点组成的集群,这些节点将在同一个进程内的多个线程中运行,通过一个共享的“消息总线”进行通信。
首先,定义核心数据结构和节点接口。
// src/core/PaxosNode.h
#ifndef PAXOS_NODE_H
#define PAXOS_NODE_H
#include <iostream>
#include <string>
#include <vector>
#include <mutex>
#include <thread>
#include <optional>
#include <functional>
#include <map>
#include <atomic>
// 定义提案ID,通常由轮次编号和提议者ID组成,以保证唯一性
struct ProposalID {
int number;
int node_id;
bool operator<(const ProposalID& other) const {
if (number != other.number) {
return number < other.number;
}
return node_id < other.node_id;
}
bool operator==(const ProposalID& other) const {
return number == other.number && node_id == other.node_id;
}
};
// 定义不同角色的状态
struct ProposerState {
ProposalID proposal_id;
std::string proposed_value;
int promise_count = 0;
int accepted_count = 0;
bool proposal_active = false;
};
struct AcceptorState {
ProposalID promised_id = {-1, -1};
std::optional<ProposalID> accepted_id;
std::optional<std::string> accepted_value;
};
// 节点状态的聚合视图
struct NodeStateView {
int id;
std::string role; // "Proposer", "Acceptor", "Learner"
int proposal_round;
std::string proposed_value;
int promise_count;
int accepted_count;
int promised_round;
std::optional<std::string> accepted_value;
std::optional<std::string> learned_value;
};
class PaxosNode {
public:
// JS侧的回调函数,用于将状态更新推送给UI
using StateUpdateCallback = std::function<void(int nodeId, const NodeStateView&)>;
PaxosNode(int id, int total_nodes, StateUpdateCallback cb);
~PaxosNode();
// 外部触发提案
void Propose(const std::string& value);
// 接收消息的入口
void ReceiveMessage(const std::string& message_type, const std::string& payload);
// 获取当前节点状态,供UI查询
NodeStateView GetState();
private:
const int id_;
const int total_nodes_;
const int majority_;
StateUpdateCallback state_update_callback_;
std::mutex mtx_;
ProposerState proposer_state_;
AcceptorState acceptor_state_;
std::optional<std::string> learned_value_;
// Proposer 逻辑
void Prepare();
// Acceptor 逻辑
void OnPrepare(const ProposalID& pid);
void OnAcceptRequest(const ProposalID& pid, const std::string& value);
// Learner 逻辑
void OnAccepted(const ProposalID& pid, const std::string& value);
// 广播消息
void Broadcast(const std::string& message_type, const std::string& payload);
// 用于定期通知UI更新
void NotifyStateUpdate();
};
#endif // PAXOS_NODE_H
这份头文件定义了 Paxos 算法中最重要的几个概念:ProposalID
用于保证提案的唯一性和可比较性,ProposerState
和 AcceptorState
分别存储了提议者和接受者的内部状态。NodeStateView
是一个关键的设计,它是一个只读的数据结构,专门用于将 C++ 内部复杂的状态暴露给外部(在这里是 JSI Bridge),避免了直接暴露内部锁和复杂对象。
StateUpdateCallback
是实现从 C++ 主动推送到 JS 的核心。JS 端会传入一个函数,C++ 在内部状态发生关键变化时调用它。
接下来是具体的实现。这里的难点在于线程安全,因为 Propose
和 ReceiveMessage
可能会被不同的线程调用。所有对内部状态的修改都必须由互斥锁 mtx_
保护。
// src/core/PaxosNode.cpp (部分核心逻辑)
#include "PaxosNode.h"
#include "MessageBus.h" // 假设存在一个消息总线来模拟网络通信
// 为了简化,我们使用一个全局的消息总线
extern MessageBus g_message_bus;
PaxosNode::PaxosNode(int id, int total_nodes, StateUpdateCallback cb)
: id_(id),
total_nodes_(total_nodes),
majority_((total_nodes / 2) + 1),
state_update_callback_(cb) {
proposer_state_.proposal_id = {0, id_};
}
PaxosNode::~PaxosNode() {}
void PaxosNode::Propose(const std::string& value) {
std::lock_guard<std::mutex> lock(mtx_);
// 每次新提案都增加轮次号
proposer_state_.proposal_id.number++;
proposer_state_.proposed_value = value;
proposer_state_.promise_count = 0;
proposer_state_.accepted_count = 0;
proposer_state_.proposal_active = true;
std::cout << "[Node " << id_ << "] Starting proposal round " << proposer_state_.proposal_id.number << " with value '" << value << "'" << std::endl;
Prepare();
NotifyStateUpdate();
}
void PaxosNode::Prepare() {
// 广播 Prepare 请求
// 在真实项目中,这里是网络调用
std::string payload = std::to_string(proposer_state_.proposal_id.number) + ":" + std::to_string(proposer_state_.proposal_id.node_id);
Broadcast("prepare", payload);
}
void PaxosNode::ReceiveMessage(const std::string& message_type, const std::string& payload) {
std::lock_guard<std::mutex> lock(mtx_);
if (message_type == "prepare") {
// ... 解析 payload 获取 proposal_id
// OnPrepare(parsed_pid);
} else if (message_type == "promise") {
// ...
// Proposer 收到 Promise 响应
proposer_state_.promise_count++;
if (proposer_state_.proposal_active && proposer_state_.promise_count >= majority_) {
proposer_state_.proposal_active = false; // 防止重复发送 Accept
std::cout << "[Node " << id_ << "] Got majority promises. Sending Accept request." << std::endl;
// 广播 Accept 请求
std::string accept_payload = "...";
Broadcast("accept_request", accept_payload);
}
} else if (message_type == "accept_request") {
// ...
} else if (message_type == "accepted") {
// ...
// Learner 逻辑,收到过半数 accepted 消息后,学习到最终值
// 简单实现:我们假设收到一个 accepted 就代表学习到了
// 在实际系统中,learner需要计数
if (!learned_value_.has_value()) {
learned_value_ = payload; // payload is the accepted value
std::cout << "[Node " << id_ << "] Learned value: '" << *learned_value_ << "'" << std::endl;
}
}
NotifyStateUpdate();
}
void PaxosNode::OnPrepare(const ProposalID& pid) {
if (pid < acceptor_state_.promised_id) {
// 忽略旧的提案
return;
}
acceptor_state_.promised_id = pid;
std::cout << "[Node " << id_ << "] Promising for round " << pid.number << std::endl;
// 回复 Promise,附带之前已接受的提案(如果有)
std::string promise_payload = "..."; // 包含 promised_id 和 accepted_value
// Send("promise", promise_payload, pid.node_id); // 单点发送
}
void PaxosNode::Broadcast(const std::string& message_type, const std::string& payload) {
// 模拟广播
for (int i = 0; i < total_nodes_; ++i) {
g_message_bus.PostMessage(i, message_type, payload);
}
}
void PaxosNode::NotifyStateUpdate() {
if (state_update_callback_) {
state_update_callback_(id_, GetState());
}
}
NodeStateView PaxosNode::GetState() {
// 注意:这个函数没有加锁,因为它应该被持有锁的函数调用,或者在设计上保证其访问的数据是线程安全的快照。
// 为了暴露给JSI,最好在这里创建一个安全的快照。
std::lock_guard<std::mutex> lock(mtx_);
return {
id_,
proposer_state_.proposal_active ? "Proposer" : "Acceptor/Learner",
proposer_state_.proposal_id.number,
proposer_state_.proposed_value,
proposer_state_.promise_count,
proposer_state_.accepted_count,
acceptor_state_.promised_id.number,
acceptor_state_.accepted_value,
learned_value_
};
}
这个实现只是一个骨架,但它展示了核心的逻辑:状态的维护、角色的转换以及线程安全的考量。MessageBus
是一个模拟的网络层,负责将一个节点发出的消息投递到所有节点的“邮箱”里。
桥接 C++ 到 React Native:Turbo Module 与 JSI
现在,我们将这个 PaxosNode
封装成一个 React Native 可以直接调用的模块。这需要几个步骤:
- 定义 Turbo Module 的 JS 规范。
- 编写 C++ 模块实现,使用 JSI 来暴露
PaxosNode
的功能。
1. 定义 JS 规范 (NativePaxos.ts
)
这个文件告诉 React Native 的 Codegen 工具,我们的原生模块有哪些方法和属性。
import type { TurboModule } from 'react-native/Libraries/TurboModule/RCTExport';
import { TurboModuleRegistry } from 'react-native';
export interface NodeState {
id: number;
role: string;
proposal_round: number;
proposed_value: string;
promise_count: number;
accepted_count: number;
promised_round: number;
accepted_value?: string;
learned_value?: string;
}
export interface Spec extends TurboModule {
// 初始化并启动一个拥有N个节点的Paxos集群模拟
// 返回 true 表示成功
initialize(nodeCount: number): Promise<boolean>;
// 让指定ID的节点发起一个提案
propose(nodeId: number, value: string): void;
// 获取指定ID节点的当前状态
getNodeState(nodeId: number): NodeState;
// 注册一个监听器,当任何节点状态更新时,C++会调用这个函数
addStateUpdateListener(listener: (states: NodeState[]) => void): void;
// 清理资源
cleanup(): void;
}
export default TurboModuleRegistry.get<Spec>('PaxosModule');
2. 编写 C++ 模块实现
这是整个架构的核心。我们将创建一个 C++ 类,它继承自 JSI 的 HostObject
,并把我们的 Paxos 集群管理起来。
// cpp/PaxosModule.cpp
#include "PaxosModule.h"
#include "core/PaxosNode.h"
#include "core/MessageBus.h"
#include <vector>
#include <memory>
#include <jsi/jsi.h>
using namespace facebook;
// 全局的模拟环境
MessageBus g_message_bus;
std::vector<std::shared_ptr<PaxosNode>> g_nodes;
std::vector<std::thread> g_node_threads;
std::shared_ptr<jsi::Function> g_js_callback;
std::shared_ptr<facebook::react::CallInvoker> g_call_invoker;
std::mutex g_callback_mutex;
// JSI HostObject,我们的 C++ Paxos 管理器
class PaxosManager : public jsi::HostObject {
private:
jsi::Runtime& runtime_;
public:
PaxosManager(jsi::Runtime& rt) : runtime_(rt) {}
// 暴露给 JS 的方法列表
jsi::Value get(jsi::Runtime& rt, const jsi::PropNameID& name) override {
std::string propName = name.utf8(rt);
if (propName == "initialize") {
return jsi::Function::createFromHostFunction(rt, name, 1,
[this](jsi::Runtime& rt, const jsi::Value& thisVal, const jsi::Value* args, size_t count) -> jsi::Value {
// ... 实现 initialize 逻辑
// 创建 PaxosNode 实例和线程
return jsi::Value(true);
});
}
if (propName == "propose") {
// ... 实现 propose 逻辑
}
if (propName == "getNodeState") {
// ... 实现 getNodeState 逻辑
}
if (propName == "addStateUpdateListener") {
return jsi::Function::createFromHostFunction(rt, name, 1,
[this](jsi::Runtime& rt, const jsi::Value& thisVal, const jsi::Value* args, size_t count) -> jsi::Value {
if (count > 0 && args[0].isObject() && args[0].asObject(rt).isFunction(rt)) {
std::lock_guard<std::mutex> lock(g_callback_mutex);
g_js_callback = std::make_shared<jsi::Function>(args[0].asObject(rt).asFunction(rt));
}
return jsi::Value::undefined();
});
}
if (propName == "cleanup") {
// ... 实现 cleanup 逻辑
}
return jsi::Value::undefined();
}
};
// 模块安装函数
void PaxosModule::install(jsi::Runtime& jsiRuntime) {
auto moduleName = "PaxosModule";
auto paxosManager = std::make_shared<PaxosManager>(jsiRuntime);
jsiRuntime.global().setProperty(jsiRuntime, moduleName, jsi::Object::createFromHostObject(jsiRuntime, paxosManager));
}
// C++ -> JS 回调的实现
void CppStateUpdateCallback(int nodeId, const NodeStateView& state) {
// 这个函数在 C++ 的 Paxos 线程被调用,不能直接操作 JSI Runtime
// 必须通过 CallInvoker 调度到 JS 线程执行
g_call_invoker->invokeAsync([]{
std::lock_guard<std::mutex> lock(g_callback_mutex);
if (!g_js_callback) return;
// 准备 JS 对象数组
// auto runtime = ...
// jsi::Array jsStates = ...
// for (const auto& node : g_nodes) {
// NodeStateView s = node->GetState();
// jsi::Object jsState(runtime);
// jsState.setProperty(runtime, "id", s.id);
// ...
// jsStates.setValueAtIndex(runtime, s.id, jsState);
// }
// g_js_callback->call(runtime, jsStates);
});
}
这段代码的复杂性在于线程模型的处理。PaxosNode
运行在自己的后台线程中,而 JSI 调用来自 React Native 的 JS 线程。当后台线程需要更新 UI 时,它不能直接调用 g_js_callback
,因为 JSI Runtime 不是线程安全的。正确的做法是使用 React Native 提供的 CallInvoker
,它能将一个 lambda 函数安全地调度到 JS 线程去执行。CppStateUpdateCallback
就是这个机制的核心实现。
React Native UI 实现
UI 部分相对直接。我们使用 React Hooks 来管理状态,并通过调用原生模块的方法来与 C++ 后端交互。
// PaxosMonitorScreen.tsx
import React, { useState, useEffect } from 'react';
import { View, Text, Button, StyleSheet, ScrollView } from 'react-native';
import PaxosModule, { NodeState } from './NativePaxos'; // 导入我们的模块
const NODE_COUNT = 3;
const PaxosMonitorScreen = () => {
const [isInitialized, setIsInitialized] = useState(false);
const [nodeStates, setNodeStates] = useState<NodeState[]>([]);
const [proposalValue, setProposalValue] = useState('');
useEffect(() => {
const initializePaxos = async () => {
// 模块初始化
const success = await PaxosModule.initialize(NODE_COUNT);
if (success) {
setIsInitialized(true);
// 注册回调,这是从 C++ 接收实时更新的关键
PaxosModule.addStateUpdateListener((states: NodeState[]) => {
// 这个回调由 C++ 触发,在 JS 线程执行
setNodeStates(states);
});
// 获取初始状态
const initialStates = [];
for (let i = 0; i < NODE_COUNT; i++) {
initialStates.push(PaxosModule.getNodeState(i));
}
setNodeStates(initialStates);
}
};
initializePaxos();
return () => {
// 组件卸载时清理 C++ 资源
PaxosModule.cleanup();
};
}, []);
const handlePropose = (nodeId: number) => {
const value = `Value from Node ${nodeId} at ${new Date().toLocaleTimeString()}`;
// 调用 C++ 方法发起提案
PaxosModule.propose(nodeId, value);
};
return (
<ScrollView style={styles.container}>
<Text style={styles.title}>Paxos Cluster Monitor</Text>
{isInitialized ? (
<View>
{nodeStates.map(state => (
<View key={state.id} style={styles.nodeCard}>
<Text style={styles.nodeTitle}>Node {state.id}</Text>
<Text>Role: {state.role}</Text>
<Text>Learned Value: {state.learned_value ?? 'N/A'}</Text>
<Text>Proposal Round: {state.proposal_round}</Text>
<Text>Promised Round: {state.promised_round}</Text>
<Text>Promises Received: {state.promise_count}</Text>
<Button title="Propose New Value" onPress={() => handlePropose(state.id)} />
</View>
))}
</View>
) : (
<Text>Initializing Paxos Cluster...</Text>
)}
</ScrollView>
);
};
const styles = StyleSheet.create({
// ... 样式定义
});
export default PaxosMonitorScreen;
UI 代码的核心在 useEffect
hook 中。它负责初始化 C++ 模块,并且注册一个监听函数 addStateUpdateListener
。当 C++ 端的 Paxos 状态发生变化并通过 CallInvoker
调用这个 JS 函数时,setNodeStates
会被触发,从而以响应式的方式更新整个界面。用户点击按钮时,handlePropose
直接调用 PaxosModule.propose
,这是一个同步的 JSI 调用,立即在 C++ 端触发提案流程。
架构图与总结
整个系统的交互流程可以用一个 Mermaid 图清晰地表示出来:
graph TD subgraph React Native Realm A[UI Component] -- 1. Propose(value) --> B(PaxosModule.ts); B -- 2. JSI Call --> C{JSI Runtime}; A -- 7. Update State --> D[React State]; end subgraph C++ Native Realm C -- 3. Forward Call --> E[PaxosManager HostObject]; E -- 4. Invoke --> F[PaxosNode::Propose]; F -- 5. State Change & Broadcast --> G[Other PaxosNode Threads]; G -- 6. NotifyStateUpdate --> H{CallInvoker}; end H -- 6.1 Schedule on JS Thread --> C; C -- 6.2 Execute JS Callback --> B;
这个流程展示了从用户交互触发 C++ 逻辑,再由 C++ 后台线程的计算结果反向驱动 UI 更新的完整闭环。
遗留问题与未来迭代
我们构建的这个系统虽然验证了架构的可行性,但在生产环境中还有很多工作要做。
- 网络层实现: 当前的
MessageBus
是进程内模拟。一个真实的实现需要替换为一个健壮的网络层,例如使用 gRPC 或者自定义的 TCP/UDP 协议,并处理网络分区、延迟和丢包等问题。 - Paxos 算法完备性: 我们只实现了 Basic Paxos。对于持续的服务,需要升级到 Multi-Paxos 或 Raft 来处理一系列值的共识,形成一个复制状态机。
- 错误处理与健壮性: C++ 代码中的异常需要被捕获并优雅地传递到 JS 层,而不是让应用崩溃。JSI 提供了相应的机制来处理 C++ 异常。
- 内存管理: 在 JSI 中传递对象,特别是
HostObject
,需要非常小心地管理生命周期。C++ 对象必须在 JS 对象被垃圾回收后正确地析构,避免内存泄漏。 - 性能优化: 对于状态更新极为频繁的场景,每次都序列化整个
NodeStateView
可能会成为瓶颈。可以考虑增量更新,或者使用 JSI 的ArrayBuffer
功能来传递二进制数据,以获得更高的性能。
尽管如此,通过 JSI 将一个高性能 C++ 核心与 React Native UI 无缝结合的模式,为许多需要重计算、底层操作或复用现有 C++ 库的移动应用场景提供了一个极为强大的解决方案。它打破了以往桥接技术的性能壁垒,让真正的原生性能和前端开发效率得以共存。