Python 实现定时任务的八种常见方案
在日常工作中,我们常常会用到需要周期性执行的任务。一种方式是采用 Linux 系统自带的 crond 结合命令行实现,另一种方式是直接使用 Python。接下来整理的是常见的 Python 定时任务的实现方式。
Python 实现定时任务的八种常见方案,包括基础的 while+sleep 循环、轻量级库如 Timeloop、Threading.Timer、Sched、Schedule,以及企业级框架 APScheduler、Celery 和 Airflow。文章对比了各方案的优缺点及适用场景,涵盖了代码示例、核心概念解析及架构说明,帮助开发者根据实际需求选择合适的定时任务解决方案。

在日常工作中,我们常常会用到需要周期性执行的任务。一种方式是采用 Linux 系统自带的 crond 结合命令行实现,另一种方式是直接使用 Python。接下来整理的是常见的 Python 定时任务的实现方式。
位于 time 模块中的 sleep(secs) 函数,可以实现令当前执行的线程暂停 secs 秒后再继续执行。所谓暂停,即令当前线程进入阻塞状态,当达到 sleep() 函数规定的时间后,再由阻塞状态转为就绪状态,等待 CPU 调度。
基于这样的特性,我们可以通过 while 死循环 + sleep() 的方式实现简单的定时任务。
import datetime
import time
def time_printer():
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
print('do func time :', ts)
def loop_monitor():
while True:
time_printer()
time.sleep(5) # 暂停 5 秒
if __name__ == "__main__":
loop_monitor()
sleep 是一个阻塞函数,也就是说 sleep 这一段时间,程序什么也不能操作。Timeloop 是一个库,可用于运行多周期任务。这是一个简单的库,它使用装饰器模式在线程中运行标记函数。
import time
from timeloop import Timeloop
from datetime import timedelta
tl = Timeloop()
@tl.job(interval=timedelta(seconds=2))
def sample_job_every_2s():
print("2s job current time : {}".format(time.ctime()))
@tl.job(interval=timedelta(seconds=5))
def sample_job_every_5s():
print("5s job current time : {}".format(time.ctime()))
@tl.job(interval=timedelta(seconds=10))
def sample_job_every_10s():
print("10s job current time : {}".format(time.ctime()))
# 启动定时器
with tl:
tl.start()
threading 模块中的 Timer 是一个非阻塞函数,比 sleep 稍好一点。Timer 最基本理解就是定时器,我们可以启动多个定时任务,这些定时器任务是异步执行,所以不存在等待顺序执行问题。
interval: 指定的时间function: 要执行的方法args/kwargs: 方法的参数import threading
import time
def task():
print("Task executed at", time.ctime())
# 创建 Timer 对象,5 秒后执行 task 函数
timer = threading.Timer(5, task)
timer.start()
# 注意:Timer 默认只执行一次,如果需要循环调用,需要在任务内部再次创建 Timer
sched 模块实现了一个通用事件调度器,在调度器类使用一个延迟函数等待特定的时间,执行任务。同时支持多线程应用程序,在每个任务执行后会立刻调用延时函数,以确保其他线程也能执行。
class sched.scheduler(timefunc, delayfunc) 这个类定义了调度事件的通用接口,它需要外部传入两个参数:
timefunc: 一个没有参数的返回时间类型数字的函数 (常用使用的如 time 模块里面的 time)。delayfunc: 应该是一个需要一个参数来调用、与 timefunc 的输出兼容、并且作用为延迟多个时间单位的函数 (常用的如 time 模块的 sleep)。import datetime
import time
import sched
def time_printer():
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
print('do func time :', ts)
loop_monitor()
def loop_monitor():
s = sched.scheduler(time.time, time.sleep) # 生成调度器
s.enter(5, 1, time_printer, ())
s.run()
if __name__ == "__main__":
loop_monitor()
enter(delay, priority, action, argument): 安排一个事件来延迟 delay 个时间单位。cancel(event): 从队列中删除事件。如果事件不是当前队列中的事件,则该方法将抛出一个 ValueError。run(): 运行所有预定的事件。这个函数将等待 (使用传递给构造函数的 delayfunc() 函数),然后执行事件,直到不再有预定的事件。个人点评:比 threading.Timer 更好,不需要循环调用。
schedule 是一个第三方轻量级的任务调度模块,可以按照秒、分、小时、日期或者自定义事件执行时间。schedule 允许用户使用简单、人性化的语法以预定的时间间隔定期运行 Python 函数 (或其它可调用函数)。
import schedule
import time
def job():
print("I'm working...")
schedule.every(10).seconds.do(job)
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)
while True:
schedule.run_pending()
time.sleep(1)
@repeat() 装饰静态方法。schedule.cancel_job(job) 或 schedule.clear()。.tag() 给任务打标签,并通过 schedule.get_jobs('tag_name') 检索。.until() 限制任务运行的截止时间。APScheduler (Advanced Python Scheduler) 基于 Quartz 的一个 Python 定时任务框架,实现了 Quartz 的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及 crontab 类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个 Python 定时任务系统。
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
def job():
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
sched = BlockingScheduler()
sched.add_job(job, 'interval', seconds=5, id='my_job_id')
sched.start()
Job 作为 APScheduler 最小执行单位。创建 Job 时指定执行的函数,函数中所需参数,Job 执行时的一些设置信息。
id: 指定作业的唯一 ID。trigger: 确定 Job 的执行时间。executor: 指定执行器名称。max_instances: 执行此 Job 的最大实例数。misfire_grace_time: Job 的延迟执行时间容差。目前 APScheduler 支持三种触发器:
根据实际 scheduler 选择不同的执行器,如 ThreadPoolExecutor, ProcessPoolExecutor, AsyncIOExecutor 等。
决定任务的保存方式,默认存储在内存中 (MemoryJobStore),重启后就没有了。APScheduler 支持的任务存储器有:jobstores.memory, jobstores.mongodb, jobstores.redis, jobstores.sqlalchemy 等。
APScheduler 支持的调度器方式如下,比较常用的为 BlockingScheduler 和 BackgroundScheduler。
BlockingScheduler: 适用于调度程序是进程中唯一运行的进程,调用 start 函数会阻塞当前线程。BackgroundScheduler: 适用于调度程序在应用程序的后台运行,调用 start 后主线程不会阻塞。Celery 是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具,也可用于任务调度。Celery 的配置比较麻烦,如果你只是需要一个轻量级的调度工具,Celery 不会是一个好选择。
Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务 (async task) 和定时任务 (crontab)。
Celery 架构采用典型的生产者 - 消费者模式,主要由以下部分组成:
需要注意,Celery 本身并不具备任务的存储功能,在调度任务的时候肯定是要把任务存起来的,因此在使用 Celery 的时候还需要搭配一些具备存储、访问功能的工具,比如:消息队列、Redis 缓存、数据库等。官方推荐的是消息队列 RabbitMQ,有些时候使用 Redis 也是不错的选择。
Apache Airflow 是 Airbnb 开源的一款数据流程工具,目前是 Apache 孵化项目。以非常灵活的方式来支持数据的 ETL 过程,同时还支持非常多的插件来完成诸如 HDFS 监控、邮件通知等功能。Airflow 支持单机和分布式两种模式,支持 Master-Slave 模式,支持 Mesos 等资源调度,有非常好的扩展性。被大量公司采用。
Airflow 使用 Python 开发,它通过 DAGs (Directed Acyclic Graph, 有向无环图) 来表达一个工作流中所要执行的任务,以及任务之间的关系和依赖。
在一个可扩展的生产环境中,Airflow 含有以下组件:
Worker 的具体实现由配置文件中的 executor 来指定,Airflow 支持多种 Executor:
SequentialExecutor: 单进程顺序执行,一般只用来测试。LocalExecutor: 本地多进程执行。CeleryExecutor: 使用 Celery 进行分布式任务调度。KubernetesExecutor: 创建临时 POD 执行每次任务。生产环境一般使用 CeleryExecutor 和 KubernetesExecutor。
以上介绍了八种 Python 定时任务的实现方案,各有优劣,适用场景不同:
在实际项目中,应根据任务复杂度、是否需要持久化、是否涉及分布式以及团队技术栈来选择最合适的方案。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online