摘要:購物網站的相關實現需求登錄和緩存對于一個大型網上商店,假設每天都會有大約萬不同的用戶,這些用戶會給網站帶來億次點擊,并從網站購買超過萬件商品。根據給定的令牌查找與之相應的用戶,檢查用戶是否登錄,并返回該用戶的。
購物網站的redis相關實現
對于一個大型網上商店,假設每天都會有大約500萬不同的用戶,這些用戶會給網站帶來1億次點擊,并從網站購買超過10萬件商品。
我們需要存儲用戶登錄信息,用戶的訪問時長和已瀏覽商品的數量,如果將其保存到數據庫中,會導致大量的數據庫寫入。
大多數關系數據庫在每臺數據庫服務器上面每秒只能插入、更新或者刪除200~2000個數據行,盡管批量操作可以以更快的速度執行,但客戶點每次瀏覽網頁都只更新少數幾行數據,所以高速的批量插入在這里并不適用。
而對于負載量相對比較大的系統,譬如平均情況下每秒大約1200次寫入,高峰時期每秒接近6000次寫入,所以它必須部署10臺關系數據庫服務器才能應對高峰期的負載量。
為了提升系統的處理速度,降低資源的占用量,可以將傳統數據庫的一部分數據處理任務以及存儲任務轉交給Redis來完成。
(2)使用redis實現購物車我們把購物車的信息也存儲到Redis,并且使用與用戶會話令牌一樣的cookie id來引用購物車。
將用戶和購物車都存儲到Redis里面,這種做法除了可以減少請求體積外,我們可以根據用戶瀏覽過的商品,用戶放入購物車的商品以及用戶最終購買的商品進行統計計算,并構建起很多大型網絡零售上都在提供的”在查看過這件商品的用戶當中,有X%的用戶最終購買了這件商品“”購買了這件商品的用戶也購買了某某其他商品“等功能,這些功能可以幫助用戶查找其他相關的商品,并最終提升網站的銷售業績。
(3)網頁緩存購物網站上多數頁面實際上并不會經常發生大變化,雖然會向分類中添加新商品、移除舊商品、有時候特價促銷、有時甚至還有”熱賣商品“頁面,但是在一般情況下,網站只有賬號設置、以往訂單、購物車(結賬信息)以及其他少數幾個頁面才包含需要每次載入都要動態生成的內容。
對于不需要動態生成的頁面,我們需要盡量不再生成,減少網站在動態生成內容上面所花的時間,可以降低網站處理相同負載所需的服務器數量,讓網站速度加快。
python應用框架大都存在中間件,我們創建中間件來調用Redis緩存函數:對于不能被緩存的請求,直接生成并返回頁面,對于可以被緩存的請求,先從緩存取出緩存頁面,如果緩存頁面不存在,那么會生成頁面并將其緩存在Redis,最后將頁面返回給函數調用者。
這樣的方式可以讓網站在5分鐘之內無需再為他們動態地生成視圖頁面。
(4) 數據行緩存為了清空舊庫存和吸引客戶消費,決定開始新一輪的促銷活動,每天都會推出一些特價商品供用戶搶購,所有特價商品的數量都是限定的,賣完為止。在這種情況下,網站是不能對整個促銷頁面進行緩存,這會導致用戶看到錯誤的特價商品和商品剩余數,但每次載入頁面都從數據庫中取出特價商品的剩余數量的話,又會給數據庫帶來巨大的壓力。
為了應付促銷活動帶來的大量負載,需要對數據行進行緩存,可以編寫一個持續運行的守護進程函數,讓這個函數將指定的數據行緩存到Redis里面,并不定期地對這些緩存進行更新。緩存函數將數據和編碼為json字典并存儲在Redis的字符串中。
我們還需要使用兩個有序集合來記錄應該在何時對緩存進行更新,第一個有序集合為調度有序集合,成員為數據行的ID,分值為時間戳,記錄應該在何時將制定的數據行緩存到Redis里面。第二個有序集合為延時有序集合,成員為數據行的ID,而分值記錄指定數據行的緩存需要每隔多少秒更新一次。
對于更新頻率,如果數據行記錄的是特價促銷商品的剩余數量,并且參與促銷活動的用戶非常多,那么我么最好每隔幾秒更新一次數據行緩存,如果數據并不經常改變,或者商品缺貨是可以接受的,我們可以每分鐘更新一次緩存。
(5)網頁分析之前對于網頁的緩存,如果網站總共包含100000件商品,貿然緩存所有商品頁面將耗盡整個網站的全部內存,所以我們可以只針對那些瀏覽量較高的商品頁面進行緩存。
每個用戶都有一個相應的記錄用戶瀏覽商品歷史的有序集合,我們在記錄的過程中,我們也痛死記錄所有商品的瀏覽次數,根據瀏覽次數對商品進行排序,被瀏覽得最多的商品放到有序集合的索引0位置上,并且具有整個有序集合最少的分值。
除了緩存最常被瀏覽的商品外,我們還需要發現那些變得越來越流暢的新商品,于是我們需要定期修剪有序集合的長度并調整已有元素的分值,才能使得新流行的商品在排行榜中占據一席之地。
Redis數據結構設計(1)登錄令牌與用戶映射關系的散列 "login:"
(2)記錄最近登錄用戶的有序集合 "recent:"
(3)記錄各個用戶最近瀏覽商品的有序集合 "viewed:94233rhsYRIq3yi3qryrye"
(4)每個用戶的購物車散列,存儲商品ID與商品訂購數量之間的映射。"cart:94233rhsYRIq3yi3qryrye"
(5)請求頁面緩存集合 "cache:wre9w3rieruerwe3" (wre9w3rieruerwe3代表請求ID)
(94233rhsYRIq3yi3qryrye假設為某個用戶的令牌)
(6)數據行緩存字符串,數據列(column)的名字會被映射為json字典的鍵,而數據行的值會被映射為json字典的值,"inv:273" (其中273為數據行id)。
(7)數據行緩存調度有序集合,成員為數據行的ID,分值為時間戳,記錄應該在何時將制定的數據行緩存到Redis里面,"schedule:"。
(8)數據行緩存延時有序集合,成員為數據行的ID,而分值記錄指定數據行的緩存需要每隔多少秒更新一次,"delay:"。
(9)商品瀏覽次數有序集合,成員為商品,分值為瀏覽次數負值,方便保持在有序集合的較前的索引位置,"viewed"。
Redis實現(1)使用散列來存儲登錄cookie令牌與已登錄用戶之前的映射。根據給定的令牌查找與之相應的用戶,檢查用戶是否登錄,并返回該用戶的ID。
""" 獲取并返回令牌對應的用戶 @param {object} @param {string} token @return {string} 用戶id """ def checkToken(conn, token): return conn.hget("login:", token)
(2)用戶每次瀏覽頁面的時候,需要更新“登錄令牌與用戶映射關系的散列”里面的信息,
并將用戶的令牌和當前時間戳添加到 “記錄最近登錄用戶的有序集合” 里面,
將瀏覽商品添加到記錄“記錄各個用戶最近瀏覽商品的有序集合”中,如果記錄的商品數量超過25個,對這個有序集合進行修剪。
""" 更新令牌時,需要更改用戶令牌信息,將用戶記錄到最近登錄用戶的有序集合中, 如果用戶瀏覽的是商品,則需要將瀏覽商品寫入該用戶瀏覽過商品的有序集合中,并保證該集合不超過25個 @param {object} @param {string} token @param {string} user @param {string} item """ def updateToken(conn, token, user, item = None): timestamp = time.time() # 更新用戶令牌登錄對應的用戶信息 conn.hset("login:", token, user) # 增加最近訪問的用戶到有序集合 conn.zadd("recent:", token, timestamp) # 如果瀏覽產品,記錄該用戶最近訪問的25個產品 if item: conn.zadd("viewed:" + token, item, timestamp) conn.zremrangebyrank("viewed:" + token, 0, -26) # 記錄每個商品的瀏覽量 conn.zincrby("viewed:", item, -1)
(3)存儲會話的內存會隨著時間的推移而不斷增加,需要定期清理會話數據,我們決定只保留最新的1000萬個會話。
我們可以用 守護進程的方式來運行或者定義一個cron job每隔一段時間運行 ,
檢查最近 “記錄最近登錄用戶的有序集合” 大小是否超過了限制,超過限制每秒從集合中刪除最舊的100個令牌,并且移除相應的“登錄令牌與用戶映射關系的散列”的信息和對應的“記錄各個用戶最近瀏覽商品的有序集合”,對應的”美國用戶的購物車散列“。
我們也可以使用EXPIRE命令,為用戶令牌設記錄用戶商品瀏覽記錄的有序集合設置過期時間,讓Redis在一段時間之后自動刪除它們,這樣就不用使用有序集合來記錄最近出現的令牌了,但是這樣我們就沒辦法將會話數限制在1000萬之內了。
""" 定期清理會話數據,只保留最新的1000萬個會話。 使用 *守護進程的方式來運行或者定義一個cron job每隔一段時間運行* , 檢查最近 “記錄最近登錄用戶的有序集合” 大小是否超過了限制,超過限制每秒從集合中刪除最舊的100個令牌, 并且移除相應的“登錄令牌與用戶映射關系的散列”的信息和對應的“記錄各個用戶最近瀏覽商品的有序集合”。 @param {object} """ # 循環判斷,如果是cron job可以不用循環 QUIT = False # 限制保留的最大會話數據 LIMIT = 10000000 def cleanFullSession(conn): # 循環判斷,如果是cron job可以不用循環 while not QUIT: # 查詢最近登錄用戶會話數 size = conn.zcard("recent:") # 沒有超過限制,休眠1秒再繼續執行 if size <= LIMIT: time.sleep(1) continue # 查詢最舊登錄的最多100個令牌范圍 end_index = min(size - LIMIT, 100) tokens = conn.zrange("recent:", 0, end_index - 1) # 將要刪除的key都推入到數組中,要時候一起刪除 session_keys = [] for token in tokens: session_keys.append("viewed:" + token) session_keys.append("cart:" + token) # 批量刪除相應的用戶最近瀏覽商品有序集合,用戶的購物車,登錄令牌與用戶映射關系的散列和記錄最近登錄用戶的有序集合 conn.delete(*session_keys) conn.hdel("login:", *tokens) conn.zrem("recent:", *tokens)
(4)對購物車進行更新,如果用戶訂購某件商品數量大于0,將商品信息添加到 “用戶的購物車散列”中,如果購買商品已經存在,那么更新購買數量。
""" 對購物車進行更新,如果用戶訂購某件商品數量大于0,將商品信息添加到 “用戶的購物車散列”中,如果購買商品已經存在,那么更新購買數量 @param {object} @param {string} session @param {string} item @param {float} count """ def addToCart(conn, session, item, count): if count <= 0: # 從購物車移除指定商品 conn.hrem("cart:" + session, item) else: # 將指定商品添加到對應的購物車中 conn.hset("cart:" + session, item, count)
(5)在用戶請求頁面時,對于不能被緩存的請求,直接生成并返回頁面,對于可以被緩存的請求,先從緩存取出緩存頁面,如果緩存頁面不存在,那么會生成頁面并將其緩存在Redis,最后將頁面返回給函數調用者。
""" 在用戶請求頁面時,對于不能被緩存的請求,直接生成并返回頁面, 對于可以被緩存的請求,先從緩存取出緩存頁面,如果緩存頁面不存在,那么會生成頁面并將其緩存在Redis,最后將頁面返回給函數調用者。 @param {object} conn @param {string} request @param {callback} @return """ def cacheRequest(conn, request, callback): # 判斷請求是否能被緩存,不能的話直接調用回調函數 if not canCache(conn, request): return callback(request) # 將請求轉換為一個簡單的字符串健,方便之后進行查找 page_key = "cache:" + hashRequest(request) content = conn.get(page_key) # 沒有緩存的頁面,調用回調函數生成頁面,并緩存到redis中 if not content: content = callback(request) conn.setex(page_key, content, 300) return content """ 判斷頁面是否能被緩存,檢查商品是否被緩存以及頁面是否為商品頁面,根據商品排名來判斷是否需要緩存 @param {object} conn @param {string} request @return {boolean} """ def canCache(conn, request): # 根據請求的URL,得到商品ID item_id = extractItemId(request) # 檢查這個頁面能否被緩存以及這個頁面是否為商品頁面 if not item_id or isDynamic(request): return False # 商品的瀏覽排名 rank = conn.zrank("viewed:", item_id) return rank is not None and rank < 10000 """ 解析請求的URL,取得query中的item id @param {string} request @return {string} """ def extractItemId(request): parsed = urlparse.urlparse(request) # 返回query字典 query = urlparse.parse_qs(parsed.query) return (query.get("item") or [None])[0] """ 判斷請求的頁面是否動態頁面 @param {string} request @return {boolean} """ def isDynamic(request): parsed = urlparse.urlparse(request) query = urlparse.parse_qs(parsed.query) return "_" in query """ 將請求轉換為一個簡單的字符串健,方便之后進行查找 @param {string} request @return {string} """ def hashRequest(request): return str(hash(request))
(6)為了讓緩存函數定期地緩存數據行,首先需要將行ID和給定的延遲值添加到延遲有序集合中,再將行ID和當前時間的時間戳添加到調度有序集合中。如果某個數據行的延遲值不存在,那么程序將取消對這個數據行的調度。如果我們想要移除某個數據行已有的緩存并且不再緩存那個數據行,只需要把那個數據行的延遲值設置為小于或等于0即可。
""" 設置數據行緩存的延遲值和調度時間 @param {object} conn @param {int} row id @param {int} delay """ def scheduleRowCache(conn, row_id, delay): conn.zadd("delay:", row_id, delay) conn.zadd("schedule:", row_id, time.time())
(7)嘗試讀取”數據行緩存調度有序集合“的第一個元素以及該元素的分支,如果”數據行緩存調度有序集合“沒有包含任何元素,或者分值存儲的時間戳所指定的時間尚未來臨,那么函數先休眠50毫秒,然后再重新進行檢查。
當發現一個需要立即進行更新的數據行時,如果數據行的延遲值小于或者等于0,會從”數據行緩存延時有序集合“和”數據行緩存調度有序集合“移除這個數據行的ID,并從緩存里面刪除這個數據行已有的緩存,再重新進行檢查。
對于延遲值大于0的數據行來說,從數據庫里面取出這些行,將他們編碼為json格式并存儲到Redis里面,然后更新這些行的調度時間。
""" 守護進程,根據調度時間有序集合和延遲值緩存數據行 @param {object} conn """ def cacheRow(conn): while not QUIT: # 需要讀取”數據行緩存調度有序集合“的第一個元素,如果沒有包含任何元素,或者分值存儲的時間戳所指定的時間尚未來臨,那么函數先休眠50毫秒,然后再重新進行檢查 next = conn.zrange("schedule:", 0, 0, withscores=True) now = time.time() if not next or next[0][1] > now: time.sleep(.05) continue row_id = next[0][0] # 取出延遲值 delay = conn.zscore("delay:", row_id) # 如果延遲值小于等于0,則不再緩存該數據行 if delay <= 0: conn.zrem("schedule:", row_id) conn.zrem("delay:", row_id) conn.delete("inv:" + row_id) continue; # 需要緩存的,更新緩存調度的有序集合,并緩存該數據行 row = Inventory.get(row_id) conn.zadd("schedule:", row_id, now + delay) conn.set("inv:" + row_id, json.dumps(row.toDict())) """ 庫存類,庫存的商品信息 """ class Inventory(object): def __init__(self, id): self.id = id @classmethod def get(cls, id): return Inventory(id) def toDict(self): return {"id":self.id, "data":"data to cache...","cached":time.time()}
(8)我們需要在用戶瀏覽頁面時,“商品瀏覽次數有序集合”對應的商品中需要減一,使得保持在有序集合較前的索引位置。
同時我們需要開啟一個守護進程,每隔5分鐘,刪除所有排名在20000名之后的商品瀏覽數,并使用ZINTERSTORE將刪除之后剩余的所有商品的瀏覽次數減半。
而判斷頁面是否需要緩存,我們需要通過ZRANK取出商品的瀏覽次數排名,如果排名在10000內,那么說明該頁面需要緩存。
""" 守護進程,刪除所有排名在20000名之后的商品,并將刪除之后剩余的所有商品瀏覽次數減半,5分鐘執行一次 @param {object} conn """ def rescaleViewed(conn): while not QUIT: conn.zremrangebyrank("viewed:", 20000, -1) conn.zinterstore("viewed:", {"viewed:", .5}) time.sleep(300)測試代碼
""" 測試 """ import time import urlparse import uuid import threading import unittest import json class TestShoppingWebsite(unittest.TestCase): def setUp(self): import redis self.conn = redis.Redis(db=15) def tearDown(self): conn = self.conn to_del = ( conn.keys("login:*") + conn.keys("recent:*") + conn.keys("viewed:*") + conn.keys("cart:*") + conn.keys("cache:*") + conn.keys("delay:*") + conn.keys("schedule:*") + conn.keys("inv:*")) if to_del: conn.delete(*to_del) del self.conn global QUIT, LIMIT QUIT = False LIMIT = 10000000 print print def testLoginCookies(self): conn = self.conn global LIMIT, QUIT token = str(uuid.uuid4()) updateToken(conn, token, "username", "itemX") print "We just logged-in/updated token:", token print "For user:", "username" print print "What username do we get when we look-up that tokan?" r = checkToken(conn, token) print r print self.assertTrue(r) print "Let"s drop the maximun number of cookies to 0 to clear them out" print "We will start a thread to do the cleaning, while we stop it later" LIMIT = 0 t = threading.Thread(target = cleanFullSession, args = (conn,)) t.setDaemon(1) t.start() time.sleep(1) QUIT = True time.sleep(2) if t.isAlive(): raise Exception("The clean sessions thread is still slive?!?") s = conn.hlen("login:") print "The current number of session still available is:", s self.assertFalse(s) def testShoppingCartCookies(self): conn = self.conn global LIMIT, QUIT token = str(uuid.uuid4()) print "We"ll refresh our session..." updateToken(conn, token, "username", "itemX") print "And add an item to the shopping cart" addToCart(conn, token, "itemY", 3) r = conn.hgetall("cart:" + token) print "Our Shopping cart currently has:", r print self.assertTrue(len(r) >= 1) print "Let"s clean out our sessions an carts" LIMIT = 0 t = threading.Thread(target=cleanFullSession, args=(conn,)) t.setDaemon(1) t.start() time.sleep(1) QUIT = True time.sleep(2) if t.isAlive(): raise Exception("The clean sessions thread is still alive?!?") r = conn.hgetall("cart:" + token) print "Our shopping cart now contains:", r self.assertFalse(r) def testCacheRequest(self): conn = self.conn token = str(uuid.uuid4()) def callback(request): return "content for " + request updateToken(conn, token, "username", "itemX") url = "http://test.com/?item=itemX" print "We are going to cache a simple request against", url result = cacheRequest(conn, url, callback) print "We got initial content:", repr(result) print self.assertTrue(result) print "To test that we"ve cached the request, we"ll pass a bad callback" result2 = cacheRequest(conn, url, None) print "We ended up getting the same response!", repr(result2) self.assertEquals(result, result2) self.assertFalse(canCache(conn, "http://test.com/")) self.assertFalse(canCache(conn, "http://test.com/?item=itemX&_=1234567")) def testCacheRows(self): import pprint conn = self.conn global QUIT print "First, let"s schedule caching of itemX every 5 seconds" scheduleRowCache(conn, "itemX", 5) print "Our schedule looks like:" s = conn.zrange("schedule:", 0, -1, withscores = True) pprint.pprint(s) self.assertTrue(s) print "We"ll start a caching thread that will cache the data..." t = threading.Thread(target=cacheRow, args=(conn,)) t.setDaemon(1) t.start() time.sleep(1) print "Our cached data looks like:" r = conn.get("inv:itemX") print repr(r) self.assertTrue(r) print print "We"ll check again in 5 seconds..." time.sleep(5) print "Notice that the data has changed..." r2 = conn.get("inv:itemX") print repr(r2) print self.assertTrue(r2) self.assertTrue(r != r2) print "Let"s force un-caching" scheduleRowCache(conn, "itemX", -1) time.sleep(1) r = conn.get("inv:itemX") print "The cache was cleared?", not r print self.assertFalse(r) QUIT = True time.sleep(2) if t.isAlive(): raise Exception("The database caching thread is still alive?!?") if __name__ == "__main__": unittest.main()
完整示例代碼地址:https://github.com/NancyLin/r...
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/40824.html
摘要:處理器根據取出的數據對模板進行渲染處理器向客戶端返回渲染后的內容作為請求的相應。于此相反,如果令牌的數量沒有超過限制,那么程序會先休眠一秒,之后在重新進行檢查。找出目前已有令牌的數量。 購物網站的redis相關實現 1、使用Redis構建文章投票網站(Java) 本文主要內容: 1、登錄cookie 2、購物車cookie 3、緩存數據庫行 4、測試 必備知識點 WEB應用就是通...
摘要:處理器根據取出的數據對模板進行渲染處理器向客戶端返回渲染后的內容作為請求的相應。于此相反,如果令牌的數量沒有超過限制,那么程序會先休眠一秒,之后在重新進行檢查。找出目前已有令牌的數量。 購物網站的redis相關實現 1、使用Redis構建文章投票網站(Java) 本文主要內容: 1、登錄cookie 2、購物車cookie 3、緩存數據庫行 4、測試 必備知識點 WEB應用就是通...
摘要:需求要構建一個文章投票網站,文章需要在一天內至少獲得張票,才能優先顯示在當天文章列表前列。根據評分或者發布時間對群組文章進行排序和分頁文章添加的群組移除的群組群組有序集合名以上就是一個文章投票網站的相關實現。 需求: 要構建一個文章投票網站,文章需要在一天內至少獲得200張票,才能優先顯示在當天文章列表前列。 但是為了避免發布時間較久的文章由于累計的票數較多而一直停留在文章列表前列,我...
閱讀 4162·2021-09-22 15:34
閱讀 2771·2021-09-22 15:29
閱讀 496·2019-08-29 13:52
閱讀 3356·2019-08-29 11:30
閱讀 2265·2019-08-26 10:40
閱讀 839·2019-08-26 10:19
閱讀 2260·2019-08-23 18:16
閱讀 2315·2019-08-23 17:50