from __future__ import annotations
import logging
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any
from twisted.internet import task
from scrapy import Spider, signals
from scrapy.exceptions import NotConfigured
from scrapy.utils.serialize import ScrapyJSONEncoder
if TYPE_CHECKING:
# typing.Self requires Python 3.11
from json import JSONEncoder
from typing_extensions import Self
from scrapy.crawler import Crawler
from scrapy.statscollectors import StatsCollector
logger = logging.getLogger(__name__)
class PeriodicLog:
    """Periodically log scraping stats as a pretty-printed JSON object.

    The extension is driven by three settings:

    * ``PERIODIC_LOG_STATS`` — log current stat values (optionally filtered
      with ``include``/``exclude`` substring lists).
    * ``PERIODIC_LOG_DELTA`` — log per-interval deltas of numeric stats
      (same ``include``/``exclude`` filtering).
    * ``PERIODIC_LOG_TIMING_ENABLED`` — log timing data (elapsed time,
      real interval length, etc.).

    The logging period is reused from ``LOGSTATS_INTERVAL`` (seconds).
    """

    def __init__(
        self,
        stats: StatsCollector,
        interval: float = 60.0,
        ext_stats: dict[str, Any] | None = None,
        ext_delta: dict[str, Any] | None = None,
        ext_timing_enabled: bool = False,
    ):
        """Initialize the extension.

        :param stats: the crawler's stats collector to read from.
        :param interval: seconds between log entries.
        :param ext_stats: stats-section config; a dict (possibly with
            ``include``/``exclude`` substring lists), or ``None``/empty
            to disable the section.
        :param ext_delta: delta-section config, same shape as *ext_stats*.
        :param ext_timing_enabled: whether to log the timing section.
        """
        # Normalize None to {} instead of using mutable default arguments
        # ({} defaults are shared across calls); {} and None both mean
        # "section disabled", so behavior is unchanged for all callers.
        ext_stats = ext_stats or {}
        ext_delta = ext_delta or {}
        self.stats: StatsCollector = stats
        self.interval: float = interval
        # Retained for external consumers; not used by this class itself.
        self.multiplier: float = 60.0 / self.interval
        self.task: task.LoopingCall | None = None
        self.encoder: JSONEncoder = ScrapyJSONEncoder(sort_keys=True, indent=4)
        self.ext_stats_enabled: bool = bool(ext_stats)
        self.ext_stats_include: list[str] = ext_stats.get("include", [])
        self.ext_stats_exclude: list[str] = ext_stats.get("exclude", [])
        self.ext_delta_enabled: bool = bool(ext_delta)
        self.ext_delta_include: list[str] = ext_delta.get("include", [])
        self.ext_delta_exclude: list[str] = ext_delta.get("exclude", [])
        self.ext_timing_enabled: bool = ext_timing_enabled

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> Self:
        """Build the extension from crawler settings.

        :raises NotConfigured: if ``LOGSTATS_INTERVAL`` is unset/zero, or
            none of the three sections is enabled.
        """
        interval: float = crawler.settings.getfloat("LOGSTATS_INTERVAL")
        if not interval:
            raise NotConfigured
        # Each section setting may be a dict (include/exclude filters) or a
        # plain truthy value; getdict() raises for non-dict values, in which
        # case a truthy value means "enabled, no filtering".
        try:
            ext_stats: dict[str, Any] | None = crawler.settings.getdict(
                "PERIODIC_LOG_STATS"
            )
        except (TypeError, ValueError):
            ext_stats = (
                {"enabled": True}
                if crawler.settings.getbool("PERIODIC_LOG_STATS")
                else None
            )
        try:
            ext_delta: dict[str, Any] | None = crawler.settings.getdict(
                "PERIODIC_LOG_DELTA"
            )
        except (TypeError, ValueError):
            ext_delta = (
                {"enabled": True}
                if crawler.settings.getbool("PERIODIC_LOG_DELTA")
                else None
            )
        ext_timing_enabled: bool = crawler.settings.getbool(
            "PERIODIC_LOG_TIMING_ENABLED", False
        )
        if not (ext_stats or ext_delta or ext_timing_enabled):
            raise NotConfigured
        assert crawler.stats  # narrow Optional for type checkers
        # NOTE: no asserts on ext_stats/ext_delta here — either may
        # legitimately be None when only the other section is enabled
        # (previously this raised AssertionError); __init__ treats None
        # as "section disabled".
        o = cls(
            crawler.stats,
            interval,
            ext_stats,
            ext_delta,
            ext_timing_enabled,
        )
        crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)
        return o

    def spider_opened(self, spider: Spider) -> None:
        """Start the periodic logging loop when the spider opens."""
        # Baselines for the delta/timing sections, reset per crawl.
        self.time_prev: datetime = datetime.now(tz=timezone.utc)
        self.delta_prev: dict[str, int | float] = {}
        self.stats_prev: dict[str, int | float] = {}
        self.task = task.LoopingCall(self.log)
        self.task.start(self.interval)

    def log(self) -> None:
        """Emit one log entry with the enabled sections."""
        data: dict[str, Any] = {}
        if self.ext_timing_enabled:
            data.update(self.log_timing())
        if self.ext_delta_enabled:
            data.update(self.log_delta())
        if self.ext_stats_enabled:
            data.update(self.log_crawler_stats())
        logger.info(self.encoder.encode(data))

    def log_delta(self) -> dict[str, Any]:
        """Return per-interval changes of the allowed numeric stats."""
        num_stats: dict[str, int | float] = {
            k: v
            for k, v in self.stats._stats.items()
            if isinstance(v, (int, float))
            and self.param_allowed(k, self.ext_delta_include, self.ext_delta_exclude)
        }
        # Stats absent from the previous snapshot count as starting at 0.
        delta = {k: v - self.delta_prev.get(k, 0) for k, v in num_stats.items()}
        self.delta_prev = num_stats
        return {"delta": delta}

    def log_timing(self) -> dict[str, Any]:
        """Return timing data for the current interval."""
        now = datetime.now(tz=timezone.utc)
        time = {
            "log_interval": self.interval,
            "start_time": self.stats._stats["start_time"],
            "utcnow": now,
            # Real elapsed time since the previous entry; may drift from
            # self.interval under load.
            "log_interval_real": (now - self.time_prev).total_seconds(),
            "elapsed": (now - self.stats._stats["start_time"]).total_seconds(),
        }
        self.time_prev = now
        return {"time": time}

    def log_crawler_stats(self) -> dict[str, Any]:
        """Return the current values of the allowed stats."""
        stats = {
            k: v
            for k, v in self.stats._stats.items()
            if self.param_allowed(k, self.ext_stats_include, self.ext_stats_exclude)
        }
        return {"stats": stats}

    def param_allowed(
        self, stat_name: str, include: list[str], exclude: list[str]
    ) -> bool:
        """Return whether *stat_name* passes the include/exclude filters.

        Matching is by substring. ``exclude`` wins over ``include``; with
        no filters at all, everything is allowed.
        """
        if not include and not exclude:
            return True
        for p in exclude:
            if p in stat_name:
                return False
        if exclude and not include:
            return True
        return any(p in stat_name for p in include)

    def spider_closed(self, spider: Spider, reason: str) -> None:
        """Emit a final entry and stop the loop when the spider closes."""
        self.log()
        if self.task and self.task.running:
            self.task.stop()