File: network.py

package info (click to toggle)
python-tempestconf 3.5.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 964 kB
  • sloc: python: 4,530; makefile: 18; sh: 9
file content (101 lines) | stat: -rw-r--r-- 4,103 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# Copyright 2013, 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import json

from config_tempest.constants import LOG
from config_tempest.services.base import VersionedService


class NetworkService(VersionedService):
    def set_extensions(self):
        body = self.do_get(self.service_url + '/v2.0/extensions.json')
        body = json.loads(body)
        self.extensions = list(map(lambda x: x['alias'], body['extensions']))

    def set_versions(self):
        super(NetworkService, self).set_versions(top_level=False)

    def create_tempest_networks(self, conf, network):
        LOG.info("Setting up network")
        self.client = self.client.networks
        self.create_tempest_networks_neutron(conf, network)

    def create_tempest_networks_neutron(self, conf, public_network):
        self._public_network = public_network
        self._public_network_name = None
        self._public_network_id = None
        # if user supplied the network we should use
        if public_network:
            self._supplied_network()
        # no network id provided, try to auto discover a public network
        else:
            self._discover_network()
        if self._public_network_id is not None:
            conf.set('network', 'public_network_id', self._public_network_id)
        if self._public_network_name is not None:
            conf.set('network', 'floating_network_name',
                     self._public_network_name)

    def get_service_extension_key(self):
        return 'api_extensions'

    def _supplied_network(self):
        LOG.info("Looking for existing network: %s",
                 self._public_network)
        given_net = self._public_network
        # check if network exists
        network_list = self.client.list_networks()
        for network in network_list['networks']:
            if network['id'] == given_net or network['name'] == given_net:
                self._public_network_name = network['name']
                self._public_network_id = network['id']
                break
        else:
            raise ValueError('provided network: {0} was not found.'
                             ''.format(self._public_network))

    def _discover_network(self):
        LOG.info("No network supplied, trying auto discover for an external "
                 "network while prioritizing the one called public, if not "
                 "found, the network discovered last will be used.")
        network_list = self.client.list_networks()

        for network in network_list['networks']:
            if network['router:external'] and network['subnets']:
                if network['status'] != 'ACTIVE':
                    continue
                self._public_network_id = network['id']
                self._public_network_name = network['name']
                # usually the external network we use is called 'public'
                if network['name'] == 'public':
                    # we found a network called public, end the loop
                    break
        # Couldn't find an existing external network
        if not self._public_network_name:
            LOG.error("No external networks found. "
                      "Please note that any test that relies on external "
                      "connectivity would most likely fail.")
            return
        LOG.info("Setting %s as the public network for tempest",
                 self._public_network_id)

    @staticmethod
    def get_service_type():
        return ['network']

    @staticmethod
    def get_codename():
        return 'neutron'