File: firewall.py

package info (click to toggle)
python-softlayer 6.2.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, trixie
  • size: 7,508 kB
  • sloc: python: 57,195; makefile: 133; xml: 97; sh: 59
file content (337 lines) | stat: -rw-r--r-- 13,364 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
"""
    SoftLayer.firewall
    ~~~~~~~~~~~~~~~~~~
    Firewall Manager/helpers

    :license: MIT, see LICENSE for more details.
"""
from SoftLayer import exceptions
from SoftLayer import utils

# Object mask passed to the ``getRules`` calls in ``get_standard_fwl_rules``
# and ``get_dedicated_fwl_rules``; it lists the rule properties to return.
RULE_MASK = ('mask[orderValue,action,destinationIpAddress,'
             'destinationIpSubnetMask,protocol,destinationPortRangeStart,'
             'destinationPortRangeEnd,sourceIpAddress,sourceIpSubnetMask,'
             'version,notes]')


def has_firewall(vlan):
    """Report whether a VLAN dictionary carries any firewall indicator.

    The VLAN counts as firewalled when at least one of the firewall-related
    properties checked below is present with a truthy value.

    :param dict vlan: A dictionary representing a VLAN
    :returns: True if any firewall property is truthy, False otherwise.
    """
    firewall_keys = (
        'dedicatedFirewallFlag',
        'highAvailabilityFirewallFlag',
        'firewallInterfaces',
        'firewallNetworkComponents',
        'firewallGuestNetworkComponents',
    )
    # any() stops at the first truthy property, checking them in this order.
    return any(vlan.get(key) for key in firewall_keys)


class FirewallManager(utils.IdentifierMixin, object):
    """Manages SoftLayer firewalls

    See product information here: https://www.ibm.com/cloud/network-security

    :param SoftLayer.API.BaseClient client: the client instance

    """

    def __init__(self, client):
        self.client = client
        # Services used by several methods are looked up once here.
        self.account = self.client['Account']
        self.prod_pkg = self.client['Product_Package']

    def get_standard_package(self, server_id, is_virt=True):
        """Retrieves the standard firewall package matching a server's speed.

        The port speed of the server is looked up first and used to build the
        item description that the package items are filtered on.

        :param int server_id: The ID of the server to create the firewall for
        :param bool is_virt: True if the ID provided is for a virtual server,
                             False for a server
        :returns: The result of ``Product_Package.getItems`` filtered on the
                  "<speed>Mbps Hardware Firewall" description
        """

        firewall_port_speed = self._get_fwl_port_speed(server_id, is_virt)

        _value = f"{firewall_port_speed}Mbps Hardware Firewall"
        _filter = {'items': {'description': utils.query_filter(_value)}}

        return self.prod_pkg.getItems(id=0, filter=_filter)

    def get_dedicated_package(self, ha_enabled=False):
        """Retrieves the dedicated firewall package.

        :param bool ha_enabled: True if HA is to be enabled on the firewall
                                False for No HA
        :returns: The result of ``Product_Package.getItems`` filtered on the
                  dedicated (or high availability) firewall description
        """

        if ha_enabled:
            description = 'Hardware Firewall (High Availability)'
        else:
            description = 'Hardware Firewall (Dedicated)'

        _filter = utils.NestedDict({})
        _filter['items']['description'] = utils.query_filter(description)

        return self.prod_pkg.getItems(id=0, filter=_filter.to_dict())

    def cancel_firewall(self, firewall_id, dedicated=False):
        """Cancels the specified firewall.

        :param int firewall_id: Firewall ID to be cancelled.
        :param bool dedicated: If true, the firewall instance is dedicated,
                               otherwise, the firewall instance is shared.
        :returns: The result of ``Billing_Item.cancelService``
        :raises SoftLayerError: if the firewall or its billing item cannot be
                                found
        """

        fwl_billing = self._get_fwl_billing_item(firewall_id, dedicated)
        billing_item_service = self.client['Billing_Item']
        return billing_item_service.cancelService(id=fwl_billing['id'])

    def add_standard_firewall(self, server_id, is_virt=True):
        """Creates a firewall for the specified virtual/hardware server.

        :param int server_id: The ID of the server to create the firewall for
        :param bool is_virt: If true, will create the firewall for a virtual
                             server, otherwise for a hardware server.
        :returns: The result of ``Product_Order.placeOrder`` for the standard
                  firewall order
        """

        package = self.get_standard_package(server_id, is_virt)
        # The two order variants differ only in the key naming the server.
        server_key = 'virtualGuests' if is_virt else 'hardware'
        product_order = {
            'complexType': 'SoftLayer_Container_Product_Order_Network_'
                           'Protection_Firewall',
            'quantity': 1,
            'packageId': 0,
            server_key: [{'id': server_id}],
            'prices': [{'id': package[0]['prices'][0]['id']}]
        }
        return self.client['Product_Order'].placeOrder(product_order)

    def add_vlan_firewall(self, vlan_id, ha_enabled=False):
        """Creates a firewall for the specified vlan.

        :param int vlan_id: The ID of the vlan to create the firewall for
        :param bool ha_enabled: If True, an HA firewall will be created

        :returns: The result of ``Product_Order.placeOrder`` for the VLAN
                  firewall order
        """

        package = self.get_dedicated_package(ha_enabled)
        product_order = {
            'complexType': 'SoftLayer_Container_Product_Order_Network_'
                           'Protection_Firewall_Dedicated',
            'quantity': 1,
            'packageId': 0,
            'vlanId': vlan_id,
            'prices': [{'id': package[0]['prices'][0]['id']}]
        }
        return self.client['Product_Order'].placeOrder(product_order)

    def _get_fwl_billing_item(self, firewall_id, dedicated=False):
        """Retrieves the billing item of the firewall.

        :param int firewall_id: Firewall ID to get the billing item for
        :param bool dedicated: whether the firewall is dedicated or standard
        :returns: A dictionary of the firewall billing item.
        :raises SoftLayerError: if the firewall or its billing item cannot be
                                found
        """

        mask = 'mask[id,billingItem[id]]'
        if dedicated:
            firewall_service = self.client['Network_Vlan_Firewall']
        else:
            firewall_service = self.client['Network_Component_Firewall']
        firewall = firewall_service.getObject(id=firewall_id, mask=mask)
        if firewall is None:
            raise exceptions.SoftLayerError(f"Unable to find firewall {firewall_id}")
        if firewall.get('billingItem') is None:
            raise exceptions.SoftLayerError(f"Unable to find billing item for firewall {firewall_id}")

        return firewall['billingItem']

    def _get_fwl_port_speed(self, server_id, is_virt=True):
        """Determines the appropriate speed for a firewall.

        For a virtual server this is the ``maxSpeed`` of its primary network
        component. For a hardware server it is the largest of: the summed
        ``maxSpeed`` of each network component group, the ``maxSpeed`` of each
        ungrouped frontend interface, and 0.

        :param int server_id: The ID of server the firewall is for
        :param bool is_virt: True if the server_id is for a virtual server
        :returns: a integer representing the Mbps speed of a firewall
        """

        if is_virt:
            mask = 'primaryNetworkComponent[maxSpeed]'
            svc = self.client['Virtual_Guest']
            primary = svc.getObject(mask=mask, id=server_id)
            return primary['primaryNetworkComponent']['maxSpeed']

        mask = 'id,maxSpeed,networkComponentGroup.networkComponents'
        svc = self.client['Hardware_Server']
        network_components = svc.getFrontendNetworkComponents(
            mask=mask, id=server_id)

        # One summed speed per group: the maxSpeeds of its components.
        group_speeds = [
            sum(component['maxSpeed']
                for component in interface['networkComponentGroup']['networkComponents'])
            for interface in network_components
            if 'networkComponentGroup' in interface
        ]
        ungrouped_speeds = [
            interface['maxSpeed']
            for interface in network_components
            if 'networkComponentGroup' not in interface
        ]

        # Seeding with 0 keeps max() defined when there are no groups (or no
        # interfaces at all) instead of raising ValueError on an empty list.
        return max([0, *group_speeds, *ungrouped_speeds])

    def get_firewalls(self):
        """Returns a list of all firewalls on the account.

        The account's VLANs are fetched and only those for which
        ``has_firewall`` is true are kept.

        :returns: A list of firewalls on the current account.
        """

        mask = ('firewallNetworkComponents,'
                'networkVlanFirewall,'
                'dedicatedFirewallFlag,'
                'firewallGuestNetworkComponents,'
                'firewallInterfaces,'
                'firewallRules,'
                'highAvailabilityFirewallFlag')

        return [firewall
                for firewall in self.account.getNetworkVlans(mask=mask)
                if has_firewall(firewall)]

    def get_standard_fwl_rules(self, firewall_id):
        """Get the rules of a standard firewall.

        :param integer firewall_id: the instance ID of the standard firewall
        :returns: A list of the rules.
        """

        svc = self.client['Network_Component_Firewall']
        return svc.getRules(id=firewall_id, mask=RULE_MASK)

    def get_dedicated_fwl_rules(self, firewall_id):
        """Get the rules of a dedicated firewall.

        :param integer firewall_id: the instance ID of the dedicated firewall
        :returns: A list of the rules.
        """

        svc = self.client['Network_Vlan_Firewall']
        return svc.getRules(id=firewall_id, mask=RULE_MASK)

    def edit_dedicated_fwl_rules(self, firewall_id, rules):
        """Edit the rules for dedicated firewall.

        The update request is addressed to an access control list of the
        firewall's VLAN: interfaces named 'inside' and lists whose direction
        is 'out' are skipped, and the last remaining list is used.

        :param integer firewall_id: the instance ID of the dedicated firewall
        :param list rules: the rules to be pushed on the firewall as defined by
                           SoftLayer_Network_Firewall_Update_Request_Rule
        :returns: The result of ``Network_Firewall_Update_Request.createObject``
        :raises SoftLayerError: if no suitable access control list is found
        """

        mask = ('mask[networkVlan[firewallInterfaces'
                '[firewallContextAccessControlLists]]]')
        svc = self.client['Network_Vlan_Firewall']
        fwl = svc.getObject(id=firewall_id, mask=mask)
        network_vlan = fwl['networkVlan']

        fwl_ctx_acl_id = None
        for interface in network_vlan['firewallInterfaces']:
            if interface['name'] == 'inside':
                continue

            for control_list in interface['firewallContextAccessControlLists']:
                if control_list['direction'] == 'out':
                    continue
                fwl_ctx_acl_id = control_list['id']

        # Without this guard a firewall with no qualifying list would surface
        # as an UnboundLocalError when the template is built.
        if fwl_ctx_acl_id is None:
            raise exceptions.SoftLayerError(
                f"Unable to find an access control list for firewall {firewall_id}")

        template = {'firewallContextAccessControlListId': fwl_ctx_acl_id,
                    'rules': rules}

        svc = self.client['Network_Firewall_Update_Request']
        return svc.createObject(template)

    def edit_standard_fwl_rules(self, firewall_id, rules):
        """Edit the rules for standard firewall.

        :param integer firewall_id: the instance ID of the standard firewall
        :param list rules: the rules to be pushed on the firewall as defined by
                           SoftLayer_Network_Firewall_Update_Request_Rule
        :returns: The result of ``Network_Firewall_Update_Request.createObject``
        """

        rule_svc = self.client['Network_Firewall_Update_Request']
        template = {'networkComponentFirewallId': firewall_id, 'rules': rules}

        return rule_svc.createObject(template)

    def get_instance(self, firewall_id, mask=None):
        """Get the firewall information

        The lookup goes through the ``Network_Vlan_Firewall`` service.

        :param integer firewall_id: the instance ID of the firewall
        :param string mask: object mask to use; a default mask is applied when
                            this is empty or None
        :returns: The result of ``Network_Vlan_Firewall.getObject``
        """
        if not mask:
            mask = 'mask[firewallType,datacenter,managementCredentials,networkVlan,' \
                   'metricTrackingObject[data,type],networkGateway[insideVlans,members,privateIpAddress,' \
                   'publicIpAddress,publicIpv6Address,privateVlan,publicVlan,status]]'

        svc = self.client['Network_Vlan_Firewall']

        return svc.getObject(id=firewall_id, mask=mask)

    def get_firewalls_gatewalls(self):
        """Returns a list of all gateway firewalls (gatewalls) on the account.

        :returns: A list of gateway firewalls (gatewalls) on the current account.
        """
        mask = 'mask[id,networkSpace,name,' \
               'networkFirewall[id,firewallType,datacenter[name]],' \
               'status[keyName],' \
               'insideVlans[id],' \
               'privateIpAddress[ipAddress],' \
               'publicVlan[id,primaryRouter[hostname]],' \
               'publicIpAddress[ipAddress],members[id,hardware[hostname]]]'
        # Only network gateways that have a networkFirewall are requested.
        _filter = {"networkGateways": {"networkFirewall": {"operation": "not null"}}}

        return self.account.getNetworkGateways(mask=mask, filter=_filter)

    def get_summary(self, identifier, start_date, end_date):
        """Returns the metric data for the date range provided

        Requests "sum" summaries of the PUBLICIN and PUBLICOUT metrics.

        :param integer identifier: ID passed to ``Metric_Tracking_Object``
        :param start_date: start of the range, forwarded to ``getSummaryData``
        :param end_date: end of the range, forwarded to ``getSummaryData``
        :returns: The result of ``Metric_Tracking_Object.getSummaryData``
        """
        body = [{
            "keyName": "PUBLICIN",
            "summaryType": "sum"

        }, {
            "keyName": "PUBLICOUT",
            "summaryType": "sum"
        }]

        # NOTE(review): 86400 is presumably the summary period in seconds
        # (one day) -- confirm against the getSummaryData API reference.
        return self.client['Metric_Tracking_Object'].getSummaryData(start_date,
                                                                    end_date, body, 86400, id=identifier)