摘要:項目中需要用到定時器和循環執行。運用線程執行輪詢操作,也有運用系統的的文章最多,但是太麻煩。和中間人的消息傳輸支持所有特性,但也提供大量其他實驗性方案的支持,包括用進行本地開發。同時也包含了對任務的控制。后續有需求在繼續。
項目中需要用到定時器和循環執行。去網上搜了一下,比較常見的有一下集中。運用Python線程執行輪詢操作,也有運用Linux系統的Cron,Celery的文章最多,但是太麻煩。看看就知道,Celery 需要一個發送和接受消息的傳輸者。RabbitMQ 和 Redis 中間人的消息傳輸支持所有特性,但也提供大量其他實驗性方案的支持,包括用 SQLite 進行本地開發。需要用到隊列,對于這點需求簡直就是大材小用。最后找到了比較合適的Flask-APScheduler。
介紹看看 github的flask-apscheduler介紹。
Loads scheduler configuration from Flask configuration.(支持從Flask中加載調度)
Loads job definitions from Flask configuration.(支持從Flask中加載任務配置)
Allows to specify the hostname which the scheduler will run on.(允許指定服務器運行任務)
Provides a REST API to manage the scheduled jobs.(提供Rest接口管理任務)
Provides authentication for the REST API.(提供Rest接口認證)
安裝及配置pip install Flask-APScheduler
在Flask配置文件中添加
SCHEDULER_API_ENABLED = True JOBS = [ { "id": "job_1h_data", "func": job_1h_data, "args": "", "trigger": { "type": "cron", "day_of_week": "0-6", "hour": "*", "minute": "1", "second": "0" } }, { "id": "job_announce", "func": exchange_an, "args": "", "trigger": "interval", "seconds": 300 } ]
上面指定了每一小時獲取所有貨幣24h最高位以及交易所公告。
獲取公告def exchange_an(): """ :param start_date: 開始時間 YYYY-MM-DD HH:MM:SS :param end_date: 結束時間 YYYY-MM-DD HH:MM:SS :return: 推送消息,保持數據庫 """ current_local = time.time() start_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(current_local - 300)) end_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(current_local)) announce = pro.query("exchange_ann", start_date=start_date, end_date=end_date) print("請求交易所公告...") for x in announce.values: s = { "title": x[0], "content": x[1], "type": x[2], "url": x[3], "datetime": x[4] } value = json.dumps(s) print(value) mqttClient.publish("system/ex_announce", value)動態添加任務
# coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime def aps_test(x): print datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), x scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=("定時任務",), trigger="cron", second="*/5") scheduler.add_job(func=aps_test, args=("一次性任務",), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12)) scheduler.add_job(func=aps_test, args=("循環任務",), trigger="interval", seconds=3, id="interval_task") scheduler.start() """ 暫停任務 """ scheduler.pause_job("interval_task") """ 恢復任務 """ scheduler.resume_job("interval_task") """ 刪除任務 """ scheduler.remove_job("interval_task")
apscheduler支持添加三種方式的任務,分別是定時任務,一次性任務及循環任務。同時也包含了對任務的控制。
總結因為是單機版本,所以指定服務器運行任務,Rest接口管理任務,Rest接口認證就不寫了。后續有需求在繼續。
歡迎長按下圖 -> 識別圖中二維碼或者微信掃一掃關注我的公眾號
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/42439.html
摘要:日期觸發一次性指定日期作業的運行日期或時間指定時區運行一次運行一次間隔調度間隔幾周間隔幾天間隔幾小時間隔幾分鐘間隔多少秒開始日期結束日期時區每兩個小時調一下觸發年,位數字月范圍日范圍周范圍周內第幾天或者星期幾范圍或者時范圍 Flask Schedule Flask-APScheduler a Flask extension supported for the APScheduler w...
摘要:貢獻者飛龍版本最近總是有人問我,把這些資料看完一遍要用多長時間,如果你一本書一本書看的話,的確要用很長時間。為了方便大家,我就把每本書的章節拆開,再按照知識點合并,手動整理了這個知識樹。 Special Sponsors showImg(https://segmentfault.com/img/remote/1460000018907426?w=1760&h=200); 貢獻者:飛龍版...
摘要:一步一步學習一直都有發布他開發的教程。在上有他免費的教程,并且宣稱是世上最深入的系列。基礎在上有個非常的視頻教程。的官網教程非常值得你從頭讀到尾。使用框架這是我們最后一個教程的介紹。不過在和已經有為你準備了不錯的免費課程哈 一步一步學習Vue 2 (Laracasts) Jeffrey Way一直都有發布他web開發的教程。他曾經在30天內教會了我使用jquery。在Laracast...
摘要:層疊樣式表二修訂版這是對作出的官方說明。速查表兩份表來自一份關于基礎特性,一份關于布局。核心第一篇一份來自的基礎參考指南簡寫速查表簡寫形式參考書使用層疊樣式表基礎指南,包含使用的好處介紹個方法快速寫成高質量的寫出高效的一些提示。 迄今為止,我已經收集了100多個精通CSS的資源,它們能讓你更好地掌握CSS技巧,使你的布局設計脫穎而出。 CSS3 資源 20個學習CSS3的有用資源 C...
摘要:下表整理了目前的版本與版本的兼容關系還未所以,不論您是在讀我的基礎教程基礎教程還是正在連載的系列教程。 這篇博文是臨時增加出來的內容,主要是由于最近連載《Spring Cloud Alibaba基礎教程》系列的時候,碰到讀者咨詢的大量問題中存在一個比較普遍的問題:版本的選擇。其實這類問題,在之前寫Spring Cloud基礎教程的時候,就已經發過一篇《聊聊Spring Cloud版本的...
閱讀 1050·2021-11-22 15:35
閱讀 1685·2021-10-26 09:49
閱讀 3230·2021-09-02 15:11
閱讀 2075·2019-08-30 15:53
閱讀 2636·2019-08-30 15:53
閱讀 2917·2019-08-30 14:11
閱讀 3527·2019-08-30 12:59
閱讀 3241·2019-08-30 12:53