摘要:執行將異常后的刪除。根據元素個數上限進行清理的策略思路在新緩存值的時候比對下是否緩存容量元素個數已經達到上限,如果達到上限按照算法進行淘汰元素。在讀寫完成后會進行通知源碼會回調進行緩存元素刪除后置處理。
Google Guava LocalLoadingCache 前言
在我們編程的過程中會遇到一些在程序中需要重試使用的數據,在這種情況下我們就可以考慮利用緩存(內存)的優勢來提供程序訪問這些數據的一個性能了。利用了緩存可以在一定程度上緩解很大的性能消耗:
網絡傳輸開銷
數據序列化反序列話
數據庫、文件系統數據訪問慢
緩存器是利用內存進行數據存儲的,在存儲容量上有一定的限制,所以我們在我們使用緩存的時候也分兩種場景:
全量數據緩存
緩存熱數據,這也是基于緩存容量的一個考慮
好了本篇我們就來聊聊寫程序過程中常能用到的本地緩存的方式。
JDK提供的數據結構(Map)緩存數據的存儲格式一般都是以Key-Value的方式,那這里我們主要來討論下Map的實現ConcurrentHashMap實現的緩存。
String key = StringUtils.EMPTY; ConcurrentMaplocalCache = new ConcurrentHashMap(); if(StringUtils.isEmpty(localCache.get(key))) { String value = queryFromDB(key); localCache.put(key,value); return value; } return localCache.get(key);
這樣就能構造一個非常簡單的緩存。
注意:這個緩存還是有非常多的問題
沒有一個清除緩存的策略,最終所有被訪問過得數據都會全量給緩存起來,直到顯式清除。
同時緩存沒命中的情況下需要應用顯式去加載(queryFromDB )。
LocalLoadingCache好了主角要登場了,先簡單介紹下這個cache的一些用法,這個cache比較好的解決了我上面提到通過Map用作緩存的兩個缺陷。
用法LoadingCachegraphs = CacheBuilder.newBuilder() .maximumSize(10000) .expireAfterWrite(10, TimeUnit.MINUTES) .removalListener(MY_LISTENER) .build( new CacheLoader () { public Graph load(Key key) throws AnyException { return createExpensiveGraph(key); } });
通過這種方式一個緩存就已經創建好了,上面定義的load函數在緩存中不存在key對應的value的時候會去執行將數據load放到緩存中。
其底層存儲采用基于數組的java.util.concurrent.atomic.AtomicReferenceArray進行緩存元素的存取。
load如何被加載先分析下load函數是怎么被執行的:graphs.getUnchecked(new Key());從緩存中獲取數據,如果沒有進行put操作,首次get的時候緩存中沒有其緩存值,這個時候必然要觸發load函數進行value load了,那我們就從get函數進行深入分析(分析源碼基于16.0.1)。
com.google.common.cache.LocalCache.Segment#get(K, int, com.google.common.cache.CacheLoader super K,V>) V get(K key, int hash, CacheLoader super K, V> loader) throws ExecutionException { checkNotNull(key); checkNotNull(loader); try { if (count != 0) { // read-volatile // don"t call getLiveEntry, which would ignore loading values ReferenceEntrye = getEntry(key, hash); if (e != null) { long now = map.ticker.read(); V value = getLiveValue(e, now); if (value != null) { recordRead(e, now); statsCounter.recordHits(1); return scheduleRefresh(e, key, hash, value, now, loader); } ValueReference valueReference = e.getValueReference(); if (valueReference.isLoading()) { return waitForLoadingValue(e, key, valueReference); } } } // at this point e is either null or expired; return lockedGetOrLoad(key, hash, loader); } catch (ExecutionException ee) { Throwable cause = ee.getCause(); if (cause instanceof Error) { throw new ExecutionError((Error) cause); } else if (cause instanceof RuntimeException) { throw new UncheckedExecutionException(cause); } throw ee; } finally { postReadCleanup(); } }
首次調用會執行lockedGetOrLoad函數
V lockedGetOrLoad(K key, int hash, CacheLoader super K, V> loader) throws ExecutionException { ReferenceEntrye; ValueReference valueReference = null; LoadingValueReference loadingValueReference = null; boolean createNewEntry = true; lock(); try { // re-read ticker once inside the lock long now = map.ticker.read(); preWriteCleanup(now); int newCount = this.count - 1; AtomicReferenceArray > table = this.table; int index = hash & (table.length() - 1); ReferenceEntry first = table.get(index); for (e = first; e != null; e = e.getNext()) { K entryKey = e.getKey(); if (e.getHash() == hash && entryKey != null && map.keyEquivalence.equivalent(key, entryKey)) { valueReference = e.getValueReference(); if (valueReference.isLoading()) { createNewEntry = false; } else { V value = valueReference.get(); if (value == null) { enqueueNotification(entryKey, hash, valueReference, RemovalCause.COLLECTED); } else if (map.isExpired(e, now)) { // This is a duplicate check, as preWriteCleanup already purged expired // entries, but let"s accomodate an incorrect expiration queue. enqueueNotification(entryKey, hash, valueReference, RemovalCause.EXPIRED); } else { recordLockedRead(e, now); statsCounter.recordHits(1); // we were concurrent with loading; don"t consider refresh return value; } // immediately reuse invalid entries writeQueue.remove(e); accessQueue.remove(e); this.count = newCount; // write-volatile } break; } } if (createNewEntry) { loadingValueReference = new LoadingValueReference (); if (e == null) { e = newEntry(key, hash, first); e.setValueReference(loadingValueReference); table.set(index, e); } else { e.setValueReference(loadingValueReference); } } } finally { unlock(); postWriteCleanup(); } if (createNewEntry) { try { // Synchronizes on the entry to allow failing fast when a recursive load is // detected. This may be circumvented when an entry is copied, but will fail fast most // of the time. synchronized (e) { return loadSync(key, hash, loadingValueReference, loader); } } finally { statsCounter.recordMisses(1); } } else { // The entry already exists. Wait for loading. return waitForLoadingValue(e, key, valueReference); } }
最后調用loadSync(key, hash, loadingValueReference, loader);進行進行數據load。
public ListenableFutureloadFuture(K key, CacheLoader super K, V> loader) { stopwatch.start(); V previousValue = oldValue.get(); try { if (previousValue == null) { V newValue = loader.load(key); return set(newValue) ? futureValue : Futures.immediateFuture(newValue); } ListenableFuture newValue = loader.reload(key, previousValue); if (newValue == null) { return Futures.immediateFuture(null); } // To avoid a race, make sure the refreshed value is set into loadingValueReference // *before* returning newValue from the cache query. return Futures.transform(newValue, new Function () { @Override public V apply(V newValue) { LoadingValueReference.this.set(newValue); return newValue; } }); } catch (Throwable t) { if (t instanceof InterruptedException) { Thread.currentThread().interrupt(); } return setException(t) ? futureValue : fullyFailedFuture(t); } }
執行loader.load將數據load進緩存,可能你會想如果這個時候從DB或其他非內存存儲中也沒找到數據,這個時候LocalLoadingCache是怎么處理的呢?其實在這種情況下只需要throw異常信息就好,這樣LocalLoadingCache會放棄緩存。
但是讀源代碼細心的你可能會發現在lockedGetOrLoad中會先newEntry后面才load
if (createNewEntry) { loadingValueReference = new LoadingValueReference(); if (e == null) { e = newEntry(key, hash, first); e.setValueReference(loadingValueReference); table.set(index, e); } else { e.setValueReference(loadingValueReference); } } finally { unlock(); postWriteCleanup(); } if (createNewEntry) { try { // Synchronizes on the entry to allow failing fast when a recursive load is // detected. This may be circumvented when an entry is copied, but will fail fast most // of the time. synchronized (e) { return loadSync(key, hash, loadingValueReference, loader); } } finally { statsCounter.recordMisses(1); } } else { // The entry already exists. Wait for loading. return waitForLoadingValue(e, key, valueReference); }
其實實現很簡單他在cache到異常信息后又會對緩存中的entry進行remove操作,當時找這段異常被cache的代碼也是找了很久時間了。
com.google.common.cache.LocalCache.Segment#getAndRecordStats V getAndRecordStats(K key, int hash, LoadingValueReferenceloadingValueReference, ListenableFuture newValue) throws ExecutionException { V value = null; try { value = getUninterruptibly(newValue); if (value == null) { throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + "."); } statsCounter.recordLoadSuccess(loadingValueReference.elapsedNanos()); storeLoadedValue(key, hash, loadingValueReference, value); return value; } finally { if (value == null) { statsCounter.recordLoadException(loadingValueReference.elapsedNanos()); removeLoadingValue(key, hash, loadingValueReference); } } }
執行removeLoadingValue將load異常后的key刪除。
緩存策略從用法那小結可以看到我們在創建緩存的時候除了load還有一些其他特性如下:
maximumSize(10000) expireAfterWrite(10, TimeUnit.MINUTES)
這又是什么意思呢?這其實就是LocalLoadingCache提供的緩存策略。
maximumSize(10000) 設置緩存能保存的最多元素數量。
expireAfterWrite(10, TimeUnit.MINUTES) 設置元素在寫后多久進行銷毀。
其實還有maximumWeight、expireAfterAccess兩種元素過期策略。
maximumSize是maximumWeight的一種特殊形式,將所有的元素設置weight為1,也即就轉化為能存儲元素個數的上限值了。
expireAfterAccess和expireAfterWrite基本就一個意思,只是內部用了兩種不同的計數方式(通過不同的queue進行管理,被訪問/修改進行入隊操作)進行訪問、寫操作的記錄。
不多說讓源碼說話。
根據過期時間進行緩存的淘汰策略思路:在進行get/put操作完成后對隊列(每次對緩存的操作頭會被其記錄下來)進行一次遍歷,然后按照過期時間淘汰過期的元素。
根據元素個數上限進行清理的策略思路:在load新緩存值的時候比對下是否緩存容量(元素個數)已經達到上限,如果達到上限按照LRU算法進行淘汰元素。
過期時間淘汰策略
從分析load那小結我們已經展示過get的代碼,其中最后finally中有段postReadCleanup();方法,深入下去方法體就不然看出:
@GuardedBy("Segment.this") void expireEntries(long now) { drainRecencyQueue(); ReferenceEntrye; while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) { if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) { throw new AssertionError(); } } while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) { if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) { throw new AssertionError(); } } }
進行過期key清除策略,從這段代碼也能看出我為什么說expireAfterAccess和expireAfterWrite基本就一個意思了吧。
其實還有一種清除緩存的策略:基于引用的回收但是還沒研究清除不便多說,這個策略清除的時機和過期時間策略一樣。
@GuardedBy("Segment.this") void drainReferenceQueues() { if (map.usesKeyReferences()) { drainKeyReferenceQueue(); } if (map.usesValueReferences()) { drainValueReferenceQueue(); } }
容量回收策略
在新key對應的value load完后需要將value存放到緩存中去,插入完成后會進行容量的check如果超過容量限制會執行淘汰策略。對應源碼:
com.google.common.cache.LocalCache.Segment#storeLoadedValue boolean storeLoadedValue(K key, int hash, LoadingValueReferenceoldValueReference, V newValue) { lock(); try { long now = map.ticker.read(); preWriteCleanup(now); int newCount = this.count + 1; if (newCount > this.threshold) { // ensure capacity expand(); newCount = this.count + 1; } AtomicReferenceArray > table = this.table; int index = hash & (table.length() - 1); ReferenceEntry first = table.get(index); for (ReferenceEntry e = first; e != null; e = e.getNext()) { K entryKey = e.getKey(); if (e.getHash() == hash && entryKey != null && map.keyEquivalence.equivalent(key, entryKey)) { ValueReference valueReference = e.getValueReference(); V entryValue = valueReference.get(); // replace the old LoadingValueReference if it"s live, otherwise // perform a putIfAbsent if (oldValueReference == valueReference || (entryValue == null && valueReference != UNSET)) { ++modCount; if (oldValueReference.isActive()) { RemovalCause cause = (entryValue == null) ? RemovalCause.COLLECTED : RemovalCause.REPLACED; enqueueNotification(key, hash, oldValueReference, cause); newCount--; } setValue(e, key, newValue, now); this.count = newCount; // write-volatile evictEntries(); return true; } // the loaded value was already clobbered valueReference = new WeightedStrongValueReference (newValue, 0); enqueueNotification(key, hash, valueReference, RemovalCause.REPLACED); return false; } } ++modCount; ReferenceEntry newEntry = newEntry(key, hash, first); setValue(newEntry, key, newValue, now); table.set(index, newEntry); this.count = newCount; // write-volatile evictEntries(); return true; } finally { unlock(); postWriteCleanup(); } }
上面的存儲操作最終在進行setValue后會執行:
com.google.common.cache.LocalCache.Segment#evictEntries @GuardedBy("Segment.this") void evictEntries() { if (!map.evictsBySize()) { return; } drainRecencyQueue(); while (totalWeight > maxSegmentWeight) { ReferenceEntrye = getNextEvictable(); if (!removeEntry(e, e.getHash(), RemovalCause.SIZE)) { throw new AssertionError(); } } } // TODO(fry): instead implement this with an eviction head ReferenceEntry getNextEvictable() { for (ReferenceEntry e : accessQueue) { int weight = e.getValueReference().getWeight(); if (weight > 0) { return e; } } throw new AssertionError(); }
這里最終會根據LRU從緩存中將最近沒有使用過的元素進行剔除操作。
最后說下removalListener在LocalLoadingCache中提供了在元素被移除的時候供應用進行回調的函數,這個函數通過removalListener進行注冊,當有元素從緩存中淘汰后就會觸發其進行調用。
接著上面移除元素進行分析函數removeEntry
@GuardedBy("Segment.this") boolean removeEntry(ReferenceEntryentry, int hash, RemovalCause cause) { int newCount = this.count - 1; AtomicReferenceArray > table = this.table; int index = hash & (table.length() - 1); ReferenceEntry first = table.get(index); for (ReferenceEntry e = first; e != null; e = e.getNext()) { if (e == entry) { ++modCount; ReferenceEntry newFirst = removeValueFromChain( first, e, e.getKey(), hash, e.getValueReference(), cause); newCount = this.count - 1; table.set(index, newFirst); this.count = newCount; // write-volatile return true; } } return false; }
最終會調用
@GuardedBy("Segment.this") void enqueueNotification(@Nullable K key, int hash, ValueReferencevalueReference, RemovalCause cause) { totalWeight -= valueReference.getWeight(); if (cause.wasEvicted()) { statsCounter.recordEviction(); } if (map.removalNotificationQueue != DISCARDING_QUEUE) { V value = valueReference.get(); RemovalNotification notification = new RemovalNotification (key, value, cause); map.removalNotificationQueue.offer(notification); } }
將建立一個RemovalNotification隊列進行保存刪除元素。
在讀/寫完成后會進行通知
com.google.common.cache.LocalCache.Segment#postWriteCleanup /** * Performs routine cleanup following a write. */ void postWriteCleanup() { runUnlockedCleanup(); } void cleanUp() { long now = map.ticker.read(); runLockedCleanup(now); runUnlockedCleanup(); }
runUnlockedCleanup源碼會回調com.google.common.cache.RemovalListener#onRemoval進行緩存元素刪除后置處理。
void processPendingNotifications() { RemovalNotification最后類圖一張notification; while ((notification = removalNotificationQueue.poll()) != null) { try { removalListener.onRemoval(notification); } catch (Throwable e) { logger.log(Level.WARNING, "Exception thrown by removal listener", e); } } }
覺得圖不夠清晰可以點擊查看大圖。
本篇也主要是對LocalLoadingCache從運用這個層次更向前走了一步,對我們使用過程其邏輯背后的實現進行了一定深入分析。我在初次看到這個方式也是很疑惑其底層到底是如何實現的,于是有了這篇文章,通過源碼進行跟蹤分析其背后的實現邏輯。
后面還會分析org.springframework.cache.guava.GuavaCacheManager如何將GuavaCache進行管理的,通過和spring更好的結合而消除顯式調用cache get/put的方式。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/67469.html
摘要:執行將異常后的刪除。根據元素個數上限進行清理的策略思路在新緩存值的時候比對下是否緩存容量元素個數已經達到上限,如果達到上限按照算法進行淘汰元素。在讀寫完成后會進行通知源碼會回調進行緩存元素刪除后置處理。 Google Guava LocalLoadingCache 前言 在我們編程的過程中會遇到一些在程序中需要重試使用的數據,在這種情況下我們就可以考慮利用緩存(內存)的優勢來提供程序訪...
摘要:并且添加了監聽器,當數據被刪除后會打印日志。六總結回顧緩存加載顯示插入緩存回收,定時,,軟弱引用,顯示刪除接口方法,監聽器清理緩存時間只有在獲取數據時才或清理緩存,使用者可以單起線程采用方法主動清理。 摘要: 學習Google內部使用的工具包Guava,在Java項目中輕松地增加緩存,提高程序獲取數據的效率。 一、什么是緩存? 根據科普中國的定義,緩存就是數據交換的緩沖區(稱作Cach...
摘要:并且添加了監聽器,當數據被刪除后會打印日志。六總結回顧緩存加載顯示插入緩存回收,定時,,軟弱引用,顯示刪除接口方法,監聽器清理緩存時間只有在獲取數據時才或清理緩存,使用者可以單起線程采用方法主動清理。 摘要: 學習Google內部使用的工具包Guava,在Java項目中輕松地增加緩存,提高程序獲取數據的效率。一、什么是緩存?根據科普中國的定義,緩存就是數據交換的緩沖區(稱作Cache)...
摘要:依賴這里使用配置配置文件配置配置文件配置使用 maven依賴 org.springframework.boot spring-boot-starter-cache com.google.guava guava 19...
摘要:的配置文件,使用前綴的屬性進行配置。在方法的調用前并不會檢查緩存,方法始終都會被調用。手動使用在實際開發過程中,存在不使用注解,需要自己添加緩存的情況。如果該屬性值為,則表示對象可以無限期地存在于緩存中。 SpringBoot在annotation的層面實現了數據緩存的功能,基于Spring的AOP技術。所有的緩存配置只是在annotation層面配置,像聲明式事務一樣。 Spring...
閱讀 3005·2021-10-12 10:12
閱讀 3052·2021-09-22 16:04
閱讀 3287·2019-08-30 15:54
閱讀 2602·2019-08-29 16:59
閱讀 2902·2019-08-29 16:08
閱讀 868·2019-08-29 11:20
閱讀 3492·2019-08-28 18:08
閱讀 648·2019-08-26 13:43