diff --git a/infra/main.bicep b/infra/main.bicep index 88b4efce..1c02d6c9 100644 --- a/infra/main.bicep +++ b/infra/main.bicep @@ -285,7 +285,6 @@ module applicationInsights 'br/public:avm/res/insights/component:0.7.0' = if (en name: 'appi-${solutionSuffix}' location: location workspaceResourceId: logAnalyticsWorkspaceResourceId - diagnosticSettings: [{ workspaceResourceId: logAnalyticsWorkspaceResourceId }] tags: allTags enableTelemetry: enableTelemetry retentionInDays: 365 diff --git a/infra/main.json b/infra/main.json index 9141a8ca..f98bb0c5 100644 --- a/infra/main.json +++ b/infra/main.json @@ -6,7 +6,7 @@ "_generator": { "name": "bicep", "version": "0.41.2.15936", - "templateHash": "18262017557117529046" + "templateHash": "3093757051086668797" }, "name": "Modernize Your Code Solution Accelerator", "description": "CSA CTO Gold Standard Solution Accelerator for Modernize Your Code. \r\n" @@ -3919,13 +3919,6 @@ "value": "[parameters('location')]" }, "workspaceResourceId": "[if(variables('useExistingLogAnalytics'), createObject('value', parameters('existingLogAnalyticsWorkspaceId')), createObject('value', reference('logAnalyticsWorkspace').outputs.resourceId.value))]", - "diagnosticSettings": { - "value": [ - { - "workspaceResourceId": "[if(variables('useExistingLogAnalytics'), parameters('existingLogAnalyticsWorkspaceId'), reference('logAnalyticsWorkspace').outputs.resourceId.value)]" - } - ] - }, "tags": { "value": "[variables('allTags')]" }, @@ -12902,11 +12895,11 @@ }, "dependsOn": [ "applicationInsights", + "[format('avmPrivateDnsZones[{0}]', variables('dnsZoneIndex').storageBlob)]", "[format('avmPrivateDnsZones[{0}]', variables('dnsZoneIndex').ods)]", "[format('avmPrivateDnsZones[{0}]', variables('dnsZoneIndex').agentSvc)]", - "[format('avmPrivateDnsZones[{0}]', variables('dnsZoneIndex').storageBlob)]", - "[format('avmPrivateDnsZones[{0}]', variables('dnsZoneIndex').oms)]", "[format('avmPrivateDnsZones[{0}]', variables('dnsZoneIndex').monitor)]", + "[format('avmPrivateDnsZones[{0}]', variables('dnsZoneIndex').oms)]", "dataCollectionEndpoint", "logAnalyticsWorkspace", "virtualNetwork" @@ -16624,15 +16617,9 @@ "osType": { "value": "Windows" }, - "vmSize": { - "value": "[if(not(empty(parameters('vmSize'))), parameters('vmSize'), 'Standard_D2s_v5')]" - }, - "adminUsername": { - "value": "[if(not(empty(parameters('vmAdminUsername'))), parameters('vmAdminUsername'), 'JumpboxAdminUser')]" - }, - "adminPassword": { - "value": "[if(not(empty(parameters('vmAdminPassword'))), parameters('vmAdminPassword'), 'JumpboxAdminP@ssw0rd1234!')]" - }, + "vmSize": "[if(not(empty(parameters('vmSize'))), createObject('value', parameters('vmSize')), createObject('value', 'Standard_D2s_v5'))]", + "adminUsername": "[if(not(empty(parameters('vmAdminUsername'))), createObject('value', parameters('vmAdminUsername')), createObject('value', 'JumpboxAdminUser'))]", + "adminPassword": "[if(not(empty(parameters('vmAdminPassword'))), createObject('value', parameters('vmAdminPassword')), createObject('value', 'JumpboxAdminP@ssw0rd1234!'))]", "managedIdentities": { "value": { "systemAssigned": true @@ -31930,8 +31917,8 @@ "dependsOn": [ "aiServices", "[format('avmPrivateDnsZones[{0}]', variables('dnsZoneIndex').openAI)]", - "[format('avmPrivateDnsZones[{0}]', variables('dnsZoneIndex').aiServices)]", "[format('avmPrivateDnsZones[{0}]', variables('dnsZoneIndex').cognitiveServices)]", + "[format('avmPrivateDnsZones[{0}]', variables('dnsZoneIndex').aiServices)]", "virtualNetwork" ] }, diff --git a/infra/main_custom.bicep b/infra/main_custom.bicep index c1746731..b6998587 100644 --- a/infra/main_custom.bicep +++ b/infra/main_custom.bicep @@ -288,7 +288,6 @@ module applicationInsights 'br/public:avm/res/insights/component:0.7.0' = if (en name: 'appi-${solutionSuffix}' location: location workspaceResourceId: logAnalyticsWorkspaceResourceId - diagnosticSettings: [{ workspaceResourceId: logAnalyticsWorkspaceResourceId }] tags: allTags enableTelemetry: enableTelemetry retentionInDays: 365 diff --git a/src/backend/api/api_routes.py b/src/backend/api/api_routes.py index bff54d75..57d33e75 100644 --- a/src/backend/api/api_routes.py +++ b/src/backend/api/api_routes.py @@ -44,6 +44,15 @@ def record_exception_to_trace(e): span.set_status(Status(StatusCode.ERROR, str(e))) +def set_span_attributes(**attributes): + """Set key-value attributes on the current OpenTelemetry span.""" + span = trace.get_current_span() + if span and span.is_recording(): + for key, value in attributes.items(): + if value: + span.set_attribute(key, str(value)) + + # start processing the batch @router.post("/start-processing") async def start_processing(request: Request): @@ -82,11 +91,13 @@ async def start_processing(request: Request): 500: description: Internal server error """ + batch_id = None try: payload = await request.json() batch_id = payload.get("batch_id") translate_from = payload.get("translate_from") translate_to = payload.get("translate_to") + set_span_attributes(batch_id=batch_id) track_event_if_configured( "ProcessingStart", @@ -103,6 +114,10 @@ async def start_processing(request: Request): "message": "Files processed", } except Exception as e: + event_data = {"error": str(e)} + if batch_id is not None: + event_data["batch_id"] = batch_id + track_event_if_configured("ProcessingFailed", event_data) record_exception_to_trace(e) raise HTTPException(status_code=500, detail=str(e)) from e @@ -146,6 +161,7 @@ async def download_files(batch_id: str): # call batch_service get_batch_for_zip to get all files for batch_id batch_service = BatchService() await batch_service.initialize_database() + set_span_attributes(batch_id=batch_id) file_data = await batch_service.get_batch_for_zip(batch_id) if not file_data: @@ -187,6 +203,7 @@ async def download_files(batch_id: str): ) return Response(zip_data, media_type="application/zip", headers=headers) except Exception as e: + track_event_if_configured("DownloadZipFailed", {"batch_id": batch_id, "error": str(e)}) record_exception_to_trace(e) raise HTTPException( status_code=404, detail=f"Error creating ZIP file: {str(e)}" @@ -270,7 +287,7 @@ async def batch_status_updates( await close_connection(batch_id) except Exception as e: record_exception_to_trace(e) - logger.error("Error in WebSocket connection", error=str(e)) + logger.error("Error in WebSocket connection", error=str(e), batch_id=batch_id) await close_connection(batch_id) @@ -373,15 +390,16 @@ async def get_batch_status(request: Request, batch_id: str): # Authenticate user authenticated_user = get_authenticated_user(request) user_id = authenticated_user.user_principal_id + set_span_attributes(batch_id=batch_id, user_id=user_id) if not user_id: track_event_if_configured( - "UserIdNotFound", {"status_code": 400, "detail": "no user"} + "UserIdNotFound", {"status_code": 400, "detail": "no user", "batch_id": batch_id} ) raise HTTPException(status_code=401, detail="User not authenticated") # Validate batch_id format if not batch_service.is_valid_uuid(batch_id): - track_event_if_configured("InvalidBatchId", {"batch_id": batch_id}) + track_event_if_configured("InvalidBatchId", {"batch_id": batch_id, "user_id": user_id}) raise HTTPException(status_code=400, detail="Invalid batch_id format") # Fetch batch details @@ -399,7 +417,7 @@ async def get_batch_status(request: Request, batch_id: str): record_exception_to_trace(e) raise e except Exception as e: - logger.error("Error retrieving batch history", error=str(e)) + logger.error("Error retrieving batch history", error=str(e), batch_id=batch_id) record_exception_to_trace(e) error_message = str(e) if "403" in error_message: @@ -417,9 +435,10 @@ async def get_batch_summary(request: Request, batch_id: str): # Authenticate user authenticated_user = get_authenticated_user(request) user_id = authenticated_user.user_principal_id + set_span_attributes(batch_id=batch_id, user_id=user_id) if not user_id: track_event_if_configured( - "UserIdNotFound", {"status_code": 400, "detail": "no user"} + "UserIdNotFound", {"status_code": 400, "detail": "no user", "batch_id": batch_id} ) raise HTTPException(status_code=401, detail="User not authenticated") @@ -431,16 +450,16 @@ async def get_batch_summary(request: Request, batch_id: str): ) raise HTTPException(status_code=404, detail="No batch summary found.") track_event_if_configured( - "BatchSummaryRetrieved", {"batch_summary": batch_summary} + "BatchSummaryRetrieved", {"batch_id": batch_id, "user_id": user_id, "batch_summary": batch_summary} ) return batch_summary except HTTPException as e: - logger.error("Error fetching batch summary", error=str(e)) + logger.error("Error fetching batch summary", error=str(e), batch_id=batch_id) record_exception_to_trace(e) raise e except Exception as e: - logger.error("Error fetching batch summary", error=str(e)) + logger.error("Error fetching batch summary", error=str(e), batch_id=batch_id) record_exception_to_trace(e) raise HTTPException(status_code=404, detail="Batch not found") from e @@ -538,16 +557,17 @@ async def upload_file( authenticated_user = get_authenticated_user(request) user_id = authenticated_user.user_principal_id + set_span_attributes(batch_id=batch_id, user_id=user_id) if not user_id: track_event_if_configured( - "UserIdNotFound", {"status_code": 400, "detail": "no user"} + "UserIdNotFound", {"status_code": 400, "detail": "no user", "batch_id": batch_id} ) raise HTTPException(status_code=401, detail="User not authenticated") # Validate batch_id format - logger.info(f"batch_id: {batch_id}") + logger.info("Upload request received", batch_id=batch_id, filename=str(file.filename)) if not batch_service.is_valid_uuid(batch_id): - track_event_if_configured("InvalidBatchId", {"batch_id": batch_id}) + track_event_if_configured("InvalidBatchId", {"batch_id": batch_id, "user_id": user_id}) raise HTTPException(status_code=400, detail="Invalid batch_id format") # Upload file via BatchService @@ -660,6 +680,7 @@ async def get_file_details(request: Request, file_id: str): # Authenticate user authenticated_user = get_authenticated_user(request) user_id = authenticated_user.user_principal_id + set_span_attributes(file_id=file_id, user_id=user_id) if not user_id: track_event_if_configured( "UserIdNotFound", {"endpoint": "get_file_details"} @@ -668,22 +689,22 @@ async def get_file_details(request: Request, file_id: str): # Validate file_id format if not batch_service.is_valid_uuid(file_id): - track_event_if_configured("InvalidFileId", {"file_id": file_id}) + track_event_if_configured("InvalidFileId", {"file_id": file_id, "user_id": user_id}) raise HTTPException(status_code=400, detail="Invalid file_id format") # Fetch file details file_data = await batch_service.get_file_report(file_id) if not file_data: - track_event_if_configured("FileNotFound", {"file_id": file_id}) + track_event_if_configured("FileNotFound", {"file_id": file_id, "user_id": user_id}) raise HTTPException(status_code=404, detail="File not found") - track_event_if_configured("FileDetailsRetrieved", {"file_data": file_data}) + track_event_if_configured("FileDetailsRetrieved", {"file_id": file_id, "user_id": user_id, "file_data": file_data}) return file_data except HTTPException as e: record_exception_to_trace(e) raise e except Exception as e: - logger.error("Error retrieving file details", error=str(e)) + logger.error("Error retrieving file details", error=str(e), file_id=file_id) record_exception_to_trace(e) raise HTTPException(status_code=500, detail="Internal server error") from e @@ -721,15 +742,16 @@ async def delete_batch_details(request: Request, batch_id: str): # Authenticate user authenticated_user = get_authenticated_user(request) user_id = authenticated_user.user_principal_id + set_span_attributes(batch_id=batch_id, user_id=user_id) if not user_id: track_event_if_configured( - "UserIdNotFound", {"endpoint": "delete_batch_details"} + "UserIdNotFound", {"batch_id": batch_id, "endpoint": "delete_batch_details"} ) raise HTTPException(status_code=401, detail="User not authenticated") # Validate file_id format if not batch_service.is_valid_uuid(batch_id): - track_event_if_configured("InvalidBatchId", {"batch_id": batch_id}) + track_event_if_configured("InvalidBatchId", {"batch_id": batch_id, "user_id": user_id}) raise HTTPException( status_code=400, detail=f"Invalid batch_id format: {batch_id}" ) @@ -745,7 +767,7 @@ async def delete_batch_details(request: Request, batch_id: str): record_exception_to_trace(e) raise e except Exception as e: - logger.error("Failed to delete batch from database", error=str(e)) + logger.error("Failed to delete batch from database", error=str(e), batch_id=batch_id) record_exception_to_trace(e) raise HTTPException(status_code=500, detail="Database connection error") from e @@ -783,6 +805,7 @@ async def delete_file_details(request: Request, file_id: str): # Authenticate user authenticated_user = get_authenticated_user(request) user_id = authenticated_user.user_principal_id + set_span_attributes(file_id=file_id, user_id=user_id) if not user_id: track_event_if_configured( "UserIdNotFound", {"endpoint": "delete_file_details"} @@ -791,7 +814,7 @@ async def delete_file_details(request: Request, file_id: str): # Validate file_id format if not batch_service.is_valid_uuid(file_id): - track_event_if_configured("InvalidFileId", {"file_id": file_id}) + track_event_if_configured("InvalidFileId", {"file_id": file_id, "user_id": user_id}) raise HTTPException( status_code=400, detail=f"Invalid file_id format: {file_id}" ) @@ -799,7 +822,7 @@ async def delete_file_details(request: Request, file_id: str): # Delete file file_delete = await batch_service.delete_file(file_id, user_id) if file_delete is None: - track_event_if_configured("FileDeleteNotFound", {"file_id": file_id}) + track_event_if_configured("FileDeleteNotFound", {"file_id": file_id, "user_id": user_id}) raise HTTPException(status_code=404, detail=f"File not found: {file_id}") logger.info(f"File deleted successfully: {file_delete}") @@ -810,7 +833,7 @@ async def delete_file_details(request: Request, file_id: str): record_exception_to_trace(e) raise e except Exception as e: - logger.error("Failed to delete file from database", error=str(e)) + logger.error("Failed to delete file from database", error=str(e), file_id=file_id) record_exception_to_trace(e) raise HTTPException(status_code=500, detail="Database connection error") from e diff --git a/src/backend/api/event_utils.py b/src/backend/api/event_utils.py index 11905a8a..137fb82d 100644 --- a/src/backend/api/event_utils.py +++ b/src/backend/api/event_utils.py @@ -3,69 +3,18 @@ import os # Third-party -from applicationinsights import TelemetryClient -from applicationinsights.channel import SynchronousQueue, SynchronousSender, TelemetryChannel +from azure.monitor.events.extension import track_event from dotenv import load_dotenv load_dotenv() -# Global telemetry client (initialized once) -_telemetry_client = None - - -def _get_telemetry_client(): - """Get or create the Application Insights telemetry client.""" - global _telemetry_client - - if _telemetry_client is None: - connection_string = os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING") - if connection_string: - try: - # Extract instrumentation key from connection string - # Format: InstrumentationKey=xxx;IngestionEndpoint=https://... - parts = dict(part.split('=', 1) for part in connection_string.split(';') if '=' in part) - instrumentation_key = parts.get('InstrumentationKey') - - if instrumentation_key: - # Create a synchronous channel for immediate sending - sender = SynchronousSender() - queue = SynchronousQueue(sender) - channel = TelemetryChannel(None, queue) - - _telemetry_client = TelemetryClient(instrumentation_key, channel) - logging.info("Application Insights TelemetryClient initialized successfully") - else: - logging.error("Could not extract InstrumentationKey from connection string") - except Exception as e: - logging.error(f"Failed to initialize TelemetryClient: {e}") - - return _telemetry_client - def track_event_if_configured(event_name: str, event_data: dict): - """Track a custom event to Application Insights customEvents table. - - This uses the Application Insights SDK TelemetryClient which properly - sends custom events to the customEvents table in Application Insights. - """ + """Track a custom event to Application Insights customEvents table.""" instrumentation_key = os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING") if instrumentation_key: - try: - client = _get_telemetry_client() - if client: - # Convert all values to strings to ensure compatibility - properties = {k: str(v) for k, v in event_data.items()} - - # Track the custom event - client.track_event(event_name, properties=properties) - client.flush() # Ensure immediate sending - - logging.debug(f"Tracked custom event: {event_name} with data: {event_data}") - else: - logging.warning("TelemetryClient not available, custom event not tracked") - except Exception as e: - logging.error(f"Failed to track event {event_name}: {e}") + track_event(event_name, event_data) else: logging.warning( f"Skipping track_event for {event_name} as Application Insights is not configured" diff --git a/src/backend/app.py b/src/backend/app.py index 58ee1a29..358916c5 100644 --- a/src/backend/app.py +++ b/src/backend/app.py @@ -5,10 +5,11 @@ from api.api_routes import router as backend_router -from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter, AzureMonitorTraceExporter +from azure.monitor.opentelemetry import configure_azure_monitor from common.config.config import app_config from common.logger.app_logger import AppLogger +from common.telemetry import patch_instrumentors from dotenv import load_dotenv @@ -17,13 +18,7 @@ from helper.azure_credential_utils import get_azure_credential -from opentelemetry import trace -from opentelemetry._logs import set_logger_provider from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor -from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler -from opentelemetry.sdk._logs.export import BatchLogRecordProcessor -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent # pylint: disable=E0611 @@ -52,14 +47,17 @@ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) +# Suppress noisy Azure SDK and OpenTelemetry internal loggers. +logging.getLogger("azure.core.pipeline.policies.http_logging_policy").setLevel(logging.WARNING) +logging.getLogger("azure.core.pipeline.policies._universal").setLevel(logging.WARNING) +logging.getLogger("azure.cosmos").setLevel(logging.WARNING) +logging.getLogger("opentelemetry.sdk").setLevel(logging.WARNING) +logging.getLogger("azure.monitor.opentelemetry.exporter.export._base").setLevel(logging.WARNING) + # Package config: Azure loggers set to WARNING to suppress INFO for logger_name in AZURE_LOGGING_PACKAGES: logging.getLogger(logger_name).setLevel(getattr(logging, AZURE_PACKAGE_LOGGING_LEVEL, logging.WARNING)) -# Suppress noisy OpenTelemetry and Azure Monitor logs -# logging.getLogger("opentelemetry.sdk").setLevel(logging.ERROR) -# logging.getLogger("azure.core.pipeline.policies.http_logging_policy").setLevel(logging.WARNING) -# logging.getLogger("azure.monitor.opentelemetry.exporter.export._base").setLevel(logging.WARNING) logger = AppLogger("app") @@ -75,7 +73,7 @@ async def lifespan(app: FastAPI): # Startup try: - logger.logger.info("Initializing SQL agents...") + logger.info("Initializing SQL agents...") # Create Azure credentials and client creds = get_azure_credential(app_config.azure_client_id) @@ -96,10 +94,10 @@ async def lifespan(app: FastAPI): # Set the global agents instance set_sql_agents(sql_agents) - logger.logger.info("SQL agents initialized successfully.") + logger.info("SQL agents initialized successfully.") - except Exception as exc: - logger.logger.error("Failed to initialize SQL agents: %s", exc) + except Exception: # noqa: BLE001 + logger.error("Failed to initialize SQL agents") # Don't raise the exception to allow the app to start even if agents fail yield # Application runs here @@ -107,9 +105,9 @@ async def lifespan(app: FastAPI): # Shutdown try: if sql_agents: - logger.logger.info("Application shutting down - cleaning up SQL agents...") + logger.info("Application shutting down - cleaning up SQL agents...") await sql_agents.delete_agents() - logger.logger.info("SQL agents cleaned up successfully.") + logger.info("SQL agents cleaned up successfully.") # Clear the global agents instance await clear_sql_agents() @@ -117,8 +115,8 @@ async def lifespan(app: FastAPI): if azure_client: await azure_client.close() - except Exception as exc: - logger.logger.error("Error during agent cleanup: %s", exc) + except Exception: # noqa: BLE001 + logger.error("Error during agent cleanup") def create_app() -> FastAPI: @@ -138,54 +136,24 @@ def create_app() -> FastAPI: # This must happen AFTER app creation but BEFORE route registration instrumentation_key = os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING") if instrumentation_key: - # SOLUTION: Use manual telemetry setup instead of configure_azure_monitor - # This gives us precise control over what gets instrumented, avoiding interference - # with Semantic Kernel's async generators while still tracking Azure SDK calls - - # Set up Azure Monitor exporter for traces - azure_trace_exporter = AzureMonitorTraceExporter(connection_string=instrumentation_key) - - # Create a tracer provider and add the Azure Monitor exporter - tracer_provider = TracerProvider() - tracer_provider.add_span_processor(BatchSpanProcessor(azure_trace_exporter)) - - # Set the global tracer provider - trace.set_tracer_provider(tracer_provider) + # Patch azure.ai.agents, azure.ai.projects instrumentor to handle dict response_format (prevents ValueError) + patch_instrumentors() - # Set up Azure Monitor exporter for logs (appears in traces table) - azure_log_exporter = AzureMonitorLogExporter(connection_string=instrumentation_key) - - # Create a logger provider and add the Azure Monitor exporter - logger_provider = LoggerProvider() - logger_provider.add_log_record_processor(BatchLogRecordProcessor(azure_log_exporter)) - set_logger_provider(logger_provider) - - # Attach OpenTelemetry handler to Python's root logger - handler = LoggingHandler(logger_provider=logger_provider) - logging.getLogger().addHandler(handler) + # Configure Azure Monitor with FULL auto-instrumentation + configure_azure_monitor( + connection_string=instrumentation_key, + enable_live_metrics=True + ) - # Instrument ONLY FastAPI for HTTP request/response tracing - # This is safe because it only wraps HTTP handlers, not internal async operations + # Instrument FastAPI for HTTP request/response tracing FastAPIInstrumentor.instrument_app( app, - excluded_urls="socket,ws", # Exclude WebSocket URLs to reduce noise - tracer_provider=tracer_provider + excluded_urls="health,socket,ws", ) - # Optional: Add manual spans in your code for Azure SDK operations using: - # from opentelemetry import trace - # tracer = trace.get_tracer(__name__) - # with tracer.start_as_current_span("operation_name"): - # # your Azure SDK call here - - logger.logger.info("Application Insights configured with selective instrumentation") - logger.logger.info("✓ FastAPI HTTP tracing enabled") - logger.logger.info("✓ Python logging export to Application Insights enabled") - logger.logger.info("✓ Manual span support enabled for Azure SDK operations") - logger.logger.info("✓ Custom events via OpenTelemetry enabled") - logger.logger.info("✓ Semantic Kernel async generators unaffected") + logger.info("Application Insights configured with full auto-instrumentation") else: - logger.logger.warning("No Application Insights connection string found. Telemetry disabled.") + logger.warning("No Application Insights connection string found. Telemetry disabled.") # Include routers with /api prefix app.include_router(backend_router, prefix="/api", tags=["backend"]) diff --git a/src/backend/common/logger/app_logger.py b/src/backend/common/logger/app_logger.py index b9aed467..872af1f6 100644 --- a/src/backend/common/logger/app_logger.py +++ b/src/backend/common/logger/app_logger.py @@ -1,5 +1,6 @@ import json import logging +import sys from typing import Any @@ -43,8 +44,10 @@ def info(self, message: Any, **kwargs) -> None: def warning(self, message: Any, **kwargs) -> None: self.logger.warning(self._format_message(message, **kwargs)) - def error(self, message: Any, **kwargs) -> None: - self.logger.error(self._format_message(message, **kwargs)) + def error(self, message: Any, exc_info=None, **kwargs) -> None: + if exc_info is None: + exc_info = sys.exc_info()[0] is not None + self.logger.error(self._format_message(message, **kwargs), exc_info=exc_info) def critical(self, message: Any, **kwargs) -> None: self.logger.critical(self._format_message(message, **kwargs)) diff --git a/src/backend/common/services/batch_service.py b/src/backend/common/services/batch_service.py index f467820a..828c16d5 100644 --- a/src/backend/common/services/batch_service.py +++ b/src/backend/common/services/batch_service.py @@ -285,6 +285,7 @@ async def upload_file_to_batch(self, batch_id: str, user_id: str, file: UploadFi content_type=file.content_type, metadata={"batch_id": batch_id, "user_id": user_id, "file_id": file_id}, ) + self.logger.info("File uploaded to blob storage", filename=file.filename, batch_id=batch_id) # Create file entry await self.database.add_file(batch_id, file_id, file.filename, blob_path) @@ -370,6 +371,7 @@ async def delete_batch_and_files(self, batch_id: str, user_id: str): # Delete file and log entry from database await self.database.delete_file(user_id, str(file_record.file_id)) await self.database.delete_batch(user_id, batch_id) + self.logger.info("Batch and all files deleted successfully", batch_id=batch_id) return {"message": "Files deleted successfully"} except (RuntimeError, ValueError, IOError) as e: @@ -429,6 +431,7 @@ async def update_batch(self, batch_id: str, status: ProcessStatus): batch_record.status = status batch_record.updated_at = datetime.utcnow() await self.database.update_batch(batch_record) + self.logger.info("Batch status updated", batch_id=batch_id, status=status.value) async def create_candidate(self, file_id: str, candidate: str): """Create a new candidate entry in the database and upload the candita file to storage.""" diff --git a/src/backend/common/telemetry/__init__.py b/src/backend/common/telemetry/__init__.py index 1e725b70..799b0942 100644 --- a/src/backend/common/telemetry/__init__.py +++ b/src/backend/common/telemetry/__init__.py @@ -1,17 +1,7 @@ """Telemetry utilities for Application Insights integration.""" -from common.telemetry.telemetry_helper import ( - add_span_attributes, - get_tracer, - trace_context, - trace_operation, - trace_sync_context, -) +from common.telemetry.patch_instrumentor import patch_instrumentors __all__ = [ - "trace_operation", - "trace_context", - "trace_sync_context", - "get_tracer", - "add_span_attributes", + "patch_instrumentors", ] diff --git a/src/backend/common/telemetry/patch_instrumentor.py b/src/backend/common/telemetry/patch_instrumentor.py new file mode 100644 index 00000000..f2baca23 --- /dev/null +++ b/src/backend/common/telemetry/patch_instrumentor.py @@ -0,0 +1,62 @@ +""" +Patch for Azure AI telemetry instrumentors. + +Fixes GitHub issue: https://github.com/microsoft/semantic-kernel/issues/13715 + +The bug: agent_api_response_to_str() in both azure.ai.agents and azure.ai.projects +raises ValueError when response_format is a dict (e.g. from Semantic Kernel's AzureAIAgent). +It only handles str and None types. + +The fix: Monkey-patch that method to convert dict/other types to JSON string instead of raising. +Must be called BEFORE configure_azure_monitor() triggers any instrumentation. +""" + +import json +import logging +from typing import Any, Optional + +logger = logging.getLogger(__name__) + + +def _fixed_response_to_str(response_format: Any) -> Optional[str]: + """Convert response_format to string, handling dict types.""" + if response_format is None: + return None + if isinstance(response_format, str): + return response_format + try: + return json.dumps(response_format, default=str) + except (TypeError, ValueError): + return str(response_format) + + +def patch_instrumentors(): + """ + Patch Azure AI telemetry instrumentors to handle dict response_format. + + This fixes the ValueError: "Unknown response format " error + that occurs when Semantic Kernel's AzureAIAgent passes a dict as response_format + and Azure Monitor telemetry instrumentor tries to serialize it. + + Patches both azure.ai.agents and azure.ai.projects packages. + Must be called BEFORE configure_azure_monitor(). + """ + # Patch azure.ai.agents (primary package with the bug) + try: + from azure.ai.agents.telemetry._ai_agents_instrumentor import ( + _AIAgentsInstrumentorPreview as _AgentsPreview, + ) + _AgentsPreview.agent_api_response_to_str = staticmethod(_fixed_response_to_str) + logger.info("Patched azure.ai.agents instrumentor") + except ImportError: + logger.debug("azure.ai.agents telemetry not installed, skipping patch") + + # Patch azure.ai.projects (in case it exists in the environment) + try: + from azure.ai.projects.telemetry._ai_project_instrumentor import ( + _AIAgentsInstrumentorPreview as _ProjectsPreview, + ) + _ProjectsPreview.agent_api_response_to_str = staticmethod(_fixed_response_to_str) + logger.info("Patched azure.ai.projects instrumentor") + except ImportError: + logger.debug("azure.ai.projects telemetry not installed, skipping patch") diff --git a/src/backend/common/telemetry/telemetry_helper.py b/src/backend/common/telemetry/telemetry_helper.py deleted file mode 100644 index e1f87900..00000000 --- a/src/backend/common/telemetry/telemetry_helper.py +++ /dev/null @@ -1,160 +0,0 @@ -"""Helper utilities for adding telemetry spans to Azure SDK operations. - -This module provides decorators and context managers for adding OpenTelemetry -spans to Azure SDK calls (CosmosDB, Blob Storage, etc.) without interfering -with Semantic Kernel's async generators. - -Example usage: - from common.telemetry.telemetry_helper import trace_operation - - @trace_operation("cosmosdb_query") - async def query_items(self, query: str): - # Your CosmosDB query here - pass -""" - -import asyncio -import functools -from contextlib import asynccontextmanager, contextmanager -from typing import Optional - -from opentelemetry import trace -from opentelemetry.trace import Status, StatusCode - - -def get_tracer(name: str = __name__): - """Get a tracer instance for the given name.""" - return trace.get_tracer(name) - - -def trace_operation(operation_name: str, attributes: Optional[dict] = None): - """Decorator to add telemetry span to a function or method. - - Args: - operation_name: Name of the operation for the span - attributes: Optional dictionary of attributes to add to the span - - Example: - @trace_operation("batch_processing", {"service": "sql_agents"}) - async def process_batch(batch_id: str): - # Your code here - pass - """ - def decorator(func): - @functools.wraps(func) - async def async_wrapper(*args, **kwargs): - tracer = get_tracer(func.__module__) - with tracer.start_as_current_span(operation_name) as span: - # Add custom attributes if provided - if attributes: - for key, value in attributes.items(): - span.set_attribute(key, str(value)) - - # Add function arguments as attributes (optional, for debugging) - span.set_attribute("function", func.__name__) - - try: - result = await func(*args, **kwargs) - span.set_status(Status(StatusCode.OK)) - return result - except Exception as e: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) - raise - - @functools.wraps(func) - def sync_wrapper(*args, **kwargs): - tracer = get_tracer(func.__module__) - with tracer.start_as_current_span(operation_name) as span: - if attributes: - for key, value in attributes.items(): - span.set_attribute(key, str(value)) - - span.set_attribute("function", func.__name__) - - try: - result = func(*args, **kwargs) - span.set_status(Status(StatusCode.OK)) - return result - except Exception as e: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) - raise - - # Return appropriate wrapper based on function type - if asyncio.iscoroutinefunction(func): - return async_wrapper - else: - return sync_wrapper - - return decorator - - -@asynccontextmanager -async def trace_context(operation_name: str, attributes: Optional[dict] = None): - """Async context manager for adding telemetry span to a code block. - - Args: - operation_name: Name of the operation for the span - attributes: Optional dictionary of attributes to add to the span - - Example: - async with trace_context("cosmosdb_batch_query", {"batch_id": batch_id}): - results = await database.query_items(query) - # Your code here - """ - tracer = get_tracer() - with tracer.start_as_current_span(operation_name) as span: - if attributes: - for key, value in attributes.items(): - span.set_attribute(key, str(value)) - - try: - yield span - span.set_status(Status(StatusCode.OK)) - except Exception as e: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) - raise - - -@contextmanager -def trace_sync_context(operation_name: str, attributes: Optional[dict] = None): - """Sync context manager for adding telemetry span to a code block. - - Args: - operation_name: Name of the operation for the span - attributes: Optional dictionary of attributes to add to the span - - Example: - with trace_sync_context("blob_upload", {"file_name": file_name}): - blob_client.upload_blob(data) - """ - tracer = get_tracer() - with tracer.start_as_current_span(operation_name) as span: - if attributes: - for key, value in attributes.items(): - span.set_attribute(key, str(value)) - - try: - yield span - span.set_status(Status(StatusCode.OK)) - except Exception as e: - span.record_exception(e) - span.set_status(Status(StatusCode.ERROR, str(e))) - raise - - -def add_span_attributes(attributes: dict): - """Add attributes to the current span. - - Args: - attributes: Dictionary of attributes to add - - Example: - add_span_attributes({"user_id": user_id, "batch_id": batch_id}) - """ - span = trace.get_current_span() - if span and span.is_recording(): - for key, value in attributes.items(): - span.set_attribute(key, str(value)) diff --git a/src/backend/requirements.txt b/src/backend/requirements.txt index 4a2fdaf1..685792f6 100644 --- a/src/backend/requirements.txt +++ b/src/backend/requirements.txt @@ -44,7 +44,7 @@ starlette aiortc opentelemetry-exporter-otlp-proto-grpc opentelemetry-exporter-otlp-proto-http -applicationinsights +azure-monitor-events-extension opentelemetry-sdk==1.39.0 opentelemetry-api==1.39.0 opentelemetry-semantic-conventions==0.60b0 diff --git a/src/backend/sql_agents/convert_script.py b/src/backend/sql_agents/convert_script.py index eb4a2767..10b8dbae 100644 --- a/src/backend/sql_agents/convert_script.py +++ b/src/backend/sql_agents/convert_script.py @@ -5,10 +5,10 @@ """ import json -import logging from api.status_updates import send_status_update +from common.logger.app_logger import AppLogger from common.models.api import ( FileProcessUpdate, FileRecord, @@ -29,8 +29,7 @@ from sql_agents.helpers.comms_manager import CommsManager from sql_agents.helpers.models import AgentType -logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) +logger = AppLogger("ConvertScript") async def convert_script( @@ -41,7 +40,7 @@ async def convert_script( # agent_config: AgentBaseConfig, ) -> str: """Use the team of agents to migrate a sql script.""" - logger.info("Migrating query: %s\n", source_script) + logger.info("Starting migration", file_id=str(file.file_id), batch_id=str(file.batch_id)) # Setup the group chat for the agents comms_manager = CommsManager( @@ -124,7 +123,7 @@ async def convert_script( response.content or "" ) except Exception as picker_exc: - logger.error("Picker agent returned invalid response: %s", picker_exc) + logger.error("Picker agent returned invalid response", error=str(picker_exc)) # Fallback to a valid PickerResponse with default values result = PickerResponse( conclusion="No valid candidate could be selected. The agent did not return a proper response.", @@ -139,14 +138,14 @@ async def convert_script( current_migration = result.fixed_query case AgentType.SEMANTIC_VERIFIER.value: logger.info( - "Semantic verifier agent response: %s", response.content + "Semantic verifier agent response received", content=response.content ) try: result = SemanticVerifierResponse.model_validate_json( response.content or "" ) except Exception as verifier_exc: - logger.error("Semantic Verifier agent returned invalid response: %s", verifier_exc) + logger.error("Semantic Verifier agent returned invalid response", error=str(verifier_exc)) # Fallback to a valid SemanticVerifierResponse with default values result = SemanticVerifierResponse( judgement="Semantic verifier agent failed to return a valid response.", @@ -215,11 +214,11 @@ async def convert_script( "content": response.content, } - logger.info(description) + logger.info("Agent response received", role=response.role, name=response.name or "*") try: parsed_content = json.loads(response.content or "{}") except json.JSONDecodeError: - logger.warning("Invalid JSON from agent: %s", response.content) + logger.warning("Invalid JSON from agent", name=response.name or "*") parsed_content = { "input_summary": "", "candidates": [], @@ -249,7 +248,7 @@ async def convert_script( AuthorRole(response.role), ) except Exception as e: - logger.error("Error during comms_manager.async_invoke(): %s", str(e)) + logger.error("Error during comms_manager.async_invoke()", file_id=str(file.file_id), batch_id=str(file.batch_id), error=str(e)) # Log the error to the batch service for tracking await batch_service.create_file_log( str(file.file_id), @@ -279,7 +278,7 @@ async def convert_script( # Handle the case where migration failed and current_migration is None if current_migration is None: - logger.info("# Migration failed - no valid migration produced.") + logger.info("Migration failed - no valid migration produced", file_id=str(file.file_id)) return "" is_valid = await validate_migration( @@ -287,14 +286,10 @@ async def convert_script( ) if not is_valid: - logger.info("# Migration failed.") + logger.info("Migration validation failed", file_id=str(file.file_id)) return "" - logger.info("# Migration complete.") - logger.info("Final query: %s\n", migrated_query) - logger.info( - "Analysis of source and migrated queries:\n%s", "semantic verifier response" - ) + logger.info("Migration completed successfully", file_id=str(file.file_id), batch_id=str(file.batch_id)) return migrated_query @@ -302,9 +297,9 @@ async def convert_script( # Clean up threads and communication resources - guaranteed to run try: await comms_manager.cleanup() - logger.debug("Thread cleanup completed successfully for file %s", file.file_id) + logger.debug("Thread cleanup completed", file_id=str(file.file_id)) except Exception as cleanup_exc: - logger.error("Error during thread cleanup for file %s: %s", file.file_id, cleanup_exc) + logger.error("Error during thread cleanup", file_id=str(file.file_id), error=str(cleanup_exc)) async def validate_migration( diff --git a/src/backend/sql_agents/process_batch.py b/src/backend/sql_agents/process_batch.py index 1da3467f..f5c1656f 100644 --- a/src/backend/sql_agents/process_batch.py +++ b/src/backend/sql_agents/process_batch.py @@ -4,10 +4,9 @@ It is the main entry point for the SQL migration process. """ -import logging - from api.status_updates import send_status_update +from common.logger.app_logger import AppLogger from common.models.api import ( FileProcessUpdate, FileRecord, @@ -17,7 +16,6 @@ ) from common.services.batch_service import BatchService from common.storage.blob_factory import BlobStorageFactory -from common.telemetry import trace_context from fastapi import HTTPException @@ -29,8 +27,7 @@ from sql_agents.helpers.models import AgentType from sql_agents.helpers.utils import is_text -logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) +logger = AppLogger("ProcessBatch") # Walk through batch structure processing each file @@ -38,120 +35,115 @@ async def process_batch_async( batch_id: str, convert_from: str = "informix", convert_to: str = "tsql" ): """Central batch processing function to process each file in the batch""" - # Add telemetry span for the entire batch processing operation - async with trace_context("process_batch", { - "batch_id": batch_id, - "convert_from": convert_from, - "convert_to": convert_to - }): - logger.info("Processing batch: %s", batch_id) - storage = await BlobStorageFactory.get_storage() - batch_service = BatchService() - await batch_service.initialize_database() - + logger.info("Processing batch", batch_id=batch_id) + storage = await BlobStorageFactory.get_storage() + batch_service = BatchService() + await batch_service.initialize_database() + + try: + batch_files = await batch_service.database.get_batch_files(batch_id) + if not batch_files: + raise HTTPException(status_code=404, detail="Batch not found") + logger.info("Batch files found", batch_id=batch_id, file_count=len(batch_files)) + # Retrieve list of file paths + await batch_service.update_batch(batch_id, ProcessStatus.IN_PROGRESS) + except Exception as exc: + logger.error("Error updating batch status", batch_id=batch_id, error=str(exc)) + # Mark batch as failed and stop further processing if initialization fails + await batch_service.update_batch(batch_id, ProcessStatus.FAILED) + return + + # Get the global SQL agents instance + sql_agents = get_sql_agents() + if not sql_agents: + logger.error("SQL agents not initialized", exc_info=False, batch_id=batch_id) + await batch_service.update_batch(batch_id, ProcessStatus.FAILED) + return + + # Update agent configuration for this batch's conversion requirements + await update_agent_config(convert_from, convert_to) + + # Walk through each file name and retrieve it from blob storage + # Send file to the agents for processing + # Send status update to the client of type in progress, completed, or failed + for file in batch_files: + # Get the file from blob storage try: - batch_files = await batch_service.database.get_batch_files(batch_id) - if not batch_files: - raise HTTPException(status_code=404, detail="Batch not found") - # Retrieve list of file paths - await batch_service.update_batch(batch_id, ProcessStatus.IN_PROGRESS) - except Exception as exc: - logger.error("Error updating batch status. %s", exc) - - # Get the global SQL agents instance - sql_agents = get_sql_agents() - if not sql_agents: - logger.error("SQL agents not initialized. Application may not have started properly.") - await batch_service.update_batch(batch_id, ProcessStatus.FAILED) - return - - # Update agent configuration for this batch's conversion requirements - await update_agent_config(convert_from, convert_to) - - # Walk through each file name and retrieve it from blob storage - # Send file to the agents for processing - # Send status update to the client of type in progress, completed, or failed - for file in batch_files: - # Get the file from blob storage + file_record = FileRecord.fromdb(file) + # Update the file status try: - file_record = FileRecord.fromdb(file) - # Update the file status - try: - file_record.status = ProcessStatus.IN_PROGRESS - await batch_service.update_file_record(file_record) - except Exception as exc: - logger.error("Error updating file status. %s", exc) - - sql_in_file = await storage.get_file(file_record.blob_path) - - # split into base validation routine - # Check if the file is a valid text file <-- - if not is_text(sql_in_file): - logger.error("File is not a valid text file. Skipping.") - # insert data base write to file record stating invalid file - await batch_service.create_file_log( - str(file_record.file_id), - "File is not a valid text file. Skipping.", - "", - LogType.ERROR, - AgentType.ALL, - AuthorRole.ASSISTANT, - ) - # send status update to the client of type failed - send_status_update( - status=FileProcessUpdate( - file_record.batch_id, - file_record.file_id, - ProcessStatus.COMPLETED, - file_result=FileResult.ERROR, - ), - ) - file_record.file_result = FileResult.ERROR - file_record.status = ProcessStatus.COMPLETED - file_record.error_count = 1 - await batch_service.update_file_record(file_record) - continue - else: - logger.info("sql_in_file: %s", sql_in_file) - - # Convert the file - converted_query = await convert_script( - sql_in_file, - file_record, - batch_service, - sql_agents, - ) - if converted_query: - # Add RAI disclaimer to the converted query - converted_query = add_rai_disclaimer(converted_query) - await batch_service.create_candidate( - file["file_id"], converted_query - ) - else: - await batch_service.update_file_counts(file["file_id"]) - except UnicodeDecodeError as ucde: - logger.error("Error decoding file: %s", file) - logger.error("Error decoding file. %s", ucde) - await process_error(ucde, file_record, batch_service) - except ServiceResponseException as sre: - logger.error(file) - logger.error("Error processing file. %s", sre) - # insert data base write to file record stating invalid file - await process_error(sre, file_record, batch_service) + file_record.status = ProcessStatus.IN_PROGRESS + await batch_service.update_file_record(file_record) except Exception as exc: - logger.error(file) - logger.error("Error processing file. %s", exc) - # insert data base write to file record stating invalid file - await process_error(exc, file_record, batch_service) + logger.error("Error updating file status", batch_id=batch_id, file_id=str(file_record.file_id), error=str(exc)) - # Update batch status to completed or failed - try: - await batch_service.batch_files_final_update(batch_id) - await batch_service.update_batch(batch_id, ProcessStatus.COMPLETED) + sql_in_file = await storage.get_file(file_record.blob_path) + + # split into base validation routine + # Check if the file is a valid text file <-- + if not is_text(sql_in_file): + logger.error("File is not a valid text file. Skipping.", exc_info=False, batch_id=batch_id, file_id=str(file_record.file_id)) + # insert data base write to file record stating invalid file + await batch_service.create_file_log( + str(file_record.file_id), + "File is not a valid text file. Skipping.", + "", + LogType.ERROR, + AgentType.ALL, + AuthorRole.ASSISTANT, + ) + # send status update to the client of type failed + send_status_update( + status=FileProcessUpdate( + file_record.batch_id, + file_record.file_id, + ProcessStatus.COMPLETED, + file_result=FileResult.ERROR, + ), + ) + file_record.file_result = FileResult.ERROR + file_record.status = ProcessStatus.COMPLETED + file_record.error_count = 1 + await batch_service.update_file_record(file_record) + continue + else: + logger.info("File content loaded", file_id=str(file_record.file_id), batch_id=batch_id) + + # Convert the file + converted_query = await convert_script( + sql_in_file, + file_record, + batch_service, + sql_agents, + ) + if converted_query: + # Add RAI disclaimer to the converted query + converted_query = add_rai_disclaimer(converted_query) + await batch_service.create_candidate( + file["file_id"], converted_query + ) + else: + await batch_service.update_file_counts(file["file_id"]) + except UnicodeDecodeError as ucde: + logger.error("Error decoding file", batch_id=batch_id, file_id=str(file_record.file_id), error=str(ucde)) + await process_error(ucde, file_record, batch_service) + except ServiceResponseException as sre: + logger.error("Error processing file", batch_id=batch_id, file_id=str(file_record.file_id), error=str(sre)) + # insert data base write to file record stating invalid file + await process_error(sre, file_record, batch_service) except Exception as exc: - await batch_service.update_batch(batch_id, ProcessStatus.FAILED) - logger.error("Error updating batch status. %s", exc) - logger.info("Batch processing complete.") + logger.error("Error processing file", batch_id=batch_id, file_id=str(file_record.file_id), error=str(exc)) + # insert data base write to file record stating invalid file + await process_error(exc, file_record, batch_service) + + # Update batch status to completed or failed + try: + await batch_service.batch_files_final_update(batch_id) + await batch_service.update_batch(batch_id, ProcessStatus.COMPLETED) + except Exception as exc: + await batch_service.update_batch(batch_id, ProcessStatus.FAILED) + logger.error("Error updating final batch status", batch_id=batch_id, error=str(exc)) + logger.info("Batch processing complete", batch_id=batch_id) async def process_error( diff --git a/src/tests/backend/api/api_routes_test.py b/src/tests/backend/api/api_routes_test.py index 7815cd11..101f65a4 100644 --- a/src/tests/backend/api/api_routes_test.py +++ b/src/tests/backend/api/api_routes_test.py @@ -138,6 +138,22 @@ async def test_start_processing_exception(self, mock_process_batch, mock_track_e await start_processing(mock_request) assert exc_info.value.status_code == 500 + @pytest.mark.asyncio + async def test_start_processing_json_parse_error(self, mock_track_event): + """Test processing start when request.json() raises before batch_id is assigned.""" + mock_request = AsyncMock() + mock_request.json = AsyncMock(side_effect=Exception("Invalid JSON")) + + with patch("backend.api.api_routes.record_exception_to_trace"): + with pytest.raises(HTTPException) as exc_info: + await start_processing(mock_request) + assert exc_info.value.status_code == 500 + # batch_id should not be in the telemetry event data when it was never assigned + called_args = mock_track_event.call_args + assert called_args is not None + event_data = called_args[0][1] + assert "batch_id" not in event_data + class TestDownloadFiles: """Tests for download_files endpoint.""" diff --git a/src/tests/backend/api/event_utils_test.py b/src/tests/backend/api/event_utils_test.py index 9e6e2b0f..d4495aff 100644 --- a/src/tests/backend/api/event_utils_test.py +++ b/src/tests/backend/api/event_utils_test.py @@ -1,7 +1,7 @@ """Tests for event_utils module.""" import os -from unittest.mock import MagicMock, patch +from unittest.mock import patch from backend.api.event_utils import track_event_if_configured @@ -11,50 +11,36 @@ class TestTrackEventIfConfigured: def test_track_event_with_instrumentation_key(self): """Test tracking event when instrumentation key is set.""" - connection_string = "InstrumentationKey=test-key;IngestionEndpoint=https://test.com" - with patch.dict(os.environ, {"APPLICATIONINSIGHTS_CONNECTION_STRING": connection_string}): - with patch("backend.api.event_utils._get_telemetry_client") as mock_get_client: - mock_client = MagicMock() - mock_get_client.return_value = mock_client - + with patch.dict(os.environ, {"APPLICATIONINSIGHTS_CONNECTION_STRING": "test-key"}): + with patch("backend.api.event_utils.track_event") as mock_track: track_event_if_configured("TestEvent", {"key": "value"}) - mock_client.track_event.assert_called_once_with("TestEvent", properties={"key": "value"}) - mock_client.flush.assert_called_once() + mock_track.assert_called_once_with("TestEvent", {"key": "value"}) def test_track_event_without_instrumentation_key(self): """Test tracking event when instrumentation key is not set.""" with patch.dict(os.environ, {}, clear=True): # Remove the key if it exists os.environ.pop("APPLICATIONINSIGHTS_CONNECTION_STRING", None) - with patch("backend.api.event_utils._get_telemetry_client") as mock_get_client: + with patch("backend.api.event_utils.track_event") as mock_track: with patch("backend.api.event_utils.logging.warning") as mock_warning: track_event_if_configured("TestEvent", {"key": "value"}) - mock_get_client.assert_not_called() + mock_track.assert_not_called() mock_warning.assert_called_once() def test_track_event_with_empty_data(self): """Test tracking event with empty data.""" - connection_string = "InstrumentationKey=test-key;IngestionEndpoint=https://test.com" - with patch.dict(os.environ, {"APPLICATIONINSIGHTS_CONNECTION_STRING": connection_string}): - with patch("backend.api.event_utils._get_telemetry_client") as mock_get_client: - mock_client = MagicMock() - mock_get_client.return_value = mock_client - + with patch.dict(os.environ, {"APPLICATIONINSIGHTS_CONNECTION_STRING": "test-key"}): + with patch("backend.api.event_utils.track_event") as mock_track: track_event_if_configured("TestEvent", {}) - mock_client.track_event.assert_called_once_with("TestEvent", properties={}) - mock_client.flush.assert_called_once() + mock_track.assert_called_once_with("TestEvent", {}) def test_track_event_with_complex_data(self): """Test tracking event with complex data.""" - connection_string = "InstrumentationKey=test-key;IngestionEndpoint=https://test.com" - with patch.dict(os.environ, {"APPLICATIONINSIGHTS_CONNECTION_STRING": connection_string}): - with patch("backend.api.event_utils._get_telemetry_client") as mock_get_client: - mock_client = MagicMock() - mock_get_client.return_value = mock_client - + with patch.dict(os.environ, {"APPLICATIONINSIGHTS_CONNECTION_STRING": "test-key"}): + with patch("backend.api.event_utils.track_event") as mock_track: complex_data = { "batch_id": "test-batch", "file_count": 10, @@ -64,38 +50,4 @@ def test_track_event_with_complex_data(self): track_event_if_configured("ComplexEvent", complex_data) - # Values are converted to strings in the actual implementation - expected_properties = { - "batch_id": "test-batch", - "file_count": "10", - "status": "completed", - "nested": "{'key': 'value'}", - } - - mock_client.track_event.assert_called_once_with("ComplexEvent", properties=expected_properties) - mock_client.flush.assert_called_once() - - def test_track_event_client_returns_none(self): - """Test tracking event when client initialization fails.""" - connection_string = "InstrumentationKey=test-key;IngestionEndpoint=https://test.com" - with patch.dict(os.environ, {"APPLICATIONINSIGHTS_CONNECTION_STRING": connection_string}): - with patch("backend.api.event_utils._get_telemetry_client") as mock_get_client: - mock_get_client.return_value = None - with patch("backend.api.event_utils.logging.warning") as mock_warning: - track_event_if_configured("TestEvent", {"key": "value"}) - - mock_warning.assert_called_once() - - def test_track_event_with_exception(self): - """Test tracking event when an exception occurs.""" - connection_string = "InstrumentationKey=test-key;IngestionEndpoint=https://test.com" - with patch.dict(os.environ, {"APPLICATIONINSIGHTS_CONNECTION_STRING": connection_string}): - with patch("backend.api.event_utils._get_telemetry_client") as mock_get_client: - mock_client = MagicMock() - mock_client.track_event.side_effect = Exception("Test error") - mock_get_client.return_value = mock_client - - with patch("backend.api.event_utils.logging.error") as mock_error: - track_event_if_configured("TestEvent", {"key": "value"}) - - mock_error.assert_called_once() + mock_track.assert_called_once_with("ComplexEvent", complex_data) diff --git a/src/tests/backend/common/logger/app_logger_test.py b/src/tests/backend/common/logger/app_logger_test.py index 9301eb30..263b0f45 100644 --- a/src/tests/backend/common/logger/app_logger_test.py +++ b/src/tests/backend/common/logger/app_logger_test.py @@ -76,6 +76,38 @@ def test_error_log(logger_instance): log_json = json.loads(mock_error.call_args[0][0]) assert log_json["message"] == "Error log" assert log_json["context"]["error_code"] == 500 + # Called outside an exception context, so exc_info should be False + assert mock_error.call_args[1]["exc_info"] is False + + +def test_error_log_with_active_exception(logger_instance): + with patch.object(logger_instance.logger, "error") as mock_error: + try: + raise ValueError("test error") + except ValueError: + logger_instance.error("Error log inside exception") + mock_error.assert_called_once() + # Called inside an exception context, so exc_info should be True + assert mock_error.call_args[1]["exc_info"] is True + + +def test_error_log_without_active_exception(logger_instance): + with patch.object(logger_instance.logger, "error") as mock_error: + logger_instance.error("Error log outside exception") + mock_error.assert_called_once() + # Called outside an exception context, so exc_info should be False + assert mock_error.call_args[1]["exc_info"] is False + + +def test_error_log_explicit_exc_info_override(logger_instance): + with patch.object(logger_instance.logger, "error") as mock_error: + try: + raise ValueError("test error") + except ValueError: + logger_instance.error("Suppressed traceback", exc_info=False) + mock_error.assert_called_once() + # Explicit exc_info=False should override auto-detection even inside an exception context + assert mock_error.call_args[1]["exc_info"] is False def test_critical_log(logger_instance): diff --git a/src/tests/backend/common/storage/blob_azure_test.py b/src/tests/backend/common/storage/blob_azure_test.py index afc76e7e..519b0395 100644 --- a/src/tests/backend/common/storage/blob_azure_test.py +++ b/src/tests/backend/common/storage/blob_azure_test.py @@ -218,7 +218,7 @@ async def test_blob_storage_init_exception(): }) # Assert that error logging happened with the expected JSON string - mock_logger_instance.error.assert_called_once_with(expected_error_log) + mock_logger_instance.error.assert_called_once_with(expected_error_log, exc_info=True) # Assert that debug log is written for container existence mock_logger_instance.debug.assert_called_once_with(expected_debug_log)