摘要:消息隊列,用于存儲還未被消費者消費的消息。由在與時指定,而由發送時指定,兩者的匹配方式由決定。需要為每一個創建,協議規定只有通過才能執行的命令。建議客戶端線程之間不要共用,至少要保證共用的線程發送消息必須是串行的,但是建議盡量共用。
安裝
rabbitmq 在 mac 下可以直接用 brew 安裝
默認安裝在 /usr/local/Cellar/下
命令被軟連接加入到了/usr/local/sbin 下,因此可以把此目錄放到環境變量中,建議加入到~/.bash_profile 中
rabbitmq-server start 開啟服務
端口5672 默認
端口15672 web 端登錄管理端口127.0.0.1:15672
rabbitmq 默認提供的用戶 guest 密碼 guest
停止服務
rabbitmqctl stop
開啟應用 [服務依舊運行]
rabbitmqctl start_app
停止應用 [服務依舊運行]
rabbitmqctl stop_app
添加用戶
sudo rabbitmqctl add_user username password
刪除用戶
sudo rabbitmqctl delete_user username
修改密碼
sudo rabbitmqctl change_password username newpassword
清除用戶密碼,禁止用戶登錄
sudo rabbitmqctl clear_password
列出所有用戶
sudo rabbitmqctl list_users
設置用戶角色
rabbitmqctl set_user_tags username tag
virtual host只是起到一個命名空間的作用,所以可以多個user共同使用一個virtual host,"/"這個是系統默認的vhost,就是說當我們創建一個到rabbitmq的connection時候,它的命名空間是"/",需要注意的是不同的命名空間之間的資源是不能訪問的,比如 exchang,queue ,bingding等
創建虛擬主機
sudo rabbitmqctl add_vhost vhostpath
刪除虛擬主機
sudo rabbitmqctl delete_vhost vhostpath
列出所有虛擬主機
sudo rabbitmqctl list_vhosts
列出某個 vhost 的所有用戶和權限
list_permissions [-p vhostpath]
列出某個用戶的所有權限。
list_user_permissions {username}
清除用戶對某個 vhost 的權限。
clear_permissions [-p vhostpath] {username}
設置用戶對某個 virtual host 的權限,如果不指定 vhost,則默認為“/” vhost。
set_permissions [-p vhostpath] {user}
rabbitmqctl set_permissions -p test_host kang “." "." ".*"
添加一個管理員代替 guest
rabbitmqctl add_user admin 123456
指定用戶的角色
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin "." "." ".*”
分配給用戶指定虛擬主機的權限,雖然是administrator角色,但不對所有虛擬主機都有權限,一樣需要對每個虛擬主機都授權
顯示信息
rabbitmqctl list_queues [-p
列出某個 vhost 的所有 queue。
rabbitmqctl list_exchanges [-p
列出某個 vhost 的所有 exchange。
rabbitmqctl list_bindings [-p
列出某個 vhost 的所有 binding。
rabbitmqctl list_connections [
列出 RabbitMQ broker 的所有 connection。
rabbitmqctl list_channels [
列出 RabbitMQ broker 的所有 channel
rabbitmqcrl list_consumers [-p
列出某個 vhost 的所有 consumer。
1.Server(broker): 接受客戶端連接,實現AMQP消息隊列和路由功能的進程。
2.Virtual Host:其實是一個虛擬概念,類似于權限控制組,一個Virtual Host里面可以有若干個Exchange和Queue,但是權限控制的最小粒度是Virtual Host
3.Exchange:接受生產者發送的消息,并根據Binding規則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行為,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三種,不同類型的Exchange路由的行為是不一樣的。
4.Message Queue:消息隊列,用于存儲還未被消費者消費的消息。
5.Message: 由Header和Body組成,Header是由生產者添加的各種屬性的集合,包括Message是否被持久化、由哪個Message Queue接受、優先級是多少等。而Body是真正需要傳輸的APP數據。
6.Binding:Binding聯系了Exchange與Message Queue。Exchange在與多個Message Queue發生Binding后會生成一張路由表,路由表中存儲著Message Queue所需消息的限制條件即Binding Key。當Exchange收到Message時會解析其Header得到Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發送Message時指定,兩者的匹配方式由Exchange Type決定。
7.Connection:連接,對于RabbitMQ而言,其實就是一個位于客戶端和Broker之間的TCP連接。
8.Channel:信道,僅僅創建了客戶端到Broker之間的連接后,客戶端還是不能發送消息的。需要為每一個Connection創建Channel,AMQP協議規定只有通過Channel才能執行AMQP的命令。一個Connection可以包含多個Channel。之所以需要Channel,是因為TCP連接的建立和釋放都是十分昂貴的,如果一個客戶端每一個線程都需要與Broker交互,如果每一個線程都建立一個TCP連接,暫且不考慮TCP連接是否浪費,就算操作系統也無法承受每秒建立如此多的TCP連接。RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發送消息必須是串行的,但是建議盡量共用Connection。
9.Command:AMQP的命令,客戶端通過Command完成與AMQP服務器的交互來實現自身的邏輯。例如在RabbitMQ中,客戶端可以通過publish命令發送消息,txSelect開啟一個事務,txCommit提交一個事務。
客戶端管理php端 rabbitmq 客戶端可以使用 composer 下的庫
{ "require": { "php-amqplib/php-amqplib": "2.5.*" } }
使用時,用到了這兩個東西
use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage;
開始發送消息
public function handle() { //連接到 test_host 虛擬主機,每個虛擬主機有自己的隊列,交換機... $connection = new AMQPStreamConnection("127.0.0.1", 5672, "kang", "a943434603", "test_host"); //創建一個 channel $channel = $connection->channel(); //聲明 hello 隊列 $channel->queue_declare("hello", false, false, false, false); //創建一個消息 $msg = new AMQPMessage(time()); //把消息推送到默認的交換機中,并且告訴交換機要把消息交給 hello 隊列 $channel->basic_publish($msg, "", "hello"); echo " [x] Sent ".time()." "; }
重要概念,消息是保存在交換機中的,當消息存放時指定的隊列存在,交換機會把消息推送到該隊列
消息隊列發送消息給消費者,一個消息發給一個消費者
public function handle() { //連接 $connection = new AMQPStreamConnection("localhost", 5672, "kang", "a943434603", "test_host"); //創建一個 channel $channel = $connection->channel(); //可以運行這個命令很多次,但是只有一個隊列會被創建, 在程序中重復將隊列重復聲明一下是種值得推薦的做法,保證隊列存在 $channel->queue_declare("hello", false, false, false, false); echo " [*] Waiting for messages. To exit press CTRL+C", " "; $callback = function($msg) { echo " [x] Received ", $msg->body, " "; sleep($msg->body); $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]); }; //默認情況下,隊列會把消息公平的分配給各個消費者 //如果某個消費者腳本處理完成分配給他的消息任務后,會一直空閑 //另外一個消費者腳本處理的消息都非常耗時,這就容易導致消費者腳本得不到合理利用, //加入此句話,是告訴隊列,取消把消息公平分配到各個腳本,而是那個腳本空閑,就交給它一個消息任務 //這樣,合理利用到每一個空閑的消費者腳本 $channel->basic_qos(null, 1, null); /** * basic_consume 方法 從隊列中讀取數據 * @param string $queue 指定隊列 * @param string $consumer_tag * @param bool $no_local * @param bool $no_ack 消費者處理完消息后,是否不需要告訴隊列已經處理完成,true 不需要 false 需要, * true 默認情況下,隊列會把消息公平分配到各個消費者中,然后一次性把消息交給消費者,如果消費者處理了一半掛了,那么消息就丟失了 * false 默認情況下,隊列會把消息公平的分配給各個消費者,然后一個一個的把消息分配到消費者腳本中,腳本處理完成后,告訴隊列,隊列會刪除這個消息,并且接著給下一個消息, 當腳本掛掉,不會丟失消息,隊列會把未完成的消息分配給其他消費者 在 callback 函數中需要加入這句話,處理完后通知隊列可以刪除消息了 $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]); 未加入這句話,隊列不會刪除已處理完的消息,當腳本掛掉時,會把分配給當前隊列的所有消息再次重新分配給其他隊列,會導致消息會重復處理 */ $channel->basic_consume("hello", "", false, false, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); }發布/訂閱
一個消息發送給多個消費者
扇形交換機 fanout
發布訂閱模式,科院實現一個消息發送到多個隊列中
在發布消息腳本中,創建一個扇形交換機,把消息推送到交換機,不需要推動到指定的隊列中,隊列在消費者腳本中創建
消費腳本定義個臨時隊列,并綁定這個臨時隊列到交換機中,扇形交換機會把接收到的消息推動到每一個綁定的隊列中
生產者腳本
public function handle() { //連接到 test_host 虛擬主機,每個虛擬主機有自己的隊列,交換機... $connection = new AMQPStreamConnection("127.0.0.1", 5672, "kang", "a943434603", "test_host"); //創建一個 channel $channel = $connection->channel(); //聲明 log 隊列 //$channel->queue_declare("log", false, false, false, false); //創建一個fanout類型交換機 $channel->exchange_declare("logs","fanout",false,false,false); //創建一個消息 $msg = new AMQPMessage( time() ); $channel->basic_publish ( $msg , "logs" ); echo " [x] Sent ".time()." "; $channel->close(); $connection->close(); }
消費者腳本
public function handle() { //連接 $connection = new AMQPStreamConnection("localhost", 5672, "kang", "a943434603", "test_host"); //創建一個 channel $channel = $connection->channel(); //可以運行這個命令很多次,但是只有一個隊列會被創建, 在程序中重復將隊列重復聲明一下是種值得推薦的做法,保證隊列存在 //$channel->queue_declare("hello", false, false, false, false); //創建一個fanout類型交換機 $channel->exchange_declare("logs", "fanout", false, false, false); //系統創建一個臨時隊列 list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); //綁定臨時隊列到交換機上 $channel->queue_bind($queue_name, "logs"); $callback = function($msg){ echo " [x] ", $msg->body, " "; }; $channel->basic_consume($queue_name, "", false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); }
直連交換機 direct
交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配,從而確定消息該分發到哪個隊列
生產者,創建基本上和扇形交換機一樣,不同的是
$channel->exchange_declare("direct_logs","direct",false,false,false); //創建一個消息 $msg = new AMQPMessage( time() ); //把消息推動到direct_logs交換機,并給消息加上路由 key,讓消費者隊列來根據 key 接收消息 $channel->basic_publish ( $msg , "direct_logs", "warning" );
消費者
$channel->exchange_declare("direct_logs","direct",false,false,false); //系統創建一個臨時隊列 list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); //綁定臨時隊列到交換機上,并指定消息的路由 key $channel->queue_bind($queue_name, "direct_logs", "error"); $channel->queue_bind($queue_name, "direct_logs", "warning");
主題交換機 topic
直連交換機中路由 key 匹配模式
(星號*) 用來表示一個單詞.
(井號#) 用來表示任意數量(零個或多個)單詞。
Q1會接收到 a.orange.b 等key 值中間為 orange 的消息
Q2會接收到 a.b.rabbit, lazy.a, lazy.a.b.c 等消息
當 * (星號) 和 # (井號) 這兩個特殊字符都未在綁定鍵中出現的時候,此時主題交換機就擁有的直連交換機的行為。
生產者,指定路由 key
$channel->exchange_declare("topic_logs","topic",false,false,false); //創建一個消息 $msg = new AMQPMessage( time() ); //把消息推動到direct_logs交換機,并給消息加上路由 key,讓消費者隊列來根據 key 接收消息 $channel->basic_publish ( $msg , "topic_logs", "baidu.warning" );
消費者1
$channel->exchange_declare("topic_logs","topic",false,false,false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queue_name, "topic_logs", "baidu.#");
消費者2
$channel->exchange_declare("topic_logs","topic",false,false,false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queue_name, "topic_logs", "ali.#");參考
https://www.rabbitmq.com/tutorials/tutorial-one-php.html
http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/21527.html
摘要:第一步安裝因為是語言編寫的,所以我們首先需要安裝第二步安裝官網提供的安裝方式本人安裝成功的方式第三步查看是否已經安裝好了,能查到說明已經安裝完成了。 第一步:安裝Erlang 因為rabbitMQ是Erlang語言編寫的,所以我們首先需要安裝Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...
摘要:第一步安裝因為是語言編寫的,所以我們首先需要安裝第二步安裝官網提供的安裝方式本人安裝成功的方式第三步查看是否已經安裝好了,能查到說明已經安裝完成了。 第一步:安裝Erlang 因為rabbitMQ是Erlang語言編寫的,所以我們首先需要安裝Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...
摘要:參考文檔依賴包安裝環境配置環境變量增加內容保存退出,并刷新變量測試是否安裝成功安裝完成以后,執行看是否能打開,用退出,注意后面的點號,那是的結束符。 參考文檔:http://www.cnblogs.com/phpinfo/p/4104551...http://blog.csdn.net/historyasamirror/ar... 依賴包安裝 yum install ncurses-d...
摘要:在中間的框是一個隊列的消息緩沖區,保持代表的消費。本教程介紹,這是一個開放的通用的協議消息。我們將在本教程中使用,解決依賴管理。發送者將連接到,發送一條消息,然后退出。注意,這與發送發布的隊列匹配。 介紹 RabbitMQ是一個消息代理器:它接受和轉發消息。你可以把它當作一個郵局:當你把郵件放在信箱里時,你可以肯定郵差先生最終會把郵件送到你的收件人那里。在這個比喻中,RabbitMQ就...
閱讀 1391·2019-08-30 12:54
閱讀 1877·2019-08-30 11:16
閱讀 1620·2019-08-30 10:50
閱讀 2454·2019-08-29 16:17
閱讀 1273·2019-08-26 12:17
閱讀 1385·2019-08-26 10:15
閱讀 2393·2019-08-23 18:38
閱讀 791·2019-08-23 17:50