diff --git a/openviking/async_client.py b/openviking/async_client.py index 2042bbfa0..75cbbccd8 100644 --- a/openviking/async_client.py +++ b/openviking/async_client.py @@ -215,6 +215,7 @@ async def add_resource( build_index: bool = True, summarize: bool = False, watch_interval: float = 0, + tags: Optional[Union[str, List[str]]] = None, telemetry: TelemetryRequest = False, **kwargs, ) -> Dict[str, Any]: @@ -247,6 +248,7 @@ async def add_resource( timeout=timeout, build_index=build_index, summarize=summarize, + tags=tags, telemetry=telemetry, watch_interval=watch_interval, **kwargs, @@ -313,6 +315,7 @@ async def search( limit: int = 10, score_threshold: Optional[float] = None, filter: Optional[Dict] = None, + tags: Optional[Union[str, List[str]]] = None, telemetry: TelemetryRequest = False, ): """ @@ -338,6 +341,7 @@ async def search( limit=limit, score_threshold=score_threshold, filter=filter, + tags=tags, telemetry=telemetry, ) @@ -348,6 +352,7 @@ async def find( limit: int = 10, score_threshold: Optional[float] = None, filter: Optional[Dict] = None, + tags: Optional[Union[str, List[str]]] = None, telemetry: TelemetryRequest = False, ): """Semantic search""" @@ -358,6 +363,7 @@ async def find( limit=limit, score_threshold=score_threshold, filter=filter, + tags=tags, telemetry=telemetry, ) diff --git a/openviking/client/local.py b/openviking/client/local.py index a994843a6..8a2175bee 100644 --- a/openviking/client/local.py +++ b/openviking/client/local.py @@ -14,6 +14,7 @@ attach_telemetry_payload, run_with_telemetry, ) +from openviking.utils.tag_utils import canonicalize_user_tags, expand_query_tags from openviking_cli.client.base import BaseClient from openviking_cli.session.user_id import UserIdentifier from openviking_cli.utils import run_async @@ -83,6 +84,7 @@ async def add_resource( summarize: bool = False, telemetry: TelemetryRequest = False, watch_interval: float = 0, + tags: Optional[Union[str, List[str]]] = None, **kwargs, ) -> Dict[str, Any]: """Add resource to OpenViking.""" @@ -104,6 +106,7 @@ async def add_resource( build_index=build_index, summarize=summarize, watch_interval=watch_interval, + tags=canonicalize_user_tags(tags), **kwargs, ), ) @@ -263,6 +266,7 @@ async def find( limit: int = 10, score_threshold: Optional[float] = None, filter: Optional[Dict[str, Any]] = None, + tags: Optional[Union[str, List[str]]] = None, telemetry: TelemetryRequest = False, ) -> Any: """Semantic search without session context.""" @@ -276,6 +280,7 @@ async def find( limit=limit, score_threshold=score_threshold, filter=filter, + tags=expand_query_tags(tags), ), ) return attach_telemetry_payload( @@ -291,6 +296,7 @@ async def search( limit: int = 10, score_threshold: Optional[float] = None, filter: Optional[Dict[str, Any]] = None, + tags: Optional[Union[str, List[str]]] = None, telemetry: TelemetryRequest = False, ) -> Any: """Semantic search with optional session context.""" @@ -308,6 +314,7 @@ async def _search(): limit=limit, score_threshold=score_threshold, filter=filter, + tags=expand_query_tags(tags), ) execution = await run_with_telemetry( diff --git a/openviking/core/context.py b/openviking/core/context.py index 55bce1c47..58d0d43f2 100644 --- a/openviking/core/context.py +++ b/openviking/core/context.py @@ -7,6 +7,7 @@ from typing import Any, Dict, List, Optional from uuid import uuid4 +from openviking.utils.tag_utils import parse_tags from openviking.utils.time_utils import format_iso8601, parse_iso_datetime from openviking_cli.session.user_id import UserIdentifier from openviking_cli.utils.uri import VikingURI @@ -67,6 +68,7 @@ def __init__( active_count: int = 0, related_uri: Optional[List[str]] = None, meta: Optional[Dict[str, Any]] = None, + tags: Optional[List[str] | str] = None, level: int | ContextLevel | None = None, session_id: Optional[str] = None, user: Optional[UserIdentifier] = None, @@ -90,6 +92,9 @@ def __init__( self.active_count = active_count self.related_uri = related_uri or [] self.meta = meta or {} + self.tags = parse_tags(tags if tags is not None else self.meta.get("tags")) + if self.tags and "tags" not in self.meta: + self.meta["tags"] = list(self.tags) try: self.level = int(level) if level is not None else None except (TypeError, ValueError): @@ -172,6 +177,7 @@ def to_dict(self) -> Dict[str, Any]: "active_count": self.active_count, "vector": self.vector, "meta": self.meta, + "tags": self.tags, "related_uri": self.related_uri, "session_id": self.session_id, "account_id": self.account_id, @@ -225,6 +231,13 @@ def from_dict(cls, data: Dict[str, Any]) -> "Context": active_count=data.get("active_count", 0), related_uri=data.get("related_uri", []), meta=data.get("meta", {}), + tags=( + data.get("tags") + if data.get("tags") is not None + else data.get("meta", {}).get("tags") + if isinstance(data.get("meta"), dict) + else None + ), level=( data.get("level") if data.get("level") is not None diff --git a/openviking/retrieve/hierarchical_retriever.py b/openviking/retrieve/hierarchical_retriever.py index 3192ccff8..25870d6d4 100644 --- a/openviking/retrieve/hierarchical_retriever.py +++ b/openviking/retrieve/hierarchical_retriever.py @@ -22,6 +22,7 @@ from openviking.storage import VikingDBManager, VikingDBManagerProxy from openviking.storage.viking_fs import get_viking_fs from openviking.telemetry import get_current_telemetry +from openviking.utils.tag_utils import expand_query_tags, merge_tags, parse_tags from openviking.utils.time_utils import parse_iso_datetime from openviking_cli.retrieve.types import ( ContextType, @@ -50,6 +51,9 @@ class HierarchicalRetriever: DIRECTORY_DOMINANCE_RATIO = 1.2 # Directory score must exceed max child score GLOBAL_SEARCH_TOPK = 10 # Global retrieval count (more candidates = better rerank precision) HOTNESS_ALPHA = 0.2 # Weight for hotness score in final ranking (0 = disabled) + MAX_TAG_EXPANSION_TAGS = 8 # Upper bound on expansion tags collected per query. + TAG_EXPANSION_LIMIT = 8 # Upper bound on extra nodes discovered via tags. + TAG_EXPANSION_SCORE = 0.15 # Lower seed score for tag-expanded nodes. LEVEL_URI_SUFFIX = {0: ".abstract.md", 1: ".overview.md"} def __init__( @@ -113,6 +117,7 @@ async def retrieve( vector_proxy = VikingDBManagerProxy(self.vector_store, ctx) target_dirs = [d for d in (query.target_directories or []) if d] + query_tags = expand_query_tags(query.tags) if not await vector_proxy.collection_exists_bound(): logger.warning( @@ -146,6 +151,7 @@ async def retrieve( sparse_query_vector=sparse_query_vector, context_type=query.context_type.value if query.context_type else None, target_dirs=target_dirs, + tags=query_tags, scope_dsl=scope_dsl, limit=max(limit, self.GLOBAL_SEARCH_TOPK), ) @@ -167,11 +173,22 @@ async def retrieve( f" [{i}] URI: {uri}, score: {score:.4f}, level: {level}, account_id: {account_id}" ) + expanded_points, expanded_candidates = await self._expand_starting_points_by_tags( + vector_proxy=vector_proxy, + global_results=global_results, + explicit_tags=query_tags, + context_type=query.context_type.value if query.context_type else None, + target_dirs=target_dirs, + scope_dsl=scope_dsl, + limit=self.TAG_EXPANSION_LIMIT, + ) + # Step 3: Merge starting points starting_points = self._merge_starting_points( query.query, root_uris, global_results, + extra_points=expanded_points, mode=mode, ) @@ -183,6 +200,10 @@ async def retrieve( initial_candidates, mode=mode, ) + initial_candidates = self._merge_initial_candidates( + initial_candidates, + expanded_candidates, + ) # Step 4: Recursive search candidates = await self._recursive_search( @@ -229,6 +250,7 @@ async def _global_vector_search( sparse_query_vector: Optional[Dict[str, float]], context_type: Optional[str], target_dirs: List[str], + tags: List[str], scope_dsl: Optional[Dict[str, Any]], limit: int, ) -> List[Dict[str, Any]]: @@ -238,6 +260,7 @@ async def _global_vector_search( sparse_query_vector=sparse_query_vector, context_type=context_type, target_directories=target_dirs, + tags=tags, extra_filter=scope_dsl, limit=limit, ) @@ -284,6 +307,7 @@ def _merge_starting_points( query: str, root_uris: List[str], global_results: List[Dict[str, Any]], + extra_points: Optional[List[Tuple[str, float]]] = None, mode: str = "thinking", ) -> List[Tuple[str, float]]: """Merge starting points. @@ -320,8 +344,111 @@ def _merge_starting_points( points.append((uri, 0.0)) seen.add(uri) + for uri, score in extra_points or []: + if uri not in seen: + points.append((uri, score)) + seen.add(uri) + return points + def _merge_initial_candidates( + self, + *candidate_groups: Optional[List[Dict[str, Any]]], + ) -> List[Dict[str, Any]]: + merged: Dict[str, Dict[str, Any]] = {} + for group in candidate_groups: + for candidate in group or []: + uri = candidate.get("uri", "") + if not uri: + continue + previous = merged.get(uri) + if previous is None or candidate.get("_score", 0.0) > previous.get("_score", 0.0): + merged[uri] = candidate + return sorted(merged.values(), key=lambda item: item.get("_score", 0.0), reverse=True) + + async def _expand_starting_points_by_tags( + self, + vector_proxy: VikingDBManagerProxy, + global_results: List[Dict[str, Any]], + explicit_tags: List[str], + context_type: Optional[str], + target_dirs: List[str], + scope_dsl: Optional[Dict[str, Any]], + limit: int, + ) -> Tuple[List[Tuple[str, float]], List[Dict[str, Any]]]: + expansion_tags = self._collect_expansion_tags(global_results, explicit_tags) + if not expansion_tags or limit <= 0: + return [], [] + + tag_matches = await vector_proxy.search_by_tags_in_tenant( + tags=expansion_tags, + context_type=context_type, + target_directories=target_dirs, + extra_filter=scope_dsl, + levels=[0, 1, 2], + limit=limit, + ) + telemetry = get_current_telemetry() + telemetry.count("retrieval.tag_expansion.tags", len(expansion_tags)) + telemetry.count("retrieval.tag_expansion.matches", len(tag_matches)) + + seen_uris = {result.get("uri", "") for result in global_results} + expansion_points: Dict[str, float] = {} + expansion_candidates: Dict[str, Dict[str, Any]] = {} + expansion_tag_set = set(expansion_tags) + + for match in tag_matches: + uri = match.get("uri", "") + if not uri or uri in seen_uris: + continue + + overlap = expansion_tag_set.intersection(parse_tags(match.get("tags"))) + score = self._score_tag_expansion(len(overlap)) + + if match.get("level", 2) == 2: + candidate = dict(match) + candidate["_score"] = score + previous = expansion_candidates.get(uri) + if previous is None or score > previous.get("_score", 0.0): + expansion_candidates[uri] = candidate + + start_uri = self._start_uri_from_record(match) + if start_uri and score > expansion_points.get(start_uri, 0.0): + expansion_points[start_uri] = score + + return list(expansion_points.items()), list(expansion_candidates.values()) + + def _collect_expansion_tags( + self, + global_results: List[Dict[str, Any]], + explicit_tags: List[str], + ) -> List[str]: + collected = [explicit_tags] + for result in global_results: + collected.append(parse_tags(result.get("tags"))) + return merge_tags(*collected, max_tags=self.MAX_TAG_EXPANSION_TAGS) + + def _score_tag_expansion(self, overlap_count: int) -> float: + if overlap_count <= 1: + return self.TAG_EXPANSION_SCORE + return self.TAG_EXPANSION_SCORE * (1.0 + 0.2 * min(overlap_count - 1, 3)) + + def _start_uri_from_record(self, record: Dict[str, Any]) -> str: + uri = record.get("uri", "") + if not uri: + return "" + if record.get("level", 2) != 2: + return uri + + parent_uri = record.get("parent_uri") + if parent_uri: + return parent_uri + + normalized = uri.rstrip("/") + if "/" not in normalized: + return "" + return normalized.rsplit("/", 1)[0] + def _prepare_initial_candidates( self, query: str, diff --git a/openviking/server/routers/resources.py b/openviking/server/routers/resources.py index 1f03b5190..44bab0e50 100644 --- a/openviking/server/routers/resources.py +++ b/openviking/server/routers/resources.py @@ -20,6 +20,7 @@ from openviking.server.models import Response from openviking.server.telemetry import run_operation from openviking.telemetry import TelemetryRequest +from openviking.utils.tag_utils import canonicalize_user_tags from openviking_cli.exceptions import InvalidArgumentError from openviking_cli.utils.config.open_viking_config import get_openviking_config @@ -50,6 +51,7 @@ class AddResourceRequest(BaseModel): exclude: Glob pattern for files to exclude during parsing. directly_upload_media: Whether to directly upload media files. Default is True. preserve_structure: Whether to preserve directory structure when adding directories. + tags: Optional semicolon-delimited tags to persist on indexed contexts. watch_interval: Watch interval in minutes for automatic resource monitoring. - watch_interval > 0: Creates or updates a watch task. The resource will be automatically re-processed at the specified interval. @@ -80,6 +82,7 @@ class AddResourceRequest(BaseModel): exclude: Optional[str] = None directly_upload_media: bool = True preserve_structure: Optional[bool] = None + tags: Optional[str] = None telemetry: TelemetryRequest = False watch_interval: float = 0 @@ -213,6 +216,7 @@ async def add_resource( instruction=request.instruction, wait=request.wait, timeout=request.timeout, + tags=canonicalize_user_tags(request.tags), allow_local_path_resolution=allow_local_path_resolution, enforce_public_remote_targets=True, **kwargs, diff --git a/openviking/server/routers/search.py b/openviking/server/routers/search.py index f8c14ead8..e9224681f 100644 --- a/openviking/server/routers/search.py +++ b/openviking/server/routers/search.py @@ -14,6 +14,7 @@ from openviking.server.models import Response from openviking.server.telemetry import run_operation from openviking.telemetry import TelemetryRequest +from openviking.utils.tag_utils import expand_query_tags def _sanitize_floats(obj: Any) -> Any: @@ -40,6 +41,7 @@ class FindRequest(BaseModel): limit: int = 10 node_limit: Optional[int] = None score_threshold: Optional[float] = None + tags: Optional[str] = None filter: Optional[Dict[str, Any]] = None include_provenance: bool = False telemetry: TelemetryRequest = False @@ -54,6 +56,7 @@ class SearchRequest(BaseModel): limit: int = 10 node_limit: Optional[int] = None score_threshold: Optional[float] = None + tags: Optional[str] = None filter: Optional[Dict[str, Any]] = None include_provenance: bool = False telemetry: TelemetryRequest = False @@ -95,6 +98,7 @@ async def find( target_uri=request.target_uri, limit=actual_limit, score_threshold=request.score_threshold, + tags=expand_query_tags(request.tags), filter=request.filter, ), ) @@ -130,6 +134,7 @@ async def _search(): session=session, limit=actual_limit, score_threshold=request.score_threshold, + tags=expand_query_tags(request.tags), filter=request.filter, ) diff --git a/openviking/service/resource_service.py b/openviking/service/resource_service.py index 0447a784a..9076fa01e 100644 --- a/openviking/service/resource_service.py +++ b/openviking/service/resource_service.py @@ -29,6 +29,7 @@ from openviking.utils.network_guard import ensure_public_remote_target from openviking.utils.resource_processor import ResourceProcessor from openviking.utils.skill_processor import SkillProcessor +from openviking.utils.tag_utils import canonicalize_user_tags from openviking_cli.exceptions import ( ConflictError, DeadlineExceededError, @@ -111,6 +112,7 @@ async def add_resource( instruction: str = "", wait: bool = False, timeout: Optional[float] = None, + tags: Optional[List[str]] = None, build_index: bool = True, summarize: bool = False, watch_interval: float = 0, @@ -171,6 +173,7 @@ async def add_resource( telemetry.set("resource.flags.build_index", build_index) telemetry.set("resource.flags.summarize", summarize) telemetry.set("resource.flags.watch_enabled", watch_enabled) + tags = canonicalize_user_tags(tags) try: # add_resource only supports resources scope @@ -202,6 +205,7 @@ async def add_resource( scope="resources", to=to, parent=parent, + tags=tags, build_index=build_index, summarize=summarize, allow_local_path_resolution=allow_local_path_resolution, @@ -242,6 +246,8 @@ async def add_resource( if watch_interval > 0: try: processor_kwargs = self._sanitize_watch_processor_kwargs(kwargs) + if tags: + processor_kwargs["tags"] = tags await self._handle_watch_task_creation( path=path, to_uri=to, diff --git a/openviking/service/search_service.py b/openviking/service/search_service.py index af2562dcf..edd4e3cfd 100644 --- a/openviking/service/search_service.py +++ b/openviking/service/search_service.py @@ -6,10 +6,11 @@ Provides semantic search operations: search, find. """ -from typing import TYPE_CHECKING, Any, Dict, Optional +from typing import TYPE_CHECKING, Any, Dict, List, Optional from openviking.server.identity import RequestContext from openviking.storage.viking_fs import VikingFS +from openviking.utils.tag_utils import expand_query_tags from openviking_cli.exceptions import NotInitializedError from openviking_cli.utils import get_logger @@ -43,6 +44,7 @@ async def search( session: Optional["Session"] = None, limit: int = 10, score_threshold: Optional[float] = None, + tags: Optional[List[str]] = None, filter: Optional[Dict] = None, ) -> Any: """Complex search with session context. @@ -59,6 +61,7 @@ async def search( FindResult """ viking_fs = self._ensure_initialized() + tags = expand_query_tags(tags) session_info = None if session: @@ -71,6 +74,7 @@ async def search( session_info=session_info, limit=limit, score_threshold=score_threshold, + tags=tags, filter=filter, ) return result @@ -82,6 +86,7 @@ async def find( target_uri: str = "", limit: int = 10, score_threshold: Optional[float] = None, + tags: Optional[List[str]] = None, filter: Optional[Dict] = None, ) -> Any: """Semantic search without session context. @@ -97,12 +102,14 @@ async def find( FindResult """ viking_fs = self._ensure_initialized() + tags = expand_query_tags(tags) result = await viking_fs.find( query=query, ctx=ctx, target_uri=target_uri, limit=limit, score_threshold=score_threshold, + tags=tags, filter=filter, ) return result diff --git a/openviking/storage/collection_schemas.py b/openviking/storage/collection_schemas.py index a37276f6f..16fe37f9e 100644 --- a/openviking/storage/collection_schemas.py +++ b/openviking/storage/collection_schemas.py @@ -91,7 +91,7 @@ def context_collection(name: str, vector_dim: int) -> Dict[str, Any]: {"FieldName": "level", "FieldType": "int64"}, {"FieldName": "name", "FieldType": "string"}, {"FieldName": "description", "FieldType": "string"}, - {"FieldName": "tags", "FieldType": "string"}, + {"FieldName": "tags", "FieldType": "list"}, {"FieldName": "abstract", "FieldType": "string"}, {"FieldName": "account_id", "FieldType": "string"}, {"FieldName": "owner_space", "FieldType": "string"}, diff --git a/openviking/storage/queuefs/semantic_dag.py b/openviking/storage/queuefs/semantic_dag.py index 57c4d5e3b..2220a98bc 100644 --- a/openviking/storage/queuefs/semantic_dag.py +++ b/openviking/storage/queuefs/semantic_dag.py @@ -59,6 +59,7 @@ class VectorizeTask: summary_dict: Optional[Dict[str, str]] = None parent_uri: Optional[str] = None use_summary: bool = False + tags: Optional[List[str]] = None # For directory tasks abstract: Optional[str] = None overview: Optional[str] = None @@ -80,6 +81,7 @@ def __init__( recursive: bool = True, lifecycle_lock_handle_id: str = "", is_code_repo: bool = False, + tags: Optional[List[str]] = None, ): self._processor = processor self._context_type = context_type @@ -92,6 +94,7 @@ def __init__( self._recursive = recursive self._lifecycle_lock_handle_id = lifecycle_lock_handle_id self._is_code_repo = is_code_repo + self._tags = list(tags or []) self._llm_sem = asyncio.Semaphore(max_concurrent_llm) self._viking_fs = get_viking_fs() self._nodes: Dict[str, DirNode] = {} @@ -204,6 +207,7 @@ async def wrapped_on_complete() -> None: ctx=task.ctx, semantic_msg_id=task.semantic_msg_id, use_summary=task.use_summary, + tags=task.tags, ) ) else: @@ -215,6 +219,7 @@ async def wrapped_on_complete() -> None: task.overview, ctx=task.ctx, semantic_msg_id=task.semantic_msg_id, + tags=task.tags, ) ) else: @@ -458,6 +463,7 @@ async def _file_summary_task(self, parent_uri: str, file_path: str) -> None: summary_dict=summary_dict, parent_uri=parent_uri, use_summary=use_summary, + tags=self._tags, ) await self._add_vectorize_task(task) except Exception as e: @@ -576,6 +582,7 @@ async def _overview_task(self, dir_uri: str) -> None: semantic_msg_id=self._semantic_msg_id, abstract=abstract, overview=overview, + tags=self._tags, ) await self._add_vectorize_task(task) except Exception as e: diff --git a/openviking/storage/queuefs/semantic_msg.py b/openviking/storage/queuefs/semantic_msg.py index ba27d841a..0ebde9a50 100644 --- a/openviking/storage/queuefs/semantic_msg.py +++ b/openviking/storage/queuefs/semantic_msg.py @@ -3,11 +3,13 @@ """SemanticMsg: Semantic extraction queue message dataclass.""" import json -from dataclasses import asdict, dataclass +from dataclasses import asdict, dataclass, field from datetime import datetime from typing import Any, Dict, List, Optional from uuid import uuid4 +from openviking.utils.tag_utils import parse_tags + @dataclass class SemanticMsg: @@ -41,6 +43,7 @@ class SemanticMsg: target_uri: str = "" lifecycle_lock_handle_id: str = "" is_code_repo: bool = False + tags: List[str] = field(default_factory=list) changes: Optional[Dict[str, List[str]]] = ( None # {"added": [...], "modified": [...], "deleted": [...]} ) @@ -59,6 +62,7 @@ def __init__( target_uri: str = "", lifecycle_lock_handle_id: str = "", is_code_repo: bool = False, + tags: Optional[str | List[str]] = None, changes: Optional[Dict[str, List[str]]] = None, ): self.id = str(uuid4()) @@ -74,6 +78,7 @@ def __init__( self.target_uri = target_uri self.lifecycle_lock_handle_id = lifecycle_lock_handle_id self.is_code_repo = is_code_repo + self.tags = parse_tags(tags) self.changes = changes def to_dict(self) -> Dict[str, Any]: @@ -114,6 +119,7 @@ def from_dict(cls, data: Dict[str, Any]) -> "SemanticMsg": target_uri=data.get("target_uri", ""), lifecycle_lock_handle_id=data.get("lifecycle_lock_handle_id", ""), is_code_repo=data.get("is_code_repo", False), + tags=data.get("tags"), changes=data.get("changes"), ) if "id" in data and data["id"]: diff --git a/openviking/storage/queuefs/semantic_processor.py b/openviking/storage/queuefs/semantic_processor.py index 2a6e2653a..42b32368e 100644 --- a/openviking/storage/queuefs/semantic_processor.py +++ b/openviking/storage/queuefs/semantic_processor.py @@ -307,6 +307,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, recursive=msg.recursive, lifecycle_lock_handle_id=msg.lifecycle_lock_handle_id, is_code_repo=msg.is_code_repo, + tags=msg.tags, ) self._dag_executor = executor if msg.lifecycle_lock_handle_id: @@ -1271,6 +1272,7 @@ async def _vectorize_directory( overview: str, ctx: Optional[RequestContext] = None, semantic_msg_id: Optional[str] = None, + tags: Optional[List[str]] = None, ) -> None: """Create directory Context and enqueue to EmbeddingQueue.""" @@ -1288,6 +1290,7 @@ async def _vectorize_directory( context_type=context_type, ctx=active_ctx, semantic_msg_id=semantic_msg_id, + tags=tags, ) async def _vectorize_single_file( @@ -1299,6 +1302,7 @@ async def _vectorize_single_file( ctx: Optional[RequestContext] = None, semantic_msg_id: Optional[str] = None, use_summary: bool = False, + tags: Optional[List[str]] = None, ) -> None: """Vectorize a single file using its content or summary.""" from openviking.utils.embedding_utils import vectorize_file @@ -1312,4 +1316,5 @@ async def _vectorize_single_file( ctx=active_ctx, semantic_msg_id=semantic_msg_id, use_summary=use_summary, + tags=tags, ) diff --git a/openviking/storage/viking_fs.py b/openviking/storage/viking_fs.py index 3cbdfa439..d1492ba06 100644 --- a/openviking/storage/viking_fs.py +++ b/openviking/storage/viking_fs.py @@ -899,6 +899,7 @@ async def find( target_uri: str = "", limit: int = 10, score_threshold: Optional[float] = None, + tags: Optional[List[str]] = None, filter: Optional[Dict] = None, ctx: Optional[RequestContext] = None, ): @@ -947,6 +948,7 @@ async def find( context_type=context_type, intent="", target_directories=[target_uri] if target_uri else None, + tags=tags or [], ) real_ctx = self._ctx_or_default(ctx) @@ -987,6 +989,7 @@ async def search( session_info: Optional[Dict] = None, limit: int = 10, score_threshold: Optional[float] = None, + tags: Optional[List[str]] = None, filter: Optional[Dict] = None, ctx: Optional[RequestContext] = None, ): @@ -1047,6 +1050,8 @@ async def search( target_abstract=target_abstract, ) typed_queries = query_plan.queries + for tq in typed_queries: + tq.tags = list(tags or []) # Set target_directories if target_uri_list: for tq in typed_queries: @@ -1062,6 +1067,7 @@ async def search( intent="", priority=1, target_directories=target_uri_list, + tags=tags or [], ) ] else: @@ -1073,6 +1079,7 @@ async def search( intent="", priority=1, target_directories=target_uri_list, + tags=tags or [], ) for ctx_type in [ContextType.MEMORY, ContextType.RESOURCE, ContextType.SKILL] ] diff --git a/openviking/storage/viking_vector_index_backend.py b/openviking/storage/viking_vector_index_backend.py index ce5dc8e7b..44f153194 100644 --- a/openviking/storage/viking_vector_index_backend.py +++ b/openviking/storage/viking_vector_index_backend.py @@ -12,6 +12,7 @@ from openviking.storage.vectordb.collection.collection import Collection from openviking.storage.vectordb.utils.logging_init import init_cpp_logging from openviking.storage.vectordb_adapters import create_collection_adapter +from openviking.utils.tag_utils import serialize_tags from openviking_cli.utils import get_logger from openviking_cli.utils.config.vectordb_config import DEFAULT_INDEX_NAME, VectorDBBackendConfig @@ -19,8 +20,10 @@ RETRIEVAL_OUTPUT_FIELDS = [ "uri", + "parent_uri", "level", "context_type", + "tags", "abstract", "active_count", "updated_at", @@ -136,10 +139,29 @@ def _filter_known_fields(self, data: Dict[str, Any]) -> Dict[str, Any]: except Exception: return data + def _get_field_type(self, field_name: str) -> Optional[str]: + try: + coll = self._get_collection() + fields = self._get_meta_data(coll).get("Fields", []) + except Exception: + return None + + for field in fields: + if field.get("FieldName") != field_name: + continue + field_type = field.get("FieldType") + if hasattr(field_type, "value"): + return field_type.value + return str(field_type) if field_type is not None else None + return None + def _prepare_upsert_payload(self, data: Dict[str, Any]) -> Dict[str, Any]: """Drop runtime-only or stale legacy fields before writing back to the current schema.""" payload = {k: v for k, v in data.items() if v is not None} filtered = self._filter_known_fields(payload) + tags_field_type = self._get_field_type("tags") + if tags_field_type == "string" and "tags" in filtered: + filtered["tags"] = serialize_tags(filtered.get("tags")) return {k: v for k, v in filtered.items() if v is not None} # ========================================================================= @@ -777,6 +799,7 @@ async def search_in_tenant( ctx=ctx, context_type=context_type, target_directories=target_directories, + tags=None, extra_filter=extra_filter, ) return await self.search( @@ -796,6 +819,7 @@ async def search_global_roots_in_tenant( sparse_query_vector: Optional[Dict[str, float]] = None, context_type: Optional[str] = None, target_directories: Optional[List[str]] = None, + tags: Optional[List[str]] = None, extra_filter: Optional[FilterExpr | Dict[str, Any]] = None, limit: int = 10, ) -> List[Dict[str, Any]]: @@ -807,6 +831,7 @@ async def search_global_roots_in_tenant( ctx=ctx, context_type=context_type, target_directories=target_directories, + tags=tags, extra_filter=extra_filter, ), In("level", [0, 1, 2]), # TODO: smj fix this @@ -837,6 +862,7 @@ async def search_children_in_tenant( ctx=ctx, context_type=context_type, target_directories=target_directories, + tags=None, extra_filter=extra_filter, ), ) @@ -849,6 +875,38 @@ async def search_children_in_tenant( ctx=ctx, ) + async def search_by_tags_in_tenant( + self, + ctx: RequestContext, + tags: List[str], + context_type: Optional[str] = None, + target_directories: Optional[List[str]] = None, + extra_filter: Optional[FilterExpr | Dict[str, Any]] = None, + levels: Optional[List[int]] = None, + limit: int = 10, + ) -> List[Dict[str, Any]]: + if not tags: + return [] + + merged_filter = self._build_scope_filter( + ctx=ctx, + context_type=context_type, + target_directories=target_directories, + tags=tags, + extra_filter=extra_filter, + ) + if levels: + merged_filter = self._merge_filters(merged_filter, In("level", levels)) + + return await self.filter( + filter=merged_filter, + limit=limit, + output_fields=RETRIEVAL_OUTPUT_FIELDS, + order_by="active_count", + order_desc=True, + ctx=ctx, + ) + async def search_similar_memories( self, owner_space: Optional[str], @@ -1014,6 +1072,7 @@ def _build_scope_filter( ctx: RequestContext, context_type: Optional[str], target_directories: Optional[List[str]], + tags: Optional[List[str]], extra_filter: Optional[FilterExpr | Dict[str, Any]], ) -> Optional[FilterExpr]: filters: List[FilterExpr] = [] @@ -1033,6 +1092,10 @@ def _build_scope_filter( if uri_conds: filters.append(Or(uri_conds)) + tag_filter = self._build_tag_filter(tags) + if tag_filter: + filters.append(tag_filter) + if extra_filter: if isinstance(extra_filter, dict): filters.append(RawDSL(extra_filter)) @@ -1042,6 +1105,17 @@ def _build_scope_filter( merged = self._merge_filters(*filters) return merged + @staticmethod + def _build_tag_filter(tags: Optional[List[str]]) -> Optional[FilterExpr]: + normalized = [tag for tag in tags or [] if tag] + if not normalized: + return None + + tag_filters = [In("tags", [tag]) for tag in normalized] + if len(tag_filters) == 1: + return tag_filters[0] + return Or(tag_filters) + @staticmethod def _tenant_filter( ctx: RequestContext, context_type: Optional[str] = None diff --git a/openviking/storage/vikingdb_manager.py b/openviking/storage/vikingdb_manager.py index 899e6a1af..0e3d6f431 100644 --- a/openviking/storage/vikingdb_manager.py +++ b/openviking/storage/vikingdb_manager.py @@ -426,6 +426,7 @@ async def search_global_roots_in_tenant( sparse_query_vector: Optional[Dict[str, float]] = None, context_type: Optional[str] = None, target_directories: Optional[List[str]] = None, + tags: Optional[List[str]] = None, extra_filter: Optional[FilterExpr | Dict[str, Any]] = None, limit: int = 10, ) -> List[Dict[str, Any]]: @@ -435,6 +436,7 @@ async def search_global_roots_in_tenant( sparse_query_vector=sparse_query_vector, context_type=context_type, target_directories=target_directories, + tags=tags, extra_filter=extra_filter, limit=limit, ) @@ -460,6 +462,25 @@ async def search_children_in_tenant( limit=limit, ) + async def search_by_tags_in_tenant( + self, + tags: List[str], + context_type: Optional[str] = None, + target_directories: Optional[List[str]] = None, + extra_filter: Optional[FilterExpr | Dict[str, Any]] = None, + levels: Optional[List[int]] = None, + limit: int = 10, + ) -> List[Dict[str, Any]]: + return await self._manager.search_by_tags_in_tenant( + self._ctx, + tags=tags, + context_type=context_type, + target_directories=target_directories, + extra_filter=extra_filter, + levels=levels, + limit=limit, + ) + async def search_similar_memories( self, owner_space: Optional[str], diff --git a/openviking/sync_client.py b/openviking/sync_client.py index 1f1067505..4edd41a94 100644 --- a/openviking/sync_client.py +++ b/openviking/sync_client.py @@ -6,7 +6,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Dict, List, Optional +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union if TYPE_CHECKING: from openviking.session import Session @@ -114,6 +114,7 @@ def add_resource( timeout: float = None, build_index: bool = True, summarize: bool = False, + tags: Optional[Union[str, List[str]]] = None, telemetry: TelemetryRequest = False, **kwargs, ) -> Dict[str, Any]: @@ -138,6 +139,7 @@ def add_resource( timeout=timeout, build_index=build_index, summarize=summarize, + tags=tags, telemetry=telemetry, **kwargs, ) @@ -164,12 +166,21 @@ def search( limit: int = 10, score_threshold: Optional[float] = None, filter: Optional[Dict] = None, + tags: Optional[Union[str, List[str]]] = None, telemetry: TelemetryRequest = False, ): """Execute complex retrieval (intent analysis, hierarchical retrieval).""" return run_async( self._async_client.search( - query, target_uri, session, session_id, limit, score_threshold, filter, telemetry + query=query, + target_uri=target_uri, + session=session, + session_id=session_id, + limit=limit, + score_threshold=score_threshold, + filter=filter, + tags=tags, + telemetry=telemetry, ) ) @@ -180,17 +191,19 @@ def find( limit: int = 10, score_threshold: Optional[float] = None, filter: Optional[Dict] = None, + tags: Optional[Union[str, List[str]]] = None, telemetry: TelemetryRequest = False, ): """Quick retrieval""" return run_async( self._async_client.find( - query, - target_uri, - limit, - score_threshold, - filter, - telemetry, + query=query, + target_uri=target_uri, + limit=limit, + score_threshold=score_threshold, + filter=filter, + tags=tags, + telemetry=telemetry, ) ) @@ -282,9 +295,7 @@ def grep( ) -> Dict: """Content search""" return run_async( - self._async_client.grep( - uri, pattern, case_insensitive, node_limit, exclude_uri - ) + self._async_client.grep(uri, pattern, case_insensitive, node_limit, exclude_uri) ) def glob(self, pattern: str, uri: str = "viking://") -> Dict: diff --git a/openviking/utils/embedding_utils.py b/openviking/utils/embedding_utils.py index cf442cc0a..b78274567 100644 --- a/openviking/utils/embedding_utils.py +++ b/openviking/utils/embedding_utils.py @@ -15,6 +15,12 @@ from openviking.storage.queuefs import get_queue_manager from openviking.storage.queuefs.embedding_msg_converter import EmbeddingMsgConverter from openviking.storage.viking_fs import get_viking_fs +from openviking.utils.tag_utils import ( + AUTO_TAG_NAMESPACE, + extract_context_tags, + merge_tags, + namespace_tags, +) from openviking_cli.utils import VikingURI, get_logger from openviking_cli.utils.config import get_openviking_config @@ -134,6 +140,7 @@ async def vectorize_directory_meta( context_type: str = "resource", ctx: Optional[RequestContext] = None, semantic_msg_id: Optional[str] = None, + tags: Optional[str | list[str]] = None, ) -> None: """ Vectorize directory metadata (.abstract.md and .overview.md). @@ -151,6 +158,11 @@ async def vectorize_directory_meta( parent_uri = VikingURI(uri).parent.uri owner_space = _owner_space_for_uri(uri, ctx) + resolved_tags = extract_context_tags( + uri, + texts=[abstract, overview], + inherited_tags=tags, + ) # Vectorize L0: .abstract.md (abstract) context_abstract = Context( @@ -163,6 +175,7 @@ async def vectorize_directory_meta( user=ctx.user, account_id=ctx.account_id, owner_space=owner_space, + tags=resolved_tags, ) context_abstract.set_vectorize(Vectorize(text=abstract)) msg_abstract = EmbeddingMsgConverter.from_context(context_abstract) @@ -189,6 +202,7 @@ async def vectorize_directory_meta( user=ctx.user, account_id=ctx.account_id, owner_space=owner_space, + tags=resolved_tags, ) context_overview.set_vectorize(Vectorize(text=overview)) msg_overview = EmbeddingMsgConverter.from_context(context_overview) @@ -215,6 +229,7 @@ async def vectorize_file( ctx: Optional[RequestContext] = None, semantic_msg_id: Optional[str] = None, use_summary: bool = False, + tags: Optional[str | list[str]] = None, ) -> None: """ Vectorize a single file. @@ -236,6 +251,15 @@ async def vectorize_file( file_name = summary_dict.get("name") or os.path.basename(file_path) summary = summary_dict.get("summary", "") + summary_tags = namespace_tags( + summary_dict.get("tags"), + AUTO_TAG_NAMESPACE, + ) + resolved_tags = extract_context_tags( + file_path, + texts=[summary, file_name], + inherited_tags=merge_tags(tags, summary_tags), + ) context = Context( uri=file_path, @@ -247,6 +271,7 @@ async def vectorize_file( user=ctx.user, account_id=ctx.account_id, owner_space=_owner_space_for_uri(file_path, ctx), + tags=resolved_tags, ) content_type = get_resource_content_type(file_name) @@ -347,7 +372,10 @@ async def index_resource( overview = content.decode("utf-8") if abstract or overview: - await vectorize_directory_meta(uri, abstract, overview, ctx=ctx) + directory_tags = extract_context_tags(uri, texts=[abstract, overview]) + await vectorize_directory_meta(uri, abstract, overview, ctx=ctx, tags=directory_tags) + else: + directory_tags = extract_context_tags(uri) # 2. Index Files try: @@ -368,7 +396,11 @@ async def index_resource( # For direct indexing, we might not have summaries. # We pass empty summary_dict, vectorize_file will try to read content for text files. await vectorize_file( - file_path=file_uri, summary_dict={"name": file_name}, parent_uri=uri, ctx=ctx + file_path=file_uri, + summary_dict={"name": file_name}, + parent_uri=uri, + ctx=ctx, + tags=directory_tags, ) except Exception as e: diff --git a/openviking/utils/resource_processor.py b/openviking/utils/resource_processor.py index 01aaf75df..35b025dcc 100644 --- a/openviking/utils/resource_processor.py +++ b/openviking/utils/resource_processor.py @@ -124,6 +124,7 @@ async def process_resource( "source_path": None, } telemetry = get_current_telemetry() + resource_tags = kwargs.pop("tags", None) with telemetry.measure("resource.process"): # ============ Phase 1: Parse source and writes to temp viking fs ============ @@ -281,6 +282,7 @@ async def process_resource( lifecycle_lock_handle_id=lifecycle_lock_handle_id, temp_uris=[temp_uri_for_summarize], is_code_repo=is_code_repo, + tags=resource_tags, **kwargs, ) except Exception as e: diff --git a/openviking/utils/summarizer.py b/openviking/utils/summarizer.py index ca3c601f2..4013bbf2c 100644 --- a/openviking/utils/summarizer.py +++ b/openviking/utils/summarizer.py @@ -10,6 +10,7 @@ from openviking.core.directories import get_context_type_for_uri from openviking.storage.queuefs import SemanticMsg, get_queue_manager from openviking.telemetry import get_current_telemetry +from openviking.utils.tag_utils import parse_tags from openviking.telemetry.request_wait_tracker import get_request_wait_tracker from openviking_cli.utils import get_logger @@ -73,6 +74,7 @@ async def summarize( target_uri=uri if uri != temp_uri else None, lifecycle_lock_handle_id=lifecycle_lock_handle_id, is_code_repo=kwargs.get("is_code_repo", False), + tags=parse_tags(kwargs.get("tags")), ) await semantic_queue.enqueue(msg) if msg.telemetry_id: diff --git a/openviking/utils/tag_utils.py b/openviking/utils/tag_utils.py new file mode 100644 index 000000000..ce2bb9fda --- /dev/null +++ b/openviking/utils/tag_utils.py @@ -0,0 +1,304 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 +"""Tag parsing and lightweight extraction helpers.""" + +from __future__ import annotations + +import re +from collections import Counter +from typing import Iterable, List, Sequence + +_TAG_SPLIT_RE = re.compile(r"[;,\n]+") +_TAG_NAMESPACE_RE = re.compile(r"[a-z0-9][a-z0-9-]{0,15}") +_WORD_RE = re.compile(r"[a-z0-9][a-z0-9.+-]{1,31}") +_CJK_RE = re.compile(r"[\u4e00-\u9fff]{2,16}") +_CJK_SPLIT_RE = re.compile(r"[的了和与及在中对用为将把等以及、/]+") +_MULTI_DASH_RE = re.compile(r"-{2,}") +USER_TAG_NAMESPACE = "user" +AUTO_TAG_NAMESPACE = "auto" +_GENERIC_PATH_SEGMENTS = { + "agent", + "agents", + "default", + "memories", + "resource", + "resources", + "session", + "sessions", + "skill", + "skills", + "user", + "users", + "viking", +} +_STOP_WORDS = { + "about", + "after", + "also", + "and", + "best", + "build", + "content", + "demo", + "details", + "document", + "documents", + "example", + "examples", + "feature", + "features", + "file", + "files", + "first", + "for", + "from", + "guide", + "how", + "into", + "just", + "more", + "overview", + "project", + "related", + "resource", + "resources", + "sample", + "summary", + "test", + "testing", + "that", + "the", + "their", + "them", + "these", + "this", + "using", + "with", +} + + +def _normalize_namespace(value: str) -> str: + namespace = str(value or "").strip().lower() + if not namespace: + return "" + + namespace = re.sub(r"[^0-9a-z-]+", "-", namespace) + namespace = _MULTI_DASH_RE.sub("-", namespace).strip("-.") + if not _TAG_NAMESPACE_RE.fullmatch(namespace): + return "" + return namespace + + +def _normalize_tag_body(value: str) -> str: + tag = str(value or "").strip().lower() + if not tag: + return "" + + tag = re.sub(r"[\s_/]+", "-", tag) + tag = re.sub(r"[^0-9a-z\u4e00-\u9fff.+-]+", "-", tag) + tag = _MULTI_DASH_RE.sub("-", tag).strip("-.+") + if len(tag) < 2: + return "" + return tag + + +def _normalize_tag(value: str, default_namespace: str | None = None) -> str: + raw = str(value or "").strip() + if not raw: + return "" + + namespace = "" + body = raw + if ":" in raw: + candidate_namespace, body = raw.split(":", 1) + namespace = _normalize_namespace(candidate_namespace) + if not namespace: + return "" + + normalized_body = _normalize_tag_body(body) + if not normalized_body: + return "" + + if not namespace and default_namespace: + namespace = _normalize_namespace(default_namespace) + + if namespace: + return f"{namespace}:{normalized_body}" + return normalized_body + + +def parse_tags( + tags: str | Sequence[str] | None, + *, + default_namespace: str | None = None, +) -> List[str]: + """Parse semicolon-delimited tags or a string sequence into normalized tags.""" + if not tags: + return [] + + if isinstance(tags, str): + raw_items = _TAG_SPLIT_RE.split(tags) + else: + raw_items = [] + for item in tags: + if item is None: + continue + if isinstance(item, str): + raw_items.extend(_TAG_SPLIT_RE.split(item)) + else: + raw_items.append(str(item)) + + seen = set() + normalized: List[str] = [] + for item in raw_items: + tag = _normalize_tag(item, default_namespace=default_namespace) + if not tag or tag in seen: + continue + seen.add(tag) + normalized.append(tag) + return normalized + + +def serialize_tags( + tags: str | Sequence[str] | None, + *, + default_namespace: str | None = None, +) -> str | None: + parsed = parse_tags(tags, default_namespace=default_namespace) + if not parsed: + return None + return ";".join(parsed) + + +def merge_tags(*sources: str | Sequence[str] | None, max_tags: int = 8) -> List[str]: + merged: List[str] = [] + seen = set() + for source in sources: + for tag in parse_tags(source): + if tag in seen: + continue + merged.append(tag) + seen.add(tag) + if len(merged) >= max_tags: + return merged + return merged + + +def _force_namespace(tags: str | Sequence[str] | None, namespace: str) -> List[str]: + normalized_namespace = _normalize_namespace(namespace) + if not normalized_namespace: + return [] + + forced: List[str] = [] + seen = set() + for tag in parse_tags(tags): + body = tag.split(":", 1)[1] if ":" in tag else tag + normalized = f"{normalized_namespace}:{body}" + if normalized in seen: + continue + seen.add(normalized) + forced.append(normalized) + return forced + + +def canonicalize_user_tags(tags: str | Sequence[str] | None) -> List[str]: + """Canonicalize persisted user-provided tags as ``user:``.""" + return _force_namespace(tags, USER_TAG_NAMESPACE) + + +def namespace_tags(tags: str | Sequence[str] | None, namespace: str) -> List[str]: + """Apply a namespace to every tag body, overriding any existing prefix.""" + return _force_namespace(tags, namespace) + + +def expand_query_tags( + tags: str | Sequence[str] | None, + *, + namespaces: Sequence[str] = (USER_TAG_NAMESPACE, AUTO_TAG_NAMESPACE), +) -> List[str]: + """Expand bare query tags across known namespaces for backwards-compatible search.""" + normalized = parse_tags(tags) + if not normalized: + return [] + + expanded: List[str] = [] + seen = set() + for tag in normalized: + candidates = [tag] if ":" in tag else [f"{namespace}:{tag}" for namespace in namespaces] + for candidate in candidates: + if candidate in seen: + continue + seen.add(candidate) + expanded.append(candidate) + return expanded + + +def _extract_path_tags(uri: str) -> List[str]: + raw_path = str(uri or "").removeprefix("viking://") + if not raw_path: + return [] + + results: List[str] = [] + for segment in raw_path.split("/"): + cleaned = segment.strip() + if not cleaned or cleaned.startswith("."): + continue + cleaned = cleaned.rsplit(".", 1)[0] + normalized = _normalize_tag(cleaned) + if not normalized or normalized in _GENERIC_PATH_SEGMENTS: + continue + results.append(normalized) + for token in normalized.split("-"): + if len(token) >= 3 and token not in _GENERIC_PATH_SEGMENTS: + results.append(token) + return results + + +def _extract_text_tags(texts: Iterable[str]) -> List[str]: + words: Counter[str] = Counter() + bigrams: Counter[str] = Counter() + cjk_terms: Counter[str] = Counter() + + for text in texts: + if not text: + continue + + lowered = text.lower() + english_tokens = [ + token + for token in (_normalize_tag(token) for token in _WORD_RE.findall(lowered)) + if token and len(token) >= 3 and token not in _STOP_WORDS + ] + for token in english_tokens: + words[token] += 1 + for left, right in zip(english_tokens, english_tokens[1:], strict=False): + if left == right: + continue + bigrams[f"{left}-{right}"] += 1 + + for chunk in _CJK_RE.findall(text): + for part in _CJK_SPLIT_RE.split(chunk): + normalized = _normalize_tag(part) + if normalized: + cjk_terms[normalized] += 1 + + ranked = [tag for tag, _ in bigrams.most_common(4)] + ranked.extend(tag for tag, _ in cjk_terms.most_common(4)) + ranked.extend(tag for tag, _ in words.most_common(6)) + return ranked + + +def extract_context_tags( + uri: str, + texts: Sequence[str] | None = None, + inherited_tags: str | Sequence[str] | None = None, + max_tags: int = 8, +) -> List[str]: + """Build conservative tags from path segments, text keywords, and inherited tags.""" + text_values = [text for text in (texts or []) if text] + return merge_tags( + inherited_tags, + namespace_tags(_extract_path_tags(uri), AUTO_TAG_NAMESPACE), + namespace_tags(_extract_text_tags(text_values), AUTO_TAG_NAMESPACE), + max_tags=max_tags, + ) diff --git a/openviking_cli/client/base.py b/openviking_cli/client/base.py index 55c21b833..0fbedeedc 100644 --- a/openviking_cli/client/base.py +++ b/openviking_cli/client/base.py @@ -42,6 +42,7 @@ async def add_resource( wait: bool = False, timeout: Optional[float] = None, watch_interval: float = 0, + tags: Optional[Union[str, List[str]]] = None, telemetry: TelemetryRequest = False, ) -> Dict[str, Any]: """Add resource to OpenViking.""" @@ -156,6 +157,7 @@ async def find( limit: int = 10, score_threshold: Optional[float] = None, filter: Optional[Dict] = None, + tags: Optional[Union[str, List[str]]] = None, telemetry: TelemetryRequest = False, ) -> Any: """Semantic search without session context.""" @@ -170,6 +172,7 @@ async def search( limit: int = 10, score_threshold: Optional[float] = None, filter: Optional[Dict] = None, + tags: Optional[Union[str, List[str]]] = None, telemetry: TelemetryRequest = False, ) -> Any: """Semantic search with optional session context.""" @@ -182,7 +185,7 @@ async def grep( pattern: str, case_insensitive: bool = False, exclude_uri: Optional[str] = None, - node_limit: Optional[int] = None + node_limit: Optional[int] = None, ) -> Dict[str, Any]: """Content search with pattern.""" ... diff --git a/openviking_cli/client/http.py b/openviking_cli/client/http.py index edc0852fd..08abbb598 100644 --- a/openviking_cli/client/http.py +++ b/openviking_cli/client/http.py @@ -14,6 +14,7 @@ import httpx from openviking.telemetry import TelemetryRequest, normalize_telemetry_request +from openviking.utils.tag_utils import serialize_tags from openviking_cli.client.base import BaseClient from openviking_cli.exceptions import ( AlreadyExistsError, @@ -330,6 +331,7 @@ async def add_resource( exclude: Optional[str] = None, directly_upload_media: bool = True, preserve_structure: Optional[bool] = None, + tags: Optional[Union[str, List[str]]] = None, telemetry: TelemetryRequest = False, ) -> Dict[str, Any]: """Add resource to OpenViking.""" @@ -349,6 +351,7 @@ async def add_resource( "include": include, "exclude": exclude, "directly_upload_media": directly_upload_media, + "tags": serialize_tags(tags), "telemetry": telemetry, } if preserve_structure is not None: @@ -590,6 +593,7 @@ async def find( node_limit: Optional[int] = None, score_threshold: Optional[float] = None, filter: Optional[Dict[str, Any]] = None, + tags: Optional[Union[str, List[str]]] = None, telemetry: TelemetryRequest = False, ) -> FindResult: """Semantic search without session context.""" @@ -605,6 +609,7 @@ async def find( "limit": actual_limit, "score_threshold": score_threshold, "filter": filter, + "tags": serialize_tags(tags), "telemetry": telemetry, }, ) @@ -621,6 +626,7 @@ async def search( node_limit: Optional[int] = None, score_threshold: Optional[float] = None, filter: Optional[Dict[str, Any]] = None, + tags: Optional[Union[str, List[str]]] = None, telemetry: TelemetryRequest = False, ) -> FindResult: """Semantic search with optional session context.""" @@ -638,6 +644,7 @@ async def search( "limit": actual_limit, "score_threshold": score_threshold, "filter": filter, + "tags": serialize_tags(tags), "telemetry": telemetry, }, ) diff --git a/openviking_cli/client/sync_http.py b/openviking_cli/client/sync_http.py index d5cf7a301..230213901 100644 --- a/openviking_cli/client/sync_http.py +++ b/openviking_cli/client/sync_http.py @@ -156,6 +156,7 @@ def add_resource( include: Optional[str] = None, exclude: Optional[str] = None, directly_upload_media: bool = True, + tags: Optional[Union[str, List[str]]] = None, telemetry: TelemetryRequest = False, ) -> Dict[str, Any]: """Add resource to OpenViking.""" @@ -163,18 +164,19 @@ def add_resource( raise ValueError("Cannot specify both 'to' and 'parent' at the same time.") return run_async( self._async_client.add_resource( - path, - to, - parent, - reason, - instruction, - wait, - timeout, - strict, - ignore_dirs, - include, - exclude, - directly_upload_media, + path=path, + to=to, + parent=parent, + reason=reason, + instruction=instruction, + wait=wait, + timeout=timeout, + strict=strict, + ignore_dirs=ignore_dirs, + include=include, + exclude=exclude, + directly_upload_media=directly_upload_media, + tags=tags, telemetry=telemetry, ) ) @@ -207,6 +209,7 @@ def search( node_limit: Optional[int] = None, score_threshold: Optional[float] = None, filter: Optional[Dict] = None, + tags: Optional[Union[str, List[str]]] = None, telemetry: TelemetryRequest = False, ): """Semantic search with optional session context.""" @@ -220,6 +223,7 @@ def search( node_limit=node_limit, score_threshold=score_threshold, filter=filter, + tags=tags, telemetry=telemetry, ) ) @@ -232,17 +236,19 @@ def find( node_limit: Optional[int] = None, score_threshold: Optional[float] = None, filter: Optional[Dict] = None, + tags: Optional[Union[str, List[str]]] = None, telemetry: TelemetryRequest = False, ): """Semantic search without session context.""" return run_async( self._async_client.find( - query, - target_uri, - limit, - node_limit, - score_threshold, - filter, + query=query, + target_uri=target_uri, + limit=limit, + node_limit=node_limit, + score_threshold=score_threshold, + filter=filter, + tags=tags, telemetry=telemetry, ) ) diff --git a/openviking_cli/retrieve/types.py b/openviking_cli/retrieve/types.py index b3540011f..5b6d90e67 100644 --- a/openviking_cli/retrieve/types.py +++ b/openviking_cli/retrieve/types.py @@ -244,6 +244,7 @@ class TypedQuery: intent: Query intent description priority: Priority (1-5, 1 is highest) target_directories: Directory URIs located by LLM + tags: Explicit tag constraints supplied by the caller """ query: str @@ -251,6 +252,7 @@ class TypedQuery: intent: str priority: int = 3 target_directories: List[str] = field(default_factory=list) + tags: List[str] = field(default_factory=list) @dataclass diff --git a/tests/client/test_search.py b/tests/client/test_search.py index f24e161e8..6d99f6317 100644 --- a/tests/client/test_search.py +++ b/tests/client/test_search.py @@ -3,7 +3,12 @@ """Search tests""" +from types import SimpleNamespace +from unittest.mock import AsyncMock + from openviking.message import TextPart +from openviking.sync_client import SyncOpenViking +from openviking_cli.client.sync_http import SyncHTTPClient class TestFind: @@ -74,3 +79,83 @@ async def test_search(self, client_with_resource_sync): result = await client.search(query="sample", target_uri=parent_uri) assert hasattr(result, "resources") + + +class TestSyncWrappers: + def test_sync_http_client_find_forwards_keyword_args(self): + mock_find = AsyncMock(return_value="ok") + client = object.__new__(SyncHTTPClient) + client._async_client = SimpleNamespace(find=mock_find) + + result = client.find( + query="sample", + target_uri="viking://resources/demo", + limit=5, + node_limit=3, + score_threshold=0.25, + filter={"kind": "resource"}, + tags=["machine-learning"], + telemetry=True, + ) + + assert result == "ok" + mock_find.assert_awaited_once_with( + query="sample", + target_uri="viking://resources/demo", + limit=5, + node_limit=3, + score_threshold=0.25, + filter={"kind": "resource"}, + tags=["machine-learning"], + telemetry=True, + ) + + def test_sync_openviking_search_and_find_forward_keyword_args(self): + mock_search = AsyncMock(return_value="search-ok") + mock_find = AsyncMock(return_value="find-ok") + client = object.__new__(SyncOpenViking) + client._async_client = SimpleNamespace(search=mock_search, find=mock_find) + + search_result = client.search( + query="sample", + target_uri="viking://resources/demo", + session="session-object", + session_id="sess-1", + limit=4, + score_threshold=0.2, + filter={"kind": "resource"}, + tags=["feature-store"], + telemetry=True, + ) + find_result = client.find( + query="sample", + target_uri="viking://resources/demo", + limit=2, + score_threshold=0.1, + filter={"kind": "resource"}, + tags=["feature-store"], + telemetry=True, + ) + + assert search_result == "search-ok" + assert find_result == "find-ok" + mock_search.assert_awaited_once_with( + query="sample", + target_uri="viking://resources/demo", + session="session-object", + session_id="sess-1", + limit=4, + score_threshold=0.2, + filter={"kind": "resource"}, + tags=["feature-store"], + telemetry=True, + ) + mock_find.assert_awaited_once_with( + query="sample", + target_uri="viking://resources/demo", + limit=2, + score_threshold=0.1, + filter={"kind": "resource"}, + tags=["feature-store"], + telemetry=True, + ) diff --git a/tests/retrieve/test_hierarchical_retriever_rerank.py b/tests/retrieve/test_hierarchical_retriever_rerank.py index 45af89ee6..c28a3bf2c 100644 --- a/tests/retrieve/test_hierarchical_retriever_rerank.py +++ b/tests/retrieve/test_hierarchical_retriever_rerank.py @@ -39,6 +39,7 @@ async def search_global_roots_in_tenant( sparse_query_vector=None, context_type=None, target_directories=None, + tags=None, extra_filter=None, limit: int = 10, ): @@ -49,6 +50,7 @@ async def search_global_roots_in_tenant( "sparse_query_vector": sparse_query_vector, "context_type": context_type, "target_directories": target_directories, + "tags": tags, "extra_filter": extra_filter, "limit": limit, } @@ -114,6 +116,18 @@ async def search_children_in_tenant( ] return [] + async def search_by_tags_in_tenant( + self, + ctx, + tags, + context_type=None, + target_directories=None, + extra_filter=None, + levels=None, + limit: int = 10, + ): + return [] + class LevelTwoGlobalStorage(DummyStorage): async def search_global_roots_in_tenant( @@ -123,6 +137,7 @@ async def search_global_roots_in_tenant( sparse_query_vector=None, context_type=None, target_directories=None, + tags=None, extra_filter=None, limit: int = 10, ): @@ -133,6 +148,7 @@ async def search_global_roots_in_tenant( "sparse_query_vector": sparse_query_vector, "context_type": context_type, "target_directories": target_directories, + "tags": tags, "extra_filter": extra_filter, "limit": limit, } diff --git a/tests/retrieve/test_hierarchical_retriever_tags.py b/tests/retrieve/test_hierarchical_retriever_tags.py new file mode 100644 index 000000000..8aa47ad0b --- /dev/null +++ b/tests/retrieve/test_hierarchical_retriever_tags.py @@ -0,0 +1,259 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 + +"""Hierarchical retriever tag expansion tests.""" + +import pytest + +from openviking.models.embedder.base import EmbedResult +from openviking.retrieve.hierarchical_retriever import HierarchicalRetriever +from openviking.server.identity import RequestContext, Role +from openviking_cli.retrieve.types import ContextType, TypedQuery +from openviking_cli.session.user_id import UserIdentifier + + +class DummyEmbedder: + def embed(self, _text: str, is_query: bool = False) -> EmbedResult: + return EmbedResult(dense_vector=[0.1, 0.2, 0.3]) + + +class TagExpansionStorage: + def __init__(self) -> None: + self.collection_name = "context" + self.global_search_calls = [] + self.tag_search_calls = [] + self.child_search_calls = [] + + async def collection_exists_bound(self) -> bool: + return True + + async def search_global_roots_in_tenant( + self, + ctx, + query_vector=None, + sparse_query_vector=None, + context_type=None, + target_directories=None, + tags=None, + extra_filter=None, + limit: int = 10, + ): + self.global_search_calls.append( + { + "ctx": ctx, + "context_type": context_type, + "target_directories": target_directories, + "tags": tags, + "extra_filter": extra_filter, + "limit": limit, + } + ) + return [ + { + "uri": "viking://resources/machine-learning", + "parent_uri": "viking://resources", + "abstract": "Machine learning notes", + "_score": 0.9, + "level": 1, + "context_type": "resource", + "tags": ["auto:model-training"], + } + ] + + async def search_by_tags_in_tenant( + self, + ctx, + tags, + context_type=None, + target_directories=None, + extra_filter=None, + levels=None, + limit: int = 10, + ): + self.tag_search_calls.append( + { + "ctx": ctx, + "tags": tags, + "context_type": context_type, + "target_directories": target_directories, + "extra_filter": extra_filter, + "levels": levels, + "limit": limit, + } + ) + return [ + { + "uri": "viking://resources/data-engineering/feature-store.md", + "parent_uri": "viking://resources/data-engineering", + "abstract": "Feature store guidance for model training", + "level": 2, + "context_type": "resource", + "tags": ["auto:model-training", "auto:feature-store"], + } + ] + + async def search_children_in_tenant( + self, + ctx, + parent_uri: str, + query_vector=None, + sparse_query_vector=None, + context_type=None, + target_directories=None, + extra_filter=None, + limit: int = 10, + ): + self.child_search_calls.append(parent_uri) + if parent_uri == "viking://resources/data-engineering": + return [ + { + "uri": "viking://resources/data-engineering/feature-store.md", + "parent_uri": "viking://resources/data-engineering", + "abstract": "Feature store guidance for model training", + "_score": 0.95, + "level": 2, + "context_type": "resource", + "tags": ["auto:model-training", "auto:feature-store"], + } + ] + return [] + + +class ExplicitTagStorage(TagExpansionStorage): + async def search_global_roots_in_tenant( + self, + ctx, + query_vector=None, + sparse_query_vector=None, + context_type=None, + target_directories=None, + tags=None, + extra_filter=None, + limit: int = 10, + ): + self.global_search_calls.append( + { + "ctx": ctx, + "context_type": context_type, + "target_directories": target_directories, + "tags": tags, + "extra_filter": extra_filter, + "limit": limit, + } + ) + return [] + + async def search_by_tags_in_tenant( + self, + ctx, + tags, + context_type=None, + target_directories=None, + extra_filter=None, + levels=None, + limit: int = 10, + ): + self.tag_search_calls.append( + { + "ctx": ctx, + "tags": tags, + "context_type": context_type, + "target_directories": target_directories, + "extra_filter": extra_filter, + "levels": levels, + "limit": limit, + } + ) + return [ + { + "uri": "viking://resources/architecture/microservices.md", + "parent_uri": "viking://resources/architecture", + "abstract": "Microservice architecture patterns", + "level": 2, + "context_type": "resource", + "tags": ["auto:microservice"], + } + ] + + async def search_children_in_tenant( + self, + ctx, + parent_uri: str, + query_vector=None, + sparse_query_vector=None, + context_type=None, + target_directories=None, + extra_filter=None, + limit: int = 10, + ): + self.child_search_calls.append(parent_uri) + if parent_uri == "viking://resources/architecture": + return [ + { + "uri": "viking://resources/architecture/microservices.md", + "parent_uri": "viking://resources/architecture", + "abstract": "Microservice architecture patterns", + "_score": 0.88, + "level": 2, + "context_type": "resource", + "tags": ["auto:microservice"], + } + ] + return [] + + +def _ctx() -> RequestContext: + return RequestContext(user=UserIdentifier("acc1", "user1", "agent1"), role=Role.ROOT) + + +@pytest.fixture(autouse=True) +def _disable_viking_fs(monkeypatch): + monkeypatch.setattr("openviking.retrieve.hierarchical_retriever.get_viking_fs", lambda: None) + + +@pytest.mark.asyncio +async def test_retrieve_expands_related_subtree_from_global_hit_tags(): + storage = TagExpansionStorage() + retriever = HierarchicalRetriever(storage=storage, embedder=DummyEmbedder(), rerank_config=None) + + result = await retriever.retrieve( + TypedQuery( + query="model training best practices", + context_type=ContextType.RESOURCE, + intent="", + ), + ctx=_ctx(), + limit=3, + ) + + assert storage.tag_search_calls + assert storage.tag_search_calls[0]["tags"] == ["auto:model-training"] + assert "viking://resources/data-engineering" in storage.child_search_calls + assert [ctx.uri for ctx in result.matched_contexts] == [ + "viking://resources/data-engineering/feature-store.md" + ] + + +@pytest.mark.asyncio +async def test_retrieve_uses_explicit_tags_when_global_search_returns_nothing(): + storage = ExplicitTagStorage() + retriever = HierarchicalRetriever(storage=storage, embedder=DummyEmbedder(), rerank_config=None) + + result = await retriever.retrieve( + TypedQuery( + query="architecture guidance", + context_type=ContextType.RESOURCE, + intent="", + tags=["microservice"], + ), + ctx=_ctx(), + limit=3, + ) + + assert storage.global_search_calls + assert storage.global_search_calls[0]["tags"] == ["user:microservice", "auto:microservice"] + assert storage.tag_search_calls + assert storage.tag_search_calls[0]["tags"] == ["user:microservice", "auto:microservice"] + assert [ctx.uri for ctx in result.matched_contexts] == [ + "viking://resources/architecture/microservices.md" + ] diff --git a/tests/retrieve/test_hierarchical_retriever_target_dirs.py b/tests/retrieve/test_hierarchical_retriever_target_dirs.py index 6df26a238..ea2281d50 100644 --- a/tests/retrieve/test_hierarchical_retriever_target_dirs.py +++ b/tests/retrieve/test_hierarchical_retriever_target_dirs.py @@ -29,6 +29,7 @@ async def search_global_roots_in_tenant( sparse_query_vector=None, context_type=None, target_directories=None, + tags=None, extra_filter=None, limit: int = 10, ): @@ -39,6 +40,7 @@ async def search_global_roots_in_tenant( "sparse_query_vector": sparse_query_vector, "context_type": context_type, "target_directories": target_directories, + "tags": tags, "extra_filter": extra_filter, "limit": limit, } @@ -70,6 +72,18 @@ async def search_children_in_tenant( ) return [] + async def search_by_tags_in_tenant( + self, + ctx, + tags, + context_type=None, + target_directories=None, + extra_filter=None, + levels=None, + limit: int = 10, + ): + return [] + @pytest.mark.asyncio async def test_retrieve_honors_target_directories_scope_filter(): diff --git a/tests/server/conftest.py b/tests/server/conftest.py index 577420d56..bdfddabd3 100644 --- a/tests/server/conftest.py +++ b/tests/server/conftest.py @@ -3,12 +3,14 @@ """Shared fixtures for OpenViking server tests.""" +import json import shutil import socket import threading import time from pathlib import Path from types import SimpleNamespace +from unittest.mock import patch import httpx import pytest @@ -24,7 +26,9 @@ from openviking.storage.transaction import reset_lock_manager from openviking_cli.session.user_id import UserIdentifier from openviking_cli.utils.config.embedding_config import EmbeddingConfig +from openviking_cli.utils.config.open_viking_config import OpenVikingConfigSingleton from openviking_cli.utils.config.vlm_config import VLMConfig +from tests.utils.mock_agfs import MockLocalAGFS # --------------------------------------------------------------------------- # Paths @@ -84,6 +88,44 @@ async def _fake_get_vision_completion(self, prompt, images, thinking=False): monkeypatch.setattr(VLMConfig, "get_vision_completion_async", _fake_get_vision_completion) +@pytest.fixture(scope="function") +def _configure_test_env(monkeypatch, tmp_path): + """Provide an isolated local config for in-process server tests.""" + config_path = tmp_path / "ov.conf" + config_path.write_text( + json.dumps( + { + "storage": { + "workspace": str(tmp_path / "workspace"), + "agfs": {"backend": "local", "mode": "binding-client"}, + "vectordb": {"backend": "local"}, + }, + "embedding": { + "dense": { + "provider": "openai", + "model": "test-embedder", + "api_base": "http://127.0.0.1:11434/v1", + "dimension": 2048, + } + }, + "encryption": {"enabled": False}, + "server": {"host": "127.0.0.1"}, + } + ), + encoding="utf-8", + ) + + mock_agfs = MockLocalAGFS(root_path=tmp_path / "mock_agfs_root") + + monkeypatch.setenv("OPENVIKING_CONFIG_FILE", str(config_path)) + OpenVikingConfigSingleton.reset_instance() + + with patch("openviking.utils.agfs_utils.create_agfs_client", return_value=mock_agfs): + yield + + OpenVikingConfigSingleton.reset_instance() + + # --------------------------------------------------------------------------- # Core fixtures: service + app + async client (HTTP API tests, in-process) # --------------------------------------------------------------------------- @@ -126,7 +168,7 @@ def upload_temp_dir(temp_dir: Path, monkeypatch) -> Path: @pytest_asyncio.fixture(scope="function") -async def service(temp_dir: Path, monkeypatch): +async def service(temp_dir: Path, monkeypatch, _configure_test_env): """Create and initialize an OpenVikingService in embedded mode.""" reset_lock_manager() fake_embedder_cls = _install_fake_embedder(monkeypatch) @@ -180,7 +222,7 @@ async def client_with_resource(client, service, sample_markdown_file): @pytest_asyncio.fixture(scope="function") -async def running_server(temp_dir: Path, monkeypatch): +async def running_server(temp_dir: Path, monkeypatch, _configure_test_env): """Start a real uvicorn server in a background thread.""" await AsyncOpenViking.reset() reset_lock_manager() diff --git a/tests/server/test_api_resources.py b/tests/server/test_api_resources.py index 2357fd5a4..f4ad0c456 100644 --- a/tests/server/test_api_resources.py +++ b/tests/server/test_api_resources.py @@ -132,6 +132,39 @@ async def fake_add_resource(**kwargs): } +async def test_add_resource_forwards_tags_to_service( + client: httpx.AsyncClient, + service, + monkeypatch, + upload_temp_dir, +): + captured = {} + + async def fake_add_resource(**kwargs): + captured.update(kwargs) + return { + "status": "success", + "root_uri": "viking://resources/demo", + } + + monkeypatch.setattr(service.resources, "add_resource", fake_add_resource) + + demo_file = upload_temp_dir / "demo.md" + demo_file.write_text("# demo\n") + + resp = await client.post( + "/api/v1/resources", + json={ + "temp_file_id": demo_file.name, + "reason": "tagged resource", + "tags": "auto:machine-learning;feature-store", + }, + ) + + assert resp.status_code == 200 + assert captured["tags"] == ["user:machine-learning", "user:feature-store"] + + async def test_add_resource_with_summary_only_telemetry( client: httpx.AsyncClient, sample_markdown_file, diff --git a/tests/server/test_api_search.py b/tests/server/test_api_search.py index 07a2922ab..bb1b0cb61 100644 --- a/tests/server/test_api_search.py +++ b/tests/server/test_api_search.py @@ -65,6 +65,63 @@ async def test_find_no_results(client: httpx.AsyncClient): assert resp.json()["status"] == "ok" +async def test_find_forwards_tags_to_service(client_with_resource, service, monkeypatch): + client, _ = client_with_resource + captured = {} + + async def fake_find(**kwargs): + captured.update(kwargs) + + class _Result: + def to_dict(self, include_provenance: bool = False): + return {"resources": [], "memories": [], "skills": []} + + return _Result() + + monkeypatch.setattr(service.search, "find", fake_find) + + resp = await client.post( + "/api/v1/search/find", + json={"query": "sample", "tags": "machine-learning;feature-store"}, + ) + + assert resp.status_code == 200 + assert captured["tags"] == [ + "user:machine-learning", + "auto:machine-learning", + "user:feature-store", + "auto:feature-store", + ] + + +async def test_search_forwards_tags_to_service(client, service, monkeypatch): + captured = {} + + async def fake_search(**kwargs): + captured.update(kwargs) + + class _Result: + def to_dict(self, include_provenance: bool = False): + return {"resources": [], "memories": [], "skills": []} + + return _Result() + + monkeypatch.setattr(service.search, "search", fake_search) + + resp = await client.post( + "/api/v1/search/search", + json={"query": "sample", "tags": "machine-learning;feature-store"}, + ) + + assert resp.status_code == 200 + assert captured["tags"] == [ + "user:machine-learning", + "auto:machine-learning", + "user:feature-store", + "auto:feature-store", + ] + + async def test_search_basic(client_with_resource): client, uri = client_with_resource resp = await client.post( @@ -195,8 +252,6 @@ async def test_grep_case_insensitive(client_with_resource): assert resp.json()["status"] == "ok" - - async def test_grep_exclude_uri_excludes_specific_uri_range( client: httpx.AsyncClient, upload_temp_dir, @@ -231,7 +286,7 @@ async def test_grep_exclude_uri_excludes_specific_uri_range( assert body["status"] == "ok" matches = body["result"]["matches"] assert matches - assert all(not m["uri"].startswith(exclude_uri.rstrip('/')) for m in matches) + assert all(not m["uri"].startswith(exclude_uri.rstrip("/")) for m in matches) async def test_grep_exclude_uri_does_not_exclude_same_named_sibling_dirs( @@ -275,6 +330,7 @@ async def test_grep_exclude_uri_does_not_exclude_same_named_sibling_dirs( assert any(uri.startswith("viking://resources/group_b/cache/") for uri in uris) assert all(not uri.startswith("viking://resources/group_a/cache/") for uri in uris) + async def test_glob(client_with_resource): client, _ = client_with_resource resp = await client.post( diff --git a/tests/storage/test_collection_schemas.py b/tests/storage/test_collection_schemas.py index 59618d22c..10c02e68f 100644 --- a/tests/storage/test_collection_schemas.py +++ b/tests/storage/test_collection_schemas.py @@ -15,6 +15,7 @@ init_context_collection, ) from openviking.storage.queuefs.embedding_msg import EmbeddingMsg +from openviking.storage.vectordb.utils.data_processor import DataProcessor from openviking.storage.viking_vector_index_backend import _SingleAccountBackend from openviking_cli.utils.config.vectordb_config import VectorDBBackendConfig @@ -37,15 +38,18 @@ def __init__(self, embedder: _DummyEmbedder, backend: str = "volcengine"): ) -def _build_queue_payload() -> dict: +def _build_queue_payload(*, tags=None) -> dict: + context_data = { + "id": "id-1", + "uri": "viking://resources/sample", + "account_id": "default", + "abstract": "sample", + } + if tags is not None: + context_data["tags"] = tags msg = EmbeddingMsg( message="hello", - context_data={ - "id": "id-1", - "uri": "viking://resources/sample", - "account_id": "default", - "abstract": "sample", - }, + context_data=context_data, ) return {"data": json.dumps(msg.to_dict())} @@ -208,6 +212,46 @@ def test_context_collection_excludes_parent_uri(): assert "parent_uri" not in schema["ScalarIndex"] +def test_context_collection_uses_list_string_tags(): + schema = CollectionSchemas.context_collection("ctx", 8) + tags_field = next(field for field in schema["Fields"] if field["FieldName"] == "tags") + + assert tags_field["FieldType"] == "list" + assert "tags" in schema["ScalarIndex"] + + +def test_context_collection_accepts_list_and_legacy_string_tags(): + schema = CollectionSchemas.context_collection("ctx", 8) + fields = {field["FieldName"]: field for field in schema["Fields"]} + processor = DataProcessor(fields, collection_name="ctx") + + processed = processor.validate_and_process( + { + "uri": "viking://resources/sample", + "vector": [0.1] * 8, + "tags": ["user:model-training", "auto:feature-store"], + } + ) + legacy = processor.validate_and_process( + { + "uri": "viking://resources/sample", + "vector": [0.1] * 8, + "tags": "user:model-training;auto:feature-store", + } + ) + empty = processor.validate_and_process( + { + "uri": "viking://resources/sample", + "vector": [0.1] * 8, + "tags": [], + } + ) + + assert processed["tags"] == ["user:model-training", "auto:feature-store"] + assert legacy["tags"] == ["user:model-training", "auto:feature-store"] + assert empty["tags"] == [] + + def test_context_collection_signature_has_no_include_parent_uri(): signature = inspect.signature(CollectionSchemas.context_collection) @@ -355,3 +399,237 @@ def upsert(self, data): "active_count": 2, "account_id": "acc1", } + + +@pytest.mark.asyncio +async def test_single_account_backend_serializes_tag_lists_for_legacy_string_schema(): + captured = {} + + class _Collection: + def get_meta_data(self): + return { + "Fields": [ + {"FieldName": "id", "FieldType": "string"}, + {"FieldName": "uri", "FieldType": "path"}, + {"FieldName": "abstract", "FieldType": "string"}, + {"FieldName": "tags", "FieldType": "string"}, + {"FieldName": "account_id", "FieldType": "string"}, + ] + } + + class _Adapter: + mode = "local" + + def get_collection(self): + return _Collection() + + def upsert(self, data): + captured["data"] = dict(data) + return ["rec-tags"] + + backend = _SingleAccountBackend( + config=VectorDBBackendConfig(backend="local", name="context", dimension=2), + bound_account_id="acc1", + shared_adapter=_Adapter(), + ) + + record_id = await backend.upsert( + { + "id": "rec-tags", + "uri": "viking://resources/sample", + "abstract": "sample", + "tags": ["user:model-training", "auto:feature-store"], + "account_id": "acc1", + } + ) + + assert record_id == "rec-tags" + assert captured["data"]["tags"] == "user:model-training;auto:feature-store" + + +@pytest.mark.asyncio +async def test_single_account_backend_drops_empty_tag_lists_for_legacy_string_schema(): + captured = {} + + class _Collection: + def get_meta_data(self): + return { + "Fields": [ + {"FieldName": "id", "FieldType": "string"}, + {"FieldName": "uri", "FieldType": "path"}, + {"FieldName": "abstract", "FieldType": "string"}, + {"FieldName": "tags", "FieldType": "string"}, + {"FieldName": "account_id", "FieldType": "string"}, + ] + } + + class _Adapter: + mode = "local" + + def get_collection(self): + return _Collection() + + def upsert(self, data): + captured["data"] = dict(data) + return ["rec-tags"] + + backend = _SingleAccountBackend( + config=VectorDBBackendConfig(backend="local", name="context", dimension=2), + bound_account_id="acc1", + shared_adapter=_Adapter(), + ) + + record_id = await backend.upsert( + { + "id": "rec-tags", + "uri": "viking://resources/sample", + "abstract": "sample", + "tags": [], + "account_id": "acc1", + } + ) + + assert record_id == "rec-tags" + assert "tags" not in captured["data"] + + +@pytest.mark.asyncio +async def test_single_account_backend_keeps_tag_lists_for_list_schema(): + captured = {} + + class _Collection: + def get_meta_data(self): + return { + "Fields": [ + {"FieldName": "id", "FieldType": "string"}, + {"FieldName": "uri", "FieldType": "path"}, + {"FieldName": "abstract", "FieldType": "string"}, + {"FieldName": "tags", "FieldType": "list"}, + {"FieldName": "account_id", "FieldType": "string"}, + ] + } + + class _Adapter: + mode = "local" + + def get_collection(self): + return _Collection() + + def upsert(self, data): + captured["data"] = dict(data) + return ["rec-tags"] + + backend = _SingleAccountBackend( + config=VectorDBBackendConfig(backend="local", name="context", dimension=2), + bound_account_id="acc1", + shared_adapter=_Adapter(), + ) + + record_id = await backend.upsert( + { + "id": "rec-tags", + "uri": "viking://resources/sample", + "abstract": "sample", + "tags": ["user:model-training", "auto:feature-store"], + "account_id": "acc1", + } + ) + + assert record_id == "rec-tags" + assert captured["data"]["tags"] == ["user:model-training", "auto:feature-store"] + + +@pytest.mark.asyncio +async def test_embedding_handler_forwards_list_tags_to_backend_upsert(monkeypatch): + captured = {} + + class _CapturingVikingDB: + is_closing = False + mode = "local" + + async def upsert(self, data, *, ctx): + captured["data"] = dict(data) + return "rec-tags" + + embedder = _DummyEmbedder() + monkeypatch.setattr( + "openviking_cli.utils.config.get_openviking_config", + lambda: _DummyConfig(embedder), + ) + + handler = TextEmbeddingHandler(_CapturingVikingDB()) + result = await handler.on_dequeue( + _build_queue_payload(tags=["user:model-training", "auto:feature-store"]) + ) + + assert result is not None + assert captured["data"]["tags"] == ["user:model-training", "auto:feature-store"] + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ("input_tags", "expected_written_tags"), + [ + (["user:model-training", "auto:feature-store"], "user:model-training;auto:feature-store"), + ([], None), + ], +) +async def test_embedding_handler_preserves_list_tags_while_writing_legacy_string_schema( + monkeypatch, + input_tags, + expected_written_tags, +): + captured = {} + + class _Collection: + def get_meta_data(self): + return { + "Fields": [ + {"FieldName": "id", "FieldType": "string"}, + {"FieldName": "uri", "FieldType": "path"}, + {"FieldName": "abstract", "FieldType": "string"}, + {"FieldName": "vector", "FieldType": "vector"}, + {"FieldName": "tags", "FieldType": "string"}, + {"FieldName": "account_id", "FieldType": "string"}, + ] + } + + class _Adapter: + mode = "local" + + def get_collection(self): + return _Collection() + + def upsert(self, data): + captured["data"] = dict(data) + return ["rec-tags"] + + backend = _SingleAccountBackend( + config=VectorDBBackendConfig(backend="local", name="context", dimension=2), + bound_account_id="default", + shared_adapter=_Adapter(), + ) + + class _BackendFacade: + is_closing = False + mode = "local" + + async def upsert(self, data, *, ctx): + del ctx + return await backend.upsert(data) + + embedder = _DummyEmbedder() + monkeypatch.setattr( + "openviking_cli.utils.config.get_openviking_config", + lambda: _DummyConfig(embedder), + ) + + handler = TextEmbeddingHandler(_BackendFacade()) + result = await handler.on_dequeue(_build_queue_payload(tags=input_tags)) + + assert result is not None + assert result["tags"] == input_tags + if expected_written_tags is None: + assert "tags" not in captured["data"] + else: + assert captured["data"]["tags"] == expected_written_tags diff --git a/tests/storage/test_semantic_processor_lifecycle_lock.py b/tests/storage/test_semantic_processor_lifecycle_lock.py index 9fd43e9ae..5dcc6d61f 100644 --- a/tests/storage/test_semantic_processor_lifecycle_lock.py +++ b/tests/storage/test_semantic_processor_lifecycle_lock.py @@ -86,3 +86,13 @@ def get_stats(self): ) assert lock_manager.release_calls == [] + + +def test_semantic_msg_normalizes_missing_tags_to_empty_list(): + msg = SemanticMsg( + uri="viking://resources/demo", + context_type="resource", + recursive=False, + ) + + assert msg.tags == [] diff --git a/tests/unit/test_vectorize_file_strategy.py b/tests/unit/test_vectorize_file_strategy.py index 5be9a387e..d14a1fe0a 100644 --- a/tests/unit/test_vectorize_file_strategy.py +++ b/tests/unit/test_vectorize_file_strategy.py @@ -107,3 +107,37 @@ async def test_vectorize_file_truncates_content_when_content_only(monkeypatch): text = queue.items[0].get_vectorization_text() assert text.startswith("A" * 1000) assert text.endswith("...(truncated for embedding)") + + +@pytest.mark.asyncio +async def test_vectorize_file_forces_summary_tags_into_auto_namespace(monkeypatch): + queue = DummyQueue() + monkeypatch.setattr(embedding_utils, "get_queue_manager", lambda: DummyQueueManager(queue)) + monkeypatch.setattr(embedding_utils, "get_viking_fs", lambda: DummyFS("content")) + monkeypatch.setattr( + embedding_utils, + "get_openviking_config", + lambda: types.SimpleNamespace( + embedding=types.SimpleNamespace(text_source="summary_first", max_input_chars=1000) + ), + ) + monkeypatch.setattr( + embedding_utils.EmbeddingMsgConverter, + "from_context", + lambda context: context, + ) + + await embedding_utils.vectorize_file( + file_path="viking://user/default/resources/ml-notes.md", + summary_dict={ + "name": "ml-notes.md", + "summary": "short summary", + "tags": "user:model-training;pytorch", + }, + parent_uri="viking://user/default/resources", + ctx=DummyReq(), + ) + + assert len(queue.items) == 1 + assert "auto:model-training" in queue.items[0].tags + assert "user:model-training" not in queue.items[0].tags diff --git a/tests/utils/test_tag_utils.py b/tests/utils/test_tag_utils.py new file mode 100644 index 000000000..d8ae5ec34 --- /dev/null +++ b/tests/utils/test_tag_utils.py @@ -0,0 +1,77 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 + +"""Tests for tag normalization and namespace expansion helpers.""" + +from openviking.utils.tag_utils import ( + AUTO_TAG_NAMESPACE, + USER_TAG_NAMESPACE, + canonicalize_user_tags, + expand_query_tags, + extract_context_tags, + namespace_tags, + parse_tags, + serialize_tags, +) + + +def test_parse_tags_preserves_explicit_namespace(): + assert parse_tags("user:machine-learning;auto:pytorch") == [ + "user:machine-learning", + "auto:pytorch", + ] + + +def test_canonicalize_user_tags_adds_default_namespace(): + assert canonicalize_user_tags("machine-learning;feature-store") == [ + "user:machine-learning", + "user:feature-store", + ] + + +def test_canonicalize_user_tags_rewrites_explicit_auto_namespace(): + assert canonicalize_user_tags("auto:pytorch;feature-store") == [ + "user:pytorch", + "user:feature-store", + ] + + +def test_expand_query_tags_matches_both_known_namespaces_for_bare_input(): + assert expand_query_tags("machine-learning") == [ + "user:machine-learning", + "auto:machine-learning", + ] + + +def test_expand_query_tags_preserves_namespaced_input(): + assert expand_query_tags("auto:pytorch;user:model-training") == [ + "auto:pytorch", + "user:model-training", + ] + + +def test_extract_context_tags_namespaces_auto_generated_terms(): + tags = extract_context_tags( + "viking://resources/ml/feature-store.md", + texts=["Feature store guidance for model training systems."], + inherited_tags=["user:retrieval"], + ) + assert "user:retrieval" in tags + assert any(tag.startswith(f"{AUTO_TAG_NAMESPACE}:") for tag in tags if tag != "user:retrieval") + + +def test_namespace_tags_overrides_existing_prefix(): + assert namespace_tags("user:feature-store;model-training", AUTO_TAG_NAMESPACE) == [ + "auto:feature-store", + "auto:model-training", + ] + + +def test_serialize_tags_preserves_namespaces(): + assert ( + serialize_tags( + ["user:machine-learning", "auto:feature-store"], + default_namespace=USER_TAG_NAMESPACE, + ) + == "user:machine-learning;auto:feature-store" + )