摘要:每當我們收到一條消息,這個回調函數就被皮卡庫調用。接下來,我們需要告訴這個特定的回調函數應該從我們的隊列接收消息為了讓這個命令成功,我們必須確保我們想要訂閱的隊列存在。生產者計劃將在每次運行后停止歡呼我們能夠通過發送我們的第一條消息。
源碼:https://github.com/ltoddy/rabbitmq-tutorial
介紹RabbitMQ是一個消息代理:它接受和轉發消息。你可以把它想象成一個郵局:當你把你想要發布的郵件放在郵箱中時,你可以確定郵差先生最終將郵件發送給你的收件人。在這個比喻中,RabbitMQ是郵政信箱,郵局和郵遞員。
RabbitMQ和郵局的主要區別在于它不處理紙張,而是接受,存儲和轉發二進制數據塊 -- 消息。
請注意,生產者,消費者和消息代理不必駐留在同一主機上; 實際上在大多數應用程序中它們不是同一主機上。
Hello World!(using the Pika Python client)
pip3 install pika
在本教程的這一部分,我們將使用Python編寫兩個小程序; 發送單個消息的生產者(發送者),以及接收消息并將其打印出來的消費者(接收者)。這是一個消息傳遞的“Hello World”。
在下圖中,“P”是我們的生產者,“C”是我們的消費者。中間的盒子是一個隊列 - RabbitMQ代表消費者保存的消息緩沖區。
我們的整體設計將如下所示:
生產者將消息發送到“hello”隊列,消費者接收來自該隊列的消息。發送
我們的第一個程序 send.py 會向隊列發送一條消息。我們需要做的第一件事是與RabbitMQ服務器建立連接。
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel()
我們現在連接到本地上的的代理 - 因此是 "localhost"。如果我們想連接到另一臺機器上的代理,我們只需在此指定其名稱或IP地址。
接下來,在發送之前,我們需要確保收件人隊列存在。如果我們發送消息到不存在的位置,RabbitMQ將只刪除該消息。我們來創建一個將傳遞消息的 hello 隊列:
channel.queue_declare(queue="hello")
此時我們準備發送消息。我們的第一條消息將只包含一個字符串 "Hello World!"我們想把它發送給我們的 hello 隊列。
在RabbitMQ中,消息永遠不會直接發送到隊列,它總是需要經過交換。我們現在需要知道的是如何使用由空字符串標識的默認交換。這種交換是特殊的 - 它允許我們準確地指定消息應該到達哪個隊列。隊列名稱需要在routing_key參數中指定:
channel.basic_publish(exchange="", routing_key="hello", body="Hello World!") print(" [x] Sent "Hello World!"")
在退出程序之前,我們需要確保網絡緩沖區被刷新,并且我們的消息被實際傳送到RabbitMQ。我們可以通過輕輕關閉連接來完成。
connection.close()接收
我們的第二個程序 receive.py 將接收隊列中的消息并將它們打印在屏幕上。
再次,我們首先需要連接到RabbitMQ服務器。負責連接到Rabbit的代碼與以前相同。
下一步,就像以前一樣,要確保隊列存在。使用queue_declare創建一個隊列是冪等的 - 我們可以根據需要多次運行該命令,并且只會創建一個。
channel.queue_declare()
您可能會問為什么我們再次聲明隊列 - 我們已經在之前的代碼中聲明了它。如果我們確信隊列已經存在,我們可以避免這種情況。例如,如果 send.py 程序之前運行過。但我們還不確定首先運行哪個程序。在這種情況下,重復在兩個程序中重復聲明隊列是一種很好的做法。
列出隊列 您可能希望看到RabbitMQ有什么隊列以及它們中有多少條消息。您可以使用rabbitmqctl工具(作為特權用戶)執行此操作: > sudo rabbitmqctl list_queues 在Windows上,省略sudo: > rabbitmqctl.bat list_queues
從隊列接收消息更為復雜。它通過向隊列訂閱 回調函數 來工作。每當我們收到一條消息,這個回調函數就被皮卡庫調用。在我們的例子中,這個函數會在屏幕上打印消息的內容。
def callback(ch, method, propertites, body): print(" [x] Received {}".format(body))
接下來,我們需要告訴RabbitMQ這個特定的回調函數應該從我們的hello隊列接收消息:
channel.basic_consume(callable, queue="hello", no_ack=True)
為了讓這個命令成功,我們必須確保我們想要訂閱的隊列存在。幸運的是,我們對此有信心 - 我們已經使用queue_declare創建了一個隊列。
NO_ACK參數,后面(幾篇之后)會有解釋。
最后,我們進入一個永無止境的循環,等待數據并在必要時運行回調。
print(" [*] Waiting for messages. To exit press CTRL+C") channel.start_consuming()把它放在一起
send.py的完整代碼:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) channel = connection.channel() channel.queue_declare(queue="hello") channel.basic_publish(exchange="", routing_key="hello", body="Hello World!") print(" [x] Sent "Hello World!"") connection.close()
receive.py的完整代碼:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel() channel.queue_declare(queue="hello") def callback(ch, method, propertites, body): print(" [x] Received {}".format(body)) channel.basic_consume(callable, queue="hello", no_ack=True) print(" [*] Waiting for messages. To exit press CTRL+C") channel.start_consuming()
現在我們可以在終端上試用我們的程序。首先,讓我們開始一個消費者,它將持續運行等待交付:
python receive.py # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received "Hello World!"
現在開始制作。生產者計劃將在每次運行后停止:
python send.py # => [x] Sent "Hello World!"
歡呼!我們能夠通過RabbitMQ發送我們的第一條消息。正如您可能已經注意到的,receive.py 程序不會退出。它會隨時準備接收更多消息,并可能會被Ctrl-C中斷。
嘗試在新終端中再次運行 send.py。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/44690.html
摘要:我們將任務封裝為消息并將其發送到隊列。為了確保消息永不丟失,支持消息確認。沒有任何消息超時當消費者死亡時,將重新傳遞消息。發生這種情況是因為只在消息進入隊列時調度消息。這告訴一次不要向工作人員發送多個消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 工作隊列 showImg(https://segmentfault.com/img/r...
摘要:交易所在本教程的前幾部分中,我們發送消息并從隊列中接收消息。消費者是接收消息的用戶的應用程序。中的消息傳遞模型的核心思想是生產者永遠不會將任何消息直接發送到隊列中。交換和隊列之間的關系稱為綁定。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 發布 / 訂閱 (using the Pika Python client) 本章節教程重點介紹的...
摘要:為了避免與參數混淆,我們將其稱為綁定鍵。直接交換我們之前教程的日志記錄系統將所有消息廣播給所有消費者。在這種設置中,使用路由鍵發布到交換機的消息將被路由到隊列。所有其他消息將被丟棄。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 路由 本章節教程重點介紹的內容 在之前的教程中,我們構建了一個簡單的日志系統 我們能夠將日志消息廣播給許多接收...
摘要:單詞可以是任何東西,但通常它們指定了與該消息相關的一些功能。消息將使用由三個字兩個點組成的路由鍵發送。另一方面,只會進入第一個隊列,而只會進入第二個隊列。不匹配任何綁定,因此將被丟棄。代碼幾乎與前一個教程中的代碼相同。 源碼:https://github.com/ltoddy/rabbitmq-tutorial Topics (using the Pika Python client)...
摘要:通常用于命名回調隊列。對每個響應執行的回調函數做了一個非常簡單的工作,對于每個響應消息它檢查是否是我們正在尋找的。在這個方法中,首先我們生成一個唯一的數并保存回調函數將使用這個值來捕獲適當的響應。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 遠程過程調用(RPC) (using the Pika Python client) 本章節教程...
閱讀 688·2021-11-18 10:07
閱讀 2878·2021-09-22 16:04
閱讀 873·2021-08-16 10:50
閱讀 3326·2019-08-30 15:56
閱讀 1784·2019-08-29 13:22
閱讀 2647·2019-08-26 17:15
閱讀 1229·2019-08-26 10:57
閱讀 1103·2019-08-23 15:23