摘要:今天介紹一下如何在項目中使用搭建一個有兩個節點的任務隊列一個主節點一個子節點主節點發布任務,子節點收到任務并執行。
今天介紹一下如何在django項目中使用celery搭建一個有兩個節點的任務隊列(一個主節點一個子節點;主節點發布任務,子節點收到任務并執行。搭建3個或者以上的節點就類似了),使用到了celery,rabbitmq。這里不會多帶帶介紹celery和rabbitmq中的知識了。
1.項目基礎環境:
兩個ubuntu18.04虛擬機、python3.6.5、django2.0.4、celery3.1.26post2
2.主節點django項目結構:
3.settings.py中關于celery的配置:
</>復制代碼
import djcelery
# 此處的Queue和Exchange都涉及到RabbitMQ中的概念,這里不做介紹
from kombu import Queue, Exchange
djcelery.setup_loader()
BROKER_URL = "amqp://test:test@192.168.43.6:5672/testhost"
CELERY_RESULT_BACKEND = "amqp://test:test@192.168.43.6:5672/testhost"
CELERY_TASK_RESULT_EXPIRES=3600
CELERY_TASK_SERIALIZER="json"
CELERY_RESULT_SERIALIZER="json"
# CELERY_ACCEPT_CONTENT = ["json", "pickle", "msgpack", "yaml"]
CELERY_DEFAULT_EXCHANGE = "train"
CELERY_DEFAULT_EXCHANGE_TYPE = "direct"
CELERY_IMPORTS = ("proj.celery1.tasks", )
CELERY_QUEUES = (
Queue("train", routing_key="train"),
Queue("predict", routing_key="predict"),
)
4.celery.py中的配置:
</>復制代碼
# coding:utf8
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the "celery" program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "proj.settings")
app = Celery("proj")
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object("django.conf:settings")
# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.autodiscover_tasks(settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
print("Request: {0!r}".format(self.request))
5.proj/init.py中的配置:
</>復制代碼
from __future__ import absolute_import
from .celery import app as celery_app
6.celery1/tasks.py:(主節點中的任務不會執行,只執行子節點中的任務)
</>復制代碼
from __future__ import absolute_import
from celery import task
@task
def do_train(x, y):
return x + y
7.celery1/views.py:
</>復制代碼
from .tasks import do_train
class Test1View(APIView):
def get(self, request):
try:
# 這里的queue和routing_key也涉及到RabiitMQ中的知識
# 關鍵,在這里控制向哪個queue中發送任務,子節點通過這個執行對應queue中的任務
ret = do_train.apply_async(args=[4, 2], queue="train", routing_key="train")
# 獲取結果
data = ret.get()
except Exception as e:
return Response(dict(msg=str(e), code=10001))
return Response(dict(msg="OK", code=10000, data=data))
8.子節點目錄結構:
9.子節點中celery1/celery.py:
</>復制代碼
from __future__ import absolute_import
from celery import Celery
CELERY_IMPORTS = ("celery1.tasks", )
app = Celery("myapp",
# 此處涉及到RabbitMQ的知識,RabbitMQ是對應主節點上的
broker="amqp://test:test@192.168.43.6:5672/testhost",
backend="amqp://test:test@192.168.43.6:5672/testhost",
include=["celery1.tasks"])
app.config_from_object("celery1.config")
if __name__ == "__main__":
app.start()
10.子節點中celery1/config.py:
</>復制代碼
from __future__ import absolute_import
from kombu import Queue,Exchange
from datetime import timedelta
CELERY_TASK_RESULT_EXPIRES=3600
CELERY_TASK_SERIALIZER="json"
CELERY_RESULT_SERIALIZER="json"
CELERY_ACCEPT_CONTENT = ["json","pickle","msgpack","yaml"]
CELERY_DEFAULT_EXCHANGE = "train"
# exchange type可以看RabbitMQ中的相關內容
CELERY_DEFAULT_EXCHANGE_TYPE = "direct"
CELERT_QUEUES = (
Queue("train",exchange="train",routing_key="train"),
)
11.子節點celery1/tasks.py:(這個是要真正執行的task,每個節點可以不同)
</>復制代碼
from __future__ import absolute_import
from celery1.celery import app
import time
from celery import task
@task
def do_train(x, y):
"""
訓練
:param data:
:return:
"""
time.sleep(3)
return dict(data=str(x+y),msg="train")
12.啟動子節點中的celery:
celery1是項目,-Q train表示從train這個queue中接收任務
</>復制代碼
celery -A celery1 worker -l info -Q train
13.啟動主節點中的django項目:
</>復制代碼
python manage.py runserver
14.使用Postman請求對應的view
</>復制代碼
請求url:http://127.0.0.1:8000/api/v1/celery1/test/
返回的結果是:
{
"msg": "OK",
"code": 10000,
"data": {
"data": "6",
"msg": "train"
}
}
15.遇到的問題:
1)celery隊列報錯: AttributeError: ‘str’ object has no attribute ‘items’
解決:將redis庫從3.0回退到了2.10,pip install redis==2.10
解決方法參考鏈接:https://stackoverflow.com/que...
今天就說到這里,如有疑問,歡迎交流。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/42714.html
摘要:介紹應用舉例是一個基于開發的分布式異步消息任務隊列,通過它可以輕松的實現任務的異步處理,如果你的業務場景中需要用到異步任務,就可以考慮使用你想對臺機器執行一條批量命令,可能會花很長時間,但你不想讓你的程序等著結果返回,? celery 1.celery介紹 1.1 celery應用舉例 Celery 是一個 基于python開發的分布式異步消息任務隊列,通過...
摘要:使用消息來通信,流程為客戶端添加消息到隊列來初始化一個任務,然后消息隊列系統把消息分發給工作進程。可以包含多個工作進程和消息系統,來保證高可用性和進行水平擴展。保存結果可以使用很多例如的,,。 celery是一個簡單的、靈活的、可靠的分布式系統,提供了工具來維護這樣一個系統,用于處理大量的信息(實時信息、定時任務安排),是一個任務隊列,易于使用,易于和其他語言進行配合。 任務隊列 任務...
摘要:文檔中文文檔官方文檔定時服務與結合使用簡介是一個自帶電池的的任務隊列。追蹤任務在不同狀態間的遷移,并檢視返回值。 文檔 中文文檔 官方文檔 celery定時服務、celery與django結合使用 簡介 Celery 是一個自帶電池的的任務隊列。它易于使用,所以你可以無視其所解決問題的復雜程度而輕松入門。它遵照最佳實踐設計,所以你的產品可以擴展,或與其他語言集成,并且它自帶了在生產...
小編寫這篇文章的主要目的,主要是給大家去進行講解Django項目實例情況,包括celery的一些具體使用情況介紹,學習這些的話,對我們的工作和生活幫助還是很大的,但是怎么樣才能夠更快的進行上手呢?下面就一個具體實例給大家進行解答。 1、django應用Celery django框架請求/響應的過程是同步的,框架本身無法實現異步響應。 但是我們在項目過程中會經常會遇到一些耗時的任務,比如:...
閱讀 3721·2023-04-25 17:45
閱讀 3434·2021-09-04 16:40
閱讀 1003·2019-08-30 13:54
閱讀 2131·2019-08-29 12:59
閱讀 1401·2019-08-26 12:11
閱讀 3282·2019-08-23 15:17
閱讀 1524·2019-08-23 12:07
閱讀 3883·2019-08-22 18:00