-
Notifications
You must be signed in to change notification settings - Fork 0
/
sgraph.py
executable file
·168 lines (130 loc) · 6.24 KB
/
sgraph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/usr/bin/env python3
from collections import defaultdict
from collections import deque
class SGraph:
    """Compute path information for directed graphs with non-negative edge costs.

    Attributes:
    V - set of every node that appears as the source or destination of an edge
    G - adjacency map; G[src][dest] is the cost of the direct edge src -> dest.
        Only nodes with at least one outgoing edge are keys of G.
    """

    class NoSuchRoute(Exception):
        """Raised by SGraph.route_distance when the given route cannot be followed."""

    # use Johnson's reweighting if we discover wormholes and have negative distances,
    # else: Floyd–Warshall or Bellman–Ford depending on dist. of graphs

    def __init__(self, graph):
        """Make an SGraph.

        graph - an iterable of edges/arcs with costs as tuples of (src, dest, cost)
                [('A', 'B', 5)] => graph with one edge from node 'A' to node 'B' of cost 5
                If the same (src, dest) pair appears more than once, the last cost wins.

        NOTE: edge costs must be non-negative (not checked here).
        """
        self.V = set()  # all vertices
        # optimize: use arrays and map idx <-> names, should we get much larger graphs
        # TODO pre-compute/cache all-pairs shortest paths? [dist for src == dest > 0]
        # the graph, lookup G[from][to] to get direct distance (check for existence!)
        # makes direct lookups quick & easy, but memory hog for much larger graphs
        self.G = defaultdict(dict)
        for src, dest, cost in graph:
            self.V.add(src)
            self.V.add(dest)
            self.G[src][dest] = cost

    def route_distance(self, route):
        """Compute the distance along a given route.

        route - a list of nodes as strings, e.g. ['A', 'B', 'C']

        Returns the sum of the edge costs along the route; a route made of a
        single known node has distance 0.

        Raises SGraph.NoSuchRoute if the route is empty, starts at a node that
        is not in the graph, or uses an edge that does not exist.
        """
        # don't return two diff types/meanings, throw exception instead
        if not route:
            raise SGraph.NoSuchRoute('NO SUCH ROUTE')
        src = route[0]
        # Validate against V (all nodes), not G (nodes with outgoing edges), so
        # the answer for a sink node does not depend on earlier queries.
        if src not in self.V:
            raise SGraph.NoSuchRoute('NO SUCH ROUTE')
        dist = 0
        for city in route[1:]:
            # .get() keeps the defaultdict from growing an empty entry for sinks
            hops = self.G.get(src, {})
            if city not in hops:
                raise SGraph.NoSuchRoute('NO SUCH ROUTE')
            dist += hops[city]
            src = city
        return dist

    def routes_with_criteria(self, src, target, criteria):
        """Return a list of routes that start at src, end at target, and meet the criteria.

        criteria - a callable returning True/False, accepting two args: stops, distance
            stops - number of edges from src node to target node, e.g. 'ABC', if valid, is two stops
            distance - sum of edge costs from src node to target node, e.g. 'AB' in graph 'AB5' is 5

        Nodes which fail the criteria will not be part of a particular path.
        Each route is a list of nodes beginning with src; a route always has at
        least one edge, so src == target only matches when a cycle exists.

        NOTE: on a graph with cycles the search only terminates if criteria
        eventually rejects ever-longer routes.
        """
        # BFS over partial routes
        routes = []
        # each entry: this city, stops to this city, distance to this city, route to this city
        q = deque([(src, 0, 0, [src])])
        while q:
            city, stops, distance, route = q.popleft()
            # stops > 0 skips the trivial zero-edge route at the start node
            # without discarding genuine routes whose total cost is 0
            if city == target and stops > 0:
                routes.append(route)
            # .get() avoids inserting empty adjacency entries for sink nodes
            for dest, cost in self.G.get(city, {}).items():
                next_stops = stops + 1
                next_distance = distance + cost
                if criteria(next_stops, next_distance):
                    q.append((dest, next_stops, next_distance, route + [dest]))
        return routes

    def count_routes_max_distance(self, src, dest, max_distance):
        """Return the number of routes starting at src, ending at dest, with distance < max_distance.

        Convenience wrapper for routes_with_criteria.

        src - source node
        dest - destination node
        max_distance - the maximum sum of edge costs; the criteria is 'strictly less than'
        """
        # inconsistent max ('<' here, '<=' for stops), per test cases
        return len(self.routes_with_criteria(
            src, dest, lambda stops, distance: distance < max_distance))

    def count_routes_max_stops(self, src, dest, max_stops):
        """Return the number of routes starting at src, ending at dest, with number of stops <= max_stops.

        Convenience wrapper for routes_with_criteria.

        src - source node
        dest - destination node
        max_stops - the maximum number of stops; the criteria is 'less than or equal to'

        Example: A three node route 'ABC' has two stops.
        """
        # inconsistent max ('<=' here, '<' for distance), per test cases
        return len(self.routes_with_criteria(
            src, dest, lambda stops, distance: stops <= max_stops))

    def count_routes_exact_stops(self, src, dest, num_stops):
        """Return the number of routes starting at src, ending at dest, with number of stops == num_stops.

        Convenience wrapper for routes_with_criteria.

        src - source node
        dest - destination node
        num_stops - the exact number of stops; the criteria is 'equal to'

        Example: A three node route 'ABC' has two stops.
        """
        # Shorter routes must be explored to reach the longer ones, so search
        # with '<=' and keep only the routes of exactly the requested length.
        candidates = self.routes_with_criteria(
            src, dest, lambda stops, distance: stops <= num_stops)
        return sum(1 for route in candidates if len(route) - 1 == num_stops)

    def shortest_route(self, src, dest):
        """Return the smallest sum of edge costs starting at src, ending at dest (shortest path).

        src - source node
        dest - destination node

        The path always contains at least one edge, so for src == dest the
        result is the cost of the cheapest cycle through src rather than 0.
        Returns float('inf') when dest cannot be reached from src.
        """
        # Dijkstra with unusual start condition to prevent src -> src == 0 distance:
        # the frontier is seeded with the out-edges of src instead of src itself.
        tentative = dict(self.G.get(src, {}))  # node -> best known distance, not yet final
        settled = set()  # nodes whose shortest distance is final
        while tentative:
            # optimize large/dense G with pri. q
            node = min(tentative, key=tentative.get)
            best = tentative.pop(node)
            if node == dest:
                return best
            settled.add(node)
            for nxt, cost in self.G.get(node, {}).items():
                if nxt in settled:
                    continue
                candidate = best + cost
                if candidate < tentative.get(nxt, float('inf')):
                    tentative[nxt] = candidate
        return float('inf')