凌晨三点,on-call 工程师被一条 Sentry 高危警报唤醒。Tornado Internal Server Error: database connection refused
。这只是故事的开始。接下来是登录跳板机,SSH 到目标服务器,用 grep
在数GB的日志文件中大海捞针,检查 netstat
确认端口状态,翻阅 dmesg
寻找 OOM Killer 的踪迹,最后查看监控图表寻找异常拐点。整个过程在睡眼惺忪的状态下至少要花费15分钟,而这15分钟可能直接影响到服务的SLA。
这种场景在我们的团队中频繁上演。Sentry 提供了优秀的错误聚合和堆栈追踪,但它只告诉我们“发生了什么”,而“为什么发生”的上下文信息,散落在各个节点的系统状态、日志和指标中。问题的核心在于,on-call 工程师的初始诊断流程高度重复,却又极度耗时。我们需要的不是更快的工程师,而是一个能在工程师介入前就完成初步信息采集的自动化系统。
我们的初步构想是建立一个事件驱动的工作流:当 Sentry 捕获到一个新的、符合特定规则(例如,高优、生产环境)的错误事件时,自动触发一个任务,去事件发生的服务器上收集一份“诊断快照”,然后将这份快照附加到 Sentry issue 上。这样,工程师点开警报链接时,第一手资料已经备好。
sequenceDiagram participant Sentry participant Webhook Gateway (Tornado) participant Puppet Orchestrator participant Production Node Sentry->>+Webhook Gateway (Tornado): POST /webhook/sentry (New Issue Event) Webhook Gateway (Tornado)-->>Sentry: HTTP 202 Accepted Note right of Webhook Gateway (Tornado): 立即响应,后台异步处理 Webhook Gateway (Tornado)->>Webhook Gateway (Tornado): 解析Payload,提取`server_name`, `issue_id` Webhook Gateway (Tornado)->>+Puppet Orchestrator: POST /orchestrator/v1/command/task (Run `diagnostics::collect` on `server_name`) Puppet Orchestrator-->>-Webhook Gateway (Tornado): Job ID Puppet Orchestrator->>+Production Node: Execute Puppet Task Production Node-->>-Puppet Orchestrator: Task Result (Diagnostic Bundle) loop Poll Job Status Webhook Gateway (Tornado)->>Puppet Orchestrator: GET /orchestrator/v1/jobs/{Job ID} Puppet Orchestrator-->>Webhook Gateway (Tornado): Job Status (e.g., running, success) end Note right of Webhook Gateway (Tornado): Job成功后,获取结果 Webhook Gateway (Tornado)->>+Sentry: POST /api/0/issues/{issue_id}/comments/ (Post Diagnostic Bundle) Sentry-->>-Webhook Gateway (Tornado): Comment Created
技术选型决策相对直接。我们内部已经全面使用 Puppet 进行配置管理,所有服务器上都运行着 Puppet Agent,这意味着我们拥有一个现成的、安全的、可以触达任何节点的执行通道。Puppet Tasks 功能正是为这种按需执行的命令而设计的。Sentry 提供了强大的 Webhooks 和 API,是天然的事件源。唯一需要新建的组件就是中间的“Webhook Gateway”。
我们选择 Tornado 来构建这个网关。原因有三:
- 轻量且高效: 网关的核心功能是接收 webhook、调用外部 API。这是典型的 I/O 密集型任务,Tornado 的异步非阻塞模型是完美选择。我们不需要 Django 或 Flask 这样功能全面的框架。
- 高并发处理能力: Sentry 在服务抖动时可能会在短时间内发送大量 webhook。Tornado 的事件循环能轻松应对这种流量尖峰,避免因处理慢而导致 Sentry 重试或超时。
- 原生异步生态: 与 Puppet Orchestrator API 和 Sentry API 的交互都是网络调用。使用 Tornado 配合
aiohttp
可以用纯异步的方式编写整个流程,代码简洁且性能高。
第一步:构建安全的 Tornado Webhook 网关
网关的首要任务是可靠地接收并验证来自 Sentry 的 webhook。在真实项目中,绝不能简单地开放一个公网端点。Sentry webhook 提供了 Sentry-Hook-Signature
请求头,用于 HMAC 签名验证,这是我们必须实现的安全第一道防线。
sentry_gateway.py
:
import os
import hmac
import hashlib
import json
import logging
from typing import Optional, Awaitable
import tornado.ioloop
import tornado.web
import tornado.options
from tornado.httpclient import AsyncHTTPClient, HTTPRequest, HTTPError
# --- 配置区 ---
# 生产环境中应使用环境变量或配置管理工具注入
TORNADO_PORT = 8888
SENTRY_WEBHOOK_SECRET = os.environ.get("SENTRY_WEBHOOK_SECRET")
SENTRY_API_TOKEN = os.environ.get("SENTRY_API_TOKEN")
SENTRY_ORG_SLUG = "my-org"
PUPPET_ORCHESTRATOR_URL = "https://puppet.mydomain.com:8143"
PUPPET_TOKEN = os.environ.get("PUPPET_TOKEN") # Puppet RBAC token
# --- 日志配置 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class SentryWebhookHandler(tornado.web.RequestHandler):
"""
接收 Sentry Webhook,验证签名,并异步触发诊断任务
"""
def prepare(self) -> Optional[Awaitable[None]]:
# 在处理请求体之前验证签名
if not SENTRY_WEBHOOK_SECRET:
logging.error("SENTRY_WEBHOOK_SECRET 未配置,无法验证 webhook")
raise tornado.web.HTTPError(500, reason="Webhook secret not configured.")
signature = self.request.headers.get("Sentry-Hook-Signature")
if not signature:
logging.warning("请求缺少 Sentry-Hook-Signature 头")
raise tornado.web.HTTPError(400, reason="Missing signature.")
# HMAC-SHA256 验证
expected_signature = hmac.new(
key=SENTRY_WEBHOOK_SECRET.encode("utf-8"),
msg=self.request.body,
digestmod=hashlib.sha256,
).hexdigest()
if not hmac.compare_digest(expected_signature, signature):
logging.error("Webhook 签名验证失败")
raise tornado.web.HTTPError(401, reason="Invalid signature.")
return super().prepare()
async def post(self):
# 验证通过后,立即返回202,避免Sentry超时重试
self.set_status(202)
self.finish("Accepted")
# 将实际处理逻辑放入后台执行
tornado.ioloop.IOLoop.current().add_callback(self.process_event)
async def process_event(self):
"""
异步处理事件:解析、触发Puppet任务、回写Sentry
"""
try:
event_data = json.loads(self.request.body)
action = event_data.get("action")
# 我们只关心新创建的 issue
if action != "created":
logging.info(f"忽略非 created 事件: {action}")
return
# 从 event context 中提取关键信息
issue_id = event_data["data"]["issue"]["id"]
# Sentry事件中的 server_name 标签通常是主机名
server_name_tag = next((tag for tag in event_data["data"]["issue"]["tags"] if tag["key"] == "server_name"), None)
if not server_name_tag:
logging.warning(f"事件 (issue_id: {issue_id}) 中未找到 server_name 标签")
return
target_node = server_name_tag["value"]
logging.info(f"接收到新 issue {issue_id},目标节点: {target_node}")
# 触发 Puppet 任务
puppet_client = PuppetClient()
job_id = await puppet_client.run_diagnostics_task(target_node)
if not job_id:
await self.post_to_sentry(issue_id, f"❌ 触发 Puppet 诊断任务失败。")
return
# 轮询任务结果
diagnostics_report = await puppet_client.poll_task_result(job_id)
# 将结果回写到 Sentry
if diagnostics_report:
await self.post_to_sentry(issue_id, diagnostics_report)
else:
await self.post_to_sentry(issue_id, f"❌ 获取 Puppet 诊断任务 (Job ID: {job_id}) 结果失败或超时。")
except json.JSONDecodeError:
logging.error("无法解析 Sentry webhook payload")
except Exception as e:
logging.exception(f"处理 Sentry 事件时发生未知错误: {e}")
async def post_to_sentry(self, issue_id: str, content: str):
"""
向指定的 Sentry issue 添加评论
"""
sentry_api_url = f"https://sentry.io/api/0/issues/{issue_id}/comments/"
headers = {
"Authorization": f"Bearer {SENTRY_API_TOKEN}",
"Content-Type": "application/json",
}
body = json.dumps({"text": content})
http_client = AsyncHTTPClient()
try:
await http_client.fetch(sentry_api_url, method="POST", headers=headers, body=body)
logging.info(f"成功将诊断报告添加到 Sentry issue {issue_id}")
except HTTPError as e:
logging.error(f"向 Sentry API 发送评论失败 (issue_id: {issue_id}): {e.response.body if e.response else e}")
# PuppetClient 的实现将在下一节展示...
def make_app():
return tornado.web.Application([
(r"/webhook/sentry", SentryWebhookHandler),
])
if __name__ == "__main__":
if not all([SENTRY_WEBHOOK_SECRET, SENTRY_API_TOKEN, PUPPET_TOKEN]):
logging.critical("关键环境变量缺失!请设置 SENTRY_WEBHOOK_SECRET, SENTRY_API_TOKEN, PUPPET_TOKEN")
exit(1)
app = make_app()
app.listen(TORNADO_PORT)
logging.info(f"Sentry Webhook Gateway 启动于端口 {TORNADO_PORT}")
tornado.ioloop.IOLoop.current().start()
这段代码的核心设计在于 post
方法。它在验证签名后立即返回 202 Accepted
,并将真正的耗时操作 process_event
通过 add_callback
交给 IOLoop 在下一个循环中执行。这是一个健壮的 webhook 处理模式,确保了上游服务(Sentry)不会因为我们的处理延迟而认为请求失败。
第二步:定义并实现 Puppet 诊断任务
Puppet Task 是一个独立于 Puppet 正常 catalog apply 流程的、可以按需执行的脚本。我们创建一个名为 diagnostics
的模块,并在其中定义一个 collect
任务。
模块结构:
/etc/puppetlabs/code/environments/production/modules/
└── diagnostics/
├── tasks/
│ ├── collect.sh
│ └── collect.json
└── metadata.json
tasks/collect.json
: 任务元数据,描述任务、参数等。
{
"description": "收集节点的基础诊断信息",
"input_method": "stdin",
"parameters": {
"log_path": {
"description": "要检查的应用程序日志文件路径",
"type": "Optional[String[1]]",
"default": "/var/log/app/current.log"
},
"log_lines": {
"description": "要从日志文件尾部获取的行数",
"type": "Integer",
"default": 200
}
}
}
tasks/collect.sh
: 任务的实际执行脚本。一个常见的错误是让这个脚本过于复杂或权限过高。它应该遵循最小权限原则,只执行只读的诊断命令。
#!/bin/bash
# 从 Puppet 任务 runner 获取参数
# Puppet 会将 JSON 参数通过 stdin 传入,并设置相应的 PT_ 环境变量
LOG_PATH=$PT_log_path
LOG_LINES=$PT_log_lines
# 使用 Markdown 格式化输出,以便在 Sentry 中美观展示
echo "### 自动诊断报告 (`date -u`)"
echo '---'
echo '#### 系统概览'
echo '```'
echo "Hostname: $(hostname -f)"
echo "Uptime: $(uptime)"
echo '```'
echo
echo '#### 内存使用 (MB)'
echo '```'
free -m
echo '```'
echo
echo '#### 最近的系统日志 (dmesg)'
echo '```'
dmesg | tail -n 30
echo '```'
echo
echo '#### 网络连接 (Listen & Established)'
echo '```'
ss -tunap | grep -E 'LISTEN|ESTAB'
echo '```'
echo
if [ -f "$LOG_PATH" ]; then
echo "#### 应用日志尾部 ($LOG_PATH)"
echo '```'
tail -n "$LOG_LINES" "$LOG_PATH"
echo '```'
else
echo "#### 应用日志"
echo "> 日志文件 $LOG_PATH 未找到。"
fi
# Puppet Task 通过 stdout 返回结果
exit 0
这个脚本收集了包括主机名、负载、内存、内核消息、网络连接和应用日志在内的关键信息,并以 Markdown 格式输出。这种格式化的输出可以直接作为 Sentry issue 的评论内容。
第三步:从 Tornado 编排 Puppet 任务
这是连接应用层和服务层的关键。Tornado 网关需要通过 Puppet Orchestrator 的 REST API 来触发任务。这里的坑在于认证和对异步任务生命周期的管理。Puppet Orchestrator API 通常使用基于 RBAC 的 token 或客户端证书进行认证。
我们将为 Tornado 网关创建一个专用的 Puppet 用户和角色,并为其生成一个生命周期较长的 token。
puppet_client.py
: (可以整合到 sentry_gateway.py
或作为独立模块)
import json
import logging
import asyncio
from tornado.httpclient import AsyncHTTPClient, HTTPRequest, HTTPError
# ... (复用 sentry_gateway.py 中的配置)
class PuppetClient:
def __init__(self):
self.http_client = AsyncHTTPClient()
self.base_url = PUPPET_ORCHESTRATOR_URL
self.headers = {
"X-Authentication": PUPPET_TOKEN,
"Content-Type": "application/json",
}
async def run_diagnostics_task(self, target_node: str) -> Optional[str]:
"""
触发 'diagnostics::collect' 任务
:return: 任务的 Job ID
"""
url = f"{self.base_url}/orchestrator/v1/command/task"
body = {
"environment": "production",
"task": "diagnostics::collect",
"params": {
# 未来可以根据 Sentry 项目动态设置日志路径
"log_path": "/var/log/my-tornado-app/service.log"
},
"scope": {
"nodes": [target_node]
},
"description": f"Sentry 自动诊断任务 for {target_node}"
}
request = HTTPRequest(
url,
method="POST",
headers=self.headers,
body=json.dumps(body),
# Puppet Server 使用自签名证书,生产环境应配置可信CA
# 这里为了演示,我们禁用证书验证,但强烈不推荐在生产中这样做
validate_cert=False
)
try:
response = await self.http_client.fetch(request)
job_info = json.loads(response.body)
job_id = job_info.get("job", {}).get("name")
logging.info(f"成功触发 Puppet 任务,Job ID: {job_id}")
return job_id
except HTTPError as e:
logging.error(f"调用 Puppet Orchestrator API 失败: {e.response.body if e.response else e}")
return None
except (json.JSONDecodeError, KeyError) as e:
logging.error(f"解析 Puppet API 响应失败: {e}")
return None
async def poll_task_result(self, job_id: str, timeout: int = 120, interval: int = 5) -> Optional[str]:
"""
轮询任务状态直到完成,并返回结果
:return: 任务的标准输出内容
"""
url = f"{self.base_url}/orchestrator/v1/jobs/{job_id}/nodes"
request = HTTPRequest(url, headers=self.headers, validate_cert=False)
elapsed_time = 0
while elapsed_time < timeout:
try:
response = await self.http_client.fetch(request)
result = json.loads(response.body)
node_report = result.get("items", [])[0] # 假设只有一个节点
state = node_report.get("state")
if state == "finished":
logging.info(f"Puppet 任务 {job_id} 完成")
# 最新版本的 Puppet 任务结果在 'result' 字段
return node_report.get("result", {}).get("stdout")
elif state == "failed":
logging.error(f"Puppet 任务 {job_id} 执行失败: {node_report.get('result')}")
return f"Puppet 任务执行失败: \n```\n{json.dumps(node_report.get('result'), indent=2)}\n```"
# 否则继续轮询
await asyncio.sleep(interval)
elapsed_time += interval
except HTTPError as e:
logging.error(f"轮询 Puppet 任务 {job_id} 状态失败: {e.response.body if e.response else e}")
return None
except (json.JSONDecodeError, IndexError, KeyError) as e:
logging.error(f"解析 Puppet 任务结果失败: {e}")
return None
logging.warning(f"轮询 Puppet 任务 {job_id} 超时 ({timeout}s)")
return None
在 SentryWebhookHandler.process_event
中,我们需要实例化并调用这个 PuppetClient
。poll_task_result
的实现是关键。Puppet 任务是异步的,API 调用立即返回一个 Job ID。我们必须通过轮询来获取最终结果。asyncio.sleep
在这里是异步编程的正确方式,它不会阻塞整个 Tornado 进程。
一个常见的错误是在轮询时不做超时处理,这可能导致协程永久挂起,消耗服务器资源。这里我们设置了120秒的超时,对于一个诊断任务来说是足够宽裕的。
最终成果
部署完成后,当一个新的严重错误发生时:
- Sentry 触发 webhook 到我们的 Tornado 网关。
- Tornado 网关验证请求,并异步调用 Puppet Orchestrator API,在出错的节点上执行
diagnostics::collect
任务。 - 任务执行完成后,Tornado 网关获取到格式化的 Markdown 报告。
- 网关调用 Sentry API,将报告作为一条评论发布到对应的 issue 中。
On-call 工程师收到的通知邮件或Slack消息链接点开后,看到的不再仅仅是冰冷的堆栈信息,而是附带了这样一条自动生成的评论:
自动诊断报告 (
2023-10-27T03:01:15Z
)
系统概览
Hostname: web-prod-03.mydomain.com Uptime: 03:01:15 up 42 days, 11:20, 0 users, load average: 0.85, 0.95, 0.91
内存使用 (MB)
total used free shared buff/cache available Mem: 15995 8010 1203 150 6782 7485 Swap: 0 0 0
最近的系统日志 (dmesg)
[3670110.123456] oom-kill:constraint=CONSTRAINT_NONE,nodemask=(null),cpuset=/,mems_allowed=0,global_oom,task_memcg=/... [3670110.123457] Out of memory: Killed process 12345 (python) total-vm:..., anon-rss:..., file-rss:...
网络连接 (Listen & Established)
... tcp LISTEN 0 128 *:80 *:* users:(("nginx",pid=...)) tcp ESTAB 0 0 10.0.1.15:34567 10.0.2.20:5432 users:(("python",pid=...)) ...
应用日志尾部 (/var/log/my-tornado-app/service.log)
... 2023-10-27 03:00:58,112 ERROR [MainThread] aio_db_pool: Failed to connect to database. Traceback (most recent call last): ... asyncpg.exceptions._base.InterfaceError: connection rejected: could not fork new process for connection ...
现在,工程师一眼就能看到 dmesg
中的 OOM killer 记录和应用日志中的数据库连接失败,可以迅速将问题定位到数据库服务器过载或自身内存泄漏,极大地缩短了故障的平均解决时间(MTTR)。
局限性与未来迭代方向
这套系统并非银弹。当前的实现存在一些局限性。首先,Puppet Task 的执行并非瞬时,从触发到获取结果通常有5到30秒的延迟,这取决于 Puppet Server 的负载和通信链路。对于需要秒级响应的场景,这个延迟可能无法接受。其次,安全性是重中之重。虽然我们使用了 token,但这个 Tornado 网关实际上成了一个可以对生产环境执行命令的高权限入口,必须对该服务自身、网络访问策略和 Puppet 的 RBAC 规则进行严格的审计和加固。
未来的迭代可以从以下几个方向展开:
- 动态化诊断脚本: 当前的诊断任务是硬编码的。可以将其改造为从一个 Git 仓库动态拉取诊断脚本,或者根据 Sentry issue 的标签(如
project:api
,project:worker
)来执行不同的诊断任务。 - 更丰富的上下文: 不仅仅是系统日志,还可以集成 Prometheus API,拉取事件发生前后几分钟内的关键业务指标和系统指标图表,一并附加到 Sentry issue 中。
- 闭环与修复尝试: 对于某些明确的、模式化的错误(如缓存服务连接失败),可以尝试执行一些预定义的、安全的修复操作(例如,
systemctl restart redis-sentinel
),并将操作结果反馈到 issue 中,实现简单的“自愈”能力。
这套系统的核心价值在于,它将 SRE 的一部分经验和标准操作流程(SOP)固化为了可重复执行的代码,将人的被动响应,升级为了机器的主动诊断。