跳到主要内容
现代 C++ 任务并行编程系统:Taskflow 详解 | 极客日志
C++ AI 算法
现代 C++ 任务并行编程系统:Taskflow 详解 综述由AI生成 介绍基于现代 C++ 的任务并行编程系统 Taskflow。首先分析并行计算背景,对比 CPU 与 GPU 性能差异及 Amdahl 定律限制。随后详细讲解 Taskflow 的核心概念,包括静态与动态任务图、控制流任务图(CTFG)以及工作窃取(Work-stealing)运行时机制。文中通过多个 C++ 代码示例展示了如何使用 Taskflow 实现并行归约、排序、流水线及异步任务调度,并探讨了其在异构架构下的可扩展性优势。
t ag 发布于 2026/3/27 更新于 2026/6/1 30 浏览一、Why Parallel Computing?
Advance performance to a new level previously out of reach
将性能提升到过去无法达到的水平。
核心思想:
随着单核 CPU 频率提升变慢(Dennard scaling 失效),
性能提升的主要方式已经从'提高频率'转向'增加并行度' 。
也就是说:
以前:靠更快的单核
现在:靠更多核心 + GPU + 分布式
二、图表含义分析
图中表示:
Time (minutes) to speed up a machine learning algorithm
横轴:
1 CPU
8 CPUs
16 CPUs
24 CPUs
32 CPUs
40 CPUs
1 GPU
纵轴:
趋势:
随着 CPU 数量增加,时间下降
但 GPU 下降幅度更大
GPU 可以达到 10–100x 加速
三、为什么 GPU 可以 10–100 倍加速?
1⃣ 并行度数量差异
典型 CPU:
典型 GPU:
例如:
CPU 并行度 ≈ 32
GPU 并行度 ≈ 5000
在高度可并行任务中(如矩阵乘法):
理论加速比 ≈ 5000 / 32 ≈ 156
当然实际不会达到理论值,但 10–100x 是常见的。
四、Amdahl 定律解释加速极限
并行加速并不是无限的。
设:
Amdahl 定律:
Speedup(N) = 1 / ((1-P) + P/N)
举例
假设:
P = 0.95 (95% 可并行)
若使用 1000 个 GPU 核:
Speedup = 1 / (0.05 + 0.95/1000)
= 1 / (0.05 + 0.00095)
= 1 / 0.05095
≈ 19.6
即使有 1000 核,最多也只能约 20 倍加速。
这说明:
串行部分是性能瓶颈
五、为什么机器学习特别适合 GPU?
机器学习核心是:
这些都属于:
数据并行(Data Parallelism)
例如矩阵乘法:
C_ij = ∑_k A_ik * B_kj
每个 C_ij 都可以独立计算:
这意味着:
O(n^2) 个元素可以并行
这正是 GPU 擅长的结构。
六、为什么多 CPU 扩展不如 GPU?
1⃣ CPU 核心少
2⃣ 同步开销大
3⃣ 内存带宽限制
4⃣ NUMA 访问延迟
5⃣ 上下文切换开销
而 GPU:
七、理论 vs 实际 现实情况:
T(N) = T(1) / (N * η)
其中:
0 < η < 1
η 是效率因子。
当通信开销大时:
η → 0.5 或更低
八、一个简单 C++ 并行示例(OpenMP) #include <vector>
#include <omp.h>
#include <iostream>
int main () {
const int N = 100000000 ;
std::vector<double > data (N, 1.0 ) ;
double sum = 0.0 ;
#pragma omp parallel for reduction(+:sum)
for (int i = 0 ; i < N; ++i) {
sum += data[i];
}
std::cout << "Sum = " << sum << std::endl;
}
说明 如果使用 8 核:
理论时间:
T ≈ T_single / 8
九、并行计算的核心价值
更快训练 AI
更快物理仿真
更快金融建模
更快基因计算
更快科学计算
很多问题的时间复杂度是:
O(N^2) 或 O(N^3)
如果不并行,计算时间会爆炸。
十、总结
单核性能提升停滞
数据规模指数增长
GPU 提供数量级并行能力
机器学习天然可并行
10–100x 加速是现实而不是理论
一、Why Task-parallel Programming?
Modern parallel workloads are very complex and irregular
现代并行负载非常复杂且不规则
核心问题
图计算
电路仿真
AI + 物理模拟混合
自适应网格
异构调度
不规则依赖
不同计算粒度
不同硬件需求
动态生成任务
二、异构架构背景
Heterogeneous Architecture(异构架构)
CPU 负责控制逻辑
GPU 负责大规模矩阵计算
FPGA 负责特定加速模块
三、什么是 Task Parallelism?
数据并行 典型形式:
for i = 1...N: y_i = f(x_i)
每个元素独立。
适用于:
任务并行 把算法分解成任务:
T = {t_1, t_2, ..., t_n}
并定义依赖关系:
t_i → t_j
表示:
t_j 必须在 t_i 完成后执行
形成:
四、任务图模型
关键指标
1⃣ 总工作量(Work) W = ∑_{i=1}^{n} w_i
每个任务的计算量之和。
2⃣ 关键路径长度(Span) S = max_path ∑ w_i
最长依赖路径。
3⃣ 理论并行上限 根据 Brent 定理:
T_p ≥ max(W/P, S)
其中:
P = 处理器数量
五、为什么任务并行更可扩展?
Task parallelism is most scalable for future heterogeneous architectures
六、任务分解的思想
Capture your intention in decomposing an algorithm into a top-down task graph
意思是:
从算法逻辑结构出发,构建任务图。
形成:
t_1 → t_2 → t_3 → t_4 → t_5
但中间可能有分叉:
t_2 → {t_3, t_4}
七、实际系统示例
OpenMP
Kokkos
TBB Intel Threading Building Blocks
StarPU
DASK
PaRSEC
RAY
八、OpenMP 任务并行示例(带详细注释) #include <iostream>
#include <omp.h>
int main () {
#pragma omp parallel {
#pragma omp single {
#pragma omp task {
std::cout << "Task A running\n" ;
}
#pragma omp task {
std::cout << "Task B running\n" ;
}
#pragma omp taskwait
#pragma omp task {
std::cout << "Task C running after A and B\n" ;
}
}
}
return 0 ;
}
解释 表示:
必须等待之前任务完成。
这构成一个简单 DAG:
九、GPU + CPU 混合任务示意
任务 1:CPU 预处理
任务 2:GPU 计算
任务 3:CPU 后处理
时间模型:
T = T_cpu1 + T_gpu + T_cpu2
如果 GPU 并行度高:
T_gpu ≪ T_cpu1
总时间显著下降。
十、任务并行 vs 数据并行对比 特性 数据并行 任务并行 结构 规则 不规则 依赖 很少 明确依赖 扩展性 好 更好 异构适配 一般 极好 表达能力 低 高
十一、为什么说'最可扩展'? 未来趋势:
cores → 10^3 ~ 10^6
而:
规则问题越来越少
复杂系统越来越多
只有任务图可以表达:
十二、总结
能表达复杂依赖
能适配异构架构
可扩展到未来超大规模系统
能处理不规则问题
以 DAG 形式表达算法意图
关键数学模型:
G = (V, E)
T_p ≥ max(W/P, S)
Taskflow 静态任务图并行(Static Task Graph Parallelism)
一、什么是 Taskflow?
现代 C++ 任务并行库
header-only(只需包含头文件)
基于 DAG(有向无环图)的任务调度框架
支持静态和动态任务图
二、什么是 Static Task Graph? 数学模型:
定义任务图:
G = (V, E)
其中:
执行时间满足:
T_p ≥ max(W/P, S)
其中:
W = 总工作量
P = 线程数
S = 关键路径长度(critical path)
#include <iostream>
#include <taskflow/taskflow.hpp>
int main () {
tf::Executor executor;
tf::Taskflow taskflow;
auto [A, B, C, D] = taskflow.emplace (
[](){ std::cout << "TaskA\n" ; },
[](){ std::cout << "TaskB\n" ; },
[](){ std::cout << "TaskC\n" ; },
[](){ std::cout << "TaskD\n" ; });
A.precede (B, C);
D.succeed (B, C);
executor.run (taskflow).wait ();
return 0 ;
}
五、任务图结构解析 A.precede (B, C); D.succeed (B, C);
等价关系:
A → B
A → C
B → D
C → D
DAG 图结构
六、执行顺序分析 并行度分析:
若每个任务耗时为 t:
总工作量:
W = 4t
关键路径:
S = 3t
(A → B → D 或 A → C → D)
理论最优时间:
T_p ≥ max(4t/P, 3t)
如果 P ≥ 2:
T_p = 3t
七、为什么 Taskflow 是'静态任务图'?
所有任务在 run() 之前已经定义好
DAG 不会在执行过程中改变
对比:
动态任务图(Dynamic Task Graph):
八、Taskflow 设计优势
1⃣ 表达能力强
2⃣ 自动调度
九、和 OpenMP 对比
十、线程池模型 Executor 内部维护:
P 个 worker threads
当一个任务的入度为 0(无依赖)即可执行
依赖完成后触发后续任务
数学表达:
若任务 v 的入度:
deg^-(v) = 0
则可调度。
十一、关键路径决定性能 对于 DAG:
总时间近似:
T ≈ S + (W - S)/P
当:
P → ∞
有:
T → S
十二、扩展示例(加入耗时) auto [A,B,C,D] = taskflow.emplace (
[](){ std::this_thread::sleep_for (std::chrono::milliseconds (100 )); std::cout<<"A\n" ;},
[](){ std::this_thread::sleep_for (std::chrono::milliseconds (200 )); std::cout<<"B\n" ;},
[](){ std::this_thread::sleep_for (std::chrono::milliseconds (200 )); std::cout<<"C\n" ;},
[](){ std::cout<<"D\n" ;});
B 和 C 同时运行
总时间接近:
100 + 200 = 300 ms
而不是 500 ms。
十三、总结
用 DAG 表达算法
静态任务图提前构建
自动并行调度
关键路径决定性能
非常适合复杂依赖结构
Taskflow 动态任务图并行(Dynamic Task Graph Parallelism)
一、什么是动态任务图(Dynamic Task Graph)?
静态任务图 :先构建完整 DAG,再执行
动态任务图 :任务在运行过程中动态创建
二、动态任务图的数学模型 设任务集合为:
V = {t_1, t_2, ...}
依赖关系为:
E = {(t_i → t_j)}
在动态模型中:
V(t) 和 E(t)
会随着时间变化。
即:
G(t) = (V(t), E(t))
三、代码示例(修正版 + 详细注释) #include <iostream>
#include <taskflow/taskflow.hpp>
int main () {
tf::Executor executor;
auto A = executor.silent_dependent_async ([](){ std::cout << "TaskA\n" ;});
auto B = executor.silent_dependent_async ([](){ std::cout << "TaskB\n" ;}, A);
auto C = executor.silent_dependent_async ([](){ std::cout << "TaskC\n" ;}, A);
auto D = executor.silent_dependent_async ([](){ std::cout << "TaskD\n" ;}, B, C);
executor.wait_for_all ();
return 0 ;
}
四、DAG 结构分析 依赖关系:
A → B
A → C
B → D
C → D
五、执行流程解析
A 无依赖 → 立即执行
A 完成后:
B 和 C 完成后:
六、与静态任务图的区别 静态 Taskflow 动态 async taskflow.emplace silent_dependent_async 先构建 DAG 边创建边执行 run(taskflow) 直接提交到 executor 适合结构固定 适合运行时动态生成
七、关键性能分析 总工作量:
W = 4t
关键路径:
S = 3t
理论执行时间:
T_p ≥ max(W/P, S)
八、silent_dependent_async 的含义 silent_dependent_async (task, dependencies...)
创建任务
指定它依赖哪些任务
不返回 future(silent)
自动加入 executor
内部机制:
每个任务维护:
deg^-(v)
当:
deg^-(v) = 0
任务进入 ready queue。
九、为什么叫'动态'? auto A = executor.silent_dependent_async ([&executor](){ std::cout<<"TaskA\n" ;
executor.silent_dependent_async ([](){ std::cout<<"Dynamic task\n" ;});
});
十、调度模型(Work-Stealing)
维护多个 worker 线程
每个线程有本地任务队列
空闲线程会'窃取'其他线程任务
数学上:
若线程数为 P:
期望时间近似:
T_p ≈ W/P + O(S)
十一、动态模型适用场景
递归算法
图搜索
不规则计算
自适应网格
AI 推理流水线
动态负载均衡
例如:
分治算法:
T(n) = 2T(n/2) + O(n)
可以在运行中不断创建子任务。
十二、与 std::async 对比
十三、总结 动态任务图的本质:
G(t) = (V(t), E(t))
随着运行动态扩展。
支持运行时生成任务
适合不规则问题
更灵活
与异构计算更兼容
更接近未来并行编程模型
Control Taskflow Graph (CTFG) 编程模型
一、什么是 CTFG(Control Taskflow Graph)?
不仅表达'计算任务依赖',还表达'控制流逻辑'(if / while / loop / branch)
普通 DAG 只能表达:
A → B
但 CTFG 可以表达:
条件分支(if-else)
循环(while)
迭代优化
收敛判断
二、第一个例子:条件分支(if-else) auto [init, cond, yes, no] = taskflow.emplace (...);
完整示例:CTFG 条件任务 #include <iostream>
#include <taskflow/taskflow.hpp>
int main () {
tf::Executor executor;
tf::Taskflow taskflow;
auto [init, cond, yes, no] = taskflow.emplace (
[](){ std::cout << "initialize\n" ; },
[]()->int { std::cout << "checking condition\n" ; return 0 ; },
[](){ std::cout << "yes branch\n" ; },
[](){ std::cout << "no branch\n" ; });
cond.succeed (init);
cond.precede (yes, no);
executor.run (taskflow).wait ();
return 0 ;
}
三、条件任务的数学模型 定义:
t_cond : N → {0,1,...,k}
若:
t_cond() = i
则调度:
t_i
这是一个动态边选择机制 。
四、CTFG 循环优化示例 initialize ();
while (!converged ()){
optimize ();
}
print ("done" );
完整 CTFG 实现版本 #include <iostream>
#include <taskflow/taskflow.hpp>
bool converged () {
static int count = 0 ;
return ++count > 3 ;
}
int main () {
tf::Executor executor;
tf::Taskflow taskflow;
auto [init, opt, cond, stop] = taskflow.emplace (
[](){ std::cout << "initialize data\n" ; },
[](){ std::cout << "optimize step\n" ; },
[]()->int {return converged ()?1 :0 ; },
[](){ std::cout << "done!\n" ; });
cond.precede (opt, stop);
executor.run (taskflow).wait ();
return 0 ;
}
五、循环的任务图结构 init
|
v
opt
|
v
cond
/ \
opt stop
六、数学表达(循环模型) 任务图表示:
opt_i → cond_i → opt_{i+1}
直到:
cond_i = 1
七、CTFG 的核心价值 本质:
Program → Control DAG
八、为什么说'CTFG enables end-to-end parallelism expression'? 没有隐藏的串行控制逻辑。
这意味着:
整个程序 = G(V,E)
调度器可以:
九、性能模型 理论执行时间:
T_p ≥ max(W/P, S)
十、对比传统 while 循环 while (!converged ()){
optimize ();
}
十一、CTFG vs 数据并行 特性 数据并行 CTFG 表达能力 低 高 控制流 无 有 适合复杂算法 否 是 异构友好 一般 极好
十二、总结 数学本质:
Program = G(V,E)
执行时间受:
critical path
控制。
Taskflow 中的 Work-stealing Runtime(工作窃取运行时)
一、什么是 Work-stealing? Work-stealing(工作窃取)是一种并行调度算法:
空闲线程主动从其他线程'窃取'任务执行。
其目标是:
动态负载均衡
最大化 CPU 利用率
减少调度开销
保证高吞吐与低延迟
二、整体运行逻辑解析(结合流程图)
1⃣ Worker 主循环 while (true ){
if (queue.empty ()){ wait_or_steal (); }
else { execute_task (); }
}
2⃣ 队列是否为空?
情况 A:空
尝试从其他线程窃取任务
若无任务 → 等待(sleep)
情况 B:非空 CheckEmpty -- N --> Dequeue
三、执行任务 IsCondition{Condition task?}
四、条件任务执行路径 r = invoke (t)
enqueue r-th child
int r = t ();
enqueue (t.child (r));
数学表达:
设:
t_cond : N → {0,1,...,k}
若:
t_cond() = r
则只激活:
child_r
这实现了:
五、普通任务执行路径 invoke (t)
↓
DecDeps
↓
EnqueueSuccessors
invoke (t);
for (auto s : successors (t)){
s.remaining_deps--;
if (s.remaining_deps == 0 ) enqueue (s);
}
六、依赖计数模型(Strong Dependencies) 当:
deg^-(v) = 0
任务进入 ready queue。
七、完整调度数学模型
任务图为 G = (V, E)
总工作量 W
关键路径 S
线程数 P
八、为什么 Work-stealing 高效?
1⃣ 局部性好
2⃣ 负载均衡
3⃣ 低调度开销
九、与集中调度对比 复杂度对比:
集中式:
O(P)
窃取式:
O(1) 平均
十、动态图 + Work-stealing 的优势 即:
Active workers ≈ Available parallelism
保持:
十一、关键性能指标解释
Low Latency(低延迟) 任务一旦依赖满足:
deg^-(v) = 0
立即入队。
High Throughput(高吞吐) 单位时间执行任务数量:
Throughput = Tasks completed / Time
Work-stealing 减少 idle time。
Energy Efficient(节能)
减少 busy-wait
减少锁竞争
减少 cache miss
十二、代码伪实现(简化) void worker_loop () {
while (true ){
Task* t = pop_local ();
if (!t){
t = steal_from_others ();
if (!t){ sleep (); continue ; }
}
if (t->is_condition ()){
int r = t->invoke ();
enqueue (t->child (r));
} else {
t->invoke ();
for (auto s : t->successors){
if (--s->remaining_deps == 0 ) enqueue (s);
}
}
}
}
十三、为什么说'强平衡'?
若生成过快 → 更多线程参与
若生成过慢 → 线程空闲休眠
十四、关键路径影响 即使 Work-stealing 非常优秀:
总时间仍受限于:
S = critical path
当:
P → ∞
有:
T → S
十五、总结 Taskflow 的 Work-stealing Runtime:
每线程独立队列
条件任务支持控制流
强依赖计数机制
动态任务图支持
理论接近最优调度
低延迟
高吞吐
高能效
支持动态图
适合未来异构架构
完整示例代码 #include <iostream>
#include <taskflow/taskflow.hpp>
int main () {
tf::Executor executor;
tf::Taskflow taskflow;
tf::Task A = taskflow.emplace ([](){ std::cout << "Task A\n" ; }).name ("A" );
tf::Task C = taskflow.emplace ([](){ std::cout << "Task C\n" ; }).name ("C" );
tf::Task D = taskflow.emplace ([](){ std::cout << "Task D\n" ; }).name ("D" );
tf::Task B = taskflow.emplace ([](tf::Subflow& subflow){
std::cout << "Task B (start)\n" ;
tf::Task B1 = subflow.emplace ([](){ std::cout << "Task B1\n" ; }).name ("B1" );
tf::Task B2 = subflow.emplace ([](){ std::cout << "Task B2\n" ; }).name ("B2" );
tf::Task B3 = subflow.emplace ([](){ std::cout << "Task B3\n" ; }).name ("B3" );
B3. succeed (B1, B2);
std::cout << "Task B (end)\n" ;
}).name ("B" );
A.precede (B, C);
D.succeed (B, C);
executor.run (taskflow).wait ();
return 0 ;
}
任务结构解析
主任务图结构 依赖关系:
A → B
A → C
B → D
C → D
子任务图(B 内部)
执行顺序分析
A
B 和 C 并行
B 内部:
B1 和 B2 并行
B3 在 B1 和 B2 之后
D 在 B 和 C 完成后执行
并行度分析 总工作量:
W = 7t
关键路径(最长路径):
A → B → B3 → D
长度:
S = 4t
理论并行时间:
T_P ≥ max(7t/P, 4t)
Subflow 的重要特性
子任务图在 B 运行时动态创建
属于动态任务图(Dynamic Task Graph)
继承父任务的调度上下文
可嵌套
Subflow 执行语义
总结
静态主任务图
动态子任务图(Subflow)
条件依赖
嵌套并行
Work-stealing 调度
数学模型仍为:
T_P ≤ W/P + O(S)
其中:
graph TD %% 主任务图
A["任务 A"] --> B["任务 B"]
A --> C["任务 C"]
B --> D["任务 D"]
C --> D
%% 子任务图 B 内部
subgraph B 子任务图
B1["任务 B1"] --> B3["任务 B3"]
B2["任务 B2"] --> B3
B3 --> B
end
%% 样式美化
style A fill:#ffcc00,stroke:#333,stroke-width:2px
style B fill:#66ccff,stroke:#333,stroke-width:2px
style C fill:#ff99cc,stroke:#333,stroke-width:2px
style D fill:#99ff99,stroke:#333,stroke-width:2px
style B1 fill:#ccff99,stroke:#333
style B2 fill:#ccff99,stroke:#333
style B3 fill:#ccff99,stroke:#333
#include <taskflow/taskflow.hpp>
#include <iostream>
#include <cstdlib>
#include <ctime>
int main () {
std::srand (static_cast <unsigned >(std::time (nullptr )));
tf::Executor executor;
tf::Taskflow taskflow;
tf::Task init = taskflow.emplace ([](){ std::cout << "初始化任务 init\n" ; }).name ("init" );
tf::Task stop = taskflow.emplace ([](){ std::cout << "结束任务 stop\n" ; }).name ("stop" );
tf::Task cond = taskflow.emplace ([](){
int r = std::rand ()%2 ;
std::cout << "条件任务 cond 返回:" << r << "\n" ;
return r;
}).name ("cond" );
init.precede (cond);
cond.precede (cond, stop);
executor.run (taskflow).wait ();
return 0 ;
}
说明
init 任务
初始化数据结构或准备工作
必须在 cond 之前执行
cond 任务
条件任务,返回 0 或 1
Taskflow 会根据返回值选择下一步任务
返回 0 → 再次执行 cond(形成循环)
返回 1 → 执行 stop,结束任务流
stop 任务
程序结束前的清理任务
只有当 cond 返回 1 时执行
依赖关系
init.precede(cond):init 在 cond 前执行
cond.precede(cond, stop):cond 根据返回值决定下一步
#include <taskflow/taskflow.hpp>
#include <iostream>
int main () {
tf::Taskflow f1;
tf::Task f1A = f1. emplace ([](){ std::cout << "Task f1A\n" ; }).name ("f1A" );
tf::Task f1B = f1. emplace ([](){ std::cout << "Task f1B\n" ; }).name ("f1B" );
f1A.precede (f1B);
tf::Taskflow f2;
tf::Task f2A = f2. emplace ([](){ std::cout << "Task f2A\n" ; }).name ("f2A" );
tf::Task f2B = f2. emplace ([](){ std::cout << "Task f2B\n" ; }).name ("f2B" );
tf::Task f2C = f2. emplace ([](){ std::cout << "Task f2C\n" ; }).name ("f2C" );
tf::Task f1_module_task = f2. composed_of (f1).name ("module" );
f1_module_task.succeed (f2A, f2B);
f1_module_task.precede (f2C);
tf::Executor executor;
executor.run (f2).wait ();
return 0 ;
}
理解与注释:
子任务流 f1
包含任务 f1A 和 f1B,并且 f1A 先于 f1B 执行。
主任务流 f2
包含普通任务 f2A, f2B, f2C。
使用 composed_of(f1) 将子任务流 f1 嵌入为模块任务 module。
依赖关系
f1_module_task.succeed(f2A, f2B) → 模块执行后再执行 f2A 和 f2B。
f1_module_task.precede(f2C) → 模块任务必须在 f2C 执行前完成。
执行器 tf::Executor
Taskflow Executor 异步任务示例 #include <taskflow/taskflow.hpp>
#include <iostream>
#include <future>
int main () {
tf::Executor executor;
std::future<int > future = executor.async ([](){
std::cout << "async task returns 1\n" ;
return 1 ;
});
executor.silent_async ([](){
std::cout << "async task does not return\n" ;
});
tf::AsyncTask A = executor.silent_dependent_async ([](){ std::cout << "Task A\n" ; });
tf::AsyncTask B = executor.silent_dependent_async ([](){ std::cout << "Task B\n" ; }, A);
tf::AsyncTask C = executor.silent_dependent_async ([](){ std::cout << "Task C\n" ; }, A);
tf::AsyncTask D = executor.silent_dependent_async ([](){ std::cout << "Task D\n" ; }, B, C);
executor.wait_for_all ();
int result = future.get ();
std::cout << "Future result: " << result << "\n" ;
return 0 ;
}
理解:
Executor
tf::Executor 是 Taskflow 的运行时,管理线程池与任务调度。
普通异步任务
executor.async 返回 std::future,可以获取返回值。
executor.silent_async 不返回值,也不阻塞。
动态依赖任务
silent_dependent_async 可以让任务依赖其他任务完成后执行。
例如:D 依赖于 B 和 C,只有当 B 和 C 都完成后,D 才会运行。
等待任务完成
executor.wait_for_all() 阻塞当前线程直到所有异步任务完成。
获取 future
对于返回值任务,可以通过 future.get() 获取执行结果。
async task returns 1
(future)
async task does
not return
#include <taskflow/taskflow.hpp>
#include <iostream>
int main () {
tf::Executor executor;
tf::Taskflow taskflow;
auto A = taskflow.emplace ([](){ std::cout << "Task A\n" ; }).name ("A" );
auto B = taskflow.emplace ([](){ std::cout << "Task B\n" ; }).name ("B" );
auto C = taskflow.emplace ([](){ std::cout << "Task C\n" ; }).name ("C" );
A.precede (B, C);
tf::Future<void > run_once = executor.run (taskflow);
run_once.get ();
executor.run_n (taskflow, 4 );
executor.run_until (taskflow, [counter=5 ]() mutable {return --counter == 0 ;});
executor.wait_for_all ();
return 0 ;
}
注释
tf::Executor executor;
tf::Taskflow taskflow;
taskflow.emplace(...)
创建任务,并用 .name("A") 给任务命名。
A.precede(B, C)
表示任务 A 先于 B 和 C 执行,即 B 和 C 都依赖 A。
executor.run(taskflow)
运行 taskflow 一次,返回 tf::Future<void> 可用于查询状态。
executor.run_n(taskflow, 4)
executor.run_until(taskflow, [counter=5]{...})
运行 taskflow 直到 lambda 返回 true,这里用计数器控制执行 5 次。
executor.wait_for_all()
阻塞,直到 executor 中所有 taskflow 完成。
对应 Mermaid DAG 图
说明
A : 起始任务
B 和 C : 依赖 A 执行的任务
任务流可以被执行器多次执行(run_once, run_n, run_until),DAG 表示任务间的依赖关系,而非执行次数。
Taskflow 并行算法 #include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/reduce.hpp>
#include <taskflow/algorithm/sort.hpp>
#include <taskflow/algorithm/for_each.hpp>
#include <vector>
#include <iostream>
int main () {
tf::Executor executor;
tf::Taskflow taskflow;
std::vector<int > data (10 , 0 ) ;
int init_sum = 0 ;
tf::Task task1 = taskflow.for_each(
data.begin (), data.end (),
[](auto & i){ i = 100 ; }
).name ("Parallel ForEach" );
tf::Task task2 = taskflow.reduce (
data.begin (), data.end (), init_sum,
[](auto a, auto b){ return a + b; }
).name ("Parallel Reduce" );
tf::Task task3 = taskflow.sort (
data.begin (), data.end (),
[](auto a, auto b){ return a < b; }
).name ("Parallel Sort" );
task1. precede (task2);
task2. precede (task3);
executor.run (taskflow).wait ();
std::cout << "Sorted data: " ;
for (auto v : data) std::cout << v << " " ;
std::cout << std::endl;
return 0 ;
}
注释
tf::Executor executor
tf::Taskflow taskflow
taskflow.for_each
并行遍历数组 data,将每个元素赋值为 100。
taskflow.reduce
并行归约数组 data,计算总和 init_sum。
taskflow.sort
task1.precede(task2) 和 task2.precede(task3)
设置依赖关系,确保顺序:先赋值 → 再归约 → 再排序。
executor.run(taskflow).wait()
对应 Mermaid DAG 图
说明
Parallel ForEach (A) :并行赋值任务
Parallel Reduce (B) :并行求和任务
Parallel Sort (C) :并行排序任务
DAG 展示了任务间的执行顺序依赖:A → B → C。
Taskflow Pipeline
完整 C++ 示例代码(Pipeline) #include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/pipeline.hpp>
#include <vector>
#include <cstdio>
int main () {
tf::Executor executor;
tf::Taskflow taskflow;
const size_t num_parallel_lines = 5 ;
std::vector<int > buffer (num_parallel_lines, 0 ) ;
tf::Pipeline pl (
num_parallel_lines,
tf::Pipe{
tf::PipeType::SERIAL,
[&buffer](tf::Pipeflow& pf){
buffer[pf.line()] = static_cast <int >(pf.token());
printf("stage 1: token %zu stored in buffer[%zu]\n" , pf.token(), pf.line());
if (pf.token() == 5 ){
pf.stop();
}
}
},
tf::Pipe{
tf::PipeType::SERIAL,
[&buffer](tf::Pipeflow& pf){
printf("stage 2: input buffer[%zu] = %d\n" , pf.line(), buffer[pf.line()]);
buffer[pf.line()] += 10 ;
}
},
tf::Pipe{
tf::PipeType::SERIAL,
[&buffer](tf::Pipeflow& pf){
printf("stage 3: input buffer[%zu] = %d\n" , pf.line(), buffer[pf.line()]);
}
}
) ;
taskflow.composed_of (pl);
executor.run (taskflow).wait ();
return 0 ;
}
注释
tf::Executor executor
tf::Taskflow taskflow
tf::Pipeline pl
创建一个 pipeline,有 num_parallel_lines 个 token 并行流。
每个 token 会依次经过三个串行 stage(SERIAL Pipe):
Stage 1 :初始化 token 并存储到 buffer 中,如果 token == 5 停止 pipeline。
Stage 2 :处理 token,修改 buffer 内容。
Stage 3 :输出 token 结果。
taskflow.composed_of(pl)
executor.run(taskflow).wait()
对应 Mermaid DAG 图
说明
Stage 1 :token 初始化
Stage 2 :处理 token(模拟业务逻辑)
Stage 3 :输出 token
依赖关系 :每个 token 在三个 stage 中顺序执行,但不同 token 之间可以并行流动,实现高吞吐量。
相关免费在线工具 加密/解密文本 使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
RSA密钥对生成器 生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
Mermaid 预览与可视化编辑 基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
随机西班牙地址生成器 随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
Gemini 图片去水印 基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online