1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
|
# Copyright 2012 Canonical Ltd. This software is licensed under the GNU
# General Public License version 3 (see the file LICENSE).
# Python 2 idiom that made every class in this module new-style.  It has no
# effect on Python 3, where all classes are already new-style.
__metaclass__ = type
import errno
from http.client import parse_headers
import socket
from io import BytesIO, StringIO
from urllib.parse import quote
import urllib.request, urllib.error
import base64
import pycurl
class UserError(Exception):
    """An error message that should be presented to the user."""

    # The class docstring doubles as the default message: a subclass only has
    # to provide its own docstring to get its own user-facing text.
    # NOTE(review): docstrings are stripped under ``python -OO``, in which
    # case the default message would be None -- confirm this is acceptable.
    def __init__(self, msg=None):
        message = self.__doc__ if msg is None else msg
        super().__init__(message)
class Unauthorized(UserError):
    """Invalid username or password"""

    # The docstring above is runtime text: UserError uses it as the default
    # message, so it must not be reworded casually.
    # NOTE(review): presumably the status a caller reports for this error --
    # confirm against the code that consumes it.
    status = 2
class CouldNotConnect(UserError):
    """Could not connect"""

    # The docstring above is runtime text: UserError uses it as the default
    # message, so it must not be reworded casually.
    # NOTE(review): presumably the status a caller reports for this error --
    # confirm against the code that consumes it.
    status = 3
class CertificateVerificationFailed(UserError):
    """Certificate verification failed"""

    # The docstring above is runtime text: UserError uses it as the default
    # message, so it must not be reworded casually.
    # NOTE(review): presumably the status a caller reports for this error --
    # confirm against the code that consumes it.
    status = 4
class URLLibGetter:
    """Get data from URLs using urllib."""

    @staticmethod
    def get_response_body(request, verify_ssl):
        """Return the body of the response to the supplied request.

        :param request: A urllib.request.Request (or anything else accepted
            by urllib.request.urlopen).
        :param verify_ssl: Unused
        :return: The raw response body, as bytes.
        :raises CouldNotConnect: if the connection was refused.
        :raises urllib.error.URLError: for any other URL-level failure.
        """
        try:
            # Use the response as a context manager so the underlying
            # connection is closed as soon as the body has been read.
            with urllib.request.urlopen(request) as response:
                return response.read()
        except urllib.error.URLError as e:
            # URLError stores its reason as the first positional argument;
            # only a refused socket connection is translated.
            cause = e.args[0] if e.args else None
            if (isinstance(cause, socket.error)
                    and cause.errno == errno.ECONNREFUSED):
                raise CouldNotConnect
            raise
class PycURLGetter:
    """Get data from URLs using PycURL."""

    def __init__(self, _curl=None):
        """Set up a curl handle that captures the response in memory.

        :param _curl: The pycurl.Curl object to use (for testing).  A new
            handle is created when omitted.
        """
        if _curl is None:
            _curl = pycurl.Curl()
        self.curl = _curl
        # libcurl hands the body and the raw header lines to these buffers
        # through the write callbacks registered below.
        self.result = BytesIO()
        self.response_header = BytesIO()
        self.curl.setopt(pycurl.HEADERFUNCTION, self.response_header.write)
        self.curl.setopt(pycurl.WRITEFUNCTION, self.result.write)
        self.verify_ssl = True

    def prepare_curl(self, request):
        """Prepare the curl object for the supplied request.

        Copies the URL and headers from the request and applies the
        instance's ``verify_ssl`` setting.

        :param request: a urllib.request.Request instance.
        """
        self.curl.setopt(pycurl.URL, request.get_full_url())
        request_headers = [
            '%s: %s' % item for item in request.headers.items()
        ]
        self.curl.setopt(pycurl.HTTPHEADER, request_headers)
        self.curl.setopt(pycurl.SSL_VERIFYPEER, self.verify_ssl)

    @classmethod
    def get_response_body(cls, request, verify_ssl=True, _curl=None):
        """Return the body of the response to the supplied request.

        :param request: A urllib.request.Request instance.
        :param verify_ssl: If true, verify SSL certificates.
        :param _curl: The pycurl.Curl object to use (for testing).
        :return: The response body decoded as UTF-8.
        :raises CouldNotConnect: if there is a connection error.
        :raises CertificateVerificationFailed: if the SSL certificate could
            not be verified.
        :raises HTTPError: if the response status is not 200.
        """
        instance = cls(_curl)
        instance.verify_ssl = verify_ssl
        instance.prepare_curl(request)
        return instance.handle_response()

    def handle_response(self):
        """Perform the curl operation and handle the response.

        :return: The body of the response on success, decoded as UTF-8.
        :raises CouldNotConnect: if there is a connection error.
        :raises CertificateVerificationFailed: if the SSL certificate could
            not be verified.
        :raises HTTPError: if the response status is not 200.
        """
        try:
            self.curl.perform()
        except pycurl.error as e:
            # pycurl.error carries the libcurl error code as its first arg.
            code = e.args[0]
            if code in (pycurl.E_COULDNT_CONNECT,
                        pycurl.E_COULDNT_RESOLVE_HOST):
                raise CouldNotConnect
            if code == pycurl.E_SSL_CACERT:
                raise CertificateVerificationFailed
            raise
        status = self.curl.getinfo(pycurl.HTTP_CODE)
        if status == 200:
            return self.result.getvalue().decode('utf-8')
        # Drop the status line and hand the remaining raw header bytes to
        # parse_headers, which decodes them itself.  Round-tripping through
        # UTF-8 and ASCII here would raise a Unicode error on any non-ASCII
        # header byte and hide the HTTP error we want to report.
        header_lines = self.response_header.getvalue().splitlines(True)
        headers = parse_headers(BytesIO(b''.join(header_lines[1:])))
        raise urllib.error.HTTPError(
            self.curl.getinfo(pycurl.EFFECTIVE_URL),
            code=status,
            msg=self.result.getvalue(),
            hdrs=headers,
            fp=StringIO()
        )
class GetMazaData:
    """Common machinery for retrieving data from a MAZA server.

    This class uses ``self.api_version`` and ``self.make_request()`` but
    defines neither; subclasses are expected to supply them.
    """

    @classmethod
    def run(cls, username, password, server_root=None, verify_ssl=True):
        """Build an instance and return the data it fetches.

        :param username: The username of the user.
        :param password: The user's password.
        :param server_root: The root URL to make queries to.
        :param verify_ssl: If true, verify SSL certificates.
        """
        fetcher = cls(username, password, server_root)
        return fetcher.get_data(verify_ssl)

    def __init__(self, username, password, server_root=None):
        """Remember the credentials and the server to query.

        :param username: The username of the user.
        :param password: The user's password.
        :param server_root: The root URL to make queries to; a built-in
            default is used when omitted.
        """
        if server_root is None:
            server_root = 'https://uccs.landscape.canonical.com'
        self.server_root = server_root
        self.username = username
        self.password = password
        # Stored as an attribute so the transport can be swapped out.
        self.getter = PycURLGetter

    def get_api_url(self):
        """Return the base URL for this instance's API version."""
        root, version = self.server_root, self.api_version
        return '%s/api/%s/' % (root, version)

    def get_data(self, verify_ssl=True):
        """Fetch and return the data for this version of the API.

        :param verify_ssl: If true, verify SSL certificates.
        :raises Unauthorized: if the server answers with HTTP 401.
        :raises HTTPError: for any other HTTP error raised by the getter.
        """
        try:
            request = self.make_request()
            return self.getter.get_response_body(request, verify_ssl)
        except urllib.error.HTTPError as error:
            # Only a 401 is translated into a user-facing error.
            if error.getcode() != 401:
                raise
            raise Unauthorized
class GetMazaDataAPI1(GetMazaData):
    """Get the maza data for a given email and password via API v1."""

    api_version = 1

    def make_request(self):
        """Return a request whose URL path carries username and password."""
        # NOTE(review): quote() leaves '/' unescaped by default, so a '/' in
        # either credential becomes an extra path separator -- confirm the
        # server copes with that.
        user_part = quote(self.username)
        password_part = quote(self.password)
        path = '%s/%s' % (user_part, password_part)
        url = self.get_api_url() + path
        return urllib.request.Request(url)
class GetMazaDataAPI5(GetMazaData):
    """Get the maza data for a given email and password via API v5."""

    api_version = 5

    def get_url(self):
        """Return the URL to request; for this version, the bare API URL."""
        return self.get_api_url()

    def make_request(self):
        """Return a request that carries the credentials as HTTP Basic auth.

        :return: A urllib.request.Request for ``get_url()`` with an
            ``Authorization: Basic ...`` header.
        """
        request = urllib.request.Request(self.get_url())
        credentials = '%s:%s' % (self.username, self.password)
        # b64encode emits a single line.  encodebytes appends a trailing
        # newline and wraps every 76 characters, which put raw newlines
        # into the header value.  UTF-8 keeps ASCII credentials unchanged
        # while no longer raising on non-ASCII ones.
        token = base64.b64encode(credentials.encode('utf-8')).decode('ascii')
        request.add_header('Authorization', 'Basic %s' % token)
        return request
class GetMazaDataAPI4(GetMazaDataAPI5):
    """Get the maza data for a given username/email and password via API v4."""

    # Request building is inherited unchanged from API v5; only the version
    # number in the URL differs.
    api_version = 4
class GetMazaDataAPI3(GetMazaDataAPI4):
    """Get the maza data for a given email and password via API v3."""

    api_version = 3

    def get_url(self):
        """Return the API URL with the URL-quoted username appended."""
        base_url = self.get_api_url()
        return base_url + quote(self.username)
class GetMazaDataAPI2(GetMazaDataAPI3):
    """Get the maza data for a given email and password via API v2."""

    # Everything except the version number is inherited from API v3.
    api_version = 2
# Maps an API version string to the class that implements it.
api_versions = {
    '1': GetMazaDataAPI1,
    '2': GetMazaDataAPI2,
    '3': GetMazaDataAPI3,
    '4': GetMazaDataAPI4,
    '5': GetMazaDataAPI5,
}
# 'default' selects the same implementation as version '5'.
api_versions['default'] = api_versions['5']
|