class ResultSet(object):
    """
    Lazily walks page-by-page through a paginated set of results.

    Exposes a plain iterator interface, so consuming code can simply loop
    over the instance (``for``, list comprehension, etc.); whenever the
    current page is exhausted, the next page is fetched transparently.

    Used by the ``Table.query`` & ``Table.scan`` methods.

    Example::

        >>> users = Table('users')
        >>> results = ResultSet()
        >>> results.to_call(users.query, username__gte='johndoe')
        # Now iterate. When it runs out of results, it'll fetch the next page.
        >>> for res in results:
        ...     print(res['username'])

    """
    def __init__(self, max_page_size=None):
        super(ResultSet, self).__init__()
        # What to invoke to fetch a page, plus the arguments to invoke it with.
        self.the_callable = None
        self.call_args = []
        self.call_kwargs = {}
        # The current page of results & our cursor position inside it.
        self._results = []
        self._offset = -1
        # Whether the backend may still have more pages for us.
        self._results_left = True
        # Pagination cursor returned by the previous fetch (if any).
        self._last_key_seen = None
        # How many fetches we've performed (useful for debugging/tests).
        self._fetches = 0
        # Cap on the per-page size passed to the low-level call.
        self._max_page_size = max_page_size
        # Overall cap on how many results we hand back to the user.
        self._limit = None

    @property
    def first_key(self):
        # Keyword argument name used to resume pagination on the callable.
        return 'exclusive_start_key'

    def _reset(self):
        """
        Resets the internal state of the ``ResultSet``.

        This prevents results from being cached long-term & consuming
        excess memory.

        Largely internal.
        """
        self._results = []
        self._offset = 0

    def __iter__(self):
        return self

    def __next__(self):
        self._offset += 1

        if self._offset >= len(self._results):
            # The current page is spent. If the backend is done too, so are we.
            if not self._results_left:
                raise StopIteration()

            self.fetch_more()

            # A fetch may legitimately come back empty while more pages still
            # remain, so keep fetching until we either get data or learn that
            # nothing is left.
            while self._results_left and not len(self._results):
                self.fetch_more()

        if self._offset >= len(self._results):
            raise StopIteration()

        # Enforce the user-facing ``limit``, if one was supplied.
        if self._limit is not None:
            self._limit -= 1

            if self._limit < 0:
                raise StopIteration()

        return self._results[self._offset]

    next = __next__

    def to_call(self, the_callable, *args, **kwargs):
        """
        Sets up the callable & any arguments to run it with.

        This is stored for subsequent calls so that those queries can be
        run without requiring user intervention.

        Example::

            # Just an example callable.
            >>> def squares_to(y):
            ...     for x in range(1, y):
            ...         yield x**2
            >>> rs = ResultSet()
            # Set up what to call & arguments.
            >>> rs.to_call(squares_to, y=3)

        """
        if not callable(the_callable):
            raise ValueError(
                'You must supply an object or function to be called.'
            )

        # ``limit`` here is the overall number of results the user wants, NOT
        # the per-page ``limit`` the low-level DDB calls understand — pop it
        # so it isn't forwarded verbatim. Negative values mean "no limit".
        limit = kwargs.pop('limit', None)

        if limit is not None and limit < 0:
            limit = None

        self._limit = limit
        self.the_callable = the_callable
        self.call_args = args
        self.call_kwargs = kwargs

    def fetch_more(self):
        """
        When the iterator runs out of results, this method is run to re-execute
        the callable (& arguments) to fetch the next page.

        Largely internal.
        """
        self._reset()

        args = self.call_args[:]
        kwargs = self.call_kwargs.copy()

        # Resume where the previous page left off.
        if self._last_key_seen is not None:
            kwargs[self.first_key] = self._last_key_seen

        # Never request a page larger than the number of results still wanted.
        if self._limit and self._max_page_size and self._max_page_size > self._limit:
            self._max_page_size = self._limit

        # Prefer the explicit page size; otherwise fall back to the overall
        # limit as the page size, if one is set.
        page_size = self._max_page_size if self._max_page_size is not None else self._limit

        if page_size is not None:
            kwargs['limit'] = page_size

        results = self.the_callable(*args, **kwargs)
        self._fetches += 1
        new_results = results.get('results', [])
        self._last_key_seen = results.get('last_key', None)

        if len(new_results):
            self._results.extend(new_results)

            # If this page alone satisfies whatever limit remains, we know
            # there's nothing further to fetch.
            if self._limit is not None and self._limit >= 0:
                if self._limit - len(new_results) <= 0:
                    self._results_left = False

        # No pagination cursor means the backend has no further pages.
        if self._last_key_seen is None:
            self._results_left = False
class BatchGetResultSet(ResultSet):
    """
    A ``ResultSet`` specialized for batch-get calls: pages are driven by a
    pre-supplied list of keys rather than a pagination cursor, and any keys
    the backend reports as unprocessed are queued to be retried first.
    """
    def __init__(self, *args, **kwargs):
        # Keys we have yet to request, consumed ``_max_batch_get`` at a time.
        self._keys_left = kwargs.pop('keys', [])
        self._max_batch_get = kwargs.pop('max_batch_get', 100)
        super(BatchGetResultSet, self).__init__(*args, **kwargs)

    def fetch_more(self):
        self._reset()

        args = self.call_args[:]
        kwargs = self.call_kwargs.copy()

        # Carve off the next batch of keys (at most ``_max_batch_get``).
        batch, self._keys_left = (
            self._keys_left[:self._max_batch_get],
            self._keys_left[self._max_batch_get:],
        )
        kwargs['keys'] = batch

        if not self._keys_left:
            self._results_left = False

        results = self.the_callable(*args, **kwargs)
        new_results = results.get('results', [])

        if not len(new_results):
            return

        self._results.extend(new_results)

        # Unprocessed keys get reinserted at the front (order preserved) so
        # they're retried on the very next fetch. DynamoDB only returns valid
        # keys, so there should be no risk of missing keys ever making it here.
        for position, key_data in enumerate(results.get('unprocessed_keys', [])):
            self._keys_left.insert(position, key_data)

        if self._keys_left:
            self._results_left = True

        # Shrink the forwarded per-call limit by what we just received.
        if self.call_kwargs.get('limit'):
            self.call_kwargs['limit'] -= len(new_results)