1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295
|
# Copyright (C) 2015 Canonical Ltd.
# Copyright (C) 2016 VMware INC.
#
# Author: Sankar Tanguturi <stanguturi@vmware.com>
#
# This file is part of cloud-init. See LICENSE file for license information.
import logging
import os
import re
from cloudinit import subp, util
from cloudinit.net.network_state import ipv4_mask_to_net_prefix
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def gen_subnet(ip, netmask):
    """
    Return the subnet for a given ip address and a netmask

    Each of the four dot-separated octets of the address is bitwise AND-ed
    with the octet at the same position in the netmask.

    @return (str): the subnet, in dotted-decimal form
    @param ip (str): dotted-decimal IPv4 address
    @param netmask (str): dotted-decimal IPv4 netmask
    @raise IndexError: if either argument has fewer than four octets
    @raise ValueError: if one of the first four octets is not an integer
    """
    ip_octets = ip.split(".")
    mask_octets = netmask.split(".")
    # Only the first four octets are considered; anything after is ignored.
    return ".".join(
        str(int(ip_octets[pos]) & int(mask_octets[pos])) for pos in range(4)
    )
class NicConfigurator:
    """
    Generate network config entries (physical nics, subnets and routes)
    for a list of nics, and optionally reset /etc/network/interfaces on
    Debian before doing so.
    """

    def __init__(self, nics, use_system_devices=True):
        """
        Initialize the Nic Configurator
        @param nics (list) an array of nics to configure
        @param use_system_devices (Bool) Get the MAC names from the system
        if this is True. If False, then mac names will be retrieved from
         the specified nics.
        """
        self.nics = nics
        # Maps lower-case MAC address -> device name.
        self.mac2Name = {}
        self.ipv4PrimaryGateway = None
        self.ipv6PrimaryGateway = None

        if use_system_devices:
            self.find_devices()
        else:
            for nic in self.nics:
                self.mac2Name[nic.mac.lower()] = nic.name

        self._primaryNic = self.get_primary_nic()

    def get_primary_nic(self):
        """
        Retrieve the primary nic if it exists
        @return (NicBase): the primary nic if exists, None otherwise
        @raise Exception: if more than one nic is flagged as primary
        """
        primary_nics = [nic for nic in self.nics if nic.primary]
        if not primary_nics:
            return None
        elif len(primary_nics) > 1:
            raise Exception(
                "There can only be one primary nic",
                [nic.mac for nic in primary_nics],
            )
        else:
            return primary_nics[0]

    def find_devices(self):
        """
        Create the mac2Name dictionary
        The mac address(es) are in the lower case
        """
        cmd = ["ip", "addr", "show"]
        output, _err = subp.subp(cmd)
        # Each device section starts with "<index>: "; the leading "\n" lets
        # the first section match the same separator as the others.
        sections = re.split(r"\n\d+: ", "\n" + output)[1:]

        macPat = r"link/ether (([0-9A-Fa-f]{2}[:]){5}([0-9A-Fa-f]{2}))"
        for section in sections:
            match = re.search(macPat, section)
            if not match:  # Only keep info about nics
                continue
            mac = match.group(1).lower()
            # The device name is the text before the first ":" of the section.
            name = section.split(":", 1)[0]
            self.mac2Name[mac] = name

    def gen_one_nic(self, nic):
        """
        Return the config list needed to configure a nic
        @return (list): the subnets and routes list to configure the nic
        @param nic (NicBase): the nic to configure
        @raise ValueError: if no device name is known for the nic's MAC
        """
        mac = nic.mac.lower()
        name = self.mac2Name.get(mac)
        if not name:
            raise ValueError("No known device has MACADDR: %s" % nic.mac)

        nics_cfg_list = []

        cfg = {"type": "physical", "name": name, "mac_address": mac}

        subnet_list = []
        route_list = []

        # Customize IPv4
        (subnets, routes) = self.gen_ipv4(name, nic)
        subnet_list.extend(subnets)
        route_list.extend(routes)

        # Customize IPv6
        (subnets, routes) = self.gen_ipv6(name, nic)
        subnet_list.extend(subnets)
        route_list.extend(routes)

        cfg.update({"subnets": subnet_list})

        # The nic entry comes first, followed by any standalone route entries.
        nics_cfg_list.append(cfg)
        if route_list:
            nics_cfg_list.extend(route_list)

        return nics_cfg_list

    def gen_ipv4(self, name, nic):
        """
        Return the subnets and routes needed to configure the
        IPv4 settings of a nic
        @return (tuple): (subnet list, route list) for the nic; the route
        list is always empty here, routes are attached to the subnet itself
        @param name (str): name of the nic
        @param nic (NicBase): the nic to configure
        """
        subnet = {}
        route_list = []

        if nic.onboot:
            subnet.update({"control": "auto"})

        bootproto = nic.bootProto.lower()
        # NOTE(review): a "disabled" ipv4_mode maps to "manual", which is not
        # "static" and therefore still takes the dhcp branch below - confirm
        # that this is the intended result.
        if nic.ipv4_mode.lower() == "disabled":
            bootproto = "manual"

        if bootproto != "static":
            subnet.update({"type": "dhcp"})
            return ([subnet], route_list)
        else:
            subnet.update({"type": "static"})

        # Static Ipv4
        addrs = nic.staticIpv4
        if not addrs:
            return ([subnet], route_list)

        # Only the first static address is used.
        v4 = addrs[0]
        if v4.ip:
            subnet.update({"address": v4.ip})
        if v4.netmask:
            subnet.update({"netmask": v4.netmask})

        # Add the primary gateway
        if nic.primary and v4.gateways:
            self.ipv4PrimaryGateway = v4.gateways[0]
            subnet.update({"gateway": self.ipv4PrimaryGateway})
            return ([subnet], route_list)

        # Add routes if there is no primary nic
        if not self._primaryNic and v4.gateways:
            subnet.update(
                {"routes": self.gen_ipv4_route(nic, v4.gateways, v4.netmask)}
            )

        return ([subnet], route_list)

    def gen_ipv4_route(self, nic, gateways, netmask):
        """
        Return the routes list needed to configure additional Ipv4 route
        @return (list): the route list to configure the gateways
        @param nic (NicBase): the nic to configure
        @param gateways (str list): the list of gateways
        @param netmask (str): netmask used to derive each route destination
        """
        route_list = []

        cidr = ipv4_mask_to_net_prefix(netmask)

        for gateway in gateways:
            # Destination is the gateway's own subnet in CIDR notation.
            destination = "%s/%d" % (gen_subnet(gateway, netmask), cidr)
            route_list.append(
                {
                    "destination": destination,
                    "type": "route",
                    "gateway": gateway,
                    "metric": 10000,
                }
            )

        return route_list

    def gen_ipv6(self, name, nic):
        """
        Return the subnets and routes needed to configure the
        IPv6 settings of a nic
        @return (tuple): (subnet list, route list) for the nic; the route
        list is currently always empty
        @param name (str): name of the nic
        @param nic (NicBase): the nic to configure
        """
        if not nic.staticIpv6:
            return ([], [])

        subnet_list = []
        # Static Ipv6
        addrs = nic.staticIpv6

        for addr in addrs:
            subnet = {
                "type": "static6",
                "address": addr.ip,
                "netmask": addr.netmask,
            }
            subnet_list.append(subnet)

        # TODO: Add the primary gateway

        route_list = []
        # TODO: Add routes if there is no primary nic
        # if not self._primaryNic:
        #    route_list.extend(self._genIpv6Route(name, nic, addrs))

        return (subnet_list, route_list)

    def _genIpv6Route(self, name, nic, addrs):
        """
        Return one route entry per address, using the address's gateway
        @return (list): the route list
        @param name (str): unused
        @param nic (NicBase): unused
        @param addrs (list): addresses providing a gateway attribute
        """
        route_list = []

        for addr in addrs:
            route_list.append(
                {"type": "route", "gateway": addr.gateway, "metric": 10000}
            )

        return route_list

    def generate(self, configure=False, osfamily=None):
        """Return the config elements that are needed to configure the nics"""
        if configure:
            logger.info("Configuring the interfaces file")
            self.configure(osfamily)

        nics_cfg_list = []

        for nic in self.nics:
            nics_cfg_list.extend(self.gen_one_nic(nic))

        return nics_cfg_list

    def clear_dhcp(self):
        """
        Stop dhclient and delete the lease files under /var/lib/dhcp
        """
        logger.info("Clearing DHCP leases")

        # Ignore the return code 1.
        subp.subp(["pkill", "dhclient"], rcs=[0, 1])
        # The command list is run without a shell, so a "*" argument passed
        # to rm would never be expanded; remove the files directly instead.
        self._remove_dhcp_leases("/var/lib/dhcp")

    @staticmethod
    def _remove_dhcp_leases(lease_dir):
        """
        Delete the non-hidden files located directly inside a directory
        Sub-directories and dot-files are left alone; a missing directory or
        an already removed file is not an error.
        @param lease_dir (str): directory holding the lease files
        """
        try:
            entries = os.listdir(lease_dir)
        except (FileNotFoundError, NotADirectoryError):
            return

        for entry in entries:
            if entry.startswith("."):
                continue
            path = os.path.join(lease_dir, entry)
            if os.path.isdir(path) and not os.path.islink(path):
                continue
            try:
                os.remove(path)
            except FileNotFoundError:
                # Removed by someone else in the meantime.
                pass

    def configure(self, osfamily=None):
        """
        Configure the /etc/network/interfaces
        Make a back up of the original
        @param osfamily (str): nothing is done unless this is "debian"
        """
        if osfamily != "debian":
            logger.info("Debian OS not detected. Skipping the configure step")
            return

        containingDir = "/etc/network"

        interfaceFile = os.path.join(containingDir, "interfaces")
        originalFile = os.path.join(
            containingDir, "interfaces.before_vmware_customization"
        )

        # Back up only once: an existing backup is never overwritten.
        if not os.path.exists(originalFile) and os.path.exists(interfaceFile):
            os.rename(interfaceFile, originalFile)

        lines = [
            "# DO NOT EDIT THIS FILE BY HAND --"
            " AUTOMATICALLY GENERATED BY cloud-init",
            "source /etc/network/interfaces.d/*",
            "source-directory /etc/network/interfaces.d",
        ]

        util.write_file(interfaceFile, content="\n".join(lines))

        self.clear_dhcp()
# vi: ts=4 expandtab
|