# Tests for globus_sdk storage gateway documents and storage policy classes.
import pytest
from globus_sdk import (
ActiveScaleStoragePolicies,
AzureBlobStoragePolicies,
BlackPearlStoragePolicies,
BoxStoragePolicies,
CephStoragePolicies,
GoogleCloudStoragePolicies,
GoogleDriveStoragePolicies,
HPSSStoragePolicies,
IrodsStoragePolicies,
OneDriveStoragePolicies,
POSIXStagingStoragePolicies,
POSIXStoragePolicies,
S3StoragePolicies,
StorageGatewayDocument,
)
from globus_sdk.transport import JSONRequestEncoder
@pytest.mark.parametrize(
    "use_kwargs,doc_version",
    [
        ({}, "1.0.0"),
        ({"require_mfa": True}, "1.1.0"),
        ({"require_mfa": False}, "1.1.0"),
    ],
)
def test_datatype_version_deduction(use_kwargs, doc_version):
    """Check the version embedded in ``DATA_TYPE`` for each set of kwargs.

    The cases expect ``1.0.0`` when no kwargs are given and ``1.1.0`` whenever
    ``require_mfa`` is passed, whether it is ``True`` or ``False``.
    """
    expected_data_type = f"storage_gateway#{doc_version}"
    document = StorageGatewayDocument(**use_kwargs)
    assert document["DATA_TYPE"] == expected_data_type
def test_storage_gateway_policy_document_conversion():
    """Check that a policies object nested in a gateway encodes to a plain dict.

    The gateway document holds the ``POSIXStoragePolicies`` instance as given;
    the JSON form produced by ``JSONRequestEncoder`` is expected to contain the
    equivalent plain mapping, including its ``DATA_TYPE``.
    """
    posix_policies = POSIXStoragePolicies(
        groups_allow=["jedi", "wookies"],
        groups_deny=["sith", "stormtroopers"],
    )
    gateway = StorageGatewayDocument(policies=posix_policies)

    # Before encoding, the nested value is still the typed policies object.
    assert "policies" in gateway
    assert isinstance(gateway["policies"], POSIXStoragePolicies)

    encoded_request = JSONRequestEncoder().encode(
        "POST", "bogus.url.example", {}, gateway, {}
    )
    expected_policies = {
        "DATA_TYPE": "posix_storage_policies#1.0.0",
        "groups_allow": ["jedi", "wookies"],
        "groups_deny": ["sith", "stormtroopers"],
    }
    assert encoded_request.json["policies"] == expected_policies
def test_posix_staging_env_vars():
    """Check that POSIX staging policies store iterable arguments as lists.

    A tuple, a generator, and a tuple of dicts are passed in; the resulting
    document is expected to hold plain lists for all three.
    """
    # Deliberately not lists: the assertions below expect list values back.
    allowed_groups = ("vulcans", "starfleet")
    denied_groups = (name for name in ("ferengi", "romulans"))
    env_vars = ({"name": "VOLUME", "value": "/vol/0"},)

    policies = POSIXStagingStoragePolicies(
        groups_allow=allowed_groups,
        groups_deny=denied_groups,
        stage_app="/globus/bin/posix-stage-data",
        environment=env_vars,
    )

    assert isinstance(policies["environment"], list)

    expected_document = {
        "DATA_TYPE": "posix_staging_storage_policies#1.0.0",
        "groups_allow": ["vulcans", "starfleet"],
        "groups_deny": ["ferengi", "romulans"],
        "stage_app": "/globus/bin/posix-stage-data",
        "environment": [{"name": "VOLUME", "value": "/vol/0"}],
    }
    assert dict(policies) == expected_document
@pytest.mark.parametrize(
    "doc_class",
    [
        StorageGatewayDocument,
        POSIXStagingStoragePolicies,
        POSIXStoragePolicies,
        BlackPearlStoragePolicies,
        BoxStoragePolicies,
        CephStoragePolicies,
        GoogleDriveStoragePolicies,
        GoogleCloudStoragePolicies,
        OneDriveStoragePolicies,
        AzureBlobStoragePolicies,
        S3StoragePolicies,
        ActiveScaleStoragePolicies,
        IrodsStoragePolicies,
        HPSSStoragePolicies,
    ],
)
def test_storage_gateway_documents_support_additional_fields(doc_class):
    """Check that every document class accepts ``additional_fields``.

    Each class must build with no arguments and carry ``DATA_TYPE``; keys given
    through ``additional_fields`` are expected to appear in the document.
    """
    # Baseline: a default-constructed document has no "foo" key.
    plain_doc = doc_class()
    assert "DATA_TYPE" in plain_doc
    assert "foo" not in plain_doc

    # With additional_fields, the extra key is readable from the document.
    extras = {"foo": "bar"}
    extended_doc = doc_class(additional_fields=extras)
    assert "DATA_TYPE" in extended_doc
    assert extended_doc["foo"] == "bar"
|