1. 需求及业务场景介绍
1.1. 业务背景
在新能源电力系统中,需对多达100个分布式光伏站点的实时气象与负荷数据进行采集、聚合,并执行短期功率预测。系统部署于单台服务器(如边缘网关或本地工作站),资源有限但要求高吞吐与低延迟。
1.2. 核心需求
- 支持 100 个站点 并发数据生成与处理;
- 单机环境下避免线程/进程爆炸,控制内存与 CPU 开销;
- 数据采集频率可配置(默认每10秒一批),适应不同业务节奏;
- 预测任务虽为 CPU 密集型,但总量可控,适合少量工作进程并行;
- 系统长期运行稳定,具备自动恢复与日志追溯能力。
1.3. 业务流程图

图注说明:异步采集:使用 asyncio 并发采集多个站点数据;无锁汇聚:通过内存字典实现轻量级聚合,超时自动清理;并行计算:利用 multiprocessing 分布计算负载,提升吞吐;跨层级通信:协程间用 asyncio.Queue,协程与进程间用 MPQueue。
1.4. 部署环境约束
| 项目 | 说明 |
|---|
| 部署模式 | 单机部署(无集群) |
| 站点规模 | 最多 100 个站点 |
| 计算资源 | 通常为 4–16 核 CPU,8–32 GB 内存 |
| 网络 | 局域网模拟 I/O、互联网 |
| 持久化 | 本地磁盘写入结果文件 |
✅ 本框架专为该类轻量级、高并发、单机场景优化:
利用 asyncio 处理 100 路 I/O 并发,仅启动少量预测进程(如 2–4 个)执行计算,避免资源争抢。
2. 异步技术特点及优点
2.1. 异步相对同步方案对比
| 对比维度 | 同步方案(Blocking I/O) | 异步方案(Async/Await) |
|---|
| I/O 效率 | 单线程阻塞等待,资源利用率低 | 多路复用事件循环,I/O 等待期间可执行其他任务 |
| 并发能力 | 需多线程/进程,开销大 | 单线程内并发,轻量高效 |
| 延迟敏感度 | 易受慢速I/O影响,整体响应变慢 | 快速响应,适合高并发低延迟场景 |
| 内存占用 | 每个请求占一个线程栈,内存成本高 | 共享事件循环,内存开销极小 |
| 可扩展性 | 扩展受限于线程池大小 | 易扩展至数千并发连接 |
2.2. 单机环境下的异步优势
- 避免“100线程”陷阱:若使用同步+多线程方案,100个站点可能创建100个线程,导致上下文切换开销剧增、内存暴涨;
- CPU 利用率更合理:异步 I/O 在等待期间释放 CPU,让出资源给预测进程;
- 响应更快:即使某站点数据延迟,不影响其他站点处理;
- 易于调优:只需调整
NUM_PREDICT_WORKERS(建议 = CPU 核数或略少),即可平衡 I/O 与计算负载。
📌 经验建议:在 8 核机器上,NUM_PREDICT_WORKERS=2~4 通常为最优配置,既利用多核,又留出资源给事件循环。
2.3. 异步优势总结
- ✅ 高吞吐:单线程处理多个站点数据采集;
- ✅ 低延迟:非阻塞等待,及时触发后续逻辑;
- ✅ 资源节约:无需创建大量线程或进程;
- ✅ 可维护性好:代码结构清晰,便于调试和监控。
3. 异步编程关键技术介绍
3.1 asyncio 框架
Python 标准库中的异步 I/O 框架,核心组件包括:
async/await:定义协程,实现非阻塞调用;asyncio.create_task():将协程放入事件循环调度;asyncio.gather():并行执行多个协程,等待全部完成;asyncio.Queue:线程安全的异步队列,用于协程间通信。
3.2 事件循环(Event Loop)
- 是异步程序的核心调度器;
- 负责监听 I/O 事件、定时器、任务就绪等;
- 在本项目中由
asyncio.run() 启动并管理整个生命周期。
3.3 协程(Coroutine)
- 使用
async def 定义的函数; - 可被挂起和恢复,不消耗线程资源;
- 示例:
generate_weather() 和 generate_load() 是独立协程,可通过 gather 并行执行。
3.4 异步队列(asyncio.Queue)
- 专为异步环境设计的生产者-消费者模型;
- 支持
put(), get(), get_nowait() 等方法; - 用于解耦数据生成与汇聚模块,实现松耦合架构。
3.5. 关键机制保障单机稳定性
- 队列限流:
asyncio.Queue(maxsize=1000) 防止突发流量打爆内存; - 超时清理:
AGGREGATE_TIMEOUT=30 秒自动丢弃未配对数据,避免 pending 字典无限增长; - 背压控制:数据生成间隔
DATA_GEN_INTERVAL=10 秒(可调),天然形成节奏控制; - 无共享状态:每个协程/进程操作独立数据,无需锁,降低复杂度。
4. 框架模拟程序设计说明
4.1 架构概览
本系统采用 “异步 + 多进程”混合架构,分为三个层次:
层次一:异步数据采集层(asyncio)
- 使用
AsyncDataGenerator 类并发生成各站点天气与负荷数据; - 利用
asyncio.gather() 实现批量并行生成; - 数据写入两个异步队列:
weather_queue 和 load_queue。
层次二:异步数据汇聚层(AsyncAggregator)
- 从两个队列读取数据,以
(batch_ts, site_id) 为键进行聚合; - 当某站点的天气和负荷数据都到达后,立即打包成任务投递至预测队列;
- 内置超时清理机制,防止因网络抖动造成内存泄漏;
- 使用
loop.run_in_executor() 将消息发送给进程队列,确保线程安全。
层次三:并行计算层(multiprocessing)
- 创建多个
Process 实例作为预测工作进程; - 使用
MPQueue 实现跨进程通信,传递预测任务; - 每个工作进程独立执行计算逻辑,模拟真实预测模型耗时;
- 结果以 JSONL 格式追加写入站点专属文件,支持流式输出与后续分析。
4.2 关键设计亮点
| 设计点 | 说明 |
|---|
| 无锁聚合 | 不使用锁,仅依赖字典和超时清理机制,性能高且避免死锁 |
| 混合通信机制 | 协程间用 asyncio.Queue,协程与进程间用 MPQueue,兼顾效率与隔离性 |
| 优雅退出机制 | 主控捕获 KeyboardInterrupt,发送终止信号,超时强制 kill |
| 可配置化参数 | 所有参数集中于 Config 类,便于调试与部署 |
| 日志分离 | 主进程与工作进程分别记录日志,便于排查问题 |
| 结果持久化 | 预测结果按站点分文件存储,格式为 JSONL,兼容大数据处理工具 |
4.3 架构适配单机 100 站点
性能设计要点
| 组件 | 单机优化策略 |
|---|
| 数据生成 | 使用 asyncio.gather 并行发起 100 个协程,实际由单线程事件循环调度,内存占用 ≈ O(1) |
| 数据汇聚 | 基于字典的无锁聚合,最大 pending 项 ≈ 100(每批),内存可控 |
| 预测计算 | 启动 NUM_PREDICT_WORKERS 个进程(默认 2),避免进程过多导致调度开销 |
| 结果写入 | 每个站点独立 .jsonl 文件,避免文件锁竞争,支持后续按站点分析 |
资源消耗估算(100 站点)
| 资源类型 | 估算值(稳态) |
|---|
| 内存 | < 200 MB(主要为 pending 缓存 + 队列) |
| CPU | 事件循环 ≈ 1 核,预测进程 ≈ N 核(N = 工作进程数) |
| 磁盘 I/O | 每 10 秒写入 100 条 JSONL 记录,极低负载 |
💡 实测建议:在普通台式机(6核16G)上可流畅运行 100 站点 × 10秒/批 的负载。
4.4 运行流程示意
data_generator()启动启动多个预测工作进程主协程启动生成数据写入 weather/load 队列aggregator.run()从队列读取聚合投递任务 -> MPQueue预测进程消费任务执行计算写入结果文件
4.5. 部署环境适配说明
为何适合单机?
- 零外部依赖:不依赖数据库、消息队列或分布式调度器;
- 自包含架构:所有组件(生成、汇聚、计算)在同一进程组内协调;
- 资源隔离清晰:I/O 与计算分离,互不干扰;
- 启动简单:
python main.py 即可运行,适合边缘设备或开发测试。
扩展性边界
- 上限:在单机上可扩展至 200–300 站点(取决于 I/O 模拟耗时与预测频率);
- 瓶颈:当预测任务极度密集(如每秒多批)时,应增加
NUM_PREDICT_WORKERS 或升级硬件; - 不适用场景:跨多机、TB 级数据、毫秒级超低延迟要求。
5. python源代码
""" 异步数据汇聚与并行计算框架 - 精简优化版 """import asyncio import time import logging import json import os import random import sys from typing import Dict, Any, Tuple from dataclasses import dataclass, asdict from datetime import datetime from multiprocessing import Process, Queue as MPQueue import multiprocessing # 配置类 - 集中管理全局参数@dataclassclassConfig: NUM_SITES:int=5# 站点数量 NUM_PREDICT_WORKERS:int=2 AGGREGATE_TIMEOUT:int=30 RESULT_DIR:str="results" DATA_GEN_INTERVAL:int=10# 调试用10秒def__post_init__(self): self.SITE_IDS =[f"site_{i:03d}"for i inrange(1, self.NUM_SITES +1)] config = Config()# 日志系统,主进程与工作进程分离日志,避免混杂;主日志含控制台+文件,工作进程仅文件defsetup_main_logger(): logger = logging.getLogger("Main") logger.setLevel(logging.INFO)ifnot logger.handlers: handler = logging.StreamHandler(sys.stdout) handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")) logger.addHandler(handler)# 文件日志 fh = logging.FileHandler("main.log", encoding='utf-8') fh.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")) logger.addHandler(fh) logger.propagate =Falsereturn logger defsetup_worker_logger(worker_id:int): logger = logging.getLogger(f"Worker-{worker_id}") logger.setLevel(logging.INFO)ifnot logger.handlers: fh = logging.FileHandler(f"worker_{worker_id}.log", encoding='utf-8') fh.setFormatter(logging.Formatter(f"[W{worker_id}] %(asctime)s [%(levelname)s] %(message)s")) logger.addHandler(fh) logger.propagate =Falsereturn logger logger = setup_main_logger()# 数据模型 - 使用dataclass实现不可变数据容器@dataclassclassWeatherData: site_id:str# 温度、辐照度字段带单位后缀提示 temperature:float# 单位:℃ irradiance:int# 单位:W/m² timestamp:float@classmethoddefgenerate(cls, site_id:str)->'WeatherData': random.seed(hash(site_id)%1000)return cls(site_id,round(20+ random.uniform(0,10),1),800+ random.randint(0,200), time.time())@dataclassclassLoadData: site_id:str power_kw:float timestamp:float@classmethoddefgenerate(cls, site_id:str)->'LoadData': random.seed(hash(site_id)%1000+1000)return cls(site_id,round(100+ random.uniform(0,50),1), time.time())# 异步数据生成器 - 核心调度逻辑classAsyncDataGenerator:def__init__(self, config: Config): self.config = config asyncdefgenerate_weather(self, site_id:str)-> WeatherData:await asyncio.sleep(1.0)return WeatherData.generate(site_id)asyncdefgenerate_load(self, site_id:str)-> LoadData:await asyncio.sleep(0.1)return LoadData.generate(site_id)asyncdefgenerate_batch(self)-> Tuple[int, Dict[str, WeatherData], Dict[str, LoadData]]:"""批量生成天气/负荷数据 通过asyncio.gather实现并行I/O模拟,返回按站点分组的字典结构""" batch_ts =int(time.time()) logger.info(f"开始生成批次 {batch_ts}...")try: weather_tasks =[self.generate_weather(sid)for sid in self.config.SITE_IDS] load_tasks =[self.generate_load(sid)for sid in self.config.SITE_IDS] weather_results =await asyncio.gather(*weather_tasks, return_exceptions=True) load_results =await asyncio.gather(*load_tasks, return_exceptions=True) weather_dict ={sid: res for sid, res inzip(self.config.SITE_IDS, weather_results)ifnotisinstance(res, Exception)} load_dict ={sid: res for sid, res inzip(self.config.SITE_IDS, load_results)ifnotisinstance(res, Exception)} logger.info(f"批次 {batch_ts} 生成完成")return batch_ts, weather_dict, load_dict except Exception as e: logger.error(f"生成批次失败: {e}")return batch_ts,{},{}# 异步汇聚器 - 无锁聚合设计classAsyncAggregator:def__init__(self, config: Config, predict_queue: MPQueue): self.config = config self.predict_queue = predict_queue self.pending ={} self.running =Trueasyncdefrun(self, weather_queue, load_queue): logger.info("汇聚器启动") asyncio.create_task(self._cleanup_loop())while self.running:await self._process_queue_once(weather_queue,'weather')await self._process_queue_once(load_queue,'load')await asyncio.sleep(0.01) logger.info("汇聚器停止")asyncdef_process_queue_once(self, queue, data_type:str):"""单次队列处理逻辑 采用(batch_ts, site_id)作为聚合键,超时自动清理机制保障内存安全""" loop = asyncio.get_running_loop()try: item =await asyncio.wait_for(loop.run_in_executor(None, queue.get_nowait), timeout=0.01) batch_ts, site_id, data = item key =(batch_ts, site_id)if key notin self.pending: self.pending[key]={'_ts': time.time(),'weather':None,'load':None} self.pending[key][data_type]= data if self.pending[key]['weather']and self.pending[key]['load']: task ={"batch_ts": batch_ts,"site_id": site_id,"weather": asdict(self.pending[key]['weather']),"load": asdict(self.pending[key]['load'])}await loop.run_in_executor(None,lambda: self.predict_queue.put(task)) logger.info(f"聚合完成: {batch_ts}/{site_id}")del self.pending[key]except Exception:pass# 包括 Empty 和 Timeoutasyncdef_cleanup_loop(self):"""定期清理超时 pending 项,防止内存泄漏"""while self.running:await asyncio.sleep(10) now = time.time() stale =[k for k, v in self.pending.items()if now - v['_ts']> self.config.AGGREGATE_TIMEOUT]for k in stale:del self.pending[k]if stale: logger.warning(f"清理 {len(stale)} 个超时项")defstop(self): self.running =False# 预测工作进程 - 独立进程执行 CPU 密集型并行计算,通过 MPQueue 接收任务,结果写入 JSONL 文件defprediction_worker(worker_id:int, config: Config, predict_queue: MPQueue):"""模拟3秒计算,输出带 worker_id 的预测结果,持久化到站点专属文件""" random.seed(time.time()+ worker_id) log = setup_worker_logger(worker_id) log.info(f"计算进程{worker_id}启动") count =0try:whileTrue:try: task = predict_queue.get(timeout=1)if task isNone:break time.sleep(3) result ={"batch_ts": task["batch_ts"],"site_id": task["site_id"],"forecast_power_kw":round(task["load"]["power_kw"]*(1+ random.uniform(-0.1,0.1)),2),"weather_temp": task["weather"]["temperature"],"compute_seconds":3.0,"worker_id": worker_id,"created_at": datetime.now().isoformat()}withopen(os.path.join(config.RESULT_DIR,f"{task['site_id']}.jsonl"),"a", encoding='utf-8')as f: f.write(json.dumps(result, ensure_ascii=False)+"\n") log.info(f"完成 {task['site_id']}") count +=1except Exception as e:if"timeout"notinstr(e).lower(): log.error(f"异常: {e}")finally: log.info(f"退出,共处理 {count} 项")# 主应用,异步数据汇聚与并行计算框架的主协调器,负责统筹数据生成、汇聚、预测计算的全流程classDataPipeline:def__init__(self, config: Config): self.config = config self.workers =[] self.weather_queue = asyncio.Queue(maxsize=1000) self.load_queue = asyncio.Queue(maxsize=1000) self.predict_queue = MPQueue(maxsize=1000) self.aggregator =None self.running =Truedefstart_workers(self):for i inrange(self.config.NUM_PREDICT_WORKERS): p = Process(target=prediction_worker, args=(i+1, self.config, self.predict_queue), daemon=True) p.start() self.workers.append(p) logger.info(f"启动 Worker-{i+1}, PID={p.pid}")asyncdefdata_generator(self): gen = AsyncDataGenerator(self.config)while self.running: batch_ts, weathers, loads =await gen.generate_batch()for sid in self.config.SITE_IDS:if sid in weathers:await self.weather_queue.put((batch_ts, sid, weathers[sid]))if sid in loads:await self.load_queue.put((batch_ts, sid, loads[sid]))await asyncio.sleep(self.config.DATA_GEN_INTERVAL)# 主应用 - 异步+多进程架构asyncdefrun(self):"""主协调器,启动数据生成协程、汇聚协程和预测工作进程,统一调度生命周期""" self.start_workers() self.aggregator = AsyncAggregator(self.config, self.predict_queue) tasks =[ asyncio.create_task(self.data_generator()), asyncio.create_task(self.aggregator.run(self.weather_queue, self.load_queue))]try:await asyncio.gather(*tasks)except asyncio.CancelledError:passfinally: self.running =Falseif self.aggregator: self.aggregator.stop()# 资源清理 - 优雅退出机制defcleanup(self):"""进程级资源回收,优雅退出:发送 None 终止信号,超时强制 kill 保障退出可靠性""" logger.info("清理资源...")# 终止工作进程for _ inrange(len(self.workers)):try: self.predict_queue.put_nowait(None)except:passfor p in self.workers: p.join(timeout=3)if p.is_alive(): p.terminate() p.join(2) logger.info("清理完成")# 入口函数,支持 Windows 多进程冻结环境,捕获 KeyboardInterrupt 实现平滑关闭defmain(): pipeline = DataPipeline(config)try: asyncio.run(pipeline.run())except KeyboardInterrupt: logger.info("收到中断信号")finally: pipeline.cleanup()if __name__ =="__main__":if sys.platform =="win32": multiprocessing.freeze_support() main()
6. 总结
单机场景价值重申
本框架在单台计算机上高效支撑 100 个站点的实时数据处理与预测任务,完美平衡了:
- 高并发 I/O(通过 asyncio);
- 适度并行计算(通过 multiprocessing);
- 资源节约与稳定性(通过限流、超时、优雅退出)。
推荐使用场景
- 边缘计算网关中的本地预测服务;
- 实验室/开发环境的仿真平台;
- 中小型园区微电网监控系统;
- 教学演示:展示异步+多进程混合编程范式。
后续优化建议(单机方向)
- 动态调节生成频率:根据系统负载自动调整
DATA_GEN_INTERVAL; - 内存监控告警:当 pending 项超过阈值时记录警告;
- 结果压缩归档:定期将
.jsonl 文件压缩,节省磁盘空间; - Web 控制面板:集成 FastAPI 提供状态查询与参数调整接口(仍保持单机部署)。
附录:关键类职责表
| 类名 | 职责描述 |
|---|
Config | 全局配置中心,统一管理参数 |
WeatherData / LoadData | 不可变数据模型,封装站点数据 |
AsyncDataGenerator | 异步生成原始数据,模拟 I/O 延迟 |
AsyncAggregator | 无锁聚合器,实现数据配对与任务分发 |
prediction_worker | 独立计算进程,执行预测逻辑并持久化结果 |
DataPipeline | 主协调器,统筹调度所有组件,负责生命周期管理 |