摘要:因為它是線程安全的,所以多個線程很輕松地使用同一個實例。后進先出隊列使用后進先出順序,與棧結構相似這就是全部代碼了,這正是設計很棒的一個原因,它將底層的數據操作抽象成四個操作函數,本身來處理線程安全的問題,使得其子類只需關注底層的操作。
起步
queue 模塊提供適用于多線程編程的先進先出(FIFO)數據結構。因為它是線程安全的,所以多個線程很輕松地使用同一個實例。
源碼分析先從初始化的函數來看:
class Queue: def __init__(self, maxsize=0): # 設置隊列的最大容量 self.maxsize = maxsize self._init(maxsize) # 線程鎖,互斥變量 self.mutex = threading.Lock() # 由鎖衍生出三個條件變量 self.not_empty = threading.Condition(self.mutex) self.not_full = threading.Condition(self.mutex) self.all_tasks_done = threading.Condition(self.mutex) self.unfinished_tasks = 0 def _init(self, maxsize): # 初始化底層數據結構 self.queue = deque()
從這初始化函數能得到哪些信息呢?首先,隊列是可以設置其容量大小的,并且具體的底層存放元素的它使用了 collections.deque() 雙端列表的數據結構,這使得能很方便的做先進先出操作。這里還特地抽象為 _init 函數是為了方便其子類進行覆蓋,允許子類使用其他結構來存放元素(比如優先隊列使用了 list)。
然后就是線程鎖 self.mutex ,對于底層數據結構 self.queue 的操作都要先獲得這把鎖;再往下是三個條件變量,這三個 Condition 都以 self.mutex 作為參數,也就是說它們共用一把鎖;從這可以知道諸如 with self.mutex 與 with self.not_empty 等都是互斥的。
基于這些鎖而做的一些簡單的操作:
class Queue: ... def qsize(self): # 返回隊列中的元素數 with self.mutex: return self._qsize() def empty(self): # 隊列是否為空 with self.mutex: return not self._qsize() def full(self): # 隊列是否已滿 with self.mutex: return 0 < self.maxsize <= self._qsize() def _qsize(self): return len(self.queue)
這個代碼片段挺好理解的,無需分析。
作為隊列,主要得完成入隊與出隊的操作,首先是入隊:
class Queue: ... def put(self, item, block=True, timeout=None): with self.not_full: # 獲取條件變量not_full if self.maxsize > 0: if not block: if self._qsize() >= self.maxsize: raise Full # 如果 block 是 False,并且隊列已滿,那么拋出 Full 異常 elif timeout is None: while self._qsize() >= self.maxsize: self.not_full.wait() # 阻塞直到由剩余空間 elif timeout < 0: # 不合格的參數值,拋出ValueError raise ValueError(""timeout" must be a non-negative number") else: endtime = time() + timeout # 計算等待的結束時間 while self._qsize() >= self.maxsize: remaining = endtime - time() if remaining <= 0.0: raise Full # 等待期間一直沒空間,拋出 Full 異常 self.not_full.wait(remaining) self._put(item) # 往底層數據結構中加入一個元素 self.unfinished_tasks += 1 self.not_empty.notify() def _put(self, item): self.queue.append(item)
盡管只有二十幾行的代碼,但這里的邏輯還是比較復雜的。它要處理超時與隊列剩余空間不足的情況,具體幾種情況如下:
如果 block 是 False,忽略timeout參數
若此時隊列已滿,則拋出 Full 異常;
若此時隊列未滿,則立即把元素保存到底層數據結構中;
如果 block 是 True
若 timeout 是 None 時,那么put操作可能會阻塞,直到隊列中有空閑的空間(默認);
若 timeout 是非負數,則會阻塞相應時間直到隊列中有剩余空間,在這個期間,如果隊列中一直沒有空間,拋出 Full 異常;
處理好參數邏輯后,,將元素保存到底層數據結構中,并遞增unfinished_tasks,同時通知 not_empty ,喚醒在其中等待數據的線程。
出隊操作:
class Queue: ... def get(self, block=True, timeout=None): with self.not_empty: if not block: if not self._qsize(): raise Empty elif timeout is None: while not self._qsize(): self.not_empty.wait() elif timeout < 0: raise ValueError(""timeout" must be a non-negative number") else: endtime = time() + timeout while not self._qsize(): remaining = endtime - time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) item = self._get() self.not_full.notify() return item def _get(self): return self.queue.popleft()
get() 操作是 put() 相反的操作,代碼塊也及其相似,get() 是從隊列中移除最先插入的元素并將其返回。
如果 block 是 False,忽略timeout參數
若此時隊列沒有元素,則拋出 Empty 異常;
若此時隊列由元素,則立即把元素保存到底層數據結構中;
如果 block 是 True
若 timeout 是 None 時,那么get操作可能會阻塞,直到隊列中有元素(默認);
若 timeout 是非負數,則會阻塞相應時間直到隊列中有元素,在這個期間,如果隊列中一直沒有元素,則拋出 Empty 異常;
最后,通過 self.queue.popleft() 將最早放入隊列的元素移除,并通知 not_full ,喚醒在其中等待數據的線程。
這里有個值得注意的地方,在 put() 操作中遞增了 self.unfinished_tasks ,而 get() 中卻沒有遞減,這是為什么?
這其實是為了留給用戶一個消費元素的時間,get() 僅僅是獲取元素,并不代表消費者線程處理的該元素,用戶需要調用 task_done() 來通知隊列該任務處理完成了:
class Queue: ... def task_done(self): with self.all_tasks_done: unfinished = self.unfinished_tasks - 1 if unfinished <= 0: if unfinished < 0: # 也就是成功調用put()的次數小于調用task_done()的次數時,會拋出異常 raise ValueError("task_done() called too many times") self.all_tasks_done.notify_all() # 當unfinished為0時,會通知all_tasks_done self.unfinished_tasks = unfinished def join(self): with self.all_tasks_done: while self.unfinished_tasks: # 如果有未完成的任務,將調用wait()方法等待 self.all_tasks_done.wait()
由于 task_done() 使用方調用的,當 task_done() 次數大于 put() 次數時會拋出異常。
task_done() 操作的作用是喚醒正在阻塞的 join() 操作。join() 方法會一直阻塞,直到隊列中所有的元素都被取出,并被處理了(和線程的join方法類似)。也就是說 join() 方法必須配合 task_done() 來使用才行。
LIFO 后進先出隊列LifoQueue使用后進先出順序,與棧結構相似:
class LifoQueue(Queue): """Variant of Queue that retrieves most recently added entries first.""" def _init(self, maxsize): self.queue = [] def _qsize(self): return len(self.queue) def _put(self, item): self.queue.append(item) def _get(self): return self.queue.pop()
這就是 LifoQueue 全部代碼了,這正是 Queue 設計很棒的一個原因,它將底層的數據操作抽象成四個操作函數,本身來處理線程安全的問題,使得其子類只需關注底層的操作。
LifoQueue 底層數據結構改用 list 來存放,通過 self.queue.pop() 就能將 list 中最后一個元素移除,無需重置索引。
PriorityQueue 優先隊列from heapq import heappush, heappop class PriorityQueue(Queue): """Variant of Queue that retrieves open entries in priority order (lowest first). Entries are typically tuples of the form: (priority number, data). """ def _init(self, maxsize): self.queue = [] def _qsize(self): return len(self.queue) def _put(self, item): heappush(self.queue, item) def _get(self): return heappop(self.queue)
優先隊列使用了 heapq 模塊的結構,也就是最小堆的結構。優先隊列更為常用,隊列中項目的處理順序需要基于這些項目的特征,一個簡單的例子:
import queue class A: def __init__(self, priority, value): self.priority = priority self.value = value def __lt__(self, other): return self.priority < other.priority q = queue.PriorityQueue() q.put(A(1, "a")) q.put(A(0, "b")) q.put(A(1, "c")) print(q.get().value) # "b"
使用優先隊列的時候,需要定義 __lt__ 魔術方法,來定義它們之間如何比較大小。若元素的 priority 相同,依然使用先進先出的順序。
參考https://pymotw.com/3/queue/in...
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/43547.html
摘要:介紹今天花了近乎一天的時間研究關于多線程的問題,查看了大量源碼自己也實踐了一個生產消費者模型,所以把一天的收獲總結一下。提供了兩個模塊和來支持的多線程操作。使用來阻塞線程。 介紹 今天花了近乎一天的時間研究python關于多線程的問題,查看了大量源碼 自己也實踐了一個生產消費者模型,所以把一天的收獲總結一下。 由于GIL(Global Interpreter Lock)鎖的關系,純的p...
摘要:消息隊列的接受消息隊列的接受是利用函數,其中是消息的類型,該參數會取出指定類型的消息,如果設定的是爭搶模式,該值會統一為,否則該值就是消息發送目的的。環形隊列的消息入隊發送消息首先要確定環形隊列的隊尾。取模操作可以優化 前言 swoole 的底層隊列有兩種:進程間通信 IPC 的消息隊列 swMsgQueue,與環形隊列 swRingQueue。IPC 的消息隊列用于 task_wor...
摘要:對線程池的研究是之前對分析的附加工作。在之前對源碼分析的文章中,寫到調度器將任務放入線程池的函數這里分析的線程池類是,也就是上述代碼中所使用的類。 對Python線程池的研究是之前對Apshceduler分析的附加工作。 在之前對Apshceduler源碼分析的文章中,寫到調度器將任務放入線程池的函數 def _do_submit_job(self, job, run_time...
摘要:默認值為,指定為時代表可以阻塞,若同時指定,在超時時返回。當消費者線程調用意味著有消費者取得任務并完成任務,未完成的任務數就會減少。當未完成的任務數降到,解除阻塞。 學習契機 最近的一個項目中在使用grpc時遇到一個問題,由于client端可多達200,每個端口每10s向grpc server發送一次請求,server端接受client的請求后根據request信息更新數據庫,再將數據...
摘要:最近稿定設計這個站點挺火,設計組的大哥一直在提,啊,這個好,這個好。目的是給設計組大哥提供素材參考,畢竟做設計的可不能抄襲哦思路枯竭的時候,借鑒一下還湊合。看了一眼設計大哥的頭發,我覺得夠他用一年了。 ...
閱讀 1463·2021-11-24 09:39
閱讀 1781·2021-11-22 15:25
閱讀 3732·2021-11-19 09:40
閱讀 3291·2021-09-22 15:31
閱讀 1293·2021-07-29 13:49
閱讀 1201·2019-08-26 11:59
閱讀 1313·2019-08-26 11:39
閱讀 927·2019-08-26 11:00