Skip to content

Commit a3da8ff

Browse files
author
Shreyas-Microsoft
committed
Final retry
1 parent aa101e5 commit a3da8ff

2 files changed

Lines changed: 77 additions & 181 deletions

File tree

src/backend/sql_agents/convert_script.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
"""This module loops through each file in a batch and processes it using the SQL agents.More actions
1+
"""This module loops through each file in a batch and processes it using the SQL agents.
22
It sets up a group chat for the agents, sends the source script to the chat, and processes
33
the responses from the agents. It also reports in real-time to the client using websockets
44
and updates the database with the results.
@@ -49,9 +49,9 @@ async def convert_script(
4949
#retry logic comms manager
5050
comms_manager = CommsManager(
5151
sql_agents.idx_agents,
52-
max_retries=3, # Retry up to 5 times for rate limits
53-
initial_delay=0.2, # Start with 1 second delay
54-
backoff_factor=1.2, # Double delay each retry
52+
max_retries=5, # Retry up to 5 times for rate limits
53+
initial_delay=1.0, # Start with 1 second delay
54+
backoff_factor=2.0, # Double delay each retry
5555
)
5656

5757
# send websocket notification that file processing has started
@@ -354,4 +354,6 @@ async def validate_migration(
354354
log_type=LogType.SUCCESS,
355355
agent_type=AgentType.ALL,
356356
author_role=AuthorRole.ASSISTANT,
357-
)
357+
)
358+
359+
return True

src/backend/sql_agents/helpers/comms_manager.py

Lines changed: 70 additions & 176 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,6 @@ class CommsManager:
2727
# regex to extract the recommended wait time in seconds from response
2828
_EXTRACT_WAIT_TIME = r"in (\d+) seconds"
2929

30-
# Rate limit error indicators
31-
_RATE_LIMIT_INDICATORS = [
32-
"rate limit",
33-
"too many requests",
34-
"quota exceeded",
35-
"throttled",
36-
"429",
37-
]
38-
3930
group_chat: AgentGroupChat = None
4031

4132
class SelectionStrategy(SequentialSelectionStrategy):
@@ -128,8 +119,8 @@ def __init__(
128119
agent_dict,
129120
exception_types: tuple = (Exception,),
130121
max_retries: int = 10,
131-
initial_delay: float = 0.5,
132-
backoff_factor: float = 1.5,
122+
initial_delay: float = 1.0,
123+
backoff_factor: float = 2.0,
133124
simple_truncation: int = None,
134125
):
135126
"""Initialize the CommsManager and agent_chat with the given agents.
@@ -138,11 +129,18 @@ def __init__(
138129
agent_dict: Dictionary of agents
139130
exception_types: Tuple of exception types that should trigger a retry
140131
max_retries: Maximum number of retry attempts (default: 10)
141-
initial_delay: Initial delay in seconds before first retry (default: 0.5)
142-
backoff_factor: Factor by which the delay increases with each retry (default: 1.5)
132+
initial_delay: Initial delay in seconds before first retry (default: 1.0)
133+
backoff_factor: Factor by which the delay increases with each retry (default: 2.0)
143134
simple_truncation: Optional truncation limit for chat history
144135
"""
145-
# Initialize the group chat (exactly like original)
136+
# Store retry configuration
137+
self.max_retries = max_retries
138+
self.initial_delay = initial_delay
139+
self.backoff_factor = backoff_factor
140+
self.exception_types = exception_types
141+
self.simple_truncation = simple_truncation
142+
143+
# Initialize the group chat (maintaining original functionality)
146144
self.group_chat = AgentGroupChat(
147145
agents=agent_dict.values(),
148146
termination_strategy=self.ApprovalTerminationStrategy(
@@ -156,130 +154,99 @@ def __init__(
156154
selection_strategy=self.SelectionStrategy(agents=agent_dict.values()),
157155
)
158156

159-
# Store retry configuration
160-
self.max_retries = max_retries
161-
self.initial_delay = initial_delay
162-
self.backoff_factor = backoff_factor
163-
self.exception_types = exception_types
164-
self.simple_truncation = simple_truncation
165-
166-
# Adaptive retry state - starts optimistic
167-
self._rate_limit_detected_recently = False
168-
self._consecutive_successes = 0
169-
self._session_has_rate_limits = False
170-
171-
def _is_rate_limit_error(self, error_message: str) -> bool:
172-
"""Check if the error message indicates a rate limit issue."""
173-
error_lower = error_message.lower()
174-
return any(indicator in error_lower for indicator in self._RATE_LIMIT_INDICATORS)
175-
176-
def _should_use_zero_overhead_path(self) -> bool:
177-
"""
178-
Determine if we should use zero-overhead path.
179-
180-
Use zero overhead when:
181-
- No rate limits detected in current session AND
182-
- We have some successful calls OR this is the first call
183-
"""
184-
return (
185-
not self._session_has_rate_limits
186-
and (self._consecutive_successes >= 1 or self._consecutive_successes == 0)
187-
188-
)
189-
190-
async def _zero_overhead_invoke(self) -> AsyncIterable[ChatMessageContent]:
191-
"""Pure delegation to original group_chat.invoke() - zero overhead."""
192-
async for item in self.group_chat.invoke():
193-
yield item
157+
async def invoke_async(self):
158+
"""Invoke the group chat with the given agents (original method maintained for compatibility)."""
159+
return self.group_chat.invoke()
194160

195-
async def _retry_enabled_invoke(self) -> AsyncIterable[ChatMessageContent]:
196-
"""Invoke with retry logic - only used when rate limits are expected."""
161+
async def async_invoke(self) -> AsyncIterable[ChatMessageContent]:
162+
"""Invoke the group chat with retry logic and error handling."""
197163
attempt = 0
198164
current_delay = self.initial_delay
199165

200-
# Create history snapshot only when we need it
201-
history_snapshot = None
202-
203166
while attempt < self.max_retries:
204167
try:
205-
# Apply truncation if configured and on first attempt
168+
# Grab a snapshot of the history of the group chat
169+
# Using copy to avoid getting a reference to the original list
170+
history_snap = copy.deepcopy(self.group_chat.history)
171+
172+
self.logger.debug(
173+
"History before invoke: %s",
174+
[msg.name for msg in self.group_chat.history],
175+
)
176+
177+
# Get a fresh iterator from the function
178+
async_iter = self.group_chat.invoke()
179+
180+
# If simple truncation is set, truncate the history
206181
if (
207-
attempt == 0
208-
and self.simple_truncation
182+
self.simple_truncation
209183
and len(self.group_chat.history) > self.simple_truncation
210184
):
211-
if history_snapshot is None:
212-
history_snapshot = copy.deepcopy(self.group_chat.history)
213-
self.group_chat.history = history_snapshot[-self.simple_truncation:]
214-
185+
# Truncate the history to the last n messages
186+
self.group_chat.history = history_snap[-self.simple_truncation :]
215187

216-
# Execute and yield results
217-
async for item in self.group_chat.invoke():
188+
# Yield each item from the iterator
189+
async for item in async_iter:
218190
yield item
219191

220-
# Success - exit retry loop
221-
return
192+
# If we get here without exception, we're done
193+
break
222194

223195
except AgentInvokeException as aie:
224-
# Create snapshot only when we actually need to retry
225-
if history_snapshot is None:
226-
history_snapshot = copy.deepcopy(self.group_chat.history)
227-
228196
attempt += 1
229197
if attempt >= self.max_retries:
230198
self.logger.error(
231-
"AgentInvokeException: Max retries (%d) exceeded. Final error: %s",
199+
"Function invoke failed after %d attempts. Final error: %s. Consider increasing the models rate limit.",
232200
self.max_retries,
233201
str(aie),
234202
)
203+
# Re-raise the last exception if all retries failed
235204
raise
236205

237-
# Restore history from snapshot
238-
self.group_chat.history = copy.deepcopy(history_snapshot)
239-
240-
# Check for rate limit specific wait time
241-
wait_time_match = re.search(self._EXTRACT_WAIT_TIME, str(aie))
242-
if wait_time_match:
243-
current_delay = int(wait_time_match.group(1))
244-
self.logger.info(
245-
"Rate limit detected, waiting %d seconds as requested",
246-
current_delay
206+
# Return history state for retry
207+
self.group_chat.history = history_snap
208+
209+
try:
210+
# Try to extract wait time from error message
211+
wait_time_match = re.search(self._EXTRACT_WAIT_TIME, str(aie))
212+
if wait_time_match:
213+
# If regex is found, set the delay to the value in seconds
214+
current_delay = int(wait_time_match.group(1))
215+
else:
216+
current_delay = self.initial_delay
217+
218+
self.logger.warning(
219+
"Attempt %d/%d for function invoke failed: %s. Retrying in %.2f seconds...",
220+
attempt,
221+
self.max_retries,
222+
str(aie),
223+
current_delay,
247224
)
248-
else:
249-
current_delay = self.initial_delay * (self.backoff_factor ** (attempt - 1))
250-
251-
self.logger.warning(
252-
"Attempt %d/%d failed with AgentInvokeException: %s. Retrying in %.2f seconds...",
253-
attempt,
254-
self.max_retries,
255-
str(aie),
256-
current_delay,
257-
)
258-
259-
await asyncio.sleep(current_delay)
260225

226+
# Wait before retrying
227+
await asyncio.sleep(current_delay)
261228

229+
if not wait_time_match:
230+
# Increase delay for next attempt using backoff factor
231+
current_delay *= self.backoff_factor
262232

233+
except Exception as ex:
234+
self.logger.error(
235+
"Retry error: %s. Using default delay.",
236+
ex,
237+
)
238+
current_delay = self.initial_delay
263239

264240
except self.exception_types as e:
265-
if history_snapshot is None:
266-
history_snapshot = copy.deepcopy(self.group_chat.history)
267-
268-
269241
attempt += 1
270242
if attempt >= self.max_retries:
271243
self.logger.error(
272-
"Generic exception: Max retries (%d) exceeded. Final error: %s",
244+
"Function invoke failed after %d attempts. Final error: %s",
273245
self.max_retries,
274246
str(e),
275247
)
276248
raise
277249

278-
# Restore history from snapshot
279-
self.group_chat.history = copy.deepcopy(history_snapshot)
280-
281-
current_delay = self.initial_delay * (self.backoff_factor ** (attempt - 1))
282-
283250
self.logger.warning(
284251
"Attempt %d/%d failed with %s: %s. Retrying in %.2f seconds...",
285252
attempt,
@@ -290,77 +257,4 @@ async def _retry_enabled_invoke(self) -> AsyncIterable[ChatMessageContent]:
290257
)
291258

292259
await asyncio.sleep(current_delay)
293-
294-
295-
async def async_invoke(self) -> AsyncIterable[ChatMessageContent]:
296-
"""
297-
Optimized invoke method that dynamically chooses between zero-overhead and retry modes.
298-
299-
Performance targets:
300-
- 200k tokens: 1.2 mins (zero overhead when no rate limits expected)
301-
- 30k-50k tokens: 1.8-2 mins (retry overhead only when needed)
302-
"""
303-
304-
# Decide which path to take
305-
use_zero_overhead = self._should_use_zero_overhead_path()
306-
307-
if use_zero_overhead:
308-
# Zero overhead path - matches original performance exactly
309-
try:
310-
async for item in self._zero_overhead_invoke():
311-
yield item
312-
313-
# Track success
314-
self._consecutive_successes += 1
315-
return
316-
317-
except (AgentInvokeException, *self.exception_types) as e:
318-
# Check if this is a rate limit error
319-
error_str = str(e)
320-
if self._is_rate_limit_error(error_str):
321-
self.logger.info(
322-
"Rate limit detected on zero-overhead path, switching to retry mode for this session"
323-
)
324-
self._session_has_rate_limits = True
325-
self._rate_limit_detected_recently = True
326-
# Fall through to retry logic below
327-
else:
328-
# Non-rate-limit error, re-raise immediately (fail fast)
329-
self.logger.error("Non-rate-limit error in zero-overhead path: %s", error_str)
330-
raise
331-
332-
# Retry-enabled path - used when rate limits are expected or detected
333-
try:
334-
async for item in self._retry_enabled_invoke():
335-
yield item
336-
337-
# Track success
338-
self._consecutive_successes += 1
339-
340-
# Gradually become more optimistic about rate limits
341-
if self._consecutive_successes >= 5:
342-
self._rate_limit_detected_recently = False
343-
# Note: We keep _session_has_rate_limits = True to remember for this session
344-
345-
346-
347-
348-
except Exception as e:
349-
# Reset success counter on failure
350-
self._consecutive_successes = 0
351-
self._rate_limit_detected_recently = True
352-
raise
353-
354-
async def invoke_async(self):
355-
"""Legacy method - maintained for compatibility."""
356-
return self.group_chat.invoke()
357-
358-
def reset_rate_limit_state(self):
359-
"""
360-
Reset rate limit detection state - call this between different processing sessions
361-
if you want to reset the adaptive behavior.
362-
"""
363-
self._rate_limit_detected_recently = False
364-
self._consecutive_successes = 0
365-
self._session_has_rate_limits = False
366-
self.logger.info("Rate limit detection state reset")
260+
current_delay *= self.backoff_factor

0 commit comments

Comments
 (0)