摘要:有助于將響應(yīng)與請(qǐng)求關(guān)聯(lián)起來。如果發(fā)生這種情況,重新啟動(dòng)的服務(wù)器將再次處理請(qǐng)求。又名服務(wù)器正在等待該隊(duì)列上的請(qǐng)求。當(dāng)消息出現(xiàn)時(shí),它檢查屬性。然后,我們進(jìn)入循環(huán),在其中等待請(qǐng)求消息,完成工作并發(fā)送響應(yīng)。
(using php-amqplib)
前提必讀本教程假設(shè)RabbitMQ是安裝在標(biāo)準(zhǔn)端口上運(yùn)行(5672)。如果您使用不同的主機(jī)、端口或憑據(jù),則連接設(shè)置需要調(diào)整。
如果您在本教程中遇到困難,可以通過郵件列表與我們聯(lián)系。
開始在第二個(gè)教程中,我們學(xué)習(xí)了如何使用工作隊(duì)列在多個(gè)工人之間分配耗時(shí)的任務(wù)。
但是如果我們需要在遠(yuǎn)程計(jì)算機(jī)上運(yùn)行一個(gè)函數(shù)并等待結(jié)果呢?嗯,那是另一回事了。這種模式通常稱為遠(yuǎn)程過程調(diào)用或RPC。
在本教程中我們將使用RabbitMQ搭建一個(gè)RPC系統(tǒng):一個(gè)客戶端和一個(gè)可擴(kuò)展的RPC服務(wù)器。由于我們沒有任何值得分配的耗時(shí)的任務(wù),所以我們將創(chuàng)建一個(gè)返回Fibonacci數(shù)的模擬一個(gè)RPC服務(wù)。
Client interface為了說明如何使用RPC服務(wù),我們將創(chuàng)建一個(gè)簡(jiǎn)單的客戶類。它將公開一個(gè)名為調(diào)用的方法,該方法發(fā)送一個(gè)RPC請(qǐng)求并阻塞直到接收到結(jié)果為止:
$fibonacci_rpc = new FibonacciRpcClient(); $response = $fibonacci_rpc->call(30); echo " [.] Got ", $response, " ";
關(guān)于RPC的一些建議回調(diào)隊(duì)列(Callback queue)雖然RPC是計(jì)算中非常常見的模式,但它經(jīng)常遭到批評(píng)。當(dāng)程序員不知道函數(shù)調(diào)用是本地的,或者它是一個(gè)緩慢的RPC時(shí),問題就出現(xiàn)了。這樣的混亂導(dǎo)致了不可預(yù)知的系統(tǒng),并給調(diào)試增加了不必要的復(fù)雜性。而簡(jiǎn)化軟件,濫用會(huì)導(dǎo)致難以維護(hù)的RPC代碼。
考慮到這一點(diǎn),請(qǐng)考慮以下建議:
確保很明顯哪個(gè)函數(shù)調(diào)用是本地調(diào)用,并且它是遠(yuǎn)程的。
記錄系統(tǒng)。使組件之間的依賴關(guān)系清晰。
處理錯(cuò)誤案例。RPC服務(wù)器長(zhǎng)時(shí)間處于下行狀態(tài)時(shí),客戶端應(yīng)如何響應(yīng)?
有疑問時(shí)避免RPC。如果可以,則應(yīng)該使用異步管道,而不是像阻塞這樣的RPC,結(jié)果被異步推送到下一個(gè)計(jì)算階段。
一般在RabbitMQ做RPC是容易的。客戶端發(fā)送一條請(qǐng)求消息和一個(gè)響應(yīng)消息的服務(wù)器回復(fù)。為了接收響應(yīng),我們需要向請(qǐng)求發(fā)送一個(gè)“回調(diào)”隊(duì)列地址。我們可以使用默認(rèn)隊(duì)列。讓我們?cè)囋嚳矗?/p>
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $msg = new AMQPMessage( $payload, array("reply_to" => $queue_name)); $channel->basic_publish($msg, "", "rpc_queue"); # ... then code to read a response message from the callback_queue ...
消息屬性Correlation Id
AMQP協(xié)議(0-9-1 protocol)預(yù)定義了一套14個(gè)屬性,去一個(gè)消息。大多數(shù)屬性很少使用,除了以下內(nèi)容:delivery_mode: 將消息標(biāo)記為持久性。 (with a value of 2) or transient (1). 您可能會(huì)從第二個(gè)教程中記住這個(gè)屬性。
content_type:用來描述編碼的MIME類型。例如,對(duì)于常用的JSON編碼,將此屬性設(shè)置為應(yīng)用程序/ JSON是一個(gè)很好的做法。
reply_to:常用的名字一個(gè)回調(diào)隊(duì)列。
correlation_id:有助于將RPC響應(yīng)與請(qǐng)求關(guān)聯(lián)起來。
在上面介紹的方法中,我們建議為每個(gè)RPC請(qǐng)求創(chuàng)建一個(gè)回調(diào)隊(duì)列。這是非常低效的,但幸運(yùn)的是有一個(gè)更好的方法——讓我們?yōu)槊總€(gè)客戶機(jī)創(chuàng)建一個(gè)回調(diào)隊(duì)列。
這引發(fā)了一個(gè)新問題,在隊(duì)列中收到了響應(yīng),不清楚響應(yīng)的請(qǐng)求屬于哪個(gè)。那時(shí)候correlation_id屬性用于。我們將把它設(shè)置為每個(gè)請(qǐng)求的唯一值。稍后,當(dāng)我們?cè)诨卣{(diào)隊(duì)列中接收消息時(shí),我們將查看這個(gè)屬性,并在此基礎(chǔ)上,我們將能夠?qū)㈨憫?yīng)與請(qǐng)求匹配。如果我們看到一個(gè)未知的correlation_id值,我們可以安全地忽略信息-它不屬于我們的請(qǐng)求。
您可能會(huì)問,為什么我們應(yīng)該忽略回調(diào)隊(duì)列中的未知消息,而不是失敗出錯(cuò)呢?這是由于服務(wù)器端可能出現(xiàn)競(jìng)爭(zhēng)情況。雖然不太可能,RPC服務(wù)器可能在發(fā)送完答案后死亡,但在發(fā)出請(qǐng)求的確認(rèn)消息之前。如果發(fā)生這種情況,重新啟動(dòng)的RPC服務(wù)器將再次處理請(qǐng)求。這就是為什么在客戶機(jī)上我們必須優(yōu)雅地處理重復(fù)響應(yīng),而RPC應(yīng)該理想地是冪等的。
總結(jié)我們的RPC會(huì)像這樣工作:
當(dāng)客戶端啟動(dòng)時(shí),它創(chuàng)建一個(gè)匿名的獨(dú)占回調(diào)隊(duì)列。
一個(gè)RPC請(qǐng)求,客戶端發(fā)送消息,兩個(gè)屬性:reply_to,設(shè)置回調(diào)隊(duì)列和correlation_id,它被設(shè)置為每個(gè)請(qǐng)求的唯一值。
請(qǐng)求被發(fā)送到一個(gè)rpc_queue隊(duì)列。
RPC worker(又名:服務(wù)器)正在等待該隊(duì)列上的請(qǐng)求。當(dāng)一個(gè)請(qǐng)求時(shí),它的工作和發(fā)送消息的結(jié)果返回給客戶端,使用從reply_to隊(duì)列。
客戶機(jī)等待回調(diào)隊(duì)列上的數(shù)據(jù)。當(dāng)消息出現(xiàn)時(shí),它檢查correlation_id屬性。如果它與請(qǐng)求的值匹配,則返回對(duì)應(yīng)用程序的響應(yīng)。
匯總Fibonacci 遞歸源碼:
function fib($n) { if ($n == 0) return 0; if ($n == 1) return 1; return fib($n-1) + fib($n-2); } `` 我們聲明fibonacci(斐波那契)函數(shù)。它只假設(shè)有效的正整數(shù)輸入。(不要指望這一個(gè)能為大數(shù)字工作,而且這可能是最慢的遞歸實(shí)現(xiàn))。 我們的RPC服務(wù)器rpc_server.php代碼看起來像這樣:
require_once DIR . "/vendor/autoload.php";
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
$connection = new AMQPStreamConnection("localhost", 5672, "guest", "guest");
$channel = $connection->channel();
$channel->queue_declare("rpc_queue", false, false, false, false);
function fib($n) {
if ($n == 0) return 0; if ($n == 1) return 1; return fib($n-1) + fib($n-2);
}
echo " [x] Awaiting RPC requestsn";
$callback = function($req) {
$n = intval($req->body); echo " [.] fib(", $n, ") "; $msg = new AMQPMessage( (string) fib($n), array("correlation_id" => $req->get("correlation_id")) ); $req->delivery_info["channel"]->basic_publish( $msg, "", $req->get("reply_to")); $req->delivery_info["channel"]->basic_ack( $req->delivery_info["delivery_tag"]);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume("rpc_queue", "", false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
服務(wù)器代碼相當(dāng)簡(jiǎn)單: 像往常一樣,我們從建立連接、通道和聲明隊(duì)列開始。 我們可能需要運(yùn)行多個(gè)服務(wù)器進(jìn)程。為了分散負(fù)載同樣多的服務(wù)器需要設(shè)置`prefetch_count`, 設(shè)置`$channel.basic_qos`美元。 我們用`basic_consume`訪問隊(duì)列。然后,我們進(jìn)入while循環(huán),在其中等待請(qǐng)求消息,完成工作并發(fā)送響應(yīng)。 我們r(jià)pc_client.php RPC客戶端代碼:
require_once DIR . "/vendor/autoload.php";
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
class FibonacciRpcClient {
private $connection; private $channel; private $callback_queue; private $response; private $corr_id; public function __construct() { $this->connection = new AMQPStreamConnection( "localhost", 5672, "guest", "guest"); $this->channel = $this->connection->channel(); list($this->callback_queue, ,) = $this->channel->queue_declare( "", false, false, true, false); $this->channel->basic_consume( $this->callback_queue, "", false, false, false, false, array($this, "on_response")); } public function on_response($rep) { if($rep->get("correlation_id") == $this->corr_id) { $this->response = $rep->body; } } public function call($n) { $this->response = null; $this->corr_id = uniqid(); $msg = new AMQPMessage( (string) $n, array("correlation_id" => $this->corr_id, "reply_to" => $this->callback_queue) ); $this->channel->basic_publish($msg, "", "rpc_queue"); while(!$this->response) { $this->channel->wait(); } return intval($this->response); }
};
$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "n";
?>
現(xiàn)在是一個(gè)很好的時(shí)間來讓我們完整的示例源代碼rpc_client.php和rpc_server.php。 我們的RPC服務(wù)現(xiàn)在準(zhǔn)備好了。我們可以啟動(dòng)服務(wù)器:
php rpc_server.php
# => [x] Awaiting RPC requests
請(qǐng)求斐波那契數(shù)運(yùn)行客戶機(jī):
php rpc_client.php
# => [x] Requesting fib(30)
``
這里介紹的設(shè)計(jì)并不是RPC服務(wù)的唯一實(shí)現(xiàn),但它有一些重要的要點(diǎn):
如果RPC服務(wù)器太慢,您可以通過運(yùn)行另一個(gè)服務(wù)器來擴(kuò)展。試著在一個(gè)新的控制臺(tái)再運(yùn)行第一個(gè):rpc_server.php。
在客戶端,RPC只需要發(fā)送和接收一條消息。不喜歡queue_declare需要同步調(diào)用。因此,RPC客戶機(jī)只需要一次RPC請(qǐng)求的一次網(wǎng)絡(luò)往返。
我們的代碼仍然非常簡(jiǎn)單,并沒有試圖解決更復(fù)雜(但重要)的問題,例如:
如果沒有服務(wù)器運(yùn)行,客戶端應(yīng)該如何反應(yīng)?
客戶端應(yīng)該對(duì)RPC有某種超時(shí)嗎?
如果服務(wù)器發(fā)生故障并引發(fā)異常,是否應(yīng)該轉(zhuǎn)發(fā)給客戶端?
在處理前防止無效傳入消息(如檢查邊界、類型)。
如果您想進(jìn)行實(shí)驗(yàn),您可能會(huì)發(fā)現(xiàn)management UI對(duì)于查看隊(duì)列非常有用。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/28314.html
摘要:前提必讀本教程假設(shè)是安裝在標(biāo)準(zhǔn)端口上運(yùn)行。這些詞可以是任何東西,但通常它們指定連接到消息的某些特性。如果我們違背合同,用一個(gè)或四個(gè)詞,如或那么,這些消息將不匹配任何綁定并將丟失。代碼與前面的教程幾乎相同。 (using php-amqplib) 前提必讀 本教程假設(shè)RabbitMQ是安裝在標(biāo)準(zhǔn)端口上運(yùn)行(5672)。如果您使用不同的主機(jī)、端口或憑據(jù),則連接設(shè)置需要調(diào)整。 在哪里得到幫助...
摘要:如果涉及返回值,就要用到本章提到的了。方法發(fā)送請(qǐng)求,并阻塞知道結(jié)果返回。當(dāng)有消息時(shí),進(jìn)行計(jì)算并通過指定的發(fā)送給客戶端。當(dāng)接收到,則檢查。如果和之前的匹配,則將消息返回給應(yīng)用進(jìn)行處理。 RPC模式 在第二章中我們學(xué)習(xí)了如何使用Work模式在多個(gè)worker之間派發(fā)時(shí)間敏感的任務(wù)。這種情況是不涉及到返回值的,worker執(zhí)行任務(wù)就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...
摘要:因?yàn)橄M(fèi)消息是在另外一個(gè)進(jìn)程中,我們需要阻塞我們的進(jìn)程直到結(jié)果返回,使用阻塞隊(duì)列是一種非常好的方式,這里我們使用了長(zhǎng)度為的,的功能是檢查消息的的是不是我們之前所發(fā)送的,如果是,將返回值返回到。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請(qǐng)大家圍觀。可以參考...
摘要:通常用于命名回調(diào)隊(duì)列。對(duì)每個(gè)響應(yīng)執(zhí)行的回調(diào)函數(shù)做了一個(gè)非常簡(jiǎn)單的工作,對(duì)于每個(gè)響應(yīng)消息它檢查是否是我們正在尋找的。在這個(gè)方法中,首先我們生成一個(gè)唯一的數(shù)并保存回調(diào)函數(shù)將使用這個(gè)值來捕獲適當(dāng)?shù)捻憫?yīng)。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 遠(yuǎn)程過程調(diào)用(RPC) (using the Pika Python client) 本章節(jié)教程...
摘要:在中間的框是一個(gè)隊(duì)列的消息緩沖區(qū),保持代表的消費(fèi)。本教程介紹,這是一個(gè)開放的通用的協(xié)議消息。我們將在本教程中使用,解決依賴管理。發(fā)送者將連接到,發(fā)送一條消息,然后退出。注意,這與發(fā)送發(fā)布的隊(duì)列匹配。 介紹 RabbitMQ是一個(gè)消息代理器:它接受和轉(zhuǎn)發(fā)消息。你可以把它當(dāng)作一個(gè)郵局:當(dāng)你把郵件放在信箱里時(shí),你可以肯定郵差先生最終會(huì)把郵件送到你的收件人那里。在這個(gè)比喻中,RabbitMQ就...
閱讀 1768·2023-04-26 01:44
閱讀 1211·2021-11-12 10:34
閱讀 1579·2021-09-09 09:33
閱讀 1729·2019-08-30 15:44
閱讀 2892·2019-08-30 13:49
閱讀 2191·2019-08-29 15:26
閱讀 944·2019-08-26 13:30
閱讀 1409·2019-08-23 18:15