摘要:交易所在本教程的前幾部分中,我們發送消息并從隊列中接收消息。消費者是接收消息的用戶的應用程序。中的消息傳遞模型的核心思想是生產者永遠不會將任何消息直接發送到隊列中。交換和隊列之間的關系稱為綁定。
源碼:https://github.com/ltoddy/rabbitmq-tutorial
發布 / 訂閱(using the Pika Python client)
本章節教程重點介紹的內容在上一篇教程中,我們創建了工作隊列。工作隊列背后的假設是每個任務只能傳遞給一個工作人員。
在這一部分,我們將做一些完全不同的事情 - 我們會向多個消費者傳遞信息。這種模式被稱為“發布/訂閱”。
為了說明這種模式,我們將建立一個簡單的日志系統。它將包含兩個程序 - 第一個將發送日志消息,第二個將接收并打印它們。
在我們的日志系統中,接收程序的每個運行副本都會收到消息。這樣我們就可以運行一個接收器并將日志指向磁盤; 同時我們將能夠運行另一個接收器并在屏幕上查看日志。
一般來說,發布的日志消息將以廣播的形式發給所有的接收者。
交易所在本教程的前幾部分中,我們發送消息并從隊列中接收消息?,F在是時候在rabbitmq中引入完整的消息傳遞模型。
讓我們快速回顧一下前面教程中的內容:
生產者是發送消息的用戶的應用程序。
隊列是存儲消息的緩沖器。
消費者是接收消息的用戶的應用程序。
RabbitMQ中的消息傳遞模型的核心思想是生產者永遠不會將任何消息直接發送到隊列中。實際上,生產者通常甚至不知道郵件是否會被傳送到任何隊列中。
相反,生產者只能發送消息給交易所。交換是一件非常簡單的事情。一方面它接收來自生產者的消息,另一方則推動他們排隊。
交易所必須知道如何處理收到的消息。是否應該附加到特定隊列?它應該附加到許多隊列中嗎?或者它應該被丟棄。這些規則由交換類型定義 (exchange type)。
有幾種可用的交換類型: direct, topic, header 和 fanout。我們將關注最后一個 - fanout。讓我們創建該類型的交換,并將其稱為logs:
channel.exchange_declare(exchange="logs", exchange_type="fanout")
fanout交換非常簡單。正如你可能從名字中猜出的那樣,它只是將收到的所有消息廣播到它所知道的所有隊列中。這正是我們logger所需要的。
現在,我們可以發布到我們的指定交易所:
channel.basic_publish(exchange="logs", routing_key="", body=message)臨時隊列
正如你以前可能記得我們正在使用具有指定名稱的隊列(還記得hello和task_queue嗎?)。能夠命名隊列對我們至關重要 - 我們需要將工作人員指向同一隊列。
當你想在生產者和消費者之間分享隊列時,給隊列一個名字是很重要的。
但是,我們的記錄器并非如此。我們希望聽到所有日志消息,而不僅僅是其中的一部分。我們也只對目前流動的消息感興趣,而不是舊消息。要解決這個問題,我們需要做兩件事。
首先,每當我們連接到rabbitmq,我們需要一個新的,空的隊列。要做到這一點,我們可以創建一個隨機名稱的隊列,或者甚至更好 - 讓服務器為我們選擇一個隨機隊列名稱。
我們可以通過不將隊列參數提供給queue_declare來實現這一點:
result = channel.queue_declare()
此時,result.method.queue包含一個隨機隊列名稱。例如,它可能看起來像amq.gen-i94oCE_tj3LyWsy-94KXHg。
其次,一旦消費者連接關閉,隊列應該被刪除。這是一個專有標志:
result = channel.queue_declare(exclusive=True)綁定
我們已經創建了一個fanout交換和一個隊列?,F在我們需要告訴交換所將消息發送到我們的隊列。交換和隊列之間的關系稱為綁定。
channel.queue_bind(exchange="logs", queue=result.method.queue)
從現在起,logs 交易所會將消息附加到我們的隊列中。
把它放在一起發出日志消息的生產者程序與之前的教程沒有多大區別。最重要的變化是我們現在想發布消息到我們的logs交易所,而不是無名字的消息。發送時我們需要提供一個routing_key,但是對于fanout交換,它的值將被忽略。這里是emit_log.py腳本的代碼 :
#!/usr/bin/env python import sys import pika connection = pika.BlockingConnection( pika.ConnectionParameters(host="localhost")) channel = connection.channel() channel.exchange_declare(exchange="logs", exchange_type="fanout") message = " ".join(sys.argv[1:]) or "info: Hello world!" channel.basic_publish(exchange="logs", routing_key="", body=message) print(" [x] Sent %r" % message) connection.close()
如你所見,建立連接后,我們宣布交易所。這一步是必要的,因為發布到不存在的交易所是被禁止的。
如果沒有隊列綁定到交換機上,這些消息將會丟失,但這對我們來說沒問題; 如果沒有消費者正在收聽,我們可以放心地丟棄消息。
receive_logs.py的代碼:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) channel = connection.channel() channel.exchange_declare(exchange="logs", exchange_type="fanout") result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange="logs", queue=queue_name) print(" [*] Waiting for logs. To exit press CTRL+C") def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
我們完成了。如果您想將日志保存到文件中,只需打開一個控制臺并輸入:
python receive_logs.py > logs_from_rabbit.log
如果你想在屏幕上看到日志,打開一個新的終端并運行:
python receive_logs.py
當然,
python emit_log.py
使用rabbitmqctl list_bindings,你可以驗證代碼是否真正創建了綁定和隊列。當有兩個receive_logs.py程序正在運行,你應該看到如下所示:
root@921edcb46341:/# rabbitmqctl list_bindings Listing bindings for vhost /... exchange amq.gen-6YXn7BycIwtI7kFuUrTbaA queue amq.gen-6YXn7BycIwtI7kFuUrTbaA [] exchange amq.gen-JhFL-rbMAoricMu5Dyo-hA queue amq.gen-JhFL-rbMAoricMu5Dyo-hA [] logs exchange amq.gen-6YXn7BycIwtI7kFuUrTbaA queue amq.gen-6YXn7BycIwtI7kFuUrTbaA [] logs exchange amq.gen-JhFL-rbMAoricMu5Dyo-hA queue amq.gen-JhFL-rbMAoricMu5Dyo-hA []
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/41446.html
摘要:每當我們收到一條消息,這個回調函數就被皮卡庫調用。接下來,我們需要告訴這個特定的回調函數應該從我們的隊列接收消息為了讓這個命令成功,我們必須確保我們想要訂閱的隊列存在。生產者計劃將在每次運行后停止歡呼我們能夠通過發送我們的第一條消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 介紹 RabbitMQ是一個消息代理:它接受和轉發消息。你...
摘要:單詞可以是任何東西,但通常它們指定了與該消息相關的一些功能。消息將使用由三個字兩個點組成的路由鍵發送。另一方面,只會進入第一個隊列,而只會進入第二個隊列。不匹配任何綁定,因此將被丟棄。代碼幾乎與前一個教程中的代碼相同。 源碼:https://github.com/ltoddy/rabbitmq-tutorial Topics (using the Pika Python client)...
摘要:為了避免與參數混淆,我們將其稱為綁定鍵。直接交換我們之前教程的日志記錄系統將所有消息廣播給所有消費者。在這種設置中,使用路由鍵發布到交換機的消息將被路由到隊列。所有其他消息將被丟棄。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 路由 本章節教程重點介紹的內容 在之前的教程中,我們構建了一個簡單的日志系統 我們能夠將日志消息廣播給許多接收...
摘要:通常用于命名回調隊列。對每個響應執行的回調函數做了一個非常簡單的工作,對于每個響應消息它檢查是否是我們正在尋找的。在這個方法中,首先我們生成一個唯一的數并保存回調函數將使用這個值來捕獲適當的響應。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 遠程過程調用(RPC) (using the Pika Python client) 本章節教程...
摘要:我們將任務封裝為消息并將其發送到隊列。為了確保消息永不丟失,支持消息確認。沒有任何消息超時當消費者死亡時,將重新傳遞消息。發生這種情況是因為只在消息進入隊列時調度消息。這告訴一次不要向工作人員發送多個消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 工作隊列 showImg(https://segmentfault.com/img/r...
閱讀 3258·2021-09-09 11:39
閱讀 1232·2021-09-09 09:33
閱讀 1134·2019-08-30 15:43
閱讀 554·2019-08-29 14:08
閱讀 1739·2019-08-26 13:49
閱讀 2382·2019-08-26 10:09
閱讀 1550·2019-08-23 17:13
閱讀 2288·2019-08-23 12:57