File: _auth.py

package info (click to toggle)
azure-multiapi-storage-python 1.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 17,948 kB
  • sloc: python: 189,257; sh: 60; makefile: 2
file content (130 lines) | stat: -rw-r--r-- 4,842 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# -------------------------------------------------------------------------
# Copyright (c) Microsoft.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# --------------------------------------------------------------------------
from ._common_conversion import (
    _sign_string,
)

import logging
logger = logging.getLogger(__name__)


class _StorageSharedKeyAuthentication(object):
    """Building blocks for Shared Key request signing.

    Stores the account name and key and provides helpers that each produce
    one piece of the string-to-sign (verb, selected standard headers,
    canonicalized ``x-ms-`` headers, canonicalized resource), plus a helper
    that signs the assembled string and writes the ``Authorization`` header.
    This class does not define ``sign_request`` itself.
    """

    def __init__(self, account_name, account_key):
        """Remember the account name and key used for signing."""
        self.account_name = account_name
        self.account_key = account_key

    def _get_headers(self, request, headers_to_sign):
        """Return the values of ``headers_to_sign``, one per line.

        Header names on ``request.headers`` are matched case-insensitively.
        Headers that are missing or have a falsy value contribute an empty
        line, and a ``content-length`` of ``'0'`` is treated as missing.
        The result always ends with a newline.
        """
        headers = dict(
            (name.lower(), value)
            for name, value in request.headers.items()
            if value
        )
        # A zero content length is signed as an empty slot.
        if headers.get('content-length') == '0':
            del headers['content-length']
        return '\n'.join(headers.get(x, '') for x in headers_to_sign) + '\n'

    def _get_verb(self, request):
        """Return the request method followed by a newline."""
        return request.method + '\n'

    def _get_canonicalized_resource(self, request):
        """Return ``/<account_name><path>`` with any query string removed."""
        uri_path = request.path.split('?')[0]
        return '/' + self.account_name + uri_path

    def _get_canonicalized_headers(self, request):
        """Return the sorted ``x-ms-*`` headers as ``name:value\\n`` lines.

        Header names are lower-cased before both the prefix test and the
        sort, so ``X-Ms-Date`` and ``x-ms-date`` are treated alike. Headers
        whose value is ``None`` are left out. Returns an empty string when
        no such header is present.
        """
        # Lower-case first: HTTP header names are case-insensitive, and a
        # mixed-case x-ms- header must not be silently left unsigned.
        x_ms_headers = sorted(
            (name.lower(), value)
            for name, value in request.headers.items()
            if value is not None and name.lower().startswith('x-ms-')
        )
        return ''.join(
            name + ':' + value + '\n' for name, value in x_ms_headers
        )

    def _add_authorization_header(self, request, string_to_sign):
        """Sign ``string_to_sign`` and set the ``Authorization`` header.

        The signature is produced by ``_sign_string`` from the account key;
        the header value has the form ``SharedKey <account>:<signature>``.
        """
        signature = _sign_string(self.account_key, string_to_sign)
        auth_string = 'SharedKey ' + self.account_name + ':' + signature
        request.headers['Authorization'] = auth_string


class _StorageSharedKeyAuthentication(_StorageSharedKeyAuthentication):
    """Shared Key signer that also signs every query parameter.

    This deliberately subclasses the same-named helper class defined just
    above and rebinds the name, so later references to
    ``_StorageSharedKeyAuthentication`` resolve to this class.
    """

    def sign_request(self, request):
        """Build the string-to-sign for ``request`` and attach the signature.

        The string-to-sign is the verb, eleven standard header slots, the
        canonicalized ``x-ms-`` headers, the canonicalized resource and the
        canonicalized query, in that order. The result is written to the
        ``Authorization`` header and the string-to-sign is logged at debug
        level.
        """
        string_to_sign = \
            self._get_verb(request) + \
            self._get_headers(
                request,
                [
                    'content-encoding', 'content-language', 'content-length',
                    'content-md5', 'content-type', 'date', 'if-modified-since',
                    'if-match', 'if-none-match', 'if-unmodified-since',
                    # The final slot is the standard Range header; a request
                    # that carries one must sign its value, not an empty line.
                    'range'
                ]
            ) + \
            self._get_canonicalized_headers(request) + \
            self._get_canonicalized_resource(request) + \
            self._get_canonicalized_resource_query(request)

        self._add_authorization_header(request, string_to_sign)
        logger.debug("String_to_sign=%s", string_to_sign)

    def _get_canonicalized_resource_query(self, request):
        """Return the query parameters as ``\\nname:value`` segments.

        Parameter names are lower-cased and the segments are ordered by the
        lower-cased name. Parameters with a falsy value are omitted. Returns
        an empty string when there is nothing to sign.
        """
        # Lower-case before sorting so the order follows the canonical names
        # rather than the caller's original casing.
        canonical = sorted(
            (name.lower(), value)
            for name, value in request.query.items()
            if value
        )
        return ''.join(
            '\n' + name + ':' + value for name, value in canonical
        )


class _StorageTableSharedKeyAuthentication(_StorageSharedKeyAuthentication):
    """Shared Key signer using the shorter table-style string-to-sign."""

    def sign_request(self, request):
        """Sign ``request`` and set its ``Authorization`` header.

        The string-to-sign is the verb, the ``content-md5``,
        ``content-type`` and ``x-ms-date`` header slots, the canonicalized
        resource, and the ``comp`` query parameter if present. The
        string-to-sign is logged at debug level after the header is set.
        """
        signed_header_names = ['content-md5', 'content-type', 'x-ms-date']
        pieces = [
            self._get_verb(request),
            self._get_headers(request, signed_header_names),
            self._get_canonicalized_resource(request),
            self._get_canonicalized_resource_query(request),
        ]
        string_to_sign = ''.join(pieces)

        self._add_authorization_header(request, string_to_sign)
        logger.debug("String_to_sign=%s", string_to_sign)

    def _get_canonicalized_resource_query(self, request):
        """Return ``?comp=<value>`` when a ``comp`` parameter exists, else ``''``."""
        comp_values = [
            value for name, value in request.query.items() if name == 'comp'
        ]
        if not comp_values:
            return ''
        return '?comp=' + comp_values[0]


class _StorageNoAuthentication(object):
    """Signer that leaves requests unsigned."""

    def sign_request(self, request):
        """Do nothing; ``request`` is left untouched."""
        return None


class _StorageSASAuthentication(object):
    """Signer that authenticates by appending a SAS token to the path."""

    def __init__(self, sas_token):
        """Remember the SAS token (a query string, with or without a leading ``?``)."""
        self.sas_token = sas_token

    def sign_request(self, request):
        """Append the SAS token to ``request.path`` as query parameters.

        If the path already contains ``sig=`` the request is left unchanged,
        which keeps retries from appending the token twice. A leading ``?``
        on the token is ignored so the resulting path never contains
        ``??`` or ``&?``.
        """
        # if 'sig=' is present, then the request has already been signed
        # as is the case when performing retries
        if 'sig=' in request.path:
            return

        # Tokens copied from portals/tools often start with '?'; the
        # separator is added below, so drop it here.
        token = self.sas_token.lstrip('?')
        separator = '&' if '?' in request.path else '?'
        request.path += separator + token