国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

一個線程安全的 lrucache 實現(xiàn) --- 讀 leveldb 源碼

widuu / 3837人閱讀

摘要:在閱讀的源代碼的時候,發(fā)現(xiàn)其中的類正是一個線程安全的實現(xiàn),代碼非常優(yōu)雅。至此一個線程安全的類就已經(jīng)全部實現(xiàn),在中使用的緩存是,其實就是聚合多個實例,真正的邏輯都在類中。

緩存是計算機的每一個層次中都是一個非常重要的概念,緩存的存在可以大大提高軟件的運行速度。Least Recently Used(lru) cache 即最近最久未使用的緩存,多見與頁面置換算法,lru 緩存算法在緩存的大小達(dá)到最大值之后,換出最早未被使用的緩存。

在閱讀 leveldb 的源代碼的時候,發(fā)現(xiàn)其中的 cache 類正是一個線程安全的 lru-cache 實現(xiàn),代碼非常優(yōu)雅。筆者讀完之后受益良多,希望借助這篇博客,一來可以加深自己的理解,而來可以與大家共享這些優(yōu)秀代碼(PS:因為這個 cache 類主要是為了支持 leveldb 的實現(xiàn),所以其實現(xiàn)的接口可能與其他常見緩存有所差別)。

leveldb 中的 cache 提供的并不是一個類似于 std::map 的模板類,其有一下幾個特點:

鍵值必須為 string-like (代碼中為 Slice 類).

存儲的內(nèi)容為 void *.所以支持存儲任何類型的數(shù)據(jù),因為存儲的是指針,所以 cache 類負(fù)責(zé)管理其所有 entries 的生命周期以及資源回收。在插入的時候必須明確傳入 deleter.

對 entries 的查詢需要先獲取 Handle(像pthread_t一樣的一個對客戶端透明的結(jié)構(gòu),如果客戶端不再使用,必4. 須要顯示的調(diào)用Cache::Release(handle).不直接提供訪問 entries 的接口,防止了客戶端修改,但引入了一次額外的函數(shù)調(diào)用(Cache::Value(hanle)).

使用兩個雙向鏈表來分別存儲被客戶端使用的 entries 和未被使用的 entries.

NewId() 接口可以生成一個唯一的 id,多線程環(huán)境下可以使用這個 id 與自己的鍵值拼接起來,防止不同線程之間互相覆寫.

使用很直觀的 mutex 類提供線程安全性.

不止巧妙,而且代碼的可讀性非常之高的,下面讓我們來看一下主要的代碼(完整代碼可以參見 leveldb 源代碼或者本人的 github)。
class Cache {
 public:
  Cache() { }

  // 需要調(diào)用插入時的 deleter 銷毀所有 entries
  virtual ~Cache();

  // 每個 entry 對應(yīng)的 handle,客戶端只能通過調(diào)用 this->Value(handle) 來獲取緩存的內(nèi)容,不可以直接訪問緩存
  struct Handle { };

  // 插入一個 key->value 的映射,支持通過 charge 為不同的緩存設(shè)定不同的權(quán)重。
  // 返回一個 entry 對應(yīng)的 handle 指針到客戶端,客戶端不需要使用后必須顯示的調(diào)用 this->Release(handle),
  // 當(dāng)緩存要被換出的時候,會調(diào)用傳入的 deleter
  virtual Handle *Insert(const Slice &key, void *value, size_t charge,
                         void (*deleter)(const Slice &key, void *value)) = 0;

  // 查詢接口,如果緩存不存在,返回 NULL 指針。否則返回 entry 對應(yīng)的 handle 指針,同樣必須調(diào)用 this->Release(handle)
  virtual Handle *Lookup(const Slice &key) = 0;

  // 釋放由 Insert 或者 Lokkup 返回的 handle。Release a mapping returned by a previous Lookup().
  // NOTE: 1.僅能在一個 handle 上調(diào)用一次 this->Release(handle)
  // NOTE: 2.handle 在調(diào)用該方法后不能再使用(handle 指針會被 free 掉)
  virtual void Release(Handle *handle) = 0;

  // 獲取緩存內(nèi)容
  virtual void *Value(Handle *hanle) = 0;

  // 見上面第6點
  virtual uint64_t NewId() = 0;

  // 清楚所有的未使用緩存
  virtual void Prune() { }

  // 查看緩存使用總量
  virtual size_t TotalCharge() const = 0;

 private:
  // No copying allowed
  Cache(const Cache &);
  void operator=(const Cache &);
};
因為 leveldb 中支持對定制自己的緩存類,所以其頭文件中的 Cache 是一個虛擬基類,更像是一個接口定義,《effective c++》中稱之為 Interface class。每個接口的含義我都已經(jīng)加上了注釋,為了保證除了 Cache 類,沒有人可以操作緩存數(shù)據(jù)(這是非常重要的一點,因為 Cache 中存儲的都是 void *,Cache 類必須要確保它自己唯一管理這些緩存對象生命周期),這里使用了 Handle(句柄)。雖然在查詢的時候額外引入了一層間接層,但是可以充分保證數(shù)據(jù)安全。

下面讓我們看一下實現(xiàn)代碼:

struct LRUHandle { // 這便是返回的 Handle,在返回的時候會 return reinterpret_cast(handle)
   void *value; 
   void (*deleter)(const Slice &, void *);
   LRUHandle *next_hash; // 給自己實現(xiàn)的 hahs_table 使用;
   LRUHandle *next; // 雙向鏈表
   LRUHandle *prev; // 雙向鏈表
   size_t charge; // 權(quán)重
   size_t key_length; // key 長度
   bool in_cache; // 是否被緩存(我們的類可以通過將 capacity 設(shè)置為0來關(guān)閉緩存)
   uint32_t refs; // 引用計數(shù)
   uint32_t hash; // 我們要將 hash 值緩存起來,防止需要重復(fù) hash 計算
   char key_data[1]; // 柔性數(shù)組,存儲鍵

   Slice key() const {
     assert(next != this); // Hold true iff this is the head of an empty list
     return Slice(key_data, key_length);
   }
 };
代碼中 Slice 是 google 項目常見的一個工具類,其實現(xiàn)非常簡單,可以認(rèn)為它僅僅是對一個字符串的引用(因為它自己不管理內(nèi)存,咱們在這里可以簡單的把它當(dāng)做 std::string 也是可以的,它可以使用的地方都可以使用 std::string(反過來不成立))。

可以看到我們這個 LRUHandle 類中的數(shù)據(jù)還是不少的,在后面的實現(xiàn)代碼中我們能看到它們的用武之地。下面要看的代碼是 levedb 自己實現(xiàn)的一個很短小精悍的 hash_table,其使用開鏈法解決沖突,代碼真的很優(yōu)雅,很值得一讀:

class HandleTable {
 public:
   HandleTable() : length_(0), elems_(0), list_(NULL) { Resize(); }
   ~HandleTable() { delete[] list_; }

   LRUHandle *Lookup(const Slice &key, uint32_t hash) {
     return *FindPointer(key, hash);
   }

   // Insert into the last position
   LRUHandle *Insert(LRUHandle *h) {
     LRUHandle **ptr = FindPointer(h->key(), h->hash);
     LRUHandle *old = *ptr;
     h->next_hash = (old == NULL ? NULL : old->next_hash);
     *ptr = h;
     if (old == NULL) {
       ++elems_;
       if (elems_ > length_) {
         Resize();
       }
     }
     return old;
   }

   LRUHandle *Remove(const Slice &key, uint32_t hash) {
     LRUHandle **ptr = FindPointer(key, hash);
     LRUHandle *result = *ptr;
     if (result != NULL) {
       *ptr = result->next_hash;
       --elems_;
     }
     return result;
   }
 private:
   uint32_t length_; // hash table 的 bucket 數(shù)
   uint32_t elems_; // hash table 中 entry 數(shù)
   LRUHandle **list_; // hash bucket 數(shù)組

   LRUHandle **FindPointer(const Slice &key, uint32_t hash) { 
     assert(list_ != NULL);
     LRUHandle **ptr = &list_[hash & (length_ - 1)]; // 找到對應(yīng)鍵的桶,取第一個 entry
     while (*ptr != NULL &&
            ((*ptr)->hash != hash || (*ptr)->key() != key)) {
       ptr = &(*ptr)->next_hash; // 注意,我們返回的不是一個 LRUHandle *,而是一個 LRUHandle **(&(LRUHandle::next_hash))
     }
     return ptr;
   }

   void Resize() {
     uint32_t new_length = 4; // length_ 一定是 2 的冪次,這樣可以使用 hash & (length_ - 1) 來算出 hash % length_
     while (new_length < elems_) new_length <<= 1; // 算出新 length,因為 length > elems_; 所以裝載因子<1
     LRUHandle **new_list = new LRUHandle *[new_length]; 
     memset(new_list, 0, sizeof(new_list[0]) * new_length);
     uint32_t count = 0;
     for (uint32_t i = 0; i < length_; i++) { // 遍歷所有的桶,為桶內(nèi)元素搬家
       LRUHandle *h = list_[i]; // 每個桶的第一個 entry
       while (h != NULL) {
         LRUHandle *next = h->next_hash; // 先保存起來 next
         uint32_t hash = h->hash;
         LRUHandle **ptr = &new_list[hash & (new_length-1)];  // 計算在新表中的桶下標(biāo),取桶內(nèi)第一個 entry
         h->next_hash = *ptr; // 頭插法
         *ptr = h;
         h = next;
         ++count;
       }
     }
     assert(count == elems_);
     delete[] list_; // 更新 list_ 和 length_
     list_ = new_list;
     length_ = new_length;
   }
};
上面的代碼真的寫的太好了,筆者都有怎么讀都讀不夠的感覺啊,谷歌工程師,真的太強了。簡要的說下最重要的兩個接口:Resize() 和 FindPointer()。

先說 Resize()。在 hash_table 中擴容的時候,往往有兩個常見的選擇,一是 bucket 的數(shù)量選用一個素數(shù),這樣可以保證 hash mod 之后的結(jié)果最均勻,還有一種是使用2的冪次結(jié)果作為 bucket 數(shù),后者是通過優(yōu)化 hash % #bucket 這個運算來優(yōu)化(因為 #bucket = 2^n,hash % #bucket 等價于 hash & (length - 1))。在 leveldb 中選用的是后者來優(yōu)化。

然后是 FindPointer(),在讀這份源碼之前,我對鏈表內(nèi)元素的插入和刪除之后使用兩個指針,一前一后,來操作鏈表,但是作者使用一個 LRUHandle **指針,一個指針就解決了對鏈表元素的插入和刪除,真的太巧妙了。我覺得這個代碼都值得多帶帶拿出來講,所以這里并不準(zhǔn)備全部展開,但是推薦大家好好品讀,我也會學(xué)習(xí)這個寫法,自己寫一個 list 實現(xiàn),到時候也會發(fā)到博客,和大家一起交流。

有了 hash_table,我們可以開始實現(xiàn)了 LRUCache 了:

class LRUCache {
 public:
  LRUCache();
  ~LRUCache();

  void SetCapacity(size_t cap) { capacity_ = cap; }

  Cache::Handle *Insert(const Slice &key, uint32_t hash,
                        void *value, size_t charge,
                        void (*deleter)(const Slice &, void *));

  Cache::Handle *Lookup(const Slice &key, uint32_t hash);
  void Release(Cache::Handle *handle);
  void Erase(const Slice &key, uint32_t hash);
  void Prune();
  size_t TotalCharge() const {
    MutexLock l(&mutex_);
    return usage_;
  }

 private:
  void LRU_Remove(LRUHandle *e);
  void LRU_Append(LRUHandle *list, LRUHandle *e);
  void Ref(LRUHandle *e);
  void Unref(LRUHandle *e);
  bool FinishErase(LRUHandle *e);
  // Initialized before use.
  size_t capacity_;

  // Capacity usage guarded by mutex
  mutable Mutex mutex_;
  size_t usage_;

  // 所有未被使用的 entries 都被存儲于 lru_ 這個雙向鏈表中,這里使用 dummy head 的方法實現(xiàn)雙向鏈表
  // lru_.prev 是最新的 entry,lru_.next 是最早的 entry(也就是說換出緩存的時候就從 lru_.next 開刀)
  // lru_ 中的所有 entries 都有 refs == 1 && in_cache == true (僅被 LRUCache 類引用,而且被緩存)
  LRUHandle lru_;

  // 所有現(xiàn)在被使用(客戶端持有至少一個尚未 Release() 的 handle)的 entries 存儲在 in_use_ 雙向鏈表中
  // in_use_ 中 entries 有 refs >= 2 && in_cache = true
  LRUHandle in_use_;

  HandleTable table_; // 用于進行 key->handle 的 mapping
};

LRUCache::LRUCache() : usage_(0) { // 初始化兩個雙向鏈表的 dummy head
  lru_.next = &lru_; 
  lru_.prev = &lru_;
  in_use_.next = &in_use_;
  in_use_.prev = &in_use_;
}

LRUCache::~LRUCache() {
  assert(in_use_.next == &in_use_); // 如果在 LRUCache 在析構(gòu)的時候,還有客戶端持有尚未 Release() 的 handle,那么就會出問題
  LRUHandle *e = lru_.next;
  while (e != &lru_) { // 釋放所有 entries
    LRUHandle *next = e->next;
    assert(e->in_cache); 
    e->in_cache = false;
    assert(e->refs == 1);
    Unref(e);
    e = next;
  }
}

void LRUCache::Ref(LRUHandle *e) { // 增加引用計數(shù),必要的話將其從 lru_. 挪到 in_use_
  if (e->refs == 1 && e->in_cache) { // ++ 后 refs 大于1,代表有客戶端使用
    LRU_Remove(e); // 從 lru_ 中刪除
    LRU_Append(&in_use_, e); // 插入 in_use_ 中
  }
  e->refs++; // 增加引用計數(shù)
}

void LRUCache::Unref(LRUHandle *e) {
  assert(e->refs > 0); // Unref() 調(diào)用前,這個 handle 應(yīng)該是被引用的
  if (--e->refs == 0) { // 無人引用(包括 LRUCache 類),需要調(diào)用其構(gòu)造時傳入的刪除器,管理資源
    assert(!e->in_cache);
    (*e->deleter)(e->key(), e->value);
    free(e); // 注意所有的 handle 都是 heap-allocated,必須要free
  } else if (e->in_cache && e->refs == 1) { // refs==1 代表僅有 LRUCache 類引用
    LRU_Remove(e);  // 從 in_use_ 中刪除
    LRU_Append(&lru_, e); // 插入 lru_ 中
  }
}
// 將元素從雙向鏈表中刪除(刪除并不需要知道其所屬的 list)
void LRUCache::LRU_Remove(LRUHandle *e) {
  e->prev->next = e->next; 
  e->next->prev = e->prev;
}
// 將元素插入到雙向鏈表 head.prev
void LRUCache::LRU_Append(LRUHandle *list, LRUHandle *e) {
  list->prev->next = e;
  e->prev = list->prev;
  e->next = list;
  list->prev = e;
}

Cache::Handle *LRUCache::Lookup(const Slice &key, uint32_t hash) {
  MutexLock l(&mutex_);
  LRUHandle *e = table_.Lookup(key, hash); // 在 hash_table 中查找
  if (e != NULL) Ref(e); // 如果有被緩存起來,增加引用計數(shù)等
  return reinterpret_cast(e); // 將其 cast 為一個 Handle * 返回,還記得 Cache 類中的接口吧?
}

void LRUCache::Release(Cache::Handle *e) {
  MutexLock l(&mutex_);
  Unref(reinterpret_cast(e));
}

Cache::Handle *LRUCache::Insert(
    const Slice &key, uint32_t hash, void *value, size_t charge,
    void (*deleter)(const Slice &, void *)) {
  MutexLock l(&mutex_);

  LRUHandle *e = reinterpret_cast(
      malloc(sizeof(LRUHandle) + (key.size() - 1))); // 這里注意,因為 LRUHandle 中保存了 key_length,所以這里在申請內(nèi)存的時候,并沒有為 "