File: utf8validator.pyx

package info (click to toggle)
python-wsaccel 0.6.3-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 160 kB
  • sloc: python: 370; makefile: 41; ansic: 16
file content (109 lines) | stat: -rw-r--r-- 3,802 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# coding=utf-8

###############################################################################
##
##  Copyright 2011 Tavendo GmbH
##
##  Note:
##
##  This code is a Python implementation of the algorithm
##
##            "Flexible and Economical UTF-8 Decoder"
##
##  by Bjoern Hoehrmann
##
##       bjoern@hoehrmann.de
##       http://bjoern.hoehrmann.de/utf-8/decoder/dfa/
##
##  Licensed under the Apache License, Version 2.0 (the "License");
##  you may not use this file except in compliance with the License.
##  You may obtain a copy of the License at
##
##      http://www.apache.org/licenses/LICENSE-2.0
##
##  Unless required by applicable law or agreed to in writing, software
##  distributed under the License is distributed on an "AS IS" BASIS,
##  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
##  See the License for the specific language governing permissions and
##  limitations under the License.
##
###############################################################################

## DFA transitions
cdef extern from "dfa.h":
    cdef char* UTF8VALIDATOR_DFA

DEF _UTF8_ACCEPT = 0
DEF _UTF8_REJECT = 1

cdef class Utf8Validator(object):
    """
    Incremental UTF-8 validator with constant memory consumption (minimal state).

    Implements the algorithm "Flexible and Economical UTF-8 Decoder" by
    Bjoern Hoehrmann (http://bjoern.hoehrmann.de/utf-8/decoder/dfa/).
    """

    UTF8_ACCEPT = 0
    UTF8_REJECT = 1

    cdef public int i
    cdef public int state
    cdef public int codepoint

    def __init__(self):
        self.reset()

    def decode(self, b):
        """
        Eat one UTF-8 octet, and validate on the fly.

        Returns UTF8_ACCEPT when enough octets have been consumed, in which case
        self.codepoint contains the decoded Unicode code point.

        Returns UTF8_REJECT when invalid UTF-8 was encountered.

        Returns some other positive integer when more octets need to be eaten.
        """
        cdef int type = UTF8VALIDATOR_DFA[b]
        if self.state != _UTF8_ACCEPT:
            self.codepoint = (b & 0x3f) | (self.codepoint << 6)
        else:
            self.codepoint = (0xff >> type) & b
        self.state = UTF8VALIDATOR_DFA[256 + self.state * 16 + type]
        return self.state

    def reset(self):
        """
        Reset validator to start new incremental UTF-8 decode/validation.
        """
        self.state = _UTF8_ACCEPT
        self.codepoint = 0
        self.i = 0

    def validate(self, const unsigned char [:] buf):
        """
        Incrementally validate a chunk of bytes provided as bytearray.

        Will return a quad (valid?, endsOnCodePoint?, currentIndex, totalIndex).

        As soon as an octet is encountered which renders the octet sequence
        invalid, a quad with valid? == False is returned. currentIndex returns
        the index within the currently consumed chunk, and totalIndex the
        index within the total consumed sequence that was the point of bail out.
        When valid? == True, currentIndex will be len(buf) and totalIndex the
        total amount of consumed bytes.
        """
        cdef int state = self.state
        cdef int i=0, b
        for i, b in enumerate(buf):
            ## optimized version of decode(), since we are not interested in actual code points
            state = UTF8VALIDATOR_DFA[256 + (state << 4) + UTF8VALIDATOR_DFA[b]]
            if state == _UTF8_REJECT:
                self.i += i
                self.state = state
                return False, False, i, self.i
        i = len(buf)
        self.i += i
        self.state = state
        return True, state == _UTF8_ACCEPT, i, self.i