1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
|
#!/usr/bin/env python
# encoding: utf-8
"""
flow.py
Created by Thomas Mangin on 2010-01-14.
Copyright (c) 2009-2015 Exa Networks. All rights reserved.
License: 3-clause BSD. (See the COPYRIGHT file)
"""
import socket
import unittest
from exabgp.util import character
from exabgp.util import ordinal
from exabgp.util import concat_strs
from exabgp.bgp.message.update.nlri import Flow
from exabgp.bgp.message.update.nlri.flow import Flow4Source
from exabgp.bgp.message.update.nlri.flow import Flow4Destination
from exabgp.bgp.message.update.nlri.flow import FlowAnyPort
from exabgp.bgp.message.update.nlri.flow import NumericOperator
# from exabgp.bgp.message.update.attribute.community import *
from exabgp.protocol.ip import IPv4
from exabgp.util.test import data_from_body
from exabgp.configuration.environment import environment
environment.setup('')
class TestFlow(unittest.TestCase):
def setUp(self):
pass
def test_rule(self):
components = {
'destination': Flow4Destination(IPv4.pton("192.0.2.0"), 24),
'source': Flow4Source(IPv4.pton("10.1.2.0"), 24),
'anyport_1': FlowAnyPort(NumericOperator.EQ, 25),
}
messages = {
'destination': [0x01, 0x18, 0xC0, 0x00, 0x02],
'source': [0x02, 0x18, 0x0A, 0x01, 0x02],
'anyport_1': [0x04, 0x01, 0x19],
}
for key in ['destination', 'source', 'anyport_1']:
component = components[key].pack()
message = data_from_body(messages[key])
# if component != message:
# self.fail('content mismatch\n%s\n%s' % (['0x%02X' % ord(_) for _ in component],['0x%02X' % ord(_) for _ in message]))
def test_rule_and(self):
components = {
'destination': Flow4Destination(IPv4.pton("192.0.2.0"), 24),
'source': Flow4Source(IPv4.pton("10.1.2.0"), 24),
'anyport_1': FlowAnyPort(NumericOperator.EQ | NumericOperator.GT, 25),
'anyport_2': FlowAnyPort(NumericOperator.EQ | NumericOperator.LT, 80),
}
messages = {
'destination': [0x01, 0x18, 0xC0, 0x00, 0x02],
'source': [0x02, 0x18, 0x0A, 0x01, 0x02],
'anyport_1': [0x04, 0x43, 0x19],
'anyport_2': [0x04, 0x85, 0x50],
}
flow = Flow()
message = b""
for key in ['destination', 'source', 'anyport_1', 'anyport_2']:
flow.add(components[key])
message += data_from_body(messages[key])
message = character(len(message)) + message
# flow.add(to_FlowAction(65000,False,False))
flow.pack()
# print [hex(ord(_)) for _ in flow]
def test_nlri(self):
components = {
'destination': Flow4Destination(IPv4.pton("192.0.2.0"), 24),
'source': Flow4Source(IPv4.pton("10.1.2.0"), 24),
'anyport_1': FlowAnyPort(NumericOperator.EQ | NumericOperator.GT, 25),
'anyport_2': FlowAnyPort(NumericOperator.EQ | NumericOperator.LT, 80),
}
messages = {
'destination': [0x01, 0x18, 0xC0, 0x00, 0x02],
'source': [0x02, 0x18, 0x0A, 0x01, 0x02],
'anyport_1': [0x04, 0x43, 0x19],
'anyport_2': [0x85, 0x50],
}
flow = Flow()
message = b""
for key in ['destination', 'source', 'anyport_1', 'anyport_2']:
flow.add(components[key])
message += data_from_body(messages[key])
message = character(len(message)) + message
# policy.add(to_FlowAction(65000,False,False))
flow = flow.pack()
if message[0] != flow[0]:
self.fail('size mismatch %s %s\n' % (ordinal(flow[0]), ordinal(message[0])))
if len(flow) != ordinal(flow[0]) + 1:
self.fail('invalid size for message')
# if message[1:] != flow[1:]:
# self.fail('content mismatch\n%s\n%s' % (['0x%02X' % ord(_) for _ in flow],['0x%02X' % ord(_) for _ in message]))
def test_compare(self):
components = {
'destination': Flow4Destination(IPv4.pton("192.0.2.0"), 24),
'source': Flow4Source(IPv4.pton("10.1.2.0"), 24),
'anyport_1': FlowAnyPort(NumericOperator.EQ | NumericOperator.GT, 25),
'anyport_2': FlowAnyPort(NumericOperator.EQ | NumericOperator.LT, 80),
'anyport_3': FlowAnyPort(NumericOperator.EQ, 80),
}
flow1 = Flow()
for key in ['destination', 'source', 'anyport_1', 'anyport_2']:
flow1.add(components[key])
flow2 = Flow()
for key in ['destination', 'source', 'anyport_3']:
flow2.add(components[key])
if flow1 != flow1:
self.fail('the flows are the same')
if flow1 == flow2:
self.fail('the flows are not the same')
if __name__ == '__main__':
unittest.main()
|