1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
|
"""
Scheduler queues
"""
from __future__ import annotations
import marshal
import pickle
from pathlib import Path
from typing import TYPE_CHECKING, Any
from queuelib import queue
from scrapy.utils.request import request_from_dict
if TYPE_CHECKING:
from collections.abc import Callable
from os import PathLike
# typing.Self requires Python 3.11
from typing_extensions import Self
from scrapy import Request
from scrapy.crawler import Crawler
def _with_mkdir(queue_class: type[queue.BaseQueue]) -> type[queue.BaseQueue]:
class DirectoriesCreated(queue_class): # type: ignore[valid-type,misc]
def __init__(self, path: str | PathLike, *args: Any, **kwargs: Any):
dirname = Path(path).parent
if not dirname.exists():
dirname.mkdir(parents=True, exist_ok=True)
super().__init__(path, *args, **kwargs)
return DirectoriesCreated
def _serializable_queue(
queue_class: type[queue.BaseQueue],
serialize: Callable[[Any], bytes],
deserialize: Callable[[bytes], Any],
) -> type[queue.BaseQueue]:
class SerializableQueue(queue_class): # type: ignore[valid-type,misc]
def push(self, obj: Any) -> None:
s = serialize(obj)
super().push(s)
def pop(self) -> Any | None:
s = super().pop()
if s:
return deserialize(s)
return None
def peek(self) -> Any | None:
"""Returns the next object to be returned by :meth:`pop`,
but without removing it from the queue.
Raises :exc:`NotImplementedError` if the underlying queue class does
not implement a ``peek`` method, which is optional for queues.
"""
try:
s = super().peek()
except AttributeError as ex:
raise NotImplementedError(
"The underlying queue class does not implement 'peek'"
) from ex
if s:
return deserialize(s)
return None
return SerializableQueue
def _scrapy_serialization_queue(
    queue_class: type[queue.BaseQueue],
) -> type[queue.BaseQueue]:
    """Return a subclass of *queue_class* that stores :class:`~scrapy.Request`
    objects as dicts (via ``Request.to_dict`` / ``request_from_dict``),
    binding the crawler's spider for the conversion."""

    class ScrapyRequestQueue(queue_class):  # type: ignore[valid-type,misc]
        def __init__(self, crawler: Crawler, key: str):
            # The spider is needed to (de)serialize callbacks by name.
            self.spider = crawler.spider
            super().__init__(key)

        @classmethod
        def from_crawler(
            cls, crawler: Crawler, key: str, *args: Any, **kwargs: Any
        ) -> Self:
            return cls(crawler, key)

        def push(self, request: Request) -> None:
            super().push(request.to_dict(spider=self.spider))

        def pop(self) -> Request | None:
            request_dict = super().pop()
            if request_dict:
                return request_from_dict(request_dict, spider=self.spider)
            return None

        def peek(self) -> Request | None:
            """Returns the next object to be returned by :meth:`pop`,
            but without removing it from the queue.

            Raises :exc:`NotImplementedError` if the underlying queue class
            does not implement a ``peek`` method, which is optional for
            queues.
            """
            request_dict = super().peek()
            if request_dict:
                return request_from_dict(request_dict, spider=self.spider)
            return None

    return ScrapyRequestQueue
def _scrapy_non_serialization_queue(
queue_class: type[queue.BaseQueue],
) -> type[queue.BaseQueue]:
class ScrapyRequestQueue(queue_class): # type: ignore[valid-type,misc]
@classmethod
def from_crawler(cls, crawler: Crawler, *args: Any, **kwargs: Any) -> Self:
return cls()
def peek(self) -> Any | None:
"""Returns the next object to be returned by :meth:`pop`,
but without removing it from the queue.
Raises :exc:`NotImplementedError` if the underlying queue class does
not implement a ``peek`` method, which is optional for queues.
"""
try:
s = super().peek()
except AttributeError as ex:
raise NotImplementedError(
"The underlying queue class does not implement 'peek'"
) from ex
return s
return ScrapyRequestQueue
def _pickle_serialize(obj: Any) -> bytes:
try:
return pickle.dumps(obj, protocol=4)
# Both pickle.PicklingError and AttributeError can be raised by pickle.dump(s)
# TypeError is raised from parsel.Selector
except (pickle.PicklingError, AttributeError, TypeError) as e:
raise ValueError(str(e)) from e
# queue.*Queue aren't subclasses of queue.BaseQueue
# Internal disk-backed queue variants: each combines automatic directory
# creation with a (serializer, deserializer) pair, in both FIFO and LIFO
# ordering. These still push/pop arbitrary serializable objects, not Requests.
_PickleFifoSerializationDiskQueue = _serializable_queue(
    _with_mkdir(queue.FifoDiskQueue),  # type: ignore[arg-type]
    _pickle_serialize,
    pickle.loads,
)
_PickleLifoSerializationDiskQueue = _serializable_queue(
    _with_mkdir(queue.LifoDiskQueue),  # type: ignore[arg-type]
    _pickle_serialize,
    pickle.loads,
)
_MarshalFifoSerializationDiskQueue = _serializable_queue(
    _with_mkdir(queue.FifoDiskQueue),  # type: ignore[arg-type]
    marshal.dumps,
    marshal.loads,
)
_MarshalLifoSerializationDiskQueue = _serializable_queue(
    _with_mkdir(queue.LifoDiskQueue),  # type: ignore[arg-type]
    marshal.dumps,
    marshal.loads,
)
# public queue classes
# Disk queues wrap the serialization variants above with Request<->dict
# conversion; memory queues store Request objects directly and only add the
# from_crawler/peek interface.
PickleFifoDiskQueue = _scrapy_serialization_queue(_PickleFifoSerializationDiskQueue)
PickleLifoDiskQueue = _scrapy_serialization_queue(_PickleLifoSerializationDiskQueue)
MarshalFifoDiskQueue = _scrapy_serialization_queue(_MarshalFifoSerializationDiskQueue)
MarshalLifoDiskQueue = _scrapy_serialization_queue(_MarshalLifoSerializationDiskQueue)
FifoMemoryQueue = _scrapy_non_serialization_queue(queue.FifoMemoryQueue)  # type: ignore[arg-type]
LifoMemoryQueue = _scrapy_non_serialization_queue(queue.LifoMemoryQueue)  # type: ignore[arg-type]
|