我们数据平台团队面临一个棘手的日常问题:业务方通过我们基于Quarkus构建的API网关报告某个查询非常缓慢。但在Datadog APM中,我们能看到的只是一个持续时间很长的JDBC数据库调用,链路在这里戛然而止。这个追踪信息毫无用处,因为它无法告诉我们问题到底出在哪:是Trino集群的资源紧张?是查询计划生成耗时过长?还是某个Worker节点执行特定Split时遇到了瓶颈?整个Trino集群对于我们的可观测性系统来说,就是一个巨大的黑盒。
最初的解决方案非常原始:在Quarkus网关记录查询ID,然后让On-Call工程师手动去Trino的Web UI或服务器日志里根据ID搜索,试图拼凑出整个执行过程。这种方式效率低下,响应迟缓,而且完全依赖工程师的个人经验。在真实生产环境中,这套流程根本无法规模化。我们需要的是一个自动化的、端到端的追踪视图,能将用户的API请求与Trino引擎内部的执行细节无缝关联起来。
标准的OpenTelemetry JDBC Instrumentation只能追踪到客户端发起请求的那一刻,无法穿透到数据库服务端。问题的核心在于,trace context必须被传递到Trino服务端,并被Trino内部的组件所理解和处理。我们的最终方案是,开发一个自定义的Trino插件,从查询中提取追踪上下文,并在查询生命周期的关键节点创建和上报新的、有关联的Span。
整个架构的数据流如下:
sequenceDiagram participant User as 客户端 participant Gateway as Quarkus查询网关 participant Trino as Trino Coordinator participant Listener as 自定义EventListener participant DD Agent as Datadog Agent User->>+Gateway: POST /api/query (SQL) Gateway->>Gateway: OpenTelemetry启动Trace (Span A) Gateway->>Gateway: 注入traceparent到SQL注释 Gateway->>+Trino: 执行JDBC查询 (携带traceparent) Trino->>+Listener: 触发queryCreated事件 Listener->>Listener: 解析SQL注释中的traceparent Listener->>DD Agent: 创建并发送子Span (Span B, parent=A) Note right of Listener: Span B代表Trino查询的完整生命周期 Trino-->>-Gateway: 返回查询结果 Gateway-->>-User: 返回HTTP响应 Gateway->>DD Agent: 完成并发送主Span (Span A) Trino->>+Listener: 触发queryCompleted事件 Listener->>Listener: 为Span B添加结果标签(status, rows, cpu_time) Listener->>-DD Agent: 完成并发送子Span (Span B)
第一步:改造Quarkus网关以注入追踪上下文
Quarkus对MicroProfile Telemetry和OpenTelemetry提供了出色的原生支持,大部分工作都是自动完成的。我们的任务是确保在通过JDBC驱动向Trino发送查询之前,将当前的追踪上下文(W3C traceparent
)附加到SQL语句上。一个常见的错误是尝试通过Trino的X-Trino-Session
头来传递这些元数据,但这会污染业务Session属性,且不易于在服务端统一处理。一个更干净、侵入性更小的方式是将其作为SQL注释注入。
首先,确保Quarkus项目的依赖完整。
pom.xml
关键依赖:
<dependencies>
<!-- Quarkus 核心 -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>
<!-- Trino JDBC 驱动 -->
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-jdbc</artifactId>
<version>435</version> <!-- 请使用与你集群匹配的版本 -->
</dependency>
<!-- Quarkus OpenTelemetry & Datadog Exporter -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-opentelemetry</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
</dependency>
<!-- Datadog Trace ID 兼容性 -->
<dependency>
<groupId>com.datadoghq</groupId>
<artifactId>dd-trace-api</artifactId>
<version>1.20.0</version>
</dependency>
</dependencies>
接下来,配置application.properties
以启用追踪并指向Datadog Agent。
application.properties
:
# 服务信息
quarkus.application.name=trino-query-gateway
quarkus.application.version=1.0.0
# OpenTelemetry 配置
quarkus.otel.sdk.disabled=false
quarkus.otel.exporter.otlp.traces.endpoint=http://datadog-agent:4318/v1/traces
quarkus.otel.propagators=tracecontext,baggage,datadog
# 确保 Datadog 128-bit trace ID 格式被正确处理
quarkus.otel.traces.sampler=always_on
这里的quarkus.otel.propagators
配置中加入datadog
是为了与Datadog生态系统更好地兼容,但核心是tracecontext
,它实现了W3C标准的上下文传播。
然后,我们创建查询服务。关键在于TrinoQueryService
,它负责建立JDBC连接并执行查询。在这里,我们拦截查询,注入上下文。
QueryResource.java
(JAX-RS Endpoint):
import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/api/query")
public class QueryResource {
@Inject
TrinoQueryService queryService;
@POST
@Produces(MediaType.APPLICATION_JSON)
public String executeQuery(String sql) {
// 在真实项目中,这里会有认证、授权、输入校验等逻辑
// OTel会自动为这个REST请求创建一个新的Span
return queryService.executeQuery(sql);
}
}
TrinoQueryService.java
(核心逻辑):
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.TextMapSetter;
import jakarta.enterprise.context.ApplicationScoped;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ApplicationScoped
public class TrinoQueryService {
private static final Logger LOGGER = LoggerFactory.getLogger(TrinoQueryService.class);
private static final String TRINO_JDBC_URL = "jdbc:trino://trino-coordinator:8080/system/public";
private static final String TRINO_USER = "query-gateway-user";
public String executeQuery(String originalSql) {
String sqlWithContext = injectTraceContext(originalSql);
try (Connection connection = DriverManager.getConnection(TRINO_JDBC_URL, TRINO_USER, null);
Statement statement = connection.createStatement();
ResultSet rs = statement.executeQuery(sqlWithContext)) {
// 简化处理,仅返回查询的第一行第一列
if (rs.next()) {
return rs.getString(1);
}
return "No results";
} catch (Exception e) {
LOGGER.error("Failed to execute Trino query", e);
// 异常会自动被OTel捕捉并标记到当前Span上
throw new RuntimeException("Query execution failed", e);
}
}
private String injectTraceContext(String sql) {
// TextMapSetter 用于将Context写入一个 carrier (这里是HashMap)
TextMapSetter<Map<String, String>> setter = (carrier, key, value) -> carrier.put(key, value);
Map<String, String> carrier = new HashMap<>();
// 从当前上下文中提取traceparent
W3CTraceContextPropagator.getInstance().inject(Context.current(), carrier, setter);
String traceparent = carrier.get("traceparent");
if (traceparent == null || traceparent.isEmpty()) {
LOGGER.warn("traceparent not found in current context. Trino trace will be detached.");
return sql;
}
// 这里的注释格式是关键,服务端将依赖这个格式进行解析
String traceComment = String.format("/* traceparent='%s' */", traceparent);
LOGGER.info("Injecting trace context: {}", traceComment);
return traceComment + "\n" + sql;
}
}
injectTraceContext
方法是这里的核心。它使用OpenTelemetry的W3CTraceContextPropagator
来获取当前激活的traceparent
,并将其格式化为一个SQL注释,附加到原始查询的开头。这样,任何发送到Trino的查询都会携带它的“血统”信息。
第二步:构建Trino EventListener插件
这是实现端到端追踪最关键的部分。我们需要创建一个Trino插件,它能监听查询的生命周期事件(创建、完成),并从查询语句中解析出我们注入的traceparent
。
项目结构:
创建一个新的Maven项目用于Trino插件。
trino-datadog-listener/
├── pom.xml
└── src/
└── main/
├── java/
│ └── com/
│ └── mycompany/
│ └── trino/
│ ├── DatadogTracer.java
│ ├── DatadogTracingEventListener.java
│ └── DatadogTracingEventListenerFactory.java
└── resources/
└── META-INF/
└── services/
└── io.trino.spi.eventlistener.EventListenerFactory
pom.xml
关键依赖:
<project>
<modelVersion>4.0.0</modelVersion>
<groupId>com.mycompany.trino</groupId>
<artifactId>trino-datadog-listener</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<trino.version>435</trino.version>
<dd-trace-api.version>1.20.0</dd-trace-api.version>
</properties>
<dependencies>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
<version>${trino.version}</version>
<scope>provided</scope>
</dependency>
<!-- Datadog APM Client -->
<dependency>
<groupId>com.datadoghq</groupId>
<artifactId>dd-trace-api</artifactId>
<version>${dd-trace-api.version}</version>
</dependency>
<dependency>
<groupId>com.datadoghq</groupId>
<artifactId>dd-java-agent</artifactId>
<version>${dd-trace-api.version}</version>
<classifier>all</classifier>
</dependency>
<!-- Google Guava, Trino SPI依赖 -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.3-jre</version>
<scope>provided</scope>
</dependency>
</dependencies>
<!-- 使用 Shade Plugin 打包所有依赖到一个 fat jar -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<relocations>
<!--
为了避免与Trino自带的库或其它插件冲突,
将插件依赖的库重定位到一个新的包路径下。
这是一个生产级插件必须考虑的细节。
-->
<relocation>
<pattern>datadog.trace</pattern>
<shadedPattern>com.mycompany.trino.shaded.datadog.trace</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
这里的maven-shade-plugin
至关重要。Trino的插件是动态加载的,如果我们的插件和Trino内核或其他插件依赖了同一个库的不同版本,就会引发灾难性的ClassLoader
问题。通过重定位(relocation),我们将所有依赖的包名都修改了,从而避免了冲突。
DatadogTracingEventListenerFactory.java
:
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.eventlistener.EventListenerFactory;
import java.util.Map;
public class DatadogTracingEventListenerFactory implements EventListenerFactory {
@Override
public String getName() {
return "datadog-tracing";
}
@Override
public EventListener create(Map<String, String> config) {
return new DatadogTracingEventListener(config);
}
}
DatadogTracingEventListener.java
:
import com.google.common.collect.ImmutableMap;
import datadog.trace.api.DDTags;
import datadog.trace.api.interceptor.MutableSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.eventlistener.QueryCompletedEvent;
import io.trino.spi.eventlistener.QueryCreatedEvent;
import io.trino.spi.eventlistener.SplitCompletedEvent;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DatadogTracingEventListener implements EventListener {
private static final Logger log = LoggerFactory.getLogger(DatadogTracingEventListener.class);
private static final Pattern TRACEPARENT_PATTERN = Pattern.compile("/\\*\\s*traceparent='([^']+)'\\s*\\*/");
private final Map<String, AgentSpan> activeSpans = new ConcurrentHashMap<>();
private final String serviceName;
public DatadogTracingEventListener(Map<String, String> config) {
this.serviceName = config.getOrDefault("datadog.service.name", "trino-cluster");
log.info("Datadog Tracing Event Listener initialized for service: {}", serviceName);
}
@Override
public void queryCreated(QueryCreatedEvent queryCreatedEvent) {
String query = queryCreatedEvent.getMetadata().getQuery();
Matcher matcher = TRACEPARENT_PATTERN.matcher(query);
AgentSpan span;
if (matcher.find()) {
String traceparent = matcher.group(1);
try {
// 从traceparent中提取上下文
AgentSpan.Context extractedContext = AgentTracer.get().extract(
ImmutableMap.of("traceparent", traceparent),
(carrier, key) -> carrier.get(key)
);
// 创建一个子Span
span = AgentTracer.get().startSpan("trino.query", extractedContext);
log.debug("Started child span for query {} with parent context.", queryCreatedEvent.getMetadata().getQueryId());
} catch (Exception e) {
// 如果解析失败,则创建一个新的根Span,避免整个流程崩溃
log.warn("Failed to extract traceparent. Starting a new root span. Error: {}", e.getMessage());
span = startNewRootSpan();
}
} else {
// 如果没有找到traceparent,也创建一个根Span,确保所有查询都有追踪
span = startNewRootSpan();
log.debug("No traceparent found. Started a new root span for query {}.", queryCreatedEvent.getMetadata().getQueryId());
}
// 设置通用标签
span.setServiceName(this.serviceName);
span.setTag("trino.query.id", queryCreatedEvent.getMetadata().getQueryId());
span.setTag("trino.user", queryCreatedEvent.getContext().getUser());
span.setTag("trino.source", queryCreatedEvent.getContext().getSource().orElse("unknown"));
span.setTag(DDTags.COMPONENT, "trino-engine");
span.setTag(DDTags.DB_TYPE, "trino");
span.setTag(DDTags.RESOURCE_NAME, query.substring(0, Math.min(query.length(), 200)));
activeSpans.put(queryCreatedEvent.getMetadata().getQueryId(), span);
}
private AgentSpan startNewRootSpan() {
return AgentTracer.get().startSpan("trino.query");
}
@Override
public void queryCompleted(QueryCompletedEvent queryCompletedEvent) {
String queryId = queryCompletedEvent.getMetadata().getQueryId();
AgentSpan span = activeSpans.remove(queryId);
if (span == null) {
log.warn("Could not find active span for completed query: {}", queryId);
return;
}
try {
// 添加查询完成后的详细信息
span.setTag("trino.query.state", queryCompletedEvent.getMetadata().getQueryState());
span.setTag("trino.cpu_time_ms", queryCompletedEvent.getStatistics().getCpuTime().toMillis());
span.setTag("trino.wall_time_ms", queryCompletedEvent.getStatistics().getWallTime().toMillis());
span.setTag("trino.queued_time_ms", queryCompletedEvent.getStatistics().getQueuedTime().toMillis());
span.setTag("trino.processed_rows", queryCompletedEvent.getStatistics().getOutputRows());
span.setTag("trino.processed_bytes", queryCompletedEvent.getStatistics().getOutputBytes());
queryCompletedEvent.getFailureInfo().ifPresent(failure -> {
span.setError(true);
span.setTag("error.message", failure.getFailureMessage().orElse("Unknown error"));
span.setTag("error.type", failure.getFailureType().orElse("N/A"));
span.setTag("error.stack", failure.getFailureHost() + "\n" + failure.getFailureTask().orElse(""));
});
} finally {
// 确保Span总是被关闭
span.finish();
log.debug("Finished span for query {}", queryId);
}
}
}
src/main/resources/META-INF/services/io.trino.spi.eventlistener.EventListenerFactory
:
com.mycompany.trino.DatadogTracingEventListenerFactory
这段代码的核心逻辑:
queryCreated
:- 当Trino Coordinator接收到一个新查询时,此方法被调用。
- 使用正则表达式从SQL注释中提取
traceparent
。 - 使用Datadog的
AgentTracer.get().extract()
来反序列化这个traceparent
,得到一个父Span的上下文。 - 基于这个上下文,创建一个名为
trino.query
的子Span。 - 如果提取失败或
traceparent
不存在,为了不丢失监控,会创建一个新的根Span。这是一个重要的容错设计。 - 将创建的Span与
queryId
关联,存入一个ConcurrentHashMap
中。
queryCompleted
:- 当查询执行完成(无论成功或失败),此方法被调用。
- 从Map中取出对应的Span。
- 将查询的最终状态、CPU时间、处理行数等关键统计信息作为标签(tags)添加到Span中。
- 如果查询失败,将错误信息也记录到Span上。
- 最后,调用
span.finish()
,完成Span并将其发送给Datadog Agent。
第三步:部署与配置
打包插件:
在trino-datadog-listener
项目目录下运行mvn clean package
。这会生成一个 fat jar,例如trino-datadog-listener-1.0-SNAPSHOT.jar
。部署插件:
- 在Trino Coordinator节点上,创建一个新的插件目录:
mkdir -p /path/to/trino/plugins/datadog-tracing
。 - 将打包好的fat jar复制到这个目录中。
- 在Trino Coordinator节点上,创建一个新的插件目录:
配置Trino:
- 在Trino的配置目录(通常是
etc
)下,创建或修改event-listener.properties
文件。 - 添加以下内容:
event-listener.name=datadog-tracing # 以下为自定义配置,会被传入EventListenerFactory的create方法 datadog.service.name=trino-production-cluster
- 为了让Trino插件中的
AgentTracer
能找到Datadog Agent,你需要通过JVM参数来配置Agent的位置。修改Trino的etc/jvm.config
文件,添加:
这里的host和port应该指向你的Datadog Agent。-Ddd.agent.host=localhost -Ddd.agent.port=8126
- 在Trino的配置目录(通常是
重启Trino Coordinator:
重启Trino Coordinator以加载新的插件和配置。检查Coordinator的日志,应该能看到我们之前添加的"Datadog Tracing Event Listener initialized..."
日志消息。
最终成果与局限性
部署完成后,Datadog中的追踪视图发生了质变。之前断裂的链路现在被完美地连接起来了。点击一个来自Quarkus网关的慢查询trace,可以看到其下有一个trino.query
子Span,这个Span的持续时间精确地反映了查询在Trino中的总耗时。点开这个子Span,所有的自定义标签,如trino.cpu_time_ms
、trino.processed_rows
以及失败时的错误信息都一目了然。这使得定位性能瓶颈从数小时的猜测和手动排查,缩短到了几分钟的点击和分析。
然而,当前的实现方案也存在其局限性。我们追踪的粒度停留在Trino Coordinator层面,记录的是整个查询的生命周期。它无法提供更深层次的可见性,比如查询在各个Worker节点上的任务(Task)和分片(Split)的执行情况。要实现那种级别的追踪,需要对Trino的调度器和执行引擎进行更深度的代码侵入,例如通过自定义的ExchangeClient
或者在Task执行逻辑中继续传递和处理追踪上下文。此外,当前的设计是同步地在EventListener中上报追踪数据。对于一个每秒处理数千个查询的高负载集群,这可能会对Coordinator的事件处理线程造成压力。一个更健壮的生产级方案可能会引入一个内存中的无锁队列,将Span数据异步地发送给一个独立的后台线程进行上报,从而将可观测性逻辑与核心的查询处理路径解耦。