File: pids.py

package info (click to toggle)
knot-resolver 6.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 16,392 kB
  • sloc: javascript: 42,732; ansic: 40,312; python: 12,616; cpp: 2,121; sh: 1,997; xml: 193; makefile: 181
file content (63 lines) | stat: -rw-r--r-- 2,141 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import argparse
import json
import sys
from typing import Iterable, List, Optional, Tuple, Type

from knot_resolver.client.command import Command, CommandArgs, CompWords, register_command
from knot_resolver.utils.requests import request

PROCESSES_TYPE = Iterable


@register_command
class PidsCommand(Command):
    def __init__(self, namespace: argparse.Namespace) -> None:
        self.proc_type: Optional[str] = namespace.proc_type
        self.json: int = namespace.json

        super().__init__(namespace)

    @staticmethod
    def register_args_subparser(
        subparser: "argparse._SubParsersAction[argparse.ArgumentParser]",
    ) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
        pids = subparser.add_parser("pids", help="List the PIDs of the Manager's subprocesses")
        pids.add_argument(
            "proc_type",
            help="Optional, the type of process to query. May be 'kresd', 'gc', or 'all' (default).",
            nargs="?",
            default="all",
        )
        pids.add_argument(
            "--json",
            help="Optional, makes the output more verbose, in JSON.",
            action="store_true",
            default=False,
        )
        return pids, PidsCommand

    @staticmethod
    def completion(args: List[str], parser: argparse.ArgumentParser) -> CompWords:
        return {}

    def run(self, args: CommandArgs) -> None:
        response = request(args.socket, "GET", f"processes/{self.proc_type}")

        if response.status == 200:
            processes = json.loads(response.body)
            if isinstance(processes, PROCESSES_TYPE):
                if self.json:
                    print(json.dumps(processes, indent=2))
                else:
                    for p in processes:
                        print(p["pid"])

            else:
                print(
                    f"Unexpected response type '{type(processes).__name__}' from manager. Expected '{PROCESSES_TYPE.__name__}'",
                    file=sys.stderr,
                )
                sys.exit(1)
        else:
            print(response, file=sys.stderr)
            sys.exit(1)