Skip to content

Commit 2e228d6

Browse files
committed
fix: App Config credential scopes, print-to-logging, poll status delegation, async concurrency
- Fix AzureAppConfigurationClient credential_scopes across all 3 projects (Workflow, Processor, API) to use the canonical https://azconfig.io/.default
- Replace all print() calls with proper logging in Workflow source files
- Add on_poll callback to ContentProcessService.poll_status() for intermediate status updates during document processing
- Guard poll callback against race conditions (skip duplicate/terminal status)
- Wrap sync blob/queue I/O in asyncio.to_thread() for true concurrent document processing with a semaphore
- Make get_steps() async and update call sites
- Fix tests for credential_scopes kwargs and async get_steps
- Apply ruff format across the codebase
1 parent 2d2200d commit 2e228d6

34 files changed

Lines changed: 312 additions & 262 deletions

src/ContentProcessor/src/libs/azure_helper/app_configuration.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,13 @@ def _initialize_client(self):
4444
if self.app_config_endpoint is None:
4545
raise ValueError("App Configuration Endpoint is not set.")
4646

47+
# The SDK incorrectly derives the credential scope from the endpoint URL
48+
# instead of using the canonical audience "https://azconfig.io/.default",
49+
# which causes a 403 when authenticating via Entra ID.
4750
self.app_config_client = AzureAppConfigurationClient(
48-
self.app_config_endpoint, self.credential
51+
self.app_config_endpoint,
52+
self.credential,
53+
credential_scopes=["https://azconfig.io/.default"],
4954
)
5055

5156
def read_configuration(self):

src/ContentProcessorAPI/app/libs/azure/app_configuration/helper.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,13 @@ def _initialize_client(self):
4545
if self.app_config_endpoint is None:
4646
raise ValueError("App Configuration Endpoint is not set.")
4747

48+
# The SDK incorrectly derives the credential scope from the endpoint URL
49+
# instead of using the canonical audience "https://azconfig.io/.default",
50+
# which causes a 403 when authenticating via Entra ID.
4851
self.app_config_client = AzureAppConfigurationClient(
49-
self.app_config_endpoint, self.credential
52+
self.app_config_endpoint,
53+
self.credential,
54+
credential_scopes=["https://azconfig.io/.default"],
5055
)
5156

5257
def read_configuration(self):

src/ContentProcessorWorkflow/src/libs/agent_framework/agent_framework_settings.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,16 @@
3232
print(cfg.endpoint, cfg.chat_deployment_name)
3333
"""
3434

35+
import logging
3536
import os
3637

3738
from pydantic import Field, model_validator
3839

3940
from libs.application.application_configuration import _configuration_base
4041
from libs.application.service_config import ServiceConfig
4142

43+
logger = logging.getLogger(__name__)
44+
4245

4346
class AgentFrameworkSettings(_configuration_base):
4447
"""Pydantic settings that auto-discover Azure OpenAI services from env vars.
@@ -180,8 +183,10 @@ def discover_services(self):
180183
config = ServiceConfig(service_id, prefix, env_vars, use_entra_id=True)
181184
if config.is_valid():
182185
discovered_configs[service_id] = config
183-
print(
184-
f"Discovered valid service configuration: {service_id} (prefix: {prefix})"
186+
logger.info(
187+
"Discovered valid service configuration: %s (prefix: %s)",
188+
service_id,
189+
prefix,
185190
)
186191
else:
187192
missing_fields = []
@@ -191,8 +196,11 @@ def discover_services(self):
191196
missing_fields.append("ENDPOINT")
192197
if not config.chat_deployment_name:
193198
missing_fields.append("CHAT_DEPLOYMENT_NAME")
194-
print(
195-
f"Incomplete service configuration for {service_id} (prefix: {prefix}) - Missing: {', '.join(missing_fields)}"
199+
logger.warning(
200+
"Incomplete service configuration for %s (prefix: %s) - Missing: %s",
201+
service_id,
202+
prefix,
203+
", ".join(missing_fields),
196204
)
197205

198206
self.service_configs = discovered_configs

src/ContentProcessorWorkflow/src/libs/agent_framework/agent_speaking_capture.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,14 @@
3232
``GroupChatOrchestrator``’s response buffer.
3333
"""
3434

35+
import logging
3536
from datetime import datetime
3637
from typing import Any, Callable, Optional
3738

3839
from agent_framework import AgentMiddleware, AgentRunContext
3940

41+
logger = logging.getLogger(__name__)
42+
4043

4144
class AgentSpeakingCaptureMiddleware(AgentMiddleware):
4245
"""Middleware to capture agent name and response for each agent invocation with callback support.
@@ -231,8 +234,8 @@ async def _trigger_callback(self, capture_data: dict[str, Any]):
231234
await loop.run_in_executor(None, self.callback, capture_data)
232235
except Exception as e:
233236
# Log error but don't break the middleware chain
234-
print(
235-
f"[WARNING] Callback error in AgentSpeakingCaptureMiddleware: {e}"
237+
logger.warning(
238+
"Callback error in AgentSpeakingCaptureMiddleware: %s", e
236239
)
237240

238241
async def _trigger_stream_complete_callback(self, capture_data: dict[str, Any]):
@@ -267,7 +270,7 @@ async def _trigger_stream_complete_callback(self, capture_data: dict[str, Any]):
267270
)
268271
except Exception as e:
269272
# Log error but don't break the middleware chain
270-
print(f"[WARNING] Stream complete callback error: {e}")
273+
logger.warning("Stream complete callback error: %s", e)
271274

272275
def get_all_responses(self) -> list[dict[str, Any]]:
273276
"""Get all captured responses.

src/ContentProcessorWorkflow/src/libs/agent_framework/groupchat_orchestrator.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -991,23 +991,27 @@ def _extract_function_calls(self, contents: Any) -> list[dict[str, Any]]:
991991
name = getattr(item, "name", None)
992992
call_id = getattr(item, "call_id", None)
993993
if name and call_id:
994-
calls.append({
995-
"name": name,
996-
"call_id": call_id,
997-
"arguments": getattr(item, "arguments", None),
998-
})
994+
calls.append(
995+
{
996+
"name": name,
997+
"call_id": call_id,
998+
"arguments": getattr(item, "arguments", None),
999+
}
1000+
)
9991001
continue
10001002

10011003
# Dict path (serialized content)
10021004
if isinstance(item, dict) and item.get("type") in {
10031005
"function_call",
10041006
"tool_call",
10051007
}:
1006-
calls.append({
1007-
"name": item.get("name"),
1008-
"call_id": item.get("call_id"),
1009-
"arguments": item.get("arguments"),
1010-
})
1008+
calls.append(
1009+
{
1010+
"name": item.get("name"),
1011+
"call_id": item.get("call_id"),
1012+
"arguments": item.get("arguments"),
1013+
}
1014+
)
10111015
continue
10121016

10131017
return calls

src/ContentProcessorWorkflow/src/libs/agent_framework/middlewares.py

Lines changed: 44 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
structured logging via ``logging.getLogger``.
2929
"""
3030

31+
import logging
3132
import time
3233
from collections.abc import Awaitable, Callable
3334

@@ -42,6 +43,8 @@
4243
Role,
4344
)
4445

46+
logger = logging.getLogger(__name__)
47+
4548

4649
class DebuggingMiddleware(AgentMiddleware):
4750
"""Run-level debugging middleware that prints diagnostic info.
@@ -74,19 +77,18 @@ async def process(
7477
streaming configuration.
7578
next: Callable that invokes the next middleware or the agent itself.
7679
"""
77-
print("[Debug] Debug mode enabled for this run")
78-
print(f"[Debug] Messages count: {len(context.messages)}")
79-
print(f"[Debug] Is streaming: {context.is_streaming}")
80+
logger.debug("Debug mode enabled for this run")
81+
logger.debug("Messages count: %d", len(context.messages))
82+
logger.debug("Is streaming: %s", context.is_streaming)
8083

81-
# Log existing metadata from agent middleware
8284
if context.metadata:
83-
print(f"[Debug] Existing metadata: {context.metadata}")
85+
logger.debug("Existing metadata: %s", context.metadata)
8486

8587
context.metadata["debug_enabled"] = True
8688

8789
await next(context)
8890

89-
print("[Debug] Debug information collected")
91+
logger.debug("Debug information collected")
9092

9193

9294
class LoggingFunctionMiddleware(FunctionMiddleware):
@@ -136,59 +138,60 @@ async def process(
136138
duration = end_time - start_time
137139

138140
# Build comprehensive log output
139-
print("\n" + "=" * 80)
140-
print("[LoggingFunctionMiddleware] Function Call")
141-
print("=" * 80)
142-
print(f"Function Name: {function_name}")
143-
print(f"Execution Time: {duration:.5f}s")
141+
log_lines = [
142+
"",
143+
"=" * 80,
144+
"[LoggingFunctionMiddleware] Function Call",
145+
"=" * 80,
146+
f"Function Name: {function_name}",
147+
f"Execution Time: {duration:.5f}s",
148+
]
144149

145-
# Display arguments
146150
if args_info:
147-
print("\nArguments:")
151+
log_lines.append("\nArguments:")
148152
for arg in args_info:
149-
print(f" - {arg}")
153+
log_lines.append(f" - {arg}")
150154
else:
151-
print("\nArguments: None")
155+
log_lines.append("\nArguments: None")
152156

153-
# Display output results
154157
if context.result:
155-
print("\nOutput Results:")
158+
log_lines.append("\nOutput Results:")
156159

157-
# Ensure context.result is treated as a list
158160
results = (
159161
context.result if isinstance(context.result, list) else [context.result]
160162
)
161163

162164
for idx, result in enumerate(results):
163-
print(f" Result #{idx + 1}:")
165+
log_lines.append(f" Result #{idx + 1}:")
164166

165-
# Use raw_representation to get the actual output
166167
if hasattr(result, "raw_representation"):
167168
raw_output = result.raw_representation
168169
raw_type = type(raw_output).__name__
169-
print(f" Type: {raw_type}")
170+
log_lines.append(f" Type: {raw_type}")
170171

171-
# Limit output length for very large content
172172
output_str = str(raw_output)
173173
if len(output_str) > 1000:
174-
print(f" Output (truncated): {output_str[:1000]}...")
174+
log_lines.append(
175+
f" Output (truncated): {output_str[:1000]}..."
176+
)
175177
else:
176-
print(f" Output: {output_str}")
177-
# result is just string or primitive
178+
log_lines.append(f" Output: {output_str}")
178179
else:
179180
output_str = str(result)
180181
if len(output_str) > 1000:
181-
print(f" Output (truncated): {output_str[:1000]}...")
182+
log_lines.append(
183+
f" Output (truncated): {output_str[:1000]}..."
184+
)
182185
else:
183-
print(f" Output: {output_str}")
186+
log_lines.append(f" Output: {output_str}")
184187

185-
# Check if result has error flag
186188
if hasattr(result, "is_error"):
187-
print(f" Is Error: {result.is_error}")
189+
log_lines.append(f" Is Error: {result.is_error}")
188190
else:
189-
print("\nOutput Results: None")
191+
log_lines.append("\nOutput Results: None")
190192

191-
print("=" * 80 + "\n")
193+
log_lines.append("=" * 80)
194+
logger.debug("\n".join(log_lines))
192195

193196

194197
class InputObserverMiddleware(ChatMiddleware):
@@ -234,13 +237,15 @@ async def process(
234237
context: The chat context containing the message list.
235238
next: Callable that invokes the next middleware or the chat client.
236239
"""
237-
print("[InputObserverMiddleware] Observing input messages:")
240+
logger.debug("[InputObserverMiddleware] Observing input messages:")
238241

239242
for i, message in enumerate(context.messages):
240243
content = message.text if message.text else str(message.contents)
241-
print(f" Message {i + 1} ({message.role.value}): {content}")
244+
logger.debug(" Message %d (%s): %s", i + 1, message.role.value, content)
242245

243-
print(f"[InputObserverMiddleware] Total messages: {len(context.messages)}")
246+
logger.debug(
247+
"[InputObserverMiddleware] Total messages: %d", len(context.messages)
248+
)
244249

245250
# Modify user messages by creating new messages with enhanced text
246251
modified_messages: list[ChatMessage] = []
@@ -253,8 +258,10 @@ async def process(
253258

254259
if self.replacement:
255260
updated_text = self.replacement
256-
print(
257-
f"[InputObserverMiddleware] Updated: '{original_text}' -> '{updated_text}'"
261+
logger.debug(
262+
"[InputObserverMiddleware] Updated: '%s' -> '%s'",
263+
original_text,
264+
updated_text,
258265
)
259266

260267
modified_message = ChatMessage(role=message.role, text=updated_text)
@@ -270,4 +277,4 @@ async def process(
270277
await next(context)
271278

272279
# Observe that processing is complete
273-
print("[InputObserverMiddleware] Processing completed")
280+
logger.debug("[InputObserverMiddleware] Processing completed")

src/ContentProcessorWorkflow/src/libs/application/application_configuration.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,12 +156,8 @@ class Configuration(_configuration_base):
156156
app_cosmos_container_process: str = Field(
157157
default="Processes", alias="APP_COSMOS_CONTAINER_PROCESS"
158158
)
159-
app_storage_blob_url: str = Field(
160-
default="", alias="APP_STORAGE_BLOB_URL"
161-
)
162-
app_storage_queue_url: str = Field(
163-
default="", alias="APP_STORAGE_QUEUE_URL"
164-
)
159+
app_storage_blob_url: str = Field(default="", alias="APP_STORAGE_BLOB_URL")
160+
app_storage_queue_url: str = Field(default="", alias="APP_STORAGE_QUEUE_URL")
165161
app_message_queue_extract: str = Field(
166162
default="content-pipeline-extract-queue", alias="APP_MESSAGE_QUEUE_EXTRACT"
167163
)

src/ContentProcessorWorkflow/src/libs/azure/app_configuration.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,14 @@ def _initialize_client(self):
108108
if self.credential is None:
109109
raise ValueError("Azure credential is not set.")
110110

111+
# The SDK incorrectly derives the credential scope from the endpoint URL
112+
# (e.g. "https://<name>.azconfig.io/.default") instead of using the
113+
# canonical audience "https://azconfig.io/.default", which causes a 403
114+
# when authenticating via Entra ID. Override it explicitly.
111115
self.app_config_client = AzureAppConfigurationClient(
112-
self.app_config_endpoint, self.credential
116+
self.app_config_endpoint,
117+
self.credential,
118+
credential_scopes=["https://azconfig.io/.default"],
113119
)
114120

115121
def read_configuration(self):

src/ContentProcessorWorkflow/src/main.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
"""
1010

1111
import asyncio
12+
import logging
1213
import os
1314

1415
from sas.storage.blob.async_helper import AsyncStorageBlobHelper
@@ -25,6 +26,8 @@
2526
from services.content_process_service import ContentProcessService
2627
from steps.claim_processor import ClaimProcessor
2728

29+
logger = logging.getLogger(__name__)
30+
2831

2932
class Application(ApplicationBase):
3033
"""Local-development application that runs a single claim workflow.
@@ -41,11 +44,7 @@ def __init__(self):
4144

4245
def initialize(self):
4346
"""Bootstrap the application context and register services."""
44-
print(
45-
"Application initialized with configuration:",
46-
self.application_context.configuration,
47-
)
48-
47+
logger.info("Application initialized.")
4948
self.register_services()
5049

5150
def register_services(self):
@@ -58,8 +57,9 @@ def register_services(self):
5857
)
5958

6059
(
61-
self.application_context
62-
.add_singleton(DebuggingMiddleware, DebuggingMiddleware)
60+
self.application_context.add_singleton(
61+
DebuggingMiddleware, DebuggingMiddleware
62+
)
6363
.add_singleton(LoggingFunctionMiddleware, LoggingFunctionMiddleware)
6464
.add_singleton(InputObserverMiddleware, InputObserverMiddleware)
6565
.add_singleton(Mem0AsyncMemoryManager, Mem0AsyncMemoryManager)
@@ -97,7 +97,9 @@ async def run(self):
9797
input_data = "cbe699df-9cc2-440f-adb5-17ab55154d92"
9898
process_output = await claim_processor.run(input_data=input_data)
9999

100-
print(f"Claim Processing Workflow completed. Final Output: {process_output}")
100+
logger.info(
101+
"Claim Processing Workflow completed. Final Output: %s", process_output
102+
)
101103

102104

103105
async def main():

0 commit comments

Comments
 (0)