1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
|
"""Library for handling local event sync.
This library implements the best practices for how to
[Synchronize resources](https://developers.google.com/calendar/api/guides/sync)
for a Google Calendar.
To use this library, you create an instance of a sync manager then run the
`run` method on a regular basis to pull down the latest information from the server. You
may then query the latest information from the local store service.
"""
# pylint: disable=duplicate-code
from __future__ import annotations
import datetime
import json
import logging
from collections.abc import Awaitable, Callable
from typing import Any, TypeVar
from .api import (
CalendarEventStoreService,
CalendarListRequest,
CalendarListResponse,
CalendarListStoreService,
GoogleCalendarService,
ListEventsRequest,
SyncableRequest,
SyncableResponse,
SyncEventsRequest,
_ListEventsResponseModel,
)
from .const import CALENDAR_LIST_SYNC, EVENT_SYNC, ITEMS, SYNC_TOKEN, SYNC_TOKEN_VERSION
from .exceptions import InvalidSyncTokenException
from .store import CalendarStore, InMemoryCalendarStore, ScopedCalendarStore
_LOGGER = logging.getLogger(__name__)
# Can be incremented to blow away existing store
VERSION = 2
MIN_SYNC_DATETIME = datetime.datetime(2006, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc)
T = TypeVar("T", bound=SyncableRequest)
S = TypeVar("S", bound=SyncableResponse)
def _items_func(
result: CalendarListResponse | _ListEventsResponseModel,
) -> dict[str, Any]:
items = {}
for item in result.items:
if not item.id:
continue
items[item.id] = json.loads(item.json())
return items
async def _run_sync(
store_data: dict[str, Any],
new_request: Callable[[str | None], T],
api_call: Callable[[T], Awaitable[S]],
items_func: Callable[[S], dict[str, Any]],
) -> dict[str, Any]:
store_data.setdefault(ITEMS, {})
# Invalid existing data in store if no longer valid
sync_token_version = store_data.get(SYNC_TOKEN_VERSION)
if sync_token_version and sync_token_version < VERSION:
_LOGGER.debug(
"Invaliding token with version %s, %s", sync_token_version, VERSION
)
store_data[SYNC_TOKEN] = None
store_data[ITEMS] = {}
# Load sync token from last execution if any
sync_token = store_data.get(SYNC_TOKEN)
request = new_request(sync_token)
while True:
try:
result = await api_call(request)
except InvalidSyncTokenException:
_LOGGER.debug("Invalidating sync token")
store_data[SYNC_TOKEN] = None
store_data[ITEMS] = {}
return await _run_sync(store_data, new_request, api_call, items_func)
store_data[ITEMS].update(items_func(result))
if not result.page_token:
if not result.sync_token:
raise InvalidSyncTokenException(
"Unexpected API response, missing sync_token"
)
store_data[SYNC_TOKEN] = result.sync_token
store_data[SYNC_TOKEN_VERSION] = VERSION
break
request.page_token = result.page_token
return store_data
class CalendarListSyncManager:
"""Manages synchronizing a calend arlist from API to local store."""
def __init__(
self, api: GoogleCalendarService, store: CalendarStore | None = None
) -> None:
"""Initialize CalendarListSyncManager."""
self._api = api
self._store = (
ScopedCalendarStore(store, CALENDAR_LIST_SYNC)
if store
else InMemoryCalendarStore()
)
@property
def store_service(self) -> CalendarListStoreService:
"""Return the local API for fetching events."""
return CalendarListStoreService(self._store)
@property
def api(self) -> GoogleCalendarService:
"""Return the cloud API."""
return self._api
async def run(self) -> None:
"""Run the event sync manager."""
def new_request(sync_token: str | None) -> CalendarListRequest:
request = CalendarListRequest()
if not sync_token:
_LOGGER.debug("Performing full calendar sync for calendar list")
else:
_LOGGER.debug(
"Performing incremental sync for calendar list (%s)",
sync_token,
)
request.sync_token = sync_token
return request
store_data = await self._store.async_load() or {}
store_data = await _run_sync(
store_data, new_request, self._api.async_list_calendars, _items_func
)
await self._store.async_save(store_data)
class CalendarEventSyncManager:
"""Manages synchronizing events from API to local store."""
def __init__(
self,
api: GoogleCalendarService,
calendar_id: str | None = None,
store: CalendarStore | None = None,
request_template: SyncEventsRequest | None = None,
) -> None:
"""Initialize CalendarEventSyncManager."""
self._api = api
if request_template is None:
if not calendar_id:
raise ValueError("Required either calendar_id or request_template")
self._request_template = SyncEventsRequest(calendar_id=calendar_id)
else:
self._request_template = request_template
self._calendar_id = self._request_template.calendar_id
self._store = (
ScopedCalendarStore(
ScopedCalendarStore(store, EVENT_SYNC), self._calendar_id
)
if store
else InMemoryCalendarStore()
)
@property
def store_service(self) -> CalendarEventStoreService:
"""Return the local API for fetching events."""
return CalendarEventStoreService(self._store, self._calendar_id, self._api)
@property
def api(self) -> GoogleCalendarService:
"""Return the cloud API."""
return self._api
async def run(self) -> None:
"""Run the event sync manager."""
def new_request(sync_token: str | None) -> ListEventsRequest:
if not sync_token:
_LOGGER.debug(
"Performing full calendar sync for calendar %s", self._calendar_id
)
return self._request_template.copy()
_LOGGER.debug(
"Performing incremental sync for calendar %s (%s)",
self._calendar_id,
sync_token,
)
return self._request_template.copy(
include={"calendar_id"}, update={"sync_token": sync_token}
)
store_data = await self._store.async_load() or {}
store_data = await _run_sync(
store_data, new_request, self._api.async_list_events_page, _items_func
)
await self._store.async_save(store_data)
|