File: cdp.py

package info (click to toggle)
cockpit 239-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 67,268 kB
  • sloc: javascript: 245,474; ansic: 72,273; python: 23,634; xml: 6,155; sh: 2,919; makefile: 923; sed: 5
file content (320 lines) | stat: -rw-r--r-- 11,973 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
import fcntl
import glob
import json
import os
import random
import resource
import shutil
import socket
import subprocess
import sys
import tempfile
import time

TEST_DIR = os.path.normpath(os.path.dirname(os.path.realpath(os.path.join(__file__, ".."))))


def browser_path(browser, show_browser):
    if browser == "chromium":
        return browser_path_chromium(show_browser)
    elif browser == "firefox":
        return browser_path_firefox()
    else:
        raise SystemError("Unsupported browser")


def browser_path_firefox():
    """ Return path to Firefox browser """
    p = subprocess.check_output("which firefox-nightly || which firefox || true",
                                shell=True, universal_newlines=True).strip()
    if p:
        return p
    return None


def browser_path_chromium(show_browser):
    """Return path to chromium browser.

    Support the following locations:
     - /usr/lib*/chromium-browser/headless_shell (chromium-headless RPM)
     - "chromium-browser", "chromium", or "google-chrome"  in $PATH (distro package)
     - node_modules/chromium/lib/chromium/chrome-linux/chrome (npm install chromium)

    Exit with an error if none is found.
    """

    # If we want to have interactive chromium, we don't want to use headless_shell
    if not show_browser:
        g = glob.glob("/usr/lib*/chromium-browser/headless_shell")
        if g:
            return g[0]

    p = subprocess.check_output("which chromium-browser || which chromium || which google-chrome || true",
                                shell=True, universal_newlines=True).strip()
    if p:
        return p

    p = os.path.join(os.path.dirname(TEST_DIR), "node_modules/chromium/lib/chromium/chrome-linux/chrome")
    if os.access(p, os.X_OK):
        return p

    return None


def jsquote(str):
    return json.dumps(str)


class CDP:
    def __init__(self, lang=None, verbose=False, trace=False, inject_helpers=[]):
        self.lang = lang
        self.timeout = 60
        self.valid = False
        self.verbose = verbose
        self.trace = trace
        self.inject_helpers = inject_helpers
        self.browser = os.environ.get("TEST_BROWSER", "chromium")
        self.show_browser = bool(os.environ.get("TEST_SHOW_BROWSER", ""))
        self.download_dir = tempfile.mkdtemp()
        self._driver = None
        self._browser = None
        self._browser_home = None
        self._browser_path = None
        self._cdp_port_lockfile = None

    def invoke(self, fn, **kwargs):
        """Call a particular CDP method such as Runtime.evaluate

        Use command() for arbitrary JS code.
        """
        trace = self.trace and not kwargs.get("no_trace", False)
        try:
            del kwargs["no_trace"]
        except KeyError:
            pass

        cmd = fn + "(" + json.dumps(kwargs) + ")"

        # frame support for Runtime.evaluate(): map frame name to
        # executionContextId and insert into argument object; this must not be quoted
        # see "Frame tracking" in cdp-driver.js for how this works
        if fn == 'Runtime.evaluate':
            cmd = "%s, contextId: getFrameExecId(%s)%s" % (cmd[:-2], jsquote(self.cur_frame), cmd[-2:])

        if trace:
            print("-> " + kwargs.get('trace', cmd))

        # avoid having to write the "client." prefix everywhere
        cmd = "client." + cmd
        res = self.command(cmd)
        if trace:
            if "result" in res:
                print("<- " + repr(res["result"]))
            else:
                print("<- " + repr(res))
        return res

    def command(self, cmd):
        if not self._driver:
            self.start()
        self._driver.stdin.write(cmd.encode("UTF-8"))
        self._driver.stdin.write(b"\n")
        self._driver.stdin.flush()
        line = self._driver.stdout.readline().decode("UTF-8")
        if not line:
            self.kill()
            raise RuntimeError("CDP broken")
        try:
            res = json.loads(line)
        except ValueError:
            print(line.strip())
            raise

        if "error" in res:
            if self.trace:
                print("<- raise %s" % str(res["error"]))
            raise RuntimeError(res["error"])
        return res["result"]

    def claim_port(self, port):
        f = None
        try:
            f = open(os.path.join(tempfile.gettempdir(), ".cdp-%i.lock" % port), "w")
            fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
            self._cdp_port_lockfile = f
            return True
        except (IOError, OSError):
            if f:
                f.close()
            return False

    def find_cdp_port(self):
        """Find an unused port and claim it through lock file"""

        for retry in range(100):
            # don't use the default CDP port 9222 to avoid interfering with running browsers
            port = random.randint(9223, 10222)
            if self.claim_port(port):
                return port
        else:
            raise RuntimeError("unable to find free port")

    def get_browser_path(self):
        if self._browser_path is None:
            self._browser_path = browser_path(self.browser, self.show_browser)

        return self._browser_path

    def browser_cmd(self, cdp_port, env):
        exe = self.get_browser_path()
        if not exe:
            raise SystemError(self.browser + " is not installed")

        if self.browser == "chromium":
            return [exe, "--headless" if not self.show_browser else "", "--disable-gpu", "--no-sandbox", "--disable-setuid-sandbox",
                    "--disable-namespace-sandbox", "--disable-seccomp-filter-sandbox",
                    "--disable-sandbox-denial-logging", "--disable-pushstate-throttle",
                    "--v=0", "--window-size=1920x1200", "--remote-debugging-port=%i" % cdp_port, "about:blank"]
        elif self.browser == "firefox":
            subprocess.Popen([exe, "--headless", "--no-remote", "-CreateProfile", "blank"], env=env).communicate()
            profile = glob.glob(os.path.join(self._browser_home, ".mozilla/firefox/*.blank"))[0]

            with open(os.path.join(profile, "user.js"), "w") as f:
                f.write("""
                    user_pref("remote.enabled", true);
                    user_pref("remote.frames.enabled", true);
                    user_pref("app.update.auto", false);
                    user_pref("datareporting.policy.dataSubmissionEnabled", false);
                    user_pref("toolkit.telemetry.reportingpolicy.firstRun", false);
                    user_pref("dom.disable_beforeunload", true);
                    user_pref("browser.download.dir", "{0}");
                    user_pref("browser.download.folderList", 2);
                    user_pref("signon.rememberSignons", false);
                    user_pref("dom.navigation.locationChangeRateLimit.count", 9999);
                    """.format(self.download_dir))

            with open(os.path.join(profile, "handlers.json"), "w") as f:
                f.write('{"defaultHandlersVersion":{"en-US":4},"mimeTypes":{"application/xz":{"action":0,"extensions":["xz"]}}}')

            cmd = [exe, "-P", "blank", "--window-size=1920,1200", "--remote-debugging-port=%i" % cdp_port, "--no-remote", "localhost"]
            if not self.show_browser:
                cmd.insert(3, "--headless")
            return cmd

    def start(self):
        environ = os.environ.copy()
        if self.lang:
            environ["LC_ALL"] = self.lang
        self.cur_frame = None

        # allow attaching to external browser
        cdp_port = None
        if "TEST_CDP_PORT" in os.environ:
            p = int(os.environ["TEST_CDP_PORT"])
            if self.claim_port(p):
                # can fail when a test starts multiple browsers; only show the first one
                cdp_port = p

        if not cdp_port:
            # start browser on a new port
            cdp_port = self.find_cdp_port()
            self._browser_home = tempfile.mkdtemp()
            environ = os.environ.copy()
            environ["HOME"] = self._browser_home
            environ["LC_ALL"] = "en_US.UTF-8"
            # this might be set for the tests themselves, but we must isolate caching between tests
            try:
                del environ["XDG_CACHE_HOME"]
            except KeyError:
                pass

            # sandboxing does not work in Docker container
            self._browser = subprocess.Popen(
                self.browser_cmd(cdp_port, environ), env=environ, close_fds=True,
                preexec_fn=lambda: resource.setrlimit(resource.RLIMIT_CORE, (0, 0)))
            if self.verbose:
                sys.stderr.write("Started %s (pid %i) on port %i\n" % (self._browser_path, self._browser.pid, cdp_port))

        # wait for CDP to be up
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        for retry in range(3000):
            try:
                s.connect(('127.0.0.1', cdp_port))
                break
            except socket.error:
                time.sleep(0.1)
        else:
            raise RuntimeError('timed out waiting for browser to start')

        # now start the driver
        if self.trace:
            # enable frame/execution context debugging if tracing is on
            environ["TEST_CDP_DEBUG"] = "1"
        self._driver = subprocess.Popen(["{0}/{1}-cdp-driver.js".format(os.path.dirname(__file__), self.browser), str(cdp_port)],
                                        env=environ,
                                        stdout=subprocess.PIPE,
                                        stdin=subprocess.PIPE,
                                        close_fds=True)
        self.valid = True

        for inject in self.inject_helpers:
            with open(inject) as f:
                src = f.read()
            # HACK: injecting sizzle fails on missing `document` in assert()
            src = src.replace('function assert( fn ) {', 'function assert( fn ) { if (true) return true; else ')
            # HACK: sizzle tracks document and when we switch frames, it sees the old document
            # although we execute it in different context.
            if (self.browser == "firefox"):
                src = src.replace('context = context || document;', 'context = context || window.document;')
            self.invoke("Page.addScriptToEvaluateOnNewDocument", source=src, no_trace=True)

    def kill(self):
        self.valid = False
        self.cur_frame = None
        if self._driver:
            self._driver.stdin.close()
            self._driver.wait()
            self._driver = None

        shutil.rmtree(self.download_dir, ignore_errors=True)

        if self._browser:
            if self.verbose:
                sys.stderr.write("Killing browser (pid %i)\n" % self._browser.pid)
            try:
                self._browser.terminate()
            except OSError:
                pass  # ignore if it crashed for some reason
            self._browser.wait()
            self._browser = None
            shutil.rmtree(self._browser_home, ignore_errors=True)
            os.remove(self._cdp_port_lockfile.name)
            self._cdp_port_lockfile.close()

    def set_frame(self, frame):
        self.cur_frame = frame
        if self.trace:
            print("-> switch to frame %s" % frame)

    def get_js_log(self):
        """Return the current javascript console log"""

        if self.valid:
            # needs to be wrapped in Promise
            messages = self.command("Promise.resolve(messages)")
            return map(lambda m: "%s: %s" % tuple(m), messages)
        return []

    def read_log(self):
        """Returns an iterator that produces log messages one by one.

        Blocks if there are no new messages right now."""

        if not self.valid:
            yield []
            return

        while True:
            messages = self.command("waitLog()")
            for m in messages:
                yield m