1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
|
# Copyright (c) 2014, Menno Smits
# Released subject to the New BSD License
# Please see http://en.wikipedia.org/wiki/BSD_licenses
"""
A lexical analyzer class for IMAP responses.
Although Lexer does all the work, TokenSource is the class to use for
external callers.
"""
from typing import Iterator, List, Optional, Tuple, TYPE_CHECKING, Union
from .util import assert_imap_protocol
__all__ = ["TokenSource"]
# Byte-value classes used by the lexer. Every member is an int in
# range(256), because iterating over bytes yields ints.
CTRL_CHARS = frozenset(range(32))
ALL_CHARS = frozenset(range(256))
SPECIALS = frozenset(b' ()%"[')
# Bytes that may appear inside a plain (unquoted) token.
NON_SPECIALS = ALL_CHARS - SPECIALS - CTRL_CHARS
WHITESPACE = frozenset(b" \t\r\n")

# Individual byte values the lexer compares against.
BACKSLASH = ord("\\")
OPEN_SQUARE = ord("[")
CLOSE_SQUARE = ord("]")
DOUBLE_QUOTE = ord('"')
class TokenSource:
    """
    A simple iterator for the Lexer class that also provides access to
    the current IMAP literal.
    """

    def __init__(self, text: List[bytes]):
        """Wrap *text* (the list of response records) in a Lexer."""
        self.lex = Lexer(text)
        # A single shared iterator: every iter() on this object resumes
        # the same token stream rather than restarting it.
        self.src = iter(self.lex)

    @property
    def current_literal(self) -> Optional[bytes]:
        """
        Return the literal attached to the record currently being lexed.

        Returns None when the current record carries no literal, and also
        when no record has been started yet (i.e. before the first token
        has been requested), instead of failing with AttributeError.
        """
        source = self.lex.current_source
        if source is None:
            return None
        return source.literal

    def __iter__(self) -> Iterator[bytes]:
        """Return the shared token iterator."""
        return self.src
class Lexer:
    """
    A lexical analyzer class for IMAP.

    Iterating over an instance walks the response records one after the
    other and yields the tokens of each as ``bytes``. While a record is
    being tokenized it is available as ``current_source``.
    """

    def __init__(self, text: List[bytes]):
        # Records are wrapped lazily, one at a time, as iteration proceeds.
        self.sources = (LiteralHandlingIter(record) for record in text)
        self.current_source: Optional[LiteralHandlingIter] = None

    def read_until(
        self, stream_i: "PushableIterator", end_char: int, escape: bool = True
    ) -> bytearray:
        """
        Consume bytes from *stream_i* up to and including *end_char*.

        The returned bytearray holds everything consumed, terminated by
        *end_char*. With *escape* enabled, a backslash followed by either a
        backslash or *end_char* contributes only that second byte; a
        backslash followed by anything else is kept verbatim.

        Raises ValueError if the stream runs out before *end_char* is seen.
        """
        collected = bytearray()
        try:
            for byte in stream_i:
                if escape and byte == BACKSLASH:
                    # Pull the escaped byte straight from the stream so it
                    # can never be mistaken for the terminator.
                    escaped = next(stream_i)
                    if escaped != BACKSLASH and escaped != end_char:
                        collected.append(BACKSLASH)  # Don't touch invalid escaping
                    collected.append(escaped)
                    continue
                if byte == end_char:
                    collected.append(end_char)
                    return collected
                collected.append(byte)
        except StopIteration:
            # The stream ended right after a backslash.
            raise ValueError("No closing '%s'" % chr(end_char))
        # The stream ended without ever producing the terminator.
        raise ValueError("No closing '%s'" % chr(end_char))

    def read_token_stream(self, stream_i: "PushableIterator") -> Iterator[bytearray]:
        """
        Yield the tokens found in *stream_i* as bytearrays.

        A token is one of: a run of non-special bytes (bracketed ``[...]``
        sections are read verbatim and stay attached to the run), a
        double-quoted string including its quotes, or a single byte for any
        other character. Whitespace separates tokens and is never yielded.
        """
        # Local aliases keep the per-byte loops cheap.
        blanks = WHITESPACE
        word_bytes = NON_SPECIALS
        scan_to = self.read_until

        while True:
            # Skip leading whitespace, handing the first real byte back.
            for byte in stream_i:
                if byte in blanks:
                    continue
                stream_i.push(byte)
                break

            # Accumulate one token.
            current = bytearray()
            for byte in stream_i:
                if byte in word_bytes:
                    current.append(byte)
                    continue
                if byte == OPEN_SQUARE:
                    current.append(byte)
                    current.extend(scan_to(stream_i, CLOSE_SQUARE, escape=False))
                    continue

                # Any other byte terminates the token in progress.
                if byte in blanks:
                    yield current
                elif byte == DOUBLE_QUOTE:
                    # A quoted string must begin a token, never continue one.
                    assert_imap_protocol(not current)
                    current.append(byte)
                    current.extend(scan_to(stream_i, byte))
                    yield current
                else:
                    # Other punctuation, eg. "(". This ends the current token.
                    if current:
                        yield current
                    yield bytearray([byte])
                break
            else:
                # Input exhausted: flush whatever is pending and finish.
                if current:
                    yield current
                return

    def __iter__(self) -> Iterator[bytes]:
        """Yield every token of every record, in order, as ``bytes``."""
        for record in self.sources:
            self.current_source = record
            yield from map(bytes, self.read_token_stream(iter(record)))
# imaplib has poor handling of 'literals' - it both fails to remove the
# {size} marker, and fails to keep responses grouped into the same logical
# 'line'. What we end up with is a list of response 'records', where each
# record is either a simple string, or tuple of (str_with_lit, literal) -
# where str_with_lit is a string with the {xxx} marker at its end. Note
# that each element of this list does *not* correspond 1:1 with the
# untagged responses.
# (http://bugs.python.org/issue5045 also has comments about this)
# So: we have a special object for each of these records. When a
# string literal is processed, we peek into this object to grab the
# literal.
class LiteralHandlingIter:
    """
    One response record: its text plus the literal attached to it, if any.

    ``src_text`` is the text to tokenize; ``literal`` is the literal
    payload, or None for a plain record.
    """

    def __init__(self, resp_record: Union[Tuple[bytes, bytes], bytes]):
        self.literal: Optional[bytes]
        if not isinstance(resp_record, tuple):
            # just a line with no literals.
            self.src_text = resp_record
            self.literal = None
            return

        # A 'record' with a string which includes a literal marker, and
        # the literal itself. The text must end with the closing brace of
        # that marker.
        text = resp_record[0]
        self.src_text = text
        assert_imap_protocol(text.endswith(b"}"), text)
        self.literal = resp_record[1]

    def __iter__(self) -> "PushableIterator":
        # Each call starts a fresh pass over the record's text.
        return PushableIterator(self.src_text)
class PushableIterator:
    """
    Iterator over the byte values of a bytes object that supports push-back.

    Items handed to ``push`` are returned by subsequent ``next`` calls,
    most recently pushed first, before the underlying data is consumed
    any further.
    """

    # Sentinel object; not referenced by any method of this class.
    NO_MORE = object()

    def __init__(self, it: bytes):
        self.it = iter(it)
        # Stack of pushed-back items; the last element is returned first.
        self.pushed: List[int] = []

    def __iter__(self) -> "PushableIterator":
        return self

    def __next__(self) -> int:
        # Pushed-back items take priority over the underlying iterator.
        return self.pushed.pop() if self.pushed else next(self.it)

    # For Python 2 compatibility
    next = __next__

    def push(self, item: int) -> None:
        """Queue *item* so that it is the next value returned."""
        self.pushed.append(item)
|