我们的语义搜索原型上线后,性能表现起初令人满意。Sanic 的异步特性结合 Qdrant 基于 Rust 的高性能向量检索引擎,响应速度极快。但随着调用量增加,偶尔出现的 P95 延迟飙升问题让我们陷入了困境。问题排查过程如同盲人摸象:是 Svelte 前端发起的网络请求慢了?是 Sanic 的事件循环被某个任务阻塞了?还是 Qdrant 内部正在进行索引合并导致查询变慢?分散在各个组件中的日志只能提供孤立的线索,我们无法将一次缓慢的用户请求与后端的具体操作关联起来。
这种信息孤岛是不可接受的。我们需要一种方法来追踪单个请求的全生命周期,从用户在 Svelte 界面点击搜索按钮开始,穿过 Sanic 的 API 网关,直到 Qdrant 完成向量检索,最后再返回给用户。目标很明确:构建一个端到端的可观测性体系。技术选型上,OpenTelemetry 作为 CNCF 的一个开放标准,提供了与供应商无关的 API 和 SDK,是目前最理想的选择。
架构与可观测性设计
我们的目标架构需要将 OpenTelemetry Collector 作为数据管道的核心,接收来自 Svelte 前端、Sanic 后端以及未来可能加入的任何服务的遥测数据(Traces, Metrics, Logs),处理后统一导出到可观测性后端,这里我们选择 Jaeger 进行追踪数据的可视化。
sequenceDiagram participant SvelteApp as Svelte 前端 participant SanicAPI as Sanic 后端 API participant QdrantDB as Qdrant 向量数据库 participant OTelCollector as OpenTelemetry Collector participant JaegerUI as Jaeger UI SvelteApp->>+SanicAPI: 发起搜索请求 (携带 traceparent header) SanicAPI->>SanicAPI: OTel 中间件解析 traceparent, 创建子 Span SanicAPI->>+QdrantDB: 执行向量搜索 (手动创建 Qdrant Span) QdrantDB-->>-SanicAPI: 返回搜索结果 SanicAPI-->>-SvelteApp: 返回 API 响应 Note over SvelteApp, SanicAPI: 各自独立将 Span 数据
异步发送到 OTel Collector SanicAPI-->>OTelCollector: 导出 Sanic & Qdrant Spans SvelteApp-->>OTelCollector: 导出浏览器 Spans OTelCollector->>OTelCollector: 处理 & 批次化 OTelCollector->>JaegerUI: 导出至 Jaeger
这个设计的关键在于 traceparent
header 的传递。它像一根绳子,将分布在不同进程和机器上的操作(Spans)串联成一个完整的调用链(Trace)。
后端先行:为 Sanic 集成 OpenTelemetry
后端是整个系统的核心,也是追踪信息的汇聚点。为 Sanic 应用集成 OpenTelemetry 需要精确的配置,尤其是在异步环境中,确保上下文(Context)的正确传递至关重要。
1. 依赖安装
首先,我们需要安装所有相关的 OpenTelemetry 库。
# 核心 OTel 库
pip install opentelemetry-api opentelemetry-sdk
# OTLP 导出器,用于将数据发送到 Collector
pip install opentelemetry-exporter-otlp
# Sanic 自动埋点库
pip install opentelemetry-instrumentation-sanic
# Qdrant 客户端
pip install qdrant-client
# 其他依赖
pip install sanic sanic-ext python-dotenv ujson
2. 初始化追踪器
在一个独立的文件 observability.py
中,我们集中处理 OpenTelemetry 的初始化逻辑。这在真实项目中是最佳实践,避免主应用文件 server.py
过于臃肿。
# observability.py
import logging
import os
from dotenv import load_dotenv
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.sanic import SanicInstrumentor
def setup_observability(app):
"""
配置 OpenTelemetry Tracing 和 Logging
"""
load_dotenv()
# 从环境变量获取配置,这是生产级应用的标准做法
service_name = os.getenv("OTEL_SERVICE_NAME", "semantic-search-api")
otlp_endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")
# 1. 配置 Resource,用于标识服务
resource = Resource(attributes={
"service.name": service_name
})
# 2. 设置 TracerProvider
tracer_provider = TracerProvider(resource=resource)
trace.set_tracer_provider(tracer_provider)
# 3. 配置 OTLP Exporter
# 这里的 endpoint 指向 OpenTelemetry Collector 的 gRPC 端口
span_exporter = OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True)
# 4. 配置 Span Processor
# BatchSpanProcessor 会在后台批量发送数据,对主应用性能影响较小
span_processor = BatchSpanProcessor(span_exporter)
tracer_provider.add_span_processor(span_processor)
# 5. 自动埋点 Sanic 应用
# 这会自动处理请求的入口和出口,创建根 Span
SanicInstrumentor().instrument_app(app)
# 6. 配置日志,将 trace_id 和 span_id 注入到日志中
# 这是关联 Logs 和 Traces 的关键
class OTelLoggingFormatter(logging.Formatter):
def format(self, record):
span = trace.get_current_span()
if span and span.is_recording():
trace_id = span.get_span_context().trace_id
span_id = span.get_span_context().span_id
record.otel_trace_id = f"{trace_id:032x}"
record.otel_span_id = f"{span_id:016x}"
else:
record.otel_trace_id = "0" * 32
record.otel_span_id = "0" * 16
return super().format(record)
# 在真实项目中,日志格式应更详尽
log_format = '%(asctime)s - %(levelname)s - [%(otel_trace_id)s-%(otel_span_id)s] - %(message)s'
formatter = OTelLoggingFormatter(log_format)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
# 获取 Sanic 默认的 logger 并重新配置
sanic_logger = logging.getLogger("sanic.root")
sanic_logger.handlers.clear()
sanic_logger.addHandler(handler)
sanic_logger.setLevel(logging.INFO)
app.ctx.logger = sanic_logger
app.ctx.tracer = trace.get_tracer(__name__)
app.ctx.logger.info(f"Observability setup complete for service '{service_name}'. Exporting to {otlp_endpoint}")
这段代码做了几件至关重要的事:
- 服务识别: 通过
Resource
告诉 OTel 后端这些追踪数据来自哪个服务。 - 异步导出: 使用
BatchSpanProcessor
避免每次追踪都阻塞请求。 - 自动埋点:
SanicInstrumentor
帮我们处理了大部分工作,它会自动从请求头中提取traceparent
并创建 Span。 - 日志关联: 自定义
Formatter
将trace_id
和span_id
注入到每一行日志中。当你在日志平台看到一条错误日志时,可以直接用trace_id
去 Jaeger 搜索完整的调用链,定位根因。
3. 主应用与手动埋点
现在我们来编写主应用 server.py
。自动埋点只能覆盖框架层面,对于业务逻辑中的关键操作,如调用 Qdrant,我们必须手动创建 Span 来获得更精细的追踪粒度。
# server.py
import os
import logging
from sanic import Sanic, response
from sanic_ext import Extend
from qdrant_client import QdrantClient, models
from observability import setup_observability
# --- 应用初始化 ---
app = Sanic("SemanticSearchAPI")
app.config.CORS_ORIGINS = "*" # 仅用于开发环境
Extend(app)
# --- 客户端初始化 ---
@app.before_server_start
async def setup_clients(app, loop):
qdrant_host = os.getenv("QDRANT_HOST", "qdrant")
qdrant_port = int(os.getenv("QDRANT_PORT", 6333))
app.ctx.qdrant_client = QdrantClient(host=qdrant_host, port=qdrant_port)
app.ctx.logger.info(f"Qdrant client initialized for host: {qdrant_host}:{qdrant_port}")
# --- OpenTelemetry 初始化 ---
setup_observability(app)
# --- 核心业务逻辑 ---
@app.post("/api/v1/search")
async def semantic_search(request):
logger = request.app.ctx.logger
tracer = request.app.ctx.tracer
qdrant: QdrantClient = request.app.ctx.qdrant_client
try:
data = request.json
query_vector = data.get("vector")
limit = data.get("limit", 10)
collection_name = "my_collection" # 在真实应用中应该可配置
if not query_vector or not isinstance(query_vector, list):
logger.warning("Invalid search request: 'vector' field is missing or not a list.")
return response.json({"error": "Invalid input"}, status=400)
# 这里的 with 块是手动埋点的核心
# 我们创建了一个名为 'qdrant.search' 的子 Span
with tracer.start_as_current_span("qdrant.search") as span:
# 为 Span 添加有意义的属性,这对于后续分析至关重要
span.set_attribute("db.system", "qdrant")
span.set_attribute("db.operation", "search")
span.set_attribute("db.qdrant.collection_name", collection_name)
span.set_attribute("db.qdrant.limit", limit)
span.set_attribute("db.qdrant.vector_size", len(query_vector))
logger.info(f"Performing search on collection '{collection_name}' with limit {limit}")
try:
search_result = qdrant.search(
collection_name=collection_name,
query_vector=query_vector,
limit=limit,
with_payload=True
)
# 记录查询到的结果数量
span.set_attribute("db.qdrant.hits", len(search_result))
span.set_status(trace.StatusCode.OK)
except Exception as e:
logger.error(f"Qdrant search failed: {e}", exc_info=True)
# 标记 Span 为错误状态,并记录异常信息
span.set_status(trace.StatusCode.ERROR, description=str(e))
# 向上抛出异常,由全局错误处理器捕获
raise
# 格式化返回结果
results = [
{"id": hit.id, "score": hit.score, "payload": hit.payload}
for hit in search_result
]
return response.json({"results": results})
except Exception as e:
logger.error(f"An unexpected error occurred in semantic_search: {e}", exc_info=True)
# 确保在任何异常情况下都返回标准的错误响应
return response.json({"error": "Internal Server Error"}, status=500)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000, debug=False, access_log=False)
这里的关键是 tracer.start_as_current_span("qdrant.search")
。它创建了一个新的 Span,并将其设置为当前上下文的活动 Span。在这个 with
块内执行的所有操作,包括日志记录,都会自动关联到这个新创建的 qdrant.search
Span。我们还通过 span.set_attribute
添加了丰富的元数据,例如集合名称、查询限制等,这些信息在分析性能瓶颈时价值连城。
前端协同:为 Svelte 应用集成 OpenTelemetry
前端的可观测性同样重要,它能帮我们量化用户感知的真实性能,例如从点击到看到结果的完整时长。
1. 依赖安装
我们需要安装针对 Web 的 OpenTelemetry 库。
npm install @opentelemetry/api \
@opentelemetry/sdk-trace-web \
@opentelemetry/instrumentation-fetch \
@opentelemetry/exporter-trace-otlp-http \
@opentelemetry/context-zone
2. 初始化追踪器
在 Svelte 项目的 src
目录下创建一个 tracing.js
文件,用于集中管理 OTel 配置。
// src/tracing.js
import { ZoneContextManager } from '@opentelemetry/context-zone';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { FetchInstrumentation } from '@opentelemetry/instrumentation-fetch';
import { WebTracerProvider, BatchSpanProcessor } from '@opentelemetry/sdk-trace-web';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
const resource = new Resource({
[SemanticResourceAttributes.SERVICE_NAME]: 'svelte-frontend',
});
// 指向 OTel Collector 的 HTTP 端口
const exporter = new OTLPTraceExporter({
url: 'http://localhost:4318/v1/traces',
});
const provider = new WebTracerProvider({
resource: resource,
});
provider.addSpanProcessor(new BatchSpanProcessor(exporter));
// ZoneContextManager 用于在异步回调中正确传递上下文
provider.register({
contextManager: new ZoneContextManager(),
});
// 自动埋点 fetch API
// `propagateTraceHeaderCorsUrls` 是关键,它允许 OTel 在跨域请求中注入 traceparent header
registerInstrumentations({
instrumentations: [
new FetchInstrumentation({
propagateTraceHeaderCorsUrls: [
'http://localhost:8000/api/v1/search',
]
}),
],
});
console.log("Svelte OpenTelemetry tracing initialized.");
将这个文件在你的主入口文件 (如 main.js
或 App.svelte
的 script 顶部) 导入即可:import './tracing';
FetchInstrumentation
是这里的核心。它会自动拦截所有 fetch
调用,开始一个新的 Span,并将 traceparent
header 注入到请求中。Sanic 后端的 SanicInstrumentor
会自动识别这个 header,从而将前端和后端的 Span 连接起来。
3. Svelte 组件示例
一个简单的 Svelte 搜索组件如下,它本身不需要任何 OTel 相关的代码,因为埋点是自动的。
<!-- src/Search.svelte -->
<script>
import { trace } from '@opentelemetry/api';
let results = [];
let isLoading = false;
let error = null;
async function performSearch() {
isLoading = true;
error = null;
// 获取当前活动的 tracer
const tracer = trace.getTracer('svelte-component-tracer');
// 手动创建一个 Span,覆盖整个用户操作
const span = tracer.startSpan('user-perform-search-action');
// 在 Span 的上下文中执行操作
await trace.context.with(trace.setSpan(trace.context.active(), span), async () => {
try {
// 生成一个随机向量用于演示
const randomVector = Array.from({ length: 128 }, () => Math.random() * 2 - 1);
span.addEvent('Search triggered by user');
const response = await fetch('http://localhost:8000/api/v1/search', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ vector: randomVector, limit: 5 }),
});
if (!response.ok) {
throw new Error(`API Error: ${response.statusText}`);
}
const data = await response.json();
results = data.results;
span.setAttribute('search.results.count', results.length);
span.setStatus({ code: 1 }); // 1 = OK
} catch (e) {
error = e.message;
span.recordException(e);
span.setStatus({ code: 2, message: e.message }); // 2 = ERROR
} finally {
isLoading = false;
span.end();
}
});
}
</script>
<main>
<h1>Semantic Search</h1>
<button on:click={performSearch} disabled={isLoading}>
{isLoading ? 'Searching...' : 'Perform Random Search'}
</button>
{#if error}
<p style="color: red;">Error: {error}</p>
{/if}
{#if results.length > 0}
<ul>
{#each results as result}
<li>
<strong>ID:</strong> {result.id} |
<strong>Score:</strong> {result.score.toFixed(4)}
</li>
{/each}
</ul>
{/if}
</main>
虽然 FetchInstrumentation
能自动追踪网络请求,但通过 tracer.startSpan('user-perform-search-action')
手动包裹整个用户交互过程,能提供更丰富的业务上下文。这个 Span 会成为 fetch
Span 的父 Span,在 Jaeger UI 中形成一个更清晰的层级结构。
整合与部署
为了方便地将所有组件(Svelte, Sanic, Qdrant, OTel Collector, Jaeger)运行起来,我们使用 Docker Compose。
1. OpenTelemetry Collector 配置 (otel-collector-config.yaml
)
receivers:
otlp:
protocols:
grpc:
http:
processors:
batch:
exporters:
logging:
loglevel: debug
jaeger:
endpoint: jaeger-all-in-one:14250
tls:
insecure: true
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [logging, jaeger]
这个配置告诉 Collector 通过 gRPC 和 HTTP 接收 OTLP 数据,进行批处理,然后同时导出到控制台(用于调试)和 Jaeger。
2. Docker Compose (docker-compose.yml
)
version: '3.8'
services:
# Sanic API
api:
build:
context: ./backend # 假设后端代码在 backend 目录
ports:
- "8000:8000"
environment:
- OTEL_SERVICE_NAME=semantic-search-api
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
- QDRANT_HOST=qdrant
depends_on:
- qdrant
- otel-collector
# Qdrant
qdrant:
image: qdrant/qdrant:latest
ports:
- "6333:6333"
- "6334:6334"
volumes:
- ./qdrant_storage:/qdrant/storage
# OpenTelemetry Collector
otel-collector:
image: otel/opentelemetry-collector-contrib:latest
command: ["--config=/etc/otel-collector-config.yaml"]
volumes:
- ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
ports:
- "4317:4317" # gRPC
- "4318:4318" # HTTP
depends_on:
- jaeger
# Jaeger
jaeger:
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686" # Jaeger UI
- "14250:14250" # gRPC for Collector
# Svelte (使用 Nginx 托管编译后的静态文件)
frontend:
build:
context: ./frontend # 假设前端代码在 frontend 目录
ports:
- "8080:80"
depends_on:
- api
启动所有服务 (docker-compose up --build
) 后,访问 http://localhost:8080
点击搜索按钮,然后打开 Jaeger UI (http://localhost:16686
),你就能看到一条完整的调用链,清晰地展示了从前端交互到数据库查询的每一步耗时。
方案的局限性与未来展望
这套方案为我们的语义搜索服务提供了坚实的可观测性基础,但它并非终点。在生产环境中,还存在一些需要进一步完善的地方。
首先,当前的追踪是全量采样的。在高流量场景下,这会带来巨大的性能开销和存储成本。下一步需要引入采样策略,例如基于头部的采样(Head-based Sampling)或更高级的尾部采样(Tail-based Sampling),后者可以在请求完成后根据其特征(如是否出错、耗时是否超长)来决定是否保留这条追踪数据。
其次,我们只实现了 Tracing。一个完整的可观测性体系还包括 Metrics 和 Logs。我们可以通过 OpenTelemetry 的 MeterProvider
收集关键业务指标(如QPS、P99延迟、搜索结果数等)并导出到 Prometheus,通过 OTel 的日志SDK 将结构化日志统一发送到 Loki 或 Elasticsearch,从而实现三者的真正联动。
最后,对 Qdrant 的埋点是手动的。这依赖于应用开发者自觉地包裹客户端调用。更健壮的方案是为 qdrant-client
贡献一个官方的 OpenTelemetry Instrumentation 库,实现无侵入的自动埋点,或者使用 eBPF 这类技术在更底层进行无感知的网络调用监控。