摘要:需要特別明確的概念交換機的持久化,并不等于消息的持久化。消息的處理,是有兩種方式,一次性。在上述示例中,使用的,意味著接收全部的消息。注意與是兩個不同的隊列。后端處理,可以針對每一個啟動一個或多個,以提高消息處理的實時性。
RabbitMQ與PHP(一)
項目中使用RabbitMQ作為隊列處理用戶消息通知,消息由前端PHP代碼產(chǎn)生,處理消息使用Python,這就導(dǎo)致代碼一致性問題,調(diào)整消息定義時需要PHP和Python都進行修改。這兩天抽時間研究了下,如何將消息的產(chǎn)生與處理(消費)全部用PHP來做。查資料時發(fā)現(xiàn),關(guān)于PHP處理消息隊列的資料很少,有必要把一些初學者容易混淆的地方再講一下。
擬分成兩部分: 一,RabbitMQ的原理與操作示例;二,具體服務(wù)安裝及如何用PHP作為守護模式處理消息。
RabbitMQ是流行的開源消息隊列系統(tǒng),用erlang語言開發(fā),完整的實現(xiàn)了AMPQ(高級消息隊列協(xié)議)。網(wǎng)站在: http://www.rabbitmq.com/ 上面有教程和實例代碼(Python和Java的)。
AMPQ協(xié)議為了能夠滿足各種消息隊列需求,在概念上比較復(fù)雜。首先,rabbitMQ啟動默認是沒有任何配置的,需要客戶端連接上去,設(shè)置交換機等才能工作。不把這些基礎(chǔ)概念弄清楚,后面程序設(shè)計就容易產(chǎn)生問題。
1,vhosts : 虛擬主機一個RabbitMQ的實體上可以有多個vhosts,用戶與權(quán)限設(shè)置就是依附于vhosts。對一般PHP應(yīng)用,不需要用戶權(quán)限設(shè)定,直接使用默認就存在的"/"就可以了,用戶可以使用默認就存在的"guest"。一個簡單的配置示例:
$conn_args = array( "host" => "127.0.0.1", "port" => "5672", "login" => "guest", "password" => "guest", "vhost"=>"/" );2,connection 與 channel : 連接與信道
connection是指物理的連接,一個client與一個server之間有一個連接;一個連接上可以建立多個channel,可以理解為邏輯上的連接。一般應(yīng)用的情況下,有一個channel就夠用了,不需要創(chuàng)建更多的channel。示例代碼:
//創(chuàng)建連接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker! "); } $channel = new AMQPChannel($conn); ##3,exchange 與 routingkey : 交換機 與 路由鍵##
為了將不同類型的消息進行區(qū)分,設(shè)置了交換機與路由兩個概念。比如,將A類型的消息發(fā)送到名為‘C1’的交換機,將類型為B的發(fā)送到"C2"的交換機。當客戶端連接C1處理隊列消息時,取到的就只是A類型消息。進一步的,如果A類型消息也非常多,需要進一步細化區(qū)分,比如某個客戶端只處理A類型消息中針對K用戶的消息,routingkey就是來做這個用途的。
$e_name = "e_linvo"; //交換機名 $k_route = array(0=> "key_1", 1=> "key_2"); //路由key //創(chuàng)建交換機 $ex = new AMQPExchange($channel); $ex->setName($e_name); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型 $ex->setFlags(AMQP_DURABLE); //持久化 echo "Exchange Status:".$ex->declare()." "; for($i=0; $i<5; ++$i){ echo "Send Message:".$ex->publish($message . date("H:i:s"), $k_route[i%2])." "; }
由以上代碼可以看到,發(fā)送消息時,只要有“交換機”就夠了。至于交換機后面有沒有對應(yīng)的處理隊列,發(fā)送方是不用管的。routingkey可以是空的字符串。在示例中,我使用了兩個key交替發(fā)送消息,是為了下面更便于理解routingkey的作用。
對于交換機,有兩個重要的概念:
A,類型。有三種類型: Fanout類型最簡單,這種模型忽略routingkey;Direct類型是使用最多的,使用確定的routingkey。這種模型下,接收消息時綁定"key_1"則只接收key_1的消息;最后一種是Topic,這種模式與Direct類似,但是支持通配符進行匹配,比如: "key_*",就會接受key_1和key_2。Topic貌似美好,但是有可能導(dǎo)致不嚴謹,所以還是推薦使用Direct。
B,持久化。指定了持久化的交換機,在重新啟動時才能重建,否則需要客戶端重新聲明生成才行。
需要特別明確的概念:交換機的持久化,并不等于消息的持久化。只有在持久化隊列中的消息,才能持久化;如果沒有隊列,消息是沒有地方存儲的;消息本身在投遞時也有一個持久化標志的,PHP中默認投遞到持久化交換機就是持久的消息,不用特別指定。
4,queue: 隊列講了這么多,才講到隊列呀。事實上,隊列僅是針對接收方(consumer)的,由接收方根據(jù)需求創(chuàng)建的。只有隊列創(chuàng)建了,交換機才會將新接受到的消息送到隊列中,交換機是不會在隊列創(chuàng)建之前的消息放進來的。換句話說,在建立隊列之前,發(fā)出的所有消息都被丟棄了。下面這個圖比RabbitMQ官方的圖更清楚——Queue是屬于ReceiveMessage的一部分。
接下來看一下創(chuàng)建隊列及接收消息的示例:
$e_name = "e_linvo"; //交換機名 $q_name = "q_linvo"; //隊列名 $k_route = ""; //路由key //創(chuàng)建連接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker! "); } $channel = new AMQPChannel($conn); //創(chuàng)建交換機 $ex = new AMQPExchange($channel); $ex->setName($e_name); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型 $ex->setFlags(AMQP_DURABLE); //持久化 echo "Exchange Status:".$ex->declare()." "; //創(chuàng)建隊列 $q = new AMQPQueue($channel); $q->setName($q_name); $q->setFlags(AMQP_DURABLE); //持久化 //綁定交換機與隊列,并指定路由鍵 echo "Queue Bind: ".$q->bind($e_name, $k_route)." "; //阻塞模式接收消息 echo "Message: "; $q->consume("processMessage", AMQP_AUTOACK); //自動ACK應(yīng)答 $conn->disconnect(); /** * 消費回調(diào)函數(shù) * 處理消息 */ function processMessage($envelope, $queue) { var_dump($envelope->getRoutingKey); $msg = $envelope->getBody(); echo $msg." "; //處理消息 }
從上述示例中可以看到,交換機既可以由消息發(fā)送端創(chuàng)建,也可以由消息消費者創(chuàng)建。
創(chuàng)建一個隊列(line:20)后,需要將隊列綁定到交換機上(line:25)隊列才能工作,routingkey也是在這里指定的。有的資料上寫成bindingkey,其實一回事兒,弄兩個名詞反倒容易混淆。
消息的處理,是有兩種方式:
A,一次性。用 $q->get([...]),不管取到取不到消息都會立即返回,一般情況下使用輪詢處理消息隊列就要用這種方式;
B,阻塞。用 $q->consum( callback, [...] ) 程序會進入持續(xù)偵聽狀態(tài),每收到一個消息就會調(diào)用callback指定的函數(shù)一次,直到某個callback函數(shù)返回FALSE才結(jié)束。
關(guān)于callback,這里多說幾句: PHP的call_back是支持使用數(shù)組的,比如: $c = new MyClass(); $c->counter = 100; $q->consume( array($c,"myfunc") ) 這樣就可以調(diào)用自己寫的處理類。MyClass中myfunc的參數(shù)定義,與上例中processMessage一樣就行。
在上述示例中,使用的$routingkey = "", 意味著接收全部的消息。我們可以將其改為 $routingkey = "key_1",可以看到結(jié)果中僅有設(shè)置routingkey為key_1的內(nèi)容了。
注意: routingkey = "key_1" 與 routingkey = "key_2" 是兩個不同的隊列。假設(shè): client1 與 client2 都連接到 key_1 的隊列上,一個消息被client1處理之后,就不會被client2處理。而 routingkey = "" 是另類,client_all綁定到 "" 上,將消息全都處理后,client1和client2上也就沒有消息了。
在程序設(shè)計上,需要規(guī)劃好exchange的名稱,以及如何使用key區(qū)分開不同類型的標記,在消息產(chǎn)生的地方插入發(fā)送消息代碼。后端處理,可以針對每一個key啟動一個或多個client,以提高消息處理的實時性。如何使用PHP進行多線程的消息處理,將在下一節(jié)中講述。
更多消息模型,可以參考: http://www.rabbitmq.com/tutorials/tutorial-two-python.html
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/21006.html
摘要:在客戶端中,當我們將隊列名稱作為空字符串提供時,我們創(chuàng)建一個帶有生成名稱的非持久隊列方法返回時,變量包含一個隨機生成的隊列名稱。交換和隊列之間的關(guān)系稱為綁定。 使用 php-amqplib 介紹 在前面的教程中,我們創(chuàng)建了一個工作隊列。工作隊列背后的假設(shè)是每個任務(wù)都交付給一個工作人員處理。在這一部分中,我們將做一些完全不同的事情——我們將向多個消費者發(fā)送消息。此模式稱為發(fā)布/訂閱。 ...
摘要:在中間的框是一個隊列的消息緩沖區(qū),保持代表的消費。本教程介紹,這是一個開放的通用的協(xié)議消息。我們將在本教程中使用,解決依賴管理。發(fā)送者將連接到,發(fā)送一條消息,然后退出。注意,這與發(fā)送發(fā)布的隊列匹配。 介紹 RabbitMQ是一個消息代理器:它接受和轉(zhuǎn)發(fā)消息。你可以把它當作一個郵局:當你把郵件放在信箱里時,你可以肯定郵差先生最終會把郵件送到你的收件人那里。在這個比喻中,RabbitMQ就...
摘要:前提必讀本教程假設(shè)是安裝在標準端口上運行。這些詞可以是任何東西,但通常它們指定連接到消息的某些特性。如果我們違背合同,用一個或四個詞,如或那么,這些消息將不匹配任何綁定并將丟失。代碼與前面的教程幾乎相同。 (using php-amqplib) 前提必讀 本教程假設(shè)RabbitMQ是安裝在標準端口上運行(5672)。如果您使用不同的主機、端口或憑據(jù),則連接設(shè)置需要調(diào)整。 在哪里得到幫助...
摘要:概述是一款消息隊列中間件。他提供了幾乎覆蓋所有語言的與文檔,簡直強大的不的了。要詳細的去了解學習,我建議還是看官方文檔吧。對文章有什么問題或疑問,歡迎在評論區(qū)留言。 showImg(https://segmentfault.com/img/bVbc7Yy?w=1066&h=608); 概述 RabbitMQ是一款消息隊列中間件。他提供了幾乎覆蓋所有語言的SDK與文檔,簡直強大的不的了。...
摘要:我們以前的教程中的日志系統(tǒng)將所有消息廣播給所有消費者。我們希望擴展這一點,允許基于其一定嚴重性程度來過濾消息。在這種情況下,交換機將表現(xiàn)為交換機,并將消息發(fā)送到所有匹配隊列。 using php-amqplib 前提必讀 本教程假設(shè) RabbitMQ 是運行在標準端口上運行(5672). 如果您使用不同的主機、端口或憑據(jù),則連接設(shè)置需要調(diào)整。 如果您在本教程中遇到困難,可以通過郵件列...
閱讀 2269·2021-11-23 09:51
閱讀 5656·2021-09-22 15:39
閱讀 3343·2021-09-02 15:15
閱讀 3492·2019-08-30 15:54
閱讀 2355·2019-08-30 15:53
閱讀 1397·2019-08-30 14:04
閱讀 2445·2019-08-29 18:33
閱讀 2364·2019-08-29 13:08