摘要:通常用于命名回調隊列。對每個響應執(zhí)行的回調函數做了一個非常簡單的工作,對于每個響應消息它檢查是否是我們正在尋找的。在這個方法中,首先我們生成一個唯一的數并保存回調函數將使用這個值來捕獲適當的響應。
源碼:https://github.com/ltoddy/rabbitmq-tutorial
遠程過程調用(RPC)(using the Pika Python client)
本章節(jié)教程重點介紹的內容在第二篇教程中,我們學習了如何使用工作隊列在多個工作人員之間分配耗時的任務。
但是如果我們需要在遠程計算機上運行某個功能并等待結果呢?那么,這是一個不同的事情。
這種模式通常稱為遠程過程調用(RPC)。
在本教程中,我們將使用RabbitMQ構建一個RPC系統(tǒng):一個客戶端和一個可擴展的RPC服務器。
由于我們沒有任何值得分發(fā)的耗時任務,我們將創(chuàng)建一個返回斐波那契數字的虛擬RPC服務。
為了說明如何使用RPC服務,我們將創(chuàng)建一個簡單的客戶端類。它將公開一個名為call的方法 ,
它發(fā)送一個RPC請求并阻塞,直到收到答案:
fibonacci_rpc = FibonacciRpcClient() result = fibonacci_rpc.call(4) print("fib(4) is %r" % result)
*有關RPC的說明* 雖然RPC是計算中很常見的模式,但它經常被吹毛求疵。當程序員不知道函數調用是本地的還是 慢速的RPC時會出現這些問題。像這樣的混亂導致不可預知的問題,并增加了調試的不必要的復雜性, 而不是我們想要的簡化軟件。 銘記這一點,請考慮以下建議: * 確保顯而易見哪個函數調用是本地的,哪個是遠程的。 * 記錄您的系統(tǒng)。清楚組件之間的依賴關系。 * 處理錯誤情況。當RPC服務器長時間關閉時,客戶端應該如何反應? 有疑問時避免RPC。如果可以的話,你應該使用異步管道 - 而不是類似于RPC的阻塞, 其結果被異步推送到下一個計算階段。回調隊列
一般來說,通過RabbitMQ來執(zhí)行RPC是很容易的。客戶端發(fā)送請求消息,服務器回復響應消息。
為了接收響應,客戶端需要發(fā)送一個“回調”隊列地址和請求。讓我們試試看:
result = channel.queue_declare(exclusive=True) callback_queue = result.method.queue channel.basic_publish(exchange="", routing_key="rpc_queue", properties=pika.BasicProperties( reply_to = callback_queue, ), body=request)
消息屬性 AMQP 0-9-1協(xié)議預定義了一組包含14個屬性的消息。大多數屬性很少使用,但以下情況除外: delivery_mode:將消息標記為持久(值為2)或瞬態(tài)(任何其他值)。你可能會記得第二篇教程中的這個屬性。 content_type:用于描述編碼的MIME類型。例如,對于經常使用的JSON編碼,將此屬性設置為application/json是一種很好的做法。 reply_to:通常用于命名回調隊列。 correlation_id:用于將RPC響應與請求關聯起來。相關ID
在上面介紹的方法中,我們建議為每個RPC請求創(chuàng)建一個回調隊列。這是非常低效的,
但幸運的是有一個更好的方法 - 讓我們?yōu)槊總€客戶端創(chuàng)建一個回調隊列。
這引發(fā)了一個新問題,在該隊列中收到回復后,不清楚回復屬于哪個請求。那是什么時候使用correlation_id屬性。
我們將把它設置為每個請求的唯一值。稍后,當我們在回調隊列中收到消息時,我們將查看此屬性,
并基于此屬性,我們將能夠將響應與請求進行匹配。如果我們看到未知的correlation_id值,
我們可以放心地丟棄該消息 - 它不屬于我們的請求。
您可能會問,為什么我們應該忽略回調隊列中的未知消息,而不是拋出錯誤?
這是由于服務器端可能出現競爭狀況。雖然不太可能,但在發(fā)送給我們答案之后,但在發(fā)送請求的確認消息之前,
RPC服務器可能會死亡。如果發(fā)生這種情況,重新啟動的RPC服務器將再次處理該請求。
這就是為什么在客戶端,我們必須優(yōu)雅地處理重復的響應,理想情況下RPC應該是等冪的。
我們的RPC會像這樣工作:
當客戶端啟動時,它創(chuàng)建一個匿名獨占回調隊列。
對于RPC請求,客戶端將發(fā)送具有兩個屬性的消息:reply_to,該消息設置為回調隊列和correlation_id,該值設置為每個請求的唯一值。
該請求被發(fā)送到rpc_queue隊列。
RPC worker(又名:服務器)正在等待該隊列上的請求。當出現請求時,它執(zhí)行該作業(yè),并使用reply_to字段中的隊列將結果發(fā)送回客戶端。
客戶端在回調隊列中等待數據。當出現消息時,它會檢查correlation_id屬性。如果它匹配來自請求的值,則返回對應用程序的響應。
把它放在一起rpc_server.py的代碼:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel() channel.queue_declare(queue="rpc_queue") def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange="", routing_key=props.reply_to, properties=pika.BasicProperties( correlation_id=props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_size=1) channel.basic_consume(on_request, queue="rpc_queue") print(" [x] Awaiting RPC requests") channel.start_consuming()
服務器代碼非常簡單:
(4)像往常一樣,我們首先建立連接并聲明隊列。
(11)我們聲明我們的斐波那契函數。它只假定有效的正整數輸入。(不要指望這個版本適用于大數字,它可能是最慢的遞歸實現)。
(20)我們聲明了basic_consume的回調,它是RPC服務器的核心。它在收到請求時執(zhí)行。它完成工作并將響應發(fā)回。
(34)我們可能想運行多個服務器進程。為了在多臺服務器上平均分配負載,我們需要設置prefetch_count設置。
rpc_client.py的代碼:
#!/usr/bin/env python import pika import uuid class FibonacciRpcClient: def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True) def on_response(self, ch, method, props, body): if self.corr_id == props.corrrelation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange="", routing_key="rpc_queue", properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)
客戶端代碼稍有涉及:
(8)我們建立連接,通道并為回復聲明獨占的“回調”隊列。
(17)我們訂閱"回調"隊列,以便我們可以接收RPC響應。
(19)對每個響應執(zhí)行的"on_response"回調函數做了一個非常簡單的工作,對于每個響應消息它檢查correlation_id是否是我們正在尋找的。如果是這樣,它將保存self.response中的響應并打破消費循環(huán)。
(23)接下來,我們定義我們的主要調用方法 - 它執(zhí)行實際的RPC請求。
(25)在這個方法中,首先我們生成一個唯一的correlation_id數并保存 - "on_response"回調函數將使用這個值來捕獲適當的響應。
(29)接下來,我們發(fā)布具有兩個屬性的請求消息:reply_to和correlation_id。
(32)在這一點上,我們可以坐下來等待,直到適當的回應到達。
(41)最后,我們將回復返回給用戶。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/44712.html
摘要:每當我們收到一條消息,這個回調函數就被皮卡庫調用。接下來,我們需要告訴這個特定的回調函數應該從我們的隊列接收消息為了讓這個命令成功,我們必須確保我們想要訂閱的隊列存在。生產者計劃將在每次運行后停止歡呼我們能夠通過發(fā)送我們的第一條消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 介紹 RabbitMQ是一個消息代理:它接受和轉發(fā)消息。你...
摘要:我們將任務封裝為消息并將其發(fā)送到隊列。為了確保消息永不丟失,支持消息確認。沒有任何消息超時當消費者死亡時,將重新傳遞消息。發(fā)生這種情況是因為只在消息進入隊列時調度消息。這告訴一次不要向工作人員發(fā)送多個消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 工作隊列 showImg(https://segmentfault.com/img/r...
摘要:交易所在本教程的前幾部分中,我們發(fā)送消息并從隊列中接收消息。消費者是接收消息的用戶的應用程序。中的消息傳遞模型的核心思想是生產者永遠不會將任何消息直接發(fā)送到隊列中。交換和隊列之間的關系稱為綁定。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 發(fā)布 / 訂閱 (using the Pika Python client) 本章節(jié)教程重點介紹的...
摘要:為了避免與參數混淆,我們將其稱為綁定鍵。直接交換我們之前教程的日志記錄系統(tǒng)將所有消息廣播給所有消費者。在這種設置中,使用路由鍵發(fā)布到交換機的消息將被路由到隊列。所有其他消息將被丟棄。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 路由 本章節(jié)教程重點介紹的內容 在之前的教程中,我們構建了一個簡單的日志系統(tǒng) 我們能夠將日志消息廣播給許多接收...
摘要:單詞可以是任何東西,但通常它們指定了與該消息相關的一些功能。消息將使用由三個字兩個點組成的路由鍵發(fā)送。另一方面,只會進入第一個隊列,而只會進入第二個隊列。不匹配任何綁定,因此將被丟棄。代碼幾乎與前一個教程中的代碼相同。 源碼:https://github.com/ltoddy/rabbitmq-tutorial Topics (using the Pika Python client)...
閱讀 632·2021-08-17 10:15
閱讀 1715·2021-07-30 14:57
閱讀 1971·2019-08-30 15:55
閱讀 2813·2019-08-30 15:55
閱讀 2704·2019-08-30 15:44
閱讀 662·2019-08-30 14:13
閱讀 2380·2019-08-30 13:55
閱讀 2588·2019-08-26 13:56