1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
|
from __future__ import annotations
from distributed.http.utils import RequestHandler
from distributed.utils import log_errors
class CountsJSON(RequestHandler):
    """Handler that reports aggregate task and worker counts for the server."""

    def get(self):
        """Tally scheduler state and write the totals as a single mapping.

        Tasks are walked once: a task with a non-``None`` ``exception_blame``
        counts as erred, otherwise a task whose state is ``"released"`` counts
        as released.  Independently of that, tasks with a non-empty
        ``waiting_on`` / ``waiters`` are counted as waiting / waiting_data.
        Worker totals are summed over every registered worker.
        """
        sched = self.server

        # Ask for the adaptive target first, before any tallying happens.
        target = sched.adaptive_target()

        n_erred = 0
        n_released = 0
        n_waiting = 0
        n_waiting_data = 0
        for task in sched.tasks.values():
            if task.exception_blame is not None:
                n_erred += 1
            elif task.state == "released":
                n_released += 1
            # These two checks apply to every task, whatever branch ran above.
            n_waiting += 1 if task.waiting_on else 0
            n_waiting_data += 1 if task.waiters else 0

        total_threads = 0
        total_bytes = 0
        keys_in_memory = 0
        keys_processing = 0
        for worker in sched.workers.values():
            total_threads += worker.nthreads
            keys_in_memory += len(worker.has_what)
            total_bytes += worker.nbytes
            keys_processing += len(worker.processing)

        # Key order is kept stable because it determines the emitted order.
        self.write(
            {
                "bytes": total_bytes,
                "clients": len(sched.clients),
                "cores": total_threads,
                "erred": n_erred,
                "hosts": len(sched.host_info),
                "idle": len(sched.idle),
                "memory": keys_in_memory,
                "processing": keys_processing,
                "released": n_released,
                "saturated": len(sched.saturated),
                "tasks": len(sched.tasks),
                "unrunnable": len(sched.unrunnable),
                "waiting": n_waiting,
                "waiting_data": n_waiting_data,
                "workers": len(sched.workers),
                "desired_workers": target,
            }
        )
class IdentityJSON(RequestHandler):
    """Handler that writes the server's identity to the response."""

    def get(self):
        """Write whatever ``self.server.identity()`` returns as the response."""
        identity = self.server.identity()
        self.write(identity)
class IndexJSON(RequestHandler):
    """Handler that renders an HTML index of the module's ``.json`` routes."""

    @log_errors
    def get(self):
        """Render ``json-index.html`` listing every route ending in ``.json``."""
        json_names = []
        for pattern, _handler, _kwargs in routes:
            if not pattern.endswith(".json"):
                continue
            # Strip the first five characters (the "json/" prefix used by
            # the route patterns in this module).
            json_names.append(pattern[5:])

        self.render(
            "json-index.html",
            routes=json_names,
            title="Index of JSON routes",
            **self.extra,
        )
# Route table for this module: each entry is a
# (url pattern, handler class, extra-arguments dict) triple.
# IndexJSON.get reads this list, keeps the patterns that end in ".json",
# and drops their first five characters, so "json/index.html" itself is
# not listed in the rendered index.
# NOTE(review): presumably consumed by the HTTP server's route registration
# elsewhere in the project -- confirm against the caller.
routes: list[tuple] = [
    (r"json/counts.json", CountsJSON, {}),
    (r"json/identity.json", IdentityJSON, {}),
    (r"json/index.html", IndexJSON, {}),
]
|