File: __init__.py

package info (click to toggle)
pywps 4.7.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,016 kB
  • sloc: python: 8,846; xml: 723; makefile: 106
file content (136 lines) | stat: -rw-r--r-- 4,338 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
##################################################################
# Copyright 2018 Open Source Geospatial Foundation and others    #
# licensed under MIT, Please consult LICENSE.txt for details     #
##################################################################

from pywps import Process
from pywps.inout import LiteralInput, LiteralOutput, BoundingBoxInput, BoundingBoxOutput
from pywps.inout.literaltypes import ValuesReference


class SimpleProcess(Process):
    """Bare-bones process that only sets an identifier and adds one input.

    NOTE(review): unlike the other processes in this file, ``__init__`` does
    not call the ``Process`` constructor and builds ``LiteralInput`` without
    any arguments -- confirm whether this class can still be instantiated and
    whether anything depends on it.
    """
    # Identifier is declared as a class attribute instead of being passed to
    # the base-class constructor.
    identifier = "simpleprocess"

    def __init__(self):
        # Hands an argument-less LiteralInput to add_input; no handler, title
        # or outputs are configured here.
        self.add_input(LiteralInput())


class UltimateQuestion(Process):
    """Input-less process whose single string output is always '42'."""

    def __init__(self):
        answer_output = LiteralOutput('outvalue', 'Output Value', data_type='string')
        super(UltimateQuestion, self).__init__(
            self._handler,
            identifier='ultimate_question',
            title='Ultimate Question',
            outputs=[answer_output],
        )

    @staticmethod
    def _handler(request, response):
        """Store '42' in the 'outvalue' output and return the response."""
        outputs = response.outputs
        outputs['outvalue'].data = '42'
        return response


class Greeter(Process):
    """Process that greets the caller using the value of its 'name' input."""

    def __init__(self):
        name_input = LiteralInput('name', 'Input name', data_type='string')
        message_output = LiteralOutput('message', 'Output message', data_type='string')
        super(Greeter, self).__init__(
            self.greeter,
            identifier='greeter',
            title='Greeter',
            inputs=[name_input],
            outputs=[message_output],
        )

    @staticmethod
    def greeter(request, response):
        """Write 'Hello <name>!' to the 'message' output.

        The first value of the 'name' input is used; it is asserted to be
        exactly of type ``str`` before the greeting is built.
        """
        first_name_input = request.inputs['name'][0]
        name = first_name_input.data
        assert type(name) is str
        greeting = "Hello {}!".format(name)
        response.outputs['message'].data = greeting
        return response


class InOut(Process):
    """Process that echoes its 'string' input to its 'string' output.

    Besides 'string' it declares a 'time' input defaulting to '12:00:00' and
    a 'ref_value' input whose allowed values are described by a
    ``ValuesReference`` URL. The handler reads only the 'string' input.
    """

    def __init__(self):
        super(InOut, self).__init__(
            self.inout,
            identifier='inout',
            title='In and Out',
            inputs=[
                LiteralInput('string', 'String', data_type='string'),
                LiteralInput('time', 'Time', data_type='time',
                             default='12:00:00'),
                LiteralInput('ref_value', 'Referenced Value', data_type='string',
                    allowed_values=ValuesReference(reference="https://en.wikipedia.org/w/api.php?action=opensearch&search=scotland&format=json"),  # noqa
                    default='Scotland',),
            ],
            outputs=[
                LiteralOutput('string', 'Output', data_type='string')
            ]
        )

    @staticmethod
    def inout(request, response):
        """Copy the first 'string' input value into the 'string' output.

        :param request: execute request; ``request.inputs['string'][0].data``
            is read.
        :param response: execute response; ``response.outputs['string'].data``
            is set.
        :return: the same ``response`` object.
        """
        a_string = request.inputs['string'][0].data
        # The template must contain a replacement field: the previous
        # "".format(a_string) always evaluated to '' and dropped the input.
        response.outputs['string'].data = "{}".format(a_string)
        return response


class BBox(Process):
    """Process that hands the received bounding box back as its output."""

    def __init__(self):
        area_input = BoundingBoxInput(
            'area',
            'An area',
            abstract='Define the area of interest',
            crss=['epsg:4326', 'epsg:3857'],
            min_occurs=1,
            max_occurs=1,
        )
        # A separate CRS list is built for the output so the input and output
        # definitions do not share one list object.
        extent_output = BoundingBoxOutput(
            'extent', 'Extent', crss=['epsg:4326', 'epsg:3857'])
        super(BBox, self).__init__(
            self.bbox,
            identifier='bbox_test',
            title='BBox Test',
            inputs=[area_input],
            outputs=[extent_output],
        )

    @staticmethod
    def bbox(request, response):
        """Assign the data of the first 'area' input to the 'extent' output."""
        response.outputs['extent'].data = request.inputs['area'][0].data
        return response


class Sleep(Process):
    """Long-running process that does nothing but sleep."""

    def __init__(self):
        super(Sleep, self).__init__(
            self._handler,
            identifier='sleep',
            title='Sleep',
            abstract='Wait for specified number of seconds.',
            inputs=[LiteralInput('seconds', title='Seconds', data_type='float')],
            outputs=[LiteralOutput('finished', title='Finished', data_type='boolean')],
            store_supported=True,
            status_supported=True,
        )

    @staticmethod
    def _handler(request, response):
        """Sleep in three equal slices, updating the status before each one.

        The total duration is taken from the first 'seconds' input value.
        Afterwards the 'finished' output is set to the string "True".
        """
        from time import sleep

        total_seconds = request.inputs['seconds'][0].data
        slice_count = 3
        slice_length = total_seconds / slice_count
        for slice_index in range(slice_count):
            # Progress is reported as the percentage completed so far.
            percent_done = slice_index / slice_count * 100
            response.update_status('Sleep in progress...', percent_done)
            sleep(slice_length)

        response.outputs['finished'].data = "True"
        return response