scrapy笔记之探索数据流程

Feb 19, 2016


探索scrapy数据流程

  • 来自官方的解释:

    数据流(Data flow) Scrapy中的数据流由执行引擎控制,其过程如下:

    引擎打开一个网站(open a domain),找到处理该网站的Spider并向该spider请求第一个要爬取的URL(s)。 引擎从Spider中获取到第一个要爬取的URL并在调度器(Scheduler)以Request调度。 引擎向调度器请求下一个要爬取的URL。 调度器返回下一个要爬取的URL给引擎,引擎将URL通过下载中间件(请求(request)方向)转发给下载器(Downloader)。 一旦页面下载完毕,下载器生成一个该页面的Response,并将其通过下载中间件(返回(response)方向)发送给引擎。 引擎从下载器中接收到Response并通过Spider中间件(输入方向)发送给Spider处理。 Spider处理Response并返回爬取到的Item及(跟进的)新的Request给引擎。 引擎将(Spider返回的)爬取到的Item给Item Pipeline,将(Spider返回的)Request给调度器。 (从第二步)重复直到调度器中没有更多地request,引擎关闭该网站。

  • 在有多个起始Url时,对response的处理如何写到一个parse函数里?

  • 在parse函数里返回Item时,scrapy会打印爬取item的地址信息

  • 调度器(Scheduler)管理request请求队列

  • pipelines 模块主要用于清洗spider返回的items,去重,写入数据库。


Item Pipelines模块

  • 功能主要是清洗验证spider获得的数据,查重以及写入数据库。主要实现的方法有:process_item,open_spider, close_spider, from_crawler

  • 一个实例: 实现去重以及写入mongodb数据库。

      import pymongo
      from scrapy.exceptions import DropItem
    
    
      class SmzdmCrawlerPipeline(object):
            
          def __init__(self, mongodb_domain, mongodb_port, mongodb_db):
              self.mongodb_domain = mongodb_domain
              self.mongodb_port = mongodb_port
              self.mongodb_db = mongodb_db
              self.ids_seen = set()
            
          @classmethod
          def from_crawler(cls, crawler):
              return cls(mongodb_domain = crawler.settings.get('MONGODB_DOMAIN'),
                         mongodb_port = crawler.settings.get('MONGODB_PORT'),
                         mongodb_db = crawler.settings.get('MONGODB_DB'))
                
          def open_spider(self, spider):
              self.client = pymongo.MongoClient(self.mongodb_domain, self.mongodb_port)
              self.db = self.client[self.mongodb_db]
            
          def close_spider(self, spider):
              self.client.close()
                        
          def process_item(self, item, spider):
              collection_name = item.__class__.__name__
              if item['id'] in self.ids_seen:
                  raise DropItem("dulplicate item found: %s" % item)
              else:
                  self.ids_seen.add(item['id'])
                  print len(self.ids_seen)
                  self.db[collection_name].insert(dict(item))
                
                  return item
                    
      根据scrapy终端打印的信息,初步判断运行过程是:通过settings中的pipelines设置启用SmzdmCrawlerPipeline,调用__init__和from_crawler初始化参数,然后spider打开,调用open_spider。每当spider返回一个item,就调用process_item方法处理数据。spider关闭,调用close_spider方法
    
  • 上述实例在实际运行中存在的问题是:由于ids_seen在spider关闭时就会被销毁,下次运行spider又会生成新的ids_seen,所以不能实现更新的去重功能。实际可以维护一个redis队列来解决。


调度器 Scheduler 模块

  • 功能主要是调度request,维护request队列。可以通过重写方法结合redis实现对request的去重过滤。主要实现的方法有:from_crawler,from_settings,open,close,enqueue_request,next_request,has_pending_requests

  • RFPDupeFilter类是scrapy自带的用于检测过滤重复请求的类,主要实现的方法有from_crawler,from_settings,request_seen,可以用redis的set集合重写request_seen来实现对request的去重过滤

      def request_seen(self, request):
          """
              use sismember judge whether fp is duplicate.
          """
            
          fp = request_fingerprint(request)   #request_fingerprint 函数生成的请求指纹。
          if self.server.sismember(self.key,fp):
              return True
          self.server.sadd(self.key, fp)
          return False
    
  • Scheduler模块会调用RFPDupeFilter类的request_seen方法来过滤重复的request

      def enqueue_request(self, request):
          if not request.dont_filter and self.df.request_seen(request): #df为RFPDupeFilter的一个实例
              return
          self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
          self.queue.push(request)
        
      def next_request(self):
          request = self.queue.pop()
          if request:
              self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
          return request
      Scheduler执行过程为:通过from_settings和from_crawler从settings.py获取参数生成Scheduler实例,蜘蛛开启后,调用open方法,生成队列queue和RFPDupeFilter实例用于管理request队列。当spider返回request时,调用enqueue_request方法把不重复的request加入队列queue,当引擎请求request时,调用next_request返回request给引擎。
    

下载器中间件 DOWNLOADER_MIDDLEWARES

  • 下载器中间件 DOWNLOADER_MIDDLEWARES 是用于全局修改Scrapy request和response的一个轻量、底层的系统。要激活下载器中间件组件,将其加入到 DOWNLOADER_MIDDLEWARES 设置中。通过value的值设置执行的顺序,None则表示关闭该中间件。

      例如,如果您想要关闭user-agent中间件:
      DOWNLOADER_MIDDLEWARES = {           
          'scrapy.contrib.downloadermiddleware.useragent.UserAgentMiddleware': None,
      }
    
  • 下载器中间件DOWNLOADER_MIDDLEWARES主要实现的方法有process_request,process_response,process_exception.其中process_request用于处理request,process_response用于处理下载器生成的response,process_exception则用于处理异常

  • process_request方法中可以设置request的请求头信息:

      user_agent_list = ['...']   #可以设置user_agent池
      def __init__(self, user_agent=''):
          self.user_agent = user_agent
    
      def _user_agent(self, spider):
          if hasattr(spider, 'user_agent'):
              return spider.user_agent
          elif self.user_agent:
              return self.user_agent
    
          return random.choice(self.user_agent_list)
    
      def process_request(self, request, spider):
          ua = self._user_agent(spider)
          if ua:
              request.headers.setdefault('User-Agent', ua)
    

参考文章:scrapy官方教程中文版

参考项目:distribute_crawler