File: token_auth_strategy.py

package info (click to toggle)
python-twilio 9.4.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 13,756 kB
  • sloc: python: 8,281; makefile: 65
file content (53 lines) | stat: -rw-r--r-- 1,809 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import jwt
import threading
import logging
from datetime import datetime

from twilio.auth_strategy.auth_type import AuthType
from twilio.auth_strategy.auth_strategy import AuthStrategy
from twilio.http.token_manager import TokenManager


class TokenAuthStrategy(AuthStrategy):
    def __init__(self, token_manager: TokenManager):
        super().__init__(AuthType.ORGS_TOKEN)
        self.token_manager = token_manager
        self.token = None
        self.lock = threading.Lock()
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def get_auth_string(self) -> str:
        self.fetch_token()
        return f"Bearer {self.token}"

    def requires_authentication(self) -> bool:
        return True

    def fetch_token(self):
        if self.token is None or self.token == "" or self.is_token_expired(self.token):
            with self.lock:
                if (
                    self.token is None
                    or self.token == ""
                    or self.is_token_expired(self.token)
                ):
                    self.logger.info("New token fetched for accessing organization API")
                    self.token = self.token_manager.fetch_access_token()

    def is_token_expired(self, token):
        try:
            decoded = jwt.decode(token, options={"verify_signature": False})
            exp = decoded.get("exp")

            if exp is None:
                return True  # No expiration time present, consider it expired

            # Check if the expiration time has passed
            return datetime.fromtimestamp(exp) < datetime.utcnow()

        except jwt.DecodeError:
            return True  # Token is invalid
        except Exception as e:
            print(f"An error occurred: {e}")
            return True