File: runner.py

package info (click to toggle)
python-molotov 2.7-3
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 8,268 kB
  • sloc: python: 4,121; makefile: 60
file content (229 lines) | stat: -rw-r--r-- 7,285 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
import asyncio
import functools
import os
import signal

import multiprocess

from molotov.api import get_fixture
from molotov.listeners import EventSender
from molotov.shared import Counters, Tasks
from molotov.stats import get_statsd_client
from molotov.util import (
    cancellable_sleep,
    event_loop,
    is_stopped,
    set_timer,
    stop,
)
from molotov.worker import Worker


class Runner:
    """Manages processes & workers and grabs results."""

    def __init__(self, args, loop=None):
        self.args = args
        self.console = self.args.shared_console
        if loop is None:
            loop = event_loop()
        self.loop = loop
        # the stastd client gets initialized after we fork
        # processes in case -p was used
        self.statsd = None
        self._tasks = Tasks()
        self._procs = []
        self._results = Counters(
            "WORKER",
            "REACHED",
            "RATIO",
            "OK",
            "FAILED",
            "MINUTE_OK",
            "MINUTE_FAILED",
            "MAX_WORKERS",
            "SETUP_FAILED",
            "SESSION_SETUP_FAILED",
            "PROCESS",
        )
        self.eventer = EventSender(self.console)

    def _set_statsd(self):
        if self.args.statsd:
            self.statsd = get_statsd_client(self.args.statsd_address)
        else:
            self.statsd = None

    def gather(self, *futures):
        return asyncio.gather(*futures, return_exceptions=True)

    def __call__(self):
        global_setup = get_fixture("global_setup")
        if global_setup is not None:
            try:
                global_setup(self.args)
            except Exception as e:
                self.console.print("The global_setup() fixture failed")
                self.console.print_error(e)
                raise

        if not self.args.quiet:
            self._tasks.ensure_future(self._display_results(self.args.console_update))

        self._tasks.ensure_future(self._send_workers_event(1))
        try:
            return self._launch_processes()
        finally:
            global_teardown = get_fixture("global_teardown")
            if global_teardown is not None:
                try:
                    global_teardown()
                except Exception as e:
                    # we can't stop the teardown process and the ui is down
                    print(e)

            self._shutdown()

    def _launch_processes(self):
        args = self.args
        self.loop.add_signal_handler(signal.SIGTERM, self._shutdown)
        self.loop.add_signal_handler(
            signal.SIGINT, functools.partial(os.kill, os.getpid(), signal.SIGTERM)
        )
        args.original_pid = os.getpid()

        if args.processes > 1:
            if not args.quiet:
                self.console.print("Forking %d processes" % args.processes)
            jobs = []
            for _i in range(args.processes):
                p = multiprocess.Process(target=self._process)
                jobs.append(p)
                p.start()
                self._results["PROCESS"] += 1

            for job in jobs:
                self._procs.append(job)

            async def run(quiet, console):
                while len(self._procs) > 0:
                    for job in jobs:
                        if job.exitcode is not None and job in self._procs:
                            self._procs.remove(job)
                            self._results["PROCESS"] -= 1
                    await cancellable_sleep(args.console_update)
                await self.eventer.stop()

            try:
                self.loop.run_until_complete(run(args.quiet, self.console))
            finally:
                stop()
                self.loop.run_until_complete(self._tasks.cancel_all())
        else:
            self._results["PROCESS"] = 1
            self._process()

        return self._results

    def _shutdown(self):
        if is_stopped():
            return
        stop()
        # send sigterms
        for proc in self._procs:
            proc.terminate()

    def create_workers(self):
        args = self.args

        def _prepare():
            tasks = []
            delay = 0
            if args.ramp_up > 0.0:
                step = args.ramp_up / args.workers
            else:
                step = 0.0
            for i in range(self.args.workers):
                worker = Worker(
                    i,
                    self._results,
                    self.console,
                    self.args,
                    self.statsd,
                    delay,
                    self.loop,
                )

                tasks.append(asyncio.ensure_future(worker.run()))
                delay += step
            return tasks

        if self.args.quiet:
            return _prepare()
        else:
            msg = "Preparing {} worker{}"
            msg = msg.format(args.workers, "s" if args.workers > 1 else "")
            return self.console.print_block(msg, _prepare)

    def _process(self):
        set_timer()

        # coroutine that will kill everything when duration is up
        if self.args.duration and self.args.force_shutdown:

            async def _duration_killer():
                cancelled = object()
                res = await cancellable_sleep(self.args.duration, result=cancelled)
                await self.eventer.stop()

                if res is cancelled or (res and not res.canceled()):
                    self._shutdown()
                    await asyncio.sleep(0)

            self._tasks.ensure_future(_duration_killer())

        if self.args.processes > 1:
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
            self.loop.add_signal_handler(signal.SIGTERM, self._shutdown)

        if self.args.debug:
            self.console.print("**** RUNNING IN DEBUG MODE == SLOW ****")
            self.loop.set_debug(True)

        self._set_statsd()

        if self.args.original_pid == os.getpid():
            self._tasks.ensure_future(self._send_workers_event(1))

        def _stop(*args):
            stop()

        workers_tasks = self.create_workers()
        gathered = self.gather(*workers_tasks)
        gathered.add_done_callback(_stop)
        try:
            self.loop.run_until_complete(gathered)
        finally:
            if self.statsd is not None and not self.statsd.disconnected:
                self.loop.run_until_complete(self._tasks.ensure_future(self.statsd.close()))
            self.loop.run_until_complete(self._tasks.cancel_all())
            self.loop.close()

    async def _display_results(self, update_interval):
        if self.args.original_pid != os.getpid():
            raise OSError("Wrong process")

        await self.console.start()

        while not is_stopped():
            self.console.print_results(self._results.to_dict())
            await cancellable_sleep(update_interval)

        await self.console.stop()

    async def _send_workers_event(self, update_interval):
        while not self.eventer.stopped() and not is_stopped():
            workers = self._results["WORKER"].value
            await self.eventer.send_event("current_workers", workers=workers)
            await cancellable_sleep(update_interval)