1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
|
import subprocess
import sys
import time
from _typeshed import ReadableBuffer, SizedBuffer
from builtins import list as _list # conflicts with a method named "list"
from collections.abc import Callable
from datetime import datetime
from re import Pattern
from socket import socket as _socket
from ssl import SSLContext, SSLSocket
from types import TracebackType
from typing import IO, Any, Literal, SupportsAbs, SupportsInt
from typing_extensions import Self, TypeAlias
__all__ = ["IMAP4", "IMAP4_stream", "Internaldate2tuple", "Int2AP", "ParseFlags", "Time2Internaldate", "IMAP4_SSL"]
# TODO: Commands should use their actual return types, not this type alias.
# E.g. Tuple[Literal["OK"], List[bytes]]
_CommandResults: TypeAlias = tuple[str, list[Any]]
_AnyResponseData: TypeAlias = list[None] | list[bytes | tuple[bytes, bytes]]
Commands: dict[str, tuple[str, ...]]
class IMAP4:
class error(Exception): ...
class abort(error): ...
class readonly(abort): ...
mustquote: Pattern[str]
debug: int
state: str
literal: str | None
tagged_commands: dict[bytes, _list[bytes] | None]
untagged_responses: dict[str, _list[bytes | tuple[bytes, bytes]]]
continuation_response: str
is_readonly: bool
tagnum: int
tagpre: str
tagre: Pattern[str]
welcome: bytes
capabilities: tuple[str, ...]
PROTOCOL_VERSION: str
if sys.version_info >= (3, 9):
def __init__(self, host: str = "", port: int = 143, timeout: float | None = None) -> None: ...
def open(self, host: str = "", port: int = 143, timeout: float | None = None) -> None: ...
else:
def __init__(self, host: str = "", port: int = 143) -> None: ...
def open(self, host: str = "", port: int = 143) -> None: ...
def __getattr__(self, attr: str) -> Any: ...
host: str
port: int
sock: _socket
file: IO[str] | IO[bytes]
def read(self, size: int) -> bytes: ...
def readline(self) -> bytes: ...
def send(self, data: ReadableBuffer) -> None: ...
def shutdown(self) -> None: ...
def socket(self) -> _socket: ...
def recent(self) -> _CommandResults: ...
def response(self, code: str) -> _CommandResults: ...
def append(self, mailbox: str, flags: str, date_time: str, message: ReadableBuffer) -> str: ...
def authenticate(self, mechanism: str, authobject: Callable[[bytes], bytes | None]) -> tuple[str, str]: ...
def capability(self) -> _CommandResults: ...
def check(self) -> _CommandResults: ...
def close(self) -> _CommandResults: ...
def copy(self, message_set: str, new_mailbox: str) -> _CommandResults: ...
def create(self, mailbox: str) -> _CommandResults: ...
def delete(self, mailbox: str) -> _CommandResults: ...
def deleteacl(self, mailbox: str, who: str) -> _CommandResults: ...
def enable(self, capability: str) -> _CommandResults: ...
def __enter__(self) -> Self: ...
def __exit__(self, t: type[BaseException] | None, v: BaseException | None, tb: TracebackType | None) -> None: ...
def expunge(self) -> _CommandResults: ...
def fetch(self, message_set: str, message_parts: str) -> tuple[str, _AnyResponseData]: ...
def getacl(self, mailbox: str) -> _CommandResults: ...
def getannotation(self, mailbox: str, entry: str, attribute: str) -> _CommandResults: ...
def getquota(self, root: str) -> _CommandResults: ...
def getquotaroot(self, mailbox: str) -> _CommandResults: ...
def list(self, directory: str = '""', pattern: str = "*") -> tuple[str, _AnyResponseData]: ...
def login(self, user: str, password: str) -> tuple[Literal["OK"], _list[bytes]]: ...
def login_cram_md5(self, user: str, password: str) -> _CommandResults: ...
def logout(self) -> tuple[str, _AnyResponseData]: ...
def lsub(self, directory: str = '""', pattern: str = "*") -> _CommandResults: ...
def myrights(self, mailbox: str) -> _CommandResults: ...
def namespace(self) -> _CommandResults: ...
def noop(self) -> tuple[str, _list[bytes]]: ...
def partial(self, message_num: str, message_part: str, start: str, length: str) -> _CommandResults: ...
def proxyauth(self, user: str) -> _CommandResults: ...
def rename(self, oldmailbox: str, newmailbox: str) -> _CommandResults: ...
def search(self, charset: str | None, *criteria: str) -> _CommandResults: ...
def select(self, mailbox: str = "INBOX", readonly: bool = False) -> tuple[str, _list[bytes | None]]: ...
def setacl(self, mailbox: str, who: str, what: str) -> _CommandResults: ...
def setannotation(self, *args: str) -> _CommandResults: ...
def setquota(self, root: str, limits: str) -> _CommandResults: ...
def sort(self, sort_criteria: str, charset: str, *search_criteria: str) -> _CommandResults: ...
def starttls(self, ssl_context: Any | None = None) -> tuple[Literal["OK"], _list[None]]: ...
def status(self, mailbox: str, names: str) -> _CommandResults: ...
def store(self, message_set: str, command: str, flags: str) -> _CommandResults: ...
def subscribe(self, mailbox: str) -> _CommandResults: ...
def thread(self, threading_algorithm: str, charset: str, *search_criteria: str) -> _CommandResults: ...
def uid(self, command: str, *args: str) -> _CommandResults: ...
def unsubscribe(self, mailbox: str) -> _CommandResults: ...
if sys.version_info >= (3, 9):
def unselect(self) -> _CommandResults: ...
def xatom(self, name: str, *args: str) -> _CommandResults: ...
def print_log(self) -> None: ...
class IMAP4_SSL(IMAP4):
if sys.version_info < (3, 12):
keyfile: str
certfile: str
if sys.version_info >= (3, 12):
def __init__(
self, host: str = "", port: int = 993, *, ssl_context: SSLContext | None = None, timeout: float | None = None
) -> None: ...
elif sys.version_info >= (3, 9):
def __init__(
self,
host: str = "",
port: int = 993,
keyfile: str | None = None,
certfile: str | None = None,
ssl_context: SSLContext | None = None,
timeout: float | None = None,
) -> None: ...
else:
def __init__(
self,
host: str = "",
port: int = 993,
keyfile: str | None = None,
certfile: str | None = None,
ssl_context: SSLContext | None = None,
) -> None: ...
sslobj: SSLSocket
file: IO[Any]
if sys.version_info >= (3, 9):
def open(self, host: str = "", port: int | None = 993, timeout: float | None = None) -> None: ...
else:
def open(self, host: str = "", port: int | None = 993) -> None: ...
def ssl(self) -> SSLSocket: ...
class IMAP4_stream(IMAP4):
command: str
def __init__(self, command: str) -> None: ...
file: IO[Any]
process: subprocess.Popen[bytes]
writefile: IO[Any]
readfile: IO[Any]
if sys.version_info >= (3, 9):
def open(self, host: str | None = None, port: int | None = None, timeout: float | None = None) -> None: ...
else:
def open(self, host: str | None = None, port: int | None = None) -> None: ...
class _Authenticator:
mech: Callable[[bytes], bytes | bytearray | memoryview | str | None]
def __init__(self, mechinst: Callable[[bytes], bytes | bytearray | memoryview | str | None]) -> None: ...
def process(self, data: str) -> str: ...
def encode(self, inp: bytes | bytearray | memoryview) -> str: ...
def decode(self, inp: str | SizedBuffer) -> bytes: ...
def Internaldate2tuple(resp: ReadableBuffer) -> time.struct_time | None: ...
def Int2AP(num: SupportsAbs[SupportsInt]) -> bytes: ...
def ParseFlags(resp: ReadableBuffer) -> tuple[bytes, ...]: ...
def Time2Internaldate(date_time: float | time.struct_time | time._TimeTuple | datetime | str) -> str: ...
|