File: conftest.py

import gc
import os
import shutil
import sys
import time
from collections import defaultdict

from mpi4py import MPI

import numpy as np
import pytest

from dolfinx.la import vector as dolfinx_vector


def pytest_runtest_teardown(item):
    """Collect garbage after every test to force calling
    destructors which might be collective"""

    # Do the normal teardown
    item.teardown()

    # Collect the garbage (call destructors collectively)
    del item
    # NOTE: How are we sure that 'item' does not hold references to
    # temporaries and someone else does not hold a reference to 'item'?!
    # Well, it seems that it works...
    gc.collect()
    comm = MPI.COMM_WORLD
    comm.Barrier()


# Add 'skip_in_parallel' skip
def pytest_runtest_setup(item):
    marker = item.get_closest_marker("skip_in_parallel")
    if marker and MPI.COMM_WORLD.size > 1:
        pytest.skip("This test should be run in serial only.")
    marker = item.get_closest_marker("xfail_win32_complex")
    if marker and sys.platform.startswith("win32"):
        pytest.xfail("missing _Complex")


@pytest.fixture(scope="module")
def datadir(request):
    """Return the directory of the shared test data. Assumes run from
    within repository filetree."""
    d = os.path.dirname(os.path.abspath(request.module.__file__))
    t = os.path.join(d, "data")
    while not os.path.isdir(t):
        d, t = os.path.split(d)
        t = os.path.join(d, "data")
    return t
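
# Usage sketch (hypothetical test): read a shared file from the data
# directory located by the fixture above; "mesh.xdmf" is a made-up file name.
#
#   def test_read_reference_data(datadir):
#       path = os.path.join(datadir, "mesh.xdmf")
#       assert os.path.isfile(path)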


def _worker_id(request):
    """Returns thread id when running with pytest-xdist in parallel."""
    try:
        return request.config.workerinput["workerid"]
    except AttributeError:
        return "master"


def _create_tempdir(request):
    # Get directory name of test_foo.py file
    testfile = request.module.__file__
    testfiledir = os.path.dirname(os.path.abspath(testfile))

    # Construct name test_foo_tempdir from name test_foo.py
    testfilename = os.path.basename(testfile)
    outputname = testfilename.replace(".py", f"_tempdir_{_worker_id(request)}")

    # Get function name test_something from test_foo.py
    function = request.function.__name__

    # Join all of these to make a unique path for this test function
    basepath = os.path.join(testfiledir, outputname)
    path = os.path.join(basepath, function)

    # Add a sequence number to avoid collisions between parametrized
    # instances of the same test function
    comm = MPI.COMM_WORLD
    if comm.rank == 0:
        _create_tempdir._sequencenumber[path] += 1
        sequencenumber = _create_tempdir._sequencenumber[path]
    else:
        sequencenumber = None

    sequencenumber = comm.bcast(sequencenumber)
    path += "__" + str(sequencenumber)

    # Delete and re-create directory on root node
    if comm.rank == 0:
        # First time visiting this basepath: delete the old directory
        # and create a new one
        if basepath not in _create_tempdir._basepaths:
            _create_tempdir._basepaths.add(basepath)
            if os.path.exists(basepath):
                shutil.rmtree(basepath)
            # Make sure we have the base path test_foo_tempdir for this
            # test_foo.py file
            if not os.path.exists(basepath):
                os.mkdir(basepath)

        # Delete path from old test run
        if os.path.exists(path):
            shutil.rmtree(path)
        # Make sure we have the path for this test execution: e.g.
        # test_foo_tempdir/test_something__3
        if not os.path.exists(path):
            os.mkdir(path)

        # Wait until the directory created above exists
        waited = 0
        while not os.path.exists(path):
            time.sleep(0.1)
            waited += 0.1
            if waited > 1:
                raise RuntimeError(f"Unable to create test directory {path}")

    comm.Barrier()

    return path


# Assigning member variables to a function is a bit of a nasty hack
_create_tempdir._sequencenumber = defaultdict(int)  # type: ignore
_create_tempdir._basepaths = set()  # type: ignore


@pytest.fixture(scope="function")
def tempdir(request):
    """Return a unique directory name for this test function instance.

    Deletes and re-creates directory from previous test runs but lets
    the directory stay after the test run for eventual inspection.

    Returns the directory name, derived from the test file and
    function plus a sequence number to work with parameterized tests.

    Does NOT change the current directory.

    MPI safe (assuming MPI.COMM_WORLD context).

    """
    return _create_tempdir(request)
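
# Usage sketch (hypothetical test): each test function gets its own
# directory, unique per parametrization and per pytest-xdist worker, so
# output files from concurrent or repeated runs do not collide.
# "output.txt" is a made-up file name.
#
#   def test_write_output(tempdir):
#       if MPI.COMM_WORLD.rank == 0:
#           with open(os.path.join(tempdir, "output.txt"), "w") as f:
#               f.write("result")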


@pytest.fixture(scope="function", autouse=True)
def cg_solver():
    """Conjugate Gradient solver for SPD problems, which can work in
    serial or parallel for testing use. Not suitable for large
    problems."""

    def _cg(comm, A, b, x, maxit=500, rtol=None):
        rtol2 = 10 * np.finfo(x.array.dtype).eps if rtol is None else rtol**2

        def _global_dot(comm, v0, v1):
            return comm.allreduce(np.vdot(v0, v1), MPI.SUM)

        A_op = A.to_scipy()
        nr = A_op.shape[0]
        assert nr == A.index_map(0).size_local * A.block_size[0]

        # Create larger ghosted vector based on matrix column space
        # and get initial y = A.x
        p = dolfinx_vector(A.index_map(1), bs=A.block_size[1], dtype=x.array.dtype)
        p.array[:nr] = x.array[:nr]
        p.scatter_forward()
        y = A_op @ p.array

        # Copy residual to p
        r = b.array[:nr] - y
        p.array[:nr] = r

        # Iterations of CG
        rnorm0 = _global_dot(comm, r, r)
        rnorm = rnorm0
        k = 0
        while k < maxit:
            k += 1
            p.scatter_forward()
            y = A_op @ p.array
            alpha = rnorm / _global_dot(comm, p.array[:nr], y)
            x.array[:nr] += alpha * p.array[:nr]
            r -= alpha * y
            rnorm_new = _global_dot(comm, r, r)
            beta = rnorm_new / rnorm
            rnorm = rnorm_new
            if rnorm / rnorm0 < rtol2:
                x.scatter_forward()
                return
            p.array[:nr] = beta * p.array[:nr] + r

        raise RuntimeError(
            f"Solver exceeded max iterations ({maxit}). Relative residual={rnorm/rnorm0}."
        )

    return _cg
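
# Usage sketch (hypothetical test): `A`, `b` and `x` stand for an assembled
# dolfinx.la.MatrixCSR and two dolfinx.la.Vector objects whose layouts match
# A's row map; how they are built is up to the test.
#
#   def test_poisson_solve(cg_solver):
#       ...  # assemble A (MatrixCSR), b and x (la.Vector) here
#       cg_solver(MPI.COMM_WORLD, A, b, x, rtol=1e-8)
#       # on convergence x has been scatter_forward()-ed and can be checked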