利用 Redis Streams 构建从 Firestore 到 Dgraph 的弹性数据同步总线


在一个复杂的业务场景中,我们常常需要将数据冗余到不同特性的存储系统中,以满足多样的查询需求。我们的主业务数据存储在 Firestore,它为客户端提供了出色的实时性和开发便利性。然而,随着业务发展,我们需要对数据进行深度的、复杂的关系分析,例如分析多层社交网络关系或检测欺诈环路,这正是图数据库 Dgraph 的强项。问题的核心变为:如何构建一个高效、可靠、近乎实时的数据管道,将 Firestore 的变更同步到 Dgraph?

直接在 Firestore 的 Cloud Function 中调用 Dgraph API 是一种看似简单直接的方案。当 Firestore 文档发生 onCreate, onUpdate, onDelete 时,触发一个函数,该函数负责将变更应用到 Dgraph。

这种方案的脆弱性在生产环境中会暴露无遗:

  1. 紧密耦合: Firestore 的写入操作与 Dgraph 的可用性直接挂钩。如果 Dgraph 短暂不可用或响应缓慢,Cloud Function 将会重试,这可能导致 Firestore 端操作的延迟感知,并产生不必要的云函数执行费用。
  2. 缺乏缓冲: 突发的高写入流量会直接冲击 Dgraph,可能导致其过载。我们没有一个缓冲层来削峰填谷。
  3. 数据丢失风险: 如果 Cloud Function 在多次重试后仍然失败,这次数据变更事件就可能永久丢失,导致数据不一致。
  4. 扩展性差: 如果未来需要将数据同步到第三个系统(例如 Elasticsearch),我们就需要修改或增加更多的 Cloud Function,维护成本会线性增加。

一个更稳健的架构是引入一个消息队列作为中间缓冲层,实现系统间的解耦。方案选项包括 Google Pub/Sub、Kafka 或 RabbitMQ。这些都是优秀的消息系统,但对于我们的场景可能过于重量级。Kafka 需要独立的集群维护;Pub/Sub 虽然是托管服务,但其消费模型对于需要保证顺序和精细化控制的场景,配置起来相对复杂。

最终我们选择 Redis Streams 作为数据同步总线的核心。这是一个令人兴奋的权衡:它比传统的 full-fledged 消息队列更轻量、延迟更低,同时又提供了比简单 Redis Pub/Sub 强大得多的特性,如消息持久化、消费者组以及独立的消息确认机制,这些特性恰好是我们构建一个弹性数据管道所必需的。

架构设计

我们的最终架构如下:

graph TD
    A[Firestore] -- onWrite Trigger --> B(Cloud Function);
    B -- XADD --> C{Redis Stream: 'firestore_changes'};
    C -- XREADGROUP --> D[Go Consumer Service - Group 'dgraph_workers'];
    D -- Upsert Mutation --> E[Dgraph];
    D -- XACK --> C;
    subgraph "Error Handling"
        D -- On Failure --> F{Redis Stream: 'dead_letter_queue'};
    end

    style B fill:#f9f,stroke:#333,stroke-width:2px;
    style D fill:#ccf,stroke:#333,stroke-width:2px;
  1. Firestore & Cloud Function (生产者): Firestore 上的任何文档写入(创建或更新)都会触发一个 Cloud Function。这个函数只做一件事:将变更数据序列化,然后通过 XADD 命令推送到 Redis Stream firestore_changes 中。它不关心消费者是谁,也不关心消费是否成功。
  2. Redis Streams (总线): 作为核心缓冲和分发层。firestore_changes 这个 Stream 会持久化所有的数据变更事件。
  3. Go Consumer Service (消费者): 一个独立部署的、可水平扩展的 Go 服务。它使用消费者组(dgraph_workers)的模式从 firestore_changes Stream 中拉取消息。这种模式允许多个消费者实例并行处理消息,同时保证每条消息只被组内的一个消费者处理。
  4. Dgraph (目标): 消费者服务在处理消息后,会将数据转换为 Dgraph 的 upsert mutation 格式,并写入 Dgraph。
  5. 确认与死信队列: 数据成功写入 Dgraph 后,消费者会向 Redis 发送 XACK 命令,告知 Redis 这条消息已成功处理,可以从消费者的 pending 列表中移除。如果处理失败(例如数据格式错误或Dgraph持续不可用),在几次重试后,消息将被推送到一个死信队列 dead_letter_queue 中,等待人工介入,同时依然 XACK 原始消息,防止阻塞整个数据流。

核心实现:生产者 Cloud Function

这个 Node.js Cloud Function 的职责非常单一,因此可以保持极简和高效。

index.js

const functions = require("firebase-functions");
const { createClient } = require("redis");

// 在函数外部初始化 Redis 客户端,以便在函数调用之间重用连接
// 强烈建议使用环境变量来管理配置
const redisClient = createClient({
  url: process.env.REDIS_URL || "redis://localhost:6379",
});

// 异步连接,并在出现错误时记录日志
redisClient.on('error', (err) => console.error('Redis Client Error', err));
(async () => {
  await redisClient.connect();
})();

const STREAM_KEY = "firestore_changes";

exports.propagateToStream = functions
  .region("asia-east2") // 指定函数部署的区域
  .firestore.document("users/{userId}")
  .onWrite(async (change, context) => {
    // onWrite 包含了创建、更新和删除
    // change.before.data() 是变更前的数据
    // change.after.data() 是变更后的数据

    const userId = context.params.userId;
    const eventType = change.before.exists && change.after.exists
      ? "UPDATE"
      : change.after.exists
      ? "CREATE"
      : "DELETE"; // 尽管我们的消费者目前不处理DELETE,但标记出来是个好习惯

    if (eventType === "DELETE") {
      // 在这个例子中,我们假设只同步创建和更新
      // 对于删除,可以发送一个带有特殊标记的事件
      console.log(`Skipping DELETE event for user: ${userId}`);
      return null;
    }
    
    const data = change.after.data();

    // 准备推送到 Stream 的消息
    // 将其扁平化为键值对,这对于 Redis Stream 格式更友好
    const message = {
      eventId: context.eventId, // 使用 Firestore 事件 ID 保证幂等性
      timestamp: context.timestamp,
      entityId: userId,
      entityType: "user", // 方便消费者进行路由
      eventType: eventType,
      payload: JSON.stringify(data), // 将复杂对象序列化为 JSON 字符串
    };

    try {
      if (!redisClient.isOpen) {
        await redisClient.connect();
      }
      
      // 使用 XADD 将消息添加到 Stream
      // '*' 表示让 Redis 自动生成消息 ID
      const messageId = await redisClient.xAdd(STREAM_KEY, "*", message);
      console.log(`Successfully added user ${userId} change to stream. Message ID: ${messageId}`);
    } catch (error) {
      console.error(`Failed to add message to Redis Stream for user ${userId}:`, error);
      // 抛出错误以触发 Cloud Function 的自动重试机制
      // 对于连接 Redis 的失败,重试是合理的
      throw new functions.https.HttpsError('internal', 'Failed to write to Redis Stream', error);
    }

    return null;
  });

这里的关键点是,Cloud Function 内部不包含任何复杂的业务逻辑。它只是一个适配器,将 Firestore 的事件格式转换为我们定义好的内部消息格式,然后推送到 Redis。错误处理也很直接:如果无法连接或写入 Redis,就抛出异常,让 Google Cloud 的重试机制介入。

核心实现:Go 消费者服务

消费者是整个管道的大脑,它负责处理业务逻辑、保证数据写入的幂等性以及处理异常情况。我们选择 Go 是因为它出色的并发性能和健壮的生态系统。

项目结构与配置

/dgraph-sync-service
  |- go.mod
  |- go.sum
  |- main.go
  |- config/
  |   |- config.go
  |- consumer/
  |   |- worker.go
  |- dgraph/
  |   |- client.go

config/config.go

package config

import (
	"github.com/spf13/viper"
	"strings"
)

type Config struct {
	RedisAddr         string `mapstructure:"REDIS_ADDR"`
	RedisStreamName   string `mapstructure:"REDIS_STREAM_NAME"`
	RedisGroupName    string `mapstructure:"REDIS_GROUP_NAME"`
	RedisConsumerName string `mapstructure:"REDIS_CONSUMER_NAME"`
	DgraphEndpoint    string `mapstructure:"DGRAPH_ENDPOINT"`
	WorkerConcurrency int    `mapstructure:"WORKER_CONCURRENCY"`
	MaxRetries        int    `mapstructure:"MAX_RETRIES"`
}

func LoadConfig() (config Config, err error) {
	viper.SetDefault("REDIS_ADDR", "localhost:6379")
	viper.SetDefault("REDIS_STREAM_NAME", "firestore_changes")
	viper.SetDefault("REDIS_GROUP_NAME", "dgraph_workers")
	viper.SetDefault("REDIS_CONSUMER_NAME", "consumer-1") // 应该用主机名或UUID动态生成
	viper.SetDefault("DGRAPH_ENDPOINT", "localhost:9080")
	viper.SetDefault("WORKER_CONCURRENCY", 4)
	viper.SetDefault("MAX_RETRIES", 3)

	viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
	viper.AutomaticEnv()

	err = viper.Unmarshal(&config)
	return
}

Dgraph 客户端与 Upsert 逻辑

dgraph/client.go

package dgraph

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	"github.com/dgraph-io/dgo/v210"
	"github.com/dgraph-io/dgo/v210/protos/api"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

type DgraphClient struct {
	dgo *dgo.Dgraph
}

// User represents the structure of our user data in Dgraph.
type User struct {
	UID        string `json:"uid,omitempty"`
	FirestoreID string `json:"firestore_id,omitempty"` // This is our external identifier
	Name       string `json:"name,omitempty"`
	Email      string `json:"email,omitempty"`
	CreatedAt  string `json:"created_at,omitempty"`
}

func NewDgraphClient(endpoint string) (*DgraphClient, error) {
	dialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}
	conn, err := grpc.Dial(endpoint, dialOpts...)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to Dgraph: %w", err)
	}

	dgoClient := dgo.NewDgraphClient(api.NewDgraphClient(conn))
	return &DgraphClient{dgo: dgoClient}, nil
}

// SetupSchema ensures the required schema, especially the unique constraint on firestore_id.
func (c *DgraphClient) SetupSchema(ctx context.Context) error {
	op := &api.Operation{
		Schema: `
			type User {
				firestore_id: string! @index(hash) @upsert
				name: string
				email: string
				created_at: string
			}
		`,
	}
	return c.dgo.Alter(ctx, op)
}


// UpsertUser performs an "update or insert" operation.
// This is the key to idempotency.
func (c *DgraphClient) UpsertUser(ctx context.Context, firestoreID string, payload string) error {
	var userData map[string]interface{}
	if err := json.Unmarshal([]byte(payload), &userData); err != nil {
		return fmt.Errorf("failed to unmarshal payload: %w", err)
	}

	// The node to find or create.
	query := fmt.Sprintf(`query {
		user as var(func: eq(firestore_id, "%s"))
	}`, firestoreID)

	// Create a user instance for mutation.
	// We must include the identifier in the mutation data.
	user := User{
		FirestoreID: firestoreID,
		Name:       safeGetString(userData, "name"),
		Email:      safeGetString(userData, "email"),
		CreatedAt:  safeGetString(userData, "createdAt"), // Assuming field names match
	}
	
	mu := &api.Mutation{
		SetJson: mustMarshal(user),
	}

	req := &api.Request{
		Query:     query,
		Mutations: []*api.Mutation{mu},
		CommitNow: true,
	}

	if _, err := c.dgo.NewTxn().Do(ctx, req); err != nil {
		return fmt.Errorf("dgraph transaction failed for user %s: %w", firestoreID, err)
	}
	
	log.Printf("Successfully upserted user with Firestore ID: %s", firestoreID)
	return nil
}

// Helper functions to prevent panics on type assertions or marshalling
func safeGetString(data map[string]interface{}, key string) string {
	if val, ok := data[key].(string); ok {
		return val
	}
	return ""
}

func mustMarshal(v interface{}) []byte {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err) // In a real app, this should be handled more gracefully.
	}
	return b
}

关键点: Dgraph 的 upsert 功能是实现幂等性的核心。我们为 firestore_id 字段设置了 @upsert 指令,并将其作为唯一标识。这意味着无论我们的消费者因为重试而处理同一条消息多少次,最终在 Dgraph 中都只会有一条对应的记录,并且其状态是最新的。

消费者 Worker 逻辑

consumer/worker.go

package consumer

import (
	"context"
	"log"
	"sync"
	"time"

	"github.com/go-redis/redis/v8"
	"your_project/config"
	"your_project/dgraph"
)

type Worker struct {
	cfg        config.Config
	redisClient *redis.Client
	dgraphClient *dgraph.DgraphClient
}

func NewWorker(cfg config.Config, rdb *redis.Client, dgraphClient *dgraph.DgraphClient) *Worker {
	return &Worker{
		cfg:        cfg,
		redisClient: rdb,
		dgraphClient: dgraphClient,
	}
}

func (w *Worker) Start(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	log.Printf("Starting worker for group %s with consumer name %s", w.cfg.RedisGroupName, w.cfg.RedisConsumerName)

	// 确保消费者组存在,如果不存在则从流的开始创建
	err := w.redisClient.XGroupCreateMkStream(ctx, w.cfg.RedisStreamName, w.cfg.RedisGroupName, "0").Err()
	if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
		log.Fatalf("Failed to create consumer group: %v", err)
	}

	for {
		select {
		case <-ctx.Done():
			log.Println("Worker shutting down.")
			return
		default:
			// 从这个消费者的 pending list 中读取,如果没有,则从 stream 中读取新消息
			// Block for 2 seconds if no new message
			streams, err := w.redisClient.XReadGroup(ctx, &redis.XReadGroupArgs{
				Group:    w.cfg.RedisGroupName,
				Consumer: w.cfg.RedisConsumerName,
				Streams:  []string{w.cfg.RedisStreamName, ">"},
				Count:    1,
				Block:    2 * time.Second,
			}).Result()

			if err == redis.Nil {
				continue // No new messages
			} else if err != nil {
				log.Printf("Error reading from stream: %v. Retrying in 5s.", err)
				time.Sleep(5 * time.Second)
				continue
			}

			for _, stream := range streams {
				for _, message := range stream.Messages {
					w.processMessage(ctx, message)
				}
			}
		}
	}
}

func (w *Worker) processMessage(ctx context.Context, msg redis.XMessage) {
	log.Printf("Processing message ID: %s", msg.ID)
	
	firestoreID, okId := msg.Values["entityId"].(string)
	payload, okPayload := msg.Values["payload"].(string)

	if !okId || !okPayload {
		log.Printf("Malformed message, missing entityId or payload. Message ID: %s. Acknowledging and skipping.", msg.ID)
		w.ackMessage(ctx, msg.ID)
		return
	}
	
	var err error
	for i := 0; i < w.cfg.MaxRetries; i++ {
		err = w.dgraphClient.UpsertUser(ctx, firestoreID, payload)
		if err == nil {
			// Success
			w.ackMessage(ctx, msg.ID)
			return
		}
		log.Printf("Attempt %d/%d failed for message %s: %v", i+1, w.cfg.MaxRetries, msg.ID, err)
		time.Sleep(time.Duration(i+1) * time.Second) // Exponential backoff
	}

	// All retries failed, move to dead-letter queue
	log.Printf("All retries failed for message %s. Moving to dead-letter queue.", msg.ID)
	w.moveToDeadLetter(ctx, msg)
	w.ackMessage(ctx, msg.ID) // Acknowledge original message to unblock stream
}

func (w *Worker) ackMessage(ctx context.Context, messageID string) {
	if err := w.redisClient.XAck(ctx, w.cfg.RedisStreamName, w.cfg.RedisGroupName, messageID).Err(); err != nil {
		log.Printf("Failed to ACK message %s: %v", messageID, err)
	}
}

func (w *Worker) moveToDeadLetter(ctx context.Context, msg redis.XMessage) {
	deadLetterKey := w.cfg.RedisStreamName + ":dead_letter"
	
	// Add original message ID and error context to the dead letter message
	deadLetterValues := msg.Values
	deadLetterValues["original_message_id"] = msg.ID
	deadLetterValues["failed_at"] = time.Now().UTC().Format(time.RFC3339)
	
	flatValues := make([]interface{}, 0, len(deadLetterValues)*2)
	for k, v := range deadLetterValues {
		flatValues = append(flatValues, k, v)
	}

	if err := w.redisClient.XAdd(ctx, &redis.XAddArgs{
		Stream: deadLetterKey,
		Values: flatValues,
	}).Err(); err != nil {
		log.Printf("CRITICAL: Failed to move message %s to dead-letter queue: %v", msg.ID, err)
	}
}

主程序 main.go

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"your_project/config"
	"your_project/consumer"
	"your_project/dgraph"

	"github.com/go-redis/redis/v8"
)

func main() {
	cfg, err := config.LoadConfig()
	if err != nil {
		log.Fatalf("Could not load config: %v", err)
	}
	
	rdb := redis.NewClient(&redis.Options{
		Addr: cfg.RedisAddr,
	})
	if err := rdb.Ping(context.Background()).Err(); err != nil {
		log.Fatalf("Could not connect to Redis: %v", err)
	}

	dgraphClient, err := dgraph.NewDgraphClient(cfg.DgraphEndpoint)
	if err != nil {
		log.Fatalf("Could not connect to Dgraph: %v", err)
	}

	// It's a good practice to set up the schema on startup
	if err := dgraphClient.SetupSchema(context.Background()); err != nil {
		log.Printf("Warning: failed to setup Dgraph schema, might already exist: %v", err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup

	// Start multiple worker goroutines
	for i := 0; i < cfg.WorkerConcurrency; i++ {
		wg.Add(1)
		worker := consumer.NewWorker(cfg, rdb, dgraphClient)
		go worker.Start(ctx, &wg)
	}

	// Graceful shutdown
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan

	log.Println("Shutdown signal received, canceling workers...")
	cancel()
	wg.Wait()
	log.Println("All workers have been shut down.")
}

架构的扩展性与局限性

这个基于 Redis Streams 的架构提供了很好的扩展性。如果 Dgraph 的写入成为瓶颈,我们可以简单地增加 Go 消费者的实例数量。XREADGROUP 机制会确保消息被均匀地分发到各个实例。如果未来需要将数据同步到 Elasticsearch,我们只需创建一个新的消费者组 es_workers,用它自己的逻辑来消费 firestore_changes Stream,而完全不影响现有的 Dgraph 同步流程。

然而,这个方案并非银弹。它最大的局限在于对删除操作的处理。Firestore 的 onWrite 触发器在文档被删除时,change.after.data() 是不存在的。我们的 Cloud Function 可以捕获到 DELETE 事件并将其推送到 Stream,但消费者需要实现从 Dgraph 中删除对应节点的逻辑。这增加了消费者逻辑的复杂性。

此外,虽然 Redis Streams 是持久化的,但 Redis 本身如果配置为单点,仍是整个系统的单点故障。在生产环境中,必须使用 Redis Sentinel 或 Redis Cluster 来保证其高可用性。最后,这个模式本质上是一个定制化的 CDC(Change Data Capture)实现,它依赖于 Firestore 触发器的行为和保证。对于需要严格事务性和快照级别一致性的场景,可能需要探索更复杂的解决方案,例如定期的数据校对任务来弥补潜在的事件丢失风险。


  目录