File: benchmark.py

package info (click to toggle)
kitty 0.42.1-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 28,564 kB
  • sloc: ansic: 82,787; python: 55,191; objc: 5,122; sh: 1,295; xml: 364; makefile: 143; javascript: 78
file content (90 lines) | stat: -rwxr-xr-x 2,664 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!./kitty/launcher/kitty +launch
# License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>

import fcntl
import io
import os
import select
import signal
import struct
import sys
import termios
import time
from pty import CHILD, fork

from kitty.constants import kitten_exe
from kitty.fast_data_types import Screen, safe_pipe
from kitty.utils import read_screen_size


def run_parsing_benchmark(cell_width: int = 10, cell_height: int = 20, scrollback: int = 20000) -> None:
    """Run ``kitten __benchmark__`` on a pty and parse its output with a Screen.

    A child process is forked onto a new pseudo-terminal and execs
    ``kitten __benchmark__ --with-scrollback``. The parent feeds every chunk it
    reads from the pty through a ``Screen`` and forwards whatever the screen
    hands to its ``ToChild`` writer back to the child. Once the pty can no
    longer be read, the contents of ``screen.linebuf`` are written to stdout:
    via ``as_ansi()`` when stdout is a terminal, via ``str()`` otherwise.

    The grid size comes from ``read_screen_size()`` when stdout is a terminal
    and falls back to 80 columns by 25 rows otherwise.

    The pty master, the wakeup pipe and the child process are always released
    before returning, even if the loop or the final rendering raises.

    Args:
        cell_width: Cell width in pixels; used to compute the pty's pixel
            width and passed to ``Screen``.
        cell_height: Cell height in pixels; used to compute the pty's pixel
            height and passed to ``Screen``.
        scrollback: Scrollback size passed to ``Screen``.
    """
    isatty = sys.stdout.isatty()
    if isatty:
        sz = read_screen_size()
        columns, rows = sz.cols, sz.rows
    else:
        columns, rows = 80, 25
    child_pid, master_fd = fork()
    is_child = child_pid == CHILD
    argv = [kitten_exe(), '__benchmark__', '--with-scrollback']
    if is_child:
        # Hold off starting the benchmark until the reported pixel width
        # matches the value the parent sets with TIOCSWINSZ below
        # (presumably read_screen_size() queries the child's pty here).
        while read_screen_size().width != columns * cell_width:
            time.sleep(0.01)
        # Exec the kitten with an empty signal mask.
        signal.pthread_sigmask(signal.SIG_SETMASK, ())
        os.execvp(argv[0], argv)
    x_pixels = columns * cell_width
    y_pixels = rows * cell_height
    # struct winsize layout: rows, columns, x pixels, y pixels.
    s = struct.pack('HHHH', rows, columns, x_pixels, y_pixels)
    fcntl.ioctl(master_fd, termios.TIOCSWINSZ, s)

    # Bytes queued by the screen that still have to be written to the child.
    write_buf = b''
    # Pipe written to on every queued reply and included in the select() read set.
    r_pipe, w_pipe = safe_pipe(True)

    class ToChild:
        """File-like sink given to Screen; queues data destined for the child."""

        def write(self, x: bytes | str) -> None:
            nonlocal write_buf
            if isinstance(x, str):
                x = x.encode()
            write_buf += x
            try:
                os.write(w_pipe, b'1')
            except BlockingIOError:
                # Can only happen when the pipe is non-blocking and already
                # full, in which case a wakeup is pending anyway.
                pass

    def parse_bytes(data: bytes) -> None:
        """Push ``data`` through the screen's write buffer and parse it."""
        view = memoryview(data)
        while view:
            dest = screen.test_create_write_buffer()
            # The commit may accept only part of the data; loop over the rest.
            consumed = screen.test_commit_write_buffer(view, dest)
            view = view[consumed:]
            screen.test_parse_written_data()

    try:
        screen = Screen(None, rows, columns, scrollback, cell_width, cell_height, 0, ToChild())

        while True:
            # Only ask for writability when there is something to send.
            rd, wd, _ = select.select([master_fd, r_pipe], [master_fd] if write_buf else [], [])
            if r_pipe in rd:
                os.read(r_pipe, 256)
            if master_fd in rd:
                try:
                    data = os.read(master_fd, io.DEFAULT_BUFFER_SIZE)
                except OSError:
                    # Treat a read error on the pty like end of file.
                    data = b''
                if not data:
                    break
                parse_bytes(data)
            if master_fd in wd:
                n = os.write(master_fd, write_buf)
                write_buf = write_buf[n:]
        if isatty:
            lines: list[str] = []
            screen.linebuf.as_ansi(lines.append)
            sys.stdout.write(''.join(lines))
        else:
            sys.stdout.write(str(screen.linebuf))
    finally:
        # Same teardown order as pty.spawn(): close the pty master first, then
        # reap the child so it does not linger as a zombie.
        for fd in (master_fd, r_pipe, w_pipe):
            try:
                os.close(fd)
            except OSError:
                pass
        try:
            os.waitpid(child_pid, 0)
        except ChildProcessError:
            # Nothing left to reap (e.g. the child was collected elsewhere).
            pass


def main() -> None:
    """Script entry point: run the parsing benchmark with its default arguments."""
    run_parsing_benchmark()


# Run the benchmark only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()