国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Python任務調度模塊APScheduler

zxhaaa / 2651人閱讀

介紹

官網文檔:http://apscheduler.readthedoc...
API:http://apscheduler.readthedoc...

APScheduler是一個python的第三方庫,用來提供python的后臺程序。包含四個組件,分別是:

triggers: 任務觸發器組件,提供任務觸發方式

job stores: 任務商店組件,提供任務保存方式

executors: 任務調度組件,提供任務調度方式

schedulers: 任務調度組件,提供任務工作方式

安裝

pip 安裝

$ pip install apscheduler

源碼安裝

$ python setup.py install
簡單的實例
from apscheduler.schedulers.blocking import BlockingScheduler
import time

# 實例化一個調度器
scheduler = BlockingScheduler()
 
def job1():
    print "%s: 執行任務"  % time.asctime()

# 添加任務并設置觸發方式為3s一次
scheduler.add_job(job1, "interval", seconds=3)

# 開始運行調度器
scheduler.start()

輸出:

$ python first.py
Fri Sep  8 20:41:55 2017: 執行任務
Fri Sep  8 20:41:58 2017: 執行任務
...
各組件功能 trigger組件

trigger提供任務的觸發方式,共三種方式:

date:只在某個時間點執行一次run_date(datetime|str)

scheduler.add_job(my_job, "date", run_date=date(2017, 9, 8), args=[])
scheduler.add_job(my_job, "date", run_date=datetime(2017, 9, 8, 21, 30, 5), args=[])
scheduler.add_job(my_job, "date", run_date="2017-9-08 21:30:05", args=[])
# The "date" trigger and datetime.now() as run_date are implicit
sched.add_job(my_job, args=[[])

interval: 每隔一段時間執行一次weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0, start_date=None, end_date=None, timezone=None

scheduler.add_job(my_job, "interval", hours=2)
scheduler.add_job(my_job, "interval", hours=2, start_date="2017-9-8 21:30:00", end_date="2018-06-15 21:30:00)

@scheduler.scheduled_job("interval", id="my_job_id", hours=2)
def my_job():
    print("Hello World")

cron: 使用同linux下crontab的方式(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)

sched.add_job(my_job, "cron", hour=3, minute=30)
sched.add_job(my_job, "cron", day_of_week="mon-fri", hour=5, minute=30, end_date="2017-10-30")

@sched.scheduled_job("cron", id="my_job_id", day="last sun")
def some_decorated_task():
    print("I am printed at 00:00:00 on the last Sunday of every month!")
scheduler組件

scheduler組件提供執行的方式,在不同的運用環境中選擇合適的方式

BlockingScheduler: 進程中只運行調度器時的方式

from apscheduler.schedulers.blocking import BlockingScheduler
import time

scheduler = BlockingScheduler()
 
def job1():
    print "%s: 執行任務"  % time.asctime()

scheduler.add_job(job1, "interval", seconds=3)
scheduler.start()

BackgroundScheduler: 不想使用任何框架時的方式

from apscheduler.schedulers.background import BackgroundScheduler
import time

scheduler = BackgroundScheduler()
 
def job1():
    print "%s: 執行任務"  % time.asctime()

scheduler.add_job(job1, "interval", seconds=3)
scheduler.start()

while True:
    pass

AsyncIOScheduler: asyncio module的方式(Python3)

from apscheduler.schedulers.asyncio import AsyncIOScheduler
try:
    import asyncio
except ImportError:
    import trollius as asyncio
...
...
# while True:pass 
try:
    asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
    pass

GeventScheduler: gevent方式

from apscheduler.schedulers.gevent import GeventScheduler

...
...

g = scheduler.start()
# while True:pass
try:
    g.join()
except (KeyboardInterrupt, SystemExit):
    pass

TornadoScheduler: Tornado方式

from tornado.ioloop import IOLoop
from apscheduler.schedulers.tornado import TornadoScheduler

...
...

# while True:pass
try:
    IOLoop.instance().start()
except (KeyboardInterrupt, SystemExit):
    pass

TwistedScheduler: Twisted方式

from twisted.internet import reactor
from apscheduler.schedulers.twisted import TwistedScheduler

...
...

# while True:pass
try:
    reactor.run()
except (KeyboardInterrupt, SystemExit):
    pass

QtScheduler: Qt方式

executors組件

executors組件提供任務的調度方式

base

debug

gevent

pool(max_workers=10)

twisted

jobstore組件

jobstore提供任務的各種持久化方式

base

memory

mongodb
scheduler.add_jobstore("mongodb", collection="example_jobs")

redis
scheduler.add_jobstore("redis", jobs_key="example.jobs", run_times_key="example.run_times")

rethinkdb
scheduler.add_jobstore("rethinkdb", database="apscheduler_example")

sqlalchemy
scheduler.add_jobstore("sqlalchemy", url=url)

zookeeper
scheduler.add_jobstore("zookeeper", path="/example_jobs")

任務操作 添加任務add_job(如上)

如果使用了任務的存儲,開啟時最好添加replace_existing=True,否則每次開啟都會創建任務的副本
開啟后任務不會馬上啟動,可修改trigger參數

刪除任務remove_job
# 根據任務實例刪除
job = scheduler.add_job(myfunc, "interval", minutes=2)
job.remove()

# 根據任務id刪除
scheduler.add_job(myfunc, "interval", minutes=2, id="my_job_id")
scheduler.remove_job("my_job_id")
任務的暫停pause_job和繼續resume_job
job = scheduler.add_job(myfunc, "interval", minutes=2)
# 根據任務實例
job.pause()
job.resume()

# 根據任務id暫停
scheduler.add_job(myfunc, "interval", minutes=2, id="my_job_id")
scheduler.pause_job("my_job_id")
scheduler.resume_job("my_job_id")
任務的修飾modify和重設reschedule_job

修飾:job.modify(max_instances=6, name="Alternate name")
重設:scheduler.reschedule_job("my_job_id", trigger="cron", minute="*/5")

調度器操作

開啟 scheduler.start()

關閉 scheduler.shotdown(wait=True | False)

暫停 scheduler.pause()

繼續 scheduler.resume()

監聽 http://apscheduler.readthedoc...

def my_listener(event):
    if event.exception:
        print("The job crashed :(")
    else:
        print("The job worked :)")

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
官方實例
from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


jobstores = {
    "mongo": MongoDBJobStore(),
    "default": SQLAlchemyJobStore(url="sqlite:///jobs.sqlite")
}
executors = {
    "default": ThreadPoolExecutor(20),
    "processpool": ProcessPoolExecutor(5)
}
job_defaults = {
    "coalesce": False,
    "max_instances": 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/40820.html

相關文章

  • 定時任務框架APScheduler學習詳解

    摘要:安裝利用進行安裝源碼安裝有四種組成部分觸發器包含調度邏輯,每一個作業有它自己的觸發器,用于決定接下來哪一個作業會運行。除了他們自己初始配置意外,觸發器完全是無狀態的。 APScheduler簡介 在平常的工作中幾乎有一半的功能模塊都需要定時任務來推動,例如項目中有一個定時統計程序,定時爬出網站的URL程序,定時檢測釣魚網站的程序等等,都涉及到了關于定時任務的問題,第一時間想到的是利用t...

    sewerganger 評論0 收藏0
  • APScheduler任務調度利器

    摘要:中任務調度一般用中的任務調度工具也有不少等。調度器配置示例方式一方式二三略。移除調用放到,參數為調用實例的方法注意如果任務已經調度完畢,并且之后也不會再被執行的情況下,會被自動移除。可以監聽調度任務執行情況相關的事件。 Java中任務調度一般用Quartz,Python中的任務調度工具也有不少:Celery,RQ,APScheduler等。Celery:非常強大的分布式任務調度框架RQ...

    Flink_China 評論0 收藏0
  • Python Apscheduler源代碼解析(一) 任務調度流程

    摘要:最近公司有項目需要使用到定時任務,其定時邏輯類似于的,就使用了這個類庫。在一次循環結束之前會計算任務下次執行事件與當前時間之差,然后讓調度線程掛起直到那個時間到來。 最近公司有項目需要使用到定時任務,其定時邏輯類似于linux的Cron,就使用了Apscheduler這個類庫。基于公司的業務,需要修改Apshceduler,故而研究了一下Apscheduler的代碼。 Apschedu...

    chavesgu 評論0 收藏0
  • Flask-APScheduler使用教程

    摘要:項目中需要用到定時器和循環執行。運用線程執行輪詢操作,也有運用系統的的文章最多,但是太麻煩。和中間人的消息傳輸支持所有特性,但也提供大量其他實驗性方案的支持,包括用進行本地開發。同時也包含了對任務的控制。后續有需求在繼續。 項目中需要用到定時器和循環執行。去網上搜了一下,比較常見的有一下集中。運用Python線程執行輪詢操作,也有運用Linux系統的Cron,Celery的文章最多,但...

    Noodles 評論0 收藏0
  • django開發-定時任務的使用

    摘要:今天介紹在中使用定時任務的兩種方式。添加并啟動定時任務其它命令顯示當前的定時任務刪除所有定時任務今天的定時任務就說到這里,有錯誤之處,歡迎交流指正 今天介紹在django中使用定時任務的兩種方式。 方式一: APScheduler1)安裝: pip install apscheduler 2)使用: from apscheduler.scheduler import Scheduler...

    wean 評論0 收藏0

發表評論

0條評論

zxhaaa

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<