From: Asif Saifuddin Auvi <auvipy@gmail.com>
Date: Mon, 9 Jul 2018 11:33:36 +0600
Subject: Python 3.7 compat issues (#4852)

* renamed banckend.async to asynchronous

* adjust redis imports of async

* adjust imports of async

* import style adjust

* renamed doc from async to asynchronous

* renamed doc contents from async to asynchronous
---
 celery/backends/async.py                           | 299 ---------------------
 celery/backends/asynchronous.py                    | 299 +++++++++++++++++++++
 celery/backends/redis.py                           |   7 +-
 celery/backends/rpc.py                             |   2 +-
 docs/internals/reference/celery.backends.async.rst |  13 -
 .../reference/celery.backends.asynchronous.rst     |  13 +
 t/unit/backends/test_redis.py                      |   4 +-
 7 files changed, 319 insertions(+), 318 deletions(-)
 delete mode 100644 celery/backends/async.py
 create mode 100644 celery/backends/asynchronous.py
 delete mode 100644 docs/internals/reference/celery.backends.async.rst
 create mode 100644 docs/internals/reference/celery.backends.asynchronous.rst

diff --git a/celery/backends/async.py b/celery/backends/async.py
deleted file mode 100644
index 20bf539..0000000
--- a/celery/backends/async.py
+++ /dev/null
@@ -1,299 +0,0 @@
-"""Async I/O backend support utilities."""
-from __future__ import absolute_import, unicode_literals
-
-import socket
-import threading
-from collections import deque
-from time import sleep
-from weakref import WeakKeyDictionary
-
-from kombu.utils.compat import detect_environment
-from kombu.utils.objects import cached_property
-
-from celery import states
-from celery.exceptions import TimeoutError
-from celery.five import Empty, monotonic
-from celery.utils.threads import THREAD_TIMEOUT_MAX
-
-__all__ = (
-    'AsyncBackendMixin', 'BaseResultConsumer', 'Drainer',
-    'register_drainer',
-)
-
-drainers = {}
-
-
-def register_drainer(name):
-    """Decorator used to register a new result drainer type."""
-    def _inner(cls):
-        drainers[name] = cls
-        return cls
-    return _inner
-
-
-@register_drainer('default')
-class Drainer(object):
-    """Result draining service."""
-
-    def __init__(self, result_consumer):
-        self.result_consumer = result_consumer
-
-    def start(self):
-        pass
-
-    def stop(self):
-        pass
-
-    def drain_events_until(self, p, timeout=None, on_interval=None, wait=None):
-        wait = wait or self.result_consumer.drain_events
-        time_start = monotonic()
-
-        while 1:
-            # Total time spent may exceed a single call to wait()
-            if timeout and monotonic() - time_start >= timeout:
-                raise socket.timeout()
-            try:
-                yield self.wait_for(p, wait, timeout=1)
-            except socket.timeout:
-                pass
-            if on_interval:
-                on_interval()
-            if p.ready:  # got event on the wanted channel.
-                break
-
-    def wait_for(self, p, wait, timeout=None):
-        wait(timeout=timeout)
-
-
-class greenletDrainer(Drainer):
-    spawn = None
-    _g = None
-
-    def __init__(self, *args, **kwargs):
-        super(greenletDrainer, self).__init__(*args, **kwargs)
-        self._started = threading.Event()
-        self._stopped = threading.Event()
-        self._shutdown = threading.Event()
-
-    def run(self):
-        self._started.set()
-        while not self._stopped.is_set():
-            try:
-                self.result_consumer.drain_events(timeout=1)
-            except socket.timeout:
-                pass
-        self._shutdown.set()
-
-    def start(self):
-        if not self._started.is_set():
-            self._g = self.spawn(self.run)
-            self._started.wait()
-
-    def stop(self):
-        self._stopped.set()
-        self._shutdown.wait(THREAD_TIMEOUT_MAX)
-
-    def wait_for(self, p, wait, timeout=None):
-        self.start()
-        if not p.ready:
-            sleep(0)
-
-
-@register_drainer('eventlet')
-class eventletDrainer(greenletDrainer):
-
-    @cached_property
-    def spawn(self):
-        from eventlet import spawn
-        return spawn
-
-
-@register_drainer('gevent')
-class geventDrainer(greenletDrainer):
-
-    @cached_property
-    def spawn(self):
-        from gevent import spawn
-        return spawn
-
-
-class AsyncBackendMixin(object):
-    """Mixin for backends that enables the async API."""
-
-    def _collect_into(self, result, bucket):
-        self.result_consumer.buckets[result] = bucket
-
-    def iter_native(self, result, no_ack=True, **kwargs):
-        self._ensure_not_eager()
-
-        results = result.results
-        if not results:
-            raise StopIteration()
-
-        # we tell the result consumer to put consumed results
-        # into these buckets.
-        bucket = deque()
-        for node in results:
-            if node._cache:
-                bucket.append(node)
-            else:
-                self._collect_into(node, bucket)
-
-        for _ in self._wait_for_pending(result, no_ack=no_ack, **kwargs):
-            while bucket:
-                node = bucket.popleft()
-                yield node.id, node._cache
-        while bucket:
-            node = bucket.popleft()
-            yield node.id, node._cache
-
-    def add_pending_result(self, result, weak=False, start_drainer=True):
-        if start_drainer:
-            self.result_consumer.drainer.start()
-        try:
-            self._maybe_resolve_from_buffer(result)
-        except Empty:
-            self._add_pending_result(result.id, result, weak=weak)
-        return result
-
-    def _maybe_resolve_from_buffer(self, result):
-        result._maybe_set_cache(self._pending_messages.take(result.id))
-
-    def _add_pending_result(self, task_id, result, weak=False):
-        concrete, weak_ = self._pending_results
-        if task_id not in weak_ and result.id not in concrete:
-            (weak_ if weak else concrete)[task_id] = result
-            self.result_consumer.consume_from(task_id)
-
-    def add_pending_results(self, results, weak=False):
-        self.result_consumer.drainer.start()
-        return [self.add_pending_result(result, weak=weak, start_drainer=False)
-                for result in results]
-
-    def remove_pending_result(self, result):
-        self._remove_pending_result(result.id)
-        self.on_result_fulfilled(result)
-        return result
-
-    def _remove_pending_result(self, task_id):
-        for map in self._pending_results:
-            map.pop(task_id, None)
-
-    def on_result_fulfilled(self, result):
-        self.result_consumer.cancel_for(result.id)
-
-    def wait_for_pending(self, result,
-                         callback=None, propagate=True, **kwargs):
-        self._ensure_not_eager()
-        for _ in self._wait_for_pending(result, **kwargs):
-            pass
-        return result.maybe_throw(callback=callback, propagate=propagate)
-
-    def _wait_for_pending(self, result,
-                          timeout=None, on_interval=None, on_message=None,
-                          **kwargs):
-        return self.result_consumer._wait_for_pending(
-            result, timeout=timeout,
-            on_interval=on_interval, on_message=on_message,
-        )
-
-    @property
-    def is_async(self):
-        return True
-
-
-class BaseResultConsumer(object):
-    """Manager responsible for consuming result messages."""
-
-    def __init__(self, backend, app, accept,
-                 pending_results, pending_messages):
-        self.backend = backend
-        self.app = app
-        self.accept = accept
-        self._pending_results = pending_results
-        self._pending_messages = pending_messages
-        self.on_message = None
-        self.buckets = WeakKeyDictionary()
-        self.drainer = drainers[detect_environment()](self)
-
-    def start(self, initial_task_id, **kwargs):
-        raise NotImplementedError()
-
-    def stop(self):
-        pass
-
-    def drain_events(self, timeout=None):
-        raise NotImplementedError()
-
-    def consume_from(self, task_id):
-        raise NotImplementedError()
-
-    def cancel_for(self, task_id):
-        raise NotImplementedError()
-
-    def _after_fork(self):
-        self.buckets.clear()
-        self.buckets = WeakKeyDictionary()
-        self.on_message = None
-        self.on_after_fork()
-
-    def on_after_fork(self):
-        pass
-
-    def drain_events_until(self, p, timeout=None, on_interval=None):
-        return self.drainer.drain_events_until(
-            p, timeout=timeout, on_interval=on_interval)
-
-    def _wait_for_pending(self, result,
-                          timeout=None, on_interval=None, on_message=None,
-                          **kwargs):
-        self.on_wait_for_pending(result, timeout=timeout, **kwargs)
-        prev_on_m, self.on_message = self.on_message, on_message
-        try:
-            for _ in self.drain_events_until(
-                    result.on_ready, timeout=timeout,
-                    on_interval=on_interval):
-                yield
-                sleep(0)
-        except socket.timeout:
-            raise TimeoutError('The operation timed out.')
-        finally:
-            self.on_message = prev_on_m
-
-    def on_wait_for_pending(self, result, timeout=None, **kwargs):
-        pass
-
-    def on_out_of_band_result(self, message):
-        self.on_state_change(message.payload, message)
-
-    def _get_pending_result(self, task_id):
-        for mapping in self._pending_results:
-            try:
-                return mapping[task_id]
-            except KeyError:
-                pass
-        raise KeyError(task_id)
-
-    def on_state_change(self, meta, message):
-        if self.on_message:
-            self.on_message(meta)
-        if meta['status'] in states.READY_STATES:
-            task_id = meta['task_id']
-            try:
-                result = self._get_pending_result(task_id)
-            except KeyError:
-                # send to buffer in case we received this result
-                # before it was added to _pending_results.
-                self._pending_messages.put(task_id, meta)
-            else:
-                result._maybe_set_cache(meta)
-                buckets = self.buckets
-                try:
-                    # remove bucket for this result, since it's fulfilled
-                    bucket = buckets.pop(result)
-                except KeyError:
-                    pass
-                else:
-                    # send to waiter via bucket
-                    bucket.append(result)
-        sleep(0)
diff --git a/celery/backends/asynchronous.py b/celery/backends/asynchronous.py
new file mode 100644
index 0000000..20bf539
--- /dev/null
+++ b/celery/backends/asynchronous.py
@@ -0,0 +1,299 @@
+"""Async I/O backend support utilities."""
+from __future__ import absolute_import, unicode_literals
+
+import socket
+import threading
+from collections import deque
+from time import sleep
+from weakref import WeakKeyDictionary
+
+from kombu.utils.compat import detect_environment
+from kombu.utils.objects import cached_property
+
+from celery import states
+from celery.exceptions import TimeoutError
+from celery.five import Empty, monotonic
+from celery.utils.threads import THREAD_TIMEOUT_MAX
+
+__all__ = (
+    'AsyncBackendMixin', 'BaseResultConsumer', 'Drainer',
+    'register_drainer',
+)
+
+drainers = {}
+
+
+def register_drainer(name):
+    """Decorator used to register a new result drainer type."""
+    def _inner(cls):
+        drainers[name] = cls
+        return cls
+    return _inner
+
+
+@register_drainer('default')
+class Drainer(object):
+    """Result draining service."""
+
+    def __init__(self, result_consumer):
+        self.result_consumer = result_consumer
+
+    def start(self):
+        pass
+
+    def stop(self):
+        pass
+
+    def drain_events_until(self, p, timeout=None, on_interval=None, wait=None):
+        wait = wait or self.result_consumer.drain_events
+        time_start = monotonic()
+
+        while 1:
+            # Total time spent may exceed a single call to wait()
+            if timeout and monotonic() - time_start >= timeout:
+                raise socket.timeout()
+            try:
+                yield self.wait_for(p, wait, timeout=1)
+            except socket.timeout:
+                pass
+            if on_interval:
+                on_interval()
+            if p.ready:  # got event on the wanted channel.
+                break
+
+    def wait_for(self, p, wait, timeout=None):
+        wait(timeout=timeout)
+
+
+class greenletDrainer(Drainer):
+    spawn = None
+    _g = None
+
+    def __init__(self, *args, **kwargs):
+        super(greenletDrainer, self).__init__(*args, **kwargs)
+        self._started = threading.Event()
+        self._stopped = threading.Event()
+        self._shutdown = threading.Event()
+
+    def run(self):
+        self._started.set()
+        while not self._stopped.is_set():
+            try:
+                self.result_consumer.drain_events(timeout=1)
+            except socket.timeout:
+                pass
+        self._shutdown.set()
+
+    def start(self):
+        if not self._started.is_set():
+            self._g = self.spawn(self.run)
+            self._started.wait()
+
+    def stop(self):
+        self._stopped.set()
+        self._shutdown.wait(THREAD_TIMEOUT_MAX)
+
+    def wait_for(self, p, wait, timeout=None):
+        self.start()
+        if not p.ready:
+            sleep(0)
+
+
+@register_drainer('eventlet')
+class eventletDrainer(greenletDrainer):
+
+    @cached_property
+    def spawn(self):
+        from eventlet import spawn
+        return spawn
+
+
+@register_drainer('gevent')
+class geventDrainer(greenletDrainer):
+
+    @cached_property
+    def spawn(self):
+        from gevent import spawn
+        return spawn
+
+
+class AsyncBackendMixin(object):
+    """Mixin for backends that enables the async API."""
+
+    def _collect_into(self, result, bucket):
+        self.result_consumer.buckets[result] = bucket
+
+    def iter_native(self, result, no_ack=True, **kwargs):
+        self._ensure_not_eager()
+
+        results = result.results
+        if not results:
+            raise StopIteration()
+
+        # we tell the result consumer to put consumed results
+        # into these buckets.
+        bucket = deque()
+        for node in results:
+            if node._cache:
+                bucket.append(node)
+            else:
+                self._collect_into(node, bucket)
+
+        for _ in self._wait_for_pending(result, no_ack=no_ack, **kwargs):
+            while bucket:
+                node = bucket.popleft()
+                yield node.id, node._cache
+        while bucket:
+            node = bucket.popleft()
+            yield node.id, node._cache
+
+    def add_pending_result(self, result, weak=False, start_drainer=True):
+        if start_drainer:
+            self.result_consumer.drainer.start()
+        try:
+            self._maybe_resolve_from_buffer(result)
+        except Empty:
+            self._add_pending_result(result.id, result, weak=weak)
+        return result
+
+    def _maybe_resolve_from_buffer(self, result):
+        result._maybe_set_cache(self._pending_messages.take(result.id))
+
+    def _add_pending_result(self, task_id, result, weak=False):
+        concrete, weak_ = self._pending_results
+        if task_id not in weak_ and result.id not in concrete:
+            (weak_ if weak else concrete)[task_id] = result
+            self.result_consumer.consume_from(task_id)
+
+    def add_pending_results(self, results, weak=False):
+        self.result_consumer.drainer.start()
+        return [self.add_pending_result(result, weak=weak, start_drainer=False)
+                for result in results]
+
+    def remove_pending_result(self, result):
+        self._remove_pending_result(result.id)
+        self.on_result_fulfilled(result)
+        return result
+
+    def _remove_pending_result(self, task_id):
+        for map in self._pending_results:
+            map.pop(task_id, None)
+
+    def on_result_fulfilled(self, result):
+        self.result_consumer.cancel_for(result.id)
+
+    def wait_for_pending(self, result,
+                         callback=None, propagate=True, **kwargs):
+        self._ensure_not_eager()
+        for _ in self._wait_for_pending(result, **kwargs):
+            pass
+        return result.maybe_throw(callback=callback, propagate=propagate)
+
+    def _wait_for_pending(self, result,
+                          timeout=None, on_interval=None, on_message=None,
+                          **kwargs):
+        return self.result_consumer._wait_for_pending(
+            result, timeout=timeout,
+            on_interval=on_interval, on_message=on_message,
+        )
+
+    @property
+    def is_async(self):
+        return True
+
+
+class BaseResultConsumer(object):
+    """Manager responsible for consuming result messages."""
+
+    def __init__(self, backend, app, accept,
+                 pending_results, pending_messages):
+        self.backend = backend
+        self.app = app
+        self.accept = accept
+        self._pending_results = pending_results
+        self._pending_messages = pending_messages
+        self.on_message = None
+        self.buckets = WeakKeyDictionary()
+        self.drainer = drainers[detect_environment()](self)
+
+    def start(self, initial_task_id, **kwargs):
+        raise NotImplementedError()
+
+    def stop(self):
+        pass
+
+    def drain_events(self, timeout=None):
+        raise NotImplementedError()
+
+    def consume_from(self, task_id):
+        raise NotImplementedError()
+
+    def cancel_for(self, task_id):
+        raise NotImplementedError()
+
+    def _after_fork(self):
+        self.buckets.clear()
+        self.buckets = WeakKeyDictionary()
+        self.on_message = None
+        self.on_after_fork()
+
+    def on_after_fork(self):
+        pass
+
+    def drain_events_until(self, p, timeout=None, on_interval=None):
+        return self.drainer.drain_events_until(
+            p, timeout=timeout, on_interval=on_interval)
+
+    def _wait_for_pending(self, result,
+                          timeout=None, on_interval=None, on_message=None,
+                          **kwargs):
+        self.on_wait_for_pending(result, timeout=timeout, **kwargs)
+        prev_on_m, self.on_message = self.on_message, on_message
+        try:
+            for _ in self.drain_events_until(
+                    result.on_ready, timeout=timeout,
+                    on_interval=on_interval):
+                yield
+                sleep(0)
+        except socket.timeout:
+            raise TimeoutError('The operation timed out.')
+        finally:
+            self.on_message = prev_on_m
+
+    def on_wait_for_pending(self, result, timeout=None, **kwargs):
+        pass
+
+    def on_out_of_band_result(self, message):
+        self.on_state_change(message.payload, message)
+
+    def _get_pending_result(self, task_id):
+        for mapping in self._pending_results:
+            try:
+                return mapping[task_id]
+            except KeyError:
+                pass
+        raise KeyError(task_id)
+
+    def on_state_change(self, meta, message):
+        if self.on_message:
+            self.on_message(meta)
+        if meta['status'] in states.READY_STATES:
+            task_id = meta['task_id']
+            try:
+                result = self._get_pending_result(task_id)
+            except KeyError:
+                # send to buffer in case we received this result
+                # before it was added to _pending_results.
+                self._pending_messages.put(task_id, meta)
+            else:
+                result._maybe_set_cache(meta)
+                buckets = self.buckets
+                try:
+                    # remove bucket for this result, since it's fulfilled
+                    bucket = buckets.pop(result)
+                except KeyError:
+                    pass
+                else:
+                    # send to waiter via bucket
+                    bucket.append(result)
+        sleep(0)
diff --git a/celery/backends/redis.py b/celery/backends/redis.py
index 012db0f..6c311d8 100644
--- a/celery/backends/redis.py
+++ b/celery/backends/redis.py
@@ -19,7 +19,8 @@ from celery.utils.functional import dictfilter
 from celery.utils.log import get_logger
 from celery.utils.time import humanize_seconds
 
-from . import async, base
+from .asynchronous import AsyncBackendMixin, BaseResultConsumer
+from .base import BaseKeyValueStoreBackend
 
 try:
     from urllib.parse import unquote
@@ -74,7 +75,7 @@ E_LOST = 'Connection to Redis lost: Retry (%s/%s) %s.'
 logger = get_logger(__name__)
 
 
-class ResultConsumer(async.BaseResultConsumer):
+class ResultConsumer(BaseResultConsumer):
     _pubsub = None
 
     def __init__(self, *args, **kwargs):
@@ -138,7 +139,7 @@ class ResultConsumer(async.BaseResultConsumer):
             self._pubsub.unsubscribe(key)
 
 
-class RedisBackend(base.BaseKeyValueStoreBackend, async.AsyncBackendMixin):
+class RedisBackend(BaseKeyValueStoreBackend, AsyncBackendMixin):
     """Redis task result store."""
 
     ResultConsumer = ResultConsumer
diff --git a/celery/backends/rpc.py b/celery/backends/rpc.py
index 6e31cef..5e6e407 100644
--- a/celery/backends/rpc.py
+++ b/celery/backends/rpc.py
@@ -17,7 +17,7 @@ from celery._state import current_task, task_join_will_block
 from celery.five import items, range
 
 from . import base
-from .async import AsyncBackendMixin, BaseResultConsumer
+from .asynchronous import AsyncBackendMixin, BaseResultConsumer
 
 __all__ = ('BacklogLimitExceeded', 'RPCBackend')
 
diff --git a/docs/internals/reference/celery.backends.async.rst b/docs/internals/reference/celery.backends.async.rst
deleted file mode 100644
index 03d10fe..0000000
--- a/docs/internals/reference/celery.backends.async.rst
+++ /dev/null
@@ -1,13 +0,0 @@
-=====================================
- ``celery.backends.async``
-=====================================
-
-.. contents::
-    :local:
-.. currentmodule:: celery.backends.async
-
-.. automodule:: celery.backends.async
-    :members:
-    :undoc-members:
-
-
diff --git a/docs/internals/reference/celery.backends.asynchronous.rst b/docs/internals/reference/celery.backends.asynchronous.rst
new file mode 100644
index 0000000..fef5242
--- /dev/null
+++ b/docs/internals/reference/celery.backends.asynchronous.rst
@@ -0,0 +1,13 @@
+=====================================
+ ``celery.backends.asynchronous``
+=====================================
+
+.. contents::
+    :local:
+.. currentmodule:: celery.backends.asynchronous
+
+.. automodule:: celery.backends.asynchronous
+    :members:
+    :undoc-members:
+
+
diff --git a/t/unit/backends/test_redis.py b/t/unit/backends/test_redis.py
index ee2ecf7..bd7277c 100644
--- a/t/unit/backends/test_redis.py
+++ b/t/unit/backends/test_redis.py
@@ -146,7 +146,7 @@ class test_RedisResultConsumer:
     def get_consumer(self):
         return self.get_backend().result_consumer
 
-    @patch('celery.backends.async.BaseResultConsumer.on_after_fork')
+    @patch('celery.backends.asynchronous.BaseResultConsumer.on_after_fork')
     def test_on_after_fork(self, parent_method):
         consumer = self.get_consumer()
         consumer.start('none')
@@ -172,7 +172,7 @@ class test_RedisResultConsumer:
         parent_method.assert_called_once()
 
     @patch('celery.backends.redis.ResultConsumer.cancel_for')
-    @patch('celery.backends.async.BaseResultConsumer.on_state_change')
+    @patch('celery.backends.asynchronous.BaseResultConsumer.on_state_change')
     def test_on_state_change(self, parent_method, cancel_for):
         consumer = self.get_consumer()
         meta = {'task_id': 'testing', 'status': states.SUCCESS}
