1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
|
# -*- coding: utf-8 -*-
#
# This file is part of couchdbkit released under the MIT license.
# See the NOTICE for more information.
from .base import ConsumerBase
# Deprecated short backend names, each mapped to the dotted path of the
# consumer class it stands for.
OLD_CONSUMER_URIS = {
    "eventlet": "couchdbkit.consumer.ceventlet.EventletConsumer",
    "gevent": "couchdbkit.consumer.cgevent.GeventConsumer",
    "sync": "couchdbkit.consumer.sync.SyncConsumer",
}
def load_consumer_class(uri):
    """Resolve a consumer backend uri to the object it names.

    Args:
        @param uri: dotted path of the form ``package.module.ClassName``.
        The short names ``eventlet``, ``gevent`` and ``sync`` are still
        accepted but deprecated: a ``DeprecationWarning`` is issued and
        the name is translated through ``OLD_CONSUMER_URIS``.

    @return: the attribute named by the last component of ``uri``, looked
    up on the module named by the components before it.

    @raise ValueError: if ``uri`` has no module part (no dot) or starts
    with a dot.
    @raise ImportError: if the module cannot be imported.
    @raise AttributeError: if the module has no attribute with that name.
    """
    if uri in ('eventlet', 'gevent', 'sync'):
        import warnings
        warnings.warn(
            "Short names for uri in consumer backend are deprecated.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to our caller, not to us
        )
        uri = OLD_CONSUMER_URIS[uri]

    # Everything before the last dot is the module, the rest is the class.
    module_path, _, class_name = uri.rpartition('.')
    if not module_path or module_path.startswith('.'):
        # Fail with a message naming the offending uri instead of the
        # opaque "Empty module name" the import machinery would give.
        raise ValueError(
            "Invalid consumer backend uri %r: expected an absolute dotted "
            "path such as 'package.module.ClassName'." % (uri,)
        )

    # import_module returns the leaf module directly, so no manual
    # attribute walk down from the top-level package is needed.
    import importlib
    module = importlib.import_module(module_path)
    return getattr(module, class_name)
class Consumer(object):
    """ Database change consumer

    Front end that loads a backend consumer class and forwards every call
    to an instance of it.

    Example Usage:

        >>> from couchdbkit import Server, Consumer
        >>> s = Server()
        >>> db = s['testdb']
        >>> c = Consumer(db)
        >>> def print_line(line):
        ...     print("got %s" % line)
        ...
        >>> c.wait(print_line, since=0)  # Go into receive loop
    """

    def __init__(self, db, backend='couchdbkit.consumer.sync.SyncConsumer', **kwargs):
        """ Constructor for the consumer

        Args:
            @param db: Database instance
            @param backend: dotted path of the backend consumer class,
            resolved with ``load_consumer_class``.

            The default class (sync) serializes each call to registered
            callbacks. Line processing should be fast in this case to not
            wait on socket read.

            The following short names of bundled classes are also
            accepted, but deprecated in favour of the full dotted path:

                * ``sync``
                * ``eventlet`` - Requires eventlet >= 0.9.7
                * ``gevent`` - Requires gevent >= 0.12.2 (?)

            Any other importable dotted path may be given to use your
            own consumer class.
            @param kwargs: extra keyword arguments handed unchanged to
            the backend class constructor.
        """
        self.db = db
        backend_class = load_consumer_class(backend)
        self.consumer_class = backend_class
        # The backend instance does the real work; methods below delegate.
        self._consumer = backend_class(db, **kwargs)

    def fetch(self, cb=None, **params):
        """ Fetch all changes and return. If since is specified, fetch all
        changes since this doc sequence.

        Args:
            @param cb: optional callback, forwarded to the backend
            @param params: kwargs
            See Changes API (http://wiki.apache.org/couchdb/HTTP_database_API#Changes)

        @return: dict, change result
        """
        backend = self._consumer
        return backend.fetch(cb=cb, **params)

    def wait_once(self, cb=None, **params):
        """ Wait for one change and return (longpoll feed)

        Args:
            @param cb: optional callback, forwarded to the backend
            @param params: kwargs
            See Changes API (http://wiki.apache.org/couchdb/HTTP_database_API#Changes)

        @return: dict, change result
        """
        backend = self._consumer
        return backend.wait_once(cb=cb, **params)

    def wait(self, cb, **params):
        """ Wait for changes until the connection closes (continuous feed)

        Args:
            @param cb: callback, forwarded to the backend
            @param params: kwargs
            See Changes API (http://wiki.apache.org/couchdb/HTTP_database_API#Changes)

        @return: dict, line of change
        """
        backend = self._consumer
        return backend.wait(cb, **params)

    def wait_once_async(self, cb, **params):
        """ Asynchronous variant of wait_once.

        Returns whatever the backend's ``wait_once_async`` returns.
        """
        backend = self._consumer
        return backend.wait_once_async(cb=cb, **params)

    def wait_async(self, cb, **params):
        """ Asynchronous variant of wait.

        Returns whatever the backend's ``wait_async`` returns.
        """
        backend = self._consumer
        return backend.wait_async(cb, **params)
|