Skip to content

Commit f2eb8e3

Browse files
Revert "Add user state cleanup and enhance AzureAIClient initialization with deployment name fallback"
This reverts commit c513f4b.
1 parent c513f4b commit f2eb8e3

4 files changed

Lines changed: 40 additions & 64 deletions

File tree

src/backend/v4/config/settings.py

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -220,26 +220,6 @@ def cleanup_clarification(self, request_id: str) -> None:
220220
self.clarifications.pop(request_id, None)
221221
self._clarification_events.pop(request_id, None)
222222

223-
def cleanup_user_state(self, user_id: str) -> None:
224-
"""Clean up all state for a user to prevent cross-scenario leakage.
225-
226-
This removes any pending approvals, clarifications, and plans
227-
associated with the user to ensure fresh state for new runs.
228-
"""
229-
# Clean up any plans associated with this user
230-
plans_to_remove = [
231-
plan_id for plan_id, plan in self.plans.items()
232-
if getattr(plan, 'user_id', None) == user_id
233-
]
234-
for plan_id in plans_to_remove:
235-
self.plans.pop(plan_id, None)
236-
self.cleanup_approval(plan_id)
237-
238-
# Clean up any pending approvals/clarifications for this user
239-
# Note: We can't easily map approvals to users without plan context,
240-
# so this primarily clears the plans and their associated approvals
241-
logger.debug("Cleaned up state for user %s (removed %d plans)", user_id, len(plans_to_remove))
242-
243223

244224
class ConnectionConfig:
245225
"""Connection manager for WebSocket connections."""

src/backend/v4/magentic_agents/common/lifecycle.py

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
from agent_framework_azure_ai import AzureAIClient
1515
from azure.ai.agents.aio import AgentsClient
1616
from azure.identity.aio import DefaultAzureCredential
17-
from common.config.app_config import config
1817
from common.database.database_base import DatabaseBase
1918
from common.models.messages_af import CurrentTeamAgent, TeamConfiguration
2019
from common.utils.utils_agents import (
@@ -161,12 +160,10 @@ def get_chat_client(self, chat_client) -> AzureAIClient:
161160
and self._agent.chat_client
162161
):
163162
return self._agent.chat_client # type: ignore
164-
# Use model_deployment_name with fallback to default model if empty
165-
deployment_name = self.model_deployment_name or config.AZURE_OPENAI_DEPLOYMENT_NAME
166163
chat_client = AzureAIClient(
167164
project_endpoint=self.project_endpoint,
168165
agent_name=self.agent_name,
169-
model_deployment_name=deployment_name,
166+
model_deployment_name=self.model_deployment_name,
170167
credential=self.creds,
171168
use_latest_version=True,
172169
)
@@ -280,26 +277,20 @@ async def get_database_team_agent(self) -> Optional[AzureAIClient]:
280277

281278
# Create client with resolved ID
282279
if self.agent_name == "RAIAgent" and self.project_client:
283-
# Use RAI deployment name for RAI agents
284-
rai_deployment = config.AZURE_OPENAI_RAI_DEPLOYMENT_NAME
285280
chat_client = AzureAIClient(
286281
project_endpoint=self.project_endpoint,
287282
agent_id=resolved,
288-
model_deployment_name=rai_deployment,
289283
credential=self.creds,
290284
)
291285
self.logger.info(
292-
"RAI.AgentReuseSuccess: Created AzureAIClient (id=%s, model=%s)",
286+
"RAI.AgentReuseSuccess: Created AzureAIClient (id=%s)",
293287
resolved,
294-
rai_deployment,
295288
)
296289
else:
297-
# Use model_deployment_name with fallback to default model if empty
298-
deployment_name = self.model_deployment_name or config.AZURE_OPENAI_DEPLOYMENT_NAME
299290
chat_client = AzureAIClient(
300291
project_endpoint=self.project_endpoint,
301292
agent_id=resolved,
302-
model_deployment_name=deployment_name,
293+
model_deployment_name=self.model_deployment_name,
303294
credential=self.creds,
304295
)
305296
self.logger.info(

src/backend/v4/magentic_agents/foundry_agent.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,13 +222,11 @@ async def _create_azure_search_enabled_client(self, chatClient=None) -> Optional
222222
)
223223

224224
# Wrap in AzureAIClient using agent_name and agent_version (NOT agent_id)
225-
# Include model_deployment_name to ensure SDK has model info for streaming
226-
deployment_name = self.model_deployment_name or config.AZURE_OPENAI_DEPLOYMENT_NAME
225+
# AzureAIClient constructor: agent_name, agent_version, project_endpoint, credential
227226
chat_client = AzureAIClient(
228227
project_endpoint=self.project_endpoint,
229228
agent_name=azure_agent.name,
230229
agent_version=azure_agent.version, # Use the specific version we just created
231-
model_deployment_name=deployment_name,
232230
credential=self.creds,
233231
)
234232
return chat_client

src/backend/v4/orchestration/orchestration_manager.py

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -133,11 +133,9 @@ async def init_orchestration(
133133

134134
try:
135135
# Create the chat client (AzureAIClient)
136-
# Use team deployment_name with fallback to default model if empty
137-
deployment_name = team_config.deployment_name or config.AZURE_OPENAI_DEPLOYMENT_NAME
138136
chat_client = AzureAIClient(
139137
project_endpoint=config.AZURE_AI_PROJECT_ENDPOINT,
140-
model_deployment_name=deployment_name,
138+
model_deployment_name=team_config.deployment_name,
141139
agent_name=agent_name,
142140
credential=credential,
143141
)
@@ -152,7 +150,7 @@ async def init_orchestration(
152150

153151
cls.logger.info(
154152
"Created AzureAIClient and manager ChatAgent for orchestration with model '%s' at endpoint '%s'",
155-
deployment_name,
153+
team_config.deployment_name,
156154
config.AZURE_AI_PROJECT_ENDPOINT,
157155
)
158156
except Exception as e:
@@ -199,17 +197,19 @@ async def init_orchestration(
199197
# Assemble workflow with callback
200198
storage = InMemoryCheckpointStorage()
201199

202-
# New API: .participants() accepts a list of agents
200+
# New SDK: participants() accepts a Sequence (list) of agents
201+
# The orchestrator uses agent.name to identify them
203202
participant_list = list(participants.values())
203+
cls.logger.info("Participants for workflow: %s", list(participants.keys()))
204+
print(f"[DEBUG] Participants for workflow: {list(participants.keys())}", flush=True)
204205

205206
builder = (
206207
MagenticBuilder()
207-
.participants(participant_list)
208+
.participants(participant_list) # New SDK: pass as list
208209
.with_manager(
209210
manager=manager, # Pass manager instance (extends StandardMagenticManager)
210211
max_round_count=orchestration_config.max_rounds,
211-
max_stall_count=3,
212-
max_reset_count=2,
212+
max_stall_count=0, # CRITICAL: Prevent re-calling agents when stalled (default is 3!)
213213
)
214214
.with_checkpointing(storage)
215215
)
@@ -239,14 +239,16 @@ async def get_current_or_new_orchestration(
239239
Return an existing workflow for the user or create a new one if:
240240
- None exists
241241
- Team switched flag is True
242-
- force_rebuild is True (for new tasks that need fresh workflow)
242+
- force_rebuild is True (for new tasks after workflow completion)
243243
"""
244244
current = orchestration_config.get_current_orchestration(user_id)
245-
if current is None or team_switched or force_rebuild:
245+
needs_rebuild = current is None or team_switched or force_rebuild
246+
247+
if needs_rebuild:
246248
if current is not None and (team_switched or force_rebuild):
247-
reason = "team switched" if team_switched else "force rebuild"
249+
reason = "team switched" if team_switched else "force rebuild for new task"
248250
cls.logger.info(
249-
"Closing previous agents for user '%s' (reason: %s)", user_id, reason
251+
"Rebuilding orchestration for user '%s' (reason: %s)", user_id, reason
250252
)
251253
# Close prior agents (same logic as old version)
252254
for agent in getattr(current, "_participants", {}).values():
@@ -303,11 +305,6 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
303305
Execute the Magentic workflow for the provided user and task description.
304306
"""
305307
job_id = str(uuid.uuid4())
306-
307-
# Clean up any accumulated state from previous runs (cancelled plans, etc.)
308-
# This prevents cross-scenario leakage
309-
orchestration_config.cleanup_user_state(user_id)
310-
311308
orchestration_config.set_approval_pending(job_id)
312309
self.logger.info(
313310
"Starting orchestration job '%s' for user '%s'", job_id, user_id
@@ -320,16 +317,6 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
320317
if workflow is None:
321318
print(f"[ERROR] Orchestration not initialized for user '{user_id}'")
322319
raise ValueError("Orchestration not initialized for user.")
323-
324-
# Reset manager's plan state to prevent leakage from cancelled plans
325-
manager = getattr(workflow, "_manager", None)
326-
if manager and hasattr(manager, "magentic_plan"):
327-
manager.magentic_plan = None
328-
self.logger.debug("Reset manager's magentic_plan for fresh run")
329-
if manager and hasattr(manager, "task_ledger"):
330-
manager.task_ledger = None
331-
self.logger.debug("Reset manager's task_ledger for fresh run")
332-
333320
# Fresh thread per participant to avoid cross-run state bleed
334321
executors = getattr(workflow, "executors", {})
335322
self.logger.debug("Executor keys at run start: %s", list(executors.keys()))
@@ -396,12 +383,16 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
396383
task_text = getattr(input_task, "description", str(input_task))
397384
self.logger.debug("Task: %s", task_text)
398385

386+
# Track how many times each agent is called (for debugging duplicate calls)
387+
agent_call_counts: dict = {}
388+
399389
try:
400390
# Execute workflow using run_stream with task as positional parameter
401391
# The execution settings are configured in the manager/client
402392
final_output: str | None = None
403393

404394
self.logger.info("Starting workflow execution...")
395+
print(f"[ORCHESTRATOR] 🚀 Starting workflow with max_rounds={orchestration_config.max_rounds}", flush=True)
405396
last_message_id: str | None = None
406397
async for event in workflow.run_stream(task_text):
407398
try:
@@ -446,11 +437,20 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
446437

447438
# Handle group chat request sent
448439
elif isinstance(event, GroupChatRequestSentEvent):
440+
agent_name = event.participant_name
441+
agent_call_counts[agent_name] = agent_call_counts.get(agent_name, 0) + 1
442+
call_num = agent_call_counts[agent_name]
443+
449444
self.logger.info(
450-
"[REQUEST SENT (round %d)] to agent: %s",
445+
"[REQUEST SENT (round %d)] to agent: %s (call #%d)",
451446
event.round_index,
452-
event.participant_name
447+
agent_name,
448+
call_num
453449
)
450+
print(f"[ORCHESTRATOR] 📤 REQUEST SENT round={event.round_index} to agent={agent_name} (call #{call_num})", flush=True)
451+
452+
if call_num > 1:
453+
print(f"[ORCHESTRATOR] ⚠️ WARNING: Agent '{agent_name}' called {call_num} times!", flush=True)
454454

455455
# Handle group chat response received - THIS IS WHERE AGENT RESPONSES COME
456456
elif isinstance(event, GroupChatResponseReceivedEvent):
@@ -511,6 +511,13 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
511511
# Extract final result
512512
final_text = final_output if final_output else ""
513513

514+
# Log agent call summary
515+
print(f"\n[ORCHESTRATOR] 📊 AGENT CALL SUMMARY:", flush=True)
516+
for agent_name, count in agent_call_counts.items():
517+
status = "✅" if count == 1 else "⚠️ DUPLICATE"
518+
print(f" {status} {agent_name}: called {count} time(s)", flush=True)
519+
self.logger.info("Agent call counts: %s", agent_call_counts)
520+
514521
# Log results
515522
self.logger.info("\nAgent responses:")
516523
self.logger.info(

0 commit comments

Comments
 (0)