File: test_notify.py

package info (click to toggle)
odoo 18.0.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 878,716 kB
  • sloc: javascript: 927,937; python: 685,670; xml: 388,524; sh: 1,033; sql: 415; makefile: 26
file content (100 lines) | stat: -rw-r--r-- 4,340 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# Part of Odoo. See LICENSE file for full copyright and licensing details.

import json
import selectors
import threading

import odoo
from odoo.tests import TransactionCase

from ..models.bus import json_dump, get_notify_payloads, NOTIFY_PAYLOAD_MAX_LENGTH, ODOO_NOTIFY_FUNCTION


class NotifyTests(TransactionCase):
    """Tests for the payload splitting and post-commit notification of the bus."""

    def test_get_notify_payloads(self):
        """
        Asserts that the implementation of `get_notify_payloads`
        actually splits correctly large payloads
        """
        def check_payloads_size(payloads):
            """Assert every payload encodes to fewer bytes than the maximum."""
            for payload in payloads:
                self.assertLess(len(payload.encode()), NOTIFY_PAYLOAD_MAX_LENGTH)

        channel = ('dummy_db', 'dummy_model', 12345)

        # A single small channel: below the threshold, must not be split.
        channels = [channel]
        self.assertLess(len(json_dump(channels).encode()), NOTIFY_PAYLOAD_MAX_LENGTH)
        payloads = get_notify_payloads(channels)
        self.assertEqual(len(payloads), 1,
                         "The payload is less then the threshold, "
                         "there should be 1 payload only, as it shouldn't be split")

        # Many channels, still below the threshold: must not be split either.
        channels = [channel] * 100
        self.assertLess(len(json_dump(channels).encode()), NOTIFY_PAYLOAD_MAX_LENGTH)
        payloads = get_notify_payloads(channels)
        self.assertEqual(len(payloads), 1,
                         "The payload is less then the threshold, "
                         "there should be 1 payload only, as it shouldn't be split")
        check_payloads_size(payloads)

        # Enough channels to reach the threshold: must be split into several
        # payloads, each of them below the maximum size.
        channels = [channel] * 1000
        self.assertGreaterEqual(len(json_dump(channels).encode()), NOTIFY_PAYLOAD_MAX_LENGTH)
        payloads = get_notify_payloads(channels)
        self.assertGreater(len(payloads), 1,
                           "Payload was larger than the threshold, it should've been split")
        check_payloads_size(payloads)

        # One oversized channel (strings repeated 1000 times, the integer
        # multiplied by 1000): a lone channel cannot be split any further, so
        # a single payload exceeding the maximum size is expected.
        fat_channel = tuple(item * 1000 for item in channel)
        channels = [fat_channel]
        self.assertEqual(len(channels), 1, "There should be only 1 channel")
        self.assertGreaterEqual(len(json_dump(channels).encode()), NOTIFY_PAYLOAD_MAX_LENGTH)
        payloads = get_notify_payloads(channels)
        self.assertEqual(len(payloads), 1,
                         "Payload was larger than the threshold, but shouldn't be split, "
                         "as it contains only 1 channel")
        with self.assertRaises(AssertionError):
            check_payloads_size(payloads)

    def test_postcommit(self):
        """Asserts all ``postcommit`` channels are fetched with a single listen."""
        if ODOO_NOTIFY_FUNCTION != 'pg_notify':
            # Report the test as skipped instead of silently passing.
            self.skipTest("This test only applies when notifications use pg_notify")
        channels = []
        stop_event = threading.Event()
        listener_ready = threading.Event()
        # Read on the main thread so the listener never touches ``self.env``.
        dbname = self.env.cr.dbname

        def single_listen():
            """Listen on ``imbus`` and store the first payload matching our database."""
            nonlocal channels
            with odoo.sql_db.db_connect(
                "postgres"
            ).cursor() as cr, selectors.DefaultSelector() as sel:
                cr.execute("listen imbus")
                cr.commit()
                conn = cr._cnx
                sel.register(conn, selectors.EVENT_READ)
                # From here on no notification can be missed: let the main
                # thread proceed.
                listener_ready.set()
                while sel.select(timeout=5) and not stop_event.is_set():
                    conn.poll()
                    # Drain every pending notification, oldest first: several
                    # may arrive in one poll, and a readable socket does not
                    # guarantee that any notification was parsed.
                    while conn.notifies:
                        payload = json.loads(conn.notifies.pop(0).payload)
                        notify_channels = [c for c in payload if c[0] == dbname]
                        if notify_channels:
                            channels = notify_channels
                            return

        thread = threading.Thread(target=single_listen)
        thread.start()
        try:
            # Without this handshake the NOTIFY below could be sent before the
            # listener has issued LISTEN, and would then be lost.
            self.assertTrue(
                listener_ready.wait(timeout=5),
                "The listener thread did not start listening in time",
            )

            self.env["bus.bus"].search([]).unlink()
            self.env["bus.bus"]._sendone("channel 1", "test 1", {})
            self.env["bus.bus"]._sendone("channel 2", "test 2", {})
            self.env["bus.bus"]._sendone("channel 1", "test 3", {})
            self.assertEqual(self.env["bus.bus"].search_count([]), 0)
            self.assertEqual(channels, [])
            self.env.cr.precommit.run()  # trigger the creation of bus.bus records
            self.assertEqual(self.env["bus.bus"].search_count([]), 3)
            self.assertEqual(channels, [])
            self.env.cr.postcommit.run()  # notify
            thread.join(timeout=5)
        finally:
            # Always ask the listener to stop, even when an assertion failed.
            stop_event.set()
        self.assertEqual(self.env["bus.bus"].search_count([]), 3)
        self.assertEqual(
            channels, [[dbname, "channel 1"], [dbname, "channel 2"]]
        )