File: tasks.py

package info (click to toggle)
python-django-rest-hooks 1.6.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 292 kB
  • sloc: python: 802; sh: 6; makefile: 5
file content (35 lines) | stat: -rw-r--r-- 1,129 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import requests
import json

from celery.task import Task

from django.core.serializers.json import DjangoJSONEncoder

from rest_hooks.utils import get_hook_model


class DeliverHook(Task):
    def run(self, target, payload, instance=None, hook_id=None, **kwargs):
        """
        target:     the url to receive the payload.
        payload:    a python primitive data structure
        instance:   a possibly null "trigger" instance
        hook:       the defining Hook object (useful for removing)
        """
        response = requests.post(
            url=target,
            data=json.dumps(payload, cls=DjangoJSONEncoder),
            headers={'Content-Type': 'application/json'}
        )

        if response.status_code == 410 and hook_id:
            HookModel = get_hook_model()
            hook = HookModel.object.get(id=hook_id)
            hook.delete()

        # would be nice to log this, at least for a little while...

def deliver_hook_wrapper(target, payload, instance=None, hook=None, **kwargs):
    if hook:
        kwargs['hook_id'] = hook.id
    return DeliverHook.delay(target, payload, **kwargs)