摘要:里提供了多個用于控制多線程同步的同步原語,這些原語,包含在的標準庫當中。例如總結多線程同步,說難也難,說不難也很容易,關鍵是要看你的業務場景和解決問題的思路,盡量降低多線程之間的依賴,理清楚業務流程,選擇合適的方法,則事盡成。
概述
多線程給我們帶來的好處是可以并發的執行多個任務,特別是對于I/O密集型的業務,使用多線程,可以帶來成倍的性能增長。
可是當我們多個線程需要修改同一個數據,在不做任何同步控制的情況下,產生的結果往往是不可預料的,比如兩個線程,一個輸出hello,一個輸出world,實際運行的結果,往往可能是一個是hello world,一個是world hello。
python里提供了多個用于控制多線程同步的同步原語,這些原語,包含在python的標準庫threading.py當中。我今天簡單的介紹一下python里的這些控制多線程同步的原語,包括:Locks、RLocks、Semaphores、Events、Conditions和Barriers,你也可以繼承這些類,實現自己的同步控制原語。
Lock(鎖)Locks是python里最簡單的同步原語,只包括兩個狀態:locked和unlocked,剛創建時狀態是unlocked。Locks有兩個方法,acquire和release。acquire方法加鎖,release方法釋放鎖,如果acquire枷鎖失敗,則阻塞,表明其他線程已經加鎖。release方法只有當狀態是locked調用方法True,如果是unlocked狀態,調用release方法會拋出RunTimeError異常。例如代碼:
from threading import Lock, Thread lock = Lock() g = 0 def add_one(): """ Just used for demonstration. It’s bad to use the ‘global’ statement in general. """ global g lock.acquire() g += 1 lock.release() def add_two(): global g lock.acquire() g += 2 lock.release() threads = [] for func in [add_one, add_two]: threads.append(Thread(target=func)) threads[-1].start() for thread in threads: """ Waits for threads to complete before moving on with the main script. """ thread.join() print(g)
最終輸出的結果是3,通過Lock的使用,雖然在兩個線程中修改了同一個全局變量,但兩個線程是順序計算出結果的。
RLock(循環鎖)上面的Lock對象雖然能達到同步的效果,但是無法得知當前是那個線程獲取到了鎖。如果鎖沒被釋放,則其他獲取這個鎖的線程都會被阻塞住。如果不想阻塞,可以使用RLock,例如:
# 使用Lock import threading num = 0 lock = Threading.Lock() lock.acquire() num += 1 lock.acquire() # 這個地方阻塞 num += 2 lock.release() # 使用RLock lock = Threading.RLock() lock.acquire() num += 3 lock.acquire() # 這不會阻塞 num += 4 lock.release() lock.release() # 這個地方注意是釋放兩次鎖Semaphores
Semaphores是個最簡單的計數器,有兩個方法acquire()和release(),如果有多個線程調用acquire()方法,acquire()方法會阻塞住,每當調用次acquire方法,就做一次減1操作,每當release()方法調用此次,就加1,如果最后的計數數值大于調用acquire()方法的線程數目,release()方法會拋出ValueError異常。下面是個生產者消費者的示例。
import random, time from threading import BoundedSemaphore, Thread max_items = 5 container = BoundedSemaphore(max_items) def producer(nloops): for i in range(nloops): time.sleep(random.randrange(2, 5)) print(time.ctime(), end=": ") try: container.release() print("Produced an item.") except ValueError: print("Full, skipping.") def consumer(nloops): for i in range(nloops): time.sleep(random.randrange(2, 5)) print(time.ctime(), end=": ") if container.acquire(False): print("Consumed an item.") else: print("Empty, skipping.") threads = [] nloops = random.randrange(3, 6) print("Starting with %s items." % max_items) threads.append(Thread(target=producer, args=(nloops,))) threads.append(Thread(target=consumer, args=(random.randrange(nloops, nloops+max_items+2),))) for thread in threads: # Starts all the threads. thread.start() for thread in threads: # Waits for threads to complete before moving on with the main script. thread.join() print("All done.")
threading模塊還提供了一個Semaphore對象,它允許你可以任意次的調用release函數,但是最好還是使用BoundedSemaphore對象,這樣在release調用次數過多時會報錯,有益于查找錯誤。Semaphores最長用來限制資源的使用,比如最多十個進程。
Eventsevent可以充當多進程之間的通信工具,基于一個內部的標志,線程可以調用set()和clear()方法來操作這個標志,其他線程則阻塞在wait()函數,直到標志被設置為True。下面的代碼展示了如何利用Events來追蹤行為。
import random, time from threading import Event, Thread event = Event() def waiter(event, nloops): for i in range(nloops): print(“%s. Waiting for the flag to be set.” % (i+1)) event.wait() # Blocks until the flag becomes true. print(“Wait complete at:”, time.ctime()) event.clear() # Resets the flag. print() def setter(event, nloops): for i in range(nloops): time.sleep(random.randrange(2, 5)) # Sleeps for some time. event.set() threads = [] nloops = random.randrange(3, 6) threads.append(Thread(target=waiter, args=(event, nloops))) threads[-1].start() threads.append(Thread(target=setter, args=(event, nloops))) threads[-1].start() for thread in threads: thread.join() print(“All done.”)Conditions
conditions是比events更加高級一點的同步原語,可以用戶多線程間的通信和通知。比如A線程通知B線程資源已經可以被消費。其他的線程必須在調用wait()方法前調用acquire()方法。同樣的,每個線程在資源使用完以后,要調用release()方法,這樣其他線程就可以繼續執行了。下面是使用conditions實現的一個生產者消費者的例子。
import random, time from threading import Condition, Thread condition = Condition() box = [] def producer(box, nitems): for i in range(nitems): time.sleep(random.randrange(2, 5)) # Sleeps for some time. condition.acquire() num = random.randint(1, 10) box.append(num) # Puts an item into box for consumption. condition.notify() # Notifies the consumer about the availability. print("Produced:", num) condition.release() def consumer(box, nitems): for i in range(nitems): condition.acquire() condition.wait() # Blocks until an item is available for consumption. print("%s: Acquired: %s" % (time.ctime(), box.pop())) condition.release() threads = [] nloops = random.randrange(3, 6) for func in [producer, consumer]: threads.append(Thread(target=func, args=(box, nloops))) threads[-1].start() # Starts the thread. for thread in threads: thread.join() print("All done.")
conditions還有其他很多用戶,比如實現一個數據流API,當數據準備好了可以通知其他線程去處理數據。
Barriersbarriers是個簡單的同步原語,可以用戶多個線程之間的相互等待。每個線程都調用wait()方法,然后阻塞,直到所有線程調用了wait(),然后所有線程同時開始運行。例如:
from random import randrange from threading import Barrier, Thread from time import ctime, sleep num = 4 b = Barrier(num) names = [“Harsh”, “Lokesh”, “George”, “Iqbal”] def player(): name = names.pop() sleep(randrange(2, 5)) print(“%s reached the barrier at: %s” % (name, ctime())) b.wait() threads = [] print(“Race starts now…”) for i in range(num): threads.append(Thread(target=player)) threads[-1].start() for thread in threads: thread.join() print() print(“Race over!”)總結
多線程同步,說難也難,說不難也很容易,關鍵是要看你的業務場景和解決問題的思路,盡量降低多線程之間的依賴,理清楚業務流程,選擇合適的方法,則事盡成。
轉載自我的博客:捕蛇者說
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/40778.html
摘要:具有以下基本同步原語子進程提供了通過創建和管理子進程的。雖然隊列不是線程安全的,但它們被設計為專門用于代碼。表示異步操作的最終結果。 Python的asyncio是使用 async/await 語法編寫并發代碼的標準庫。通過上一節的講解,我們了解了它不斷變化的發展歷史。到了Python最新穩定版 3.7 這個版本,asyncio又做了比較大的調整,把這個庫的API分為了 高層級API和...
摘要:定時檢測器定時拿出一部分重新的用過濾器進行檢測剔除不能用的代理。重載是讓類以統一的方式處理不同類型數據的一種手段。雖然在內存中存儲表數據確實會提供很高的性能,但當守護進程崩潰時,所有的數據都會丟失。第1題: 如何解決驗證碼的問題,用什么模塊,聽過哪些人工打碼平臺? PIL、pytesser、tesseract模塊 平臺的話有:(打碼平臺特殊,不保證時效性) 云打碼 掙碼 斐斐打碼 若快打碼...
摘要:定時檢測器定時拿出一部分重新的用過濾器進行檢測剔除不能用的代理。重載是讓類以統一的方式處理不同類型數據的一種手段。雖然在內存中存儲表數據確實會提供很高的性能,但當守護進程崩潰時,所有的數據都會丟失。第1題: 如何解決驗證碼的問題,用什么模塊,聽過哪些人工打碼平臺? PIL、pytesser、tesseract模塊 平臺的話有:(打碼平臺特殊,不保證時效性) 云打碼 掙碼 斐斐打碼 若快打碼...
摘要:首發于我的博客線程池進程池網絡編程之同步異步阻塞非阻塞后端掘金本文為作者原創,轉載請先與作者聯系。在了解的數據結構時,容器可迭代對象迭代器使用進行并發編程篇二掘金我們今天繼續深入學習。 Python 算法實戰系列之棧 - 后端 - 掘金原文出處: 安生??? 棧(stack)又稱之為堆棧是一個特殊的有序表,其插入和刪除操作都在棧頂進行操作,并且按照先進后出,后進先出的規則進行運作。 如...
閱讀 1805·2021-11-22 09:34
閱讀 3093·2019-08-30 15:55
閱讀 672·2019-08-30 15:53
閱讀 2058·2019-08-30 15:52
閱讀 3005·2019-08-29 18:32
閱讀 1993·2019-08-29 17:15
閱讀 2398·2019-08-29 13:14
閱讀 3561·2019-08-28 18:05