Skip to content

Commit ddf3cbd

Browse files
authored
Fix for issue 685 (#690)
* Fix the Python code to properly close the dispatcher. Signed-off-by: Dscano <d.scano89@gmail.com> * Fix flow rule deletion no GRPC Error Signed-off-by: Dscano <d.scano89@gmail.com> * Added a notification database to prevent gRPC errors caused by multiple deletions Signed-off-by: Dscano <d.scano89@gmail.com> * Address Andy's comments Signed-off-by: Dscano <d.scano89@gmail.com> --------- Signed-off-by: Dscano <d.scano89@gmail.com>
1 parent e8456d2 commit ddf3cbd

4 files changed

Lines changed: 285 additions & 41 deletions

File tree

exercises/flowcache/README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,13 @@ You might notice that the rules that are printed by `mycontroller.py` contain th
165165
IDs rather than the table names. You can use the P4Info helper to translate these IDs
166166
into entry names.
167167

168+
The current mechanism for triggering the deletion of a stale flow rule can be improved.
169+
As shown in the code of `mycontroller.py`, a notification database is maintained to track
170+
the flow rules. Thanks to this, the deletion of a single flow rule is performed only once.
171+
However, there is a need for a mechanism to ensure that the deletion is triggered only once.
172+
Without this safeguard, errors may occur when attempting to delete the same flow rule multiple
173+
times or when trying to delete a flow rule that no longer exists. To address this issue,
174+
you can provide and experiment with different approaches.
168175

169176
If you are interested, you can find the protocol buffer and gRPC definitions here:
170177
- [P4Runtime](https://github.com/p4lang/p4runtime/blob/main/proto/p4/v1/p4runtime.proto)

exercises/flowcache/mycontroller.py

Lines changed: 115 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@
66
import asyncio
77
import traceback
88
import time
9+
import ipaddress
10+
import pprint
911

1012
from collections import Counter
13+
from datetime import datetime, timedelta
1114
from scapy.all import *
1215

1316
import grpc
@@ -34,8 +37,11 @@
3437
global_data["10.0.2.2"] = "08:00:00:00:02:22"
3538
global_data["10.0.3.3"] = "08:00:00:00:03:33"
3639

40+
## The notification database keeps track of the received idle notifications and triggers the deletion of stale flow rules.
41+
notif_db = {}
42+
3743
# The lookup table is defined to simplify reachability and provide connectivity
38-
# among hosts. In a real-world scenario, however, you should use an algorithm
44+
# among hosts. In a real-world scenario, however, you should use an algorithm
3945
# to solve this problem more effectively.
4046

4147
lookup_table = {
@@ -67,6 +73,23 @@ def ipv4ToInt(addr):
6773
# to add a separate check for that here.
6874
return int.from_bytes(bytes(bytes_), byteorder='big')
6975

76+
def intToIpv4(n):
77+
"""Take an argument 'n' containing a 32-bit IPv4 address as an
78+
integer in the range [0, 2^32-1], and return a string in dotted
79+
decimal notation."""
80+
return "%d.%d.%d.%d" % ((n >> 24) & 0xff,
81+
(n >> 16) & 0xff,
82+
(n >> 8) & 0xff,
83+
n & 0xff)
84+
85+
def flowCacheEntryToDebugStr(table_entry, include_action=False):
86+
# TODO: The match fields are hardcoded to specific indices to retrieve specific parameters, such as hdr.ipv4.srcAddr and its value.
87+
src_ip = intToIpv4(int.from_bytes(table_entry.match[1].exact.value, byteorder='big'))
88+
dst_ip = intToIpv4(int.from_bytes(table_entry.match[2].exact.value, byteorder='big'))
89+
proto = int.from_bytes(table_entry.match[0].exact.value, byteorder='big')
90+
return ("(SA=%s, DA=%s, proto=%d)"
91+
"" % (src_ip, dst_ip, proto))
92+
7093
def decodePacketInMetadata(pktin_info, packet):
7194
pktin_field_to_val = {}
7295
for md in packet.metadata:
@@ -119,6 +142,7 @@ def controllerPacketMetadataDictKeyId(p4info_obj_map, name):
119142
ret[md.id] = {'id': md.id, 'name': md.name, 'bitwidth': md.bitwidth}
120143
return ret
121144

145+
122146
def makeP4infoObjMap(p4info_data):
123147
p4info_obj_map = {}
124148
suffix_count = Counter()
@@ -178,11 +202,59 @@ def addFlowRule( ingress_sw, src_ip_addr, dst_ip_addr, protocol, port, new_dscp,
178202
# TODO: Add idle timeout
179203
)
180204
ingress_sw.WriteTableEntry(table_entry)
181-
print("Installed ingress rule on %s" % ingress_sw.name)
182205

183-
def deleteFlowRule(notif):
184-
notif["sw"].DeleteTableEntry(notif["idle"].table_entry[0])
185-
print("Deleted ingress rule on %s" % notif["sw"].name)
206+
def createFlowRule(notif):
207+
# TODO: This function generates a flow entry to trigger deletion.
208+
# The match fields are populated using values retrieved from the IDLE notification.
209+
# Hardcoded values are configured to extract specific parameters, such as hdr.ipv4.protocol, from the IDLE notification.
210+
table_entry = global_data['p4info_helper'].buildTableEntry(
211+
table_name="MyIngress.flow_cache",
212+
match_fields={
213+
"hdr.ipv4.protocol": int.from_bytes(notif["idle"].table_entry[0].match[0].exact.value,byteorder='big'),
214+
"hdr.ipv4.srcAddr": int(ipaddress.IPv4Address(notif["idle"].table_entry[0].match[1].exact.value)),
215+
"hdr.ipv4.dstAddr": int(ipaddress.IPv4Address(notif["idle"].table_entry[0].match[2].exact.value))
216+
},
217+
)
218+
return table_entry
219+
220+
def deleteFlowRule(sw, table_entry):
221+
sw.DeleteTableEntry(table_entry)
222+
print("Deleted flow_cache entry on %s. %s"
223+
"" % (sw.name, flowCacheEntryToDebugStr(table_entry)))
224+
225+
def addNotification(sw_name, flow_rule):
226+
# Add notification to notification DB
227+
notification = {
228+
"timestamp": datetime.now(),
229+
"flow_rule": flow_rule,
230+
}
231+
notif_db[sw_name].append(notification)
232+
233+
def checkFlowRule(sw_name, flow_rule):
234+
# Checks if a flow rule is already in the notification DB
235+
# to avoid storing multiple notifications for the same flow rule
236+
if sw_name not in notif_db:
237+
return False
238+
239+
for notif in notif_db[sw_name]:
240+
if notif["flow_rule"] == flow_rule:
241+
return True
242+
243+
return False
244+
245+
def isExpired(timestamp, timeout):
246+
return datetime.now() - timestamp > timedelta(seconds=timeout)
247+
248+
def cleanExpiredNotifiction(sw_name, timeout=5):
249+
# Removes expired notifications
250+
if sw_name not in notif_db:
251+
return False
252+
# Filter the notifications to remove expired ones
253+
notif_db[sw_name] = [
254+
notif for notif in notif_db[sw_name]
255+
if not isExpired(notif["timestamp"], timeout)
256+
]
257+
return True
186258

187259
def packetOutMetadataList(opcode, reserved1, operand0):
188260
# This function does not use the generated contents of the P4Info
@@ -195,7 +267,6 @@ def packetOutMetadataList(opcode, reserved1, operand0):
195267

196268
def sendPacketOut(sw ,payload, metadatas):
197269
# TODO: Implement the function logic to send a packet-out message
198-
sw.PacketOut(payload, metadatas)
199270

200271
def readTableRules(p4info_helper, sw):
201272
"""
@@ -295,30 +366,54 @@ def processPacket(message):
295366
decrement_ttl_bool,
296367
dst_eth_addr)
297368

298-
print("For flow (SA=%s, DA=%s, proto=%d)"
369+
print("For switch %s flow (SA=%s, DA=%s, proto=%d)"
299370
" added table entry to send packets"
300371
" to port %d with new DSCP %d"
301-
"" % (ip_sa_str, ip_da_str, ip_proto,
302-
dest_port_int, new_dscp_int))
372+
"" % (message["sw"].name, ip_sa_str, ip_da_str,
373+
ip_proto, dest_port_int, new_dscp_int))
303374

304375
async def processNotif(notif_queue):
305376
while True:
306377
notif = await notif_queue.get()
307-
print(notif)
378+
debug_notif = False
379+
if debug_notif:
380+
print(notif)
381+
pprint.pprint(notif_db)
308382
if notif["type"] == "packet-in":
309383
processPacket(notif)
310384
printCounter(global_data ['p4info_helper'], notif["sw"], 'MyIngress.ingressPktOutCounter', global_data ['index'])
311385
printCounter(global_data ['p4info_helper'], notif["sw"], 'MyEgress.egressPktInCounter', global_data ['index'])
312-
readTableRules(global_data ['p4info_helper'], notif["sw"])
386+
if debug_notif:
387+
readTableRules(global_data ['p4info_helper'], notif["sw"])
313388
elif notif["type"] == "idle-notif":
314-
deleteFlowRule(notif)
315-
389+
# TODO: For extra credit, you can experiment with adjusting the stale time for notifications (e.g., 10 seconds)
390+
# and optimize the behavior of the notification database (notif_db).
391+
if notif["sw"].name not in notif_db:
392+
notif_db[notif["sw"].name] = []
393+
else:
394+
# Check if a notification is older than 10 seconds
395+
cleanExpiredNotifiction(notif["sw"].name, 10)
396+
397+
table_entry = createFlowRule(notif)
398+
399+
if not checkFlowRule(notif["sw"].name, table_entry):
400+
addNotification(notif["sw"].name, table_entry)
401+
deleteFlowRule(notif["sw"], table_entry)
402+
else:
403+
print("Received idle timeout notification for switch=%s %s"
404+
" It is duplicate of recently processed notification, so ignoring it."
405+
"" % (notif["sw"].name,
406+
flowCacheEntryToDebugStr(table_entry)))
316407
notif_queue.task_done()
317408

318409
async def packetInHandler(notif_queue,sw):
319410
# TODO: Implement the function logic to handle a packet-in message
320411
while True:
321412
try:
413+
packet_in = await asyncio.to_thread(sw.PacketIn)
414+
#print(f"Received packet: {packet_in}")
415+
message = {"type": "packet-in", "sw": sw, "packet-in": packet_in}
416+
await notif_queue.put(message)
322417

323418
except grpc.RpcError as e:
324419
print(f"[gRPC Error in packetInHandler for {sw.name}]")
@@ -336,7 +431,10 @@ async def packetInHandler(notif_queue,sw):
336431
async def idleTimeHandler(notif_queue,sw):
337432
# TODO: Implement the function logic to handle idle timeout notification
338433
while True:
339-
434+
idle_notif = await asyncio.to_thread(sw.IdleTimeoutNotification)
435+
message = {"type": "idle-notif", "sw": sw, "idle": idle_notif}
436+
await notif_queue.put(message)
437+
await asyncio.sleep(5)
340438

341439
def printGrpcError(e):
342440
print("gRPC Error:", e.details(), end=' ')
@@ -426,7 +524,9 @@ async def main(p4info_file_path, bmv2_file_path):
426524
except KeyboardInterrupt:
427525
print(" Shutting down.")
428526
except grpc.RpcError as e:
429-
printGrpcError(e)
527+
print(f"gRPC error occurred: {e}")
528+
print(f"Status code: {e.code()}") # e.g., StatusCode.UNAVAILABLE or StatusCode.INVALID_ARGUMENT
529+
print(f"Details: {e.details()}")
430530

431531
ShutdownAllSwitchConnections()
432532

0 commit comments

Comments
 (0)