File: sso.py

package info (click to toggle)
python-proton-core 0.4.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 540 kB
  • sloc: python: 3,574; makefile: 15
file content (341 lines) | stat: -rw-r--r-- 14,294 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
"""
Copyright (c) 2023 Proton AG

This file is part of Proton.

Proton is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

Proton is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with ProtonVPN.  If not, see <https://www.gnu.org/licenses/>.
"""
from __future__ import annotations

import base64
import fcntl
import os
import re
from typing import TYPE_CHECKING, Optional

from proton.keyring import Keyring

if TYPE_CHECKING:
    from ..session import Session


# We don't necessarily need it to be a singleton, it doesn't harm in itself if multiple instances are created
class ProtonSSO:
    """Proton Single Sign On implementation. This allows session persistence for the current user.

    The general approach for this is to create a SSO instance, and then to get either a specific or the default session, and work from there:

    .. code-block::

        from proton.sso import ProtonSSO
        sso = ProtonSSO()
        session = sso.get_default_session()
        # or:
        session = sso.get_session('pro') # get session for account pro

    Note that it is advised not to try to "guess" the state of the session, but instead to just try to use it, and handle any exception that would arise.

    This object uses advisory locks (using ``flock``) to protect the session from multiple conflicting changes. This does not guarantee that
    Session objects are immune to what happens in another process (i.e. imagine if one process terminates the session.), but at least makes it consistent.
    In the future, it would be nice to use an IPC mechanism to make sure other processes are aware of the state change.
    """
    def __init__(self, appversion : str = "Other", user_agent: str ="None", keyring_backend_name=None):
        """Create a SSO instance

        :param appversion: Application version (see :class:`proton.session.Session`), defaults to "Other"
        :type appversion: str, optional
        :param user_agent: User agent version (see :class:`proton.session.Session`), defaults to "None"
        :type user_agent: str, optional
        :param keyring_backend_name: Name of Keyring backend to load (see :class:`proton.keyring.Keyring`), defaults to None
        :type keyring_backend_name: str, optional
        """
        # Store appversion and user_agent for subsequent sessions
        self._appversion = appversion
        self._user_agent = user_agent

        from ..utils import ExecutionEnvironment
        self._adv_locks_path = ExecutionEnvironment().path_runtime
        self._adv_locks = {}

        self._session_data_cache = {}

        # This is a global lock, we use it when we modify the indexes
        self._global_adv_lock = open(os.path.join(self._adv_locks_path, f'proton-sso.lock'), 'w')
        self.__keyring_backend = None
        self.__keyring_backend_name = keyring_backend_name

    def __encode_name(self, account_name) -> str:
        """Helper function to convert an account_name into a safe alphanumeric string.

        :param account_name: normalized account_name
        :type account_name: str
        :return: base32 encoded string, without padding.
        :rtype: str
        """
        return base64.b32encode(account_name.encode('utf8')).decode('ascii').rstrip('=').lower()

    def __keyring_key_name(self, account_name : str) -> str:
        """Helper function to get the keyring key for account_name

        :param account_name: normalized account_name
        :type account_name: str
        :return: keyring key
        :rtype: str
        """
        return f'proton-sso-account-{self.__encode_name(account_name)}'

    def __keyring_index_name(self) -> str:
        """Helper function to get the keyring key to store the index (i.e. account names in order)

        :return: keyring key
        :rtype: str
        """
        return f'proton-sso-accounts'

    @property
    def _keyring(self) -> "Keyring":
        """Shortcut to get the default keyring backend

        :return: an instance of the default Keyring
        :rtype: Keyring
        """
        if self.__keyring_backend is None:
            self.__keyring_backend = Keyring.get_from_factory(self.__keyring_backend_name)
        elif not isinstance(self.__keyring_backend, type(Keyring.get_from_factory(self.__keyring_backend_name))):
            # If the current keyring does not match the keyring we were using previously,
            # then something must've changed in the env and we should raise an exception.
            raise RuntimeError("Keyring backends do not match")

        return self.__keyring_backend

    @property
    def sessions(self) -> list[str]:
        """Returns the account names for the current system user

        :return: list of normalized account_names
        :rtype: list[str]
        """

        # We might remove invalid session and clean the index, so create a full lock on the SSO object
        fcntl.flock(self._global_adv_lock, fcntl.LOCK_EX)
        try:
            keyring = self._keyring

            try:
                keyring_index = keyring[self.__keyring_index_name()]
            except KeyError:
                keyring_index = []

            cleaned_index = [account_name for account_name in keyring_index if len(self._get_session_data(account_name)) > 0]
            if cleaned_index != keyring_index:
                keyring[self.__keyring_index_name()] = cleaned_index

            # Try to remove any account from keyring that we've removed from SSO
            for removed_account in set(keyring_index).difference(cleaned_index):
                try:
                    del keyring[self.__keyring_key_name(removed_account)]
                except KeyError:
                    pass

            return cleaned_index
        finally:
            fcntl.flock(self._global_adv_lock, fcntl.LOCK_UN)

    def get_session(self, account_name : Optional[str], override_class : Optional[type] = None) -> "Session":
        """Get the session identified by account_name

        :param account_name: account name to use. If None will return an empty session (can be used as a factory)
        :type account_name: Optional[str]
        :param override_class: Class to use for the session to be returned, by default will use proton.session.Session
        :type override_class: Optional[type]
        :return: the Session object. It will be an empty session if there's no session for account_name
        :rtype: Session
        """
        from ..session import Session

        if override_class is None:
            override_class = Session

        session = override_class(self._appversion, self._user_agent)
        session.register_persistence_observer(self)

        # If we have an account, then let's fetch the data from it. Otherwise we just ignore and return a blank session
        if account_name is not None:
            try:
                session_data = self._get_session_data(account_name)
            except KeyError:
                session_data = None
        else:
            session_data = None

        if session_data is not None:
            session.__setstate__(session_data)

        return session

    def get_default_session(self, override_class : Optional[type] = None)  -> "Session":
        """Get the default session for the system user. It will always be one valid session if one exists.

        :param override_class: Class to use for the session to be returned, see :meth:`get_session`.
        :type override_class: Optional[type]
        :return: the Session object. It will be an empty session if there's no session at all
        :rtype: Session
        """
        sessions = self.sessions
        if len(sessions) == 0:
            account_name = None
        else:
            account_name = sessions[0]

        return self.get_session(account_name, override_class)

    def set_default_account(self, account_name : str):
        """Set the default account for user to be account_name

        :param account_name: the account_name to use as default
        :type account_name: str
        :raises KeyError: if the account name is unknown
        """
        # We might be reordering accounts, so let's lock the full sso so we can't have concurrent actions here
        fcntl.flock(self._global_adv_lock, fcntl.LOCK_EX)
        try:
            keyring = self._keyring

            try:
                keyring_index = keyring[self.__keyring_index_name()]
            except KeyError:
                keyring_index = []

            if account_name not in keyring_index:
                raise KeyError(account_name)

            new_keyring_index = [account_name] + [x for x in keyring_index if x != account_name]
            if new_keyring_index != keyring_index:
                keyring[self.__keyring_index_name()] = new_keyring_index
            
        finally:
            fcntl.flock(self._global_adv_lock, fcntl.LOCK_UN)

    def _get_session_data(self, account_name : str) -> dict:
        """Helper function to get data of a session, returns an empty dict if no data is present

        :param account_name: normalized account name
        :type account_name: str
        :return: content of the session data, empty dict if it doesn't exist.
        :rtype: dict
        """
        try:
            data = self._keyring[self.__keyring_key_name(account_name)]
        except KeyError:
            data = {}

        # This is an encapsulation violation (we're not supposed to know that the account name is stored in AccountName)
        # It allows us nevertheless to validate that the session contains actual data, which is good to not break if a
        # Session implementation is invalid.
        if data.get('AccountName') != account_name:
            data = {}

        return data


    def _acquire_session_lock(self, account_name : str, current_data : dict) -> None:
        """Observer pattern for :class:`proton.session.Session` (see :meth:`proton.session.Session.register_persistence_observer`). It is called when the Session object is getting locked, because it's expected to be changed
        and we want to avoid race conditions.

        :param account_name: account name of the session
        :type account_name: str
        :param current_data: current session data serialized as a dictionary
        :type current_data: dict
        """
        if not account_name:
            # Don't do anything, we don't know the account yet!
            return

        self._adv_locks[account_name] = open(os.path.join(self._adv_locks_path, f'proton-sso-{self.__encode_name(account_name)}.lock'), 'w')
        # This is a blocking call. 
        # FIXME: this is Linux specific
        fcntl.flock(self._adv_locks[account_name], fcntl.LOCK_EX)

        self._session_data_cache[account_name] = current_data


    def _release_session_lock(self, account_name : str, new_data : dict) -> None:
        """Observer pattern for :class:`proton.session.Session` (see :meth:`proton.session.Session.register_persistence_observer`). It is called when the Session object is getting unlocked.

        If the data between has changed since :meth:`_acquire_session_lock` was called, it will be persisted in the keyring.

        :param account_name: account name of the session
        :type account_name: str
        :param new_data: current session data serialized as a dictionary
        :type new_data: dict
        """
        if not account_name:
            # Don't do anything, we don't know the account yet!
            return

        if new_data is not None and len(new_data) > 0 and new_data.get('AccountName', None) != account_name:
            raise ValueError("Sessions need to store a valid AccountName in order to store data.")

        # Don't do anything if data hasn't changed
        if account_name in self._session_data_cache:
            if self._session_data_cache[account_name] == new_data:
                return
            del self._session_data_cache[account_name]

        # We might be reordering accounts, so let's lock the full sso so we can't have concurrent actions here
        fcntl.flock(self._global_adv_lock, fcntl.LOCK_EX)
        try:
            keyring = self._keyring

            # Get current data
            try:
                keyring_entry = keyring[self.__keyring_key_name(account_name)]
            except KeyError:
                keyring_entry = {}

            try:
                keyring_index = keyring[self.__keyring_index_name()]
            except KeyError:
                keyring_index = []

            # By default, we don't change anything
            new_keyring_index = keyring_index

            # No data, this is probably a logout
            if new_data is None or len(new_data) == 0:
                # Discard from the index
                new_keyring_index = [x for x in keyring_index if x != account_name]

                # Delete the entry if we had some data previously
                if len(keyring_entry) > 0:
                    del keyring[self.__keyring_key_name(account_name)]
            # We have new data
            else:
                # If this is a new entry, then append the index with the account (we leave the default as is)
                if account_name not in keyring_index:
                    new_keyring_index = keyring_index + [account_name]

                # Store the new data
                keyring[self.__keyring_key_name(account_name)] = new_data

            # We only store the new index if it has changed (wouldn't harm to do it anyway)
            if new_keyring_index != keyring_index:
                keyring[self.__keyring_index_name()] = new_keyring_index

        finally:
            fcntl.flock(self._global_adv_lock, fcntl.LOCK_UN)

        if account_name in self._adv_locks:
            # FIXME: this is Linux specific
            fcntl.flock(self._adv_locks[account_name], fcntl.LOCK_UN)