摘要:的讀寫效率要高于。進(jìn)程間的基于機(jī)制建立。當(dāng)主進(jìn)程創(chuàng)建子進(jìn)程后,也被拷貝了一份。此后,關(guān)閉主進(jìn)程的一個(gè),關(guān)閉一個(gè)子進(jìn)程的一個(gè)。據(jù)官方文檔也是基于的實(shí)現(xiàn)。
上面寫了Python如何創(chuàng)建多個(gè)進(jìn)程,但是前面文章中創(chuàng)建的進(jìn)程都是啞巴和聾子,自己顧自己執(zhí)行,不會(huì)相互交流。
那么如何讓進(jìn)程間相互說(shuō)說(shuō)話呢?
Python為我們提供了一個(gè)函數(shù)multiprocessing.Pipe
和一個(gè)類:multiprocessing.Queue。
multiprocessing.Pipe()即管道模式,調(diào)用Pipe()返回管道的兩端的Connection。
Python官方文檔的描述: Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.
因此, Pipe僅僅適用于只有兩個(gè)進(jìn)程一讀一寫的單雙工情況,也就是說(shuō)信息是只向一個(gè)方向流動(dòng)。例如電視、廣播,看電視的人只能看,電視臺(tái)是能播送電視節(jié)目。
Pipe的讀寫效率要高于Queue。
進(jìn)程間的Pipe基于fork機(jī)制建立。
當(dāng)主進(jìn)程創(chuàng)建Pipe的時(shí)候,Pipe的兩個(gè)Connections連接的的都是主進(jìn)程。
當(dāng)主進(jìn)程創(chuàng)建子進(jìn)程后,Connections也被拷貝了一份。此時(shí)有了4個(gè)Connections。
此后,關(guān)閉主進(jìn)程的一個(gè)Out Connection,關(guān)閉一個(gè)子進(jìn)程的一個(gè)In Connection。那么就建立好了一個(gè)輸入在主進(jìn)程,輸出在子進(jìn)程的管道。
原理示意圖如下:
跟多資料可以閱讀:http://www.tuicool.com/articl...
# 示例代碼 # coding=utf-8 from multiprocessing import Pipe, Process def son_process(x, pipe): _out_pipe, _in_pipe = pipe # 關(guān)閉fork過(guò)來(lái)的輸入端 _in_pipe.close() while True: try: msg = _out_pipe.recv() print msg except EOFError: # 當(dāng)out_pipe接受不到輸出的時(shí)候且輸入被關(guān)閉的時(shí)候,會(huì)拋出EORFError,可以捕獲并且退出子進(jìn)程 break if __name__ == "__main__": out_pipe, in_pipe = Pipe(True) son_p = Process(target=son_process, args=(100, (out_pipe, in_pipe))) son_p.start() # 等pipe被fork 后,關(guān)閉主進(jìn)程的輸出端 # 這樣,創(chuàng)建的Pipe一端連接著主進(jìn)程的輸入,一端連接著子進(jìn)程的輸出口 out_pipe.close() for x in range(1000): in_pipe.send(x) in_pipe.close() son_p.join() print "主進(jìn)程也結(jié)束了"
總結(jié)一下:
上面的代碼中主要用到了pipe的send()、recv()、close()方法。當(dāng)pipe的輸入端被關(guān)閉,且無(wú)法接收到輸入的值,那么就會(huì)拋出EOFError。
新建一個(gè)Pipe(duplex)的時(shí)候,如果duplex為True,那么創(chuàng)建的管道是雙向的;如果duplex為False,那么創(chuàng)建的管道是單向的。
multiprocessing.QueueQueue據(jù)官方文檔也是基于pipe的實(shí)現(xiàn)。
Queue的使用主要是一邊put(),一邊get().但是Queue可以是多個(gè)Process 進(jìn)行put操作,也可以是多個(gè)Process進(jìn)行g(shù)et()操作。
Demo:
# coding=utf-8 from multiprocessing import Queue, Process from Queue import Empty as QueueEmpty import random def getter(name, queue): print "Son process %s" % name while True: try: value = queue.get(True, 10) # block為True,就是如果隊(duì)列中無(wú)數(shù)據(jù)了。 # |—————— 若timeout默認(rèn)是None,那么會(huì)一直等待下去。 # |—————— 若timeout設(shè)置了時(shí)間,那么會(huì)等待timeout秒后才會(huì)拋出Queue.Empty異常 # block 為False,如果隊(duì)列中無(wú)數(shù)據(jù),就拋出Queue.Empty異常 print "Process getter get: %f" % value except QueueEmpty: break def putter(name, queue): print "Son process %s" % name for i in range(0, 1000): value = random.random() queue.put(value) # 放入數(shù)據(jù) put(obj[, block[, timeout]]) # 若block為True,如隊(duì)列是滿的: # |—————— 若timeout是默認(rèn)None,那么就會(huì)一直等下去 # |—————— 若timeout設(shè)置了等待時(shí)間,那么會(huì)等待timeout秒后,如果還是滿的,那么就拋出Queue.Full. # 若block是False,如果隊(duì)列滿了,直接拋出Queue.Full print "Process putter put: %f" % value if __name__ == "__main__": queue = Queue() getter_process = Process(target=getter, args=("Getter", queue)) putter_process = Process(target=putter, args=("Putter", queue)) getter_process.start() putter_process.start()
Queue的一些說(shuō)明已經(jīng)寫在代碼中了。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/38359.html
小編寫這篇文章的主要目的,主要是給大家介紹關(guān)于python3 queue多線程通信,這里面有很多的技術(shù)性的難點(diǎn),那么,該怎么去進(jìn)行處理呢,下面小編給大家進(jìn)行詳細(xì)的解答一下。 queue分類 python3 queue分三類: 先進(jìn)先出隊(duì)列 后進(jìn)先出的棧 優(yōu)先級(jí)隊(duì)列 他們的導(dǎo)入方式分別是: fromqueueimportQueue fromqueueimportLifoQueue...
摘要:目前開(kāi)發(fā)中有遇到進(jìn)程間需要共享數(shù)據(jù)的情況所以研究了下主要會(huì)以為例子說(shuō)明下進(jìn)程間共享同一個(gè)父進(jìn)程使用說(shuō)明創(chuàng)建一個(gè)對(duì)象創(chuàng)建一個(gè)創(chuàng)建一個(gè)測(cè)試程序創(chuàng)建進(jìn)程池進(jìn)行測(cè)試簡(jiǎn)單的源碼分析這時(shí)我們?cè)倏匆粋€(gè)例子創(chuàng)建一個(gè)對(duì)象創(chuàng)建一個(gè)創(chuàng)建一個(gè)測(cè)試程序創(chuàng)建進(jìn)程池進(jìn)行 目前開(kāi)發(fā)中有遇到進(jìn)程間需要共享數(shù)據(jù)的情況. 所以研究了下multiprocessing.Manager, 主要會(huì)以dict為例子, 說(shuō)明下進(jìn)程間共...
小編寫這篇文章的目的,主要是給大家講解一下,關(guān)于實(shí)現(xiàn)配置熱加載的方法,具體是怎么操作呢?下面就給大家詳細(xì)的解答下。 背景 由于最近有相關(guān)的工作需求,需要進(jìn)行增添相關(guān)的新功能,實(shí)現(xiàn)配置熱加載的功能。所謂的配置熱加載,也就是說(shuō)當(dāng)服務(wù)收到配置更新消息之后,我們不用重啟服務(wù)就可以使用最新的配置去執(zhí)行任務(wù)。 如何實(shí)現(xiàn) 下面我分別采用多進(jìn)程、多線程、協(xié)程的方式去實(shí)現(xiàn)配置熱加載。 使用多進(jìn)程實(shí)現(xiàn)配...
閱讀 3099·2021-08-03 14:05
閱讀 2144·2019-08-29 15:35
閱讀 683·2019-08-29 13:30
閱讀 3172·2019-08-29 13:20
閱讀 2534·2019-08-23 18:15
閱讀 1802·2019-08-23 14:57
閱讀 2220·2019-08-23 13:57
閱讀 1315·2019-08-23 12:10