File: utf8validator.py

package info (click to toggle)
django-websocket-redis 0.4.7-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 204 kB
  • sloc: python: 885; makefile: 3
file content (137 lines) | stat: -rw-r--r-- 5,339 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
###############################################################################
##
##  Copyright 2011-2013 Tavendo GmbH
##
##  Note:
##
##  This code is a Python implementation of the algorithm
##
##            "Flexible and Economical UTF-8 Decoder"
##
##  by Bjoern Hoehrmann
##
##       bjoern@hoehrmann.de
##       http://bjoern.hoehrmann.de/utf-8/decoder/dfa/
##
##  Licensed under the Apache License, Version 2.0 (the "License");
##  you may not use this file except in compliance with the License.
##  You may obtain a copy of the License at
##
##      http://www.apache.org/licenses/LICENSE-2.0
##
##  Unless required by applicable law or agreed to in writing, software
##  distributed under the License is distributed on an "AS IS" BASIS,
##  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
##  See the License for the specific language governing permissions and
##  limitations under the License.
##
###############################################################################

import six

if six.PY3:
    xrange = range

## use Cython implementation of UTF8 validator if available
##
try:
    from wsaccel.utf8validator import Utf8Validator
except:
    ## fallback to pure Python implementation

    class Utf8Validator:
        """
        Incremental UTF-8 validator with constant memory consumption (minimal
        state).

        Implements the algorithm "Flexible and Economical UTF-8 Decoder" by
        Bjoern Hoehrmann (http://bjoern.hoehrmann.de/utf-8/decoder/dfa/).
        """

        ## DFA transitions
        UTF8VALIDATOR_DFA = [
            0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,  # 00..1f
            0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,  # 20..3f
            0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,  # 40..5f
            0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,  # 60..7f
            1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,  # 80..9f
            7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,  # a0..bf
            8,8,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,  # c0..df
            0xa,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x4,0x3,0x3,  # e0..ef
            0xb,0x6,0x6,0x6,0x5,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,  # f0..ff
            0x0,0x1,0x2,0x3,0x5,0x8,0x7,0x1,0x1,0x1,0x4,0x6,0x1,0x1,0x1,0x1,  # s0..s0
            1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,1,1,1,0,1,0,1,1,1,1,1,1,  # s1..s2
            1,2,1,1,1,1,1,2,1,2,1,1,1,1,1,1,1,1,1,1,1,1,1,2,1,1,1,1,1,1,1,1,  # s3..s4
            1,2,1,1,1,1,1,1,1,2,1,1,1,1,1,1,1,1,1,1,1,1,1,3,1,3,1,1,1,1,1,1,  # s5..s6
            1,3,1,1,1,1,1,3,1,3,1,1,1,1,1,1,1,3,1,1,1,1,1,1,1,1,1,1,1,1,1,1,  # s7..s8
        ]

        UTF8_ACCEPT = 0
        UTF8_REJECT = 1

        def __init__(self):
            self.state = None
            self.codepoint = None
            self.i = None
            self.reset()

        def decode(self, b):
            """
            Eat one UTF-8 octet, and validate on the fly.

            Returns UTF8_ACCEPT when enough octets have been consumed, in which case
            self.codepoint contains the decoded Unicode code point.

            Returns UTF8_REJECT when invalid UTF-8 was encountered.

            Returns some other positive integer when more octets need to be eaten.
            """
            type = Utf8Validator.UTF8VALIDATOR_DFA[b]

            if self.state != Utf8Validator.UTF8_ACCEPT:
                self.codepoint = (b & 0x3f) | (self.codepoint << 6)
            else:
                self.codepoint = (0xff >> type) & b

            self.state = Utf8Validator.UTF8VALIDATOR_DFA[256 + self.state * 16 + type]

            return self.state

        def reset(self):
            """
            Reset validator to start new incremental UTF-8 decode/validation.
            """
            self.state = Utf8Validator.UTF8_ACCEPT
            self.codepoint = 0
            self.i = 0

        def validate(self, ba):
            """
            Incrementally validate a chunk of bytes provided as string.

            Will return a quad (valid?, endsOnCodePoint?, currentIndex, totalIndex).

            As soon as an octet is encountered which renders the octet sequence
            invalid, a quad with valid? == False is returned. currentIndex returns
            the index within the currently consumed chunk, and totalIndex the
            index within the total consumed sequence that was the point of bail out.
            When valid? == True, currentIndex will be len(ba) and totalIndex the
            total amount of consumed bytes.
            """

            if not isinstance(ba, six.text_type):
                ba = ba.decode()
            l = len(ba)

            for i in xrange(l):
                ## optimized version of decode(), since we are not interested in actual code points

                self.state = Utf8Validator.UTF8VALIDATOR_DFA[256 + (self.state << 4) + Utf8Validator.UTF8VALIDATOR_DFA[ord(ba[i])]]

                if self.state == Utf8Validator.UTF8_REJECT:
                    self.i += i
                    return False, False, i, self.i

            self.i += l

            return True, self.state == Utf8Validator.UTF8_ACCEPT, l, self.i