跳到主要内容Python 实现定时任务的八种常见方案 | 极客日志Python算法
Python 实现定时任务的八种常见方案
综述由AI生成Python 实现定时任务的八种常见方案,包括基础的 while+sleep 循环、轻量级库如 Timeloop、Threading.Timer、Sched、Schedule,以及企业级框架 APScheduler、Celery 和 Airflow。文章对比了各方案的优缺点及适用场景,涵盖了代码示例、核心概念解析及架构说明,帮助开发者根据实际需求选择合适的定时任务解决方案。
芝士奶盖22 浏览 Python 实现定时任务的八种常见方案
在日常工作中,我们常常会用到需要周期性执行的任务。一种方式是采用 Linux 系统自带的 crond 结合命令行实现,另一种方式是直接使用 Python。接下来整理的是常见的 Python 定时任务的实现方式。
1. 利用 while True + sleep() 实现定时任务
位于 time 模块中的 sleep(secs) 函数,可以实现令当前执行的线程暂停 secs 秒后再继续执行。所谓暂停,即令当前线程进入阻塞状态,当达到 sleep() 函数规定的时间后,再由阻塞状态转为就绪状态,等待 CPU 调度。
基于这样的特性,我们可以通过 while 死循环 + sleep() 的方式实现简单的定时任务。
代码示例
import datetime
import time
def time_printer():
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
print('do func time :', ts)
def loop_monitor():
while True:
time_printer()
time.sleep(5)
if __name__ == "__main__":
loop_monitor()
主要缺点
- 只能设定间隔,不能指定具体的时间(比如每天早上 8:00)。
sleep 是一个阻塞函数,也就是说 sleep 这一段时间,程序什么也不能操作。
2. 使用 Timeloop 库运行定时任务
Timeloop 是一个库,可用于运行多周期任务。这是一个简单的库,它使用装饰器模式在线程中运行标记函数。
代码示例
import time
from timeloop import Timeloop
from datetime import timedelta
tl = Timeloop()
@tl.job(interval=timedelta(seconds=2))
def sample_job_every_2s():
print("2s job current time : {}".format(time.ctime()))
():
(.(time.ctime()))
():
(.(time.ctime()))
tl:
tl.start()
@tl.job(interval=timedelta(seconds=5))
def
sample_job_every_5s
print
"5s job current time : {}"
format
@tl.job(interval=timedelta(seconds=10))
def
sample_job_every_10s
print
"10s job current time : {}"
format
with
3. 利用 threading.Timer 实现定时任务
threading 模块中的 Timer 是一个非阻塞函数,比 sleep 稍好一点。Timer 最基本理解就是定时器,我们可以启动多个定时任务,这些定时器任务是异步执行,所以不存在等待顺序执行问题。
参数说明
interval: 指定的时间
function: 要执行的方法
args/kwargs: 方法的参数
代码示例
import threading
import time
def task():
print("Task executed at", time.ctime())
timer = threading.Timer(5, task)
timer.start()
4. 利用内置模块 sched 实现定时任务
sched 模块实现了一个通用事件调度器,在调度器类使用一个延迟函数等待特定的时间,执行任务。同时支持多线程应用程序,在每个任务执行后会立刻调用延时函数,以确保其他线程也能执行。
核心类
class sched.scheduler(timefunc, delayfunc) 这个类定义了调度事件的通用接口,它需要外部传入两个参数:
timefunc: 一个没有参数的返回时间类型数字的函数 (常用使用的如 time 模块里面的 time)。
delayfunc: 应该是一个需要一个参数来调用、与 timefunc 的输出兼容、并且作用为延迟多个时间单位的函数 (常用的如 time 模块的 sleep)。
代码示例
import datetime
import time
import sched
def time_printer():
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
print('do func time :', ts)
loop_monitor()
def loop_monitor():
s = sched.scheduler(time.time, time.sleep)
s.enter(5, 1, time_printer, ())
s.run()
if __name__ == "__main__":
loop_monitor()
主要方法
enter(delay, priority, action, argument): 安排一个事件来延迟 delay 个时间单位。
cancel(event): 从队列中删除事件。如果事件不是当前队列中的事件,则该方法将抛出一个 ValueError。
run(): 运行所有预定的事件。这个函数将等待 (使用传递给构造函数的 delayfunc() 函数),然后执行事件,直到不再有预定的事件。
个人点评:比 threading.Timer 更好,不需要循环调用。
5. 利用调度模块 schedule 实现定时任务
schedule 是一个第三方轻量级的任务调度模块,可以按照秒、分、小时、日期或者自定义事件执行时间。schedule 允许用户使用简单、人性化的语法以预定的时间间隔定期运行 Python 函数 (或其它可调用函数)。
基础用法
import schedule
import time
def job():
print("I'm working...")
schedule.every(10).seconds.do(job)
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)
while True:
schedule.run_pending()
time.sleep(1)
高级功能
- 装饰器: 通过
@repeat() 装饰静态方法。
- 传递参数: 支持向任务函数传递位置参数和关键字参数。
- 取消任务: 使用
schedule.cancel_job(job) 或 schedule.clear()。
- 标签检索: 使用
.tag() 给任务打标签,并通过 schedule.get_jobs('tag_name') 检索。
- 运行至某时间: 使用
.until() 限制任务运行的截止时间。
- 并行运行: 结合 Python 内置队列或 threading 实现并行处理。
6. 利用任务框架 APScheduler 实现定时任务
APScheduler (Advanced Python Scheduler) 基于 Quartz 的一个 Python 定时任务框架,实现了 Quartz 的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及 crontab 类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个 Python 定时任务系统。
三大特点
- 类似于 Linux Cron 的调度程序 (可选的开始/结束时间)。
- 基于时间间隔的执行调度 (周期性调度,可选的开始/结束时间)。
- 一次性执行任务 (在设定的日期/时间运行一次任务)。
四大组成部分
- 触发器 (Trigger): 包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。
- 作业存储 (Job Store): 存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。
- 执行器 (Executor): 处理作业的运行,它们通常通过在作业中提交制定的可调用对象到一个线程或者进程池来进行。
- 调度器 (Scheduler): 是其他的组成部分。你通常在应用只有一个调度器。
代码示例
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
def job():
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
sched = BlockingScheduler()
sched.add_job(job, 'interval', seconds=5, id='my_job_id')
sched.start()
重要概念详解
Job 作业
Job 作为 APScheduler 最小执行单位。创建 Job 时指定执行的函数,函数中所需参数,Job 执行时的一些设置信息。
id: 指定作业的唯一 ID。
trigger: 确定 Job 的执行时间。
executor: 指定执行器名称。
max_instances: 执行此 Job 的最大实例数。
misfire_grace_time: Job 的延迟执行时间容差。
Trigger 触发器
- DateTrigger: 指定时间的 DateTrigger,作业只执行一次。
- IntervalTrigger: 指定间隔时间的 IntervalTrigger。
- CronTrigger: 像 Linux 的 crontab 一样的 CronTrigger。
Executor 执行器
根据实际 scheduler 选择不同的执行器,如 ThreadPoolExecutor, ProcessPoolExecutor, AsyncIOExecutor 等。
Jobstore 作业存储
决定任务的保存方式,默认存储在内存中 (MemoryJobStore),重启后就没有了。APScheduler 支持的任务存储器有:jobstores.memory, jobstores.mongodb, jobstores.redis, jobstores.sqlalchemy 等。
Scheduler 调度器
APScheduler 支持的调度器方式如下,比较常用的为 BlockingScheduler 和 BackgroundScheduler。
BlockingScheduler: 适用于调度程序是进程中唯一运行的进程,调用 start 函数会阻塞当前线程。
BackgroundScheduler: 适用于调度程序在应用程序的后台运行,调用 start 后主线程不会阻塞。
7. 使用分布式消息系统 Celery 实现定时任务
Celery 是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具,也可用于任务调度。Celery 的配置比较麻烦,如果你只是需要一个轻量级的调度工具,Celery 不会是一个好选择。
Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务 (async task) 和定时任务 (crontab)。
架构组成
Celery 架构采用典型的生产者 - 消费者模式,主要由以下部分组成:
- Celery Beat: 任务调度器,Beat 进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
- Producer: 需要在队列中进行的任务,一般由用户、触发器或其他操作将任务入队。
- Broker: 即消息中间件,在这指任务队列本身,Celery 扮演生产者和消费者的角色。
- Celery Worker: 执行任务的消费者,从队列中取出任务并执行。
- Result Backend: 任务处理完后保存状态信息和结果,以供查询。
注意事项
需要注意,Celery 本身并不具备任务的存储功能,在调度任务的时候肯定是要把任务存起来的,因此在使用 Celery 的时候还需要搭配一些具备存储、访问功能的工具,比如:消息队列、Redis 缓存、数据库等。官方推荐的是消息队列 RabbitMQ,有些时候使用 Redis 也是不错的选择。
8. 使用数据流工具 Apache Airflow 实现定时任务
Apache Airflow 是 Airbnb 开源的一款数据流程工具,目前是 Apache 孵化项目。以非常灵活的方式来支持数据的 ETL 过程,同时还支持非常多的插件来完成诸如 HDFS 监控、邮件通知等功能。Airflow 支持单机和分布式两种模式,支持 Master-Slave 模式,支持 Mesos 等资源调度,有非常好的扩展性。被大量公司采用。
核心概念
Airflow 使用 Python 开发,它通过 DAGs (Directed Acyclic Graph, 有向无环图) 来表达一个工作流中所要执行的任务,以及任务之间的关系和依赖。
- DAGs: 即有向无环图,将所有需要运行的 tasks 按照依赖关系组织起来,描述的是所有 tasks 执行顺序。
- Operators: 可以简单理解为一个 class,描述了 DAG 中某个 task 具体要做的事。
- Tasks: Task 是 Operator 的一个实例,也就是 DAGs 中的一个 node。
- Task Instance: task 的一次运行。
架构组件
在一个可扩展的生产环境中,Airflow 含有以下组件:
- 元数据库: 这个数据库存储有关任务状态的信息。
- 调度器 (Scheduler): 是一种使用 DAG 定义结合元数据中的任务状态来决定哪些任务需要被执行以及任务执行优先级的过程。
- 执行器 (Executor): 是一个消息队列进程,它被绑定到调度器中,用于确定实际执行每个任务计划的工作进程。
- Workers: 这些是实际执行任务逻辑的进程,由正在使用的执行器确定。
执行器类型
Worker 的具体实现由配置文件中的 executor 来指定,Airflow 支持多种 Executor:
SequentialExecutor: 单进程顺序执行,一般只用来测试。
LocalExecutor: 本地多进程执行。
CeleryExecutor: 使用 Celery 进行分布式任务调度。
KubernetesExecutor: 创建临时 POD 执行每次任务。
生产环境一般使用 CeleryExecutor 和 KubernetesExecutor。
总结与选型建议
以上介绍了八种 Python 定时任务的实现方案,各有优劣,适用场景不同:
- while + sleep: 最简单,适合学习或极简单的脚本,但阻塞且无法精确控制时间点。
- Timeloop / Threading.Timer / Sched: 适合轻量级、单进程内的定时需求,无需额外复杂配置。
- Schedule: 语法最人性化,适合快速开发中小型项目的定时任务,但不支持持久化。
- APScheduler: 功能最强大,支持持久化、多种触发器、分布式部署,适合企业级应用。
- Celery: 适合分布式、高并发、需要消息队列解耦的场景,配置相对复杂。
- Airflow: 适合复杂的数据管道、ETL 流程管理,强调任务依赖关系可视化,运维成本较高。
在实际项目中,应根据任务复杂度、是否需要持久化、是否涉及分布式以及团队技术栈来选择最合适的方案。
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- Gemini 图片去水印
基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
- curl 转代码
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown转HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online