1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
|
import json
import collections
import urllib.parse
from json import JSONEncoder
import duo_client
class MockObjectJsonEncoder(json.JSONEncoder):
    """JSON encoder that serializes objects through their class's ``to_json``."""

    def default(self, obj):
        """Return a JSON-serializable stand-in for ``obj``.

        ``to_json`` is looked up on the object's class and called with the
        object. When the class defines no ``to_json``, the call is handed to
        the base ``JSONEncoder.default`` so the caller gets the ``TypeError``
        that ``json`` documents for unsupported types.
        """
        to_json = getattr(obj.__class__, "to_json", None)
        if to_json is None:
            return super(MockObjectJsonEncoder, self).default(obj)
        return to_json(obj)
# put params in a dict to avoid inconsistent ordering
def params_to_dict(param_str):
    """Parse a ``key=value&key=value`` string into a dict of value lists.

    Each key maps to the list of its percent-decoded values, in the order
    they appear in ``param_str``. Keys are stored exactly as written.

    Only the first ``=`` of each pair separates key from value, so values
    that themselves contain ``=`` are kept whole. A pair without ``=`` is
    recorded with an empty-string value. Empty segments (an empty input or
    a stray ``&``) are ignored.

    Returns a ``collections.defaultdict(list)``.
    """
    param_dict = collections.defaultdict(list)
    for param in param_str.split('&'):
        if not param:
            # nothing between separators -- no key to record
            continue
        # partition splits on the first '=' only and never fails to unpack
        key, _, val = param.partition('=')
        param_dict[key].append(urllib.parse.unquote(val))
    return param_dict
class MockHTTPConnection(object):
    """
    Mock HTTP(S) connection that returns a dummy JSON response.

    ``request`` records the request on the instance, and ``read`` echoes the
    instance's attributes back as the ``response`` member of the JSON body.
    """

    status = 200  # success!

    def __init__(
        self,
        data_response_should_be_list=False,
        data_response_from_get_authlog=False,
        data_response_from_get_dtm_events=False,
        data_response_from_get_items=False,
    ):
        """Store the flags that shape the payload produced by ``read``.

        data_response_should_be_list: wrap the response dict in a list.
        data_response_from_get_authlog: add an empty ``authlogs`` list.
        data_response_from_get_dtm_events: add two sample ``events``.
        data_response_from_get_items: add an empty ``items`` list.
        """
        # if a response object should be a list rather than
        # a dict, then set this flag to true
        self.data_response_should_be_list = data_response_should_be_list
        self.data_response_from_get_authlog = data_response_from_get_authlog
        self.data_response_from_get_dtm_events = data_response_from_get_dtm_events
        self.data_response_from_get_items = data_response_from_get_items

    def dummy(self):
        """Return the connection itself."""
        return self

    # every one of these calls simply hands back the connection
    _connect = _disconnect = close = getresponse = dummy

    def read(self):
        """Return the JSON body ``{"stat": "OK", "response": ...}`` as a string.

        The response is a snapshot of the instance attributes plus whichever
        extra keys the constructor flags ask for. When
        ``data_response_should_be_list`` is set, that dict is wrapped in a
        one-element list.
        """
        # Copy the attributes: adding the extra keys to self.__dict__ itself
        # would create instance attributes as a side effect of reading.
        payload = dict(self.__dict__)
        if self.data_response_from_get_authlog:
            payload['authlogs'] = []
        if self.data_response_from_get_items:
            payload['items'] = []
        if self.data_response_from_get_dtm_events:
            payload['events'] = [{"foo": "bar"}, {"bar": "foo"}]
        # Wrap last so the extra keys always land on the dict, never on the
        # list (string-indexing a list raises TypeError).
        if self.data_response_should_be_list:
            response = [payload]
        else:
            response = payload
        return json.dumps({"stat": "OK", "response": response},
                          cls=MockObjectJsonEncoder)

    def request(self, method, uri, body, headers):
        """Record the request; header names and values are stored as text.

        ``bytes`` header names/values are decoded as ASCII, anything else is
        stored unchanged.
        """
        self.method = method
        self.uri = uri
        self.body = body
        self.headers = {}
        for k, v in headers.items():
            if isinstance(k, bytes):
                k = k.decode('ascii')
            if isinstance(v, bytes):
                v = v.decode('ascii')
            self.headers[k] = v
class MockJsonObject(object):
    """Minimal object that exposes a ``to_json`` method."""

    def to_json(self):
        """Return a dict holding this instance's ``id()`` under ``'id'``."""
        identity = id(self)
        return {'id': identity}
class CountingClient(duo_client.client.Client):
    """Client that counts how many times ``_make_request`` is called."""

    def __init__(self, *args, **kwargs):
        """Pass every argument to the base client, then start the count at 0."""
        super().__init__(*args, **kwargs)
        self.counter = 0

    def _make_request(self, *args, **kwargs):
        """Bump ``counter`` and return the base ``_make_request`` result."""
        self.counter = self.counter + 1
        return super()._make_request(*args, **kwargs)
class MockPagingHTTPConnection(MockHTTPConnection):
    """Mock connection that serves ``objects`` one page at a time.

    ``request`` takes the page window from the ``limit`` and ``offset`` query
    parameters of the URI; ``read`` returns that slice of ``objects`` with
    paging metadata alongside it.
    """

    def __init__(self, objects=None):
        """Store ``objects`` on the instance unless it is ``None``."""
        if objects is None:
            return
        self.objects = objects

    def dummy(self):
        """Return the connection itself."""
        return self

    _connect = _disconnect = close = getresponse = dummy

    def read(self):
        """Return the current page as JSON with top-level ``metadata``.

        ``metadata`` always carries ``total_objects``; ``next_offset`` is
        present only when objects remain past this page, and ``prev_offset``
        only when the page does not start at offset 0.
        """
        total = len(self.objects)
        start = self.offset
        stop = start + self.limit

        metadata = {'total_objects': total}
        if stop < total:
            metadata['next_offset'] = stop
        if start > 0:
            metadata['prev_offset'] = max(start - self.limit, 0)

        payload = {
            "stat": "OK",
            "response": self.objects[start:stop],
            "metadata": metadata,
        }
        return json.dumps(payload, cls=MockObjectJsonEncoder)

    def request(self, method, uri, body, headers):
        """Record the request and read ``limit``/``offset`` from the query.

        ``limit`` must be present in the query string (a missing ``limit``
        raises ``KeyError``); ``offset`` falls back to 0.
        """
        self.method, self.uri, self.body, self.headers = (
            method, uri, body, headers)

        query = urllib.parse.urlparse(uri).query
        params = urllib.parse.parse_qs(query)
        self.limit = int(params['limit'][0])

        # offset is always present with list-based paging but cannot be
        # present on the initial request with cursor-based paging
        offset_values = params.get('offset', [0])
        self.offset = int(offset_values[0])
class MockAlternatePagingHTTPConnection(MockPagingHTTPConnection):
    """Paging mock that nests the page and its metadata inside ``response``."""

    def read(self):
        """Return the current page as JSON under ``response.data``.

        ``response.metadata`` always carries ``total_objects``;
        ``next_offset`` is present only when objects remain past this page,
        and ``prev_offset`` only when the page does not start at offset 0.
        """
        total = len(self.objects)
        begin = self.offset
        end = begin + self.limit

        metadata = {'total_objects': total}
        if end < total:
            metadata['next_offset'] = end
        if begin > 0:
            metadata['prev_offset'] = max(begin - self.limit, 0)

        response = {
            "data": self.objects[begin:end],
            "metadata": metadata,
        }
        return json.dumps({"stat": "OK", "response": response},
                          cls=MockObjectJsonEncoder)
class MockMultipleRequestHTTPConnection(MockHTTPConnection):
    """Mock connection whose ``status`` is taken from a sequence per request.

    Each call to ``request`` advances through ``statuses`` and counts the
    call in ``requests``.
    """

    def __init__(self, statuses):
        """Keep ``statuses`` and an iterator over them; no request made yet."""
        super().__init__()
        self.statuses = statuses
        self.status_iterator = iter(statuses)
        self.requests = 0
        self.status = None

    def read(self):
        """Return a fixed ``{"foo": "bar"}`` response as a JSON string."""
        return json.dumps(
            {"stat": "OK", "response": {'foo': 'bar'}},
            cls=MockObjectJsonEncoder,
        )

    def request(self, method, uri, body, headers):
        """Count the request, take the next status, then record the request.

        ``next`` is called without a default, so requesting more times than
        there are statuses raises ``StopIteration``.
        """
        self.requests = self.requests + 1
        self.status = next(self.status_iterator)
        super().request(method, uri, body, headers)
|