File: simafm.py

package info (click to toggle)
mpd-sima 0.9.2-2
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 700 kB
  • sloc: python: 2,766; xml: 921; sh: 329; makefile: 83
file content (315 lines) | stat: -rw-r--r-- 10,478 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# -*- coding: utf-8 -*-

# Copyright (c) 2009, 2010, 2011 Jack Kaliko <efrim@azylum.org>
# Copyright (c) 2010 Eric Casteleijn <thisfred@gmail.com> (Throttle decorator)
#
#   This program is free software: you can redistribute it and/or modify
#   it under the terms of the GNU General Public License as published by
#   the Free Software Foundation, either version 3 of the License, or
#   (at your option) any later version.
#
#   This program is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU General Public License for more details.
#
#   You should have received a copy of the GNU General Public License
#   along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
#

"""
Consume last.fm web service

DOC:
    file:///usr/share/doc/python2.6/html/howto/urllib2.html
        or
    http://docs.python.org/howto/urllib2.html

TODO: Replace SimaFM.cache reference with self.__class__.cache
    http://diveintopython.adrahon.org/object_oriented_framework/class_attributes.html
"""

__version__ = u'0.2.4'
__revison__ = u'$Revision: 598 $'
__author__ = u'$Author: kaliko $'
__date__ = u'$Date: 2012-01-07 13:13:14 +0100 (sam. 07 janv. 2012) $'


import urllib2

from datetime import datetime, timedelta
from httplib import BadStatusLine
from socket import timeout as SocketTimeOut
from time import sleep
from xml.etree.cElementTree import ElementTree

# Some definitions
WAIT_BETWEEN_REQUESTS = timedelta(0, 0.4)
LFM_ERRORS = dict({'2': u'Invalid service -This service does not exist',
    '3': u'Invalid Method - No method with that name in this package',
    '4': u'Authentication Failed - You do not have permissions to access the service',
    '5': u"'Invalid format - This service doesn't exist in that format",
    '6': u'Invalid parameters - Your request is missing a required parameter',
    '7': u'Invalid resource specified',
    '9': u'Invalid session key - Please re-authenticate',
    '10': u'Invalid API key - You must be granted a valid key by last.fm',
    '11': u'Service Offline - This service is temporarily offline. Try again later.',
    '12': u'Subscription Error - The user needs to be subscribed in order to do that',
    '13': u'Invalid method signature supplied',
    '26': u'Suspended API key - Access for your account has been suspended, please contact Last.fm',
    })


class XmlFMError(Exception):  # Errors
    """
    Exception raised for errors in the input.
    """

    def __init__(self, expression):
        self.expression = expression

    def __str__(self):
        return repr(self.expression)


class EncodingError(XmlFMError):
    """Raised when string is not unicode"""
    pass


class XmlFMHTTPError(XmlFMError):
    """Raised when failed to connect server"""

    def __init__(self, expression):
        if hasattr(expression, 'code'):
            self.expression = 'error %d: %s' % (expression.code,
                expression.msg)
        else:
            self.expression = 'error: %s' % expression


class XmlFMNotFound(XmlFMError):
    """Raised when no artist is found"""

    def __init__(self, message=None):
        if not message:
            message = 'Artist probably not found (http error 400)'
        self.expression = (message)


class XmlFMMissingArtist(XmlFMError):
    """Raised when no artist name provided"""

    def __init__(self, message=None):
        if not message:
            message = 'Missing artist name.'
        self.expression = (message)


class XmlFMTimeOut(XmlFMError):
    """Raised when urlib2.urlopen times out"""

    def __init__(self, message=None):
        if not message:
            message = 'Connection to last.fm web services times out!'
        self.expression = (message)


class Throttle(object):
    def __init__(self, wait):
        self.wait = wait
        self.last_called = datetime.now()

    def __call__(self, func):
        def wrapper(*args, **kwargs):
            while self.last_called + self.wait > datetime.now():
                #print "waiting…"
                sleep(0.1)
            result = func(*args, **kwargs)
            self.last_called = datetime.now()
            return result
        return wrapper


class AudioScrobblerCache(object):
    def __init__(self, elem, last):
        self.elemtree = elem
        self.requestdate = last

    def created(self):
        return self.requestdate

    def gettree(self):
        return self.elemtree


class SimaFM(object):
    """
    """
    api_key = u'4a1c9ddec29816ed803d7be9113ba4cb'
    host = u'ws.audioscrobbler.com'
    version = u'2.0'
    root_url = u'http://%s/%s/' % (host, version)
    request = dict({'similar': u'?method=artist.getsimilar&artist=%s&' +\
                                u'api_key=%s' % api_key,
                    'top': u'?method=artist.gettoptracks&artist=%s&' +\
                            u'api_key=%s' % api_key,
                    'track': u'?method=track.getsimilar&artist=%s' +\
                            u'&track=%s' + '&api_key=%s' % api_key,
                    'info': u'?method=artist.getinfo&artist=%s' +\
                            u'&api_key=%s' % api_key,
                    })
    cache = dict({})
    timestamp = datetime.utcnow()
    count = 0

    def __init__(self, artist=None, cache=True):
        self._url = None
        #SimaFM.count += 1
        self.current_element = None
        self.caching = cache
        self.purge_cache()

    def _is_in_cache(self):
        """Controls presence of url in cache.
        """
        if self._url in SimaFM.cache:
            #print "already fetch %s" % self.artist
            return True
        return False

    def _fetch(self):
        """Use cached elements or proceed http request"""
        if self._is_in_cache():
            self.current_element = SimaFM.cache.get(self._url).gettree()
            return
        self._fetch_lfm()

    @Throttle(WAIT_BETWEEN_REQUESTS)
    def _fetch_lfm(self):
        """Get artists, fetch xml from last.fm"""
        try:
            fd = urllib2.urlopen(url=self._url,
                    timeout=30)
        except BadStatusLine, err:
            raise XmlFMHTTPError(err)
        except urllib2.URLError, err:
            if hasattr(err, 'reason'):
                # URLError, failed to reach server
                raise XmlFMError(repr(err.reason))
            if hasattr(err, 'code'):
                # HTTPError, the server couldn't fulfill the request
                if err.code == 400:
                    raise XmlFMNotFound()
                raise XmlFMHTTPError(err)
            raise XmlFMError(err)
        header = fd.info().getheader("Content-Type", "").split(';')
        if header[0].strip() != "text/xml":
            raise XmlFMError('None XML returned from the server')
        if header[1].strip() != "charset=utf-8":
            raise XmlFMError('XML not UTF-8 encoded!')
        try:
            self.current_element = ElementTree(file=fd)
        except SocketTimeOut:
            raise XmlFMTimeOut()
        fd.close()
        self._controls_lfm_answer()
        if self.caching:
            SimaFM.cache[self._url] = AudioScrobblerCache(self.current_element,
                    datetime.utcnow())

    def _controls_lfm_answer(self):
        """Controls last.fm answer.
        """
        status = self.current_element.getroot().attrib.get('status')
        if status == 'ok':
            return True
        if status == 'failed':
            error = self.current_element.find('error').attrib.get('code')
            errormsg = self.current_element.findtext('error')
            #if error in LFM_ERRORS.keys():
            #    print LFM_ERRORS.get(error)
            raise XmlFMNotFound(errormsg)

    def _controls_artist(self, artist):
        """
        """
        self.artist = artist
        if not self.artist:
            raise XmlFMMissingArtist('Missing artist name calling SimaFM.get_<method>()')
        if not isinstance(self.artist, unicode):
            raise EncodingError('"%s" not unicode object' % self.artist)
        # last.fm is UTF-8 encoded URL
        self.artist_utf8 = self.artist.encode('UTF-8')

    def purge_cache(self, age=4):
        now = datetime.utcnow()
        if now.hour == SimaFM.timestamp.hour:
            return
        SimaFM.timestamp = datetime.utcnow()
        cache = SimaFM.cache
        delta = timedelta(hours=age)
        for url in cache.keys():
            timestamp = cache.get(url).created()
            if now - timestamp > delta:
                cache.pop(url)

    def get_similar(self, artist=None):
        """
        """
        self._controls_artist(artist)
        # Construct URL
        url = SimaFM.root_url + SimaFM.request.get(u'similar')
        self._url = url % (urllib2.quote(self.artist_utf8, safe=''))
        self._fetch()
        # TODO: controls name encoding
        elem = self.current_element
        for art in elem.getiterator(tag='artist'):
            yield unicode(art.findtext('name')), 100 * float(art.findtext('match'))

    def get_toptracks(self, artist=None):
        """
        """
        self._controls_artist(artist)
        # Construct URL
        url = SimaFM.root_url + SimaFM.request.get(u'top')
        self._url = url % (urllib2.quote(self.artist_utf8, safe=''))
        self._fetch()
        # TODO: controls name encoding
        elem = self.current_element
        for track in elem.getiterator(tag='track'):
            yield unicode(track.findtext('name')), int(track.attrib.get('rank'))

    def get_mbid(self, artist=None):
        """
        """
        self._controls_artist(artist)
        # Construct URL
        url = SimaFM.root_url + SimaFM.request.get(u'info')
        self._url = url % (urllib2.quote(self.artist_utf8, safe=''))
        self._fetch()
        # TODO: controls name encoding
        elem = self.current_element
        return unicode(elem.find('artist').findtext('mbid'))


def run():
    test = SimaFM()
    for a, m in test.get_similar(artist=u'Tool'):
        pass
    return

if __name__ == '__main__':
    try:
        run()
    except XmlFMHTTPError, conn_err:
        print "error trying to connect: %s" % conn_err
    except XmlFMNotFound, not_found:
        print "looks like no artists were found: %s" % not_found
    except XmlFMError, err:
        print err


# VIM MODLINE
# vim: ai ts=4 sw=4 sts=4 expandtab