摘要:延遲任務使用列表結構可以實現只能執行一種任務的隊列,也可以實現通過調用不同回調函數來執行不同任務的隊列,甚至還可以實現簡單的優先級隊列。
在處理Web客戶端發送的命令請求時,某些操作的執行時間可能會比我們預期的更長一些。通過將待執行任務的相關信息放入隊列里面,并在之后對隊列進行處理,用戶可以推遲那些需要一段時間才能完成的操作,這種工作交給任務處理器來執行的做法被稱為任務隊列(task queue)。現在有很多專門的任務隊列軟件(如ActiveMQ、RabbitMQ、Gearman、Amazon SQS),接下來實現兩種不同類型的任務隊列,第一種隊列會根據任務被插入隊列的順序來盡快地執行任務,第二種隊列具有安排任務在未來某個特定時間執行的能力。
先進先出隊列除了任務隊列之外,還有先進先出(FIFO)隊列、后進后出(LIFO)隊列和優先級(priority)隊列。
使用任務隊列來記錄郵件的收信人以及發送郵件的原因,并構建一個可以在郵件發送服務器運行變得緩慢的時候,以并行方式一次發送多封郵件的工作進程(worker process)。
要編寫的隊列將以“先到先服務”(first-come,first-served)的方式發送郵件,并且無論發送是否成功,程序都會把發送結果記錄到日志里面。Redis的列表結構允許用戶通過RPUSH和LPUSH以及RPOP和LPOP,從列表的兩端推入和彈出元素。郵件隊列使用RPUSH命令來將待發送的郵件推入列表的右端,并且因為工作進程除了發送郵件之外不需要執行其他工作,所以它將使用阻塞版本的彈出命令BLPOP從隊列中彈出待發送的郵件,而命令的最大阻塞時限為30秒。
郵件隊列由一個Redis列表構成,包含多個JSON編碼對象。為了將待發送的郵件推入隊列里面,程序會獲取發送郵件所需的全部信息,并將這些信息序列化為JSON對象,最后使用RPUSH命令將JSON對象推入郵件隊列里面。
def send_sold_email_via_queue(conn, seller, item, price, buyer): data = { "seller_id": seller, "item_id": item, "price": price, "buyer_id": buyer, "time": time.time() } conn.rpush("queue:email", json.dumps(data))
從隊列里獲取待發送郵件,程序首先使用BLPOP命令從郵件隊列里面彈出一個JSON對象,接著通過解碼JSON對象來取得待發送郵件的相關信息,最后根據這些信息來發送郵件。
def process_sold_email_queue(conn): while not QUIT: packed = conn.blpop(["queue:email"], 30) //獲取一封待發送郵件 if not packed: //隊列里面暫時還沒有待發送郵件,重試 continue to_send = json.loads(packed[1]) //從JSON對象中解碼出郵件信息 try: fetch_data_and_send_sold_email(to_send) except EmailSendError as err: log_error("Failed to send sold email", err, to_send) else: log_success("Send sold email", to_send)多個可執行任務
因為BLPOP命令每次只會從隊列里面彈出一封待發送郵件,所以待發送郵件不會出現重復,也不會被重復發送。并且因為隊列只會存放待發送郵件,所以工作進程要處理的任務是非常單一的。下面代碼的工作進程會監視用戶提供的多個隊列,并從多個已知的已注冊回調函數里面,選出一個函數來處理JSON編碼的函數調用。
def worker_watch_queue(conn, queue, callback): while not QUIT: packed = conn.blpop([queue], 30) if not packed: continue name, args = json.loads(packed[1]) if name not in callbacks: //沒有找到任務指定的回調函數,用日志記錄錯誤并重試 log_error("Unknown callback %s"%name) continue callbacks[name](*args) //執行任務任務優先級
在使用隊列的時候,程序可能會需要讓特定的操作優先于其他操作執行。
假設現在我們需要為任務設置高、中、低3種優先級別,其中:高優先級任務在出現之后會第一時間被執行,而中等優先級任務則會在沒有任何高優先級任務存在的情況下被執行,而低優先級任務則會在既沒有任何高優先級任務,又沒有任何中等優先級任務的情況下被執行。
def worker_watch_queues(conn, queues, callbacks): while not QUIT: packed = conn.blpop(queues, 30) if not packed: continue name, args = json.loads(packed[1]) if name not in callbacks: log_error("Unknown callback %s"%name) continue callbacks[name](*args)
同時使用多個隊列可以降低實現優先級特性的難度。除此之外,多隊列有時候也會被用于分隔不同的任務(如同一個隊列存放公告郵件,而另一個隊列則存放提醒郵件),在這種情況下,處理不同隊列時可能出現不公平現象。為此,我們可以偶爾重新排列各個隊列的順序,使得針對隊列的處理操作變得更公平一些,當某個隊列的增長速度比其他隊列的增長速度快的時候,這種重拍操作尤為重要。
延遲任務使用列表結構可以實現只能執行一種任務的隊列,也可以實現通過調用不同回調函數來執行不同任務的隊列,甚至還可以實現簡單的優先級隊列。
以下3種方法可以為隊列中的任務添加延遲性質:
在任務信息中包含任務的執行時間,如果工作進程發現任務的執行時間尚未來臨,那么它將在短暫等待之后,把任務重新推入隊列里面。
工作進程使用一個本地的等待列表來記錄所有需要在未來執行的任務,并在每次進行while循環的時候,檢查等待列表并執行那些已經到期的任務。
把所有需要在未來執行的任務都添加到有序集合里面,并將任務的執行時間設置為分值,另外再使用一個進程來查找有序集合里面是否存在可以立即被執行的任務,如果有的話,就從有序集合里面移除那個任務,并將它添加到適當得任務隊列里面。
因為無論是進行短暫的等待,還是將任務重新推入隊列里面,都會浪費工作進程的時間,所以我們不會采用第一種方法。此外,因為工作進程可能會因為崩潰而丟失本地記錄的所有待執行任務,所以我們也不會采用第二種方法。最后,因為使用有序集合的第三種方法最簡單直接,所以我們將采取這一方法,并使用鎖來保證任務從有序集合移動到任務隊列時的安全性。
有序集合隊列(ZSET queue)存儲的每個被延遲的任務都是一個包含4個值的JSON列表,這4個分值分別是:唯一標識符、處理任務隊列的名字、處理任務的回調函數的名字、傳給回調函數的參數。在有序集合里面,任務的分值會被設置為任務的執行時間,而立即可執行的任務將被直接插入任務隊列里面。下面代碼展示了創建延遲任務(任務是否延遲是可選的,只要把任務的延遲時間設置為0就可以創建一個立即執行的任務)。
def execute_later(conn, queue, name, args, delay=0): identifier = str(uuid.uuid4()) item = json.dumps([identifier, queue, name, args]) if delay > 0: conn.zadd("delayed:", item, time.time() + delay) else: conn.rpush("queue:" + queue, item) return identifier
因為Redis沒有提供直接的方法可以阻塞有序集合直到元素的分值低于當前UNIX時間戳為止,所以我們需要自己來查找有序集合里面分值低于當前UNIX時間戳的任務。因為所有被延遲的任務都存儲在同一個有序集合隊列里面,所以程序只需要獲取有序集合里面排名第一的元素以及該元素的分值就可以了:如果隊列里面沒有任何任務,或者任務的執行時間尚未來臨,那么程序將在短暫等待之后重試;如果任務的執行時間已到,那么程序將根據任務包含的標識符來獲取一個細粒度鎖,接著從有序集合里面移除要被執行的任務,并將它添加到適當的任務隊列里面。通過將可執行的任務添加到任務隊列里面而不是直接執行它們,我們可以把獲取可執行任務的進程數量限制在一兩個之內,而不必根據工作進程的數量來決定運行多少個獲取進程,這減少了獲取可執行任務所需的花銷。
def poll_queue(conn): while not QUIT: item = conn.zrange("delayed:", 0, 0, withscores=True) if not item or item[0][1] > time.time(): time.sleep(.01) continue item = item[0][0] identifier, queue, function, args = json.loads(item) locked = acquire_lock(conn, identifier) if not locked: continue if conn.zrem("delayed:", item): conn.rpush("queue:" + queue, item) release_lock(conn, identifier, locked)
因為有序集合并不具備像列表那樣的阻塞彈出機制,所以程序需要不斷地進行循環,并嘗試從隊列里面獲取要被執行的任務,雖然這一操作會增大網絡和處理器的負載,但因為我們只會運行一兩個這樣的程序,所以不會消耗太多資源。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/44607.html
摘要:如果任務沒有在規定時間內完成,那么該有序集合的任務將會被重新放入隊列中。這兩個進程操縱了三個隊列,其中一個,負責即時任務,兩個,負責延時任務與待處理任務。如果任務執行成功,就會刪除中的任務,否則會被重新放入隊列中。 在實際的項目開發中,我們經常會遇到需要輕量級隊列的情形,例如發短信、發郵件等,這些任務不足以使用 kafka、RabbitMQ 等重量級的消息隊列,但是又的確需要異步、重試...
摘要:配置項用于配置失敗隊列任務存放的數據庫及數據表。要使用隊列驅動,需要在配置文件中配置數據庫連接。如果應用使用了,那么可以使用時間或并發來控制隊列任務。你可以使用命令運行這個隊列進程。如果隊列進程意外關閉,它會自動重啟啟動隊列進程。 一、概述 在Web開發中,我們經常會遇到需要批量處理任務的場景,比如群發郵件、秒殺資格獲取等,我們將這些耗時或者高并發的操作放到隊列中異步執行可以有效緩解系...
摘要:場景說明用于處理比較耗時的請求,例如批量發送郵件,如果直接在網頁觸發執行發送,程序會出現超時高并發場景,當某個時刻請求瞬間增加時,可以把請求寫入到隊列,后臺在去處理這些請求搶購場景,先入先出的模式命令或往列表右側推入數據客戶端阻塞直到隊列有 場景說明: 用于處理比較耗時的請求,例如批量發送郵件,如果直接在網頁觸發執行發送,程序會出現超時 高并發場景,當某個時刻請求瞬間增加時,可以把請...
閱讀 1743·2021-09-22 15:25
閱讀 1307·2019-08-29 12:34
閱讀 1908·2019-08-26 13:57
閱讀 3188·2019-08-26 10:48
閱讀 1443·2019-08-26 10:45
閱讀 793·2019-08-23 18:23
閱讀 733·2019-08-23 18:01
閱讀 1945·2019-08-23 16:07