diff --git a/src/backend/api/api_routes.py b/src/backend/api/api_routes.py index 7d2ae469..879bb9c4 100644 --- a/src/backend/api/api_routes.py +++ b/src/backend/api/api_routes.py @@ -1,21 +1,14 @@ """FastAPI API routes for file processing and conversion.""" -# Standard library import asyncio import io -import logging -import os import zipfile from typing import Optional -# Local application + from api.auth.auth_utils import get_authenticated_user -from api.event_utils import track_event_if_configured from api.status_updates import app_connection_manager, close_connection -# Third-party -from azure.monitor.opentelemetry import configure_azure_monitor - from common.logger.app_logger import AppLogger from common.services.batch_service import BatchService @@ -31,49 +24,17 @@ ) from fastapi.responses import Response -from opentelemetry import trace -from opentelemetry.trace import Status, StatusCode - from sql_agents.process_batch import process_batch_async router = APIRouter() logger = AppLogger("APIRoutes") -# Check if the Application Insights Instrumentation Key is set in the environment variables -instrumentation_key = os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING") -if instrumentation_key: - # Configure Application Insights if the Instrumentation Key is found - configure_azure_monitor(connection_string=instrumentation_key) - logging.info( - "Application Insights configured with the provided Instrumentation Key" - ) -else: - # Log a warning if the Instrumentation Key is not found - logging.warning( - "No Application Insights Instrumentation Key found. Skipping configuration" - ) - -# Configure logging -logging.basicConfig(level=logging.INFO) - -# Suppress INFO logs from 'azure.core.pipeline.policies.http_logging_policy' -logging.getLogger("azure.core.pipeline.policies.http_logging_policy").setLevel( - logging.WARNING -) -logging.getLogger("azure.identity.aio._internal").setLevel(logging.WARNING) - -# Suppress info logs from OpenTelemetry exporter -logging.getLogger("azure.monitor.opentelemetry.exporter.export._base").setLevel( - logging.WARNING -) - # start processing the batch @router.post("/start-processing") async def start_processing(request: Request): """ Start processing files for a given batch. - --- tags: - File Processing @@ -93,7 +54,6 @@ async def start_processing(request: Request): responses: 200: description: Processing initiated successfully - content: application/json: schema: @@ -105,7 +65,6 @@ async def start_processing(request: Request): type: string 400: description: Invalid processing request - 500: description: Internal server error """ @@ -115,27 +74,16 @@ async def start_processing(request: Request): translate_from = payload.get("translate_from") translate_to = payload.get("translate_to") - track_event_if_configured( - "ProcessingStart", - {"batch_id": batch_id, "from": translate_from, "to": translate_to}, - ) - await process_batch_async( batch_id=batch_id, convert_from=translate_from, convert_to=translate_to ) - await close_connection(batch_id) - return { "batch_id": batch_id, "status": "Processing completed", "message": "Files processed", } except Exception as e: - span = trace.get_current_span() - if span is not None: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) raise HTTPException(status_code=500, detail=str(e)) from e @@ -148,7 +96,6 @@ async def start_processing(request: Request): async def download_files(batch_id: str): """ Download files as ZIP. - --- tags: - File Download @@ -182,7 +129,6 @@ async def download_files(batch_id: str): file_data = await batch_service.get_batch_for_zip(batch_id) if not file_data: - track_event_if_configured("DownloadBatchNotFound", {"batch_id": batch_id}) raise HTTPException(status_code=404, detail="Batch not found") # Create an in-memory bytes buffer for the zip file @@ -203,14 +149,11 @@ async def download_files(batch_id: str): file_name = file_name.replace("/", "_").replace("\\", "_") # Write the file into the zip archive with its content zf.writestr(file_name, content) - # Reset the stream's position to the beginning zip_stream.seek(0) zip_data = zip_stream.getvalue() - # Get the size of the zip file for the Content-Length header zip_size = len(zip_data) - # Prepare headers for file download headers = { "Content-Disposition": "attachment; filename=tsql_relts.zip", @@ -218,27 +161,17 @@ async def download_files(batch_id: str): } # Return the zip file as a streaming response - track_event_if_configured( - "DownloadZipSuccess", {"batch_id": batch_id, "file_count": len(file_data)} - ) return Response(zip_data, media_type="application/zip", headers=headers) except Exception as e: - span = trace.get_current_span() - if span is not None: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) raise HTTPException( status_code=404, detail=f"Error creating ZIP file: {str(e)}" ) from e - - @router.websocket("/socket/{batch_id}") async def batch_status_updates( websocket: WebSocket, batch_id: str ): # , request: Request): """ Web-Socket endpoint for real-time batch status updates. - --- tags: - Batch Status @@ -281,11 +214,9 @@ async def batch_status_updates( try: batch_service = BatchService() await batch_service.initialize_database() - # Validate batch_id format if not batch_service.is_valid_uuid(batch_id): await websocket.close(code=4002, reason="Invalid batch_id format") - track_event_if_configured("WebSocketInvalidBatchId", {"batch_id": batch_id}) return # Accept WebSocket connection @@ -293,7 +224,6 @@ async def batch_status_updates( # Add to the connection manager for backend updates app_connection_manager.add_connection(batch_id, websocket) - track_event_if_configured("WebSocketConnectionAccepted", {"batch_id": batch_id}) # Keep the connection open - FastAPI will close the connection if this returns while True: @@ -305,17 +235,10 @@ async def batch_status_updates( pass except WebSocketDisconnect: - track_event_if_configured("WebSocketDisconnect", {"batch_id": batch_id}) logger.info(f"Client disconnected from batch {batch_id}") await close_connection(batch_id) except Exception as e: - span = trace.get_current_span() - if span is not None: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) - track_event_if_configured( - "WebSocketError", {"batch_id": batch_id, "error": str(e)} - ) + logger.error("Error in WebSocket connection", error=str(e)) await close_connection(batch_id) @@ -323,7 +246,6 @@ async def batch_status_updates( async def get_batch_status(request: Request, batch_id: str): """ Retrieve batch history and file statuses. - --- tags: - Batch History @@ -420,87 +342,51 @@ async def get_batch_status(request: Request, batch_id: str): authenticated_user = get_authenticated_user(request) user_id = authenticated_user.user_principal_id if not user_id: - track_event_if_configured( - "UserIdNotFound", {"status_code": 400, "detail": "no user"} - ) raise HTTPException(status_code=401, detail="User not authenticated") # Validate batch_id format if not batch_service.is_valid_uuid(batch_id): - track_event_if_configured("InvalidBatchId", {"batch_id": batch_id}) raise HTTPException(status_code=400, detail="Invalid batch_id format") # Fetch batch details batch_data = await batch_service.get_batch(batch_id, user_id) if not batch_data: - track_event_if_configured( - "BatchNotFound", {"batch_id": batch_id, "user_id": user_id} - ) raise HTTPException(status_code=404, detail="Batch not found") - track_event_if_configured( - "BatchStoryRetrieved", {"batch_id": batch_id, "user_id": user_id} - ) return batch_data except HTTPException as e: - span = trace.get_current_span() - if span is not None: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) raise e except Exception as e: logger.error("Error retrieving batch history", error=str(e)) - span = trace.get_current_span() - if span is not None: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) error_message = str(e) if "403" in error_message: raise HTTPException(status_code=403, detail="Incorrect user_id") from e else: raise HTTPException(status_code=500, detail="Internal server error") from e - - @router.get("/batch-summary/{batch_id}") async def get_batch_summary(request: Request, batch_id: str): """Retrieve batch summary for a given batch ID.""" try: batch_service = BatchService() await batch_service.initialize_database() - # Authenticate user authenticated_user = get_authenticated_user(request) user_id = authenticated_user.user_principal_id if not user_id: - track_event_if_configured( - "UserIdNotFound", {"status_code": 400, "detail": "no user"} - ) raise HTTPException(status_code=401, detail="User not authenticated") + # Retrieve batch summary batch_summary = await batch_service.get_batch_summary(batch_id, user_id) if not batch_summary: - track_event_if_configured( - "BatchSummaryNotFound", {"batch_id": batch_id, "user_id": user_id} - ) raise HTTPException(status_code=404, detail="No batch summary found.") - track_event_if_configured( - "BatchSummaryRetrieved", {"batch_id": batch_id, "user_id": user_id} - ) + return batch_summary except HTTPException as e: logger.error("Error fetching batch summary", error=str(e)) - span = trace.get_current_span() - if span is not None: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) raise e except Exception as e: logger.error("Error fetching batch summary", error=str(e)) - span = trace.get_current_span() - if span is not None: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) raise HTTPException(status_code=404, detail="Batch not found") from e @@ -510,7 +396,6 @@ async def upload_file( ): """ Upload file for conversion. - --- tags: - File Conversion @@ -599,15 +484,11 @@ async def upload_file( user_id = authenticated_user.user_principal_id if not user_id: - track_event_if_configured( - "UserIdNotFound", {"status_code": 400, "detail": "no user"} - ) raise HTTPException(status_code=401, detail="User not authenticated") # Validate batch_id format logger.info(f"batch_id: {batch_id}") if not batch_service.is_valid_uuid(batch_id): - track_event_if_configured("InvalidBatchId", {"batch_id": batch_id}) raise HTTPException(status_code=400, detail="Invalid batch_id format") # Upload file via BatchService @@ -615,27 +496,11 @@ async def upload_file( batch_id, user_id, file ) - event_data = { - "batch_id": batch_id, - "user_id": user_id, - } - - # Avoid using 'filename' as it's reserved by the logging module - if hasattr(file, "filename") and file.filename: - event_data["uploaded_filename"] = file.filename - - track_event_if_configured("FileUploaded", event_data) - return upload_result except HTTPException as e: raise e except Exception as e: - span = trace.get_current_span() - if span is not None: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) - raise HTTPException(status_code=500, detail="Internal server error") from e @@ -643,7 +508,6 @@ async def upload_file( async def get_file_details(request: Request, file_id: str): """ Retrieve file details and processing logs. - --- tags: - File Management @@ -736,37 +600,23 @@ async def get_file_details(request: Request, file_id: str): authenticated_user = get_authenticated_user(request) user_id = authenticated_user.user_principal_id if not user_id: - track_event_if_configured( - "UserIdNotFound", {"endpoint": "get_file_details"} - ) raise HTTPException(status_code=401, detail="User not authenticated") # Validate file_id format if not batch_service.is_valid_uuid(file_id): - track_event_if_configured("InvalidFileId", {"file_id": file_id}) raise HTTPException(status_code=400, detail="Invalid file_id format") # Fetch file details file_data = await batch_service.get_file_report(file_id) if not file_data: - track_event_if_configured("FileNotFound", {"file_id": file_id}) raise HTTPException(status_code=404, detail="File not found") - track_event_if_configured("FileDetailsRetrieved", {"file_id": file_id}) return file_data except HTTPException as e: - span = trace.get_current_span() - if span is not None: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) raise e except Exception as e: logger.error("Error retrieving file details", error=str(e)) - span = trace.get_current_span() - if span is not None: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) raise HTTPException(status_code=500, detail="Internal server error") from e @@ -774,7 +624,6 @@ async def get_file_details(request: Request, file_id: str): async def delete_batch_details(request: Request, batch_id: str): """ Delete batch history using batch_id. - --- tags: - Batch Delete @@ -805,37 +654,23 @@ async def delete_batch_details(request: Request, batch_id: str): authenticated_user = get_authenticated_user(request) user_id = authenticated_user.user_principal_id if not user_id: - track_event_if_configured( - "UserIdNotFound", {"endpoint": "delete_batch_details"} - ) raise HTTPException(status_code=401, detail="User not authenticated") # Validate file_id format if not batch_service.is_valid_uuid(batch_id): - track_event_if_configured("InvalidBatchId", {"batch_id": batch_id}) raise HTTPException( status_code=400, detail=f"Invalid batch_id format: {batch_id}" ) await batch_service.delete_batch_and_files(batch_id, user_id) - track_event_if_configured( - "BatchDeleted", {"batch_id": batch_id, "user_id": user_id} - ) + logger.info(f"Batch deleted successfully: {batch_id}") return {"message": "Batch deleted successfully"} except HTTPException as e: - span = trace.get_current_span() - if span is not None: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) raise e except Exception as e: logger.error("Failed to delete batch from database", error=str(e)) - span = trace.get_current_span() - if span is not None: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) raise HTTPException(status_code=500, detail="Database connection error") from e @@ -843,7 +678,6 @@ async def delete_batch_details(request: Request, batch_id: str): async def delete_file_details(request: Request, file_id: str): """ Delete file history using batch_id. - --- tags: - File Delete @@ -874,14 +708,10 @@ async def delete_file_details(request: Request, file_id: str): authenticated_user = get_authenticated_user(request) user_id = authenticated_user.user_principal_id if not user_id: - track_event_if_configured( - "UserIdNotFound", {"endpoint": "delete_file_details"} - ) raise HTTPException(status_code=401, detail="User not authenticated") # Validate file_id format if not batch_service.is_valid_uuid(file_id): - track_event_if_configured("InvalidFileId", {"file_id": file_id}) raise HTTPException( status_code=400, detail=f"Invalid file_id format: {file_id}" ) @@ -889,33 +719,21 @@ async def delete_file_details(request: Request, file_id: str): # Delete file file_delete = await batch_service.delete_file(file_id, user_id) if file_delete is None: - track_event_if_configured("FileDeleteNotFound", {"file_id": file_id}) raise HTTPException(status_code=404, detail=f"File not found: {file_id}") logger.info(f"File deleted successfully: {file_delete}") - track_event_if_configured("FileDeleted", {"file_id": file_id}) return {"message": "File deleted successfully"} except HTTPException as e: - span = trace.get_current_span() - if span is not None: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) raise e except Exception as e: - span = trace.get_current_span() - if span is not None: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) logger.error("Failed to delete file from database", error=str(e)) raise HTTPException(status_code=500, detail="Database connection error") from e - @router.delete("/delete_all") async def delete_all_details(request: Request): """ Delete all the history of batches, files and logs. - --- tags: - Delete All @@ -936,14 +754,10 @@ async def delete_all_details(request: Request): authenticated_user = get_authenticated_user(request) user_id = authenticated_user.user_principal_id if not user_id: - track_event_if_configured( - "UserIdNotFound", {"endpoint": "delete_all_details"} - ) raise HTTPException(status_code=401, detail="User not authenticated") # Validate file_id format if not batch_service.is_valid_uuid(user_id): - track_event_if_configured("InvalidUserId", {"user_id": user_id}) raise HTTPException( status_code=400, detail=f"Invalid user_id format: {user_id}" ) @@ -951,36 +765,23 @@ async def delete_all_details(request: Request): # Delete all the files from storage and cosmosDB delete_all = await batch_service.delete_all_from_storage_cosmos(user_id) if delete_all is None: - track_event_if_configured("DeleteAllNotFound", {"user_id": user_id}) logger.error("File/Batch not found") raise HTTPException(status_code=404, detail="File/Batch not found") logger.info(f"All user data deleted successfully: {user_id}") - track_event_if_configured("DeleteAllSuccess", {"user_id": user_id}) return {"message": "All user data deleted successfully"} except HTTPException as e: - span = trace.get_current_span() - if span is not None: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) raise e except Exception as e: - span = trace.get_current_span() - if span is not None: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) logger.error("Failed to delete user data from database", error=str(e)) raise HTTPException(status_code=500, detail="Database connection error") from e @router.get("/batch-history") -async def list_batch_history( - request: Request, offset: int = 0, limit: Optional[int] = None -): +async def list_batch_history(request: Request, offset: int = 0, limit: Optional[int] = None): """ Retrieve batch processing history for the authenticated user. - --- tags: - Batch History @@ -1035,14 +836,10 @@ async def list_batch_history( try: batch_service = BatchService() await batch_service.initialize_database() - # Authenticate user authenticated_user = get_authenticated_user(request) user_id = authenticated_user.user_principal_id if not user_id: - track_event_if_configured( - "UserIdNotFound", {"endpoint": "list_batch_history"} - ) raise HTTPException(status_code=401, detail="User not authenticated") # Retrieve batch history @@ -1050,26 +847,14 @@ async def list_batch_history( user_id, limit=limit, offset=offset ) if not batch_history: - track_event_if_configured("BatchHistoryEmpty", {"user_id": user_id}) return HTTPException(status_code=404, detail="No batch history found.") - track_event_if_configured( - "BatchHistoryRetrieved", {"user_id": user_id, "count": len(batch_history)} - ) return batch_history except HTTPException as e: - span = trace.get_current_span() - if span is not None: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) raise e except Exception as e: logger.error("Error fetching batch history", error=str(e)) - span = trace.get_current_span() - if span is not None: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) raise HTTPException( status_code=500, detail="Error retrieving batch history" - ) from e + ) from e \ No newline at end of file