1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
|
from __future__ import annotations
import json
from typing import TYPE_CHECKING, Any, TypeVar, cast
from requests.status_codes import codes
from todoist_api_python._core.http_headers import create_headers
if TYPE_CHECKING:
from requests import Session
# Timeouts for requests.
#
# 10 seconds for connecting is a recurring default and adheres to python-requests's
# recommendation of picking a value slightly larger than a multiple of 3.
#
# 60 seconds for reading aligns with Todoist's own internal timeout. All requests are
# forcefully terminated after this time, so there is no point waiting any longer.
#
# The tuple is ordered (connect, read) and is passed as the ``timeout`` argument of
# every request issued by the helpers in this module.
TIMEOUT = (10, 60)
# Placeholder for the caller-expected return type of ``get`` and ``post``; the
# helpers only ``cast`` to it, no runtime validation of the payload is performed.
T = TypeVar("T")
def get(
    session: Session,
    url: str,
    token: str | None = None,
    request_id: str | None = None,
    params: dict[str, Any] | None = None,
) -> T:  # type: ignore[type-var]
    """Send a GET request and return its result.

    Args:
        session: Session object used to perform the request.
        url: Target URL.
        token: Optional token forwarded to ``create_headers``.
        request_id: Optional request identifier forwarded to ``create_headers``.
        params: Optional query parameters passed through to the session.

    Returns:
        The decoded JSON body when the status code equals ``codes.OK``.
        For any other status that does not make ``raise_for_status()`` raise,
        the response's ``ok`` flag is returned instead.

    Raises:
        Whatever ``response.raise_for_status()`` raises for a failed request
        (``requests.HTTPError`` per the requests documentation).
    """
    request_headers = create_headers(token=token, request_id=request_id)
    reply = session.get(
        url,
        params=params,
        headers=request_headers,
        timeout=TIMEOUT,
    )

    # Anything other than a plain 200 carries no JSON we want to decode:
    # surface errors first, then fall back to the boolean success flag.
    if reply.status_code != codes.OK:
        reply.raise_for_status()
        return cast("T", reply.ok)

    return cast("T", reply.json())
def post(
    session: Session,
    url: str,
    token: str | None = None,
    request_id: str | None = None,
    *,
    params: dict[str, Any] | None = None,
    data: dict[str, Any] | None = None,
) -> T:  # type: ignore[type-var]
    """Send a POST request, optionally with a JSON-encoded body.

    Args:
        session: Session object used to perform the request.
        url: Target URL.
        token: Optional token forwarded to ``create_headers``.
        request_id: Optional request identifier forwarded to ``create_headers``.
        params: Optional query parameters passed through to the session.
        data: Optional payload. It is serialized with ``json.dumps`` only when
            truthy; ``None`` and an empty dict both result in no request body.

    Returns:
        The decoded JSON body when the status code equals ``codes.OK``.
        For any other status that does not make ``raise_for_status()`` raise,
        the response's ``ok`` flag is returned instead.

    Raises:
        Whatever ``response.raise_for_status()`` raises for a failed request
        (``requests.HTTPError`` per the requests documentation).
    """
    # The same truthiness test drives both the ``with_content`` header flag
    # and whether a body is attached, so the two can never disagree.
    has_body = bool(data)
    request_headers = create_headers(
        token=token,
        with_content=has_body,
        request_id=request_id,
    )

    if has_body:
        body = json.dumps(data)
    else:
        body = None

    reply = session.post(
        url,
        headers=request_headers,
        data=body,
        params=params,
        timeout=TIMEOUT,
    )

    # Anything other than a plain 200 carries no JSON we want to decode:
    # surface errors first, then fall back to the boolean success flag.
    if reply.status_code != codes.OK:
        reply.raise_for_status()
        return cast("T", reply.ok)

    return cast("T", reply.json())
def delete(
    session: Session,
    url: str,
    token: str | None = None,
    request_id: str | None = None,
    params: dict[str, Any] | None = None,
) -> bool:
    """Send a DELETE request and report whether it succeeded.

    Args:
        session: Session object used to perform the request.
        url: Target URL.
        token: Optional token forwarded to ``create_headers``.
        request_id: Optional request identifier forwarded to ``create_headers``.
        params: Optional query parameters passed through to the session.

    Returns:
        The response's ``ok`` flag, reached only if ``raise_for_status()``
        did not raise.

    Raises:
        Whatever ``response.raise_for_status()`` raises for a failed request
        (``requests.HTTPError`` per the requests documentation).
    """
    request_headers = create_headers(token=token, request_id=request_id)
    reply = session.delete(
        url,
        params=params,
        headers=request_headers,
        timeout=TIMEOUT,
    )
    # Unlike get/post, the body is never decoded; only the outcome matters.
    reply.raise_for_status()
    return reply.ok
|