1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
|
# -*- coding: utf-8 -*-
"""Maximum flow algorithms test suite on large graphs.
"""
__author__ = """Loïc Séguin-C. <loicseguin@gmail.com>"""
# Copyright (C) 2010 Loïc Séguin-C. <loicseguin@gmail.com>
# All rights reserved.
# BSD license.
import os
from nose.tools import *
import networkx as nx
from networkx.algorithms.flow import build_flow_dict, build_residual_network
from networkx.algorithms.flow import (edmonds_karp, ford_fulkerson,
preflow_push, shortest_augmenting_path)
# Flow functions that the tests in this file iterate over.
# ford_fulkerson is imported above but deliberately left out of this list:
# do not use the legacy ford_fulkerson implementation for tests on large
# graphs.
flow_funcs = [edmonds_karp, preflow_push, shortest_augmenting_path]
# Template for assertion failure messages; {0} is filled in with the name of
# the flow function being exercised.
msg = "Assertion failed in function: {0}"
def gen_pyramid(N):
    """Build a pyramid-shaped directed graph with ``N`` levels and a sink.

    Node ``(i, j)`` is the ``j``-th node of level ``i``. For every level
    ``i`` in ``range(N - 1)`` and every ``j`` in ``range(i + 1)``, node
    ``(i, j)`` gets one arc to ``(i + 1, j)`` and one to ``(i + 1, j + 1)``,
    each carrying a ``capacity`` attribute. Every node ``(N - 1, j)`` of the
    last level gets an arc to the sink ``'t'`` with no ``capacity``
    attribute.

    This graph admits a flow of value 1 for which every arc is at
    capacity (except the arcs incident to the sink which have
    infinite capacity).

    Returns the resulting ``nx.DiGraph``.
    """
    pyramid = nx.DiGraph()

    for level in range(N - 1):
        below = level + 1
        # ``cap`` is a running value: after each arc is added it is replaced
        # by a fixed fraction minus the capacity that was just assigned.
        cap = 1. / (level + 2)
        for pos in range(below):
            node = (level, pos)
            pyramid.add_edge(node, (below, pos), capacity=cap)
            cap = 1. / below - cap
            pyramid.add_edge(node, (below, pos + 1), capacity=cap)
            cap = 1. / (level + 2) - cap

    # Connect the whole last level to the sink; these arcs carry no
    # capacity attribute.
    last = N - 1
    for pos in range(N):
        pyramid.add_edge((last, pos), 't')

    return pyramid
def read_graph(name):
    """Load the graph stored as ``<name>.gpickle.bz2`` next to this module.

    ``name`` is the file name without the ``.gpickle.bz2`` suffix. The file
    is looked up in the directory containing this test module and read with
    ``nx.read_gpickle``, whose result is returned.
    """
    here = os.path.dirname(__file__)
    filename = name + '.gpickle.bz2'
    return nx.read_gpickle(os.path.join(here, filename))
def validate_flows(G, s, t, soln_value, R, flow_func):
    """Assert that residual network ``R`` encodes a valid ``s``-``t`` flow.

    The following is checked, in order:

    * ``R.graph['flow_value']`` equals ``soln_value``;
    * the flow dict built from ``G`` and ``R`` has exactly the nodes of
      ``G`` and, for each node, exactly the neighbors it has in ``G``;
    * the flow on every arc is non-negative and does not exceed the arc's
      ``capacity`` attribute (treated as infinite when absent);
    * the net flow into each node is ``-soln_value`` at ``s``,
      ``soln_value`` at ``t`` and 0 everywhere else.

    ``flow_func`` is only used for its ``__name__``, which labels the
    assertion failure messages.
    """
    flow_value = R.graph['flow_value']
    flow_dict = build_flow_dict(G, R)
    failure = msg.format(flow_func.__name__)

    assert_equal(soln_value, flow_value, msg=failure)
    assert_equal(set(G), set(flow_dict), msg=failure)
    for node in G:
        assert_equal(set(G[node]), set(flow_dict[node]), msg=failure)

    # Accumulate inflow minus outflow per node while checking the capacity
    # constraints on each arc.
    unbounded = float('inf')
    excess = dict.fromkeys(flow_dict, 0)
    for tail, out_flows in flow_dict.items():
        for head, flow in out_flows.items():
            ok_(flow <= G[tail][head].get('capacity', unbounded), msg=failure)
            ok_(flow >= 0, msg=failure)
            excess[tail] -= flow
            excess[head] += flow

    # Flow conservation: only the source and the sink may be unbalanced.
    for node, exc in excess.items():
        if node == s:
            expected = -soln_value
        elif node == t:
            expected = soln_value
        else:
            expected = 0
        assert_equal(exc, expected, msg=failure)
class TestMaxflowLargeGraph:
    """Maximum flow tests on generated and stored large graphs.

    Except for the global-relabel test, each test builds a single residual
    network up front and hands it to every function in ``flow_funcs``.
    """

    def _check_stored_graph(self, name, soln_value):
        """Validate every flow function on the stored graph ``name``.

        The graph is loaded with ``read_graph``; node 1 is used as the source
        and node ``len(G)`` as the sink. The residual network returned by each
        flow function is checked with ``validate_flows`` against
        ``soln_value``.
        """
        G = read_graph(name)
        source, sink = 1, len(G)
        residual = build_residual_network(G, 'capacity')
        for flow_func in flow_funcs:
            R = flow_func(G, source, sink, residual=residual)
            validate_flows(G, source, sink, soln_value, R, flow_func)

    def test_complete_graph(self):
        """Flow from node 1 to node 2 in a complete graph, capacity 5 per edge."""
        N = 50
        G = nx.complete_graph(N)
        nx.set_edge_attributes(G, 'capacity', 5)
        residual = build_residual_network(G, 'capacity')
        expected = 5 * (N - 1)

        for flow_func in flow_funcs:
            flow_value = nx.maximum_flow_value(G, 1, 2, flow_func=flow_func,
                                               residual=residual)
            assert_equal(flow_value, expected,
                         msg=msg.format(flow_func.__name__))

    def test_pyramid(self):
        """Flow from the apex ``(0, 0)`` of the pyramid to ``'t'`` is about 1."""
        # N = 100 would give a graph with 5051 nodes.
        N = 10
        G = gen_pyramid(N)
        residual = build_residual_network(G, 'capacity')

        for flow_func in flow_funcs:
            flow_value = nx.maximum_flow_value(G, (0, 0), 't',
                                               flow_func=flow_func,
                                               residual=residual)
            assert_almost_equal(flow_value, 1.,
                                msg=msg.format(flow_func.__name__))

    def test_gl1(self):
        """Stored graph ``gl1`` has a maximum flow of 156545."""
        self._check_stored_graph('gl1', 156545)

    def test_gw1(self):
        """Stored graph ``gw1`` has a maximum flow of 1202018."""
        self._check_stored_graph('gw1', 1202018)

    def test_wlm3(self):
        """Stored graph ``wlm3`` has a maximum flow of 11875108."""
        self._check_stored_graph('wlm3', 11875108)

    def test_preflow_push_global_relabel(self):
        """``preflow_push`` with ``global_relabel_freq=50`` on ``gw1``."""
        G = read_graph('gw1')
        R = preflow_push(G, 1, len(G), global_relabel_freq=50)
        assert_equal(R.graph['flow_value'], 1202018)
|