Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions openviking/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ async def add_resource(
build_index: bool = True,
summarize: bool = False,
watch_interval: float = 0,
tags: Optional[Union[str, List[str]]] = None,
telemetry: TelemetryRequest = False,
**kwargs,
) -> Dict[str, Any]:
Expand Down Expand Up @@ -247,6 +248,7 @@ async def add_resource(
timeout=timeout,
build_index=build_index,
summarize=summarize,
tags=tags,
telemetry=telemetry,
watch_interval=watch_interval,
**kwargs,
Expand Down Expand Up @@ -313,6 +315,7 @@ async def search(
limit: int = 10,
score_threshold: Optional[float] = None,
filter: Optional[Dict] = None,
tags: Optional[Union[str, List[str]]] = None,
telemetry: TelemetryRequest = False,
):
"""
Expand All @@ -338,6 +341,7 @@ async def search(
limit=limit,
score_threshold=score_threshold,
filter=filter,
tags=tags,
telemetry=telemetry,
)

Expand All @@ -348,6 +352,7 @@ async def find(
limit: int = 10,
score_threshold: Optional[float] = None,
filter: Optional[Dict] = None,
tags: Optional[Union[str, List[str]]] = None,
telemetry: TelemetryRequest = False,
):
"""Semantic search"""
Expand All @@ -358,6 +363,7 @@ async def find(
limit=limit,
score_threshold=score_threshold,
filter=filter,
tags=tags,
telemetry=telemetry,
)

Expand Down
7 changes: 7 additions & 0 deletions openviking/client/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
attach_telemetry_payload,
run_with_telemetry,
)
from openviking.utils.tag_utils import canonicalize_user_tags, expand_query_tags
from openviking_cli.client.base import BaseClient
from openviking_cli.session.user_id import UserIdentifier
from openviking_cli.utils import run_async
Expand Down Expand Up @@ -83,6 +84,7 @@ async def add_resource(
summarize: bool = False,
telemetry: TelemetryRequest = False,
watch_interval: float = 0,
tags: Optional[Union[str, List[str]]] = None,
**kwargs,
) -> Dict[str, Any]:
"""Add resource to OpenViking."""
Expand All @@ -104,6 +106,7 @@ async def add_resource(
build_index=build_index,
summarize=summarize,
watch_interval=watch_interval,
tags=canonicalize_user_tags(tags),
**kwargs,
),
)
Expand Down Expand Up @@ -263,6 +266,7 @@ async def find(
limit: int = 10,
score_threshold: Optional[float] = None,
filter: Optional[Dict[str, Any]] = None,
tags: Optional[Union[str, List[str]]] = None,
telemetry: TelemetryRequest = False,
) -> Any:
"""Semantic search without session context."""
Expand All @@ -276,6 +280,7 @@ async def find(
limit=limit,
score_threshold=score_threshold,
filter=filter,
tags=expand_query_tags(tags),
),
)
return attach_telemetry_payload(
Expand All @@ -291,6 +296,7 @@ async def search(
limit: int = 10,
score_threshold: Optional[float] = None,
filter: Optional[Dict[str, Any]] = None,
tags: Optional[Union[str, List[str]]] = None,
telemetry: TelemetryRequest = False,
) -> Any:
"""Semantic search with optional session context."""
Expand All @@ -308,6 +314,7 @@ async def _search():
limit=limit,
score_threshold=score_threshold,
filter=filter,
tags=expand_query_tags(tags),
)

execution = await run_with_telemetry(
Expand Down
13 changes: 13 additions & 0 deletions openviking/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Any, Dict, List, Optional
from uuid import uuid4

from openviking.utils.tag_utils import parse_tags
from openviking.utils.time_utils import format_iso8601, parse_iso_datetime
from openviking_cli.session.user_id import UserIdentifier
from openviking_cli.utils.uri import VikingURI
Expand Down Expand Up @@ -67,6 +68,7 @@ def __init__(
active_count: int = 0,
related_uri: Optional[List[str]] = None,
meta: Optional[Dict[str, Any]] = None,
tags: Optional[List[str] | str] = None,
level: int | ContextLevel | None = None,
session_id: Optional[str] = None,
user: Optional[UserIdentifier] = None,
Expand All @@ -90,6 +92,9 @@ def __init__(
self.active_count = active_count
self.related_uri = related_uri or []
self.meta = meta or {}
self.tags = parse_tags(tags if tags is not None else self.meta.get("tags"))
if self.tags and "tags" not in self.meta:
self.meta["tags"] = list(self.tags)
try:
self.level = int(level) if level is not None else None
except (TypeError, ValueError):
Expand Down Expand Up @@ -172,6 +177,7 @@ def to_dict(self) -> Dict[str, Any]:
"active_count": self.active_count,
"vector": self.vector,
"meta": self.meta,
"tags": self.tags,
"related_uri": self.related_uri,
"session_id": self.session_id,
"account_id": self.account_id,
Expand Down Expand Up @@ -225,6 +231,13 @@ def from_dict(cls, data: Dict[str, Any]) -> "Context":
active_count=data.get("active_count", 0),
related_uri=data.get("related_uri", []),
meta=data.get("meta", {}),
tags=(
data.get("tags")
if data.get("tags") is not None
else data.get("meta", {}).get("tags")
if isinstance(data.get("meta"), dict)
else None
),
level=(
data.get("level")
if data.get("level") is not None
Expand Down
127 changes: 127 additions & 0 deletions openviking/retrieve/hierarchical_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from openviking.storage import VikingDBManager, VikingDBManagerProxy
from openviking.storage.viking_fs import get_viking_fs
from openviking.telemetry import get_current_telemetry
from openviking.utils.tag_utils import expand_query_tags, merge_tags, parse_tags
from openviking.utils.time_utils import parse_iso_datetime
from openviking_cli.retrieve.types import (
ContextType,
Expand Down Expand Up @@ -50,6 +51,9 @@ class HierarchicalRetriever:
DIRECTORY_DOMINANCE_RATIO = 1.2 # Directory score must exceed max child score
GLOBAL_SEARCH_TOPK = 10 # Global retrieval count (more candidates = better rerank precision)
HOTNESS_ALPHA = 0.2 # Weight for hotness score in final ranking (0 = disabled)
MAX_TAG_EXPANSION_TAGS = 8 # Upper bound on expansion tags collected per query.
TAG_EXPANSION_LIMIT = 8 # Upper bound on extra nodes discovered via tags.
TAG_EXPANSION_SCORE = 0.15 # Lower seed score for tag-expanded nodes.
LEVEL_URI_SUFFIX = {0: ".abstract.md", 1: ".overview.md"}

def __init__(
Expand Down Expand Up @@ -113,6 +117,7 @@ async def retrieve(
vector_proxy = VikingDBManagerProxy(self.vector_store, ctx)

target_dirs = [d for d in (query.target_directories or []) if d]
query_tags = expand_query_tags(query.tags)

if not await vector_proxy.collection_exists_bound():
logger.warning(
Expand Down Expand Up @@ -146,6 +151,7 @@ async def retrieve(
sparse_query_vector=sparse_query_vector,
context_type=query.context_type.value if query.context_type else None,
target_dirs=target_dirs,
tags=query_tags,
scope_dsl=scope_dsl,
limit=max(limit, self.GLOBAL_SEARCH_TOPK),
)
Expand All @@ -167,11 +173,22 @@ async def retrieve(
f" [{i}] URI: {uri}, score: {score:.4f}, level: {level}, account_id: {account_id}"
)

expanded_points, expanded_candidates = await self._expand_starting_points_by_tags(
vector_proxy=vector_proxy,
global_results=global_results,
explicit_tags=query_tags,
context_type=query.context_type.value if query.context_type else None,
target_dirs=target_dirs,
scope_dsl=scope_dsl,
limit=self.TAG_EXPANSION_LIMIT,
)

# Step 3: Merge starting points
starting_points = self._merge_starting_points(
query.query,
root_uris,
global_results,
extra_points=expanded_points,
mode=mode,
)

Expand All @@ -183,6 +200,10 @@ async def retrieve(
initial_candidates,
mode=mode,
)
initial_candidates = self._merge_initial_candidates(
initial_candidates,
expanded_candidates,
)

# Step 4: Recursive search
candidates = await self._recursive_search(
Expand Down Expand Up @@ -229,6 +250,7 @@ async def _global_vector_search(
sparse_query_vector: Optional[Dict[str, float]],
context_type: Optional[str],
target_dirs: List[str],
tags: List[str],
scope_dsl: Optional[Dict[str, Any]],
limit: int,
) -> List[Dict[str, Any]]:
Expand All @@ -238,6 +260,7 @@ async def _global_vector_search(
sparse_query_vector=sparse_query_vector,
context_type=context_type,
target_directories=target_dirs,
tags=tags,
extra_filter=scope_dsl,
limit=limit,
)
Expand Down Expand Up @@ -284,6 +307,7 @@ def _merge_starting_points(
query: str,
root_uris: List[str],
global_results: List[Dict[str, Any]],
extra_points: Optional[List[Tuple[str, float]]] = None,
mode: str = "thinking",
) -> List[Tuple[str, float]]:
"""Merge starting points.
Expand Down Expand Up @@ -320,8 +344,111 @@ def _merge_starting_points(
points.append((uri, 0.0))
seen.add(uri)

for uri, score in extra_points or []:
if uri not in seen:
points.append((uri, score))
seen.add(uri)

return points

def _merge_initial_candidates(
self,
*candidate_groups: Optional[List[Dict[str, Any]]],
) -> List[Dict[str, Any]]:
merged: Dict[str, Dict[str, Any]] = {}
for group in candidate_groups:
for candidate in group or []:
uri = candidate.get("uri", "")
if not uri:
continue
previous = merged.get(uri)
if previous is None or candidate.get("_score", 0.0) > previous.get("_score", 0.0):
merged[uri] = candidate
return sorted(merged.values(), key=lambda item: item.get("_score", 0.0), reverse=True)

async def _expand_starting_points_by_tags(
    self,
    vector_proxy: VikingDBManagerProxy,
    global_results: List[Dict[str, Any]],
    explicit_tags: List[str],
    context_type: Optional[str],
    target_dirs: List[str],
    scope_dsl: Optional[Dict[str, Any]],
    limit: int,
) -> Tuple[List[Tuple[str, float]], List[Dict[str, Any]]]:
    """Discover extra retrieval entry points via tag overlap.

    Builds a tag pool from the explicit query tags plus tags seen on the
    global vector-search hits, then asks the vector store for records that
    share those tags. Records whose URI already appears in
    ``global_results`` are skipped so the expansion only adds new material.

    Args:
        vector_proxy: Tenant-bound vector store used for the tag lookup.
        global_results: Hits from the global vector search; used both to
            harvest tags and to de-duplicate by URI.
        explicit_tags: Tags supplied on the query itself.
        context_type: Optional context-type filter forwarded to the lookup.
        target_dirs: Directory scope filter forwarded to the lookup.
        scope_dsl: Extra filter DSL forwarded to the lookup.
        limit: Maximum number of tag matches to request; ``<= 0`` disables
            expansion entirely.

    Returns:
        Tuple of (starting points as ``(uri, seed_score)`` pairs, leaf
        candidate dicts seeded with a ``_score`` key).
    """
    expansion_tags = self._collect_expansion_tags(global_results, explicit_tags)
    if not expansion_tags or limit <= 0:
        # No tags to expand with, or expansion disabled by the caller.
        return [], []

    tag_matches = await vector_proxy.search_by_tags_in_tenant(
        tags=expansion_tags,
        context_type=context_type,
        target_directories=target_dirs,
        extra_filter=scope_dsl,
        levels=[0, 1, 2],
        limit=limit,
    )
    # Record how wide the expansion was, for observability.
    telemetry = get_current_telemetry()
    telemetry.count("retrieval.tag_expansion.tags", len(expansion_tags))
    telemetry.count("retrieval.tag_expansion.matches", len(tag_matches))

    seen_uris = {result.get("uri", "") for result in global_results}
    expansion_points: Dict[str, float] = {}  # start URI -> best seed score
    expansion_candidates: Dict[str, Dict[str, Any]] = {}  # leaf URI -> best candidate
    expansion_tag_set = set(expansion_tags)

    for match in tag_matches:
        uri = match.get("uri", "")
        if not uri or uri in seen_uris:
            continue

        # Seed score grows with the number of overlapping tags
        # (see _score_tag_expansion); it is deliberately low so tag-only
        # matches never outrank direct vector hits.
        overlap = expansion_tag_set.intersection(parse_tags(match.get("tags")))
        score = self._score_tag_expansion(len(overlap))

        if match.get("level", 2) == 2:
            # Level-2 records appear to be leaf contexts in this hierarchy
            # (levels 0/1 map to abstract/overview files — see
            # LEVEL_URI_SUFFIX); surface them directly as candidates,
            # keeping only the best-scored entry per URI.
            candidate = dict(match)
            candidate["_score"] = score
            previous = expansion_candidates.get(uri)
            if previous is None or score > previous.get("_score", 0.0):
                expansion_candidates[uri] = candidate

        # Every match also suggests a tree-walk starting point: itself for
        # non-leaf records, its parent for leaves (_start_uri_from_record).
        start_uri = self._start_uri_from_record(match)
        if start_uri and score > expansion_points.get(start_uri, 0.0):
            expansion_points[start_uri] = score

    return list(expansion_points.items()), list(expansion_candidates.values())

def _collect_expansion_tags(
    self,
    global_results: List[Dict[str, Any]],
    explicit_tags: List[str],
) -> List[str]:
    """Build the tag pool used for tag-based expansion.

    Combines the explicit query tags with the tags carried by each global
    search hit, capped at ``MAX_TAG_EXPANSION_TAGS`` by ``merge_tags``.
    """
    tag_groups = [explicit_tags] + [
        parse_tags(result.get("tags")) for result in global_results
    ]
    return merge_tags(*tag_groups, max_tags=self.MAX_TAG_EXPANSION_TAGS)

def _score_tag_expansion(self, overlap_count: int) -> float:
if overlap_count <= 1:
return self.TAG_EXPANSION_SCORE
return self.TAG_EXPANSION_SCORE * (1.0 + 0.2 * min(overlap_count - 1, 3))

def _start_uri_from_record(self, record: Dict[str, Any]) -> str:
uri = record.get("uri", "")
if not uri:
return ""
if record.get("level", 2) != 2:
return uri

parent_uri = record.get("parent_uri")
if parent_uri:
return parent_uri

normalized = uri.rstrip("/")
if "/" not in normalized:
return ""
return normalized.rsplit("/", 1)[0]

def _prepare_initial_candidates(
self,
query: str,
Expand Down
4 changes: 4 additions & 0 deletions openviking/server/routers/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from openviking.server.models import Response
from openviking.server.telemetry import run_operation
from openviking.telemetry import TelemetryRequest
from openviking.utils.tag_utils import canonicalize_user_tags
from openviking_cli.exceptions import InvalidArgumentError
from openviking_cli.utils.config.open_viking_config import get_openviking_config

Expand Down Expand Up @@ -50,6 +51,7 @@ class AddResourceRequest(BaseModel):
exclude: Glob pattern for files to exclude during parsing.
directly_upload_media: Whether to directly upload media files. Default is True.
preserve_structure: Whether to preserve directory structure when adding directories.
tags: Optional semicolon-delimited tags to persist on indexed contexts.
watch_interval: Watch interval in minutes for automatic resource monitoring.
- watch_interval > 0: Creates or updates a watch task. The resource will be
automatically re-processed at the specified interval.
Expand Down Expand Up @@ -80,6 +82,7 @@ class AddResourceRequest(BaseModel):
exclude: Optional[str] = None
directly_upload_media: bool = True
preserve_structure: Optional[bool] = None
tags: Optional[str] = None
telemetry: TelemetryRequest = False
watch_interval: float = 0

Expand Down Expand Up @@ -213,6 +216,7 @@ async def add_resource(
instruction=request.instruction,
wait=request.wait,
timeout=request.timeout,
tags=canonicalize_user_tags(request.tags),
allow_local_path_resolution=allow_local_path_resolution,
**kwargs,
),
Expand Down
Loading
Loading