File: utils.py

package info (click to toggle)
python-bimmer-connected 0.16.3-1.2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 8,304 kB
  • sloc: python: 4,469; makefile: 15
file content (97 lines) | stat: -rw-r--r-- 3,434 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""General utils and base classes used in the library."""

import datetime
import inspect
import json
import logging
import pathlib
from enum import Enum
from typing import TYPE_CHECKING, Dict, List, Optional, Union

from bimmer_connected.models import AnonymizedResponse

if TYPE_CHECKING:
    from typing import TypeVar

    from typing_extensions import ParamSpec

    _T = TypeVar("_T")
    _R = TypeVar("_R")
    _P = ParamSpec("_P")
_LOGGER = logging.getLogger(__name__)


JSON_IGNORED_KEYS = ["account", "_account", "vehicle", "_vehicle", "status", "remote_services"]


def get_class_property_names(obj: object):
    """Return the names of all properties of a class."""
    return [p[0] for p in inspect.getmembers(type(obj), inspect.isdatadescriptor) if not p[0].startswith("_")]


def parse_datetime(date_str: str) -> Optional[datetime.datetime]:
    """Convert a time string into datetime."""
    if not date_str:
        return None
    date_formats = ["%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ"]
    for date_format in date_formats:
        try:
            parsed = datetime.datetime.strptime(date_str, date_format)
            parsed = parsed.replace(microsecond=0)
            return parsed
        except ValueError:
            pass
    _LOGGER.error("unable to parse '%s' using %s", date_str, date_formats)
    return None


def get_next_occurrence(now: datetime.datetime, time: datetime.time) -> datetime.datetime:
    """Get the next occurrence of a given time."""

    # If current time is past the given time, add one day to the current date
    # Otherwise use the current date
    next_date = now.date() + datetime.timedelta(days=1) if now.time() > time else now.date()
    next_occurrence = datetime.datetime.combine(next_date, time)
    return next_occurrence


class MyBMWJSONEncoder(json.JSONEncoder):
    """JSON Encoder that handles data classes, properties and additional data types."""

    def default(self, o) -> Union[str, dict]:  # noqa: D102
        if isinstance(o, (datetime.datetime, datetime.date, datetime.time)):
            return o.isoformat()
        if not isinstance(o, Enum) and hasattr(o, "__dict__") and isinstance(o.__dict__, Dict):
            retval: Dict = o.__dict__
            retval.update({p: getattr(o, p) for p in get_class_property_names(o)})
            return {k: v for k, v in retval.items() if k not in JSON_IGNORED_KEYS}
        return str(o)


def to_camel_case(input_str: str) -> str:
    """Convert SNAKE_CASE or snake_case to camelCase."""

    retval = ""
    flag_upper = False
    for curr in input_str.lower():
        if not curr.isalnum():
            if curr == "_":
                flag_upper = True
            continue
        retval = retval + (curr.upper() if flag_upper else curr)
        flag_upper = False
    return retval


def log_response_store_to_file(response_store: List[AnonymizedResponse], logfile_path: pathlib.Path) -> None:
    """Log all responses to files."""

    for response in response_store:
        output_path = logfile_path / response.filename
        content = response.content

        with open(output_path, "w", encoding="UTF-8") as logfile:
            if output_path.suffix == ".json" or not isinstance(content, str):
                json.dump(content or [], logfile, indent=4, sort_keys=True)
            else:
                logfile.write(content or "NO CONTENT")