Skip to content

Commit 402f7ec

Browse files
authored
feat(ai): add Prompt management and HTTP transport support (#307)
- Add Prompt model with template variable rendering ({{var}} syntax)
- Add gRPC and HTTP transport for Prompt queries (QueryPromptRequest/Response)
- Add AiHttpClientProxy for HTTP-based AI operations (GET /v3/client/ai/prompt)
- Add PromptCacheHolder with MD5 conditional polling and change notification
- Add PromptSubscribeManager for prompt change subscriptions
- Support transport mode selection via ClientConfig.ai_transport_mode (grpc/http)
- Register QueryPromptResponse in GrpcUtils.remote_type
- Bump version to 3.2.0b1
1 parent f1cce89 commit 402f7ec

19 files changed

Lines changed: 543 additions & 15 deletions

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def run(self):
5353

5454
setup(
5555
name="nacos-sdk-python",
56-
version="3.0.4",
56+
version="3.2.0b1",
5757
packages=find_packages(
5858
exclude=["test", "*.tests", "*.tests.*", "tests.*", "tests"]),
5959
url="https://github.com/nacos-group/nacos-sdk-python",

v2/nacos/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
__version__ = "3.0.4"
1+
__version__ = "3.2.0b1"
22

33
from .common.client_config import (KMSConfig,
44
GRPCConfig,

v2/nacos/ai/model/ai_constant.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,11 @@ class AIConstants:
2323

2424
A2A_ENDPOINT_TYPE_SERVICE = "SERVICE"
2525

26-
A2A_ENDPOINT_DEFAULT_TRANSPORT = "JSONRPC"
26+
A2A_ENDPOINT_DEFAULT_TRANSPORT = "JSONRPC"
27+
28+
# AI transport mode: "grpc" (default) or "http"
29+
AI_TRANSPORT_MODE_GRPC = "grpc"
30+
AI_TRANSPORT_MODE_HTTP = "http"
31+
32+
# Default prompt cache update interval in seconds
33+
DEFAULT_PROMPT_CACHE_UPDATE_INTERVAL = 10

v2/nacos/ai/model/ai_param.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from v2.nacos.ai.model.ai_constant import AIConstants
88
from v2.nacos.ai.model.mcp.mcp import McpServerBasicInfo, McpToolSpecification, \
99
McpEndpointSpec, McpServerDetailInfo
10+
from v2.nacos.ai.model.prompt.prompt import Prompt
1011

1112

1213
class GetMcpServerParam(BaseModel):
@@ -97,4 +98,22 @@ class SubscribeAgentCardParam(BaseModel):
9798
version: Optional[str] = None
9899
# Parameters: agent_name, agent_card_detail_info
99100
subscribe_callback: Optional[
100-
Callable[[str, AgentCardDetailInfo], Awaitable[None]]] = None
101+
Callable[[str, AgentCardDetailInfo], Awaitable[None]]] = None
102+
103+
104+
# ==================== Prompt Params ====================
105+
106+
class GetPromptParam(BaseModel):
    """Parameter model for retrieving prompt information."""
    # Unique key identifying the prompt (required).
    prompt_key: str
    # Specific version to fetch; presumably the latest is returned when
    # omitted — TODO confirm server-side default.
    version: Optional[str] = None
    # Optional label selector; semantics are defined server-side.
    label: Optional[str] = None
111+
112+
113+
class SubscribePromptParam(BaseModel):
    """Parameter model for subscribing to prompt changes."""
    # Unique key identifying the prompt to watch (required).
    prompt_key: str
    # Optional version filter — presumably latest when omitted; verify against server.
    version: Optional[str] = None
    # Optional label filter; semantics are defined server-side.
    label: Optional[str] = None
    # Async callback invoked when the prompt changes.
    # Parameters: prompt_key, prompt
    subscribe_callback: Optional[
        Callable[[str, Prompt], Awaitable[None]]] = None

v2/nacos/ai/model/ai_request.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,27 @@ class ReleaseMcpServerRequest(AbstractMcpRequest):
131131
def get_request_type(self) -> str:
132132
"""Returns the release MCP server request type"""
133133
return RELEASE_MCP_SERVER_REQUEST_TYPE
134+
135+
136+
# ==================== Prompt Requests ====================

# Wire type string for prompt query requests (must match the server registry).
QUERY_PROMPT_REQUEST_TYPE = "QueryPromptRequest"


class AbstractPromptRequest(AbstractAIRequest, ABC):
    """Abstract base class for Prompt requests.

    Carries the fields shared by every prompt-related request.
    """
    # Target namespace; server-side default presumably applies when None — TODO confirm.
    namespaceId: Optional[str] = None
    # Key of the prompt being addressed.
    promptKey: Optional[str] = None

    def get_request_type(self) -> str:
        # Concrete subclasses must return their registered wire type string.
        raise NotImplementedError("Subclasses should implement this method.")
148+
149+
150+
class QueryPromptRequest(AbstractPromptRequest):
    """Request for querying prompt by key, version, or label."""
    # Optional version filter for the query.
    version: Optional[str] = None
    # Optional label filter for the query.
    label: Optional[str] = None
    # Client-side cached MD5 for conditional polling; presumably the server
    # answers NOT_MODIFIED when it matches — TODO confirm server behavior.
    md5: Optional[str] = None

    def get_request_type(self) -> str:
        """Returns the query prompt request type"""
        return QUERY_PROMPT_REQUEST_TYPE

v2/nacos/ai/model/ai_response.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from v2.nacos.ai.model.a2a.a2a import AgentCardDetailInfo
44
from v2.nacos.ai.model.mcp.mcp import McpServerDetailInfo
5+
from v2.nacos.ai.model.prompt.prompt import Prompt
56
from v2.nacos.transport.model.rpc_response import Response
67

78

@@ -58,3 +59,11 @@ def get_response_type(self) -> str:
5859
"""Returns the release agent card response type"""
5960
return "ReleaseAgentCardResponse"
6061

62+
63+
class QueryPromptResponse(Response):
    """Response for prompt query requests."""
    # Prompt returned by the server; None presumably means not found or
    # not modified — TODO confirm against the server contract.
    promptInfo: Optional[Prompt] = None

    def get_response_type(self) -> str:
        """Returns the query prompt response type"""
        return "QueryPromptResponse"
69+
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
import asyncio
2+
import logging
3+
from typing import Dict, Optional
4+
5+
from v2.nacos.ai.model.cache.prompt_subscribe_manager import PromptSubscribeManager
6+
from v2.nacos.ai.model.prompt.prompt import Prompt
7+
from v2.nacos.ai.util.prompt_util import build_prompt_cache_key
8+
from v2.nacos.common.constants import Constants
9+
from v2.nacos.common.nacos_exception import NacosException, NOT_FOUND, NOT_MODIFIED
10+
11+
# Default polling interval in seconds for the background prompt updater.
DEFAULT_PROMPT_UPDATE_INTERVAL = 10


class PromptCacheHolder:
    """Local prompt cache with MD5-conditional background polling.

    One asyncio task per subscribed (prompt_key, version, label) triple
    periodically re-queries the server and notifies registered subscribers
    whenever the prompt content changes.
    """

    def __init__(self, subscribe_manager: PromptSubscribeManager, ai_client_proxy,
                 update_interval: float = DEFAULT_PROMPT_UPDATE_INTERVAL):
        """
        Args:
            subscribe_manager: Manager for prompt change subscribers
            ai_client_proxy: Any object with async query_prompt(prompt_key, version, label, md5) method.
                Can be AIGRPCClientProxy or AiHttpClientProxy.
            update_interval: Poll interval in seconds (default 10s)
        """
        self.prompt_subscribe_manager = subscribe_manager
        self.ai_client_proxy = ai_client_proxy
        self.update_interval = update_interval
        # cache_key (built from prompt_key/version/label) -> last known Prompt
        self.prompt_cache: Dict[str, Prompt] = {}
        # NOTE(review): this lock is created but never acquired anywhere in
        # the class — either guard cache mutations with it or remove it.
        self.cache_lock = asyncio.Lock()
        self.logger = logging.getLogger(Constants.AI_MODULE)
        # cache_key -> background polling task created by _add_update_task
        self.tasks: Dict[str, asyncio.Task] = {}

    async def shutdown(self):
        """Cancel every polling task, await their completion, and clear them."""
        for task in self.tasks.values():
            task.cancel()
        for task in self.tasks.values():
            try:
                await task
            except asyncio.CancelledError:
                pass
        self.tasks.clear()

    async def subscribe_prompt(self, prompt_key: str, version: Optional[str],
                               label: Optional[str]) -> Optional[Prompt]:
        """Fetch the prompt once, seed the cache, and start background polling.

        Returns:
            The current Prompt, or None when the server reports NOT_FOUND.
        Raises:
            NacosException: for any query failure other than NOT_FOUND.
        """
        cache_key = build_prompt_cache_key(prompt_key, version, label)
        prompt = None
        try:
            prompt = await self.ai_client_proxy.query_prompt(prompt_key, version, label, None)
            self._process_prompt(prompt_key, cache_key, prompt)
        except NacosException as e:
            if e.error_code != NOT_FOUND:
                raise
            # NOT_FOUND is treated as "prompt absent": clear any stale entry.
            self._process_prompt(prompt_key, cache_key, None)

        self._add_update_task(prompt_key, version, label)
        self.logger.info(f"Subscribed prompt: {prompt_key}, version: {version}, label: {label}")
        return prompt

    def unsubscribe_prompt(self, prompt_key: str, version: Optional[str],
                           label: Optional[str]):
        """Stop polling and evict the cached entry for this subscription."""
        cache_key = build_prompt_cache_key(prompt_key, version, label)
        self._remove_update_task(prompt_key, version, label)
        self.prompt_cache.pop(cache_key, None)
        self.logger.info(f"Unsubscribed prompt: {prompt_key}, version: {version}, label: {label}")

    def get_cached_prompt(self, prompt_key: str, version: Optional[str],
                          label: Optional[str]) -> Optional[Prompt]:
        """Return the cached Prompt for the triple, or None when not cached."""
        cache_key = build_prompt_cache_key(prompt_key, version, label)
        return self.prompt_cache.get(cache_key)

    def _add_update_task(self, prompt_key: str, version: Optional[str],
                         label: Optional[str]):
        """Start a polling task for this triple unless one is already running."""
        cache_key = build_prompt_cache_key(prompt_key, version, label)
        if cache_key not in self.tasks:
            self.tasks[cache_key] = asyncio.create_task(
                self._update_prompt_loop(prompt_key, version, label, cache_key))

    def _remove_update_task(self, prompt_key: str, version: Optional[str],
                            label: Optional[str]):
        """Cancel and forget the polling task for this triple, if any.

        NOTE(review): the cancelled task is not awaited here; it is only
        awaited in shutdown().
        """
        cache_key = build_prompt_cache_key(prompt_key, version, label)
        task = self.tasks.pop(cache_key, None)
        if task is not None:
            task.cancel()

    async def _update_prompt_loop(self, prompt_key: str, version: Optional[str],
                                  label: Optional[str], cache_key: str):
        """Background poller: re-query with the cached MD5 every update_interval seconds."""
        while True:
            try:
                await asyncio.sleep(self.update_interval)
            except asyncio.CancelledError:
                # Cancelled during the sleep (unsubscribe/shutdown) — exit quietly.
                return

            try:
                current_prompt = self.prompt_cache.get(cache_key)
                current_md5 = current_prompt.md5 if current_prompt else None
                latest_prompt = await self.ai_client_proxy.query_prompt(
                    prompt_key, version, label, current_md5)
                self._process_prompt(prompt_key, cache_key, latest_prompt)
            except NacosException as e:
                if e.error_code == NOT_FOUND:
                    # Prompt deleted on the server: drop it from the cache.
                    self._process_prompt(prompt_key, cache_key, None)
                elif e.error_code == NOT_MODIFIED:
                    # MD5 matched server-side — nothing changed, poll again later.
                    pass
                else:
                    self.logger.warning(
                        f"Prompt updater query failed: promptKey={prompt_key}, err={e.message}")
            except asyncio.CancelledError:
                # Cancelled mid-query — exit quietly.
                return
            except Exception as e:
                # Best-effort poller: log the failure and keep the loop alive.
                self.logger.warning(
                    f"Prompt updater unexpected error: promptKey={prompt_key}, err={e}")

    def _process_prompt(self, prompt_key: str, cache_key: str,
                        new_prompt: Optional[Prompt]):
        """Update the cache entry and fan out change notifications.

        A None new_prompt means the prompt no longer exists and evicts the
        entry; subscribers are notified only when the JSON dump changed.
        """
        old_prompt = self.prompt_cache.get(cache_key)
        if new_prompt is None:
            self.prompt_cache.pop(cache_key, None)
        else:
            self.prompt_cache[cache_key] = new_prompt

        if self._is_prompt_changed(old_prompt, new_prompt):
            subscribers = self.prompt_subscribe_manager.subscribers.get(cache_key, [])
            for callback_func in subscribers:
                try:
                    # Fire-and-forget: callbacks are scheduled on the event
                    # loop, not awaited here.
                    asyncio.ensure_future(callback_func(prompt_key, new_prompt))
                except Exception as e:
                    self.logger.error(f"Prompt change callback error: {e}")

    @staticmethod
    def _is_prompt_changed(old_prompt: Optional[Prompt],
                           new_prompt: Optional[Prompt]) -> bool:
        """Compare prompts by full JSON dump; None compares as the empty string."""
        old_json = "" if old_prompt is None else old_prompt.model_dump_json()
        new_json = "" if new_prompt is None else new_prompt.model_dump_json()
        return old_json != new_json
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import asyncio
2+
from typing import Callable, Awaitable, List, Optional
3+
4+
from v2.nacos.ai.model.prompt.prompt import Prompt
5+
from v2.nacos.ai.util.prompt_util import build_prompt_cache_key
6+
7+
8+
class PromptSubscribeManager:
    """Tracks async change callbacks per prompt cache key."""

    def __init__(self):
        # cache key -> list of registered async callbacks
        self.subscribers: dict[str, List[Callable[[str, Prompt], Awaitable[None]]]] = {}
        self.lock = asyncio.Lock()

    async def register_subscriber(self, prompt_key: str, version: Optional[str],
                                  label: Optional[str],
                                  callback_func: Callable[[str, Prompt], Awaitable[None]]):
        """Add a callback under the cache key built from the given triple."""
        key = build_prompt_cache_key(prompt_key, version, label)
        async with self.lock:
            self.subscribers.setdefault(key, []).append(callback_func)

    async def deregister_subscriber(self, prompt_key: str, version: Optional[str],
                                    label: Optional[str],
                                    callback_func: Callable[[str, Prompt], Awaitable[None]]):
        """Remove every registration equal to callback_func for the triple.

        A falsy callback is a no-op; the key is dropped once its list empties.
        """
        if not callback_func:
            return
        key = build_prompt_cache_key(prompt_key, version, label)
        async with self.lock:
            registered = self.subscribers.get(key)
            if registered is None:
                return
            remaining = [cb for cb in registered if cb != callback_func]
            if remaining:
                self.subscribers[key] = remaining
            else:
                del self.subscribers[key]

    def is_subscribed(self, prompt_key: str, version: Optional[str],
                      label: Optional[str]) -> bool:
        """Return True when at least one callback is registered for the triple."""
        key = build_prompt_cache_key(prompt_key, version, label)
        return bool(self.subscribers.get(key))

v2/nacos/ai/model/prompt/__init__.py

Whitespace-only changes.

v2/nacos/ai/model/prompt/prompt.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from typing import Optional, Dict
2+
3+
from pydantic import BaseModel
4+
5+
6+
class Prompt(BaseModel):
    """Prompt entity for AI Prompt management.

    A Prompt is persisted as a Nacos configuration under the fixed group
    "nacos-ai-prompt" with dataId "{promptKey}.json"; its content is JSON.
    """

    promptKey: Optional[str] = None
    version: Optional[str] = None
    template: Optional[str] = None
    md5: Optional[str] = None

    def render(self, variables: Optional[Dict[str, str]] = None) -> Optional[str]:
        """Substitute each {{variableName}} placeholder with its value.

        Example:
            prompt = Prompt(template="Hello {{name}}, welcome to {{place}}!")
            result = prompt.render({"name": "Alice", "place": "Nacos"})
            # Result: "Hello Alice, welcome to Nacos!"
        """
        source = self.template
        if source is None:
            return None
        if not variables:
            return source

        # Replacements are applied sequentially, one variable at a time.
        rendered = source
        for name, replacement in variables.items():
            token = "{{" + name + "}}"
            rendered = rendered.replace(token, "" if replacement is None else replacement)
        return rendered

    def __str__(self):
        return f"Prompt(promptKey='{self.promptKey}', version='{self.version}')"

0 commit comments

Comments
 (0)