摘要:我們將任務封裝為消息并將其發送到隊列。為了確保消息永不丟失,支持消息確認。沒有任何消息超時當消費者死亡時,將重新傳遞消息。發生這種情況是因為只在消息進入隊列時調度消息。這告訴一次不要向工作人員發送多個消息。
源碼:https://github.com/ltoddy/rabbitmq-tutorial
工作隊列(using the Pika Python client)
本章節教程重點介紹的內容在第一篇教程中,我們編寫了用于從命名隊列發送和接收消息的程序。在這一個中,我們將創建一個工作隊列,用于在多個工作人員之間分配耗時的任務。
工作隊列(又名:任務隊列)背后的主要思想是避免立即執行資源密集型任務,并且必須等待它完成。相反,我們安排稍后完成任務。我們將任務封裝 為消息并將其發送到隊列。
在后臺運行的工作進程將彈出任務并最終執行作業。當你運行許多工人時,任務將在他們之間共享。
這個概念在Web應用程序中特別有用,因為在短的HTTP請求窗口中無法處理復雜的任務。
在本教程的前一部分中,我們發送了一條包含“Hello World!”的消息。現在我們將發送代表復雜任務的字符串。
我們沒有真實世界的任務,比如要調整大小的圖像或要渲染的PDF文件,所以讓我們假裝我們很忙 - 使用 time.sleep() 函數來偽裝它。
我們將把字符串中的點(".")數作為復雜度; 每一個點都會占用一秒的“工作”。例如,Hello ... 描述的假任務將需要三秒鐘。
我們稍微修改前面例子中的send.py代碼,以允許從命令行發送任意消息。這個程序將把任務安排到我們的工作隊列中,所以讓我們把它命名為new_task.py:
import sys message = " ".join(sys.argv[1:]) or "Hello World" channel.basic_publish(exchange="", routing_key="hello", body=message) print(" [x] Sent %r" % message)
我們的舊版receive.py腳本也需要進行一些更改:它需要為郵件正文中的每個點偽造第二個工作。它會從隊列中彈出消息并執行任務,所以我們稱之為worker.py:
import time def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b".")) print(" [x] Done")循環調度
使用任務隊列的優點之一是可以輕松地平行工作。如果我們正在積累積壓的工作,我們可以增加更多的工作人員,并且這種方式很容易擴展。
首先,我們試著同時運行兩個worker.py腳本。他們都會從隊列中獲取消息,但具體到底是什么?讓我們來看看。
您需要打開三個控制臺。兩個將運行worker.py腳本。這些控制臺將成為我們的兩個消費者 - C1和C2。
默認情況下,RabbitMQ將按順序將每條消息發送給下一個使用者。平均而言,每個消費者將獲得相同數量的消息。這種分配消息的方式稱為循環法。請嘗試與三名或更多的工人。
消息確認做任務可能需要幾秒鐘的時間。你可能想知道如果其中一個消費者開始一項長期任務并且只是部分完成而死亡會發生什么。
用我們目前的代碼,一旦RabbitMQ將消息傳遞給客戶,它立即將其標記為刪除。在這種情況下,如果你殺了一個工人,我們將失去剛剛處理的信息。
我們也會失去所有派發給這個特定工作人員但尚未處理的消息。
但我們不想失去任何任務。如果一名工人死亡,我們希望將任務交付給另一名工人。
為了確保消息永不丟失,RabbitMQ支持消息確認。消費者發回ack(請求)告訴RabbitMQ已經收到,處理了特定的消息,并且RabbitMQ可以自由刪除它。
如果消費者死亡(其通道關閉,連接關閉或TCP連接丟失),RabbitMQ將理解消息未被完全處理,并將重新排隊。如果有其他消費者同時在線,它會迅速將其重新發送給另一位消費者。
這樣,即使工作人員偶爾死亡,也可以確保沒有任何信息丟失。
沒有任何消息超時; 當消費者死亡時,RabbitMQ將重新傳遞消息。即使處理消息需要非常很長的時間也沒關系。
消息確認默認是被打開的。在前面的例子中,我們通過 no_ack = True 標志明確地將它們關閉。一旦我們完成了一項任務,現在是時候清除這個標志并且發送工人的正確確認。
def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b".")) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue="hello")
使用這段代碼,我們可以確定,即使在處理消息時使用CTRL + C來殺死一個工作者,也不會丟失任何東西。工人死后不久,所有未確認的消息將被重新發送。
消息持久性我們已經學會了如何確保即使消費者死亡,任務也不會丟失。但是如果RabbitMQ服務器停止,我們的任務仍然會丟失。
當RabbitMQ退出或崩潰時,它會忘記隊列和消息,除非您告訴它不要。需要做兩件事來確保消息不會丟失:我們需要將隊列和消息標記為持久。
首先,我們需要確保RabbitMQ永遠不會失去我們的隊列。為了做到這一點,我們需要宣布它是持久的:
channel.queue_declare(queue="hello", durable=True)
雖然這個命令本身是正確的,但它在我們的設置中不起作用。那是因為我們已經定義了一個名為hello的隊列 ,這個隊列并不"耐用"。
RabbitMQ不允許您使用不同的參數重新定義現有的隊列,并會向任何試圖執行該操作的程序返回錯誤。
但是有一個快速的解決方法 - 讓我們聲明一個具有不同名稱的隊列,例如task_queue:
channel.queue_declare(queue="task_queue", durable=True)
此queue_declare更改需要應用于生產者和消費者代碼。
此時我們確信,即使RabbitMQ重新啟動,task_queue隊列也不會丟失。現在我們需要將消息標記為持久 - 通過提供值為2的delivery_mode屬性。
channel.basic_publish(exchange="", routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode=2, # 確保消息是持久的 ))公平派遣
您可能已經注意到調度仍然無法完全按照我們的要求工作。例如,在有兩名工人的情況下,當所有奇怪的信息都很重,甚至信息很少時,一名工作人員會一直很忙,
另一名工作人員幾乎不會做任何工作。那么,RabbitMQ不知道任何有關這一點,并仍將均勻地發送消息。
發生這種情況是因為RabbitMQ只在消息進入隊列時調度消息。它沒有考慮消費者未確認消息的數量。它只是盲目地將第n條消息分發給第n位消費者。
為了解決這個問題,我們可以使用basic.qos方法和設置prefetch_count = 1。這告訴RabbitMQ一次不要向工作人員發送多個消息。
或者換句話說,不要向工作人員發送新消息,直到它處理并確認了前一個消息。相反,它會將其分派給不是仍然忙碌的下一個工作人員。
channel.basic_qos(prefetch_count=1)把它放在一起
我們的new_task.py腳本的最終代碼:
#!/usr/bin/env python import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) channel = connection.channel() channel.queue_declare(queue="task_queue", durable=True) message = " ".join(sys.argv[1:]) or "Hello World" channel.basic_publish(exchange="", routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode=2, # 確保消息是持久的 )) print(" [x] Sent %r" % message) connection.close()
而我們的工人 worker.py:
#!/usr/bin/env python import time import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) channel = connection.channel() channel.queue_declare(queue="task_queue", durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b".")) print(" [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, queue="hello") channel.basic_qos(prefetch_count=1) print(" [*] Waiting for messages. To exit press CTRL+C") channel.start_consuming()
使用消息確認和prefetch_count,您可以設置一個工作隊列。即使RabbitMQ重新啟動,持久性選項也可讓任務繼續存在。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/44704.html
摘要:交易所在本教程的前幾部分中,我們發送消息并從隊列中接收消息。消費者是接收消息的用戶的應用程序。中的消息傳遞模型的核心思想是生產者永遠不會將任何消息直接發送到隊列中。交換和隊列之間的關系稱為綁定。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 發布 / 訂閱 (using the Pika Python client) 本章節教程重點介紹的...
摘要:每當我們收到一條消息,這個回調函數就被皮卡庫調用。接下來,我們需要告訴這個特定的回調函數應該從我們的隊列接收消息為了讓這個命令成功,我們必須確保我們想要訂閱的隊列存在。生產者計劃將在每次運行后停止歡呼我們能夠通過發送我們的第一條消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 介紹 RabbitMQ是一個消息代理:它接受和轉發消息。你...
摘要:通常用于命名回調隊列。對每個響應執行的回調函數做了一個非常簡單的工作,對于每個響應消息它檢查是否是我們正在尋找的。在這個方法中,首先我們生成一個唯一的數并保存回調函數將使用這個值來捕獲適當的響應。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 遠程過程調用(RPC) (using the Pika Python client) 本章節教程...
摘要:為了避免與參數混淆,我們將其稱為綁定鍵。直接交換我們之前教程的日志記錄系統將所有消息廣播給所有消費者。在這種設置中,使用路由鍵發布到交換機的消息將被路由到隊列。所有其他消息將被丟棄。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 路由 本章節教程重點介紹的內容 在之前的教程中,我們構建了一個簡單的日志系統 我們能夠將日志消息廣播給許多接收...
摘要:單詞可以是任何東西,但通常它們指定了與該消息相關的一些功能。消息將使用由三個字兩個點組成的路由鍵發送。另一方面,只會進入第一個隊列,而只會進入第二個隊列。不匹配任何綁定,因此將被丟棄。代碼幾乎與前一個教程中的代碼相同。 源碼:https://github.com/ltoddy/rabbitmq-tutorial Topics (using the Pika Python client)...
閱讀 1234·2021-09-26 09:46
閱讀 1588·2021-09-06 15:00
閱讀 717·2019-08-30 15:52
閱讀 1121·2019-08-29 13:10
閱讀 1282·2019-08-26 13:47
閱讀 1482·2019-08-26 13:35
閱讀 2031·2019-08-23 18:38
閱讀 727·2019-08-23 17:59