为基于Quarkus的Trino查询网关实现深入引擎的端到端分布式追踪


我们数据平台团队面临一个棘手的日常问题:业务方通过我们基于Quarkus构建的API网关报告某个查询非常缓慢。但在Datadog APM中,我们能看到的只是一个持续时间很长的JDBC数据库调用,链路在这里戛然而止。这个追踪信息毫无用处,因为它无法告诉我们问题到底出在哪:是Trino集群的资源紧张?是查询计划生成耗时过长?还是某个Worker节点执行特定Split时遇到了瓶颈?整个Trino集群对于我们的可观测性系统来说,就是一个巨大的黑盒。

最初的解决方案非常原始:在Quarkus网关记录查询ID,然后让On-Call工程师手动去Trino的Web UI或服务器日志里根据ID搜索,试图拼凑出整个执行过程。这种方式效率低下,响应迟缓,而且完全依赖工程师的个人经验。在真实生产环境中,这套流程根本无法规模化。我们需要的是一个自动化的、端到端的追踪视图,能将用户的API请求与Trino引擎内部的执行细节无缝关联起来。

标准的OpenTelemetry JDBC Instrumentation只能追踪到客户端发起请求的那一刻,无法穿透到数据库服务端。问题的核心在于,trace context必须被传递到Trino服务端,并被Trino内部的组件所理解和处理。我们的最终方案是,开发一个自定义的Trino插件,从查询中提取追踪上下文,并在查询生命周期的关键节点创建和上报新的、有关联的Span。

整个架构的数据流如下:

sequenceDiagram
    participant User as 客户端
    participant Gateway as Quarkus查询网关
    participant Trino as Trino Coordinator
    participant Listener as 自定义EventListener
    participant DD Agent as Datadog Agent

    User->>+Gateway: POST /api/query (SQL)
    Gateway->>Gateway: OpenTelemetry启动Trace (Span A)
    Gateway->>Gateway: 注入traceparent到SQL注释
    Gateway->>+Trino: 执行JDBC查询 (携带traceparent)
    Trino->>+Listener: 触发queryCreated事件
    Listener->>Listener: 解析SQL注释中的traceparent
    Listener->>DD Agent: 创建并发送子Span (Span B, parent=A)
    Note right of Listener: Span B代表Trino查询的完整生命周期
    Trino-->>-Gateway: 返回查询结果
    Gateway-->>-User: 返回HTTP响应
    Gateway->>DD Agent: 完成并发送主Span (Span A)
    Trino->>+Listener: 触发queryCompleted事件
    Listener->>Listener: 为Span B添加结果标签(status, rows, cpu_time)
    Listener->>-DD Agent: 完成并发送子Span (Span B)

第一步:改造Quarkus网关以注入追踪上下文

Quarkus对MicroProfile Telemetry和OpenTelemetry提供了出色的原生支持,大部分工作都是自动完成的。我们的任务是确保在通过JDBC驱动向Trino发送查询之前,将当前的追踪上下文(W3C traceparent)附加到SQL语句上。一个常见的错误是尝试通过Trino的X-Trino-Session头来传递这些元数据,但这会污染业务Session属性,且不易于在服务端统一处理。一个更干净、侵入性更小的方式是将其作为SQL注释注入。

首先,确保Quarkus项目的依赖完整。

pom.xml 关键依赖:

<dependencies>
    <!-- Quarkus 核心 -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-resteasy-reactive-jackson</artifactId>
    </dependency>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-arc</artifactId>
    </dependency>

    <!-- Trino JDBC 驱动 -->
    <dependency>
        <groupId>io.trino</groupId>
        <artifactId>trino-jdbc</artifactId>
        <version>435</version> <!-- 请使用与你集群匹配的版本 -->
    </dependency>
    
    <!-- Quarkus OpenTelemetry & Datadog Exporter -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-opentelemetry</artifactId>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-exporter-otlp</artifactId>
    </dependency>

    <!-- Datadog Trace ID 兼容性 -->
    <dependency>
        <groupId>com.datadoghq</groupId>
        <artifactId>dd-trace-api</artifactId>
        <version>1.20.0</version>
    </dependency>
</dependencies>

接下来,配置application.properties以启用追踪并指向Datadog Agent。

application.properties:

# 服务信息
quarkus.application.name=trino-query-gateway
quarkus.application.version=1.0.0

# OpenTelemetry 配置
quarkus.otel.sdk.disabled=false
quarkus.otel.exporter.otlp.traces.endpoint=http://datadog-agent:4318/v1/traces
quarkus.otel.propagators=tracecontext,baggage,datadog

# 确保 Datadog 128-bit trace ID 格式被正确处理
quarkus.otel.traces.sampler=always_on

这里的quarkus.otel.propagators配置中加入datadog是为了与Datadog生态系统更好地兼容,但核心是tracecontext,它实现了W3C标准的上下文传播。

然后,我们创建查询服务。关键在于TrinoQueryService,它负责建立JDBC连接并执行查询。在这里,我们拦截查询,注入上下文。

QueryResource.java (JAX-RS Endpoint):

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/api/query")
public class QueryResource {

    @Inject
    TrinoQueryService queryService;

    @POST
    @Produces(MediaType.APPLICATION_JSON)
    public String executeQuery(String sql) {
        // 在真实项目中,这里会有认证、授权、输入校验等逻辑
        // OTel会自动为这个REST请求创建一个新的Span
        return queryService.executeQuery(sql);
    }
}

TrinoQueryService.java (核心逻辑):

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.TextMapSetter;
import jakarta.enterprise.context.ApplicationScoped;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class TrinoQueryService {

    private static final Logger LOGGER = LoggerFactory.getLogger(TrinoQueryService.class);
    private static final String TRINO_JDBC_URL = "jdbc:trino://trino-coordinator:8080/system/public";
    private static final String TRINO_USER = "query-gateway-user";

    public String executeQuery(String originalSql) {
        String sqlWithContext = injectTraceContext(originalSql);

        try (Connection connection = DriverManager.getConnection(TRINO_JDBC_URL, TRINO_USER, null);
             Statement statement = connection.createStatement();
             ResultSet rs = statement.executeQuery(sqlWithContext)) {
            
            // 简化处理,仅返回查询的第一行第一列
            if (rs.next()) {
                return rs.getString(1);
            }
            return "No results";
        } catch (Exception e) {
            LOGGER.error("Failed to execute Trino query", e);
            // 异常会自动被OTel捕捉并标记到当前Span上
            throw new RuntimeException("Query execution failed", e);
        }
    }

    private String injectTraceContext(String sql) {
        // TextMapSetter 用于将Context写入一个 carrier (这里是HashMap)
        TextMapSetter<Map<String, String>> setter = (carrier, key, value) -> carrier.put(key, value);
        Map<String, String> carrier = new HashMap<>();

        // 从当前上下文中提取traceparent
        W3CTraceContextPropagator.getInstance().inject(Context.current(), carrier, setter);
        
        String traceparent = carrier.get("traceparent");
        if (traceparent == null || traceparent.isEmpty()) {
            LOGGER.warn("traceparent not found in current context. Trino trace will be detached.");
            return sql;
        }

        // 这里的注释格式是关键,服务端将依赖这个格式进行解析
        String traceComment = String.format("/* traceparent='%s' */", traceparent);
        LOGGER.info("Injecting trace context: {}", traceComment);
        
        return traceComment + "\n" + sql;
    }
}

injectTraceContext方法是这里的核心。它使用OpenTelemetry的W3CTraceContextPropagator来获取当前激活的traceparent,并将其格式化为一个SQL注释,附加到原始查询的开头。这样,任何发送到Trino的查询都会携带它的“血统”信息。

第二步:构建Trino EventListener插件

这是实现端到端追踪最关键的部分。我们需要创建一个Trino插件,它能监听查询的生命周期事件(创建、完成),并从查询语句中解析出我们注入的traceparent

项目结构:
创建一个新的Maven项目用于Trino插件。

trino-datadog-listener/
├── pom.xml
└── src/
    └── main/
        ├── java/
        │   └── com/
        │       └── mycompany/
        │           └── trino/
        │               ├── DatadogTracer.java
        │               ├── DatadogTracingEventListener.java
        │               └── DatadogTracingEventListenerFactory.java
        └── resources/
            └── META-INF/
                └── services/
                    └── io.trino.spi.eventlistener.EventListenerFactory

pom.xml 关键依赖:

<project>
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mycompany.trino</groupId>
    <artifactId>trino-datadog-listener</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <trino.version>435</trino.version>
        <dd-trace-api.version>1.20.0</dd-trace-api.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.trino</groupId>
            <artifactId>trino-spi</artifactId>
            <version>${trino.version}</version>
            <scope>provided</scope>
        </dependency>
        
        <!-- Datadog APM Client -->
        <dependency>
            <groupId>com.datadoghq</groupId>
            <artifactId>dd-trace-api</artifactId>
            <version>${dd-trace-api.version}</version>
        </dependency>
        <dependency>
            <groupId>com.datadoghq</groupId>
            <artifactId>dd-java-agent</artifactId>
            <version>${dd-trace-api.version}</version>
            <classifier>all</classifier>
        </dependency>

        <!-- Google Guava, Trino SPI依赖 -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>32.1.3-jre</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    
    <!-- 使用 Shade Plugin 打包所有依赖到一个 fat jar -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <relocations>
                                <!--
                                为了避免与Trino自带的库或其它插件冲突,
                                将插件依赖的库重定位到一个新的包路径下。
                                这是一个生产级插件必须考虑的细节。
                                -->
                                <relocation>
                                    <pattern>datadog.trace</pattern>
                                    <shadedPattern>com.mycompany.trino.shaded.datadog.trace</shadedPattern>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

这里的maven-shade-plugin至关重要。Trino的插件是动态加载的,如果我们的插件和Trino内核或其他插件依赖了同一个库的不同版本,就会引发灾难性的ClassLoader问题。通过重定位(relocation),我们将所有依赖的包名都修改了,从而避免了冲突。

DatadogTracingEventListenerFactory.java:

import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.eventlistener.EventListenerFactory;
import java.util.Map;

public class DatadogTracingEventListenerFactory implements EventListenerFactory {
    @Override
    public String getName() {
        return "datadog-tracing";
    }

    @Override
    public EventListener create(Map<String, String> config) {
        return new DatadogTracingEventListener(config);
    }
}

DatadogTracingEventListener.java:

import com.google.common.collect.ImmutableMap;
import datadog.trace.api.DDTags;
import datadog.trace.api.interceptor.MutableSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.eventlistener.QueryCompletedEvent;
import io.trino.spi.eventlistener.QueryCreatedEvent;
import io.trino.spi.eventlistener.SplitCompletedEvent;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DatadogTracingEventListener implements EventListener {
    private static final Logger log = LoggerFactory.getLogger(DatadogTracingEventListener.class);
    private static final Pattern TRACEPARENT_PATTERN = Pattern.compile("/\\*\\s*traceparent='([^']+)'\\s*\\*/");

    private final Map<String, AgentSpan> activeSpans = new ConcurrentHashMap<>();
    private final String serviceName;
    
    public DatadogTracingEventListener(Map<String, String> config) {
        this.serviceName = config.getOrDefault("datadog.service.name", "trino-cluster");
        log.info("Datadog Tracing Event Listener initialized for service: {}", serviceName);
    }

    @Override
    public void queryCreated(QueryCreatedEvent queryCreatedEvent) {
        String query = queryCreatedEvent.getMetadata().getQuery();
        Matcher matcher = TRACEPARENT_PATTERN.matcher(query);

        AgentSpan span;
        if (matcher.find()) {
            String traceparent = matcher.group(1);
            try {
                // 从traceparent中提取上下文
                AgentSpan.Context extractedContext = AgentTracer.get().extract(
                        ImmutableMap.of("traceparent", traceparent),
                        (carrier, key) -> carrier.get(key)
                );
                // 创建一个子Span
                span = AgentTracer.get().startSpan("trino.query", extractedContext);
                log.debug("Started child span for query {} with parent context.", queryCreatedEvent.getMetadata().getQueryId());
            } catch (Exception e) {
                // 如果解析失败,则创建一个新的根Span,避免整个流程崩溃
                log.warn("Failed to extract traceparent. Starting a new root span. Error: {}", e.getMessage());
                span = startNewRootSpan();
            }
        } else {
            // 如果没有找到traceparent,也创建一个根Span,确保所有查询都有追踪
            span = startNewRootSpan();
            log.debug("No traceparent found. Started a new root span for query {}.", queryCreatedEvent.getMetadata().getQueryId());
        }

        // 设置通用标签
        span.setServiceName(this.serviceName);
        span.setTag("trino.query.id", queryCreatedEvent.getMetadata().getQueryId());
        span.setTag("trino.user", queryCreatedEvent.getContext().getUser());
        span.setTag("trino.source", queryCreatedEvent.getContext().getSource().orElse("unknown"));
        span.setTag(DDTags.COMPONENT, "trino-engine");
        span.setTag(DDTags.DB_TYPE, "trino");
        span.setTag(DDTags.RESOURCE_NAME, query.substring(0, Math.min(query.length(), 200)));

        activeSpans.put(queryCreatedEvent.getMetadata().getQueryId(), span);
    }

    private AgentSpan startNewRootSpan() {
        return AgentTracer.get().startSpan("trino.query");
    }

    @Override
    public void queryCompleted(QueryCompletedEvent queryCompletedEvent) {
        String queryId = queryCompletedEvent.getMetadata().getQueryId();
        AgentSpan span = activeSpans.remove(queryId);

        if (span == null) {
            log.warn("Could not find active span for completed query: {}", queryId);
            return;
        }

        try {
            // 添加查询完成后的详细信息
            span.setTag("trino.query.state", queryCompletedEvent.getMetadata().getQueryState());
            span.setTag("trino.cpu_time_ms", queryCompletedEvent.getStatistics().getCpuTime().toMillis());
            span.setTag("trino.wall_time_ms", queryCompletedEvent.getStatistics().getWallTime().toMillis());
            span.setTag("trino.queued_time_ms", queryCompletedEvent.getStatistics().getQueuedTime().toMillis());
            span.setTag("trino.processed_rows", queryCompletedEvent.getStatistics().getOutputRows());
            span.setTag("trino.processed_bytes", queryCompletedEvent.getStatistics().getOutputBytes());
            
            queryCompletedEvent.getFailureInfo().ifPresent(failure -> {
                span.setError(true);
                span.setTag("error.message", failure.getFailureMessage().orElse("Unknown error"));
                span.setTag("error.type", failure.getFailureType().orElse("N/A"));
                span.setTag("error.stack", failure.getFailureHost() + "\n" + failure.getFailureTask().orElse(""));
            });

        } finally {
            // 确保Span总是被关闭
            span.finish();
            log.debug("Finished span for query {}", queryId);
        }
    }
}

src/main/resources/META-INF/services/io.trino.spi.eventlistener.EventListenerFactory:

com.mycompany.trino.DatadogTracingEventListenerFactory

这段代码的核心逻辑:

  1. queryCreated:
    • 当Trino Coordinator接收到一个新查询时,此方法被调用。
    • 使用正则表达式从SQL注释中提取traceparent
    • 使用Datadog的AgentTracer.get().extract()来反序列化这个traceparent,得到一个父Span的上下文。
    • 基于这个上下文,创建一个名为trino.query的子Span。
    • 如果提取失败或traceparent不存在,为了不丢失监控,会创建一个新的根Span。这是一个重要的容错设计。
    • 将创建的Span与queryId关联,存入一个ConcurrentHashMap中。
  2. queryCompleted:
    • 当查询执行完成(无论成功或失败),此方法被调用。
    • 从Map中取出对应的Span。
    • 将查询的最终状态、CPU时间、处理行数等关键统计信息作为标签(tags)添加到Span中。
    • 如果查询失败,将错误信息也记录到Span上。
    • 最后,调用span.finish(),完成Span并将其发送给Datadog Agent。

第三步:部署与配置

  1. 打包插件:
    trino-datadog-listener项目目录下运行 mvn clean package。这会生成一个 fat jar,例如 trino-datadog-listener-1.0-SNAPSHOT.jar

  2. 部署插件:

    • 在Trino Coordinator节点上,创建一个新的插件目录: mkdir -p /path/to/trino/plugins/datadog-tracing
    • 将打包好的fat jar复制到这个目录中。
  3. 配置Trino:

    • 在Trino的配置目录(通常是etc)下,创建或修改event-listener.properties文件。
    • 添加以下内容:
      event-listener.name=datadog-tracing
      # 以下为自定义配置,会被传入EventListenerFactory的create方法
      datadog.service.name=trino-production-cluster
    • 为了让Trino插件中的AgentTracer能找到Datadog Agent,你需要通过JVM参数来配置Agent的位置。修改Trino的etc/jvm.config文件,添加:
      -Ddd.agent.host=localhost
      -Ddd.agent.port=8126
      这里的host和port应该指向你的Datadog Agent。
  4. 重启Trino Coordinator:
    重启Trino Coordinator以加载新的插件和配置。检查Coordinator的日志,应该能看到我们之前添加的"Datadog Tracing Event Listener initialized..."日志消息。

最终成果与局限性

部署完成后,Datadog中的追踪视图发生了质变。之前断裂的链路现在被完美地连接起来了。点击一个来自Quarkus网关的慢查询trace,可以看到其下有一个trino.query子Span,这个Span的持续时间精确地反映了查询在Trino中的总耗时。点开这个子Span,所有的自定义标签,如trino.cpu_time_mstrino.processed_rows以及失败时的错误信息都一目了然。这使得定位性能瓶颈从数小时的猜测和手动排查,缩短到了几分钟的点击和分析。

然而,当前的实现方案也存在其局限性。我们追踪的粒度停留在Trino Coordinator层面,记录的是整个查询的生命周期。它无法提供更深层次的可见性,比如查询在各个Worker节点上的任务(Task)和分片(Split)的执行情况。要实现那种级别的追踪,需要对Trino的调度器和执行引擎进行更深度的代码侵入,例如通过自定义的ExchangeClient或者在Task执行逻辑中继续传递和处理追踪上下文。此外,当前的设计是同步地在EventListener中上报追踪数据。对于一个每秒处理数千个查询的高负载集群,这可能会对Coordinator的事件处理线程造成压力。一个更健壮的生产级方案可能会引入一个内存中的无锁队列,将Span数据异步地发送给一个独立的后台线程进行上报,从而将可观测性逻辑与核心的查询处理路径解耦。


  目录