Skip to content

Commit b1e109a

Browse files
author
Shreyas-Microsoft
committed
properly cleanup agents and conversation
1 parent 9460edb commit b1e109a

2 files changed

Lines changed: 142 additions & 104 deletions

File tree

src/backend/sql_agents/convert_script.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,24 +44,24 @@ async def convert_script(
4444
logger.info("Migrating query: %s\n", source_script)
4545

4646
# Setup the group chat for the agents
47-
comms_manager = CommsManager(
47+
async with CommsManager(
4848
sql_agents.idx_agents,
49-
max_retries=5, # Retry up to 5 times for rate limits
50-
initial_delay=1.0, # Start with 1 second delay
51-
backoff_factor=2.0, # Double delay each retry
52-
)
49+
max_retries=5,
50+
initial_delay=1.0,
51+
backoff_factor=2.0,
52+
) as comms_manager:
5353

5454
# send websocket notification that file processing has started
55-
send_status_update(
56-
status=FileProcessUpdate(
57-
file.batch_id,
58-
file.file_id,
59-
ProcessStatus.IN_PROGRESS,
60-
AgentType.ALL,
61-
"File processing started",
62-
file_result=FileResult.INFO,
63-
),
64-
)
55+
send_status_update(
56+
status=FileProcessUpdate(
57+
file.batch_id,
58+
file.file_id,
59+
ProcessStatus.IN_PROGRESS,
60+
AgentType.ALL,
61+
"File processing started",
62+
file_result=FileResult.INFO,
63+
),
64+
)
6565

6666
# orchestrate the chat
6767
current_migration = "No migration"

src/backend/sql_agents/helpers/comms_manager.py

Lines changed: 127 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -156,113 +156,151 @@ def __init__(
156156

157157
async def invoke_async(self):
158158
"""Invoke the group chat with the given agents (original method maintained for compatibility)."""
159-
return self.group_chat.invoke()
159+
try:
160+
return self.group_chat.invoke()
161+
finally:
162+
# Ensure cleanup happens even if invoke fails
163+
await self.cleanup()
160164

161165
async def async_invoke(self) -> AsyncIterable[ChatMessageContent]:
162166
"""Invoke the group chat with retry logic and error handling."""
163167
attempt = 0
164168
current_delay = self.initial_delay
165169

166-
while attempt < self.max_retries:
167-
try:
168-
# Grab a snapshot of the history of the group chat
169-
# Using "SHALLOW" copy to avoid getting a reference to the original list
170-
history_snap = copy.copy(self.group_chat.history)
171-
172-
self.logger.debug(
173-
"History before invoke: %s",
174-
[msg.name for msg in self.group_chat.history],
175-
)
176-
177-
# Get a fresh iterator from the function
178-
async_iter = self.group_chat.invoke()
179-
180-
# If simple truncation is set, truncate the history
181-
if (
182-
self.simple_truncation
183-
and len(self.group_chat.history) > self.simple_truncation
184-
):
185-
# Truncate the history to the last n messages
186-
self.group_chat.history = history_snap[-self.simple_truncation :]
187-
188-
# Yield each item from the iterator
189-
async for item in async_iter:
190-
yield item
191-
192-
# If we get here without exception, we're done
193-
break
194-
195-
except AgentInvokeException as aie:
196-
attempt += 1
197-
if attempt >= self.max_retries:
198-
self.logger.error(
199-
"Function invoke failed after %d attempts. Final error: %s. Consider increasing the models rate limit.",
200-
self.max_retries,
201-
str(aie),
170+
try:
171+
while attempt < self.max_retries:
172+
try:
173+
# Grab a snapshot of the history of the group chat
174+
# Using "SHALLOW" copy to avoid getting a reference to the original list
175+
history_snap = copy.copy(self.group_chat.history)
176+
177+
self.logger.debug(
178+
"History before invoke: %s",
179+
[msg.name for msg in self.group_chat.history],
202180
)
203-
# Re-raise the last exception if all retries failed
204-
raise
205181

206-
# Return history state for retry
207-
self.group_chat.history = history_snap
182+
# Get a fresh iterator from the function
183+
async_iter = self.group_chat.invoke()
208184

209-
try:
210-
# Try to extract wait time from error message
211-
wait_time_match = re.search(self._EXTRACT_WAIT_TIME, str(aie))
212-
if wait_time_match:
213-
# If regex is found, set the delay to the value in seconds
214-
current_delay = int(wait_time_match.group(1))
215-
else:
185+
# If simple truncation is set, truncate the history
186+
if (
187+
self.simple_truncation
188+
and len(self.group_chat.history) > self.simple_truncation
189+
):
190+
# Truncate the history to the last n messages
191+
self.group_chat.history = history_snap[-self.simple_truncation :]
192+
193+
# Yield each item from the iterator
194+
async for item in async_iter:
195+
yield item
196+
197+
# If we get here without exception, we're done
198+
break
199+
200+
except AgentInvokeException as aie:
201+
attempt += 1
202+
if attempt >= self.max_retries:
203+
self.logger.error(
204+
"Function invoke failed after %d attempts. Final error: %s. Consider increasing the models rate limit.",
205+
self.max_retries,
206+
str(aie),
207+
)
208+
# Re-raise the last exception if all retries failed
209+
raise
210+
211+
# Return history state for retry
212+
self.group_chat.history = history_snap
213+
214+
try:
215+
# Try to extract wait time from error message
216+
wait_time_match = re.search(self._EXTRACT_WAIT_TIME, str(aie))
217+
if wait_time_match:
218+
# If regex is found, set the delay to the value in seconds
219+
current_delay = int(wait_time_match.group(1))
220+
else:
221+
current_delay = self.initial_delay
222+
223+
self.logger.warning(
224+
"Attempt %d/%d for function invoke failed: %s. Retrying in %.2f seconds...",
225+
attempt,
226+
self.max_retries,
227+
str(aie),
228+
current_delay,
229+
)
230+
231+
# Wait before retrying
232+
await asyncio.sleep(current_delay)
233+
234+
if not wait_time_match:
235+
# Increase delay for next attempt using backoff factor
236+
current_delay *= self.backoff_factor
237+
238+
except Exception as ex:
239+
self.logger.error(
240+
"Retry error: %s. Using default delay.",
241+
ex,
242+
)
216243
current_delay = self.initial_delay
217244

245+
except self.exception_types as e:
246+
attempt += 1
247+
if attempt >= self.max_retries:
248+
self.logger.error(
249+
"Function invoke failed after %d attempts. Final error: %s",
250+
self.max_retries,
251+
str(e),
252+
)
253+
raise
254+
218255
self.logger.warning(
219-
"Attempt %d/%d for function invoke failed: %s. Retrying in %.2f seconds...",
256+
"Attempt %d/%d failed with %s: %s. Retrying in %.2f seconds...",
220257
attempt,
221258
self.max_retries,
222-
str(aie),
259+
type(e).__name__,
260+
str(e),
223261
current_delay,
224262
)
225263

226-
# Wait before retrying
227264
await asyncio.sleep(current_delay)
265+
current_delay *= self.backoff_factor
228266

229-
if not wait_time_match:
230-
# Increase delay for next attempt using backoff factor
231-
current_delay *= self.backoff_factor
232-
233-
except Exception as ex:
234-
self.logger.error(
235-
"Retry error: %s. Using default delay.",
236-
ex,
237-
)
238-
current_delay = self.initial_delay
267+
finally:
268+
# Ensure cleanup happens regardless of success or failure
269+
await self.cleanup()
239270

240-
except self.exception_types as e:
241-
attempt += 1
242-
if attempt >= self.max_retries:
243-
self.logger.error(
244-
"Function invoke failed after %d attempts. Final error: %s",
245-
self.max_retries,
246-
str(e),
247-
)
248-
raise
249-
250-
self.logger.warning(
251-
"Attempt %d/%d failed with %s: %s. Retrying in %.2f seconds...",
252-
attempt,
253-
self.max_retries,
254-
type(e).__name__,
255-
str(e),
256-
current_delay,
257-
)
258-
259-
await asyncio.sleep(current_delay)
260-
current_delay *= self.backoff_factor
261-
262-
async def close(self):
263-
"""Clean up the thread after agent execution is complete."""
271+
async def cleanup(self):
272+
"""Clean up all resources including internal threads."""
273+
try:
274+
if self.group_chat is not None:
275+
self.logger.debug("Cleaning up AgentGroupChat resources...")
276+
277+
# Reset the group chat - this clears conversation state and deletes remote threads
278+
await self.group_chat.reset()
279+
280+
self.logger.debug("AgentGroupChat cleanup completed successfully")
281+
282+
except Exception as e:
283+
self.logger.error("Error during cleanup: %s", str(e))
284+
# Don't re-raise cleanup errors to avoid masking original exceptions
285+
286+
async def __aenter__(self):
287+
"""Context manager entry."""
288+
return self
289+
290+
async def __aexit__(self, exc_type, exc_val, exc_tb):
291+
"""Context manager exit with cleanup."""
292+
await self.cleanup()
293+
294+
def __del__(self):
295+
"""Destructor to ensure cleanup if not explicitly called."""
264296
try:
265-
await self.group_chat.delete()
266-
self.logger.info("Deleted group chat thread successfully.")
267-
except Exception as ex:
268-
self.logger.warning("Failed to delete group chat thread: %s", str(ex))
297+
# Only attempt cleanup if there's an active event loop
298+
loop = asyncio.get_running_loop()
299+
if loop and not loop.is_closed():
300+
# Schedule cleanup as a task
301+
loop.create_task(self.cleanup())
302+
except RuntimeError:
303+
# No event loop running, can't clean up asynchronously
304+
self.logger.warning("No event loop available for cleanup in destructor")
305+
except Exception as e:
306+
self.logger.error("Error in destructor cleanup: %s", str(e))

0 commit comments

Comments
 (0)