国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

RabbitMQ+PHP 教程六(RPC)

anquan / 2806人閱讀

摘要:有助于將響應(yīng)與請(qǐng)求關(guān)聯(lián)起來。如果發(fā)生這種情況,重新啟動(dòng)的服務(wù)器將再次處理請(qǐng)求。又名服務(wù)器正在等待該隊(duì)列上的請(qǐng)求。當(dāng)消息出現(xiàn)時(shí),它檢查屬性。然后,我們進(jìn)入循環(huán),在其中等待請(qǐng)求消息,完成工作并發(fā)送響應(yīng)。

(using php-amqplib)

前提必讀

本教程假設(shè)RabbitMQ是安裝在標(biāo)準(zhǔn)端口上運(yùn)行(5672)。如果您使用不同的主機(jī)、端口或憑據(jù),則連接設(shè)置需要調(diào)整。

如果您在本教程中遇到困難,可以通過郵件列表與我們聯(lián)系。

開始

在第二個(gè)教程中,我們學(xué)習(xí)了如何使用工作隊(duì)列在多個(gè)工人之間分配耗時(shí)的任務(wù)。

但是如果我們需要在遠(yuǎn)程計(jì)算機(jī)上運(yùn)行一個(gè)函數(shù)并等待結(jié)果呢?嗯,那是另一回事了。這種模式通常稱為遠(yuǎn)程過程調(diào)用或RPC。

在本教程中我們將使用RabbitMQ搭建一個(gè)RPC系統(tǒng):一個(gè)客戶端和一個(gè)可擴(kuò)展的RPC服務(wù)器。由于我們沒有任何值得分配的耗時(shí)的任務(wù),所以我們將創(chuàng)建一個(gè)返回Fibonacci數(shù)的模擬一個(gè)RPC服務(wù)。

Client interface

為了說明如何使用RPC服務(wù),我們將創(chuàng)建一個(gè)簡(jiǎn)單的客戶類。它將公開一個(gè)名為調(diào)用的方法,該方法發(fā)送一個(gè)RPC請(qǐng)求并阻塞直到接收到結(jié)果為止:

$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "
";
關(guān)于RPC的一些建議

雖然RPC是計(jì)算中非常常見的模式,但它經(jīng)常遭到批評(píng)。當(dāng)程序員不知道函數(shù)調(diào)用是本地的,或者它是一個(gè)緩慢的RPC時(shí),問題就出現(xiàn)了。這樣的混亂導(dǎo)致了不可預(yù)知的系統(tǒng),并給調(diào)試增加了不必要的復(fù)雜性。而簡(jiǎn)化軟件,濫用會(huì)導(dǎo)致難以維護(hù)的RPC代碼。

考慮到這一點(diǎn),請(qǐng)考慮以下建議:

確保很明顯哪個(gè)函數(shù)調(diào)用是本地調(diào)用,并且它是遠(yuǎn)程的。
記錄系統(tǒng)。使組件之間的依賴關(guān)系清晰。
處理錯(cuò)誤案例。RPC服務(wù)器長(zhǎng)時(shí)間處于下行狀態(tài)時(shí),客戶端應(yīng)如何響應(yīng)?
有疑問時(shí)避免RPC。如果可以,則應(yīng)該使用異步管道,而不是像阻塞這樣的RPC,結(jié)果被異步推送到下一個(gè)計(jì)算階段。

回調(diào)隊(duì)列(Callback queue)

一般在RabbitMQ做RPC是容易的。客戶端發(fā)送一條請(qǐng)求消息和一個(gè)響應(yīng)消息的服務(wù)器回復(fù)。為了接收響應(yīng),我們需要向請(qǐng)求發(fā)送一個(gè)“回調(diào)”隊(duì)列地址。我們可以使用默認(rèn)隊(duì)列。讓我們?cè)囋嚳矗?/p>

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$msg = new AMQPMessage(
    $payload,
    array("reply_to" => $queue_name));

$channel->basic_publish($msg, "", "rpc_queue");
# ... then code to read a response message from the callback_queue ...
消息屬性
AMQP協(xié)議(0-9-1 protocol)預(yù)定義了一套14個(gè)屬性,去一個(gè)消息。大多數(shù)屬性很少使用,除了以下內(nèi)容:

delivery_mode: 將消息標(biāo)記為持久性。 (with a value of 2) or transient (1). 您可能會(huì)從第二個(gè)教程中記住這個(gè)屬性。
content_type:用來描述編碼的MIME類型。例如,對(duì)于常用的JSON編碼,將此屬性設(shè)置為應(yīng)用程序/ JSON是一個(gè)很好的做法。
reply_to:常用的名字一個(gè)回調(diào)隊(duì)列。
correlation_id:有助于將RPC響應(yīng)與請(qǐng)求關(guān)聯(lián)起來。

Correlation Id

在上面介紹的方法中,我們建議為每個(gè)RPC請(qǐng)求創(chuàng)建一個(gè)回調(diào)隊(duì)列。這是非常低效的,但幸運(yùn)的是有一個(gè)更好的方法——讓我們?yōu)槊總€(gè)客戶機(jī)創(chuàng)建一個(gè)回調(diào)隊(duì)列。

這引發(fā)了一個(gè)新問題,在隊(duì)列中收到了響應(yīng),不清楚響應(yīng)的請(qǐng)求屬于哪個(gè)。那時(shí)候correlation_id屬性用于。我們將把它設(shè)置為每個(gè)請(qǐng)求的唯一值。稍后,當(dāng)我們?cè)诨卣{(diào)隊(duì)列中接收消息時(shí),我們將查看這個(gè)屬性,并在此基礎(chǔ)上,我們將能夠?qū)㈨憫?yīng)與請(qǐng)求匹配。如果我們看到一個(gè)未知的correlation_id值,我們可以安全地忽略信息-它不屬于我們的請(qǐng)求。

您可能會(huì)問,為什么我們應(yīng)該忽略回調(diào)隊(duì)列中的未知消息,而不是失敗出錯(cuò)呢?這是由于服務(wù)器端可能出現(xiàn)競(jìng)爭(zhēng)情況。雖然不太可能,RPC服務(wù)器可能在發(fā)送完答案后死亡,但在發(fā)出請(qǐng)求的確認(rèn)消息之前。如果發(fā)生這種情況,重新啟動(dòng)的RPC服務(wù)器將再次處理請(qǐng)求。這就是為什么在客戶機(jī)上我們必須優(yōu)雅地處理重復(fù)響應(yīng),而RPC應(yīng)該理想地是冪等的。

總結(jié)

我們的RPC會(huì)像這樣工作:

當(dāng)客戶端啟動(dòng)時(shí),它創(chuàng)建一個(gè)匿名的獨(dú)占回調(diào)隊(duì)列。

一個(gè)RPC請(qǐng)求,客戶端發(fā)送消息,兩個(gè)屬性:reply_to,設(shè)置回調(diào)隊(duì)列和correlation_id,它被設(shè)置為每個(gè)請(qǐng)求的唯一值。

請(qǐng)求被發(fā)送到一個(gè)rpc_queue隊(duì)列。

RPC worker(又名:服務(wù)器)正在等待該隊(duì)列上的請(qǐng)求。當(dāng)一個(gè)請(qǐng)求時(shí),它的工作和發(fā)送消息的結(jié)果返回給客戶端,使用從reply_to隊(duì)列。

客戶機(jī)等待回調(diào)隊(duì)列上的數(shù)據(jù)。當(dāng)消息出現(xiàn)時(shí),它檢查correlation_id屬性。如果它與請(qǐng)求的值匹配,則返回對(duì)應(yīng)用程序的響應(yīng)。

匯總

Fibonacci 遞歸源碼:

function fib($n) {
    if ($n == 0)
        return 0;
    if ($n == 1)
        return 1;
    return fib($n-1) + fib($n-2);
}
``
我們聲明fibonacci(斐波那契)函數(shù)。它只假設(shè)有效的正整數(shù)輸入。(不要指望這一個(gè)能為大數(shù)字工作,而且這可能是最慢的遞歸實(shí)現(xiàn))。

我們的RPC服務(wù)器rpc_server.php代碼看起來像這樣:

require_once DIR . "/vendor/autoload.php";
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

$connection = new AMQPStreamConnection("localhost", 5672, "guest", "guest");
$channel = $connection->channel();

$channel->queue_declare("rpc_queue", false, false, false, false);

function fib($n) {

if ($n == 0)
    return 0;
if ($n == 1)
    return 1;
return fib($n-1) + fib($n-2);

}

echo " [x] Awaiting RPC requestsn";
$callback = function($req) {

$n = intval($req->body);
echo " [.] fib(", $n, ")
";

$msg = new AMQPMessage(
    (string) fib($n),
    array("correlation_id" => $req->get("correlation_id"))
    );

$req->delivery_info["channel"]->basic_publish(
    $msg, "", $req->get("reply_to"));
$req->delivery_info["channel"]->basic_ack(
    $req->delivery_info["delivery_tag"]);

};

$channel->basic_qos(null, 1, null);
$channel->basic_consume("rpc_queue", "", false, false, false, false, $callback);

while(count($channel->callbacks)) {

$channel->wait();

}

$channel->close();
$connection->close();

?>

服務(wù)器代碼相當(dāng)簡(jiǎn)單:

像往常一樣,我們從建立連接、通道和聲明隊(duì)列開始。

我們可能需要運(yùn)行多個(gè)服務(wù)器進(jìn)程。為了分散負(fù)載同樣多的服務(wù)器需要設(shè)置`prefetch_count`, 設(shè)置`$channel.basic_qos`美元。

我們用`basic_consume`訪問隊(duì)列。然后,我們進(jìn)入while循環(huán),在其中等待請(qǐng)求消息,完成工作并發(fā)送響應(yīng)。

我們r(jià)pc_client.php RPC客戶端代碼:

require_once DIR . "/vendor/autoload.php";
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

class FibonacciRpcClient {

private $connection;
private $channel;
private $callback_queue;
private $response;
private $corr_id;

public function __construct() {
    $this->connection = new AMQPStreamConnection(
        "localhost", 5672, "guest", "guest");
    $this->channel = $this->connection->channel();
    list($this->callback_queue, ,) = $this->channel->queue_declare(
        "", false, false, true, false);
    $this->channel->basic_consume(
        $this->callback_queue, "", false, false, false, false,
        array($this, "on_response"));
}
public function on_response($rep) {
    if($rep->get("correlation_id") == $this->corr_id) {
        $this->response = $rep->body;
    }
}

public function call($n) {
    $this->response = null;
    $this->corr_id = uniqid();

    $msg = new AMQPMessage(
        (string) $n,
        array("correlation_id" => $this->corr_id,
              "reply_to" => $this->callback_queue)
        );
    $this->channel->basic_publish($msg, "", "rpc_queue");
    while(!$this->response) {
        $this->channel->wait();
    }
    return intval($this->response);
}

};

$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "n";

?>

現(xiàn)在是一個(gè)很好的時(shí)間來讓我們完整的示例源代碼rpc_client.php和rpc_server.php。

我們的RPC服務(wù)現(xiàn)在準(zhǔn)備好了。我們可以啟動(dòng)服務(wù)器:

php rpc_server.php
# => [x] Awaiting RPC requests

請(qǐng)求斐波那契數(shù)運(yùn)行客戶機(jī):

php rpc_client.php
# => [x] Requesting fib(30)
``
這里介紹的設(shè)計(jì)并不是RPC服務(wù)的唯一實(shí)現(xiàn),但它有一些重要的要點(diǎn):

如果RPC服務(wù)器太慢,您可以通過運(yùn)行另一個(gè)服務(wù)器來擴(kuò)展。試著在一個(gè)新的控制臺(tái)再運(yùn)行第一個(gè):rpc_server.php。

在客戶端,RPC只需要發(fā)送和接收一條消息。不喜歡queue_declare需要同步調(diào)用。因此,RPC客戶機(jī)只需要一次RPC請(qǐng)求的一次網(wǎng)絡(luò)往返。

我們的代碼仍然非常簡(jiǎn)單,并沒有試圖解決更復(fù)雜(但重要)的問題,例如:

如果沒有服務(wù)器運(yùn)行,客戶端應(yīng)該如何反應(yīng)?

客戶端應(yīng)該對(duì)RPC有某種超時(shí)嗎?

如果服務(wù)器發(fā)生故障并引發(fā)異常,是否應(yīng)該轉(zhuǎn)發(fā)給客戶端?

在處理前防止無效傳入消息(如檢查邊界、類型)。

如果您想進(jìn)行實(shí)驗(yàn),您可能會(huì)發(fā)現(xiàn)management UI對(duì)于查看隊(duì)列非常有用。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/28314.html

相關(guān)文章

  • RabbitMQ+PHP 教程五(Topics)

    摘要:前提必讀本教程假設(shè)是安裝在標(biāo)準(zhǔn)端口上運(yùn)行。這些詞可以是任何東西,但通常它們指定連接到消息的某些特性。如果我們違背合同,用一個(gè)或四個(gè)詞,如或那么,這些消息將不匹配任何綁定并將丟失。代碼與前面的教程幾乎相同。 (using php-amqplib) 前提必讀 本教程假設(shè)RabbitMQ是安裝在標(biāo)準(zhǔn)端口上運(yùn)行(5672)。如果您使用不同的主機(jī)、端口或憑據(jù),則連接設(shè)置需要調(diào)整。 在哪里得到幫助...

    nemo 評(píng)論0 收藏0
  • 【譯】RabbitMQ系列()-RPC模式

    摘要:如果涉及返回值,就要用到本章提到的了。方法發(fā)送請(qǐng)求,并阻塞知道結(jié)果返回。當(dāng)有消息時(shí),進(jìn)行計(jì)算并通過指定的發(fā)送給客戶端。當(dāng)接收到,則檢查。如果和之前的匹配,則將消息返回給應(yīng)用進(jìn)行處理。 RPC模式 在第二章中我們學(xué)習(xí)了如何使用Work模式在多個(gè)worker之間派發(fā)時(shí)間敏感的任務(wù)。這種情況是不涉及到返回值的,worker執(zhí)行任務(wù)就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...

    894974231 評(píng)論0 收藏0
  • 白話RabbitMQ(): RPC

    摘要:因?yàn)橄M(fèi)消息是在另外一個(gè)進(jìn)程中,我們需要阻塞我們的進(jìn)程直到結(jié)果返回,使用阻塞隊(duì)列是一種非常好的方式,這里我們使用了長(zhǎng)度為的,的功能是檢查消息的的是不是我們之前所發(fā)送的,如果是,將返回值返回到。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請(qǐng)大家圍觀。可以參考...

    KevinYan 評(píng)論0 收藏0
  • rabbitmq中文教程python版 - 遠(yuǎn)程過程調(diào)用

    摘要:通常用于命名回調(diào)隊(duì)列。對(duì)每個(gè)響應(yīng)執(zhí)行的回調(diào)函數(shù)做了一個(gè)非常簡(jiǎn)單的工作,對(duì)于每個(gè)響應(yīng)消息它檢查是否是我們正在尋找的。在這個(gè)方法中,首先我們生成一個(gè)唯一的數(shù)并保存回調(diào)函數(shù)將使用這個(gè)值來捕獲適當(dāng)?shù)捻憫?yīng)。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 遠(yuǎn)程過程調(diào)用(RPC) (using the Pika Python client) 本章節(jié)教程...

    chuyao 評(píng)論0 收藏0
  • RabbitMQ+PHP 教程一(Hello World)

    摘要:在中間的框是一個(gè)隊(duì)列的消息緩沖區(qū),保持代表的消費(fèi)。本教程介紹,這是一個(gè)開放的通用的協(xié)議消息。我們將在本教程中使用,解決依賴管理。發(fā)送者將連接到,發(fā)送一條消息,然后退出。注意,這與發(fā)送發(fā)布的隊(duì)列匹配。 介紹 RabbitMQ是一個(gè)消息代理器:它接受和轉(zhuǎn)發(fā)消息。你可以把它當(dāng)作一個(gè)郵局:當(dāng)你把郵件放在信箱里時(shí),你可以肯定郵差先生最終會(huì)把郵件送到你的收件人那里。在這個(gè)比喻中,RabbitMQ就...

    silencezwm 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<