File: pydev_runfiles_parallel_client.py

package info (click to toggle)
pydevd 3.3.0%2Bds-4
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 13,892 kB
  • sloc: python: 77,508; cpp: 1,869; sh: 368; makefile: 50; ansic: 4
file content (194 lines) | stat: -rw-r--r-- 7,712 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
from _pydev_bundle.pydev_imports import xmlrpclib, _queue

Queue = _queue.Queue
import traceback
import sys
from _pydev_runfiles.pydev_runfiles_coverage import start_coverage_support_from_params
import threading


# =======================================================================================================================
# ParallelNotification
# =======================================================================================================================
class ParallelNotification(object):
    def __init__(self, method, args, kwargs):
        self.method = method
        self.args = args
        self.kwargs = kwargs

    def to_tuple(self):
        return self.method, self.args, self.kwargs


# =======================================================================================================================
# KillServer
# =======================================================================================================================
class KillServer(object):
    pass


# =======================================================================================================================
# ServerComm
# =======================================================================================================================
class ServerComm(threading.Thread):
    def __init__(self, job_id, server):
        self.notifications_queue = Queue()
        threading.Thread.__init__(self)
        self.setDaemon(False)  # Wait for all the notifications to be passed before exiting!
        assert job_id is not None
        assert port is not None
        self.job_id = job_id

        self.finished = False
        self.server = server

    def run(self):
        while True:
            kill_found = False
            commands = []
            command = self.notifications_queue.get(block=True)
            if isinstance(command, KillServer):
                kill_found = True
            else:
                assert isinstance(command, ParallelNotification)
                commands.append(command.to_tuple())

            try:
                while True:
                    command = self.notifications_queue.get(block=False)  # No block to create a batch.
                    if isinstance(command, KillServer):
                        kill_found = True
                    else:
                        assert isinstance(command, ParallelNotification)
                        commands.append(command.to_tuple())
            except:
                pass  # That's OK, we're getting it until it becomes empty so that we notify multiple at once.

            if commands:
                try:
                    # Batch notification.
                    self.server.lock.acquire()
                    try:
                        self.server.notifyCommands(self.job_id, commands)
                    finally:
                        self.server.lock.release()
                except:
                    traceback.print_exc()

            if kill_found:
                self.finished = True
                return


# =======================================================================================================================
# ServerFacade
# =======================================================================================================================
class ServerFacade(object):
    def __init__(self, notifications_queue):
        self.notifications_queue = notifications_queue

    def notifyTestsCollected(self, *args, **kwargs):
        pass  # This notification won't be passed

    def notifyTestRunFinished(self, *args, **kwargs):
        pass  # This notification won't be passed

    def notifyStartTest(self, *args, **kwargs):
        self.notifications_queue.put_nowait(ParallelNotification("notifyStartTest", args, kwargs))

    def notifyTest(self, *args, **kwargs):
        self.notifications_queue.put_nowait(ParallelNotification("notifyTest", args, kwargs))


# =======================================================================================================================
# run_client
# =======================================================================================================================
def run_client(job_id, port, verbosity, coverage_output_file, coverage_include):
    job_id = int(job_id)

    from _pydev_bundle import pydev_localhost

    server = xmlrpclib.Server("http://%s:%s" % (pydev_localhost.get_localhost(), port))
    server.lock = threading.Lock()

    server_comm = ServerComm(job_id, server)
    server_comm.start()

    try:
        server_facade = ServerFacade(server_comm.notifications_queue)
        from _pydev_runfiles import pydev_runfiles
        from _pydev_runfiles import pydev_runfiles_xml_rpc

        pydev_runfiles_xml_rpc.set_server(server_facade)

        # Starts None and when the 1st test is gotten, it's started (because a server may be initiated and terminated
        # before receiving any test -- which would mean a different process got all the tests to run).
        coverage = None

        try:
            tests_to_run = [1]
            while tests_to_run:
                # Investigate: is it dangerous to use the same xmlrpclib server from different threads?
                # It seems it should be, as it creates a new connection for each request...
                server.lock.acquire()
                try:
                    tests_to_run = server.GetTestsToRun(job_id)
                finally:
                    server.lock.release()

                if not tests_to_run:
                    break

                if coverage is None:
                    _coverage_files, coverage = start_coverage_support_from_params(None, coverage_output_file, 1, coverage_include)

                files_to_tests = {}
                for test in tests_to_run:
                    filename_and_test = test.split("|")
                    if len(filename_and_test) == 2:
                        files_to_tests.setdefault(filename_and_test[0], []).append(filename_and_test[1])

                configuration = pydev_runfiles.Configuration(
                    "",
                    verbosity,
                    None,
                    None,
                    None,
                    files_to_tests,
                    1,  # Always single job here
                    None,
                    # The coverage is handled in this loop.
                    coverage_output_file=None,
                    coverage_include=None,
                )
                test_runner = pydev_runfiles.PydevTestRunner(configuration)
                sys.stdout.flush()
                test_runner.run_tests(handle_coverage=False)
        finally:
            if coverage is not None:
                coverage.stop()
                coverage.save()

    except:
        traceback.print_exc()
    server_comm.notifications_queue.put_nowait(KillServer())


# =======================================================================================================================
# main
# =======================================================================================================================
if __name__ == "__main__":
    if len(sys.argv) - 1 == 3:
        job_id, port, verbosity = sys.argv[1:]
        coverage_output_file, coverage_include = None, None

    elif len(sys.argv) - 1 == 5:
        job_id, port, verbosity, coverage_output_file, coverage_include = sys.argv[1:]

    else:
        raise AssertionError("Could not find out how to handle the parameters: " + sys.argv[1:])

    job_id = int(job_id)
    port = int(port)
    verbosity = int(verbosity)
    run_client(job_id, port, verbosity, coverage_output_file, coverage_include)