You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
171 lines
5.6 KiB
171 lines
5.6 KiB
import importlib
|
|
import six
|
|
|
|
from scrapy.utils.misc import load_object
|
|
|
|
from . import connection, defaults
|
|
|
|
|
|
# TODO: add SCRAPY_JOB support.
|
|
class Scheduler(object):
|
|
"""Redis-based scheduler
|
|
|
|
Settings
|
|
--------
|
|
SCHEDULER_PERSIST : bool (default: False)
|
|
Whether to persist or clear redis queue.
|
|
SCHEDULER_FLUSH_ON_START : bool (default: False)
|
|
Whether to flush redis queue on start.
|
|
SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)
|
|
How many seconds to wait before closing if no message is received.
|
|
SCHEDULER_QUEUE_KEY : str
|
|
Scheduler redis key.
|
|
SCHEDULER_QUEUE_CLASS : str
|
|
Scheduler queue class.
|
|
SCHEDULER_DUPEFILTER_KEY : str
|
|
Scheduler dupefilter redis key.
|
|
SCHEDULER_DUPEFILTER_CLASS : str
|
|
Scheduler dupefilter class.
|
|
SCHEDULER_SERIALIZER : str
|
|
Scheduler serializer.
|
|
|
|
"""
|
|
|
|
def __init__(self, server,
|
|
persist=False,
|
|
flush_on_start=False,
|
|
queue_key=defaults.SCHEDULER_QUEUE_KEY,
|
|
queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
|
|
dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
|
|
dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
|
|
idle_before_close=0,
|
|
serializer=None):
|
|
"""Initialize scheduler.
|
|
|
|
Parameters
|
|
----------
|
|
server : Redis
|
|
The redis server instance.
|
|
persist : bool
|
|
Whether to flush requests when closing. Default is False.
|
|
flush_on_start : bool
|
|
Whether to flush requests on start. Default is False.
|
|
queue_key : str
|
|
Requests queue key.
|
|
queue_cls : str
|
|
Importable path to the queue class.
|
|
dupefilter_key : str
|
|
Duplicates filter key.
|
|
dupefilter_cls : str
|
|
Importable path to the dupefilter class.
|
|
idle_before_close : int
|
|
Timeout before giving up.
|
|
|
|
"""
|
|
if idle_before_close < 0:
|
|
raise TypeError("idle_before_close cannot be negative")
|
|
|
|
self.server = server
|
|
self.persist = persist
|
|
self.flush_on_start = flush_on_start
|
|
self.queue_key = queue_key
|
|
self.queue_cls = queue_cls
|
|
self.dupefilter_cls = dupefilter_cls
|
|
self.dupefilter_key = dupefilter_key
|
|
self.idle_before_close = idle_before_close
|
|
self.serializer = serializer
|
|
self.stats = None
|
|
|
|
def __len__(self):
|
|
return len(self.queue)
|
|
|
|
@classmethod
|
|
def from_settings(cls, settings):
|
|
kwargs = {
|
|
'persist': settings.getbool('SCHEDULER_PERSIST'),
|
|
'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
|
|
'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
|
|
}
|
|
|
|
# If these values are missing, it means we want to use the defaults.
|
|
optional = {
|
|
# TODO: Use custom prefixes for this settings to note that are
|
|
# specific to scrapy-redis.
|
|
'queue_key': 'SCHEDULER_QUEUE_KEY',
|
|
'queue_cls': 'SCHEDULER_QUEUE_CLASS',
|
|
'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
|
|
# We use the default setting name to keep compatibility.
|
|
'dupefilter_cls': 'DUPEFILTER_CLASS',
|
|
'serializer': 'SCHEDULER_SERIALIZER',
|
|
}
|
|
for name, setting_name in optional.items():
|
|
val = settings.get(setting_name)
|
|
if val:
|
|
kwargs[name] = val
|
|
|
|
# Support serializer as a path to a module.
|
|
if isinstance(kwargs.get('serializer'), six.string_types):
|
|
kwargs['serializer'] = importlib.import_module(kwargs['serializer'])
|
|
|
|
server = connection.from_settings(settings)
|
|
# Ensure the connection is working.
|
|
server.ping()
|
|
|
|
return cls(server=server, **kwargs)
|
|
|
|
@classmethod
|
|
def from_crawler(cls, crawler):
|
|
instance = cls.from_settings(crawler.settings)
|
|
# FIXME: for now, stats are only supported from this constructor
|
|
instance.stats = crawler.stats
|
|
return instance
|
|
|
|
def open(self, spider):
|
|
self.spider = spider
|
|
|
|
try:
|
|
self.queue = load_object(self.queue_cls)(
|
|
server=self.server,
|
|
spider=spider,
|
|
key=self.queue_key % {'spider': spider.name},
|
|
serializer=self.serializer,
|
|
)
|
|
except TypeError as e:
|
|
raise ValueError("Failed to instantiate queue class '%s': %s",
|
|
self.queue_cls, e)
|
|
|
|
self.df = load_object(self.dupefilter_cls).from_spider(spider)
|
|
|
|
if self.flush_on_start:
|
|
self.flush()
|
|
# notice if there are requests already in the queue to resume the crawl
|
|
if len(self.queue):
|
|
spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
|
|
|
|
def close(self, reason):
|
|
if not self.persist:
|
|
self.flush()
|
|
|
|
def flush(self):
|
|
self.df.clear()
|
|
self.queue.clear()
|
|
|
|
def enqueue_request(self, request):
|
|
if not request.dont_filter and self.df.request_seen(request):
|
|
self.df.log(request, self.spider)
|
|
return False
|
|
if self.stats:
|
|
self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
|
|
self.queue.push(request)
|
|
return True
|
|
|
|
def next_request(self):
|
|
block_pop_timeout = self.idle_before_close
|
|
request = self.queue.pop(block_pop_timeout)
|
|
if request and self.stats:
|
|
self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
|
|
return request
|
|
|
|
def has_pending_requests(self):
|
|
return len(self) > 0
|