1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
|
from __future__ import annotations
from collections.abc import Collection
from typing import Any, Literal
from distributed.cluster_dump import (
DEFAULT_CLUSTER_DUMP_EXCLUDE,
DEFAULT_CLUSTER_DUMP_FORMAT,
)
from distributed.diagnostics.plugin import SchedulerPlugin
from distributed.scheduler import Scheduler
class ClusterDump(SchedulerPlugin):
    """Dumps cluster state prior to Scheduler shutdown

    The Scheduler may shut down in cases where it is in an error state,
    or when it has been unexpectedly idle for long periods of time.
    This plugin dumps the cluster state prior to Scheduler shutdown
    for debugging purposes.

    Parameters
    ----------
    url
        Destination handed unchanged to
        ``Scheduler.dump_cluster_state_to_url`` when the dump is written.
    exclude
        Collection of names forwarded as the ``exclude`` argument of the
        dump call. Defaults to ``DEFAULT_CLUSTER_DUMP_EXCLUDE``.
    format_
        Serialisation format of the dump, either ``"msgpack"`` or
        ``"yaml"``. Defaults to ``DEFAULT_CLUSTER_DUMP_FORMAT``.
    **storage_options
        Arbitrary keyword arguments forwarded verbatim to
        ``Scheduler.dump_cluster_state_to_url``.
    """

    def __init__(
        self,
        url: str,
        exclude: Collection[str] = DEFAULT_CLUSTER_DUMP_EXCLUDE,
        format_: Literal["msgpack", "yaml"] = DEFAULT_CLUSTER_DUMP_FORMAT,
        # The annotation on ``**kwargs`` describes each individual value,
        # not the collected mapping, so ``Any`` is the right type here;
        # ``self.storage_options`` is then a ``dict[str, Any]``.
        **storage_options: Any,
    ):
        self.url = url
        self.exclude = exclude
        self.format = format_
        self.storage_options = storage_options

    async def start(self, scheduler: Scheduler) -> None:
        """Remember the scheduler so ``before_close`` can dump its state."""
        self.scheduler = scheduler

    async def before_close(self) -> None:
        """Write the cluster state dump to ``self.url``.

        Uses the scheduler stored by ``start``; ``start`` must therefore
        have run before this hook is invoked.
        """
        await self.scheduler.dump_cluster_state_to_url(
            self.url, self.exclude, self.format, **self.storage_options
        )
|