File: push.re

package info (click to toggle)
re2c 4.4-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 51,512 kB
  • sloc: cpp: 34,160; ml: 8,494; sh: 5,311; makefile: 1,014; haskell: 611; python: 431; ansic: 234; javascript: 113
file content (129 lines) | stat: -rw-r--r-- 3,466 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# re2py $INPUT -o $OUTPUT -f

from enum import Enum
import os

# Use a small buffer to cover the case when a lexeme doesn't fit.
# In real-world use, pick a larger buffer. The size includes the one byte
# reserved for the terminating null sentinel.
BUFSIZE = 10
# When True, test() prints a trace of sent packets, buffer contents and errors.
DEBUG = False

class State:
    """Lexer state that is kept between calls to the lexer.

    Holds the input file, the buffer and the positions inside it.
    """

    def __init__(self, file):
        # Last usable index of the buffer; the final byte is left for the
        # terminating null.
        end = BUFSIZE - 1

        self.file = file
        self.yyinput = bytearray(BUFSIZE)  # zero-filled buffer
        self.yylimit = end
        # All positions start at the limit, i.e. there is no unread data yet.
        self.yycursor = self.yymarker = self.token = end
        # Initial value of the stored lexer state (presumably "not started" in
        # the generated code -- confirm against the re2py output).
        self.yystate = -1

class Status(Enum):
    """Result codes returned by the lexer and by the buffer refill routine."""

    # Members are numbered consecutively from zero, in this order.
    END, READY, WAITING, BIG_PACKET, BAD_PACKET = range(5)

def fill(st):
    """Discard consumed input and top the buffer up with new data from the file.

    Everything before the start of the current token is dropped, the
    remaining bytes are moved to the front of the buffer, and the freed space
    (minus one byte for the sentinel) is filled from `st.file`.

    Args:
        st: lexer state; its `yyinput`, `yycursor`, `yymarker`, `yylimit`
            and `token` fields are updated in place.

    Returns:
        Status.BIG_PACKET if the current lexeme already occupies the whole
        buffer, so that not a single new byte could be added (the state is
        left untouched in that case); Status.READY otherwise, even when the
        file had no new data to offer.
    """
    # Space available for new data once everything before the current token
    # has been discarded; one byte is always reserved for the sentinel.
    free = BUFSIZE - 1 - (st.yylimit - st.token)

    # Error: lexeme too long. In real life could reallocate a larger buffer.
    # Checking the free space (rather than just the token offset) avoids a
    # spurious error when the lexeme starts at offset 0 but the buffer is not
    # full yet, e.g. when a packet arrives in several short pieces.
    if free < 1:
        return Status.BIG_PACKET

    # Shift buffer contents (discard everything up to the current token).
    # The slice also drops the old sentinel, which sits at index `yylimit`.
    shift = st.token
    st.yyinput = st.yyinput[shift:st.yylimit]
    st.yycursor -= shift
    st.yymarker -= shift
    st.yylimit -= shift
    st.token = 0

    # Fill free space at the end of buffer with new data from file.
    chunk = st.file.read(free)
    if chunk:
        st.yylimit += len(chunk)
        st.yyinput += chunk

    st.yyinput += b'\0'  # append sentinel

    return Status.READY

# Lex packets from the buffer held in `yyrecord`, counting them in `recv`.
#
# This function is a template: the `%{ ... %}` block below is re2c input that
# re2py expands into the matching code (see the command at the top of the
# file). Every exit returns a `(status, recv)` pair:
#   - Status.WAITING    -- the lexer needs more input (the YYFILL action);
#                          the caller should refill the buffer and call again,
#   - Status.END        -- the end-of-input rule `$` matched,
#   - Status.BAD_PACKET -- the default rule `*` matched.
# `recv` is the number of packets received so far and is incremented for each
# lexeme matching `packet` (lowercase letters followed by a semicolon).
def lex(yyrecord, recv):
    while True:
        # Each new lexeme starts at the current cursor position.
        yyrecord.token = yyrecord.yycursor
    %{
        re2c:api = record;
        re2c:YYFILL = "return Status.WAITING, recv";
        re2c:eof = 0;
        re2c:indent:top = 2;

        packet = [a-z]+[;];

        *      { return Status.BAD_PACKET, recv }
        $      { return Status.END, recv }
        packet {
            recv += 1
            break
        }
    %}

def test(packets, expect):
    """Feed `packets` to the lexer one at a time and check the final status.

    Args:
        packets: list of byte strings; one is written to the pipe file each
            time the lexer asks for more input, until none are left.
        expect: the Status the run is expected to finish with.

    Raises:
        AssertionError: if the final status differs from `expect`, or if the
            run ended with Status.END but the number of packets received
            differs from the number sent.

    The file handles are closed and the pipe file is removed even when a
    check fails or an exception is raised.
    """
    # Create a pipe (open the same file for reading and writing).
    fname = "pipe"
    try:
        with open(fname, "wb") as fw, open(fname, "rb") as fr:
            # Initialize lexer state
            st = State(fr)

            # Main loop. The buffer contains incomplete data which appears
            # packet by packet. When the lexer needs more input it saves its
            # internal state and returns to the caller which should provide
            # more input and resume lexing.
            send = 0
            recv = 0
            while True:
                status, recv = lex(st, recv)

                if status == Status.END:
                    if DEBUG: print("done: got {} packets".format(recv))
                    break

                elif status == Status.WAITING:
                    if DEBUG: print("waiting...")

                    if send < len(packets):
                        if DEBUG: print("sent packet {}: {}".format(send, packets[send]))
                        fw.write(packets[send])
                        # Flush so that the reading handle can see the data.
                        fw.flush()
                        send += 1

                    status = fill(st)
                    if DEBUG: print("queue: '{}', status: {}".format(st.yyinput, status))
                    if status == Status.BIG_PACKET:
                        if DEBUG: print("error: packet too big")
                        break

                    assert status == Status.READY

                else:
                    assert status == Status.BAD_PACKET
                    if DEBUG: print("error: ill-formed packet")
                    break

        # Check results.
        assert status == expect
        if status == Status.END:
            assert recv == send
    finally:
        # Cleanup: remove input file. Both handles are closed by now, and the
        # existence check keeps a failed open() from being masked by a
        # FileNotFoundError raised here.
        if os.path.exists(fname):
            os.remove(fname)

def main():
    """Run every scenario: packets to send and the status expected at the end."""
    scenarios = (
        # No input at all.
        ([], Status.END),
        # Several well-formed packets.
        ([b"zero;", b"one;", b"two;", b"three;", b"four;"], Status.END),
        # A packet containing a character outside [a-z].
        ([b"zer0;"], Status.BAD_PACKET),
        # A packet longer than the buffer.
        ([b"goooooooooogle;"], Status.BIG_PACKET),
    )
    for packets, expect in scenarios:
        test(packets, expect)


if __name__ == "__main__":
    main()