Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
307 changes: 172 additions & 135 deletions src/backend/sql_agents/convert_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
and updates the database with the results.
"""

import asyncio
import json
import logging

Expand Down Expand Up @@ -45,7 +44,12 @@ async def convert_script(
logger.info("Migrating query: %s\n", source_script)

# Setup the group chat for the agents
chat = CommsManager(sql_agents.idx_agents).group_chat
comms_manager = CommsManager(
sql_agents.idx_agents,
max_retries=5, # Retry up to 5 times for rate limits
initial_delay=1.0, # Start with 1 second delay
backoff_factor=2.0, # Double delay each retry
)

# send websocket notification that file processing has started
send_status_update(
Expand All @@ -63,160 +67,194 @@ async def convert_script(
current_migration = "No migration"
is_complete: bool = False
while not is_complete:
await chat.add_chat_message(
await comms_manager.group_chat.add_chat_message(
ChatMessageContent(role=AuthorRole.USER, content=source_script)
)
carry_response = None
async for response in chat.invoke():
# TEMPORARY: awaiting bug fix for rate limits
await asyncio.sleep(5)
carry_response = response
if response.role == AuthorRole.ASSISTANT.value:
# Our process can terminate with either of these as the last response
# before syntax check
match response.name:
case AgentType.MIGRATOR.value:
result = MigratorResponse.model_validate_json(
response.content or ""
)
if result.input_error or result.rai_error:
# If there is an error in input, we end the processing here.
# We do not include this in termination to avoid forking the chat process.
description = {
"role": response.role,
"name": response.name or "*",
"content": response.content,
}
await batch_service.create_file_log(
str(file.file_id),
description,
current_migration,
LogType.ERROR,
AgentType(response.name),
AuthorRole(response.role),
try:

async for response in comms_manager.async_invoke():
carry_response = response
if response.role == AuthorRole.ASSISTANT.value:
# Our process can terminate with either of these as the last response
# before syntax check
match response.name:
case AgentType.MIGRATOR.value:
result = MigratorResponse.model_validate_json(
response.content or ""
)
current_migration = None
break
case AgentType.SYNTAX_CHECKER.value:
result = SyntaxCheckerResponse.model_validate_json(
response.content.lower() or ""
)
# If there are no syntax errors, we can move to the semantic verifier
# We provide both scripts by injecting them into the chat history
if result.syntax_errors == []:
chat.history.add_message(
ChatMessageContent(
role=AuthorRole.USER,
name="candidate",
content=(
f"source_script: {source_script}, \n "
+ f"migrated_script: {current_migration}"
),
if result.input_error or result.rai_error:
# If there is an error in input, we end the processing here.
# We do not include this in termination to avoid forking the chat process.
description = {
"role": response.role,
"name": response.name or "*",
"content": response.content,
}
await batch_service.create_file_log(
str(file.file_id),
description,
current_migration,
LogType.ERROR,
AgentType(response.name),
AuthorRole(response.role),
)
current_migration = None
break
case AgentType.SYNTAX_CHECKER.value:
result = SyntaxCheckerResponse.model_validate_json(
response.content.lower() or ""
)
case AgentType.PICKER.value:
result = PickerResponse.model_validate_json(
response.content or ""
)
current_migration = result.picked_query
case AgentType.FIXER.value:
result = FixerResponse.model_validate_json(
response.content or ""
)
current_migration = result.fixed_query
case AgentType.SEMANTIC_VERIFIER.value:
logger.info(
"Semantic verifier agent response: %s", response.content
)
result = SemanticVerifierResponse.model_validate_json(
response.content or ""
)

# If the semantic verifier agent returns a difference, we need to report it
if len(result.differences) > 0:
description = {
"role": AuthorRole.ASSISTANT.value,
"name": AgentType.SEMANTIC_VERIFIER.value,
"content": "\n".join(result.differences),
}
logger.info(
"Semantic verification had issues. Pass with warnings."
)
# send status update to the client of type in progress with agent status
send_status_update(
status=FileProcessUpdate(
file.batch_id,
file.file_id,
ProcessStatus.COMPLETED,
AgentType.SEMANTIC_VERIFIER,
result.summary,
FileResult.WARNING,
),
# If there are no syntax errors, we can move to the semantic verifier
# We provide both scripts by injecting them into the chat history
if result.syntax_errors == []:
comms_manager.group_chat.history.add_message(
ChatMessageContent(
role=AuthorRole.USER,
name="candidate",
content=(
f"source_script: {source_script}, \n "
+ f"migrated_script: {current_migration}"
),
)
)
case AgentType.PICKER.value:
result = PickerResponse.model_validate_json(
response.content or ""
)
await batch_service.create_file_log(
str(file.file_id),
description,
current_migration,
LogType.WARNING,
AgentType.SEMANTIC_VERIFIER,
AuthorRole.ASSISTANT,
current_migration = result.picked_query
case AgentType.FIXER.value:
result = FixerResponse.model_validate_json(
response.content or ""
)

elif response == "":
# If the semantic verifier agent returns an empty response
current_migration = result.fixed_query
case AgentType.SEMANTIC_VERIFIER.value:
logger.info(
"Semantic verification had no return value. Pass with warnings."
"Semantic verifier agent response: %s", response.content
)
# send status update to the client of type in progress with agent status
send_status_update(
status=FileProcessUpdate(
file.batch_id,
file.file_id,
ProcessStatus.COMPLETED,
result = SemanticVerifierResponse.model_validate_json(
response.content or ""
)

# If the semantic verifier agent returns a difference, we need to report it
if len(result.differences) > 0:
description = {
"role": AuthorRole.ASSISTANT.value,
"name": AgentType.SEMANTIC_VERIFIER.value,
"content": "\n".join(result.differences),
}
logger.info(
"Semantic verification had issues. Pass with warnings."
)
# send status update to the client of type in progress with agent status
send_status_update(
status=FileProcessUpdate(
file.batch_id,
file.file_id,
ProcessStatus.COMPLETED,
AgentType.SEMANTIC_VERIFIER,
result.summary,
FileResult.WARNING,
),
)
await batch_service.create_file_log(
str(file.file_id),
description,
current_migration,
LogType.WARNING,
AgentType.SEMANTIC_VERIFIER,
AuthorRole.ASSISTANT,
)

elif response == "":
# If the semantic verifier agent returns an empty response
logger.info(
"Semantic verification had no return value. Pass with warnings."
)
# send status update to the client of type in progress with agent status
send_status_update(
status=FileProcessUpdate(
file.batch_id,
file.file_id,
ProcessStatus.COMPLETED,
AgentType.SEMANTIC_VERIFIER,
"No return value from semantic verifier agent.",
FileResult.WARNING,
),
)
await batch_service.create_file_log(
str(file.file_id),
"No return value from semantic verifier agent.",
FileResult.WARNING,
),
)
await batch_service.create_file_log(
str(file.file_id),
"No return value from semantic verifier agent.",
current_migration,
LogType.WARNING,
AgentType.SEMANTIC_VERIFIER,
AuthorRole.ASSISTANT,
)
current_migration,
LogType.WARNING,
AgentType.SEMANTIC_VERIFIER,
AuthorRole.ASSISTANT,
)

description = {
"role": response.role,
"name": response.name or "*",
"content": response.content,
}
description = {
"role": response.role,
"name": response.name or "*",
"content": response.content,
}

logger.info(description)
logger.info(description)
try:
parsed_content = json.loads(response.content or "{}")
except json.JSONDecodeError:
logger.warning("Invalid JSON from agent: %s", response.content)
parsed_content = {
"input_summary": "",
"candidates": [],
"summary": "",
"input_error": "",
"rai_error": "Invalid JSON from agent.",
}

# send status update to the client of type in progress with agent status
# Send status update using safe fallback values
send_status_update(
status=FileProcessUpdate(
file.batch_id,
file.file_id,
ProcessStatus.IN_PROGRESS,
AgentType(response.name),
parsed_content.get("summary", ""),
FileResult.INFO,
),
)
# end
await batch_service.create_file_log(
str(file.file_id),
description,
current_migration,
LogType.INFO,
AgentType(response.name),
AuthorRole(response.role),
)
except Exception as e:
logger.error("Error during comms_manager.async_invoke(): %s", str(e))
# Log the error to the batch service for tracking
await batch_service.create_file_log(
str(file.file_id),
f"Critical error during agent communication: {str(e)}",
current_migration,
LogType.ERROR,
AgentType.ALL,
AuthorRole.ASSISTANT,
)
# Send error status update
send_status_update(
status=FileProcessUpdate(
file.batch_id,
file.file_id,
ProcessStatus.IN_PROGRESS,
AgentType(response.name),
json.loads(response.content)["summary"],
FileResult.INFO,
ProcessStatus.COMPLETED,
AgentType.ALL,
f"Processing failed: {str(e)}",
FileResult.ERROR,
),
)
break

await batch_service.create_file_log(
str(file.file_id),
description,
current_migration,
LogType.INFO,
AgentType(response.name),
AuthorRole(response.role),
)

if chat.is_complete:
if comms_manager.group_chat.is_complete:
is_complete = True

break
Expand Down Expand Up @@ -276,7 +314,6 @@ async def validate_migration(
)

logger.error("No migrated query returned. Migration failed.")
# Add needed error or log data to the file record here
return False

# send status update to the client of type completed / success
Expand Down
Loading