1616from abc import abstractmethod
1717from datetime import datetime
1818from queue import Queue
19- import threading
2019
2120import grpc
2221from p4 .tmp import p4config_pb2
@@ -31,36 +30,6 @@ def ShutdownAllSwitchConnections():
3130 for c in connections :
3231 c .shutdown ()
3332
34- class StreamDispatcher :
35- def __init__ (self , stream ):
36- self .stream = stream
37- self .running = True
38- # Queues for each message type
39- self .arbitration_queue = Queue ()
40- self .packet_in_queue = Queue ()
41- self .timeout_queue = Queue ()
42- self .error_queue = Queue ()
43-
44- self .thread = threading .Thread (target = self ._dispatch_loop , daemon = True )
45- self .thread .start ()
46-
47- def _dispatch_loop (self ):
48- for msg in self .stream :
49- if not self .running :
50- break
51- if msg .HasField ("arbitration" ):
52- self .arbitration_queue .put (msg .arbitration )
53- elif msg .HasField ("packet" ):
54- self .packet_in_queue .put (msg .packet )
55- elif msg .HasField ("idle_timeout_notification" ):
56- self .timeout_queue .put (msg .idle_timeout_notification )
57- elif msg .HasField ("error" ):
58- self .error_queue .put (msg .error )
59- else :
60- print ("Unknown StreamMessageResponse:" , msg )
61-
62- def stop (self ):
63- self .running = False
6433class SwitchConnection (object ):
6534
6635 def __init__ (self , name = None , address = '127.0.0.1:50051' , device_id = 0 ,
@@ -76,7 +45,6 @@ def __init__(self, name=None, address='127.0.0.1:50051', device_id=0,
7645 self .client_stub = p4runtime_pb2_grpc .P4RuntimeStub (self .channel )
7746 self .requests_stream = IterableQueue ()
7847 self .stream_msg_resp = self .client_stub .StreamChannel (iter (self .requests_stream ))
79- self .dispatcher = StreamDispatcher (self .stream_msg_resp )
8048 self .proto_dump_file = proto_dump_file
8149 connections .append (self )
8250
@@ -98,7 +66,8 @@ def MasterArbitrationUpdate(self, dry_run=False, **kwargs):
9866 print ("P4Runtime MasterArbitrationUpdate: " , request )
9967 else :
10068 self .requests_stream .put (request )
101- return self .dispatcher .arbitration_queue .get ()
69+ for item in self .stream_msg_resp :
70+ return item # just one
10271
10372 def SetForwardingPipelineConfig (self , p4info , dry_run = False , ** kwargs ):
10473 device_config = self .buildDeviceConfig (** kwargs )
@@ -131,18 +100,6 @@ def WriteTableEntry(self, table_entry, dry_run=False):
131100 else :
132101 self .client_stub .Write (request )
133102
134- def DeleteTableEntry (self , table_entry , dry_run = False ):
135- request = p4runtime_pb2 .WriteRequest ()
136- request .device_id = self .device_id
137- request .election_id .low = 1
138- update = request .updates .add ()
139- update .type = p4runtime_pb2 .Update .DELETE
140- update .entity .table_entry .CopyFrom (table_entry )
141- if dry_run :
142- print ("P4Runtime Write:" , request )
143- else :
144- self .client_stub .Write (request )
145-
146103 def ReadTableEntries (self , table_id = None , dry_run = False ):
147104 request = p4runtime_pb2 .ReadRequest ()
148105 request .device_id = self .device_id
@@ -188,11 +145,14 @@ def WritePREEntry(self, pre_entry, dry_run=False):
188145 self .client_stub .Write (request )
189146
190147 def PacketIn (self , dry_run = False ):
191- request = self . dispatcher . packet_in_queue . get ()
148+ request = p4runtime_pb2 . StreamMessageRequest ()
192149 if dry_run :
193150 print ("P4 Runtime PacketIn: " , request )
194151 else :
195- return request
152+ self .requests_stream .put (request )
153+ for item in self .stream_msg_resp :
154+ print (item )
155+ return item
196156
197157 def PacketOut (self , payload , metadatas ):
198158 packet_out = p4runtime_pb2 .PacketOut ()
@@ -212,12 +172,10 @@ def PacketOut(self, payload, metadatas):
212172 request .packet .CopyFrom (packet_out )
213173 self .requests_stream .put (request )
214174
215- def IdleTimeoutNotification (self , dry_run = False ):
216- msg = self .dispatcher .timeout_queue .get ()
217- if dry_run :
218- print ("P4 Runtime PacketIn: " , msg )
219- else :
220- return msg
175+ def IdleTimeoutNotification (self ):
176+ msg = p4runtime_pb2 .IdleTimeoutNotification ()
177+ return msg
178+
221179class GrpcRequestLogger (grpc .UnaryUnaryClientInterceptor ,
222180 grpc .UnaryStreamClientInterceptor ):
223181 """Implementation of a gRPC interceptor that logs request to a file"""
0 commit comments