-
Notifications
You must be signed in to change notification settings - Fork 7
/
switch.py
229 lines (191 loc) · 7.79 KB
/
switch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import netaddr
import logging
import time
import datetime
from threading import Thread
from ryu.topology import switches
from ryu.topology.switches import Port as Port_type
from ryu.lib.dpid import dpid_to_str
from ryu.lib.port_no import port_no_to_str
from ryu.lib import hub
from ryu.ofproto.ofproto_v1_0_parser import OFPPhyPort
from ryu.lib import ofctl_v1_0
from rip import RIPRoutingTable as RoutingTable
from rip import RIPRoutingEntry as RoutingEntry
from gateway import Gateway
# Lifetimes passed as idle_timeout / hard_timeout on every FlowMod this
# module sends (OpenFlow expresses both in seconds).
FLOW_HARD_TIMEOUT = 600
FLOW_IDLE_TIMEOUT = 60

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class Switch(switches.Switch):
    """
    Topology switch extended with per-switch routing state.

    On top of the Ryu topology ``Switch`` every instance keeps its own
    port map, an ARP table, a buffer for packets that are waiting for an
    ARP entry, a queue of routing tables received from neighbors and a RIP
    routing table.  Constructing an instance starts a daemon thread that
    periodically advertises the routing table to neighboring switches.

    Two switches compare equal (and hash equal) when their datapath ids
    match, so instances can safely be used as dictionary keys
    (``neighbors`` is keyed by Switch).
    """

    def __init__(self, dp, s):
        """
        Initialise the routing state and start the advertising thread.

        dp -- datapath passed to the base class; ``dp.id`` is used as dpid.
        s  -- shared mapping ``switches[dpid] = Switch``; kept by reference
              (owned by routing.py according to the original comment).
        """
        super(Switch, self).__init__(dp)
        self.name = None

        # switches[dpid] = Switch
        # reference to routing.py
        self.switches = s

        # neighbors[Switch] = port_no
        self.neighbors = {}

        # ports[port_no] = Port
        # overshadows the ``ports`` attribute set by the base class
        self.ports = {}

        # ARP table
        # ip_to_mac[ip_addr] = (mac_addr, time_stamp)
        self.ip_to_mac = {}

        # temporarily stores packets without an ARP entry
        self.msg_buffer = []

        # (port, routing table) tuples pushed by neighbors via add_to_queue()
        self.queue = hub.Queue()

        self.tbl = RoutingTable(self.dp.id)

        self.init_thread()

    def init_thread(self):
        """
        Create and start the daemon thread that advertises the routing
        table.
        """
        logger.info('broadcast thread start with interval %ds (dpid=%s)',
                    self.tbl.advertise_interval, dpid_to_str(self.dp.id))
        thread = Thread(target=self.broadcast_thread)
        # ``daemon`` attribute instead of the deprecated setDaemon().
        thread.daemon = True
        thread.start()

    def broadcast_thread(self):
        """
        Infinite loop that advertises the routing table to all the
        neighbors and triggers each neighbor switch to update its routing
        information instantly.

        The loop sleeps ``self.tbl.advertise_interval`` between rounds and
        terminates (after logging the traceback) on the first exception.
        """
        while True:
            try:
                logger.info('broadcast routing table (dpid=%s)',
                            dpid_to_str(self.dp.id))
                # Iterate over a snapshot: self.ports may be modified by
                # other threads while the advertisement is in progress.
                for port in list(self.ports.values()):
                    neighbor_dpid = port.neighbor_switch_dpid
                    if neighbor_dpid:
                        neighbor = self.switches[neighbor_dpid]
                        neighbor.add_to_queue((port, self.tbl))
                        neighbor.trigger_update()
                time.sleep(self.tbl.advertise_interval)
            except Exception:
                # Keep the original "stop on error" behavior, but record
                # why the thread died instead of hiding the cause.
                logger.exception('broadcast thread of dpid=%s is killed',
                                 dpid_to_str(self.dp.id))
                break

    def process_queued_msg(self):
        """
        Try to process all the queued routing information.

        Every queued item is a ``(port, tbl)`` tuple where ``port`` is the
        neighbor's sending port and ``tbl`` the neighbor's routing table.
        The local port on which it was received is looked up through the
        port's ``neighbor_switch_dpid`` / ``neighbor_port_no`` attributes.
        Processing is best effort: a failure is logged and ends this run.
        """
        try:
            while not self.queue.empty():
                port, tbl = self.queue.get()
                peer = self.switches[port.neighbor_switch_dpid]
                received_port = peer.ports[port.neighbor_port_no]
                self.tbl.update_by_neighbor(received_port, port, tbl)
                self.deploy_routing_table()
        except Exception:
            logger.exception('fail to process queued routing information '
                             '(dpid=%s)', dpid_to_str(self.dp.id))

    def add_to_queue(self, msg):
        """
        Interface to add an object to the queue; the object is dropped
        when the queue reports itself as full.
        """
        if not self.queue.full():
            self.queue.put(msg)

    def trigger_update(self):
        """
        Create a daemon thread that processes the queued routing
        information and updates the routing table.
        """
        thread = Thread(target=self.process_queued_msg)
        thread.daemon = True
        thread.start()

    def get_arp_list(self):
        """
        Return the ARP table as a list of dictionaries with the keys
        'ip', 'hw_addr' and 'last_update' (formatted time stamp).
        """
        return [
            {'ip': str(ip),
             'hw_addr': str(value[0]),
             'last_update': datetime.datetime.fromtimestamp(
                 value[1]).strftime('%Y-%m-%d %H:%M:%S')}
            for ip, value in self.ip_to_mac.items()
        ]

    def get_routing_table(self):
        """
        Return the routing table as a list of dictionaries; each entry's
        own dictionary is extended with a 'subnet' key.
        """
        routing_tbl = []
        for subnet, entry in self.tbl.items():
            d = entry.to_dict()
            d['subnet'] = str(subnet)
            routing_tbl.append(d)
        return routing_tbl

    def deploy_routing_table(self):
        """
        Deploy every routing entry of the routing table that has a
        neighbor port; entries without one are skipped.
        """
        for subnet, entry in self.tbl.items():
            if entry.neighbor_port:
                self.deploy_flow_entry(subnet=subnet,
                                       outport=entry.receive_port,
                                       dstport=entry.neighbor_port)

    def deploy_flow_entry(self, subnet, outport, dstport):
        """
        Translate the routing information into flow entry format and
        send a FlowMod.

        subnet  -- destination subnet used for the match.
        outport -- local port the traffic leaves through; its hw_addr
                   becomes the new source MAC address.
        dstport -- port on the next hop; its hw_addr becomes the new
                   destination MAC address.

        Nothing is sent (a warning is logged) when either port is None.
        """
        if outport is None:
            logger.warning('fail to deploy flow entry, cant find output port for %s', str(subnet))
            return
        if dstport is None:
            logger.warning('fail to deploy flow entry, cant find destination port for %s', str(subnet))
            return

        parser = self.dp.ofproto_parser

        # match by destination IP address
        # NOTE(review): nw_proto '1' limits the match to ICMP -- confirm
        # that other IP traffic is intentionally left unmatched.
        match = ofctl_v1_0.to_match(self.dp, {'nw_dst': str(subnet),
                                              'dl_type': '2048',
                                              'nw_proto': '1'})

        # rewrite source MAC address with gateway's MAC address
        # rewrite destination MAC address with host's MAC address
        # set output port
        actions = [
            parser.OFPActionSetDlSrc(outport.hw_addr.packed),
            parser.OFPActionSetDlDst(dstport.hw_addr.packed),
            parser.OFPActionOutput(outport.port_no),
        ]

        mod = parser.OFPFlowMod(
            datapath=self.dp, match=match,
            priority=1, cookie=0, actions=actions,
            idle_timeout=FLOW_IDLE_TIMEOUT,
            hard_timeout=FLOW_HARD_TIMEOUT,
            command=self.dp.ofproto.OFPFC_MODIFY)

        # send FlowMod
        self.dp.send_msg(mod)

    def find_outport_by_subnet(self, subnet):
        """
        Return the port_no of the port whose gateway IPv4 subnet equals
        ``subnet``, or None when there is no such port.
        """
        for port_no, port in self.ports.items():
            if port.gateway and port.gateway.ipv4_subnet == subnet:
                return port_no
        return None

    def find_outport_by_ip(self, dst_ip):
        """
        Return the port_no of the port whose gateway IPv4 subnet contains
        the destination IP address, or None when no subnet matches.
        """
        for port_no, port in self.ports.items():
            if port.gateway and dst_ip in port.gateway.ipv4_subnet:
                return port_no
        return None

    def update_gateway_with_prefixlen(self, ipv4='', ipv4_prefixlen=0,
                                      ipv6='', ipv6_prefixlen=0, port_no=''):
        """
        Update the gateway information of a Port object and register its
        IPv4 subnet in the routing table as a "CONNECTED" entry with
        metric 0.

        A new Gateway is created when the port has none yet; otherwise
        the existing one is updated in place.

        Raises KeyError when ``port_no`` is not a key of ``self.ports``.
        """
        port = self.ports[port_no]

        if port.gateway is None:
            port.gateway = Gateway(name=port.name, port_no=port.port_no,
                                   ipv4=ipv4, ipv4_prefixlen=ipv4_prefixlen,
                                   ipv6=ipv6, ipv6_prefixlen=ipv6_prefixlen)
        else:
            gateway = port.gateway
            gateway.name = port.name
            gateway.ipv4 = netaddr.IPAddress(ipv4)
            gateway.ipv4_subnet = netaddr.IPNetwork(ipv4 + '/' + str(ipv4_prefixlen))
            # NOTE(review): with the default ipv6='' the next two lines
            # presumably raise inside netaddr -- confirm callers always
            # pass an IPv6 address when updating an existing gateway.
            gateway.ipv6 = netaddr.IPAddress(ipv6)
            gateway.ipv6_subnet = netaddr.IPNetwork(ipv6 + '/' + str(ipv6_prefixlen))
            gateway.port_no = port.port_no

        self.tbl.update_entry(subnet=port.gateway.ipv4_subnet,
                              receive_port=port, metric=0,
                              source="CONNECTED")

    def to_dict(self):
        """
        Return a dictionary describing the switch: dpid, name, neighbor
        dpids, ports, ARP table and routing table.
        """
        return {'dpid': dpid_to_str(self.dp.id),
                'name': self.name,
                'neighbors': [dpid_to_str(switch.dp.id) for switch in self.neighbors],
                'ports': [port.to_dict() for port in self.ports.values()],
                'arp_table': self.get_arp_list(),
                'routing_table': self.get_routing_table()}

    def __eq__(self, s):
        """
        Two switches are equal when their datapath ids are equal; any
        object without a ``dp.id`` compares unequal.
        """
        try:
            return bool(self.dp.id == s.dp.id)
        except AttributeError:
            return False

    def __ne__(self, s):
        """Inverse of __eq__ (not derived automatically on Python 2)."""
        return not self.__eq__(s)

    def __hash__(self):
        """
        Hash on the datapath id so that hashing agrees with __eq__.

        Defining __eq__ alone makes the class unhashable on Python 3,
        which would break ``neighbors`` (a dict keyed by Switch).
        """
        return hash(self.dp.id)

    def __str__(self):
        return '<Switch: %s>' % self.name