File: journal.py

# Copyright 2016 Mellanox Technologies, Ltd
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import re
import threading

from neutron_lib import context as nl_context
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
import requests
from six.moves import html_parser

from networking_mlnx._i18n import _LI, _LE, _LW
from networking_mlnx.db import db
from networking_mlnx.journal import dependency_validations
from networking_mlnx.plugins.ml2.drivers.sdn import client
from networking_mlnx.plugins.ml2.drivers.sdn import constants as sdn_const
from networking_mlnx.plugins.ml2.drivers.sdn import exceptions as sdn_exc
from networking_mlnx.plugins.ml2.drivers.sdn import utils as sdn_utils

LOG = logging.getLogger(__name__)


def call_thread_on_end(func):
    def new_func(obj, *args, **kwargs):
        return_value = func(obj, *args, **kwargs)
        obj.journal.set_sync_event()
        return return_value
    return new_func
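

# Illustrative usage sketch (an assumption, not part of this module): the
# decorator expects the decorated object to expose a ``journal`` attribute
# holding a SdnJournalThread, so the journal wakes up as soon as the call
# returns.  The driver class and hook below are hypothetical.
#
#     class SdnMechanismDriver(object):
#         def __init__(self):
#             self.journal = SdnJournalThread()
#
#         @call_thread_on_end
#         def create_network_postcommit(self, context):
#             record(context._plugin_context.session, sdn_const.NETWORK,
#                    context.current['id'], sdn_const.POST, context.current)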


def record(db_session, object_type, object_uuid, operation, data,
           context=None):
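    # Note: the context argument is currently unused here; the row is
    # written straight to the journal table in the pending state.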
    db.create_pending_row(db_session, object_type, object_uuid, operation,
                          data)


class SdnJournalThread(object):
    """Thread worker for the SDN Journal Database."""
    def __init__(self):
        self.client = client.SdnRestClient.create_client()
        self._sync_timeout = cfg.CONF.sdn.sync_timeout
        self._row_retry_count = cfg.CONF.sdn.retry_count
        self.event = threading.Event()
        self.lock = threading.Lock()
        self._sync_thread = self.start_sync_thread()
        self._start_sync_timer()

    def start_sync_thread(self):
        # Start the sync thread
        LOG.debug("Starting a new sync thread")
        sync_thread = threading.Thread(
            name='sync',
            target=self.run_sync_thread)
        sync_thread.start()
        return sync_thread

    def set_sync_event(self):
        # Prevent race when starting the timer
        with self.lock:
            LOG.debug("Resetting thread timer")
            self._timer.cancel()
            self._start_sync_timer()
        self.event.set()

    def _start_sync_timer(self):
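        # The timer is a periodic fallback: even with no journal activity,
        # set_sync_event() fires every sync_timeout seconds (and re-arms
        # the timer).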
        self._timer = threading.Timer(self._sync_timeout,
                                      self.set_sync_event)
        self._timer.start()

    def run_sync_thread(self, exit_after_run=False):
        while True:
            try:
                self.event.wait()
                self.event.clear()

                context = nl_context.get_admin_context()
                self._sync_pending_rows(context.session, exit_after_run)
                self._sync_progress_rows(context.session)

                LOG.debug("Clearing sync thread event")
                if exit_after_run:
                    # A thread that waits forever breaks the unit tests;
                    # this argument lets the loop exit after a single pass
                    # when testing.
                    break
            except Exception:
                # Catch exceptions to protect the thread while running
                LOG.exception(_LE("Error on run_sync_thread"))

    def _sync_pending_rows(self, session, exit_after_run):
        while True:
            LOG.debug("sync_pending_rows operation walking database")
            row = db.get_oldest_pending_db_row_with_lock(session)
            if not row:
                LOG.debug("No rows to sync")
                break

            # Validate the operation
            valid = dependency_validations.validate(session, row)
            if not valid:
                LOG.info(_LI("%(operation)s %(type)s %(uuid)s is not a "
                             "valid operation yet, skipping for now"),
                         {'operation': row.operation,
                          'type': row.object_type,
                          'uuid': row.object_uuid})
                # Set row back to pending.
                db.update_db_row_state(session, row, sdn_const.PENDING)

                if exit_after_run:
                    break
                continue

            LOG.info(_LI("Syncing %(operation)s %(type)s %(uuid)s"),
                     {'operation': row.operation, 'type': row.object_type,
                      'uuid': row.object_uuid})

            # Sync this row to NEO
            urlpath = sdn_utils.strings_to_url(row.object_type)
            if row.operation != sdn_const.POST:
                urlpath = sdn_utils.strings_to_url(urlpath, row.object_uuid)
            try:
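                # row.operation is an HTTP verb constant (e.g. POST,
                # DELETE) whose lowercase name matches a method on the
                # REST client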
                client_operation_method = (
                    getattr(self.client, row.operation.lower()))
                response = (
                    client_operation_method(
                        urlpath, jsonutils.loads(row.data)))
                if response.status_code == requests.codes.not_implemented:
                    db.update_db_row_state(session, row, sdn_const.COMPLETED)
                elif (response.status_code == requests.codes.not_found and
                      row.operation == sdn_const.DELETE):
                    db.update_db_row_state(session, row, sdn_const.COMPLETED)
                else:
                    # Record the job_id and move the row to the
                    # monitoring state
                    job_id = None
                    try:
                        try:
                            job_id = response.json()
                        except ValueError:
                            # Note(moshele): workaround for NEO, which
                            # returns HTML instead of JSON for a port
                            # POST
                            parser = html_parser.HTMLParser()
                            parser.feed(response.text)
                            parser.handle_starttag('a', [])
                            url = parser.get_starttag_text()
                            match = re.match(
                                r'<a href="([a-zA-Z0-9\/]+)">', url)
                            if match:
                                job_id = match.group(1)
                    except Exception as e:
                        LOG.error(_LE("Failed to extract job_id %s"), e)

                    if job_id:
                        db.update_db_row_job_id(
                            session, row, job_id=job_id)
                        db.update_db_row_state(
                            session, row, sdn_const.MONITORING)
                    else:
                        LOG.warning(_LW("object %s has join id is NULL"),
                                    row.object_uuid)
            except (sdn_exc.SDNConnectionError, sdn_exc.SDNLoginError):
                # Log an error and update the row's retry state
                LOG.error(_LE("Cannot connect to the NEO Controller"))
                db.update_pending_db_row_retry(session, row,
                                               self._row_retry_count)
                # Break out of the loop and retry on the next
                # timer interval
                break

    def _sync_progress_rows(self, session):
        # 1. Get all rows in the monitoring state
        # 2. Query NEO for the status of each job
        # 3. Mark the row completed, or send it back to pending on failure
        LOG.debug("sync_progress_rows operation walking database")
        rows = db.get_all_monitoring_db_row_by_oldest(session)
        if not rows:
            LOG.debug("No rows to sync")
            return
        for row in rows:
            try:
                if row.job_id is None:
                    LOG.warning(_LW("object %s has join id is NULL"),
                                row.object_uuid)
                    continue
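                # job_id was captured from the NEO response when the row
                # was synced; strip surrounding slashes before polling it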
                response = self.client.get(row.job_id.strip("/"))
                if response:
                    try:
                        job_status = response.json().get('Status')
                        if job_status == 'Completed':
                            db.update_db_row_state(
                                session, row, sdn_const.COMPLETED)
                            continue
                        elif job_status in ("Pending", "Running"):
                            LOG.debug("NEO Job id %(job_id)s is %(status)s "
                                      "continue monitoring",
                                      {'job_id': row.job_id,
                                       'status': job_status})
                            continue
                        else:
                            LOG.error(_LE("NEO Job id %(job_id)s, failed with"
                                          " %(status)s"),
                                      {'job_id': row.job_id,
                                       'status': job_status})
                            db.update_db_row_state(
                                session, row, sdn_const.PENDING)
                    except (ValueError, AttributeError):
                        LOG.error(_LE("Failed to extract response for "
                                      "job id %s"), row.job_id)
                else:
                    LOG.error(_LE("NEO Job id %(job_id)s returned an empty "
                                  "response"),
                              {'job_id': row.job_id})
                    db.update_db_row_state(session, row, sdn_const.PENDING)

            except (sdn_exc.SDNConnectionError, sdn_exc.SDNLoginError):
                # Don't bump the retry count, just log an error and set
                # the row back to pending
                LOG.error(_LE("Cannot connect to the NEO Controller"))
                db.update_db_row_state(session, row, sdn_const.PENDING)
                # Break out of the loop and retry on the next
                # timer interval
                break