国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

python并發(fā)4:使用thread處理并發(fā)

joywek / 1578人閱讀

摘要:如果某線程并未使用很多操作,它會在自己的時間片內(nèi)一直占用處理器和。在中使用線程在和等大多數(shù)類系統(tǒng)上運(yùn)行時,支持多線程編程。守護(hù)線程另一個避免使用模塊的原因是,它不支持守護(hù)線程。

這一篇是Python并發(fā)的第四篇,主要介紹進(jìn)程和線程的定義,Python線程和全局解釋器鎖以及Python如何使用thread模塊處理并發(fā)

引言&動機(jī)

考慮一下這個場景,我們有10000條數(shù)據(jù)需要處理,處理每條數(shù)據(jù)需要花費(fèi)1秒,但讀取數(shù)據(jù)只需要0.1秒,每條數(shù)據(jù)互不干擾。該如何執(zhí)行才能花費(fèi)時間最短呢?

在多線程(MT)編程出現(xiàn)之前,電腦程序的運(yùn)行由一個執(zhí)行序列組成,執(zhí)行序列按順序在主機(jī)的中央處理器(CPU)中運(yùn)行。無論是任務(wù)本身要求順序執(zhí)行還是整個程序是由多個子任務(wù)組成,程序都是按這種方式執(zhí)行的。即使子任務(wù)相互獨(dú)立,互相無關(guān)(即,一個子任務(wù)的結(jié)果不影響其它子 任務(wù)的結(jié)果)時也是這樣。

對于上邊的問題,如果使用一個執(zhí)行序列來完成,我們大約需要花費(fèi) 10000*0.1 + 10000 = 11000 秒。這個時間顯然是太長了。

那我們有沒有可能在執(zhí)行計算的同時取數(shù)據(jù)呢?或者是同時處理幾條數(shù)據(jù)呢?如果可以,這樣就能大幅提高任務(wù)的效率。這就是多線程編程的目的。

對于本質(zhì)上就是異步的, 需要有多個并發(fā)事務(wù),各個事務(wù)的運(yùn)行順序可以是不確定的,隨機(jī)的,不可預(yù)測的問題,多線程是最理想的解決方案。這樣的任務(wù)可以被分成多個執(zhí)行流,每個流都有一個要完成的目標(biāo),然后將得到的結(jié)果合并,得到最終的結(jié)果。

線程和進(jìn)程 什么是進(jìn)程

進(jìn)程(有時被稱為重量級進(jìn)程)是程序的一次 執(zhí)行。每個進(jìn)程都有自己的地址空間,內(nèi)存,數(shù)據(jù)棧以及其它記錄其運(yùn)行軌跡的輔助數(shù)據(jù)。操作系 統(tǒng)管理在其上運(yùn)行的所有進(jìn)程,并為這些進(jìn)程公平地分配時間。進(jìn)程也可以通過 fork 和 spawn 操作 來完成其它的任務(wù)。不過各個進(jìn)程有自己的內(nèi)存空間,數(shù)據(jù)棧等,所以只能使用進(jìn)程間通訊(IPC), 而不能直接共享信息。

什么是線程

線程(有時被稱為輕量級進(jìn)程)跟進(jìn)程有些相似,不同的是,所有的線程運(yùn)行在同一個進(jìn)程中, 共享相同的運(yùn)行環(huán)境。它們可以想像成是在主進(jìn)程或“主線程”中并行運(yùn)行的“迷你進(jìn)程”。

線程狀態(tài)如圖

線程有開始,順序執(zhí)行和結(jié)束三部分。它有一個自己的指令指針,記錄自己運(yùn)行到什么地方。 線程的運(yùn)行可能被搶占(中斷),或暫時的被掛起(也叫睡眠),讓其它的線程運(yùn)行,這叫做讓步。 一個進(jìn)程中的各個線程之間共享同一片數(shù)據(jù)空間,所以線程之間可以比進(jìn)程之間更方便地共享數(shù)據(jù)以及相互通訊。

當(dāng)然,這樣的共享并不是完全沒有危險的。如果多個線程共同訪問同一片數(shù)據(jù),則由于數(shù)據(jù)訪 問的順序不一樣,有可能導(dǎo)致數(shù)據(jù)結(jié)果的不一致的問題。這叫做競態(tài)條件(race condition)。

線程一般都是并發(fā)執(zhí)行的,不過在單 CPU 的系統(tǒng)中,真正的并發(fā)是不可能的,每個線程會被安排成每次只運(yùn)行一小會,然后就把 CPU 讓出來,讓其它的線程去運(yùn)行。由于有的函數(shù)會在完成之前阻塞住,在沒有特別為多線程做修改的情 況下,這種“貪婪”的函數(shù)會讓 CPU 的時間分配有所傾斜。導(dǎo)致各個線程分配到的運(yùn)行時間可能不 盡相同,不盡公平。

Python、線程和全局解釋器鎖 全局解釋器鎖(GIL)

首先需要明確的一點(diǎn)是GIL并不是Python的特性,它是在實(shí)現(xiàn)Python解析器(CPython)時所引入的一個概念。就好比C++是一套語言(語法)標(biāo)準(zhǔn),但是可以用不同的編譯器來編譯成可執(zhí)行代碼。同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執(zhí)行環(huán)境來執(zhí)行(其中的JPython就沒有GIL)。

那么CPython實(shí)現(xiàn)中的GIL又是什么呢?GIL全稱Global Interpreter Lock為了避免誤導(dǎo),我們還是來看一下官方給出的解釋:

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

盡管Python完全支持多線程編程, 但是解釋器的C語言實(shí)現(xiàn)部分在完全并行執(zhí)行時并不是線程安全的。 實(shí)際上,解釋器被一個全局解釋器鎖保護(hù)著,它確保任何時候都只有一個Python線程執(zhí)行。

在多線程環(huán)境中,Python 虛擬機(jī)按以下方式執(zhí)行:

設(shè)置GIL

切換到一個線程去執(zhí)行

運(yùn)行

指定數(shù)量的字節(jié)碼指令

線程主動讓出控制(可以調(diào)用time.sleep(0))

把線程設(shè)置完睡眠狀態(tài)

解鎖GIL

再次重復(fù)以上步驟

對所有面向 I/O 的(會調(diào)用內(nèi)建的操作系統(tǒng) C 代碼的)程序來說,GIL 會在這個 I/O 調(diào)用之 前被釋放,以允許其它的線程在這個線程等待 I/O 的時候運(yùn)行。如果某線程并未使用很多 I/O 操作, 它會在自己的時間片內(nèi)一直占用處理器(和 GIL)。也就是說,I/O 密集型的 Python 程序比計算密集 型的程序更能充分利用多線程環(huán)境的好處。

退出線程

當(dāng)一個線程結(jié)束計算,它就退出了。線程可以調(diào)用 thread.exit()之類的退出函數(shù),也可以使用 Python 退出進(jìn)程的標(biāo)準(zhǔn)方法,如 sys.exit()或拋出一個 SystemExit 異常等。不過,你不可以直接 “殺掉”("kill")一個線程。

在 Python 中使用線程

在 Win32 和 Linux, Solaris, MacOS, *BSD 等大多數(shù)類 Unix 系統(tǒng)上運(yùn)行時,Python 支持多線程 編程。Python 使用 POSIX 兼容的線程,即 pthreads。

默認(rèn)情況下,只要在解釋器中

>> import thread

如果沒有報錯,則說明線程可用。

Python 的 threading 模塊

Python 供了幾個用于多線程編程的模塊,包括 thread, threading 和 Queue 等。thread 和 threading 模塊允許程序員創(chuàng)建和管理線程。thread 模塊 供了基本的線程和鎖的支持,而 threading 供了更高級別,功能更強(qiáng)的線程管理的功能。Queue 模塊允許用戶創(chuàng)建一個可以用于多個線程之間 共享數(shù)據(jù)的隊列數(shù)據(jù)結(jié)構(gòu)。

核心 示:避免使用 thread 模塊

出于以下幾點(diǎn)考慮,我們不建議您使用 thread 模塊。

更高級別的 threading 模塊更為先 進(jìn),對線程的支持更為完善,而且使用 thread 模塊里的屬性有可能會與 threading 出現(xiàn)沖突。其次, 低級別的 thread 模塊的同步原語很少(實(shí)際上只有一個),而 threading 模塊則有很多。

對于你的進(jìn)程什么時候應(yīng)該結(jié)束完全沒有控制,當(dāng)主線程結(jié)束 時,所有的線程都會被強(qiáng)制結(jié)束掉,沒有警告也不會有正常的清除工作。我們之前說過,至少 threading 模塊能確保重要的子線程退出后進(jìn)程才退出。

thread 模塊

除了產(chǎn)生線程外,thread 模塊也提供了基本的同步數(shù) 據(jù)結(jié)構(gòu)鎖對象(lock object,也叫原語鎖,簡單鎖,互斥鎖,互斥量,二值信號量)。

thread 模塊函數(shù)

start_new_thread(function, args, kwargs=None):產(chǎn)生一個新的線程,在新線程中用指定的參數(shù)和可選的 kwargs 來調(diào)用這個函數(shù)。

allocate_lock():分配一個 LockType 類型的鎖對象

exit():讓線程退出

acquire(wait=None):嘗試獲取鎖對象

locked():如果獲取了鎖對象返回 True,否則返回 False

release():釋放鎖

下面是一個使用 thread 的例子:

import thread
from time import sleep, time


def loop(num):
    print("start loop at:", time())
    sleep(num)
    print("loop done at:", time())


def loop1(num):
    print("start loop 1 at:", time())
    sleep(num)
    print("loop 1 done at:", time())


def main():
    print("starting at:", time())
    thread.start_new_thread(loop, (4,))
    thread.start_new_thread(loop1, (5,))
    sleep(6)
    print("all DONE at:", time())

if __name__ == "__main__":
    main()

("starting at:", 1489387024.886667)
("start loop at:", 1489387024.88705)
("start loop 1 at:", 1489387024.887277)
("loop done at:", 1489387028.888182)
("loop 1 done at:", 1489387029.888904)
("all DONE at:", 1489387030.889918)

start_new_thread()要求一定要有前兩個參數(shù)。所以,就算我們想要運(yùn)行的函數(shù)不要參數(shù),也要傳一個空的元組。
為什么要加上sleep(6)這一句呢? 因?yàn)椋绻覀儧]有讓主線程停下來,那主線程就會運(yùn)行下一條語句,顯示 “all done”,然后就關(guān)閉運(yùn)行著 loop()和 loop1()的兩個線程,退出了。

我們有沒有更好的辦法替換使用sleep() 這種不靠譜的同步方式呢?答案是使用鎖,使用了鎖,我們就可以在兩個線程都退出之后馬上退出。

#! -*- coding: utf-8 -*-

import thread
from time import sleep, time

loops = [4, 2]

def loop(nloop, nsec, lock):
    print("start loop %s at: %s" % (nloop, time()))
    sleep(nsec)
    print("loop %s done at: %s" % (nloop, time()))
    # 每個線程都會被分配一個事先已經(jīng)獲得的鎖,在 sleep()的時間到了之后就釋放 相應(yīng)的鎖以通知主線程,這個線程已經(jīng)結(jié)束了。
    lock.release()


def main():
    print("starting at:", time())
    locks = []
    nloops = range(len(loops))

    for i in nloops:
        # 調(diào)用 thread.allocate_lock()函數(shù)創(chuàng)建一個鎖的列表
        lock = thread.allocate_lock()
        # 分別調(diào)用各個鎖的 acquire()函數(shù)獲得, 獲得鎖表示“把鎖鎖上”
        lock.acquire()
        locks.append(lock)

    for i in nloops:
        # 創(chuàng)建線程,每個線程都用各自的循環(huán)號,睡眠時間和鎖為參數(shù)去調(diào)用 loop()函數(shù)
        thread.start_new_thread(loop, (i, loops[i], locks[i]))

    for i in nloops:
        # 在線程結(jié)束的時候,線程要自己去做解鎖操作
        # 當(dāng)前循環(huán)只是坐在那一直等(達(dá)到暫停主 線程的目的),直到兩個鎖都被解鎖為止才繼續(xù)運(yùn)行。
        while locks[i].locked(): pass

    print("all DONE at:", time())

if __name__ == "__main__":
    main()

為什么我們不在創(chuàng)建鎖的循環(huán)里創(chuàng)建線程呢?有以下幾個原因:

我們想到實(shí)現(xiàn)線程的同步,所以要讓“所有的馬同時沖出柵欄”。

獲取鎖要花一些時間,如果你的 線程退出得“太快”,可能會導(dǎo)致還沒有獲得鎖,線程就已經(jīng)結(jié)束了的情況。

threading 模塊

threading 模塊不僅提供了 Thread 類,還提供了各種非常好用的同步機(jī)制。

下面是threading 模塊里所有的對象:

Thread: 表示一個線程的執(zhí)行的對象

Lock: 鎖原語對象(跟 thread 模塊里的鎖對象相同)

RLock: 可重入鎖對象。使單線程可以再次獲得已經(jīng)獲得了的鎖(遞歸鎖定)。

Condition: 條件變量對象能讓一個線程停下來,等待其它線程滿足了某個“條件”。 如,狀態(tài)的改變或值的改變。

Event: 通用的條件變量。多個線程可以等待某個事件的發(fā)生,在事件發(fā)生后, 所有的線程都會被激活。

Semaphore: 為等待鎖的線程 供一個類似“等候室”的結(jié)構(gòu)

BoundedSemaphore: 與 Semaphore 類似,只是它不允許超過初始值

Timer: 與 Thread 相似,只是,它要等待一段時間后才開始運(yùn)行。

守護(hù)線程

另一個避免使用 thread 模塊的原因是,它不支持守護(hù)線程。當(dāng)主線程退出時,所有的子線程不 論它們是否還在工作,都會被強(qiáng)行退出。有時,我們并不期望這種行為,這時,就引入了守護(hù)線程 的概念
threading 模塊支持守護(hù)線程,它們是這樣工作的:守護(hù)線程一般是一個等待客戶請求的服務(wù)器, 如果沒有客戶 出請求,它就在那等著。如果你設(shè)定一個線程為守護(hù)線程,就表示你在說這個線程 是不重要的,在進(jìn)程退出的時候,不用等待這個線程退出。
如果你的主線程要退出的時候,不用等待那些子線程完成,那就設(shè)定這些線程的 daemon 屬性。 即,在線程開始(調(diào)用 thread.start())之前,調(diào)用 setDaemon()函數(shù)設(shè)定線程的 daemon 標(biāo)志 (thread.setDaemon(True))就表示這個線程“不重要”
如果你想要等待子線程完成再退出,那就什么都不用做,或者顯式地調(diào)用 thread.setDaemon(False)以保證其 daemon 標(biāo)志為 False。你可以調(diào)用 thread.isDaemon()函數(shù)來判 斷其 daemon 標(biāo)志的值。新的子線程會繼承其父線程的 daemon 標(biāo)志。整個 Python 會在所有的非守護(hù) 線程退出后才會結(jié)束,即進(jìn)程中沒有非守護(hù)線程存在的時候才結(jié)束。

Thread 類

Thread類提供了以下方法:

run(): 用以表示線程活動的方法。

start():啟動線程活動。

join([time]): 等待至線程中止。這阻塞調(diào)用線程直至線程的join() 方法被調(diào)用中止-正常退出或者拋出未處理的異常-或者是可選的超時發(fā)生。

is_alive(): 返回線程是否活動的。

name(): 設(shè)置/返回線程名。

daemon(): 返回/設(shè)置線程的 daemon 標(biāo)志,一定要在調(diào)用 start()函數(shù)前設(shè)置

用 Thread 類,你可以用多種方法來創(chuàng)建線程。我們在這里介紹三種比較相像的方法。

創(chuàng)建一個Thread的實(shí)例,傳給它一個函數(shù)

創(chuàng)建一個Thread的實(shí)例,傳給它一個可調(diào)用的類對象

從Thread派生出一個子類,創(chuàng)建一個這個子類的實(shí)例

下邊是三種不同方式的創(chuàng)建線程的示例:

#! -*- coding: utf-8 -*-

# 創(chuàng)建一個Thread的實(shí)例,傳給它一個函數(shù)

import threading
from time import sleep, time

loops = [4, 2]

def loop(nloop, nsec, lock):
    print("start loop %s at: %s" % (nloop, time()))
    sleep(nsec)
    print("loop %s done at: %s" % (nloop, time()))
    # 每個線程都會被分配一個事先已經(jīng)獲得的鎖,在 sleep()的時間到了之后就釋放 相應(yīng)的鎖以通知主線程,這個線程已經(jīng)結(jié)束了。


def main():
    print("starting at:", time())
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = threading.Thread(target=loop, args=(i, loops[i]))
        threads.append(t)

    for i in nloops:
        # start threads
        threads[i].start()

    for i in nloops:
        # wait for all
        # join()會等到線程結(jié)束,或者在給了 timeout 參數(shù)的時候,等到超時為止。
        # 使用 join()看上去 會比使用一個等待鎖釋放的無限循環(huán)清楚一些(這種鎖也被稱為"spinlock")
        threads[i].join()  # threads to finish

    print("all DONE at:", time())

if __name__ == "__main__":
    main()

與傳一個函數(shù)很相似的另一個方法是在創(chuàng)建線程的時候,傳一個可調(diào)用的類的實(shí)例供線程啟動 的時候執(zhí)行——這是多線程編程的一個更為面向?qū)ο蟮姆椒āO鄬τ谝粋€或幾個函數(shù)來說,由于類 對象里可以使用類的強(qiáng)大的功能,可以保存更多的信息,這種方法更為靈活

#! -*- coding: utf-8 -*-

# 創(chuàng)建一個 Thread 的實(shí)例,傳給它一個可調(diào)用的類對象

from threading import Thread
from time import sleep, time


loops = [4, 2]


class ThreadFunc(object):

    def __init__(self, func, args, name=""):
        self.name = name
        self.func = func
        self.args = args

    def __call__(self):
        # 創(chuàng)建新線程的時候,Thread 對象會調(diào)用我們的 ThreadFunc 對象,這時會用到一個特殊函數(shù) __call__()。
        self.func(*self.args)


def loop(nloop, nsec):
    print("start loop %s at: %s" % (nloop, time()))
    sleep(nsec)
    print("loop %s done at: %s" % (nloop, time()))


def main():
    print("starting at:", time())
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__))
        threads.append(t)

    for i in nloops:
        # start threads
        threads[i].start()

    for i in nloops:
        # wait for all
        # join()會等到線程結(jié)束,或者在給了 timeout 參數(shù)的時候,等到超時為止。
        # 使用 join()看上去 會比使用一個等待鎖釋放的無限循環(huán)清楚一些(這種鎖也被稱為"spinlock")
        threads[i].join()  # threads to finish

    print("all DONE at:", time())


if __name__ == "__main__":
    main()

最后一個例子介紹如何子類化 Thread 類,這與上一個例子中的創(chuàng)建一個可調(diào)用的類非常像。使用子類化創(chuàng)建線程(第 29-30 行)使代碼看上去更清晰明了。

#! -*- coding: utf-8 -*-

# 創(chuàng)建一個 Thread 的實(shí)例,傳給它一個可調(diào)用的類對象

from threading import Thread
from time import sleep, time


loops = [4, 2]


class MyThread(Thread):

    def __init__(self, func, args, name=""):
        super(MyThread, self).__init__()
        self.name = name
        self.func = func
        self.args = args

    def getResult(self):
        return self.res

    def run(self):
        # 創(chuàng)建新線程的時候,Thread 對象會調(diào)用我們的 ThreadFunc 對象,這時會用到一個特殊函數(shù) __call__()。
        print "starting", self.name, "at:", time()
        self.res = self.func(*self.args)
        print self.name, "finished at:", time()



def loop(nloop, nsec):
    print("start loop %s at: %s" % (nloop, time()))
    sleep(nsec)
    print("loop %s done at: %s" % (nloop, time()))


def main():
    print("starting at:", time())
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = MyThread(loop, (i, loops[i]), loop.__name__)
        threads.append(t)

    for i in nloops:
        # start threads
        threads[i].start()

    for i in nloops:
        # wait for all
        # join()會等到線程結(jié)束,或者在給了 timeout 參數(shù)的時候,等到超時為止。
        # 使用 join()看上去 會比使用一個等待鎖釋放的無限循環(huán)清楚一些(這種鎖也被稱為"spinlock")
        threads[i].join()  # threads to finish

    print("all DONE at:", time())


if __name__ == "__main__":
    main()
下載國旗的例子

下面,我們接我們之前按之前并發(fā)的套路,用實(shí)現(xiàn)一下使用 threading 并發(fā)下載國旗

# python3

import threading
from threading import Thread

from flags import save_flag, show, main, get_flag


class MyThread(Thread):

    def __init__(self, func, args, name=""):
        super(MyThread, self).__init__()
        self.name = name
        self.func = func
        self.args = args

    def getResult(self):
        return self.res

    def run(self):
        # 創(chuàng)建新線程的時候,Thread 對象會調(diào)用我們的 ThreadFunc 對象,這時會用到一個特殊函數(shù) __call__()。
        self.res = self.func(*self.args)


def download_one(cc):  # <3>
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + ".gif")
    return cc


def download_many(cc_list):
    threads = []
    for cc in cc_list:
        thread = MyThread(download_one, (cc, ), download_one.__name__)
        threads.append(thread)

    for thread in threads:
        # 啟動線程
        thread.start()

    for thread in threads:
        # wait for all
        # join()會等到線程結(jié)束,或者在給了 timeout 參數(shù)的時候,等到超時為止。
        # 使用 join()看上去 會比使用一個等待鎖釋放的無限循環(huán)清楚一些(這種鎖也被稱為"spinlock")
        thread.join()

    return len(list(threads))  # <7>


if __name__ == "__main__":
    main(download_many)

執(zhí)行代碼發(fā)現(xiàn)和使用協(xié)程相比速度基本一致。

除了各種同步對象和線程對象外,threading 模塊還 供了一些函數(shù)。

active_count(): 當(dāng)前活動的線程對象的數(shù)量

current_thread(): 返回當(dāng)前線程對象

enumerate(): 返回當(dāng)前活動線程的列表

settrace(func): 為所有線程設(shè)置一個跟蹤函數(shù)

setprofile(func): 為所有線程設(shè)置一個 profile 函數(shù)

Lock & RLock

原語鎖定是一個同步原語,狀態(tài)是鎖定或未鎖定。兩個方法acquire()和release() 用于加鎖和釋放鎖。
RLock 可重入鎖是一個類似于Lock對象的同步原語,但同一個線程可以多次調(diào)用。

Lock 不支持遞歸加鎖,也就是說即便在同 線程中,也必須等待鎖釋放。通常建議改 RLock, 它會處理 "owning thread" 和 "recursion level" 狀態(tài),對于同 線程的多次請求鎖 為,只累加
計數(shù)器。每次調(diào) release() 將遞減該計數(shù)器,直到 0 時釋放鎖,因此 acquire() 和 release() 必須 要成對出現(xiàn)。

from time import sleep
from threading import current_thread, Thread

lock = Rlock()

def show():
    with lock:
        print current_thread().name, i
        sleep(0.1)

def test():
    with lock:
        for i in range(3):
            show(i)

for i in range(2):
    Thread(target=test).start()
Event

事件用于在線程間通信。一個線程發(fā)出一個信號,其他一個或多個線程等待。
Event 通過通過 個內(nèi)部標(biāo)記來協(xié)調(diào)多線程運(yùn) 。 法 wait() 阻塞線程執(zhí) ,直到標(biāo)記為 True。 set() 將標(biāo)記設(shè)為 True,clear() 更改標(biāo)記為 False。isSet() 用于判斷標(biāo)記狀態(tài)。

from threading import Event

def test_event():
    e = Event()
    def test():
        for i in range(5):
            print "start wait"
            e.wait()
            e.clear()  # 如果不調(diào)用clear(),那么標(biāo)記一直為 True,wait()就不會發(fā)生阻塞行為
            print i
Thread(target=test).start()
return e


e = test_event()
Condition

條件變量和 Lock 參數(shù)一樣,也是一個,也是一個同步原語,當(dāng)需要線程關(guān)注特定的狀態(tài)變化或事件的發(fā)生時使用這個鎖定。

可以認(rèn)為,除了Lock帶有的鎖定池外,Condition還包含一個等待池,池中的線程處于狀態(tài)圖中的等待阻塞狀態(tài),直到另一個線程調(diào)用notify()/notifyAll()通知;得到通知后線程進(jìn)入鎖定池等待鎖定。

構(gòu)造方法:
Condition([lock/rlock])

Condition 有以下這些方法:

acquire([timeout])/release(): 調(diào)用關(guān)聯(lián)的鎖的相應(yīng)方法。

wait([timeout]): 調(diào)用這個方法將使線程進(jìn)入Condition的等待池等待通知,并釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。

notify(): 調(diào)用這個方法將從等待池挑選一個線程并通知,收到通知的線程將自動調(diào)用acquire()嘗試獲得鎖定(進(jìn)入鎖定池);其他線程仍然在等待池中。調(diào)用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。

notifyAll(): 調(diào)用這個方法將通知等待池中所有的線程,這些線程都將進(jìn)入鎖定池嘗試獲得鎖定。調(diào)用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。

from threading import Condition, current_thread, Thread

con = Condition()

def tc1():
    with con:
        for i in range(5):
            print current_thread().name, i
            sleep(0.3)
            if i == 3:
                con.wait()


def tc2():
    with con:
        for i in range(5):
            print current_thread().name, i
            sleep(0.1)
            con.notify()

Thread(target=tc1).start()
Thread(target=tc2).start()

Thread-1 0
Thread-1 1
Thread-1 2
Thread-1 3    # 讓出鎖
Thread-2 0
Thread-2 1
Thread-2 2
Thread-2 3
Thread-2 4
Thread-1 4    # 重新獲取鎖,繼續(xù)執(zhí)

只有獲取鎖的線程才能調(diào)用 wait() 和 notify(),因此必須在鎖釋放前調(diào)用。
當(dāng) wait() 釋放鎖后,其他線程也可進(jìn)入 wait 狀態(tài)。notifyAll() 激活所有等待線程,讓它們?nèi)屾i然后完成后續(xù)執(zhí)行。

生產(chǎn)者-消費(fèi)者問題和 Queue 模塊

現(xiàn)在我們用一個經(jīng)典的(生產(chǎn)者消費(fèi)者)例子來介紹一下 Queue模塊。

生產(chǎn)者消費(fèi)者的場景是: 生產(chǎn)者生產(chǎn)貨物,然后把貨物放到一個隊列之類的數(shù)據(jù)結(jié)構(gòu)中,生產(chǎn)貨物所要花費(fèi)的時間無法預(yù)先確定。消費(fèi)者消耗生產(chǎn)者生產(chǎn)的貨物的時間也是不確定的。

常用的 Queue 模塊的屬性:

queue(size): 創(chuàng)建一個大小為size的Queue對象。

qsize(): 返回隊列的大小(由于在返回的時候,隊列可能會被其它線程修改,所以這個值是近似值)

empty(): 如果隊列為空返回 True,否則返回 False

full(): 如果隊列已滿返回 True,否則返回 False

put(item,block=0): 把item放到隊列中,如果給了block(不為0),函數(shù)會一直阻塞到隊列中有空間為止

get(block=0): 從隊列中取一個對象,如果給了 block(不為 0),函數(shù)會一直阻塞到隊列中有對象為止

Queue 模塊可以用來進(jìn)行線程間通訊,讓各個線程之間共享數(shù)據(jù)。

現(xiàn)在,我們創(chuàng)建一個隊列,讓 生產(chǎn)者(線程)把新生產(chǎn)的貨物放進(jìn)去供消費(fèi)者(線程)使用。

# python2
#! -*- coding: utf-8 -*-

from Queue import Queue
from random import randint
from time import sleep, time
from threading import Thread


class MyThread(Thread):

    def __init__(self, func, args, name=""):
        super(MyThread, self).__init__()
        self.name = name
        self.func = func
        self.args = args

    def getResult(self):
        return self.res

    def run(self):
        # 創(chuàng)建新線程的時候,Thread 對象會調(diào)用我們的 ThreadFunc 對象,這時會用到一個特殊函數(shù) __call__()。
        print "starting", self.name, "at:", time()
        self.res = self.func(*self.args)
        print self.name, "finished at:", time()


# writeQ()和 readQ()函數(shù)分別用來把對象放入隊列和消耗隊列中的一個對象。在這里我們使用 字符串"xxx"來表示隊列中的對象。
def writeQ(queue):
    print "producing object for Q..."
    queue.put("xxx", 1)
    print "size now", queue.qsize()


def readQ(queue):
    queue.get(1)
    print("consumed object from Q... size now", queue.qsize())


def writer(queue, loops):
    # writer()函數(shù)只做一件事,就是一次往隊列中放入一個對象,等待一會,然后再做同樣的事
    for i in range(loops):
        writeQ(queue)
        sleep(1)


def reader(queue, loops):
    # reader()函數(shù)只做一件事,就是一次從隊列中取出一個對象,等待一會,然后再做同樣的事
    for i in range(loops):
        readQ(queue)
        sleep(randint(2, 5))


# 設(shè)置有多少個線程要被運(yùn)行
funcs = [writer, reader]
nfuncs = range(len(funcs))


def main():
    nloops = randint(10, 20)
    q = Queue(32)
    threads = []

    for i in nfuncs:
        t = MyThread(funcs[i], (q, nloops), funcs[i].__name__)
        threads.append(t)

    for i in nfuncs:
        threads[i].start()

    for i in nfuncs:
        threads[i].join()
        print threads[i].getResult()

    print "all DONE"


if __name__ == "__main__":
    main()
FAQ
進(jìn)程與線程。線程與進(jìn)程的區(qū)別是什么?

進(jìn)程(有時被稱為重量級進(jìn)程)是程序的一次 執(zhí)行。每個進(jìn)程都有自己的地址空間,內(nèi)存,數(shù)據(jù)棧以及其它記錄其運(yùn)行軌跡的輔助數(shù)據(jù)。
線程(有時被稱為輕量級進(jìn)程)跟進(jìn)程有些相似,不同的是,所有的線程運(yùn)行在同一個進(jìn)程中, 共享相同的運(yùn)行環(huán)境。它們可以想像成是在主進(jìn)程或“主線程”中并行運(yùn)行的“迷你進(jìn)程”。

這篇文章很好的解釋了 線程和進(jìn)程的區(qū)別,推薦閱讀: http://www.ruanyifeng.com/blo...

Python 的線程。在 Python 中,哪一種多線程的程序表現(xiàn)得更好,I/O 密集型的還是計算 密集型的?

由于GIL的緣故,對所有面向 I/O 的(會調(diào)用內(nèi)建的操作系統(tǒng) C 代碼的)程序來說,GIL 會在這個 I/O 調(diào)用之 前被釋放,以允許其它的線程在這個線程等待 I/O 的時候運(yùn)行。如果某線程并未使用很多 I/O 操作, 它會在自己的時間片內(nèi)一直占用處理器(和 GIL)。也就是說,I/O 密集型的 Python 程序比計算密集 型的程序更能充分利用多線程環(huán)境的好處。

線程。你認(rèn)為,多 CPU 的系統(tǒng)與一般的系統(tǒng)有什么大的不同?多線程的程序在這種系統(tǒng)上的表現(xiàn)會怎么樣?

Python的線程就是C語言的一個pthread,并通過操作系統(tǒng)調(diào)度算法進(jìn)行調(diào)度(例如linux是CFS)。為了讓各個線程能夠平均利用CPU時間,python會計算當(dāng)前已執(zhí)行的微代碼數(shù)量,達(dá)到一定閾值后就強(qiáng)制釋放GIL。而這時也會觸發(fā)一次操作系統(tǒng)的線程調(diào)度(當(dāng)然是否真正進(jìn)行上下文切換由操作系統(tǒng)自主決定)。
偽代碼

while True:
    acquire GIL
    for i in 1000:
        do something
    release GIL
    /* Give Operating System a chance to do thread scheduling */

這種模式在只有一個CPU核心的情況下毫無問題。任何一個線程被喚起時都能成功獲得到GIL(因?yàn)橹挥嗅尫帕薌IL才會引發(fā)線程調(diào)度)。
但當(dāng)CPU有多個核心的時候,問題就來了。從偽代碼可以看到,從release GIL到acquire GIL之間幾乎是沒有間隙的。所以當(dāng)其他在其他核心上的線程被喚醒時,大部分情況下主線程已經(jīng)又再一次獲取到GIL了。這個時候被喚醒執(zhí)行的線程只能白白的浪費(fèi)CPU時間,看著另一個線程拿著GIL歡快的執(zhí)行著。然后達(dá)到切換時間后進(jìn)入待調(diào)度狀態(tài),再被喚醒,再等待,以此往復(fù)惡性循環(huán)。
簡單的總結(jié)下就是:Python的多線程在多核CPU上,只對于IO密集型計算產(chǎn)生正面效果;而當(dāng)有至少有一個CPU密集型線程存在,那么多線程效率會由于GIL而大幅下降。

線程池。修改 生成者消費(fèi)者 的代碼,不再是一個生產(chǎn)者和一個消費(fèi)者,而是可以有任意個 消費(fèi)者線程(一個線程池),每個線程可以在任意時刻處理或消耗任意多個產(chǎn)品。
參考文章

進(jìn)程與線程的一個簡單解釋 http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html

Python的GIL是什么鬼,多線程性能究竟如何 http://cenalulu.github.io/python/gil-in-python/

Python的全局鎖問題 http://python3-cookbook.readthedocs.io/zh_CN/latest/c12/p09_dealing_with_gil_stop_worring_about_it.html

Python線程指南 http://www.cnblogs.com/huxi/archive/2010/06/26/1765808.html

>歡迎關(guān)注 >請我喝芬達(dá)

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/38518.html

相關(guān)文章

  • Python Process/Thread 概念整理

    摘要:每個在同一時間只能執(zhí)行一個線程在單核下的多線程其實(shí)都只是并發(fā),不是并行,并發(fā)和并行從宏觀上來講都是同時處理多路請求的概念。在多線程下,每個線程的執(zhí)行方式獲取執(zhí)行代碼直到或者是虛擬機(jī)將其掛起。拿不到通行證的線程,就不允許進(jìn)入執(zhí)行。 進(jìn)程與線程 并發(fā)與并行 進(jìn)程與線程   首先要理解的是,我們的軟件都是運(yùn)行在操作系統(tǒng)之上,操作系統(tǒng)再控制硬件,比如 處理器、內(nèi)存、IO設(shè)備等。操作系統(tǒng)為了向上...

    Youngs 評論0 收藏0
  • GIL 已經(jīng)被殺死了么?

    摘要:酷睿代在年取代了奔騰,主頻遠(yuǎn)低于此。該詞被敏捷開發(fā)團(tuán)隊使用較多,含義與形式會略有不同,更改已經(jīng)開始將垃圾收集器的狀態(tài)轉(zhuǎn)到解釋器,因此每個子解釋器將擁有它自己的本該如此。結(jié)論死亡了嗎對于單線程的應(yīng)用程序,仍然存活。showImg(https://user-gold-cdn.xitu.io/2019/5/19/16ad09f554fdf443); 本文原創(chuàng)并首發(fā)于公眾號【Python貓】,未經(jīng)授...

    番茄西紅柿 評論0 收藏0
  • GIL 已經(jīng)被殺死了么?

    摘要:酷睿代在年取代了奔騰,主頻遠(yuǎn)低于此。該詞被敏捷開發(fā)團(tuán)隊使用較多,含義與形式會略有不同,更改已經(jīng)開始將垃圾收集器的狀態(tài)轉(zhuǎn)到解釋器,因此每個子解釋器將擁有它自己的本該如此。結(jié)論死亡了嗎對于單線程的應(yīng)用程序,仍然存活。showImg(https://user-gold-cdn.xitu.io/2019/5/19/16ad09f554fdf443); 本文原創(chuàng)并首發(fā)于公眾號【Python貓】,未經(jīng)授...

    pkwenda 評論0 收藏0
  • GIL 已經(jīng)被殺死了么?

    摘要:酷睿代在年取代了奔騰,主頻遠(yuǎn)低于此。該詞被敏捷開發(fā)團(tuán)隊使用較多,含義與形式會略有不同,更改已經(jīng)開始將垃圾收集器的狀態(tài)轉(zhuǎn)到解釋器,因此每個子解釋器將擁有它自己的本該如此。結(jié)論死亡了嗎對于單線程的應(yīng)用程序,仍然存活。 showImg(https://segmentfault.com/img/remote/1460000019229774); 本文原創(chuàng)并首發(fā)于公眾號【Python貓】,未經(jīng)授...

    xietao3 評論0 收藏0
  • 7-并發(fā)編程

    摘要:進(jìn)程線程切換都需要使用一定的時間。子進(jìn)程在中,如果要運(yùn)行系統(tǒng)命令,會使用來運(yùn)行,官方建議使用方法來運(yùn)行系統(tǒng)命令,更高級的用法是直接使用其接口。 多線程 簡單示例 對于CPU計算密集型的任務(wù),python的多線程跟單線程沒什么區(qū)別,甚至有可能會更慢,但是對于IO密集型的任務(wù),比如http請求這類任務(wù),python的多線程還是有用處。在日常的使用中,經(jīng)常會結(jié)合多線程和隊列一起使用,比如,以...

    Cheriselalala 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<