File: test_otf2_io.py

package info (click to toggle)
otf2 3.1.1-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 29,000 kB
  • sloc: ansic: 92,997; python: 16,977; cpp: 9,057; sh: 6,299; makefile: 238; awk: 54
file content (107 lines) | stat: -rw-r--r-- 4,858 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import os
import os.path
import unittest
from shutil import rmtree
from tempfile import mkdtemp
import time

import otf2


# Number of timer ticks per second; also handed to the trace writer as its
# timer resolution. One tick is one microsecond.
TIMER_GRANULARITY = 10 ** 6


def t():
    """Return the current wall-clock time as an integer number of timer ticks."""
    ticks = time.time() * TIMER_GRANULARITY
    return int(round(ticks))


class TestOTF2IO(unittest.TestCase):
    """Round-trip test for I/O definitions and events of the otf2 bindings.

    A small trace containing I/O files, paradigms, handles and a single
    ``io_create_handle`` event is written, copied into a second archive by
    feeding a reader into a writer, and both archives are read back.
    """

    def setUp(self):
        """Create a scratch directory with two archive directories and enter it.

        The scratch directory is created inside the current working directory
        and contains ``orig`` (generated trace) and ``retrc`` (rewritten trace).
        """
        self.old_cwd = os.getcwd()
        # Name the scratch directory after this module, extension stripped.
        module_name = os.path.splitext(os.path.basename(os.path.abspath(__file__)))[0]
        self.tmp_dirname = mkdtemp(prefix=module_name + '_tmp', dir=self.old_cwd)

        self.orig_trace = os.path.join(self.tmp_dirname, "orig")
        self.retrace = os.path.join(self.tmp_dirname, "retrc")

        try:
            os.mkdir(self.orig_trace)
            os.mkdir(self.retrace)
            os.chdir(self.tmp_dirname)
        except Exception:
            # unittest skips tearDown() when setUp() raises, so clean up the
            # already-created scratch directory here before propagating.
            self.tearDown()
            raise

    def tearDown(self):
        """Restore the working directory and dispose of the scratch directory.

        If the ``KEEP_TEST_OUTPUT`` environment variable is set to a non-empty
        value, the directory is kept and its path is printed instead.
        """
        os.chdir(self.old_cwd)
        if os.getenv('KEEP_TEST_OUTPUT'):
            print(self.tmp_dirname)
        else:
            rmtree(self.tmp_dirname)

    def generate_trace(self, archive_name):
        """Write a trace with I/O definitions and one event to *archive_name*.

        Along the way the item access of I/O paradigm properties is checked
        for plain values as well as for explicit ``AttributeValue`` objects.
        """
        with otf2.writer.open(archive_name, timer_resolution=TIMER_GRANULARITY) as trace:
            definitions = trace.definitions

            system_tree_node = definitions.system_tree_node("Root", parent=None)
            location_group = definitions.location_group("Process",
                                                        system_tree_parent=system_tree_node)

            writer = trace.event_writer("thread", group=location_group)

            regular_file = definitions.io_regular_file("/home/me/foo.txt", system_tree_node)
            definitions.io_file_property(regular_file, "mount", otf2.Type.STRING, value="/home")
            directory = definitions.io_directory("/etc", system_tree_node)

            version = otf2.IoParadigmProperty.VERSION

            # Paradigm created with positional arguments and a plain property value.
            posix_paradigm = definitions.io_paradigm("POSIX", "MyFirstPosixParadigm",
                                                     otf2.IoParadigmClass.SERIAL,
                                                     otf2.IoParadigmFlag.NONE,
                                                     {version: "1.0"})
            self.assertEqual(posix_paradigm[version], "1.0")

            # Properties can be overwritten with a plain value ...
            posix_paradigm[version] = "2.0"
            self.assertEqual(posix_paradigm[version], "2.0")

            # ... and with an explicitly typed attribute value.
            posix_paradigm[version] = otf2.attribute_value.AttributeValue("3.0", otf2.Type.STRING)
            self.assertEqual(posix_paradigm[version], "3.0")

            # Paradigm created with keyword arguments and a typed property value.
            mpi_io_paradigm = definitions.io_paradigm(
                identification="MPI-IO", name="MyFirstMPI-IOParadigm",
                io_paradigm_class=otf2.IoParadigmClass.PARALLEL,
                io_paradigm_flags=otf2.IoParadigmFlag.NONE,
                properties={version: otf2.attribute_value.AttributeValue("0.1", otf2.Type.STRING)})
            self.assertEqual(mpi_io_paradigm[version], "0.1")

            # Every standard stream gets a pre-created handle state, so every
            # one of them is flagged PRE_CREATED (previously only stdin was).
            standard_streams = (
                ('stdin', otf2.IoAccessMode.READ_ONLY),
                ('stdout', otf2.IoAccessMode.WRITE_ONLY),
                ('stderr', otf2.IoAccessMode.WRITE_ONLY),
            )
            for stream_name, access_mode in standard_streams:
                stream_handle = definitions.io_handle(stream_name, None, posix_paradigm,
                                                      otf2.IoHandleFlag.PRE_CREATED)
                definitions.io_pre_created_handle_state(stream_handle, access_mode)

            file_handle = definitions.io_handle('', regular_file, posix_paradigm)
            # Defined only so that a directory handle is part of the trace.
            definitions.io_handle('', directory, mpi_io_paradigm)

            writer.io_create_handle(t(), file_handle, otf2.IoAccessMode.WRITE_ONLY,
                                    otf2.IoCreationFlag.CREATE |
                                    otf2.IoCreationFlag.PATH,
                                    otf2.IoStatusFlag.SYNC |
                                    otf2.IoStatusFlag.CLOSE_ON_EXEC)

    def read_trace(self, archive_name):
        """Open the trace at *archive_name* and return its number of events.

        Iterating all events forces the reader to process the whole trace.
        """
        with otf2.reader.open(archive_name) as trace:
            return sum(1 for _ in trace.events)

    def rewrite_trace(self, old_anchor_path, new_archive_path):
        """Copy the trace at *old_anchor_path* into *new_archive_path*.

        The new archive is opened with the definitions of the old trace and
        every event is written to the writer obtained for its location.
        """
        with otf2.reader.open(old_anchor_path) as trace_reader:
            with otf2.writer.open(new_archive_path,
                                  definitions=trace_reader.definitions) as write_trace:
                for location, event in trace_reader.events:
                    writer = write_trace.event_writer_from_location(location)
                    writer(event)

    def test_rewrite(self):
        """Generate a trace, rewrite it, and compare the event counts."""
        original_anchor = os.path.join(self.orig_trace, "traces.otf2")
        rewritten_anchor = os.path.join(self.retrace, "traces.otf2")

        self.generate_trace(self.orig_trace)
        self.rewrite_trace(original_anchor, self.retrace)

        original_count = self.read_trace(original_anchor)
        # generate_trace() writes an event, so an empty trace means a failure.
        self.assertGreater(original_count, 0)
        self.assertEqual(self.read_trace(rewritten_anchor), original_count)


# Run the test cases of this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()