Examples
--------
.. py:currentmodule:: django_q

Emails
======

Sending an email can take a while, so why not queue it:

.. code-block:: python

    # Welcome mail with follow up example
    from datetime import timedelta
    from django.utils import timezone
    from django_q.tasks import async_task, schedule
    from django_q.models import Schedule


    def welcome_mail(user):
        msg = 'Welcome to our website'
        # send this message right away
        async_task('django.core.mail.send_mail',
                'Welcome',
                msg,
                'from@example.com',
                [user.email])
        # and this follow up email in one hour
        msg = 'Here are some tips to get you started...'
        schedule('django.core.mail.send_mail',
                 'Follow up',
                 msg,
                 'from@example.com',
                 [user.email],
                 schedule_type=Schedule.ONCE,
                 next_run=timezone.now() + timedelta(hours=1))

        # since the `repeats` defaults to -1
        # this schedule will erase itself after having run


Since you're only telling Django Q2 to take care of the emails, you can quickly move on to serving web pages to your user.

Signals
=======

Django's model signals are a good place to use async tasks. You don't want to delay the saving or creation of objects, but sometimes you want to trigger a lot of actions:

.. code-block:: python

    # Message on object change
    from django.contrib.auth.models import User
    from django.db.models.signals import pre_save
    from django.dispatch import receiver
    from django_q.tasks import async_task

    # set up the pre_save signal for our user
    @receiver(pre_save, sender=User)
    def email_changed(sender, instance, **kwargs):
        try:
            user = sender.objects.get(pk=instance.pk)
        except sender.DoesNotExist:
            pass  # new user
        else:
            # has the email changed?
            if user.email != instance.email:
                # tell everyone
                async_task('tasks.inform_everyone', instance)

The task sends a message to everyone else, informing them that the user's email address has changed. Note that this adds almost no overhead to the save action:

.. code-block:: python

    # tasks.py
    from django.contrib.auth.models import User
    from django.core.mail import send_mass_mail

    def inform_everyone(user):
        mails = []
        for u in User.objects.exclude(pk=user.pk):
            msg = f"Dear {u.username}, {user.username} has a new email address: {user.email}"
            mails.append(('New email', msg,
                          'from@example.com', [u.email]))
        return send_mass_mail(mails)

.. code-block:: python

    # or do it async again
    from django_q.tasks import async_task

    def inform_everyone_async(user):
        for u in User.objects.exclude(pk=user.pk):
            msg = f"Dear {u.username}, {user.username} has a new email address: {user.email}"
            async_task('django.core.mail.send_mail',
                    'New email', msg, 'from@example.com', [u.email])
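
The message-building part of ``inform_everyone`` can be separated from the database and mail calls, which makes it easy to check in isolation. The following is a minimal sketch, not part of Django Q2: ``build_notices`` and the ``Person`` stand-in are hypothetical names, and the returned 4-tuples follow the ``(subject, message, from_email, recipient_list)`` layout that ``send_mass_mail`` expects.

.. code-block:: python

    from collections import namedtuple

    # Hypothetical stand-in for a User row; only the fields used below.
    Person = namedtuple("Person", ["pk", "username", "email"])


    def build_notices(changed, everyone, sender="from@example.com"):
        """Return one send_mass_mail-style tuple per user other than `changed`."""
        notices = []
        for u in everyone:
            if u.pk == changed.pk:
                continue  # do not notify the user whose address changed
            msg = (f"Dear {u.username}, {changed.username} "
                   f"has a new email address: {changed.email}")
            notices.append(("New email", msg, sender, [u.email]))
        return notices


    users = [
        Person(1, "alice", "alice@example.com"),
        Person(2, "bob", "bob@example.org"),
    ]
    notices = build_notices(users[1], users)

Inside a task, the resulting list could be passed straight to ``send_mass_mail``.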


Of course you can do other things besides sending emails. These are just generic examples. You can use signals with async tasks to update fields in other objects too.
Let's say this user's email address is not just on the User object, but you stored it in some other places too without a reference.
By attaching an async action to the save signal, you can now update that email address in those other places without impacting the time it takes to return your views.


Reports
=======

In this example the user requests a report and we let the cluster do the generating, while handling the result with a hook.

.. code-block:: python

    # Report generation with hook example
    from django_q.tasks import async_task

    # views.py
    # user requests a report.
    def create_report(request):
        async_task('tasks.create_html_report',
                request.user,
                hook='tasks.email_report')

.. code-block:: python

    # tasks.py
    from django_q.tasks import async_task

    # report generator
    def create_html_report(user):
        html_report = 'We had a great quarter!'
        return html_report

    # report mailer
    def email_report(task):
        if task.success:
            # Email the report
            async_task('django.core.mail.send_mail',
                    'The report you requested',
                    task.result,
                    'from@example.com',
                    [task.args[0].email])  # recipient_list must be a list
        else:
            # Tell the admins something went wrong
            async_task('django.core.mail.mail_admins',
                    'Report generation failed',
                    task.result)


The hook is practical here, because it allows us to detach the sending task from the report generation function and to report on possible failures.
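
The branching inside the hook can be looked at without a running cluster. The following is a minimal sketch, not part of Django Q2: ``plan_report_mail`` is a hypothetical helper, and the ``SimpleNamespace`` objects are stand-ins that only carry the ``success``, ``result`` and ``args`` attributes used by the hook above.

.. code-block:: python

    from types import SimpleNamespace


    def plan_report_mail(task, sender="from@example.com"):
        """Return (function path, positional args) for the follow-up mail task."""
        if task.success:
            user = task.args[0]  # the user that was passed to the report task
            return ("django.core.mail.send_mail",
                    ("The report you requested", task.result, sender, [user.email]))
        # on failure the result is forwarded to the admins as the message body
        return ("django.core.mail.mail_admins",
                ("Report generation failed", task.result))


    # Hypothetical stand-ins for the task object a hook receives.
    user = SimpleNamespace(email="user@example.com")
    ok = SimpleNamespace(success=True, result="We had a great quarter!", args=(user,))
    failed = SimpleNamespace(success=False, result="Traceback ...", args=(user,))

    ok_plan = plan_report_mail(ok)
    failed_plan = plan_report_mail(failed)

In a real hook, the returned path and arguments would be handed to ``async_task``, for example ``async_task(path, *args)``.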

Haystack
========
If you use `Haystack <http://haystacksearch.org/>`__ as your project's search engine,
here's an example of how you can have Django Q2 take care of your indexes in real time using model signals:

.. code-block:: python

    # Real time Haystack indexing
    from .models import Document
    from django.db.models.signals import post_save
    from django.dispatch import receiver
    from django_q.tasks import async_task

    # hook up the post save handler
    @receiver(post_save, sender=Document)
    def document_changed(sender, instance, **kwargs):
        async_task('tasks.index_object', sender, instance, save=False)
        # turn off result saving to not flood your database

.. code-block:: python

    # tasks.py
    from haystack import connection_router, connections

    def index_object(sender, instance):
        # get possible backends
        backends = connection_router.for_write(instance=instance)

        for backend in backends:
            # get the index for this model
            index = connections[backend].get_unified_index()\
                .get_index(sender)
            # update it
            index.update_object(instance, using=backend)

Now every time a Document is saved, your indexes will be updated without causing a delay in your save action.
You could extend this to handle deletes by adding a ``post_delete`` signal and calling ``index.remove_object`` in the task function.

.. _shell:

Shell
=====
You can execute or schedule shell commands using Python's :mod:`subprocess` module:

.. code-block:: python

    from django_q.tasks import async_task, result

    # make a backup copy of setup.py
    async_task('subprocess.call', ['cp', 'setup.py', 'setup.py.bak'])

    # call ls -l and dump the output
    task_id = async_task('subprocess.check_output', ['ls', '-l'])

    # get the result
    dir_list = result(task_id)

Since Python 3.5, the subprocess module also provides :func:`subprocess.run`, which returns a :class:`subprocess.CompletedProcess` object instead:

.. code-block:: python

    from django_q.tasks import async_task, result

    # make a backup copy of setup.py
    tid = async_task('subprocess.run', ['cp', 'setup.py', 'setup.py.bak'])

    # get the result
    r = result(tid, 500)
    # we can now look at the original arguments
    print(r.args)  # ['cp', 'setup.py', 'setup.py.bak']
    # and the return code
    print(r.returncode)  # 0

    # to capture the output we'll need a pipe
    from subprocess import PIPE

    # call ls -l and pipe the output
    tid = async_task('subprocess.run', ['ls', '-l'], stdout=PIPE)
    # get the result
    res = result(tid, 500)
    # print the output
    print(res.stdout)
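
To see what such a result object contains, :func:`subprocess.run` can also be called directly, without a cluster. The following is a minimal sketch that runs a Python one-liner instead of ``ls`` or ``cp``, so that it does not depend on the platform's shell utilities; the command itself is only an illustration.

.. code-block:: python

    import subprocess
    import sys

    # Run a tiny Python one-liner so the example works on any platform.
    cmd = [sys.executable, "-c", "print('hello')"]
    res = subprocess.run(cmd, stdout=subprocess.PIPE)

    print(res.args)        # the original argument list
    print(res.returncode)  # 0 on success
    print(res.stdout)      # the captured output, as bytes

Without ``stdout=PIPE`` the output is not captured and ``res.stdout`` is ``None``.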


Instead of :func:`async_task` you can of course also use :func:`schedule` to schedule commands.

For regular Django management commands, it is easier to call them directly:

.. code-block:: python

    from django_q.tasks import async_task, schedule

    async_task('django.core.management.call_command', 'clearsessions')

    # or clear those sessions every hour

    schedule('django.core.management.call_command',
             'clearsessions',
             schedule_type='H')



Groups
======
A group example that performs kernel density estimation of probability density functions using the Parzen-window technique.
Adapted from `Sebastian Raschka's blog <http://sebastianraschka.com/Articles/2014_multiprocessing_intro.html>`__.

.. code-block:: python

    # Group example with Parzen-window estimation
    import numpy

    from django_q.tasks import async_task, result_group, delete_group

    # the estimation function
    def parzen_estimation(x_samples, point_x, h):
        k_n = 0
        for row in x_samples:
            x_i = (point_x - row[:, numpy.newaxis]) / h
            for row in x_i:
                if numpy.abs(row) > (1 / 2):
                    break
            else:
                k_n += 1
        return h, (k_n / len(x_samples)) / (h ** point_x.shape[1])

    # create 100 calculations and return the collated result
    def parzen_async():
        # clear the previous results
        delete_group('parzen', cached=True)
        mu_vec = numpy.array([0, 0])
        cov_mat = numpy.array([[1, 0], [0, 1]])
        sample = numpy.random. \
            multivariate_normal(mu_vec, cov_mat, 10000)
        widths = numpy.linspace(1.0, 1.2, 100)
        x = numpy.array([[0], [0]])
        # async_task them with a group label to the cache backend
        for w in widths:
            async_task(parzen_estimation, sample, x, w,
                    group='parzen', cached=True)
        # return after 100 results
        return result_group('parzen', count=100, cached=True)



Django Q2 is not optimized for distributed computing, but this example will give you an idea of what you can do with task :doc:`group`.

Alternatively, the ``parzen_async()`` function can be written with :func:`async_iter`, which automatically uses the cache backend and groups to return a single result from an iterable:

.. code-block:: python

    from django_q.tasks import async_iter, result

    # create 100 calculations and return the collated result
    def parzen_async():
        mu_vec = numpy.array([0, 0])
        cov_mat = numpy.array([[1, 0], [0, 1]])
        sample = numpy.random. \
            multivariate_normal(mu_vec, cov_mat, 10000)
        widths = numpy.linspace(1.0, 1.2, 100)
        x = numpy.array([[0], [0]])
        # queue all of them at once with async_iter
        args = [(sample, x, w) for w in widths]
        result_id = async_iter(parzen_estimation, args, cached=True)
        # return the cached result or timeout after 10 seconds
        return result(result_id, wait=10000, cached=True)
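
For readers unfamiliar with the estimate computed by ``parzen_estimation``, the following is a simplified pure-Python sketch of the same hypercube window. It is an illustration only: ``parzen_estimate`` is a hypothetical name, it works on plain tuples instead of numpy arrays, and it returns just the density rather than the ``(h, density)`` pair used above.

.. code-block:: python

    def parzen_estimate(samples, point, h):
        """Hypercube Parzen-window density estimate at `point` with width `h`."""
        dims = len(point)
        inside = 0
        for sample in samples:
            # a sample counts when every coordinate lies within h/2 of the point
            if all(abs(p - s) / h <= 0.5 for p, s in zip(point, sample)):
                inside += 1
        # fraction of samples in the window, divided by the window volume
        return (inside / len(samples)) / (h ** dims)


    samples = [(0.0, 0.0), (0.2, -0.1), (0.4, 0.4), (1.0, 1.0)]
    density = parzen_estimate(samples, (0.0, 0.0), h=1.0)

Here three of the four samples fall inside the unit square centred on the origin, so ``density`` is ``0.75``.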



HTTP Health Check
=================
An example of a Python HTTP server (listening on localhost:8080) that you can use to check the health of all the clusters in your environment. The example serves plain HTTP only.

Requires the cache to be enabled. Save the file in your Django project's root directory and run it with ``python worker_hc.py`` in your container or other environment. It can be customized to show whatever you like from the ``Stat`` class.

.. code-block:: python

    from http.server import BaseHTTPRequestHandler, HTTPServer

    import os
    import django

    # Set the correct path to your settings module
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "my.settings.path")

    # All django stuff has to come after the setup:
    django.setup()

    from django_q.status import Stat
    from django_q.conf import Conf

    # Set host and port settings
    hostName = "localhost"
    serverPort = 8080


    class HealthCheckServer(BaseHTTPRequestHandler):
        def do_GET(self):
            
            # Count the clusters and their status
            happy_clusters = 0
            total_clusters = 0

            for stat in Stat.get_all():
                total_clusters += 1
                if stat.status in [Conf.IDLE, Conf.WORKING]:
                    happy_clusters += 1

            # Return 200 response if there is at least 1 cluster running,
            # and make sure all running clusters are happy
            if total_clusters and happy_clusters == total_clusters:
                response_code = 200
            else:
                response_code = 500

            self.send_response(response_code)
            self.send_header("Content-type", "text/html")
            self.end_headers()

            self.wfile.write(
                bytes("<html><head><title>Django-Q Health Check</title></head>", "utf-8")
            )
            self.wfile.write(
                bytes(f"<p>Health check returned {response_code} response</p>", "utf-8")
            )
            self.wfile.write(
                bytes(
                    f"<p>{happy_clusters} of {total_clusters} cluster(s) are happy</p></html>",
                    "utf-8",
                )
            )


    if __name__ == "__main__":
        webServer = HTTPServer((hostName, serverPort), HealthCheckServer)
        print("Server started at http://%s:%s" % (hostName, serverPort))

        try:
            webServer.serve_forever()
        except KeyboardInterrupt:
            pass

        webServer.server_close()
        print("Server stopped.")
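
The decision rule inside ``do_GET`` can be isolated as a small function, which makes it easy to test or adjust. The following is a minimal sketch under stated assumptions: ``health_status_code`` is a hypothetical helper, and the status strings are placeholders for whatever ``Conf.IDLE`` and ``Conf.WORKING`` evaluate to in your installation.

.. code-block:: python

    def health_status_code(statuses, healthy=("Idle", "Working")):
        """Return 200 when at least one cluster exists and all are healthy."""
        total = len(statuses)
        happy = sum(1 for s in statuses if s in healthy)
        return 200 if total and happy == total else 500


    # Placeholder status values, assumed for illustration only.
    no_clusters = health_status_code([])
    all_good = health_status_code(["Idle", "Working"])
    one_bad = health_status_code(["Idle", "Stopped"])

With no clusters at all the function returns 500, matching the server above, which requires at least one running cluster.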
        
        
        
.. note::

    If you have an example you want to share, please submit a pull request on `GitHub <https://github.com/GDay/django-q2/>`__.