国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

RabbitMQ+PHP 教程二(Work Queues)

iKcamp / 2230人閱讀

摘要:平均每個消費者將得到相同數量的消息。消息確認完成任務可能需要幾秒鐘。為了確保消息不會丟失,支持消息確認。沒有任何消息超時當這個消費者中止了,將會重新分配消息時。這是因為只是調度消息時,消息進入隊列。

介紹

在上一個 Hello World 教程中,我們編寫了從指定隊列發送和接收消息的程序。在這篇文章中,我們將創建一個工作隊列,用于在多個工人(消費者)之間分配耗時的任務。

工作隊列(又名任務隊列)背后的主要思想是避免立即執行資源密集型任務,必須等待它完成。相反,我們計劃稍后完成任務。我們將任務封裝為消息并將其發送到隊列中。后臺運行的一個工作進程將彈出任務并最終執行該任務。當你運行許多工人(消費者)時,任務將在他們之間分擔。

這個概念在Web應用程序中尤其有用,因為在短HTTP請求中不可能處理復雜的任務。

先決條件

在本教程的前一部分,我們發送了一條包含“Hello World”的消息。現在,我們將發送支持復雜任務的字符串。我們沒有一個真實環境的任務,如圖像進行調整或PDF文件的渲染,讓我們利用sleep()模擬真實環境的業務功能。我們將字符串中的點數作為其復雜度;每個點都將占“工作”的一秒鐘。例如,由Hello...描述的一個偽任務…需要三秒。

new_task.php

我們會稍微修改send.php代碼從我們先前的例子,允許任意的消息是從命令行發送。這一計劃將任務分配給我們的工作隊列,所以我們命名它 new_task.php

$data = implode(" ", array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, "", "hello");

echo " [x] Sent ", $data, "
";

我們的上一個版本的receive.php腳本也需要一些改變:它需要假第二工作在消息體中每一點。它會從隊列彈出消息和執行任務,所以讓我們把命名worker.php:

$callback = function($msg){
  echo " [x] Received ", $msg->body, "
"; //根據"."數量個數獲取延遲時間,單位秒
  sleep(substr_count($msg->body, "."));  //模擬業務執行時間延遲
  echo " [x] Done", "
";
};
$channel->basic_consume("hello", "", false, true, false, false, $callback);
單worker簡單運行測試 消費者
php worker.php
消息生產者
php new_task.php "A very hard task which takes two seconds.."
循環調度

一個使用任務隊列的優點是容易并行工作的能力。如果我們積壓了大量的工作,我們可以增加更多的工人,這樣就可以輕松地規模化。

首先,讓我們嘗試同時運行兩worker.php腳本。他們都會從隊列中獲得消息,看看效果如何?讓我們看看。

你需要打開三個console命令。兩將運行worker.php腳本。這些控制臺將是我們的兩個消費者C1和C2。

消費者1
php worker.php
消費者2
php worker.php
消息生產者
php new_task.php msg1...

默認情況下,RabbitMQ將會發送的每一條消息給下一個消費者,在序列。平均每個消費者將得到相同數量的消息。這種分發消息的方式稱為循環輪詢。試著用三個或更多的工人。

消息確認

完成任務可能需要幾秒鐘。你可能遇到如果一個消費者開始一個長期的任務,并且只完成了部分任務,那么會發生什么?。我們目前的代碼,一旦RabbitMQ發送一個消息給客戶立即標記為刪除。在這種情況下,如果您中止一個消費者,我們將丟失它正在處理的消息。我們還將丟失發送給該消費者所有的尚未處理的消息。

如果我們不想失去任何任務。如果一個消費者意外中止了,我們希望把任務交給另一個消費者。

為了確保消息不會丟失,RabbitMQ支持消息確認。ACK(nowledgement)消費者返回的結果告訴RabbitMQ有一條消息收到,你可以自由可控的刪除他

如果一個消費者中止了(其通道關閉,連接被關閉,或TCP連接丟失)不發送ACK,RabbitMQ將會理解這個消息并沒有完全處理,將它重新加入隊列。如果有其他用戶同時在線,它就會快速地傳遞到另一個消費者。這樣,即使意外中止了,也可以確保沒有丟失信息。

沒有任何消息超時;當這個消費者中止了,RabbitMQ將會重新分配消息時。即使處理消息花費很長很長時間也很好。

消息確認是默認關閉。可通過設置的第四個參數basic_consume設置為false(true意味著沒有ACK)和從消費者發送合適的確認,一旦我們完成一個任務。

$callback = function($msg){
  echo " [x] Received ", $msg->body, "
";
  sleep(substr_count($msg->body, "."));
  echo " [x] Done", "
";
  $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]);
};

$channel->basic_consume("task_queue", "", false, false, false, false, $callback);

使用此代碼,我們可以確信,即使在處理消息時使用Ctrl + C殺死一名消費者,也不會丟失任何東西。消費者中止都未確認的消息后很快會被重新分配。

忘了確認(Forgotten acknowledgment)

丟失ACK確認是一個常見的錯誤。這是一個容易犯的錯誤,但后果很嚴重。當你的客戶退出,消息會被重新分配(這可能看起來像是隨機的分配),RabbitMQ將會消耗更多的內存,它不會釋放任何延遲確認消息。

為了調試這種錯誤,你可以使用rabbitmqctl打印messages_unacknowledged字段:

rabbitmqctl list_queues name messages_ready messages_unacknowledged
消息持久化(Message durability)

我們已經學會了如何確保即使消費者死了,任務也不會丟失。但是如果RabbitMQ服務器停止了,我們的任務仍然有可能會丟失。

當RabbitMQ退出或崩潰了,會丟失隊列和消息除非你不要。要確保消息不會丟失,需要兩件事:我們需要將隊列和消息都標記為持久的。

首先,我們需要確保RabbitMQ永遠不會丟失隊列。為了做到這一點,我們需要聲明它是持久的。為此我們通過queue_declare作為第三參數為true:

$channel->queue_declare("hello", false, true, false, false);

雖然這個命令本身是正確的,但它在我們當前的設置中不起作用。這是因為我們已經定義了一個名為hello的隊列,該隊列不持久。RabbitMQ不允許你重新定義現有隊列用不同的參數,將返回一個錯誤的任何程序,試圖這么做。但有一個快速的解決方法-讓我們聲明一個名稱不同的隊列,例如task_queue:

$channel->queue_declare("task_queue", false, true, false, false);

需要應用到生產者和消費者代碼中設置為true。

在這一點上,我們可以確保即使RabbitMQ重啟了,task_queue隊列不會丟失。現在我們要標記我們的消息持續通過設置delivery_mode = 2消息屬性,amqpmessage作為屬性數組的一部分。

$msg = new AMQPMessage($data, array("delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT) );
關于消息持久性的說明(Note on message persistence)

將消息標記為持久性不能完全保證消息不會丟失。雖然它告訴RabbitMQ保存信息到磁盤上,還有一個短的時間窗口時,RabbitMQ 已經接受信息并沒有保存它。另外,RabbitMQ不做fsync(2)每一個消息--它可能只是保存到緩存并沒有真正寫入到磁盤。持久性保證不強,但對于我們的簡單任務隊列來說已經足夠了。如果你需要更強的保證,那么你可以使用消費者確認。

公平調度

您可能已經注意到,調度仍然不完全按照我們的要求工作。例如,在一個有兩個消費者的情況下,當所有的奇數信息都很重,甚至很輕的消息,一個消費者會一直忙,而另一個消費者幾乎不做任何工作。嗯,RabbitMQ不知道發生了什么事,仍將均勻消息發送。

這是因為RabbitMQ只是調度消息時,消息進入隊列。當存在未確認的消息時。它只是盲目的分發n-th條消息給n-th個消費者。

為了改變這個分配方式,我們可以調用basic_qos方法,設置參數prefetch_count = 1。這告訴RabbitMQ不要在一個時間給一個消費者多個消息。或者,換句話說,在處理和確認以前的消息之前,不要向消費者發送新消息。相反,它將發送給下一個仍然不忙的消費者。

$channel->basic_qos(null, 1, null);
關于隊列大小的注釋(Note about queue size)

如果所有的消費者都很忙,你的隊列填滿了。你會想留意到這一點,也許增加更多的工人,或者有其他的策略。

源碼 new_task.php
channel();

$channel->queue_declare("task_queue", false, true, false, false);

$data = implode(" ", array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
                        array("delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT)
                      );

$channel->basic_publish($msg, "", "task_queue");

echo " [x] Sent ", $data, "
";

$channel->close();
$connection->close();

?>
worker.php
channel();

$channel->queue_declare("task_queue", false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C", "
";

$callback = function($msg){
  echo " [x] Received ", $msg->body, "
";
  sleep(substr_count($msg->body, "."));
  echo " [x] Done", "
";
  $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume("task_queue", "", false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

?>

使用消息的確認和預取,你可以設置一個工作隊列。耐久性的配置選項讓任務存在,即使RabbitMQ重啟。

學習如何向許多消費者傳遞同樣的信息, 你可以閱讀下一章節:RabbitMQ+PHP 教程三(Publish/Subscribe)。

翻譯來自 RabbitMQ - RabbitMQ tutorial - Work Queues

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/26023.html

相關文章

  • RabbitMQ+PHP 教程一(Hello World)

    摘要:在中間的框是一個隊列的消息緩沖區,保持代表的消費。本教程介紹,這是一個開放的通用的協議消息。我們將在本教程中使用,解決依賴管理。發送者將連接到,發送一條消息,然后退出。注意,這與發送發布的隊列匹配。 介紹 RabbitMQ是一個消息代理器:它接受和轉發消息。你可以把它當作一個郵局:當你把郵件放在信箱里時,你可以肯定郵差先生最終會把郵件送到你的收件人那里。在這個比喻中,RabbitMQ就...

    silencezwm 評論0 收藏0
  • 【譯】RabbitMQ系列()-Work模式

    摘要:每個消費者會得到平均數量的。為了確保不會丟失,采用確認機制。如果中斷退出了關閉了,關閉了,或是連接丟失了而沒有發送,會認為該消息沒有完整的執行,會將該消息重新入隊。該消息會被發送給其他的。當消費者中斷退出,會重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我們寫了通過一個...

    lcodecorex 評論0 收藏0
  • RabbitMQ+PHP 消息隊列環境配置

    摘要:參考文檔依賴包安裝環境配置環境變量增加內容保存退出,并刷新變量測試是否安裝成功安裝完成以后,執行看是否能打開,用退出,注意后面的點號,那是的結束符。 參考文檔:http://www.cnblogs.com/phpinfo/p/4104551...http://blog.csdn.net/historyasamirror/ar... 依賴包安裝 yum install ncurses-d...

    geekidentity 評論0 收藏0
  • RabbitMQ+PHP 教程三(Publish/Subscribe)

    摘要:在客戶端中,當我們將隊列名稱作為空字符串提供時,我們創建一個帶有生成名稱的非持久隊列方法返回時,變量包含一個隨機生成的隊列名稱。交換和隊列之間的關系稱為綁定。 使用 php-amqplib 介紹 在前面的教程中,我們創建了一個工作隊列。工作隊列背后的假設是每個任務都交付給一個工作人員處理。在這一部分中,我們將做一些完全不同的事情——我們將向多個消費者發送消息。此模式稱為發布/訂閱。 ...

    Neilyo 評論0 收藏0
  • [譯] RabbitMQ tutorials (2) ---- 'work queue&#

    摘要:這樣的消息分發機制稱作輪詢。在進程掛了之后,所有的未被確認的消息會被重新分發。忘記確認這是一個普遍的錯誤,丟失。為了使消息不會丟失,兩件事情需要確保,我們需要持久化隊列和消息。 工作隊列 showImg(https://segmentfault.com/img/remote/1460000008229494?w=332&h=111); 在第一篇中,我們寫了一個程序從已經聲明的隊列中收發...

    joyvw 評論0 收藏0

發表評論

0條評論

iKcamp

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<