File: service.py

package info (click to toggle)
python-os-ken 4.1.1-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 21,396 kB
  • sloc: python: 100,059; erlang: 14,517; ansic: 594; sh: 338; makefile: 136
file content (233 lines) | stat: -rw-r--r-- 8,663 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# Copyright (C) 2014 Nippon Telegraph and Telephone Corporation.
# Copyright (C) 2014 YAMAMOTO Takashi <yamamoto at valinux co jp>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# ofctl service

import numbers

from os_ken.base import app_manager

from os_ken.controller import ofp_event
from os_ken.controller.handler import CONFIG_DISPATCHER
from os_ken.controller.handler import DEAD_DISPATCHER
from os_ken.controller.handler import MAIN_DISPATCHER
from os_ken.controller.handler import set_ev_cls

from . import event
from . import exception


class _SwitchInfo(object):
    def __init__(self, datapath):
        self.datapath = datapath
        self.xids = {}
        self.barriers = {}
        self.results = {}


class OfctlService(app_manager.OSKenApp):
    def __init__(self, *args, **kwargs):
        """Set up the service with no known switches and nothing observed.

        All positional and keyword arguments are forwarded unchanged to the
        base application class.
        """
        super(OfctlService, self).__init__(*args, **kwargs)
        # Event class -> number of outstanding requests relying on it.
        self._observing_events = {}
        # Datapath id -> _SwitchInfo for every switch currently tracked.
        self._switches = {}
        self.name = 'ofctl_service'

    def _observe_msg(self, msg_cls):
        """Take one reference on the event class that carries ``msg_cls``.

        The first reference registers ``_handle_reply`` as a handler for the
        event class and starts observing it; every call, first or not,
        increments the per-event-class counter by one.
        """
        assert msg_cls is not None
        ev_cls = ofp_event.ofp_msg_to_ev_cls(msg_cls)
        counts = self._observing_events
        # setdefault() yields the current count, creating a zero entry when
        # the event class has never been seen.
        if counts.setdefault(ev_cls, 0) == 0:
            self.logger.debug('ofctl: start observing %s', ev_cls)
            self.register_handler(ev_cls, self._handle_reply)
            self.observe_event(ev_cls)
        counts[ev_cls] += 1

    def _unobserve_msg(self, msg_cls):
        """Drop one reference on the event class that carries ``msg_cls``.

        When the counter reaches zero, ``_handle_reply`` is unregistered for
        the event class and the event is no longer observed.  A ``KeyError``
        is raised if the event class has no counter entry.
        """
        assert msg_cls is not None
        ev_cls = ofp_event.ofp_msg_to_ev_cls(msg_cls)
        counts = self._observing_events
        assert counts[ev_cls] > 0
        remaining = counts[ev_cls] - 1
        counts[ev_cls] = remaining
        if remaining != 0:
            # Other outstanding requests still rely on this event class.
            return
        self.unregister_handler(ev_cls, self._handle_reply)
        self.unobserve_event(ev_cls)
        self.logger.debug('ofctl: stop observing %s', ev_cls)

    def _cancel(self, info, barrier_xid, exception):
        """Abandon a tracked request and answer it with ``exception``.

        :param info: the ``_SwitchInfo`` whose tables hold the request
        :param barrier_xid: key of the request in ``info.barriers``
        :param exception: value passed as ``event.Reply(exception=...)``

        The request is removed from ``info.barriers``, ``info.xids`` and
        ``info.results`` (any replies collected so far are discarded).  A
        ``KeyError`` propagates if one of those entries is missing.
        """
        tracked_xid = info.barriers.pop(barrier_xid)
        req = info.xids.pop(tracked_xid)
        sent = req.msg
        parser = sent.datapath.ofproto_parser
        barrier_only = isinstance(sent, parser.OFPBarrierRequest)

        info.results.pop(tracked_xid)

        # Give back the observation reference, which is only ever taken for
        # non-barrier requests that named a reply class.
        if not barrier_only and req.reply_cls is not None:
            self._unobserve_msg(req.reply_cls)

        self.logger.error('failed to send message <%s>', req.msg)
        failure = event.Reply(exception=exception)
        self.reply_to_request(req, failure)

    @staticmethod
    def _is_error(msg):
        """Return True if ``msg``'s type maps to ``EventOFPErrorMsg``."""
        ev_cls = ofp_event.ofp_msg_to_ev_cls(type(msg))
        return ev_cls == ofp_event.EventOFPErrorMsg

    @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
    def _switch_features_handler(self, ev):
        """Start tracking the datapath that sent a switch-features message.

        A fresh ``_SwitchInfo`` replaces any record already stored under the
        same datapath id.  When a record is replaced, its datapath is closed
        and each request it still tracks is cancelled with
        ``InvalidDatapath``.
        """
        datapath = ev.msg.datapath
        dpid = datapath.id
        assert isinstance(dpid, numbers.Integral)
        previous = self._switches.get(dpid)
        current = _SwitchInfo(datapath=datapath)
        self.logger.debug('add dpid %s datapath %s new_info %s old_info %s',
                          dpid, datapath, current, previous)
        self._switches[dpid] = current
        if not previous:
            return
        previous.datapath.close()
        # Iterate over a snapshot: _cancel() removes entries from
        # previous.barriers while we loop.
        for barrier_xid in list(previous.barriers):
            self._cancel(
                previous, barrier_xid, exception.InvalidDatapath(result=dpid))

    @set_ev_cls(ofp_event.EventOFPStateChange, DEAD_DISPATCHER)
    def _handle_dead(self, ev):
        """Stop tracking a datapath that has entered the dead state.

        Nothing happens when the datapath has no id, is not tracked, or when
        the record stored under its id belongs to a different datapath
        object.  Otherwise the record is removed and each request it still
        tracks is cancelled with ``InvalidDatapath``.
        """
        datapath = ev.datapath
        dpid = datapath.id
        self.logger.debug('del dpid %s datapath %s', dpid, datapath)
        if dpid is None:
            return
        info = self._switches.get(dpid)
        if info is None or info.datapath is not datapath:
            return
        self.logger.debug('forget info %s', info)
        del self._switches[dpid]
        # Iterate over a snapshot: _cancel() removes entries from
        # info.barriers while we loop.
        for barrier_xid in list(info.barriers):
            self._cancel(
                info, barrier_xid, exception.InvalidDatapath(result=dpid))

    @set_ev_cls(event.GetDatapathRequest, MAIN_DISPATCHER)
    def _handle_get_datapath(self, req):
        """Reply to ``req`` with the tracked datapath(s) it asks for.

        With ``req.dpid`` set to ``None`` the reply carries a list of every
        tracked datapath.  Otherwise it carries the datapath stored under
        that id, or ``None`` when the id is not tracked.
        """
        if req.dpid is None:
            result = [info.datapath for info in self._switches.values()]
        else:
            info = self._switches.get(req.dpid)
            result = info.datapath if info is not None else None
        self.reply_to_request(req, event.Reply(result=result))

    @set_ev_cls(event.SendMsgRequest, MAIN_DISPATCHER)
    def _handle_send_msg(self, req):
        """Send ``req.msg`` to its datapath and start tracking the request.

        Unless the message is itself a barrier request, a new barrier
        request is sent right after it.  The request is recorded in the
        switch's ``xids``/``barriers``/``results`` tables so that
        ``_handle_reply`` can collect replies and ``_handle_barrier`` can
        answer the request once the barrier reply arrives.

        The request is answered immediately with ``InvalidDatapath`` when
        the datapath id is not tracked, and cancelled via ``_cancel`` (which
        replies with ``InvalidDatapath``) when ``datapath.send_msg`` returns
        a false value.
        """
        msg = req.msg
        datapath = msg.datapath
        parser = datapath.ofproto_parser
        is_barrier = isinstance(msg, parser.OFPBarrierRequest)

        try:
            si = self._switches[datapath.id]
        except KeyError:
            # Lazy logging arguments, matching the other handlers that emit
            # this same message.
            self.logger.error('unknown dpid %s', datapath.id)
            rep = event.Reply(
                exception=exception.InvalidDatapath(result=datapath.id))
            self.reply_to_request(req, rep)
            return

        def _store_xid(xid, barrier_xid):
            # Record the request under the tracked message's xid and map the
            # barrier's xid back to it.
            assert xid not in si.results
            assert xid not in si.xids
            assert barrier_xid not in si.barriers
            si.results[xid] = []
            si.xids[xid] = req
            si.barriers[barrier_xid] = xid

        if is_barrier:
            # The message is its own barrier: both table keys are its xid.
            barrier = msg
            datapath.set_xid(barrier)
            _store_xid(barrier.xid, barrier.xid)
        else:
            if req.reply_cls is not None:
                # Observe the expected reply class before anything is sent.
                self._observe_msg(req.reply_cls)
            datapath.set_xid(msg)
            barrier = parser.OFPBarrierRequest(datapath)
            datapath.set_xid(barrier)
            _store_xid(msg.xid, barrier.xid)
            if not datapath.send_msg(msg):
                return self._cancel(
                    si, barrier.xid,
                    exception.InvalidDatapath(result=datapath.id))

        if not datapath.send_msg(barrier):
            return self._cancel(
                si, barrier.xid,
                exception.InvalidDatapath(result=datapath.id))

    @set_ev_cls(ofp_event.EventOFPBarrierReply, MAIN_DISPATCHER)
    def _handle_barrier(self, ev):
        """Finish the request whose barrier reply has just arrived.

        The request is removed from the switch's tables and answered with,
        in order of precedence:

        * the barrier reply itself, if the request was a barrier request
          expecting ``OFPBarrierReply``;
        * ``OFError`` if any collected reply is an error message;
        * the list of collected replies if ``req.reply_multi`` is true;
        * an empty reply if nothing was collected;
        * the single collected reply;
        * ``UnexpectedMultiReply`` if several replies were collected.

        Barrier replies for an untracked datapath or an unknown barrier xid
        are logged and ignored.
        """
        msg = ev.msg
        datapath = msg.datapath
        parser = datapath.ofproto_parser
        if datapath.id not in self._switches:
            self.logger.error('unknown dpid %s', datapath.id)
            return
        si = self._switches[datapath.id]
        if msg.xid not in si.barriers:
            self.logger.error('unknown barrier xid %s', msg.xid)
            return
        tracked_xid = si.barriers.pop(msg.xid)
        collected = si.results.pop(tracked_xid)
        req = si.xids.pop(tracked_xid)

        barrier_only = isinstance(req.msg, parser.OFPBarrierRequest)
        if req.reply_cls is not None and not barrier_only:
            # Give back the observation reference taken when the request
            # was sent.
            self._unobserve_msg(req.reply_cls)

        if barrier_only and req.reply_cls == parser.OFPBarrierReply:
            rep = event.Reply(result=msg)
        elif any(map(self._is_error, collected)):
            rep = event.Reply(exception=exception.OFError(result=collected))
        elif req.reply_multi:
            rep = event.Reply(result=collected)
        elif not collected:
            rep = event.Reply()
        elif len(collected) == 1:
            rep = event.Reply(result=collected[0])
        else:
            rep = event.Reply(
                exception=exception.UnexpectedMultiReply(result=collected))
        self.reply_to_request(req, rep)

    @set_ev_cls(ofp_event.EventOFPErrorMsg, MAIN_DISPATCHER)
    def _handle_reply(self, ev):
        """Collect a reply (or error message) for a tracked request.

        The message is appended to the result list stored under its xid.  It
        is accepted when the event is an ``EventOFPErrorMsg`` or when the
        message is an instance of the request's ``reply_cls``.  Anything
        else, and anything for an untracked datapath or xid, is logged and
        dropped.
        """
        msg = ev.msg
        datapath = msg.datapath
        si = self._switches.get(datapath.id)
        if si is None:
            self.logger.error('unknown dpid %s', datapath.id)
            return
        req = si.xids.get(msg.xid)
        if req is None:
            self.logger.error('unknown error xid %s', msg.xid)
            return
        # Same short-circuit order as a plain "error or expected class"
        # test: the reply class is only consulted for non-error events.
        accepted = (isinstance(ev, ofp_event.EventOFPErrorMsg) or
                    (req.reply_cls is not None and
                     isinstance(msg, req.reply_cls)))
        if not accepted:
            self.logger.error('unexpected reply %s for xid %s', ev, msg.xid)
            return
        try:
            collected = si.results[msg.xid]
        except KeyError:
            self.logger.error('unknown error xid %s', msg.xid)
        else:
            collected.append(msg)