Back when I was writing the Baidu netdisk crawler and the Baidu image crawler, I promised readers that I would find time to publish the source of ok搜搜. It is time to keep that promise. Below is the complete crawler code, published fully and without reservation; you can use it whether or not you write code yourself. You will need a Linux machine with a publicly reachable IP. Then run:
python startCrawler.py
One reminder: every database field appears in the code, so create the table yourself; it is simple enough that I will not walk through it. (The script also expects MySQLdb, i.e. the MySQL-python package, pygeoip with a GeoIP.dat legacy country database beside the script, and the bencode module.) I am also providing download links for the source: 下載地址1 下載地址2
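For concreteness, here is a minimal sketch of such a table. The column names come from the INSERT and SELECT statements in startCrawler.py below; the types, defaults, and the UNIQUE index are my own guesses, not part of the original release, so adjust them to your needs:

# A guessed-at schema for the search_hash table the crawler reads and writes.
# Column names are taken from the SQL in startCrawler.py; types and indexes
# here are assumptions, tune them to taste.
import MySQLdb as mdb

conn = mdb.connect("127.0.0.1", "root", "root", "oksousou", charset="utf8")
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS search_hash (
    id          INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
    info_hash   CHAR(40)     NOT NULL UNIQUE,  -- hex-encoded 20-byte hash
    category    VARCHAR(32),
    data_hash   CHAR(32),                      -- md5 of the pieces field
    name        VARCHAR(255),
    extension   VARCHAR(32),
    classified  TINYINT(1)   DEFAULT 0,
    source_ip   VARCHAR(45),
    tagged      TINYINT(1)   DEFAULT 0,
    length      BIGINT,
    create_time DATETIME,
    last_seen   DATETIME,
    requests    INT          DEFAULT 1
) DEFAULT CHARSET=utf8
""")
conn.commit()
conn.close()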
#!/usr/bin/env python
# encoding: utf-8
"""
author: haoning
create time: 2015.8.1
"""
import hashlib
import os
import time
import datetime
import traceback
import sys
import random
import json
import socket
import threading
from hashlib import sha1  # for hashing node ids
from random import randint
from struct import unpack
from socket import inet_ntoa
from threading import Timer, Thread
from time import sleep
from collections import deque
from Queue import Queue

import MySQLdb as mdb  # MySQL connector
import metautils
import downloadTorrent
from bencode import bencode, bdecode
import pygeoip

DB_HOST = "127.0.0.1"
DB_USER = "root"
DB_PASS = "root"

BOOTSTRAP_NODES = (
    ("67.215.246.10", 6881),
    ("82.221.103.244", 6881),
    ("23.21.224.150", 6881)
)
RATE = 1  # throttling factor
TID_LENGTH = 2
RE_JOIN_DHT_INTERVAL = 3
TOKEN_LENGTH = 2
INFO_HASH_LEN = 500000  # 500k queued hashes is small enough to keep memory bounded
CACHE_LEN = 100  # database write-back cache size
WAIT_DOWNLOAD = 80

geoip = pygeoip.GeoIP("GeoIP.dat")

def is_ip_allowed(ip):
    country = geoip.country_code_by_addr(ip)
    if country in ("CN", "TW", "JP", "HK", "KR"):
        return True
    return False

def entropy(length):
    return "".join(chr(randint(0, 255)) for _ in xrange(length))

def random_id():
    h = sha1()
    h.update(entropy(20))
    return h.digest()

def decode_nodes(nodes):
    n = []
    length = len(nodes)
    if (length % 26) != 0:
        return n
    for i in range(0, length, 26):
        nid = nodes[i:i+20]
        ip = inet_ntoa(nodes[i+20:i+24])
        port = unpack("!H", nodes[i+24:i+26])[0]
        n.append((nid, ip, port))
    return n

def timer(t, f):
    Timer(t, f).start()

def get_neighbor(target, nid, end=10):
    return target[:end] + nid[end:]

class KNode(object):

    def __init__(self, nid, ip, port):
        self.nid = nid
        self.ip = ip
        self.port = port

class DHTClient(Thread):

    def __init__(self, max_node_qsize):
        Thread.__init__(self)
        self.setDaemon(True)
        self.max_node_qsize = max_node_qsize
        self.nid = random_id()
        self.nodes = deque(maxlen=max_node_qsize)

    def send_krpc(self, msg, address):
        try:
            self.ufd.sendto(bencode(msg), address)
        except Exception:
            pass

    def send_find_node(self, address, nid=None):
        nid = get_neighbor(nid, self.nid) if nid else self.nid
        tid = entropy(TID_LENGTH)
        msg = {
            "t": tid,
            "y": "q",
            "q": "find_node",
            "a": {
                "id": nid,
                "target": random_id()
            }
        }
        self.send_krpc(msg, address)

    def join_DHT(self):
        for address in BOOTSTRAP_NODES:
            self.send_find_node(address)

    def re_join_DHT(self):
        if len(self.nodes) == 0:
            self.join_DHT()
        timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT)

    def auto_send_find_node(self):
        wait = 1.0 / self.max_node_qsize
        while True:
            try:
                node = self.nodes.popleft()
                self.send_find_node((node.ip, node.port), node.nid)
            except IndexError:
                pass
            try:
                sleep(wait)
            except KeyboardInterrupt:
                os._exit(0)

    def process_find_node_response(self, msg, address):
        nodes = decode_nodes(msg["r"]["nodes"])
        for node in nodes:
            (nid, ip, port) = node
            if len(nid) != 20:
                continue
            if ip == self.bind_ip:
                continue
            n = KNode(nid, ip, port)
            self.nodes.append(n)

class DHTServer(DHTClient):  # collects info_hashes from DHT traffic

    def __init__(self, master, bind_ip, bind_port, max_node_qsize):
        DHTClient.__init__(self, max_node_qsize)
        self.master = master
        self.bind_ip = bind_ip
        self.bind_port = bind_port
        self.speed = 0
        self.process_request_actions = {
            "get_peers": self.on_get_peers_request,
            "announce_peer": self.on_announce_peer_request,
        }
        self.ufd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        self.ufd.bind((self.bind_ip, self.bind_port))
        timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT)

    def run(self):
        self.re_join_DHT()
        while True:
            try:
                (data, address) = self.ufd.recvfrom(65536)
                msg = bdecode(data)
                self.on_message(msg, address)
            except Exception:
                pass

    def on_message(self, msg, address):
        global RATE  # shared throttling factor
        try:
            if msg["y"] == "r":
                if msg["r"].has_key("nodes"):
                    self.process_find_node_response(msg, address)  # node discovery
            elif msg["y"] == "q":
                try:
                    self.speed += 1
                    if self.speed % 10000 == 0:
                        RATE = random.randint(1, 3)
                        if RATE == 2:
                            RATE = 1
                        if RATE == 3:
                            RATE = 10
                    if self.speed > 100000:
                        self.speed = 0
                    if self.speed % RATE == 0:  # too much traffic eats the CPU, so handle only every RATE-th request (RATE cycles through 1, 1, 10)
                        self.process_request_actions[msg["q"]](msg, address)  # answer other nodes' requests; this is where info_hashes come from
                except KeyError:
                    self.play_dead(msg, address)
        except KeyError:
            pass

    def on_get_peers_request(self, msg, address):
        try:
            infohash = msg["a"]["info_hash"]
            tid = msg["t"]
            nid = msg["a"]["id"]
            token = infohash[:TOKEN_LENGTH]
            msg = {
                "t": tid,
                "y": "r",
                "r": {
                    "id": get_neighbor(infohash, self.nid),
                    "nodes": "",
                    "token": token
                }
            }
            self.master.log(infohash, address)
            self.send_krpc(msg, address)
        except KeyError:
            pass

    def on_announce_peer_request(self, msg, address):
        try:
            infohash = msg["a"]["info_hash"]
            token = msg["a"]["token"]
            nid = msg["a"]["id"]
            tid = msg["t"]
            if infohash[:TOKEN_LENGTH] == token:
                if msg["a"].has_key("implied_port") and msg["a"]["implied_port"] != 0:
                    port = address[1]
                else:
                    port = msg["a"]["port"]
                self.master.log_announce(infohash, (address[0], port))
        except Exception:
            print "error"
        finally:
            self.ok(msg, address)

    def play_dead(self, msg, address):
        try:
            tid = msg["t"]
            msg = {
                "t": tid,
                "y": "e",
                "e": [202, "Server Error"]
            }
            self.send_krpc(msg, address)
        except KeyError:
            pass

    def ok(self, msg, address):
        try:
            tid = msg["t"]
            nid = msg["a"]["id"]
            msg = {
                "t": tid,
                "y": "r",
                "r": {
                    "id": get_neighbor(nid, self.nid)
                }
            }
            self.send_krpc(msg, address)
        except KeyError:
            pass

class Master(Thread):  # resolves info_hashes into torrent metadata

    def __init__(self):
        Thread.__init__(self)
        self.setDaemon(True)
        self.queue = Queue()
        self.cache = Queue()
        self.count = 0
        self.mutex = threading.RLock()  # reentrant lock, so a thread can re-acquire a lock it already holds
        self.waitDownload = Queue()
        self.metadata_queue = Queue()
        self.dbconn = mdb.connect(DB_HOST, DB_USER, DB_PASS, "oksousou", charset="utf8")
        self.dbconn.autocommit(False)
        self.dbcurr = self.dbconn.cursor()
        self.dbcurr.execute("SET NAMES utf8")
        self.visited = set()

    def lock(self):
        self.mutex.acquire()

    def unlock(self):
        self.mutex.release()

    def work(self, item):
        print "start thread", item
        while True:
            self.prepare_download_metadata()
            self.lock()
            self.download_metadata()
            self.unlock()
            self.lock()
            self.got_torrent()
            self.unlock()

    def start_work(self, max):
        for item in xrange(max):
            t = threading.Thread(target=self.work, args=(item,))
            t.setDaemon(True)
            t.start()

    # announced torrents resolve with a better hit rate, enqueue them too
    def log_announce(self, binhash, address=None):
        if self.queue.qsize() < INFO_HASH_LEN:  # past INFO_HASH_LEN stop enqueuing, or the workers can never catch up
            if is_ip_allowed(address[0]):
                self.queue.put([address, binhash])

    # enqueue info_hashes seen in get_peers requests
    def log(self, infohash, address=None):
        if self.queue.qsize() < INFO_HASH_LEN:
            if is_ip_allowed(address[0]):
                self.queue.put([address, infohash])

    def prepare_download_metadata(self):
        if self.queue.qsize() == 0:
            sleep(2)
        # take an info_hash off the queue for downloading
        address, binhash = self.queue.get()
        if binhash in self.visited:
            return
        if len(self.visited) > 100000:  # past 100000 entries, reset the seen-set
            self.visited = set()
        self.visited.add(binhash)  # mark this info_hash as seen
        utcnow = datetime.datetime.utcnow()
        self.cache.put((address, binhash, utcnow))  # push into the write-back cache

    def download_metadata(self):
        if self.cache.qsize() > CACHE_LEN / 2:  # flush the cache in batches
            while self.cache.qsize() > 0:  # drain the queue
                address, binhash, utcnow = self.cache.get()
                info_hash = binhash.encode("hex")
                self.dbcurr.execute("SELECT id FROM search_hash WHERE info_hash=%s", (info_hash,))
                y = self.dbcurr.fetchone()
                if y:
                    # already known: bump last-seen time and request count
                    self.dbcurr.execute("UPDATE search_hash SET last_seen=%s, requests=requests+1 WHERE info_hash=%s", (utcnow, info_hash))
                else:
                    self.waitDownload.put((address, binhash))
            self.dbconn.commit()
        if self.waitDownload.qsize() > WAIT_DOWNLOAD:
            while self.waitDownload.qsize() > 0:
                address, binhash = self.waitDownload.get()
                t = threading.Thread(target=downloadTorrent.download_metadata, args=(address, binhash, self.metadata_queue))
                t.setDaemon(True)
                t.start()

    def decode(self, s):
        if type(s) is list:
            s = ";".join(s)
        u = s
        for x in (self.encoding, "utf8", "gbk", "big5"):
            try:
                u = s.decode(x)
                return u
            except:
                pass
        return s.decode(self.encoding, "ignore")

    def decode_utf8(self, d, i):
        if i + ".utf-8" in d:
            return d[i + ".utf-8"].decode("utf8")
        return self.decode(d[i])

    def parse_metadata(self, data):  # parse the torrent
        info = {}
        self.encoding = "utf8"
        try:
            torrent = bdecode(data)
            if not torrent.get("name"):
                return None
        except:
            return None
        detail = torrent
        info["name"] = self.decode_utf8(detail, "name")
        if "files" in detail:
            info["files"] = []
            for x in detail["files"]:
                if "path.utf-8" in x:
                    v = {"path": self.decode("/".join(x["path.utf-8"])), "length": x["length"]}
                else:
                    v = {"path": self.decode("/".join(x["path"])), "length": x["length"]}
                if "filehash" in x:
                    v["filehash"] = x["filehash"].encode("hex")
                info["files"].append(v)
            info["length"] = sum([x["length"] for x in info["files"]])
        else:
            info["length"] = detail["length"]
        info["data_hash"] = hashlib.md5(detail["pieces"]).hexdigest()
        return info

    def got_torrent(self):
        if self.metadata_queue.qsize() == 0:
            return
        binhash, address, data, start_time = self.metadata_queue.get()
        if not data:
            return
        try:
            info = self.parse_metadata(data)
            if not info:
                return
        except:
            traceback.print_exc()
            return
        temp = time.time()
        x = time.localtime(float(temp))
        utcnow = time.strftime("%Y-%m-%d %H:%M:%S", x)  # current time
        info_hash = binhash.encode("hex")  # the magnet hash
        info["info_hash"] = info_hash
        # tags still need to be built
        info["tagged"] = False
        info["classified"] = False
        info["requests"] = 1
        info["last_seen"] = utcnow
        info["create_time"] = utcnow
        info["source_ip"] = address[0]
        if info.get("files"):
            files = [z for z in info["files"] if not z["path"].startswith("_")]
            if not files:
                files = info["files"]
        else:
            files = [{"path": info["name"], "length": info["length"]}]
        files.sort(key=lambda z: z["length"], reverse=True)
        bigfname = files[0]["path"]
        info["extension"] = metautils.get_extension(bigfname).lower()
        info["category"] = metautils.get_category(info["extension"])
        try:
            try:
                print " ", "Saved", info["info_hash"], info["name"], (time.time() - start_time), "s", address[0]
            except:
                print " ", "Saved", info["info_hash"]
            ret = self.dbcurr.execute("INSERT INTO search_hash(info_hash,category,data_hash,name,extension,classified,source_ip,tagged," +
                                      "length,create_time,last_seen,requests) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                                      (info["info_hash"], info["category"], info["data_hash"], info["name"], info["extension"],
                                       info["classified"], info["source_ip"], info["tagged"], info["length"],
                                       info["create_time"], info["last_seen"], info["requests"]))
            self.count += 1
            if self.count % 50 == 0:  # commit in batches of 50
                self.dbconn.commit()
                if self.count > 100000:
                    self.count = 0
        except:
            print self.name, "save error", self.name, info
            traceback.print_exc()
            return

if __name__ == "__main__":
    # start the resolver threads
    master = Master()
    master.start_work(150)
    # start the DHT server
    dht = DHTServer(master, "0.0.0.0", 6881, max_node_qsize=200)
    dht.start()
    dht.auto_send_find_node()
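As an aside (this snippet is mine, not part of the original release), here is what a KRPC find_node query looks like on the wire, round-tripped through the same bencode library the crawler imports; the id and target values are placeholders:

# Illustration only: round-trip a KRPC find_node query through bencode,
# the way send_krpc() serializes it and run() parses it back with bdecode.
from bencode import bencode, bdecode

query = {
    "t": "aa",                  # 2-byte transaction id (TID_LENGTH)
    "y": "q",                   # message type: query
    "q": "find_node",
    "a": {"id": "A" * 20,       # placeholder 20-byte node id
          "target": "B" * 20},  # placeholder 20-byte target id
}
wire = bencode(query)           # the bytes handed to ufd.sendto()
print repr(wire)
assert bdecode(wire) == query   # the dict recovered on the receiving side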
Note that the crawler above hands the actual torrent downloading to the downloadTorrent module, so the following code is essential:
#!/usr/bin/env python
# encoding: utf-8
"""
author: haoning
create time: 2015.8.1
"""
from hashlib import sha1
import math
from socket import inet_ntoa
import socket
from struct import pack, unpack
from threading import Timer, Thread
from time import sleep, time

from bencode import bencode, bdecode
from startCrawler import entropy

BT_PROTOCOL = "BitTorrent protocol"
BT_MSG_ID = 20
EXT_HANDSHAKE_ID = 0

def random_id():
    hash = sha1()
    hash.update(entropy(20))
    return hash.digest()

def send_packet(the_socket, msg):
    the_socket.send(msg)

def send_message(the_socket, msg):
    msg_len = pack(">I", len(msg))
    send_packet(the_socket, msg_len + msg)

def send_handshake(the_socket, infohash):
    bt_header = chr(len(BT_PROTOCOL)) + BT_PROTOCOL
    ext_bytes = "\x00\x00\x00\x00\x00\x10\x00\x00"  # reserved bits advertising extension protocol support
    peer_id = random_id()
    packet = bt_header + ext_bytes + infohash + peer_id
    send_packet(the_socket, packet)

def check_handshake(packet, self_infohash):
    try:
        bt_header_len, packet = ord(packet[:1]), packet[1:]
        if bt_header_len != len(BT_PROTOCOL):
            return False
    except TypeError:
        return False
    bt_header, packet = packet[:bt_header_len], packet[bt_header_len:]
    if bt_header != BT_PROTOCOL:
        return False
    packet = packet[8:]
    infohash = packet[:20]
    if infohash != self_infohash:
        return False
    return True

def send_ext_handshake(the_socket):
    msg = chr(BT_MSG_ID) + chr(EXT_HANDSHAKE_ID) + bencode({"m": {"ut_metadata": 1}})
    send_message(the_socket, msg)

def request_metadata(the_socket, ut_metadata, piece):
    """bep_0009"""
    msg = chr(BT_MSG_ID) + chr(ut_metadata) + bencode({"msg_type": 0, "piece": piece})
    send_message(the_socket, msg)

def get_ut_metadata(data):
    ut_metadata = "_metadata"
    index = data.index(ut_metadata) + len(ut_metadata) + 1
    return int(data[index])

def get_metadata_size(data):
    metadata_size = "metadata_size"
    start = data.index(metadata_size) + len(metadata_size) + 1
    data = data[start:]
    return int(data[:data.index("e")])

def recvall(the_socket, timeout=5):
    the_socket.setblocking(0)
    total_data = []
    data = ""
    begin = time()
    while True:
        sleep(0.05)
        if total_data and time() - begin > timeout:
            break
        elif time() - begin > timeout * 2:
            break
        try:
            data = the_socket.recv(1024)
            if data:
                total_data.append(data)
                begin = time()
        except Exception:
            pass
    return "".join(total_data)

def download_metadata(address, infohash, metadata_queue, timeout=5):
    metadata = None
    start_time = time()
    the_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        the_socket.settimeout(timeout)
        the_socket.connect(address)
        # handshake
        send_handshake(the_socket, infohash)
        packet = the_socket.recv(4096)
        # handshake error
        if not check_handshake(packet, infohash):
            return
        # ext handshake
        send_ext_handshake(the_socket)
        packet = the_socket.recv(4096)
        # get ut_metadata and metadata_size
        ut_metadata, metadata_size = get_ut_metadata(packet), get_metadata_size(packet)
        # request each 16 KiB piece of the metadata; piece indexes the block being fetched
        metadata = []
        for piece in range(int(math.ceil(metadata_size / (16.0 * 1024)))):
            request_metadata(the_socket, ut_metadata, piece)
            packet = recvall(the_socket, timeout)
            metadata.append(packet[packet.index("ee") + 2:])
        metadata = "".join(metadata)
    except socket.timeout:
        pass
    except Exception:
        pass
    finally:
        the_socket.close()  # make sure the socket is closed every time
        if metadata is not None:  # only enqueue non-empty metadata
            metadata_queue.put((infohash, address, metadata, start_time))
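To try this module on its own, a smoke test along the following lines works; the peer address and info_hash are placeholders, so it will normally report nothing until you substitute values from live DHT traffic, and since downloadTorrent imports startCrawler, pygeoip and GeoIP.dat must be present:

# Hypothetical smoke test for downloadTorrent.download_metadata; replace the
# placeholder peer address and info_hash with values observed on the DHT.
from Queue import Queue
from binascii import unhexlify
import downloadTorrent

q = Queue()
peer = ("203.0.113.10", 6881)       # placeholder peer address
binhash = unhexlify("00" * 20)      # placeholder raw 20-byte info_hash
downloadTorrent.download_metadata(peer, binhash, q, timeout=5)
if not q.empty():
    infohash, address, metadata, started = q.get()
    print "got %d bytes of metadata from %s" % (len(metadata), address[0])
else:
    print "no metadata (expected with placeholder values)"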
There is actually another way to download torrents, using libtorrent, but it burns far too much CPU, so I generally avoid it. Here it is anyway:
#coding: utf8
import threading
import traceback
import random
import time
import os
import socket

import libtorrent as lt

threading.stack_size(200 * 1024)
socket.setdefaulttimeout(30)

def fetch_torrent(session, ih, timeout):
    name = ih.upper()
    url = "magnet:?xt=urn:btih:%s" % (name,)
    data = ""
    params = {
        "save_path": "/tmp/downloads/",
        "storage_mode": lt.storage_mode_t(2),
        "paused": False,
        "auto_managed": False,
        "duplicate_is_error": True}
    try:
        handle = lt.add_magnet_uri(session, url, params)
    except:
        return None
    status = session.status()
    handle.set_sequential_download(1)
    meta = None
    down_time = time.time()
    down_path = None
    for i in xrange(0, timeout):
        if handle.has_metadata():
            info = handle.get_torrent_info()
            down_path = "/tmp/downloads/%s" % info.name()
            meta = info.metadata()
            break
        time.sleep(1)
    if down_path and os.path.exists(down_path):
        os.system('rm -rf "%s"' % down_path)
    session.remove_torrent(handle)
    return meta

def download_metadata(address, binhash, metadata_queue, timeout=20):
    metadata = None
    start_time = time.time()
    try:
        session = lt.session()
        r = random.randrange(10000, 50000)
        session.listen_on(r, r + 10)
        session.add_dht_router("router.bittorrent.com", 6881)
        session.add_dht_router("router.utorrent.com", 6881)
        session.add_dht_router("dht.transmission.com", 6881)
        session.add_dht_router("127.0.0.1", 6881)
        session.start_dht()
        metadata = fetch_torrent(session, binhash.encode("hex"), timeout)
        session = None
    except:
        traceback.print_exc()
    finally:
        metadata_queue.put((binhash, address, metadata, start_time))
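Since both downloaders expose the same download_metadata(address, binhash, metadata_queue, timeout) signature, trying the libtorrent variant only takes changing one import in startCrawler.py (downloadTorrentLt.py is my hypothetical filename for the file above):

# In startCrawler.py, swap the hand-rolled downloader for the libtorrent one:
# import downloadTorrent
import downloadTorrentLt as downloadTorrent  # hypothetical module name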
This crawler cost me, and quite a few skilled people around the net, a lot of time. If you have read this far, please keep up the spirit of digging in, of open source, and of sharing, and trade notes freely. I have set up a QQ group as the official group for 去轉(zhuǎn)盤網(wǎng); it is still small, so come have a look if you are interested, since every extra member makes the place livelier. QQ group: 512245829