1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
|
import warnings
import fsspec
import pytest
from upath import UPath
from upath.implementations.cloud import GCSPath
from ..cases import BaseTests
from ..utils import OverrideMeta
from ..utils import extends_base
from ..utils import overrides_base
from ..utils import posixify
from ..utils import skip_on_windows
@skip_on_windows
class TestGCSPath(BaseTests, metaclass=OverrideMeta):
SUPPORTS_EMPTY_DIRS = False
@pytest.fixture(autouse=True, scope="function")
def path(self, gcs_fixture):
path, endpoint_url = gcs_fixture
self.path = UPath(path, endpoint_url=endpoint_url, token="anon")
@overrides_base
def test_is_correct_class(self):
assert isinstance(self.path, GCSPath)
@extends_base
def test_rmdir(self):
dirname = "rmdir_test"
mock_dir = self.path.joinpath(dirname)
mock_dir.joinpath("test.txt").write_text("hello")
mock_dir.fs.invalidate_cache()
mock_dir.rmdir()
assert not mock_dir.exists()
with pytest.raises(NotADirectoryError):
self.path.joinpath("file1.txt").rmdir()
@skip_on_windows
def test_mkdir_in_empty_bucket(docker_gcs):
fs = fsspec.filesystem("gcs", endpoint_url=docker_gcs, token="anon")
fs.mkdir("my-fresh-bucket")
assert "my-fresh-bucket/" in fs.buckets
fs.invalidate_cache()
del fs
UPath(
"gs://my-fresh-bucket/some-dir/another-dir/file",
endpoint_url=docker_gcs,
token="anon",
).parent.mkdir(parents=True, exist_ok=True)
@skip_on_windows
@pytest.mark.xfail(reason="gcsfs returns isdir false")
def test_copy__object_key_collides_with_dir_prefix(docker_gcs, tmp_path):
gcs = fsspec.filesystem(
"gcs",
endpoint_url=docker_gcs,
token="anon",
use_listings_cache=False,
)
bucket = "copy_into_collision_bucket"
gcs.mkdir(bucket)
# gcs.mkdir(bucket + "/src" + "/common_prefix/")
# object under common prefix as key
gcs.pipe_file(f"{bucket}/src/common_prefix", b"hello world")
# store more objects with same prefix
gcs.pipe_file(f"{bucket}/src/common_prefix/file1.txt", b"1")
gcs.pipe_file(f"{bucket}/src/common_prefix/file2.txt", b"2")
gcs.invalidate_cache()
# make sure the sources have a collision
assert gcs.isfile(f"{bucket}/src/common_prefix")
assert gcs.isdir(f"{bucket}/src/common_prefix") # BROKEN in gcsfs
assert gcs.isfile(f"{bucket}/src/common_prefix/file1.txt")
assert gcs.isfile(f"{bucket}/src/common_prefix/file2.txt")
# prepare source and destination
src = UPath(f"gs://{bucket}/src", endpoint_url=docker_gcs, token="anon")
dst = UPath(tmp_path)
def on_collision_rename_file(src, dst):
warnings.warn(
f"{src!s} collides with prefix. Renaming target file object to {dst!s}",
UserWarning,
stacklevel=3,
)
return (
dst.with_suffix(dst.suffix + ".COLLISION"),
dst,
)
# perform copy
src.copy_into(dst, on_name_collision=on_collision_rename_file)
# check results
dst_files = sorted(posixify(x.relative_to(tmp_path)) for x in dst.glob("**/*"))
assert dst_files == [
"src",
"src/common_prefix",
"src/common_prefix.COLLISION",
"src/common_prefix/file1.txt",
"src/common_prefix/file2.txt",
]
|