1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
|
# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
import urllib
import xml.sax
import threading
import boto
from boto import handler
from boto.connection import AWSQueryConnection
from boto.sdb.domain import Domain
from boto.sdb.item import Item
from boto.exception import SDBResponseError
from boto.resultset import ResultSet
class ItemThread(threading.Thread):
    """
    Worker thread that fetches the attributes of a batch of items.

    Each thread opens its own SDBConnection (with default arguments) and
    collects the objects returned by get_attributes in self.items, in the
    same order as the item names it was given.
    """

    def __init__(self, name, domain_name, item_names):
        """
        Parameters:
        name - the thread name, also used in the start-up message
        domain_name - passed to get_attributes for every item
        item_names - the sequence of item names this thread will fetch
        """
        threading.Thread.__init__(self, name=name)
        # Single parenthesised argument: prints the same text as the
        # original print statement.
        print('starting %s with %d items' % (name, len(item_names)))
        self.items = []
        self.item_names = item_names
        self.domain_name = domain_name
        # A separate connection per thread rather than one shared instance.
        self.conn = SDBConnection()

    def run(self):
        """
        Fetch every item in self.item_names, appending each result to
        self.items as soon as it arrives.
        """
        fetch = self.conn.get_attributes
        domain_name = self.domain_name
        for name in self.item_names:
            self.items.append(fetch(domain_name, name))
class SDBConnection(AWSQueryConnection):
    """
    Query-API connection to Amazon SimpleDB.

    Builds the request parameters for the SimpleDB actions used below
    (ListDomains, CreateDomain, DeleteDomain, PutAttributes, GetAttributes,
    DeleteAttributes, Query) and hands them to the request helpers
    inherited from AWSQueryConnection.
    """

    APIVersion = '2007-11-07'
    SignatureVersion = '1'
    ResponseError = SDBResponseError

    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
                 is_secure=False, port=None, proxy=None, proxy_port=None,
                 proxy_user=None, proxy_pass=None, host='sdb.amazonaws.com',
                 debug=0, https_connection_factory=None):
        """
        Forward all connection settings to AWSQueryConnection and start
        the box-usage counter at zero.
        """
        AWSQueryConnection.__init__(self, aws_access_key_id,
                                    aws_secret_access_key, is_secure, port,
                                    proxy, proxy_port, proxy_user, proxy_pass,
                                    host, debug, https_connection_factory)
        # Running total reported by get_usage()/print_usage(). Nothing in
        # this class adds to it; presumably the response handlers do --
        # TODO confirm.
        self.box_usage = 0.0

    def build_name_value_list(self, params, attributes, replace=False):
        """
        Add numbered Attribute.N.Name / Attribute.N.Value entries to params.

        Parameters:
        params - dict of request parameters; updated in place
        attributes - dict mapping an attribute name to a single value
                     or to a list of values
        replace - when true, every name/value pair also gets an
                  Attribute.N.Replace entry set to 'true'

        Attribute names are emitted in sorted order and N starts at 1.
        A list value produces one numbered pair per element.
        """
        i = 1
        for key in sorted(attributes.keys()):
            value = attributes[key]
            if not isinstance(value, list):
                value = [value]
            for v in value:
                params['Attribute.%d.Name' % i] = key
                params['Attribute.%d.Value' % i] = v
                # The flag must accompany every pair, including each element
                # of a multi-valued attribute; otherwise list values are
                # added to the existing values instead of replacing them.
                if replace:
                    params['Attribute.%d.Replace' % i] = 'true'
                i += 1

    def build_name_list(self, params, attribute_names):
        """
        Add numbered Attribute.N.Name entries to params, in sorted order.

        Parameters:
        params - dict of request parameters; updated in place
        attribute_names - list of attribute names; not modified
        """
        i = 1
        # sorted() works on a copy so the caller's list keeps its order.
        for name in sorted(attribute_names):
            params['Attribute.%d.Name' % i] = name
            i += 1

    def get_usage(self):
        """
        Returns the current value of the box_usage counter.
        """
        return self.box_usage

    def print_usage(self):
        """
        Print the box_usage counter and an approximate cost for it.
        """
        print('Total Usage: %f compute seconds' % self.box_usage)
        # NOTE(review): 0.14 is a hard-coded price per unit of box usage;
        # confirm the unit (the message says seconds) against current
        # SimpleDB pricing.
        cost = self.box_usage * 0.14
        print('Approximate Cost: $%f' % cost)

    def get_domain(self, domain_name, validate=True):
        """
        Returns a Domain object for a given domain_name.
        If the validate parameter is True, the domain_name is validated
        by performing a query (returning a max of 1 item) against the domain.
        """
        domain = Domain(self, domain_name)
        if validate:
            self.query(domain, '', max_items=1)
        return domain

    def lookup(self, domain_name):
        """
        Returns the validated Domain object for domain_name, or None if
        get_domain raises an error.
        """
        try:
            domain = self.get_domain(domain_name)
        except Exception:
            # Best-effort lookup: any request failure means "not found".
            # Exception (rather than a bare except) lets KeyboardInterrupt
            # and SystemExit propagate.
            domain = None
        return domain

    def get_all_domains(self, max_domains=None, next_token=None):
        """
        Issue a ListDomains request and return its result list.

        Parameters:
        max_domains - if given, sent as MaxNumberOfDomains
        next_token - if given, sent as NextToken
        """
        params = {}
        if max_domains:
            params['MaxNumberOfDomains'] = max_domains
        if next_token:
            params['NextToken'] = next_token
        return self.get_list('ListDomains', params, [('DomainName', Domain)])

    def create_domain(self, domain_name):
        """
        Issue a CreateDomain request and return the resulting Domain
        object with its name set to domain_name.
        """
        params = {'DomainName': domain_name}
        d = self.get_object('CreateDomain', params, Domain)
        d.name = domain_name
        return d

    def get_domain_and_name(self, domain_or_name):
        """
        Returns a (Domain, name) tuple for either a Domain object or a
        domain name.

        A Domain object is returned as-is together with its name attribute.
        A name is turned into a Domain via get_domain, which validates it
        with a query against the service.
        """
        if isinstance(domain_or_name, Domain):
            return (domain_or_name, domain_or_name.name)
        else:
            return (self.get_domain(domain_or_name), domain_or_name)

    def delete_domain(self, domain_or_name):
        """
        Issue a DeleteDomain request for the given Domain object or
        domain name and return the result of get_status.
        """
        domain, domain_name = self.get_domain_and_name(domain_or_name)
        params = {'DomainName': domain_name}
        return self.get_status('DeleteDomain', params)

    def put_attributes(self, domain_or_name, item_name, attributes, replace=True):
        """
        Store attributes for a given item in a domain.
        Parameters:
        domain_or_name - either a domain object or the name of a domain in SimpleDB
        item_name - the name of the SDB item the attributes will be
                    associated with
        attributes - a dict containing the name/value pairs to store
                     as attributes; a value may be a list of values
        replace - a boolean value that determines whether the attribute
                  values passed in will replace any existing values or will
                  be added as additional values. Defaults to True.
        Returns:
        Boolean True or raises an exception
        """
        domain, domain_name = self.get_domain_and_name(domain_or_name)
        params = {'DomainName': domain_name,
                  'ItemName': item_name}
        self.build_name_value_list(params, attributes, replace)
        return self.get_status('PutAttributes', params)

    def get_attributes(self, domain_or_name, item_name, attribute_name=None, item=None):
        """
        Retrieve attributes for a given item in a domain.
        Parameters:
        domain_or_name - either a domain object or the name of a domain in SimpleDB
        item_name - the name of the SDB item whose attributes are requested
        attribute_name - if given, sent as the AttributeName parameter
        item - an existing object to parse the response into; when omitted
               a new Item(domain, item_name) is created
        Returns:
        The item the response was parsed into. Raises SDBResponseError
        if the response status is not 200.
        """
        domain, domain_name = self.get_domain_and_name(domain_or_name)
        params = {'DomainName': domain_name,
                  'ItemName': item_name}
        if attribute_name:
            params['AttributeName'] = attribute_name
        response = self.make_request('GetAttributes', params)
        body = response.read()
        if response.status == 200:
            if item is None:
                item = Item(domain, item_name)
            h = handler.XmlHandler(item, self)
            xml.sax.parseString(body, h)
            return item
        else:
            raise SDBResponseError(response.status, response.reason, body)

    def delete_attributes(self, domain_or_name, item_name, attr_names=None):
        """
        Delete attributes from a given item in a domain.
        Parameters:
        domain_or_name - either a domain object or the name of a domain in SimpleDB
        item_name - the name of the SDB item the attributes will be
                    removed from
        attr_names - either a list containing attribute names which will cause
                     all values associated with that attribute name to be deleted or
                     a dict containing the attribute names as keys and the list of
                     values to delete as the value. When omitted, no attribute
                     parameters are sent with the request.
        Returns:
        Boolean True or raises an exception
        """
        domain, domain_name = self.get_domain_and_name(domain_or_name)
        params = {'DomainName': domain_name,
                  'ItemName': item_name}
        if attr_names:
            if isinstance(attr_names, list):
                self.build_name_list(params, attr_names)
            elif isinstance(attr_names, dict):
                self.build_name_value_list(params, attr_names)
        return self.get_status('DeleteAttributes', params)

    def query(self, domain_or_name, query='', max_items=None, next_token=None):
        """
        Returns a list of item names within domain_name that match the query.
        Parameters:
        domain_or_name - either a domain object or the name of a domain in SimpleDB
        query - the query expression, sent as QueryExpression
        max_items - if given, sent as MaxNumberOfItems
        next_token - if given, sent as NextToken
        """
        domain, domain_name = self.get_domain_and_name(domain_or_name)
        params = {'DomainName': domain_name,
                  'QueryExpression': query}
        if max_items:
            params['MaxNumberOfItems'] = max_items
        if next_token:
            params['NextToken'] = next_token
        return self.get_object('Query', params, ResultSet)

    def threaded_query(self, domain_or_name, query='', max_items=None, next_token=None, num_threads=6):
        """
        Returns a list of fully populated items that match the query provided.
        The name/value pairs for all of the matching item names are retrieved in a number of separate
        threads (specified by num_threads) to achieve maximum throughput.
        The ResultSet that is returned has an attribute called next_token that can be used
        to retrieve additional results for the same query.

        No more threads are started than there are matching items, and the
        items are divided as evenly as possible between the threads. The
        returned items keep the order of the query results.
        """
        domain, domain_name = self.get_domain_and_name(domain_or_name)
        if max_items and num_threads > max_items:
            num_threads = max_items
        # Hand query() the resolved Domain so a domain given by name is not
        # validated a second time.
        rs = self.query(domain, query, max_items, next_token)
        item_names = list(rs)
        # The same ResultSet object is reused for the populated items so that
        # whatever else it carries is returned to the caller.
        del rs[:]
        if not item_names:
            return rs
        # Never start more workers than there are items (each worker opens
        # its own connection), and always start at least one.
        num_threads = max(1, min(num_threads, len(item_names)))
        per_thread, extra = divmod(len(item_names), num_threads)
        threads = []
        start = 0
        for i in range(num_threads):
            end = start + per_thread
            if i < extra:
                # The first `extra` workers take one additional item each.
                end += 1
            thread = ItemThread('Thread-%d' % i, domain_name, item_names[start:end])
            threads.append(thread)
            thread.start()
            start = end
        # Joining in creation order keeps the results in query order.
        for thread in threads:
            thread.join()
            rs.extend(thread.items)
        return rs
|