1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
|
"""
Scheduler queues
"""
import marshal
import os
import pickle
from queuelib import queue
from scrapy.utils.reqser import request_to_dict, request_from_dict
def _with_mkdir(queue_class):
    """Return a subclass of ``queue_class`` that creates the directory of its path.

    The returned class takes the same constructor arguments as
    ``queue_class``; its first positional argument ``path`` is the location
    handed on to ``queue_class.__init__``. Before delegating, the directory
    part of ``path`` is created if it does not exist yet.
    """

    class DirectoriesCreated(queue_class):
        """``queue_class`` variant that ensures the parent directory of ``path`` exists."""

        def __init__(self, path, *args, **kwargs):
            dirname = os.path.dirname(path)
            # A bare file name has an empty directory part. There is nothing
            # to create then, and os.makedirs("") would raise
            # FileNotFoundError, so skip directory creation entirely.
            if dirname and not os.path.exists(dirname):
                # exist_ok=True tolerates the directory being created by
                # someone else between the check above and this call.
                os.makedirs(dirname, exist_ok=True)
            super().__init__(path, *args, **kwargs)

    return DirectoriesCreated
def _serializable_queue(queue_class, serialize, deserialize):
    """Return a subclass of ``queue_class`` that (de)serializes its items.

    ``serialize`` is applied to every object before it is pushed to
    ``queue_class``; ``deserialize`` is applied to every truthy value that
    ``queue_class.pop`` returns.
    """

    class SerializableQueue(queue_class):
        """``queue_class`` variant storing serialized objects."""

        def push(self, obj):
            """Serialize ``obj`` and push the result onto the underlying queue."""
            super().push(serialize(obj))

        def pop(self):
            """Pop and deserialize the next item.

            Returns ``None`` when the underlying queue yields a falsy value.
            """
            raw = super().pop()
            return deserialize(raw) if raw else None

    return SerializableQueue
def _scrapy_serialization_queue(queue_class):
    """Return a subclass of ``queue_class`` that stores requests as dicts.

    Requests are passed through ``request_to_dict`` on the way in and
    ``request_from_dict`` on the way out, both called with the crawler's
    spider.
    """

    class ScrapyRequestQueue(queue_class):
        """``queue_class`` variant built from a crawler and a key."""

        def __init__(self, crawler, key):
            # Keep the spider: both conversion helpers take it as an argument.
            self.spider = crawler.spider
            super().__init__(key)

        @classmethod
        def from_crawler(cls, crawler, key, *args, **kwargs):
            """Build the queue from ``crawler`` and ``key``.

            Extra positional and keyword arguments are accepted but not used.
            """
            return cls(crawler, key)

        def push(self, request):
            """Convert ``request`` with ``request_to_dict`` and push the result."""
            return super().push(request_to_dict(request, self.spider))

        def pop(self):
            """Pop the next stored item and convert it with ``request_from_dict``.

            Returns ``None`` when the underlying queue yields a falsy value.
            """
            stored = super().pop()
            if stored:
                return request_from_dict(stored, self.spider)
            return None

    return ScrapyRequestQueue
def _scrapy_non_serialization_queue(queue_class):
    """Return a subclass of ``queue_class`` that adds a ``from_crawler`` constructor.

    No conversion of the queued objects is added by this wrapper.
    """

    class ScrapyRequestQueue(queue_class):
        """``queue_class`` variant that can be built through ``from_crawler``."""

        @classmethod
        def from_crawler(cls, crawler, *args, **kwargs):
            """Build the queue with no constructor arguments.

            ``crawler`` and any extra arguments are accepted but not used.
            """
            instance = cls()
            return instance

    return ScrapyRequestQueue
def _pickle_serialize(obj):
    """Pickle ``obj`` with protocol 4.

    Raises:
        ValueError: if pickling fails with ``pickle.PicklingError``,
            ``AttributeError`` or ``TypeError``; the original exception is
            chained as the cause.
    """
    # pickle.dump(s) can raise both pickle.PicklingError and AttributeError;
    # TypeError is raised from parsel.Selector.
    pickling_failures = (pickle.PicklingError, AttributeError, TypeError)
    try:
        payload = pickle.dumps(obj, protocol=4)
    except pickling_failures as exc:
        raise ValueError(str(exc)) from exc
    return payload
# Disk queues for arbitrary objects, serialized with pickle.
PickleFifoDiskQueueNonRequest = _serializable_queue(
    _with_mkdir(queue.FifoDiskQueue), _pickle_serialize, pickle.loads
)
PickleLifoDiskQueueNonRequest = _serializable_queue(
    _with_mkdir(queue.LifoDiskQueue), _pickle_serialize, pickle.loads
)

# Disk queues for arbitrary objects, serialized with marshal.
MarshalFifoDiskQueueNonRequest = _serializable_queue(
    _with_mkdir(queue.FifoDiskQueue), marshal.dumps, marshal.loads
)
MarshalLifoDiskQueueNonRequest = _serializable_queue(
    _with_mkdir(queue.LifoDiskQueue), marshal.dumps, marshal.loads
)

# Request-aware disk queues layered on top of the non-request ones above.
PickleFifoDiskQueue = _scrapy_serialization_queue(PickleFifoDiskQueueNonRequest)
PickleLifoDiskQueue = _scrapy_serialization_queue(PickleLifoDiskQueueNonRequest)
MarshalFifoDiskQueue = _scrapy_serialization_queue(MarshalFifoDiskQueueNonRequest)
MarshalLifoDiskQueue = _scrapy_serialization_queue(MarshalLifoDiskQueueNonRequest)

# In-memory queues: no serialization, only the from_crawler constructor is added.
FifoMemoryQueue = _scrapy_non_serialization_queue(queue.FifoMemoryQueue)
LifoMemoryQueue = _scrapy_non_serialization_queue(queue.LifoMemoryQueue)
|