1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315
|
# -*- coding: utf-8 -*-
# Copyright (c) 2009, 2010, 2011 Jack Kaliko <efrim@azylum.org>
# Copyright (c) 2010 Eric Casteleijn <thisfred@gmail.com> (Throttle decorator)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#
"""
Consume the last.fm web service.

DOC:
    file:///usr/share/doc/python2.6/html/howto/urllib2.html
    or
    http://docs.python.org/howto/urllib2.html

TODO: Replace SimaFM.cache references with self.__class__.cache, see
    http://diveintopython.adrahon.org/object_oriented_framework/class_attributes.html
"""
# Module metadata. The ``$...$`` values look like VCS keyword expansions
# (NOTE(review): presumably Subversion -- confirm before editing by hand).
__version__ = u'0.2.4'
# NOTE: the name below is misspelled ("revison"); it is left as-is because
# renaming it would change the attributes this module exposes.
__revison__ = u'$Revision: 598 $'
__author__ = u'$Author: kaliko $'
__date__ = u'$Date: 2012-01-07 13:13:14 +0100 (sam. 07 janv. 2012) $'
import urllib2
from datetime import datetime, timedelta
from httplib import BadStatusLine
from socket import timeout as SocketTimeOut
from time import sleep
from xml.etree.cElementTree import ElementTree
# Some definitions

# Minimum delay between two consecutive HTTP requests to last.fm
# (0 days, 0.4 seconds); handed to the Throttle decorator.
WAIT_BETWEEN_REQUESTS = timedelta(0, 0.4)

# last.fm error codes mapped to a human readable description.  Keys are
# strings, matching how the ``code`` attribute is read from the XML answer.
LFM_ERRORS = {
    '2': u'Invalid service -This service does not exist',
    '3': u'Invalid Method - No method with that name in this package',
    '4': u'Authentication Failed - You do not have permissions to access the service',
    # A stray leading apostrophe used to be part of this message.
    '5': u"Invalid format - This service doesn't exist in that format",
    '6': u'Invalid parameters - Your request is missing a required parameter',
    '7': u'Invalid resource specified',
    '9': u'Invalid session key - Please re-authenticate',
    '10': u'Invalid API key - You must be granted a valid key by last.fm',
    '11': u'Service Offline - This service is temporarily offline. Try again later.',
    '12': u'Subscription Error - The user needs to be subscribed in order to do that',
    '13': u'Invalid method signature supplied',
    '26': u'Suspended API key - Access for your account has been suspended, please contact Last.fm',
}
class XmlFMError(Exception):  # Errors
    """Root of the exception hierarchy raised by this module.

    The payload is kept in ``expression``; ``str()`` of the exception is the
    ``repr()`` of that payload.
    """

    def __init__(self, expression):
        # Initialise the Exception base too, so ``args`` carries the payload
        # instead of staying empty.
        Exception.__init__(self, expression)
        self.expression = expression

    def __str__(self):
        return repr(self.expression)


class EncodingError(XmlFMError):
    """Raised when string is not unicode"""


class XmlFMHTTPError(XmlFMError):
    """Raised when failed to connect server.

    ``expression`` may be any object.  When it exposes a ``code`` attribute
    (it is then expected to expose ``msg`` as well) the stored message is
    ``'error <code>: <msg>'``; otherwise it is ``'error: <expression>'``.
    """

    def __init__(self, expression):
        if hasattr(expression, 'code'):
            message = 'error %d: %s' % (expression.code, expression.msg)
        else:
            message = 'error: %s' % expression
        XmlFMError.__init__(self, message)


class XmlFMNotFound(XmlFMError):
    """Raised when no artist is found.

    A falsy ``message`` is replaced by a default text.
    """

    def __init__(self, message=None):
        XmlFMError.__init__(
            self, message or 'Artist probably not found (http error 400)')


class XmlFMMissingArtist(XmlFMError):
    """Raised when no artist name provided.

    A falsy ``message`` is replaced by a default text.
    """

    def __init__(self, message=None):
        XmlFMError.__init__(self, message or 'Missing artist name.')


class XmlFMTimeOut(XmlFMError):
    """Raised when urllib2.urlopen times out.

    A falsy ``message`` is replaced by a default text.
    """

    def __init__(self, message=None):
        XmlFMError.__init__(
            self, message or 'Connection to last.fm web services times out!')
class Throttle(object):
    """Decorator enforcing a minimum delay between calls.

    ``wait`` is a ``timedelta``.  A call through the decorated function
    blocks, polling every 0.1 second, until at least ``wait`` has elapsed
    since the previous call finished.  One ``Throttle`` instance holds a
    single ``last_called`` timestamp, so every call made through the
    wrapper it produced shares the same delay.
    """

    def __init__(self, wait):
        self.wait = wait
        # Starts at creation time, so a first call made sooner than
        # ``wait`` after the decorator is built is delayed as well.
        self.last_called = datetime.now()

    def __call__(self, func):
        """Return ``func`` wrapped with the throttling delay."""
        # Local import: keeps the decorated function's name and docstring.
        from functools import wraps

        @wraps(func)
        def wrapper(*args, **kwargs):
            while self.last_called + self.wait > datetime.now():
                sleep(0.1)
            try:
                return func(*args, **kwargs)
            finally:
                # Refresh the timestamp even when ``func`` raises: a failed
                # call still counts against the rate limit.
                self.last_called = datetime.now()
        return wrapper
class AudioScrobblerCache(object):
    """One cache entry: a fetched element tree plus its request date."""

    def __init__(self, elem, last):
        # ``elem`` is the stored payload, ``last`` the date it was requested.
        self.elemtree, self.requestdate = elem, last

    def gettree(self):
        """Return the element tree stored in this entry."""
        return self.elemtree

    def created(self):
        """Return the request date recorded for this entry."""
        return self.requestdate
class SimaFM(object):
    """Small client for the last.fm (audioscrobbler) XML web service.

    It offers three artist lookups -- similar artists, top tracks and the
    MusicBrainz id -- and keeps parsed answers in a cache stored on the
    class, hence shared by every instance.
    """
    api_key = u'4a1c9ddec29816ed803d7be9113ba4cb'
    host = u'ws.audioscrobbler.com'
    version = u'2.0'
    root_url = u'http://%s/%s/' % (host, version)
    # Query-string templates.  The API key is substituted right here; the
    # remaining ``%s`` placeholders are filled in when a request is built.
    request = {
        'similar': u'?method=artist.getsimilar&artist=%s&' +
                   (u'api_key=%s' % api_key),
        'top': u'?method=artist.gettoptracks&artist=%s&' +
               (u'api_key=%s' % api_key),
        'track': u'?method=track.getsimilar&artist=%s' +
                 u'&track=%s' + ('&api_key=%s' % api_key),
        'info': u'?method=artist.getinfo&artist=%s' +
                (u'&api_key=%s' % api_key),
    }
    # url -> AudioScrobblerCache, shared by all instances.
    cache = {}
    # Date of the last cache purge (UTC), shared by all instances.
    timestamp = datetime.utcnow()
    count = 0

    def __init__(self, artist=None, cache=True):
        """Create a client.

        ``artist`` is accepted but not used here; the artist is given to
        the ``get_*`` methods.  ``cache`` tells whether fetched answers are
        stored in the shared cache.  Creating an instance also triggers a
        cache purge (see purge_cache).
        """
        self._url = None
        self.current_element = None
        self.caching = cache
        self.purge_cache()

    def _is_in_cache(self):
        """Controls presence of url in cache.
        """
        return self._url in SimaFM.cache

    def _fetch(self):
        """Use cached elements or proceed http request"""
        if self._is_in_cache():
            self.current_element = SimaFM.cache.get(self._url).gettree()
            return
        self._fetch_lfm()

    @Throttle(WAIT_BETWEEN_REQUESTS)
    def _fetch_lfm(self):
        """Get artists, fetch xml from last.fm.

        Downloads ``self._url``, parses the answer into
        ``self.current_element``, validates it and, when caching is
        enabled, stores it in the shared cache.

        Raises:
            XmlFMNotFound: HTTP 400 answer, or answer flagged as failed.
            XmlFMHTTPError: bad status line or any other HTTP error code.
            XmlFMTimeOut: socket timeout while reading/parsing the answer.
            XmlFMError: server unreachable, or the answer is not UTF-8
                encoded ``text/xml``.
        """
        try:
            fd = urllib2.urlopen(url=self._url, timeout=30)
        except BadStatusLine as err:
            raise XmlFMHTTPError(err)
        except urllib2.HTTPError as err:
            # Must be handled before URLError: HTTPError is a subclass of
            # it and may expose ``reason`` too, which would otherwise hide
            # the status-code handling below.
            if err.code == 400:
                raise XmlFMNotFound()
            raise XmlFMHTTPError(err)
        except urllib2.URLError as err:
            if hasattr(err, 'reason'):
                # URLError, failed to reach server
                raise XmlFMError(repr(err.reason))
            raise XmlFMError(err)
        try:
            header = fd.info().getheader("Content-Type", "").split(';')
            if header[0].strip().lower() != "text/xml":
                raise XmlFMError('None XML returned from the server')
            # A Content-Type without any parameter has no charset at all.
            if len(header) < 2 or header[1].strip().lower() != "charset=utf-8":
                raise XmlFMError('XML not UTF-8 encoded!')
            try:
                self.current_element = ElementTree(file=fd)
            except SocketTimeOut:
                raise XmlFMTimeOut()
        finally:
            # Release the connection on error paths as well.
            fd.close()
        self._controls_lfm_answer()
        if self.caching:
            SimaFM.cache[self._url] = AudioScrobblerCache(
                self.current_element, datetime.utcnow())

    def _controls_lfm_answer(self):
        """Controls last.fm answer.

        Returns True when the root element's ``status`` is ``ok``, raises
        XmlFMNotFound (with the text of the ``error`` element) when it is
        ``failed`` and returns None for any other value.
        """
        status = self.current_element.getroot().attrib.get('status')
        if status == 'ok':
            return True
        if status == 'failed':
            raise XmlFMNotFound(self.current_element.findtext('error'))

    def _controls_artist(self, artist):
        """Validate ``artist`` and store it on the instance.

        Sets ``self.artist`` and its UTF-8 encoded form
        ``self.artist_utf8``.

        Raises:
            XmlFMMissingArtist: ``artist`` is empty or None.
            EncodingError: ``artist`` is not a unicode object.
        """
        self.artist = artist
        if not self.artist:
            raise XmlFMMissingArtist('Missing artist name calling SimaFM.get_<method>()')
        if not isinstance(self.artist, unicode):
            raise EncodingError('"%s" not unicode object' % self.artist)
        # last.fm is UTF-8 encoded URL
        self.artist_utf8 = self.artist.encode('UTF-8')

    def _build_url(self, method):
        """Set ``self._url`` for request template ``method`` and the
        artist previously validated by _controls_artist."""
        template = SimaFM.root_url + SimaFM.request.get(method)
        self._url = template % (urllib2.quote(self.artist_utf8, safe=''))

    def purge_cache(self, age=4):
        """Drop cache entries older than ``age`` hours.

        The purge is skipped while the current UTC time still falls in the
        same clock hour (of the same day) as the previous purge.
        """
        now = datetime.utcnow()
        last = SimaFM.timestamp
        # Compare the date as well as the hour, so that a purge exactly N
        # days after the previous one is not skipped.
        if (now.date(), now.hour) == (last.date(), last.hour):
            return
        SimaFM.timestamp = now
        max_age = timedelta(hours=age)
        # Iterate over a snapshot: entries are removed along the way.
        for url, entry in list(SimaFM.cache.items()):
            if now - entry.created() > max_age:
                SimaFM.cache.pop(url, None)

    def get_similar(self, artist=None):
        """Yield ``(name, match)`` for artists similar to ``artist``.

        ``name`` is unicode, ``match`` is the ``match`` value of the answer
        multiplied by 100.  This is a generator: validation and fetching
        only happen once iteration starts.
        """
        self._controls_artist(artist)
        self._build_url(u'similar')
        self._fetch()
        # TODO: controls name encoding
        elem = self.current_element
        for art in elem.getiterator(tag='artist'):
            yield unicode(art.findtext('name')), 100 * float(art.findtext('match'))

    def get_toptracks(self, artist=None):
        """Yield ``(title, rank)`` for the top tracks of ``artist``.

        ``title`` is unicode, ``rank`` the integer ``rank`` attribute of
        the track.  This is a generator: validation and fetching only
        happen once iteration starts.
        """
        self._controls_artist(artist)
        self._build_url(u'top')
        self._fetch()
        # TODO: controls name encoding
        elem = self.current_element
        for track in elem.getiterator(tag='track'):
            yield unicode(track.findtext('name')), int(track.attrib.get('rank'))

    def get_mbid(self, artist=None):
        """Return, as unicode, the text of the ``mbid`` element found
        under ``artist`` in the artist.getinfo answer."""
        self._controls_artist(artist)
        self._build_url(u'info')
        self._fetch()
        # TODO: controls name encoding
        elem = self.current_element
        return unicode(elem.find('artist').findtext('mbid'))
def run():
    """Manual smoke test: walk the artists similar to u'Tool'.

    The results are consumed and discarded; the point is only to drive a
    complete request through SimaFM.
    """
    client = SimaFM()
    for _name, _match in client.get_similar(artist=u'Tool'):
        continue
if __name__ == '__main__':
    # Script entry point: run the smoke test and report module errors,
    # most specific exception class first.
    try:
        run()
    except XmlFMHTTPError as conn_err:
        print("error trying to connect: %s" % conn_err)
    except XmlFMNotFound as not_found:
        print("looks like no artists were found: %s" % not_found)
    except XmlFMError as err:
        print(err)
# VIM MODLINE
# vim: ai ts=4 sw=4 sts=4 expandtab
|