File: context.py

package info (click to toggle)
python-neutron-lib 3.21.1-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 7,660 kB
  • sloc: python: 22,829; sh: 137; makefile: 24
file content (233 lines) | stat: -rw-r--r-- 8,948 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Context: context for security/db session."""

import collections
import copy

from oslo_context import context as oslo_context
from oslo_db.sqlalchemy import enginefacade
from oslo_log import log as logging
from oslo_utils import timeutils

from neutron_lib.db import api as db_api
from neutron_lib.policy import _engine as policy_engine

LOG = logging.getLogger(__name__)


class ContextBase(oslo_context.RequestContext):
    """Security context and request information.

    Represents the user taking a given action within the system.

    """

    def __init__(self, user_id=None, project_id=None, is_admin=None,
                 timestamp=None, project_name=None, user_name=None,
                 is_advsvc=None, tenant_id=None, tenant_name=None,
                 has_global_access=False, **kwargs):
        # NOTE(jamielennox): We maintain this argument order for tests that
        # pass arguments positionally.

        # Prefer project_id and project_name, as that's what's going to be
        # set by keystone.
        # NOTE(haleyb): remove fall-back and warning in E+2 release, or when
        # all callers have been changed to use project_*.
        project_id = project_id or tenant_id
        project_name = project_name or tenant_name
        if tenant_id:
            LOG.warning('Keyword tenant_id has been deprecated, use '
                        'project_id instead')
        if tenant_name:
            LOG.warning('Keyword tenant_name has been deprecated, use '
                        'project_name instead')
        kwargs.setdefault('project_id', project_id)
        kwargs.setdefault('project_name', project_name)
        self._has_global_access = has_global_access
        super().__init__(
            is_admin=is_admin, user_id=user_id, **kwargs)

        self.user_name = user_name
        self.timestamp = timestamp or timeutils.utcnow()
        self._is_advsvc = is_advsvc
        if self._is_advsvc is None:
            self._is_advsvc = (self.is_admin or
                               policy_engine.check_is_advsvc(self))
        self._is_service_role = policy_engine.check_is_service_role(self)
        if self.is_admin is None:
            self.is_admin = policy_engine.check_is_admin(self)
        if not self._has_global_access:
            self._has_global_access = policy_engine.check_has_global_access(
                self)

    @property
    def tenant_id(self):
        return self.project_id

    @tenant_id.setter
    def tenant_id(self, tenant_id):
        self.project_id = tenant_id

    @property
    def tenant_name(self):
        return self.project_name

    @tenant_name.setter
    def tenant_name(self, tenant_name):
        self.project_name = tenant_name

    @property
    def is_service_role(self):
        # TODO(slaweq): we will not need to check ContextBase._is_advsvc once
        # we will get rid of the old API policies
        return self._is_service_role or self._is_advsvc

    @property
    def is_advsvc(self):
        # TODO(slaweq): this property should be removed once we will get rid
        # of old policy rules and only new, S-RBAC rules will be available as
        # then we will use ContextBase.is_service_role instead
        LOG.warning("Method 'is_advsvc' is deprecated since 2023.2 release "
                    "(neutron-lib 3.8.0) and will be removed once support for "
                    "the old RBAC API policies will be removed from Neutron. "
                    "Please use method 'is_service_role' instead.")
        return self.is_service_role

    @property
    def has_global_access(self):
        return self.is_admin or self._has_global_access

    def to_dict(self):
        context = super().to_dict()
        context.update({
            'user_id': self.user_id,
            'tenant_id': self.project_id,
            'project_id': self.project_id,
            'timestamp': str(self.timestamp),
            'tenant_name': self.project_name,
            'project_name': self.project_name,
            'user_name': self.user_name,
            'has_global_access': self.has_global_access,
        })
        return context

    def to_policy_values(self):
        values = super().to_policy_values()
        values['tenant_id'] = self.project_id
        values['is_admin'] = self.is_admin
        values['has_global_access'] = self.has_global_access

        # NOTE(jamielennox): These are almost certainly unused and non-standard
        # but kept for backwards compatibility. Remove them in Pike
        # (oslo.context from Ocata release already issues deprecation warnings
        # for non-standard keys).
        values['user'] = self.user_id
        values['tenant'] = self.project_id
        values['domain'] = self.domain_id
        values['user_domain'] = self.user_domain_id
        values['project_domain'] = self.project_domain_id
        values['tenant_name'] = self.project_name
        values['project_name'] = self.project_name
        values['user_name'] = self.user_name

        return values

    @classmethod
    def from_dict(cls, values):
        cls_obj = super().from_dict(values)
        if values.get('timestamp'):
            cls_obj.timestamp = values['timestamp']
        cls_obj.user_id = values.get('user_id', values.get('user'))
        cls_obj.tenant_id = values.get('tenant_id', values.get('project_id'))
        cls_obj.tenant_name = values.get('tenant_name')
        return cls_obj

    def elevated(self):
        """Return a version of this context with admin flag set."""
        context = copy.copy(self)
        context.is_admin = True

        context.roles = list(
            set(context.roles) | {'admin', 'member', 'reader'}
        )

        return context


@enginefacade.transaction_context_provider
class ContextBaseWithSession(ContextBase):
    pass


_TransactionConstraint = collections.namedtuple(
    '_TransactionConstraint', ['if_revision_match', 'resource', 'resource_id'])


class Context(ContextBaseWithSession):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._session = None
        self._txn_constraint = None

    @property
    def session(self):
        # TODO(ralonsoh): "Context.session" is provided by
        # "enginefacade.transaction_context_provider" when a new transaction,
        # READER or WRITER, is created. This property is just a temporary fix
        # for those transactions that are not executed inside a transaction.
        # By manually selecting the type of transaction we can speed up the
        # code because by default a writer session is always created, even
        # within read only operations.
        if hasattr(super(), 'session'):
            self._session = super().session
        if self._session is None:
            self._session = db_api.get_writer_session()
        return self._session

    def set_transaction_constraint(self, resource, resource_id, rev_number):
        """Set a revision constraint to enforce before resource_id is changed.

        :param resource: Collection name of resource (e.g. ports or networks)
        :param resource_id: The primary key ID of the individual resource that
                            should have its revision number matched before
                            allowing the transaction to proceed.
        :param rev_number: The revision_number that the resource should be at.
        """

        self._txn_constraint = _TransactionConstraint(
            if_revision_match=rev_number, resource=resource,
            resource_id=resource_id)

    def clear_transaction_constraint(self):
        self._txn_constraint = None

    def get_transaction_constraint(self):
        return self._txn_constraint


def get_admin_context():
    # NOTE(slaweq): elevated() method will set is_admin=True but setting it
    # explicity here will avoid checking in policy rules if is_admin should be
    # set to True or not
    return Context(user_id=None,
                   project_id=None,
                   is_admin=True,
                   overwrite=False).elevated()


def get_admin_context_without_session():
    # NOTE(slaweq): elevated() method will set is_admin=True but setting it
    # explicity here will avoid checking in policy rules if is_admin should be
    # set to True or not
    return ContextBase(user_id=None, project_id=None, is_admin=True).elevated()