1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
|
import base64
import re
from typing import Union
from uuid import uuid4
# Module-level list, created empty at import time; nothing in the visible code reads or writes it.
# NOTE(review): presumably a registry of temporary threads kept for later cleanup - confirm against callers.
TMP_THREADS = []
def is_list_or_tuple(obj) -> bool:
    """Return True if ``obj`` is a ``list`` or a ``tuple`` (or an instance of a subclass of either)."""
    return isinstance(obj, list) or isinstance(obj, tuple)
def select_attributes(obj: dict, attributes: list[str]) -> dict:
    """Return a new dict holding only the entries of ``obj`` whose key is listed in ``attributes``.

    ``attributes`` may be a list or tuple of keys; any other value is treated as one single key.
    The input dict is not modified.
    """
    # anything that is not a list/tuple is wrapped, so a bare key behaves like a one-element list
    if not isinstance(attributes, (list, tuple)):
        attributes = [attributes]
    selected = {}
    for key, value in obj.items():
        if key in attributes:
            selected[key] = value
    return selected
class SubtypesInstanceManager:
    """Simple instance manager base class that scans the subclasses of a base type for concrete named
    implementations, and lazily creates and returns (singleton) instances on demand."""

    # Cache of created instances, keyed by ``impl_name()``. Only annotated here - the dict itself is
    # created on the base type by ``instances()`` the first time it is needed.
    _instances: dict[str, "SubtypesInstanceManager"]

    @classmethod
    def get(cls, subtype_name: str, raise_if_missing: bool = False):
        """Return the cached instance registered under ``subtype_name``.

        On a cache miss, all subclasses of the base type are scanned and every subclass whose
        ``impl_name()`` is not cached yet is instantiated (not only the requested one), then the
        lookup is retried.

        :param subtype_name: name of the implementation, as returned by its ``impl_name()``
        :param raise_if_missing: if True, raise instead of returning ``None`` when no
            implementation with that name exists
        :return: the instance, or ``None`` if it is missing and ``raise_if_missing`` is False
        :raises NotImplementedError: if the subtype is missing and ``raise_if_missing`` is True
        """
        instances = cls.instances()
        base_type = cls.get_base_type()
        instance = instances.get(subtype_name)
        if instance is None:
            # lazily load subtype instance (required if new plugins are dynamically loaded at runtime)
            for clazz in get_all_subclasses(base_type):
                impl_name = clazz.impl_name()
                if impl_name not in instances:
                    instances[impl_name] = clazz()
            instance = instances.get(subtype_name)
        # honor the flag: previously it was accepted but ignored, so callers always got None
        if instance is None and raise_if_missing:
            raise NotImplementedError(
                f"Unable to find implementation named '{subtype_name}' for base type {base_type}"
            )
        return instance

    @classmethod
    def instances(cls) -> dict[str, "SubtypesInstanceManager"]:
        """Return the name -> instance cache stored on the base type, creating it if absent."""
        base_type = cls.get_base_type()
        if not hasattr(base_type, "_instances"):
            base_type._instances = {}
        return base_type._instances

    @staticmethod
    def impl_name() -> str:
        """Name of this concrete subtype - to be implemented by subclasses."""
        raise NotImplementedError

    @classmethod
    def get_base_type(cls) -> type:
        """Get the base class for which instances are being managed - can be customized by subtypes."""
        return cls
def to_str(obj, encoding: str = "utf-8", errors="strict") -> str:
    """Decode ``obj`` with ``obj.decode(encoding, errors)`` if it is a ``bytes`` instance;
    any other value is returned unchanged."""
    if isinstance(obj, bytes):
        return obj.decode(encoding, errors)
    return obj
def to_bytes(obj: Union[str, bytes], encoding: str = "utf-8", errors="strict") -> bytes:
    """Encode ``obj`` with ``obj.encode(encoding, errors)`` if it is a ``str`` instance;
    any other value is returned unchanged."""
    if isinstance(obj, str):
        return obj.encode(encoding, errors)
    return obj
# Matches an uppercase letter that either follows a lowercase letter/digit, or is followed by a
# lowercase letter and is not at the very start of the string.
_re_camel_to_snake_case = re.compile("((?<=[a-z0-9])[A-Z]|(?!^)[A-Z](?=[a-z]))")


def camel_to_snake_case(string: str) -> str:
    """Convert a camel-case string to snake case, e.g. ``"HTTPRequest"`` -> ``"http_request"``."""
    with_separators = _re_camel_to_snake_case.sub(lambda match: "_" + match.group(1), string)
    # input that already had "_" before a capital now has "__" there; fold each pair back into one
    collapsed = with_separators.replace("__", "_")
    return collapsed.lower()
def snake_to_camel_case(string: str, capitalize_first: bool = True) -> str:
    """Convert a snake-case string to camel case.

    The string is split on ``"_"`` and the parts are title-cased (``str.title``) and joined.

    :param string: the snake-case input, e.g. ``"foo_bar"``
    :param capitalize_first: if True (default) the first part is title-cased as well
        (``"FooBar"``); if False the first part is kept exactly as given (``"fooBar"``)
    :return: the camel-case string
    """
    components = string.split("_")
    start_idx = 0 if capitalize_first else 1
    # keep the leading part(s) untouched instead of dropping them (previously "foo_bar" -> "Bar")
    untouched = components[:start_idx]
    capitalized = [x.title() for x in components[start_idx:]]
    return "".join(untouched + capitalized)
def get_all_subclasses(clazz: type) -> set[type]:
    """Collect every direct and indirect subclass of ``clazz`` (``clazz`` itself is not included)."""
    found = set()
    # explicit work list instead of recursion: each class popped contributes its own subclasses
    pending = list(clazz.__subclasses__())
    while pending:
        current = pending.pop()
        found.add(current)
        pending.extend(current.__subclasses__())
    return found
def long_uid() -> str:
    """Return a newly generated ``uuid4()`` value converted to its string form."""
    generated = uuid4()
    return str(generated)
def token_generator(item: str) -> str:
    """Return the Base64 encoding of ``item`` (encoded as UTF-8), as a string."""
    return base64.b64encode(item.encode("utf-8")).decode("utf-8")
|