1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
|
######################################################################
#
# File: b2/_internal/_cli/b2api.py
#
# Copyright 2023 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
import os
from typing import Optional
from b2sdk.v2 import (
AuthInfoCache,
B2Api,
B2HttpApiConfig,
InMemoryAccountInfo,
InMemoryCache,
SqliteAccountInfo,
)
from b2sdk.v2.exception import MissingAccountData
from b2._internal._cli.const import B2_USER_AGENT_APPEND_ENV_VAR
def _get_b2api_for_profile(
profile: Optional[str] = None,
raise_if_does_not_exist: bool = False,
**kwargs,
) -> B2Api:
if raise_if_does_not_exist:
account_info_file = SqliteAccountInfo.get_user_account_info_path(profile=profile)
if not os.path.exists(account_info_file):
raise MissingAccountData(account_info_file)
account_info = SqliteAccountInfo(profile=profile)
b2api = B2Api(
api_config=_get_b2httpapiconfig(),
account_info=account_info,
cache=AuthInfoCache(account_info),
**kwargs,
)
if os.getenv('CI', False) and os.getenv(
'GITHUB_REPOSITORY',
'',
).endswith('/B2_Command_Line_Tool'):
b2http = b2api.session.raw_api.b2_http
b2http.CONNECTION_TIMEOUT = 3 + 6 + 1
b2http.TIMEOUT = 12
b2http.TIMEOUT_FOR_COPY = 24
b2http.TIMEOUT_FOR_UPLOAD = 24
b2http.TRY_COUNT_DATA = 2
b2http.TRY_COUNT_DOWNLOAD = 2
b2http.TRY_COUNT_HEAD = 2
b2http.TRY_COUNT_OTHER = 2
return b2api
def _get_inmemory_b2api(**kwargs) -> B2Api:
return B2Api(InMemoryAccountInfo(), cache=InMemoryCache(), **kwargs)
def _get_b2httpapiconfig():
return B2HttpApiConfig(user_agent_append=os.environ.get(B2_USER_AGENT_APPEND_ENV_VAR),)
|