基于关系型数据库构建可插拔式NLP处理引擎的整洁架构实践


一个常见的NLP任务起点,往往是一个Python脚本。它用几行代码加载一个预训练模型,处理一段文本,然后打印结果。这对于验证性工作(POC)是完美的。但在真实项目中,需求会迅速膨胀:我们需要支持不同的处理流程(先分词再实体识别,或者先情感分析再关键词提取),需要能够动态调整每个处理步骤的参数,还需要将处理结果和处理链本身持久化。这时,那个最初的脚本就变成了一团难以维护的泥潭。

问题核心在于,业务逻辑(NLP处理步骤)、应用逻辑(如何编排这些步骤)和框架细节(API服务、数据库连接)被混在了一起。整洁架构(Clean Architecture)的核心目标就是解决这种混乱,通过强制性的依赖关系规则,将系统划分为独立的层次。

本文将构建一个NLP处理引擎,它的处理流程不是硬编码在代码中,而是由存储在关系型数据库中的配置动态驱动。我们将严格遵循整洁架构的原则,展示如何将NLP算法、业务规则、数据持久化和Web API清晰地分离开来。

架构蓝图与依赖规则

整洁架构的核心是依赖倒置原则。内层(业务逻辑)不应该知道任何关于外层(实现细节)的事情。所有依赖关系都指向内部。

graph TD
    subgraph Frameworks & Drivers [框架与驱动]
        A[FastAPI Web Server]
        B[SQLAlchemy ORM]
        C[Spacy/NLTK Library]
        D[PostgreSQL Database]
    end

    subgraph Interface Adapters [接口适配器]
        E[Controllers]
        F[Presenters]
        G[Repository Implementations]
        H[NLP Processor Adapters]
    end

    subgraph Application Business Rules [应用业务规则]
        I[Use Cases / Interactors]
        J[Repository Interfaces]
        K[NLP Processor Interfaces]
    end

    subgraph Enterprise Business Rules [企业业务规则]
        L[Entities: Pipeline, Stage, Document]
    end

    A --> E
    B --> G
    C --> H

    E --> I
    I --> J
    I --> K
    G --> J
    H --> K
    
    I --> L
    J --> L
    K --> L

    style L fill:#f9f,stroke:#333,stroke-width:2px
    style I fill:#ff9,stroke:#333,stroke-width:2px
    style J fill:#ff9,stroke:#333,stroke-width:2px
    style K fill:#ff9,stroke:#333,stroke-width:2px
    style E fill:#9cf,stroke:#333,stroke-width:2px
    style F fill:#9cf,stroke:#333,stroke-width:2px
    style G fill:#9cf,stroke:#333,stroke-width:2px
    style H fill:#9cf,stroke:#333,stroke-width:2px
    style A fill:#c9f,stroke:#333,stroke-width:2px
    style B fill:#c9f,stroke:#333,stroke-width:2px
    style C fill:#c9f,stroke:#333,stroke-width:2px
    style D fill:#c9f,stroke:#333,stroke-width:2px
  • Entities (实体层): 定义了系统中最核心的业务对象。它们是纯粹的数据结构,不包含任何框架或数据库的依赖。
  • Use Cases (用例层): 包含了应用特有的业务规则。它们编排实体来完成特定任务,但对外部世界一无所知。它们通过接口与外层通信。
  • Interface Adapters (接口适配器层): 负责转换数据格式。例如,将HTTP请求转换为用例的输入,或将用例的输出转换为JSON响应。数据库的Repository实现也在这里。
  • Frameworks & Drivers (框架与驱动层): 最外层,包含了数据库、Web框架、UI等所有具体实现。

我们将使用Python 3.10+,FastAPI作为Web框架,SQLAlchemy 2.0作为ORM。

实体层与数据库定义

这是我们架构的核心。实体层不应知道SQL的存在。它们是简单的Python数据类。

# src/domain/entities.py

import uuid
from dataclasses import dataclass, field
from typing import Dict, Any, List

@dataclass
class PipelineStageEntity:
    """定义NLP处理流程中的一个阶段"""
    id: uuid.UUID = field(default_factory=uuid.uuid4)
    name: str  # e.g., "tokenize", "named_entity_recognition"
    processor_name: str  # 映射到具体的NLP处理器实现
    params: Dict[str, Any] = field(default_factory=dict)
    order: int = 0

@dataclass
class PipelineEntity:
    """定义一个完整的NLP处理流程"""
    id: uuid.UUID = field(default_factory=uuid.uuid4)
    name: str
    description: str
    stages: List[PipelineStageEntity] = field(default_factory=list)

@dataclass
class DocumentEntity:
    """待处理的文档"""
    id: uuid.UUID = field(default_factory=uuid.uuid4)
    text: str
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class AnalysisResultEntity:
    """处理结果"""
    id: uuid.UUID = field(default_factory=uuid.uuid4)
    document_id: uuid.UUID
    pipeline_id: uuid.UUID
    stage_results: Dict[str, Any] = field(default_factory=dict) # key是stage_name

这里的关键在于,这些类完全是业务概念的体现,没有任何db.Column@app.route之类的装饰器。

接下来,我们在接口适配器层定义这些实体如何映射到数据库表。这是一个实现细节。

-- schema.sql

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE pipelines (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    name VARCHAR(255) NOT NULL UNIQUE,
    description TEXT
);

CREATE TABLE pipeline_stages (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    pipeline_id UUID NOT NULL REFERENCES pipelines(id) ON DELETE CASCADE,
    name VARCHAR(255) NOT NULL,
    processor_name VARCHAR(255) NOT NULL,
    params JSONB,
    stage_order INTEGER NOT NULL,
    UNIQUE (pipeline_id, name),
    UNIQUE (pipeline_id, stage_order)
);

CREATE TABLE analysis_results (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    document_id UUID NOT NULL, -- In a real system, this would link to a documents table
    pipeline_id UUID NOT NULL REFERENCES pipelines(id),
    stage_results JSONB,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

用例层:定义应用核心逻辑

用例层定义了系统能做什么。它通过接口(抽象基类)来声明其对外部世界的依赖,比如数据持久化和NLP处理。

# src/application/ports/pipeline_repository.py

from abc import ABC, abstractmethod
from typing import Optional
import uuid

from src.domain.entities import PipelineEntity

class PipelineRepository(ABC):
    """
    持久化流水线的接口。
    这是用例层定义的“端口”,具体实现由适配器层提供。
    """
    @abstractmethod
    def find_by_id(self, pipeline_id: uuid.UUID) -> Optional[PipelineEntity]:
        raise NotImplementedError

    @abstractmethod
    def save(self, pipeline: PipelineEntity) -> None:
        raise NotImplementedError

# src/application/ports/nlp_processor.py

from abc import ABC, abstractmethod
from typing import Any, Dict

from src.domain.entities import DocumentEntity

class NlpProcessor(ABC):
    """
    NLP处理器的接口。
    同样,这是用例层定义的端口。
    """
    @abstractmethod
    def process(self, document: DocumentEntity, params: Dict[str, Any]) -> Any:
        raise NotImplementedError

有了这些接口,我们就可以编写核心的用例了。注意,这个用例完全不依赖任何具体的数据库或NLP库。

# src/application/use_cases/execute_pipeline.py

import uuid
import logging
from typing import Dict

from src.domain.entities import DocumentEntity, AnalysisResultEntity
from src.application.ports.pipeline_repository import PipelineRepository
from src.application.ports.nlp_processor import NlpProcessor

logger = logging.getLogger(__name__)

class ExecutePipelineUseCase:
    """
    核心用例:执行一个完整的NLP处理流水线。
    """
    def __init__(
        self,
        pipeline_repo: PipelineRepository,
        processor_registry: Dict[str, NlpProcessor] # 处理器注册表,依赖注入
    ):
        self._pipeline_repo = pipeline_repo
        self._processor_registry = processor_registry

    def execute(self, pipeline_id: uuid.UUID, document: DocumentEntity) -> AnalysisResultEntity:
        # 1. 从仓库获取流水线定义
        pipeline = self._pipeline_repo.find_by_id(pipeline_id)
        if not pipeline:
            logger.error(f"Pipeline with id {pipeline_id} not found.")
            raise ValueError(f"Pipeline with id {pipeline_id} not found.")

        # 2. 按顺序执行各个阶段
        stage_results = {}
        # 确保阶段按order排序
        sorted_stages = sorted(pipeline.stages, key=lambda s: s.order)

        for stage in sorted_stages:
            processor_name = stage.processor_name
            processor = self._processor_registry.get(processor_name)

            if not processor:
                logger.error(f"Processor '{processor_name}' for stage '{stage.name}' not found in registry.")
                # 在真实项目中,这里应该有更健壮的错误处理,比如跳过或中断
                raise ValueError(f"Processor '{processor_name}' not found.")
            
            try:
                logger.info(f"Executing stage '{stage.name}' with processor '{processor_name}'...")
                result = processor.process(document, stage.params)
                stage_results[stage.name] = result
            except Exception as e:
                logger.exception(
                    f"Error executing stage '{stage.name}' with processor '{processor_name}'. Aborting pipeline."
                )
                # 决定是中断还是继续
                raise RuntimeError(f"Failed at stage {stage.name}: {e}") from e

        # 3. 创建并返回最终结果实体
        analysis_result = AnalysisResultEntity(
            document_id=document.id,
            pipeline_id=pipeline.id,
            stage_results=stage_results,
        )
        
        # 在真实场景中,可能还有一个SaveResultUseCase来持久化结果
        # 这里为了简化,仅返回实体
        return analysis_result

这个用例非常纯粹:它只关心业务流程。它依赖的PipelineRepositoryNlpProcessor都是抽象的,这使得测试变得极其容易。我们可以用内存中的字典来模拟pipeline_repo,用简单的mock对象来模拟processor_registry,从而在不触及数据库或真实NLP模型的情况下测试所有业务逻辑。

接口适配器层:连接外部世界

这一层是粘合剂。它实现用例层定义的接口。

1. SQLAlchemy Repository 实现

这是对PipelineRepository接口的SQLAlchemy实现。

# src/infrastructure/persistence/sqlalchemy/repositories.py

import uuid
from typing import Optional
from sqlalchemy.orm import Session
from sqlalchemy import select

from src.domain.entities import PipelineEntity, PipelineStageEntity
from src.application.ports.pipeline_repository import PipelineRepository
from . import models as db_models # SQLAlchemy的 declarative models

class SqlAlchemyPipelineRepository(PipelineRepository):

    def __init__(self, db_session: Session):
        self._session = db_session

    def find_by_id(self, pipeline_id: uuid.UUID) -> Optional[PipelineEntity]:
        # 一个常见的错误是直接返回SQLAlchemy模型。
        # 适配器的职责是将在基础设施层获取的数据(ORM对象)
        # 转换为领域层的实体(Entity)。
        stmt = select(db_models.Pipeline).where(db_models.Pipeline.id == pipeline_id)
        pipeline_orm = self._session.scalars(stmt).one_or_none()

        if not pipeline_orm:
            return None
        
        return self._to_entity(pipeline_orm)

    def save(self, pipeline: PipelineEntity) -> None:
        # 这个方法需要处理创建和更新两种情况,这里简化为创建
        pipeline_orm = self._from_entity(pipeline)
        self._session.add(pipeline_orm)
        # 在实际应用中,这里需要处理事务提交或回滚
        # 一般由更高层的单元工作模式(Unit of Work)管理

    def _to_entity(self, orm_obj: db_models.Pipeline) -> PipelineEntity:
        """ORM object to Domain Entity"""
        return PipelineEntity(
            id=orm_obj.id,
            name=orm_obj.name,
            description=orm_obj.description,
            stages=[
                PipelineStageEntity(
                    id=s.id,
                    name=s.name,
                    processor_name=s.processor_name,
                    params=s.params,
                    order=s.stage_order
                ) for s in sorted(orm_obj.stages, key=lambda st: st.stage_order)
            ]
        )

    def _from_entity(self, entity: PipelineEntity) -> db_models.Pipeline:
        """Domain Entity to ORM object"""
        return db_models.Pipeline(
            id=entity.id,
            name=entity.name,
            description=entity.description,
            stages=[
                db_models.PipelineStage(
                    id=s.id,
                    pipeline_id=entity.id,
                    name=s.name,
                    processor_name=s.processor_name,
                    params=s.params,
                    stage_order=s.order
                ) for s in entity.stages
            ]
        )

这里的关键点在于_to_entity_from_entity方法。它们是防腐层,防止SQLAlchemy的ORM对象污染到用例层和实体层。这是一个常见的坑,很多项目声称使用整洁架构,但用例层里却充满了ORM对象,这破坏了依赖规则。

2. NLP 处理器适配器

这里我们为NlpProcessor接口提供具体的实现,比如使用spaCy库。

# src/infrastructure/nlp/spacy_processors.py

import spacy
from typing import Any, Dict, List

from src.domain.entities import DocumentEntity
from src.application.ports.nlp_processor import NlpProcessor

# 在生产环境中,模型应该只加载一次
# 这里为了演示,每次都加载
# 一个更好的方法是在应用启动时加载并注入
_MODELS = {}

def get_spacy_model(model_name: str):
    if model_name not in _MODELS:
        try:
            _MODELS[model_name] = spacy.load(model_name)
        except OSError:
            # 自动下载模型
            spacy.cli.download(model_name)
            _MODELS[model_name] = spacy.load(model_name)
    return _MODELS[model_name]

class SpacyNerProcessor(NlpProcessor):
    """使用spaCy进行命名实体识别"""
    def process(self, document: DocumentEntity, params: Dict[str, Any]) -> List[Dict[str, Any]]:
        model_name = params.get("model", "en_core_web_sm")
        nlp = get_spacy_model(model_name)
        doc = nlp(document.text)
        
        results = []
        for ent in doc.ents:
            results.append({
                "text": ent.text,
                "start_char": ent.start_char,
                "end_char": ent.end_char,
                "label": ent.label_
            })
        return results

class SpacyTokenizeProcessor(NlpProcessor):
    """使用spaCy进行分词"""
    def process(self, document: DocumentEntity, params: Dict[str, Any]) -> List[str]:
        model_name = params.get("model", "en_core_web_sm")
        nlp = get_spacy_model(model_name)
        doc = nlp(document.text)
        return [token.text for token in doc]

框架层:组装与启动

最外层负责组装所有东西。在这里,我们使用FastAPI,并通过依赖注入将具体的实现(如SqlAlchemyPipelineRepository)注入到用例中。

# src/main.py

import uuid
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from pydantic import BaseModel, Field
from typing import Dict, Any

from src.application.use_cases.execute_pipeline import ExecutePipelineUseCase
from src.infrastructure.persistence.sqlalchemy.database import SessionLocal
from src.infrastructure.persistence.sqlalchemy.repositories import SqlAlchemyPipelineRepository
from src.infrastructure.nlp.spacy_processors import SpacyNerProcessor, SpacyTokenizeProcessor
from src.domain.entities import DocumentEntity

# --- FastAPI App Setup ---
app = FastAPI(title="Clean NLP Engine")

# --- Dependency Injection Setup ---
# 1. 创建NLP处理器注册表
# 这是配置驱动的核心,key是数据库中存储的processor_name
processor_registry = {
    "spacy_ner": SpacyNerProcessor(),
    "spacy_tokenize": SpacyTokenizeProcessor(),
}

# 2. 数据库会话依赖
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# 3. 用例依赖
def get_execute_pipeline_use_case(db: Session = Depends(get_db)) -> ExecutePipelineUseCase:
    repo = SqlAlchemyPipelineRepository(db)
    return ExecutePipelineUseCase(repo, processor_registry)

# --- API Endpoints ---
class DocumentInput(BaseModel):
    text: str
    metadata: Dict[str, Any] = Field(default_factory=dict)

@app.post("/pipelines/{pipeline_id}/execute")
def execute_pipeline(
    pipeline_id: uuid.UUID,
    document_input: DocumentInput,
    use_case: ExecutePipelineUseCase = Depends(get_execute_pipeline_use_case)
):
    """执行一个NLP流水线"""
    try:
        document = DocumentEntity(text=document_input.text, metadata=document_input.metadata)
        result_entity = use_case.execute(pipeline_id, document)
        
        # Presenter的职责:将领域实体转换为API响应
        return {
            "document_id": result_entity.document_id,
            "pipeline_id": result_entity.pipeline_id,
            "results": result_entity.stage_results
        }
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))
    except RuntimeError as e:
        raise HTTPException(status_code=500, detail=str(e))
    except Exception as e:
        # 通用错误处理
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {e}")

# 这里可以添加创建和管理流水线的端点
# ...

实践中的权衡与边界

这套架构不是银弹。它的主要优点是可测试性、可维护性和对业务逻辑的保护。当系统复杂度高、生命周期长时,这些优点尤为突出。

然而,它也引入了额外的复杂性。对于一个简单的应用,定义所有这些层、接口和数据转换对象无疑是过度设计。在真实项目中,我们需要权衡:

  1. 性能开销: 数据在层与层之间转换(ORM对象 -> 实体 -> DTO)会带来一定的性能损耗。在高吞吐量场景下,这可能是个问题。有时为了性能,可能会选择性地打破一些规则,比如在某些读取密集型查询中直接返回DTO而不是完整的领域实体。
  2. 开发效率: 对于小型团队或快速迭代的项目,严格分层可能会减慢初期开发速度。需要明确的是,这种架构投入的是前期成本,以换取后期的维护收益。
  3. 适用范围: 该架构最适用于业务逻辑复杂、需要长期演进的系统。如果你的应用只是一个薄薄的API层,包裹着一个数据库,那么CQRS等模式可能更合适。对于纯粹的数据管道或ETL任务,这种面向对象的领域建模也可能不是最佳选择。

这个基于数据库驱动的NLP引擎架构,其最大的价值在于将“流程编排”这个业务逻辑从代码中剥离出来,变成了可以被管理的数据。业务人员甚至可以通过一个UI界面来组合、配置和实验不同的NLP处理流程,而无需修改和重新部署任何代码。这正是软件工程与架构为特定领域(如NLP)带来巨大价值的体现。


  目录