当前位置: 首页 > news >正文

python轻量级异步定时任务

先看一段代码:

from apscheduler.schedulers.background import BackgroundScheduler
import time
import logging# 设置日志记录器
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)def job_function():print("This is a scheduled job.")scheduler = BackgroundScheduler()# 添加一个每 5 分钟执行一次的任务
scheduler.add_job(job_function, 'interval', minutes=5)# 启动调度器
scheduler.start()try:# 主程序继续执行其他操作while True:print("Main program is running...")time.sleep(1)
except (KeyboardInterrupt, SystemExit):# 当主程序退出时,关闭调度器scheduler.shutdown()

这里面就是一个异步的定时任务。

其中的 apscheduler,是指:Advanced Python Scheduler

APScheduler 是一个功能强大的 Python 任务调度库,可以用于在指定的时间间隔或特定的时间点执行任务。BackgroundScheduler是 APScheduler 中的一种调度器类型,它在后台运行,不会阻塞主程序的执行。

多种触发方式

  • BackgroundScheduler支持多种触发任务的方式,包括:
    • 固定时间间隔(interval):按照固定的时间间隔执行任务。例如,每隔 5 分钟执行一次任务。
    • 特定日期时间(cron):使用类似于 Unix cron 表达式的方式来指定任务的执行时间。这允许你非常精确地控制任务的执行时间,例如每天的特定时间、每周的特定日期等。
    • 一次性执行(date):在指定的日期时间执行一次任务。

任务管理

  • BackgroundScheduler提供了丰富的任务管理功能,包括:
    • 添加任务:使用add_job方法可以向调度器中添加一个任务。你可以指定任务的执行函数、触发方式以及其他参数。
    • 暂停和恢复任务:可以使用pause_jobresume_job方法暂停和恢复特定的任务。这在需要临时停止任务执行或在特定条件下恢复任务执行时非常有用。
    • 删除任务:使用remove_job方法可以从调度器中删除一个任务。这可以在任务不再需要执行时进行清理。
  1. 错误处理

    • 如果任务执行过程中出现错误,BackgroundScheduler会捕获并记录错误信息,以便进行故障排除。
    • 可以通过设置日志记录器来记录任务执行的详细信息,包括错误信息。这有助于及时发现和解决问题。

与其他库的集成

  • BackgroundScheduler可以与其他 Python 库和框架很好地集成。例如,可以在 Flask 或 Django 等 Web 框架中使用它来执行后台任务。
  • 它还可以与数据库、消息队列等其他系统进行交互,以实现更复杂的任务调度和处理流程。

其实准确的来说,他的应用应该是这样的:

from apscheduler.schedulers.background import BackgroundSchedulerdef job1(bane, age):print("Job 1 executed.")def job2():print("Job 2 executed.")def my_task():print("This task runs at 10 am every day.")scheduler = BackgroundScheduler()scheduler.job1(func=run_usage, args=(None, None), id="my_job", trigger="interval", minutes=30, replace_existing=False)# 尝试添加第二个任务,与第一个任务 ID 相同
scheduler.add_job(job2, 'interval', minutes=10, id='my_job', replace_existing=True)# 设置 cron 表达式,每天上午 10 点执行任务
scheduler.add_job(my_task, trigger='cron', hour=10)scheduler.start()try:while True:pass
except (KeyboardInterrupt, SystemExit):scheduler.shutdown()

在使用BackgroundScheduler(以及 APScheduler 中的其他调度器类型)的add_job()方法时,replace_existing参数用于控制当添加一个新任务时,如果已经存在一个具有相同 ID 的任务,应该如何处理。

具体作用如下:

  • 如果replace_existing=True

    • 当尝试添加一个新任务,且新任务的 ID 与已存在任务的 ID 相同时,新任务将替换旧任务。这意味着旧任务将被停止并从调度器中移除,新任务将按照其指定的参数进行调度执行。
    • 例如,你可能在程序运行过程中需要更新某个任务的执行函数、触发方式或其他参数。通过设置这个参数为True,可以确保新的设置生效,而无需手动删除旧任务再添加新任务。
  • 如果replace_existing=False(默认值):

    • 当尝试添加一个新任务,且新任务的 ID 与已存在任务的 ID 相同时,新任务将不会被添加,并且不会对已存在的任务产生任何影响。
    • 在这种情况下,如果想要更新一个任务,需要先使用remove_job()方法手动删除旧任务,然后再添加新任务。

所以:在这个例子中,首先添加了一个任务job1,然后尝试添加一个具有相同 ID 的任务job2。当replace_existing=True时,job2将替换job1;当replace_existing=False时,job2不会被添加。


http://www.mrgr.cn/news/24291.html

相关文章:

  • QT QPrinter无弹窗后台打印
  • 构建高效 Python Web API:RESTful 设计与 GraphQL 实践
  • iOS 知识点记录
  • 海康威视相机在QTcreate上的使用教程
  • 超声波 HC-SR04 的使用 CubeMx + STM32F103C8T6 【含两个】
  • 油耳用什么掏耳朵比较好?质量最好的可视挖耳勺推荐
  • 非标独立设计选型--二十二--减速机选型计算
  • uniapp中基于vue3实现输入验证码功能
  • 问题:vite首次加载慢
  • OpenFeign
  • 原型模式详细介绍和代码实现
  • 一个简约的uniapp登录界面,基于uniapp+vue3+uview-plus
  • Vue3.5正式上线,有哪些新特性和用法?
  • Unity6的GPUDriven渲染到底是什么?
  • JavaScript高阶面试题:(第一天)
  • maven中的仓库的配置与优先级
  • 水平垂直居中的几种方法(总结)
  • 基于Spring Boot的电子请柬私人定制销售平台的设计与实现---附源码78900
  • 史级低价1元《魔域口袋版》神话斗神·黑悟空 带领战队傲视群雄
  • 住宅建筑电气火灾预防