# coding=utf-8
###############################################################################
##
## Copyright 2011 Tavendo GmbH
##
## Note:
##
## This code is a Python implementation of the algorithm
##
## "Flexible and Economical UTF-8 Decoder"
##
## by Bjoern Hoehrmann
##
## bjoern@hoehrmann.de
## http://bjoern.hoehrmann.de/utf-8/decoder/dfa/
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
##
###############################################################################
## DFA transitions
cdef extern from "dfa.h":
    cdef char* UTF8VALIDATOR_DFA

DEF _UTF8_ACCEPT = 0
DEF _UTF8_REJECT = 1

cdef class Utf8Validator(object):
    """
    Incremental UTF-8 validator with constant memory consumption (minimal state).

    Implements the algorithm "Flexible and Economical UTF-8 Decoder" by
    Bjoern Hoehrmann (http://bjoern.hoehrmann.de/utf-8/decoder/dfa/).
    """

    UTF8_ACCEPT = 0
    UTF8_REJECT = 1

    ## i: total number of octets consumed since the last reset()
    cdef public int i
    cdef public int state
    cdef public int codepoint

    def __init__(self):
        self.reset()

    def decode(self, b):
        """
        Eat one UTF-8 octet, and validate on the fly.

        Returns UTF8_ACCEPT when enough octets have been consumed, in which case
        self.codepoint contains the decoded Unicode code point.

        Returns UTF8_REJECT when invalid UTF-8 was encountered.

        Returns some other positive integer when more octets need to be eaten.
        """
        ## the first 256 table entries map an octet to its character class
        cdef int type = UTF8VALIDATOR_DFA[b]
        if self.state != _UTF8_ACCEPT:
            ## continuation octet: shift in its 6 payload bits
            self.codepoint = (b & 0x3f) | (self.codepoint << 6)
        else:
            ## leading octet: mask off the length marker bits
            self.codepoint = (0xff >> type) & b
        ## the remaining entries hold the transitions, 16 per state
        self.state = UTF8VALIDATOR_DFA[256 + self.state * 16 + type]
        return self.state

    def reset(self):
        """
        Reset validator to start new incremental UTF-8 decode/validation.
        """
        self.state = _UTF8_ACCEPT
        self.codepoint = 0
        self.i = 0

    def validate(self, const unsigned char [:] buf):
        """
        Incrementally validate a chunk of bytes provided as a bytes-like
        object (e.g. bytes or bytearray).

        Returns a 4-tuple (valid?, endsOnCodePoint?, currentIndex, totalIndex).

        As soon as an octet is encountered which renders the octet sequence
        invalid, a tuple with valid? == False is returned. currentIndex is
        the index of that octet within the chunk currently being consumed,
        and totalIndex its index within the total sequence consumed so far.

        When valid? == True, currentIndex will be len(buf) and totalIndex the
        total number of bytes consumed.
        """
        cdef int state = self.state
        cdef int i=0, b
        for i, b in enumerate(buf):
            ## optimized version of decode(), since we are not interested in actual code points
            state = UTF8VALIDATOR_DFA[256 + (state << 4) + UTF8VALIDATOR_DFA[b]]
            if state == _UTF8_REJECT:
                self.i += i
                self.state = state
                return False, False, i, self.i
        i = len(buf)
        self.i += i
        self.state = state
        return True, state == _UTF8_ACCEPT, i, self.i
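

## Usage sketch (not part of the original module): validate() is designed to be
## called once per chunk, so a stream split at arbitrary octet boundaries can be
## checked without buffering it. The helper name validate_chunks and its return
## shape are assumptions made for illustration only. It works with any object
## that offers reset() and validate(buf) as documented on Utf8Validator above.
def validate_chunks(validator, chunks):
    """
    Feed an iterable of bytes-like chunks to the validator.

    Returns a pair (ok, totalIndex). ok is True only if every chunk was valid
    and the stream ended on a code point boundary. When an invalid octet is
    hit, totalIndex is its offset within the whole stream; otherwise it is
    the total number of octets consumed.
    """
    validator.reset()
    ends_on_code_point = True  ## an empty stream trivially ends on a boundary
    total = 0
    for chunk in chunks:
        valid, ends_on_code_point, _, total = validator.validate(chunk)
        if not valid:
            ## stop at the first invalid octet, later chunks are not consumed
            return False, total
    return ends_on_code_point, total

## Example call, assuming the compiled extension is importable:
##
##   ok, n = validate_chunks(Utf8Validator(), [b"\xe2\x82", b"\xac"])
##
## The two chunks together form the three-octet encoding of U+20AC, so the
## first chunk ends in the middle of a code point and the second completes it.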