摘要:文章結構來自七周七并發模型互斥和內存模型創建線程這段代碼創建并啟動了一個實例,首先從開始,函數的余下部分一起并發執行。在鎖定狀態下,某些線程擁有鎖在非鎖定狀態下,沒有線程擁有它。
并發&并行
并發程序含有多個邏輯上的獨立執行塊,他們可以獨立的并行執行,也可以串行執行。
并行程序解決問題的速度比串行程序快的多,因為其可以同時執行整個任務的多個部分。并行程序可能有多個獨立執行塊,也可能只有一個。
引用Rob Pike的經典描述就是:
并發是同一時間應對多件事情的能力;
并行是同一時間動手做多件事情的能力。
常見的并發模型有:
線程與鎖
函數式編程
actor模型和通信順序是進行(Communicating Sequential Processes, CSP)
數據級并行
lambda 架構
分離標識與狀態模型
這篇主要介紹線程與鎖模型
線程與鎖模型線程與鎖模型是對底層硬件運行過程的形式化,非常簡單直接,幾乎所有的編程語言都對其提供了支持,且不對其使用方法加以限制(易出錯)。
這篇文章主要使用python語言來演示線程與鎖模型。文章結構來自《七周七并發模型》互斥和內存模型 創建線程
from threading import Thread def hello_world(): print("Hello from new thread") def main(): my_thread = Thread(target=hello_world) my_thread.start() print("Hello from main thread") my_thread.join() main()
這段代碼創建并啟動了一個Thread實例,首先從start() 開始,my_thread.start() main()函數的余下部分一起并發執行。最后調用join() 來等待my_thread線程結束。
運行這段代碼輸出結果有幾種:
Hello from new thread Hello from main thread
或者
Hello from main thread Hello from new thread
或者
Hello from new threadHello from main thread
究竟哪個結果取決于哪個線程先執行print()。多線程編程很難的原因之一就是運行結果可能依賴于時序,多次運行結果并不穩定。
第一把鎖from threading import Thread, Lock class Counter(object): def __init__(self, count=0): self.count = count def increment(self): self.count += 1 def get_count(self): print("Count: %s" % self.count) return self.count def test_count(): counter = Counter() class CounterThread(Thread): def run(self): for i in range(10000): counter.increment() t1 = CounterThread() t2 = CounterThread() t1.start() t2.start() t1.join() t2.join() counter.get_count() test_count()
這段代碼創建一個簡單的類Counter 和兩個線程,每個線程都調用counter.increment() 10000次。
多次運行這段代碼會得到不同的值,原因是兩個線程在使用 counter.count 時發生了競態條件(代碼行為取決于各操作的時序)。
一個可能的操作是:
線程t1 獲取count的值時,線程t2也同時獲取到了count 的值(假設是100),
這時t1 count + 1, 此時count 為101,回寫count 值,然后t2 執行了相同的操作 count+1,因為t2 取到的值也是100 此時 count 仍是101,回寫后count 依然是101,但是 +1 操作執行了兩次。
競態條件的解決方法是對 count 進行同步(synchronize)訪問。一種操作是使用 內置鎖(也稱互斥鎖(mutex)、管程(monitor)或臨界區(critical section))來同步對increment() 的調用。
線程同步能夠保證多個線程安全訪問競爭資源,最簡單的同步機制是引入互斥鎖。互斥鎖為資源引入一個狀態:鎖定/非鎖定。某個線程要更改共享數據時,先將其鎖定,此時資源的狀態為“鎖定”,其他線程不能更改;
直到該線程釋放資源,將資源的狀態變成“非鎖定”,其他的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個線程進行寫入操作,從而保證了多線程情況下數據的正確性。當一個線程調用鎖的acquire()方法獲得鎖時,鎖就進入“locked”狀態。每次只有一個線程可以獲得鎖。如果此時另一個線程試圖獲得這個鎖,該線程就會變為“blocked”狀態,稱為“同步阻塞”。
直到擁有鎖的線程調用鎖的release()方法釋放鎖之后,鎖進入“unlocked”狀態。線程調度程序從處于同步阻塞狀態的線程中選擇一個來獲得鎖,并使得該線程進入運行(running)狀態。
python 鎖的使用流程如下:
#創建鎖 mutex = threading.Lock() #鎖定 mutex.acquire([timeout]) #釋放 mutex.release()
推薦使用上下文管理器來操作鎖,
with lock: do someting # 相當于 lock.acquire() try: # do something... finally: lock.release()
acquire(blocking=True, timeout=-1)
可以阻塞或非阻塞地獲得鎖。
當調用時參數 blocking 設置為 True (缺省值),阻塞直到鎖被釋放,然后將鎖鎖定并返回 True 。
在參數 blocking 被設置為 False 的情況下調用,將不會發生阻塞。如果調用時 blocking 設為 True 會阻塞,并立即返回 False ;否則,將鎖鎖定并返回 True。
當浮點型 timeout 參數被設置為正值調用時,只要無法獲得鎖,將最多阻塞 timeout 設定的秒數。timeout 參數被設置為 -1 時將無限等待。當 blocking 為 false 時,timeout 指定的值將被忽略。
如果成功獲得鎖,則返回 True,否則返回 False (例如發生 超時 的時候)。
timeout 參數需要 python3.2+
from threading import Thread, Lock mutex = Lock() class SynchronizeCounter(object): def __init__(self, count=0): self.count = count def increment(self): # if mutex.acquire(1): # 獲取鎖 # self.count += 1 # mutex.release() # 釋放鎖 # 等同于上述代碼 with mutex: self.count += 1 def get_count(self): print("Count: %s" % self.count) return self.count def test_synchronize_count(): counter = SynchronizeCounter() class CounterThread(Thread): def run(self): for i in range(100000): counter.increment() t1 = CounterThread() t2 = CounterThread() t1.start() t2.start() t1.join() t2.join() counter.get_count() if __name__ == "__main__": for i in range(100): test_synchronize_count()
這段代碼還有一個隱藏的bug,那就是 get_count(),這里get_count() 是在join()之后調用的,因此是線程安全的,但是如果在其它地方調用了 get_count() 函數。詭異的內存
由于在 get_count() 中沒有進行線程同步,調用時可能會獲取到一個失效的值。
對于JAVA等競態編譯語言,
編譯器的靜態優化可能會打亂代碼的執行順序
JVM 的動態優化也會打亂代碼的執行順序
硬件可以通過亂序執行來優化性能
更糟糕的是,有時一個線程產生的修改可能會對另一個線程不可見。
從直覺上來說,編譯器、JVM、硬件都不應插手修改原本的代碼邏輯。但是近幾年的運行效率提升,尤其是共享內存交媾的運行效率提升,都仰仗于此類代碼優化。多把鎖
具體的副作用,Java 內存模型有明確說明。
Java 內存模型定義了何時一個線程對內存的修改對另一個線程可見。基本原則是:如果讀線程和寫線程不進行同步,就不能保證可見性。
一個重點: 兩個線程都需要進行同步。只在其中一個線程進行同步是不夠的。
可如果所有的方法都同步,大多數線程可能都會被阻塞,失去了并發的意義,并且可能會出現死鎖。
哲學家進餐問題
哲學家就餐問題:假設有五位哲學家圍坐在一張圓形餐桌旁,做以下兩件事情之一:吃飯,或者思考。吃東西的時候,他們就停止思考,思考的時候也停止吃東西。餐桌中間有一大碗意大利面,每兩個哲學家之間有一只餐叉。因為用一只餐叉很難吃到意大利面,所以假設哲學家必須用兩只餐叉吃東西。他們只能使用自己左右手邊的那兩只餐叉。哲學家從來不交談,這就很危險,可能產生死鎖,每個哲學家都拿著左手的餐叉,永遠都在等右邊的餐叉(或者相反)。
即使沒有死鎖,也有可能發生資源耗盡。例如,假設規定當哲學家等待另一只餐叉超過五分鐘后就放下自己手里的那一只餐叉,并且再等五分鐘后進行下一次嘗試。這個策略消除了死鎖(系統總會進入到下一個狀態),但仍然有可能發生“活鎖”。如果五位哲學家在完全相同的時刻進入餐廳,并同時拿起左邊的餐叉,那么這些哲學家就會等待五分鐘,同時放下手中的餐叉,再等五分鐘,又同時拿起這些餐叉。
下面是哲學家進餐問題的一個實現:
import threading import random import time class Philosopher(threading.Thread): running = True def __init__(self, xname, forkOnLeft, forkOnRight): threading.Thread.__init__(self) self.name = xname self.forkOnLeft = forkOnLeft self.forkOnRight = forkOnRight def run(self): while self.running: # Philosopher is thinking (but really is sleeping). time.sleep(random.uniform(1, 3)) print("%s is hungry." % self.name) self.dine() def dine(self): fork1, fork2 = self.forkOnLeft, self.forkOnRight while self.running: fork1.acquire(True) # 阻塞式獲取left 鎖 # locked = fork2.acquire(True) # 阻塞式 獲取right 鎖 容易產生死鎖 locked = fork2.acquire(False) # 非阻塞式 獲取right 鎖 if locked: break # 如果被鎖定,釋放 left 退出等待 fork1.release() print("%s swaps forks" % self.name) fork1, fork2 = fork2, fork1 else: return self.dining() fork2.release() fork1.release() def dining(self): print("%s starts eating " % self.name) time.sleep(random.uniform(1, 5)) print("%s finishes eating and leaves to think." % self.name) def DiningPhilosophers(): forks = [threading.Lock() for n in range(5)] philosopherNames = ("Aristotle", "Kant", "Buddha", "Marx", "Russel") philosophers = [ Philosopher(philosopherNames[i], forks[i % 5], forks[(i + 1) % 5]) for i in range(5) ] Philosopher.running = True for p in philosophers: p.start() for p in philosophers: p.join() time.sleep(100) Philosopher.running = False print("Now we"re finishing.") DiningPhilosophers()外星方法的危害
規模較大的程序常用監聽器模式來解耦模塊,這里我們構造一個類從一個URL進行下載,Listeners 監聽下載進度。
import requests import threading class Listeners(object): def __init__(self, count=0): self.count = count self.done_count = 0.0 self.listeners = [] def append(self, listener): self.listeners.append(listener) def remove(self, listener): self.listeners.remove(listener) def on_progress(self, n): # 一些我們不知道的實現 # do someting # self.done_count += 1 # print("Process: %f" % (self.done_count / self.count)) pass listeners = Listeners(5) class Downloader(threading.Thread): def __init__( self, group=None, target=None, name=None, args=(), kwargs=None, daemon=None ): threading.Thread.__init__( self, group=group, target=target, name=name, daemon=daemon ) self.url = kwargs.get("url") def download(self): resp = requests.get(self.url) def add_listener(self, listener): listeners.append(listener) def remove_listener(self, listener): listeners.delete(listener) def update_progress(self, n): for listener in listeners: listner.on_progress(n) def run(self): self.download() print(self.url) listeners.on_progress(1) def test(): urls = [ "https://www.baidu.com", "https://www.google.com", "https://www.bing.com", "https://www.zaih.com", "https://www.github.com", ] ts = [Downloader(kwargs=dict(url=url)) for url in urls] print(ts) [t.start() for t in ts] [t.join() for t in ts] if __name__ == "__main__": test()
這段代碼中,add_listener, remove_listener 和 update_progress 都是同步方法,但 update_progress 調用了一個我們不知道如何實現的方法。如果這個方法中,獲取了一把鎖,程序在執行的過程中就可能發生死鎖。所以,我們要盡量避免使用這種方法。還有一種方法是在遍歷之前對 listeners 進行保護性復制,再針對這份副本進行遍歷。(現在調用外星方法不再需要加鎖)
超越內置鎖 可重入鎖Lock() 雖然方便,但限制很多:
一個線程因為等待內置鎖而進入阻塞之后,就無法中斷該線程
Lock() 不知道當前擁有鎖的線程是否是當前線程,如果當前線程獲取了鎖,再次獲取也會阻塞。
重入鎖是(threading.RLock)一個可以被同一個線程多次獲取的同步基元組件。在內部,它在基元鎖的鎖定/非鎖定狀態上附加了 "所屬線程" 和 "遞歸等級" 的概念。在鎖定狀態下,某些線程擁有鎖 ; 在非鎖定狀態下, 沒有線程擁有它。
若要鎖定鎖,線程調用其 acquire() 方法;一旦線程擁有了鎖,方法將返回。若要解鎖,線程調用 release() 方法。 acquire()/release() 對可以嵌套;只有最終 release() (最外面一對的 release() ) 將鎖解開,才能讓其他線程繼續處理 acquire() 阻塞。
threading.RLock 提供了顯式的 acquire() 和 release() 方法
一個好的實踐是:
lock = threading.RLock()
Lock 和 RLock 的使用區別如下:
#rlock_tut.py import threading num = 0 lock = Threading.Lock() lock.acquire() num += 1 lock.acquire() # 這里會被阻塞 num += 2 lock.release() # With RLock, that problem doesn’t happen. lock = Threading.RLock() lock.acquire() num += 3 lock.acquire() # 不會被阻塞. num += 4 lock.release() lock.release() # 兩個鎖都需要調用 release() 來釋放.超時
使用內置鎖時,阻塞的線程無法被中斷,程序不能從死鎖恢復,可以給鎖設置超時時間來解決這個問題。
timeout 參數需要 python3.2+
import time from threading import Thread, Lock lock1 = RLock() lock2 = RLock() # 這個程序會一直死鎖下去,如果想突破這個限制,可以在獲取鎖的時候加上超時時間 # > python threading 沒有實現 銷毀(destroy),停止(stop),暫停(suspend),繼續(resume),中斷(interrupt)等 class T1(Thread): def run(self): print("start run T1") lock1.acquire() # lock1.acquire(timeout=2) # 設置超時時間可避免死鎖 time.sleep(1) lock2.acquire() # lock2.acquire(timeout=2) # 設置超時時間可避免死鎖 lock1.release() lock2.release() class T2(Thread): def run(self): print("start run T2") lock2.acquire() # lock2.acquire(timeout=2) # 設置超時時間可避免死鎖 time.sleep(1) lock1.acquire() # lock1.acquire(timeout=2) # 設置超時時間可避免死鎖 lock2.release() lock1.release() def test(): t1, t2 = T1(), T2() t1.start() t2.start() t1.join() t2.join() if __name__ == "__main__": test()交替鎖
如果我們要在鏈表中插入一個節點。一種做法是用鎖保護整個鏈表,但鏈表加鎖時其它使用者無法訪問。交替鎖可以只所追殺鏈表的一部分,允許不涉及被鎖部分的其它線程自由訪問。
from random import randint from threading import Thread, Lock class Node(object): def __init__(self, value, prev=None, next=None): self.value = value self.prev = prev self.next = next self.lock = Lock() class SortedList(Thread): def __init__(self, head): Thread.__init__(self) self.head = head def insert(self, value): head = self.head node = Node(value) print("insert: %d" % value) while True: if head.value <= value: if head.next != None: head = head.next else: head.lock.acquire() head.next = node node.prev = head head.lock.release() break else: prev = head.prev prev.lock.acquire() head.lock.acquire() if prev != None: prev.next = node else: self.head = node node.prev = prev prev.lock.release() node.next = head head.prev = node head.lock.release() break def run(self): for i in range(5): self.insert(randint(10, 20)) def test(): head = Node(10) t1 = SortedList(head) t2 = SortedList(head) t1.start() t2.start() t1.join() t2.join() while head: print(head.value) head = head.next if __name__ == "__main__": test()
這種方案不僅可以讓多個線程并發的進行鏈表插入操作,還能讓其他的鏈表操作安全的并發。
條件變量并發編程經常需要等待某個事件發生。比如從隊列刪除元素前需要等待隊列非空、向緩存添加數據前需要等待緩存有足夠的空間。條件變量就是為這種情況設計的。
條件變量總是與某種類型的鎖對象相關聯,鎖對象可以通過傳入獲得,或者在缺省的情況下自動創建。當多個條件變量需要共享同一個鎖時,傳入一個鎖很有用。鎖是條件對象的一部分,不必多帶帶地跟蹤它。
條件變量服從上下文管理協議:使用 with 語句會在它包圍的代碼塊內獲取關聯的鎖。 acquire() 和 release() 方法也能調用關聯鎖的相關方法。
其它方法必須在持有關聯的鎖的情況下調用。 wait() 方法釋放鎖,然后阻塞直到其它線程調用 notify() 方法或 notify_all() 方法喚醒它。一旦被喚醒, wait() 方法重新獲取鎖并返回。它也可以指定超時時間。
#condition_tut.py import random, time from threading import Condition, Thread """ "condition" variable will be used to represent the availability of a produced item. """ condition = Condition() box = [] def producer(box, nitems): for i in range(nitems): time.sleep(random.randrange(2, 5)) # Sleeps for some time. condition.acquire() num = random.randint(1, 10) box.append(num) # Puts an item into box for consumption. condition.notify() # Notifies the consumer about the availability. print("Produced:", num) condition.release() def consumer(box, nitems): for i in range(nitems): condition.acquire() condition.wait() # Blocks until an item is available for consumption. print("%s: Acquired: %s" % (time.ctime(), box.pop())) condition.release() threads = [] """ "nloops" is the number of times an item will be produced and consumed. """ nloops = random.randrange(3, 6) for func in [producer, consumer]: threads.append(Thread(target=func, args=(box, nloops))) threads[-1].start() # Starts the thread. for thread in threads: """Waits for the threads to complete before moving on with the main script. """ thread.join() print("All done.")原子變量
與鎖相比使用原子變量的優點:
不會忘記在正確的時候獲取鎖
由于沒有鎖的參與,對原子變量的操作不會引發死鎖。
原子變量時無鎖(lock-free)非阻塞(non-blocking)算法的基礎,這種算法可以不用鎖和阻塞來達到同步的目的。
python 不支持原子變量
總結 優點線程與鎖模型最大的優點是適用面廣,更接近于“本質”--近似于對硬件工作方式的形式化--正確使用時效率高。
此外,線程與鎖模型也可輕松的集成到大多數編程語言。
線程與鎖模型沒有為并行提供直接的支持
線程與鎖模型只支持共享內存模型,如果要支持分布式內存模型,就需要尋求其他技術的幫助。
用線程與鎖模型編寫的代碼難以測試(比如死鎖問題可能很久才會出現),出了問題后很難找到問題在哪,并且bug難以復現
代碼難以維護(要保證所有對象的同步都是正確的、必須按 順序來獲取多把鎖、持有鎖時不調用外星方法。還要保證維護代碼的開發者都遵守這個規則
參考鏈接Let’s Synchronize Threads in Python
哲學家進餐問題
References[1] 哲學家進餐問題: https://zh.wikipedia.org/wiki...
[2] Let’s Synchronize Threads in Python: https://hackernoon.com/synchr...
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/43847.html
摘要:本篇博客主要針對虛擬機的晚期編譯優化,內存模型與線程,線程安全與鎖優化進行總結,其余部分總結請點擊虛擬總結上篇,虛擬機總結中篇。 本篇博客主要針對Java虛擬機的晚期編譯優化,Java內存模型與線程,線程安全與鎖優化進行總結,其余部分總結請點擊Java虛擬總結上篇 ,Java虛擬機總結中篇。 一.晚期運行期優化 即時編譯器JIT 即時編譯器JIT的作用就是熱點代碼轉換為平臺相關的機器碼...
摘要:性能較好是因為避免了線程進入內核的阻塞狀態請求總數同時并發執行的線程數我們首先使用聲明一個所得實例,然后使用進行加鎖和解鎖操作。 ReentrantLock與鎖 Synchronized和ReentrantLock異同 可重入性:兩者都具有可重入性 鎖的實現:Synchronized是依賴jvm實現的,ReentrantLock是jdk實現的。(我們可以理解為一個是操作系統層面的實現...
小編寫這篇文章的一個主要目的,主要是來給大家介紹關于python的一些事情,python的使用場景是比較的多的,主要涉及到其中的一些方方面面,那么,它的并發場景使用方法是什么呢?下面就給大家詳細解答下。 前言 如果你學過操作系統,那么對于鎖應該不陌生。鎖的含義是線程鎖,可以用來指定某一個邏輯或者是資源同一時刻只能有一個線程訪問。這個很好理解,就好像是有一個房間被一把鎖鎖住了,只有拿到鑰匙的...
閱讀 878·2021-10-13 09:39
閱讀 3531·2021-09-26 10:16
閱讀 2861·2019-08-30 15:54
閱讀 1037·2019-08-30 14:22
閱讀 2886·2019-08-29 15:39
閱讀 3253·2019-08-27 10:52
閱讀 809·2019-08-26 13:59
閱讀 1703·2019-08-26 12:20