1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
|
from __future__ import annotations
from typing import Any, Literal
from urllib.parse import urlencode
import requests
from requests import Session
from todoist_api_python._core.endpoints import (
ACCESS_TOKEN_PATH,
ACCESS_TOKENS_PATH,
AUTHORIZE_PATH,
get_api_url,
get_oauth_url,
)
from todoist_api_python._core.http_requests import delete, post
from todoist_api_python._core.utils import run_async
from todoist_api_python.models import AuthResult
"""
Possible permission scopes:
- `data:read`: Read-only access
- `data:read_write`: Read and write access
- `data:delete`: Full access including delete
- `task:add`: Can create new tasks
- `project:delete`: Can delete projects
- `backups:read`: Can access user backups without MFA
"""
# Closed set of permission scope strings; `get_authentication_url` takes a
# list of these and joins them with commas into the `scope` query parameter.
Scope = Literal[
    "task:add",
    "data:read",
    "data:read_write",
    "data:delete",
    "project:delete",
    "backups:read",
]
def get_authentication_url(client_id: str, scopes: list[Scope], state: str) -> str:
    """Get authorization URL to initiate OAuth flow.

    Args:
        client_id: Client ID sent as the `client_id` query parameter.
        scopes: Permission scopes to request; they are joined with commas
            into the `scope` query parameter. Must not be empty.
        state: Value sent as the `state` query parameter.

    Returns:
        The OAuth authorize endpoint followed by the URL-encoded query string.

    Raises:
        ValueError: If `scopes` is empty (or `None`).
    """
    # Truthiness check rejects both an empty collection and None with the
    # same, explicit error instead of an incidental TypeError from len().
    if not scopes:
        raise ValueError("At least one authorization scope should be requested.")

    endpoint = get_oauth_url(AUTHORIZE_PATH)
    query = {
        "client_id": client_id,
        "scope": ",".join(scopes),
        "state": state,
    }
    return f"{endpoint}?{urlencode(query)}"
def get_auth_token(
    client_id: str, client_secret: str, code: str, session: Session | None = None
) -> AuthResult:
    """Get access token using provided client ID, client secret, and auth code.

    Args:
        client_id: Client ID sent in the request data.
        client_secret: Client secret sent in the request data.
        code: Authorization code sent in the request data.
        session: Optional `requests` session used for the request. When
            omitted, a temporary session is created and closed once the
            request has completed; a caller-supplied session is left open.

    Returns:
        An `AuthResult` built from the response dictionary.
    """
    endpoint = get_oauth_url(ACCESS_TOKEN_PATH)
    data = {
        "client_id": client_id,
        "client_secret": client_secret,
        "code": code,
    }

    response: dict[str, Any]
    if session is not None:
        # The caller owns this session, so its lifetime is not ours to manage.
        response = post(session=session, url=endpoint, data=data)
    else:
        # We own this session: close it so its connection pool is released.
        with requests.Session() as owned_session:
            response = post(session=owned_session, url=endpoint, data=data)

    return AuthResult.from_dict(response)
async def get_auth_token_async(
    client_id: str, client_secret: str, code: str
) -> AuthResult:
    """Awaitable variant of `get_auth_token`.

    Wraps the synchronous call in a zero-argument callable and hands it to
    `run_async`, awaiting the result. No session is passed, so the
    synchronous function falls back to its own default.

    Args:
        client_id: Client ID forwarded to `get_auth_token`.
        client_secret: Client secret forwarded to `get_auth_token`.
        code: Authorization code forwarded to `get_auth_token`.

    Returns:
        The `AuthResult` produced by `get_auth_token`.
    """

    def _exchange_code() -> AuthResult:
        return get_auth_token(client_id, client_secret, code)

    auth_result = await run_async(_exchange_code)
    return auth_result
def revoke_auth_token(
    client_id: str, client_secret: str, token: str, session: Session | None = None
) -> bool:
    """Revoke an access token.

    Args:
        client_id: Client ID sent as the `client_id` request parameter.
        client_secret: Client secret sent as the `client_secret` request
            parameter.
        token: Access token to revoke, sent as the `access_token` request
            parameter.
        session: Optional `requests` session used for the request. When
            omitted, a temporary session is created and closed once the
            request has completed; a caller-supplied session is left open.

    Returns:
        The result of the `delete` request helper.
    """
    # `get_api_url` is not a typo. Deleting access tokens is done using the regular API.
    endpoint = get_api_url(ACCESS_TOKENS_PATH)
    params = {
        "client_id": client_id,
        "client_secret": client_secret,
        "access_token": token,
    }

    if session is not None:
        # The caller owns this session, so its lifetime is not ours to manage.
        return delete(session=session, url=endpoint, params=params)

    # We own this session: close it so its connection pool is released.
    with requests.Session() as owned_session:
        return delete(session=owned_session, url=endpoint, params=params)
async def revoke_auth_token_async(
    client_id: str, client_secret: str, token: str
) -> bool:
    """Awaitable variant of `revoke_auth_token`.

    Wraps the synchronous call in a zero-argument callable and hands it to
    `run_async`, awaiting the result. No session is passed, so the
    synchronous function falls back to its own default.

    Args:
        client_id: Client ID forwarded to `revoke_auth_token`.
        client_secret: Client secret forwarded to `revoke_auth_token`.
        token: Access token forwarded to `revoke_auth_token`.

    Returns:
        The value returned by `revoke_auth_token`.
    """

    def _revoke() -> bool:
        return revoke_auth_token(client_id, client_secret, token)

    revoked = await run_async(_revoke)
    return revoked
|