44and updates the database with the results.
55"""
66
7- import asyncio
87import json
98import logging
109
@@ -45,7 +44,12 @@ async def convert_script(
4544 logger .info ("Migrating query: %s\n " , source_script )
4645
4746 # Setup the group chat for the agents
48- chat = CommsManager (sql_agents .idx_agents ).group_chat
47+ comms_manager = CommsManager (
48+ sql_agents .idx_agents ,
49+ max_retries = 5 , # Retry up to 5 times for rate limits
50+ initial_delay = 1.0 , # Start with 1 second delay
51+ backoff_factor = 2.0 , # Double delay each retry
52+ )
4953
5054 # send websocket notification that file processing has started
5155 send_status_update (
@@ -63,160 +67,194 @@ async def convert_script(
6367 current_migration = "No migration"
6468 is_complete : bool = False
6569 while not is_complete :
66- await chat .add_chat_message (
70+ await comms_manager . group_chat .add_chat_message (
6771 ChatMessageContent (role = AuthorRole .USER , content = source_script )
6872 )
6973 carry_response = None
70- async for response in chat .invoke ():
71- # TEMPORARY: awaiting bug fix for rate limits
72- await asyncio .sleep (5 )
73- carry_response = response
74- if response .role == AuthorRole .ASSISTANT .value :
75- # Our process can terminate with either of these as the last response
76- # before syntax check
77- match response .name :
78- case AgentType .MIGRATOR .value :
79- result = MigratorResponse .model_validate_json (
80- response .content or ""
81- )
82- if result .input_error or result .rai_error :
83- # If there is an error in input, we end the processing here.
84- # We do not include this in termination to avoid forking the chat process.
85- description = {
86- "role" : response .role ,
87- "name" : response .name or "*" ,
88- "content" : response .content ,
89- }
90- await batch_service .create_file_log (
91- str (file .file_id ),
92- description ,
93- current_migration ,
94- LogType .ERROR ,
95- AgentType (response .name ),
96- AuthorRole (response .role ),
74+ try :
75+
76+ async for response in comms_manager .async_invoke ():
77+ carry_response = response
78+ if response .role == AuthorRole .ASSISTANT .value :
79+ # Our process can terminate with either of these as the last response
80+ # before syntax check
81+ match response .name :
82+ case AgentType .MIGRATOR .value :
83+ result = MigratorResponse .model_validate_json (
84+ response .content or ""
9785 )
98- current_migration = None
99- break
100- case AgentType .SYNTAX_CHECKER .value :
101- result = SyntaxCheckerResponse .model_validate_json (
102- response .content .lower () or ""
103- )
104- # If there are no syntax errors, we can move to the semantic verifier
105- # We provide both scripts by injecting them into the chat history
106- if result .syntax_errors == []:
107- chat .history .add_message (
108- ChatMessageContent (
109- role = AuthorRole .USER ,
110- name = "candidate" ,
111- content = (
112- f"source_script: { source_script } , \n "
113- + f"migrated_script: { current_migration } "
114- ),
86+ if result .input_error or result .rai_error :
87+ # If there is an error in input, we end the processing here.
88+ # We do not include this in termination to avoid forking the chat process.
89+ description = {
90+ "role" : response .role ,
91+ "name" : response .name or "*" ,
92+ "content" : response .content ,
93+ }
94+ await batch_service .create_file_log (
95+ str (file .file_id ),
96+ description ,
97+ current_migration ,
98+ LogType .ERROR ,
99+ AgentType (response .name ),
100+ AuthorRole (response .role ),
115101 )
102+ current_migration = None
103+ break
104+ case AgentType .SYNTAX_CHECKER .value :
105+ result = SyntaxCheckerResponse .model_validate_json (
106+ response .content .lower () or ""
116107 )
117- case AgentType .PICKER .value :
118- result = PickerResponse .model_validate_json (
119- response .content or ""
120- )
121- current_migration = result .picked_query
122- case AgentType .FIXER .value :
123- result = FixerResponse .model_validate_json (
124- response .content or ""
125- )
126- current_migration = result .fixed_query
127- case AgentType .SEMANTIC_VERIFIER .value :
128- logger .info (
129- "Semantic verifier agent response: %s" , response .content
130- )
131- result = SemanticVerifierResponse .model_validate_json (
132- response .content or ""
133- )
134-
135- # If the semantic verifier agent returns a difference, we need to report it
136- if len (result .differences ) > 0 :
137- description = {
138- "role" : AuthorRole .ASSISTANT .value ,
139- "name" : AgentType .SEMANTIC_VERIFIER .value ,
140- "content" : "\n " .join (result .differences ),
141- }
142- logger .info (
143- "Semantic verification had issues. Pass with warnings."
144- )
145- # send status update to the client of type in progress with agent status
146- send_status_update (
147- status = FileProcessUpdate (
148- file .batch_id ,
149- file .file_id ,
150- ProcessStatus .COMPLETED ,
151- AgentType .SEMANTIC_VERIFIER ,
152- result .summary ,
153- FileResult .WARNING ,
154- ),
108+ # If there are no syntax errors, we can move to the semantic verifier
109+ # We provide both scripts by injecting them into the chat history
110+ if result .syntax_errors == []:
111+ comms_manager .group_chat .history .add_message (
112+ ChatMessageContent (
113+ role = AuthorRole .USER ,
114+ name = "candidate" ,
115+ content = (
116+ f"source_script: { source_script } , \n "
117+ + f"migrated_script: { current_migration } "
118+ ),
119+ )
120+ )
121+ case AgentType .PICKER .value :
122+ result = PickerResponse .model_validate_json (
123+ response .content or ""
155124 )
156- await batch_service .create_file_log (
157- str (file .file_id ),
158- description ,
159- current_migration ,
160- LogType .WARNING ,
161- AgentType .SEMANTIC_VERIFIER ,
162- AuthorRole .ASSISTANT ,
125+ current_migration = result .picked_query
126+ case AgentType .FIXER .value :
127+ result = FixerResponse .model_validate_json (
128+ response .content or ""
163129 )
164-
165- elif response == "" :
166- # If the semantic verifier agent returns an empty response
130+ current_migration = result .fixed_query
131+ case AgentType .SEMANTIC_VERIFIER .value :
167132 logger .info (
168- "Semantic verification had no return value. Pass with warnings."
133+ "Semantic verifier agent response: %s" , response . content
169134 )
170- # send status update to the client of type in progress with agent status
171- send_status_update (
172- status = FileProcessUpdate (
173- file .batch_id ,
174- file .file_id ,
175- ProcessStatus .COMPLETED ,
135+ result = SemanticVerifierResponse .model_validate_json (
136+ response .content or ""
137+ )
138+
139+ # If the semantic verifier agent returns a difference, we need to report it
140+ if len (result .differences ) > 0 :
141+ description = {
142+ "role" : AuthorRole .ASSISTANT .value ,
143+ "name" : AgentType .SEMANTIC_VERIFIER .value ,
144+ "content" : "\n " .join (result .differences ),
145+ }
146+ logger .info (
147+ "Semantic verification had issues. Pass with warnings."
148+ )
149+ # send status update to the client of type in progress with agent status
150+ send_status_update (
151+ status = FileProcessUpdate (
152+ file .batch_id ,
153+ file .file_id ,
154+ ProcessStatus .COMPLETED ,
155+ AgentType .SEMANTIC_VERIFIER ,
156+ result .summary ,
157+ FileResult .WARNING ,
158+ ),
159+ )
160+ await batch_service .create_file_log (
161+ str (file .file_id ),
162+ description ,
163+ current_migration ,
164+ LogType .WARNING ,
176165 AgentType .SEMANTIC_VERIFIER ,
166+ AuthorRole .ASSISTANT ,
167+ )
168+
169+ elif response == "" :
170+ # If the semantic verifier agent returns an empty response
171+ logger .info (
172+ "Semantic verification had no return value. Pass with warnings."
173+ )
174+ # send status update to the client of type in progress with agent status
175+ send_status_update (
176+ status = FileProcessUpdate (
177+ file .batch_id ,
178+ file .file_id ,
179+ ProcessStatus .COMPLETED ,
180+ AgentType .SEMANTIC_VERIFIER ,
181+ "No return value from semantic verifier agent." ,
182+ FileResult .WARNING ,
183+ ),
184+ )
185+ await batch_service .create_file_log (
186+ str (file .file_id ),
177187 "No return value from semantic verifier agent." ,
178- FileResult .WARNING ,
179- ),
180- )
181- await batch_service .create_file_log (
182- str (file .file_id ),
183- "No return value from semantic verifier agent." ,
184- current_migration ,
185- LogType .WARNING ,
186- AgentType .SEMANTIC_VERIFIER ,
187- AuthorRole .ASSISTANT ,
188- )
188+ current_migration ,
189+ LogType .WARNING ,
190+ AgentType .SEMANTIC_VERIFIER ,
191+ AuthorRole .ASSISTANT ,
192+ )
189193
190- description = {
191- "role" : response .role ,
192- "name" : response .name or "*" ,
193- "content" : response .content ,
194- }
194+ description = {
195+ "role" : response .role ,
196+ "name" : response .name or "*" ,
197+ "content" : response .content ,
198+ }
195199
196- logger .info (description )
200+ logger .info (description )
201+ try :
202+ parsed_content = json .loads (response .content or "{}" )
203+ except json .JSONDecodeError :
204+ logger .warning ("Invalid JSON from agent: %s" , response .content )
205+ parsed_content = {
206+ "input_summary" : "" ,
207+ "candidates" : [],
208+ "summary" : "" ,
209+ "input_error" : "" ,
210+ "rai_error" : "Invalid JSON from agent." ,
211+ }
197212
198- # send status update to the client of type in progress with agent status
213+ # Send status update using safe fallback values
214+ send_status_update (
215+ status = FileProcessUpdate (
216+ file .batch_id ,
217+ file .file_id ,
218+ ProcessStatus .IN_PROGRESS ,
219+ AgentType (response .name ),
220+ parsed_content .get ("summary" , "" ),
221+ FileResult .INFO ,
222+ ),
223+ )
224+ # end
225+ await batch_service .create_file_log (
226+ str (file .file_id ),
227+ description ,
228+ current_migration ,
229+ LogType .INFO ,
230+ AgentType (response .name ),
231+ AuthorRole (response .role ),
232+ )
233+ except Exception as e :
234+ logger .error ("Error during comms_manager.async_invoke(): %s" , str (e ))
235+ # Log the error to the batch service for tracking
236+ await batch_service .create_file_log (
237+ str (file .file_id ),
238+ f"Critical error during agent communication: { str (e )} " ,
239+ current_migration ,
240+ LogType .ERROR ,
241+ AgentType .ALL ,
242+ AuthorRole .ASSISTANT ,
243+ )
244+ # Send error status update
199245 send_status_update (
200246 status = FileProcessUpdate (
201247 file .batch_id ,
202248 file .file_id ,
203- ProcessStatus .IN_PROGRESS ,
204- AgentType ( response . name ) ,
205- json . loads ( response . content )[ "summary" ] ,
206- FileResult .INFO ,
249+ ProcessStatus .COMPLETED ,
250+ AgentType . ALL ,
251+ f"Processing failed: { str ( e ) } " ,
252+ FileResult .ERROR ,
207253 ),
208254 )
255+ break
209256
210- await batch_service .create_file_log (
211- str (file .file_id ),
212- description ,
213- current_migration ,
214- LogType .INFO ,
215- AgentType (response .name ),
216- AuthorRole (response .role ),
217- )
218-
219- if chat .is_complete :
257+ if comms_manager .group_chat .is_complete :
220258 is_complete = True
221259
222260 break
@@ -276,7 +314,6 @@ async def validate_migration(
276314 )
277315
278316 logger .error ("No migrated query returned. Migration failed." )
279- # Add needed error or log data to the file record here
280317 return False
281318
282319 # send status update to the client of type completed / success
0 commit comments