File: util.py

package info (click to toggle)
python-ring-doorbell 0.9.13-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 972 kB
  • sloc: python: 4,764; makefile: 14
file content (157 lines) | stat: -rw-r--r-- 5,186 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""Module for common utility functions."""

from __future__ import annotations

import asyncio
import datetime
import logging
from contextlib import suppress
from functools import update_wrapper
from threading import Lock
from typing import TYPE_CHECKING, Any, Callable
from warnings import warn

from typing_extensions import ParamSpec, TypeVar

from ring_doorbell.exceptions import RingError

# Everything in this guard is referenced only from annotations. The module uses
# ``from __future__ import annotations``, so annotations are not evaluated when
# functions are defined and these names need not exist at runtime.
if TYPE_CHECKING:
    from collections.abc import Coroutine

    from .auth import Auth
    from .generic import RingGeneric
    from .group import RingLightGroup
    from .listen.eventlistener import RingEventListener
    from .ring import Ring

    # Type of the instance whose deprecated sync api is being served; limited
    # to the classes listed in the bound.
    _T = TypeVar(
        "_T", bound=Auth | Ring | RingGeneric | RingLightGroup | RingEventListener
    )
    # Result type of a wrapped coroutine function.
    _R = TypeVar("_R")
    # Parameter specification of a wrapped coroutine function.
    _P = ParamSpec("_P")

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)


def parse_datetime(datetime_str: str) -> datetime.datetime:
    """Convert a Ring api timestamp string into an aware UTC datetime.

    The api is not consistent about timestamps: some carry fractional
    seconds (``2023-05-01T12:34:56.123Z``) and some do not
    (``2023-05-01T12:34:56Z``).  Both shapes are accepted.

    A string matching neither shape is logged (with traceback) and the
    current UTC time is returned in its place.
    """
    utc = datetime.timezone.utc

    # A "." together with a trailing "Z" means fractional seconds are present;
    # anything else is tried against the whole-second pattern.
    has_fraction = "." in datetime_str and datetime_str.endswith("Z")
    pattern = "%Y-%m-%dT%H:%M:%S.%fZ" if has_fraction else "%Y-%m-%dT%H:%M:%SZ"

    try:
        naive = datetime.datetime.strptime(datetime_str, pattern)
    except ValueError:
        _logger.exception(
            "Unable to parse datetime string %s, defaulting to now time", datetime_str
        )
        return datetime.datetime.now(utc)

    # The trailing "Z" denotes UTC, so attach that zone to the parsed value.
    return naive.replace(tzinfo=utc)


class _DeprecatedSyncApiHandler:
    """Serve the deprecated synchronous api on top of the async methods.

    Every sync entry point resolves the matching ``async_*`` coroutine method
    on the target instance, emits a ``DeprecationWarning`` and drives the
    coroutine to completion with ``asyncio.run``.  ``auth.async_close()`` is
    awaited after each call.
    """

    def __init__(self, auth: Auth) -> None:
        """Keep the auth object to close after each call and create the lock."""
        self.auth = auth
        # Held for the duration of each call made through this handler.
        self._sync_lock = Lock()

    @staticmethod
    def _deprecation_message(
        classname: str, deprecated_name: str, async_name: str
    ) -> str:
        """Build the warning text pointing from a sync name to its async name."""
        return (
            f"{classname}.{deprecated_name} is deprecated, use "
            f"{classname}.{async_name}"
        )

    async def run_and_close_session(
        self,
        async_method: Callable[_P, Coroutine[Any, Any, _R]],
        *args: _P.args,
        **kwargs: _P.kwargs,
    ) -> _R:
        """Await ``async_method`` and then close the auth session.

        The call runs while holding the handler lock.  ``auth.async_close()``
        is awaited afterwards whether or not ``async_method`` raised; errors
        from the close itself are suppressed so they cannot replace the result
        or the original exception.

        Returns:
            Whatever ``async_method(*args, **kwargs)`` returns.

        Raises:
            Any exception raised by ``async_method``.
        """
        # Take the lock *before* entering the cleanup scope: if acquiring is
        # interrupted, no attempt is made to release a lock that was never held.
        # The blocking acquire is tolerable because every caller in this class
        # drives this coroutine through its own ``asyncio.run`` call.
        with self._sync_lock:
            try:
                return await async_method(*args, **kwargs)
            finally:
                # Best-effort close; the lock is released only after it.
                with suppress(Exception):
                    await self.auth.async_close()

    @staticmethod
    def check_no_loop(classname: str, method_name: str) -> None:
        """Refuse to continue when an event loop is running in this thread.

        Raises:
            RingError: if ``asyncio.get_running_loop()`` finds a running loop,
                because the sync api relies on ``asyncio.run``.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop in this thread, so the sync api may proceed.
            return
        msg = (
            f"You cannot call deprecated sync function {classname}.{method_name} "
            "from within a running event loop."
        )
        raise RingError(msg)

    def get_api_query(
        self,
        class_instance: _T,
        method_name: str,
    ) -> Any:
        """Return deprecated sync api query attribute.

        The result is a plain callable that forwards its arguments to
        ``class_instance.async_<method_name>`` and returns its result.  The
        loop check and the ``DeprecationWarning`` happen on every call of the
        returned callable, not when it is created.

        Raises:
            AttributeError: if ``class_instance`` has no
                ``async_<method_name>`` attribute.
        """
        classname = type(class_instance).__name__
        # Resolve the async counterpart up front so the closure below only
        # refers to names that are already bound.
        async_method_name = f"async_{method_name}"
        async_func = getattr(class_instance, async_method_name)
        msg = self._deprecation_message(classname, method_name, async_method_name)

        def wrapper(*args: Any, **kwargs: Any) -> Any:
            self.check_no_loop(classname, method_name)
            # stacklevel=2 attributes the warning to whoever called the
            # deprecated function rather than to this module.
            warn(msg, DeprecationWarning, stacklevel=2)
            return asyncio.run(
                self.run_and_close_session(async_func, *args, **kwargs)
            )

        # Carry over the name/docstring of the async method being wrapped.
        return update_wrapper(wrapper, async_func)

    def get_api_property(
        self,
        class_instance: _T,
        method_name: str,
    ) -> Any:
        """Return deprecated sync api property value.

        The value is obtained by running
        ``class_instance.async_get_<method_name>()``.

        Raises:
            RingError: if called from within a running event loop.
            AttributeError: if the ``async_get_<method_name>`` attribute does
                not exist (the deprecation warning has been emitted by then).
        """
        classname = type(class_instance).__name__
        self.check_no_loop(classname, method_name)
        async_method_name = f"async_get_{method_name}"
        msg = self._deprecation_message(classname, method_name, async_method_name)
        # NOTE(review): stacklevel=1 attributes the warning to this module. The
        # number of frames between here and user code is not visible from this
        # file, so the value is left as it was - confirm against the callers.
        warn(msg, DeprecationWarning, stacklevel=1)
        async_method = getattr(class_instance, async_method_name)
        return asyncio.run(self.run_and_close_session(async_method))

    def set_api_property(
        self,
        class_instance: _T,
        property_name: str,
        value: Any,
    ) -> None:
        """Set sync api property value.

        The value is applied by running
        ``class_instance.async_set_<property_name>(value)``.

        Raises:
            RingError: if called from within a running event loop.
            AttributeError: if the ``async_set_<property_name>`` attribute does
                not exist (the deprecation warning has been emitted by then).
        """
        classname = type(class_instance).__name__
        self.check_no_loop(classname, property_name)
        async_method_name = f"async_set_{property_name}"
        msg = self._deprecation_message(classname, property_name, async_method_name)
        # NOTE(review): see get_api_property - the call depth from user code is
        # not visible here, so stacklevel is left unchanged.
        warn(msg, DeprecationWarning, stacklevel=1)
        async_method = getattr(class_instance, async_method_name)
        asyncio.run(self.run_and_close_session(async_method, value))