Flutter for OpenHarmony:queue 异步任务队列管理(并发控制与任务调度) 深度解析与鸿蒙适配指南
欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net

前言
在 Dart 中,异步操作(Future)通常是并发执行的。如果你在一个 for 循环里发起了 100 个网络请求:
for(var url in urls){fetch(url);// 瞬间发出100个请求}这会导致什么?
- 服务器爆炸:可能触发 API速率限制(429 Too Many Requests)。
- 客户端OOM:瞬间创建过多的 Socket 连接和 Buffer。
- UI 卡顿:大量的 Event Loop 任务阻塞。
我们需要一种机制来限制并发数,或者让任务串行执行。
queue package 就是这样一个轻量级的任务队列管理库。它允许你控制同时运行的 Future 数量。
对于 OpenHarmony 应用,特别是在处理文件批量上传、数据库批量写入或图片批量处理等场景,queue 是保护系统资源的防波堤。
一、核心功能
queue 的核心类是 Queue。它维护一个待执行的任务列表,并根据设定的 parallel(并发数)来调度执行。
- Serial Queue (parallel: 1): 严格串行,一个接一个。
- Concurrent Queue (parallel: N): 最多同时跑 N 个。
- Task Management: 添加任务,取消剩余任务(部分支持)。
槽位 1
槽位 2
等待
完成
调度下一个
多个任务 1..100
Queue
运行中
运行中
挂起任务
二、集成与用法详解
2.1 添加依赖
dependencies:queue: ^3.4.0 2.2 基础用法
import'package:queue/queue.dart';voidmain()async{// 1. 创建队列,限制并行数为 2final queue =Queue(parallel:2);// 2. 添加任务for(int i =0; i <5; i++){ queue.add(()async{print('开始任务 $i');awaitFuture.delayed(Duration(seconds:1));print('结束任务 $i');});}// 3. 等待所有任务完成await queue.onComplete;print('全部完成!');}输出会是:同时有两个 Start,1秒后两个 End,然后接下两个…

2.3 获取返回值
queue.add 会返回一个 Future,你可以等待单个任务的结果。
final resultFuture = queue.add(()async{returnawait http.get(...);});print(await resultFuture);
三、OpenHarmony 适配与实战:批量文件上传
在鸿蒙 App 中,用户选择了 50 张照片要上传到服务器。如果我们并发 50 个上传任务,可能会因为内存占用过高被系统杀掉,或者导致网络拥塞传输极慢。
3.1 实现并发控制上传器
import'package:queue/queue.dart';classUploadManager{// 限制同时上传 3 张图片final _uploadQueue =Queue(parallel:3);Future<void>uploadImages(List<String> filePaths)async{final futures =<Future>[];for(final path in filePaths){// 将任务加入队列final future = _uploadQueue.add(()=>_doUpload(path)); futures.add(future);}// 等待全部完成awaitFuture.wait(futures);}Future<String>_doUpload(String path)async{print('正在上传 $path...');// 模拟耗时awaitFuture.delayed(Duration(seconds:2));return'url_for_$path';}voidcancelAll(){ _uploadQueue.cancel();// 取消尚未开始的任务}}使用 queue 后,无论用户一次选了多少图,系统最多只维护 3 个上传连接,既快又稳。

3.2 数据库写入保护
SQLite(尤其是 sqflite)通常只支持单线程写入。如果多个并发 Future 试图同时写库,可能会遇到锁竞争。
虽然数据库驱动通常有内部锁,但在业务层使用 Queue(parallel: 1) 强制串行化写入任务,可以从逻辑上避免竞争条件,确保写入顺序。
final dbQueue =Queue(parallel:1);Future<void>logAction(String action){return dbQueue.add(()async{await db.insert('logs',{'action': action});});}
四、进阶:与 Stream 结合
如果你有一个源源不断的任务流(比如实时处理每一帧与服务器通信),可以将 Queue 包装在 Stream 的 listen 回调中。
stream.listen((data){ queue.add(()=>process(data));});但要注意,如果生产者的速度远快于消费者的速度(Queue 处理不过来),queue 内部的待处理列表会无限堆积,最终导致 OOM。这种情况下你需要通过 Backpressure(背压)机制来丢弃任务或暂停生产者。
五、总结
queue 是 Dart 异步编程中被低估的工具。它用不到 100 行代码解决了复杂的并发调度问题。
对于 OpenHarmony 开发者:
- 网络优化:用 queue 限制 API 并发,避免拥塞。
- 资源保护:控制文件 IO 和内存密集型任务的并发度,防止应用崩溃。
它非常适合作为应用基础架构的一部分(BaseRepository, NetworkManager)。
最佳实践:
- 按场景创建 Queue:不要全局共用一个 Queue,应该为网络、DB、文件各自创建 Queue。
- 设置合理的 parallel:网络请求通常 4-6 个,IO 操作通常 1-2 个。
- 处理异常:
queue.add返回的 Future 可能会抛出异常,记得在 await 时 catch 它,否则可能中断整个流程。
六、完整实战示例
import'package:queue/queue.dart';classDownloadManager{// 限制同时只能有 2 个下载任务,避免带宽占满final _queue =Queue(parallel:2);Future<void>downloadFiles(List<String> urls)async{final futures =<Future>[];for(var url in urls){// queue.add 返回的是任务闭包的返回值 (Future)// 这里的 catchError 保证单个失败不影响整体等待final task = _queue.add(()=>_startDownload(url)).catchError((e){print('❌ 下载失败 $url: $e');}); futures.add(task);}// 等待所有下载结束awaitFuture.wait(futures);print('所有文件处理完毕');}Future<String>_startDownload(String url)async{final activeCount = _queue.running;final pendingCount = _queue.pending;print('⬇️ 开始下载: $url (当前进行中: $activeCount, 等待中: $pendingCount)');// 模拟不同大小文件的下载时间awaitFuture.delayed(Duration(milliseconds:500));if(url.contains('error'))throwException('404 Not Found');print('✅ 下载完成: $url');return'/storage/$url';}}voidmain()async{final manager =DownloadManager();// 模拟一批文件,其中包含一个坏链接final urls =['file1.zip','file2.zip','file_error.zip','file3.zip','file4.zip'];await manager.downloadFiles(urls);}