国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

LocalMQ:從零構(gòu)建類 RocketMQ 高性能消息隊列

kel / 721人閱讀

摘要:從零構(gòu)建類高性能消息隊列所謂消息隊列,直觀來看有點(diǎn)像蓄水池,能夠在生產(chǎn)者與消費(fèi)者之間完成解耦,并且平衡生產(chǎn)者與消費(fèi)者之間的計算量與可計算時間之間的差異目前主流的消息隊列有著名的等等。

本文記錄了月前筆者參與阿里云中間件比賽中,實(shí)現(xiàn)的簡要具有持久化功能的消息隊列的設(shè)計與實(shí)現(xiàn)過程。需要聲明的是,LocalMQ 借鑒了 RocketMQ 在 Broker 部分的核心設(shè)計思想,最早的源碼也是基于 RocketMQ 源碼改造而來。本文涉及引用以及其他消息隊列相關(guān)資料參考這里,源代碼放于 LocalMQ 倉庫;另外筆者水平有限,后來因為畢業(yè)旅行也未繼續(xù)優(yōu)化,本文很多內(nèi)容可能存在謬誤與不足,請批評指正。

LocalMQ:從零構(gòu)建類 RocketMQ 高性能消息隊列

所謂消息隊列,直觀來看有點(diǎn)像蓄水池,能夠在生產(chǎn)者與消費(fèi)者之間完成解耦,并且平衡生產(chǎn)者與消費(fèi)者之間的計算量與可計算時間之間的差異;目前主流的消息隊列有著名的 Kafka、RabbitMQ、RocketMQ 等等。在筆者實(shí)現(xiàn)的 LocalMQ 中,從簡到復(fù)依次實(shí)現(xiàn)了 MemoryMessageMQ、EmbeddedMessageQueue 與 LocalMessageQueue 這三個版本;需要說明的是,在三個版本的消息隊列中,都是采取所謂的拉模式,即消費(fèi)者主動向消息隊列請求拉取消息的模式。在 wx.demo.* 包下提供了很多的內(nèi)部功能與性能測試用例,

// 首先在這里:https://parg.co/beX 下載代碼
// 然后修改 DefaultProducer 對應(yīng)的繼承類
// 測試 MemoryMessageQueue,則繼承 MemoryProducer;
// 測試 EmbeddedMessageQueue,則繼承 EmbeddedProducer;
// 默認(rèn)測試 LocalMessageQueue,注意,需要對 DefaultPullConsumer 進(jìn)行同樣修改
public class DefaultProducer extends LocalProducer

// 使用 mvn 運(yùn)行測試用例,也可以在 Eclipse 或者 Intellij 中打開
mvn clean package -U assembly:assembly -Dmaven.test.skip=true

java -Xmx2048m -Xms2048m  -cp open-messaging-wx.demo-1.0.jar  wx.demo.benchmark.ProducerBenchmark

最簡單的 MemoryMessageQueue 即是將消息數(shù)據(jù)按照選定主題存放在內(nèi)存中,其主要結(jié)構(gòu)如下圖所示:

MemoryMessageQueue 提供了同步的消息提交與拉取操作,其利用 HashMap 堆上存儲來緩存所有的消息;并且在內(nèi)存中維護(hù)了另一個所謂的 QueueOffsets 來記錄每個主題對應(yīng)隊列的消費(fèi)偏移量。相較于 MemoryMessageQueue 實(shí)現(xiàn)的簡單的不能進(jìn)行持久化存儲的消息隊列,EmbeddedMessageQueue 則提供了稍微復(fù)雜點(diǎn)的支持磁盤持久化的消息隊列。EmbeddedMessageQueue 構(gòu)建了基于 Java NIO 提供的 MappedByteBuffer 的 MappedPartitionQueue。每個 MappedPartitionQueue 對應(yīng)磁盤上的多個物理文件,并且為上層應(yīng)用抽象提供了邏輯上的單一文件。EmbeddedMessageQueue 結(jié)構(gòu)如下圖所示:

EmbeddedMessageQueue 的主要流程為生產(chǎn)者同步地像 Bucket Queue 中提交消息,每個 Bucket 可以視作某個主題(Topic)或者隊列(Queue)。而 EmbeddedMessageQueue 還包含著負(fù)責(zé)定期將 MappedPartitionQueue 中數(shù)據(jù)持久化寫入到磁盤的異步線程,該線程會定期地完成 Flush 操作。EmbeddedMessageQueue 假設(shè)某個 BucketQueue 被分配給某個 Consumer 之后就被其占用,該 Consumer 會消費(fèi)其中全部的緩存消息;每個 Consumer 會包含獨(dú)立地 Consumer Offset Table 來記錄當(dāng)前某個隊列地消費(fèi)情況。EmbeddedMessageQueue 的缺陷在于:

混合處理與標(biāo)記位:EmbeddedMessageQueue 僅提供了最簡單的消息序列化模型,無法記錄額外的消息屬性;

持久化存儲到磁盤的時機(jī):EmbeddedMessageQueue 僅使用了一級緩存,并且僅在某個 Partition 寫滿時才進(jìn)行文件的持久化操作;

添加消息的后處理:EmbeddedMessageQueue 是將消息直接寫入到 BucketQueue 包含的 MappedPartitionQueue 中,無法動態(tài)地進(jìn)行索引、篩選等消息后處理,其可擴(kuò)展性較差。

未考慮斷續(xù)拉取的情況:EmbeddedMessageQueue 中是假設(shè) Consumer 能夠單次處理完某個 BucketQueue 中的單個 Partition 的全部消息,因此記錄其處理值時也僅是記錄了文件級別的位移,如果存在某次是僅拉取了單個 Partition 中部分內(nèi)容,則下次的起始拉取點(diǎn)還是下個文件首。

EmbeddedMessageQueue 中我們可以在各 Producer 線程中多帶帶將消息持久化入文件中,而在 LocalMessageQueue 中,我們是將消息統(tǒng)一寫入 MessageStore 中,然后又 PostPutMessageService 進(jìn)行二次處理。 LocalMessageQueue 的結(jié)構(gòu)如下所示:

LocalMessageQueue 最大的變化在于將消息統(tǒng)一存儲在獨(dú)立地 MessageStore 中(類似于 RocketMQ 中的 CommitLog),然后針對 Topic-queueId 將消息劃分到不同的 ConsumeQueue 中;這里的 queueId 是由對應(yīng)的 Producer 專屬編號決定的,每個 Consumer 即會被分配占用某個 ConsumeQueue(類似于 RocketMQ 中的 consumequeue),從而保證某個 Producer 生產(chǎn)的某個主題下的消息被專一的 Consumer 消費(fèi)。LocalMessageQueue 同樣使用 MappedPartitionQueue 提供底層文件系統(tǒng)抽象,并且構(gòu)建了獨(dú)立的 ConsumerOffsetManager 對消費(fèi)者的消費(fèi)進(jìn)度進(jìn)行管理,從而方便異常恢復(fù)。

設(shè)計概要 順序消費(fèi)

本部分圖來源于分布式開放消息系統(tǒng)(RocketMQ)的原理與實(shí)踐

消息產(chǎn)品的一個重要特性是順序保證,也就是消息消費(fèi)的順序要與發(fā)送的時間順序保持一致;在多發(fā)送端的情況下,保證全局順序代價比較大,只要求各個發(fā)送端的順序有保障即可; 舉個例子 P1 發(fā)送 M11, M12, M13,P2 發(fā)送 M21, M22, M23,在消費(fèi)的時候,只要求保證 M11, M12, M13(M21,M22,M23)的順序,也就是說,實(shí)際消費(fèi)順序為: M11, M21, M12, M13, M22, M23 正確; M11, M21, M22, M12, M13, M23 正確 M11, M13, M21, M22, M23, M12 錯誤,M12 與 M13 的順序顛倒了;假如生產(chǎn)者產(chǎn)生了 2 條消息:M1、M2,要保證這兩條消息的順序,最直觀的方式就是采取類似于 TCP 中的確認(rèn)消息:

不過該模型中如果 M1 與 M2 分別被發(fā)送到了兩臺不同的消息服務(wù)器上,我們無法控制消息服務(wù)器發(fā)送 M1 與 M2 的先后時機(jī);有可能 M2 已經(jīng)被發(fā)送到了消費(fèi)者,M1 才被發(fā)送到了消息服務(wù)器上。針對這個問題改進(jìn)版的思路即是將 M1 與 M2 發(fā)送到單一消息服務(wù)器中,然后根據(jù)先到達(dá)先消費(fèi)的原則發(fā)送給對應(yīng)的消費(fèi)者:

不過在實(shí)際情況下往往因為網(wǎng)絡(luò)延遲或其他問題導(dǎo)致在 M1 發(fā)送耗時大于 M2 的情況下,M2 會先于 M1 被消費(fèi)。因此如果我們要保證嚴(yán)格的順序消息,那么必須要保證生產(chǎn)者、消息服務(wù)器與消費(fèi)者之間的一對一對應(yīng)關(guān)系。在 LocalMQ 的實(shí)現(xiàn)中,我們首先會將消息按照生產(chǎn)者劃分到唯一的 Topic-queueId 隊列中;并且保證同一時刻該消費(fèi)隊列只會被某個消費(fèi)者獨(dú)占。如果某個消費(fèi)者在消費(fèi)完該隊列之前意外中斷,那么在保留窗口期內(nèi)不會將該隊列重新分配;在窗口期之外則將該隊列分配給新的消費(fèi)者,并且即使原有消費(fèi)者恢復(fù)工作也無法繼續(xù)拉取該隊列中包含的消息。

數(shù)據(jù)存儲

LocalMQ 中目前是實(shí)現(xiàn)了基于文件系統(tǒng)的持久化存儲,主要功能實(shí)現(xiàn)在 MappedPartition 與 MappedPartitionQueue 這兩個類中,筆者也會在下文中詳細(xì)介紹這兩個類的實(shí)現(xiàn)。本部分我們討論下數(shù)據(jù)存儲的文件格式,對于 LocalMessageQueue 而言,其文件存儲如下:

 * messageStore
 * -- MapFile1
 * -- MapFile2
 * consumeQueue
 * -- Topic1
 * ---- queueId1
 * ------ MapFile1
 * ------ MapFile2
 * ---- queueId2
 * ------ MapFile1
 * ------ MapFile2
 * -- Queue1
 * ---- queueId1
 * ------ MapFile1
 * ------ MapFile2
 * ---- queueId2
 * ------ MapFile1
 * ------ MapFile2

LocalMessageQueue 中采用了消息統(tǒng)一存儲的方案,因此所有的消息實(shí)際內(nèi)容會被存放在 messageStore 目錄下。而 consumeQueue 中則存放了消息的索引,即在 messageStore 中的偏移地址。LocalMQ 中使用 MappedPartitionQueue 來管理某個邏輯上單一的文件,而根據(jù)不同的單文件大小限制會自動將其切割為多個物理上獨(dú)立的 Mapped File。每個 MappedPartition 使用 offset,即該文件首地址的全局偏移量命名;而使用 pos / position 統(tǒng)一表示單文件中局部偏移量,使用 index 表示某個文件在其文件夾中的下標(biāo)。

性能優(yōu)化

在編寫的過程中,筆者發(fā)現(xiàn)對于執(zhí)行流的優(yōu)化、避免重復(fù)計算與額外變量、選擇使用合適的并發(fā)策略都會對結(jié)果造成極大的影響,譬如筆者從 SpinLock 切換到重入鎖之后,本地測試 TPS 增加了約 5%。另外筆者也統(tǒng)計了消費(fèi)者工作中不同階段的時間占比,其中構(gòu)建(包括消息屬性的序列化)與發(fā)送操作(寫入到 MappedFileQueue 中,未使用二級緩存)都是同步進(jìn)行,二者的時間占比也是最多。

[2017-06-01 12:13:21,802] INFO: 構(gòu)建耗時占比:0.471270,發(fā)送耗時占比:0.428567,持久化耗時占比:0.100163
[2017-06-01 12:25:31,275] INFO: 構(gòu)建耗時占比:0.275170,發(fā)送耗時占比:0.573520,持久化耗時占比:0.151309
代碼級別優(yōu)化

筆者在實(shí)現(xiàn) LocalMQ 的過程中感觸最深的就是實(shí)現(xiàn)相同功能的不同代碼在性能上的差異可能會很大。在實(shí)現(xiàn)過程中應(yīng)該避免冗余變量聲明與創(chuàng)建、避免額外空間申請與垃圾回收、避免冗余的執(zhí)行過程;另外盡可能選用合適的數(shù)據(jù)結(jié)構(gòu),譬如筆者在部分實(shí)現(xiàn)中從 ArrayList 遷移到了 LinkedList,從 ConcurrentHashMap 遷移到了 HashMap,都帶來了一定的評測指標(biāo)提升。

異步 IO

異步 IO,順序 Flush;筆者發(fā)現(xiàn),如果多個線程進(jìn)行并發(fā) Flush 操作,反而不如單線程進(jìn)行順序 Flush。

并發(fā)控制

盡量減少鎖控制的范圍。

并發(fā)計算優(yōu)化,將所有的耗時計算放到可以并發(fā)的 Producer 中。

使用合理的鎖,重入鎖相較于自旋鎖有近 5 倍的 TPS 提升。

MemoryMessageQueue

源代碼參考這里

MemoryMessageQueue 是最簡易的實(shí)現(xiàn),不過其代碼能夠反映出某個消息隊列的基本流程,首先在生產(chǎn)者我們需要創(chuàng)建消息并且發(fā)送給消息隊列:

// 創(chuàng)建消息
BytesMessage message = messageFactory.createBytesMessageToTopic(topic, body);

// 發(fā)送消息
messageQueue.putMessage(topic, message);

putMessage 函數(shù)中則將消息存入內(nèi)存存儲中:

// 存放所有消息
private Map> messageBuckets = new HashMap<>();

// 添加消息
public synchronized PutMessageResult putMessage(String bucket, Message message) {
        if (!messageBuckets.containsKey(bucket)) {
            messageBuckets.put(bucket, new ArrayList<>(1024));
        }
        ArrayList bucketList = messageBuckets.get(bucket);
        bucketList.add(message);

        return new PutMessageResult(PutMessageStatus.PUT_OK, null);
    }

而 Consumer 則根據(jù)指定的 Bucket 與 queueId 來拉取消息,如果存在多個 Bucket 需要拉取則進(jìn)行輪詢:

//use Round Robin
int checkNum = 0;

while (++checkNum <= bucketList.size()) {
    String bucket = bucketList.get((++lastIndex) % (bucketList.size()));
    Message message = messageQueue.pullMessage(queue, bucket);
    if (message != null) {
        return message;
    }
}

而 MemoryMessageQueue 的 pullMessage 函數(shù)則首先判斷目標(biāo) Bucket 是否存在,并且根據(jù)內(nèi)置的 queueOffset 中記錄的拉取偏移量來判斷是否拉取完畢。若沒有拉取完畢則返回消息并且更新本地偏移量;

private Map> queueOffsets = new HashMap<>();
...
public synchronized Message pullMessage(String queue, String bucket) {
    ...
    ArrayList bucketList = messageBuckets.get(bucket);
    if (bucketList == null) {
        return null;
    }
    HashMap offsetMap = queueOffsets.get(queue);
    if (offsetMap == null) {
        offsetMap = new HashMap<>();
        queueOffsets.put(queue, offsetMap);
    }
    int offset = offsetMap.getOrDefault(bucket, 0);
    if (offset >= bucketList.size()) {
        return null;
    }
    Message message = bucketList.get(offset);
    offsetMap.put(bucket, ++offset);
    ...
}
EmbeddedMessageQueue

源代碼參考這里

EmbeddedMessageQueue 中引入了消息持久化支持,本部分我們也主要討論消息序列化與底層的 MappedPartitionQueue 實(shí)現(xiàn)。

消息序列化

EmbeddedMessageQueue 中定義的消息格式如下:

序號 消息存儲結(jié)構(gòu) 備注 長度(字節(jié)數(shù))
1 TOTALSIZE 消息大小 4
2 MAGICCODE 消息的 MAGIC CODE 4
3 BODY 前 4 個字節(jié)存放消息體大小值,后 bodyLength 大小的空間存儲消息體內(nèi)容 4 + bodyLength
4 headers* 前 2 個字節(jié)(short)存放頭部大小,后存放 headersLength 大小的頭部數(shù)據(jù) 2 + headersLength
5 properties* 前 2 個字節(jié)(short)存放屬性值大小,后存放 propertiesLength 大小的屬性數(shù)據(jù) 2 + propertiesLength

EmbeddedMessageSerializer 是繼承自 MessageSerializer 的主要負(fù)責(zé)消息持久化的類,其提供了消息長度的計算函數(shù):

/**
    * Description 計算某個消息的長度,注意,headersByteArray 與 propertiesByteArray 在發(fā)送消息時完成轉(zhuǎn)換
    * @param message
    * @param headersByteArray
    * @param propertiesByteArray
    * @return
    */
public static int calMsgLength(DefaultBytesMessage message, byte[] headersByteArray, byte[] propertiesByteArray) {

    // 消息體
    byte[] body = message.getBody();

    int bodyLength = body == null ? 0 : body.length;

    // 計算頭部長度
    short headersLength = (short) headersByteArray.length;

    // 計算屬性長度
    short propertiesLength = (short) propertiesByteArray.length;

    // 計算消息體總長度
    return calMsgLength(bodyLength, headersLength, propertiesLength);

}

而 EmbeddedMessageEncoder 的 encode 函數(shù)負(fù)責(zé)具體的消息序列化操作:

/**
    * Description 執(zhí)行消息的編碼操作
    * @param message 消息對象
    * @param msgStoreItemMemory 內(nèi)部緩存句柄
    * @param msgLen 計算的消息長度
    * @param headersByteArray 消息頭字節(jié)序列
    * @param propertiesByteArray 消息屬性字節(jié)序列
*/
public static final void encode(
    DefaultBytesMessage message,
    final ByteBuffer msgStoreItemMemory,
    int msgLen,
    byte[] headersByteArray,
    byte[] propertiesByteArray
) {

// 消息體
byte[] body = message.getBody();

int bodyLength = body == null ? 0 : body.length;

// 計算頭部長度
short headersLength = (short) headersByteArray.length;

// 計算屬性長度
short propertiesLength = (short) propertiesByteArray.length;

// 初始化存儲空間
resetByteBuffer(msgStoreItemMemory, msgLen);

// 1 TOTALSIZE
msgStoreItemMemory.putInt(msgLen);

// 2 MAGICCODE
msgStoreItemMemory.putInt(MESSAGE_MAGIC_CODE);

// 3 BODY
msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
    msgStoreItemMemory.put(message.getBody());

// 4 HEADERS
msgStoreItemMemory.putShort((short) headersLength);
if (headersLength > 0)
    msgStoreItemMemory.put(headersByteArray);

// 5 PROPERTIES
msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
    msgStoreItemMemory.put(propertiesByteArray);

}

對應(yīng)的反序列化操作則是由 EmbeddedMessageDecoder 完成,其主要從某個 ByteBuffer 中讀取數(shù)據(jù):

/**
    * Description 從輸入的 ByteBuffer 中反序列化消息對象
    *
    * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure
    */
public static DefaultBytesMessage readMessageFromByteBuffer(ByteBuffer byteBuffer) {

    // 1 TOTAL SIZE
    int totalSize = byteBuffer.getInt();

    // 2 MAGIC CODE
    int magicCode = byteBuffer.getInt();

    switch (magicCode) {
        case MESSAGE_MAGIC_CODE:
            break;
        case BLANK_MAGIC_CODE:
            return null;
        default:
//                log.warning("found a illegal magic code 0x" + Integer.toHexString(magicCode));
            return null;
    }

    byte[] bytesContent = new byte[totalSize];

    // 3 BODY
    int bodyLen = byteBuffer.getInt();
    byte[] body = new byte[bodyLen];

    if (bodyLen > 0) {
        // 讀取并且校驗消息體內(nèi)容
        byteBuffer.get(body, 0, bodyLen);
    }

    // 4 HEADERS
    short headersLength = byteBuffer.getShort();
    KeyValue headers = null;
    if (headersLength > 0) {
        byteBuffer.get(bytesContent, 0, headersLength);
        String headersStr = new String(bytesContent, 0, headersLength, EmbeddedMessageDecoder.CHARSET_UTF8);
        headers = string2KeyValue(headersStr);

    }

    // 5 PROPERTIES

    // 獲取 properties 尺寸
    short propertiesLength = byteBuffer.getShort();
    KeyValue properties = null;
    if (propertiesLength > 0) {
        byteBuffer.get(bytesContent, 0, propertiesLength);
        String propertiesStr = new String(bytesContent, 0, propertiesLength, EmbeddedMessageDecoder.CHARSET_UTF8);
        properties = string2KeyValue(propertiesStr);

    }

    // 返回讀取到的消息
    return new DefaultBytesMessage(
            totalSize,
            headers,
            properties,
            body
    );


}
消息寫入

EmbeddedMessageQueue 中消息的寫入實(shí)際上是由 BucketQueue 的 putMessage/putMessages 函數(shù)完成的,這里的某個 BucketQueue 就對應(yīng)著 Topic-queueId 這個唯一的標(biāo)識。這里以批量寫入消息為例,首先我們從 BucketQueue 包含的 MappedPartitionQueue 中獲取到最新可用的某個 MappedPartition:

mappedPartition = this.mappedPartitionQueue.getLastMappedFileOrCreate(0);

然后調(diào)用 MappedPartition 的 appendMessages 方法,該方法會在下文介紹;這里則是要討論添加消息的幾種結(jié)果對應(yīng)的處理。如果添加成功,則直接返回成功;如果該 MappedPartition 剩余空間不足以寫入消息隊列中的某條消息,則需要調(diào)用 MappedPartitionQueue 創(chuàng)建新的 MappedPartition,并且重新計算待寫入的消息序列:

...
// 調(diào)用對應(yīng)的 MappedPartition 追加消息
// 注意,這里經(jīng)過填充之后,會逆向地將消息在 MessageStore 中的偏移與 QueueOffset 中偏移添加進(jìn)去
result = mappedPartition.appendMessages(messages, this.appendMessageCallback);

// 根據(jù)追加結(jié)果進(jìn)行不同的操作
switch (result.getStatus()) {
    case PUT_OK:
        break;
    case END_OF_FILE:

        this.messageQueue.getFlushAndUnmapPartitionService().putPartition(mappedPartition);

        // 如果已經(jīng)到了文件最后,則創(chuàng)建新文件
        mappedPartition = this.mappedPartitionQueue.getLastMappedFileOrCreate(0);

        if (null == mappedPartition) {
            // XXX: warn and notify me
            log.warning("創(chuàng)建 MappedPartition 錯誤, topic: " + messages.get(0).getTopicOrQueueName());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
        }
        // 否則重新進(jìn)行添加操作
        // 從結(jié)果中獲取處理完畢的消息數(shù)
        int appendedMessageNum = result.getAppendedMessageNum();

        // 創(chuàng)建臨時的 LeftMessages
        ArrayList leftMessages = new ArrayList<>();

        // 添加所有未消費(fèi)的消息
        for (int i = appendedMessageNum; i < messages.size(); i++) {
            leftMessages.add(messages.get(i));
        }

        result = mappedPartition.appendMessages(leftMessages, this.appendMessageCallback);

        break;
    case MESSAGE_SIZE_EXCEEDED:
    case PROPERTIES_SIZE_EXCEEDED:
        beginTimeInLock = 0;
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
    case UNKNOWN_ERROR:
        beginTimeInLock = 0;
        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
    default:
        beginTimeInLock = 0;
        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
...
邏輯文件存儲 Mapped Partition

某個 MappedPartition 映射物理上的單個文件,其初始化時如下傳入文件名與文件尺寸屬性:

/**
    * Description 初始化某個內(nèi)存映射文件
    *
    * @param fileName 文件名
    * @param fileSize 文件尺寸
    * @throws IOException 打開文件出現(xiàn)異常
    */
private void init(final String fileName, final int fileSize) throws IOException {
    ...

    // 從文件名中獲取到當(dāng)前文件的全局偏移量
    this.fileFromOffset = Long.parseLong(this.file.getName());

    ... 

    // 嘗試打開文件
    this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();

    // 將文件映射到內(nèi)存中
    this.mappedByteBuffer = this.fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
}

初始化階段即打開文件映射,而后在寫入消息或者其他內(nèi)容時,其會調(diào)用傳入的消息編碼回調(diào)(即是我們上文中介紹的消息序列化的包裹對象)將對象編碼為字節(jié)流并且寫入:

public AppendMessageResult appendMessage(final DefaultBytesMessage message, final AppendMessageCallback cb) {

    ...

    // 獲取當(dāng)前的寫入位置
    int currentPos = this.wrotePosition.get();

    // 如果當(dāng)前還是可寫的
    if (currentPos < this.fileSize) {

        // 獲取到實(shí)際的寫入句柄
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();

        // 調(diào)整當(dāng)前寫入位置
        byteBuffer.position(currentPos);

        // 記錄信息
        AppendMessageResult result = null;

        // 調(diào)用回調(diào)函數(shù)中的實(shí)際寫入操作
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, message);

        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }

    ...
}
MappedPartitionQueue

MappedPartitionQueue 用來管理多個物理上的映射文件,其構(gòu)造函數(shù)如下:

// 存放所有的映射文件
private final CopyOnWriteArrayList mappedPartitions = new CopyOnWriteArrayList();

...

/**
    * Description  默認(rèn)構(gòu)造函數(shù)
    *
    * @param storePath                      傳入的存儲文件目錄,有可能傳入 MessageStore 目錄或者 ConsumeQueue 目錄
    * @param mappedFileSize
    * @param allocateMappedPartitionService
    */
public MappedPartitionQueue(final String storePath, int mappedFileSize,
                            AllocateMappedPartitionService allocateMappedPartitionService) {
    this.storePath = storePath;
    this.mappedFileSize = mappedFileSize;
    this.allocateMappedPartitionService = allocateMappedPartitionService;
}{}

這里以 load 函數(shù)為例說明其加載過程:

/**
    * Description 加載內(nèi)存映射文件序列
    *
    * @return
    */
public boolean load() {

    // 讀取存儲路徑
    File dir = new File(this.storePath);

    // 列舉目錄下所有文件
    File[] files = dir.listFiles();

    // 如果文件不為空,則表示有必要加載
    if (files != null) {

        // 重排序
        Arrays.sort(files);

        // 遍歷所有的文件
        for (File file : files) {

            // 如果碰到某個文件尚未填滿,則返回加載完畢
            if (file.length() != this.mappedFileSize) {
                log.warning(file + "	" + file.length()
                        + " length not matched message store config value, ignore it");
                return true;
            }

            // 否則加載文件
            try {

                // 實(shí)際讀取文件
                MappedPartition mappedPartition = new MappedPartition(file.getPath(), mappedFileSize);

                // 設(shè)置當(dāng)前文件指針到文件尾
                mappedPartition.setWrotePosition(this.mappedFileSize);
                mappedPartition.setFlushedPosition(this.mappedFileSize);

                // 將文件放置到 MappedFiles 數(shù)組中
                this.mappedPartitions.add(mappedPartition);
//                    log.info("load " + file.getPath() + " OK");

            } catch (IOException e) {
                log.warning("load file " + file + " error");
                return false;
            }
        }
    }

    return true;
}
異步預(yù)創(chuàng)建文件

處于性能的考慮,MappedPartitionQueue 還會提前創(chuàng)建文件,在 getLastMappedFileOrCreate 函數(shù)中,當(dāng) allocateMappedPartitionService 存在的情況下則會調(diào)用該異步服務(wù)預(yù)創(chuàng)建文件:

/**
    * Description 根據(jù)起始偏移量查找最后一個文件
    *
    * @param startOffset
    * @return
*/
public MappedPartition getLastMappedFileOrCreate(final long startOffset) {

    ...

    // 如果有必要創(chuàng)建文件
    if (createOffset != -1) {

        // 獲取到下一個文件的路徑與文件名
        String nextFilePath = this.storePath + File.separator + FSExtra.offset2FileName(createOffset);

        // 以及下下個文件的路徑與文件名
        String nextNextFilePath = this.storePath + File.separator
                + FSExtra.offset2FileName(createOffset + this.mappedFileSize);

        // 指向待創(chuàng)建的映射文件句柄
        MappedPartition mappedPartition = null;

        // 判斷是否存在創(chuàng)建映射文件的服務(wù)
        if (this.allocateMappedPartitionService != null) {

            // 使用服務(wù)創(chuàng)建
            mappedPartition = this.allocateMappedPartitionService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
            // 進(jìn)行預(yù)熱處理
        } else {

            // 否則直接創(chuàng)建
            try {
                mappedPartition = new MappedPartition(nextFilePath, this.mappedFileSize);
            } catch (IOException e) {
                log.warning("create mappedPartition exception");
            }
        }

        ...

        return mappedPartition;
    }

    return mappedPartitionLast;
}

這里的 AllocateMappedPartitionService 則會不間斷地執(zhí)行創(chuàng)建文件的請求:

@Override
public void run() {

    ...

    // 循環(huán)執(zhí)行文件分配請求
    while (!this.isStopped() && this.mmapOperation()) {}
    ...
}


/**
    * Description 循環(huán)執(zhí)行映射文件預(yù)分配
    *
    * @Exception Only interrupted by the external thread, will return false
    */
private boolean mmapOperation() {

    ...

    // 執(zhí)行操作
    try {

        // 取出最新的執(zhí)行對象
        req = this.requestQueue.take();

        // 取得待執(zhí)行對象在請求表中的實(shí)例
        AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());

        ...

        // 判斷是否已經(jīng)存在創(chuàng)建好的對象
        if (req.getMappedPartition() == null) {

            // 記錄起始創(chuàng)建時間
            long beginTime = System.currentTimeMillis();

            // 構(gòu)建內(nèi)存映射文件對象
            MappedPartition mappedPartition = new MappedPartition(req.getFilePath(), req.getFileSize());

            ...

            // 進(jìn)行文件預(yù)熱,僅預(yù)熱 MessageStore
            if (mappedPartition.getFileSize() >= mapedFileSizeCommitLog && isWarmMappedFileEnable) {
                mappedPartition.warmMappedFile();
            }

            // 將創(chuàng)建好的對象回寫到請求中
            req.setMappedPartition(mappedPartition);

            // 異常設(shè)置為 false
            this.hasException = false;

            // 成功設(shè)置為 true
            isSuccess = true;
        }
    ...
}
異步 Flush

EmbeddedMessageQueue 中還包含了某個 flushAndUnmapPartitionServices 用于異步 Flush 文件并且完成不用映射文件的關(guān)閉操作。該服務(wù)的核心代碼如下:

private final ConcurrentLinkedQueue mappedPartitions = new ConcurrentLinkedQueue<>();

...

@Override
public void run() {

    while (!this.isStopped()) {

        int interval = 100;

        try {

            if (this.mappedPartitions.size() > 0) {

                long startTime = now();

                // 取出待處理的 MappedPartition
                MappedPartition mappedPartition = this.mappedPartitions.poll();

                // 將當(dāng)前內(nèi)容寫入到磁盤
                mappedPartition.flush(0);

                // 釋放當(dāng)前不需要使用的空間
                mappedPartition.cleanup();

                long past = now() - startTime;

//                    EmbeddedProducer.flushEclipseTime.addAndGet(past);

                if (past > 500) {
                    log.info("Flush data to disk and unmap MappedPartition costs " + past + " ms:" + mappedPartition.getFileName());
                }
            } else {
                // 定時進(jìn)行 Flush 操作
                this.waitForRunning(interval);
            }


        } catch (Throwable e) {
            log.warning(this.getServiceName() + " service has exception. ");
        }

    }

}

這里的 mappedPartitions 即是在上文介紹的當(dāng)添加消息且返回為 END_OF_FILE 時候添加進(jìn)來的。

LocalMessageQueue

源代碼參考這里

消息存儲

LocalMessageQueue 中采用了中心化的消息存儲方案,其提供的 putMessage / putMessages 函數(shù)實(shí)際上會調(diào)用內(nèi)置 MessageStore 對象的消息寫入函數(shù):

// 使用 MessageStore 進(jìn)行提交
PutMessageResult result = this.messageStore.putMessage(message);

而 MessageStore 即是存放所有真實(shí)消息的中心存儲,LocalMessageQueue 中支持更為復(fù)雜的消息屬性:

序號 消息存儲結(jié)構(gòu) 備注 長度(字節(jié)數(shù))
1 TOTALSIZE 消息大小 4
2 MAGICCODE 消息的 MAGIC CODE 4
3 BODYCRC 消息體 BODY CRC,用于重啟時校驗 4
4 QUEUEID 隊列編號,queueID 4
5 QUEUEOFFSET 自增值,不是真正的 consume queue 的偏移量,可以代表這個隊列中消息的個數(shù),要通過這個值查找到 consume queue 中數(shù)據(jù),QUEUEOFFSET * 12 才是偏移地址 8
6 PHYSICALOFFSET 消息在 commitLog 中的物理起始地址偏移量 8
7 STORETIMESTAMP 存儲時間戳 8
8 BODY 前 4 個字節(jié)存放消息體大小值,后 bodyLength 大小的空間存儲消息體內(nèi)容 4 + bodyLength
9 TOPICORQUEUENAME 前 1 個字節(jié)存放 Topic 大小,后存放 topicOrQueueNameLength 大小的主題名 1 + topicOrQueueNameLength
10 headers* 前 2 個字節(jié)(short)存放頭部大小,后存放 headersLength 大小的頭部數(shù)據(jù) 2 + headersLength
11 properties* 前 2 個字節(jié)(short)存放屬性值大小,后存放 propertiesLength 大小的屬性數(shù)據(jù) 2 + propertiesLength

其構(gòu)造函數(shù)中初始化創(chuàng)建的 MappedPartitionQueue 是按照固定大小(默認(rèn)單文件 1G)的映射文件組:

// 構(gòu)造映射文件類
this.mappedPartitionQueue = new MappedPartitionQueue(
        ((LocalMessageQueueConfig) this.messageStore.getMessageQueueConfig()).getStorePathCommitLog(),
        mapedFileSizeCommitLog,
        messageStore.getAllocateMappedPartitionService(),
        this.flushMessageStoreService
);
構(gòu)建 ConsumeQueue

不同于 EmbeddedMessageQueue,LocalMessageQueue 并沒有在初次提交消息時就直接寫入按照 Topic-queueId 劃分的存儲內(nèi);而是依賴于內(nèi)置的 PostPutMessageService :

/**
    * Description 執(zhí)行消息后操作
    */
private void doReput() {

    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

        ...

        // 讀取當(dāng)前的消息
        SelectMappedBufferResult result = this.messageStore.getMessageStore().getData(reputFromOffset);


        // 如果消息不存在,則停止當(dāng)前操作
        if (result == null) {
            doNext = false;
            continue;
        }
        try {

            // 獲取當(dāng)前消息的起始位置
            this.reputFromOffset = result.getStartOffset();

            // 順序讀取所有消息
            for (int readSize = 0; readSize < result.getSize() && doNext; ) {

                // 讀取當(dāng)前位置的消息
                PostPutMessageRequest postPutMessageRequest =
                        checkMessageAndReturnSize(result.getByteBuffer());

                int size = postPutMessageRequest.getMsgSize();

                readSpendTime.addAndGet(now() - startTime);

                startTime = now();
                // 如果處理成功
                if (postPutMessageRequest.isSuccess()) {
                    if (size > 0) {

                        // 執(zhí)行消息寫入到 ConsumeQueue 的操作
                        this.messageStore.putMessagePositionInfo(postPutMessageRequest);

                        // 修正當(dāng)前讀取的位置
                        this.reputFromOffset += size;
                        readSize += size;

                    } else if (size == 0) {
                        this.reputFromOffset = this.messageStore.getMessageStore().rollNextFile(this.reputFromOffset);
                        readSize = result.getSize();
                    }

                    putSpendTime.addAndGet(now() - startTime);

                } else if (!postPutMessageRequest.isSuccess()) {

                    ...
                }
            }

        } finally {
            result.release();
        }

    }
}

而在 putMessagePositionInfo 函數(shù)中即進(jìn)行實(shí)際的 ConsumeQueue 創(chuàng)建:

/**
    * Description 將消息的位置放置到 ConsumeQueue 中
    *
    * @param postPutMessageRequest
    */
public void putMessagePositionInfo(PostPutMessageRequest postPutMessageRequest) {

    // 尋找或者創(chuàng)建 ConsumeQueue
    ConsumeQueue cq = this.findConsumeQueue(postPutMessageRequest.getTopic(), postPutMessageRequest.getQueueId());

    // 將消息放置到 ConsumeQueue 中合適的位置
    cq.putMessagePositionInfoWrapper(postPutMessageRequest.getCommitLogOffset(), postPutMessageRequest.getMsgSize(), postPutMessageRequest.getConsumeQueueOffset());

}

/**
    * Description 根據(jù)主題與 QueueId 查找 ConsumeQueue,如果不存在則創(chuàng)建
    *
    * @param topic
    * @param queueId
    * @return
*/
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ConcurrentHashMap map = consumeQueueTable.get(topic);

    ...

    // 判斷該主題下是否存在 queueId,不存在則創(chuàng)建
    ConsumeQueue logic = map.get(queueId);

    // 如果獲取為空,則創(chuàng)建新的 ConsumeQueue
    if (null == logic) {

        ConsumeQueue newLogic = new ConsumeQueue(//
                topic, // 主題
                queueId, // queueId
                LocalMessageQueueConfig.mapedFileSizeConsumeQueue, // 映射文件尺寸
                this);


        ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);

        ...
    }

    return logic;
}

而在 ConsumeQueue 的構(gòu)造函數(shù)中完成實(shí)際的文件映射與讀取:

/**
    * Description 主要構(gòu)造函數(shù)
    *
    * @param topic
    * @param queueId
    * @param mappedFileSize
    * @param localMessageStore
    */
public ConsumeQueue(
        final String topic,
        final int queueId,
        final int mappedFileSize,
        final LocalMessageQueue localMessageStore) {

    ...

    // 當(dāng)前隊列的路徑
    String queueDir = this.storePath
            + File.separator + topic
            + File.separator + queueId;

    // 初始化內(nèi)存映射隊列
    this.mappedPartitionQueue = new MappedPartitionQueue(queueDir, mappedFileSize, null);

    this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);

}

ConsumeQueue 的文件格式則相對簡單:

// ConsumeQueue 文件內(nèi)存放的單條 Message 尺寸
// 1 | MessageStore Offset | int 8 Byte
// 2 | Size | short 8 Byte
消息拉取

在 LocalPullConsumer 拉取消息時,設(shè)置的批量拉取機(jī)制;即一次性從 LocalMessageQueue 拉取多條消息到本地,然后再批次返回給本地進(jìn)行處理(假設(shè)處理也有一定耗時)。在批次拉取的函數(shù)中,我們首先需要獲取當(dāng)前 Consumer 處理的主題與隊列編號對應(yīng)的 ConsumeQueue 是否包含數(shù)據(jù),然后再申請具體的讀取句柄并且占用該隊列:

/**
    * Description 批量抓取消息,注意,這里只進(jìn)行預(yù)抓取,僅當(dāng)消費(fèi)者真正獲取后才會修正讀取偏移量
    */
private void batchPoll() {
    // 如果是 LocalMessageQueue
    // 執(zhí)行預(yù)抓取
    LocalMessageQueue localMessageStore = (LocalMessageQueue) this.messageQueue;

    // 獲取當(dāng)前待抓取的桶名
    String bucket = bucketList.get((lastIndex) % (bucketList.size()));

    // 首先獲取待抓取的隊列和偏移
    long offsetInQueue = localMessageStore.getConsumerScheduler().queryOffsetAndLock("127.0.0.1:" + this.refId, bucket, this.getQueueId());

    // 如果當(dāng)前待抓取的 queueId 已經(jīng)被占用,則直接切換到下一個主題
    if (offsetInQueue == -2) {
        // 將當(dāng)前主題設(shè)置為 true
        this.isFinishedTable.put(bucket, true);

        // 重置當(dāng)前的 LastIndex 或者 RefOffset,即 queueId
        this.resetLastIndexOrRefOffsetWhenNotFound();

    } else {

        // 獲取到了有效的隊列偏移量之后,開始嘗試獲取消息
        consumerOffsetTable.put(bucket, new AtomicLong(offsetInQueue));

        // 設(shè)置每次最多抓一個文件內(nèi)包含的消息數(shù),等價于變相的一次性讀完,注意,這里的數(shù)目還受到單個文件尺寸的限制
        GetMessageResult getMessageResult = localMessageStore.getMessage(bucket, this.getQueueId(), this.consumerOffsetTable.get(bucket).get() + 1, mapedFileSizeConsumeQueue / ConsumeQueue.CQ_STORE_UNIT_SIZE);

        // 如果沒有找到數(shù)據(jù),則切換到下一個
        if (getMessageResult.getStatus() != GetMessageStatus.FOUND) {

            // 將當(dāng)前主題設(shè)置為 true
            this.isFinishedTable.put(bucket, true);

            this.resetLastIndexOrRefOffsetWhenNotFound();

        } else {

            // 這里不考慮 Consumer 被惡意干掉的情況,因此直接更新遠(yuǎn)端的 Offset 值
            localMessageStore.getConsumerScheduler().updateOffset("127.0.0.1:" + this.refId, bucket, this.getQueueId(), consumerOffsetTable.get(bucket).addAndGet(getMessageResult.getMessageCount()));

            // 首先從文件系統(tǒng)中一次性讀出所有的消息
            ArrayList messages = readMessagesFromGetMessageResult(getMessageResult);

            // 將消息添加到隊列中
            this.messages.addAll(messages);

            // 本次抓取成功后才開始抓取下一個
            lastIndex++;

        }
    }

}
消費(fèi)者調(diào)度

ConsumerScheduler 為我們提供了核心的消費(fèi)者調(diào)度功能,其內(nèi)置的 ConsumerOffsetManager 包含了兩個核心存儲:

// 存放映射到內(nèi)存中
private ConcurrentHashMap> offsetTable =
        new ConcurrentHashMap>(512);

// 存放某個 Topic 下面的某個 Queue 被某個 Consumer 占用的信息
private ConcurrentHashMap> queueIdOccupiedByConsumerTable =
        new ConcurrentHashMap>(512);

分別對應(yīng)了某個 ConsumeQueue 被消費(fèi)的進(jìn)度和被消費(fèi)者的占用信息。同時 ConsumerOffsetManager 還提供了基于 JSON 格式的持久化功能,并且通過 ConsumerScheduler 中的定期服務(wù) scheduledExecutorService 進(jìn)行自動定期持久化。在消息提交階段,LocalMessageQueue 會自動調(diào)用 updateOffset 函數(shù)更初始化某個 ConsumeQueue 的偏移情況(在恢復(fù)時也會使用):

public void updateOffset(final String topic, final int queueId, final long offset) {

    this.consumerOffsetManager.commitOffset("Broker Inner", topic, queueId, offset);

}

而某個 Consumer 在初次拉取時,會調(diào)用 queryOffsetAndLock 函數(shù)來查詢某個 ConsumeQueue 的可拉取情況:

/**
    * Description 修正某個 ConsumerOffset 隊列中的值
    *
    * @param topic
    * @param queueId
    * @return
    */
public long queryOffsetAndLock(final String clientHostAndPort, final String topic, final int queueId) {

    String key = topic;

    // 首先判斷該 Topic-queueId 是否被占用
    if (this.queueIdOccupiedByConsumerTable.containsKey(topic)) {

        ...
    }

    // 如果沒有被占用,則此時宣告占用
    ConcurrentHashMap consumerQueueIdMap = this.queueIdOccupiedByConsumerTable.get(key);

    ...

    // 真實(shí)進(jìn)行查找操作
    ConcurrentHashMap map = this.offsetTable.get(key);
    if (null != map) {
        Long offset = map.get(queueId);
        if (offset != null)
            return offset;
    }

    // 默認(rèn)返回值為 -1
    return -1;
}

并且在拉取完畢后調(diào)用 updateOffset 函數(shù)來更新拉取進(jìn)度。

消息讀取

在某個 Consumer 通過 ConsumerManager 獲取可用的拉取偏移量之后,即從 LocalMessageQueue 中進(jìn)行真實(shí)地消息讀取操作:

/**
    * Description Consumer 從存儲中讀取數(shù)據(jù)的接口
    *
    * @param topic
    * @param queueId
    * @param offset     下一個開始抓取的起始下標(biāo)
    * @param maxMsgNums
    * @return
    */
public GetMessageResult getMessage(final String topic, final int queueId, final long offset, final int maxMsgNums) {

        ...

        // 根據(jù) Topic 與 queueId 構(gòu)建消費(fèi)者隊列
        ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);


        // 保證當(dāng)前 ConsumeQueue 存在
        if (consumeQueue != null) {

            // 獲取當(dāng)前 ConsumeQueue 中包含的最小的消息在 MessageStore 中的位移
            minOffset = consumeQueue.getMinOffsetInQueue();

            // 注意,最大的位移地址即是不可達(dá)地址,是當(dāng)前所有消息的下一個消息的下標(biāo)
            maxOffset = consumeQueue.getMaxOffsetInQueue();

            // 如果 maxOffset 為零,則表示沒有可用消息
            if (maxOffset == 0) {
                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                nextBeginOffset = 0;
            } else if (offset < minOffset) {
                status = GetMessageStatus.OFFSET_TOO_SMALL;
                nextBeginOffset = minOffset;
            } else if (offset == maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                nextBeginOffset = offset;
            } else if (offset > maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                if (0 == minOffset) {
                    nextBeginOffset = minOffset;
                } else {
                    nextBeginOffset = maxOffset;
                }
            } else {

                // 根據(jù)偏移量獲取當(dāng)前 ConsumeQueue 的緩存
                SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);

                if (bufferConsumeQueue != null) {
                    try {
                        status = GetMessageStatus.NO_MATCHED_MESSAGE;

                        long nextPhyFileStartOffset = Long.MIN_VALUE;
                        long maxPhyOffsetPulling = 0;

                        int i = 0;

                        // 設(shè)置每次獲取的最大消息數(shù)
                        final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);

                        // 遍歷所有的 Consume Queue 中的消息指針
                        for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt();

                            maxPhyOffsetPulling = offsetPy;

                            if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                                if (offsetPy < nextPhyFileStartOffset)
                                    continue;
                            }

                            boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);

                            if (isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
                                    isInDisk)) {
                                break;
                            }

                            // 從 MessageStore 中獲取消息
                            SelectMappedBufferResult selectResult = this.messageStore.getMessage(offsetPy, sizePy);

                            // 如果沒有獲取到數(shù)據(jù),則切換到下一個文件繼續(xù)
                            if (null == selectResult) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                }

                                nextPhyFileStartOffset = this.messageStore.rollNextFile(offsetPy);
                                continue;
                            }

                            // 如果獲取到了,則返回結(jié)果
                            getResult.addMessage(selectResult);
                            status = GetMessageStatus.FOUND;
                            nextPhyFileStartOffset = Long.MIN_VALUE;
                        }

                        nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                        long diff = maxOffsetPy - maxPhyOffsetPulling;

                        // 獲取當(dāng)前內(nèi)存情況
                        long memory = (long) (getTotalPhysicalMemorySize()
                                * (LocalMessageQueueConfig.accessMessageInMemoryMaxRatio / 100.0));

                        getResult.setSuggestPullingFromSlave(diff > memory);

                    } finally {

                        bufferConsumeQueue.release();
                    }
                } else {
                    status = GetMessageStatus.OFFSET_FOUND_NULL;
                    nextBeginOffset = consumeQueue.rollNextFile(offset);
                    log.warning("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
                            + maxOffset + ", but access logic queue failed.");
                }
            }
        } else {
            ...
        }

        ...

}

注意,這里返回的其實(shí)只是消息在 MessageStore 中的存放地址,真實(shí)地消息讀取還需要通過 readMessagesFromGetMessageResult 函數(shù):

/**
    * Description 從 GetMessageResult 中抓取全部的消息
    *
    * @param getMessageResult
    * @return
    */
public static ArrayList readMessagesFromGetMessageResult(final GetMessageResult getMessageResult) {

    ArrayList messages = new ArrayList<>();

    try {
        List messageBufferList = getMessageResult.getMessageBufferList();
        for (ByteBuffer bb : messageBufferList) {

            messages.add(readMessageFromByteBuffer(bb));
        }
    } finally {
        getMessageResult.release();
    }

    // 獲取字節(jié)數(shù)組

    return messages;
}

/**
    * Description 從輸入的 ByteBuffer 中反序列化消息對象
    *
    * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure
    */
public static DefaultBytesMessage readMessageFromByteBuffer(java.nio.ByteBuffer byteBuffer) {

    // 1 TOTAL SIZE
    int totalSize = byteBuffer.getInt();

    // 2 MAGIC CODE
    int magicCode = byteBuffer.getInt();

    switch (magicCode) {
        case MESSAGE_MAGIC_CODE:
            break;
        case BLANK_MAGIC_CODE:
            return null;
        default:
            log.warning("found a illegal magic code 0x" + Integer.toHexString(magicCode));
            return null;
    }

    byte[] bytesContent = new byte[totalSize];

    ...


}
后記

端午前后即已停止代碼編寫,原以為周把時間可以完成文檔編寫;可惜畢業(yè)旅行和畢業(yè)聚會一直拖到了七月,最后也是匆匆寫完,也是我個人拖延癌晚期,不由感慨啊。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/67284.html

相關(guān)文章

  • 高并發(fā)異步解耦利器:RocketMQ究竟強(qiáng)在哪里?

    摘要:它是阿里巴巴于年開源的第三代分布式消息中間件。是一個分布式消息中間件,具有低延遲高性能和可靠性萬億級別的容量和靈活的可擴(kuò)展性,它是阿里巴巴于年開源的第三代分布式消息中間件。上篇文章消息隊列那么多,為什么建議深入了解下RabbitMQ?我們講到了消息隊列的發(fā)展史:并且詳細(xì)介紹了RabbitMQ,其功能也是挺強(qiáng)大的,那么,為啥又要搞一個RocketMQ出來呢?是重復(fù)造輪子嗎?本文我們就帶大家來詳...

    tainzhi 評論0 收藏0
  • java篇

    摘要:多線程編程這篇文章分析了多線程的優(yōu)缺點(diǎn),如何創(chuàng)建多線程,分享了線程安全和線程通信線程池等等一些知識。 中間件技術(shù)入門教程 中間件技術(shù)入門教程,本博客介紹了 ESB、MQ、JMS 的一些知識... SpringBoot 多數(shù)據(jù)源 SpringBoot 使用主從數(shù)據(jù)源 簡易的后臺管理權(quán)限設(shè)計 從零開始搭建自己權(quán)限管理框架 Docker 多步構(gòu)建更小的 Java 鏡像 Docker Jav...

    honhon 評論0 收藏0
  • rocketmq簡介

    摘要:和之間的關(guān)系通過來綁定,來定義,即相同的,等于表示節(jié)點(diǎn),非表示節(jié)點(diǎn)。所有的節(jié)點(diǎn)與集群的所有節(jié)點(diǎn)保持長連接,定時注冊信息到所有的。對磁盤的訪問串行化,避免磁盤竟?fàn)帲粫驗殛犃性黾訉?dǎo)致增高。要保證與完全的一致,增加了編程的復(fù)雜度。 Apache RocketMQ?是一個開源的分布式消息和流數(shù)據(jù)平臺。 1、既然是消息系統(tǒng),最核心的功能就是要提供消息的發(fā)布與訂閱功能,最簡單的概念模型如下: ...

    chnmagnus 評論0 收藏0

發(fā)表評論

0條評論

kel

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<