"""
MemoryUsage extension
See documentation in docs/topics/extensions.rst
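
The extension is configured through these settings (read in __init__ below):
MEMUSAGE_ENABLED, MEMUSAGE_LIMIT_MB, MEMUSAGE_WARNING_MB, MEMUSAGE_NOTIFY_MAIL
and MEMUSAGE_CHECK_INTERVAL_SECONDS.

Illustrative settings.py snippet (the values are examples only, not the
project defaults)::

    MEMUSAGE_ENABLED = True
    MEMUSAGE_LIMIT_MB = 2048        # hard limit: the spider is closed above it
    MEMUSAGE_WARNING_MB = 1536      # soft limit: a warning is logged once
    MEMUSAGE_NOTIFY_MAIL = ["ops@example.com"]
    MEMUSAGE_CHECK_INTERVAL_SECONDS = 60.0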
"""
from __future__ import annotations
import logging
import socket
import sys
from importlib import import_module
from pprint import pformat
from typing import TYPE_CHECKING
from twisted.internet import task
from scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.mail import MailSender
from scrapy.utils.engine import get_engine_status
if TYPE_CHECKING:
# typing.Self requires Python 3.11
from typing_extensions import Self
from scrapy.crawler import Crawler
logger = logging.getLogger(__name__)
class MemoryUsage:
def __init__(self, crawler: Crawler):
if not crawler.settings.getbool("MEMUSAGE_ENABLED"):
raise NotConfigured
try:
# stdlib's resource module is only available on unix platforms.
self.resource = import_module("resource")
except ImportError:
raise NotConfigured
self.crawler: Crawler = crawler
self.warned: bool = False
self.notify_mails: list[str] = crawler.settings.getlist("MEMUSAGE_NOTIFY_MAIL")
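        # Thresholds are given in MiB and converted to bytes here; a value of 0
        # disables the corresponding check (see engine_started).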
self.limit: int = crawler.settings.getint("MEMUSAGE_LIMIT_MB") * 1024 * 1024
self.warning: int = crawler.settings.getint("MEMUSAGE_WARNING_MB") * 1024 * 1024
self.check_interval: float = crawler.settings.getfloat(
"MEMUSAGE_CHECK_INTERVAL_SECONDS"
)
self.mail: MailSender = MailSender.from_crawler(crawler)
crawler.signals.connect(self.engine_started, signal=signals.engine_started)
crawler.signals.connect(self.engine_stopped, signal=signals.engine_stopped)
@classmethod
def from_crawler(cls, crawler: Crawler) -> Self:
return cls(crawler)
def get_virtual_size(self) -> int:
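        # ru_maxrss is the peak resident set size of the process so far, not
        # the current usage.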
size: int = self.resource.getrusage(self.resource.RUSAGE_SELF).ru_maxrss
if sys.platform != "darwin":
# on macOS ru_maxrss is in bytes, on Linux it is in KB
size *= 1024
return size
def engine_started(self) -> None:
assert self.crawler.stats
self.crawler.stats.set_value("memusage/startup", self.get_virtual_size())
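        # Start periodic checks: one task tracks the peak-usage stat and, if
        # configured, two more enforce the hard limit and the warning threshold.
        # Each task fires every check_interval seconds.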
self.tasks: list[task.LoopingCall] = []
tsk = task.LoopingCall(self.update)
self.tasks.append(tsk)
tsk.start(self.check_interval, now=True)
if self.limit:
tsk = task.LoopingCall(self._check_limit)
self.tasks.append(tsk)
tsk.start(self.check_interval, now=True)
if self.warning:
tsk = task.LoopingCall(self._check_warning)
self.tasks.append(tsk)
tsk.start(self.check_interval, now=True)
def engine_stopped(self) -> None:
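        # Stop any periodic checks that are still running.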
for tsk in self.tasks:
if tsk.running:
tsk.stop()
def update(self) -> None:
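        # Keep the memusage/max stat at the highest value observed so far.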
assert self.crawler.stats
self.crawler.stats.max_value("memusage/max", self.get_virtual_size())
def _check_limit(self) -> None:
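        # When usage exceeds the hard limit: record stats, optionally notify by
        # mail, then shut down by closing the running spider (or stopping the
        # crawler if no spider is open).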
assert self.crawler.engine
assert self.crawler.stats
peak_mem_usage = self.get_virtual_size()
if peak_mem_usage > self.limit:
self.crawler.stats.set_value("memusage/limit_reached", 1)
mem = self.limit / 1024 / 1024
logger.error(
"Memory usage exceeded %(memusage)dMiB. Shutting down Scrapy...",
{"memusage": mem},
extra={"crawler": self.crawler},
)
if self.notify_mails:
subj = (
f"{self.crawler.settings['BOT_NAME']} terminated: "
f"memory usage exceeded {mem}MiB at {socket.gethostname()}"
)
self._send_report(self.notify_mails, subj)
self.crawler.stats.set_value("memusage/limit_notified", 1)
if self.crawler.engine.spider is not None:
self.crawler.engine.close_spider(
self.crawler.engine.spider, "memusage_exceeded"
)
else:
self.crawler.stop()
else:
logger.info(
"Peak memory usage is %(virtualsize)dMiB",
{"virtualsize": peak_mem_usage / 1024 / 1024},
)
def _check_warning(self) -> None:
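        # Log, and optionally mail, a single warning once usage crosses the
        # soft threshold.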
if self.warned: # warn only once
return
assert self.crawler.stats
if self.get_virtual_size() > self.warning:
self.crawler.stats.set_value("memusage/warning_reached", 1)
mem = self.warning / 1024 / 1024
logger.warning(
"Memory usage reached %(memusage)dMiB",
{"memusage": mem},
extra={"crawler": self.crawler},
)
if self.notify_mails:
subj = (
f"{self.crawler.settings['BOT_NAME']} warning: "
f"memory usage reached {mem}MiB at {socket.gethostname()}"
)
self._send_report(self.notify_mails, subj)
self.crawler.stats.set_value("memusage/warning_notified", 1)
self.warned = True
def _send_report(self, rcpts: list[str], subject: str) -> None:
"""send notification mail with some additional useful info"""
assert self.crawler.engine
assert self.crawler.stats
stats = self.crawler.stats
s = f"Memory usage at engine startup : {stats.get_value('memusage/startup') / 1024 / 1024}M\r\n"
s += f"Maximum memory usage : {stats.get_value('memusage/max') / 1024 / 1024}M\r\n"
s += f"Current memory usage : {self.get_virtual_size() / 1024 / 1024}M\r\n"
s += (
"ENGINE STATUS ------------------------------------------------------- \r\n"
)
s += "\r\n"
s += pformat(get_engine_status(self.crawler.engine))
s += "\r\n"
self.mail.send(rcpts, subject, s)