国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

RabbitMQ 總結

Mr_houzi / 1911人閱讀

摘要:消息確認為,會等待的顯式確認。在消息發送到之后會立刻路由到中,因此未持久化的在重啟后會丟失元數據以及綁定,對和消息的持久化無影響。指定如果一個或者有多個的情況下,只有最大的那個才會生效。要求集群中至少要有一個磁盤節點,儲存了所有的元數據。

Connection & Channel

Connection 代表一個 TCP 連接,Channel 是建立在 Connection 上的虛擬連接。RabbitMQ 每條指令都是通過 Channel 完成的。

對于 OS 而言,創建和銷毀 TCP 連接的代價非常高,在高峰期很容易遇到瓶頸。程序中一般會有多個線程需要與 RabbitMQ建立通信,消費或生產消息,通過 TCP 連接復用來減少性能開銷。

Connection 可以創建多個 Channel ,但是 Channel 不是線程安全的所以不能在線程間共享。

Connection 在創建時可以傳入一個 ExecutorService ,這個線程池時給該 Connection 上的 Consumer 用的。

Channel.isOpen 以及 Connection.isOpen 方法是同步的,因此如果在發送消息時頻繁調用會產生競爭。我們可以認為在 createChannel 方法后 Channel 以及處于開啟狀態。若在使用過程中 Channel 關閉了,那么只要捕獲拋出的 ShutDownSignalException 就可以了,同時建議捕獲 IOException 以及 SocketException 防止連接意外關閉。

Exchange & Queue

消費者和生產者都可以聲明一個已經存在的 Exchange 或者 Queue ,前提是參數完全匹配現有的 Exchange 或者 Queue,否則會拋出異常。

QueueDeclare 參數:
exclusive: 排他隊列,只有同一個 ConnectionChannel 可以訪問,且在 Connection 關閉或者客戶端退出后自動刪除,即使 durabletrue

queuePurge(String queue):清空隊列

Exchange 可以綁定另一個 ExchangeexchangeBind(String destination, String source, String routeKey), 從 sourcedestination

若業務允許,則最好預先創建好 Exchange 以及 Queue 并進行綁定(rabbitmqadmin),防止 Exchange 沒有綁定 Queue 或 綁定錯誤的 Queue 而導致消息丟失(關鍵信息應當使用 mandatory 參數)。

Alternate Exchange: 在 Channel.exchangeDeclare 時添加 alternate-exchange 參數或在 Policy 中聲明。mandatorytrue 時,未被路由的消息會被發送到 Alternate Exchange 。建議 Exchange Type 設置為 fanout ,否則當 RoutingKey 依然不匹配就會被返回 Producer。

P.S. 有些書上講備份交換器和 mandatory 參數一起使用 mandatory 參數失效是錯的,當 RoutingKey 不匹配 Alternate Exchange 依然會被返回 Producer 。
(rabbitmq v3.7 測試)
Map arg = new HashMap() {{
    put("alternate-exchange", "alt");
}};
channel.exchangeDeclare("normalExchange", "direct", true, false, arg);
channel.exchangeDeclare("alt", "fanout", true, false, null);
channel.queueDeclare("normalQueue", true, false, false, null);
channel.queueDeclare("notSend", true, false, false, null);
channel.queueBind("normalQueue", "normalExchange", "key");
channel.queueBind("notSend", "alt", "");
Publish & Consume Publish Confirm

消息發送到服務器后可能還沒來的及刷到磁盤中,服務器就掛掉,從而造成消息丟失。 Publish Confirm 能夠在消息確實到達服務器(開啟持久化的消息會在刷入磁盤之后)之后返回一個確認給 Publisher。

通過 channel.confirmSelectedChannel 設置為 Confirm 模式,并為 Channel 添加一個 ConfirmLister 來監聽返回的確認。

SortedSet unconfirmedSet = new TreeSet<>();
channel.confirmSelect();
channel.addConfirmListener((deliveryTag, multiple) -> {

    System.out.println("handleAck: " + deliveryTag + " " + multiple);
    if (multiple) {
        unconfirmedSet.headSet(deliveryTag - 1).clear();
    } else {
        unconfirmedSet.remove(deliveryTag);
    }
}, (deliveryTag, multiple) -> {

    System.out.println("handleNack: " + deliveryTag + " " + multiple);
    if (multiple) {
        unconfirmedSet.headSet(deliveryTag - 1).clear();
    } else {
        unconfirmedSet.remove(deliveryTag);
    }
});
while (true) {
    long seq = channel.getNextPublishSeqNo();
    channel.basicPublish("normalExchange", "key", true, null, message.getBytes(StandardCharsets.UTF_8));
    unconfirmedSet.add(seq);
    Thread.sleep(1000);
}

除了異步處理的方式之外還有批量確認以及事務的方法。批量確認的速度在大量連續發送的情況下和異步的方法差不多。不管怎樣這兩種消息確認的方法都要比事務的方式快7倍左右。

Consumer

一般應當實現 Consumer 接口或者繼承 DefaultConsumer ,Consumer 通過 consumerTag 來進行區分。

消費消息有兩種方式,一種是 Push ,一種是 Get。

Push 是由 RabbitMQ 以輪詢的方式將消息推送到 Consumer ,方法為 basicConsume 。一般一個 Channel 對應一個 Consumer 。

Get 由客戶端主動從 RabbitMQ 拉取一條消息,方法為 basicGet 。__不能循環執行 basicGet 來代替 basicConsume ,不然會嚴重影響性能。__

消息確認:autoAckfalseRabbitMQ 會等待 basicAck 的顯式確認。除非 Consumer 連接斷開否則一直等待確認。當 Consumer 顯式調用 basicReject 或者 basicNack 并將 requeue 設為 true 后會將消息重新入隊投遞。一般我們在業務處理完之后再 ack .
mandatory : 當 Exchange 無法匹配 QueueExchange 時,mandatorytrue 的消息會被返回給 Producer,否則會被丟棄。 通過 Channel.addReturnListener 來添加 ReturnListener 監視器。

TTL

queueDeclare 時添加 x-message-ttl 參數,單位毫秒。

Map arg = new HashMap() {{
    put("x-message-ttl", "1000000");
}};
channel.exchangeDeclare("normalExchange", "direct", true, false, arg);

使用 AMQP.BasicProperties.Builder 創建 AMQP.BasicProperties 并設置 expiration 參數。

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("100000");
channel.basicPublish("normalExchange", "key", true, builder.build(), message.getBytes(StandardCharsets.UTF_8));

Dead Letter Exchange (DLX)

Dead Letter(死信):

Basic.Reject / Basic.Nack 并且 requeuetrue

消息 TTL 過期

隊列達到最大長度

當消息成為 Dead Letter 之后, RabbitMQ 會自動把這個消息發到 DLX 上。

// 當發送到 normalQueue 中的消息成為 Dead Letter 之后會自動以
// dead-letter 為 routingKey 發送到 dlxQueue Exchange
Map arg = new HashMap() {{
    put("x-dead-letter-exchange", "dlx");
    put("x-dead-letter-routing-key", "dead-letter");
}};
channel.queueDeclare("normalQueue", true, false, false, arg);
channel.exchangeDeclare("dlx", "direct", true, false, false, null);
channel.queueDeclare("dlxQueue", true, false, false, null);

DLX 其他用法:延遲隊列,消息 發送到一個暫存的、沒有 ConsumerQueue 并設置 TTL,Consumer 消費 DLX 綁定的 Queue 的消息,建議給暫存的 Queue 設置一個最大的 TTL,防止消息沒有設置 TTL 而一直堆積在 Queue 中。

Priority

消息的消費可以有優先級,Queue 的最大優先級可以通過 x-max-priority 進行設置。

Map arg = new HashMap() {{
    put("x-max-priority", 5);
}};
channel.queueDeclare("normalQueue", true, false, false, arg);
channel.exchangeDeclare("normalExchange", "direct", true, false, null);

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(2);
channel.basicPublish("normalExchange", "key", true, builder.build(), messsage.getBytes(StandardCharsets.UTF_8));
Durability

Exchange , Queue , 消息都可以進行持久化。在消息發送到 Exchange 之后會立刻路由到 Queue 中,因此未持久化的 Exchange 在重啟后會丟失 Exchange 元數據以及綁定,對 Queue 和消息的持久化無影響。

未持久化的 Queue 在重啟后會丟失,包括 Queue 中的消息,不管消息是否設置了持久化。

未持久化的消息在重啟后會丟失,即使所在的 Queue 已持久化。

channel.queueDeclare("normalQueue", true, false, false, null); // Queue 持久化
channel.exchangeDeclare("normalExchange", "direct", true, false, null); // Exchange 持久化
channel.queueBind("normalQueue", "normalExchange", "key");
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);    // 消息持久化
channel.basicPublish("normalExchange", "key", true, builder.build(), messsage.getBytes(StandardCharsets.UTF_8));
Qos

Qos 的作用時負載均衡。當一個隊列有兩個 Consumer ,一個性能很好 A,另一個不那么好 B,RabbitMQ 會輪詢,將消息平均地分給這兩個 Consumer??梢?B 上的堆積的消息會越來越多,而 A 上的線程可能會空閑。 Qos 的作用就是防止一個 Consumer 堆積了過多的消息,把這些消息分給其他 Consumer。

global 參數:

channel.basicQos(3, false);  // each Consumer limit 3
channel.basicQos(5, true);   // this channel limit to 5

global 參數會讓 RabbitMQ 調用更多資源,盡量不要設置(默認值為 false)。

Relibility

RabbitMQ 支持最少一次和最多一次。
最少一次:

- 啟用 Publisher Confirm 或者 事務保證消息能夠到達服務器。
- 啟用 mandatory 參數保證消息不回被 Exchange 丟掉。
- 消息和 Queue 開啟持久化。
- Consumer autoAck off, 并確保消息在處理完之后再 ack
Policy

Policy 可以很方便的批量設置 Exchange 以及 Queue 的屬性,但是 Policy 的優先級較低,請注意。

Policy 可以通過 HTTP API, web console,以及 cli 的方式。

rabbitmqctl set_policy [-p vhost] [--priority prirority] [--apply-to apply-to] {name} {pattern} {defination}

vhost : 指定 vhost

proiority : 如果一個 Queue 或者 Exchange 有多個 Policy 的情況下,只有 priority 最大的那個 Policy 才會生效。

apply-to : 應用到

Exchange and Queue

Exchange

Queue

name : Policy 的名字

pattern : Exchange 或者 Queue 名字的正則表達式

defination : 屬性值,可以通過 management > Admin > Policies 的查看。

Cluster

RabbitMQ 會把所有的元數據存儲到所有的節點上,但是隊列是分散在集群中所有的節點上的。

Build A Cluster with docker

我們嘗試使用 Docker Compose 創建一個由 3 個服務組成的集群

version: "3"
services:
  node1:
    image: rabbitmq:3.7-management-alpine
    container_name: node1
    hostname: node1
    environment:
      RABBITMQ_ERLANG_COOKIE: secret_cookie_here
    ports:
      - "5673:5672"
      - "15673:15672"
  node2:
    image: rabbitmq:3.7-management-alpine
    container_name: node2
    hostname: node2
    environment:
      RABBITMQ_ERLANG_COOKIE: secret_cookie_here
    ports:
      - "5674:5672"
      - "15674:15672"
  node3:
    image: rabbitmq:3.7-management-alpine
    container_name: node3
    hostname: node3
    environment:
      RABBITMQ_ERLANG_COOKIE: secret_cookie_here
    ports:
      - "5675:5672"
      - "15675:15672"

通過設置 hostname ,容器內部的 rabbitmq 的 nodename 就變成類似 rabbitmq@node1。同時集群中的 RabbitMQ 需要相同的 RABBITMQ_ERLANG_COOKIE 來進行互相認證。

啟動服務:

docker-compose up -d

然后將 node2 , node3 加入 node1 ,注意,加入集群之前 RabbitMQ 必須停止:

# 停止 rabbitmq
docker-compose exec node2 rabbitmqctl stop_app
docker-compose exec node3 rabbitmqctl stop_app
# 加入 node1
docker-compose exec node2 rabbitmqctl join_cluster rabbitmq@node1
docker-compose exec node3 rabbitmqctl join_cluster rabbitmq@node1
# 重新啟動 
docker-compose exec node2 rabbitmqctl start_app
docker-compose exec node3 rabbitmqctl start_app

在任意一個節點上查詢集群狀態:

docker-compose exec node2 rabbitmqctl cluster_status

可以看到如下狀態:

Cluster status of node rabbit@node2 ...
[{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]},
 {running_nodes,[rabbit@node3,rabbit@node1,rabbit@node2]},
 {cluster_name,<<"rabbit@node2">>},
 {partitions,[]},
 {alarms,[{rabbit@node3,[]},{rabbit@node1,[]},{rabbit@node2,[]}]}]
手動下線節點

將節點從在線狀態下線, 首先停止節點,然后重置節點。

docker-compose exec node2 rabbitmqctl stop_app
docker-compose exec node2 rabbitmqctl reset
docker-compose exec node2 rabbitmqctl stop_app

在重新啟動服務器之后可以發現該節點已經脫離了集群。

Cluster status of node rabbit@node2 ...
[{nodes,[{disc,[rabbit@node2]}]},
 {running_nodes,[rabbit@node2]},
 {cluster_name,<<"rabbit@node2">>},
 {partitions,[]},
 {alarms,[{rabbit@node2,[]}]}]
節點類型

RabbitMQ 的節點類型有兩種,一種是 disc , 第二種是 ram。 RabbitMQ 要求集群中至少要有一個磁盤節點,儲存了所有的元數據。當集群中的唯一一個磁盤節點崩潰后,集群可以繼續收發消息,但是不能創建隊列等操作。

RabbitMQ 在加入集群時默認為磁盤模式,如果要以內存模式加入:

docker-compose exec node2 rabbitmqctl join_cluster rabbit@node1 --ram

更改節點類型:

docker-compose exec node 2 rabbitmqctl change cluster_node_type desc
Mirror Queue

RabbitMQ 提供了 Master/Slave 模式的 Mirror Queue 機制。請注意,開啟 Publisher Confirmed 或者事務的情況下,只有所有的 Slave 都 ACK 之后才會返回 ACK 給客戶端。

開啟 Mirror Queue 主要通過設置 Policy 其中最主要的是 defination

ha-mode: Mirror Queue 的模式

all : 默認的模式,表示在集群中的所有節點上進行鏡像

exactly : 在指定數量的節點上進行鏡像,數量由 ha-params 指定。

nodes : 在指定的節點上進行鏡像,節點名稱由 ha-params 指定。

ha-params : 如上所述

ha-sync-mode : 消息的同步模式

automatic : 當新的 Slave 加入集群之后會自動同步消息。

manual: 默認,當加入新的 Slave 之后不會自動把消息同步到新的 Slave 上。指導調用命令顯式同步。

ha-promote-on-shutdown:

when-synced: 默認,如果主動停止 master ,那么 slave 不會自動接管。也就是說會期望 master 會重啟啟動,這可以保證消息不會丟失。

always: 不管 master 是因為什么原因停止的,slave 會立刻接管,有可能有一部分數據沒有從 master 同步到 slave.

ha-promote-on-failure: 默認 always ,不推薦設置為 when-synced

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/75346.html

相關文章

  • 慕課網_《RabbitMQ消息中間件極速入門與實戰》學習總結

    摘要:慕課網消息中間件極速入門與實戰學習總結時間年月日星期三說明本文部分內容均來自慕課網。 慕課網《RabbitMQ消息中間件極速入門與實戰》學習總結 時間:2018年09月05日星期三 說明:本文部分內容均來自慕課網。@慕課網:https://www.imooc.com 教學源碼:無 學習源碼:https://github.com/zccodere/s... 第一章:RabbitM...

    mykurisu 評論0 收藏0
  • RabbitMQ 基礎教程(1) - Hello World

    摘要:基礎教程注本文是對眾多博客的學習和總結,可能存在理解錯誤。請帶著懷疑的眼光,同時如果有錯誤希望能指出。安裝庫這里我們首先將消息推入隊列,然后消費者從隊列中去除消息進行消費。 RabbitMQ 基礎教程(1) - Hello World 注:本文是對眾多博客的學習和總結,可能存在理解錯誤。請帶著懷疑的眼光,同時如果有錯誤希望能指出。 如果你喜歡我的文章,可以關注我的私人博客:http:...

    wushuiyong 評論0 收藏0
  • SpringCloud(第 037 篇)通過bus/refresh半自動刷新ConfigClient

    摘要:添加應用啟動類通過半自動刷新配置。配置客戶端服務想要實現自動刷新配置的話,一端是不要做任何處理,只需要在一端處理即可。 SpringCloud(第 037 篇)通過bus/refresh半自動刷新ConfigClient配置 - 一、大致介紹 1、上章節我們講到了手動刷新配置,但是我們假設如果微服務一多的話,那么我們是不是需要對每臺服務進行手動刷新呢? 2、答案肯定是不需要的,我們也可...

    CloudwiseAPM 評論0 收藏0
  • 如何保證消息隊列的可靠性傳輸?

    摘要:消息丟失分成三種情況,可能出現生產者消費者。生產者丟失數據生產者丟失數據首先要確保寫入的消息別丟,消息隊列通過請求確認機制,保證消息的可靠傳輸。只有消息被持久化到磁盤以后,才會回傳消息。消息丟失分成三種情況,可能出現生產者、RabbitMQ、消費者。 生產者丟失數據 首先要確保寫入 RabbitMQ 的消息別丟,消息隊列通過請求確認機制,保證消息的可靠傳輸。生產開啟 comf...

    snifes 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<