1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
|
"""see upath/tests/conftest.py for fixtures"""
import sys
import warnings
import fsspec
import pytest
from upath import UPath
from upath.implementations.cloud import S3Path
from ..cases import BaseTests
from ..utils import OverrideMeta
from ..utils import extends_base
from ..utils import overrides_base
from ..utils import posixify
def silence_botocore_datetime_deprecation(cls):
# botocore uses datetime.datetime.utcnow in 3.12 which is deprecated
# see: https://github.com/boto/boto3/issues/3889#issuecomment-1751296363
if sys.version_info >= (3, 12):
return pytest.mark.filterwarnings(
"ignore"
r":datetime.datetime.utcnow\(\) is deprecated"
":DeprecationWarning"
)(cls)
else:
return cls
@silence_botocore_datetime_deprecation
class TestUPathS3(BaseTests, metaclass=OverrideMeta):
SUPPORTS_EMPTY_DIRS = False
@pytest.fixture(autouse=True)
def path(self, s3_fixture):
path, anon, s3so = s3_fixture
self.path = UPath(path, anon=anon, **s3so)
self.anon = anon
self.s3so = s3so
@overrides_base
def test_is_correct_class(self):
assert isinstance(self.path, S3Path)
@extends_base
def test_rmdir(self):
dirname = "rmdir_test"
mock_dir = self.path.joinpath(dirname)
mock_dir.joinpath("test.txt").touch()
mock_dir.rmdir()
assert not mock_dir.exists()
with pytest.raises(NotADirectoryError):
self.path.joinpath("file1.txt").rmdir()
@extends_base
def test_relative_to_extra(self):
assert "file.txt" == str(
UPath("s3://test_bucket/file.txt").relative_to(UPath("s3://test_bucket"))
)
@extends_base
def test_iterdir_root(self):
client_kwargs = self.path.storage_options["client_kwargs"]
bucket_path = UPath("s3://other_test_bucket", client_kwargs=client_kwargs)
bucket_path.mkdir()
(bucket_path / "test1.txt").touch()
(bucket_path / "test2.txt").touch()
for x in bucket_path.iterdir():
assert x.name != ""
assert x.exists()
@extends_base
@pytest.mark.parametrize(
"joiner", [["bucket", "path", "file"], ["bucket/path/file"]]
)
def test_no_bucket_joinpath(self, joiner):
path = UPath("s3://", anon=self.anon, **self.s3so)
path = path.joinpath(*joiner)
assert str(path) == "s3://bucket/path/file"
@extends_base
def test_creating_s3path_with_bucket(self):
path = UPath("s3://", bucket="bucket", anon=self.anon, **self.s3so)
assert str(path) == "s3://bucket/"
@extends_base
def test_iterdir_with_plus_in_name(self, s3_with_plus_chr_name):
bucket, anon, s3so = s3_with_plus_chr_name
p = UPath(
f"s3://{bucket}/manual__2022-02-19T14:31:25.891270+00:00",
anon=True,
**s3so,
)
files = list(p.iterdir())
assert len(files) == 1
(file,) = files
assert file == p.joinpath("file.txt")
@extends_base
@pytest.mark.xfail(reason="fsspec/universal_pathlib#144")
def test_rglob_with_double_fwd_slash(self, s3_with_double_fwd_slash_files):
import boto3
import botocore.exceptions
bucket, anon, s3so = s3_with_double_fwd_slash_files
conn = boto3.resource("s3", **s3so["client_kwargs"])
# ensure there's no s3://bucket/key.txt object
with pytest.raises(botocore.exceptions.ClientError, match=".*Not Found.*"):
conn.Object(bucket, "key.txt").load()
# ensure there's a s3://bucket//key.txt object
assert conn.Object(bucket, "/key.txt").get()["Body"].read() == b"hello world"
p0 = UPath(f"s3://{bucket}//key.txt", **s3so)
assert p0.read_bytes() == b"hello world"
p1 = UPath(f"s3://{bucket}", **s3so)
assert list(p1.rglob("*.txt")) == [p0]
@pytest.fixture
def s3_with_plus_chr_name(s3_server):
anon, s3so = s3_server
s3 = fsspec.filesystem("s3", anon=False, **s3so)
bucket = "plus_chr_bucket"
path = f"{bucket}/manual__2022-02-19T14:31:25.891270+00:00"
s3.mkdir(path)
s3.touch(f"{path}/file.txt")
s3.invalidate_cache()
try:
yield bucket, anon, s3so
finally:
if s3.exists(bucket):
for dir, _, keys in s3.walk(bucket):
for key in keys:
if key.rstrip("/"):
s3.rm(f"{dir}/{key}")
@pytest.fixture
def s3_with_double_fwd_slash_files(s3_server):
anon, s3so = s3_server
s3 = fsspec.filesystem("s3", anon=False, **s3so)
bucket = "double_fwd_slash_bucket"
s3.mkdir(bucket + "/")
s3.pipe_file(f"{bucket}//key.txt", b"hello world")
try:
yield bucket, anon, s3so
finally:
if s3.exists(bucket):
for dir, _, keys in s3.walk(bucket):
for key in keys:
if key.rstrip("/"):
s3.rm(f"{dir}/{key}")
def test_path_with_hash_and_space():
assert "with#hash and space" in UPath("s3://bucket/with#hash and space/abc").parts
def test_pathlib_consistent_join():
b0 = UPath("s3://mybucket/withkey/").joinpath("subfolder/myfile.txt")
b1 = UPath("s3://mybucket/withkey").joinpath("subfolder/myfile.txt")
assert b0 == b1
assert "s3://mybucket/withkey/subfolder/myfile.txt" == str(b0) == str(b1)
def test_copy__object_key_collides_with_dir_prefix(s3_server, tmp_path):
pytest.importorskip("s3fs")
anon, s3so = s3_server
s3 = fsspec.filesystem("s3", anon=anon, **{**s3so, "use_listings_cache": False})
bucket = "copy_into_collision_bucket"
s3.mkdir(bucket + "/src" + "/common_prefix/")
# object under common prefix as key
s3.pipe_file(f"{bucket}/src/common_prefix", b"hello world")
# store more objects with same prefix
s3.pipe_file(f"{bucket}/src/common_prefix/file1.txt", b"1")
s3.pipe_file(f"{bucket}/src/common_prefix/file2.txt", b"2")
# make sure the sources have a collision
assert s3.isdir(f"{bucket}/src/common_prefix")
assert s3.isfile(f"{bucket}/src/common_prefix")
assert s3.isfile(f"{bucket}/src/common_prefix/file1.txt")
assert s3.isfile(f"{bucket}/src/common_prefix/file2.txt")
# prepare source and destination
src = UPath(f"s3://{bucket}/src", anon=anon, **s3so)
dst = UPath(tmp_path)
def on_collision_rename_file(src, dst):
warnings.warn(
f"{src!s} collides with prefix. Renaming target file object to {dst!s}",
UserWarning,
stacklevel=3,
)
return (
dst.with_suffix(dst.suffix + ".COLLISION"),
dst,
)
# perform copy
src.copy_into(dst, on_name_collision=on_collision_rename_file)
# check results
dst_files = sorted(posixify(x.relative_to(tmp_path)) for x in dst.glob("**/*"))
assert dst_files == [
"src",
"src/common_prefix",
"src/common_prefix.COLLISION",
"src/common_prefix/file1.txt",
"src/common_prefix/file2.txt",
]
|