1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
|
import logging
import threading
from datetime import datetime, timezone

import jwt

from twilio.auth_strategy.auth_type import AuthType
from twilio.auth_strategy.auth_strategy import AuthStrategy
from twilio.http.token_manager import TokenManager
class TokenAuthStrategy(AuthStrategy):
    """Auth strategy that sends a bearer access token with each request.

    The token is obtained from the supplied ``TokenManager`` and cached on the
    instance. It is fetched again whenever it is missing, empty, cannot be
    decoded as a JWT, or its ``exp`` claim lies in the past.
    """

    def __init__(self, token_manager: TokenManager):
        """Initialise the strategy with an empty token cache.

        Args:
            token_manager: Object whose ``fetch_access_token()`` returns the
                access token to use.
        """
        super().__init__(AuthType.ORGS_TOKEN)
        self.token_manager = token_manager
        self.token = None  # cached access token; None until the first fetch
        self.lock = threading.Lock()  # serialises token refreshes
        # NOTE(review): basicConfig() touches the host application's root
        # logger, which library code normally should not do. Kept so existing
        # users keep seeing the INFO message below; consider removing.
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def get_auth_string(self) -> str:
        """Return the ``Bearer <token>`` string, refreshing the token first
        if needed."""
        self.fetch_token()
        return f"Bearer {self.token}"

    def requires_authentication(self) -> bool:
        """Return ``True``: this strategy always authenticates."""
        return True

    def fetch_token(self):
        """Refresh the cached token when it is missing, empty or expired.

        Any exception raised by ``token_manager.fetch_access_token()``
        propagates to the caller and leaves the cached token unchanged.
        """
        # Cheap check without the lock so the common "token still valid"
        # path does not contend with other threads.
        if not self._needs_refresh():
            return
        with self.lock:
            # Re-check under the lock: another thread may have refreshed the
            # token while this one was waiting.
            if self._needs_refresh():
                self.token = self.token_manager.fetch_access_token()
                # Logged only after the fetch succeeded, so the message is
                # never emitted for a failed refresh.
                self.logger.info("New token fetched for accessing organization API")

    def _needs_refresh(self) -> bool:
        """Return True when there is no usable cached token."""
        return not self.token or self.is_token_expired(self.token)

    def is_token_expired(self, token):
        """Return whether ``token`` should be treated as expired.

        Args:
            token: Encoded JWT.

        Returns:
            ``True`` if the token cannot be decoded, has no ``exp`` claim, or
            its ``exp`` is earlier than the current time; ``False`` otherwise.
        """
        try:
            # The signature is deliberately not verified: the claims are only
            # read to decide whether a refresh is due.
            decoded = jwt.decode(token, options={"verify_signature": False})
            exp = decoded.get("exp")
            if exp is None:
                return True  # No expiration time present, consider it expired
            # Compare timezone-aware UTC datetimes. Mixing a naive local-time
            # fromtimestamp() with a naive utcnow() skews the result by the
            # host's UTC offset.
            expires_at = datetime.fromtimestamp(exp, tz=timezone.utc)
            return expires_at < datetime.now(timezone.utc)
        except jwt.DecodeError:
            return True  # Token is invalid
        except Exception as e:
            # Best effort: any other failure is reported and the token is
            # treated as expired so that a fresh one is fetched.
            self.logger.warning("An error occurred: %s", e)
            return True
|