摘要:在客戶端中,當我們將隊列名稱作為空字符串提供時,我們創建一個帶有生成名稱的非持久隊列方法返回時,變量包含一個隨機生成的隊列名稱。交換和隊列之間的關系稱為綁定。
使用 php-amqplib 介紹
在前面的教程中,我們創建了一個工作隊列。工作隊列背后的假設是每個任務都交付給一個工作人員處理。在這一部分中,我們將做一些完全不同的事情——我們將向多個消費者發送消息。此模式稱為“發布/訂閱”。
為了說明這個模式,我們將構建一個簡單的日志系統。它將由兩個程序組成,第一個程序將發出日志消息,第二個程序將接收并打印它們。
在我們的日志系統中,接收程序的每個運行副本都會收到消息。這樣我們就可以運行一個接收器,并將日志引導到磁盤;同時,我們還可以運行另一個接收器,并在屏幕上看到日志。
本質上,已發布的日志消息將被廣播到所有接收器。
交換機(Exchanges)在本教程的前幾部分中,我們從隊列中發送和接收消息。現在是在Rabbit中引入完整消息傳遞模型的時候了。
讓我們快速瀏覽一下前面教程中介紹的內容:
生產者是發送消息的用戶應用程序。
隊列是存儲消息的緩沖區。
消費者是接收消息的用戶應用程序。
RabbitMQ消息傳遞模型的核心思想是,生產者不發送任何信息直接到隊列。事實上,生產者甚至不知道消息是否會發送到任何隊列。
相反,生產商只能向交換機(Exchange)發送消息。交換機做的事情很簡單。一方面,它接收來自生產者的信息,另一邊則推他們排隊。Exchange必須知道如何處理接收到的消息。應該附加到特定隊列嗎?它應該被添加到多個隊列?還是應該被拋棄?。這個規則是由交換類型定義的。
有幾種交換類型可用:direct, topic, headers 和 fanout。我們將集中討論最后一個——fanout。讓我們創建這種類型的交換,并稱之為日志:
$channel->exchange_declare("logs", "fanout", false, false, false);
fanout交換非常簡單。正如你可能從這個名字猜到的,它只廣播它收到的所有消息給它所知道的所有隊列。這正是我們需要的記錄器。
Listing exchanges列出服務器上的交換機,你可以運行rabbitmqctl:
sudo rabbitmqctl list_exchanges在這個列表中會有一些amq. *交流和默認(未命名)交換。默認情況下創建這些>,但目前不太可能使用它們。
默認的交換機在本教程的前幾部分中,我們對交換機一無所知,但仍然能夠將消息發送到隊列中。這是可能的,因為我們使用的是默認的交換,我們通過空字符串(“”)來標識它們。
回想一下我們之前如何發布消息:
$channel->basic_publish($msg, "", "hello");我們在這里使用默認的或無名的交換:消息路由到指定的routing_key名稱的隊列,如果它存在的話。路由鍵是第三個參數:basic_publish
現在,我們可以將其發布到我們命名的Exchange中:
$channel->exchange_declare("logs", "fanout", false, false, false); $channel->basic_publish($msg, "logs");臨時隊列(Temporary queues)
也許你還記得以前我們使用的隊列所指定的名稱(記得hello和task_queue?). 能夠說出一個隊列對我們來說至關重要 -- 我們需要把工人指向同一個隊列。當你想在生產者和消費者之間共享一個隊列時,給隊列一個名字是很重要的。
但我們的記錄器不是這樣的。我們想了解所有日志消息,而不僅僅是其中的一個子集。我們也只對當前流動的消息感興趣,而不是舊消息。為了解決這個問題,我們需要兩件事。
首先,每當我們與Rabbit連接時,我們需要一個新的空隊列。為此,我們可以創建一個帶有隨機名稱的隊列,或者更好 - 讓服務器為我們選擇一個隨機隊列名。
第二,一旦斷開消費者,隊列應該自動刪除。
在php客戶端中,當我們將隊列名稱作為空字符串提供時,我們創建一個帶有生成名稱的非持久隊列:
list($queue_name, ,) = $channel->queue_declare("");
方法返回時,queue_name變量包含一個隨機生成的RabbitMQ隊列名稱。例如,它可能看起來像amq.gen-jzty20brgko-hjmujj0wlg
當聲明它關閉的連接時,隊列將被刪除,因為它被聲明為獨占。
綁定(Bindings)我們已經創建了fanout交換機和隊列。現在我們需要告訴Exchange發送消息到我們的隊列中。交換和隊列之間的關系稱為綁定。
$channel->queue_bind($queue_name, "logs");
從現在開始,日志交換將向隊列添加消息。
列出綁定列表(Listing bindings)讓我們把所有整理在一起(Putting it all together)您可以使用現有的綁定列表,使用下面命令:
rabbitmqctl list_bindings
生成日志消息的生成程序與前面的教程沒有多大區別。最重要的變化是,我們現在希望把消息發布到我們的日志交換,而不是無名的。這里給出emit_log.php代碼:
channel(); $channel->exchange_declare("logs", "fanout", false, false, false); $data = implode(" ", array_slice($argv, 1)); if(empty($data)) $data = "info: Hello World!"; $msg = new AMQPMessage($data); $channel->basic_publish($msg, "logs"); echo " [x] Sent ", $data, " "; $channel->close(); $connection->close(); ?>
emit_log.php源碼
如您所見,在建立連接之后,我們聲明交換。這一步是必要的,因為發布到一個不存在的交換機是禁止的。
如果沒有隊列綁定到Exchange,消息將丟失,但這對我們來說是好的;如果沒有用戶正在監聽,我們可以安全地丟棄消息。
receive_logs.php代碼:
channel(); $channel->exchange_declare("logs", "fanout", false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queue_name, "logs"); echo " [*] Waiting for logs. To exit press CTRL+C", " "; $callback = function($msg){ echo " [x] ", $msg->body, " "; }; $channel->basic_consume($queue_name, "", false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ?>
receive_logs.php
如果要將日志保存到文件中,只需打開控制臺并鍵入:
php receive_logs.php > logs_from_rabbit.log
如果您希望看到屏幕上的日志,生成一個新的終端并運行:
php receive_logs.php
當然,然后觸發日志類型:
php emit_log.php
使用rabbitmqctl list_bindings可以驗證代碼實際上是創建綁定和隊列是我們想要的。兩receive_logs.php程序運行你應該看到:
sudo rabbitmqctl list_bindings # => Listing bindings ... # => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] # => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] # => ...done.
對結果的解釋很簡單:來自Exchange日志的數據使用服務器分配的名稱到兩個隊列中。這正是我們想要的。
要了解如何偵聽一個消息的子集,讓我們轉到RabbitMQ+PHP 教程四(Routing)。
翻譯來自 RabbitMQ - RabbitMQ tutorial - Publish/Subscribe
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/28262.html
摘要:平均每個消費者將得到相同數量的消息。消息確認完成任務可能需要幾秒鐘。為了確保消息不會丟失,支持消息確認。沒有任何消息超時當這個消費者中止了,將會重新分配消息時。這是因為只是調度消息時,消息進入隊列。 showImg(https://segmentfault.com/img/bVXNuN?w=332&h=111); 介紹 在上一個 Hello World 教程中,我們編寫了從指定隊列發送...
摘要:消息隊列選擇是一個由開發的的開源實現的產品,是一個消息代理,從生產者接收消息并傳遞消息至消費者,期間可根據規則路由緩存持久化消息。綁定隊列和交換機之間的關系。根據消息的屬性和的屬性來轉發消息。 消息隊列選擇:RabbitMQ & Redis RabbitMQ RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現的產品,Rabbi...
摘要:生產者只能把消息發到交換器。是否要追加到一個特殊的隊列是否要追加到許多的隊列或者丟掉這條消息這些規則被定義為交換類型。有一點很關鍵,向不存在的交換器發布消息是被禁止的。如果仍然沒有隊列綁定交換器,消息會丟失。 發布與訂閱 (Publish/Subscribe) 在之前的章節中,我們創建了工作隊列,之前的工作隊列的假設是每個任務只被分發到一個worker。在這一節中,我們會做一些完全不一...
摘要:概述概述消息隊列,是分布式系統中重要的組件,是一種進程間通信或者是同一進程的不同線程的通信方式。消息隊列的使用場景消息隊列的使用場景異步處理流量控制應用解耦應用解耦應用解耦消息隊列的一個作用就是實現系統應用之間的解耦。概述消息隊列(Message Queue),是分布式系統中重要的組件,是一種進程間通信或者是同一進程的不同線程的通信方式。和 http 同步協議不同的是,消息隊列是一種異步的通...
閱讀 3475·2021-10-13 09:39
閱讀 1458·2021-10-08 10:05
閱讀 2260·2021-09-26 09:56
閱讀 2275·2021-09-03 10:28
閱讀 2673·2019-08-29 18:37
閱讀 2032·2019-08-29 17:07
閱讀 600·2019-08-29 16:23
閱讀 2191·2019-08-29 11:24