|
16 | 16 | InMemoryCheckpointStorage, |
17 | 17 | AgentRunUpdateEvent, |
18 | 18 | GroupChatRequestSentEvent, |
| 19 | + GroupChatResponseReceivedEvent, |
| 20 | + ExecutorCompletedEvent, |
19 | 21 | MagenticOrchestratorEvent, |
20 | 22 | MagenticProgressLedger, |
21 | 23 | ) |
@@ -45,6 +47,53 @@ def __init__(self): |
45 | 47 | self.user_id: Optional[str] = None |
46 | 48 | self.logger = self.__class__.logger |
47 | 49 |
|
| 50 | + def _extract_response_text(self, data) -> str: |
| 51 | + """ |
| 52 | + Extract text content from various agent_framework response types. |
| 53 | + |
| 54 | + Handles: |
| 55 | + - ChatMessage: Extract .text |
| 56 | + - AgentResponse: Extract .text |
| 57 | + - AgentExecutorResponse: Extract from agent_response.text or full_conversation[-1].text |
| 58 | + - List of any of the above |
| 59 | + """ |
| 60 | + if data is None: |
| 61 | + return "" |
| 62 | + |
| 63 | + # Direct ChatMessage |
| 64 | + if isinstance(data, ChatMessage): |
| 65 | + return data.text or "" |
| 66 | + |
| 67 | + # Has .text attribute directly (AgentResponse, etc.) |
| 68 | + if hasattr(data, "text") and data.text: |
| 69 | + return data.text |
| 70 | + |
| 71 | + # AgentExecutorResponse - has agent_response and full_conversation |
| 72 | + if hasattr(data, "agent_response"): |
| 73 | + # Try to get text from agent_response first |
| 74 | + agent_resp = data.agent_response |
| 75 | + if agent_resp and hasattr(agent_resp, "text") and agent_resp.text: |
| 76 | + return agent_resp.text |
| 77 | + # Fallback to last message in full_conversation |
| 78 | + if hasattr(data, "full_conversation") and data.full_conversation: |
| 79 | + last_msg = data.full_conversation[-1] |
| 80 | + if isinstance(last_msg, ChatMessage) and last_msg.text: |
| 81 | + return last_msg.text |
| 82 | + |
| 83 | + # List of items - could be AgentExecutorResponse, ChatMessage, etc. |
| 84 | + if isinstance(data, list) and len(data) > 0: |
| 85 | + texts = [] |
| 86 | + for item in data: |
| 87 | + # Recursively extract from each item |
| 88 | + item_text = self._extract_response_text(item) |
| 89 | + if item_text: |
| 90 | + texts.append(item_text) |
| 91 | + if texts: |
| 92 | + # Return the last non-empty response (most recent) |
| 93 | + return texts[-1] |
| 94 | + |
| 95 | + return "" |
| 96 | + |
48 | 97 | # --------------------------- |
49 | 98 | # Orchestration construction |
50 | 99 | # --------------------------- |
@@ -336,6 +385,11 @@ async def run_orchestration(self, user_id: str, input_task) -> None: |
336 | 385 | last_message_id: str | None = None |
337 | 386 | async for event in workflow.run_stream(task_text): |
338 | 387 | try: |
| 388 | + # Only log non-streaming events (reduce noise) |
| 389 | + event_type_name = type(event).__name__ |
| 390 | + if event_type_name != "AgentRunUpdateEvent": |
| 391 | + self.logger.info("[EVENT] %s", event_type_name) |
| 392 | + |
339 | 393 | # Handle orchestrator events (plan, progress ledger) |
340 | 394 | if isinstance(event, MagenticOrchestratorEvent): |
341 | 395 | self.logger.info( |
@@ -378,6 +432,34 @@ async def run_orchestration(self, user_id: str, input_task) -> None: |
378 | 432 | event.participant_name |
379 | 433 | ) |
380 | 434 |
|
| 435 | + # Handle group chat response received - THIS IS WHERE AGENT RESPONSES COME |
| 436 | + elif isinstance(event, GroupChatResponseReceivedEvent): |
| 437 | + self.logger.info( |
| 438 | + "[RESPONSE RECEIVED (round %d)] from agent: %s", |
| 439 | + event.round_index, |
| 440 | + event.participant_name |
| 441 | + ) |
| 442 | + # Send the agent response to the UI |
| 443 | + if event.data: |
| 444 | + response_text = self._extract_response_text(event.data) |
| 445 | + |
| 446 | + if response_text: |
| 447 | + self.logger.info("Sending agent response to UI from %s", event.participant_name) |
| 448 | + agent_response_callback( |
| 449 | + event.participant_name, |
| 450 | + ChatMessage(role="assistant", text=response_text), |
| 451 | + user_id, |
| 452 | + ) |
| 453 | + |
| 454 | + # Handle executor completed - just log, don't send to UI (GroupChatResponseReceivedEvent handles that) |
| 455 | + elif isinstance(event, ExecutorCompletedEvent): |
| 456 | + self.logger.debug( |
| 457 | + "[EXECUTOR COMPLETED] agent: %s", |
| 458 | + event.executor_id |
| 459 | + ) |
| 460 | + # Don't send to UI here - GroupChatResponseReceivedEvent already handles agent messages |
| 461 | + # This avoids duplicate messages |
| 462 | + |
381 | 463 | # Handle workflow output event (captures final result) |
382 | 464 | elif isinstance(event, WorkflowOutputEvent): |
383 | 465 | output_data = event.data |
|
0 commit comments