File: testwrap.py

package info (click to toggle)
drmaa 0.7.9-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, sid
  • size: 360 kB
  • sloc: python: 1,295; makefile: 130; sh: 34
file content (198 lines) | stat: -rw-r--r-- 6,545 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# -----------------------------------------------------------
#  Copyright (C) 2009 StatPro Italia s.r.l.
#
#  StatPro Italia
#  Via G. B. Vico 4
#  I-20123 Milano
#  ITALY
#
#  phone: +39 02 96875 1
#  fax:   +39 02 96875 605
#
#  email: info@riskmap.net
#
#  This program is distributed in the hope that it will be
#  useful, but WITHOUT ANY WARRANTY; without even the
#  warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
#  PURPOSE. See the license for more details.
# -----------------------------------------------------------

"""
test module for the functional interface
"""

from __future__ import absolute_import, print_function, unicode_literals

import sys
import unittest
from os import environ

from nose.tools import eq_

from drmaa import Session, JobTemplate
from drmaa.const import (JobControlAction, JobSubmissionState, PLACEHOLDER_HD,
                         SUBMISSION_STATE_ACTIVE)


# Python 3 compatability help
if sys.version_info < (3, 0):
    bytes = str
    str = unicode


def setup():
    "initialize DRMAA library"
    Session.initialize()


def teardown():
    "finalize DRMAA session"
    Session.exit()


def test_allocate():
    "job template allocation"
    jt = Session.createJobTemplate()
    Session.deleteJobTemplate(jt)


class SubmitBase(unittest.TestCase):

    def setUp(self):
        self.jt = jt = Session.createJobTemplate()
        jt.remoteCommand = 'python'
        jt.args = ['-c', "print('hello from python!')"]
        if hasattr(self, 'jt_tweaks'):
            self.jt_tweaks()
        self.jid = Session.runJob(jt)

    def tearDown(self):
        Session.deleteJobTemplate(self.jt)


class EnvironmentTest(SubmitBase):

    def jt_tweaks(self):
        environ['PIPPO'] = 'aaaa'
        self.jt.args = (["-c",
                         ("from os import environ as env; assert ('PIPPO' in " +
                          "env) and (env['PIPPO'] == 'aaaa')")])
        self.jt.jobEnvironment = environ

    def test_environment(self):
        """environment variables are correctly passed to submitted jobs"""
        jinfo = Session.wait(self.jid)
        eq_(jinfo.jobId, self.jid)
        assert hasattr(jinfo, 'hasExited')
        assert hasattr(jinfo, 'exitStatus') and jinfo.exitStatus == 0


class Submit(SubmitBase):

    def test_run_bulk(self):
        """run bulk job"""
        jids = Session.runBulkJobs(self.jt, 1, 2, 1)

    def test_wait(self):
        """waiting for job completion"""
        jinfo = Session.wait(self.jid)
        eq_(jinfo.jobId, self.jid)
        assert hasattr(jinfo, 'hasExited')
        assert hasattr(jinfo, 'hasExited') and type(jinfo.hasExited) is bool
        assert hasattr(jinfo, 'hasSignal') and type(jinfo.hasSignal) is bool
        assert hasattr(jinfo, 'terminatedSignal') and type(jinfo.terminatedSignal) is str
        assert hasattr(jinfo, 'hasCoreDump') and type(jinfo.hasCoreDump) is bool
        assert hasattr(jinfo, 'wasAborted') and type(jinfo.wasAborted) is bool
        assert hasattr(jinfo, 'exitStatus') and type(jinfo.exitStatus) is int
        assert hasattr(jinfo, 'resourceUsage') and type(jinfo.resourceUsage) is dict

    def test_sync(self):
        """sync with a job"""
        Session.synchronize(self.jid)

    def test_control_terminate(self):
        """control/terminate works"""
        Session.control(self.jid, JobControlAction.TERMINATE)
        Session.synchronize(self.jid,
                            Session.TIMEOUT_WAIT_FOREVER,
                            False)
        try:
            Session.wait(self.jid, Session.TIMEOUT_WAIT_FOREVER)
        except Exception as e:
            assert e.args[0].startswith('code 24')  # no rusage


class JobTemplateTests(unittest.TestCase):

    def setUp(self):
        self.jt = Session.createJobTemplate()

    def test_scalar_attributes(self):
        """scalar attributes work"""
        for name, value in [("remoteCommand", 'cat'),
                            ("jobSubmissionState", SUBMISSION_STATE_ACTIVE),
                            ("workingDirectory", JobTemplate.HOME_DIRECTORY),
                            ("nativeSpecification", '-shell yes'),
                            ("blockEmail", False),
                            ("jobName", 'pippo'),
                            ("inputPath", ":%s" % PLACEHOLDER_HD),
                            ("outputPath", ":%s/pippo.out" % PLACEHOLDER_HD),
                            ("errorPath", ":%s/pippo.out" % PLACEHOLDER_HD),
                            ("joinFiles", True)]:
            setattr(self.jt, name, value)
            eq_(getattr(self.jt, name), value)

    # skipping this. the parameters above have to be tweaked a bit
    def xtest_tmp(self):
        self.test_scalar_attributes()
        self.jt.args = ['.colordb']
        jid = Session.runJob(self.jt)
        jinfo = Session.wait(jid)
        print(jinfo)

    def test_vector_attributes(self):
        """vector attributes work"""
        args = [10, 'de', 'arglebargle']
        self.jt.args = args
        eq_(self.jt.args, ['10', 'de', 'arglebargle'])
        em = ['baz@quz.edu', 'foo@bar.com']
        self.jt.email = em
        eq_(self.jt.email, em)

    def test_dict_attribute(self):
        """dict attributes work"""
        from drmaa.const import ATTR_BUFFER
        self.jt.jobEnvironment = environ
        for x in environ:
            # attribute values could be truncated. For some reason,
            # GE returns the first 1014 chars available (!)
            eq_(environ[x][:ATTR_BUFFER - 10],
                self.jt.jobEnvironment[x][:ATTR_BUFFER - 10])

    def test_attribute_names(self):
        """attribute names work"""
        assert len(self.jt.attributeNames) > 0

    def test_block_email(self):
        """blockEmail works"""
        self.jt.blockEmail = True
        assert self.jt.blockEmail
        self.jt.blockEmail = False
        assert not self.jt.blockEmail

    def test_join_files(self):
        """joinFiles works"""
        self.jt.joinFiles = True
        assert self.jt.joinFiles
        self.jt.joinFiles = False
        assert not self.jt.joinFiles

    def test_submission_state(self):
        """submission state attributes work"""
        self.jt.jobSubmissionState = JobSubmissionState.HOLD_STATE
        eq_(self.jt.jobSubmissionState, JobSubmissionState.HOLD_STATE)
        self.jt.jobSubmissionState = JobSubmissionState.ACTIVE_STATE
        eq_(self.jt.jobSubmissionState, JobSubmissionState.ACTIVE_STATE)

    def tearDown(self):
        Session.deleteJobTemplate(self.jt)