schedule
使用起来比较简单的一个包
安装:
pip install schedule
具体用法:
import schedule
# add schedule job
schedule.every(10).seconds.do(lambda: print("running"))
# run scheduler
while True:
schedule.run_pending()
time.sleep(1)
运行带有参数的 job
def func(name: str):
print(f"My name is {name}")
schedule.every(5).seconds.do(func, name="Tom")
while True:
schedule.run_pending()
time.sleep(1)
Apscheduler
一个功能更为完整的包
安装:
pip install apscheduler
一些基本概念:
Triggers:任务触发逻辑
- cron:cron 格式触发
- interval:固定时间间隔触发
- date:在某固定日期触发一次
- combine:组合条件触发
Scheduler
BlockingScheduler
: 阻塞式,当程序只运行这个 scheduler 时使用BackgroundScheduler
:调度器在后台运行
Executor
ThreadPoolExecutor
:默认使用多线程执行器ProcessPoolExecutor
:如果是 CPU 密集型任务可以使用多进程执行器
Job store:如果任务调度信息存在内存中,当程序退出后会丢失,可以其他存储器进行持久化存储
MemoryJobStore
: 默认使用内存存储SQLAlchemyJobStore
MongoDBJobStore
- etc.
创建 scheduler
# 创建 BlockingScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
# 创建 BackgroundScheduler
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
# 自定义 job store、executor、job defaults、time zone
import pytz
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
scheduler = BackgroundScheduler(
jobstores={'mongo': MongoDBJobStore()},
executors={'processpool': ProcessPoolExecutor(5)},
job_defaults = {
'coalesce': False,
'max_instances': 3
},
timezone=pytz.utc
)
新增任务
from datetime import datetime
def tick():
print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
# add job with cron trigger
scheduler.add_job(tick, "cron", second=0) # every minute at second 0
先创建 triggers 再创建任务
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
c_trigger = CronTrigger(second=0) # every minute at second 0
i_trigger = IntervalTrigger(seconds=10) # every 10 seconds
scheduler.add_job(tick, c_trigger)
scheduler.add_job(tick, i_trigger)
带参数任务
def whoami(name: str, age: int):
print(f"My name is {name}, age is {age}")
scheduler.add_job(whoami, i_trigger, args=("Tom",))
# or
scheduler.add_job(whoami, i_trigger, kwargs={"name":"Tom", "age":13})
启动调度器
# start
scheduler.start()
# shutdown
scheduler.shutdown()