File: oauth.py

package info (click to toggle)
python-azure-storage 20181109%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 76,472 kB
  • sloc: python: 28,724; makefile: 204; sh: 1
file content (111 lines) | stat: -rw-r--r-- 4,526 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

from azure.storage.common import (
    TokenCredential
)
from azure.storage.blob import (
    BlockBlobService,
)
import tests.settings_real as settings
import time
import datetime
import threading
import adal


class AutoUpdatedTokenCredential(TokenCredential):
    """
    This class can be used as a TokenCredential, it periodically updates its token through a timer triggered operation.
    It shows one way of making sure the credential does not become expired.
    """
    def __init__(self):
        super(AutoUpdatedTokenCredential, self).__init__()

        # a timer is used to trigger a callback to update the token
        # the timer needs to be protected, as later on it is possible that one thread is setting a new timer and
        # another thread is trying to cancel the timer
        self.lock = threading.Lock()
        self.timer_stopped = False

        # get the initial token and schedule the timer for the very first time
        self.refresh_token()

    # support context manager
    def __enter__(self):
        return self

    # support context manager
    def __exit__(self, *args):
        self.stop_refreshing_token()

    def refresh_token(self):
        # call the get token function to get a new token, as well as the time to wait before calling it again
        token, next_interval = self.get_token_func()

        # the token is set instantaneously, and can be used by BlockBlobService right away
        self.token = token

        with self.lock:
            if self.timer_stopped is False:
                self.timer = threading.Timer(next_interval, self.refresh_token)
                self.timer.start()

    def stop_refreshing_token(self):
        """
        The timer needs to be canceled if the application is terminating, if not the timer will keep going.
        """
        with self.lock:
            self.timer_stopped = True
            self.timer.cancel()

    @staticmethod
    def get_token_func():
        """
        This function makes a call to AAD to fetch an OAuth token
        :return: the OAuth token and the interval to wait before refreshing it
        """
        print("{}: token updater was triggered".format(datetime.datetime.now()))

        # in this example, the OAuth token is obtained using the ADAL library
        # however, the user can use any preferred method
        context = adal.AuthenticationContext(
            str.format("https://login.microsoftonline.com/{}", settings.ACTIVE_DIRECTORY_TENANT_ID),
            api_version=None, validate_authority=True)

        oauth_token = context.acquire_token_with_client_credentials(
            "https://storage.azure.com",
            settings.ACTIVE_DIRECTORY_APPLICATION_ID,
            settings.ACTIVE_DIRECTORY_APPLICATION_SECRET)

        # return the token itself and the interval to wait before this function should be called again
        # generally oauth_token['expiresIn'] - 180 is a good interval to give, as it tells the caller to
        # refresh the token 3 minutes before it expires, so here we are assuming that the token expiration
        # is at least longer than 3 minutes, the user should adjust it according to their AAD policy
        return oauth_token['accessToken'], oauth_token['expiresIn'] - 180


def test_token_credential_with_timer():
    # AutoUpdatedTokenCredential behaves like a context manager
    # it cancels the timer when exiting this block so that the script could terminate
    with AutoUpdatedTokenCredential() as token_credential:
        # initialize a BlockBlobService with the token credential that was just created
        service = BlockBlobService(settings.STORAGE_ACCOUNT_NAME, token_credential=token_credential)

        # loop for 100 minutes, and periodically check whether a container exists
        # this verifies whether the token gets updated properly
        for i in range(10):
            result = service.exists("test")
            if result is None:
                print("{}: something is wrong".format(datetime.datetime.now()))
            else:
                print("{}: all is well".format(datetime.datetime.now()))

            time.sleep(600)


if __name__ == '__main__':
    test_token_credential_with_timer()