1- """This module loops through each file in a batch and processes it using the SQL agents.
1+ """This module loops through each file in a batch and processes it using the SQL agents.More actions
22It sets up a group chat for the agents, sends the source script to the chat, and processes
33the responses from the agents. It also reports in real-time to the client using websockets
44and updates the database with the results.
55"""
66
7+ import asyncio
78import json
89import logging
910
@@ -45,6 +46,13 @@ async def convert_script(
4546
4647 # Setup the group chat for the agents
4748 chat = CommsManager (sql_agents .idx_agents ).group_chat
49+ #retry logic comms manager
50+ comms_manager = CommsManager (
51+ sql_agents .idx_agents ,
52+ max_retries = 3 , # Retry up to 5 times for rate limits
53+ initial_delay = 0.2 , # Start with 1 second delay
54+ backoff_factor = 1.2 , # Double delay each retry
55+ )
4856
4957 # send websocket notification that file processing has started
5058 send_status_update (
@@ -62,160 +70,211 @@ async def convert_script(
6270 current_migration = "No migration"
6371 is_complete : bool = False
6472 while not is_complete :
65- await chat .add_chat_message (
73+ await comms_manager . group_chat .add_chat_message (
6674 ChatMessageContent (role = AuthorRole .USER , content = source_script )
6775 )
6876 carry_response = None
69- async for response in chat .invoke ():
70- # TEMPORARY: awaiting bug fix for rate limits
71- #await asyncio.sleep(5)
72- carry_response = response
73- if response .role == AuthorRole .ASSISTANT .value :
74- # Our process can terminate with either of these as the last response
75- # before syntax check
76- match response .name :
77- case AgentType .MIGRATOR .value :
78- result = MigratorResponse .model_validate_json (
79- response .content or ""
80- )
81- if result .input_error or result .rai_error :
82- # If there is an error in input, we end the processing here.
83- # We do not include this in termination to avoid forking the chat process.
84- description = {
85- "role" : response .role ,
86- "name" : response .name or "*" ,
87- "content" : response .content ,
88- }
89- await batch_service .create_file_log (
90- str (file .file_id ),
91- description ,
92- current_migration ,
93- LogType .ERROR ,
94- AgentType (response .name ),
95- AuthorRole (response .role ),
77+ try :
78+
79+ async for response in comms_manager .async_invoke ():
80+ # TEMPORARY: awaiting bug fix for rate limits
81+ # await asyncio.sleep(5)
82+ carry_response = response
83+ if response .role == AuthorRole .ASSISTANT .value :
84+ # Our process can terminate with either of these as the last response
85+ # before syntax check
86+ match response .name :
87+ case AgentType .MIGRATOR .value :
88+ result = MigratorResponse .model_validate_json (
89+ response .content or ""
9690 )
97- current_migration = None
98- break
99- case AgentType .SYNTAX_CHECKER .value :
100- result = SyntaxCheckerResponse .model_validate_json (
101- response .content .lower () or ""
102- )
103- # If there are no syntax errors, we can move to the semantic verifier
104- # We provide both scripts by injecting them into the chat history
105- if result .syntax_errors == []:
106- chat .history .add_message (
107- ChatMessageContent (
108- role = AuthorRole .USER ,
109- name = "candidate" ,
110- content = (
111- f"source_script: { source_script } , \n "
112- + f"migrated_script: { current_migration } "
113- ),
91+ if result .input_error or result .rai_error :
92+ # If there is an error in input, we end the processing here.
93+ # We do not include this in termination to avoid forking the chat process.
94+ description = {
95+ "role" : response .role ,
96+ "name" : response .name or "*" ,
97+ "content" : response .content ,
98+ }
99+ await batch_service .create_file_log (
100+ str (file .file_id ),
101+ description ,
102+ current_migration ,
103+ LogType .ERROR ,
104+ AgentType (response .name ),
105+ AuthorRole (response .role ),
114106 )
107+ current_migration = None
108+ break
109+ case AgentType .SYNTAX_CHECKER .value :
110+ result = SyntaxCheckerResponse .model_validate_json (
111+ response .content .lower () or ""
115112 )
116- case AgentType .PICKER .value :
117- result = PickerResponse .model_validate_json (
118- response .content or ""
119- )
120- current_migration = result .picked_query
121- case AgentType .FIXER .value :
122- result = FixerResponse .model_validate_json (
123- response .content or ""
124- )
125- current_migration = result .fixed_query
126- case AgentType .SEMANTIC_VERIFIER .value :
127- logger .info (
128- "Semantic verifier agent response: %s" , response .content
129- )
130- result = SemanticVerifierResponse .model_validate_json (
131- response .content or ""
132- )
133-
134- # If the semantic verifier agent returns a difference, we need to report it
135- if len (result .differences ) > 0 :
136- description = {
137- "role" : AuthorRole .ASSISTANT .value ,
138- "name" : AgentType .SEMANTIC_VERIFIER .value ,
139- "content" : "\n " .join (result .differences ),
140- }
141- logger .info (
142- "Semantic verification had issues. Pass with warnings."
143- )
144- # send status update to the client of type in progress with agent status
145- send_status_update (
146- status = FileProcessUpdate (
147- file .batch_id ,
148- file .file_id ,
149- ProcessStatus .COMPLETED ,
150- AgentType .SEMANTIC_VERIFIER ,
151- result .summary ,
152- FileResult .WARNING ,
153- ),
113+ # If there are no syntax errors, we can move to the semantic verifier
114+ # We provide both scripts by injecting them into the chat history
115+ if result .syntax_errors == []:
116+ comms_manager .group_chat .history .add_message (
117+ ChatMessageContent (
118+ role = AuthorRole .USER ,
119+ name = "candidate" ,
120+ content = (
121+ f"source_script: { source_script } , \n "
122+ + f"migrated_script: { current_migration } "
123+ ),
124+ )
125+ )
126+ case AgentType .PICKER .value :
127+ result = PickerResponse .model_validate_json (
128+ response .content or ""
154129 )
155- await batch_service .create_file_log (
156- str (file .file_id ),
157- description ,
158- current_migration ,
159- LogType .WARNING ,
160- AgentType .SEMANTIC_VERIFIER ,
161- AuthorRole .ASSISTANT ,
130+ current_migration = result .picked_query
131+ case AgentType .FIXER .value :
132+ result = FixerResponse .model_validate_json (
133+ response .content or ""
162134 )
163-
164- elif response == "" :
165- # If the semantic verifier agent returns an empty response
135+ current_migration = result .fixed_query
136+ case AgentType .SEMANTIC_VERIFIER .value :
166137 logger .info (
167- "Semantic verification had no return value. Pass with warnings."
138+ "Semantic verifier agent response: %s" , response .content
139+ )
140+ result = SemanticVerifierResponse .model_validate_json (
141+ response .content or ""
168142 )
169- # send status update to the client of type in progress with agent status
170- send_status_update (
171- status = FileProcessUpdate (
172- file .batch_id ,
173- file .file_id ,
174- ProcessStatus .COMPLETED ,
143+
144+ # If the semantic verifier agent returns a difference, we need to report it
145+ if len (result .differences ) > 0 :
146+ description = {
147+ "role" : AuthorRole .ASSISTANT .value ,
148+ "name" : AgentType .SEMANTIC_VERIFIER .value ,
149+ "content" : "\n " .join (result .differences ),
150+ }
151+ logger .info (
152+ "Semantic verification had issues. Pass with warnings."
153+ )
154+ # send status update to the client of type in progress with agent status
155+ send_status_update (
156+ status = FileProcessUpdate (
157+ file .batch_id ,
158+ file .file_id ,
159+ ProcessStatus .COMPLETED ,
160+ AgentType .SEMANTIC_VERIFIER ,
161+ result .summary ,
162+ FileResult .WARNING ,
163+ ),
164+ )
165+ await batch_service .create_file_log (
166+ str (file .file_id ),
167+ description ,
168+ current_migration ,
169+ LogType .WARNING ,
175170 AgentType .SEMANTIC_VERIFIER ,
171+ AuthorRole .ASSISTANT ,
172+ )
173+
174+ elif response == "" :
175+ # If the semantic verifier agent returns an empty response
176+ logger .info (
177+ "Semantic verification had no return value. Pass with warnings."
178+ )
179+ # send status update to the client of type in progress with agent status
180+ send_status_update (
181+ status = FileProcessUpdate (
182+ file .batch_id ,
183+ file .file_id ,
184+ ProcessStatus .COMPLETED ,
185+ AgentType .SEMANTIC_VERIFIER ,
186+ "No return value from semantic verifier agent." ,
187+ FileResult .WARNING ,
188+ ),
189+ )
190+ await batch_service .create_file_log (
191+ str (file .file_id ),
176192 "No return value from semantic verifier agent." ,
177- FileResult . WARNING ,
178- ) ,
179- )
180- await batch_service . create_file_log (
181- str ( file . file_id ),
182- "No return value from semantic verifier agent." ,
183- current_migration ,
184- LogType . WARNING ,
185- AgentType . SEMANTIC_VERIFIER ,
186- AuthorRole . ASSISTANT ,
187- )
193+ current_migration ,
194+ LogType . WARNING ,
195+ AgentType . SEMANTIC_VERIFIER ,
196+ AuthorRole . ASSISTANT ,
197+ )
198+
199+ description = {
200+ "role" : response . role ,
201+ "name" : response . name or "*" ,
202+ "content" : response . content ,
203+ }
188204
189- description = {
190- "role" : response .role ,
191- "name" : response .name or "*" ,
192- "content" : response .content ,
193- }
205+ logger .info (description )
194206
195- logger .info (description )
207+ # send status update to the client of type in progress with agent status
208+ # send_status_update(
209+ # status=FileProcessUpdate(
210+ # file.batch_id,
211+ # file.file_id,
212+ # ProcessStatus.IN_PROGRESS,
213+ # AgentType(response.name),
214+ # json.loads(response.content)["summary"],
215+ # FileResult.INFO,
216+ # ),
217+ # )
218+ # Safely parse response content to avoid crashing on malformed or incomplete JSON
219+ #start
220+ try :
221+ parsed_content = json .loads (response .content or "{}" )
222+ except json .JSONDecodeError :
223+ logger .warning ("Invalid JSON from agent: %s" , response .content )
224+ parsed_content = {
225+ "input_summary" : "" ,
226+ "candidates" : [],
227+ "summary" : "" ,
228+ "input_error" : "" ,
229+ "rai_error" : "Invalid JSON from agent." ,
230+ }
196231
197- # send status update to the client of type in progress with agent status
232+ # Send status update using safe fallback values
233+ send_status_update (
234+ status = FileProcessUpdate (
235+ file .batch_id ,
236+ file .file_id ,
237+ ProcessStatus .IN_PROGRESS ,
238+ AgentType (response .name ),
239+ parsed_content .get ("summary" , "" ),
240+ FileResult .INFO ,
241+ ),
242+ )
243+ ##end
244+ await batch_service .create_file_log (
245+ str (file .file_id ),
246+ description ,
247+ current_migration ,
248+ LogType .INFO ,
249+ AgentType (response .name ),
250+ AuthorRole (response .role ),
251+ )
252+ except Exception as e :
253+ #logger.error("Error during chat.invoke(): %s", str(e))
254+ logger .error ("Error during comms_manager.async_invoke(): %s" , str (e ))
255+ # Log the error to the batch service for tracking
256+ await batch_service .create_file_log (
257+ str (file .file_id ),
258+ f"Critical error during agent communication: { str (e )} " ,
259+ current_migration ,
260+ LogType .ERROR ,
261+ AgentType .ALL ,
262+ AuthorRole .ASSISTANT ,
263+ )
264+ # Send error status update
198265 send_status_update (
199266 status = FileProcessUpdate (
200267 file .batch_id ,
201268 file .file_id ,
202- ProcessStatus .IN_PROGRESS ,
203- AgentType ( response . name ) ,
204- json . loads ( response . content )[ "summary" ] ,
205- FileResult .INFO ,
269+ ProcessStatus .COMPLETED ,
270+ AgentType . ALL ,
271+ f"Processing failed: { str ( e ) } " ,
272+ FileResult .ERROR ,
206273 ),
207274 )
275+ break # Exit the while loop on critical error
208276
209- await batch_service .create_file_log (
210- str (file .file_id ),
211- description ,
212- current_migration ,
213- LogType .INFO ,
214- AgentType (response .name ),
215- AuthorRole (response .role ),
216- )
217-
218- if chat .is_complete :
277+ if comms_manager .group_chat .is_complete :
219278 is_complete = True
220279
221280 break
@@ -295,6 +354,4 @@ async def validate_migration(
295354 log_type = LogType .SUCCESS ,
296355 agent_type = AgentType .ALL ,
297356 author_role = AuthorRole .ASSISTANT ,
298- )
299-
300- return True
357+ )
0 commit comments