摘要:限制同時運行線程數使用類就行,在內部管理著一個計數器。當計數器到時,再調用就會阻塞,直到其他線程來調用,這樣就限制了同時運行線程的數量。
事前最好了解一下什么是進程,什么是線程,什么是GIL,本文不再贅述,直接介紹模塊的使用:
推薦1,推薦2,推薦3,更多自尋
普通的python爬蟲是單進程單線程的,這樣在遇到大量重復的操作時就只能逐個進行,我們就很難過了。舉個栗子:你有1000個美圖的鏈接,逐個喂給下載器(函數),看著圖片只能一個個蹦出來,你不心急嗎?于是我們想,能不能同時跑多個下載器,實現多圖同時下載?——答案是可以的,使用多進程/多線程,把每個帶著不同參數下載器分給每個進程/線程就,然后同時跑多個進程/線程就行了。
本文就介紹如何用多線程和多進程給爬蟲加速
補充主線程與子線程(進程同理):
一個py程序就有一個主線程,主線程負責整個py程序的代碼,當主線程處理到啟用多線程的代碼時,就會創建若干個子線程,這里就有選擇了,主線程是等待子線程的結束再繼續處理還是直接繼續處理讓子線程在外頭跑
多進程Python標準庫原本有threading和multiprocessing模塊編寫相應的多線程/多進程代碼。但從Python3.2開始,標準庫為我們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的更高級的抽象,對編寫線程池/進程池提供了直接的支持。多進程我們介紹futures的ProcessPoolExecutor
注:python 2.7 請安裝future模塊,pip install future
ProcessPoolExecutor類是Executor類的子類,實例化ProcessPoolExecutor類以創建進程池,在實例化的過程中應指定同時運行的最大進程數
from concurrent.futures import ProcessPoolExecutor pool = ProcessPoolExecutor(max_workers=4) # 運行最大進程數4 #進程池的操作... pool.shutdown(wait=True) # 關閉進程池,默認等待所有進程的完成。 print("Deep") # 有shutdown的情況下所有進程完成后才會運行下面的print,沒有的話會馬上運行 "創建進程也可用with,這時會自帶shutdown功能 with ProcessPoolExecutor(4) as pool: #進程池的操作... "
該類有兩種方法對進程池提交任務建立進程(函數及一組參數構成一個任務),分別是submit()和map(),如果單純想多開進程別無他想,用哪個都行,但submit()會有更靈活的用法
map(fn,*iterables)fn:函數
*iterables:函數每個參數的集合,N個參數就接N個集合
可以理解這是python自帶map()的多進程版,他返回的是一個迭代器,包含每個任務對應的返回值(有序的),下面用例子來分析
from concurrent.futures import ProcessPoolExecutor import time def test(x): time.sleep(x) # 時間阻塞 print(str(x)+"s") return x if __name__ == "__main__": with ProcessPoolExecutor(4) as pool: p = pool.map(test,[2,3,10,5,6]) for i in p: print(i)
輸出
2s 2 3s 3 5s 6s 10s 10 5 6
分析(下面以參數代替某個進程):
帶s的是函數輸出的,進程池最大允許4個進程同時運行,所以參數 2,3,10,5 首先一起進去。2最快完成,馬上讓給6進去,2+6<10 ,所以后進6完成得比10快,最后輸出順序就是 2s,3s,5s,6s,10s
不帶s的是for循環打印迭代器中的結果,由輸出可見,i的值分配是會等待進程完成返回值的,等2的完成返回2,等3的完成返回3,等10的完成返回10,由于10完成前5和6早就完成了,所以返回10后緊接著返回5和6,最后輸出順序為2,3,10,5,6,是有序的,對應各任務的返回值
在爬蟲中,上面代碼中的時間阻塞會對應著網絡I/O阻塞,任務中往往包含著網絡請求。比如你有很多個圖片鏈接,就寫一個下載圖片的函數(接收一個圖片鏈接的參數),把函數和圖片鏈接的集合喂給map()就實現多進程了加速了。
submit(fn, *arg)fn:函數
*arg:函數的參數
該方法是往進程池中提交可回調的任務,并返回一個future實例。提交多個任務可用循環實現,返回的future實例用列表存起來,每個future代表一個進程。關于future對象有許多方法:
future.running():判斷某個future(進程)是否運行中
future.done():判斷某個future(進程)是否正常結束
future.cancel():終止某個future(進程),終止失敗返回False,成功返回True
future.result():獲取future對應任務返回的結果。如果future還沒完成就會去等待
future.add_done_callback(fn):接收函數fn,將fn綁定到future對象上。當future對象被終止或完成時,fn將會被調用并接受該future對象
as_completed(fs):接收futures列表,futures列表中一旦有某個future(進程)完成就將該future對象yield回來,是個迭代器
from concurrent.futures import ProcessPoolExecutor,as_completed import time def test(x): time.sleep(x) print(str(x)+"s") return x if __name__ == "__main__": with ProcessPoolExecutor(4) as pool: futures = [pool.submit(test,i) for i in [2,3,10,5,6]] """for j in futures: print(j.result()) # 對應接收參數有序輸出,輸出2,3,10,5,6 """ for j in as_completed(futures): print(j.result()) # 對應進程完成順序輸出,輸出2,3,5,6,10多線程
建議小心使用,雖然多線程能實現高并發,但由于線程資源共享的特性,某個線程操作這些共享的資源時可能操到一半就停止讓給另一個線程操作,導致錯亂的發生。為避免此情況發生對某些操作需要加鎖,所以這里介紹對鎖有支持的threading模塊,python自帶直接導入。
如果你確信這些操作不會發生錯亂,可以直接使用concurrent.future 的 ThreadPoolExecutor,方法什么的和ProcessPoolExecutor的一樣
創建線程有兩種方法:
實例化 threading.Thread 類,target接收函數,arg以可迭代形式接收參數。這種方法最簡單
import threading import time def test(x): time.sleep(x) print(str(x)+"s") return x t1 = threading.Thread(target=test, args=(1,)) # 創建線程 t2 = threading.Thread(target=test, args=(3,)) t1.start() # 啟動線程 t2.start()
繼承threading.Thread 類,重寫run方法,把函數及參數接收寫進自己寫的多線程類中。這種方法更靈活,threading.Thread 類并沒有供獲取線程調用函數返回值的方法,如果需要函數返回值就需要繼承該類自己實現
import threading import time class TestThread(threading.Thread): def __init__(self,x): threading.Thread.__init__(self) self.x = x # 參數接收 def run(self): time.sleep(self.x) # 原來的函數寫到run中 print(str(self.x)+"s") def result(self): # 實現獲取調用函數的返回值的方法 return self.x t1 = TestThread(1) #創建線程 t2 = TestThread(3) t1.start() # 啟動線程 t2.start() t1.join() # 等待線程結束 t2.join() print(t1.result(),t2.result())
線程相關方法和屬性:
Thread.start():啟動線程
Thread.join():等待線程的結束,沒有join的話會接著運行join下面的代碼
Thread.is_alive():判斷線程是否在運行,線程未開啟/結束時返回 False
Thread.name:返回線程的名字,默認線程名是Thread-N,N指第N個開啟的線程
Thread.setName(str):給線程命名
Thread.setDaemon(True/False):設置子線程是否會隨主線程結束而結束,原本所有子線程默認是不會隨主線程結束而結束的
鎖線程間資源共享,如果多個線程共同對某個數據修改,可能會出現錯誤,為了保證數據的正確性,需要對多個線程進行同步。這時就需要引入鎖了(利用GIL),鎖只有一個,一個線程在持有鎖的狀態下對某些數據進行操作,其他線程就無法對該數據進行操作,直至該線程釋放鎖讓其他線程搶,誰搶到誰就有權修改。
threading提供Lock和RLock兩類鎖,前者一個線程只能獲取獲取一次鎖,后者允許一個線程能重復獲取鎖。如果某個線程對全局數據的操作是割裂的(分塊的),那就使用RLock。
acquire():獲取鎖
release():釋放鎖
有數據操作放在acquire 和 release 之間,就不會出現多個線程修改同一個數據的風險了
acquire 和 release 必須成對存在,如果一個線程只拿不放,其他線程沒有鎖能搶就只能永遠阻塞(停止)
一個錯亂的例子及鎖的使用:
import time, threading lock = threading.Lock() # rlock = threading.RLock() balance = [0] def test(n): for i in range(100000): # 理想的情況是執行了+n,-n操作后才讓另一個線程處理,結果永0 #lock.acquire() balance[0] = balance[0] + n # 某個線程可能處理到這里就終止讓給另一個線程處理了,循環一大,結果可能錯亂不為0 balance[0] = balance[0] - n #lock.release() t1 = threading.Thread(target=test, args=(5,)) t2 = threading.Thread(target=test, args=(8.0,)) t1.start() t2.start() t1.join() t2.join() print(balance[0])
在不加鎖的情況下多跑幾次,你會的到不同的結果。但是加了鎖之后,+n,-n兩個操作完整執行,不會中途中斷,結果永0。
限制同時運行線程數使用 threading.Semaphore 類就行,Semaphore 在內部管理著一個計數器。調用 acquire() 會使這個計數器減1,release() 則是加1。計數器的值永遠不會小于 0。當計數器到 0 時,再調用 acquire() 就會阻塞,直到其他線程來調用release(),這樣就限制了同時運行線程的數量。
使用上非常簡單,實例化Semaphore并指定線程數后,給函數的頭加個acquire(),尾加個release()就行。
import threading, time def test(x): semaphore.acquire() time.sleep(x) print(x) semaphore.release() semaphore = threading.Semaphore(4) # 最大4個線程同時進行 ts = [threading.Thread(target=test,args=(i,)) for i in [2,3,5,10,6]] [t.start() for t in ts] "輸出:2,3,5,6,10 (原理和上面多進程的那個差不多)"
關于threading的其他高級用法本文并未提及,以上都是些常用的用法,如果有更高級的需要,可以參考這文章
應用在爬蟲上講了這么多,都是模塊的用法,沒怎么提到爬蟲。那么最后大概的講下如何把多進程/多線程運用到爬蟲中,并給個代碼實例用作參考。
如果爬蟲需要重復進行某個操作(如下載一張圖片,爬取一張網頁的源碼,破解一次加密【加密耗cpu最好多進程】),那把這個操作抽象成一個接收相應參數的函數,把函數喂給進程/線程即可。
沒了,大概就這么用?? ?
下面給個多進程/多線程結合的網易云音樂評論下載器(下載某首音樂的多頁評論),包含加密算法,如不清楚可看之前的文章,我們用多進程加速加密過程,用多線程加速爬取過程。
本代碼較長,長到高亮效果都沒有了,因此該長代碼分為兩部分,前半部分是之前文章提到的加密方法,后半部分是本文的多進程多線程重點代碼:
import json, re, base64, random, requests, binascii, threading from Crypto.Cipher import AES#新的加密模塊只接受bytes數據,否者報錯,密匙明文什么的要先轉碼 from concurrent.futures import ProcessPoolExecutor from math import ceil secret_key = b"0CoJUm6Qyw8W8jud"#第四參數,aes密匙 pub_key ="010001"#第二參數,rsa公匙組成 modulus = "00e0b509f6259df8642dbc35662901477df22677ec152b5ff68ace615bb7b725152b3ab17a876aea8a5aa76d2e417629ec4ee341f56135fccf695280104e0312ecbda92557c93870114af6c9d05c4f7f0c3685b7a46bee255932575cce10b424d813cfe4875d3e82047b97ddef52741d546b8e289dc6935b3ece0462db0a22b8e7"#第三參數,rsa公匙組成 headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.146 Safari/537.36"} def random_16(): return bytes("".join(random.sample("1234567890DeepDarkFantasy",16)),"utf-8") #aes加密 def aes_encrypt(text,key): pad = 16 - len(text)%16#對長度不是16倍數的字符串進行補全,然后在轉為bytes數據 try: #如果接到bytes數據(如第一次aes加密得到的密文)要解碼再進行補全 text = text.decode() except: pass text = text + pad * chr(pad) try: text = text.encode() except: pass encryptor = AES.new(key,AES.MODE_CBC,b"0102030405060708") ciphertext = encryptor.encrypt(text) ciphertext = base64.b64encode(ciphertext)#得到的密文還要進行base64編碼 return ciphertext #rsa加密 def rsa_encrypt(ran_16,pub_key,modulus): text = ran_16[::-1]#明文處理,反序并hex編碼 rsa = int(binascii.hexlify(text), 16) ** int(pub_key, 16) % int(modulus, 16) return format(rsa, "x").zfill(256) #返回加密后內容 def encrypt_data(data): ran_16 = random_16() text = json.dumps(data) params = aes_encrypt(text,secret_key) params = aes_encrypt(params,ran_16) encSecKey = rsa_encrypt(ran_16,pub_key,modulus) return {"params":params.decode(), "encSecKey":encSecKey }
class OnePageComment(threading.Thread): # 下載一頁評論的線程類 def __init__(self,post_url, enc_data): threading.Thread.__init__(self) self.post_url = post_url self.enc_data = enc_data self.comment = "" # 創建一個comment變量儲存爬到的數據 def run(self): semaphore.acquire() content = requests.post(self.post_url, headers = headers, data = self.enc_data ).json() if "hotComments" in content: if content["hotComments"]: self.comment += "*************精彩評論 " self.common(content, "hotComments") self.comment += " *************最新評論 " self.common(content, "comments") else: self.common(content, "comments") semaphore.release() def common(self, content,c_type): for each in content[c_type]: if each ["beReplied"]: if each["beReplied"][0]["content"]: self.comment += each["content"] + " 回復: " + each["beReplied"][0]["content"] + " " + "-" * 60 + " " else: self.comment += each["content"] + " " + "-" * 60 + " " def get_comment(self): # 選擇返回評論而不是直接寫入文件,因為多個線程同時操作一個文件有風險,應先返回,后統一寫入 return self.comment def get_enc_datas(pages, max_workers=4): # 多進程加密 raw_datas = [] for i in range(pages): if i == 0: raw_datas.append({"rid":"", "offset":"0", "total":"true", "limit":"20", "csrf_token":""}) else: raw_datas.append({"rid":"", "offset":str(i*20), "total":"false", "limit":"20", "csrf_token":""}) with ProcessPoolExecutor(max_workers) as pool: # 多進程適合計算密集型任務,如加密 result = pool.map(encrypt_data,raw_datas) return list(result) def one_song_comment(id_): # 爬取一首歌的評論并寫入txt,網絡I/O密集使用多線程 post_url = "http://music.163.com/weapi/v1/resource/comments/R_SO_4_" + str(id_) + "?csrf_token=" ts = [OnePageComment(post_url,i) for i in enc_datas] [i.start() for i in ts] [i.join() for i in ts] comments = [i.get_comment() for i in ts] with open(id_ + ".txt", "w", encoding="utf-8") as f: f.writelines(comments) if __name__ == "__main__": semaphore = threading.Semaphore(4) # 最大線程4 enc_datas = get_enc_datas(10) # 獲取加密后的數據,對所有歌曲都是通用的,這里有十頁的加密數據,對應爬十頁評論 one_song_comment("29498682")
效果提升驚人!!不信你跑一下上面的程序,然后和自己寫的單線程/單進程比較
cpu和網絡都跑到了峰值,網絡峰值在cpu峰值之后,因為是先多進程加密數據,后多線程爬取
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/42342.html
摘要:所以只要得到登錄后的并必要時進行更新,服務器就會認定其為登錄狀態。看看人家知乎,加密到連名字都沒有了,還混淆,如何下手綜上,適用于沒有加密的登錄或者加密算法比較簡單并且不常更新的網站。遇上無解的加密算法要么手操拷貝,要么請大佬出場。 某些網站,登錄和沒登錄,用戶的權限是不一樣的,帳號登錄之后才能獲取更多的信息。更有甚者一上來就是登錄界面,不登錄就不給你進去(如p站)。爬取目標不用登錄固...
摘要:以下這些項目,你拿來學習學習練練手。當你每個步驟都能做到很優秀的時候,你應該考慮如何組合這四個步驟,使你的爬蟲達到效率最高,也就是所謂的爬蟲策略問題,爬蟲策略學習不是一朝一夕的事情,建議多看看一些比較優秀的爬蟲的設計方案,比如說。 (一)如何學習Python 學習Python大致可以分為以下幾個階段: 1.剛上手的時候肯定是先過一遍Python最基本的知識,比如說:變量、數據結構、語法...
摘要:一般用進程池維護,的設為數量。多線程爬蟲多線程版本可以在單進程下進行異步采集,但線程間的切換開銷也會隨著線程數的增大而增大。異步協程爬蟲引入了異步協程語法。 Welcome to the D-age 對于網絡上的公開數據,理論上只要由服務端發送到前端都可以由爬蟲獲取到。但是Data-age時代的到來,數據是新的黃金,毫不夸張的說,數據是未來的一切。基于統計學數學模型的各種人工智能的出現...
摘要:批評的人通常都會說的多線程編程太困難了,眾所周知的全局解釋器鎖,或稱使得多個線程的代碼無法同時運行。多線程起步首先讓我們來創建一個名為的模塊。多進程可能比多線程更易使用,但需要消耗更大的內存。 批評 Python 的人通常都會說 Python 的多線程編程太困難了,眾所周知的全局解釋器鎖(Global Interpreter Lock,或稱 GIL)使得多個線程的 Python 代碼無...
閱讀 3223·2021-11-23 09:51
閱讀 1030·2021-08-05 09:58
閱讀 663·2019-08-29 16:05
閱讀 971·2019-08-28 18:17
閱讀 3028·2019-08-26 14:06
閱讀 2721·2019-08-26 12:20
閱讀 2154·2019-08-26 12:18
閱讀 3064·2019-08-26 11:56