File: idle.py

package info (click to toggle)
imap-tools 1.10.0%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,624 kB
  • sloc: python: 4,709; makefile: 5
file content (130 lines) | stat: -rw-r--r-- 4,335 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import sys
import socket
import select
import imaplib
from typing import Optional, List

from .utils import check_command_status
from .errors import MailboxTaggedResponseError

# Register the IDLE command in imaplib's command table, together with the connection
# states in which it may be sent. setdefault keeps an entry that is already present.
imaplib.Commands.setdefault("IDLE", ("NONAUTH", "AUTH", "SELECTED"))  # noqa

# select.poll is not available on every platform, get_socket_poller falls back to select.select
SUPPORTS_SELECT_POLL = hasattr(select, 'poll')


def get_socket_poller(sock: socket.socket, timeout: Optional[float] = None):
    """
    Wait until the socket has data to read or the timeout expires
    Despite the name, the result of the poll/select call is returned, not a poller object
    :param sock: socket.socket to watch for incoming data
    :param timeout: seconds (int or float) to wait, None - block until the socket is readable
    :return: result of poll()/select() for the socket: empty on timeout, non-empty on event
    """
    if not SUPPORTS_SELECT_POLL:
        # select.select fails if the process has more than 1024 file descriptors,
        # but it is needed for windows and some other platforms without select.poll
        readable, _, _ = select.select([sock], [], [], timeout)
        return readable
    # select.poll allows the process to have more than 1024 file descriptors
    poller = select.poll()
    poller.register(sock.fileno(), select.POLLIN)
    # poll() expects milliseconds, None means wait without a time limit
    timeout_ms = None if timeout is None else timeout * 1000
    return poller.poll(timeout_ms)


class IdleManager:
    """
    Mailbox IDLE logic
    Info about IMAP4 IDLE command at rfc2177
    Workflow examples:
    1.
        mailbox.idle.start()
        resps = mailbox.idle.poll(timeout=60)
        mailbox.idle.stop()
    2.
        resps = mailbox.idle.wait(timeout=60)
    3.
        with mailbox.idle as idle:
            resps = idle.poll(timeout=60)
    """

    def __init__(self, mailbox):
        """
        :param mailbox: mailbox object, its .client connection and
            .consume_until_tagged_response method are used by this manager
        """
        self.mailbox = mailbox
        # tag of the IDLE command sent by start(), stop() waits for the response with this tag
        self._idle_tag = None

    def start(self):
        """
        Switch on mailbox IDLE mode
        Sends the IDLE command, remembers its tag and checks the first response
        :return: response read right after the IDLE command was sent
        """
        self._idle_tag = self.mailbox.client._command('IDLE')  # example: b'KLIG3'
        result = self.mailbox.client._get_response()
        check_command_status((result, 'IDLE start'), MailboxTaggedResponseError, expected=None)
        return result

    def stop(self):
        """
        Switch off mailbox IDLE mode
        Sends DONE and reads responses until the one tagged with the IDLE command tag
        :return: result of mailbox.consume_until_tagged_response
        """
        self.mailbox.client.send(b"DONE\r\n")
        return self.mailbox.consume_until_tagged_response(self._idle_tag)

    def poll(self, timeout: Optional[float]) -> List[bytes]:
        """
        Poll for IDLE responses
        timeout = None
            Blocks until an IDLE response is received
        timeout = float
            Blocks until IDLE response is received or the timeout will expire
        :param timeout: seconds or None
        :return: list of raw responses, empty list if the timeout expired
        :raises ValueError: timeout is longer than 29 minutes
        result examples:
            [b'* 36 EXISTS', b'* 1 RECENT']
            [b'* 7 EXISTS']

        socket.settimeout modes:
            0 - non-blocking
            int - timeout mode
            None - blocking
        """
        if timeout is not None:
            timeout = float(timeout)
            if timeout > 29 * 60:
                raise ValueError(
                    'rfc2177 are advised to terminate the IDLE '
                    'and re-issue it at least every 29 minutes to avoid being logged off.'
                )
        sock = self.mailbox.client.sock
        old_timeout = sock.gettimeout()
        # make socket non-blocking so the timeout can be implemented for this call
        sock.settimeout(0)
        try:
            responses = []
            # the waiting itself happens here, reading below must never block
            if not get_socket_poller(sock, timeout):
                return responses
            while True:
                try:
                    line = self.mailbox.client._get_line()
                except (socket.timeout, socket.error):
                    # nothing more to read from the non-blocking socket right now
                    break
                except imaplib.IMAP4.abort as exc:  # noqa
                    # an abort mentioning EOF means there is no more data to read,
                    # any other abort (also one without a message) is a real failure
                    if exc.args and "EOF" in str(exc.args[0]):
                        break
                    raise
                else:
                    responses.append(line)
            return responses
        finally:
            # always give the socket its original timeout mode back
            sock.settimeout(old_timeout)

    def wait(self, timeout: Optional[float]) -> List[bytes]:
        """
        Logic, step by step:
        1. Start idle mode
        2. Poll idle response
        3. Stop idle mode on response or timeout
        4. Return poll results
        :param timeout: for poll method
        :return: poll response
        """
        with self as idle:
            return idle.poll(timeout=timeout)

    def __enter__(self):
        """Start IDLE mode and return the manager itself"""
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Stop IDLE mode when the with-block ends, exceptions from the block are not suppressed"""
        self.stop()