|
7 | 7 | import logging |
8 | 8 |
|
9 | 9 | from api.status_updates import send_status_update |
10 | | -from sql_agents.agent_manager import get_sql_agents, update_agent_config |
11 | 10 |
|
12 | 11 | from common.models.api import ( |
13 | 12 | FileProcessUpdate, |
|
24 | 23 | from semantic_kernel.contents import AuthorRole |
25 | 24 | from semantic_kernel.exceptions.service_exceptions import ServiceResponseException |
26 | 25 |
|
| 26 | +from sql_agents.agent_manager import get_sql_agents, update_agent_config |
27 | 27 | from sql_agents.convert_script import convert_script |
28 | 28 | from sql_agents.helpers.models import AgentType |
29 | 29 | from sql_agents.helpers.utils import is_text |
@@ -65,77 +65,77 @@ async def process_batch_async( |
65 | 65 | # Send file to the agents for processing |
66 | 66 | # Send status update to the client of type in progress, completed, or failed |
67 | 67 | for file in batch_files: |
68 | | - # Get the file from blob storage |
| 68 | + # Get the file from blob storage |
| 69 | + try: |
| 70 | + file_record = FileRecord.fromdb(file) |
| 71 | + # Update the file status |
69 | 72 | try: |
70 | | - file_record = FileRecord.fromdb(file) |
71 | | - # Update the file status |
72 | | - try: |
73 | | - file_record.status = ProcessStatus.IN_PROGRESS |
74 | | - await batch_service.update_file_record(file_record) |
75 | | - except Exception as exc: |
76 | | - logger.error("Error updating file status. %s", exc) |
77 | | - |
78 | | - sql_in_file = await storage.get_file(file_record.blob_path) |
79 | | - |
80 | | - # split into base validation routine |
81 | | - # Check if the file is a valid text file <-- |
82 | | - if not is_text(sql_in_file): |
83 | | - logger.error("File is not a valid text file. Skipping.") |
84 | | - # insert data base write to file record stating invalid file |
85 | | - await batch_service.create_file_log( |
86 | | - str(file_record.file_id), |
87 | | - "File is not a valid text file. Skipping.", |
88 | | - "", |
89 | | - LogType.ERROR, |
90 | | - AgentType.ALL, |
91 | | - AuthorRole.ASSISTANT, |
92 | | - ) |
93 | | - # send status update to the client of type failed |
94 | | - send_status_update( |
95 | | - status=FileProcessUpdate( |
96 | | - file_record.batch_id, |
97 | | - file_record.file_id, |
98 | | - ProcessStatus.COMPLETED, |
99 | | - file_result=FileResult.ERROR, |
100 | | - ), |
101 | | - ) |
102 | | - file_record.file_result = FileResult.ERROR |
103 | | - file_record.status = ProcessStatus.COMPLETED |
104 | | - file_record.error_count = 1 |
105 | | - await batch_service.update_file_record(file_record) |
106 | | - continue |
107 | | - else: |
108 | | - logger.info("sql_in_file: %s", sql_in_file) |
109 | | - |
110 | | - # Convert the file |
111 | | - converted_query = await convert_script( |
112 | | - sql_in_file, |
113 | | - file_record, |
114 | | - batch_service, |
115 | | - sql_agents, |
116 | | - ) |
117 | | - if converted_query: |
118 | | - # Add RAI disclaimer to the converted query |
119 | | - converted_query = add_rai_disclaimer(converted_query) |
120 | | - await batch_service.create_candidate( |
121 | | - file["file_id"], converted_query |
122 | | - ) |
123 | | - else: |
124 | | - await batch_service.update_file_counts(file["file_id"]) |
125 | | - except UnicodeDecodeError as ucde: |
126 | | - logger.error("Error decoding file: %s", file) |
127 | | - logger.error("Error decoding file. %s", ucde) |
128 | | - await process_error(ucde, file_record, batch_service) |
129 | | - except ServiceResponseException as sre: |
130 | | - logger.error(file) |
131 | | - logger.error("Error processing file. %s", sre) |
132 | | - # insert data base write to file record stating invalid file |
133 | | - await process_error(sre, file_record, batch_service) |
| 73 | + file_record.status = ProcessStatus.IN_PROGRESS |
| 74 | + await batch_service.update_file_record(file_record) |
134 | 75 | except Exception as exc: |
135 | | - logger.error(file) |
136 | | - logger.error("Error processing file. %s", exc) |
| 76 | + logger.error("Error updating file status. %s", exc) |
| 77 | + |
| 78 | + sql_in_file = await storage.get_file(file_record.blob_path) |
| 79 | + |
| 80 | + # split into base validation routine |
| 81 | + # Check if the file is a valid text file <-- |
| 82 | + if not is_text(sql_in_file): |
| 83 | + logger.error("File is not a valid text file. Skipping.") |
137 | 84 | # insert data base write to file record stating invalid file |
138 | | - await process_error(exc, file_record, batch_service) |
| 85 | + await batch_service.create_file_log( |
| 86 | + str(file_record.file_id), |
| 87 | + "File is not a valid text file. Skipping.", |
| 88 | + "", |
| 89 | + LogType.ERROR, |
| 90 | + AgentType.ALL, |
| 91 | + AuthorRole.ASSISTANT, |
| 92 | + ) |
| 93 | + # send status update to the client of type failed |
| 94 | + send_status_update( |
| 95 | + status=FileProcessUpdate( |
| 96 | + file_record.batch_id, |
| 97 | + file_record.file_id, |
| 98 | + ProcessStatus.COMPLETED, |
| 99 | + file_result=FileResult.ERROR, |
| 100 | + ), |
| 101 | + ) |
| 102 | + file_record.file_result = FileResult.ERROR |
| 103 | + file_record.status = ProcessStatus.COMPLETED |
| 104 | + file_record.error_count = 1 |
| 105 | + await batch_service.update_file_record(file_record) |
| 106 | + continue |
| 107 | + else: |
| 108 | + logger.info("sql_in_file: %s", sql_in_file) |
| 109 | + |
| 110 | + # Convert the file |
| 111 | + converted_query = await convert_script( |
| 112 | + sql_in_file, |
| 113 | + file_record, |
| 114 | + batch_service, |
| 115 | + sql_agents, |
| 116 | + ) |
| 117 | + if converted_query: |
| 118 | + # Add RAI disclaimer to the converted query |
| 119 | + converted_query = add_rai_disclaimer(converted_query) |
| 120 | + await batch_service.create_candidate( |
| 121 | + file["file_id"], converted_query |
| 122 | + ) |
| 123 | + else: |
| 124 | + await batch_service.update_file_counts(file["file_id"]) |
| 125 | + except UnicodeDecodeError as ucde: |
| 126 | + logger.error("Error decoding file: %s", file) |
| 127 | + logger.error("Error decoding file. %s", ucde) |
| 128 | + await process_error(ucde, file_record, batch_service) |
| 129 | + except ServiceResponseException as sre: |
| 130 | + logger.error(file) |
| 131 | + logger.error("Error processing file. %s", sre) |
| 132 | + # insert data base write to file record stating invalid file |
| 133 | + await process_error(sre, file_record, batch_service) |
| 134 | + except Exception as exc: |
| 135 | + logger.error(file) |
| 136 | + logger.error("Error processing file. %s", exc) |
| 137 | + # insert data base write to file record stating invalid file |
| 138 | + await process_error(exc, file_record, batch_service) |
139 | 139 |
|
140 | 140 | # Update batch status to completed or failed |
141 | 141 | try: |
|
0 commit comments