一个常见的NLP任务起点,往往是一个Python脚本。它用几行代码加载一个预训练模型,处理一段文本,然后打印结果。这对于验证性工作(POC)是完美的。但在真实项目中,需求会迅速膨胀:我们需要支持不同的处理流程(先分词再实体识别,或者先情感分析再关键词提取),需要能够动态调整每个处理步骤的参数,还需要将处理结果和处理链本身持久化。这时,那个最初的脚本就变成了一团难以维护的泥潭。
问题核心在于,业务逻辑(NLP处理步骤)、应用逻辑(如何编排这些步骤)和框架细节(API服务、数据库连接)被混在了一起。整洁架构(Clean Architecture)的核心目标就是解决这种混乱,通过强制性的依赖关系规则,将系统划分为独立的层次。
本文将构建一个NLP处理引擎,它的处理流程不是硬编码在代码中,而是由存储在关系型数据库中的配置动态驱动。我们将严格遵循整洁架构的原则,展示如何将NLP算法、业务规则、数据持久化和Web API清晰地分离开来。
架构蓝图与依赖规则
整洁架构的核心是依赖倒置原则。内层(业务逻辑)不应该知道任何关于外层(实现细节)的事情。所有依赖关系都指向内部。
graph TD subgraph Frameworks & Drivers [框架与驱动] A[FastAPI Web Server] B[SQLAlchemy ORM] C[Spacy/NLTK Library] D[PostgreSQL Database] end subgraph Interface Adapters [接口适配器] E[Controllers] F[Presenters] G[Repository Implementations] H[NLP Processor Adapters] end subgraph Application Business Rules [应用业务规则] I[Use Cases / Interactors] J[Repository Interfaces] K[NLP Processor Interfaces] end subgraph Enterprise Business Rules [企业业务规则] L[Entities: Pipeline, Stage, Document] end A --> E B --> G C --> H E --> I I --> J I --> K G --> J H --> K I --> L J --> L K --> L style L fill:#f9f,stroke:#333,stroke-width:2px style I fill:#ff9,stroke:#333,stroke-width:2px style J fill:#ff9,stroke:#333,stroke-width:2px style K fill:#ff9,stroke:#333,stroke-width:2px style E fill:#9cf,stroke:#333,stroke-width:2px style F fill:#9cf,stroke:#333,stroke-width:2px style G fill:#9cf,stroke:#333,stroke-width:2px style H fill:#9cf,stroke:#333,stroke-width:2px style A fill:#c9f,stroke:#333,stroke-width:2px style B fill:#c9f,stroke:#333,stroke-width:2px style C fill:#c9f,stroke:#333,stroke-width:2px style D fill:#c9f,stroke:#333,stroke-width:2px
- Entities (实体层): 定义了系统中最核心的业务对象。它们是纯粹的数据结构,不包含任何框架或数据库的依赖。
- Use Cases (用例层): 包含了应用特有的业务规则。它们编排实体来完成特定任务,但对外部世界一无所知。它们通过接口与外层通信。
- Interface Adapters (接口适配器层): 负责转换数据格式。例如,将HTTP请求转换为用例的输入,或将用例的输出转换为JSON响应。数据库的Repository实现也在这里。
- Frameworks & Drivers (框架与驱动层): 最外层,包含了数据库、Web框架、UI等所有具体实现。
我们将使用Python 3.10+,FastAPI作为Web框架,SQLAlchemy 2.0作为ORM。
实体层与数据库定义
这是我们架构的核心。实体层不应知道SQL的存在。它们是简单的Python数据类。
# src/domain/entities.py
import uuid
from dataclasses import dataclass, field
from typing import Dict, Any, List
@dataclass
class PipelineStageEntity:
"""定义NLP处理流程中的一个阶段"""
id: uuid.UUID = field(default_factory=uuid.uuid4)
name: str # e.g., "tokenize", "named_entity_recognition"
processor_name: str # 映射到具体的NLP处理器实现
params: Dict[str, Any] = field(default_factory=dict)
order: int = 0
@dataclass
class PipelineEntity:
"""定义一个完整的NLP处理流程"""
id: uuid.UUID = field(default_factory=uuid.uuid4)
name: str
description: str
stages: List[PipelineStageEntity] = field(default_factory=list)
@dataclass
class DocumentEntity:
"""待处理的文档"""
id: uuid.UUID = field(default_factory=uuid.uuid4)
text: str
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class AnalysisResultEntity:
"""处理结果"""
id: uuid.UUID = field(default_factory=uuid.uuid4)
document_id: uuid.UUID
pipeline_id: uuid.UUID
stage_results: Dict[str, Any] = field(default_factory=dict) # key是stage_name
这里的关键在于,这些类完全是业务概念的体现,没有任何db.Column
或@app.route
之类的装饰器。
接下来,我们在接口适配器层定义这些实体如何映射到数据库表。这是一个实现细节。
-- schema.sql
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TABLE pipelines (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
name VARCHAR(255) NOT NULL UNIQUE,
description TEXT
);
CREATE TABLE pipeline_stages (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
pipeline_id UUID NOT NULL REFERENCES pipelines(id) ON DELETE CASCADE,
name VARCHAR(255) NOT NULL,
processor_name VARCHAR(255) NOT NULL,
params JSONB,
stage_order INTEGER NOT NULL,
UNIQUE (pipeline_id, name),
UNIQUE (pipeline_id, stage_order)
);
CREATE TABLE analysis_results (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
document_id UUID NOT NULL, -- In a real system, this would link to a documents table
pipeline_id UUID NOT NULL REFERENCES pipelines(id),
stage_results JSONB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
用例层:定义应用核心逻辑
用例层定义了系统能做什么。它通过接口(抽象基类)来声明其对外部世界的依赖,比如数据持久化和NLP处理。
# src/application/ports/pipeline_repository.py
from abc import ABC, abstractmethod
from typing import Optional
import uuid
from src.domain.entities import PipelineEntity
class PipelineRepository(ABC):
"""
持久化流水线的接口。
这是用例层定义的“端口”,具体实现由适配器层提供。
"""
@abstractmethod
def find_by_id(self, pipeline_id: uuid.UUID) -> Optional[PipelineEntity]:
raise NotImplementedError
@abstractmethod
def save(self, pipeline: PipelineEntity) -> None:
raise NotImplementedError
# src/application/ports/nlp_processor.py
from abc import ABC, abstractmethod
from typing import Any, Dict
from src.domain.entities import DocumentEntity
class NlpProcessor(ABC):
"""
NLP处理器的接口。
同样,这是用例层定义的端口。
"""
@abstractmethod
def process(self, document: DocumentEntity, params: Dict[str, Any]) -> Any:
raise NotImplementedError
有了这些接口,我们就可以编写核心的用例了。注意,这个用例完全不依赖任何具体的数据库或NLP库。
# src/application/use_cases/execute_pipeline.py
import uuid
import logging
from typing import Dict
from src.domain.entities import DocumentEntity, AnalysisResultEntity
from src.application.ports.pipeline_repository import PipelineRepository
from src.application.ports.nlp_processor import NlpProcessor
logger = logging.getLogger(__name__)
class ExecutePipelineUseCase:
"""
核心用例:执行一个完整的NLP处理流水线。
"""
def __init__(
self,
pipeline_repo: PipelineRepository,
processor_registry: Dict[str, NlpProcessor] # 处理器注册表,依赖注入
):
self._pipeline_repo = pipeline_repo
self._processor_registry = processor_registry
def execute(self, pipeline_id: uuid.UUID, document: DocumentEntity) -> AnalysisResultEntity:
# 1. 从仓库获取流水线定义
pipeline = self._pipeline_repo.find_by_id(pipeline_id)
if not pipeline:
logger.error(f"Pipeline with id {pipeline_id} not found.")
raise ValueError(f"Pipeline with id {pipeline_id} not found.")
# 2. 按顺序执行各个阶段
stage_results = {}
# 确保阶段按order排序
sorted_stages = sorted(pipeline.stages, key=lambda s: s.order)
for stage in sorted_stages:
processor_name = stage.processor_name
processor = self._processor_registry.get(processor_name)
if not processor:
logger.error(f"Processor '{processor_name}' for stage '{stage.name}' not found in registry.")
# 在真实项目中,这里应该有更健壮的错误处理,比如跳过或中断
raise ValueError(f"Processor '{processor_name}' not found.")
try:
logger.info(f"Executing stage '{stage.name}' with processor '{processor_name}'...")
result = processor.process(document, stage.params)
stage_results[stage.name] = result
except Exception as e:
logger.exception(
f"Error executing stage '{stage.name}' with processor '{processor_name}'. Aborting pipeline."
)
# 决定是中断还是继续
raise RuntimeError(f"Failed at stage {stage.name}: {e}") from e
# 3. 创建并返回最终结果实体
analysis_result = AnalysisResultEntity(
document_id=document.id,
pipeline_id=pipeline.id,
stage_results=stage_results,
)
# 在真实场景中,可能还有一个SaveResultUseCase来持久化结果
# 这里为了简化,仅返回实体
return analysis_result
这个用例非常纯粹:它只关心业务流程。它依赖的PipelineRepository
和NlpProcessor
都是抽象的,这使得测试变得极其容易。我们可以用内存中的字典来模拟pipeline_repo
,用简单的mock对象来模拟processor_registry
,从而在不触及数据库或真实NLP模型的情况下测试所有业务逻辑。
接口适配器层:连接外部世界
这一层是粘合剂。它实现用例层定义的接口。
1. SQLAlchemy Repository 实现
这是对PipelineRepository
接口的SQLAlchemy实现。
# src/infrastructure/persistence/sqlalchemy/repositories.py
import uuid
from typing import Optional
from sqlalchemy.orm import Session
from sqlalchemy import select
from src.domain.entities import PipelineEntity, PipelineStageEntity
from src.application.ports.pipeline_repository import PipelineRepository
from . import models as db_models # SQLAlchemy的 declarative models
class SqlAlchemyPipelineRepository(PipelineRepository):
def __init__(self, db_session: Session):
self._session = db_session
def find_by_id(self, pipeline_id: uuid.UUID) -> Optional[PipelineEntity]:
# 一个常见的错误是直接返回SQLAlchemy模型。
# 适配器的职责是将在基础设施层获取的数据(ORM对象)
# 转换为领域层的实体(Entity)。
stmt = select(db_models.Pipeline).where(db_models.Pipeline.id == pipeline_id)
pipeline_orm = self._session.scalars(stmt).one_or_none()
if not pipeline_orm:
return None
return self._to_entity(pipeline_orm)
def save(self, pipeline: PipelineEntity) -> None:
# 这个方法需要处理创建和更新两种情况,这里简化为创建
pipeline_orm = self._from_entity(pipeline)
self._session.add(pipeline_orm)
# 在实际应用中,这里需要处理事务提交或回滚
# 一般由更高层的单元工作模式(Unit of Work)管理
def _to_entity(self, orm_obj: db_models.Pipeline) -> PipelineEntity:
"""ORM object to Domain Entity"""
return PipelineEntity(
id=orm_obj.id,
name=orm_obj.name,
description=orm_obj.description,
stages=[
PipelineStageEntity(
id=s.id,
name=s.name,
processor_name=s.processor_name,
params=s.params,
order=s.stage_order
) for s in sorted(orm_obj.stages, key=lambda st: st.stage_order)
]
)
def _from_entity(self, entity: PipelineEntity) -> db_models.Pipeline:
"""Domain Entity to ORM object"""
return db_models.Pipeline(
id=entity.id,
name=entity.name,
description=entity.description,
stages=[
db_models.PipelineStage(
id=s.id,
pipeline_id=entity.id,
name=s.name,
processor_name=s.processor_name,
params=s.params,
stage_order=s.order
) for s in entity.stages
]
)
这里的关键点在于_to_entity
和_from_entity
方法。它们是防腐层,防止SQLAlchemy的ORM对象污染到用例层和实体层。这是一个常见的坑,很多项目声称使用整洁架构,但用例层里却充满了ORM对象,这破坏了依赖规则。
2. NLP 处理器适配器
这里我们为NlpProcessor
接口提供具体的实现,比如使用spaCy库。
# src/infrastructure/nlp/spacy_processors.py
import spacy
from typing import Any, Dict, List
from src.domain.entities import DocumentEntity
from src.application.ports.nlp_processor import NlpProcessor
# 在生产环境中,模型应该只加载一次
# 这里为了演示,每次都加载
# 一个更好的方法是在应用启动时加载并注入
_MODELS = {}
def get_spacy_model(model_name: str):
if model_name not in _MODELS:
try:
_MODELS[model_name] = spacy.load(model_name)
except OSError:
# 自动下载模型
spacy.cli.download(model_name)
_MODELS[model_name] = spacy.load(model_name)
return _MODELS[model_name]
class SpacyNerProcessor(NlpProcessor):
"""使用spaCy进行命名实体识别"""
def process(self, document: DocumentEntity, params: Dict[str, Any]) -> List[Dict[str, Any]]:
model_name = params.get("model", "en_core_web_sm")
nlp = get_spacy_model(model_name)
doc = nlp(document.text)
results = []
for ent in doc.ents:
results.append({
"text": ent.text,
"start_char": ent.start_char,
"end_char": ent.end_char,
"label": ent.label_
})
return results
class SpacyTokenizeProcessor(NlpProcessor):
"""使用spaCy进行分词"""
def process(self, document: DocumentEntity, params: Dict[str, Any]) -> List[str]:
model_name = params.get("model", "en_core_web_sm")
nlp = get_spacy_model(model_name)
doc = nlp(document.text)
return [token.text for token in doc]
框架层:组装与启动
最外层负责组装所有东西。在这里,我们使用FastAPI,并通过依赖注入将具体的实现(如SqlAlchemyPipelineRepository
)注入到用例中。
# src/main.py
import uuid
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from pydantic import BaseModel, Field
from typing import Dict, Any
from src.application.use_cases.execute_pipeline import ExecutePipelineUseCase
from src.infrastructure.persistence.sqlalchemy.database import SessionLocal
from src.infrastructure.persistence.sqlalchemy.repositories import SqlAlchemyPipelineRepository
from src.infrastructure.nlp.spacy_processors import SpacyNerProcessor, SpacyTokenizeProcessor
from src.domain.entities import DocumentEntity
# --- FastAPI App Setup ---
app = FastAPI(title="Clean NLP Engine")
# --- Dependency Injection Setup ---
# 1. 创建NLP处理器注册表
# 这是配置驱动的核心,key是数据库中存储的processor_name
processor_registry = {
"spacy_ner": SpacyNerProcessor(),
"spacy_tokenize": SpacyTokenizeProcessor(),
}
# 2. 数据库会话依赖
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# 3. 用例依赖
def get_execute_pipeline_use_case(db: Session = Depends(get_db)) -> ExecutePipelineUseCase:
repo = SqlAlchemyPipelineRepository(db)
return ExecutePipelineUseCase(repo, processor_registry)
# --- API Endpoints ---
class DocumentInput(BaseModel):
text: str
metadata: Dict[str, Any] = Field(default_factory=dict)
@app.post("/pipelines/{pipeline_id}/execute")
def execute_pipeline(
pipeline_id: uuid.UUID,
document_input: DocumentInput,
use_case: ExecutePipelineUseCase = Depends(get_execute_pipeline_use_case)
):
"""执行一个NLP流水线"""
try:
document = DocumentEntity(text=document_input.text, metadata=document_input.metadata)
result_entity = use_case.execute(pipeline_id, document)
# Presenter的职责:将领域实体转换为API响应
return {
"document_id": result_entity.document_id,
"pipeline_id": result_entity.pipeline_id,
"results": result_entity.stage_results
}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except RuntimeError as e:
raise HTTPException(status_code=500, detail=str(e))
except Exception as e:
# 通用错误处理
raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {e}")
# 这里可以添加创建和管理流水线的端点
# ...
实践中的权衡与边界
这套架构不是银弹。它的主要优点是可测试性、可维护性和对业务逻辑的保护。当系统复杂度高、生命周期长时,这些优点尤为突出。
然而,它也引入了额外的复杂性。对于一个简单的应用,定义所有这些层、接口和数据转换对象无疑是过度设计。在真实项目中,我们需要权衡:
- 性能开销: 数据在层与层之间转换(ORM对象 -> 实体 -> DTO)会带来一定的性能损耗。在高吞吐量场景下,这可能是个问题。有时为了性能,可能会选择性地打破一些规则,比如在某些读取密集型查询中直接返回DTO而不是完整的领域实体。
- 开发效率: 对于小型团队或快速迭代的项目,严格分层可能会减慢初期开发速度。需要明确的是,这种架构投入的是前期成本,以换取后期的维护收益。
- 适用范围: 该架构最适用于业务逻辑复杂、需要长期演进的系统。如果你的应用只是一个薄薄的API层,包裹着一个数据库,那么CQRS等模式可能更合适。对于纯粹的数据管道或ETL任务,这种面向对象的领域建模也可能不是最佳选择。
这个基于数据库驱动的NLP引擎架构,其最大的价值在于将“流程编排”这个业务逻辑从代码中剥离出来,变成了可以被管理的数据。业务人员甚至可以通过一个UI界面来组合、配置和实验不同的NLP处理流程,而无需修改和重新部署任何代码。这正是软件工程与架构为特定领域(如NLP)带来巨大价值的体现。