跳到主要内容高性能 C++ 调度器设计与实现 | 极客日志C++AI算法
高性能 C++ 调度器设计与实现
基于 C++20 实现的单机任务调度器,提供任务提交、资源管理、进程执行、超时控制和状态跟踪能力。采用多线程模型,包含调度、回收、PSI 监测线程。支持 CPU 核数和内存上限准入控制,通过 fork/exec 执行任务并可选 cgroup v2 资源限制。具备可观测性,提供 Prometheus 兼容指标采集和 HTTP 导出。支持持久化恢复、超时终止、信号管理和 PSI 背压。适用于 AI 训练、CI/CD、Serverless 及边缘计算等场景。
机器人1 浏览 项目概述
1.1 背景
TaskScheduler 是一个 C++20 实现的单机任务调度器,用于管理和执行本地任务。它提供了完整的任务生命周期管理能力,包括任务提交、资源管理、进程执行、超时控制和状态跟踪。
1.1.1 单机调度器应用场景
1. AI/机器学习训练与推理
场景:一台服务器上有多个 GPU,需要同时运行多个训练或推理任务。
需求:按 GPU 显存、计算单元分配任务;防止任务互相抢占资源;支持优先级。
2. CI/CD 与自动化测试平台
场景:GitLab Runner、Jenkins Agent、内部构建系统。
需求:并发执行多个构建/测试任务;限制每个任务的 CPU/内存;任务队列管理、失败重试、超时控制。
3. Serverless / FaaS(函数即服务)的本地运行时
场景:用户提交一个函数,平台在本地执行。
需求:快速启动、资源隔离、超时 kill;高并发。
4. 边缘计算(Edge Computing)
场景:摄像头、IoT 网关、车载设备等资源受限设备。
需求:低内存占用;无依赖;支持定时任务、事件触发任务。
5. 游戏服务器(Game Server)
场景:一个物理机部署多个游戏房间(Room)实例。
需求:每个房间独立进程;动态扩缩容;低延迟通信。
1.2 核心目标
- 任务调度:支持任务提交、排队、调度和执行的完整闭环
- 资源管理:基于 CPU 核数和内存上限进行准入控制
- 进程隔离:通过 fork/exec 执行任务,可选 cgroup v2 资源限制
- 可观测性:提供指标采集、HTTP 导出和 Prometheus 兼容格式
- 高可靠性:支持持久化恢复、超时终止、信号管理和 PSI 背压
1.3 技术特点
- 多线程模型:独立的调度、回收、PSI 监测和 Cron 触发线程
- 资源感知:预留/释放机制避免资源超卖
- 灵活配置:支持优先级调度、命令白/黑名单、工作目录限制
- 轻量级实现:核心代码约 600 行,依赖 SQLite 和 Linux 系统调用
需求分析
2.1 功能性需求
2.1.1 任务提交与管理
struct JobSpec {
std::string cmd;
int cpu_cores{1};
size_t memory_mb{256};
int timeout_sec{0};
int priority{0};
};
- 支持通过 JobSpec 提交任务,包含命令、资源需求、超时和优先级
- 队列长度限制:max_queue_size 配置,超出则拒绝提交
- 命令准入:支持白名单/黑名单校验
2.1.2 资源配额管理
struct {
total_cpu{};
total_mem_mb{};
};
ResourceQuota
int
4
size_t
2048
- CPU 和内存的预留/释放机制
- 启动前检查资源是否足够,不足则等待
- 任务结束后自动释放资源
2.1.3 任务生命周期
enum class JobStatus {
Pending,
Running,
Succeeded,
Failed,
Timeout,
Cancelled
};
- 完整状态转换:Pending → Running → Succeeded/Failed/Timeout
- 超时管理:两阶段终止(SIGTERM → 宽限期 → SIGKILL)
- 进程组管理:整组清理避免子进程泄漏
2.1.4 可观测性
- 指标采集:提交数、拒绝数、运行数、成功/失败/超时数、排队延迟
- HTTP 导出:Prometheus 兼容的 /metrics 端点
- 健康检查:/health 端点返回 ok
2.2 非功能性需求
- 线程安全:所有共享状态通过互斥锁保护
- 资源可控:任务结束后正确释放资源,避免泄漏
- 优雅退出:stop() 能够等待任务完成或按策略终止
- 日志完善:关键操作有日志记录,便于排障
2.3 可选特性
- cgroup v2 隔离:CPU 配额和内存限制
- PSI 背压监测:根据系统压力暂停新任务启动
- SQLite 持久化:支持重启后恢复未完成任务
- Cron 调度:支持定时触发任务(简化版 cron 表达式)
架构设计
3.1 总体架构
3.2 线程模型
- 主线程:处理外部 submit() 调用,加锁操作 pending_ 队列
- dispatcher 线程:从 pending_ 取任务,检查资源,fork/exec 启动进程
- reaper 线程:周期性 waitpid 回收子进程,处理超时,释放资源
- psi 线程(可选):读取 cgroup pressure 文件,更新背压标志
- cron 线程(可选):检查模板到期时间,生成任务实例
- http 线程(可选):处理 HTTP 请求,返回指标或健康状态
3.3 数据流
submit() → 校验白/黑名单 → 检查队列上限 → pending_.push_back() → inc_submitted() → cv_.notify_all()
dispatcher_loop() → pick_next_job() → rm_.reserve() → launch_job() → fork/exec → running_[id] = job
reaper_loop() → waitpid(WNOHANG) → 更新 exit_code/status → rm_.release() → cleanup_cgroup() → running_.erase(id)
核心模块设计
4.1 Scheduler(调度器)
class Scheduler {
public:
explicit Scheduler(SchedulerOptions opts);
int submit(const JobSpec& spec);
void start();
void stop();
bool idle() const;
Metrics::Snapshot metrics() const;
private:
void dispatcher_loop();
void reaper_loop();
void psi_loop();
void cron_loop();
bool launch_job(Job& job);
bool pick_next_job(Job& out);
void restore_from_store();
};
private:
SchedulerOptions opts_;
ResourceManager rm_;
std::vector<Job> pending_;
std::unordered_map<int, Job> running_;
mutable std::mutex mu_;
std::condition_variable cv_;
std::atomic<bool> shutting_down_{false};
std::atomic<bool> psi_backpressure_{false};
Metrics metrics_;
std::unique_ptr<JobStore> store_;
std::unique_ptr<CronScheduler> cron_sched_;
std::unique_ptr<MetricsHttpServer> metrics_server_;
4.2 ResourceManager(资源管理器)
职责:管理 CPU 和内存配额,提供预留/释放接口
class ResourceManager {
public:
explicit ResourceManager(ResourceQuota quota);
bool reserve(int cpu, size_t mem_mb);
void release(int cpu, size_t mem_mb);
std::pair<int, size_t> used() const;
private:
ResourceQuota quota_;
int used_cpu_{0};
size_t used_mem_mb_{0};
mutable std::mutex mu_;
};
核心逻辑(src/resource_manager.cpp):
bool ResourceManager::reserve(int cpu, size_t mem_mb) {
std::lock_guard lk(mu_);
if (used_cpu_ + cpu > quota_.total_cpu || used_mem_mb_ + mem_mb > quota_.total_mem_mb) {
return false;
}
used_cpu_ += cpu;
used_mem_mb_ += mem_mb;
return true;
}
4.3 Metrics(指标收集器)
职责:提供原子计数器,生成快照和 Prometheus 文本
| 指标名 | 类型 | 说明 |
|---|
| submitted_ | counter | 累计提交任务数 |
| rejected_ | counter | 因队列满/策略拒绝的次数 |
| running_ | gauge | 当前运行中任务数 |
| succeeded_ | counter | 成功完成任务数 |
| failed_ | counter | 失败任务数 |
| timeout_ | counter | 超时被终止任务数 |
| launch_failed_ | counter | 启动失败次数 |
| pressure_blocked_ | counter | 因背压暂停的累计次数 |
| pressure_active_ | gauge | 背压是否激活(1/0) |
| queue_wait_ms_total_ | counter | 队列等待时长总和(毫秒) |
| queue_wait_count_ | counter | 统计样本数 |
| queue_wait_ms_max_ | gauge | 最大等待时长(毫秒) |
tasks_total{status="submitted"} 100
tasks_total{status="rejected"} 5
tasks_total{status="succeeded"} 80
tasks_total{status="failed"} 10
tasks_total{status="timeout"} 5
tasks_running_current 3
tasks_pending_current 5
4.4 CgroupHelper(cgroup 辅助)
std::string create_cgroup_for_job(int job_id, int cpu_cores, size_t mem_mb, const CgroupConfig& cfg);
bool attach_pid_to_cgroup(pid_t pid, const std::string& cg_path);
void cleanup_cgroup(const std::string& cg_path);
实现细节(src/cgroup_helper.cpp):
- 在 cfg.base_path 下创建 job_ 子目录
- 写入 cpu.max:<quota_us> <period_us>,例如 100000 100000 表示 1 核
- 写入 memory.max:字节数,例如 268435456 表示 256MB
- 写入 cgroup.procs:将 pid 加入该 cgroup
4.5 JobStore(持久化存储)
职责:通过 SQLite 持久化任务状态,支持重启恢复
CREATE TABLE jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cmd TEXT NOT NULL,
cpu_cores INTEGER,
memory_mb INTEGER,
timeout_sec INTEGER,
priority INTEGER,
status TEXT,
submit_ms INTEGER,
start_ms INTEGER,
end_ms INTEGER,
exit_code INTEGER
);
class JobStore {
public:
bool init(const std::string& path);
int insert_job(const JobSpec& spec, PersistStatus status, int64_t submit_ms, ...);
void update_status(int id, PersistStatus status, ...);
std::vector<PersistedJob> load_unfinished();
};
恢复策略(Scheduler::restore_from_store()):
void Scheduler::restore_from_store() {
if (!store_) return;
auto jobs = store_->load_unfinished();
for (auto& pj : jobs) {
Job job;
job.id = next_id_++;
job.spec = pj.spec;
job.status = JobStatus::Pending;
job.enqueue_time = std::chrono::steady_clock::now();
pending_.push_back(job);
}
cv_.notify_all();
}
4.6 CronScheduler(定时触发)
- 支持简化表达式:@every s,例如 @every 60s 每 60 秒触发一次
- 完整 5 字段 cron(分 时 日 月 周)接口已定义但简化实现
void CronScheduler::tick(SubmitCallback submit_cb) {
auto now = std::chrono::system_clock::now();
for (auto& tpl : templates_) {
if (!tpl.enabled) continue;
if (now >= tpl.next_run) {
submit_cb(tpl.spec);
tpl.next_run = tpl.cron.next_run(now);
}
}
}
4.7 MetricsHttpServer(HTTP 指标服务)
职责:提供轻量级 HTTP 服务,导出指标和健康检查
- GET /metrics:返回 Prometheus 文本格式指标
- GET /health:返回 ok
- 监听线程:accept() 接收连接
- 工作线程池:处理请求,生成响应
- 连接队列:有限长度,避免内存膨胀
实现细节(src/metrics_http_server.cpp):
class MetricsHttpServer {
public:
using MetricsHandler = std::function<std::string()>;
bool start(int port, MetricsHandler handler);
void stop();
private:
void accept_loop();
void worker_loop();
};
关键流程设计
5.1 任务提交流程
5.2 调度启动流程
关键代码(Scheduler::launch_job()):
bool Scheduler::launch_job(Job& job) {
std::string cg_path;
if (opts_.cgroup.enabled) {
cg_path = create_cgroup_for_job(job.id, job.spec.cpu_cores, job.spec.memory_mb, opts_.cgroup);
}
pid_t pid = fork();
if (pid == 0) {
setpgid(0, 0);
if (!cg_path.empty()) {
attach_pid_to_cgroup(getpid(), cg_path);
}
if (opts_.rlimit_nofile >= 0) {
struct rlimit rl;
rl.rlim_cur = rl.rlim_max = opts_.rlimit_nofile;
setrlimit(RLIMIT_NOFILE, &rl);
}
if (opts_.disable_core_dump) {
struct rlimit rl;
rl.rlim_cur = rl.rlim_max = 0;
setrlimit(RLIMIT_CORE, &rl);
}
if (!opts_.workdir.empty()) {
chdir(opts_.workdir.c_str());
}
execl("/bin/sh", "sh", "-c", job.spec.cmd.c_str(), nullptr);
_exit(127);
}
job.pid = pid;
job.pgid = pid;
job.start_time = std::chrono::steady_clock::now();
job.status = JobStatus::Running;
return true;
}
5.3 回收超时流程
关键代码(Scheduler::reaper_loop()):
void Scheduler::reaper_loop() {
while (!shutting_down_.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::lock_guard lk(mu_);
auto now = std::chrono::steady_clock::now();
for (auto it = running_.begin(); it != running_.end(); ) {
Job& job = it->second;
if (job.spec.timeout_sec > 0) {
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
now - job.start_time).count();
if (elapsed >= job.spec.timeout_sec) {
if (!job.sigterm_sent) {
kill(-job.pgid, SIGTERM);
job.sigterm_sent = true;
job.kill_deadline = now + std::chrono::seconds(opts_.kill_grace_sec);
} else if (now >= *job.kill_deadline) {
kill(-job.pgid, SIGKILL);
}
}
}
int status;
pid_t ret = waitpid(job.pid, &status, WNOHANG);
if (ret > 0) {
job.exit_code = status;
job.end_time = now;
if (job.sigterm_sent) {
job.status = JobStatus::Timeout;
metrics_.inc_timeout();
} else if (WIFEXITED(status) && WEXITSTATUS(status) == 0) {
job.status = JobStatus::Succeeded;
metrics_.inc_succeeded();
} else {
job.status = JobStatus::Failed;
metrics_.inc_failed();
}
rm_.release(job.spec.cpu_cores, job.spec.memory_mb);
metrics_.dec_running();
if (opts_.cgroup.enabled) {
std::string cg_path = opts_.cgroup.base_path + "/job_" + std::to_string(job.id);
cleanup_cgroup(cg_path);
}
it = running_.erase(it);
} else {
++it;
}
}
}
}
5.4 PSI 背压流程
- psi_thread 周期性读取 /sys/fs/cgroup/scheduler/memory.pressure 和 cpu.pressure
- 解析 avg10 值(10 秒平均压力)
- 与阈值比较(如 50.0),超过则设置 psi_backpressure_ = true
- dispatcher_loop 检查背压标志,若为 true 则跳过本轮调度
void Scheduler::psi_loop() {
const double threshold = 50.0;
while (!shutting_down_.load()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::string mem_pressure_file = opts_.cgroup.base_path + "/memory.pressure";
std::ifstream ifs(mem_pressure_file);
double avg10 = parse_psi_avg10(ifs);
bool pressure = (avg10 > threshold);
if (pressure != psi_backpressure_.load()) {
psi_backpressure_.store(pressure);
metrics_.set_pressure_active(pressure);
Logger::instance().log(Logger::Level::Info, pressure ? "PSI backpressure activated" : "PSI backpressure cleared");
}
}
}
配置与接口
6.1 配置选项
struct SchedulerOptions {
ResourceQuota quota;
CgroupConfig cgroup;
int max_queue_size{1000};
int kill_grace_sec{2};
bool enable_priority{false};
bool enable_psi_monitor{false};
std::vector<std::string> cmd_whitelist;
std::vector<std::string> cmd_blacklist;
std::string workdir;
int metrics_http_port{-1};
int rlimit_nofile{-1};
bool disable_core_dump{true};
bool enable_persistence{false};
std::string db_path{"state/tasks.db"};
bool enable_cron{false};
int cron_tick_ms{1000};
};
6.2 命令行接口
./scheduler \
--cmd "echo hello" \
--cpu 1 \
--mem 256 \
--timeout 5 \
--priority 10 \
--total-cpu 4 \
--total-mem 2048 \
--cgroup \
--enable-priority \
--metrics-port 8080 \
--whitelist ls,echo \
--blacklist rm,shutdown \
--workdir /tmp
6.3 HTTP 接口
$ curl http://localhost:8080/health
ok
$ curl http://localhost:8080/metrics
tasks_total{status="submitted"} 100
tasks_total{status="rejected"} 5
tasks_total{status="succeeded"} 80
...
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- RSA密钥对生成器
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
- Mermaid 预览与可视化编辑
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown转HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online