File: tfa.py

package info (click to toggle)
python-pyaarlo 0.8.0.15-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 556 kB
  • sloc: python: 6,064; makefile: 6; sh: 1
file content (235 lines) | stat: -rw-r--r-- 7,814 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
import email
import imaplib
import re
import time
import ssl

import requests


class Arlo2FAConsole:
    """2FA authentication via console.
    Accepts input from console and returns that for 2FA.
    """

    def __init__(self, arlo):
        self._arlo = arlo

    def start(self):
        self.debug("starting")
        return True

    def get(self):
        self.debug("checking")
        return input("Enter Code: ")

    def stop(self):
        self.debug("stopping")

    def debug(self, msg):
        self._arlo.debug(f"2fa-console: {msg}")


class Arlo2FAImap:
    """2FA authentication via IMAP
    Connects to IMAP server and waits for email from Arlo with 2FA code in it.

    Note: will probably need tweaking for other IMAP setups...
    """

    def __init__(self, arlo):
        self._arlo = arlo
        self._imap = None
        self._old_ids = None
        self._new_ids = None

    def start(self):
        self.debug("starting")

        # clean up
        if self._imap is not None:
            self.stop()

        try:
            # allow default ciphers to be specified
            cipher_list = self._arlo.cfg.cipher_list
            if cipher_list != "":
                ctx = ssl.create_default_context()
                ctx.set_ciphers(cipher_list)
                self.debug(f"imap is using custom ciphers {cipher_list}")
            else:
                ctx = None

            self._imap = imaplib.IMAP4_SSL(self._arlo.cfg.tfa_host, port=self._arlo.cfg.tfa_port, ssl_context=ctx)
            if self._arlo.cfg.verbose:
                self._imap.debug = 4
            res, status = self._imap.login(
                self._arlo.cfg.tfa_username, self._arlo.cfg.tfa_password
            )
            if res.lower() != "ok":
                self.debug("imap login failed")
                return False
            res, status = self._imap.select(mailbox='INBOX', readonly=True)
            if res.lower() != "ok":
                self.debug("imap select failed")
                return False
            res, self._old_ids = self._imap.search(
                None, "FROM", "do_not_reply@arlo.com"
            )
            if res.lower() != "ok":
                self.debug("imap search failed")
                return False
        except Exception as e:
            self._arlo.error(f"imap connection failed{str(e)}")
            return False

        self._new_ids = self._old_ids
        self.debug("old-ids={}".format(self._old_ids))
        if res.lower() == "ok":
            return True

        return False

    def get(self):
        self.debug("checking")

        # give tfa_total_timeout seconds for email to arrive
        start = time.time()
        while True:

            # wait a short while, stop after a total timeout
            # ok to do on first run gives email time to arrive
            time.sleep(self._arlo.cfg.tfa_timeout)
            if time.time() > (start + self._arlo.cfg.tfa_total_timeout):
                return None

            try:
                # grab new email ids
                self._imap.check()
                res, self._new_ids = self._imap.search(
                    None, "FROM", "do_not_reply@arlo.com"
                )
                self.debug("new-ids={}".format(self._new_ids))
                if self._new_ids == self._old_ids:
                    self.debug("no change in emails")
                    continue

                # New message. Reverse so we look at the newest one first.
                old_ids = self._old_ids[0].split()
                msg_ids = self._new_ids[0].split()
                msg_ids.reverse()
                for msg_id in msg_ids:

                    # Seen it?
                    if msg_id in old_ids:
                        continue

                    # New message. Look at all the parts and try to grab the code, if we catch an exception
                    # just move onto the next part.
                    self.debug("new-msg={}".format(msg_id))
                    res, parts = self._imap.fetch(msg_id, "(BODY.PEEK[])")
                    # res, parts = self._imap.fetch(msg_id, "(RFC822)")

                    for msg in parts:
                        try:
                            if isinstance(msg[1], bytes):
                                for part in email.message_from_bytes(msg[1]).walk():
                                    if part.get_content_type() != "text/html":
                                        continue
                                    for line in part.get_payload(decode=True).splitlines():
                                        # match code in email, this might need some work if the email changes
                                        code = re.match(r"^\W+(\d{6})\W*$", line.decode())
                                        if code is not None:
                                            self.debug(f"code={code.group(1)}")
                                            return code.group(1)
                        except Exception as e:
                            self.debug(f"trying next part {str(e)}")

                # Update old so we don't keep trying new.
                # Yahoo can lose ids so we extend the old list.
                self._old_ids.extend(new_id for new_id in self._new_ids if new_id not in self._old_ids)

            # problem parsing the message, force a fail
            except Exception as e:
                self._arlo.error(f"imap message read failed{str(e)}")
                return None

        return None

    def stop(self):
        self.debug("stopping")

        self._imap.close()
        self._imap.logout()
        self._imap = None
        self._old_ids = None
        self._new_ids = None

    def debug(self, msg):
        self._arlo.debug(f"2fa-imap: {msg}")


class Arlo2FARestAPI:
    """2FA authentication via rest API.
    Queries web site until code appears
    """

    def __init__(self, arlo):
        self._arlo = arlo

    def start(self):
        self.debug("starting")
        if self._arlo.cfg.tfa_host is None or self._arlo.cfg.tfa_password is None:
            self.debug("invalid config")
            return False

        self.debug("clearing")
        response = requests.get(
            "{}/clear?email={}&token={}".format(
                self._arlo.cfg.tfa_host_with_scheme("https"),
                self._arlo.cfg.tfa_username,
                self._arlo.cfg.tfa_password,
            ),
            timeout=10,
        )
        if response.status_code != 200:
            self.debug("possible problem clearing")

        return True

    def get(self):
        self.debug("checking")

        # give tfa_total_timeout seconds for email to arrive
        start = time.time()
        while True:

            # wait a short while, stop after a total timeout
            # ok to do on first run gives email time to arrive
            time.sleep(self._arlo.cfg.tfa_timeout)
            if time.time() > (start + self._arlo.cfg.tfa_total_timeout):
                return None

            # Try for the token.
            self.debug("checking")
            response = requests.get(
                "{}/get?email={}&token={}".format(
                    self._arlo.cfg.tfa_host_with_scheme("https"),
                    self._arlo.cfg.tfa_username,
                    self._arlo.cfg.tfa_password,
                ),
                timeout=10,
            )
            if response.status_code == 200:
                code = response.json().get("data", {}).get("code", None)
                if code is not None:
                    self.debug("code={}".format(code))
                    return code

            self.debug("retrying")

    def stop(self):
        self.debug("stopping")

    def debug(self, msg):
        self._arlo.debug(f"2fa-rest-api: {msg}")