File: test_assync.py

package info (click to toggle)
pywps 4.7.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,016 kB
  • sloc: python: 8,846; xml: 723; makefile: 106
file content (79 lines) | stat: -rw-r--r-- 2,792 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
##################################################################
# Copyright 2018 Open Source Geospatial Foundation and others    #
# licensed under MIT, Please consult LICENSE.txt for details     #
##################################################################

from basic import TestBase
import pytest
import time
from pywps import Service, configuration
from pywps import get_ElementMakerForVersion
from pywps.tests import client_for, assert_response_accepted, assert_response_success
from processes import Sleep
from owslib.wps import WPSExecution
from pathlib import Path
from tempfile import TemporaryDirectory
from pywps import dblog

# WPS protocol version handed to get_ElementMakerForVersion below.
VERSION = "1.0.0"

# Pair of objects returned by pywps for VERSION; presumably XML element
# builders for the WPS and OWS namespaces -- TODO confirm against pywps.
# NOTE(review): neither name is referenced by the code visible in this module.
WPS, OWS = get_ElementMakerForVersion(VERSION)


class ExecuteTest(TestBase):
    """Check that an asynchronous WPS Execute request is accepted and later succeeds."""

    def setUp(self) -> None:
        """Run the base fixture, then set the ``processing.mode`` option to ``distributed``."""
        super().setUp()
        # Running processes using the MultiProcessing scheduler and a file-based database
        configuration.CONFIG.set('processing', 'mode', 'distributed')

    @pytest.mark.xfail(reason="async fails")
    def test_async(self):
        """Submit the ``sleep`` process asynchronously and poll its status file.

        The request must be accepted without already being successful, and the
        status document must reach ``ProcessSucceeded`` before the polling
        budget is used up.

        Raises:
            TimeoutError: if more than one second of polling delay accumulates
                while the process is still not in a final state.
        """
        client = client_for(Service(processes=[Sleep()]))
        wps = WPSExecution()

        # Build an asynchronous request (requires specifying outputs and setting the mode).
        doc = wps.buildRequest('sleep',
                               inputs=[('seconds', '.01')],
                               output=[('finished', None, None)],
                               mode='async')

        resp = client.post_xml(doc=doc)
        wps.parseResponse(resp.xml)
        assert_response_accepted(resp)

        # The process should not have finished by now. If it does, it's running in sync mode.
        with pytest.raises(AssertionError):
            assert_response_success(resp)

        # Parse response to extract the status file path
        url = resp.xml.xpath("//@statusLocation")[0]
        print(url)

        # OWSlib only reads from URLs, not local files. So we need to read the response manually.
        p = Path(configuration.get_config_value('server', 'outputpath')) / url.split('/')[-1]

        # Poll the process until it completes
        total_time = 0
        sleep_time = .01
        while wps.status not in ["ProcessSucceeded", "ProcessFailed"]:
            # Checked before each poll so that a final status reached on the
            # previous pass is never reported as a timeout.
            if total_time > 1:
                raise TimeoutError(
                    f"process did not reach a final state in time; last status: {wps.status!r}"
                )
            resp = p.read_bytes()
            if resp:
                # checkStatus parses the document and sleeps for sleepSecs
                # itself while the process is not complete.
                wps.checkStatus(response=resp, sleepSecs=sleep_time)
            else:
                # Status file is empty (nothing to parse yet); wait and retry.
                time.sleep(sleep_time)
            # Charge the delay on every pass: a non-empty but non-final status
            # document must also count against the budget, otherwise the loop
            # could spin forever.
            total_time += sleep_time

        assert wps.status == 'ProcessSucceeded'


def load_tests(loader=None, tests=None, pattern=None):
    """Build the test suite for this module.

    Args:
        loader: loader used to collect the tests; a fresh
            ``unittest.TestLoader`` is created when none is given.
        tests: accepted but not used.
        pattern: accepted but not used.

    Returns:
        A ``unittest.TestSuite`` holding the tests loaded from ``ExecuteTest``.
    """
    import unittest

    active_loader = loader or unittest.TestLoader()
    suite = unittest.TestSuite()
    suite.addTest(active_loader.loadTestsFromTestCase(ExecuteTest))
    return suite