摘要:獲取正在運(yùn)行的線程數(shù),用于狀態(tài)監(jiān)控。之后初始化組件主要是初始化線程池將到中,初始化開始時(shí)間等。如果線程池中運(yùn)行線程數(shù)量為,并且默認(rèn),那么就停止退出,結(jié)束爬蟲。
本系列文章,針對Webmagic 0.6.1版本
一個(gè)普通爬蟲啟動(dòng)代碼
public static void main(String[] args) { Spider.create(new GithubRepoPageProcessor()) 從https:github.com/code4craft開始抓 .addUrl("https://github.com/code4craft") //設(shè)置Scheduler,使用Redis來管理URL隊(duì)列 .setScheduler(new RedisScheduler("localhost")) //設(shè)置Pipeline,將結(jié)果以json方式保存到文件 .addPipeline(new JsonFilePipeline("D:datawebmagic")) //開啟5個(gè)線程同時(shí)執(zhí)行 .thread(5) //啟動(dòng)爬蟲 .run(); }
1、spider可配置插拔組件:
Downloader 提供自定義的Downloader,默認(rèn)為HttpClientDownloader
Pipeline 提供自定義的Pipeline,可以配置多個(gè),多個(gè)Pipeline鏈?zhǔn)教幚斫Y(jié)果。默認(rèn)為ConsolePipeline
Scheduler 提供自定義的調(diào)度器,默認(rèn)為QueueScheduler
PageProcessor 頁面處理組件,開發(fā)者爬蟲的實(shí)現(xiàn)
ExecutorService 可以用于提供自己實(shí)現(xiàn)的線程池來監(jiān)控,默認(rèn)為Fixed ExecutorService
SpiderListener 頁面狀態(tài)監(jiān)聽器,提供每個(gè)頁面成功和錯(cuò)誤的回調(diào)。可配置多個(gè)。
其中有:WebMagic四大組件:Pipeline,Scheduler,Downloader和PageProcesser 。這和Python中的Scrapy的理念是一致的。但是Scrapy還有一些中間件的概念,從結(jié)構(gòu)圖中便可以看出區(qū)別
2、狀態(tài)變量:
stat 0,初始化;1,運(yùn)行中;2,已停止
pageCount 已經(jīng)抓取的頁面數(shù)。注意:這里統(tǒng)計(jì)的是GET請求的頁面,POST請求的頁面不在統(tǒng)計(jì)的范圍之內(nèi)。具體原因見DuplicateRemovedScheduler類
startTime:開始時(shí)間,可用于計(jì)算耗時(shí)。
emptySleepTime 最大空閑等待時(shí)間,默認(rèn)30s。如果抓取隊(duì)列為空,且url隊(duì)列為空的最大等待時(shí)長,超過該時(shí)間,就認(rèn)為爬蟲抓取完成,停止運(yùn)行。
threadNum : 啟用的線程數(shù),默認(rèn)1.
threadPool:這是Webmagic提供的CountableThreadPool實(shí)例,內(nèi)部封裝了ExecutorService,CountableThreadPool 提供了額外的獲取線程運(yùn)行數(shù)的方法,此外為防止大量urls入池等待,提供了阻塞方式管理urls入池。(后續(xù)細(xì)說)
destroyWhenExit:默認(rèn)true。是否在調(diào)用stop()時(shí)立即停止所有任務(wù)并退出。
spawUrl : 默認(rèn)為true,是否抓取除了入口頁面starturls之外的其他頁面(targetRequests).
3、需要配置的項(xiàng):
Site 全局站點(diǎn)配置,如UA,timeout,sleep等
PageProcessor 頁面處理組件,開發(fā)者爬蟲的實(shí)現(xiàn)
Request 配置入口頁面url,可以多個(gè)。
uuid ,可選,Spider的名字,用于分析和日志。
需要注意的是:每個(gè)修改配置的方法都進(jìn)行了checkIfRunning檢查,如果檢查當(dāng)前Spider正在運(yùn)行,它會(huì)拋出IllegalStateException。
所有配置方法都return this,便于鏈?zhǔn)秸{(diào)用,類似于builder模式。
4、運(yùn)行方式:
Spider實(shí)現(xiàn)了Runnable接口(還有一個(gè)Webmagic自己的Task接口)。
run(),跟普通的Runnable一樣,阻塞式運(yùn)行,會(huì)阻塞當(dāng)前線程直至Spider運(yùn)行結(jié)束。
runAsync(),就是new一個(gè)Thread來運(yùn)行當(dāng)前Spider這個(gè)Runnable,異步運(yùn)行。
start(),runAsync()的別名方法,異步運(yùn)行。
5、狀態(tài)相關(guān)方法
stop(),結(jié)束當(dāng)前爬蟲的運(yùn)行,內(nèi)部只是簡單地修改一下狀態(tài),如果設(shè)置了destroyWhenExit=true(默認(rèn)就是true)那么會(huì)立即停止所有任務(wù)并清除資源,否則并不會(huì)停止正在線程池中運(yùn)行的線程,也不會(huì)銷毀線程池。
getThreadAlive() 獲取正在運(yùn)行的線程數(shù),用于狀態(tài)監(jiān)控。
6、核心代碼分析
public void run() { checkRunningStat(); initComponent(); logger.info("Spider " + getUUID() + " started!"); while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) { final Request request = scheduler.poll(this); if (request == null) { if (threadPool.getThreadAlive() == 0 && exitWhenComplete) { break; } // wait until new url added waitNewUrl(); } else { threadPool.execute(new Runnable() { @Override public void run() { try { processRequest(request); onSuccess(request); } catch (Exception e) { onError(request); logger.error("process request " + request + " error", e); } finally { pageCount.incrementAndGet(); signalNewUrl(); } } }); } } stat.set(STAT_STOPPED); // release some resources if (destroyWhenExit) { close(); } }
首先通過checkRunningStat()來檢查并設(shè)置運(yùn)行狀態(tài),如果已經(jīng)在運(yùn)行了,那么會(huì)拋出IllegalStateException。之后初始化組件(主要是初始化Downloader、線程池、將starturls push到Scheduler中,初始化開始時(shí)間等)。之后進(jìn)入循環(huán),從scheduler中poll出Request給線程池去執(zhí)行。如果scheduler中沒有request了:繼而判斷是否有線程在運(yùn)行和是否設(shè)置了立即退出標(biāo)志,如果設(shè)置了立即退出循環(huán),否則調(diào)用waitNewUrl()等待有新的url被加入。
waitNewUrl()采用RetreentLock和Condition來進(jìn)行超時(shí)阻塞,一旦阻塞時(shí)間超過emptySleepTime就返回。如果線程池中運(yùn)行線程數(shù)量為0,并且exitWhenComplete=true(默認(rèn)),那么就停止退出,結(jié)束爬蟲。如果exitWhenComplete=false,那么需要開發(fā)者手動(dòng)調(diào)用stop()來停止退出爬蟲,并調(diào)用close()來清理資源。
通過processRequest來處理抓取url的整個(gè)流程,代碼如下:
protected void processRequest(Request request) { Page page = downloader.download(request, this); if (page == null) { sleep(site.getSleepTime()); onError(request); return; } // for cycle retry if (page.isNeedCycleRetry()) { extractAndAddRequests(page, true); sleep(site.getRetrySleepTime()); return; } pageProcessor.process(page); extractAndAddRequests(page, spawnUrl); if (!page.getResultItems().isSkip()) { for (Pipeline pipeline : pipelines) { pipeline.process(page.getResultItems(), this); } } //for proxy status management request.putExtra(Request.STATUS_CODE, page.getStatusCode()); sleep(site.getSleepTime()); }
它在內(nèi)部調(diào)用downloader下載頁面得到Page(Page代表了一個(gè)頁面),然后判斷是否需要重試(needCycleRetry標(biāo)志會(huì)在downloader下載頁面發(fā)生異常時(shí)被設(shè)置為true,同時(shí)會(huì)把自己本身request加到targetRequests當(dāng)中),如果需要,則抽取targetRequests到scheduler當(dāng)中。如果都沒問題,繼續(xù)調(diào)用我們實(shí)現(xiàn)的頁面處理器進(jìn)行處理,之后再抽取我們在頁面處理器中放入的targetRequests(即需要繼續(xù)抓取的url)到scheduler當(dāng)中。之后便是調(diào)用pipeline進(jìn)行處理(一般做持久化操作,寫到數(shù)據(jù)庫、文件之類的),但是如果我們在頁面處理器中為page設(shè)置了skip標(biāo)志,那么就不會(huì)調(diào)用pipeline進(jìn)行處理。
當(dāng)然其中還包括一些重試休眠時(shí)間、繼續(xù)抓取等待時(shí)間等來更好地控制爬蟲抓取頻率。
說完processRequest,我們回到run()繼續(xù)分析,處理完之后,就是調(diào)用監(jiān)聽器,告訴其成功還是失敗,最后抓取數(shù)加+1,然后通知新url被加入(通知waitNewUrl()可以返回繼續(xù)了)。
需要說明的一點(diǎn)是,Spider類中的狀態(tài)管理大量用到了Jdk Atomic原子包下的CAS并發(fā)原子類。
7、CountableThreadPool
前面說過Spider采用的線程池對象CountableThreadPool內(nèi)部封裝了ExecutorService,CountableThreadPool 提供了額外的獲取線程運(yùn)行數(shù)的方法,此外為防止大量urls入池等待,提供了阻塞方式管理urls入池。
阻塞方式的實(shí)現(xiàn)是通過ReentrantLock和它的Condition來實(shí)現(xiàn)的。具體代碼如下:
public void execute(final Runnable runnable) { if (threadAlive.get() >= threadNum) { try { reentrantLock.lock(); while (threadAlive.get() >= threadNum) { try { condition.await(); } catch (InterruptedException e) { } } } finally { reentrantLock.unlock(); } } threadAlive.incrementAndGet(); executorService.execute(new Runnable() { @Override public void run() { try { runnable.run(); } finally { try { reentrantLock.lock(); threadAlive.decrementAndGet(); condition.signal(); } finally { reentrantLock.unlock(); } } } }); }
邏輯是這樣的,如果正在運(yùn)行的線程數(shù)threadAlive超過允許的線程數(shù),就阻塞等待,直至收到某個(gè)線程結(jié)束通知。
羅嗦一句,這里的線程安全控制,主要是用到了JDK atomic包來表示狀態(tài)和ReentrantLock、Condition來控制達(dá)到類似生產(chǎn)者消費(fèi)者的阻塞機(jī)制。
關(guān)于Spider就分析到這里,后續(xù)主題待定。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/66862.html
摘要:爬蟲框架源碼分析之爬蟲框架源碼分析之爬蟲框架源碼分析之爬蟲框架源碼分析之爬蟲框架源碼分析之之進(jìn)階 爬蟲框架Webmagic源碼分析之Spider爬蟲框架WebMagic源碼分析之Scheduler爬蟲框架WebMagic源碼分析之Downloader爬蟲框架WebMagic源碼分析之Selector爬蟲框架WebMagic源碼分析之SeleniumWebMagic之Spider進(jìn)階
摘要:包主要實(shí)現(xiàn)類,這是一個(gè)抽象類,實(shí)現(xiàn)了通用的模板方法,并在方法內(nèi)部判斷錯(cuò)誤重試去重處理等。重置重復(fù)檢查就是清空,獲取請求總數(shù)也就是獲取的。至于請求總數(shù)統(tǒng)計(jì),就是返回中維護(hù)的的大小。 Scheduler是Webmagic中的url調(diào)度器,負(fù)責(zé)從Spider處理收集(push)需要抓取的url(Page的targetRequests)、并poll出將要被處理的url給Spider,同時(shí)還負(fù)責(zé)...
摘要:方法,首先判斷是否有這是在中配置的,如果有,直接調(diào)用的將相應(yīng)內(nèi)容轉(zhuǎn)化成對應(yīng)編碼字符串,否則智能檢測響應(yīng)內(nèi)容的字符編碼。 Downloader是負(fù)責(zé)請求url獲取返回值(html、json、jsonp等)的一個(gè)組件。當(dāng)然會(huì)同時(shí)處理POST重定向、Https驗(yàn)證、ip代理、判斷失敗重試等。 接口:Downloader 定義了download方法返回Page,定義了setThread方法來...
摘要:實(shí)際運(yùn)行中就發(fā)現(xiàn)了一個(gè)有趣的現(xiàn)象。爬蟲抓取的速度超過了我用給它推送的速度,導(dǎo)致爬蟲從獲取不到同時(shí)此刻線程池所有線程都已停止。如何管理設(shè)置,避免返回,且沒有工作線程時(shí)退出循環(huán)。退出檢測循環(huán)說明結(jié)束了,手動(dòng)調(diào)用來是退出調(diào)度循環(huán),終止爬蟲。 Webmagic源碼分析系列文章,請看這里 從解決問題開始吧。 問題描述:由于數(shù)據(jù)庫的數(shù)據(jù)量特別大,而且公司沒有搞主從讀寫分離,導(dǎo)致從數(shù)據(jù)庫讀取數(shù)據(jù)比較...
摘要:優(yōu)雅的使用框架,爬取唐詩別苑網(wǎng)的詩人詩歌數(shù)據(jù)同時(shí)在幾種動(dòng)態(tài)加載技術(shù)中對比作選擇雖然差不多兩年沒有維護(hù),但其本身是一個(gè)優(yōu)秀的爬蟲框架的實(shí)現(xiàn),源碼中有很多值得參考的地方,特別是對爬蟲多線程的控制。 優(yōu)雅的使用WebMagic框架,爬取唐詩別苑網(wǎng)的詩人詩歌數(shù)據(jù) 同時(shí)在幾種動(dòng)態(tài)加載技術(shù)(HtmlUnit、PhantomJS、Selenium、JavaScriptEngine)中對比作選擇 We...
閱讀 3197·2021-11-08 13:18
閱讀 1353·2021-10-09 09:57
閱讀 1182·2021-09-22 15:33
閱讀 3960·2021-08-17 10:12
閱讀 5053·2021-08-16 11:02
閱讀 2676·2019-08-30 10:56
閱讀 962·2019-08-29 18:31
閱讀 3251·2019-08-29 16:30