File: manager.py

package info (click to toggle)
python-os-ken 4.1.1-3
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 21,396 kB
  • sloc: python: 100,059; erlang: 14,517; ansic: 594; sh: 338; makefile: 136
file content (250 lines) | stat: -rw-r--r-- 9,028 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# Copyright (c) 2014 Rackspace Hosting
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import ssl
import socket

from os_ken import cfg
from os_ken.base import app_manager
from os_ken.lib import hub
from os_ken.lib import ip
from os_ken.services.protocols.ovsdb import client
from os_ken.services.protocols.ovsdb import event
from os_ken.controller import handler


# Options registered under the ``[ovsdb]`` configuration group.
# NOTE: help strings are runtime text and are kept verbatim (including the
# 'attemps' spelling).
opts = (
    # Listening endpoint.
    cfg.StrOpt(
        'address',
        default='0.0.0.0',
        help='OVSDB address',
    ),
    cfg.IntOpt(
        'port',
        default=6640,
        help='OVSDB port',
    ),
    # Reconnect tuning handed to every remote client.
    cfg.IntOpt(
        'probe-interval',
        help='OVSDB reconnect probe interval',
    ),
    cfg.IntOpt(
        'min-backoff',
        help=('OVSDB reconnect minimum milliseconds '
              'between connection attemps'),
    ),
    cfg.IntOpt(
        'max-backoff',
        help=('OVSDB reconnect maximum milliseconds '
              'between connection attemps'),
    ),
    # TLS material for the manager's listening socket.
    cfg.StrOpt(
        'mngr-privkey',
        default=None,
        help='manager private key',
    ),
    cfg.StrOpt(
        'mngr-cert',
        default=None,
        help='manager certificate',
    ),
    # Access control and schema selection.
    cfg.ListOpt(
        'whitelist',
        default=[],
        help='Whitelist of address to allow to connect',
    ),
    cfg.ListOpt(
        'schema-tables',
        default=[],
        help='Tables in the OVSDB schema to configure',
    ),
    cfg.ListOpt(
        'schema-exclude-columns',
        default=[],
        help=('Table columns in the OVSDB schema to filter out.  '
              'Values should be in the format: <table>.<column>.'
              'Ex: Bridge.netflow,Interface.statistics'),
    ),
)

cfg.CONF.register_opts(opts, 'ovsdb')


class OVSDB(app_manager.OSKenApp):
    """OVSDB manager application.

    Listens on the configured ``[ovsdb]`` address/port (wrapped in TLS when a
    key and certificate are configured), accepts client connections and hands
    every accepted socket to ``client.RemoteOvsdb.factory``.  Remotes that
    were created are stored in ``self._clients`` keyed by their app name, and
    modify/read request events are forwarded to the remote matching the
    event's ``system_id``.
    """

    _EVENTS = [event.EventNewOVSDBConnection,
               event.EventModifyRequest,
               event.EventReadRequest]

    def __init__(self, *args, **kwargs):
        """Cache the ``[ovsdb]`` settings used by the listener and remotes."""
        super(OVSDB, self).__init__(*args, **kwargs)
        self._address = self.CONF.ovsdb.address
        self._port = self.CONF.ovsdb.port
        self._probe_interval = self.CONF.ovsdb.probe_interval
        self._min_backoff = self.CONF.ovsdb.min_backoff
        self._max_backoff = self.CONF.ovsdb.max_backoff
        # Maps RemoteOvsdb app name -> RemoteOvsdb app instance.
        self._clients = {}

    def _close_socket(self, sock):
        """Shut down and close ``sock``, tolerating an already-dead peer."""
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            # Best effort only: the peer may already have disconnected, in
            # which case shutdown fails but the socket must still be closed.
            pass

        sock.close()

    def _accept(self, server):
        """Accept connections on ``server`` until the app is deactivated.

        Each accepted connection is checked against the configured whitelist
        (when one is set); rejected sockets are closed, accepted ones are
        handed to ``_start_remote`` in a new hub thread that is recorded in
        ``self.threads``.

        :param server: listening socket returned by ``hub.listen`` (possibly
            wrapped by an ``ssl.SSLContext``).
        """
        # Whether a whitelist applies is decided once, when the loop starts;
        # the list itself is consulted for every connection.
        enforce_whitelist = bool(self.CONF.ovsdb.whitelist)

        def check(client_address):
            if not enforce_whitelist:
                return True

            if client_address[0] in self.CONF.ovsdb.whitelist:
                return True

            # Host and port are passed as separate logger arguments; the
            # message has two placeholders, so a bare host string would not
            # be enough to format it.
            self.logger.debug('Connection from non-whitelist client '
                              '(%s:%s)', client_address[0], client_address[1])
            return False

        while self.is_active:
            try:
                # TODO(jkoelker) SSL Certificate Fingerprint check
                sock, client_address = server.accept()

            except Exception:
                if self.is_active:
                    self.logger.exception('Error accepting connection')

                # Always go back to the loop condition: nothing was accepted,
                # so ``sock``/``client_address`` are unset or stale here.
                continue

            if not check(client_address):
                self._close_socket(sock)
                continue

            if ip.valid_ipv6(client_address[0]):
                self.logger.debug(
                    'New connection from [%s]:%s', *client_address[:2])
            else:
                self.logger.debug(
                    'New connection from %s:%s', *client_address[:2])
            t = hub.spawn(self._start_remote, sock, client_address)
            self.threads.append(t)

    def _bulk_read_handler(self, ev):
        """Run a read request against every known client and reply once.

        Spawns ``read_request_handler(ev, bulk=True)`` on each client, waits
        for all of them and replies to ``ev`` with an ``EventReadReply`` whose
        result is the list of the individual results.

        :param ev: the read request event (its ``system_id`` is ``None``).
        """
        results = []

        def done(gt, *args, **kwargs):
            # Drop the finished thread from the app's bookkeeping and collect
            # its return value.
            if gt in self.threads:
                self.threads.remove(gt)

            results.append(gt.wait())

        threads = []
        for c in list(self._clients.values()):
            gt = hub.spawn(c.read_request_handler, ev, bulk=True)
            threads.append(gt)
            self.threads.append(gt)
            gt.link(done)

        hub.joinall(threads)
        rep = event.EventReadReply(None, results)
        self.reply_to_request(ev, rep)

    def _proxy_event(self, ev):
        """Forward ``ev`` to the remote app named after ``ev.system_id``.

        :returns: the result of ``send_event``, or ``None`` when no client
            with that name is known.
        """
        system_id = ev.system_id
        client_name = client.RemoteOvsdb.instance_name(system_id)

        if client_name not in self._clients:
            self.logger.info('Unknown remote system_id %s', system_id)
            return

        return self.send_event(client_name, ev)

    def _start_remote(self, sock, client_address):
        """Create and start a ``RemoteOvsdb`` app for an accepted socket.

        Builds the per-table list of excluded columns from the
        ``schema-exclude-columns`` option (entries of the form
        ``<table>.<column>``), then asks ``client.RemoteOvsdb.factory`` for an
        app.  When the factory returns a truthy app it is registered in
        ``self._clients``, started, and announced to observers with an
        ``EventNewOVSDBConnection``; otherwise the socket is closed.

        :param sock: the accepted client socket.
        :param client_address: the peer address returned by ``accept()``.
        """
        schema_tables = cfg.CONF.ovsdb.schema_tables
        schema_ex_col = {}
        for c in cfg.CONF.ovsdb.schema_exclude_columns or ():
            tbl, col = c.split('.')
            schema_ex_col.setdefault(tbl, []).append(col)

        app = client.RemoteOvsdb.factory(sock, client_address,
                                         probe_interval=self._probe_interval,
                                         min_backoff=self._min_backoff,
                                         max_backoff=self._max_backoff,
                                         schema_tables=schema_tables,
                                         schema_exclude_columns=schema_ex_col)

        if app:
            self._clients[app.name] = app
            app.start()
            ev = event.EventNewOVSDBConnection(app)
            self.send_event_to_observers(ev)

        else:
            self._close_socket(sock)

    def start(self):
        """Open the listening socket and spawn the accept loop.

        The socket is wrapped in TLS when both a private key and a
        certificate are configured (``[ovsdb]`` ``mngr-privkey`` /
        ``mngr-cert``, falling back to ``ctl_privkey`` / ``ctl_cert``); client
        certificates are required when ``ca_certs`` is set.

        :returns: the hub thread running ``_accept``.
        """
        is_ipv6 = ip.valid_ipv6(self._address)

        if is_ipv6:
            server = hub.listen(
                (self._address, self._port), family=socket.AF_INET6)
        else:
            server = hub.listen((self._address, self._port))
        key = self.CONF.ovsdb.mngr_privkey or self.CONF.ctl_privkey
        cert = self.CONF.ovsdb.mngr_cert or self.CONF.ctl_cert

        if key is not None and cert is not None:
            ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
            ctx.load_cert_chain(cert, key)

            if self.CONF.ca_certs is not None:
                ctx.verify_mode = ssl.CERT_REQUIRED
                ctx.load_verify_locations(self.CONF.ca_certs)

            server = ctx.wrap_socket(server, server_side=True)

        self._server = server

        if is_ipv6:
            self.logger.info(
                'Listening on [%s]:%s for clients', self._address, self._port)
        else:
            self.logger.info(
                'Listening on %s:%s for clients', self._address, self._port)
        t = hub.spawn(self._accept, self._server)
        super(OVSDB, self).start()
        return t

    def stop(self):
        """Stop the accept loop, every known client, and then the app."""
        # NOTE(jkoelker) Attempt to gracefully stop the accept loop
        self.is_active = False

        # NOTE(jkoelker) Forcibly kill the loop and clear the main_thread
        if self.main_thread:
            hub.kill(self.main_thread)
            self.main_thread = None

        # NOTE(jkoelker) Stop all the clients
        # Iterate over a snapshot so the dict may change while clients stop.
        for c in list(self._clients.values()):
            c.stop()

        # NOTE(jkoelker) super will only take care of the event and joining now
        super(OVSDB, self).stop()

    @handler.set_ev_cls(event.EventModifyRequest)
    def modify_request_handler(self, ev):
        """Dispatch a modify request to the remote for ``ev.system_id``.

        Replies with an ``EventModifyReply`` carrying an error message when
        no such remote is known.
        """
        system_id = ev.system_id
        client_name = client.RemoteOvsdb.instance_name(system_id)
        remote = self._clients.get(client_name)

        if not remote:
            msg = 'Unknown remote system_id %s' % system_id
            self.logger.info(msg)
            rep = event.EventModifyReply(system_id, None, None, msg)
            return self.reply_to_request(ev, rep)

        return remote.modify_request_handler(ev)

    @handler.set_ev_cls(event.EventReadRequest)
    def read_request_handler(self, ev):
        """Dispatch a read request to one remote, or to all of them.

        A request whose ``system_id`` is ``None`` is treated as a bulk read
        and served by ``_bulk_read_handler`` in its own hub thread.
        Otherwise the request goes to the matching remote, or is answered
        with an ``EventReadReply`` carrying an error message when that remote
        is unknown.
        """
        system_id = ev.system_id

        if system_id is None:
            def done(gt, *args, **kwargs):
                # Forget the bulk-read thread once it has finished.
                if gt in self.threads:
                    self.threads.remove(gt)

            thread = hub.spawn(self._bulk_read_handler, ev)
            self.threads.append(thread)
            return thread.link(done)

        client_name = client.RemoteOvsdb.instance_name(system_id)
        remote = self._clients.get(client_name)

        if not remote:
            msg = 'Unknown remote system_id %s' % system_id
            self.logger.info(msg)
            rep = event.EventReadReply(system_id, None, msg)
            return self.reply_to_request(ev, rep)

        return remote.read_request_handler(ev)