国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

白話RabbitMQ(二): 任務隊列

fnngj / 1754人閱讀

摘要:任務隊列最主要的功能就是解耦高耗時的操作,否則程序會一直等在那里,浪費大量資源。我們將任務封裝成一個消息發(fā)送給隊列,后臺的任務進程會得到這個任務并執(zhí)行它,而且可以配置多個任務進程,進一步加大吞吐率。為了確保消息不丟失,支持消息確認。

推廣
RabbitMQ專題講座

https://segmentfault.com/l/15...

CoolMQ開源項目

我們利用消息隊列實現(xiàn)了分布式事務的最終一致性解決方案,請大家圍觀。可以參考源碼:https://github.com/vvsuperman…,項目支持網站: http://rabbitmq.org.cn,最新文章或實現(xiàn)會更新在上面

前言

在第一篇中我們描述了如何最簡單的RabbitMQ操作,如何發(fā)送、接受消息。在今天這篇文章中我們將描述如何創(chuàng)建一個任務隊列,來將高耗時的任務分發(fā)到多個消費者,從而提高處理效率。

任務隊列最主要的功能就是解耦高耗時的操作,否則程序會一直等在那里,浪費大量資源。反之我們會把這個操作交給隊列,讓它延后再做。我們將任務封裝成一個消息發(fā)送給隊列,后臺的任務進程會得到這個任務并執(zhí)行它,而且可以配置多個任務進程,進一步加大吞吐率。

特別是對于網絡請求,一次短短的HTTP請求是要求迅速響應的,不可能讓它一直停頓在高耗時操作上。

準備工作

在第一章中我們發(fā)送了“Hello World!”。現(xiàn)在來完成更復雜一點的,因為這里并沒有真正的高耗時操作,比如縮放圖像或輸出一個pdf。因此我們只是用Thread.sleep()來假裝我們很繁忙,而且會用"."來表示需要停頓的秒數(shù),比如一個叫Hello...的任務將停頓3秒鐘。

我們簡單的更改下Send.java,稱之為 NewTask.java.

String message = getMessage(argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent "" + message + """);

然后是工具類

private static String getMessage(String[] strings){
    if (strings.length < 1)
        return "Hello World!";
    return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
}

當然,我們的Recv.java也需要進行一些改造,它需要對每一個"."停頓1秒,Work.java如下

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received "" + message + """);
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
    }
  }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == ".") Thread.sleep(1000);
    }
}    

編譯上面這些代碼

javac -cp $CP NewTask.java Worker.java
輪詢調度

任務隊列的一個最大優(yōu)點是可以并行工作,能夠非常容易的水平擴張。

首先,讓我們同時運行兩個工作線程,他們能夠同時從隊列獲取消息。我們也需要同時開啟3個console:1個生產者,2個消費者

消費者C1

# shell 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C

消費者C2

# shell 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C

讓我們運行生產者

# shell 3
java -cp $CP NewTask
# => First message.
java -cp $CP NewTask
# => Second message..
java -cp $CP NewTask
# => Third message...
java -cp $CP NewTask
# => Fourth message....
java -cp $CP NewTask
# => Fifth message.....    

讓我們看看消費者們
消費者C1

java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received "First message."
# => [x] Received "Third message..."
# => [x] Received "Fifth message....."   

消費者C2

java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received "Second message.."
# => [x] Received "Fourth message..   

RabbitMQ默認有序的將會發(fā)送消息給下一個消費者,所以每一個消費者都會得到相同數(shù)量的消息,這種方式就叫做輪詢調度(round-robin),你可以嘗試下更多的消費者

消息確認

一個任務可能非常耗時,如果消費者在做一個高耗時任務時掛掉了,我們將會丟失所有發(fā)送到這個消費者上的消息。這是非常不可取的,所以我們希望能夠明確的知道消息是否消費成功,如果一個消費掛了,我們能夠知道,并且將消息發(fā)送給下一個消費者。

為了確保消息不丟失,RabbitMQ支持消息確認。收到消息后消費者會給RabbitMQ服務器發(fā)送一個ack(我已經收到消息了),RabbitMQ就會在服務上刪除這個消息了。

如果一個消費者掛了(連接關閉,channel關閉,或者是TCP連接丟失)而沒有發(fā)送ack,RabbitMQ就會知道消息并沒有消費成功,于是乎消息會被放到消息隊列重新消費。如果此時還有其它消費者的話,消息會發(fā)送給其它消費者來消費,確保消息不會丟失

消息并沒有超時時間這個概念,消息只會在消費者掛掉了時候重發(fā),即使是一個非常非常耗時的的消費者也不會發(fā)生重發(fā)

手動消息確認(Manual message acknowledgments)默認是打開的,雖然我們之前關閉了它:autoAck=true。讓我們先將它設置為false

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received "" + message + """);
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

這樣一來,即使你使用CTRL+C強制殺死了一個消費者,消費者所丟失的消息也將會被重發(fā),會被另一個消費者所接受并消費。

忘記應答

很容易犯忘記應答的錯誤,但會導致非常嚴重的后果。Messages會被重發(fā),RabbitMQ會消耗越來越多的內存因為unacked的消息無法釋放(甚至更嚴重,RabbitMQ內部維護了一個最大打開線程數(shù),如果太多的消息沒有應答,RabbitMQ甚至會整個崩潰掉)

你可以用Rabbitmqctl查看未被應答的消息數(shù)

sudo rabbitmqctl list_queues name messages_ready      
 messages_unacknowledged

windows下:

rabbitmqctl.bat list_queues name messages_ready     
messages_unacknowledged
消息持久化

我們現(xiàn)在知道了可以通過應答來保證消息不丟失,但萬一RabbitMQ掛了呢?還是可能會導致消息丟失。因此我們可以通過持久化的機制,包括將隊列以及隊列中的消息持久化的方式,來保證即便RabbitMQ掛了,當它重啟的時候,隊列以及消息也能夠恢復

首先做隊列的持久化,聲明隊列為durable

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

但很可惜的是,這種聲明方式并不適用與上面的方法,因為我們已經將“Hello”定義為一個非持久化的隊列了,是不能再將他改為持久化的,如果這樣做,將會直接返回一個error信息。所以,我們需要重新再定義一個隊列

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

在保證隊列的持久化后需要保證消息的持久化-將消息設置為PERSISTENT_TEXT_PLAIN

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes());  
公平分發(fā)

但這樣還是存在問題:假設有如下的情形,一個消費者非常耗時,而一個消費者非???,由于消息都是公平的發(fā)送,所以它們都是接收到相同數(shù)量的消息,會導致一個消費者非常忙碌,而另外一個消費者非??臻e,而RabbitMQ無法得知這一點。

為了解決這個缺陷我們引入了basicQos方法以及prefetchCount =1的設置。這會告訴RabbitMQ一次只給消費者一個消息:如果這個消息未確認,將不會發(fā)送新的消息,從而它會將消息發(fā)送給其它并不那么忙的消費者

int prefetchCount = 1;
channel.basicQos(prefetchCount);
留意queue size

如果所有的消費者都非常忙,隊列可能會很快被填滿,所以你需要留意這一點,要么增加更多的消費者,或者采取其它的策略。

整合

NewTask.java

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv)
                      throws java.io.IOException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    String message = getMessage(argv);

    channel.basicPublish( "", TASK_QUEUE_NAME,
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
    System.out.println(" [x] Sent "" + message + """);

    channel.close();
    connection.close();
  }      
  //...
}

Worker.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {
  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");

        System.out.println(" [x] Received "" + message + """);
        try {
          doWork(message);
        } finally {
          System.out.println(" [x] Done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };
    boolean autoAck = false;
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
      if (ch == ".") {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException _ignored) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }
}

使用消息確認和prefetchCount你就能設置一個持久化隊列了,同時,使用durable和persist,,即使RabbitMQ掛掉了,重啟后也能夠重發(fā)消息

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/68099.html

相關文章

  • 白話RabbitMQ(三):發(fā)布/訂閱

    摘要:推廣專題講座開源項目我們利用消息隊列實現(xiàn)了分布式事務的最終一致性解決方案,請大家圍觀。因此一旦有有消息,消息會廣播到所有的消費者。如此一來路由器就能夠把消息發(fā)送給相應的隊列了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https...

    Ververica 評論0 收藏0
  • 白話rabbitmq(一): HelloWorld

    摘要:作為消息隊列的一個典型實踐,完全實現(xiàn)了標準,與的快快快不同,它追求的穩(wěn)定可靠。同一個隊列不僅可以綁定多個生產者,而且能夠發(fā)送消息到多個消費者。消費者接受并消費消息。幾乎于完全類似是一個繼承了接口的類,方便我們來存儲消息隊列來的消息。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務的...

    garfileo 評論0 收藏0
  • 白話RabbitMQ(四): 建立路由

    摘要:可以參考源碼,項目支持網站,最新文章或實現(xiàn)會更新在上面前言在訂閱發(fā)布中我們建立了一個簡單的日志系統(tǒng),從而將消息廣播給一些消費者。因此,發(fā)送到路由鍵的消息會發(fā)送給隊列,發(fā)送到路由鍵或者的消息會發(fā)送給,其它的消息將被丟棄。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務的最終一致性解決...

    CoderStudy 評論0 收藏0
  • 白話RabbitMQ(五): 主題路由器(Topic Exchange)

    摘要:推廣專題講座開源項目我們利用消息隊列實現(xiàn)了分布式事務的最終一致性解決方案,請大家圍觀。主題交換機也可以當成其它交換機來使用,假如隊列綁定到了那么它會接收所有的消息,就像廣播路由器一樣而如果未使用,那么就跟直達路由器一樣了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務的最終一致性...

    Gilbertat 評論0 收藏0
  • 白話RabbitMQ(六): RPC

    摘要:因為消費消息是在另外一個進程中,我們需要阻塞我們的進程直到結果返回,使用阻塞隊列是一種非常好的方式,這里我們使用了長度為的,的功能是檢查消息的的是不是我們之前所發(fā)送的,如果是,將返回值返回到。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務的最終一致性解決方案,請大家圍觀??梢詤⒖?..

    KevinYan 評論0 收藏0

發(fā)表評論

0條評論

fnngj

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<