DEV Community

drake
drake

Posted on

通过extension实现scrapy定时调度

import time
import logging
from datetime import datetime
from scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.exceptions import DontCloseSpider
from utils.redisdb import redis_cli
from config import env

# Module-level logging: timestamped INFO output for everything this extension logs.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('Extension')
# Record which deployment environment (imported from config.env) this module loaded under.
logger.info(f'extensions env: {env}')

# Original configuration reference: https://scrapy-chs.readthedocs.io/zh_CN/0.24/topics/extensions.html
class SpiderInsertStartUrlExtension(object):
    """
    Scrapy extension that lets any spider be re-launched on a schedule.

    Two mutually exclusive modes (cron wins when both are defined):

    * cron-style:     the spider defines ``cron_job = "1 0 * * *"``
      (minute hour day month week; only literal values and ``*`` are
      understood — step/range/list syntax is NOT supported).
    * fixed interval: the spider defines ``schedule_time`` counted in
      idle ticks; the idle signal fires roughly every 5s, so
      ``12 * 30`` is about every 30 minutes.
    """

    def __init__(self, item_count, crawler):
        """
        :param item_count: value of the MYEXT_ITEMCOUNT setting
                           (maximum idle count / reporting granularity)
        :param crawler: Crawler object, used to reach the engine and to
                        send shutdown signals
        """
        self.crawler = crawler
        # Fix: item_count was accepted but never stored; keep it on the
        # instance so it is actually usable.
        self.item_count = item_count
        self.count = 0              # consecutive idle-tick counter
        self.conn = redis_cli()     # Redis connection for dedupe / bookkeeping

    @classmethod
    def from_crawler(cls, crawler):
        """
        Standard Scrapy extension entry point: build the instance from
        settings and wire up the spider signals.

        :raises NotConfigured: when MYEXT_ENABLED is falsy, telling
                               Scrapy to skip this extension entirely.
        """
        if not crawler.settings.getbool('MYEXT_ENABLED'):
            raise NotConfigured
        item_count = crawler.settings.getint('MYEXT_ITEMCOUNT', 5)
        ext = cls(item_count, crawler)
        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        # The idle signal is what drives the re-scheduling logic below.
        crawler.signals.connect(ext.spider_idle, signal=signals.spider_idle)
        return ext

    def cron_judgement(self, spider):
        """
        Decide whether the spider should run right now according to its
        crontab-like ``cron_job`` attribute, e.g. ``"1 0 * * *"`` for
        00:01 every day.

        :return: True when the spider should be started now.
        """
        if not hasattr(spider, 'cron_job'):
            return False
        cron_job = spider.cron_job.split(' ')
        minute = cron_job[0]
        hour = cron_job[1]
        day = cron_job[2]
        month = cron_job[3]
        week = cron_job[4]
        # Fix: snapshot the clock ONCE so all five fields are compared
        # against the same instant; the previous per-field datetime.now()
        # calls could straddle a minute/hour boundary and yield an
        # inconsistent comparison.
        now = datetime.now()
        now_minute = str(now.minute)
        now_hour = str(now.hour)
        now_day = str(now.day)
        now_month = str(now.month)
        # NOTE(review): weekday()+1 maps Mon=1..Sun=7, while classic cron
        # uses Sun=0..Sat=6 — confirm which convention the spiders assume.
        now_week = str(now.weekday() + 1)
        if minute == '*':
            minute = now_minute
        if hour == '*':
            hour = now_hour
        if day == '*':
            day = now_day
        if month == '*':
            month = now_month
        if week == '*':
            week = now_week
        if (minute == now_minute) and (hour == now_hour) and (day == now_day) \
                and (month == now_month) and (week == now_week):
            # The idle signal fires every ~5s, so within the matching
            # minute this condition would be satisfied up to 12 times and
            # the spider would be re-launched repeatedly. A short-lived
            # Redis key (5-minute TTL) deduplicates those launches.
            if not self.conn.get(f"{spider.name}:spider_opened"):
                self.conn.setex(f"{spider.name}:spider_opened", 5 * 60, 1)
                return True
            logger.info(f'{cron_job} 已经开始执行,由于爬虫速度过快,导致条件再次满足,故做去重处理')
            return False
        logger.info(f'等待开始调度的时间 {cron_job}...')
        return False

    def interval_time(self, spider):
        """
        Interval-based scheduling: run once the spider has been idle for
        more than ``spider.schedule_time`` idle ticks (one tick is ~5s,
        so ``12 * 30`` is roughly every 30 minutes).

        Ignored entirely when the spider also defines ``cron_job``.

        :return: True when the interval has elapsed.
        """
        # Cron scheduling takes precedence over interval scheduling.
        if hasattr(spider, 'cron_job'):
            return False
        if not hasattr(spider, 'schedule_time'):
            return False
        if self.count > spider.schedule_time:
            # Reset so the next interval is measured from this launch.
            self.count = 0
            return True
        return False

    def spider_opened(self, spider):
        """
        Runs once when the spider starts. In the prod environment a
        spider with a ``cron_job`` blocks here until its first scheduled
        window arrives.

        NOTE(review): the sleep loop below blocks the calling thread
        (and therefore the Twisted reactor) until the cron window is
        hit — confirm this startup stall is intentional.
        """
        cron_job = hasattr(spider, 'cron_job')
        logger.info(f'环境: {env}')
        logger.info(f'cronjob: {cron_job}')
        # Only the prod environment waits for the cron window.
        if env == 'prod':
            if cron_job:
                while not self.cron_judgement(spider):
                    # Sleep just under a minute so every minute is probed.
                    time.sleep(59)
        logger.info("opened spider %s" % spider.name)
        # Remember the launch time so idle logs can show when the last
        # round was scheduled (also pushed to Redis in spider_run).
        self.started_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def insert_start_url(self, spider):
        """
        Re-seed the scheduler by replaying the spider's start_requests().
        """
        logger.info('从 start_request 类方法 直接生成任务...')
        for request in spider.start_requests():
            self.crawler.engine.crawl(request, spider)

    def spider_closed(self, spider):
        """
        Runs when the spider is closed.

        NOTE(review): calling engine.close_spider from inside the
        spider_closed handler is redundant (the spider is already
        closing) — confirm it can be dropped.
        """
        logger.info("closed spider %s" % spider.name)
        self.crawler.engine.close_spider(spider, '爬虫已经采集完毕')

    def spider_idle(self, spider):
        """
        Fired by Scrapy roughly every 5 seconds while the spider has no
        pending requests. Counts idle ticks, gives the schedulers a
        chance to re-seed the request queue, and keeps the spider alive.
        """
        logger.info(f'{spider.name} Idle 爬虫上次启动时间为 {self.started_time}')
        self.count += 1
        self.spider_run(spider)
        # Keep the (otherwise finished) spider process alive so it can be
        # re-scheduled later.
        raise DontCloseSpider

    def spider_run(self, spider):
        """
        Launch a new crawl round if either scheduler (cron or interval)
        says so, and record the launch time in Redis so distributed
        workers can coordinate.
        """
        cron = self.cron_judgement(spider)
        interval = self.interval_time(spider)
        if cron or interval:
            self.insert_start_url(spider)
            started_time_stamp = datetime.strptime(self.started_time, "%Y-%m-%d %H:%M:%S").timestamp()
            self.conn.set(f'{spider.name}:starttime', started_time_stamp)
            self.started_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

Enter fullscreen mode Exit fullscreen mode

Top comments (0)