国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專(zhuān)欄INFORMATION COLUMN

【譯】RabbitMQ系列(二)-Work模式

lcodecorex / 3143人閱讀

摘要:每個(gè)消費(fèi)者會(huì)得到平均數(shù)量的。為了確保不會(huì)丟失,采用確認(rèn)機(jī)制。如果中斷退出了關(guān)閉了,關(guān)閉了,或是連接丟失了而沒(méi)有發(fā)送,會(huì)認(rèn)為該消息沒(méi)有完整的執(zhí)行,會(huì)將該消息重新入隊(duì)。該消息會(huì)被發(fā)送給其他的。當(dāng)消費(fèi)者中斷退出,會(huì)重新分派。

Work模式

原文地址

在第一章中,我們寫(xiě)了通過(guò)一個(gè)queue來(lái)發(fā)送和接收message的簡(jiǎn)單程序。在這一章中,我們會(huì)創(chuàng)建一個(gè)workqueue,來(lái)將執(zhí)行時(shí)間敏感的任務(wù)分發(fā)到多個(gè)worker中。

work模式主要的意圖是要避免等待完成一個(gè)耗時(shí)的任務(wù)。取而代之地,我們延遲任務(wù)的執(zhí)行,將任務(wù)封裝成消息,將之發(fā)送到queue。一個(gè)運(yùn)行著的worker進(jìn)程會(huì)彈出這個(gè)任務(wù)并執(zhí)行它。當(dāng)運(yùn)行多個(gè)worker進(jìn)程時(shí),任務(wù)會(huì)在它們之間分派。

這種模式在web應(yīng)用中特別有用,因?yàn)樵谝粋€(gè)較短的HTTP請(qǐng)求窗口中不會(huì)去執(zhí)行一個(gè)復(fù)雜的任務(wù)。

準(zhǔn)備工作

在上一章中,我們發(fā)送了一個(gè)”Hello World!"的message。現(xiàn)在我們將發(fā)送一個(gè)代表了復(fù)雜任務(wù)的字符串。這不是一個(gè)實(shí)際的任務(wù),比如像調(diào)整圖片大小或是重新渲染pdf文檔,我們通Thead.sleep() 來(lái)模擬一個(gè)耗時(shí)的任務(wù)。message中的小圓點(diǎn)表示其復(fù)雜度,圓點(diǎn)越多則任務(wù)的執(zhí)行越耗時(shí)。比如“Hello..."的message將耗時(shí)3秒。

我們簡(jiǎn)單的修改上一章的Send.java代碼,允許在命令行發(fā)送任意message。新的類(lèi)叫做NewTask.java

String message = String.join(" ", argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent "" + message + """);

同樣的,我們修改上一章中的Recv.java,讓它在處理message的時(shí)候根據(jù)小圓點(diǎn)進(jìn)行睡眠。新的類(lèi)叫Worker.java

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");
  System.out.println(" [x] Received "" + message + """);
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
  }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == ".") Thread.sleep(1000);
    }
}

像在第一章一樣編譯這兩個(gè)類(lèi)

javac -cp $CP NewTask.java Worker.java
Round-robin分派

使用Task模式的一個(gè)明顯的優(yōu)勢(shì)是讓并行執(zhí)行任務(wù)變得簡(jiǎn)單。我們只需要啟動(dòng)更多的worker就可以消減堆積的message,系統(tǒng)水平擴(kuò)展簡(jiǎn)單。

首先,我們?cè)谕粫r(shí)間啟動(dòng)兩個(gè)worker。他們都會(huì)從queue獲得message,來(lái)看一下具體細(xì)節(jié)。

打開(kāi)了三個(gè)終端,兩個(gè)是跑worker的。

java -cp $CP Worker
java -cp $CP Worker

第三個(gè)終端里來(lái)發(fā)布新的任務(wù)message。

java -cp $CP NewTask First message.
java -cp $CP NewTask Second message..
java -cp $CP NewTask Third message...
java -cp $CP NewTask Fourth message....
java -cp $CP NewTask Fifth message.....

讓我們看看worker的處理message的情況.第一個(gè)worker收到了第1,3,5message,第二個(gè)worker收到了第2,4個(gè)message。

默認(rèn)情況下,RabbitMQ會(huì)順序的將message發(fā)給下一個(gè)消費(fèi)者。每個(gè)消費(fèi)者會(huì)得到平均數(shù)量的message。這種方式稱(chēng)之為round-robin(輪詢(xún)).

Message 確認(rèn)

執(zhí)行任務(wù)需要一定的時(shí)間。你可能會(huì)好奇如果一個(gè)worker開(kāi)始執(zhí)行任務(wù),但是中途異常退出,會(huì)是什么結(jié)果。在我們現(xiàn)在的代碼中,一旦RabbitMQ將消息發(fā)送出去了,它會(huì)立即將該message刪除。這樣的話(huà),就可能丟失message。

在實(shí)際場(chǎng)景中,我們不想丟失任何一個(gè)task。如果一個(gè)worker異常中斷了,我們希望這個(gè)task能分派給另一個(gè)worker。

為了確保不會(huì)丟失message,RabbitMQ采用message確認(rèn)機(jī)制。RabbitMQ只有收到該message的Ack之后,才會(huì)刪除該消息。

如果worker中斷退出了( channel關(guān)閉了,connection關(guān)閉了,或是TCP連接丟失了)而沒(méi)有發(fā)送Ack,RabbitMQ會(huì)認(rèn)為該消息沒(méi)有完整的執(zhí)行,會(huì)將該消息重新入隊(duì)。該消息會(huì)被發(fā)送給其他的worker。這樣就不用message丟失,即使是在worker經(jīng)常異常中斷退出的場(chǎng)景下。

不會(huì)有任何message會(huì)timeout。當(dāng)消費(fèi)者中斷退出,RabbitMQ會(huì)重新分派message。即使消息的執(zhí)行會(huì)花費(fèi)很長(zhǎng)的時(shí)間。

默認(rèn)情況下,message是需要人工確認(rèn)的。在上面的例子中,我們通過(guò)autoAck=true來(lái)關(guān)閉了人工確認(rèn)。像下面這樣,我們將該標(biāo)志設(shè)置為false,worker就需要在完成了任務(wù)之后,發(fā)送確認(rèn)。

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received "" + message + """);
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

上面的代碼保證即使當(dāng)worker還在處理一條消息,而強(qiáng)制它退出,也不會(huì)丟失message。然后不久,所有未被確認(rèn)的消息都會(huì)被重新分派。

發(fā)送確認(rèn)必須和接收相同的channel。使用不同的channel進(jìn)行確認(rèn)會(huì)導(dǎo)致channel-level protocol 異常。

忘記確認(rèn)消息是一個(gè)比較常見(jiàn)的錯(cuò)誤,但是其后果是很?chē)?yán)重的。當(dāng)client退出時(shí),message會(huì)被重新分派,但是RabbitMQ會(huì)占用越來(lái)越多的內(nèi)存,因它無(wú)法釋放那些未被確認(rèn)的message。
可以通過(guò)rabbitmqctl來(lái)打印messages_unacknowledged:
##linux
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged 
##windows
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
Message 持久化

我們學(xué)習(xí)了在消費(fèi)者出現(xiàn)問(wèn)題的時(shí)候不丟失message。但是如果RabbitMQ服務(wù)器宕機(jī)了,我們還是會(huì)丟失message。

當(dāng)RabbitMQ宕機(jī)時(shí),默認(rèn)情況下,它會(huì)”忘記“所有的queue和message。為了確保message不丟失,我們需要確認(rèn)兩件事情:我們要使得queue和message都是持久的。

首先,我們要確保RabbitMQ不會(huì)丟失我們?cè)O(shè)置好的queue。所以,我們要把它聲明成持久的:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

雖然代碼沒(méi)有任何問(wèn)題,但是光這樣是無(wú)效的。因?yàn)槲覀冎耙呀?jīng)定義過(guò)名字為hello的queue。RabbitMQ不允許你使用不同的參數(shù)去重新定義一個(gè)已經(jīng)存在的queue,而且這還不會(huì)反悔任何錯(cuò)誤信息。但是我們還是有別的方法,讓我們使用一個(gè)別的名字,比如task_queue:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

聲明queue的改變要在生產(chǎn)者和消費(fèi)者的代碼里都進(jìn)行修改。

接著我們要設(shè)置message的持久性,我們通過(guò)設(shè)置MessageProperties為PERSISTENT_TEXT_PLAIN:

import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
將message標(biāo)記成持久的不能100%保證message不會(huì)丟失,雖然這告訴RabbitMQ將message保存到磁盤(pán),然而在RabbitMQ從接到message到保存之間,仍然有一小段時(shí)間。同時(shí)RabbitMQ不會(huì)給每一條message執(zhí)行fsync(2) -- 可能只是保存到了cache而沒(méi)有寫(xiě)到磁盤(pán)上去。所以持久的保證也不是非常強(qiáng),然后對(duì)我們簡(jiǎn)單的task queue來(lái)說(shuō)則足夠了。如果需要一個(gè)非常強(qiáng)的保證,則可以使用發(fā)布確認(rèn)的方式。
Fair 分派

你可能已經(jīng)注意到分派的工作沒(méi)有如我們所期望的來(lái)執(zhí)行。比如在有2個(gè)worker的情況系,所有偶數(shù)的message耗時(shí)很長(zhǎng),而所有奇數(shù)的message則耗時(shí)很短,這樣其中一個(gè)worker則一直被分派到偶數(shù)的message,而另一個(gè)則一直是奇數(shù)的message。RabbitMQ對(duì)此并不知曉,進(jìn)而繼續(xù)這樣分派著message。

這樣的原因是RabbitMQ是在message入queue的時(shí)候確定分派的。它不關(guān)心消費(fèi)者ack的情況。

我們可以通過(guò)basicQos方法和prefetchCount(1)來(lái)解決這個(gè)問(wèn)題。這個(gè)設(shè)置是讓RabbitMQ給worker一次一個(gè)message。或者這么說(shuō),直到worker處理完之前的message并發(fā)送ack,才給worker下一個(gè)message。否則,Rabbitmq會(huì)將message發(fā)送給其它不忙的worker。

int prefetchCount = 1;
channel.basicQos(prefetchCount);
注意queue的大小。如果所有的worker都處于忙碌狀態(tài),queue可能會(huì)被裝滿(mǎn)。必須監(jiān)控queue深度,可能要開(kāi)啟更多的worker,或者采取其他的措施。
開(kāi)始執(zhí)行

NewTask.java的最終版本

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = String.join(" ", argv);

        channel.basicPublish("", TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes("UTF-8"));
        System.out.println(" [x] Sent "" + message + """);
    }
  }

}

Worker.java的最終版本

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");

        System.out.println(" [x] Received "" + message + """);
        try {
            doWork(message);
        } finally {
            System.out.println(" [x] Done");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    };
    channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
        if (ch == ".") {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
  }
}

使用message ack和prefetchCount,來(lái)設(shè)定work queue。持久化選項(xiàng)則在RabbitMQ重啟后能讓任務(wù)得以恢復(fù)。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/73901.html

相關(guān)文章

  • RabbitMQ系列(三) - 發(fā)布/訂閱模式

    摘要:發(fā)布訂閱模式在之前的文章里,創(chuàng)建了。我們稱(chēng)之為發(fā)布訂閱模式。其實(shí)我們是用到了默認(rèn)的,用空字符串來(lái)標(biāo)識(shí)。空字符串代表了沒(méi)有名字的被路由到了由指定名字的。和這種關(guān)系的建立我們稱(chēng)之為從現(xiàn)在開(kāi)始這個(gè)就會(huì)將推向我們的隊(duì)列了。 發(fā)布訂閱模式 在之前的文章里,創(chuàng)建了work queue。work queue中,每一個(gè)task都會(huì)派發(fā)給一個(gè)worker。在本章中,我們會(huì)完成完全不一樣的事情 - 我們會(huì)...

    WrBug 評(píng)論0 收藏0
  • RabbitMQ系列(六)-RPC模式

    摘要:如果涉及返回值,就要用到本章提到的了。方法發(fā)送請(qǐng)求,并阻塞知道結(jié)果返回。當(dāng)有消息時(shí),進(jìn)行計(jì)算并通過(guò)指定的發(fā)送給客戶(hù)端。當(dāng)接收到,則檢查。如果和之前的匹配,則將消息返回給應(yīng)用進(jìn)行處理。 RPC模式 在第二章中我們學(xué)習(xí)了如何使用Work模式在多個(gè)worker之間派發(fā)時(shí)間敏感的任務(wù)。這種情況是不涉及到返回值的,worker執(zhí)行任務(wù)就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...

    894974231 評(píng)論0 收藏0
  • [] RabbitMQ tutorials (2) ---- 'work queue&#

    摘要:這樣的消息分發(fā)機(jī)制稱(chēng)作輪詢(xún)。在進(jìn)程掛了之后,所有的未被確認(rèn)的消息會(huì)被重新分發(fā)。忘記確認(rèn)這是一個(gè)普遍的錯(cuò)誤,丟失。為了使消息不會(huì)丟失,兩件事情需要確保,我們需要持久化隊(duì)列和消息。 工作隊(duì)列 showImg(https://segmentfault.com/img/remote/1460000008229494?w=332&h=111); 在第一篇中,我們寫(xiě)了一個(gè)程序從已經(jīng)聲明的隊(duì)列中收發(fā)...

    joyvw 評(píng)論0 收藏0
  • RabbitMQ系列(五) - 主題模式

    摘要:主題模式在上一章我們改進(jìn)了我們的日志系統(tǒng),如果使用我們只能簡(jiǎn)單進(jìn)行廣播,而使用則允許消費(fèi)者可以進(jìn)行一定程度的選擇。為的會(huì)同時(shí)發(fā)布到這兩個(gè)。當(dāng)為時(shí),會(huì)接收所有的。當(dāng)中沒(méi)有使用通配符和時(shí),的行為和一致。 主題模式 在上一章我們改進(jìn)了我們的日志系統(tǒng),如果使用fanout我們只能簡(jiǎn)單進(jìn)行廣播,而使用direct則允許消費(fèi)者可以進(jìn)行一定程度的選擇。但是direct還是有其局限性,其路由不支持多個(gè)...

    pingan8787 評(píng)論0 收藏0
  • RabbitMQ系列(四) - 路由模式

    摘要:路由模式在之前的文章中我們建立了一個(gè)簡(jiǎn)單的日志系統(tǒng)。更形象的表示,如對(duì)中的感興趣。為了進(jìn)行說(shuō)明,像下圖這么來(lái)設(shè)置如圖,可以看到有兩個(gè)綁到了類(lèi)型為的上。如圖的設(shè)置中,一個(gè)為的就會(huì)同時(shí)發(fā)送到和。接收程序可以選擇要接收日志的嚴(yán)重性級(jí)別。 路由模式 在之前的文章中我們建立了一個(gè)簡(jiǎn)單的日志系統(tǒng)。我們可以通過(guò)這個(gè)系統(tǒng)將日志message廣播給很多接收者。 在本篇文章中,我們?cè)谶@之上,添加一個(gè)新的功...

    liuchengxu 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<