摘要:最近查看文檔發(fā)現(xiàn)有個功能是我們之前沒有留意到的但是有著很高的潛在實(shí)用價值什么是中的每一條數(shù)據(jù)都有一對和數(shù)據(jù)存放在磁盤上一般不會被永久保留而是在到達(dá)一定的量或者時間后對最早寫入的數(shù)據(jù)進(jìn)行刪除在默認(rèn)的刪除規(guī)則之外提供了另一種刪除過時數(shù)據(jù)或者說保
最近查看Kafka文檔, 發(fā)現(xiàn) Kafka 有個 Log Compaction 功能是我們之前沒有留意到的, 但是有著很高的潛在實(shí)用價值.
什么是Log CompactionKafka 中的每一條數(shù)據(jù)都有一對 Key 和 Value, 數(shù)據(jù)存放在磁盤上, 一般不會被永久保留, 而是在到達(dá)一定的量或者時間后對最早寫入的數(shù)據(jù)進(jìn)行刪除. Log Compaction 在默認(rèn)的刪除規(guī)則之外提供了另一種刪除過時數(shù)據(jù)(或者說保留有價值的數(shù)據(jù))的方式, 就是對于有相同 Key 的不同數(shù)據(jù), 只保留最后一條, 前面的數(shù)據(jù)在合適的情況下刪除.
Log Compaction 的應(yīng)用場景Log Compaction 特性, 就實(shí)時計(jì)算而言, 可以在災(zāi)難恢復(fù)方面有很好地應(yīng)用場景. 比如說我們在 Storm 里做計(jì)算時, 需要長期在內(nèi)存里維護(hù)一些數(shù)據(jù), 這些數(shù)據(jù)可能是通過聚合了一天或者一周的日志得到的, 這些數(shù)據(jù)一旦由于偶然的原因(磁盤,網(wǎng)絡(luò)等)崩潰了, 從頭開始計(jì)算需要漫長的時間.一個可行的應(yīng)對方法是定時將內(nèi)存里的數(shù)據(jù)備份到外部存儲中, 比如 Redis 或者 Mysql 等, 當(dāng)崩潰發(fā)生的時候再從外部存儲讀回來繼續(xù)計(jì)算.
使用 Log Compaction 來代替這些外部存儲有以下好處.
Kafka 既是數(shù)據(jù)源又是存儲工具, 可以簡化技術(shù)棧, 降低維護(hù)成本.
使用 Mysql 或者 Redis 作為外部存儲的話, 需要將存儲的 Key 記錄下來, 恢復(fù)時再用這些 Key 將數(shù)據(jù)取回, 實(shí)現(xiàn)起來有一定的工程復(fù)雜度. 用Log Compaction 特性的話只要把數(shù)據(jù)一股腦兒地寫進(jìn) Kafka, 等災(zāi)難恢復(fù)的時候再讀回內(nèi)存就行了.
Kafka 針對磁盤讀寫都有很高的順序性, 相對于 Mysql 沒有索引查詢等工作量的負(fù)擔(dān), 可以實(shí)現(xiàn)高性能, 相對于 Redis 而言, 它可以充分利用廉價的磁盤而對內(nèi)存要求很低, 在接近的性能下能實(shí)現(xiàn)非常高的性價比(僅僅針對災(zāi)難恢復(fù)這個場景而言).
實(shí)現(xiàn)方式的簡要介紹當(dāng) topic 的 cleanup.policy (默認(rèn)為delete) 設(shè)置為 compact 時, Kafka 的后臺線程會定時把 topic 遍歷兩次, 第一次把每個 key 的哈希值最后一次出現(xiàn)的 offset 都存下來, 第二次檢查每個 offset 對應(yīng)的 key 是否在更后面的日志中出現(xiàn)過,如果出現(xiàn)了就刪除對應(yīng)的日志.
源碼解析Log Compaction 的大部分功能由CleanerThread完成, 核心邏輯在 Cleaner 的 clean方法
/** * Clean the given log * * @param cleanable The log to be cleaned * * @return The first offset not cleaned and the statistics for this round of cleaning */ private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = { val stats = new CleanerStats() info("Beginning cleaning of log %s.".format(cleanable.log.name)) val log = cleanable.log // build the offset map info("Building offset map for %s...".format(cleanable.log.name)) val upperBoundOffset = cleanable.firstUncleanableOffset buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap, stats) // <----- 這里第一次遍歷所有offset將key索引 val endOffset = offsetMap.latestOffset + 1 stats.indexDone() // figure out the timestamp below which it is safe to remove delete tombstones // this position is defined to be a configurable time beneath the last modified time of the last clean segment val deleteHorizonMs = log.logSegments(0, cleanable.firstDirtyOffset).lastOption match { case None => 0L case Some(seg) => seg.lastModified - log.config.deleteRetentionMs } // determine the timestamp up to which the log will be cleaned // this is the lower of the last active segment and the compaction lag val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L) // group the segments and clean the groups info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs))) for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize)) cleanSegments(log, group, offsetMap, deleteHorizonMs, stats) // <-- 這里第二次遍歷所有offset,刪除冗余的日志,并且將多個小的segment合并為一個 // record buffer utilization stats.bufferUtilization = offsetMap.utilization stats.allDone() (endOffset, stats) }
log compaction 通過兩次遍歷所有數(shù)據(jù)來實(shí)現(xiàn), 兩次遍歷之間交流的媒介就是一個
OffsetMap, 下面是OffsetMap的簽名
trait OffsetMap { def slots: Int def put(key: ByteBuffer, offset: Long) def get(key: ByteBuffer): Long def clear() def size: Int def utilization: Double = size.toDouble / slots def latestOffset: Long }
這基本就是個普通的mutable map, 在 Kafka 項(xiàng)目中,它的實(shí)現(xiàn)只有一個, 叫做SkimpyOffsetMap
put方法put 方法會為每個 key 生成一份摘要,默認(rèn)使用 md5 方法生成一個 16byte 的摘要, 根據(jù)這個摘要在 bytes 中哈希的到一個下標(biāo), 如果這個下標(biāo)已經(jīng)被別的摘要占據(jù), 則線性查找到下個空余的下標(biāo)為止, 然后在對應(yīng)位置插入該 key 對應(yīng)的 offset
/** * Associate this offset to the given key. * @param key The key * @param offset The offset */ override def put(key: ByteBuffer, offset: Long) { require(entries < slots, "Attempt to add a new entry to a full offset map.") lookups += 1 hashInto(key, hash1) // probe until we find the first empty slot var attempt = 0 var pos = positionOf(hash1, attempt) while(!isEmpty(pos)) { bytes.position(pos) bytes.get(hash2) if(Arrays.equals(hash1, hash2)) { // we found an existing entry, overwrite it and return (size does not change) bytes.putLong(offset) lastOffset = offset return } attempt += 1 pos = positionOf(hash1, attempt) } // found an empty slot, update it--size grows by 1 bytes.position(pos) bytes.put(hash1) bytes.putLong(offset) lastOffset = offset entries += 1 }get方法
get 方法使用和 put 同樣的摘要算法獲得 key 的摘要, 通過摘要獲得 offset 的存儲位置
/** * Get the offset associated with this key. * @param key The key * @return The offset associated with this key or -1 if the key is not found */ override def get(key: ByteBuffer): Long = { lookups += 1 hashInto(key, hash1) // search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot var attempt = 0 var pos = 0 //we need to guard against attempt integer overflow if the map is full //limit attempt to number of slots once positionOf(..) enters linear search mode val maxAttempts = slots + hashSize - 4 do { if(attempt >= maxAttempts) return -1L pos = positionOf(hash1, attempt) bytes.position(pos) if(isEmpty(pos)) return -1L bytes.get(hash2) attempt += 1 } while(!Arrays.equals(hash1, hash2)) bytes.getLong() }可能的空間問題 性能問題 沖突問題 空間問題
默認(rèn)情況下, Kafka 用 16 個 byte 存放key的摘要, 用 8 個 byte 存放摘要對應(yīng)的 offset, 1GB 的空間可以保存 1024* 1024*1024 / 24 = 44,739,242.666... 個 key 對應(yīng)的數(shù)據(jù).
性能問題這個 log compaction 的原理挺簡單, 就是定期把所有日志讀兩遍,寫一遍, cpu 的速度超過磁盤完全不是問題, 只要日志的量對應(yīng)的讀兩遍寫一遍的時間在可接受的范圍內(nèi), 它的性能就是可以接受的.
沖突問題現(xiàn)在的 OffsetMap 唯一的實(shí)現(xiàn)名字叫做 SkimpyOffsetMap, 相信你們已經(jīng)從這個名字里看出端倪, 最初的作者本身也認(rèn)為這樣的實(shí)現(xiàn)不夠嚴(yán)謹(jǐn). 這個算法在兩個 key 的 md5 值相同的情況下就判斷 key 是相同的, 如果遇到了 key 不同而 md5 值相同的情況, 那兩個 key 中其中一個的消息就丟失了. 雖然 md5 值相同的概率很低, 但如果真的碰上了, 那就是100%, 概率值再低也沒用, 而且從網(wǎng)上的反映看似乎沖突還不少見.
我個人目前想到的處理方案是, 大部分的 key 總長度并不算長, 可以把這類 key 所有可能的情況都md5一遍看一下是否有沖突, 如果沒有的話就放心用.
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/66390.html
摘要:所以消息可以重復(fù)的放入不同的隊(duì)列中。而是對于消息來說的,在其發(fā)送消息到交換器時,需指定。與發(fā)布訂閱模式的相同點(diǎn)是可以將消息重復(fù)發(fā)送。它需要處理低延遲的傳遞,用于支持傳統(tǒng)的消息傳遞系統(tǒng)用例。 理解概念的一個方法 之前說過學(xué)習(xí)一個新的東西,最核心的就是掌握概念。而如何掌握概念呢?我的其中一個方法就是對比,把相似且模糊不清的兩個概念進(jìn)行對比,這樣就理解更快。 RabbitMQ模式 Rabbi...
閱讀 3711·2023-04-25 22:43
閱讀 3707·2021-09-06 15:15
閱讀 1332·2019-08-30 15:54
閱讀 3543·2019-08-30 14:20
閱讀 2884·2019-08-29 17:16
閱讀 3117·2019-08-29 15:28
閱讀 3397·2019-08-29 11:08
閱讀 1071·2019-08-28 18:05