schedule

使用起来比较简单的一个包

安装:

pip install schedule

具体用法:

import schedule

# add schedule job
schedule.every(10).seconds.do(lambda: print("running"))

# run scheduler
while True:
    schedule.run_pending()
    time.sleep(1)

运行带有参数的 job

def func(name: str):
    print(f"My name is {name}")
    
schedule.every(5).seconds.do(func, name="Tom")
while True:
    schedule.run_pending()
    time.sleep(1)

Apscheduler

一个功能更为完整的包

安装:

pip install apscheduler

一些基本概念:

Triggers:任务触发逻辑

  • cron:cron 格式触发
  • interval:固定时间间隔触发
  • date:在某固定日期触发一次
  • combine:组合条件触发

Scheduler

  • BlockingScheduler: 阻塞式,当程序只运行这个 scheduler 时使用
  • BackgroundScheduler:调度器在后台运行

Executor

  • ThreadPoolExecutor:默认使用多线程执行器
  • ProcessPoolExecutor:如果是 CPU 密集型任务可以使用多进程执行器

Job store:如果任务调度信息存在内存中,当程序退出后会丢失,可以其他存储器进行持久化存储

  • MemoryJobStore: 默认使用内存存储
  • SQLAlchemyJobStore
  • MongoDBJobStore
  • etc.

创建 scheduler

# 创建 BlockingScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()

# 创建 BackgroundScheduler
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()

# 自定义 job store、executor、job defaults、time zone
import pytz
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

scheduler = BackgroundScheduler(
    jobstores={'mongo': MongoDBJobStore()},
    executors={'processpool': ProcessPoolExecutor(5)},
    job_defaults = {
        'coalesce': False,
        'max_instances': 3
    },
    timezone=pytz.utc
)
    

新增任务

from datetime import datetime
def tick():
    print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

# add job with cron trigger
scheduler.add_job(tick, "cron", second=0) # every minute at second 0

先创建 triggers 再创建任务

from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger

c_trigger = CronTrigger(second=0) # every minute at second 0
i_trigger = IntervalTrigger(seconds=10) # every 10 seconds

scheduler.add_job(tick, c_trigger)
scheduler.add_job(tick, i_trigger)

带参数任务

def whoami(name: str, age: int):
    print(f"My name is {name}, age is {age}")

scheduler.add_job(whoami, i_trigger, args=("Tom",))
# or
scheduler.add_job(whoami, i_trigger, kwargs={"name":"Tom", "age":13})

启动调度器

# start
scheduler.start()

# shutdown
scheduler.shutdown()