Skip to content

Commit 572ece7

Browse files
authored
Add StreamDispatcher (#684)
* Reworked the async code and addressed Andy comments Signed-off-by: Dscano <d.scano89@gmail.com> * Fix streamChannel, messages received on dedicated queues Signed-off-by: Dscano <d.scano89@gmail.com> --------- Signed-off-by: Dscano <d.scano89@gmail.com>
1 parent fd0cc21 commit 572ece7

5 files changed

Lines changed: 127 additions & 59 deletions

File tree

exercises/flowcache/flowcache.p4

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ control MyIngress(inout headers_t hdr,
245245
} else {
246246
// This is a toy demo. It drops all packets that are not
247247
// IPv4, nor PacketOut packets from the controller.
248-
// dropped because they are not IPv4.
248+
drop_packet();
249249
}
250250
}
251251
}

exercises/flowcache/mycontroller.py

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
import sys
66
import asyncio
77
import traceback
8+
import time
89

9-
from collections import deque
1010
from collections import Counter
1111
from scapy.all import *
1212

@@ -180,10 +180,24 @@ def addFlowRule( ingress_sw, src_ip_addr, dst_ip_addr, protocol, port, new_dscp,
180180
ingress_sw.WriteTableEntry(table_entry)
181181
print("Installed ingress rule on %s" % ingress_sw.name)
182182

183+
def deleteFlowRule(notif):
184+
notif["sw"].DeleteTableEntry(notif["idle"].table_entry[0])
185+
print("Deleted ingress rule on %s" % notif["sw"].name)
186+
187+
def packetOutMetadataList(opcode, reserved1, operand0):
188+
# This function does not use the generated contents of the P4Info
189+
# file to map PacketOut metadata fields to indices. If you change
190+
# the PacketOut metadata format in the P4 program, this code must
191+
# be manually updated to match.
192+
return [{"value": opcode, "bitwidth": 8},
193+
{"value": reserved1, "bitwidth": 8},
194+
{"value": operand0, "bitwidth": 32}]
195+
183196
def sendPacketOut(sw ,payload, metadatas):
184197
# TODO: Implement the function logic to send a packet-out message
198+
sw.PacketOut(payload, metadatas)
185199

186-
async def readTableRules(p4info_helper, sw):
200+
def readTableRules(p4info_helper, sw):
187201
"""
188202
Reads the table entries from all tables on the switch.
189203
@@ -199,7 +213,7 @@ async def readTableRules(p4info_helper, sw):
199213
print(entry)
200214
print('-----')
201215

202-
async def printCounter(p4info_helper, sw, counter_name, index):
216+
def printCounter(p4info_helper, sw, counter_name, index):
203217
"""
204218
Reads the specified counter at the given index from the switch. In our
205219
program, the index is derived from the first 6 bits of the IP destination address.
@@ -224,17 +238,18 @@ async def printCounter(p4info_helper, sw, counter_name, index):
224238

225239
if e.code() == grpc.StatusCode.UNKNOWN:
226240
print(f"Unknown gRPC error from {sw.name}. Retrying...")
227-
await asyncio.sleep(2)
241+
time.sleep(2)
228242

229243
except Exception as e:
230244
print(f"[Unexpected Error in printCounter for {sw.name}]: {e}")
231245
traceback.print_exc()
232-
await asyncio.sleep(2)
246+
time.sleep(2)
233247

234-
async def processPacket(message):
235-
payload = message["packet-in"].packet.payload
236-
packet = message["packet-in"].packet
237-
print("Received %d PacketIn messages" % (len(payload)))
248+
def processPacket(message):
249+
payload = message["packet-in"].payload
250+
packet = message["packet-in"]
251+
print("Received PacketIn message of length %d bytes from switch %s"
252+
"" % (len(payload), message["sw"].name))
238253
if len(payload) > 0:
239254
i = 0
240255
pkt = Ether(payload)
@@ -267,7 +282,9 @@ async def processPacket(message):
267282
new_dscp_int = 5
268283
global_data['index'] = int(pkt[IP].dst.split('.')[3])
269284
dst_eth_addr = global_data[ip_da_str]
270-
metadatas = [{ "value": 0, "bitwidth": 8 }, { "value": 3, "bitwidth": 32}]
285+
metadatas = packetOutMetadataList(
286+
global_data['controller_opcode_name2int']['SEND_TO_PORT_IN_OPERAND0'],
287+
0, dest_port_int)
271288
sendPacketOut(message["sw"], payload, metadatas)
272289
addFlowRule(message["sw"],
273290
src_ip_addr,
@@ -287,21 +304,21 @@ async def processPacket(message):
287304
async def processNotif(notif_queue):
288305
while True:
289306
notif = await notif_queue.get()
290-
307+
print(notif)
291308
if notif["type"] == "packet-in":
292-
await processPacket(notif)
293-
await printCounter(global_data ['p4info_helper'], notif["sw"], 'MyIngress.ingressPktOutCounter', global_data ['index'])
294-
await printCounter(global_data ['p4info_helper'], notif["sw"], 'MyEgress.egressPktInCounter', global_data ['index'])
295-
await readTableRules(global_data ['p4info_helper'], notif["sw"])
309+
processPacket(notif)
310+
printCounter(global_data ['p4info_helper'], notif["sw"], 'MyIngress.ingressPktOutCounter', global_data ['index'])
311+
printCounter(global_data ['p4info_helper'], notif["sw"], 'MyEgress.egressPktInCounter', global_data ['index'])
312+
readTableRules(global_data ['p4info_helper'], notif["sw"])
296313
elif notif["type"] == "idle-notif":
297-
print(notif["idle"])
314+
deleteFlowRule(notif)
298315

299316
notif_queue.task_done()
300317

301318
async def packetInHandler(notif_queue,sw):
319+
# TODO: Implement the function logic to handle a packet-in message
302320
while True:
303321
try:
304-
# TODO: Implement the function logic to handle a packet-in message
305322

306323
except grpc.RpcError as e:
307324
print(f"[gRPC Error in packetInHandler for {sw.name}]")
@@ -318,6 +335,8 @@ async def packetInHandler(notif_queue,sw):
318335

319336
async def idleTimeHandler(notif_queue,sw):
320337
# TODO: Implement the function logic to handle idle timeout notification
338+
while True:
339+
321340

322341
def printGrpcError(e):
323342
print("gRPC Error:", e.details(), end=' ')
@@ -375,6 +394,8 @@ async def main(p4info_file_path, bmv2_file_path):
375394

376395
global_data['punt_reason_name2int'], global_data['punt_reason_int2name'] = \
377396
serializableEnumDict(p4info_helper.p4info, 'PuntReason_t')
397+
global_data['controller_opcode_name2int'], global_data['controller_opcode_int2name'] = \
398+
serializableEnumDict(p4info_helper.p4info, 'ControllerOpcode_t')
378399

379400
try:
380401
replicas = [{ "egress_port": global_data['CPU_PORT'], "instance": 1 }]
@@ -427,4 +448,4 @@ async def main(p4info_file_path, bmv2_file_path):
427448
parser.print_help()
428449
print("\nBMv2 JSON file not found: %s\nHave you run 'make'?" % args.bmv2_json)
429450
parser.exit(1)
430-
asyncio.run(main(args.p4info, args.bmv2_json))
451+
asyncio.run(main(args.p4info, args.bmv2_json))

exercises/flowcache/solution/flowcache.p4

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,15 +81,15 @@ enum bit<8> PuntReason_t {
8181

8282
@controller_header("packet_out")
8383
header packet_out_header_h {
84-
//TODO remove for exercise
84+
/* TODO: Add packet-out fields */
8585
ControllerOpcode_t opcode;
8686
bit<8> reserved1;
8787
bit<32> operand0;
8888
}
8989

9090
@controller_header("packet_in")
9191
header packet_in_header_h {
92-
//TODO remove for exercise
92+
/* TODO: Add packet-in fields */
9393
PortIdToController_t input_port;
9494
PuntReason_t punt_reason;
9595
ControllerOpcode_t opcode;
@@ -225,7 +225,8 @@ control MyIngress(inout headers_t hdr,
225225
drop_packet;
226226
flow_unknown;
227227
}
228-
support_timeout = true; //TODO remove for exercise
228+
/* TODO: Add support timeout */
229+
support_timeout = true;
229230
default_action = flow_unknown();
230231
size = 65536;
231232
}
@@ -251,8 +252,6 @@ control MyIngress(inout headers_t hdr,
251252
} else {
252253
// This is a toy demo. It drops all packets that are not
253254
// IPv4, nor PacketOut packets from the controller.
254-
// TODO: Update per-input-port packet count for packets
255-
// dropped because they are not IPv4.
256255
drop_packet();
257256
}
258257
}

exercises/flowcache/solution/mycontroller.py

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
import sys
66
import asyncio
77
import traceback
8+
import time
89

9-
from collections import deque
1010
from collections import Counter
1111
from scapy.all import *
1212

@@ -175,13 +175,17 @@ def addFlowRule( ingress_sw, src_ip_addr, dst_ip_addr, protocol, port, new_dscp,
175175
"new_dscp": new_dscp,
176176
"dst_eth_addr": dst_eth_addr
177177
},
178-
#TODO remove for the exercise
178+
# TODO: Add idle timeout
179179
idle_timeout_ns = 3 * NSEC_PER_SEC
180180
)
181181
ingress_sw.WriteTableEntry(table_entry)
182182
print("Installed ingress rule on %s" % ingress_sw.name)
183183

184-
def PacketOutMetadataList(opcode, reserved1, operand0):
184+
def deleteFlowRule(notif):
185+
notif["sw"].DeleteTableEntry(notif["idle"].table_entry[0])
186+
print("Deleted ingress rule on %s" % notif["sw"].name)
187+
188+
def packetOutMetadataList(opcode, reserved1, operand0):
185189
# This function does not use the generated contents of the P4Info
186190
# file to map PacketOut metadata fields to indices. If you change
187191
# the PacketOut metadata format in the P4 program, this code must
@@ -191,10 +195,10 @@ def PacketOutMetadataList(opcode, reserved1, operand0):
191195
{"value": operand0, "bitwidth": 32}]
192196

193197
def sendPacketOut(sw ,payload, metadatas):
194-
#TODO remove for exercise
198+
# TODO: Implement the function logic to send a packet-out message
195199
sw.PacketOut(payload, metadatas)
196200

197-
async def readTableRules(p4info_helper, sw):
201+
def readTableRules(p4info_helper, sw):
198202
"""
199203
Reads the table entries from all tables on the switch.
200204
@@ -210,7 +214,7 @@ async def readTableRules(p4info_helper, sw):
210214
print(entry)
211215
print('-----')
212216

213-
async def printCounter(p4info_helper, sw, counter_name, index):
217+
def printCounter(p4info_helper, sw, counter_name, index):
214218
"""
215219
Reads the specified counter at the given index from the switch. In our
216220
program, the index is derived from the first 6 bits of the IP destination address.
@@ -235,16 +239,16 @@ async def printCounter(p4info_helper, sw, counter_name, index):
235239

236240
if e.code() == grpc.StatusCode.UNKNOWN:
237241
print(f"Unknown gRPC error from {sw.name}. Retrying...")
238-
await asyncio.sleep(2)
242+
time.sleep(2)
239243

240244
except Exception as e:
241245
print(f"[Unexpected Error in printCounter for {sw.name}]: {e}")
242246
traceback.print_exc()
243-
await asyncio.sleep(2)
247+
time.sleep(2)
244248

245-
async def processPacket(message):
246-
payload = message["packet-in"].packet.payload
247-
packet = message["packet-in"].packet
249+
def processPacket(message):
250+
payload = message["packet-in"].payload
251+
packet = message["packet-in"]
248252
print("Received PacketIn message of length %d bytes from switch %s"
249253
"" % (len(payload), message["sw"].name))
250254
if len(payload) > 0:
@@ -279,7 +283,7 @@ async def processPacket(message):
279283
new_dscp_int = 5
280284
global_data['index'] = int(pkt[IP].dst.split('.')[3])
281285
dst_eth_addr = global_data[ip_da_str]
282-
metadatas = PacketOutMetadataList(
286+
metadatas = packetOutMetadataList(
283287
global_data['controller_opcode_name2int']['SEND_TO_PORT_IN_OPERAND0'],
284288
0, dest_port_int)
285289
sendPacketOut(message["sw"], payload, metadatas)
@@ -301,23 +305,23 @@ async def processPacket(message):
301305
async def processNotif(notif_queue):
302306
while True:
303307
notif = await notif_queue.get()
304-
308+
print(notif)
305309
if notif["type"] == "packet-in":
306-
await processPacket(notif)
307-
await printCounter(global_data ['p4info_helper'], notif["sw"], 'MyIngress.ingressPktOutCounter', global_data ['index'])
308-
await printCounter(global_data ['p4info_helper'], notif["sw"], 'MyEgress.egressPktInCounter', global_data ['index'])
309-
await readTableRules(global_data ['p4info_helper'], notif["sw"])
310+
processPacket(notif)
311+
printCounter(global_data ['p4info_helper'], notif["sw"], 'MyIngress.ingressPktOutCounter', global_data ['index'])
312+
printCounter(global_data ['p4info_helper'], notif["sw"], 'MyEgress.egressPktInCounter', global_data ['index'])
313+
readTableRules(global_data ['p4info_helper'], notif["sw"])
310314
elif notif["type"] == "idle-notif":
311-
print(notif["idle"])
315+
deleteFlowRule(notif)
312316

313317
notif_queue.task_done()
314318

315319
async def packetInHandler(notif_queue,sw):
316-
#TODO remove for exercise
320+
# TODO: Implement the function logic to handle a packet-in message
317321
while True:
318322
try:
319323
packet_in = await asyncio.to_thread(sw.PacketIn)
320-
print(f"Received packet: {packet_in.packet}")
324+
print(f"Received packet: {packet_in}")
321325
message = {"type": "packet-in", "sw": sw, "packet-in": packet_in}
322326
await notif_queue.put(message)
323327

@@ -335,10 +339,12 @@ async def packetInHandler(notif_queue,sw):
335339
await asyncio.sleep(2)
336340

337341
async def idleTimeHandler(notif_queue,sw):
338-
#TODO remove for exercise
339-
idle_notif = await asyncio.to_thread(sw.IdleTimeoutNotification)
340-
message = {"type": "idle-notif", "sw": sw, "idle": idle_notif}
341-
await notif_queue.put(message)
342+
# TODO: Implement the function logic to handle idle timeout notification
343+
while True:
344+
idle_notif = await asyncio.to_thread(sw.IdleTimeoutNotification)
345+
message = {"type": "idle-notif", "sw": sw, "idle": idle_notif}
346+
await notif_queue.put(message)
347+
await asyncio.sleep(5)
342348

343349
def printGrpcError(e):
344350
print("gRPC Error:", e.details(), end=' ')

0 commit comments

Comments
 (0)