1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258
|
import os
import pytest
from yarsync import YARsync
from yarsync.yarsync import (
CONFIG_ERROR, COMMAND_ERROR
)
from .helpers import clone_repo
from .settings import TEST_DIR, TEST_DIR_FILTER, TEST_DIR_YS_BAD_PERMISSIONS
clone_command = ["yarsync", "clone"]
rsync_error = 23
def test_clone_from(tmp_path_factory, capfd):
dest = tmp_path_factory.mktemp("test")
os.chdir(str(dest))
# when we clone from, we don't affect the original repo
ys = YARsync(clone_command + ["tmp", TEST_DIR])
returncode = ys()
assert not returncode
test_dir_name = 'test_dir' # os.path.basename(TEST_DIR)
# test_dir was cloned into dest
assert os.listdir(str(dest)) == [test_dir_name]
# files from test_dir were transferred
new_repo_dir = os.path.join(dest, test_dir_name)
assert set(os.listdir(new_repo_dir)) == set(['b', 'a', 'c', '.ys'])
# configuration files were transferred
# clone name was used correctly
new_ys_dir = os.path.join(new_repo_dir, ys.YSDIR)
assert set(os.listdir(new_ys_dir)) == set([
'sync', 'commits', 'repo_tmp.txt', 'logs', 'config.ini'
])
# we don't check every commit, because that is done in pull
new_sync_dir = os.path.join(new_ys_dir, ys.SYNCDIRNAME)
assert set(os.listdir(new_sync_dir)) == set([
'2_TEST.txt', '2_tmp.txt', '2_other_repo.txt'
])
# no errors were issued
captured = capfd.readouterr()
assert not captured.err
## Can't clone from a directory with filter
dest3 = tmp_path_factory.mktemp("dest")
os.chdir(dest3)
ys3 = YARsync(["yarsync", "clone", "clone", TEST_DIR_FILTER])
return_code = ys3()
assert return_code == CONFIG_ERROR
@pytest.mark.parametrize(
"test_dir_source", (TEST_DIR, TEST_DIR_FILTER)
)
def test_clone_to(tmp_path_factory, test_dir_source):
# todo: filter is probably irrelevant for this test
# and doesn't work (where do we check for it?)
source_dir = tmp_path_factory.mktemp("test")
clone_dir = tmp_path_factory.mktemp("test")
repo_name = os.path.basename(source_dir)
# we copy the repository low-level to get rid of side effects
# (synchronization, new remote, etc.)
clone_repo(str(test_dir_source), str(source_dir))
os.chdir(str(source_dir))
ys = YARsync(clone_command + ["tmp", str(clone_dir)])
returncode = ys()
assert not returncode
# todo: capture output
# if "-v" in clone_command: ...
# all files were transferred
new_repo = os.path.join(clone_dir, repo_name)
# we compare sets, because the ordering will be different
new_files = set(os.listdir(new_repo))
assert new_files.issubset(set(os.listdir(source_dir)))
# they are not equal, because rsync-filter excludes 'b'
assert 'a' in new_files
def test_clone_from_errors_1(tmp_path_factory, capfd):
invalid_source = tmp_path_factory.mktemp("invalid_source")
dest1 = tmp_path_factory.mktemp("dest_for_cloning")
## Can't clone from an invalid repository
os.chdir(dest1)
ys1 = YARsync(clone_command + ["origin", str(invalid_source)])
# will be called clone_from, because we are outside.
return_code = ys1._clone_from("origin", str(invalid_source))
assert return_code == COMMAND_ERROR
err = capfd.readouterr().err
assert 'No yarsync repository found at ' in err
## Can't clone into a repository with the same name
os.chdir(dest1)
ys_same_name = YARsync(clone_command + ["TEST", TEST_DIR])
returncode = ys_same_name()
assert returncode == COMMAND_ERROR
assert "Name 'TEST' is already used by the remote. " in capfd.readouterr().err
## Can't clone if the new directory already exists
os.mkdir("test_dir")
ys_exists = YARsync(clone_command + ["new_TEST", TEST_DIR])
returncode = ys_exists()
assert returncode == COMMAND_ERROR
assert "Directory 'test_dir' exists. Aborting" in capfd.readouterr().err
def test_clone_from_errors_2(tmp_path_factory, capfd):
## Can't clone from a repository with bad permissions
dest3 = str(tmp_path_factory.mktemp("dest_to_fail"))
os.chdir(dest3)
# slash added for variability (coverage)
ys3 = YARsync(clone_command + ["clone", TEST_DIR_YS_BAD_PERMISSIONS + '/'])
assert ys3() == rsync_error
captured = capfd.readouterr()
assert "An error occurred while pulling data from" in captured.err
# even though the contents of the directory were not transferred,
# it has been created.
forbidden = os.path.join(dest3, "test_dir_ys_bad_permissions", "forbidden")
os.chmod(forbidden, 0o777)
@pytest.mark.usefixtures("test_dir_ys_bad_permissions")
def test_clone_from_repo_with_bad_permissions(test_dir_common_copy, capfd):
## Can't clone from inside a repository with bad permissions
ys = YARsync(clone_command + ["clone", test_dir_common_copy])
# this is a COMMAND_ERROR, because there are uncommitted changes
assert ys._clone_to("clone", test_dir_common_copy) == COMMAND_ERROR
captured = capfd.readouterr()
assert "local repository has uncommitted changes. Exit." in captured.err
@pytest.mark.usefixtures("test_dir_common_copy")
def test_clone_to_errors_1(tmp_path_factory, capfd):
## Can't clone from here if we can't read the remote parent
# create a directory with bad permissions
dest_bad = str(tmp_path_factory.mktemp("bad_dest"))
# from https://stackoverflow.com/a/25988623/952234
# cur_perm = stat.S_IMODE(os.lstat(dest_bad).st_mode)
# os.chmod(dest_bad, cur_perm & ~stat.S_IREAD)
os.chmod(dest_bad, 0o000)
# do the clone
ys_bad_parent = YARsync(clone_command + ["bad_parent", dest_bad])
returncode = ys_bad_parent()
# tear down permissions
os.chmod(dest_bad, 0o755)
# os.chmod(dest_bad, cur_perm | stat.S_IREAD)
assert returncode == COMMAND_ERROR
captured = capfd.readouterr()
assert "Parent folder of the clone could not be read. Aborting"\
in captured.err
@pytest.mark.usefixtures("test_dir_separate_copy")
def test_clone_to_errors_2(tmp_path_factory, capfd):
## Clone name must not exist in remotes
dest4 = str(tmp_path_factory.mktemp("dest_for_errors"))
ys4 = YARsync(clone_command + ["other_repo", dest4])
assert ys4() == COMMAND_ERROR
assert "remote other_repo exists, break." in capfd.readouterr().err
## synchronization changes are updated normally
sync_dir6 = os.path.join(".ys", "sync")
sync60 = os.listdir(sync_dir6)
assert set(sync60) == set(['2_other_repo.txt', '2_TEST.txt'])
dest6 = str(tmp_path_factory.mktemp("new_dest"))
ys6 = YARsync(clone_command + ["new_clone_2", dest6])
assert ys6() == 0
sync61 = os.listdir(sync_dir6)
assert set(sync61) == set(['2_new_clone_2.txt', '2_other_repo.txt', '2_TEST.txt'])
# manually fix it, otherwise pytest complains about garbage,
# see https://github.com/pytest-dev/pytest/issues/7821
def test_clone_to_errors_3(tmp_path_factory, test_dir_separate_copy, capfd):
# if we don't mark it, we have to cd explicitly
os.chdir(test_dir_separate_copy)
# print(os.getcwd())
## synchronization changes are reverted in case of errors
sync_dir5 = os.path.join(".ys", "sync")
sync50 = set(os.listdir(sync_dir5))
assert sync50 == {'2_other_repo.txt', '2_TEST.txt'}
dest5 = str(tmp_path_factory.mktemp("dest"))
ys5 = YARsync(clone_command + ["new", dest5])
comm_1_dir = os.path.join(".ys", "commits", "1")
repo_dir_name = os.path.basename(test_dir_separate_copy)
comm_1_dir_copy = os.path.join(dest5, repo_dir_name, ".ys", "commits", "1")
os.chmod(comm_1_dir, 0o000)
returncode = ys5()
os.chmod(comm_1_dir, 0o755)
os.chmod(comm_1_dir_copy, 0o755)
# command error, because the problem is with local repo
# rsync error, because otherwise test fails
assert returncode == rsync_error
sync51 = set(os.listdir(sync_dir5))
assert sync51 == sync50
def test_clone_to_errors_parent(tmp_path_factory, capfd):
## Can't clone if the remote contains
## a folder with this directory name
test_dir2 = tmp_path_factory.mktemp("test_dir2")
clone_repo(str(TEST_DIR), str(test_dir2))
os.chdir(test_dir2)
dest_exists = str(tmp_path_factory.mktemp("new_dest"))
# create a directory with the name of local repository in dest
os.mkdir(os.path.join(dest_exists, os.path.basename(test_dir2)))
ys_dest_ex = YARsync(clone_command + ["new", dest_exists])
assert ys_dest_ex() == COMMAND_ERROR
captured = capfd.readouterr()
assert "Repository folder already exists at " + dest_exists in captured.err
def test_clone_with_env_path(tmp_path_factory):
### Clone from path with an environmental variable
### preserves that variable in the configuration
test_dir = tmp_path_factory.mktemp("new_test_dir")
test_dir_name = os.path.basename(test_dir)
clone_repo(str(TEST_DIR), str(test_dir))
## Clone from path with an envvar works
dest1 = tmp_path_factory.mktemp("dest")
os.chdir(dest1)
os.environ["TEST_DIR"] = str(test_dir)
ys1 = YARsync(clone_command + ["clone", "$TEST_DIR"])
return_code = ys1()
assert not return_code
# remove the variable, so that it is no longer expanded in _config
del os.environ["TEST_DIR"]
ys11 = YARsync("yarsync remote show".split())
ys11()
# it is TEST, because it is the name of origin.
assert ys11._config["TEST"]["path"] == "$TEST_DIR"
## Clone to path with an envvar works
dest2 = tmp_path_factory.mktemp("dest2")
os.environ["DEST2"] = str(dest2)
# we are still in dest1
ys2 = YARsync(["yarsync", "clone", "clone2", "$DEST2"])
return_code = ys2()
assert not return_code
del os.environ["DEST2"]
ys21 = YARsync("yarsync remote show".split())
ys21()
assert ys21._config["clone2"]["path"] == \
os.path.join("$DEST2", test_dir_name)
|