Python中定时任务框架APScheduler的快速入门指南
发布时间 - 2026-01-11 02:12:31 点击率:次前言

大家应该都知道在编程语言中,定时任务是常用的一种调度形式,在Python中也涌现了非常多的调度模块,本文将简要介绍APScheduler的基本使用方法。
一、APScheduler介绍
APScheduler是基于Quartz的一个python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。
APScheduler提供了多种不同的调度器,方便开发者根据自己的实际需要进行使用;同时也提供了不同的存储机制,可以方便与Redis,数据库等第三方的外部持久化机制进行协同工作,总之功能非常强大和易用。
在Python的世界中,另外一个齐名的调度模块是Celery,功能也非常的强大,号称分布式的调度器,感兴趣的读者可以自行进行研究。
官网文档地址:http://apscheduler.readthedocs.io/en/latest/
安装包位置: https://pypi.python.org/pypi/APScheduler/
在系统中,如何进行安装呢?其实非常简单,基于pip直接安装即可:
pip install APScheduler
二、APScheduler的主要的调度类
在APScheduler中有以下几个非常重要的概念,需要大家理解:
1、触发器(trigger)
包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行,根据trigger中定义的时间点,频率,时间区间等等参数设置。除了他们自己初始配置以外,触发器完全是无状态的。
2、作业存储(job store)
存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据讲在保存在持久化作业存储时被序列化,并在加载时被反序列化。调度器不能分享同一个作业存储。job store支持主流的存储机制:redis, mongodb, 关系型数据库, 内存等等
3、执行器(executor)
处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。基于池化的操作,可以针对不同类型的作业任务,更为高效地使用cpu的计算资源。
调度器(scheduler)
通常在应用只有一个调度器,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。
这里简单列一下常用的若干调度器:
- BlockingScheduler:仅可用在当前你的进程之内,与当前的进行共享计算资源
- BackgroundScheduler: 在后台运行调度,不影响当前的系统计算运行
- AsyncIOScheduler: 如果当前系统中使用了async module,则需要使用异步的调度器
- GeventScheduler: 如果使用了gevent,则需要使用该调度
- TornadoScheduler: 如果使用了Tornado, 则使用当前的调度器
- TwistedScheduler:Twister应用的调度器
- QtScheduler: Qt的调度器
由此可知,在APscheduler的调度器中,是与底层的实现机制紧密相关的,需要依据当前的计算模型来动态选择调度器。
三、APScheduler的job管理
Job是APScheduler中的核心,其承接目前需要执行的工作和任务,其可以在系统运行过程中动态地进行增加/修改/删除/查询等操作。
3.1 Job的新增
共有两种方式进行新增job的操作:
基于add_job来动态增加
代码示例:
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour='0-9', minute="*", second="*/4")
基于修饰器scheduled_job来动态装饰job的实际函数
代码示例:
@sched.scheduled_job('cron', id='my_job_id', day='last sun')
def some_decorated_task():
print("I am printed at 00:00:00 on the last Sunday of every month!")
3.2 移除作业
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
Same, using an explicit job ID:
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
基于job id来动态移除特定的job.
3.3 暂停和恢复作业
暂停作业:
– apscheduler.job.Job.pause()
– apscheduler.schedulers.base.BaseScheduler.pause_job()
恢复作业:
– apscheduler.job.Job.resume()
– apscheduler.schedulers.base.BaseScheduler.resume_job()
3.4. 获得job列表
获得调度作业的列表,可以使用 get_jobs() 来完成,它会返回所有的job实例。或者使用 print_jobs() 来输出所有格式化的作业列表。
3.5. 修改作业 job
可以通过apscheduler.job.Job.modify() or modify_job()来动态修改job的属性信息,除了job id无法修改之外,都是可以修改的。
job.modify(max_instances=6, name='Alternate name')
另外我们也可以通过apscheduler.job.Job.reschedule() or reschedule_job()动态重新设置trigger,示例如下:
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
3.6. 关闭调度器
默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。如果你不想等待,可以将wait选项设置为False。
scheduler.shutdown() scheduler.shutdown(wait=False)
四、 APScheduler的代码示例
这里使用装饰器来展示一个调度的使用:
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
@sched.scheduled_job('interval', seconds=3)
def timed_job():
print('This job is run every three minutes.')
@sched.scheduled_job('cron', day_of_week='mon-fri', hour='0-9', minute='30-59', second='*/3')
def scheduled_job():
print('This job is run every weekday at 5pm.')
print('before the start funciton')
sched.start()
print("let us figure out the situation")
代码说明:
在这段代码中,使用了当前进程中共享计算资源的BlockingScheduler,共使用了2个调度器,其中一个是间隔3秒的执行。
另外一个调度器是模仿cron来执行的,在周一到周五其间,每天的0点到9点直接,在30分到59分之间执行,执行频次为3秒。
基于正常代码的示例如下:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import time
import logging
def job_function():
print "Hello World" + " " + str(datetime.datetime.now())
if __name__ == '__main__':
log = logging.getLogger('apscheduler.executors.default')
log.setLevel(logging.INFO) # DEBUG
fmt = logging.Formatter('%(levelname)s:%(name)s:%(message)s')
h = logging.StreamHandler()
h.setFormatter(fmt)
log.addHandler(h)
print('start to do it')
sched = BlockingScheduler()
# Schedules job_function to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour='0-9', minute="*", second="*/4")
sched.start()
五、某个异常问题的思考
在执行以下代码之时候,定时任务一直未能正常生效:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
import time
def job_function():
print "Hello World" + " " + str(datetime.datetime.now())
if __name__ == '__main__':
print('start to do it')
sched = BlockingScheduler()
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour='0-9', minute="*", second="*/4")
sched.start()
代码报错的错误信息为:
No handlers could be found for logger “apscheduler.scheduler”
从字面意思来分析,是没有logging模块的logger存在,故需要添加上去即可。
新增对应的logging信息即可:
import logging
log = logging.getLogger('apscheduler.executors.default')
log.setLevel(logging.INFO) # DEBUG
fmt = logging.Formatter('%(levelname)s:%(name)s:%(message)s')
h = logging.StreamHandler()
h.setFormatter(fmt)
log.addHandler(h)
后来笔者重新做了一次执行,即使移除掉logging的内容,依然可以正常执行,故可以推测为需要动态引入一次依赖包logging即可。
六、总结
APScheduler是一个非常强大易用的类库,为了我们简单快捷的解决问题提供了很多的工具,并且提供了很多灵活的扩展点,只要你添加若干的web页面,就可以创建一个强大的任务调度系统,不是吗?
好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对的支持。
# python
# 定时任务框架
# apscheduler
# 安装
# python任务调度框架
# Python实现定时任务
# Python3实现定时任务的四种方式
# python BlockingScheduler定时任务及其他方式的实现
# python 实现定时任务的四种方式
# Linux下Python脚本自启动与定时任务详解
# Python3.6 Schedule模块定时任务(实例讲解)
# 对Python定时任务的启动和停止方法详解
# 详解使用python crontab设置linux定时任务
# python Celery定时任务的示例
# Python中实现定时任务详解
# 使用了
# 移除
# 自己的
# 可以通过
# 执行器
# 另外一个
# 易用
# 要使
# 则需
# 都是
# 器中
# 几个
# 如果你
# 序列化
# 好了
# 由此可知
# 将会
# 两种
# 中有
# 其他的
相关栏目:
【
网站优化151355 】
【
网络推广146373 】
【
网络技术251813 】
【
AI营销90571 】
相关推荐:
如何快速重置建站主机并恢复默认配置?
Laravel如何使用Gate和Policy进行授权?(权限控制)
Laravel如何实现文件上传和存储?(本地与S3配置)
Laravel怎么使用Markdown渲染文档_Laravel将Markdown内容转HTML页面展示【实战】
如何在阿里云域名上完成建站全流程?
nodejs redis 发布订阅机制封装实现方法及实例代码
Laravel广播系统如何实现实时通信_Laravel Reverb与WebSockets实战教程
黑客如何通过漏洞一步步攻陷网站服务器?
如何在VPS电脑上快速搭建网站?
文字头像制作网站推荐软件,醒图能自动配文字吗?
详解Android图表 MPAndroidChart折线图
php增删改查怎么学_零基础入门php数据库操作必知基础【教程】
深入理解Android中的xmlns:tools属性
浅谈javascript alert和confirm的美化
JavaScript中的标签模板是什么_它如何扩展字符串功能
如何自己制作一个网站链接,如何制作一个企业网站,建设网站的基本步骤有哪些?
新三国志曹操传主线渭水交兵攻略
Laravel如何保护应用免受CSRF攻击?(原理和示例)
如何为不同团队 ID 动态生成多个独立按钮
Laravel如何集成第三方登录_Laravel Socialite实现微信QQ微博登录
Laravel怎么多语言本地化设置_Laravel语言包翻译与Locale动态切换【手册】
黑客如何利用漏洞与弱口令入侵网站服务器?
如何在香港服务器上快速搭建免备案网站?
Laravel中间件如何使用_Laravel自定义中间件实现权限控制
Laravel如何使用Eloquent ORM进行数据库操作?(CRUD示例)
手机网站制作平台,手机靓号代理商怎么制作属于自己的手机靓号网站?
悟空识字如何进行跟读录音_悟空识字开启麦克风权限与录音
国美网站制作流程,国美电器蒸汽鍋怎么用官方网站?
Firefox Developer Edition开发者版本入口
胶州企业网站制作公司,青岛石头网络科技有限公司怎么样?
Win11怎么关闭资讯和兴趣_Windows11任务栏设置隐藏小组件
java中使用zxing批量生成二维码立牌
Laravel集合Collection怎么用_Laravel集合常用函数详解
Laravel如何实现用户注册和登录?(Auth脚手架指南)
深圳网站制作公司好吗,在深圳找工作哪个网站最好啊?
Laravel如何实现URL美化Slug功能_Laravel使用eloquent-sluggable生成别名【方法】
历史网站制作软件,华为如何找回被删除的网站?
大型企业网站制作流程,做网站需要注册公司吗?
Laravel如何实现事件和监听器?(Event & Listener实战)
JS去除重复并统计数量的实现方法
QQ浏览器网页版登录入口 个人中心在线进入
香港服务器建站指南:免备案优势与SEO优化技巧全解析
敲碗10年!Mac系列传将迎来「触控与联网」双革新
如何快速完成中国万网建站详细流程?
Laravel如何实现数据导出到PDF_Laravel使用snappy生成网页快照PDF【方案】
Win11怎么设置默认图片查看器_Windows11照片应用关联设置
如何在HTML表单中获取用户输入并用JavaScript动态控制复利计算循环
网站建设要注意的标准 促进网站用户好感度!
Java类加载基本过程详细介绍
python中快速进行多个字符替换的方法小结

