摘要:我們以前的教程中的日志系統(tǒng)將所有消息廣播給所有消費(fèi)者。我們希望擴(kuò)展這一點(diǎn),允許基于其一定嚴(yán)重性程度來過濾消息。在這種情況下,交換機(jī)將表現(xiàn)為交換機(jī),并將消息發(fā)送到所有匹配隊(duì)列。
using php-amqplib 前提必讀
本教程假設(shè) RabbitMQ 是運(yùn)行在標(biāo)準(zhǔn)端口上運(yùn)行(5672).
如果您使用不同的主機(jī)、端口或憑據(jù),則連接設(shè)置需要調(diào)整。
如果您在本教程中遇到困難,可以通過郵件列表與我們聯(lián)系。
在前面的教程中,我們構(gòu)建了一個(gè)簡(jiǎn)單的日志系統(tǒng)。我們能夠向許多接收者廣播日志消息。
開始在本教程中,我們將為它添加一個(gè)特性——我們將只可能訂閱消息的一個(gè)子集。例如,我們只能夠?qū)㈥P(guān)鍵錯(cuò)誤消息直接指向日志文件(以節(jié)省磁盤空間),同時(shí)仍然能夠打印控制臺(tái)上的所有日志消息。
綁定(Bindings)在前面的示例中,我們已經(jīng)創(chuàng)建綁定。您可能還記得代碼:
$channel->queue_bind($queue_name, "logs");
綁定是交換和隊(duì)列之間的一種關(guān)系。這可以簡(jiǎn)單地理解為:隊(duì)列對(duì)來自此交換的消息感興趣。
綁定可以采取額外的routing_key參數(shù)。避免混淆和$channel::basic_publish參數(shù)我們要叫它綁定key。這就是我們?nèi)绾斡面I創(chuàng)建綁定的原因:
$binding_key = "black"; $channel->queue_bind($queue_name, $exchange_name, $binding_key);
綁定鍵的含義取決于交換類型。我們以前使用的fanout交換將忽略了它的值。
Direct exchange我們以前的教程中的日志系統(tǒng)將所有消息廣播給所有消費(fèi)者。我們希望擴(kuò)展這一點(diǎn),允許基于其一定嚴(yán)重性程度來過濾消息。例如,我們可能希望將日志消息寫入到磁盤的腳本只接收關(guān)鍵錯(cuò)誤,而不會(huì)在警告或信息日志消息上浪費(fèi)磁盤空間。
我們使用的是fanout交換機(jī),這并不能給我們帶來很大的靈活性——它只能進(jìn)行無意識(shí)的廣播。
我們將使用direct交換機(jī)替代。direct交換機(jī)背后的路由算法很簡(jiǎn)單-消息傳遞到隊(duì)列,其綁定鍵完全匹配消息的路由鍵。
為了說明這一點(diǎn),請(qǐng)考慮以下設(shè)置:
在這個(gè)設(shè)置中,我們可以看到兩個(gè)隊(duì)列綁定到它的direct交換機(jī)X。第一個(gè)隊(duì)列與綁定鍵orange綁定,第二個(gè)綁定有兩個(gè)綁定,一個(gè)綁定鍵black,另一個(gè)綁定green。
在這樣的設(shè)置中,將路由消息發(fā)送到Exchange的路由密鑰orange將被路由到隊(duì)列Q1。帶有black或green路由鍵的消息將轉(zhuǎn)到Q2。所有其他消息都將被丟棄。
多個(gè)綁定 (Multiple bindings)用相同的綁定鍵綁定多個(gè)隊(duì)列是完全合法的。在我們的示例中,我們可以在綁定綁定鍵X和Q1之間添加一個(gè)綁定。在這種情況下,direct交換機(jī)將表現(xiàn)為fanout交換機(jī),并將消息發(fā)送到所有匹配隊(duì)列。將帶有路由鍵black的消息發(fā)送給Q1和Q2。
Emitting logs我們將使用這個(gè)模型作為我們的日志系統(tǒng)。我們將把消息發(fā)送給direct交換機(jī),而不是fanout交換機(jī)。我們將提供日志嚴(yán)重性作為路由鍵。這樣,接收腳本將能夠選擇它想要接收的嚴(yán)重性。讓我們先專注于發(fā)布日志。
和以往一樣,我們需要首先創(chuàng)建一個(gè)交換:
$channel->exchange_declare("direct_logs", "direct", false, false, false);
我們已經(jīng)準(zhǔn)備好發(fā)送消息了:
$channel->exchange_declare("direct_logs", "direct", false, false, false); $channel->basic_publish($msg, "direct_logs", $severity);
為了簡(jiǎn)化事情,我們會(huì)假設(shè)嚴(yán)重錯(cuò)誤有可以是info, warning, error的一種。
訂閱 (Subscribing)接收消息將與前面的教程一樣,只有一個(gè)例外——我們將為我們感興趣的每個(gè)嚴(yán)重性創(chuàng)建一個(gè)新的綁定。
foreach($severities as $severity) { $channel->queue_bind($queue_name, "direct_logs", $severity); }
代碼都放在一起:
emit_log_direct.php源碼:
channel(); $channel->exchange_declare("direct_logs", "direct", false, false, false); $severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : "info"; $data = implode(" ", array_slice($argv, 2)); if(empty($data)) $data = "Hello World!"; $msg = new AMQPMessage($data); $channel->basic_publish($msg, "direct_logs", $severity); echo " [x] Sent ",$severity,":",$data," "; $channel->close(); $connection->close(); ?>
receive_logs_direct.php源碼:
channel(); $channel->exchange_declare("direct_logs", "direct", false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $severities = array_slice($argv, 1); if(empty($severities )) { file_put_contents("php://stderr", "Usage: $argv[0] [info] [warning] [error] "); exit(1); } foreach($severities as $severity) { $channel->queue_bind($queue_name, "direct_logs", $severity); } echo " [*] Waiting for logs. To exit press CTRL+C", " "; $callback = function($msg){ echo " [x] ",$msg->delivery_info["routing_key"], ":", $msg->body, " "; }; $channel->basic_consume($queue_name, "", false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ?>
如果您只想保存warning 和 error(不包含info)日志消息到文件,只需打開控制臺(tái)并鍵入:
php receive_logs_direct.php warning error > logs_from_rabbit.log
如果您想查看屏幕上的所有日志消息,請(qǐng)打開一個(gè)新的終端并執(zhí)行:
php receive_logs_direct.php info warning error # => [*] Waiting for logs. To exit press CTRL+C
例如,觸發(fā)錯(cuò)誤日志消息:
php emit_log_direct.php error "Run. Run. Or it will explode." # => [x] Sent "error":"Run. Run. Or it will explode."
(全部源碼 emit_log_direct.php source and receive_logs_direct.php source)
學(xué)習(xí)如何基于模式偵聽消息, 你可以閱讀下一章節(jié):RabbitMQ+PHP 教程五(Topics)。
翻譯來自 RabbitMQ - RabbitMQ tutorial - Routing
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/28316.html
摘要:在客戶端中,當(dāng)我們將隊(duì)列名稱作為空字符串提供時(shí),我們創(chuàng)建一個(gè)帶有生成名稱的非持久隊(duì)列方法返回時(shí),變量包含一個(gè)隨機(jī)生成的隊(duì)列名稱。交換和隊(duì)列之間的關(guān)系稱為綁定。 使用 php-amqplib 介紹 在前面的教程中,我們創(chuàng)建了一個(gè)工作隊(duì)列。工作隊(duì)列背后的假設(shè)是每個(gè)任務(wù)都交付給一個(gè)工作人員處理。在這一部分中,我們將做一些完全不同的事情——我們將向多個(gè)消費(fèi)者發(fā)送消息。此模式稱為發(fā)布/訂閱。 ...
摘要:本文將會(huì)講解如何使用實(shí)現(xiàn)延時(shí)重試和失敗消息隊(duì)列,實(shí)現(xiàn)可靠的消息消費(fèi),消費(fèi)失敗后,自動(dòng)延時(shí)將消息重新投遞,當(dāng)達(dá)到一定的重試次數(shù)后,將消息投遞到失敗消息隊(duì)列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發(fā)的開源消息隊(duì)列。本文假設(shè)讀者對(duì)RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門...
摘要:本文將會(huì)講解如何使用實(shí)現(xiàn)延時(shí)重試和失敗消息隊(duì)列,實(shí)現(xiàn)可靠的消息消費(fèi),消費(fèi)失敗后,自動(dòng)延時(shí)將消息重新投遞,當(dāng)達(dá)到一定的重試次數(shù)后,將消息投遞到失敗消息隊(duì)列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發(fā)的開源消息隊(duì)列。本文假設(shè)讀者對(duì)RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門...
摘要:前提必讀本教程假設(shè)是安裝在標(biāo)準(zhǔn)端口上運(yùn)行。這些詞可以是任何東西,但通常它們指定連接到消息的某些特性。如果我們違背合同,用一個(gè)或四個(gè)詞,如或那么,這些消息將不匹配任何綁定并將丟失。代碼與前面的教程幾乎相同。 (using php-amqplib) 前提必讀 本教程假設(shè)RabbitMQ是安裝在標(biāo)準(zhǔn)端口上運(yùn)行(5672)。如果您使用不同的主機(jī)、端口或憑據(jù),則連接設(shè)置需要調(diào)整。 在哪里得到幫助...
摘要:為了避免與參數(shù)混淆,我們將其稱為綁定鍵。直接交換我們之前教程的日志記錄系統(tǒng)將所有消息廣播給所有消費(fèi)者。在這種設(shè)置中,使用路由鍵發(fā)布到交換機(jī)的消息將被路由到隊(duì)列。所有其他消息將被丟棄。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 路由 本章節(jié)教程重點(diǎn)介紹的內(nèi)容 在之前的教程中,我們構(gòu)建了一個(gè)簡(jiǎn)單的日志系統(tǒng) 我們能夠?qū)⑷罩鞠V播給許多接收...
閱讀 2027·2021-11-08 13:14
閱讀 2935·2021-10-18 13:34
閱讀 2023·2021-09-23 11:21
閱讀 3583·2019-08-30 15:54
閱讀 1752·2019-08-30 15:54
閱讀 2921·2019-08-29 15:33
閱讀 2570·2019-08-29 14:01
閱讀 1941·2019-08-29 13:52