基本介绍

django-apscheduler支持三种调度任务:固定时间间隔,固定时间点(日期),Crontab 命令。同时,它还支持异步执行、后台执行调度任务 配置简单、功能齐全、使用灵活、支持windows和linux,适合中小型项目。

django-apscheduler中相关的概念和python的定时任务框架apscheduler中的概念是一样的。

  • 可以独立运行,也可以放在程序(如Django、Flask)中。
  • 灵活,可以在程序开始前开启定时任务,也可以执行到某个任务时,立即可开启定时任务。
  • 如果依赖程序的话,会占用程序资源。

基础组件:
APScheduler 有四种组件,分别是:调度器(scheduler),作业存储(job store),触发器(trigger),执行器(executor)。

  • schedulers(调度器)
    它是任务调度器,属于控制器角色。它配置作业存储器和执行器可以在调度器中完成,例如添加、修改和移除作业。
  • triggers(触发器)
    描述调度任务被触发的条件。不过触发器完全是无状态的。
  • job stores(作业存储器)
    任务持久化仓库,默认保存任务在内存中,也可将任务保存都各种数据库中。
  • executors(执行器)
    负责处理作业的运行,它们通常通过在作业中提交指定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。

schedulers(调度器):
它提供 7 种调度器,能够满足我们各种场景的需要。例如:后台执行某个操作,异步执行操作等。调度器分别是:

  • BlockingScheduler : 调度器在当前进程的主线程中运行,也就是会阻塞当前线程。
  • BackgroundScheduler : 调度器在后台线程中运行,不会阻塞当前线程。(Django框架使用)
  • AsyncIOScheduler : 结合 asyncio 模块(一个异步框架)一起使用。
  • GeventScheduler : 程序中使用 gevent(高性能的Python并发框架)作为IO模型,和 - GeventExecutor 配合使用。
  • TornadoScheduler : 程序中使用 Tornado(一个web框架)的IO模型,用 ioloop.add_timeout 完成定时唤醒。
  • TwistedScheduler : 配合 TwistedExecutor,用 reactor.callLater 完成定时唤醒。scrapy爬虫框
  • QtScheduler : 你的应用是一个 Qt 应用,需使用QTimer完成定时唤醒。

triggers(触发器):

  • date 触发器 作业任务只会执行一次。它表示特定的时间点触发。它的参数如下:

  • interval 触发器 固定时间间隔触发。interval 间隔调度,参数如下:

  • cron 触发器 在特定时间周期性地触发,和Linux crontab格式兼容。

安装插件

python
pip install django-apscheduler
或者
pip install apscheduler

使用插件

修改settings.py增加以下代码:

python
INSTALLED_APPS = [
'django_apscheduler', # 新加入的定时任务插件django-apscheduler
]

迁移数据库

因为django-apscheduler会创建表来存储定时任务的一些信息,所以将app加入之后需要迁移数据。

plaintext
python manage.py migrate

django_apscheduler_djangojob—用于存储任务的表格

django_apscheduler_djangojobexecution—用于存储任务执行状态的表格

  • status: 执行状态
  • duration: 执行了多长时间
  • exception: 是否出现了什么异常

这两个表用来管理你所需要的定时任务,然后就开始在任意view.py下写你需要实现的任务。

创建任务

装饰器

在任意view.py中实现代码

示例:

python
import time
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_job, register_events

print('django-apscheduler')

# 实例化调度器
scheduler = BackgroundScheduler()
# 调度器使用DjangoJobStore()
scheduler.add_jobstore(DjangoJobStore(), "default")

# 添加任务
# 每隔5s执行这个任务
@register_job(scheduler,"interval", seconds=5,args=['测试'],id='job1')
def job1(name):
# 具体要执行的代码
print('%s 任务运行成功!%s' %(name,time.strftime("%Y-%m-%d %H:%M:%S")))

# 监控任务——注册定时任务
register_events(scheduler)
# 调度器开始运行
scheduler.start()
#停止APScheduler运行(如果报错,调度器就立即停止执行)
#scheduler.shutdown()

django服务启动后,这个任务会被存储在django_apscheduler_djangojob表中,并按照设置定时执行。

@register_job装饰器参数说明:

  1. scheduler: 指定调度器

  2. trigger: 任务执行的方式,共有三种: date、interval、cron。
    ‘date’ + ‘run_date’ 的参数组合, 能实现单次任务。
    例子: 2019-07-07 22:49:00 执行任务
    @register_job(scheduler, ‘date’, id=‘test’, run_date=‘2019-07-07 22:49:00’)

    ‘interval’ + ‘hours’ + ‘minutes’’'+ … 的参数组合,能实现间隔性任务。
    例子:每隔3个半小时执行任务
    @register_job(scheduler, ‘interval’, id=‘test’, hours=3, minutes=30)
    还有seconds,days参数可以选择
    注:如果任务需要执行10秒,而间隔设置为1秒,它是不会给你开10个线程同时去执行10个任务的。它会错过其他任务直到当前任务完成。

    ‘cron’ + ‘hour’ + ‘minute’+…的参数组合,能实现cron类的任务。
    例子:每天的8点半执行任务
    @register_job(scheduler, ‘cron’, id=‘test’, hour=8, minute=30)
    还有day,second,month等参数可以选择。

  3. id: 任务的名字,不传的话会自动生成。不过为了之后对任务进行暂停、开启、删除等操作,建议给一个名字。并且是唯一的,如果多个任务取一个名字,之前的任务就会被覆盖。

  4. args: list类型,执行代码所需要的参数。

  5. next_run_time:datetime类型,开始执行时间。

add_job函数

装饰器的方法适合于写代码的人自己创建任务,如果想让用户通过页面创建定时任务,需要使用add_job函数。

示例:

python
#与前端的接口
class DjangoCronjobViewSet(ModelViewSet):
queryset = DjangoCronjob.objects.all()
serializer_class = DjangoCronjobSerializer

filter_backends = [filters.SearchFilter, filters.OrderingFilter, DjangoFilterBackend]
# 指定自定义的过滤器
from .serializers import DjangoCronjobFilter
filterset_class = DjangoCronjobFilter


#创建任务
def create(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
try:
serializer.is_valid(raise_exception=True)
# self.perform_create(serializer)
server_ids = request.data.get('server_ids')
#脚本内容
cron_script = request.data.get('cron_script')
#脚本名
command = request.data.get('command')
note = request.data.get('note')
schedule_str = str(request.data.get('schedule'))
schedule_list = schedule_str.split(' ')
minute = schedule_list[0]
hour = schedule_list[1]
day = schedule_list[2]
month = schedule_list[3]
weekday = schedule_list[4]
name = request.data.get('name')
try:
scheduler.add_job(execute, 'cron', minute=minute,hour=hour,day=day,month=month,week=weekday,id=name,args=[command,cron_script,server_ids],replace_existing=True)
# 如果没有运行再启动调度器,如果调度器之前没有被启动过,那么它将会开始执行所有已经添加并且没有被暂停的任务。
if not scheduler.running:
scheduler.start()
#添加任务后暂停运行,后面手动开启
job = scheduler.get_job(name)
if job:
job.pause()
except Exception as e:
res = {'code': 500, 'msg': '%s' % e}
return Response(res)
job = DjangoJob.objects.get(id=name)
DjangoCronjob.objects.create(
job=job,
server_ids=server_ids,
cron_script=cron_script,
command=command,
schedule=schedule_str,
note=note,
)
res = {'code': 200, 'msg': '创建成功'}
except Exception as e:
res = {'code': 400, 'msg': '%s' %e}
return Response(res)

#移除任务
def destroy(self, request, *args, **kwargs):
instance = self.get_object()
try:
DjangoJobExecution.objects.filter(job_id=str(instance)).delete() #删除执行记录
self.perform_destroy(instance) #删除数据
scheduler.remove_job(str(instance)) #移除任务
res = {'code': 200, 'msg': '删除成功'}
except Exception as e:
res = {'code': 500, 'msg': '%s' %e}
return Response(res)

# 具体要执行的代码
def execute(command,cron_script,server_ids):
pass

这样就可以通过前端创建任务了。

add_job函数参数说明:
和装饰器的参数大同小异,只是第一个参数不同。
如果具体要执行的函数和调用它的函数在一个文件中,那么只需要传递这个函数名就可以了(如上面的例子)。
如果业务代码写到另外一个文件中,view.py中只写前后端交互的接口函数,这种情况下传递的参数为一个字符串,格式为: ‘package.module:some.object’,即 包名.模块:函数名。

触发器执行示例

  • date 触发器
python
# 在 2017-12-13 时刻运行一次 job_func 方法
scheduler .add_job(job_func, 'date', run_date=date(2017, 12, 13), args=['text'])
# 在 2017-12-13 14:00:00 时刻运行一次 job_func 方法
scheduler .add_job(job_func, 'date', run_date=datetime(2017, 12, 13, 14, 0, 0), args=['text'])
# 在 2020-12-13 14:00:01 时刻运行一次 job_func 方法
scheduler.add_job(job3,"date",run_date='2020-12-13 14:00:01',args=['王飞'],id="job3")
# 每天0点执行函数的代码,0点的话,hour可以不用写
app.scheduler.add_job(函数名, "cron", hour=0, args=[函数需要传的参数])
#每天凌晨3点执行代码
app.scheduler.add_job(函数名, "cron", hour=3, args=[app])
#如果date后面没有参数的话,就是立刻执行代码,一般测试的时候用
app.scheduler.add_job(函数名, "date", args=[app])
  • interval 触发器
python
# 每隔两分钟执行一次 job_func 方法
scheduler .add_job(job_func, 'interval', minutes=2)
# 在 2017-12-13 14:00:01 ~ 2017-12-13 14:00:10 之间, 每隔两分钟执行一次 job_func 方法
scheduler .add_job(job_func, 'interval', minutes=2, start_date='2017-12-13 14:00:01' , end_date='2017-12-13 14:00:10')
  • cron 触发器
python
# 在每天的2点35分36分37分 执行 job_func 任务
scheduler.add_job(job4,"cron",hour='2', minute='35-37',args=['王涛'],id="job4")

作业存储(job store)

有两种添加方法,其中一种是 add_job(), 另一种则是@register_job()修饰器来修饰函数。

这个两种办法的区别是:第一种方法返回一个 Job 的实例,可以用来改变或者移除 job。第二种方法只适用于应用运行期间不会改变的 job。

执行器(executor)

执行调度任务的模块。最常用的 executor 是 ThreadPoolExecutor,存储路径是在Django数据库中。

执行器 executors 可以使用线程池(ThreadPoolExecutor)和进程池(ProcessPoolExecutor)

  • 线程池
python
from apscheduler.executors.pool import ThreadPoolExecutor
ThreadPoolExecutor(max_workers)
ThreadPoolExecutor(20) # 最多20个线程同时执行

executors = {
'default': ThreadPoolExecutor(20)
}
scheduler = BackgroundScheduler(executors=executors)
  • 进程池
python
from apscheduler.executors.pool import ProcessPoolExecutor
ProcessPoolExecutor(max_workers)
ProcessPoolExecutor(5) # 最多5个进程同时执行

executors = {
'default': ProcessPoolExecutor(3)
}
scheduler = BackgroundScheduler(executors=executors)

其他功能

django-apscheduler框架还提供了很多操作定时任务的函数。比如:

  • 删除任务:scheduler.remove_job(job_name)
  • 暂停任务:scheduler.pause_job(job_name)
  • 开启任务:scheduler.resume_job(job_name)
  • 获取所有任务:scheduler.get_jobs()
  • 修改任务:scheduler.modify_job(job_name)

注:修改任务只能修改参数,如果要修改执行时间的话,有3种方法:
第一就把任务删了重新创建;
第二直接操作数据库;
第三用到下面重设任务。

  • 重设任务:scheduler.reschedule_job(job_name)
python
scheduler.reschedule_job(job_id="job1", trigger='interval', minutes=1)

总结:

django-apscheduler使用起来十分方便。提供了基于日期、固定时间间隔以及crontab 类型的任务,我们可以在主程序的运行过程中快速增加新作业或删除旧作业,并且作业会存储在数据库中,当调度器重启时,不必重新添加作业,作业会恢复原状态继续执行。django-apscheduler可以当作一个跨平台的调度工具来使用,可以做为 linux 系统crontab 工具或 windows 计划任务程序的替换。需要注意的是,apscheduler不是一个守护进程或服务,它自身不带有任何命令行工具。它主要是要在现有的应用程序中运行。