国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Python 的并發編程

happen / 2655人閱讀

摘要:本文最先發布在博客這篇文章將講解并發編程的基本操作。并發是指能夠多任務處理,并行則是是能夠同時多任務處理。雖然自帶了很好的類庫支持多線程進程編程,但眾所周知,因為的存在,很難做好真正的并行。

本文最先發布在博客:https://blog.ihypo.net/151628...

這篇文章將講解 Python 并發編程的基本操作。并發和并行是對孿生兄弟,概念經常混淆。并發是指能夠多任務處理,并行則是是能夠同時多任務處理。Erlang 之父 Joe Armstrong 有一張非常有趣的圖說明這兩個概念:

我個人更喜歡的一種說法是:并發是宏觀并行而微觀串行。

GIL

雖然 Python 自帶了很好的類庫支持多線程/進程編程,但眾所周知,因為 GIL 的存在,Python 很難做好真正的并行。

GIL 指全局解釋器鎖,對于 GIL 的介紹:

全局解釋器鎖(英語:Global Interpreter Lock,縮寫GIL),是計算機程序設計語言解釋器用于同步線程的一種機制,它使得任何時刻僅有一個線程在執行。

維基百科

其實與其說 GIL 是 Python 解釋器的限制,不如說是 CPython 的限制,因為 Python 為了保障性能,底層大多使用 C 實現的,而 CPython 的內存管理并不是線程安全的,為了保障整體的線程安全,解釋器便禁止多線程的并行執行。

因為 Python 社區認為操作系統的線程調度已經非常成熟了,沒有必要自己再實現一遍,因此 Python 的線程切換基本是依賴操作系統,在實際的使用中,對于單核 CPU,GIL 并沒有太大的影響,但對于多核 CPU 卻引入了線程顛簸(thrashing)問題。

線程顛簸是指作為單一資源的 GIL 鎖,在被多核心競爭強占時資源額外消耗的現象。

比如下圖,線程1 在釋放 GIL 鎖后,操作系統喚醒了 線程2,并將 線程2 分配給 核心2 執行,但是如果此時 線程2 卻沒有成功獲得 GIL 鎖,只能再次被掛起。此時切換線程、切換上下文的資源都將白白浪費。

因此,Python 多線程程序在多核 CPU 機器下的性能不一定比單核高。那么如果是計算密集型的程序,一般還是考慮用 C 重寫關鍵部分,或者使用多進程避開 GIL。

多線程

在 Python 中使用多線程,有 threadthreading 可供原則,thread 提供了低級別的、原始的線程以及一個簡單的鎖,因為 thread 過于簡陋,線程管理容易出現人為失誤,因此官方更建議使用 threading,而 threading 也不過是對 thread 的封裝和補充。(Python3 中 thread 被改名為 _thread)。

在 Python 中創建線程非常簡單:

import time
import threading


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(1)
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 創建 task
        tasks.append(threading.Thread(
            target=do_task,
            args=("task_{}".format(i),)))
    for t in tasks:
        # 開始執行 task
        t.start()

    for t in tasks:
        # 等待 task 執行完畢
        # 完畢前會阻塞住主線程
        t.join()
    print("Finish.")

直接創建線程簡單優雅,如果邏輯復雜,也可以通過繼承 Thread 基類完成多線程:

import time
import threading


class MyTask(threading.Thread):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(1)
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 創建 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 開始執行 task
        t.start()

    for t in tasks:
        # 等待 task 執行完畢
        # 完畢前會阻塞住主線程
        t.join()
    print("Finish.")
多進程

在 Python 中,可以使用 multiprocessing 庫來實現多進程編程,和多線程一樣,有兩種方法可以使用多進程編程。

直接創建進程:

import time
import random
import multiprocessing


def do_something(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 創建 task
        tasks.append(multiprocessing.Process(
            target=do_something,
            args=("task_{}".format(i),)))
    for t in tasks:
        # 開始執行 task
        t.start()

    for t in tasks:
        # 等待 task 執行完畢
        # 完畢前會阻塞住主線程
        t.join()
    print("Finish.")

繼承進程父類:

import time
import random
import multiprocessing


class MyTask(multiprocessing.Process):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(random.randint(1, 5))
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 創建 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 開始執行 task
        t.start()

    for t in tasks:
        # 等待 task 執行完畢
        # 完畢前會阻塞住主線程
        t.join()
    print("Finish.")

multiprocessing 除了常用的多進程編程外,我認為它最大的意義在于提供了一套規范,在該庫下有一個 dummy 模塊,即 multiprocessing.dummy,里面對 threading 進行封裝,提供了和 multiprocessing 相同 API 的線程實現,換句話說,class::multiprocessing.Process 提供的是進程任務類,而 class::multiprocessing.dummy.Process,也正是有 multiprocessing.dummy 的存在,可以快速的講一個多進程程序改為多線程:

import time
import random
from multiprocessing.dummy import Process


class MyTask(Process):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(random.randint(1, 5))
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 創建 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 開始執行 task
        t.start()

    for t in tasks:
        # 等待 task 執行完畢
        # 完畢前會阻塞住主線程
        t.join()
    print("Finish.")

無論是多線程還是多進程編程,這也是我一般會選擇 multiprocessing 的原因。

除了直接創建進程,還可以用進程池(或者 multiprocessing.dummy 里的進程池):

import time
import random
from multiprocessing import Pool


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    pool = Pool(5)
    for i in range(0, 10):
        #     創建 task
        pool.apply_async(do_task, ("task_{}".format(i),))
    pool.close()
    pool.join()
    print("Finish.")

線程池:

import time
import random
from multiprocessing.dummy import Pool


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    pool = Pool(5)
    for i in range(0, 10):
        #     創建 task
        pool.apply_async(do_task, ("task_{}".format(i),))
    pool.close()
    pool.join()
    print("Finish.")

這里示例有個問題,pool 在 join 前需要 close 掉,否則就會拋出異常,不過 Python 之禪的作者 Tim Peters 給出解釋:

As to Pool.close(), you should call that when - and only when - you"re never going to submit more work to the Pool instance. So Pool.close() is typically called when the parallelizable part of your main program is finished. Then the worker processes will terminate when all work already assigned has completed.

It"s also excellent practice to call Pool.join() to wait for the worker processes to terminate. Among other reasons, there"s often no good way to report exceptions in parallelized code (exceptions occur in a context only vaguely related to what your main program is doing), and Pool.join() provides a synchronization point that can report some exceptions that occurred in worker processes that you"d otherwise never see.

同步原語

在多進程編程中,因為進程間的資源隔離,不需要考慮內存的線程安全問題,而在多線程編程中便需要同步原語來保存線程安全,因為 Python 是一門簡單的語言,很多操作都是封裝的操作系統 API,因此支持的同步原語蠻全,但這里只寫兩種常見的同步原語:鎖和信號量。

通過使用鎖可以用來保護一段內存空間,而信號量可以被多個線程共享。

threading 中可以看到 Lock 鎖和 RLock 重用鎖兩種鎖,區別如名。這兩種鎖都只能被一個線程擁有,第一種鎖只能被獲得一次,而重用鎖可以被多次獲得,但也需要同樣次數的釋放才能真正的釋放。

當多個線程對同一塊內存空間同時進行修改的時候,經常遇到奇怪的問題:

import time
import random
from threading import Thread, Lock

count = 0


def do_task():
    global count
    time.sleep(random.randint(1, 10) * 0.1)
    tmp = count
    tmp += 1
    time.sleep(random.randint(1, 10) * 0.1)
    count = tmp
    print(count)


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish. Count = {}".format(count))

如上就是典型的非線程安全導致 count 沒有達到預期的效果。而通過鎖便可以控制某一段代碼,或者說某段內存空間的訪問:

import time
import random
from threading import Thread, Lock

count = 0
lock = Lock()


def do_task():
    lock.acquire()
    global count
    time.sleep(random.randint(1, 10) * 0.1)
    tmp = count
    tmp += 1
    time.sleep(random.randint(1, 10) * 0.1)
    count = tmp
    print(count)
    lock.release()


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish. Count = {}".format(count))

當然,上述例子非常暴力,直接強行把并發改為串行。

對于信號量常見于有限資源強占的場景,可以定義固定大小的信號量供多個線程獲取或者釋放,從而控制線程的任務執行,比如下面的例子,控制最多有 5 個任務在執行:

import time
import random
from threading import Thread, BoundedSemaphore

sep = BoundedSemaphore(5)


def do_task(task_name):
    sep.acquire()
    print("do Task: {}".format(task_name))
    time.sleep(random.randint(1, 10))
    sep.release()


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task, args=("task_{}".format(i),)))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish.")
Queue 和 Pipe

因為多進程的內存隔離,不會存在內存競爭的問題。但同時,多個進程間的數據共享成為了新的問題,而進程間通信常見:隊列,管道,信號。

這里只講解隊列和管道。

隊列常見于雙進程模型,一般用作生產者-消費者模式,由生產者進程向隊列中發布任務,并由消費者從隊列首部拿出任務進行執行:

import time
from multiprocessing import Process, Queue


class Task1(Process):
    def __init__(self, queue):
        super(Task1, self).__init__()
        self.queue = queue

    def run(self):
        item = self.queue.get()
        print("get item: [{}]".format(item))


class Task2(Process):
    def __init__(self, queue):
        super(Task2, self).__init__()
        self.queue = queue

    def run(self):
        print("put item: [Hello]")
        time.sleep(1)
        self.queue.put("Hello")


if __name__ == "__main__":
    queue = Queue()
    t1 = Task1(queue)
    t2 = Task2(queue)
    t1.start()
    t2.start()
    t1.join()
    print("Finish.")

理論上每個進程都可以向隊列里的讀或者寫,可以認為隊列是半雙工路線。但是往往只有特定的讀進程(比如消費者)和寫進程(比如生產者),盡管這些進程只是開發者自己定義的。

而 Pipe 更像一個全工路線:

import time
from multiprocessing import Process, Pipe


class Task1(Process):
    def __init__(self, pipe):
        super(Task1, self).__init__()
        self.pipe = pipe

    def run(self):
        item = self.pipe.recv()
        print("Task1: recv item: [{}]".format(item))
        print("Task1: send item: [Hi]")
        self.pipe.send("Hi")


class Task2(Process):
    def __init__(self, pipe):
        super(Task2, self).__init__()
        self.pipe = pipe

    def run(self):
        print("Task2: send item: [Hello]")
        time.sleep(1)
        self.pipe.send("Hello")
        time.sleep(1)
        item = self.pipe.recv()
        print("Task2: recv item: [{}]".format(item))


if __name__ == "__main__":
    pipe = Pipe()
    t1 = Task1(pipe[0])
    t2 = Task2(pipe[1])
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("Finish.")

除了上面介紹的 threadingmultiprocessing 兩個庫外,還有一個好用的令人發指的庫 concurrent.futures。和前面兩個庫不同,這個庫是更高等級的抽象,隱藏了很多底層的東西,但也因此非常好用。用官方的例子:

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

該庫中自帶了進程池和線程池,可以通過上下文管理器來管理,而且對于異步任務執行完后,結果的獲得也非常簡單。再拿一個官方的多進程計算的例子作為結束:

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print("%d is prime: %s" % (number, prime))

if __name__ == "__main__":
    main()

歡迎關注個人公眾號:CS實驗室

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/41698.html

相關文章

  • 使用 Python 進行并發編程系列 - 收藏集 - 掘金

    摘要:使用進行并發編程篇三掘金這是使用進行并發編程系列的最后一篇。所以我考慮啟用一個本地使用進行并發編程篇二掘金我們今天繼續深入學習。 使用 Python 進行并發編程 - asyncio 篇 (三) - 掘金 這是「使用Python進行并發編程」系列的最后一篇。我特意地把它安排在了16年最后一天。 重新實驗上篇的效率對比的實現 在第一篇我們曾經對比并發執行的效率,但是請求的是httpb...

    MorePainMoreGain 評論0 收藏0
  • Python基礎之使用期物處理并發

    摘要:本文重點掌握異步編程的相關概念了解期物的概念意義和使用方法了解中的阻塞型函數釋放的特點。一異步編程相關概念阻塞程序未得到所需計算資源時被掛起的狀態。 導語:本文章記錄了本人在學習Python基礎之控制流程篇的重點知識及個人心得,打算入門Python的朋友們可以來一起學習并交流。 本文重點: 1、掌握異步編程的相關概念;2、了解期物future的概念、意義和使用方法;3、了解Python...

    asoren 評論0 收藏0
  • 從小白程序員一路晉升為大廠高級技術專家我看過哪些書籍?(建議收藏)

    摘要:大家好,我是冰河有句話叫做投資啥都不如投資自己的回報率高。馬上就十一國慶假期了,給小伙伴們分享下,從小白程序員到大廠高級技術專家我看過哪些技術類書籍。 大家好,我是...

    sf_wangchong 評論0 收藏0
  • python初學——網絡編程之FTP服務器支持多并發版本

    摘要:擴展支持多用戶并發訪問與線程池。項目請見初學網絡編程之服務器。不允許超過磁盤配額。該文件是一個使用模塊編寫的線程池類。這一步就做到了線程池的作用。 對MYFTP項目進行升級。擴展支持多用戶并發訪問與線程池。MYFTP項目請見python初學——網絡編程之FTP服務器。 擴展需求 1.在之前開發的FTP基礎上,開發支持多并發的功能2.不能使用SocketServer模塊,必須自己實現多線...

    oysun 評論0 收藏0
  • 史上最詳細Python學習路線-從入門到精通,只需90天

    摘要:針對的初學者,從無到有的語言如何入門,主要包括了的簡介,如何下載,如何安裝,如何使用終端,等各種開發環境進行開發,中的語法和基本知識概念和邏輯,以及繼續深入學習的方法。 ...

    gghyoo 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<