File: buckets.py

package info (click to toggle)
python-b2sdk 2.10.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,228 kB
  • sloc: python: 32,094; sh: 13; makefile: 8
file content (71 lines) | stat: -rw-r--r-- 1,883 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
######################################################################
#
# File: b2sdk/_internal/testing/fixtures/buckets.py
#
# Copyright 2021 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

import secrets

import pytest

from b2sdk._internal.testing.helpers.bucket_manager import BucketManager
from b2sdk._internal.testing.helpers.buckets import (
    GENERAL_BUCKET_NAME_PREFIX,
    get_bucket_name_prefix,
)


def pytest_addoption(parser):
    """Register the ``--dont-cleanup-old-buckets`` command line flag.

    The flag is a boolean switch (``store_true``) that defaults to ``False``.
    Its value is read back by the ``dont_cleanup_old_buckets`` fixture.

    :param parser: the pytest option parser the flag is added to.
    """
    parser.addoption(
        '--dont-cleanup-old-buckets',
        action='store_true',
        default=False,
        # Without help text the flag is listed undocumented in ``pytest --help``.
        help='Do not clean up old buckets.',
    )


@pytest.fixture(scope='session')
def dont_cleanup_old_buckets(request):
    """Return the value of the ``--dont-cleanup-old-buckets`` option for this session."""
    cli_config = request.config
    return cli_config.getoption('--dont-cleanup-old-buckets')


@pytest.fixture(scope='session')
def general_bucket_name_prefix():
    """Expose ``GENERAL_BUCKET_NAME_PREFIX`` as a session-scoped fixture."""
    shared_prefix = GENERAL_BUCKET_NAME_PREFIX
    return shared_prefix


@pytest.fixture(scope='session')
def bucket_name_prefix(general_bucket_name_prefix):
    """Build the bucket name prefix used for the current session.

    The value is computed once per session by ``get_bucket_name_prefix``
    from the general prefix.
    """
    # NOTE(review): 8 is presumably the length of the random part of the
    # prefix - confirm against ``get_bucket_name_prefix``.
    run_prefix = get_bucket_name_prefix(8, general_bucket_name_prefix)
    return run_prefix


@pytest.fixture(scope='session')
def bucket_manager(
    bucket_name_prefix, general_bucket_name_prefix, dont_cleanup_old_buckets, _b2_api
):
    """Provide one ``BucketManager`` shared by the whole session.

    The manager is built from the cleanup flag, the API object and both
    bucket name prefixes. ``clean_buckets()`` is called on it at teardown.
    """
    prefixes = {
        'current_run_prefix': bucket_name_prefix,
        'general_prefix': general_bucket_name_prefix,
    }
    session_manager = BucketManager(dont_cleanup_old_buckets, _b2_api, **prefixes)
    yield session_manager
    # Everything after the yield runs as fixture teardown.
    session_manager.clean_buckets()


@pytest.fixture
def bucket(bucket_name_prefix, bucket_manager):
    """Create a bucket through the bucket manager and clean it at teardown.

    ``bucket_name_prefix`` is not used in the body; it is kept in the
    signature unchanged.
    """
    created = bucket_manager.create_bucket()
    yield created
    # Teardown: hand the same bucket back to the manager for cleaning.
    bucket_manager.clean_bucket(created)


@pytest.fixture
def b2_subfolder(bucket, request):
    """Return a ``b2://`` URI for a subfolder inside the test bucket.

    The subfolder name is the requesting test node's name followed by an
    underscore and a random URL-safe token.
    """
    node_name = request.node.name
    token = secrets.token_urlsafe(4)
    subfolder = f'{node_name}_{token}'
    return f'b2://{bucket.name}/{subfolder}'