#
# Copyright 2009- ECMWF.
#
# This software is licensed under the terms of the Apache Licence version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.
#
import os
from ecflow import Suite, Family, Task, Defs, Client, debug_build
import ecflow_test_util as Test
def check_then_auto_add_extern(defs):
    """Assert that *defs* fails its check until externs are auto-added.

    The first ``defs.check()`` is expected to report a problem (a non-empty
    message), which is printed.  ``defs.auto_add_externs(True)`` is then
    called and the check repeated; this second check must come back empty.

    Args:
        defs: Object exposing ``check()``, which returns an error string
            (empty when there is nothing to report), and
            ``auto_add_externs(flag)``.

    Raises:
        AssertionError: If the first check reports no error, or if the check
            still reports an error after the externs have been added.
    """
    error_msg = defs.check()
    print(error_msg)
    assert error_msg, "Expect error in checks\n" + str(defs)

    defs.auto_add_externs(True)
    error_msg = defs.check()
    assert not error_msg, (
        "Expect check to pass after auto add extern\n" + error_msg + "\n" + str(defs)
    )
if __name__ == "__main__":
    Test.print_test_start(os.path.basename(__file__))

    # A freshly created, empty definition must pass the check as-is.
    defs = Defs()
    error_msg = defs.check()
    assert len(error_msg) == 0, "Expect empty defs to pass check"

    suite = defs.add_suite("ext")

    # Triggers referring to paths under /a/b/c/d, none of which are defined in
    # this definition (it only holds the "ext" suite).  After each task is
    # added, the check must fail first and pass once externs are auto-added.
    f1 = suite.add_family("f1")
    trigger_cases = (
        ("t", "/a/b/c/d == complete"),
        ("t1", "/a/b/c/d/e:event == set"),
        ("t2", "/a/b/c/d/x:event"),
        ("t3", "/a/b/c/d/y:meter le 30"),
        ("t4", "/a/b/c/d/z<flag>late"),
    )
    for task_name, expression in trigger_cases:
        f1.add_task(task_name).add_trigger(expression)
        check_then_auto_add_extern(defs)

    # In-limits referring to limits that are not defined in this definition:
    # first on the family itself, then on individual tasks.
    f2 = suite.add_family("f2")
    f2.add_inlimit("hpcd", "limits")
    check_then_auto_add_extern(defs)

    inlimit_cases = (
        ("t", "sg1", "/suiteName"),
        ("t1", "hpcd", "/obs/limits"),
        ("t2", "c1a", "/limits"),
    )
    for task_name, limit_name, limit_path in inlimit_cases:
        f2.add_task(task_name).add_inlimit(limit_name, limit_path)
        check_then_auto_add_extern(defs)

    print(defs)
    print("All Tests pass")
|