Skip to content

Commit fe85fc8

Browse files
added the lifespan of the agent on container stop and checked the already existed agents are not using the previous resource group connection
1 parent bcd032e commit fe85fc8

6 files changed

Lines changed: 291 additions & 2 deletions

File tree

src/backend/app_kernel.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
# Azure monitoring
66
import re
77
import uuid
8+
from contextlib import asynccontextmanager
89
from typing import Dict, List, Optional
910

1011
from azure.monitor.opentelemetry import configure_azure_monitor
@@ -18,6 +19,34 @@
1819
from v3.api.router import app_v3
1920
# Semantic Kernel imports
2021
from v3.orchestration.orchestration_manager import OrchestrationManager
22+
from v3.config.agent_registry import agent_registry
23+
24+
@asynccontextmanager
25+
async def lifespan(app: FastAPI):
26+
"""Manage FastAPI application lifecycle - startup and shutdown."""
27+
logger = logging.getLogger(__name__)
28+
29+
# Startup
30+
logger.info("🚀 Starting MACAE application...")
31+
yield
32+
33+
# Shutdown
34+
logger.info("🛑 Shutting down MACAE application...")
35+
try:
36+
# Import here to avoid circular imports and get agent registry
37+
38+
39+
# Clean up all agents from Azure AI Foundry when container stops
40+
await agent_registry.cleanup_all_agents()
41+
logger.info("✅ Agent cleanup completed successfully")
42+
43+
except ImportError as ie:
44+
logger.error(f"❌ Could not import agent_registry: {ie}")
45+
except Exception as e:
46+
logger.error(f"❌ Error during shutdown cleanup: {e}")
47+
48+
logger.info("👋 MACAE application shutdown complete")
49+
2150

2251
# Check if the Application Insights Instrumentation Key is set in the environment variables
2352
connection_string = config.APPLICATIONINSIGHTS_CONNECTION_STRING
@@ -48,7 +77,7 @@
4877
)
4978

5079
# Initialize the FastAPI app
51-
app = FastAPI()
80+
app = FastAPI(lifespan=lifespan)
5281

5382
frontend_url = config.FRONTEND_SITE_NAME
5483

src/backend/common/utils/utils_kernel.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent
88
from v3.magentic_agents.foundry_agent import FoundryAgentTemplate
99

10+
from v3.config.agent_registry import agent_registry
11+
1012
logging.basicConfig(level=logging.INFO)
1113

1214
# Cache for agent instances by session
@@ -45,6 +47,12 @@ async def create_RAI_agent() -> FoundryAgentTemplate:
4547
)
4648

4749
await agent.open()
50+
51+
try:
52+
agent_registry.register_agent(agent)
53+
54+
except Exception as registry_error:
55+
logging.warning(f"Failed to register agent '{agent.agent_name}' with registry: {registry_error}")
4856
return agent
4957

5058
async def _get_agent_response(agent: FoundryAgentTemplate, query: str) -> str:
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
# Copyright (c) Microsoft. All rights reserved.
2+
"""Global agent registry for tracking and managing agent lifecycles across the application."""
3+
4+
import asyncio
5+
import logging
6+
import threading
7+
from typing import List, Dict, Any, Optional
8+
from weakref import WeakSet
9+
10+
11+
class AgentRegistry:
12+
"""Global registry for tracking and managing all agent instances across the application."""
13+
14+
def __init__(self):
15+
self.logger = logging.getLogger(__name__)
16+
self._lock = threading.Lock()
17+
self._all_agents: WeakSet = WeakSet()
18+
self._agent_metadata: Dict[int, Dict[str, Any]] = {}
19+
20+
def register_agent(self, agent: Any, user_id: Optional[str] = None) -> None:
21+
"""Register an agent instance for tracking and lifecycle management."""
22+
with self._lock:
23+
try:
24+
self._all_agents.add(agent)
25+
agent_id = id(agent)
26+
self._agent_metadata[agent_id] = {
27+
'type': type(agent).__name__,
28+
'user_id': user_id,
29+
'name': getattr(agent, 'agent_name', getattr(agent, 'name', 'Unknown'))
30+
}
31+
self.logger.info(f"Registered agent: {type(agent).__name__} (ID: {agent_id}, User: {user_id})")
32+
except Exception as e:
33+
self.logger.error(f"Failed to register agent: {e}")
34+
35+
def unregister_agent(self, agent: Any) -> None:
36+
"""Unregister an agent instance."""
37+
with self._lock:
38+
try:
39+
agent_id = id(agent)
40+
self._all_agents.discard(agent)
41+
if agent_id in self._agent_metadata:
42+
metadata = self._agent_metadata.pop(agent_id)
43+
self.logger.info(f"Unregistered agent: {metadata.get('type', 'Unknown')} (ID: {agent_id})")
44+
except Exception as e:
45+
self.logger.error(f"Failed to unregister agent: {e}")
46+
47+
def get_all_agents(self) -> List[Any]:
48+
"""Get all currently registered agents."""
49+
with self._lock:
50+
return list(self._all_agents)
51+
52+
def get_agent_count(self) -> int:
53+
"""Get the total number of registered agents."""
54+
with self._lock:
55+
return len(self._all_agents)
56+
57+
async def cleanup_all_agents(self) -> None:
58+
"""Clean up all registered agents across all users."""
59+
all_agents = self.get_all_agents()
60+
61+
if not all_agents:
62+
self.logger.info("No agents to clean up")
63+
return
64+
65+
self.logger.info(f"🧹 Starting cleanup of {len(all_agents)} total agents")
66+
67+
# Log agent details for debugging
68+
for i, agent in enumerate(all_agents):
69+
agent_name = getattr(agent, 'agent_name', getattr(agent, 'name', type(agent).__name__))
70+
agent_type = type(agent).__name__
71+
has_close = hasattr(agent, 'close')
72+
self.logger.info(f"Agent {i+1}: {agent_name} (Type: {agent_type}, Has close(): {has_close})")
73+
74+
# Clean up agents concurrently
75+
cleanup_tasks = []
76+
for agent in all_agents:
77+
if hasattr(agent, 'close'):
78+
cleanup_tasks.append(self._safe_close_agent(agent))
79+
else:
80+
agent_name = getattr(agent, 'agent_name', getattr(agent, 'name', type(agent).__name__))
81+
self.logger.warning(f"⚠️ Agent {agent_name} has no close() method - just unregistering from registry")
82+
self.unregister_agent(agent)
83+
84+
if cleanup_tasks:
85+
self.logger.info(f"🔄 Executing {len(cleanup_tasks)} cleanup tasks...")
86+
results = await asyncio.gather(*cleanup_tasks, return_exceptions=True)
87+
88+
# Log any exceptions that occurred during cleanup
89+
success_count = 0
90+
for i, result in enumerate(results):
91+
if isinstance(result, Exception):
92+
self.logger.error(f"❌ Error cleaning up agent {i}: {result}")
93+
else:
94+
success_count += 1
95+
96+
self.logger.info(f"✅ Successfully cleaned up {success_count}/{len(cleanup_tasks)} agents")
97+
98+
# Clear all tracking
99+
with self._lock:
100+
self._all_agents.clear()
101+
self._agent_metadata.clear()
102+
103+
self.logger.info("🎉 Completed cleanup of all agents")
104+
105+
async def _safe_close_agent(self, agent: Any) -> None:
106+
"""Safely close an agent with error handling."""
107+
try:
108+
agent_name = getattr(agent, 'agent_name', getattr(agent, 'name', type(agent).__name__))
109+
self.logger.info(f"Closing agent: {agent_name}")
110+
111+
# Call the agent's close method - it should handle Azure deletion and registry cleanup
112+
if asyncio.iscoroutinefunction(agent.close):
113+
await agent.close()
114+
else:
115+
agent.close()
116+
117+
self.logger.info(f"Successfully closed agent: {agent_name}")
118+
119+
except Exception as e:
120+
agent_name = getattr(agent, 'agent_name', getattr(agent, 'name', type(agent).__name__))
121+
self.logger.error(f"Failed to close agent {agent_name}: {e}")
122+
123+
def get_registry_status(self) -> Dict[str, Any]:
124+
"""Get current status of the agent registry for debugging and monitoring."""
125+
with self._lock:
126+
status = {
127+
'total_agents': len(self._all_agents),
128+
'agent_types': {}
129+
}
130+
131+
# Count agents by type
132+
for agent in self._all_agents:
133+
agent_type = type(agent).__name__
134+
status['agent_types'][agent_type] = status['agent_types'].get(agent_type, 0) + 1
135+
136+
return status
137+
138+
139+
# Global registry instance
140+
agent_registry = AgentRegistry()

src/backend/v3/magentic_agents/common/lifecycle.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent
99
from semantic_kernel.connectors.mcp import MCPStreamableHttpPlugin
1010
from v3.magentic_agents.models.agent_models import MCPConfig
11+
from v3.config.agent_registry import agent_registry
1112

1213

1314
class MCPEnabledBase:
@@ -120,5 +121,33 @@ async def open(self) -> "AzureAgentBase":
120121
return self
121122

122123
async def close(self) -> None:
123-
await self.creds.close()
124+
"""
125+
Close the agent and clean up Azure AI Foundry resources.
126+
This method deletes the agent from Azure AI Foundry and closes credentials.
127+
"""
128+
129+
try:
130+
# Delete agent from Azure AI Foundry if we have the necessary information
131+
if hasattr(self, '_agent') and self._agent and hasattr(self._agent, 'definition'):
132+
agent_id = getattr(self._agent.definition, 'id', None)
133+
agent_name = getattr(self, 'agent_name', 'Unknown')
134+
135+
if agent_id and self.client:
136+
try:
137+
await self.client.agents.delete_agent(agent_id)
138+
except Exception as delete_error:
139+
pass
140+
# Unregister from agent registry
141+
try:
142+
agent_registry.unregister_agent(self)
143+
except Exception as registry_error:
144+
pass
145+
except Exception as cleanup_error:
146+
pass
147+
# Always close credentials and parent resources
148+
try:
149+
if hasattr(self, 'creds') and self.creds:
150+
await self.creds.close()
151+
except Exception as creds_error:
152+
pass
124153
await super().close()

src/backend/v3/magentic_agents/foundry_agent.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
from v3.magentic_agents.common.lifecycle import AzureAgentBase
1111
from v3.magentic_agents.models.agent_models import MCPConfig, SearchConfig
1212

13+
from v3.config.agent_registry import agent_registry
14+
1315
# from v3.magentic_agents.models.agent_models import (BingConfig, MCPConfig,
1416
# SearchConfig)
1517

@@ -124,6 +126,15 @@ async def _after_open(self) -> None:
124126

125127
# Try to get existing agent definition from Foundry
126128
definition = await self._get_azure_ai_agent_definition(self.agent_name)
129+
130+
# Check if existing definition uses the same connection name
131+
if definition is not None:
132+
connection_compatible = await self._check_connection_compatibility(definition)
133+
if not connection_compatible:
134+
await self.client.agents.delete_agent(definition.id)
135+
self.logger.info(f"Existing agent '{self.agent_name}' uses different connection. Creating new agent definition.")
136+
definition = None
137+
127138
# If not found in Foundry, create a new one
128139
if definition is None:
129140
# Collect all tools
@@ -152,6 +163,13 @@ async def _after_open(self) -> None:
152163
self.logger.error("Failed to create AzureAIAgent: %s", ex)
153164
raise
154165

166+
# Register agent with global registry for tracking and cleanup
167+
try:
168+
agent_registry.register_agent(self)
169+
self.logger.info(f"📝 Registered agent '{self.agent_name}' with global registry")
170+
except Exception as registry_error:
171+
self.logger.warning(f"⚠️ Failed to register agent '{self.agent_name}' with registry: {registry_error}")
172+
155173
# # After self._agent creation in _after_open:
156174
# # Diagnostics
157175
# try:
@@ -186,6 +204,63 @@ async def fetch_run_details(self, thread_id: str, run_id: str):
186204
except Exception as ex:
187205
self.logger.error("Could not fetch run details: %s", ex)
188206

207+
async def _check_connection_compatibility(self, existing_definition) -> bool:
208+
"""
209+
Check if the existing agent definition uses the same connection name as the current configuration.
210+
211+
Args:
212+
existing_definition: The existing agent definition from Azure AI Foundry
213+
214+
Returns:
215+
bool: True if connections are compatible, False otherwise
216+
"""
217+
try:
218+
# Check if we have search configuration to compare
219+
if not self.search or not self.search.connection_name:
220+
self.logger.info("No search configuration to compare")
221+
return True
222+
223+
# Get tool resources from existing definition
224+
if not hasattr(existing_definition, 'tool_resources') or not existing_definition.tool_resources:
225+
self.logger.info("Existing definition has no tool resources")
226+
return not self.search.connection_name # Compatible if we also don't need search
227+
228+
# Check Azure AI Search tool resources
229+
azure_ai_search_resources = existing_definition.tool_resources.get('azure_ai_search', {})
230+
if not azure_ai_search_resources:
231+
self.logger.info("Existing definition has no Azure AI Search resources")
232+
return not self.search.connection_name # Compatible if we also don't need search
233+
234+
# Get connection ID from existing definition
235+
indexes = azure_ai_search_resources.get('indexes')[0]
236+
existing_connection_id = indexes.get('index_connection_id')
237+
if not existing_connection_id:
238+
self.logger.info("Existing definition has no connection ID")
239+
return False
240+
241+
# Get the current connection to compare
242+
try:
243+
current_connection = await self.client.connections.get(name=self.search.connection_name)
244+
current_connection_id = current_connection.id
245+
246+
# Compare connection IDs
247+
is_compatible = existing_connection_id == current_connection_id
248+
249+
if is_compatible:
250+
self.logger.info(f"Connection compatible: existing connection ID {existing_connection_id} matches current connection")
251+
else:
252+
self.logger.info(f"Connection mismatch: existing connection ID {existing_connection_id} != current connection ID {current_connection_id}")
253+
254+
return is_compatible
255+
256+
except Exception as conn_ex:
257+
self.logger.error(f"Failed to get current connection '{self.search.connection_name}': {conn_ex}")
258+
return False
259+
260+
except Exception as ex:
261+
self.logger.error(f"Error checking connection compatibility: {ex}")
262+
return False
263+
189264
async def _get_azure_ai_agent_definition(self, agent_name: str)-> Awaitable[Agent | None]:
190265
"""
191266
Gets an Azure AI Agent with the specified name and instructions using AIProjectClient if it is already created.

src/backend/v3/magentic_agents/reasoning_agent.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from v3.magentic_agents.common.lifecycle import MCPEnabledBase
1111
from v3.magentic_agents.models.agent_models import MCPConfig, SearchConfig
1212
from v3.magentic_agents.reasoning_search import ReasoningSearch
13+
from v3.config.agent_registry import agent_registry
1314

1415

1516
class ReasoningAgentTemplate(MCPEnabledBase):
@@ -74,6 +75,13 @@ async def _after_open(self) -> None:
7475
instructions=self.agent_instructions,
7576
)
7677

78+
# Register agent with global registry for tracking and cleanup
79+
try:
80+
agent_registry.register_agent(self)
81+
self.logger.info(f"📝 Registered agent '{self.agent_name}' with global registry")
82+
except Exception as registry_error:
83+
self.logger.warning(f"⚠️ Failed to register agent '{self.agent_name}' with registry: {registry_error}")
84+
7785
async def invoke(self, message: str):
7886
"""Invoke the agent with a message."""
7987
if not self._agent:

0 commit comments

Comments
 (0)