设为首页收藏本站

EPS数据狗论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 4361|回复: 3

分布式爬虫爬取静态数据

  [复制链接]

22

主题

189

金钱

298

积分

入门用户

发表于 2019-7-8 14:50:00 | 显示全部楼层 |阅读模式

目的意义
爬虫应该能够快速高效的完成数据爬取和分析任务。使用多个进程协同完成一个任务,提高了数据爬取的效率。
以百度百科的一条为起点,抓取百度百科2000左右词条数据。

说明
作者说是简单的分布式爬虫(hh),在书中有详细的说明和注解。
这里只是补漏和梳理。
因为进程传递参数的问题,搞了几天还是放弃了在WIndows上跑,换用了Linux。
又因为各种各样的问题,弃用CentOS(它确实是安全可靠的,但是...我不会装QQ,输入法等),换用了软件容易安装的Ubuntu。然后才装了Eclipse等各种软件后,才开始多进程的调试。

构造
主节点和从节点的方案实现信息爬取。结构应该让各个节点高效工作。

从节点:
爬虫爬取速度受到网络延时的影响和网页信息解析的影响比较严重,所以使用多个从节点用来专门负责下载网页信息,解析网页信息。

则分为三个文件,爬取文件,下载网页文件,解析网页文件。

爬取文件接收来自主节点发送来的网页地址。然后调用下载网页文件并完成解析,将处理好的数据发送给主节点。

主节点:
主节点负责发送给从节点网页地址,并接收来自从节点的解析后的网页信息,将网页信息存储下来。

主节点任务分为分发网址,接收从节点的信息,存储网页三部分。在代码里,他建立了三个进程,来分别实现。

主节点任务中,存储信息,定义一套存储信息的方法。分发网址,定义一套分发网址过程中可能用到的方法。主文件中,设立三个函数,建立三个进程。

主节点设计
主节点的三个任务,分成三个进程,三个进程(分发网址,数据接收,数据存储),做一个类。

数据接收与分发网址,需要分布式进程。分布式进程需要使用队列Queue。这里一定是multiprocessing中的导入的队列。网址分发、数据接收分别使用一个队列。

注册,设定地址,秘钥,完成初始化过程,将url_q,result_q分别注册到网络中。

然后设立分发任务,传递队列给分发任务函数。分发任务使用url_q队列完成数据的发送。使用conn_q接收了新的网址,并进行存储,再次分发到url_q上。

数据接收任务,完成了数据的接收过程,接收以后需要及时将数据存储,在这里使用了两个队列conn_q,放置接收数据中的地址信息,store_q,放置接收数据中的网页信息。

数据存储任务,接收数据接收任务中的store_q队列信息,及时写入到磁盘中。

所有涉及到的文件如下:

NodeManager.py
  1. import time
  2. #import sys
  3. #sys.path.append('/home')#if needed ,add path as package
  4. from UrlManager import UrlManager
  5. from multiprocessing import Process,Queue
  6. from multiprocessing.managers import BaseManager
  7. from DataOutput import DataOutput

  8. class NodeManager():
  9.     def start_manager(self,url_q,result_q):
  10.         BaseManager.register('get_task_queue', callable=lambda:url_q)
  11.         BaseManager.register('get_result_queue',callable=lambda:result_q)
  12.         manager=BaseManager(address=('127.0.0.1',8001),authkey='baike'.encode('utf-8'))
  13.         return manager
  14.      
  15.     def url_manager_proc(self,url_q,conn_q,root_url):
  16.         #send url to queue and receive new urls for storing to object
  17.         url_manager=UrlManager()
  18.         url_manager.add_new_url(root_url)
  19.         while True:
  20.             while(url_manager.has_new_url()):
  21.                 new_url=url_manager.get_new_url()
  22.                 url_q.put(new_url)
  23.                 print('old url size:'+str(url_manager.old_url_size()))
  24.                 if(url_manager.old_url_size()>2000):
  25.                     url_q.put('end')
  26.                     url_manager.save_process('new_urls.txt',url_manager.new_urls)
  27.                     url_manager.save_process('old_urls.txt',url_manager.old_urls)
  28.                     print('finish url_manager_proc')
  29.                     return
  30.             try:
  31.                 urls=conn_q.get()
  32.                 url_manager.add_new_urls(urls)
  33.                 print('get:'+urls)
  34.             except Exception:
  35.                 time.sleep(0.1)
  36.          
  37.      
  38.     def result_solve_proc(self,result_q,conn_q,store_q):
  39.         while True:
  40.             if not result_q.empty():
  41.                 content=result_q.get(True)
  42.                 if content['new_urls']=='end':
  43.                     print('finish result_solve_proc')
  44.                     store_q.put('end')
  45.                     return
  46.                 conn_q.put(content["new_urls"])
  47.                 store_q.put(content["data"])
  48.             else:
  49.                 time.sleep(0.1)
  50.      
  51.     def store_proc(self,store_q):
  52.         output=DataOutput()
  53.         while True:
  54.             if not store_q.empty():
  55.                 data=store_q.get()
  56.                 if data =='end':
  57.                     print('finish store_proc')
  58.                     output.output_end(output.path)
  59.                     return
  60.                 output.store_data(data)



  61. if __name__=='__main__':
  62.     url_q=Queue()#send url to workers
  63.     result_q=Queue()#receive url's analytical data from works
  64.     store_q=Queue()#analytical data which is fresh is used for storing to disk for further extract
  65.     conn_q=Queue()#urls which is fresh are used for storing to object for further extract
  66.     nodeObject=NodeManager()
  67.     manager=nodeObject.start_manager(url_q,result_q)
  68.      
  69.     root_url='https://baike.baidu.com/item/%E7%BD%91%E7%BB%9C%E7%88%AC%E8%99%AB/5162711?fr=aladdin'
  70.     url_manager=Process(target=nodeObject.url_manager_proc,args=(url_q,conn_q,root_url,))
  71.     result_solve=Process(target=nodeObject.result_solve_proc,args=(result_q,conn_q,store_q,))
  72.     store=Process(target=nodeObject.store_proc,args=(store_q,))
  73.     url_manager.start()
  74.     result_solve.start()
  75.     store.start()
  76.     manager.get_server().serve_forever()
复制代码


UrlManager.py
  1. import hashlib
  2. import pickle
  3. class UrlManager():
  4.     def __init__(self):
  5.         self.old_urls=self.load_process('new_urls.txt')
  6.         self.new_urls=self.load_process('old_urls.txt')
  7.         pass
  8.      
  9.     def has_new_url(self):
  10.         return self.new_url_size()!=0
  11.      
  12.     def new_url_size(self):
  13.         return len(self.new_urls)
  14.      
  15.     def old_url_size(self):
  16.         return len(self.old_urls)
  17.      
  18.     def get_new_url(self):
  19.         new_url=self.new_urls.pop()
  20.         m=hashlib.md5()
  21.         m.update(new_url.encode("utf8"))
  22.         self.old_urls.add(m.hexdigest()[8:-8])
  23.         return new_url
  24.      
  25.     def add_new_url(self,url):
  26.         if url is None:
  27.             return
  28.         m=hashlib.md5()
  29.         m.update(url.encode('utf-8'))      
  30.         url_md5=m.hexdigest()[8:-8]
  31.         if url not in self.new_urls and url_md5 not in self.old_urls:
  32.             self.new_urls.add(url)
  33.      
  34.     def add_new_urls(self,urls):
  35.         if urls is None or len(urls) == 0:
  36.             return
  37.         for url in urls:
  38.             self.add_new_url(url)
  39.         pass
  40.      
  41.     def save_process(self,path,data):
  42.         with open(path,'wb') as f:
  43.             pickle.dump(data,f)
  44.      
  45.     def load_process(self,path):
  46.         print('loading..')
  47.         try:
  48.             with open(path,'rb') as f:
  49.                 tmp=pickle.load(f)
  50.                 return tmp
  51.         except:
  52.             print('loading error maybe loading file not exist and will create it:'+path)
  53.         newSet=set()
  54.         self.save_process(path, newSet)
  55.         return newSet
复制代码


DataOutput.py
  1. import codecs
  2. from os.path import os
  3. class DataOutput(object):
  4.     def __init__(self):
  5.         self.path='baike.html'
  6.         self.output_head(self.path)
  7.         self.datas=[]
  8.      
  9.     def store_data(self,data):
  10.         if data is None:
  11.             return
  12.         self.datas.append(data)
  13.         self.output_html(self.path,data)
  14.      
  15.     def output_head(self,path):
  16.         if os.path.exists(path):
  17.             return
  18.         fout=codecs.open('baike.html', 'w', encoding='utf-8')
  19.         fout.write("<html>")
  20.         fout.write("<head><meta charset='urf-8'></head>")
  21.         fout.write("<body>")
  22.         fout.write("<table border='1' width=1800  style='word-break:break-all;word-wrap:break-word;'>")
  23.         fout.write("<tr>")
  24.         fout.write("<td width='20'>序号</td>")
  25.         fout.write("<td width='300'>URL</td>")
  26.         fout.write("<td width='100'>标题</td>")
  27.         fout.write("<td width='1200'>释义</td>")
  28.         fout.write("</tr>")  
  29.         fout.close()
  30.          
  31.     def output_end(self,path):
  32.         fout=codecs.open(path, 'a', encoding='utf-8')
  33.         fout.write("</table>")
  34.         fout.write("</body>")     
  35.         fout.write("</html>")
  36.         fout.close()      
  37.          
  38.     def output_html(self,path,data):
  39.         fout=codecs.open(path, 'a', encoding='utf-8')   
  40.         fout.write("<tr>")
  41.         fout.write("<td>%s</td>"%str(len(self.datas)))
  42.         fout.write("<td><a href=%s>%s</a></td>"%(data['url'],data['url']))
  43.         fout.write("<td>%s</td>"%data['title'])
  44.         fout.write("<td>%s</td>"%data['summary'])
  45.         fout.write("</tr>")
  46.         fout.close()
复制代码


从节点设计
从节点首先是连接到指定地址并验证秘钥。连接后获取url_q、result_q。
从url_q中获取发来的地址,调用HTML下载器下载数据,调动HTML解析器解析数据,然后把结果放到result_q队列上。
代码如下
SpiderWork.py
  1. from multiprocessing.managers import BaseManager
  2. from HtmlDownloader import HtmlDownloader
  3. from HtmlParser import HtmlParser
  4. class SpiderWork():
  5.     def __init__(self):
  6.         BaseManager.register('get_task_queue')
  7.         BaseManager.register('get_result_queue')
  8.         server_addr='127.0.0.1'
  9.         print('connect'+server_addr)
  10.         self.m=BaseManager(address=(server_addr,8001),authkey='baike'.encode('utf-8'))
  11.         self.m.connect()
  12.         self.task=self.m.get_task_queue()
  13.         self.result=self.m.get_result_queue()
  14.         print(self.task)
  15.         self.downloader=HtmlDownloader()
  16.         self.parser=HtmlParser()
  17.         print('initial finish')
  18.      
  19.     def crawl(self):
  20.         while (True):
  21.             try:
  22.                 if not self.task.empty():
  23.                     url=self.task.get()
  24.                     if url == 'end':
  25.                         print('stop spider1')
  26.                         self.result.put({'new_urls':'end','data':'end'})
  27.                         return
  28.                     print('working:'+url)#url
  29.                     content=self.downloader.download(url)
  30.                     new_urls,data=self.parser.parser(url,content)
  31.                     self.result.put({"new_urls":new_urls,"data":data})
  32.             except Exception as e:
  33.                 print(e,url)
  34.                  
  35. if __name__=="__main__":
  36.     spider=SpiderWork()
  37.     spider.crawl()
复制代码


HtmlDownloader.py
  1. import requests
  2. import chardet
  3. class HtmlDownloader(object):
  4.     def download(self,url):
  5.         if url is None:
  6.             return None
  7.         user_agent='Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 SE 2.X MetaSr 1.0'
  8.         headers={'User-Agent':user_agent}
  9.         r=requests.get(url,headers=headers)
  10.         if r.status_code is 200:
  11.             r.encoding=chardet.detect(r.content)['encoding']
  12.             return r.text
  13.         return None
复制代码


HtmlParser.py
  1. import re
  2. from urllib import parse
  3. from bs4 import BeautifulSoup
  4. class HtmlParser(object):
  5.     def parser(self,page_url,html_cont):
  6.         if page_url is None or html_cont is None:
  7.             return
  8.          
  9.         soup=BeautifulSoup(html_cont,'lxml')
  10.          
  11.         new_urls=self.getNewUrls(page_url,soup)
  12.          
  13.         new_data=self.getNewData(page_url,soup)
  14.         return new_urls,new_data
  15.      
  16.     def getNewUrls(self,page_url,soup):
  17.         new_urls=set()
  18.         links=soup.find_all('a',href=re.compile(r'/item/.*'))
  19.         for link in links:
  20.             new_url=link['href']
  21.             new_full_url=parse.urljoin(page_url,new_url)
  22.             new_urls.add(new_full_url)
  23.         return new_urls

  24.     def getNewData(self,page_url,soup):
  25.         data={}
  26.         data['url']=page_url
  27.         title = soup.find('dd',class_='lemmaWgt-lemmaTitle-title').find('h1')
  28.         data['title']=title.get_text()
  29.         summary = soup.find('div',class_='lemma-summary')
  30.         #获取到tag中包含的所有文版内容包括子孙tag中的内容,并将结果作为Unicode字符串返回
  31.         data['summary']=summary.get_text()
  32.         return data
复制代码


结果
建立.sh文件如下:
  1. #!/bin/bash
  2. rm -rf log/*
  3. rm -rf baike.html
  4. rm -rf new_urls.txt
  5. rm -rf old_urls.txt<br>
  6. python3 control/NodeManager.py &> log/control.log &
  7. for ((i=1;i<=10;i++))
  8. do
  9.         python3 spider/SpiderWork.py &>log/spider$i.log &
  10. done
复制代码


启动主节点,然后启动10个从节点。将它们所产生的日志信息记录到log/下,并都是在后台运行的进程。
两分钟左右,完成约1900条的数据获取。
1.png
可能用到的命令:
kill -9 (ps aux | grep python | awk '{print2}')
!kill

可能用到的软件:
Eclipse的pydev进程调试。
最后
这代码里面真的有好多的细节文件,序列化操作与存储,md5的压缩方案等,都是值得思考的。

7

主题

362

金钱

3127

积分

中级用户

发表于 2020-1-2 20:38:51 | 显示全部楼层
太好了,谢谢!
ximenyan
回复 支持 反对

使用道具 举报

7

主题

362

金钱

3127

积分

中级用户

发表于 2020-1-2 20:39:18 | 显示全部楼层
,great!
ximenyan
回复

使用道具 举报

2

主题

9

金钱

28

积分

新手用户

发表于 2020-2-24 14:58:48 | 显示全部楼层
厉害,感谢分享
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

客服中心
关闭
在线时间:
周一~周五
8:30-17:30
QQ群:
653541906
联系电话:
010-85786021-8017
在线咨询
客服中心

意见反馈|网站地图|手机版|小黑屋|EPS数据狗论坛 ( 京ICP备09019565号-3 )   

Powered by BFIT! X3.4

© 2008-2028 BFIT Inc.

快速回复 返回顶部 返回列表