1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
|
"""
Tests for IterableTransferResponse responses from TransferClient
"""
import globus_sdk
from tests.common import register_api_route
# Canned JSON response body used by test_server_list as the mocked reply for
# the endpoint "server_list" route. The top-level document has DATA_TYPE
# "endpoint_server_list" and its "DATA" array holds exactly one "server"
# document, which is what the length/iteration assertions rely on.
SERVER_LIST_TEXT = """{
"DATA": [
{
"DATA_TYPE": "server",
"hostname": "ep1.transfer.globus.org",
"id": 207976,
"incoming_data_port_end": null,
"incoming_data_port_start": null,
"is_connected": true,
"is_paused": false,
"outgoing_data_port_end": null,
"outgoing_data_port_start": null,
"port": 2811,
"scheme": "gsiftp",
"subject": null,
"uri": "gsiftp://ep1.transfer.globus.org:2811"
}
],
"DATA_TYPE": "endpoint_server_list",
"endpoint": "go#ep1"
}"""
def test_server_list(client):
    """
    The endpoint_server_list response is a GlobusHTTPResponse that supports
    key lookup and can be iterated more than once.

    ``client`` is a pytest fixture supplied outside this file (presumably a
    TransferClient -- confirm against the test suite's conftest).
    """
    endpoint_id = "epid"
    route = f"/endpoint/{endpoint_id}/server_list"
    register_api_route("transfer", route, body=SERVER_LIST_TEXT)

    response = client.endpoint_server_list(endpoint_id)

    # the iterable response must still be a GlobusHTTPResponse
    assert isinstance(response, globus_sdk.GlobusHTTPResponse)

    # top-level keys of the document are readable by subscript
    expected_top_level = (
        ("DATA_TYPE", "endpoint_server_list"),
        ("endpoint", "go#ep1"),
    )
    for key, value in expected_top_level:
        assert response[key] == value

    # iterate twice on purpose -- unlike PaginatedResource, repeated iteration
    # is allowed here and yields the same single item each time
    for _ in range(2):
        assert len(list(response)) == 1

    # a further, independent iteration still yields the "server" document
    first_server = list(response)[0]
    assert first_server["DATA_TYPE"] == "server"
|