Skip to content

Commit 63165a7

Browse files
author
Shreyas-Microsoft
committed
removed async implementation
1 parent b1e109a commit 63165a7

2 files changed

Lines changed: 95 additions & 113 deletions

File tree

src/backend/sql_agents/convert_script.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,24 +44,24 @@ async def convert_script(
4444
logger.info("Migrating query: %s\n", source_script)
4545

4646
# Setup the group chat for the agents
47-
async with CommsManager(
47+
comms_manager = CommsManager(
4848
sql_agents.idx_agents,
4949
max_retries=5,
5050
initial_delay=1.0,
5151
backoff_factor=2.0,
52-
) as comms_manager:
52+
)
5353

5454
# send websocket notification that file processing has started
55-
send_status_update(
56-
status=FileProcessUpdate(
57-
file.batch_id,
58-
file.file_id,
59-
ProcessStatus.IN_PROGRESS,
60-
AgentType.ALL,
61-
"File processing started",
62-
file_result=FileResult.INFO,
63-
),
64-
)
55+
send_status_update(
56+
status=FileProcessUpdate(
57+
file.batch_id,
58+
file.file_id,
59+
ProcessStatus.IN_PROGRESS,
60+
AgentType.ALL,
61+
"File processing started",
62+
file_result=FileResult.INFO,
63+
),
64+
)
6565

6666
# orchestrate the chat
6767
current_migration = "No migration"

src/backend/sql_agents/helpers/comms_manager.py

Lines changed: 83 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -156,117 +156,108 @@ def __init__(
156156

157157
async def invoke_async(self):
158158
"""Invoke the group chat with the given agents (original method maintained for compatibility)."""
159-
try:
160-
return self.group_chat.invoke()
161-
finally:
162-
# Ensure cleanup happens even if invoke fails
163-
await self.cleanup()
159+
return self.group_chat.invoke()
164160

165161
async def async_invoke(self) -> AsyncIterable[ChatMessageContent]:
166162
"""Invoke the group chat with retry logic and error handling."""
167163
attempt = 0
168164
current_delay = self.initial_delay
169165

170-
try:
171-
while attempt < self.max_retries:
172-
try:
173-
# Grab a snapshot of the history of the group chat
174-
# Using "SHALLOW" copy to avoid getting a reference to the original list
175-
history_snap = copy.copy(self.group_chat.history)
176-
177-
self.logger.debug(
178-
"History before invoke: %s",
179-
[msg.name for msg in self.group_chat.history],
166+
while attempt < self.max_retries:
167+
try:
168+
# Grab a snapshot of the history of the group chat
169+
# Using "SHALLOW" copy to avoid getting a reference to the original list
170+
history_snap = copy.copy(self.group_chat.history)
171+
172+
self.logger.debug(
173+
"History before invoke: %s",
174+
[msg.name for msg in self.group_chat.history],
175+
)
176+
177+
# Get a fresh iterator from the function
178+
async_iter = self.group_chat.invoke()
179+
180+
# If simple truncation is set, truncate the history
181+
if (
182+
self.simple_truncation
183+
and len(self.group_chat.history) > self.simple_truncation
184+
):
185+
# Truncate the history to the last n messages
186+
self.group_chat.history = history_snap[-self.simple_truncation :]
187+
188+
# Yield each item from the iterator
189+
async for item in async_iter:
190+
yield item
191+
192+
# If we get here without exception, we're done
193+
break
194+
195+
except AgentInvokeException as aie:
196+
attempt += 1
197+
if attempt >= self.max_retries:
198+
self.logger.error(
199+
"Function invoke failed after %d attempts. Final error: %s. Consider increasing the models rate limit.",
200+
self.max_retries,
201+
str(aie),
180202
)
203+
# Re-raise the last exception if all retries failed
204+
raise
181205

182-
# Get a fresh iterator from the function
183-
async_iter = self.group_chat.invoke()
206+
# Return history state for retry
207+
self.group_chat.history = history_snap
184208

185-
# If simple truncation is set, truncate the history
186-
if (
187-
self.simple_truncation
188-
and len(self.group_chat.history) > self.simple_truncation
189-
):
190-
# Truncate the history to the last n messages
191-
self.group_chat.history = history_snap[-self.simple_truncation :]
192-
193-
# Yield each item from the iterator
194-
async for item in async_iter:
195-
yield item
196-
197-
# If we get here without exception, we're done
198-
break
199-
200-
except AgentInvokeException as aie:
201-
attempt += 1
202-
if attempt >= self.max_retries:
203-
self.logger.error(
204-
"Function invoke failed after %d attempts. Final error: %s. Consider increasing the models rate limit.",
205-
self.max_retries,
206-
str(aie),
207-
)
208-
# Re-raise the last exception if all retries failed
209-
raise
210-
211-
# Return history state for retry
212-
self.group_chat.history = history_snap
213-
214-
try:
215-
# Try to extract wait time from error message
216-
wait_time_match = re.search(self._EXTRACT_WAIT_TIME, str(aie))
217-
if wait_time_match:
218-
# If regex is found, set the delay to the value in seconds
219-
current_delay = int(wait_time_match.group(1))
220-
else:
221-
current_delay = self.initial_delay
222-
223-
self.logger.warning(
224-
"Attempt %d/%d for function invoke failed: %s. Retrying in %.2f seconds...",
225-
attempt,
226-
self.max_retries,
227-
str(aie),
228-
current_delay,
229-
)
230-
231-
# Wait before retrying
232-
await asyncio.sleep(current_delay)
233-
234-
if not wait_time_match:
235-
# Increase delay for next attempt using backoff factor
236-
current_delay *= self.backoff_factor
237-
238-
except Exception as ex:
239-
self.logger.error(
240-
"Retry error: %s. Using default delay.",
241-
ex,
242-
)
209+
try:
210+
# Try to extract wait time from error message
211+
wait_time_match = re.search(self._EXTRACT_WAIT_TIME, str(aie))
212+
if wait_time_match:
213+
# If regex is found, set the delay to the value in seconds
214+
current_delay = int(wait_time_match.group(1))
215+
else:
243216
current_delay = self.initial_delay
244217

245-
except self.exception_types as e:
246-
attempt += 1
247-
if attempt >= self.max_retries:
248-
self.logger.error(
249-
"Function invoke failed after %d attempts. Final error: %s",
250-
self.max_retries,
251-
str(e),
252-
)
253-
raise
254-
255218
self.logger.warning(
256-
"Attempt %d/%d failed with %s: %s. Retrying in %.2f seconds...",
219+
"Attempt %d/%d for function invoke failed: %s. Retrying in %.2f seconds...",
257220
attempt,
258221
self.max_retries,
259-
type(e).__name__,
260-
str(e),
222+
str(aie),
261223
current_delay,
262224
)
263225

226+
# Wait before retrying
264227
await asyncio.sleep(current_delay)
265-
current_delay *= self.backoff_factor
266228

267-
finally:
268-
# Ensure cleanup happens regardless of success or failure
269-
await self.cleanup()
229+
if not wait_time_match:
230+
# Increase delay for next attempt using backoff factor
231+
current_delay *= self.backoff_factor
232+
233+
except Exception as ex:
234+
self.logger.error(
235+
"Retry error: %s. Using default delay.",
236+
ex,
237+
)
238+
current_delay = self.initial_delay
239+
240+
except self.exception_types as e:
241+
attempt += 1
242+
if attempt >= self.max_retries:
243+
self.logger.error(
244+
"Function invoke failed after %d attempts. Final error: %s",
245+
self.max_retries,
246+
str(e),
247+
)
248+
raise
249+
250+
self.logger.warning(
251+
"Attempt %d/%d failed with %s: %s. Retrying in %.2f seconds...",
252+
attempt,
253+
self.max_retries,
254+
type(e).__name__,
255+
str(e),
256+
current_delay,
257+
)
258+
259+
await asyncio.sleep(current_delay)
260+
current_delay *= self.backoff_factor
270261

271262
async def cleanup(self):
272263
"""Clean up all resources including internal threads."""
@@ -281,15 +272,6 @@ async def cleanup(self):
281272

282273
except Exception as e:
283274
self.logger.error("Error during cleanup: %s", str(e))
284-
# Don't re-raise cleanup errors to avoid masking original exceptions
285-
286-
async def __aenter__(self):
287-
"""Context manager entry."""
288-
return self
289-
290-
async def __aexit__(self, exc_type, exc_val, exc_tb):
291-
"""Context manager exit with cleanup."""
292-
await self.cleanup()
293275

294276
def __del__(self):
295277
"""Destructor to ensure cleanup if not explicitly called."""

0 commit comments

Comments
 (0)