跳到主要内容
SkyWalking Python 分布式追踪实战:skywalking-python 埋点指南 | 极客日志
Python java
SkyWalking Python 分布式追踪实战:skywalking-python 埋点指南 Apache SkyWalking 为 Python 微服务提供分布式追踪能力。本文介绍 skywalking-python 库的安装配置,涵盖 Flask 自动埋点、自定义 Span 手动追踪及跨语言(Java)调用链传播机制。通过 gRPC/HTTP 协议上报数据,结合 Celery 异步任务与日志关联方案,实现全链路可观测性。内容包含性能评估、故障排查及生产环境最佳实践,帮助开发者构建稳定的系统监控体系。
SkyWalking Python 分布式追踪实战:基于 skywalking-python 的埋点
在当今微服务架构盛行的时代,分布式系统的可观测性(Observability)已成为保障系统稳定性和性能优化的关键能力。Apache SkyWalking 作为一款开源的 APM 系统,以其强大的分布式追踪、服务拓扑图、指标分析和告警能力,被广泛应用于生产环境。虽然 SkyWalking 最初以 Java 生态为主力支持对象,但随着社区的发展,Python 应用也可以通过 skywalking-python 实现无侵入或轻量级埋点,从而接入完整的观测体系。
本文将深入探讨如何在 Python 应用中集成 SkyWalking,并结合 Java 示例进行对比,帮助开发者理解跨语言追踪的核心机制。同时,我们将使用 Mermaid 图表展示调用链路结构,并提供官方文档链接供读者延伸学习。
什么是 SkyWalking?
Apache SkyWalking 是一个观测性平台,用于监控、追踪、诊断和可视化分布式系统的性能问题。它支持自动探针(Agent)和手动埋点(Manual Instrumentation),覆盖多种语言和框架:
✅ Java
✅ Python
✅ Node.js
✅ Go
✅ .NET
✅ PHP(实验性)
✅ Rust(实验性)
SkyWalking 的核心组件包括:
OAP Server :后端处理与存储。
UI :可视化仪表盘。
Agent / SDK :嵌入应用采集数据。
官方网站:https://skywalking.apache.org/
文档中心:https://skywalking.apache.org/docs/
Python 埋点基础:skywalking-python
skywalking-python 是 SkyWalking 官方提供的 Python 探针库,支持自动和手动两种方式采集追踪数据。其主要功能包括:
自动追踪 HTTP 请求(如 Flask、Django、FastAPI)
手动创建 Span(用于自定义业务逻辑)
上下文传播(Context Propagation)
支持 gRPC 和 HTTP 协议上报
安装与配置
首先安装 skywalking-python:
pip install apache-skywalking-python
然后,在你的 Python 应用入口处初始化 SkyWalking Agent:
from skywalking import agent, config
config.init(
collector_address='127.0.0.1:11800' ,
service_name='my-python-service' ,
protocol='grpc'
)
agent.start()
⚠️ 注意:必须在导入其他业务模块之前初始化 agent,否则无法正确拦截框架请求。
示例一:Flask 应用自动追踪
我们先从最简单的 Flask 应用开始,展示自动埋点的能力:
from flask import Flask
from skywalking import agent, config
config.init(
collector_address='127.0.0.1:11800' ,
service_name='flask-demo' ,
protocol='grpc'
)
agent.start()
app = Flask(__name__)
@app.route('/' )
def hello ():
return "Hello, SkyWalking!"
@app.route('/user/<int:user_id>' )
def get_user (user_id ):
import time
time.sleep(0.1 )
return f"User {user_id} data"
if __name__ == '__main__' :
app.run(host='0.0.0.0' , port=5000 )
启动该服务后,访问 http://localhost:5000/user/123,你将在 SkyWalking UI 中看到如下信息:
服务名:flask-demo
Endpoint:GET /user/<int:user_id>
响应时间:约 100ms
调用链路自动生成
示例二:手动埋点 —— 自定义 Span 有时我们需要追踪某个特定函数或外部调用(如 Redis、MySQL、第三方 API),这时就需要手动创建 Span:
from skywalking import Component
from skywalking.trace.context import get_context
from skywalking.trace.tags import Tag
def call_external_api (user_id ):
context = get_context()
with context.new_exit_span(
op="ExternalAPI/call" ,
peer="api.example.com" ,
component=Component.Unknown
) as span:
span.tag(Tag(key="user.id" , val=str (user_id)))
import time
time.sleep(0.2 )
return {"status" : "success" , "data" : f"User {user_id} " }
@app.route('/fetch/<int:user_id>' )
def fetch_user (user_id ):
result = call_external_api(user_id)
return result
new_exit_span 表示这是一个'出口'Span,通常用于外部服务调用。
peer 字段标识目标服务地址。
tag 用于附加业务信息,便于后续筛选和分析。
跨服务追踪:Python 与 Java 交互 在真实场景中,Python 服务往往与 Java 服务协同工作。SkyWalking 支持跨语言上下文传播,确保整个调用链完整。
假设我们有一个 Java Spring Boot 服务,提供用户信息服务:
@RestController
@RequestMapping("/api/user")
public class UserController {
@Autowired
private UserService userService;
@GetMapping("/{id}")
public ResponseEntity<User> getUser (@PathVariable Long id) {
User user = userService.findById(id);
return ResponseEntity.ok(user);
}
}
import requests
from skywalking import agent, config
from skywalking.trace.context import get_context
from skywalking.trace.carrier import Carrier
from skywalking import Component
from skywalking.trace.tags import TagHttpMethod, TagHttpURL
config.init(
collector_address='127.0.0.1:11800' ,
service_name='python-client' ,
protocol='grpc'
)
agent.start()
def call_java_service (user_id ):
context = get_context()
carrier = Carrier()
with context.new_exit_span(
op="HTTP/GET" ,
peer="localhost:8080" ,
component=Component.HttpClient
) as span:
span.tag(TagHttpMethod("GET" ))
span.tag(TagHttpURL(f"http://localhost:8080/api/user/{user_id} " ))
context.inject(carrier)
headers = dict (carrier)
response = requests.get(
f"http://localhost:8080/api/user/{user_id} " ,
headers=headers
)
return response.json()
@app.route('/proxy/user/<int:user_id>' )
def proxy_user (user_id ):
data = call_java_service(user_id)
return data
在 Java 端,Spring Cloud Sleuth + SkyWalking Agent 会自动提取 Header 中的 Trace 上下文,从而实现无缝衔接。
上下文传播机制详解 SkyWalking 使用类似 W3C Trace Context 的机制进行跨服务传播。关键 Header 包括:
sw8: SkyWalking 自定义协议头(Base64 编码)
traceparent: W3C 标准头(可选)
在手动埋点时,我们通过 Carrier 对象封装这些 Header:
carrier = Carrier()
context.inject(carrier)
headers = dict (carrier)
@GetMapping("/trace")
public String trace (HttpServletRequest request) {
String sw8 = request.getHeader("sw8" );
return "Traced!" ;
}
这种机制确保了即使跨越不同语言、不同框架,调用链依然连贯。
数据上报协议:gRPC vs HTTP 协议 优点 缺点 gRPC 高效、低延迟、支持流式传输 需要额外依赖,防火墙可能限制 HTTP 兼容性好,调试方便 性能略低,不支持双向流
在 Python 中切换协议只需修改 protocol 参数:
config.init(
collector_address='127.0.0.1:12800' ,
service_name='my-service' ,
protocol='http'
)
性能影响评估 引入 SkyWalking 是否会影响应用性能?答案是:极小影响 。
自动埋点仅在首次加载类时进行字节码增强(Java)或装饰器注入(Python)。
数据上报异步进行,不影响主流程。
可配置采样率,默认 100%,生产环境可调整为 10%~30% 以降低开销。
config.init(
collector_address='127.0.0.1:11800' ,
service_name='sampled-service' ,
sample_rate=0.3
)
插件生态与框架支持 skywalking-python 内置支持主流框架:
✅ Flask
✅ Django
✅ FastAPI
✅ Tornado
✅ aiohttp
✅ urllib3 / requests
✅ Redis / MySQL / PostgreSQL(需手动启用)
import skywalking.plugin
skywalking.plugin.install()
from skywalking.plugin import plugin_flask, plugin_requests
plugin_flask.install()
plugin_requests.install()
分布式追踪原理图解 下面用 Mermaid 展示一个典型的跨服务调用链:
sequenceDiagram
participant Client
participant Gateway as API Gateway (Python)
participant Order as Order Service (Python)
participant User as User Service (Java)
participant DB as Database
Client->>Gateway: GET /user/123/orders
Gateway->>Order: HTTP GET /orders?user_id=123
Order->>User: HTTP GET /api/user/123
User->>DB: SELECT * FROM orders WHERE user_id=123
DB-->>User: [{order_id: 1}, ...]
User-->>Order: { "user": "...", "orders": [...] }
Order-->>Gateway: [{order_id: 1}, ...]
Gateway-->>Client: { "user": "...", "orders": [...] }
每个服务节点都会生成自己的 Span。
上下文通过 Header 传递,确保 TraceID 一致。
最终在 SkyWalking UI 中形成完整的火焰图(Flame Graph)。
示例三:异步任务追踪(Celery) 对于后台任务系统如 Celery,我们同样可以追踪:
from celery import Celery
from skywalking import agent, config
from skywalking.trace.context import get_context
from skywalking.trace.tags import Tag
config.init(
collector_address='127.0.0.1:11800' ,
service_name='celery-worker' ,
protocol='grpc'
)
agent.start()
app = Celery('tasks' , broker='redis://localhost:6379' )
@app.task
def send_email (user_id, content ):
context = get_context()
with context.new_local_span(op="Task/send_email" ) as span:
span.tag(Tag(key="user.id" , val=str (user_id)))
import time
time.sleep(1 )
print (f"Email sent to user {user_id} " )
from tasks import send_email
send_email.delay(123 , "Welcome!" )
在 SkyWalking 中,你将看到独立于 HTTP 请求的任务追踪记录。
指标与告警 除了追踪,SkyWalking 还提供丰富的指标监控:
服务吞吐量(TPS)
平均响应时间
错误率
JVM / Python 内存 & GC(Java 支持更完善)
'当 UserService 响应时间 > 500ms 持续 5 分钟,则触发告警'
告警可通过 Webhook、Slack、钉钉等渠道通知。
配置文件示例(alarm-settings.yml):
rules:
- name: service_resp_time_rule
expression: service_resp_time > 500
duration: 5
message: "Service response time is too high"
故障排查技巧
1. 数据未上报?
OAP 服务是否运行?端口是否开放?
collector_address 配置是否正确?
防火墙是否阻止了 gRPC/HTTP 连接?
日志中是否有错误?启用 debug 模式:
import logging
logging.basicConfig(level=logging.DEBUG)
2. 调用链断裂?
确保上下游都正确注入/提取了上下文 Header。
检查中间件(如 Nginx、API Gateway)是否透传了 sw8 头。
在 Java 端确认是否启用了 @TraceCrossThread 注解(异步场景)。
3. UI 无数据显示?
确认服务名是否拼写一致。
检查时间范围选择是否正确。
查看 OAP 日志是否有数据写入异常。
生产环境最佳实践
1. 合理命名服务与端点 避免使用默认名如 python-service,应体现业务含义:
service_name = "order-management-api"
@app.route('/v1/orders/<int:order_id>' )
def get_order (order_id ):
...
span.operation_name = "OrderService.GetOrder"
2. 使用环境变量配置 import os
config.init(
collector_address=os.getenv('SW_AGENT_COLLECTOR_BACKEND_SERVICES' , '127.0.0.1:11800' ),
service_name=os.getenv('SW_AGENT_NAME' , 'default-service' ),
sample_rate=float (os.getenv('SW_AGENT_SAMPLE_RATE' , '1.0' ))
)
3. 优雅关闭 Agent 在应用退出时,确保 Agent 正常关闭以完成数据上报:
import atexit
from skywalking import agent
atexit.register(agent.stop)
4. 监控 Agent 自身状态 SkyWalking 提供 /internal/stats 接口(Java Agent),Python 可通过日志监控缓冲区积压情况。
扩展功能:日志关联 SkyWalking 支持将 Trace ID 注入日志,便于在 ELK 或 Loki 中关联查询。
Python 中可使用 logging.Filter:
import logging
from skywalking.trace.context import get_context
class SkyWalkingFilter (logging.Filter):
def filter (self, record ):
context = get_context()
if context.segment:
record.trace_id = context.segment.trace_id
else :
record.trace_id = "N/A"
return True
logger = logging.getLogger(__name__)
logger.addFilter(SkyWalkingFilter())
handler.setFormatter(logging.Formatter('[%(asctime)s] [%(trace_id)s] %(levelname)s - %(message)s' ))
这样,每条日志都会带上 Trace ID,实现'从日志跳转到调用链'。
高级特性:动态采样与条件追踪 在高流量场景下,全量采集成本过高。SkyWalking 支持动态采样策略:
from skywalking import config
config.sample_rate = 0.1
def should_sample (span ):
if span.operation_name.startswith("Payment" ):
return True
return False
config.sampling_policy = should_sample
if request.headers.get('X-SkyWalking-Sampled' ) == 'true' :
context.force_sampling()
与 OpenTelemetry 的关系 OpenTelemetry(OTel)是 CNCF 主导的新一代可观测性标准,旨在统一 Metrics、Logs、Traces。SkyWalking 已支持 OTel 协议:
可接收 OTel 格式的数据(通过 OAP 的 OTLP Receiver)
skywalking-python 未来可能兼容 OTel API
新项目可优先考虑 OTel
现有 SkyWalking 用户无需迁移,两者可共存
示例四:集成测试中的追踪验证 在单元测试或集成测试中,你可能希望验证追踪行为是否符合预期:
import unittest
from skywalking.trace.context import get_context
from skywalking.trace.carrier import Carrier
class TestTracing (unittest.TestCase):
def test_span_creation (self ):
context = get_context()
with context.new_local_span(op="TestSpan" ) as span:
span.tag(Tag(key="test.key" , val="test.value" ))
self .assertIsNotNone(span.span_id)
self .assertEqual(span.operation_name, "TestSpan" )
def test_context_propagation (self ):
context = get_context()
carrier = Carrier()
context.inject(carrier)
self .assertIn("sw8" , carrier.items)
你还可以模拟 OAP 上报,验证数据结构是否正确。
多数据中心部署 在跨地域部署时,建议每个区域部署独立的 OAP 集群,通过 cluster 配置实现数据聚合:
cluster:
selector: ${SW_CLUSTER:zookeeper}
zookeeper:
namespace: ${SW_NAMESPACE:""}
hostPort: ${SW_CLUSTER_ZK_HOST_PORT:localhost:2181}
Python Agent 无需特殊配置,只需指向本地 OAP 即可。
小贴士:提升可观测性体验
善用 Tag :为 Span 添加业务语义标签,如 user_id, order_status。
标准化操作名 :使用 Domain.Action.Resource 格式,如 Order.Create, Payment.Refund。
关联错误信息 :捕获异常时记录到 Span:
try :
risky_operation()
except Exception as e:
span.error_occurred = True
span.log(e)
设置超时告警 :对关键路径设置 SLA,超时自动告警。
总结 通过 skywalking-python,我们可以轻松为 Python 应用添加分布式追踪能力,无论你是使用 Flask、Django、FastAPI,还是 Celery、aiohttp,都能找到合适的集成方式。配合 Java 服务,构建完整的跨语言调用链,实现真正的端到端可观测性。
SkyWalking 不仅仅是一个'监控工具',更是系统稳定性工程的重要组成部分。它帮助我们在故障发生前预警,在故障发生时快速定位,在复盘时提供数据支撑。
相关免费在线工具 Keycode 信息 查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
Escape 与 Native 编解码 JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
JavaScript / HTML 格式化 使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
JavaScript 压缩与混淆 Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
curl 转代码 解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online