@@ -46,6 +46,13 @@ async def convert_script(
4646
4747 # Setup the group chat for the agents
4848 chat = CommsManager (sql_agents .idx_agents ).group_chat
49+ #retry logic comms manager
50+ comms_manager = CommsManager (
51+ sql_agents .idx_agents ,
52+ max_retries = 5 , # Retry up to 5 times for rate limits
53+ initial_delay = 1.0 , # Start with 1 second delay
54+ backoff_factor = 2.0 , # Double delay each retry
55+ )
4956
5057 # send websocket notification that file processing has started
5158 send_status_update (
@@ -63,160 +70,211 @@ async def convert_script(
6370 current_migration = "No migration"
6471 is_complete : bool = False
6572 while not is_complete :
66- await chat .add_chat_message (
73+ await comms_manager . group_chat .add_chat_message (
6774 ChatMessageContent (role = AuthorRole .USER , content = source_script )
6875 )
6976 carry_response = None
70- async for response in chat .invoke ():
71- # TEMPORARY: awaiting bug fix for rate limits
72- await asyncio .sleep (5 )
73- carry_response = response
74- if response .role == AuthorRole .ASSISTANT .value :
75- # Our process can terminate with either of these as the last response
76- # before syntax check
77- match response .name :
78- case AgentType .MIGRATOR .value :
79- result = MigratorResponse .model_validate_json (
80- response .content or ""
81- )
82- if result .input_error or result .rai_error :
83- # If there is an error in input, we end the processing here.
84- # We do not include this in termination to avoid forking the chat process.
85- description = {
86- "role" : response .role ,
87- "name" : response .name or "*" ,
88- "content" : response .content ,
89- }
90- await batch_service .create_file_log (
91- str (file .file_id ),
92- description ,
93- current_migration ,
94- LogType .ERROR ,
95- AgentType (response .name ),
96- AuthorRole (response .role ),
77+ try :
78+
79+ async for response in comms_manager .async_invoke ():
80+ # TEMPORARY: awaiting bug fix for rate limits
81+ # await asyncio.sleep(5)
82+ carry_response = response
83+ if response .role == AuthorRole .ASSISTANT .value :
84+ # Our process can terminate with either of these as the last response
85+ # before syntax check
86+ match response .name :
87+ case AgentType .MIGRATOR .value :
88+ result = MigratorResponse .model_validate_json (
89+ response .content or ""
9790 )
98- current_migration = None
99- break
100- case AgentType .SYNTAX_CHECKER .value :
101- result = SyntaxCheckerResponse .model_validate_json (
102- response .content .lower () or ""
103- )
104- # If there are no syntax errors, we can move to the semantic verifier
105- # We provide both scripts by injecting them into the chat history
106- if result .syntax_errors == []:
107- chat .history .add_message (
108- ChatMessageContent (
109- role = AuthorRole .USER ,
110- name = "candidate" ,
111- content = (
112- f"source_script: { source_script } , \n "
113- + f"migrated_script: { current_migration } "
114- ),
91+ if result .input_error or result .rai_error :
92+ # If there is an error in input, we end the processing here.
93+ # We do not include this in termination to avoid forking the chat process.
94+ description = {
95+ "role" : response .role ,
96+ "name" : response .name or "*" ,
97+ "content" : response .content ,
98+ }
99+ await batch_service .create_file_log (
100+ str (file .file_id ),
101+ description ,
102+ current_migration ,
103+ LogType .ERROR ,
104+ AgentType (response .name ),
105+ AuthorRole (response .role ),
115106 )
107+ current_migration = None
108+ break
109+ case AgentType .SYNTAX_CHECKER .value :
110+ result = SyntaxCheckerResponse .model_validate_json (
111+ response .content .lower () or ""
116112 )
117- case AgentType .PICKER .value :
118- result = PickerResponse .model_validate_json (
119- response .content or ""
120- )
121- current_migration = result .picked_query
122- case AgentType .FIXER .value :
123- result = FixerResponse .model_validate_json (
124- response .content or ""
125- )
126- current_migration = result .fixed_query
127- case AgentType .SEMANTIC_VERIFIER .value :
128- logger .info (
129- "Semantic verifier agent response: %s" , response .content
130- )
131- result = SemanticVerifierResponse .model_validate_json (
132- response .content or ""
133- )
134-
135- # If the semantic verifier agent returns a difference, we need to report it
136- if len (result .differences ) > 0 :
137- description = {
138- "role" : AuthorRole .ASSISTANT .value ,
139- "name" : AgentType .SEMANTIC_VERIFIER .value ,
140- "content" : "\n " .join (result .differences ),
141- }
142- logger .info (
143- "Semantic verification had issues. Pass with warnings."
144- )
145- # send status update to the client of type in progress with agent status
146- send_status_update (
147- status = FileProcessUpdate (
148- file .batch_id ,
149- file .file_id ,
150- ProcessStatus .COMPLETED ,
151- AgentType .SEMANTIC_VERIFIER ,
152- result .summary ,
153- FileResult .WARNING ,
154- ),
113+ # If there are no syntax errors, we can move to the semantic verifier
114+ # We provide both scripts by injecting them into the chat history
115+ if result .syntax_errors == []:
116+ comms_manager .group_chat .history .add_message (
117+ ChatMessageContent (
118+ role = AuthorRole .USER ,
119+ name = "candidate" ,
120+ content = (
121+ f"source_script: { source_script } , \n "
122+ + f"migrated_script: { current_migration } "
123+ ),
124+ )
125+ )
126+ case AgentType .PICKER .value :
127+ result = PickerResponse .model_validate_json (
128+ response .content or ""
155129 )
156- await batch_service .create_file_log (
157- str (file .file_id ),
158- description ,
159- current_migration ,
160- LogType .WARNING ,
161- AgentType .SEMANTIC_VERIFIER ,
162- AuthorRole .ASSISTANT ,
130+ current_migration = result .picked_query
131+ case AgentType .FIXER .value :
132+ result = FixerResponse .model_validate_json (
133+ response .content or ""
163134 )
164-
165- elif response == "" :
166- # If the semantic verifier agent returns an empty response
135+ current_migration = result .fixed_query
136+ case AgentType .SEMANTIC_VERIFIER .value :
167137 logger .info (
168- "Semantic verification had no return value. Pass with warnings."
138+ "Semantic verifier agent response: %s" , response . content
169139 )
170- # send status update to the client of type in progress with agent status
171- send_status_update (
172- status = FileProcessUpdate (
173- file .batch_id ,
174- file .file_id ,
175- ProcessStatus .COMPLETED ,
140+ result = SemanticVerifierResponse .model_validate_json (
141+ response .content or ""
142+ )
143+
144+ # If the semantic verifier agent returns a difference, we need to report it
145+ if len (result .differences ) > 0 :
146+ description = {
147+ "role" : AuthorRole .ASSISTANT .value ,
148+ "name" : AgentType .SEMANTIC_VERIFIER .value ,
149+ "content" : "\n " .join (result .differences ),
150+ }
151+ logger .info (
152+ "Semantic verification had issues. Pass with warnings."
153+ )
154+ # send status update to the client of type in progress with agent status
155+ send_status_update (
156+ status = FileProcessUpdate (
157+ file .batch_id ,
158+ file .file_id ,
159+ ProcessStatus .COMPLETED ,
160+ AgentType .SEMANTIC_VERIFIER ,
161+ result .summary ,
162+ FileResult .WARNING ,
163+ ),
164+ )
165+ await batch_service .create_file_log (
166+ str (file .file_id ),
167+ description ,
168+ current_migration ,
169+ LogType .WARNING ,
176170 AgentType .SEMANTIC_VERIFIER ,
171+ AuthorRole .ASSISTANT ,
172+ )
173+
174+ elif response == "" :
175+ # If the semantic verifier agent returns an empty response
176+ logger .info (
177+ "Semantic verification had no return value. Pass with warnings."
178+ )
179+ # send status update to the client of type in progress with agent status
180+ send_status_update (
181+ status = FileProcessUpdate (
182+ file .batch_id ,
183+ file .file_id ,
184+ ProcessStatus .COMPLETED ,
185+ AgentType .SEMANTIC_VERIFIER ,
186+ "No return value from semantic verifier agent." ,
187+ FileResult .WARNING ,
188+ ),
189+ )
190+ await batch_service .create_file_log (
191+ str (file .file_id ),
177192 "No return value from semantic verifier agent." ,
178- FileResult .WARNING ,
179- ),
180- )
181- await batch_service .create_file_log (
182- str (file .file_id ),
183- "No return value from semantic verifier agent." ,
184- current_migration ,
185- LogType .WARNING ,
186- AgentType .SEMANTIC_VERIFIER ,
187- AuthorRole .ASSISTANT ,
188- )
193+ current_migration ,
194+ LogType .WARNING ,
195+ AgentType .SEMANTIC_VERIFIER ,
196+ AuthorRole .ASSISTANT ,
197+ )
189198
190- description = {
191- "role" : response .role ,
192- "name" : response .name or "*" ,
193- "content" : response .content ,
194- }
199+ description = {
200+ "role" : response .role ,
201+ "name" : response .name or "*" ,
202+ "content" : response .content ,
203+ }
195204
196- logger .info (description )
205+ logger .info (description )
197206
198- # send status update to the client of type in progress with agent status
207+ # send status update to the client of type in progress with agent status
208+ # send_status_update(
209+ # status=FileProcessUpdate(
210+ # file.batch_id,
211+ # file.file_id,
212+ # ProcessStatus.IN_PROGRESS,
213+ # AgentType(response.name),
214+ # json.loads(response.content)["summary"],
215+ # FileResult.INFO,
216+ # ),
217+ # )
218+ # Safely parse response content to avoid crashing on malformed or incomplete JSON
219+ #start
220+ try :
221+ parsed_content = json .loads (response .content or "{}" )
222+ except json .JSONDecodeError :
223+ logger .warning ("Invalid JSON from agent: %s" , response .content )
224+ parsed_content = {
225+ "input_summary" : "" ,
226+ "candidates" : [],
227+ "summary" : "" ,
228+ "input_error" : "" ,
229+ "rai_error" : "Invalid JSON from agent." ,
230+ }
231+
232+ # Send status update using safe fallback values
233+ send_status_update (
234+ status = FileProcessUpdate (
235+ file .batch_id ,
236+ file .file_id ,
237+ ProcessStatus .IN_PROGRESS ,
238+ AgentType (response .name ),
239+ parsed_content .get ("summary" , "" ),
240+ FileResult .INFO ,
241+ ),
242+ )
243+ ##end
244+ await batch_service .create_file_log (
245+ str (file .file_id ),
246+ description ,
247+ current_migration ,
248+ LogType .INFO ,
249+ AgentType (response .name ),
250+ AuthorRole (response .role ),
251+ )
252+ except Exception as e :
253+ #logger.error("Error during chat.invoke(): %s", str(e))
254+ logger .error ("Error during comms_manager.async_invoke(): %s" , str (e ))
255+ # Log the error to the batch service for tracking
256+ await batch_service .create_file_log (
257+ str (file .file_id ),
258+ f"Critical error during agent communication: { str (e )} " ,
259+ current_migration ,
260+ LogType .ERROR ,
261+ AgentType .ALL ,
262+ AuthorRole .ASSISTANT ,
263+ )
264+ # Send error status update
199265 send_status_update (
200266 status = FileProcessUpdate (
201267 file .batch_id ,
202268 file .file_id ,
203- ProcessStatus .IN_PROGRESS ,
204- AgentType ( response . name ) ,
205- json . loads ( response . content )[ "summary" ] ,
206- FileResult .INFO ,
269+ ProcessStatus .COMPLETED ,
270+ AgentType . ALL ,
271+ f"Processing failed: { str ( e ) } " ,
272+ FileResult .ERROR ,
207273 ),
208274 )
275+ break # Exit the while loop on critical error
209276
210- await batch_service .create_file_log (
211- str (file .file_id ),
212- description ,
213- current_migration ,
214- LogType .INFO ,
215- AgentType (response .name ),
216- AuthorRole (response .role ),
217- )
218-
219- if chat .is_complete :
277+ if comms_manager .group_chat .is_complete :
220278 is_complete = True
221279
222280 break
@@ -298,4 +356,4 @@ async def validate_migration(
298356 author_role = AuthorRole .ASSISTANT ,
299357 )
300358
301- return True
359+ return True
0 commit comments