"""Client command for performing operations on the cache of the running resolver."""
import argparse
import sys
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple, Type
from knot_resolver.client.command import Command, CommandArgs, CompWords, comp_get_words, register_command
from knot_resolver.datamodel.cache_schema import CacheClearRPCSchema
from knot_resolver.utils.modeling.exceptions import AggregateDataValidationError, DataValidationError
from knot_resolver.utils.modeling.parsing import DataFormat, parse_json
from knot_resolver.utils.requests import request
class CacheOperations(Enum):
    """Operations that the ``cache`` client command can perform."""

    # NOTE: plain ``Enum`` members are always truthy, so ``CLEAR`` passes a
    # ``not operation`` check even though its value is 0.
    CLEAR = 0
@register_command
class CacheCommand(Command):
    """The ``cache`` client command.

    Currently the only operation is ``clear``, which sends a ``POST`` request
    to the ``cache/clear`` endpoint and prints the reply in the selected
    output format.
    """

    def __init__(self, namespace: argparse.Namespace) -> None:
        """Store the parsed command-line options.

        The options below are defined only by the ``clear`` sub-parser, so
        they are missing from ``namespace`` when ``cache`` is invoked without
        an operation; every attribute is therefore read defensively.

        Args:
            namespace: Result of parsing the command line.
        """
        super().__init__(namespace)
        self.operation: Optional[CacheOperations] = getattr(namespace, "operation", None)
        self.output_format: DataFormat = getattr(namespace, "output_format", DataFormat.YAML)

        # CLEAR operation
        # (request key, namespace attribute) pairs; only the attributes that
        # are actually present on the namespace are copied into the request.
        clear_options = (
            ("exact-name", "exact_name"),
            ("name", "name"),
            ("rr-type", "rr_type"),
            ("chunk-size", "chunk_size"),
        )
        self.clear_dict: Dict[str, Any] = {}
        for key, attribute in clear_options:
            if hasattr(namespace, attribute):
                self.clear_dict[key] = getattr(namespace, attribute)

    @staticmethod
    def register_args_subparser(
        subparser: "argparse._SubParsersAction[argparse.ArgumentParser]",
    ) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
        """Register the ``cache`` command and its ``clear`` operation.

        Args:
            subparser: Sub-parsers action to which the ``cache`` parser is added.

        Returns:
            The newly created ``cache`` parser and this command class.
        """
        cache_parser = subparser.add_parser("cache", help="Performs operations on the cache of the running resolver.")

        config_subparsers = cache_parser.add_subparsers(help="operation type")

        # 'clear' operation
        clear_subparser = config_subparsers.add_parser(
            "clear", help="Purge cache records that match specified criteria."
        )
        # Selecting this sub-parser is what sets 'operation' on the namespace.
        clear_subparser.set_defaults(operation=CacheOperations.CLEAR, exact_name=False)
        clear_subparser.add_argument(
            "--exact-name",
            help="If set, only records with the same name are purged.",
            action="store_true",
            dest="exact_name",
        )
        clear_subparser.add_argument(
            "--rr-type",
            help="Optional, the resource record type to purge. It is supported only with the '--exact-name' flag set.",
            action="store",
            type=str,
        )
        clear_subparser.add_argument(
            "--chunk-size",
            help="Optional, the number of records to remove in one round; the default is 100."
            " The purpose is not to block the resolver for long."
            " The resolver repeats the cache clearing after one millisecond until all matching data is cleared.",
            action="store",
            type=int,
            default=100,
        )
        clear_subparser.add_argument(
            "name",
            type=str,
            nargs="?",
            help="Optional, subtree name to purge; if omitted, the entire cache is purged (and all other parameters are ignored).",
            default=None,
        )

        # '--json' and '--yaml' write to the same destination and exclude each other.
        output_format = clear_subparser.add_mutually_exclusive_group()
        output_format_default = DataFormat.YAML
        output_format.add_argument(
            "--json",
            help="Set JSON as the output format.",
            const=DataFormat.JSON,
            action="store_const",
            dest="output_format",
            default=output_format_default,
        )
        output_format.add_argument(
            "--yaml",
            help="Set YAML as the output format. YAML is the default.",
            const=DataFormat.YAML,
            action="store_const",
            dest="output_format",
            default=output_format_default,
        )

        return cache_parser, CacheCommand

    @staticmethod
    def completion(args: List[str], parser: argparse.ArgumentParser) -> CompWords:
        """Return completion words for ``args`` by delegating to ``comp_get_words``."""
        return comp_get_words(args, parser)

    def run(self, args: CommandArgs) -> None:
        """Execute the selected cache operation.

        Without an operation the help text is printed and the process exits.
        For ``clear`` the collected options are validated, sent to the
        ``cache/clear`` endpoint, and the reply is printed in the chosen
        output format.

        Args:
            args: Command arguments; ``args.subparser`` and ``args.socket`` are used.

        Raises:
            SystemExit: After printing help (default exit status), or with
                status 1 when validation fails or the response status is not 200.
        """
        if not self.operation:
            args.subparser.print_help()
            sys.exit()

        if self.operation == CacheOperations.CLEAR:
            try:
                validated = CacheClearRPCSchema(self.clear_dict)
            except (AggregateDataValidationError, DataValidationError) as e:
                print(e, file=sys.stderr)
                sys.exit(1)

            body: str = DataFormat.JSON.dict_dump(validated.get_unparsed_data())
            response = request(args.socket, "POST", "cache/clear", body)

            # Check the status before touching the body: a failed request must
            # be reported through the response itself and must not depend on
            # the error body being parseable as JSON.
            if response.status != 200:
                print(response, file=sys.stderr)
                sys.exit(1)

            body_dict = parse_json(response.body)
            print(self.output_format.dict_dump(body_dict, indent=4))
|