摘要:目的曾經想向前臺實時返回任務的狀態監控,也查看了很多博客,但是好多也沒能如愿,因此基于網上已有的博客已經自己的嘗試,寫了一個小的,實現前臺實時獲取后臺傳輸的任務狀態。實現仿照其他例子實現了一個簡單的后臺任務監控。
1. 目的
曾經想向前臺實時返回Celery任務的狀態監控,也查看了很多博客,但是好多也沒能如愿,因此基于網上已有的博客已經自己的嘗試,寫了一個小的demo,實現前臺實時獲取后臺傳輸的任務狀態。
2. 準備
本篇文章使用的是Flask框架,安裝celery,celery采用redis作為存儲。同時用到了Flask-SocketIO建立websocket。同時還用到了協程庫eventlet(這個是Flask-SocketIO文檔建議的,鏈接文檔)。
3. 實現
demo仿照其他例子實現了一個簡單的后臺任務監控。我們直接上代碼吧,下面是server端代碼:
# -*- utf-8 -*- # app.py import time import uuid from flask import Flask, render_template, request, make_response, jsonify from flask_socketio import SocketIO from celery import Celery import eventlet from flask_redis import FlaskRedis eventlet.monkey_patch() app = Flask(__name__) app.config["BROKER_URL"] = "redis://localhost:6379/0" app.config["CELERY_RESULT_BACKEND"] = "redis://localhost:6379/0" app.config["CELERY_ACCEPT_CONTENT"] = ["json", "pickle"] app.config["REDIS_URL"] = "redis://localhost:6379/0" socketio = SocketIO(app, async_mode="eventlet",message_queue=app.config["CELERY_RESULT_BACKEND"]) redis = FlaskRedis(app) celery = Celery(app.name) celery.conf.update(app.config) #模擬后臺耗時任務 @celery.task def background_task(uid): sid = redis.get(uid) socketio.emit("info", {"data": "Task starting ...", "time": time.time() * 1000 },room=sid, namespace="/task") socketio.sleep(4) socketio.emit("info", {"data": "Task running!", "time": time.time() * 1000 }, room=sid, namespace="/task") socketio.sleep(5) socketio.emit("info", {"data": "Task complete!", "time": time.time()*1000 }, room=sid, namespace="/task") #建立鏈接時把sid傳到瀏覽器端保存。 @socketio.on("connect", namespace="/task") def connect_host(): sid = request.sid socketio.emit("hostadd", {"sid": sid}, room=sid, namespace="/task") #將每一個客戶端生成一個uuid存放在cookie中 @app.route("/") def index(): if not request.cookies.get("host_uid", None): uid = uuid.uuid1().get_hex() response = make_response(render_template("index.html")) response.set_cookie("host_uid", uid) return response return render_template("index.html") @app.route("/task") def start_background_task(): uid = request.cookies.get("host_uid") background_task.delay(uid) return "Started" #設置sid建立鏈接后瀏覽器將sid傳送到server,并將uid與sid映射存放在redis里面,默認保留12小時 @app.route("/setsid", methods=["POST"]) def set_uid(): data = request.json uid = request.cookies.get("host_uid") redis.set(uid, data["sid"]) redis.expire(uid, 3600 * 12) return jsonify({"success": True}) if __name__ == "__main__": socketio.run(app, host="0.0.0.0", port=5000, debug=True)
如果不想使用debug模式的話,可以用gunicorn運行,命令如下所示:
gunicorn --worker-class eventlet -w 1 app:app
使用上述命令需要注意,由于gunicorn負載均衡算法的限制,文檔建議worker數量為1,我測試過大于1,確實會出問題。
前端代碼如下,index.html:
test Logging
GitHub地址:https://github.com/junfenggoo...
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/42274.html
摘要:架構消息代理,作為臨時儲存任務的中間媒介,為提供了隊列服務。生產者將任務發送到,消費者再從獲取任務。如果使用,則有可能發生突然斷電之類的問題造成突然終止后的數據丟失等后果。任務調度器,負責調度并觸發定時周期任務。 架構 showImg(https://segmentfault.com/img/bVbmDXa?w=831&h=413); Broker 消息代理,作為臨時儲存任務的中間媒...
摘要:是分布式任務隊列,能實時處理任務,同時支持官方文檔工作原理如下發送給從中消費消息,并將結果存儲在中本文中使用的是,使用的是現在有兩個,分別是加法運算和乘法運算。假定乘法運算的事件優先級高事件也很多,對于加法運算,要求每分鐘最多處理個事件。 Celery是分布式任務隊列,能實時處理任務, 同時支持task scheduling. 官方文檔Celery工作原理如下: celery cli...
摘要:使用異步框架,例如等等,裝飾異步任務。它是一個專注于實時處理的任務隊列,同時也支持任務調度。不存儲任務狀態。標識要使用的默認序列化方法的字符串。指定該任務的結果存儲后端用于此任務。 概述: ????????我們考慮一個場景,公司有一個需求,現在需要做一套web系統,而這套系統某些功能需要使用...
摘要:基于網,分享項目的組網架構和部署。項目組網架構架構說明流項目訪問分為兩個流,通過分兩個端口暴露給外部使用數據流用戶訪問網站。通過進行配置,使用作為異步隊列來存儲任務,并將處理結果存儲在中。 基于Raindrop網,分享項目的組網架構和部署。 項目組網架構 showImg(https://cloud.githubusercontent.com/assets/7239657/1015704...
閱讀 1173·2021-09-27 13:34
閱讀 981·2021-09-13 10:25
閱讀 510·2019-08-30 15:52
閱讀 3449·2019-08-30 13:48
閱讀 647·2019-08-30 11:07
閱讀 2167·2019-08-29 16:23
閱讀 1992·2019-08-29 13:51
閱讀 2327·2019-08-26 17:42