File: subscriptions.py

package info (click to toggle)
python-instagram 1.3.2%2Bgit20160108~dfeebe9-4
  • links: PTS, VCS
  • area: main
  • in suites: buster, stretch
  • size: 660 kB
  • ctags: 285
  • sloc: python: 1,248; makefile: 36; sh: 18
file content (59 lines) | stat: -rw-r--r-- 1,763 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import hmac
import hashlib
from .json_import import simplejson

class SubscriptionType:
    """Named string constants for the kinds of subscription objects."""
    # NOTE(review): these values presumably match the 'object' field of
    # incoming updates (the key SubscriptionsReactor dispatches on) --
    # confirm against the API documentation.
    TAG = 'tag'
    USER = 'user'
    GEOGRAPHY = 'geography'
    LOCATION = 'location'


class SubscriptionError(Exception):
    """Base class for errors raised while handling subscription updates."""


class SubscriptionVerifyError(SubscriptionError):
    """Raised when a payload's signature does not match its HMAC digest."""


class SubscriptionsReactor(object):
    """Verify signed subscription payloads and dispatch updates to callbacks.

    Callbacks are registered per object type (the value found under the
    ``'object'`` key of each update) and are invoked with the update itself.
    """

    def __init__(self):
        # Maps an object type to the list of callables registered for it.
        self.callbacks = {}

    def _process_update(self, update):
        """Call every callback registered for ``update['object']``.

        Raises:
            KeyError: if *update* has no ``'object'`` key.
        """
        # Iterate over a snapshot: a callback may register or deregister
        # callbacks (including itself) without making this loop skip entries.
        for callback in list(self.callbacks.get(update['object'], [])):
            callback(update)

    def process(self, client_secret, raw_response, x_hub_signature):
        """Verify *raw_response* against its signature and dispatch its updates.

        Args:
            client_secret: secret used as the HMAC key (text or bytes).
            raw_response: the raw JSON payload (text or bytes); it is
                iterated after parsing, one update per item.
            x_hub_signature: hex HMAC-SHA1 digest supplied with the payload.

        Raises:
            SubscriptionVerifyError: if the signature does not match.
            SubscriptionError: if the payload is not valid JSON.
        """
        if not self._verify_signature(client_secret, raw_response, x_hub_signature):
            raise SubscriptionVerifyError("X-Hub-Signature and hmac digest did not match")

        try:
            response = simplejson.loads(raw_response)
        except ValueError:
            raise SubscriptionError('Unable to parse response, not valid JSON.')

        for update in response:
            self._process_update(update)

    def register_callback(self, object_type, callback):
        """Register *callback* for *object_type*; duplicates are ignored."""
        registered = self.callbacks.setdefault(object_type, [])
        if callback not in registered:
            registered.append(callback)

    def deregister_callback(self, object_type, callback):
        """Remove *callback* from the callbacks of *object_type*.

        Raises:
            ValueError: if *callback* is not registered for *object_type*.
        """
        callbacks = self.callbacks.get(object_type, [])
        callbacks.remove(callback)

    @staticmethod
    def _to_bytes(value):
        """Return *value* as a byte string, UTF-8 encoding text input."""
        if isinstance(value, (bytes, bytearray)):
            return bytes(value)
        return value.encode('utf-8')

    def _verify_signature(self, client_secret, raw_response, x_hub_signature):
        """Return True if *x_hub_signature* is the HMAC-SHA1 of *raw_response*.

        The digest is keyed with *client_secret* and compared in hexadecimal
        form. Text arguments are UTF-8 encoded; bytes are used unchanged.
        A missing or non-string signature yields False.
        """
        expected = hmac.new(self._to_bytes(client_secret),
                            msg=self._to_bytes(raw_response),
                            digestmod=hashlib.sha1
                            ).hexdigest()
        try:
            supplied = self._to_bytes(x_hub_signature)
        except AttributeError:
            # None (header absent) or another non-string value: never a match.
            return False
        # Constant-time comparison so response timing does not reveal how
        # much of a forged signature was correct.  Comparing bytes also
        # avoids compare_digest's TypeError on non-ASCII text.
        return hmac.compare_digest(expected.encode('ascii'), supplied)