SkyWalking - Python 应用追踪:基于 skywalking-python 的埋点

SkyWalking - Python 应用追踪:基于 skywalking-python 的埋点
在这里插入图片描述
👋 大家好,欢迎来到我的技术博客!
📚 在这里,我会分享学习笔记、实战经验与技术思考,力求用简单的方式讲清楚复杂的问题。
🎯 本文将围绕SkyWalking这个话题展开,希望能为你带来一些启发或实用的参考。
🌱 无论你是刚入门的新手,还是正在进阶的开发者,希望你都能有所收获!

文章目录

🐍 SkyWalking - Python 应用追踪:基于 skywalking-python 的埋点

在当今微服务架构盛行的时代,分布式系统的可观测性(Observability)已成为保障系统稳定性和性能优化的关键能力。Apache SkyWalking 作为一款开源的 APM(Application Performance Monitoring)系统,以其强大的分布式追踪、服务拓扑图、指标分析和告警能力,被广泛应用于生产环境。虽然 SkyWalking 最初以 Java 生态为主力支持对象,但随着社区的发展,Python 应用也可以通过 skywalking-python 实现无侵入或轻量级埋点,从而接入完整的观测体系。

本文将深入探讨如何在 Python 应用中集成 SkyWalking,并结合 Java 示例进行对比,帮助开发者理解跨语言追踪的核心机制。同时,我们将使用 Mermaid 图表展示调用链路结构,并提供多个可访问的官方文档链接供读者延伸学习。


🧭 什么是 SkyWalking?

Apache SkyWalking 是一个观测性平台,用于监控、追踪、诊断和可视化分布式系统的性能问题。它支持自动探针(Agent)和手动埋点(Manual Instrumentation),覆盖多种语言和框架:

  • ✅ Java
  • ✅ Python
  • ✅ Node.js
  • ✅ Go
  • ✅ .NET
  • ✅ PHP(实验性)
  • ✅ Rust(实验性)

SkyWalking 的核心组件包括:

  1. OAP Server:后端处理与存储。
  2. UI:可视化仪表盘。
  3. Agent / SDK:嵌入应用采集数据。
官方网站:https://skywalking.apache.org/
文档中心:https://skywalking.apache.org/docs/

🐍 Python 埋点基础:skywalking-python

skywalking-python 是 SkyWalking 官方提供的 Python 探针库,支持自动和手动两种方式采集追踪数据。其主要功能包括:

  • 自动追踪 HTTP 请求(如 Flask、Django、FastAPI)
  • 手动创建 Span(用于自定义业务逻辑)
  • 上下文传播(Context Propagation)
  • 支持 gRPC 和 HTTP 协议上报

🔧 安装与配置

首先安装 skywalking-python

pip install apache-skywalking 

然后,在你的 Python 应用入口处初始化 SkyWalking Agent:

from skywalking import agent, config config.init( collector_address='127.0.0.1:11800',# OAP 服务器地址 service_name='my-python-service', protocol='grpc'# 或 'http') agent.start()

⚠️ 注意:必须在导入其他业务模块之前初始化 agent,否则无法正确拦截框架请求。


📡 示例一:Flask 应用自动追踪

我们先从最简单的 Flask 应用开始,展示自动埋点的能力:

# app.pyfrom flask import Flask from skywalking import agent, config # 初始化 SkyWalking config.init(collector_address='127.0.0.1:11800', service_name='flask-demo', protocol='grpc') agent.start() app = Flask(__name__)@app.route('/')defhello():return"Hello, SkyWalking!"@app.route('/user/<int:user_id>')defget_user(user_id):# 模拟数据库查询import time time.sleep(0.1)returnf"User {user_id} data"if __name__ =='__main__': app.run(host='0.0.0.0', port=5000)

启动该服务后,访问 http://localhost:5000/user/123,你将在 SkyWalking UI 中看到如下信息:

  • 服务名:flask-demo
  • Endpoint:GET /user/<int:user_id>
  • 响应时间:约 100ms
  • 调用链路自动生成

🧵 示例二:手动埋点 —— 自定义 Span

有时我们需要追踪某个特定函数或外部调用(如 Redis、MySQL、第三方 API),这时就需要手动创建 Span:

from skywalking import Component from skywalking.trace.context import get_context from skywalking.trace.tags import Tag defcall_external_api(user_id): context = get_context()with context.new_exit_span(op="ExternalAPI/call", peer="api.example.com", component=Component.Unknown)as span: span.tag(Tag(key="user.id", val=str(user_id)))# 模拟网络延迟import time time.sleep(0.2)return{"status":"success","data":f"User {user_id}"}@app.route('/fetch/<int:user_id>')deffetch_user(user_id): result = call_external_api(user_id)return result 

在这个例子中:

  • new_exit_span 表示这是一个“出口”Span,通常用于外部服务调用。
  • peer 字段标识目标服务地址。
  • tag 用于附加业务信息,便于后续筛选和分析。

🔗 跨服务追踪:Python 与 Java 交互

在真实场景中,Python 服务往往与 Java 服务协同工作。SkyWalking 支持跨语言上下文传播,确保整个调用链完整。

假设我们有一个 Java Spring Boot 服务,提供用户信息服务:

// UserController.java@RestController@RequestMapping("/api/user")publicclassUserController{@AutowiredprivateUserService userService;@GetMapping("/{id}")publicResponseEntity<User>getUser(@PathVariableLong id){User user = userService.findById(id);returnResponseEntity.ok(user);}}

对应的 Python 服务调用该接口:

import requests from skywalking import agent, config from skywalking.trace.context import get_context from skywalking.trace.carrier import Carrier config.init(collector_address='127.0.0.1:11800', service_name='python-client', protocol='grpc') agent.start()defcall_java_service(user_id): context = get_context() carrier = Carrier()with context.new_exit_span(op="HTTP/GET", peer="localhost:8080", component=Component.HttpClient)as span: span.tag(TagHttpMethod("GET")) span.tag(TagHttpURL(f"http://localhost:8080/api/user/{user_id}"))# 注入上下文到 HTTP Header context.inject(carrier) headers =dict(carrier) response = requests.get(f"http://localhost:8080/api/user/{user_id}", headers=headers)return response.json()@app.route('/proxy/user/<int:user_id>')defproxy_user(user_id): data = call_java_service(user_id)return data 

在 Java 端,Spring Cloud Sleuth + SkyWalking Agent 会自动提取 Header 中的 Trace 上下文,从而实现无缝衔接。


🔄 上下文传播机制详解

SkyWalking 使用类似 W3C Trace Context 的机制进行跨服务传播。关键 Header 包括:

  • sw8: SkyWalking 自定义协议头(Base64 编码)
  • traceparent: W3C 标准头(可选)

在手动埋点时,我们通过 Carrier 对象封装这些 Header:

carrier = Carrier() context.inject(carrier)# 将当前上下文注入到 carrier headers =dict(carrier)# 转为字典,用于 HTTP 请求

接收方(如 Java 服务)则通过以下方式提取:

@GetMapping("/trace")publicStringtrace(HttpServletRequest request){String sw8 = request.getHeader("sw8");// SkyWalking Agent 会自动解析并恢复上下文return"Traced!";}

这种机制确保了即使跨越不同语言、不同框架,调用链依然连贯。


📊 数据上报协议:gRPC vs HTTP

SkyWalking 支持两种上报协议:

协议优点缺点
gRPC高效、低延迟、支持流式传输需要额外依赖,防火墙可能限制
HTTP兼容性好,调试方便性能略低,不支持双向流

在 Python 中切换协议只需修改 protocol 参数:

config.init( collector_address='127.0.0.1:12800',# HTTP Collector 默认端口 service_name='my-service', protocol='http')
更多协议说明请参考:https://skywalking.apache.org/docs/main/latest/en/setup/backend/backend-setup/

🎯 性能影响评估

引入 SkyWalking 是否会影响应用性能?答案是:极小影响

  • 自动埋点仅在首次加载类时进行字节码增强(Java)或装饰器注入(Python)。
  • 数据上报异步进行,不影响主流程。
  • 可配置采样率,默认 100%,生产环境可调整为 10%~30% 以降低开销。

Python 中设置采样率:

config.init( collector_address='127.0.0.1:11800', service_name='sampled-service', sample_rate=0.3# 30% 采样)

🧩 插件生态与框架支持

skywalking-python 内置支持主流框架:

  • ✅ Flask
  • ✅ Django
  • ✅ FastAPI
  • ✅ Tornado
  • ✅ aiohttp
  • ✅ urllib3 / requests
  • ✅ Redis / MySQL / PostgreSQL(需手动启用)

启用插件示例:

import skywalking.plugin.install skywalking.plugin.install()# 自动安装所有已知插件

你也可以按需启用:

from skywalking.plugin import plugin_flask, plugin_requests plugin_flask.install() plugin_requests.install()

🧭 分布式追踪原理图解

下面用 Mermaid 展示一个典型的跨服务调用链:

🗄️ Database🛒 Order Service (Python)👤 User Service (Java)⚙️ API Gateway (Python)🖥️ Client🗄️ Database🛒 Order Service (Python)👤 User Service (Java)⚙️ API Gateway (Python)🖥️ ClientGET /user/123/ordersHTTP GET /api/user/123{ "name": "Alice" }HTTP GET /orders?user_id=123SELECT * FROM orders WHERE user_id=123[{order_id: 1}, ...][{order_id: 1}, ...]{ "user": "...", "orders": [...] }

在这个调用链中:

  • 每个服务节点都会生成自己的 Span。
  • 上下文通过 Header 传递,确保 TraceID 一致。
  • 最终在 SkyWalking UI 中形成完整的火焰图(Flame Graph)。

🧪 示例三:异步任务追踪(Celery)

对于后台任务系统如 Celery,我们同样可以追踪:

from celery import Celery from skywalking import agent, config config.init(collector_address='127.0.0.1:11800', service_name='celery-worker', protocol='grpc') agent.start() app = Celery('tasks', broker='redis://localhost:6379')@app.taskdefsend_email(user_id, content): context = get_context()with context.new_local_span(op="Task/send_email")as span: span.tag(Tag(key="user.id", val=str(user_id)))# 模拟发送耗时import time time.sleep(1)print(f"Email sent to user {user_id}")

调用该任务:

from tasks import send_email send_email.delay(123,"Welcome!")

在 SkyWalking 中,你将看到独立于 HTTP 请求的任务追踪记录。


📈 指标与告警

除了追踪,SkyWalking 还提供丰富的指标监控:

  • 服务吞吐量(TPS)
  • 平均响应时间
  • 错误率
  • JVM / Python 内存 & GC(Java 支持更完善)

你可以配置告警规则,例如:

“当 UserService 响应时间 > 500ms 持续 5 分钟,则触发告警”

告警可通过 Webhook、Slack、钉钉等渠道通知。

配置文件示例(alarm-settings.yml):

rules:-name: service_resp_time_rule expression: service_resp_time > 500 duration:5message:"Service response time is too high"
告警规则文档:https://skywalking.apache.org/docs/main/latest/en/setup/backend/backend-alarm/

🛠️ 故障排查技巧

1. 数据未上报?

检查以下几点:

  • OAP 服务是否运行?端口是否开放?
  • collector_address 配置是否正确?
  • 防火墙是否阻止了 gRPC/HTTP 连接?
  • 日志中是否有错误?启用 debug 模式:
import logging logging.basicConfig(level=logging.DEBUG)

2. 调用链断裂?

  • 确保上下游都正确注入/提取了上下文 Header。
  • 检查中间件(如 Nginx、API Gateway)是否透传了 sw8 头。
  • 在 Java 端确认是否启用了 @TraceCrossThread 注解(异步场景)。

3. UI 无数据显示?

  • 确认服务名是否拼写一致。
  • 检查时间范围选择是否正确。
  • 查看 OAP 日志是否有数据写入异常。

🌐 生产环境最佳实践

✅ 1. 合理命名服务与端点

避免使用默认名如 python-service,应体现业务含义:

service_name ="order-management-api"

Endpoint 命名建议:

@app.route('/v1/orders/<int:order_id>')defget_order(order_id):...# 在 Span 中显式设置操作名 span.operation_name ="OrderService.GetOrder"

✅ 2. 使用环境变量配置

不要硬编码配置,推荐使用环境变量:

import os config.init( collector_address=os.getenv('SW_AGENT_COLLECTOR_BACKEND_SERVICES','127.0.0.1:11800'), service_name=os.getenv('SW_AGENT_NAME','default-service'), sample_rate=float(os.getenv('SW_AGENT_SAMPLE_RATE','1.0')))

✅ 3. 优雅关闭 Agent

在应用退出时,确保 Agent 正常关闭以完成数据上报:

import atexit from skywalking import agent atexit.register(agent.stop)

✅ 4. 监控 Agent 自身状态

SkyWalking 提供 /internal/stats 接口(Java Agent),Python 可通过日志监控缓冲区积压情况。


🧩 扩展功能:日志关联

SkyWalking 支持将 Trace ID 注入日志,便于在 ELK 或 Loki 中关联查询。

Python 中可使用 logging.Filter

import logging from skywalking.trace.context import get_context classSkyWalkingFilter(logging.Filter):deffilter(self, record): context = get_context()if context.segment: record.trace_id = context.segment.trace_id else: record.trace_id ="N/A"returnTrue logger = logging.getLogger(__name__) logger.addFilter(SkyWalkingFilter())

然后在日志格式中加入 %(trace_id)s

handler.setFormatter(logging.Formatter('[%(asctime)s] [%(trace_id)s] %(levelname)s - %(message)s'))

这样,每条日志都会带上 Trace ID,实现“从日志跳转到调用链”。


🧠 高级特性:动态采样与条件追踪

在高流量场景下,全量采集成本过高。SkyWalking 支持动态采样策略:

from skywalking import config config.sample_rate =0.1# 10%# 或者根据条件采样defshould_sample(span):if span.operation_name.startswith("Payment"):returnTrue# 支付相关接口 100% 采样returnFalse config.sampling_policy = should_sample 

此外,还可以基于 Header 强制采样:

# 如果请求头包含 X-SkyWalking-Sampled: true,则强制采样if request.headers.get('X-SkyWalking-Sampled')=='true': context.force_sampling()

📦 与 OpenTelemetry 的关系

OpenTelemetry(OTel)是 CNCF 主导的新一代可观测性标准,旨在统一 Metrics、Logs、Traces。SkyWalking 已支持 OTel 协议:

  • 可接收 OTel 格式的数据(通过 OAP 的 OTLP Receiver)
  • skywalking-python 未来可能兼容 OTel API

目前建议:

  • 新项目可优先考虑 OTel
  • 现有 SkyWalking 用户无需迁移,两者可共存
OTel 官网:https://opentelemetry.io/
SkyWalking OTel 支持:https://skywalking.apache.org/docs/main/latest/en/protocols/otlp/

🧪 示例四:集成测试中的追踪验证

在单元测试或集成测试中,你可能希望验证追踪行为是否符合预期:

import unittest from skywalking.trace.context import get_context classTestTracing(unittest.TestCase):deftest_span_creation(self): context = get_context()with context.new_local_span(op="TestSpan")as span: span.tag(Tag(key="test.key", val="test.value")) self.assertIsNotNone(span.span_id) self.assertEqual(span.operation_name,"TestSpan")deftest_context_propagation(self): context = get_context() carrier = Carrier() context.inject(carrier) self.assertIn("sw8", carrier.items)

你还可以模拟 OAP 上报,验证数据结构是否正确。


🌍 多数据中心部署

在跨地域部署时,建议每个区域部署独立的 OAP 集群,通过 cluster 配置实现数据聚合:

# oap-server.yamlcluster:selector: ${SW_CLUSTER:zookeeper}zookeeper:namespace: ${SW_NAMESPACE:""}hostPort: ${SW_CLUSTER_ZK_HOST_PORT:localhost:2181}

Python Agent 无需特殊配置,只需指向本地 OAP 即可。


💡 小贴士:提升可观测性体验

  1. 善用 Tag:为 Span 添加业务语义标签,如 user_id, order_status
  2. 标准化操作名:使用 Domain.Action.Resource 格式,如 Order.Create, Payment.Refund
  3. 关联错误信息:捕获异常时记录到 Span:
try: risky_operation()except Exception as e: span.error_occurred =True span.log(e)
  1. 设置超时告警:对关键路径设置 SLA,超时自动告警。

🏁 总结

通过 skywalking-python,我们可以轻松为 Python 应用添加分布式追踪能力,无论你是使用 Flask、Django、FastAPI,还是 Celery、aiohttp,都能找到合适的集成方式。配合 Java 服务,构建完整的跨语言调用链,实现真正的端到端可观测性。

SkyWalking 不仅仅是一个“监控工具”,更是系统稳定性工程的重要组成部分。它帮助我们在故障发生前预警,在故障发生时快速定位,在复盘时提供数据支撑。


📚 延伸阅读


🚀 动手实践建议

  1. 在本地部署 SkyWalking OAP + UI(Docker 一键启动)
  2. 创建一个 Flask + Spring Boot 的简单 Demo
  3. 观察调用链、响应时间、错误率等指标
  4. 尝试配置告警规则
  5. 将 Trace ID 注入日志,体验关联查询

可观测性不是“锦上添花”,而是现代软件系统的“生命线”。从今天开始,为你的 Python 应用装上 SkyWalking 的眼睛吧!👁️‍🗨️


🌟 提示:技术世界没有银弹,但有不断进化的工具。SkyWalking 是你构建稳定系统的得力助手,而非替代品。真正的稳定性,源于良好的架构设计、完善的测试体系、以及持续的监控优化。

Happy Tracing! 🎉


🙌 感谢你读到这里!
🔍 技术之路没有捷径,但每一次阅读、思考和实践,都在悄悄拉近你与目标的距离。
💡 如果本文对你有帮助,不妨 👍 点赞、📌 收藏、📤 分享 给更多需要的朋友!
💬 欢迎在评论区留下你的想法、疑问或建议,我会一一回复,我们一起交流、共同成长 🌿
🔔 关注我,不错过下一篇干货!我们下期再见!✨

Read more

[玩转树莓派CM0 之 BLE] Python 与 BLE 设备通信指南 -- P1

前言 树莓派 CM0 有带无线功能与不带无线功能两个版本, 具体可以参考如下型号说明: 型号前缀无线RAM LPDDR2eMMC存储CM0000000CM00 = No00 = 512MB000 = 0GB (Lite)CM0000008CM00 = No00 = 512MB008 = 8GBCM0000016CM00 = No00 = 512MB016 = 16GBCM0100000CM01 = Yes00 = 512MB000 = 0GB (Lite)CM0100008CM01 = Yes00 = 512MB008 = 8GBCM0100016CM01 = Yes00 = 512MB016 = 16GB 或者查看你的板子中是否带有无线模块 如果你的 CM0 具备无线功能, 则可以继续按照本教程的后续步骤进行测试. 需要知道的基础知识 在BLE通信中, 设备工作于主从模式: * 主设备可主动扫描并连接周围从设备的广播信号. * 从设备则通过广播自身信号, 被动等待主设备发起连接. 用一个简单的比喻, 这就像美食街里有许多商贩在叫卖 (从设备广播) , 而顾

By Ne0inhk
python-django-flask的在线食品安全信息平台

python-django-flask的在线食品安全信息平台

目录 * 技术架构设计 * 核心功能实现 * 安全与性能 * 部署与扩展 * 代码示例(Flask) * 测试与优化 * 开发技术路线 * 结论 * 源码lw获取/同行可拿货,招校园代理 :文章底部获取博主联系方式! 技术架构设计 后端框架选择:对比Django与Flask的适用性,Django适合全栈开发(内置ORM、Admin),Flask更轻量适合微服务。 数据库设计:使用PostgreSQL或MySQL存储食品检测数据,设计多表关联(食品分类、检测报告、企业信息)。 RESTful API:Flask-RESTful或Django REST framework构建接口,支持数据增删改查和JSON响应。 核心功能实现 数据爬取与清洗:Scrapy或BeautifulSoup爬取政府公开数据(如FDA数据库),Pandas清洗异常值。 关键词检索优化:Elasticsearch实现食品名称、企业名称的高效搜索,支持模糊匹配。 可视化图表:集成Matplotlib或ECharts,生成污染物含量趋势图、地域分布热力

By Ne0inhk
【开源工具】深度解析:Python+PyQt5打造微信多开神器 - 原理剖析与完整实现

【开源工具】深度解析:Python+PyQt5打造微信多开神器 - 原理剖析与完整实现

🚀【开源工具】深度解析:Python+PyQt5打造微信多开神器 - 原理剖析与完整实现 🌈 个人主页:创客白泽 - ZEEKLOG博客 🔥 系列专栏:🐍《Python开源项目实战》 💡 热爱不止于代码,热情源自每一个灵感闪现的夜晚。愿以开源之火,点亮前行之路。 👍 如果觉得这篇文章有帮助,欢迎您一键三连,分享给更多人哦 📖 前言 微信作为国民级IM工具,但官方始终未提供多开功能。本文将深入讲解如何利用Python+PyQt5开发跨平台微信多开助手,突破官方限制。不同于网上简单的多开脚本,本项目实现了: * 自动化路径探测 * 可视化操作界面 * 多模式多开机制 * 完整的异常处理体系 🎯 一、功能全景 1.1 核心功能矩阵 功能模块技术实现亮点智能路径探测注册表查询+全盘扫描支持99%的安装场景可视化交互PyQt5自定义UI组件媲美原生应用的体验多开引擎子进程管理+沙盒隔离支持三种多开模式配置持久化QSettings序列化自动记忆用户偏好 1.2 技术栈深度 图形界面PyQt5多线程搜索跨进程通信注册表操作子进程管理

By Ne0inhk
【Python基础:语法第一课】Python 基础语法详解:变量、类型、动态特性与运算符实战,构建完整的编程基础认知体系

【Python基础:语法第一课】Python 基础语法详解:变量、类型、动态特性与运算符实战,构建完整的编程基础认知体系

🎬 个人主页:艾莉丝努力练剑 ❄专栏传送门:《C语言》《数据结构与算法》《C/C++干货分享&学习过程记录》 《Linux操作系统编程详解》《笔试/面试常见算法:从基础到进阶》《Python干货分享》 ⭐️为天地立心,为生民立命,为往圣继绝学,为万世开太平 🎬 艾莉丝的简介: 文章目录 * 1 ~> 常量和表达式 * 2 ~> 变量和类型 * 2.1 变量是什么 * 2.2 变量的语法 * 2.2.1 定义变量 * 2.2.2 使用变量 * 2.3 变量的类型:对于不同种类的变量作出区分 * 2.3.1 整数 * 2.

By Ne0inhk