File: application.py

package info (click to toggle)
python-django-channels 4.3.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,036 kB
  • sloc: python: 3,109; makefile: 155; javascript: 60; sh: 8
file content (17 lines) | stat: -rw-r--r-- 533 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from unittest import mock

from asgiref.testing import ApplicationCommunicator as BaseApplicationCommunicator


def no_op():
    pass


class ApplicationCommunicator(BaseApplicationCommunicator):
    async def send_input(self, message):
        with mock.patch("channels.db.close_old_connections", no_op):
            return await super().send_input(message)

    async def receive_output(self, timeout=1):
        with mock.patch("channels.db.close_old_connections", no_op):
            return await super().receive_output(timeout)