Skip to content

Commit d5c0ba8

Browse files
Merge pull request #392 from nlweb-ai/pr-364-updated
PR 364 updated
2 parents 623e1de + a3b205f commit d5c0ba8

7 files changed

Lines changed: 116 additions & 901 deletions

File tree

code/python/core/embedding.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,10 +272,10 @@ async def batch_get_embeddings(
272272
# Gemini might not have a native batch API, so process one by one
273273
logger.debug("Getting Gemini batch embeddings (sequential)")
274274
from embedding_providers.gemini_embedding import get_gemini_batch_embeddings
275-
# Process texts one by one with individual timeouts
275+
# Process texts one by one - use longer timeout since sequential processing with rate limits
276276
result = await asyncio.wait_for(
277277
get_gemini_batch_embeddings(texts, model=model_id),
278-
timeout=30 # Individual timeout per text
278+
timeout=timeout * len(texts) # Scale timeout with batch size
279279
)
280280
logger.debug(f"Gemini batch embeddings received, count: {len(result)}")
281281
return result

code/python/core/ranking.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ async def sendAnswers(self, answers, force=False):
291291
self.handler.fastTrackWorked = True
292292

293293
# Use the new schema to create and auto-send the message
294-
create_assistant_result(json_results, handler=self.handler)
294+
await create_assistant_result(json_results, handler=self.handler)
295295
self.num_results_sent += len(json_results)
296296
except (BrokenPipeError, ConnectionResetError) as e:
297297
self.handler.connection_alive_event.clear()

code/python/core/schemas.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from enum import Enum
1111
import uuid
1212

13-
1413
class SenderType(str, Enum):
1514
"""Who sent the message."""
1615
USER = "user"
@@ -298,7 +297,7 @@ def create_user_message(query: str, site: Optional[str] = None, mode: Optional[s
298297
return message
299298

300299

301-
def create_assistant_result(results: List[Dict[str, Any]],
300+
async def create_assistant_result(results: List[Dict[str, Any]],
302301
handler=None,
303302
metadata: Optional[Dict[str, Any]] = None,
304303
send: bool = True) -> Message:
@@ -323,8 +322,7 @@ def create_assistant_result(results: List[Dict[str, Any]],
323322
)
324323

325324
if send and handler:
326-
import asyncio
327-
asyncio.create_task(handler.send_message(message.to_dict()))
325+
await handler.send_message(message.to_dict())
328326

329327
return message
330328

@@ -471,4 +469,4 @@ def create_legacy_message(message_type: str, content: Any,
471469
if sender_info:
472470
message["sender_info"] = sender_info
473471

474-
return message
472+
return message

code/python/core/whoRanking.py

Lines changed: 97 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@
1313
from misc.logger.logging_config_helper import get_configured_logger
1414
from core.schemas import create_assistant_result
1515

16+
1617
logger = get_configured_logger("who_ranking_engine")
1718

1819
DEBUG_PRINT = False
1920

2021
class WhoRanking:
2122

23+
2224
EARLY_SEND_THRESHOLD = 59
2325
NUM_RESULTS_TO_SEND = 10
2426

@@ -29,6 +31,7 @@ def __init__(self, handler, items, level="high"): # default to low level for WHO
2931
self.items = items
3032
self.num_results_sent = 0
3133
self.rankedAnswers = []
34+
3235

3336

3437
def get_ranking_prompt(self, query, site_description):
@@ -48,81 +51,107 @@ def get_ranking_prompt(self, query, site_description):
4851
4952
The site's description is: {site_description}
5053
"""
51-
54+
5255
response_structure = {
5356
"score": "integer between 0 and 100",
5457
"description": "short description of why this site is relevant",
55-
"query": "the optimized query to send to this site (only if score > 70)"
58+
"query": "the optimized query to send to this site (only if score > 70)",
5659
}
57-
60+
5861
return prompt, response_structure
5962

6063
async def rankItem(self, url, json_str, name, site):
6164
"""Rank a single site for relevance to the query."""
6265
try:
6366
description = trim_json(json_str)
64-
prompt, ans_struc = self.get_ranking_prompt(self.handler.query, description)
65-
ranking = await ask_llm(prompt, ans_struc, level=self.level,
66-
query_params=self.handler.query_params, timeout=8)
67-
67+
prompt, ans_struc = self.get_ranking_prompt(
68+
self.handler.query, description
69+
)
70+
ranking = await ask_llm(
71+
prompt,
72+
ans_struc,
73+
level=self.level,
74+
query_params=self.handler.query_params,
75+
timeout=90,
76+
)
77+
6878
# Ensure ranking has required fields (handle LLM failures/timeouts)
6979
if not ranking or not isinstance(ranking, dict):
70-
ranking = {"score": 0, "description": "Failed to rank", "query": self.handler.query}
80+
ranking = {
81+
"score": 0,
82+
"description": "Failed to rank",
83+
"query": self.handler.query,
84+
}
7185
if "score" not in ranking:
7286
ranking["score"] = 0
7387
if "query" not in ranking:
7488
ranking["query"] = self.handler.query
75-
89+
7690
# Log the LLM score
7791
# LLM Score recorded
78-
92+
7993
# Handle both string and dictionary inputs for json_str
80-
schema_object = json_str if isinstance(json_str, dict) else json.loads(json_str)
81-
94+
schema_object = (
95+
json_str if isinstance(json_str, dict) else json.loads(json_str)
96+
)
97+
8298
# Store the result
8399
ansr = {
84-
'url': url,
85-
'site': site,
86-
'name': name,
87-
'ranking': ranking,
88-
'schema_object': schema_object,
89-
'sent': False,
100+
"url": url,
101+
"site": site,
102+
"name": name,
103+
"ranking": ranking,
104+
"schema_object": schema_object,
105+
"sent": False,
90106
}
91-
107+
92108
# Send immediately if high score
93109
if ranking.get("score", 0) > self.EARLY_SEND_THRESHOLD:
94-
logger.info(f"High score site: {name} (score: {ranking['score']}) - sending early")
110+
logger.info(
111+
f"High score site: {name} (score: {ranking['score']}) - sending early"
112+
)
95113
await self.sendAnswers([ansr])
96-
114+
97115
self.rankedAnswers.append(ansr)
98116
logger.debug(f"Site {name} added to ranked answers")
99-
117+
100118
except Exception as e:
101119
logger.error(f"Error in rankItem for {name}: {str(e)}")
102120
logger.debug(f"Full error trace: ", exc_info=True)
103121
# Still add the item with a zero score so we don't lose it completely
104122
try:
105-
schema_object = json_str if isinstance(json_str, dict) else json.loads(json_str)
123+
schema_object = (
124+
json_str if isinstance(json_str, dict) else json.loads(json_str)
125+
)
106126
ansr = {
107-
'url': url,
108-
'site': site,
109-
'name': name,
110-
'ranking': {"score": 0, "description": f"Error: {str(e)}", "query": self.handler.query},
111-
'schema_object': schema_object,
112-
'sent': False,
127+
"url": url,
128+
"site": site,
129+
"name": name,
130+
"ranking": {
131+
"score": 0,
132+
"description": f"Error: {str(e)}",
133+
"query": self.handler.query,
134+
},
135+
"schema_object": schema_object,
136+
"sent": False,
113137
}
114138
self.rankedAnswers.append(ansr)
115139
except:
116140
pass # Skip this item entirely if we can't even create a basic record
117141

118142
async def sendAnswers(self, answers, force=False):
119143
"""Send ranked sites to the client."""
144+
# Get max_results from handler, or use default
145+
max_results = getattr(self.handler, 'max_results', self.NUM_RESULTS_TO_SEND)
120146
json_results = []
121147

148+
122149
for result in answers:
123150
# Stop if we've already sent enough
124-
if self.num_results_sent + len(json_results) >= self.NUM_RESULTS_TO_SEND:
125-
logger.info(f"Stopping at {len(json_results)} results to avoid exceeding limit of {self.NUM_RESULTS_TO_SEND}")
151+
if self.num_results_sent + len(json_results) >= max_results:
152+
logger.info(
153+
f"Stopping at {len(json_results)} results to avoid exceeding limit of {max_results}"
154+
)
126155
break
127156

128157
# Extract site type from schema_object
@@ -143,24 +172,34 @@ async def sendAnswers(self, answers, force=False):
143172
"@type": site_type, # Use the actual site type
144173
"url": complete_url, # Use the complete URL with gateway
145174
"name": result["name"],
146-
"score": result["ranking"]["score"]
175+
"score": result["ranking"]["score"],
147176
}
148177

178+
149179
# Include description if available
150180
if "description" in result["ranking"]:
151181
result_item["description"] = result["ranking"]["description"]
152182

183+
# Always include query field (required for WHO ranking)
184+
if "query" in result["ranking"]:
185+
result_item["query"] = result["ranking"]["query"]
186+
else:
187+
# Fallback to original query if no custom query provided
188+
result_item["query"] = self.handler.query
189+
190+
153191
# Include optimized query field (for display purposes)
154192
result_item["query"] = site_query
155193

156194
json_results.append(result_item)
157195
result["sent"] = True
158-
159196
if json_results:
160197
# Use the new schema to create and auto-send the message
161-
create_assistant_result(json_results, handler=self.handler)
198+
await create_assistant_result(json_results, handler=self.handler)
162199
self.num_results_sent += len(json_results)
163-
logger.info(f"Sent {len(json_results)} results, total sent: {self.num_results_sent}/{self.NUM_RESULTS_TO_SEND}")
200+
logger.info(
201+
f"Sent {len(json_results)} results, total sent: {self.num_results_sent}/{max_results}"
202+
)
164203

165204
async def do(self):
166205
"""Main execution method - rank all sites concurrently."""
@@ -169,69 +208,37 @@ async def do(self):
169208
tasks = []
170209
for url, json_str, name, site in self.items:
171210
tasks.append(asyncio.create_task(self.rankItem(url, json_str, name, site)))
172-
211+
173212
# Wait for all ranking tasks to complete
174213
try:
175214
await asyncio.gather(*tasks, return_exceptions=True)
176215
except Exception as e:
177216
logger.error(f"Error during ranking tasks: {str(e)}")
178-
179-
# Filter and sort final results
180-
filtered = [r for r in self.rankedAnswers if r.get('ranking', {}).get('score', 0) > 70]
217+
218+
# Use min_score from handler if available, otherwise default to 51
219+
min_score_threshold = getattr(self.handler, 'min_score', 51)
220+
# Use max_results from handler if available, otherwise use NUM_RESULTS_TO_SEND
221+
max_results = getattr(self.handler, 'max_results', self.NUM_RESULTS_TO_SEND)
222+
filtered = [r for r in self.rankedAnswers if r.get('ranking', {}).get('score', 0) > min_score_threshold]
181223
ranked = sorted(filtered, key=lambda x: x.get('ranking', {}).get("score", 0), reverse=True)
182-
self.handler.final_ranked_answers = ranked[:self.NUM_RESULTS_TO_SEND]
183-
184-
if (DEBUG_PRINT):
185-
print(f"\n=== WHO RANKING: Filtered to {len(filtered)} results with score > 70 ===")
186-
187-
# Print the ranked sites with scores
188-
print("\nRanked sites (top 10):")
189-
for i, r in enumerate(ranked[:self.NUM_RESULTS_TO_SEND], 1):
190-
score = r.get('ranking', {}).get('score', 0)
191-
print(f" {i}. {r['name']} - Score: {score}")
192-
print("=" * 60)
193-
194-
# Print sites that were not returned
195-
print("\n=== SITES NOT RETURNED (sorted by score) ===")
196-
197-
# Get all sites that were not included in the top 10
198-
not_returned_high_score = ranked[self.NUM_RESULTS_TO_SEND:] # Sites with score > 70 but beyond top 10
199-
not_returned_low_score = [r for r in self.rankedAnswers if r.get('ranking', {}).get('score', 0) <= 70]
200-
201-
# Sort low score sites by score (descending)
202-
not_returned_low_score = sorted(not_returned_low_score,
203-
key=lambda x: x.get('ranking', {}).get("score", 0),
204-
reverse=True)
205-
206-
# Combine both lists
207-
all_not_returned = not_returned_high_score + not_returned_low_score
208-
209-
if all_not_returned:
210-
print(f"\nTotal sites not returned: {len(all_not_returned)}")
211-
212-
# Print sites with score > 70 that didn't make top 10
213-
if not_returned_high_score:
214-
print(f"\nSites with score > 70 but beyond top {self.NUM_RESULTS_TO_SEND}:")
215-
for i, r in enumerate(not_returned_high_score, 1):
216-
score = r.get('ranking', {}).get('score', 0)
217-
print(f" {i}. {r['name']} - Score: {score}")
218-
219-
# Print sites with score <= 70
220-
if not_returned_low_score:
221-
print(f"\nSites with score <= 70:")
222-
for i, r in enumerate(not_returned_low_score, 1):
223-
score = r.get('ranking', {}).get('score', 0)
224-
print(f" {i}. {r['name']} - Score: {score}")
225-
else:
226-
print("All retrieved sites were returned to the user.")
224+
self.handler.final_ranked_answers = ranked[:max_results]
225+
226+
print(f"\n=== WHO RANKING: Filtered to {len(filtered)} results with score > {min_score_threshold} ===")
227+
228+
# Print the ranked sites with scores
229+
print("\nRanked sites (top 10):")
230+
for i, r in enumerate(ranked[:max_results], 1):
231+
score = r.get('ranking', {}).get('score', 0)
232+
print(f" {i}. {r['name']} - Score: {score}")
233+
print("=" * 60)
227234

228-
print("=" * 60)
229-
230235
# Final ranked results processed
231-
236+
232237
# Send any remaining results that haven't been sent
233-
results_to_send = [r for r in ranked if not r['sent']][:self.NUM_RESULTS_TO_SEND - self.num_results_sent]
234-
238+
results_to_send = [r for r in ranked if not r["sent"]][
239+
: max_results - self.num_results_sent
240+
]
241+
235242
if results_to_send:
236243
logger.info(f"Sending final batch of {len(results_to_send)} results")
237-
await self.sendAnswers(results_to_send, force=True)
244+
await self.sendAnswers(results_to_send, force=True)

code/python/embedding_providers/gemini_embedding.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313
import threading
1414
from typing import List, Optional
1515
import time
16+
import logging
17+
18+
# Suppress gRPC ALTS credentials warning (not running on GCP)
19+
logging.getLogger("grpc").setLevel(logging.ERROR)
1620

1721
import google.generativeai as genai
1822
from core.config import CONFIG

static/who.html

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,11 +382,20 @@ <h1>Agent Finder</h1>
382382
this.eventSource.close();
383383
}
384384

385+
const debugParams = new URLSearchParams(window.location.search);
386+
const queryObj = Object.fromEntries(debugParams.entries());
387+
385388
// Create URL with query parameters
386389
const params = new URLSearchParams({
387390
query: query,
388391
streaming: 'true' // Use streaming mode
389392
});
393+
394+
// Add previous queries for context (limit to last 3 queries)
395+
if (this.queryHistory.length > 0) {
396+
const recentQueries = this.queryHistory.slice(-3);
397+
params.append('prev_queries', JSON.stringify(recentQueries));
398+
}
390399

391400
const url = `/who?${params.toString()}`;
392401

0 commit comments

Comments
 (0)