File: workflow.py

package info (click to toggle)
python-openstacksdk 4.4.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 13,352 kB
  • sloc: python: 122,960; sh: 153; makefile: 23
file content (97 lines) | stat: -rw-r--r-- 3,020 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from openstack import resource


class Workflow(resource.Resource):
    resource_key = 'workflow'
    resources_key = 'workflows'
    base_path = '/workflows'

    # capabilities
    allow_create = True
    allow_commit = True
    allow_list = True
    allow_fetch = True
    allow_delete = True

    _query_mapping = resource.QueryParameters(
        'marker', 'limit', 'sort_keys', 'sort_dirs', 'fields'
    )

    #: The name of this Workflow
    name = resource.Body("name")
    #: The inputs for this Workflow
    input = resource.Body("input")
    #: A Workflow definition using the Mistral v2 DSL
    definition = resource.Body("definition")
    #: A list of values associated with a workflow that users can use
    #: to group workflows by some criteria
    # TODO(briancurtin): type=list
    tags = resource.Body("tags")
    #: Can be either "private" or "public"
    scope = resource.Body("scope")
    #: The ID of the associated project
    project_id = resource.Body("project_id")
    #: The time at which the workflow was created
    created_at = resource.Body("created_at")
    #: The time at which the workflow was created
    updated_at = resource.Body("updated_at")

    def _request_kwargs(self, prepend_key=True, base_path=None):
        request = self._prepare_request(
            requires_id=False, prepend_key=prepend_key, base_path=base_path
        )

        headers = {"Content-Type": 'text/plain'}
        kwargs = {
            "data": self.definition,
        }

        scope = f"?scope={self.scope}"
        uri = request.url + scope

        request.headers.update(headers)
        return dict(url=uri, json=None, headers=request.headers, **kwargs)

    def create(
        self,
        session,
        prepend_key=True,
        base_path=None,
        **kwargs,
    ):
        kwargs = self._request_kwargs(
            prepend_key=prepend_key, base_path=base_path
        )
        response = session.post(**kwargs)
        self._translate_response(response, has_body=False)
        return self

    def commit(
        self,
        session,
        prepend_key=True,
        has_body=True,
        retry_on_conflict=None,
        base_path=None,
        *,
        microversion=None,
        **kwargs,
    ):
        kwargs = self._request_kwargs(
            prepend_key=prepend_key, base_path=base_path
        )
        response = session.put(**kwargs)
        self._translate_response(response, has_body=False)
        return self