摘要:上一篇文章進程專題進程池下一篇文章進程專題共享數據與同步模塊支持的進程間通信主要有兩種管道和隊列。隊列底層使用管道和鎖,同時運行支持線程講隊列中的數據傳輸到底層管道中,來實習進程間通信。
上一篇文章:Python進程專題4:進程池Pool
下一篇文章:Python進程專題6:共享數據與同步
multiprocessing模塊支持的進程間通信主要有兩種:管道和隊列。一般來說,發送較少的大對象比發送大量的小對象要好。
Queue隊列底層使用管道和鎖,同時運行支持線程講隊列中的數據傳輸到底層管道中,來實習進程間通信。
Queue([maxsize]) 創建共享隊列。使用multiprocessing模塊的Queue實現多進程之間的數據傳遞。Queue本身是一個消息隊列, maxsize是隊列運行的最大項數,如果不指定,則不限制大小。
q.close():關閉隊列,不再向隊列中添加數據,那些已經進入隊列的數據會繼續處理。q被回收時將自動調用此方法。 q.empty():如果調用此方法時,隊列為null返回True,單由于此時其他進程或者線程正在添加或刪除項, 所以結果不可靠,而且有些平臺運行該方法會直接報錯,我的mac系統運行該方法,直接報錯。 q.full():如果調用此方法時,隊列已滿,返回True,同q.empty()方法,結果不可靠。 q.get([block,timeout]):返回q中的一個項,block如果設置為True,如果q隊列為空,該方法會阻塞(就是不往下運行了,處于等待狀態), 直到隊列中有項可用為止,如果同時頁設置了timeout,那么在改時間間隔內,都沒有等到有用的項,就會引發Queue.Empty異常。 如果block設置為false,timeout沒有意義,如果隊列為空,將引發Queue.Empt異常。 q.get_nowait():等同于q.get(False) q.put(item,block,timeout):將item放入隊列,如果此時隊列已滿: 如果block=True,timeout沒有設置,就會阻塞,直到有可用空間為止。 如果block=True,timeout也設置,就會阻塞到timeout,超過這個時間會報Queue.Full異常。 如果block=False,timeout設置無效,直接報Queue.Full異常。 q.put_nowait(item):等同于q.put(item,False) q.qsize():返回當前隊列項的數量,結果不可靠,而且mac會直接報錯:NotImplementedError。
實例:
#驗證:put方法會阻塞 from multiprocessing import Queue queue=Queue(3)#初始化一個Queue隊列,可以接受3個消息 queue.put("我是第1條信息") queue.put("我是第2條信息") queue.put("我是第3條信息") print("插入第4條信息之前") queue.put("我是第4條信息") print("插入第4條信息之后")
效果:程序會一直阻塞,最后一句輸永遠也不會輸出。
代碼:
#closse方法、get方法、put方法簡單使用:多進程訪問同一個Queue from multiprocessing import Queue,Process import time,os #參數q就是Queue實例 def mark(q,interval): time.sleep(interval) # 打印信息 print("進程%d取出數據:"%os.getpid()+queue.get(True)) if __name__=="__main__": queue = Queue(3) # 初始化一個Queue隊列,可以接受3個消息 queue.put("我是第1條信息") queue.put("我是第2條信息") queue.put("我是第3條信息") p1=Process(target=mark,args=(queue,1)) p2=Process(target=mark,args=(queue,2)) p3=Process(target=mark,args=(queue,3)) p1.start() p2.start() p3.start() # 關閉隊列,不再插入信息 queue.close() # 下面插入會導致異常 # queue.put("我是第4條信息") # 打印第1條信息 print("程序語句執行完成")
效果
JoinableQueue隊列創建可連接的共享進程隊列,可以看做還是一個Queue,只不過這個Queue除了Queue特有功能外,允許項的消費者通知項的生產者,項已經處理成功。該通知進程時使用共享的信號和條件變量來實現的。
JoinableQueue實例除了與Queue對象相同的方法外,還具有下列方法:
q.task_done():消費者使用此方法發送信號,表示q.get()返回的項已經被處理。 注意??:如果調用此方法的次數大于隊列中刪除的項的數量,將引發ValueError異常。 q.join():生產者使用此方法進行阻塞,直到隊列中所有的項都被處理完成,即阻塞將持續到隊列中的每一項均調用q.task_done()方法為止。
代碼實例:
#利用JoinableQueue實現生產者與消費者,并且加入了哨兵,來監聽生產者的要求 from multiprocessing import JoinableQueue,Process import time #參數q為JoinableQueue隊列實例 def mark(q): #循環接受信息,一直運行,這也下面為什么要將它設為后臺進程的原因,必須保證當主線程退出時,它可以退出 while True: value = q.get() print(value) # 實際開發過程中,此處一般用來進行有用的處理 # 消費者發送信號:任務完成(此處實例的任務就是打印一下下) q.task_done() #我來方便看出效果,特意停留1s time.sleep(1) #使用哨兵,監聽生產者的消息,此處通過判斷value是否為None來判斷傳遞的消息 if value==None: #執行哨兵計劃后,后面的語句都不會輸出 break if __name__=="__main__": #實例化JoinableQueue q=JoinableQueue() #定義消費者進程 p=Process(target=mark,args=(q,)) #將消費者線程設置為后臺進程,隨創建它的進程(此處是主進程)的終止而終止 #也就是當它的創建進程(此處是主現場)意外退出時,它也會跟隨一起退出。 #并且后臺進程無法創建新的進程 p.daemon=True #啟動消費者進程 p.start() #模擬生產者,生產多個項 for xx in range(5): print(xx) #當xx==3時去執行哨兵計劃 if xx==3: print("我用哨兵計劃了") q.put(None) print("哨兵計完美執行") q.put("第%d條消息"%xx) #等待所有項都處理完成再退出,由于使用了哨兵計劃,隊列沒有完全執行,所以會一直卡在這個位置 q.join() print("程序真正退出了")
效果:
管道除了使用隊列來進行進程間通信,還可以使用管道來進行消息傳遞。
(connection1,connection2)=Pipe([duplex]) 在進程間創建一條管道,并返回元祖(connection1,connection2),其中connection1、connection2表示兩端的Connection對象。 默認情況下,duplex=True,此時管道是雙向的,如果設置duplex=false,connection1只能用于接收,connection2只能用于發送。 注意:必須在多進程創建之前創建管道。
connection.close() :關閉連接,當connection被垃圾回收時,默認會調用該方法。 connection.fileno() :返回連接使用的整數文件描述符 connection.poll([timeout]):如果連接上的數據可用,返回True,timeout為等待的最長時間,如果不指定,該方法將立刻返回結果。 如果指定為None,該方法將會無限等待直到數據到達。 connection.send(obj):通過連接發送對象,obj是與序列號兼容的任意對象。 connection.send_bytes(buffer[,offset,size]):通過連接發送字節數據緩沖區,buffer是支持緩沖區的任何對象。 offset是緩沖區的字節偏移量,而size是要發送的字節數。 connection.recv():接收connection.send()方法返回的對象。如果連接的另一端已經關閉,再也不會存在任何數據, 該方法將引起EOFError異常。 connection.recv_bytes([maxlength]):接收connection.send_bytes()方法發送的一條完整字節信息,maxlength為可以接受的 最大字節數。如果消息超過這個最大數,將引發IOError異常,并且在連接上無法進一步讀取。如果連接的另一端已經關閉, 再也不會有任何數據,該方法將引發EOFError異常。 connection.recv_bytes_into(buffer[,offset]):接收一條完整的字節信息,兵把它保存在buffer對象中, 該對象支持可寫入的緩沖區接口(就是bytearray對象或類似對象)。 offset指定緩沖區放置消息的字節偏移量。返回值是收到的字節數。如果消息長度大于可用的緩沖區空間,將引發BufferTooShort異常。
示意圖:
代碼:
#理解管道的生產者與消費者 from multiprocessing import Pipe, Process import time def mark(pipe): #接受參數 output_p, input_p = pipe print("mark方法內部調用input_p.close()") #消費者(子進程)此實例只接收,所以把輸入關閉 input_p.close() while True: try: item = output_p.recv() except EOFError: print("報錯了") break print(item) time.sleep(1) print("mark執行完成") if __name__ == "__main__": #必須在多進程創建之前,創建管道,該管道是雙向的 (output_p, input_p) = Pipe()#創建管道 #創建一個進程,并把管道兩端都作為參數傳遞過去 p = Process(target=mark, args=((output_p, input_p),)) #啟動進程 p.start() #生產者(主進程)此實例只輸入,所以關閉輸出(接收端) output_p.close() for item in list(range(5)): input_p.send(item) print("主方法內部調用input_p.close()()") #關閉生產者(主進程)的輸入端 input_p.close()
效果圖:
代碼:
#利用管道實現多進程協作:子線程計算結果,返回給主線程 from multiprocessing import Pipe, Process def mark(pipe): #接受參數 server_p, client_p = pipe #消費者(子進程)此實例只接收,所以把輸入關閉 client_p.close() while True: try: x,y = server_p.recv() except EOFError: print("報錯了") break result=x+y server_p.send(result) print("mark執行完成") if __name__ == "__main__": #必須在多進程創建之前,創建管道,該管道是雙向的 (server_p, client_p) = Pipe()#創建管道 #創建一個進程,并把管道兩端都作為參數傳遞過去 p = Process(target=mark, args=((server_p, client_p),)) #啟動進程 p.start() #生產者(主進程)此實例只輸入,所以關閉輸出(接收端) server_p.close() #發送數據 client_p.send((4,5)) #打印接受到的數據 print(client_p.recv()) client_p.send(("Mark", "大帥哥")) # 打印接受到的數據 print(client_p.recv()) #關閉生產者(主進程)的輸入端 client_p.close()
結果:
9 Mark大帥哥 報錯了 mark執行完成
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/42356.html
摘要:可以使用標準的索引切片迭代操作訪問它,其中每項操作均鎖進程同步,對于字節字符串,還具有屬性,可以把整個數組當做一個字符串進行訪問。當所編寫的程序必須一次性操作大量的數組項時,如果同時使用這種數據類型和用于同步的單獨大的鎖,性能將極大提升。 上一篇文章:Python進程專題5:進程間通信下一篇文章:Python進程專題7:托管對象 我們現在知道,進程之間彼此是孤立的,唯一通信的方式是隊...
摘要:類常用屬性布爾值,指示進程是否是后臺進程。當創建它的進程終止時,后臺進程會自動終止。進程的整數退出指令。如果進程仍然在運行,它的值為,如果值為負數,就表示進程由信號所終止。 上一篇文章:Python進程專題1:fork():創建子進程、getpid()、getppid()下一篇文章:Python進程專題3:繼承Process來創建進程 由于fork()無法對Windows使用,而py...
摘要:上一篇文章進程專題繼承來創建進程下一篇文章進程專題進程間通信當我們需要創建大量的進程時,利用模塊提供的來創建進程。關閉進程池,不再接受進的進程請求,但已經接受的進程還是會繼續執行。 上一篇文章:Python進程專題3:繼承Process來創建進程下一篇文章:Python進程專題5:進程間通信 當我們需要創建大量的進程時,利用multiprocessing模塊提供的Pool來創建進程。 ...
摘要:代表網絡地址的元組或者代表域套接字的文件名,或者代表形式的字符串,代表遠程系統本地計算機的為上的一條命名管道。是一個整數,當參數指定了一個網絡連接時,對應于傳遞給套接字的方法的值,默認為。 上一篇文章:Python進程專題7:托管對象下一篇文章:Python進程專題9:關于進程的實用工具函數 使用multiprocessing模塊的程序不僅可以于運行在同一計算機的其它程序進行消息傳遞...
摘要:連接帶遠程管理器對象,該對象的地址在構造函數中支出。在當前進程中運行管理器服務器。啟動一個單的子進程,并在該子進程中啟動管理器服務器。如果無法序列號對象將引發異常。 上一篇文章:Python進程專題6:共享數據與同步下一篇文章:Python進程專題8:分布集群的消息傳遞 進程不支持共享對象,上面描述的創建共享值和數組,但都是指定的特殊類型,對高級的Python對象(如:字典、列表、用...
閱讀 3207·2021-11-19 09:40
閱讀 3005·2021-09-09 09:32
閱讀 792·2021-09-02 09:55
閱讀 1393·2019-08-26 13:23
閱讀 2403·2019-08-26 11:46
閱讀 1229·2019-08-26 10:19
閱讀 2054·2019-08-23 16:53
閱讀 1072·2019-08-23 12:44