1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
|
import json
import globus_sdk
from globus_sdk._missing import filter_missing
from globus_sdk.testing import get_last_request, load_response
def test_dummy_timer_creation(client):
    """Send a placeholder timer payload and check how the client wraps it."""
    metadata = load_response(client.create_timer).metadata

    response = client.create_timer(timer={"foo": "bar"})
    assert response["timer"]["job_id"] == metadata["timer_id"]

    # the request body must nest the payload under a top-level "timer" key
    request_body = json.loads(get_last_request().body)
    assert request_body == {"timer": {"foo": "bar"}}
def test_transfer_timer_creation(client):
    """Create a timer from the payload helpers and check its serialization."""
    metadata = load_response(client.create_timer).metadata

    transfer_data = globus_sdk.TransferData(
        source_endpoint=metadata["source_endpoint"],
        destination_endpoint=metadata["destination_endpoint"],
    )
    transfer_data.add_item("/share/godata/file1.txt", "/~/file1.txt")
    recurrence = globus_sdk.RecurringTimerSchedule(
        interval_seconds=60, end={"condition": "iterations", "iterations": 3}
    )
    transfer_timer = globus_sdk.TransferTimer(body=transfer_data, schedule=recurrence)

    response = client.create_timer(timer=transfer_timer)
    assert response["timer"]["job_id"] == metadata["timer_id"]

    request_body = json.loads(get_last_request().body)

    expected_schedule = {
        "type": "recurring",
        "interval_seconds": 60,
        "end": {"condition": "iterations", "iterations": 3},
    }
    assert request_body["timer"]["schedule"] == expected_schedule

    # Rebuild the expected transfer body from the helper object: missing values
    # are filtered out (also inside each "DATA" item), and the
    # "skip_activation_check" key is not expected in the sent body.
    expected_body = {}
    for key, value in filter_missing(transfer_data).items():
        if key == "skip_activation_check":
            continue
        if key == "DATA":
            value = [filter_missing(item) for item in value]
        expected_body[key] = value
    assert request_body["timer"]["body"] == expected_body
def test_flow_timer_creation(client):
    """Create a flow timer and check the sent fields against fixture metadata."""
    # Setup
    metadata = load_response(client.create_timer, case="flow_timer_success").metadata
    flow_timer = globus_sdk.FlowTimer(
        flow_id=metadata["flow_id"],
        body=metadata["callback_body"],
        schedule=metadata["schedule"],
    )

    # Act
    client.create_timer(timer=flow_timer)

    # Verify
    request_body = json.loads(get_last_request().body)
    # (key in the sent timer document, key in the fixture metadata)
    field_pairs = (
        ("flow_id", "flow_id"),
        ("body", "callback_body"),
        ("schedule", "schedule"),
    )
    for sent_key, metadata_key in field_pairs:
        assert request_body["timer"][sent_key] == metadata[metadata_key]
|