File: test_bench.py

package info (click to toggle)
ecflow 5.15.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 51,868 kB
  • sloc: cpp: 269,341; python: 22,756; sh: 3,609; perl: 770; xml: 333; f90: 204; ansic: 141; makefile: 70
file content (45 lines) | stat: -rw-r--r-- 1,984 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# Used to build ecflow, incrementally and nightly
import os
import ecflow 

# =====================================================================================
# Load the defs from disk
# The definition file is looked up below the directory named by the WK
# environment variable.
# =====================================================================================
workspace_root = os.getenv("WK")
if workspace_root is None:
    # os.getenv returns None for an unset variable; report that directly rather
    # than failing with a TypeError when None is concatenated with a string.
    raise RuntimeError("environment variable WK must be set to locate test_force_cmd.def")
generated_defs_file = workspace_root + "/build_scripts/test_bench/test_force_cmd.def"
defs = ecflow.Defs(generated_defs_file)
 
# =====================================================================================
# Generate scripts
# Every suite gets ECF_HOME and ECF_INCLUDE added, both below
# $WK/build_scripts/test_bench/test_force_cmd.
# This assumes the ECF_HOME and ECF_INCLUDE have not been defined
# NOTE(review): this header previously said the scripts are generated in $SCRATCH,
# but the paths set here are under $WK -- confirm which one is intended.
# =====================================================================================
test_bench_root = os.getenv("WK")
if test_bench_root is None:
    # os.getenv returns None for an unset variable; report that directly rather
    # than failing with a TypeError when None is concatenated with a string.
    raise RuntimeError("environment variable WK must be set to define ECF_HOME and ECF_INCLUDE")

# Both values are the same for every suite, so build them once outside the loop.
ecf_home = test_bench_root + "/build_scripts/test_bench/test_force_cmd"
ecf_include = ecf_home + "/includes"

for suite in defs.suites:
    suite.add_variable("ECF_HOME", ecf_home)
    suite.add_variable("ECF_INCLUDE", ecf_include)

defs.generate_scripts()


# ====================================================================================
# Check job generation
# Stops the script with the collected error message if job creation reported anything.
# ====================================================================================
job_ctrl = ecflow.JobCreationCtrl()  # no set_node_path() hence check job generation for all tasks
defs.check_job_creation(job_ctrl)

job_creation_errors = job_ctrl.get_error_msg()
if len(job_creation_errors) != 0:
    # Raised explicitly instead of using an assert statement, which is removed
    # when Python runs with -O; the exception type is the same as before.
    raise AssertionError(job_creation_errors)


# =====================================================================================
# Delete what is in the server, then load the new definition and begin all suites
# Assumes server already started
# ======================================================================================
ci = ecflow.Client("localhost", "4141")
try:
    ci.restart_server()

    # clear out the server, then load the definition into it
    ci.delete_all(True)
    ci.load(defs)

    # alternative left disabled: ci.replace("/suite",defs,True,True)
    # need only begin once
    ci.begin_all_suites()

except RuntimeError as err:
    # A RuntimeError from any call above is only reported; nothing is re-raised here.
    print("failed: " + str(err))