import time
import logging
from datetime import datetime
from scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.exceptions import DontCloseSpider
from utils.redisdb import redis_cli
from config import env
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('Extension')
logger.info(f'extensions env: {env}')
# Original configuration reference: https://scrapy-chs.readthedocs.io/zh_CN/0.24/topics/extensions.html
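# A minimal sketch of the settings this extension expects; the module path "extensions" and the
# priority value 500 are illustrative assumptions, while MYEXT_ENABLED / MYEXT_ITEMCOUNT are the
# keys actually read below:
#
#     # settings.py
#     MYEXT_ENABLED = True    # required; otherwise NotConfigured is raised in from_crawler
#     MYEXT_ITEMCOUNT = 5     # optional; read with a default of 5
#     EXTENSIONS = {
#         "extensions.SpiderInsertStartUrlExtension": 500,
#     }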
class SpiderInsertStartUrlExtension(object):
"""
    An extension that adds scheduled (cron or interval based) dispatching to all Scrapy spiders.
"""
def __init__(self, item_count, crawler):
"""
初始化操作
:param item_count: 程序空闲的最大次数
:param crawler: 类,用于发送关闭程序信号
"""
self.crawler = crawler
self.count = 0 # 统计空闲次数
self.conn = redis_cli()
@classmethod
def from_crawler(cls, crawler):
"""
        Required classmethod: build the extension instance from the crawler.
        """
        # check whether the extension is enabled
if not crawler.settings.getbool('MYEXT_ENABLED'):
raise NotConfigured
# 每隔5个item 输出当前采集的数量
item_count = crawler.settings.getint('MYEXT_ITEMCOUNT', 5)
ext = cls(item_count, crawler)
crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(ext.spider_idle, signal=signals.spider_idle)  # hook the idle signal
return ext
    def cron_judgement(self, spider):
        """
        Cron-style scheduling, e.g. dispatch at 08:00 sharp every day. A spider opts in by
        defining a crontab expression as a class attribute,
        e.g. start at 00:01 every day: cron_job = "1 0 * * *"
        :return: True or False; True means run now
        """
        # The crontab syntax only constrains the start time; the actual next run is further
        # governed by schedule_time.
        # If cron scheduling is requested, the if branch below is taken.
        cron_job = hasattr(spider, 'cron_job')
        if cron_job:
            cron_job = spider.cron_job.split(' ')
minute = cron_job[0]
hour = cron_job[1]
day = cron_job[2]
month = cron_job[3]
week = cron_job[4]
now_minute = str(datetime.now().minute)
now_hour = str(datetime.now().hour)
now_day = str(datetime.now().day)
now_month = str(datetime.now().month)
now_week = str(datetime.now().weekday() + 1)
if minute == '*':
minute = now_minute
if hour == '*':
hour = now_hour
if day == '*':
day = now_day
if month == '*':
month = now_month
if week == '*':
week = now_week
            if (minute == now_minute) and (hour == now_hour) and (day == now_day) \
                    and (month == now_month) and (week == now_week):
                # When the smallest unit of the cron expression is a minute, a spider that finishes
                # in ~5 seconds would satisfy the condition 12 times within that minute and be
                # restarted 12 times.
                # To avoid this, deduplicate over a short window with a Redis key.
                if not self.conn.get(f"{spider.name}:spider_opened"):
                    self.conn.setex(f"{spider.name}:spider_opened", 5 * 60, 1)
return True
else:
                    logger.info(f'{cron_job} is already running; the spider finished so quickly that the '
                                f'condition matched again, so this trigger is deduplicated')
return False
else:
                logger.info(f'waiting for the scheduled start time {cron_job}...')
return False
else:
return False
    def interval_time(self, spider):
        """
        Interval-based scheduling, e.g. dispatch every 30 minutes: schedule_time = 12 * 30
        (spider_idle fires roughly every 5 seconds, i.e. about 12 ticks per minute)
        :return: True or False; True means run now
        """
        # If a cron schedule exists, this interval schedule is not used.
        cron_job = hasattr(spider, 'cron_job')
        schedule_time = hasattr(spider, 'schedule_time')
        if cron_job:
            return False
        if not schedule_time:
            return False
        # idle for longer than the configured interval
if self.count > spider.schedule_time:
            # reset the counter each time a run starts, so the next idle window is measured from scratch
self.count = 0
return True
else:
return False
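    # A worked example of the interval arithmetic (a sketch based on the note that spider_idle
    # fires roughly every 5 seconds, i.e. about 12 idle ticks per minute; the concrete values
    # below are illustrative):
    #
    #     schedule_time = 12 * 30      # 12 ticks/min * 30 min  -> run about every 30 minutes
    #     schedule_time = 12 * 60 * 6  # 12 ticks/min * 360 min -> run about every 6 hours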
def spider_opened(self, spider):
"""
必须方法
只执行一次,爬虫启动的时候执行
支持指定时间启动
"""
# 爬虫首次判断是否需要定时调度
# 判断是否需要定时调度
# 如果需要定时调度,则执行if语句
cron_job = hasattr(spider,'cron_job')
logger.info(f'环境: {env}')
logger.info(f'cronjob: {cron_job}')
# 线上环境才执行cron coinsadjust
if env == 'prod':
if cron_job:
while True:
run = self.cron_judgement(spider)
if run:
break
else:
time.sleep(59)
# self.insert_start_url(spider)
logger.info("opened spider %s" % spider.name)
        # records the last dispatch time so it can be logged while idle, to verify the previous run went as expected
self.started_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def insert_start_url(self, spider):
"""
"""
logger.info('从 start_request 类方法 直接生成任务...')
for request in spider.start_requests():
self.crawler.engine.crawl(request, spider)
def spider_closed(self, spider):
"""
        Required method.
"""
logger.info("closed spider %s" % spider.name)
        self.crawler.engine.close_spider(spider, 'the spider has finished crawling')
def spider_idle(self, spider):
"""
记录信息,作出关闭选择
框架默认5秒执行一次spider_idle
:param spider:
:return:
"""
logger.info(f'{spider.name} Idle 爬虫上次启动时间为 {self.started_time}')
        # If the redis queues are empty, count one idle cycle (+1):
        # if not self.conn.llen(spider.name + ":requests") and not self.conn.llen(spider.redis_key):
        #     self.count += 1
        #     time.sleep(1)
        # else:
        #     # reset the counter each time a run starts, so the next idle window is measured from scratch
        #     self.count = 0
self.count += 1
self.spider_run(spider)
raise DontCloseSpider
    def spider_run(self, spider):
        """
        Activate a scheduled run (push start requests onto the queue).
        Activation is gated by the scheduling conditions below.
        """
        # Normally a spider uses exactly one of the two scheduling modes.
cron = self.cron_judgement(spider)
interval = self.interval_time(spider)
if cron or interval:
self.insert_start_url(spider)
            # record the start time in Redis to make distributed coordination easier
started_time_stamp = datetime.strptime(self.started_time, "%Y-%m-%d %H:%M:%S").timestamp()
self.conn.set(f'{spider.name}:starttime', started_time_stamp)
self.started_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
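# Illustrative spider definitions (a sketch; the spider names, URLs, and parse bodies below are
# assumptions, not part of the original project). A spider opts into one of the two scheduling
# modes by setting a class attribute that this extension inspects with hasattr():
#
#     import scrapy
#
#     class DailySpider(scrapy.Spider):
#         name = "daily_spider"
#         cron_job = "1 0 * * *"        # cron mode: start at 00:01 every day
#
#         def start_requests(self):
#             yield scrapy.Request("https://example.com", callback=self.parse)
#
#         def parse(self, response):
#             yield {"url": response.url}
#
#     class IntervalSpider(scrapy.Spider):
#         name = "interval_spider"
#         schedule_time = 12 * 30       # interval mode: re-run after ~30 minutes of idle ticks
#
#         def start_requests(self):
#             yield scrapy.Request("https://example.com", callback=self.parse)
#
#         def parse(self, response):
#             yield {"url": response.url}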