From 41208124c769d4c090b69b99bb757c303199b256 Mon Sep 17 00:00:00 2001 From: pnhekgfuf <1913997697@qq.com> Date: Sat, 29 Apr 2023 12:18:20 +0800 Subject: [PATCH] ADD file via upload --- Scrapy-Redis-Zhihu/scrapy_redis/queue.py | 147 +++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 Scrapy-Redis-Zhihu/scrapy_redis/queue.py diff --git a/Scrapy-Redis-Zhihu/scrapy_redis/queue.py b/Scrapy-Redis-Zhihu/scrapy_redis/queue.py new file mode 100644 index 0000000..476cefd --- /dev/null +++ b/Scrapy-Redis-Zhihu/scrapy_redis/queue.py @@ -0,0 +1,147 @@ +from scrapy.utils.reqser import request_to_dict, request_from_dict + +from . import picklecompat + + +class Base(object): + """Per-spider base queue class""" + + def __init__(self, server, spider, key, serializer=None): + """Initialize per-spider redis queue. + + Parameters + ---------- + server : StrictRedis + Redis client instance. + spider : Spider + Scrapy spider instance. + key: str + Redis key where to put and get messages. + serializer : object + Serializer object with ``loads`` and ``dumps`` methods. + + """ + if serializer is None: + # Backward compatibility. + # TODO: deprecate pickle. + serializer = picklecompat + if not hasattr(serializer, 'loads'): + raise TypeError("serializer does not implement 'loads' function: %r" + % serializer) + if not hasattr(serializer, 'dumps'): + raise TypeError("serializer '%s' does not implement 'dumps' function: %r" + % serializer) + + self.server = server + self.spider = spider + self.key = key % {'spider': spider.name} + self.serializer = serializer + + def _encode_request(self, request): + """Encode a request object""" + obj = request_to_dict(request, self.spider) + return self.serializer.dumps(obj) + + def _decode_request(self, encoded_request): + """Decode an request previously encoded""" + obj = self.serializer.loads(encoded_request) + return request_from_dict(obj, self.spider) + + def __len__(self): + """Return the length of the queue""" + raise NotImplementedError + + def push(self, request): + """Push a request""" + raise NotImplementedError + + def pop(self, timeout=0): + """Pop a request""" + raise NotImplementedError + + def clear(self): + """Clear queue/stack""" + self.server.delete(self.key) + + +class FifoQueue(Base): + """Per-spider FIFO queue""" + + def __len__(self): + """Return the length of the queue""" + return self.server.llen(self.key) + + def push(self, request): + """Push a request""" + self.server.lpush(self.key, self._encode_request(request)) + + def pop(self, timeout=0): + """Pop a request""" + if timeout > 0: + data = self.server.brpop(self.key, timeout) + if isinstance(data, tuple): + data = data[1] + else: + data = self.server.rpop(self.key) + if data: + return self._decode_request(data) + + +class PriorityQueue(Base): + """Per-spider priority queue abstraction using redis' sorted set""" + + def __len__(self): + """Return the length of the queue""" + return self.server.zcard(self.key) + + def push(self, request): + """Push a request""" + data = self._encode_request(request) + score = -request.priority + # We don't use zadd method as the order of arguments change depending on + # whether the class is Redis or StrictRedis, and the option of using + # kwargs only accepts strings, not bytes. + self.server.execute_command('ZADD', self.key, score, data) + + def pop(self, timeout=0): + """ + Pop a request + timeout not support in this queue class + """ + # use atomic range/remove using multi/exec + pipe = self.server.pipeline() + pipe.multi() + pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0) + results, count = pipe.execute() + if results: + return self._decode_request(results[0]) + + +class LifoQueue(Base): + """Per-spider LIFO queue.""" + + def __len__(self): + """Return the length of the stack""" + return self.server.llen(self.key) + + def push(self, request): + """Push a request""" + self.server.lpush(self.key, self._encode_request(request)) + + def pop(self, timeout=0): + """Pop a request""" + if timeout > 0: + data = self.server.blpop(self.key, timeout) + if isinstance(data, tuple): + data = data[1] + else: + data = self.server.lpop(self.key) + + if data: + return self._decode_request(data) + + +# TODO: Deprecate the use of these names. +SpiderQueue = FifoQueue +SpiderStack = LifoQueue +SpiderPriorityQueue = PriorityQueue