摘要:如果某線程并未使用很多操作,它會在自己的時間片內(nèi)一直占用處理器和。在中使用線程在和等大多數(shù)類系統(tǒng)上運(yùn)行時,支持多線程編程。守護(hù)線程另一個避免使用模塊的原因是,它不支持守護(hù)線程。
引言&動機(jī)這一篇是Python并發(fā)的第四篇,主要介紹進(jìn)程和線程的定義,Python線程和全局解釋器鎖以及Python如何使用thread模塊處理并發(fā)
考慮一下這個場景,我們有10000條數(shù)據(jù)需要處理,處理每條數(shù)據(jù)需要花費(fèi)1秒,但讀取數(shù)據(jù)只需要0.1秒,每條數(shù)據(jù)互不干擾。該如何執(zhí)行才能花費(fèi)時間最短呢?
在多線程(MT)編程出現(xiàn)之前,電腦程序的運(yùn)行由一個執(zhí)行序列組成,執(zhí)行序列按順序在主機(jī)的中央處理器(CPU)中運(yùn)行。無論是任務(wù)本身要求順序執(zhí)行還是整個程序是由多個子任務(wù)組成,程序都是按這種方式執(zhí)行的。即使子任務(wù)相互獨(dú)立,互相無關(guān)(即,一個子任務(wù)的結(jié)果不影響其它子 任務(wù)的結(jié)果)時也是這樣。
對于上邊的問題,如果使用一個執(zhí)行序列來完成,我們大約需要花費(fèi) 10000*0.1 + 10000 = 11000 秒。這個時間顯然是太長了。
那我們有沒有可能在執(zhí)行計算的同時取數(shù)據(jù)呢?或者是同時處理幾條數(shù)據(jù)呢?如果可以,這樣就能大幅提高任務(wù)的效率。這就是多線程編程的目的。
對于本質(zhì)上就是異步的, 需要有多個并發(fā)事務(wù),各個事務(wù)的運(yùn)行順序可以是不確定的,隨機(jī)的,不可預(yù)測的問題,多線程是最理想的解決方案。這樣的任務(wù)可以被分成多個執(zhí)行流,每個流都有一個要完成的目標(biāo),然后將得到的結(jié)果合并,得到最終的結(jié)果。
線程和進(jìn)程 什么是進(jìn)程進(jìn)程(有時被稱為重量級進(jìn)程)是程序的一次 執(zhí)行。每個進(jìn)程都有自己的地址空間,內(nèi)存,數(shù)據(jù)棧以及其它記錄其運(yùn)行軌跡的輔助數(shù)據(jù)。操作系 統(tǒng)管理在其上運(yùn)行的所有進(jìn)程,并為這些進(jìn)程公平地分配時間。進(jìn)程也可以通過 fork 和 spawn 操作 來完成其它的任務(wù)。不過各個進(jìn)程有自己的內(nèi)存空間,數(shù)據(jù)棧等,所以只能使用進(jìn)程間通訊(IPC), 而不能直接共享信息。
什么是線程線程(有時被稱為輕量級進(jìn)程)跟進(jìn)程有些相似,不同的是,所有的線程運(yùn)行在同一個進(jìn)程中, 共享相同的運(yùn)行環(huán)境。它們可以想像成是在主進(jìn)程或“主線程”中并行運(yùn)行的“迷你進(jìn)程”。
線程狀態(tài)如圖
線程有開始,順序執(zhí)行和結(jié)束三部分。它有一個自己的指令指針,記錄自己運(yùn)行到什么地方。 線程的運(yùn)行可能被搶占(中斷),或暫時的被掛起(也叫睡眠),讓其它的線程運(yùn)行,這叫做讓步。 一個進(jìn)程中的各個線程之間共享同一片數(shù)據(jù)空間,所以線程之間可以比進(jìn)程之間更方便地共享數(shù)據(jù)以及相互通訊。
當(dāng)然,這樣的共享并不是完全沒有危險的。如果多個線程共同訪問同一片數(shù)據(jù),則由于數(shù)據(jù)訪 問的順序不一樣,有可能導(dǎo)致數(shù)據(jù)結(jié)果的不一致的問題。這叫做競態(tài)條件(race condition)。
線程一般都是并發(fā)執(zhí)行的,不過在單 CPU 的系統(tǒng)中,真正的并發(fā)是不可能的,每個線程會被安排成每次只運(yùn)行一小會,然后就把 CPU 讓出來,讓其它的線程去運(yùn)行。由于有的函數(shù)會在完成之前阻塞住,在沒有特別為多線程做修改的情 況下,這種“貪婪”的函數(shù)會讓 CPU 的時間分配有所傾斜。導(dǎo)致各個線程分配到的運(yùn)行時間可能不 盡相同,不盡公平。
Python、線程和全局解釋器鎖 全局解釋器鎖(GIL)首先需要明確的一點(diǎn)是GIL并不是Python的特性,它是在實(shí)現(xiàn)Python解析器(CPython)時所引入的一個概念。就好比C++是一套語言(語法)標(biāo)準(zhǔn),但是可以用不同的編譯器來編譯成可執(zhí)行代碼。同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執(zhí)行環(huán)境來執(zhí)行(其中的JPython就沒有GIL)。
那么CPython實(shí)現(xiàn)中的GIL又是什么呢?GIL全稱Global Interpreter Lock為了避免誤導(dǎo),我們還是來看一下官方給出的解釋:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
盡管Python完全支持多線程編程, 但是解釋器的C語言實(shí)現(xiàn)部分在完全并行執(zhí)行時并不是線程安全的。 實(shí)際上,解釋器被一個全局解釋器鎖保護(hù)著,它確保任何時候都只有一個Python線程執(zhí)行。
在多線程環(huán)境中,Python 虛擬機(jī)按以下方式執(zhí)行:
設(shè)置GIL
切換到一個線程去執(zhí)行
運(yùn)行
指定數(shù)量的字節(jié)碼指令
線程主動讓出控制(可以調(diào)用time.sleep(0))
把線程設(shè)置完睡眠狀態(tài)
解鎖GIL
再次重復(fù)以上步驟
退出線程對所有面向 I/O 的(會調(diào)用內(nèi)建的操作系統(tǒng) C 代碼的)程序來說,GIL 會在這個 I/O 調(diào)用之 前被釋放,以允許其它的線程在這個線程等待 I/O 的時候運(yùn)行。如果某線程并未使用很多 I/O 操作, 它會在自己的時間片內(nèi)一直占用處理器(和 GIL)。也就是說,I/O 密集型的 Python 程序比計算密集 型的程序更能充分利用多線程環(huán)境的好處。
當(dāng)一個線程結(jié)束計算,它就退出了。線程可以調(diào)用 thread.exit()之類的退出函數(shù),也可以使用 Python 退出進(jìn)程的標(biāo)準(zhǔn)方法,如 sys.exit()或拋出一個 SystemExit 異常等。不過,你不可以直接 “殺掉”("kill")一個線程。
在 Python 中使用線程在 Win32 和 Linux, Solaris, MacOS, *BSD 等大多數(shù)類 Unix 系統(tǒng)上運(yùn)行時,Python 支持多線程 編程。Python 使用 POSIX 兼容的線程,即 pthreads。
默認(rèn)情況下,只要在解釋器中
>> import thread
如果沒有報錯,則說明線程可用。
Python 的 threading 模塊Python 供了幾個用于多線程編程的模塊,包括 thread, threading 和 Queue 等。thread 和 threading 模塊允許程序員創(chuàng)建和管理線程。thread 模塊 供了基本的線程和鎖的支持,而 threading 供了更高級別,功能更強(qiáng)的線程管理的功能。Queue 模塊允許用戶創(chuàng)建一個可以用于多個線程之間 共享數(shù)據(jù)的隊列數(shù)據(jù)結(jié)構(gòu)。
出于以下幾點(diǎn)考慮,我們不建議您使用 thread 模塊。
更高級別的 threading 模塊更為先 進(jìn),對線程的支持更為完善,而且使用 thread 模塊里的屬性有可能會與 threading 出現(xiàn)沖突。其次, 低級別的 thread 模塊的同步原語很少(實(shí)際上只有一個),而 threading 模塊則有很多。
對于你的進(jìn)程什么時候應(yīng)該結(jié)束完全沒有控制,當(dāng)主線程結(jié)束 時,所有的線程都會被強(qiáng)制結(jié)束掉,沒有警告也不會有正常的清除工作。我們之前說過,至少 threading 模塊能確保重要的子線程退出后進(jìn)程才退出。
thread 模塊除了產(chǎn)生線程外,thread 模塊也提供了基本的同步數(shù) 據(jù)結(jié)構(gòu)鎖對象(lock object,也叫原語鎖,簡單鎖,互斥鎖,互斥量,二值信號量)。
thread 模塊函數(shù)
start_new_thread(function, args, kwargs=None):產(chǎn)生一個新的線程,在新線程中用指定的參數(shù)和可選的 kwargs 來調(diào)用這個函數(shù)。
allocate_lock():分配一個 LockType 類型的鎖對象
exit():讓線程退出
acquire(wait=None):嘗試獲取鎖對象
locked():如果獲取了鎖對象返回 True,否則返回 False
release():釋放鎖
下面是一個使用 thread 的例子:
import thread from time import sleep, time def loop(num): print("start loop at:", time()) sleep(num) print("loop done at:", time()) def loop1(num): print("start loop 1 at:", time()) sleep(num) print("loop 1 done at:", time()) def main(): print("starting at:", time()) thread.start_new_thread(loop, (4,)) thread.start_new_thread(loop1, (5,)) sleep(6) print("all DONE at:", time()) if __name__ == "__main__": main() ("starting at:", 1489387024.886667) ("start loop at:", 1489387024.88705) ("start loop 1 at:", 1489387024.887277) ("loop done at:", 1489387028.888182) ("loop 1 done at:", 1489387029.888904) ("all DONE at:", 1489387030.889918)
start_new_thread()要求一定要有前兩個參數(shù)。所以,就算我們想要運(yùn)行的函數(shù)不要參數(shù),也要傳一個空的元組。
為什么要加上sleep(6)這一句呢? 因?yàn)椋绻覀儧]有讓主線程停下來,那主線程就會運(yùn)行下一條語句,顯示 “all done”,然后就關(guān)閉運(yùn)行著 loop()和 loop1()的兩個線程,退出了。
我們有沒有更好的辦法替換使用sleep() 這種不靠譜的同步方式呢?答案是使用鎖,使用了鎖,我們就可以在兩個線程都退出之后馬上退出。
#! -*- coding: utf-8 -*- import thread from time import sleep, time loops = [4, 2] def loop(nloop, nsec, lock): print("start loop %s at: %s" % (nloop, time())) sleep(nsec) print("loop %s done at: %s" % (nloop, time())) # 每個線程都會被分配一個事先已經(jīng)獲得的鎖,在 sleep()的時間到了之后就釋放 相應(yīng)的鎖以通知主線程,這個線程已經(jīng)結(jié)束了。 lock.release() def main(): print("starting at:", time()) locks = [] nloops = range(len(loops)) for i in nloops: # 調(diào)用 thread.allocate_lock()函數(shù)創(chuàng)建一個鎖的列表 lock = thread.allocate_lock() # 分別調(diào)用各個鎖的 acquire()函數(shù)獲得, 獲得鎖表示“把鎖鎖上” lock.acquire() locks.append(lock) for i in nloops: # 創(chuàng)建線程,每個線程都用各自的循環(huán)號,睡眠時間和鎖為參數(shù)去調(diào)用 loop()函數(shù) thread.start_new_thread(loop, (i, loops[i], locks[i])) for i in nloops: # 在線程結(jié)束的時候,線程要自己去做解鎖操作 # 當(dāng)前循環(huán)只是坐在那一直等(達(dá)到暫停主 線程的目的),直到兩個鎖都被解鎖為止才繼續(xù)運(yùn)行。 while locks[i].locked(): pass print("all DONE at:", time()) if __name__ == "__main__": main()
為什么我們不在創(chuàng)建鎖的循環(huán)里創(chuàng)建線程呢?有以下幾個原因:
我們想到實(shí)現(xiàn)線程的同步,所以要讓“所有的馬同時沖出柵欄”。
獲取鎖要花一些時間,如果你的 線程退出得“太快”,可能會導(dǎo)致還沒有獲得鎖,線程就已經(jīng)結(jié)束了的情況。
threading 模塊threading 模塊不僅提供了 Thread 類,還提供了各種非常好用的同步機(jī)制。
下面是threading 模塊里所有的對象:
Thread: 表示一個線程的執(zhí)行的對象
Lock: 鎖原語對象(跟 thread 模塊里的鎖對象相同)
RLock: 可重入鎖對象。使單線程可以再次獲得已經(jīng)獲得了的鎖(遞歸鎖定)。
Condition: 條件變量對象能讓一個線程停下來,等待其它線程滿足了某個“條件”。 如,狀態(tài)的改變或值的改變。
Event: 通用的條件變量。多個線程可以等待某個事件的發(fā)生,在事件發(fā)生后, 所有的線程都會被激活。
Semaphore: 為等待鎖的線程 供一個類似“等候室”的結(jié)構(gòu)
BoundedSemaphore: 與 Semaphore 類似,只是它不允許超過初始值
Timer: 與 Thread 相似,只是,它要等待一段時間后才開始運(yùn)行。
守護(hù)線程另一個避免使用 thread 模塊的原因是,它不支持守護(hù)線程。當(dāng)主線程退出時,所有的子線程不 論它們是否還在工作,都會被強(qiáng)行退出。有時,我們并不期望這種行為,這時,就引入了守護(hù)線程 的概念
threading 模塊支持守護(hù)線程,它們是這樣工作的:守護(hù)線程一般是一個等待客戶請求的服務(wù)器, 如果沒有客戶 出請求,它就在那等著。如果你設(shè)定一個線程為守護(hù)線程,就表示你在說這個線程 是不重要的,在進(jìn)程退出的時候,不用等待這個線程退出。
如果你的主線程要退出的時候,不用等待那些子線程完成,那就設(shè)定這些線程的 daemon 屬性。 即,在線程開始(調(diào)用 thread.start())之前,調(diào)用 setDaemon()函數(shù)設(shè)定線程的 daemon 標(biāo)志 (thread.setDaemon(True))就表示這個線程“不重要”
如果你想要等待子線程完成再退出,那就什么都不用做,或者顯式地調(diào)用 thread.setDaemon(False)以保證其 daemon 標(biāo)志為 False。你可以調(diào)用 thread.isDaemon()函數(shù)來判 斷其 daemon 標(biāo)志的值。新的子線程會繼承其父線程的 daemon 標(biāo)志。整個 Python 會在所有的非守護(hù) 線程退出后才會結(jié)束,即進(jìn)程中沒有非守護(hù)線程存在的時候才結(jié)束。
Thread類提供了以下方法:
run(): 用以表示線程活動的方法。
start():啟動線程活動。
join([time]): 等待至線程中止。這阻塞調(diào)用線程直至線程的join() 方法被調(diào)用中止-正常退出或者拋出未處理的異常-或者是可選的超時發(fā)生。
is_alive(): 返回線程是否活動的。
name(): 設(shè)置/返回線程名。
daemon(): 返回/設(shè)置線程的 daemon 標(biāo)志,一定要在調(diào)用 start()函數(shù)前設(shè)置
用 Thread 類,你可以用多種方法來創(chuàng)建線程。我們在這里介紹三種比較相像的方法。
創(chuàng)建一個Thread的實(shí)例,傳給它一個函數(shù)
創(chuàng)建一個Thread的實(shí)例,傳給它一個可調(diào)用的類對象
從Thread派生出一個子類,創(chuàng)建一個這個子類的實(shí)例
下邊是三種不同方式的創(chuàng)建線程的示例:
#! -*- coding: utf-8 -*- # 創(chuàng)建一個Thread的實(shí)例,傳給它一個函數(shù) import threading from time import sleep, time loops = [4, 2] def loop(nloop, nsec, lock): print("start loop %s at: %s" % (nloop, time())) sleep(nsec) print("loop %s done at: %s" % (nloop, time())) # 每個線程都會被分配一個事先已經(jīng)獲得的鎖,在 sleep()的時間到了之后就釋放 相應(yīng)的鎖以通知主線程,這個線程已經(jīng)結(jié)束了。 def main(): print("starting at:", time()) threads = [] nloops = range(len(loops)) for i in nloops: t = threading.Thread(target=loop, args=(i, loops[i])) threads.append(t) for i in nloops: # start threads threads[i].start() for i in nloops: # wait for all # join()會等到線程結(jié)束,或者在給了 timeout 參數(shù)的時候,等到超時為止。 # 使用 join()看上去 會比使用一個等待鎖釋放的無限循環(huán)清楚一些(這種鎖也被稱為"spinlock") threads[i].join() # threads to finish print("all DONE at:", time()) if __name__ == "__main__": main()
與傳一個函數(shù)很相似的另一個方法是在創(chuàng)建線程的時候,傳一個可調(diào)用的類的實(shí)例供線程啟動 的時候執(zhí)行——這是多線程編程的一個更為面向?qū)ο蟮姆椒āO鄬τ谝粋€或幾個函數(shù)來說,由于類 對象里可以使用類的強(qiáng)大的功能,可以保存更多的信息,這種方法更為靈活
#! -*- coding: utf-8 -*- # 創(chuàng)建一個 Thread 的實(shí)例,傳給它一個可調(diào)用的類對象 from threading import Thread from time import sleep, time loops = [4, 2] class ThreadFunc(object): def __init__(self, func, args, name=""): self.name = name self.func = func self.args = args def __call__(self): # 創(chuàng)建新線程的時候,Thread 對象會調(diào)用我們的 ThreadFunc 對象,這時會用到一個特殊函數(shù) __call__()。 self.func(*self.args) def loop(nloop, nsec): print("start loop %s at: %s" % (nloop, time())) sleep(nsec) print("loop %s done at: %s" % (nloop, time())) def main(): print("starting at:", time()) threads = [] nloops = range(len(loops)) for i in nloops: t = Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__)) threads.append(t) for i in nloops: # start threads threads[i].start() for i in nloops: # wait for all # join()會等到線程結(jié)束,或者在給了 timeout 參數(shù)的時候,等到超時為止。 # 使用 join()看上去 會比使用一個等待鎖釋放的無限循環(huán)清楚一些(這種鎖也被稱為"spinlock") threads[i].join() # threads to finish print("all DONE at:", time()) if __name__ == "__main__": main()
最后一個例子介紹如何子類化 Thread 類,這與上一個例子中的創(chuàng)建一個可調(diào)用的類非常像。使用子類化創(chuàng)建線程(第 29-30 行)使代碼看上去更清晰明了。
#! -*- coding: utf-8 -*- # 創(chuàng)建一個 Thread 的實(shí)例,傳給它一個可調(diào)用的類對象 from threading import Thread from time import sleep, time loops = [4, 2] class MyThread(Thread): def __init__(self, func, args, name=""): super(MyThread, self).__init__() self.name = name self.func = func self.args = args def getResult(self): return self.res def run(self): # 創(chuàng)建新線程的時候,Thread 對象會調(diào)用我們的 ThreadFunc 對象,這時會用到一個特殊函數(shù) __call__()。 print "starting", self.name, "at:", time() self.res = self.func(*self.args) print self.name, "finished at:", time() def loop(nloop, nsec): print("start loop %s at: %s" % (nloop, time())) sleep(nsec) print("loop %s done at: %s" % (nloop, time())) def main(): print("starting at:", time()) threads = [] nloops = range(len(loops)) for i in nloops: t = MyThread(loop, (i, loops[i]), loop.__name__) threads.append(t) for i in nloops: # start threads threads[i].start() for i in nloops: # wait for all # join()會等到線程結(jié)束,或者在給了 timeout 參數(shù)的時候,等到超時為止。 # 使用 join()看上去 會比使用一個等待鎖釋放的無限循環(huán)清楚一些(這種鎖也被稱為"spinlock") threads[i].join() # threads to finish print("all DONE at:", time()) if __name__ == "__main__": main()下載國旗的例子
下面,我們接我們之前按之前并發(fā)的套路,用實(shí)現(xiàn)一下使用 threading 并發(fā)下載國旗
# python3 import threading from threading import Thread from flags import save_flag, show, main, get_flag class MyThread(Thread): def __init__(self, func, args, name=""): super(MyThread, self).__init__() self.name = name self.func = func self.args = args def getResult(self): return self.res def run(self): # 創(chuàng)建新線程的時候,Thread 對象會調(diào)用我們的 ThreadFunc 對象,這時會用到一個特殊函數(shù) __call__()。 self.res = self.func(*self.args) def download_one(cc): # <3> image = get_flag(cc) show(cc) save_flag(image, cc.lower() + ".gif") return cc def download_many(cc_list): threads = [] for cc in cc_list: thread = MyThread(download_one, (cc, ), download_one.__name__) threads.append(thread) for thread in threads: # 啟動線程 thread.start() for thread in threads: # wait for all # join()會等到線程結(jié)束,或者在給了 timeout 參數(shù)的時候,等到超時為止。 # 使用 join()看上去 會比使用一個等待鎖釋放的無限循環(huán)清楚一些(這種鎖也被稱為"spinlock") thread.join() return len(list(threads)) # <7> if __name__ == "__main__": main(download_many)
執(zhí)行代碼發(fā)現(xiàn)和使用協(xié)程相比速度基本一致。
除了各種同步對象和線程對象外,threading 模塊還 供了一些函數(shù)。
active_count(): 當(dāng)前活動的線程對象的數(shù)量
current_thread(): 返回當(dāng)前線程對象
enumerate(): 返回當(dāng)前活動線程的列表
settrace(func): 為所有線程設(shè)置一個跟蹤函數(shù)
setprofile(func): 為所有線程設(shè)置一個 profile 函數(shù)
Lock & RLock原語鎖定是一個同步原語,狀態(tài)是鎖定或未鎖定。兩個方法acquire()和release() 用于加鎖和釋放鎖。
RLock 可重入鎖是一個類似于Lock對象的同步原語,但同一個線程可以多次調(diào)用。
Lock 不支持遞歸加鎖,也就是說即便在同 線程中,也必須等待鎖釋放。通常建議改 RLock, 它會處理 "owning thread" 和 "recursion level" 狀態(tài),對于同 線程的多次請求鎖 為,只累加
計數(shù)器。每次調(diào) release() 將遞減該計數(shù)器,直到 0 時釋放鎖,因此 acquire() 和 release() 必須 要成對出現(xiàn)。
from time import sleep from threading import current_thread, Thread lock = Rlock() def show(): with lock: print current_thread().name, i sleep(0.1) def test(): with lock: for i in range(3): show(i) for i in range(2): Thread(target=test).start()Event
事件用于在線程間通信。一個線程發(fā)出一個信號,其他一個或多個線程等待。
Event 通過通過 個內(nèi)部標(biāo)記來協(xié)調(diào)多線程運(yùn) 。 法 wait() 阻塞線程執(zhí) ,直到標(biāo)記為 True。 set() 將標(biāo)記設(shè)為 True,clear() 更改標(biāo)記為 False。isSet() 用于判斷標(biāo)記狀態(tài)。
from threading import Event def test_event(): e = Event() def test(): for i in range(5): print "start wait" e.wait() e.clear() # 如果不調(diào)用clear(),那么標(biāo)記一直為 True,wait()就不會發(fā)生阻塞行為 print i Thread(target=test).start() return e e = test_event()Condition
條件變量和 Lock 參數(shù)一樣,也是一個,也是一個同步原語,當(dāng)需要線程關(guān)注特定的狀態(tài)變化或事件的發(fā)生時使用這個鎖定。
可以認(rèn)為,除了Lock帶有的鎖定池外,Condition還包含一個等待池,池中的線程處于狀態(tài)圖中的等待阻塞狀態(tài),直到另一個線程調(diào)用notify()/notifyAll()通知;得到通知后線程進(jìn)入鎖定池等待鎖定。
構(gòu)造方法:
Condition([lock/rlock])
Condition 有以下這些方法:
acquire([timeout])/release(): 調(diào)用關(guān)聯(lián)的鎖的相應(yīng)方法。
wait([timeout]): 調(diào)用這個方法將使線程進(jìn)入Condition的等待池等待通知,并釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。
notify(): 調(diào)用這個方法將從等待池挑選一個線程并通知,收到通知的線程將自動調(diào)用acquire()嘗試獲得鎖定(進(jìn)入鎖定池);其他線程仍然在等待池中。調(diào)用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
notifyAll(): 調(diào)用這個方法將通知等待池中所有的線程,這些線程都將進(jìn)入鎖定池嘗試獲得鎖定。調(diào)用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
from threading import Condition, current_thread, Thread con = Condition() def tc1(): with con: for i in range(5): print current_thread().name, i sleep(0.3) if i == 3: con.wait() def tc2(): with con: for i in range(5): print current_thread().name, i sleep(0.1) con.notify() Thread(target=tc1).start() Thread(target=tc2).start() Thread-1 0 Thread-1 1 Thread-1 2 Thread-1 3 # 讓出鎖 Thread-2 0 Thread-2 1 Thread-2 2 Thread-2 3 Thread-2 4 Thread-1 4 # 重新獲取鎖,繼續(xù)執(zhí)
只有獲取鎖的線程才能調(diào)用 wait() 和 notify(),因此必須在鎖釋放前調(diào)用。
當(dāng) wait() 釋放鎖后,其他線程也可進(jìn)入 wait 狀態(tài)。notifyAll() 激活所有等待線程,讓它們?nèi)屾i然后完成后續(xù)執(zhí)行。
現(xiàn)在我們用一個經(jīng)典的(生產(chǎn)者消費(fèi)者)例子來介紹一下 Queue模塊。
生產(chǎn)者消費(fèi)者的場景是: 生產(chǎn)者生產(chǎn)貨物,然后把貨物放到一個隊列之類的數(shù)據(jù)結(jié)構(gòu)中,生產(chǎn)貨物所要花費(fèi)的時間無法預(yù)先確定。消費(fèi)者消耗生產(chǎn)者生產(chǎn)的貨物的時間也是不確定的。
常用的 Queue 模塊的屬性:
queue(size): 創(chuàng)建一個大小為size的Queue對象。
qsize(): 返回隊列的大小(由于在返回的時候,隊列可能會被其它線程修改,所以這個值是近似值)
empty(): 如果隊列為空返回 True,否則返回 False
full(): 如果隊列已滿返回 True,否則返回 False
put(item,block=0): 把item放到隊列中,如果給了block(不為0),函數(shù)會一直阻塞到隊列中有空間為止
get(block=0): 從隊列中取一個對象,如果給了 block(不為 0),函數(shù)會一直阻塞到隊列中有對象為止
Queue 模塊可以用來進(jìn)行線程間通訊,讓各個線程之間共享數(shù)據(jù)。
現(xiàn)在,我們創(chuàng)建一個隊列,讓 生產(chǎn)者(線程)把新生產(chǎn)的貨物放進(jìn)去供消費(fèi)者(線程)使用。
# python2 #! -*- coding: utf-8 -*- from Queue import Queue from random import randint from time import sleep, time from threading import Thread class MyThread(Thread): def __init__(self, func, args, name=""): super(MyThread, self).__init__() self.name = name self.func = func self.args = args def getResult(self): return self.res def run(self): # 創(chuàng)建新線程的時候,Thread 對象會調(diào)用我們的 ThreadFunc 對象,這時會用到一個特殊函數(shù) __call__()。 print "starting", self.name, "at:", time() self.res = self.func(*self.args) print self.name, "finished at:", time() # writeQ()和 readQ()函數(shù)分別用來把對象放入隊列和消耗隊列中的一個對象。在這里我們使用 字符串"xxx"來表示隊列中的對象。 def writeQ(queue): print "producing object for Q..." queue.put("xxx", 1) print "size now", queue.qsize() def readQ(queue): queue.get(1) print("consumed object from Q... size now", queue.qsize()) def writer(queue, loops): # writer()函數(shù)只做一件事,就是一次往隊列中放入一個對象,等待一會,然后再做同樣的事 for i in range(loops): writeQ(queue) sleep(1) def reader(queue, loops): # reader()函數(shù)只做一件事,就是一次從隊列中取出一個對象,等待一會,然后再做同樣的事 for i in range(loops): readQ(queue) sleep(randint(2, 5)) # 設(shè)置有多少個線程要被運(yùn)行 funcs = [writer, reader] nfuncs = range(len(funcs)) def main(): nloops = randint(10, 20) q = Queue(32) threads = [] for i in nfuncs: t = MyThread(funcs[i], (q, nloops), funcs[i].__name__) threads.append(t) for i in nfuncs: threads[i].start() for i in nfuncs: threads[i].join() print threads[i].getResult() print "all DONE" if __name__ == "__main__": main()FAQ
進(jìn)程(有時被稱為重量級進(jìn)程)是程序的一次 執(zhí)行。每個進(jìn)程都有自己的地址空間,內(nèi)存,數(shù)據(jù)棧以及其它記錄其運(yùn)行軌跡的輔助數(shù)據(jù)。
線程(有時被稱為輕量級進(jìn)程)跟進(jìn)程有些相似,不同的是,所有的線程運(yùn)行在同一個進(jìn)程中, 共享相同的運(yùn)行環(huán)境。它們可以想像成是在主進(jìn)程或“主線程”中并行運(yùn)行的“迷你進(jìn)程”。
這篇文章很好的解釋了 線程和進(jìn)程的區(qū)別,推薦閱讀: http://www.ruanyifeng.com/blo...
由于GIL的緣故,對所有面向 I/O 的(會調(diào)用內(nèi)建的操作系統(tǒng) C 代碼的)程序來說,GIL 會在這個 I/O 調(diào)用之 前被釋放,以允許其它的線程在這個線程等待 I/O 的時候運(yùn)行。如果某線程并未使用很多 I/O 操作, 它會在自己的時間片內(nèi)一直占用處理器(和 GIL)。也就是說,I/O 密集型的 Python 程序比計算密集 型的程序更能充分利用多線程環(huán)境的好處。
Python的線程就是C語言的一個pthread,并通過操作系統(tǒng)調(diào)度算法進(jìn)行調(diào)度(例如linux是CFS)。為了讓各個線程能夠平均利用CPU時間,python會計算當(dāng)前已執(zhí)行的微代碼數(shù)量,達(dá)到一定閾值后就強(qiáng)制釋放GIL。而這時也會觸發(fā)一次操作系統(tǒng)的線程調(diào)度(當(dāng)然是否真正進(jìn)行上下文切換由操作系統(tǒng)自主決定)。
偽代碼
while True: acquire GIL for i in 1000: do something release GIL /* Give Operating System a chance to do thread scheduling */
這種模式在只有一個CPU核心的情況下毫無問題。任何一個線程被喚起時都能成功獲得到GIL(因?yàn)橹挥嗅尫帕薌IL才會引發(fā)線程調(diào)度)。
但當(dāng)CPU有多個核心的時候,問題就來了。從偽代碼可以看到,從release GIL到acquire GIL之間幾乎是沒有間隙的。所以當(dāng)其他在其他核心上的線程被喚醒時,大部分情況下主線程已經(jīng)又再一次獲取到GIL了。這個時候被喚醒執(zhí)行的線程只能白白的浪費(fèi)CPU時間,看著另一個線程拿著GIL歡快的執(zhí)行著。然后達(dá)到切換時間后進(jìn)入待調(diào)度狀態(tài),再被喚醒,再等待,以此往復(fù)惡性循環(huán)。
簡單的總結(jié)下就是:Python的多線程在多核CPU上,只對于IO密集型計算產(chǎn)生正面效果;而當(dāng)有至少有一個CPU密集型線程存在,那么多線程效率會由于GIL而大幅下降。
進(jìn)程與線程的一個簡單解釋 http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html
Python的GIL是什么鬼,多線程性能究竟如何 http://cenalulu.github.io/python/gil-in-python/
Python的全局鎖問題 http://python3-cookbook.readthedocs.io/zh_CN/latest/c12/p09_dealing_with_gil_stop_worring_about_it.html
Python線程指南 http://www.cnblogs.com/huxi/archive/2010/06/26/1765808.html
>歡迎關(guān)注 | >請我喝芬達(dá) |
---|---|
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/38518.html
摘要:每個在同一時間只能執(zhí)行一個線程在單核下的多線程其實(shí)都只是并發(fā),不是并行,并發(fā)和并行從宏觀上來講都是同時處理多路請求的概念。在多線程下,每個線程的執(zhí)行方式獲取執(zhí)行代碼直到或者是虛擬機(jī)將其掛起。拿不到通行證的線程,就不允許進(jìn)入執(zhí)行。 進(jìn)程與線程 并發(fā)與并行 進(jìn)程與線程 首先要理解的是,我們的軟件都是運(yùn)行在操作系統(tǒng)之上,操作系統(tǒng)再控制硬件,比如 處理器、內(nèi)存、IO設(shè)備等。操作系統(tǒng)為了向上...
摘要:酷睿代在年取代了奔騰,主頻遠(yuǎn)低于此。該詞被敏捷開發(fā)團(tuán)隊使用較多,含義與形式會略有不同,更改已經(jīng)開始將垃圾收集器的狀態(tài)轉(zhuǎn)到解釋器,因此每個子解釋器將擁有它自己的本該如此。結(jié)論死亡了嗎對于單線程的應(yīng)用程序,仍然存活。showImg(https://user-gold-cdn.xitu.io/2019/5/19/16ad09f554fdf443); 本文原創(chuàng)并首發(fā)于公眾號【Python貓】,未經(jīng)授...
摘要:酷睿代在年取代了奔騰,主頻遠(yuǎn)低于此。該詞被敏捷開發(fā)團(tuán)隊使用較多,含義與形式會略有不同,更改已經(jīng)開始將垃圾收集器的狀態(tài)轉(zhuǎn)到解釋器,因此每個子解釋器將擁有它自己的本該如此。結(jié)論死亡了嗎對于單線程的應(yīng)用程序,仍然存活。showImg(https://user-gold-cdn.xitu.io/2019/5/19/16ad09f554fdf443); 本文原創(chuàng)并首發(fā)于公眾號【Python貓】,未經(jīng)授...
摘要:酷睿代在年取代了奔騰,主頻遠(yuǎn)低于此。該詞被敏捷開發(fā)團(tuán)隊使用較多,含義與形式會略有不同,更改已經(jīng)開始將垃圾收集器的狀態(tài)轉(zhuǎn)到解釋器,因此每個子解釋器將擁有它自己的本該如此。結(jié)論死亡了嗎對于單線程的應(yīng)用程序,仍然存活。 showImg(https://segmentfault.com/img/remote/1460000019229774); 本文原創(chuàng)并首發(fā)于公眾號【Python貓】,未經(jīng)授...
摘要:進(jìn)程線程切換都需要使用一定的時間。子進(jìn)程在中,如果要運(yùn)行系統(tǒng)命令,會使用來運(yùn)行,官方建議使用方法來運(yùn)行系統(tǒng)命令,更高級的用法是直接使用其接口。 多線程 簡單示例 對于CPU計算密集型的任務(wù),python的多線程跟單線程沒什么區(qū)別,甚至有可能會更慢,但是對于IO密集型的任務(wù),比如http請求這類任務(wù),python的多線程還是有用處。在日常的使用中,經(jīng)常會結(jié)合多線程和隊列一起使用,比如,以...
閱讀 3708·2023-04-26 00:56
閱讀 2686·2021-09-30 10:01
閱讀 961·2021-09-22 15:30
閱讀 3915·2021-09-07 10:21
閱讀 1506·2021-09-02 15:40
閱讀 2750·2021-08-30 09:47
閱讀 1234·2021-08-16 10:57
閱讀 1862·2019-08-30 14:01