探索scrapy数据流程
-
来自官方的解释:
数据流(Data flow) Scrapy中的数据流由执行引擎控制,其过程如下:
引擎打开一个网站(open a domain),找到处理该网站的Spider并向该spider请求第一个要爬取的URL(s)。 引擎从Spider中获取到第一个要爬取的URL并在调度器(Scheduler)以Request调度。 引擎向调度器请求下一个要爬取的URL。 调度器返回下一个要爬取的URL给引擎,引擎将URL通过下载中间件(请求(request)方向)转发给下载器(Downloader)。 一旦页面下载完毕,下载器生成一个该页面的Response,并将其通过下载中间件(返回(response)方向)发送给引擎。 引擎从下载器中接收到Response并通过Spider中间件(输入方向)发送给Spider处理。 Spider处理Response并返回爬取到的Item及(跟进的)新的Request给引擎。 引擎将(Spider返回的)爬取到的Item给Item Pipeline,将(Spider返回的)Request给调度器。 (从第二步)重复直到调度器中没有更多地request,引擎关闭该网站。
-
在有多个起始Url时,对response的处理如何写到一个parse函数里?
-
在parse函数里返回Item时,scrapy会打印爬取item的地址信息
-
调度器(Scheduler)管理request请求队列
-
pipelines 模块主要用于清洗spider返回的items,去重,写入数据库。
Item Pipelines模块
-
功能主要是清洗验证spider获得的数据,查重以及写入数据库。主要实现的方法有:process_item,open_spider, close_spider, from_crawler
-
一个实例: 实现去重以及写入mongodb数据库。
import pymongo from scrapy.exceptions import DropItem class SmzdmCrawlerPipeline(object): def __init__(self, mongodb_domain, mongodb_port, mongodb_db): self.mongodb_domain = mongodb_domain self.mongodb_port = mongodb_port self.mongodb_db = mongodb_db self.ids_seen = set() @classmethod def from_crawler(cls, crawler): return cls(mongodb_domain = crawler.settings.get('MONGODB_DOMAIN'), mongodb_port = crawler.settings.get('MONGODB_PORT'), mongodb_db = crawler.settings.get('MONGODB_DB')) def open_spider(self, spider): self.client = pymongo.MongoClient(self.mongodb_domain, self.mongodb_port) self.db = self.client[self.mongodb_db] def close_spider(self, spider): self.client.close() def process_item(self, item, spider): collection_name = item.__class__.__name__ if item['id'] in self.ids_seen: raise DropItem("dulplicate item found: %s" % item) else: self.ids_seen.add(item['id']) print len(self.ids_seen) self.db[collection_name].insert(dict(item)) return item 根据scrapy终端打印的信息,初步判断运行过程是:通过settings中的pipelines设置启用SmzdmCrawlerPipeline,调用__init__和from_crawler初始化参数,然后spider打开,调用open_spider。每当spider返回一个item,就调用process_item方法处理数据。spider关闭,调用close_spider方法
-
上述实例在实际运行中存在的问题是:由于ids_seen在spider关闭时就会被销毁,下次运行spider又会生成新的ids_seen,所以不能实现更新的去重功能。实际可以维护一个redis队列来解决。
调度器 Scheduler 模块
-
功能主要是调度request,维护request队列。可以通过重写方法结合redis实现对request的去重过滤。主要实现的方法有:from_crawler,from_settings,open,close,enqueue_request,next_request,has_pending_requests
-
RFPDupeFilter类是scrapy自带的用于检测过滤重复请求的类,主要实现的方法有from_crawler,from_settings,request_seen,可以用redis的set集合重写request_seen来实现对request的去重过滤
def request_seen(self, request): """ use sismember judge whether fp is duplicate. """ fp = request_fingerprint(request) #request_fingerprint 函数生成的请求指纹。 if self.server.sismember(self.key,fp): return True self.server.sadd(self.key, fp) return False
-
Scheduler模块会调用RFPDupeFilter类的request_seen方法来过滤重复的request
def enqueue_request(self, request): if not request.dont_filter and self.df.request_seen(request): #df为RFPDupeFilter的一个实例 return self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider) self.queue.push(request) def next_request(self): request = self.queue.pop() if request: self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider) return request Scheduler执行过程为:通过from_settings和from_crawler从settings.py获取参数生成Scheduler实例,蜘蛛开启后,调用open方法,生成队列queue和RFPDupeFilter实例用于管理request队列。当spider返回request时,调用enqueue_request方法把不重复的request加入队列queue,当引擎请求request时,调用next_request返回request给引擎。
下载器中间件 DOWNLOADER_MIDDLEWARES
-
下载器中间件 DOWNLOADER_MIDDLEWARES 是用于全局修改Scrapy request和response的一个轻量、底层的系统。要激活下载器中间件组件,将其加入到 DOWNLOADER_MIDDLEWARES 设置中。通过value的值设置执行的顺序,None则表示关闭该中间件。
例如,如果您想要关闭user-agent中间件: DOWNLOADER_MIDDLEWARES = { 'scrapy.contrib.downloadermiddleware.useragent.UserAgentMiddleware': None, }
-
下载器中间件DOWNLOADER_MIDDLEWARES主要实现的方法有process_request,process_response,process_exception.其中process_request用于处理request,process_response用于处理下载器生成的response,process_exception则用于处理异常
-
process_request方法中可以设置request的请求头信息:
user_agent_list = ['...'] #可以设置user_agent池 def __init__(self, user_agent=''): self.user_agent = user_agent def _user_agent(self, spider): if hasattr(spider, 'user_agent'): return spider.user_agent elif self.user_agent: return self.user_agent return random.choice(self.user_agent_list) def process_request(self, request, spider): ua = self._user_agent(spider) if ua: request.headers.setdefault('User-Agent', ua)
参考文章:scrapy官方教程中文版
参考项目:distribute_crawler