1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415
|
"""
Copyright (c) 2023 Proton AG
This file is part of Proton VPN.
Proton VPN is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Proton VPN is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with ProtonVPN. If not, see <https://www.gnu.org/licenses/>.
"""
import asyncio
from os.path import basename
from threading import Event
from typing import Optional
from proton.session import Session, FormData, FormField
from proton.session.api import Fido2Assertion, Fido2AssertionParameters
from proton.vpn import logging
from proton.vpn.session.account import VPNAccount
from proton.vpn.session.fetcher import (VPNSessionFetcher, EndpointVersion)
from proton.vpn.session.client_config import ClientConfig
from proton.vpn.session.credentials import VPNSecrets
from proton.vpn.session.dataclasses import LoginResult, BugReportForm, VPNCertificate, VPNLocation
from proton.vpn.session.exceptions import VPNAccountDecodeError, ServerListDecodeError
from proton.vpn.session.servers.logicals import ServerList
from proton.vpn.session.feature_flags_fetcher import FeatureFlags
from proton.vpn.session.u2f_interaction import UserInteraction
logger = logging.getLogger(__name__)
BINARY_SERVER_STATUS = "BinaryServerStatus"
class VPNSession(Session):
"""
Augmented Session that provides helpers to a persistent offline keyring
access to user account information available from the PROTON VPN REST API.
Usage example:
.. code-block::
from proton.vpn.session import VPNSession
from proton.sso import ProtonSSO
sso = ProtonSSO()
session=sso.get_session(username, override_class=VPNSession)
session.authenticate('USERNAME','PASSWORD')
if session.authenticated:
pubkey_credentials = session.vpn_account.vpn_credentials.pubkey_credentials
wireguard_private_key = pubkey_credentials.wg_private_key
api_pem_certificate = pubkey_credentials.certificate_pem
"""
BUG_REPORT_ENDPOINT = "/core/v4/reports/bug"
def __init__(
self, *args,
fetcher: Optional[VPNSessionFetcher] = None,
vpn_account: Optional[VPNAccount] = None,
server_list: Optional[ServerList] = None,
client_config: Optional[ClientConfig] = None,
feature_flags: Optional[FeatureFlags] = None,
**kwargs
): # pylint: disable=too-many-arguments
self._fetcher = fetcher or VPNSessionFetcher(session=self)
try:
# pylint: disable=import-outside-toplevel
from proton.vpn.session.u2f import U2FKeys
self._u2f_keys = U2FKeys()
except ImportError as error:
self._u2f_keys = None
logger.warning(error)
self._vpn_account = vpn_account
self._server_list = server_list
self._client_config = client_config
self._feature_flags = feature_flags
super().__init__(*args, **kwargs)
@property
def loaded(self) -> bool:
""":returns: whether the VPN session data was already loaded or not."""
return self._vpn_account \
and self._server_list \
and self._client_config \
and self._feature_flags
def __setstate__(self, data):
"""This method is called when deserializing the session."""
# It might be useful to load feature flags cache/defaults, even in logged out state.
self._feature_flags = self._fetcher.load_feature_flags_from_cache()
if 'vpn' in data:
try:
self._vpn_account = VPNAccount.from_dict(data['vpn'])
except VPNAccountDecodeError:
logger.warning("VPN account could not be deserialized", exc_info=True)
try:
self._server_list = self._fetcher.load_server_list_from_cache()
except ServerListDecodeError:
logger.warning("Server list could not be deserialized", exc_info=True)
self._client_config = self._fetcher.load_client_config_from_cache()
super().__setstate__(data)
def __getstate__(self):
"""This method is called to retrieve the session data to be serialized in the keyring."""
state = super().__getstate__()
if state and self._vpn_account:
state['vpn'] = self._vpn_account.to_dict()
# Note the server list is not persisted to the keyring
return state
async def login(self, username: str, password: str) -> LoginResult:
"""
Logs the user in.
:returns: the login result, indicating whether it was successful
and whether 2FA is required or not.
"""
if self.logged_in:
return LoginResult(success=True, authenticated=True, twofa_required=False)
if not await self.async_authenticate(username, password):
return LoginResult(success=False, authenticated=False, twofa_required=False)
if self.needs_twofa:
return LoginResult(success=False, authenticated=True, twofa_required=True)
return LoginResult(success=True, authenticated=True, twofa_required=False)
async def provide_2fa_code(self, code: str) -> LoginResult:
"""
Submits the 2FA code.
:returns: whether the 2FA was successful or not.
"""
valid_code = await super().async_validate_2fa_code(code)
if not valid_code:
return LoginResult(success=False, authenticated=True, twofa_required=True)
return LoginResult(success=True, authenticated=True, twofa_required=False)
@property
def fido2_lib_available(self) -> Fido2AssertionParameters:
"""
:returns: whether the expected FIDO2 library is available or not.
"""
return bool(self._u2f_keys)
async def generate_2fa_fido2_assertion(
self,
user_interaction: Optional[UserInteraction] = None,
cancel_assertion: Optional[Event] = None
) -> Fido2Assertion:
"""
Scans for U2F/FIDO2 keys and generates a FIDO2 assertion.
:param user_interaction: object handling any required user interaction
while generating the assertion.
:param cancel_assertion: optional event that can be set to cancel the
fido 2 assertion process.
:returns: the generated FIDO2 assertion.
"""
if not self.fido2_lib_available:
raise RuntimeError("U2F/FIDO2 support is not available on this platform")
return await self._u2f_keys.scan_keys_and_get_assertion(
self, user_interaction, cancel_assertion
)
async def provide_2fa_fido2(self, fido2_assertion: Fido2Assertion) -> LoginResult:
"""
Submits the 2FA FIDO2 assertion.
:returns: whether the 2FA was successful or not.
"""
if not self.fido2_lib_available:
raise RuntimeError("U2F/FIDO2 support is not available on this platform")
valid_assertion = await super().async_validate_2fa_fido2(fido2_assertion)
if not valid_assertion:
return LoginResult(success=False, authenticated=True, twofa_required=True)
return LoginResult(success=True, authenticated=True, twofa_required=False)
async def logout(self, no_condition_check=False, additional_headers=None) -> bool:
"""
Log out and reset session data.
"""
result = await super().async_logout(no_condition_check, additional_headers)
self._vpn_account = None
self._server_list = None
self._client_config = None
self._feature_flags = None
self._fetcher.clear_cache()
return result
@property
def logged_in(self) -> bool:
"""
:returns: whether the user already logged in or not.
"""
return self.authenticated and not self.needs_twofa
async def fetch_session_data(self, features: Optional[dict] = None):
"""
Fetches the required session data from Proton's REST APIs.
"""
# We have to use `no_condition_check=True` with `_requests_lock`
# because otherwise all requests after that will be blocked
# until the lock created by `_requests_lock` is released.
# Since the previous lock is only released at the end of the try/except/finally the
# requests will never be executed, thus blocking and never releasing the lock.
# Each request in `proton.session.api.Session` already creates and holds the lock by itself,
# but the problem here is that we want to add additional data to be stored to the keyring.
# Thus we need to resort to some manual
# triggering of `_requests_lock` and `_requests_unlock`.
# The former caches keyring data to memory while the latter does three different things:
# 1. It checks if the new data is different from the old one
# 2. If they are different then it proceeds to delete old one from keyring
# 3. Add new data to the keyring
# So if we want to add additional data to the keyring, as in VPN relevant data,
# we must ensure that we always call `_requests_unlock()` after any requests
# because this is currently the only way to store data that is attached
# to a specific account.
# So the consequence for passing `no_condition_check=True` is that the keyring data will
# not get cached to memory, for later to be compared (as previously described).
# This means that later when the comparison will be made, the "old" data will just be empty,
# forcing it to always be replaced by the new data to keyring. Thus this solution is just a
# temporary hack until a better approach is found.
# For further clarification on how these methods see the following, in the specified order:
# `proton.session.api.Session._requests_lock`
# `proton.sso.sso.ProtonSSO._acquire_session_lock`
# `proton.session.api.Session._requests_unlock`
# `proton.sso.sso.ProtonSSO._release_session_lock`
self._requests_lock(no_condition_check=True)
try:
secrets = (
VPNSecrets(
ed25519_privatekey=self._vpn_account.vpn_credentials
.pubkey_credentials.ed_255519_private_key
)
if self._vpn_account
else VPNSecrets()
)
vpninfo, certificate, location, client_config = await asyncio.gather(
self._fetcher.fetch_vpn_info(),
self._fetcher.fetch_certificate(
client_public_key=secrets.ed25519_pk_pem, features=features),
self._fetcher.fetch_location(),
self._fetcher.fetch_client_config(),
)
self._vpn_account = VPNAccount(
vpninfo=vpninfo, certificate=certificate, secrets=secrets, location=location
)
self._client_config = client_config
# The feature flags must be fetched before the server list,
# since the server list can be fetched differently depending on
# what feature flags are enabled.
self._feature_flags = await self._fetcher.fetch_feature_flags()
# The server list should be retrieved after the VPNAccount object
# has been created, since it requires the location, and it should
# be retrieved after the feature flags have been fetched, since it
# depends in them for chosing the fetch method.
await self.fetch_server_list()
finally:
# IMPORTANT: apart from releasing the lock, _requests_unlock triggers the
# serialization of the session to the keyring.
self._requests_unlock()
async def fetch_certificate(self, features: Optional[dict] = None) -> VPNCertificate:
"""Fetches new certificate from API."""
self._requests_lock(no_condition_check=True)
try:
secrets = (
VPNSecrets(
ed25519_privatekey=self._vpn_account.vpn_credentials
.pubkey_credentials.ed_255519_private_key
)
)
new_certificate = await self._fetcher.fetch_certificate(
client_public_key=secrets.ed25519_pk_pem,
features=features
)
self._vpn_account.set_certificate(new_certificate)
return new_certificate
finally:
self._requests_unlock()
@property
def vpn_account(self) -> VPNAccount:
"""
Information related to the VPN user account.
If it was not loaded yet then None is returned instead.
"""
return self._vpn_account
def set_location(self, location: VPNLocation):
"""Set new location data and store it."""
self._requests_lock(no_condition_check=False)
try:
self._vpn_account.location = location
finally:
self._requests_unlock()
def _serverlist_endpoint_version(self) -> EndpointVersion:
"""Returns the endpoint version to be used for server list fetching."""
if self._feature_flags.get(BINARY_SERVER_STATUS):
return EndpointVersion.V2
return EndpointVersion.V1
async def fetch_server_list(self) -> ServerList:
"""
Fetches the server list from the REST API.
"""
self._server_list = await self._fetcher.fetch_server_list(
self._serverlist_endpoint_version()
)
return self._server_list
@property
def server_list(self) -> ServerList:
"""The current server list."""
return self._server_list
async def update_server_loads(self) -> ServerList:
"""
Fetches the server loads from the REST API and updates the current
server list with them.
"""
self._server_list = await self._fetcher.update_server_loads(
self._serverlist_endpoint_version()
)
return self._server_list
async def fetch_client_config(self) -> ClientConfig:
"""Fetches the client configuration from the REST api."""
self._client_config = await self._fetcher.fetch_client_config()
return self._client_config
@property
def client_config(self) -> ClientConfig:
"""The current client configuration."""
return self._client_config
async def fetch_feature_flags(self) -> FeatureFlags:
"""Fetches API features that dictates which features are to be enabled or not."""
self._feature_flags = await self._fetcher.fetch_feature_flags()
return self._feature_flags
@property
def feature_flags(self) -> FeatureFlags:
"""Fetches general client configuration to connect to VPN servers."""
if self._feature_flags is None:
return FeatureFlags.default()
return self._feature_flags
async def submit_bug_report(self, bug_report: BugReportForm):
"""Submits a bug report to customer support."""
data = FormData()
data.add(FormField(name="OS", value=bug_report.os))
data.add(FormField(name="OSVersion", value=bug_report.os_version))
data.add(FormField(name="Client", value=bug_report.client))
data.add(FormField(name="ClientVersion", value=bug_report.client_version))
data.add(FormField(name="ClientType", value=bug_report.client_type))
data.add(FormField(name="Title", value=bug_report.title))
data.add(FormField(name="Description", value=bug_report.description))
data.add(FormField(name="Username", value=bug_report.username))
data.add(FormField(name="Email", value=bug_report.email))
if self._vpn_account:
location = self._vpn_account.location
data.add(FormField(name="ISP", value=location.ISP))
data.add(FormField(name="Country", value=location.Country))
for i, attachment in enumerate(bug_report.attachments):
data.add(FormField(
name=f"Attachment-{i}", value=attachment,
filename=basename(attachment.name)
))
return await self.async_api_request(
endpoint=VPNSession.BUG_REPORT_ENDPOINT, data=data
)
|