File: test_PROTON_1709_application_event_object_leak.py

package info (click to toggle)
qpid-proton 0.37.0-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 18,384 kB
  • sloc: ansic: 37,828; cpp: 37,140; python: 15,302; ruby: 6,018; xml: 477; sh: 320; pascal: 52; makefile: 18
file content (95 lines) | stat: -rw-r--r-- 2,962 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

"""
PROTON-1709 [python] ApplicationEvent causing memory growth
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import gc
import platform
import threading
import unittest

import proton
from proton.handlers import MessagingHandler
from proton.reactor import Container, ApplicationEvent, EventInjector


class Program(MessagingHandler):
    """Handler for the leak test: counts ``count_up`` events, stops on ``done``."""

    # NOTE(review): MessagingHandler.__init__ is not invoked here, exactly as
    # in the original code -- confirm that skipping it is intentional.
    def __init__(self, injector):
        """Store the injector and set up the event counter and start signal."""
        self.on_start_ = threading.Event()
        self.counter = 0
        self.injector = injector

    def on_reactor_init(self, event):
        """Hand the injector to the event's reactor, then flag that we started."""
        reactor = event.reactor
        reactor.selectable(self.injector)
        self.on_start_.set()

    def on_count_up(self, event):
        """Increment the counter and force a garbage-collection pass."""
        self.counter = self.counter + 1
        gc.collect()

    def on_done(self, event):
        """Call ``stop()`` on the object carried as the event's subject."""
        target = event.subject
        target.stop()


class Proton1709Test(unittest.TestCase):
    """Regression test for PROTON-1709 (ApplicationEvent causing memory growth)."""

    @unittest.skipIf(platform.system() == 'Windows', "TODO jdanek: Test is broken on Windows")
    def test_application_event_no_object_leaks(self):
        """Inject 100 ``count_up`` events and bound the growth of gc-tracked objects.

        The container runs in a helper thread. The number of objects reported
        by ``gc.get_objects()`` is sampled three times: once the handler has
        signalled start-up, after the events have been injected, and after the
        container thread has finished. The test also checks that each new
        application event name adds exactly one entry to
        ``proton.EventType.TYPES``.
        """
        event_types_count = len(proton.EventType.TYPES)

        injector = EventInjector()
        p = Program(injector)
        c = Container(p)
        t = threading.Thread(target=c.run)
        t.start()

        object_counts = []
        stop_requested = False
        try:
            p.on_start_.wait()

            gc.collect()
            object_counts.append(len(gc.get_objects()))

            for _ in range(100):
                injector.trigger(ApplicationEvent("count_up"))

            gc.collect()
            object_counts.append(len(gc.get_objects()))

            self.assertEqual(len(proton.EventType.TYPES), event_types_count + 1)

            injector.trigger(ApplicationEvent("done", subject=c))
            stop_requested = True
            self.assertEqual(len(proton.EventType.TYPES), event_types_count + 2)
        finally:
            # If anything above failed before "done" was injected, the thread
            # running c.run would never be asked to stop and the test run
            # could hang instead of reporting the failure.
            if not stop_requested:
                injector.trigger(ApplicationEvent("done", subject=c))
            t.join()

        gc.collect()
        object_counts.append(len(gc.get_objects()))

        self.assertEqual(p.counter, 100)

        # assertLessEqual reports the offending difference in addition to msg.
        self.assertLessEqual(object_counts[1] - object_counts[0], 220,
                             "Object counts should not be increasing too fast: {0}".format(object_counts))
        self.assertLessEqual(object_counts[2] - object_counts[0], 10,
                             "No objects should be leaking at the end: {0}".format(object_counts))