From 8633398cb672652b4071afed100374b50f8342b9 Mon Sep 17 00:00:00 2001 From: "zhoujiahui.01" Date: Thu, 2 Apr 2026 18:04:49 +0800 Subject: [PATCH] Implement request-scoped wait for write APIs fix: request wait telemetry id fix: register request wait before enqueue add log --- openviking/service/resource_service.py | 76 ++-- openviking/storage/collection_schemas.py | 33 ++ openviking/storage/content_write.py | 49 ++- openviking/storage/queuefs/semantic_dag.py | 7 + .../storage/queuefs/semantic_processor.py | 33 ++ openviking/storage/vectordb_adapters/base.py | 2 +- openviking/telemetry/context.py | 13 +- openviking/telemetry/execution.py | 33 +- openviking/telemetry/operation.py | 2 +- openviking/telemetry/request_wait_tracker.py | 200 ++++++++++ openviking/telemetry/resource_summary.py | 10 +- openviking/utils/skill_processor.py | 7 +- openviking/utils/summarizer.py | 7 +- tests/client/test_filesystem.py | 38 ++ tests/client/test_resource_management.py | 43 ++ tests/client/test_skill_management.py | 46 +++ tests/server/test_request_wait_tracking.py | 369 ++++++++++++++++++ tests/telemetry/test_request_wait_tracker.py | 38 ++ tests/test_telemetry_runtime.py | 7 + 19 files changed, 957 insertions(+), 56 deletions(-) create mode 100644 openviking/telemetry/request_wait_tracker.py create mode 100644 tests/server/test_request_wait_tracking.py create mode 100644 tests/telemetry/test_request_wait_tracker.py diff --git a/openviking/service/resource_service.py b/openviking/service/resource_service.py index 056a09fd0..522493f23 100644 --- a/openviking/service/resource_service.py +++ b/openviking/service/resource_service.py @@ -15,6 +15,7 @@ from openviking.storage.queuefs import get_queue_manager from openviking.storage.viking_fs import VikingFS from openviking.telemetry import get_current_telemetry +from openviking.telemetry.request_wait_tracker import get_request_wait_tracker from openviking.telemetry.resource_summary import ( build_queue_status_payload, record_resource_wait_metrics, @@ -150,6 +151,9 @@ async def add_resource( request_start = time.perf_counter() telemetry = get_current_telemetry() telemetry_id = register_wait_telemetry(wait) + request_wait_tracker = get_request_wait_tracker() + if wait and telemetry_id: + request_wait_tracker.register_request(telemetry_id) watch_manager = self._get_watch_manager() watch_enabled = bool( watch_manager and to and not skip_watch_management and watch_interval > 0 @@ -194,11 +198,19 @@ async def add_resource( ) if wait: - qm = get_queue_manager() wait_start = time.perf_counter() try: with telemetry.measure("resource.wait"): - status = await qm.wait_complete(timeout=timeout) + if telemetry_id: + await request_wait_tracker.wait_for_request( + telemetry_id, timeout=timeout + ) + status = request_wait_tracker.build_queue_status(telemetry_id) + else: + qm = get_queue_manager() + status = build_queue_status_payload( + await qm.wait_complete(timeout=timeout) + ) except TimeoutError as exc: telemetry.set_error( "resource_service.wait_complete", @@ -207,7 +219,7 @@ async def add_resource( ) raise DeadlineExceededError("queue processing", timeout) from exc queue_wait_duration_ms = round((time.perf_counter() - wait_start) * 1000, 3) - result["queue_status"] = build_queue_status_payload(status) + result["queue_status"] = status record_resource_wait_metrics( telemetry_id=telemetry_id, queue_status=status, @@ -257,6 +269,7 @@ async def add_resource( "resource.request.duration_ms", round((time.perf_counter() - request_start) * 1000, 3), ) + get_request_wait_tracker().cleanup(telemetry_id) unregister_wait_telemetry(telemetry_id) async def _handle_watch_task_creation( @@ -392,33 +405,44 @@ async def add_skill( Processing result """ self._ensure_initialized() + telemetry_id = get_current_telemetry().telemetry_id + request_wait_tracker = get_request_wait_tracker() + if wait and telemetry_id: + request_wait_tracker.register_request(telemetry_id) - result = await self._skill_processor.process_skill( - data=data, - viking_fs=self._viking_fs, - ctx=ctx, - allow_local_path_resolution=allow_local_path_resolution, - ) + try: + result = await self._skill_processor.process_skill( + data=data, + viking_fs=self._viking_fs, + ctx=ctx, + allow_local_path_resolution=allow_local_path_resolution, + ) - if wait: - qm = get_queue_manager() - wait_start = time.perf_counter() - try: - status = await qm.wait_complete(timeout=timeout) - except TimeoutError as exc: - get_current_telemetry().set_error( - "resource_service.wait_complete", - "DEADLINE_EXCEEDED", - str(exc), + if wait: + wait_start = time.perf_counter() + try: + if telemetry_id: + await request_wait_tracker.wait_for_request(telemetry_id, timeout=timeout) + status = request_wait_tracker.build_queue_status(telemetry_id) + else: + qm = get_queue_manager() + status = build_queue_status_payload(await qm.wait_complete(timeout=timeout)) + except TimeoutError as exc: + get_current_telemetry().set_error( + "resource_service.wait_complete", + "DEADLINE_EXCEEDED", + str(exc), + ) + raise DeadlineExceededError("queue processing", timeout) from exc + get_current_telemetry().set( + "queue.wait.duration_ms", + round((time.perf_counter() - wait_start) * 1000, 3), ) - raise DeadlineExceededError("queue processing", timeout) from exc - get_current_telemetry().set( - "queue.wait.duration_ms", - round((time.perf_counter() - wait_start) * 1000, 3), - ) - result["queue_status"] = build_queue_status_payload(status) + result["queue_status"] = status - return result + return result + finally: + request_wait_tracker.cleanup(telemetry_id) async def build_index( self, resource_uris: List[str], ctx: RequestContext, **kwargs diff --git a/openviking/storage/collection_schemas.py b/openviking/storage/collection_schemas.py index ae09e8219..a37276f6f 100644 --- a/openviking/storage/collection_schemas.py +++ b/openviking/storage/collection_schemas.py @@ -22,6 +22,7 @@ from openviking.storage.queuefs.named_queue import DequeueHandlerBase from openviking.storage.viking_vector_index_backend import VikingVectorIndexBackend from openviking.telemetry import bind_telemetry, resolve_telemetry +from openviking.telemetry.request_wait_tracker import get_request_wait_tracker from openviking.utils.circuit_breaker import ( CircuitBreaker, CircuitBreakerOpen, @@ -229,6 +230,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, collector = None report_success = False report_error_args: Optional[tuple[str, Optional[Dict[str, Any]]]] = None + request_failed_message: Optional[str] = None try: queue_data = json.loads(data["data"]) # Parse EmbeddingMsg from data @@ -241,6 +243,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, if self._vikingdb.is_closing: logger.debug("Skip embedding dequeue during shutdown") self._merge_request_stats(embedding_msg.telemetry_id, processed=1) + self._record_request_success(embedding_msg) report_success = True return None @@ -248,6 +251,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, if not isinstance(embedding_msg.message, str): logger.debug(f"Skipping non-string message type: {type(embedding_msg.message)}") self._merge_request_stats(embedding_msg.telemetry_id, processed=1) + self._record_request_success(embedding_msg) report_success = True return data @@ -266,6 +270,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, report_success = True return None # No queue manager — cannot re-enqueue, drop with error + request_failed_message = "Circuit breaker open and no queue manager" report_error_args = ("Circuit breaker open and no queue manager", data) return None @@ -306,6 +311,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, logger.critical(error_msg) self._circuit_breaker.record_failure(embed_err) self._merge_request_stats(embedding_msg.telemetry_id, error_count=1) + request_failed_message = error_msg report_error_args = (error_msg, data) return None @@ -324,6 +330,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, logger.error(f"Failed to re-enqueue message: {requeue_err}") self._merge_request_stats(embedding_msg.telemetry_id, error_count=1) + request_failed_message = error_msg report_error_args = (error_msg, data) return None @@ -335,6 +342,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, error_msg = f"Dense vector dimension mismatch: expected {self._vector_dim}, got {len(result.dense_vector)}" logger.error(error_msg) self._merge_request_stats(embedding_msg.telemetry_id, error_count=1) + request_failed_message = error_msg report_error_args = (error_msg, data) return None @@ -348,6 +356,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, error_msg = "Embedder not initialized, skipping vector generation" logger.warning(error_msg) self._merge_request_stats(embedding_msg.telemetry_id, error_count=1) + request_failed_message = error_msg report_error_args = (error_msg, data) return None @@ -377,16 +386,19 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, if self._vikingdb.is_closing: logger.debug(f"Skip embedding write during shutdown: {db_err}") self._merge_request_stats(embedding_msg.telemetry_id, processed=1) + self._record_request_success(embedding_msg) report_success = True return None logger.error(f"Failed to write to vector database: {db_err}") self._merge_request_stats(embedding_msg.telemetry_id, error_count=1) + request_failed_message = str(db_err) report_error_args = (str(db_err), data) return None except Exception as db_err: if self._vikingdb.is_closing: logger.debug(f"Skip embedding write during shutdown: {db_err}") self._merge_request_stats(embedding_msg.telemetry_id, processed=1) + self._record_request_success(embedding_msg) report_success = True return None logger.error(f"Failed to write to vector database: {db_err}") @@ -394,10 +406,12 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, traceback.print_exc() self._merge_request_stats(embedding_msg.telemetry_id, error_count=1) + request_failed_message = str(db_err) report_error_args = (str(db_err), data) return None self._merge_request_stats(embedding_msg.telemetry_id, processed=1) + self._record_request_success(embedding_msg) report_success = True self._circuit_breaker.record_success() return inserted_data @@ -409,9 +423,12 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, traceback.print_exc() if embedding_msg is not None: self._merge_request_stats(embedding_msg.telemetry_id, error_count=1) + request_failed_message = str(e) report_error_args = (str(e), data) return None finally: + if embedding_msg is not None and request_failed_message is not None: + self._record_request_failure(embedding_msg, request_failed_message) if embedding_msg and embedding_msg.semantic_msg_id: from openviking.storage.queuefs.embedding_tracker import EmbeddingTaskTracker @@ -424,3 +441,19 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, self.report_error(*report_error_args) elif report_success: self.report_success() + + @staticmethod + def _record_request_success(embedding_msg: EmbeddingMsg) -> None: + tracker = get_request_wait_tracker() + if embedding_msg.semantic_msg_id: + tracker.record_embedding_processed(embedding_msg.telemetry_id) + else: + tracker.mark_embedding_done(embedding_msg.telemetry_id, embedding_msg.id) + + @staticmethod + def _record_request_failure(embedding_msg: EmbeddingMsg, message: str) -> None: + tracker = get_request_wait_tracker() + if embedding_msg.semantic_msg_id: + tracker.record_embedding_error(embedding_msg.telemetry_id, message) + else: + tracker.mark_embedding_failed(embedding_msg.telemetry_id, embedding_msg.id, message) diff --git a/openviking/storage/content_write.py b/openviking/storage/content_write.py index 2c5fc8ace..e71dcac78 100644 --- a/openviking/storage/content_write.py +++ b/openviking/storage/content_write.py @@ -14,6 +14,7 @@ from openviking.storage.transaction import get_lock_manager from openviking.storage.viking_fs import VikingFS from openviking.telemetry import get_current_telemetry +from openviking.telemetry.request_wait_tracker import get_request_wait_tracker from openviking.telemetry.resource_summary import build_queue_status_payload from openviking.utils.embedding_utils import vectorize_file from openviking_cli.exceptions import DeadlineExceededError, InvalidArgumentError, NotFoundError @@ -52,6 +53,7 @@ async def write( context_type = self._context_type_for_uri(normalized_uri) root_uri = await self._resolve_root_uri(normalized_uri, ctx=ctx) written_bytes = len(content.encode("utf-8")) + telemetry_id = get_current_telemetry().telemetry_id if context_type == "memory": return await self._write_memory_with_refresh( @@ -63,6 +65,7 @@ async def write( timeout=timeout, ctx=ctx, written_bytes=written_bytes, + telemetry_id=telemetry_id, ) lock_manager = get_lock_manager() @@ -78,6 +81,8 @@ async def write( temp_root_uri = "" lock_transferred = False try: + if wait and telemetry_id: + get_request_wait_tracker().register_request(telemetry_id) temp_root_uri, temp_target_uri = await self._prepare_temp_write( uri=normalized_uri, root_uri=root_uri, @@ -94,7 +99,11 @@ async def write( lifecycle_lock_handle_id=handle.id, ) lock_transferred = True - queue_status = await self._wait_for_queues(timeout=timeout) if wait else None + queue_status = ( + await self._wait_for_request(telemetry_id=telemetry_id, timeout=timeout) + if wait + else None + ) return { "uri": normalized_uri, "root_uri": root_uri, @@ -114,6 +123,9 @@ async def write( if not lock_transferred: await lock_manager.release(handle) raise + finally: + if wait and telemetry_id: + get_request_wait_tracker().cleanup(telemetry_id) def _validate_mode(self, mode: str) -> None: if mode not in {"replace", "append"}: @@ -237,11 +249,13 @@ async def _enqueue_semantic_refresh( agent_id=ctx.user.agent_id, role=ctx.role.value, skip_vectorization=False, - telemetry_id=telemetry.telemetry_id if telemetry.enabled else "", + telemetry_id=telemetry.telemetry_id, lifecycle_lock_handle_id=lifecycle_lock_handle_id, changes={"modified": [temp_target_uri]}, ) await semantic_queue.enqueue(msg) + if msg.telemetry_id: + get_request_wait_tracker().register_semantic_root(msg.telemetry_id, msg.id) async def _enqueue_memory_refresh( self, @@ -262,11 +276,13 @@ async def _enqueue_memory_refresh( agent_id=ctx.user.agent_id, role=ctx.role.value, skip_vectorization=False, - telemetry_id=telemetry.telemetry_id if telemetry.enabled else "", + telemetry_id=telemetry.telemetry_id, lifecycle_lock_handle_id=lifecycle_lock_handle_id, changes={"modified": [modified_uri]}, ) await semantic_queue.enqueue(msg) + if msg.telemetry_id: + get_request_wait_tracker().register_semantic_root(msg.telemetry_id, msg.id) async def _wait_for_queues(self, *, timeout: Optional[float]) -> Dict[str, Any]: queue_manager = get_queue_manager() @@ -276,6 +292,21 @@ async def _wait_for_queues(self, *, timeout: Optional[float]) -> Dict[str, Any]: raise DeadlineExceededError("queue processing", timeout) from exc return build_queue_status_payload(status) + async def _wait_for_request( + self, + *, + telemetry_id: str, + timeout: Optional[float], + ) -> Dict[str, Any]: + if not telemetry_id: + return await self._wait_for_queues(timeout=timeout) + tracker = get_request_wait_tracker() + try: + await tracker.wait_for_request(telemetry_id, timeout=timeout) + except TimeoutError as exc: + raise DeadlineExceededError("queue processing", timeout) from exc + return tracker.build_queue_status(telemetry_id) + async def _vectorize_single_file( self, uri: str, @@ -330,6 +361,7 @@ async def _write_memory_with_refresh( timeout: Optional[float], ctx: RequestContext, written_bytes: int, + telemetry_id: str, ) -> Dict[str, Any]: lock_manager = get_lock_manager() handle = lock_manager.create_handle() @@ -341,6 +373,8 @@ async def _write_memory_with_refresh( lock_transferred = False try: + if wait and telemetry_id: + get_request_wait_tracker().register_request(telemetry_id) await self._write_in_place(uri, content, mode=mode, ctx=ctx) await self._vectorize_single_file(uri, context_type="memory", ctx=ctx) await self._enqueue_memory_refresh( @@ -350,7 +384,11 @@ async def _write_memory_with_refresh( lifecycle_lock_handle_id=handle.id, ) lock_transferred = True - queue_status = await self._wait_for_queues(timeout=timeout) if wait else None + queue_status = ( + await self._wait_for_request(telemetry_id=telemetry_id, timeout=timeout) + if wait + else None + ) return { "uri": uri, "root_uri": root_uri, @@ -365,6 +403,9 @@ async def _write_memory_with_refresh( if not lock_transferred: await lock_manager.release(handle) raise + finally: + if wait and telemetry_id: + get_request_wait_tracker().cleanup(telemetry_id) async def _resolve_root_uri(self, uri: str, *, ctx: RequestContext) -> str: parsed = VikingURI(uri) diff --git a/openviking/storage/queuefs/semantic_dag.py b/openviking/storage/queuefs/semantic_dag.py index bbd7d6008..57c4d5e3b 100644 --- a/openviking/storage/queuefs/semantic_dag.py +++ b/openviking/storage/queuefs/semantic_dag.py @@ -8,6 +8,7 @@ from openviking.server.identity import RequestContext from openviking.storage.viking_fs import get_viking_fs +from openviking.telemetry.request_wait_tracker import get_request_wait_tracker from openviking_cli.utils import VikingURI from openviking_cli.utils.logger import get_logger @@ -75,6 +76,7 @@ def __init__( incremental_update: bool = False, target_uri: Optional[str] = None, semantic_msg_id: Optional[str] = None, + telemetry_id: str = "", recursive: bool = True, lifecycle_lock_handle_id: str = "", is_code_repo: bool = False, @@ -86,6 +88,7 @@ def __init__( self._incremental_update = incremental_update self._target_uri = target_uri self._semantic_msg_id = semantic_msg_id + self._telemetry_id = telemetry_id self._recursive = recursive self._lifecycle_lock_handle_id = lifecycle_lock_handle_id self._is_code_repo = is_code_repo @@ -168,6 +171,10 @@ async def wrapped_on_complete() -> None: try: if original_on_complete: await original_on_complete() + if self._telemetry_id and self._semantic_msg_id: + get_request_wait_tracker().mark_semantic_done( + self._telemetry_id, self._semantic_msg_id + ) finally: await self._release_lifecycle_lock() diff --git a/openviking/storage/queuefs/semantic_processor.py b/openviking/storage/queuefs/semantic_processor.py index a069c0bf9..2a6e2653a 100644 --- a/openviking/storage/queuefs/semantic_processor.py +++ b/openviking/storage/queuefs/semantic_processor.py @@ -28,6 +28,7 @@ from openviking.storage.queuefs.semantic_msg import SemanticMsg from openviking.storage.viking_fs import get_viking_fs from openviking.telemetry import bind_telemetry, resolve_telemetry +from openviking.telemetry.request_wait_tracker import get_request_wait_tracker from openviking.utils.circuit_breaker import ( CircuitBreaker, CircuitBreakerOpen, @@ -302,6 +303,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, incremental_update=is_incremental, target_uri=msg.target_uri, semantic_msg_id=msg.id, + telemetry_id=msg.telemetry_id, recursive=msg.recursive, lifecycle_lock_handle_id=msg.lifecycle_lock_handle_id, is_code_repo=msg.is_code_repo, @@ -332,6 +334,9 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, self._circuit_breaker.record_failure(e) if msg is not None: self._merge_request_stats(msg.telemetry_id, error_count=1) + get_request_wait_tracker().mark_semantic_failed( + msg.telemetry_id, msg.id, str(e) + ) self.report_error(str(e), data) else: # Transient or unknown — re-enqueue for retry @@ -346,6 +351,9 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, except Exception as requeue_err: logger.error(f"Failed to re-enqueue semantic message: {requeue_err}") self._merge_request_stats(msg.telemetry_id, error_count=1) + get_request_wait_tracker().mark_semantic_failed( + msg.telemetry_id, msg.id, str(e) + ) self.report_error(str(e), data) return None self.report_success() @@ -411,11 +419,21 @@ async def _process_memory_directory(self, msg: SemanticMsg) -> None: dir_uri = msg.uri ctx = self._current_ctx llm_sem = asyncio.Semaphore(self.max_concurrent_llm) + request_wait_tracker = get_request_wait_tracker() + + def _mark_done() -> None: + if msg.telemetry_id and msg.id: + request_wait_tracker.mark_semantic_done(msg.telemetry_id, msg.id) + + def _mark_failed(message: str) -> None: + if msg.telemetry_id and msg.id: + request_wait_tracker.mark_semantic_failed(msg.telemetry_id, msg.id, message) try: entries = await viking_fs.ls(dir_uri, ctx=ctx) except Exception as e: logger.warning(f"Failed to list memory directory {dir_uri}: {e}") + _mark_failed(str(e)) if msg.lifecycle_lock_handle_id: await self._release_memory_lifecycle_lock(msg.lifecycle_lock_handle_id) return @@ -431,6 +449,7 @@ async def _process_memory_directory(self, msg: SemanticMsg) -> None: if not file_paths: logger.info(f"No memory files found in {dir_uri}") + _mark_done() if msg.lifecycle_lock_handle_id: await self._release_memory_lifecycle_lock(msg.lifecycle_lock_handle_id) return @@ -505,11 +524,25 @@ async def _gen(idx: int, file_path: str) -> None: logger.info(f"Generated abstract.md and overview.md for {dir_uri}") except Exception as e: logger.error(f"Failed to write abstract/overview for {dir_uri}: {e}") + _mark_failed(str(e)) if msg.lifecycle_lock_handle_id: await self._release_memory_lifecycle_lock(msg.lifecycle_lock_handle_id) return try: + if msg.telemetry_id and msg.id: + from openviking.storage.queuefs.embedding_tracker import EmbeddingTaskTracker + + async def _on_complete() -> None: + get_request_wait_tracker().mark_semantic_done(msg.telemetry_id, msg.id) + + tracker = EmbeddingTaskTracker.get_instance() + await tracker.register( + semantic_msg_id=msg.id, + total_count=2, + on_complete=_on_complete, + metadata={"uri": dir_uri}, + ) await self._vectorize_directory( uri=dir_uri, context_type="memory", diff --git a/openviking/storage/vectordb_adapters/base.py b/openviking/storage/vectordb_adapters/base.py index a831076fe..163e7b59b 100644 --- a/openviking/storage/vectordb_adapters/base.py +++ b/openviking/storage/vectordb_adapters/base.py @@ -114,7 +114,7 @@ def create_collection( self._collection_name = name self._index_name = index_name collection_meta = dict(schema) - scalar_index_fields = collection_meta.pop("ScalarIndex", []) + scalar_index_fields = collection_meta.get("ScalarIndex", []) if "CollectionName" not in collection_meta: collection_meta["CollectionName"] = name diff --git a/openviking/telemetry/context.py b/openviking/telemetry/context.py index 9fe154f3c..35528ee90 100644 --- a/openviking/telemetry/context.py +++ b/openviking/telemetry/context.py @@ -10,16 +10,19 @@ from .operation import OperationTelemetry -_NOOP_TELEMETRY = OperationTelemetry(operation="noop", enabled=False) -_CURRENT_TELEMETRY: contextvars.ContextVar[OperationTelemetry] = contextvars.ContextVar( +_CURRENT_TELEMETRY: contextvars.ContextVar[OperationTelemetry | None] = contextvars.ContextVar( "openviking_operation_telemetry", - default=_NOOP_TELEMETRY, + default=None, ) def get_current_telemetry() -> OperationTelemetry: - """Get current operation telemetry or disabled no-op collector.""" - return _CURRENT_TELEMETRY.get() + """Get current operation telemetry or create a request-local disabled collector.""" + telemetry = _CURRENT_TELEMETRY.get() + if telemetry is None: + telemetry = OperationTelemetry(operation="noop", enabled=False) + _CURRENT_TELEMETRY.set(telemetry) + return telemetry @contextmanager diff --git a/openviking/telemetry/execution.py b/openviking/telemetry/execution.py index 8eda9e1c3..18d85dcbd 100644 --- a/openviking/telemetry/execution.py +++ b/openviking/telemetry/execution.py @@ -8,12 +8,14 @@ from typing import Any, Awaitable, Callable, Generic, Optional, TypeVar from openviking_cli.exceptions import InvalidArgumentError +from openviking_cli.utils import get_logger from .context import bind_telemetry -from .operation import OperationTelemetry +from .operation import OperationTelemetry, TelemetrySnapshot from .request import TelemetryRequest, TelemetrySelection, normalize_telemetry_request T = TypeVar("T") +logger = get_logger(__name__) @dataclass @@ -34,21 +36,22 @@ def parse_telemetry_selection(telemetry: TelemetryRequest) -> TelemetrySelection def build_telemetry_payload( - collector: OperationTelemetry, + snapshot: TelemetrySnapshot | None, selection: TelemetrySelection, - *, - status: str = "ok", ) -> dict[str, Any] | None: - """Build a telemetry payload from a finished collector.""" - snapshot = collector.finish(status=status) - if snapshot is None: + """Build a telemetry payload from a finished snapshot.""" + if snapshot is None or not selection.include_payload: return None + return snapshot.to_dict(include_summary=selection.include_summary) - if not selection.include_payload: - return None - return snapshot.to_dict( - include_summary=selection.include_summary, +def _log_telemetry_summary(snapshot: TelemetrySnapshot | None) -> None: + if snapshot is None: + return + logger.info( + "Telemetry summary (id=%s): %s", + snapshot.telemetry_id, + snapshot.summary, ) @@ -91,13 +94,15 @@ async def run_with_telemetry( result = await fn() except Exception as exc: collector.set_error(operation, type(exc).__name__, str(exc)) - collector.finish(status=error_status) + snapshot = collector.finish(status=error_status) + _log_telemetry_summary(snapshot) raise + snapshot = collector.finish(status="ok") + _log_telemetry_summary(snapshot) telemetry_payload = build_telemetry_payload( - collector, + snapshot, selection, - status="ok", ) return TelemetryExecutionResult( result=result, diff --git a/openviking/telemetry/operation.py b/openviking/telemetry/operation.py index 520fd9543..08fdd32f3 100644 --- a/openviking/telemetry/operation.py +++ b/openviking/telemetry/operation.py @@ -281,7 +281,7 @@ def __init__( ): self.operation = operation self.enabled = enabled - self.telemetry_id = f"tm_{uuid4().hex}" if enabled else "" + self.telemetry_id = f"tm_{uuid4().hex}" self._start_time = time.perf_counter() self._counters: Dict[str, float] = defaultdict(float) self._gauges: Dict[str, Any] = {} diff --git a/openviking/telemetry/request_wait_tracker.py b/openviking/telemetry/request_wait_tracker.py new file mode 100644 index 000000000..69a122797 --- /dev/null +++ b/openviking/telemetry/request_wait_tracker.py @@ -0,0 +1,200 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 +"""Request-scoped wait tracker for write APIs.""" + +from __future__ import annotations + +import asyncio +import threading +import time +from dataclasses import dataclass, field +from typing import Dict, List, Optional, Set + + +@dataclass +class _RequestWaitState: + pending_semantic_roots: Set[str] = field(default_factory=set) + pending_embedding_roots: Set[str] = field(default_factory=set) + semantic_processed: int = 0 + semantic_error_count: int = 0 + semantic_errors: List[str] = field(default_factory=list) + embedding_processed: int = 0 + embedding_error_count: int = 0 + embedding_errors: List[str] = field(default_factory=list) + created_at: float = field(default_factory=time.time) + + +class RequestWaitTracker: + """Track request-scoped queue completion using telemetry_id.""" + + _instance: Optional["RequestWaitTracker"] = None + + def __new__(cls) -> "RequestWaitTracker": + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self) -> None: + if hasattr(self, "_lock"): + return + self._lock = threading.Lock() + self._states: Dict[str, _RequestWaitState] = {} + + @classmethod + def get_instance(cls) -> "RequestWaitTracker": + return cls() + + def _create_state(self, telemetry_id: str) -> Optional[_RequestWaitState]: + if not telemetry_id: + return None + with self._lock: + return self._states.setdefault(telemetry_id, _RequestWaitState()) + + def register_request(self, telemetry_id: str) -> None: + self._create_state(telemetry_id) + + def register_semantic_root(self, telemetry_id: str, semantic_msg_id: str) -> None: + if not telemetry_id or not semantic_msg_id: + return + with self._lock: + state = self._states.get(telemetry_id) + if state is None: + return + state.pending_semantic_roots.add(semantic_msg_id) + + def register_embedding_root(self, telemetry_id: str, root_id: str) -> None: + if not telemetry_id or not root_id: + return + with self._lock: + state = self._states.get(telemetry_id) + if state is None: + return + state.pending_embedding_roots.add(root_id) + + def record_embedding_processed(self, telemetry_id: str, delta: int = 1) -> None: + if not telemetry_id: + return + with self._lock: + state = self._states.get(telemetry_id) + if state is None: + return + state.embedding_processed += max(delta, 0) + + def record_embedding_error(self, telemetry_id: str, message: str) -> None: + if not telemetry_id: + return + with self._lock: + state = self._states.get(telemetry_id) + if state is None: + return + state.embedding_error_count += 1 + if message: + state.embedding_errors.append(message) + + def mark_semantic_done( + self, + telemetry_id: str, + semantic_msg_id: str, + processed_delta: int = 1, + ) -> None: + if not telemetry_id: + return + with self._lock: + state = self._states.get(telemetry_id) + if state is None: + return + state.pending_semantic_roots.discard(semantic_msg_id) + state.semantic_processed += max(processed_delta, 0) + + def mark_semantic_failed(self, telemetry_id: str, semantic_msg_id: str, message: str) -> None: + if not telemetry_id: + return + with self._lock: + state = self._states.get(telemetry_id) + if state is None: + return + state.pending_semantic_roots.discard(semantic_msg_id) + state.semantic_error_count += 1 + if message: + state.semantic_errors.append(message) + + def mark_embedding_done( + self, + telemetry_id: str, + root_id: str, + processed_delta: int = 1, + ) -> None: + if not telemetry_id: + return + with self._lock: + state = self._states.get(telemetry_id) + if state is None: + return + state.pending_embedding_roots.discard(root_id) + state.embedding_processed += max(processed_delta, 0) + + def mark_embedding_failed(self, telemetry_id: str, root_id: str, message: str) -> None: + if not telemetry_id: + return + with self._lock: + state = self._states.get(telemetry_id) + if state is None: + return + state.pending_embedding_roots.discard(root_id) + state.embedding_error_count += 1 + if message: + state.embedding_errors.append(message) + + def is_complete(self, telemetry_id: str) -> bool: + if not telemetry_id: + return True + with self._lock: + state = self._states.get(telemetry_id) + if state is None: + return True + return not state.pending_semantic_roots and not state.pending_embedding_roots + + async def wait_for_request( + self, + telemetry_id: str, + timeout: Optional[float] = None, + poll_interval: float = 0.05, + ) -> None: + if not telemetry_id: + return + start = time.time() + while True: + if self.is_complete(telemetry_id): + return + if timeout is not None and (time.time() - start) > timeout: + raise TimeoutError(f"Request processing not complete after {timeout}s") + await asyncio.sleep(poll_interval) + + def build_queue_status(self, telemetry_id: str) -> Dict[str, Dict[str, object]]: + with self._lock: + state = self._states.get(telemetry_id) or _RequestWaitState() + return { + "Semantic": { + "processed": state.semantic_processed, + "error_count": state.semantic_error_count, + "errors": [{"message": msg} for msg in state.semantic_errors], + }, + "Embedding": { + "processed": state.embedding_processed, + "error_count": state.embedding_error_count, + "errors": [{"message": msg} for msg in state.embedding_errors], + }, + } + + def cleanup(self, telemetry_id: str) -> None: + if not telemetry_id: + return + with self._lock: + self._states.pop(telemetry_id, None) + + +def get_request_wait_tracker() -> RequestWaitTracker: + return RequestWaitTracker.get_instance() + + +__all__ = ["RequestWaitTracker", "get_request_wait_tracker"] diff --git a/openviking/telemetry/resource_summary.py b/openviking/telemetry/resource_summary.py index 519799c96..a43a996f9 100644 --- a/openviking/telemetry/resource_summary.py +++ b/openviking/telemetry/resource_summary.py @@ -41,9 +41,10 @@ def _consume_semantic_dag_stats(telemetry_id: str, root_uri: str | None): def register_wait_telemetry(wait: bool) -> str: """Register current telemetry collector for async queue consumers when needed.""" handle = get_current_telemetry() - if not wait or not handle.enabled: + if not wait or not handle.telemetry_id: return "" - register_telemetry(handle) + if handle.enabled: + register_telemetry(handle) return handle.telemetry_id @@ -76,6 +77,11 @@ def _resolve_queue_group( } if fallback_status is None: return {"processed": 0, "error_count": 0} + if isinstance(fallback_status, dict): + return { + "processed": int(fallback_status.get("processed", 0) or 0), + "error_count": int(fallback_status.get("error_count", 0) or 0), + } return { "processed": fallback_status.processed, "error_count": fallback_status.error_count, diff --git a/openviking/utils/skill_processor.py b/openviking/utils/skill_processor.py index d95be3cce..458608ca2 100644 --- a/openviking/utils/skill_processor.py +++ b/openviking/utils/skill_processor.py @@ -21,6 +21,7 @@ from openviking.storage.queuefs.embedding_msg_converter import EmbeddingMsgConverter from openviking.storage.viking_fs import VikingFS from openviking.telemetry import get_current_telemetry +from openviking.telemetry.request_wait_tracker import get_request_wait_tracker from openviking.utils.zip_safe import safe_extract_zip from openviking_cli.utils import get_logger from openviking_cli.utils.config import get_openviking_config @@ -266,4 +267,8 @@ async def _index_skill(self, context: Context, skill_dir_uri: str): context.set_vectorize(Vectorize(text=context.abstract)) embedding_msg = EmbeddingMsgConverter.from_context(context) if embedding_msg: - await self.vikingdb.enqueue_embedding_msg(embedding_msg) + enqueued = await self.vikingdb.enqueue_embedding_msg(embedding_msg) + if enqueued and embedding_msg.telemetry_id: + get_request_wait_tracker().register_embedding_root( + embedding_msg.telemetry_id, embedding_msg.id + ) diff --git a/openviking/utils/summarizer.py b/openviking/utils/summarizer.py index 4e6129668..ca3c601f2 100644 --- a/openviking/utils/summarizer.py +++ b/openviking/utils/summarizer.py @@ -10,6 +10,7 @@ from openviking.core.directories import get_context_type_for_uri from openviking.storage.queuefs import SemanticMsg, get_queue_manager from openviking.telemetry import get_current_telemetry +from openviking.telemetry.request_wait_tracker import get_request_wait_tracker from openviking_cli.utils import get_logger if TYPE_CHECKING: @@ -56,7 +57,7 @@ async def summarize( enqueued_count = 0 telemetry = get_current_telemetry() - for uri, temp_uri in zip(resource_uris, temp_uris, strict=False): + for uri, temp_uri in zip(resource_uris, temp_uris, strict=True): # Determine context_type based on URI context_type = get_context_type_for_uri(uri) @@ -68,12 +69,14 @@ async def summarize( agent_id=ctx.user.agent_id, role=ctx.role.value, skip_vectorization=skip_vectorization, - telemetry_id=telemetry.telemetry_id if telemetry.enabled else "", + telemetry_id=telemetry.telemetry_id, target_uri=uri if uri != temp_uri else None, lifecycle_lock_handle_id=lifecycle_lock_handle_id, is_code_repo=kwargs.get("is_code_repo", False), ) await semantic_queue.enqueue(msg) + if msg.telemetry_id: + get_request_wait_tracker().register_semantic_root(msg.telemetry_id, msg.id) enqueued_count += 1 logger.info( f"Enqueued semantic generation for: {uri} (skip_vectorization={skip_vectorization})" diff --git a/tests/client/test_filesystem.py b/tests/client/test_filesystem.py index 82ed1e66e..3b4bcb3d4 100644 --- a/tests/client/test_filesystem.py +++ b/tests/client/test_filesystem.py @@ -3,11 +3,16 @@ """Filesystem operation tests""" +from types import SimpleNamespace from unittest.mock import AsyncMock import pytest from openviking import AsyncOpenViking, OpenViking +from openviking.client import LocalClient +from openviking.server.identity import RequestContext, Role +from openviking.telemetry import get_current_telemetry +from openviking_cli.session.user_id import UserIdentifier class TestLs: @@ -70,6 +75,39 @@ async def test_read_nonexistent_file(self, client: AsyncOpenViking): with pytest.raises(Exception): # noqa: B017 await client.read("viking://nonexistent/file.txt") + async def test_write_with_wait_returns_queue_status(self): + """Test local SDK write(wait=True) preserves queue_status and binds telemetry.""" + queue_status = { + "Semantic": {"processed": 1, "error_count": 0, "errors": []}, + "Embedding": {"processed": 0, "error_count": 0, "errors": []}, + } + seen: dict[str, object] = {} + + async def _fake_write(**kwargs): + telemetry = get_current_telemetry() + seen["enabled"] = telemetry.enabled + seen["telemetry_id"] = telemetry.telemetry_id + seen["kwargs"] = kwargs + return {"uri": kwargs["uri"], "queue_status": queue_status} + + client = LocalClient.__new__(LocalClient) + client._ctx = RequestContext(user=UserIdentifier.the_default_user(), role=Role.USER) + client._service = SimpleNamespace(fs=SimpleNamespace(write=_fake_write)) + + result = await LocalClient.write( + client, + uri="viking://resources/demo.md", + content="Updated from client test", + wait=True, + telemetry=False, + ) + + assert result["uri"] == "viking://resources/demo.md" + assert result["queue_status"] == queue_status + assert seen["enabled"] is True + assert str(seen["telemetry_id"]).startswith("tm_") + assert seen["kwargs"]["wait"] is True + class TestAbstract: """Test abstract operation""" diff --git a/tests/client/test_resource_management.py b/tests/client/test_resource_management.py index 4c44b7d69..1015c641b 100644 --- a/tests/client/test_resource_management.py +++ b/tests/client/test_resource_management.py @@ -4,9 +4,14 @@ """Resource management tests""" from pathlib import Path +from types import SimpleNamespace from unittest.mock import AsyncMock, patch from openviking import AsyncOpenViking +from openviking.client import LocalClient +from openviking.server.identity import RequestContext, Role +from openviking.telemetry import get_current_telemetry +from openviking_cli.session.user_id import UserIdentifier class TestAddResource: @@ -33,6 +38,44 @@ async def test_add_resource_with_wait( assert "root_uri" in result assert "queue_status" in result + async def test_local_client_add_resource_with_wait_preserves_queue_status(self): + """Local SDK add_resource(wait=True) should keep queue_status and internal telemetry.""" + queue_status = { + "Semantic": {"processed": 1, "error_count": 0, "errors": []}, + "Embedding": {"processed": 2, "error_count": 0, "errors": []}, + } + seen: dict[str, object] = {} + + async def _fake_add_resource(**kwargs): + telemetry = get_current_telemetry() + seen["enabled"] = telemetry.enabled + seen["telemetry_id"] = telemetry.telemetry_id + seen["kwargs"] = kwargs + return { + "root_uri": "viking://resources/demo", + "queue_status": queue_status, + } + + client = LocalClient.__new__(LocalClient) + client._ctx = RequestContext(user=UserIdentifier.the_default_user(), role=Role.USER) + client._service = SimpleNamespace( + resources=SimpleNamespace(add_resource=_fake_add_resource) + ) + + result = await LocalClient.add_resource( + client, + path="/tmp/demo.md", + reason="Test resource", + wait=True, + telemetry=False, + ) + + assert result["root_uri"] == "viking://resources/demo" + assert result["queue_status"] == queue_status + assert seen["enabled"] is True + assert str(seen["telemetry_id"]).startswith("tm_") + assert seen["kwargs"]["wait"] is True + async def test_add_resource_without_wait( self, client: AsyncOpenViking, sample_markdown_file: Path ): diff --git a/tests/client/test_skill_management.py b/tests/client/test_skill_management.py index 15e864126..c13c990c1 100644 --- a/tests/client/test_skill_management.py +++ b/tests/client/test_skill_management.py @@ -4,8 +4,13 @@ """Skill management tests""" from pathlib import Path +from types import SimpleNamespace from openviking import AsyncOpenViking +from openviking.client import LocalClient +from openviking.server.identity import RequestContext, Role +from openviking.telemetry import get_current_telemetry +from openviking_cli.session.user_id import UserIdentifier class TestAddSkill: @@ -63,6 +68,47 @@ async def test_add_skill_from_string(self, client: AsyncOpenViking): assert "uri" in result assert "viking://agent/skills/" in result["uri"] + async def test_add_skill_with_wait_returns_queue_status(self, client: AsyncOpenViking): + """Test local SDK add_skill(wait=True) preserves queue_status and binds telemetry.""" + del client + queue_status = { + "Semantic": {"processed": 0, "error_count": 0, "errors": []}, + "Embedding": {"processed": 1, "error_count": 0, "errors": []}, + } + seen: dict[str, object] = {} + + async def _fake_add_skill(**kwargs): + telemetry = get_current_telemetry() + seen["enabled"] = telemetry.enabled + seen["telemetry_id"] = telemetry.telemetry_id + seen["kwargs"] = kwargs + return { + "uri": "viking://agent/skills/waited-skill", + "queue_status": queue_status, + } + + local_client = LocalClient.__new__(LocalClient) + local_client._ctx = RequestContext( + user=UserIdentifier.the_default_user(), + role=Role.USER, + ) + local_client._service = SimpleNamespace( + resources=SimpleNamespace(add_skill=_fake_add_skill) + ) + + result = await LocalClient.add_skill( + local_client, + data={"name": "waited-skill", "content": "# Waited Skill"}, + wait=True, + telemetry=False, + ) + + assert result["uri"] == "viking://agent/skills/waited-skill" + assert result["queue_status"] == queue_status + assert seen["enabled"] is True + assert str(seen["telemetry_id"]).startswith("tm_") + assert seen["kwargs"]["wait"] is True + async def test_add_skill_from_mcp_tool(self, client: AsyncOpenViking): """Test adding skill from MCP Tool format""" mcp_tool = { diff --git a/tests/server/test_request_wait_tracking.py b/tests/server/test_request_wait_tracking.py new file mode 100644 index 000000000..f9b9d4b50 --- /dev/null +++ b/tests/server/test_request_wait_tracking.py @@ -0,0 +1,369 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 + +"""Tests for request-scoped wait behavior on write APIs.""" + +from types import SimpleNamespace + +import pytest + +from openviking.server.identity import RequestContext, Role +from openviking.storage.content_write import ContentWriteCoordinator +from openviking.telemetry.context import bind_telemetry +from openviking.telemetry.operation import OperationTelemetry +from openviking_cli.session.user_id import UserIdentifier + + +class _FakeRequestWaitTracker: + def __init__(self, queue_status): + self.queue_status = queue_status + self.registered_requests = [] + self.wait_calls = [] + self.build_calls = [] + self.cleaned = [] + + def register_request(self, telemetry_id: str) -> None: + self.registered_requests.append(telemetry_id) + + async def wait_for_request(self, telemetry_id: str, timeout): + self.wait_calls.append((telemetry_id, timeout)) + + def build_queue_status(self, telemetry_id: str): + self.build_calls.append(telemetry_id) + return self.queue_status + + def cleanup(self, telemetry_id: str) -> None: + self.cleaned.append(telemetry_id) + + +class _ExplodingQueueManager: + async def wait_complete(self, *args, **kwargs): + raise AssertionError("global queue wait should not be used") + + +class _FakeVikingFS: + def __init__(self, file_uri: str, root_uri: str): + self._file_uri = file_uri + self._root_uri = root_uri + + async def stat(self, uri: str, ctx=None): + del ctx + if uri == self._file_uri: + return {"isDir": False} + if uri == self._root_uri: + return {"isDir": True} + raise AssertionError(f"unexpected stat uri: {uri}") + + def _uri_to_path(self, uri: str, ctx=None): + del ctx + return f"/fake/{uri.replace('://', '/').strip('/')}" + + async def delete_temp(self, temp_uri: str, ctx=None): + del temp_uri, ctx + return None + + +@pytest.mark.asyncio +async def test_add_resource_wait_uses_request_tracker(service, monkeypatch): + tracker = _FakeRequestWaitTracker( + { + "Semantic": {"processed": 1, "error_count": 0, "errors": []}, + "Embedding": {"processed": 2, "error_count": 0, "errors": []}, + } + ) + ctx = RequestContext(user=service.user, role=Role.ROOT) + telemetry = OperationTelemetry(operation="resources.add_resource", enabled=True) + + async def _fake_process_resource(**kwargs): + del kwargs + return {"status": "success", "root_uri": "viking://resources/demo"} + + monkeypatch.setattr( + service.resources._resource_processor, "process_resource", _fake_process_resource + ) + monkeypatch.setattr( + "openviking.service.resource_service.get_queue_manager", + lambda: _ExplodingQueueManager(), + ) + monkeypatch.setattr( + "openviking.service.resource_service.get_request_wait_tracker", + lambda: tracker, + raising=False, + ) + + with bind_telemetry(telemetry): + result = await service.resources.add_resource( + path="/tmp/demo.md", + ctx=ctx, + reason="request wait test", + wait=True, + timeout=12.0, + ) + + assert result["queue_status"] == tracker.queue_status + assert tracker.registered_requests == [telemetry.telemetry_id] + assert tracker.wait_calls == [(telemetry.telemetry_id, 12.0)] + assert tracker.build_calls == [telemetry.telemetry_id] + assert tracker.cleaned == [telemetry.telemetry_id] + + +@pytest.mark.asyncio +async def test_add_resource_wait_uses_request_tracker_when_telemetry_disabled(service, monkeypatch): + tracker = _FakeRequestWaitTracker( + { + "Semantic": {"processed": 1, "error_count": 0, "errors": []}, + "Embedding": {"processed": 2, "error_count": 0, "errors": []}, + } + ) + ctx = RequestContext(user=service.user, role=Role.ROOT) + telemetry = OperationTelemetry(operation="resources.add_resource", enabled=False) + + async def _fake_process_resource(**kwargs): + del kwargs + return {"status": "success", "root_uri": "viking://resources/demo"} + + monkeypatch.setattr( + service.resources._resource_processor, "process_resource", _fake_process_resource + ) + monkeypatch.setattr( + "openviking.service.resource_service.get_queue_manager", + lambda: _ExplodingQueueManager(), + ) + monkeypatch.setattr( + "openviking.service.resource_service.get_request_wait_tracker", + lambda: tracker, + raising=False, + ) + + with bind_telemetry(telemetry): + result = await service.resources.add_resource( + path="/tmp/demo.md", + ctx=ctx, + reason="request wait test", + wait=True, + timeout=12.0, + ) + + assert result["queue_status"] == tracker.queue_status + assert tracker.registered_requests == [telemetry.telemetry_id] + assert tracker.wait_calls == [(telemetry.telemetry_id, 12.0)] + assert tracker.build_calls == [telemetry.telemetry_id] + assert tracker.cleaned == [telemetry.telemetry_id] + + +@pytest.mark.asyncio +async def test_add_skill_wait_uses_request_tracker(service, monkeypatch): + tracker = _FakeRequestWaitTracker( + { + "Semantic": {"processed": 0, "error_count": 0, "errors": []}, + "Embedding": {"processed": 1, "error_count": 0, "errors": []}, + } + ) + ctx = RequestContext(user=service.user, role=Role.ROOT) + telemetry = OperationTelemetry(operation="resources.add_skill", enabled=True) + + async def _fake_process_skill(**kwargs): + del kwargs + return {"status": "success", "uri": "viking://agent/skills/demo", "name": "demo"} + + monkeypatch.setattr(service.resources._skill_processor, "process_skill", _fake_process_skill) + monkeypatch.setattr( + "openviking.service.resource_service.get_queue_manager", + lambda: _ExplodingQueueManager(), + ) + monkeypatch.setattr( + "openviking.service.resource_service.get_request_wait_tracker", + lambda: tracker, + raising=False, + ) + + with bind_telemetry(telemetry): + result = await service.resources.add_skill( + data={"name": "demo", "content": "# Demo"}, + ctx=ctx, + wait=True, + timeout=9.0, + ) + + assert result["queue_status"] == tracker.queue_status + assert tracker.registered_requests == [telemetry.telemetry_id] + assert tracker.wait_calls == [(telemetry.telemetry_id, 9.0)] + assert tracker.build_calls == [telemetry.telemetry_id] + assert tracker.cleaned == [telemetry.telemetry_id] + + +@pytest.mark.asyncio +async def test_add_skill_wait_uses_request_tracker_when_telemetry_disabled(service, monkeypatch): + tracker = _FakeRequestWaitTracker( + { + "Semantic": {"processed": 0, "error_count": 0, "errors": []}, + "Embedding": {"processed": 1, "error_count": 0, "errors": []}, + } + ) + ctx = RequestContext(user=service.user, role=Role.ROOT) + telemetry = OperationTelemetry(operation="resources.add_skill", enabled=False) + + async def _fake_process_skill(**kwargs): + del kwargs + return {"status": "success", "uri": "viking://agent/skills/demo", "name": "demo"} + + monkeypatch.setattr(service.resources._skill_processor, "process_skill", _fake_process_skill) + monkeypatch.setattr( + "openviking.service.resource_service.get_queue_manager", + lambda: _ExplodingQueueManager(), + ) + monkeypatch.setattr( + "openviking.service.resource_service.get_request_wait_tracker", + lambda: tracker, + raising=False, + ) + + with bind_telemetry(telemetry): + result = await service.resources.add_skill( + data={"name": "demo", "content": "# Demo"}, + ctx=ctx, + wait=True, + timeout=9.0, + ) + + assert result["queue_status"] == tracker.queue_status + assert tracker.registered_requests == [telemetry.telemetry_id] + assert tracker.wait_calls == [(telemetry.telemetry_id, 9.0)] + assert tracker.build_calls == [telemetry.telemetry_id] + assert tracker.cleaned == [telemetry.telemetry_id] + + +@pytest.mark.asyncio +async def test_content_write_wait_uses_request_tracker(monkeypatch): + file_uri = "viking://resources/demo/doc.md" + root_uri = "viking://resources/demo" + ctx = RequestContext(user=UserIdentifier.the_default_user(), role=Role.USER) + telemetry = OperationTelemetry(operation="content.write", enabled=True) + tracker = _FakeRequestWaitTracker( + { + "Semantic": {"processed": 1, "error_count": 0, "errors": []}, + "Embedding": {"processed": 0, "error_count": 0, "errors": []}, + } + ) + coordinator = ContentWriteCoordinator( + viking_fs=_FakeVikingFS(file_uri=file_uri, root_uri=root_uri) + ) + lock_manager = SimpleNamespace( + create_handle=lambda: SimpleNamespace(id="lock-1"), + acquire_subtree=lambda handle, path: _return_true(handle, path), + release=lambda handle: _return_none(handle), + ) + + monkeypatch.setattr( + "openviking.storage.content_write.get_lock_manager", + lambda: lock_manager, + ) + monkeypatch.setattr( + "openviking.storage.content_write.get_request_wait_tracker", + lambda: tracker, + raising=False, + ) + + async def _fake_prepare_temp_write(**kwargs): + del kwargs + return "viking://temp/demo", "viking://temp/demo/doc.md" + + async def _fake_enqueue_semantic_refresh(**kwargs): + del kwargs + return None + + async def _explode_wait_for_queues(*, timeout): + del timeout + raise AssertionError("global queue wait should not be used") + + monkeypatch.setattr(coordinator, "_prepare_temp_write", _fake_prepare_temp_write) + monkeypatch.setattr(coordinator, "_enqueue_semantic_refresh", _fake_enqueue_semantic_refresh) + monkeypatch.setattr(coordinator, "_wait_for_queues", _explode_wait_for_queues) + + with bind_telemetry(telemetry): + result = await coordinator.write( + uri=file_uri, + content="updated", + ctx=ctx, + wait=True, + timeout=5.0, + ) + + assert result["queue_status"] == tracker.queue_status + assert tracker.registered_requests == [telemetry.telemetry_id] + assert tracker.wait_calls == [(telemetry.telemetry_id, 5.0)] + assert tracker.build_calls == [telemetry.telemetry_id] + assert tracker.cleaned == [telemetry.telemetry_id] + + +@pytest.mark.asyncio +async def test_content_write_wait_uses_request_tracker_when_telemetry_disabled(monkeypatch): + file_uri = "viking://resources/demo/doc.md" + root_uri = "viking://resources/demo" + ctx = RequestContext(user=UserIdentifier.the_default_user(), role=Role.USER) + telemetry = OperationTelemetry(operation="content.write", enabled=False) + tracker = _FakeRequestWaitTracker( + { + "Semantic": {"processed": 1, "error_count": 0, "errors": []}, + "Embedding": {"processed": 0, "error_count": 0, "errors": []}, + } + ) + coordinator = ContentWriteCoordinator( + viking_fs=_FakeVikingFS(file_uri=file_uri, root_uri=root_uri) + ) + lock_manager = SimpleNamespace( + create_handle=lambda: SimpleNamespace(id="lock-1"), + acquire_subtree=lambda handle, path: _return_true(handle, path), + release=lambda handle: _return_none(handle), + ) + + monkeypatch.setattr( + "openviking.storage.content_write.get_lock_manager", + lambda: lock_manager, + ) + monkeypatch.setattr( + "openviking.storage.content_write.get_request_wait_tracker", + lambda: tracker, + raising=False, + ) + + async def _fake_prepare_temp_write(**kwargs): + del kwargs + return "viking://temp/demo", "viking://temp/demo/doc.md" + + async def _fake_enqueue_semantic_refresh(**kwargs): + del kwargs + return None + + async def _explode_wait_for_queues(*, timeout): + del timeout + raise AssertionError("global queue wait should not be used") + + monkeypatch.setattr(coordinator, "_prepare_temp_write", _fake_prepare_temp_write) + monkeypatch.setattr(coordinator, "_enqueue_semantic_refresh", _fake_enqueue_semantic_refresh) + monkeypatch.setattr(coordinator, "_wait_for_queues", _explode_wait_for_queues) + + with bind_telemetry(telemetry): + result = await coordinator.write( + uri=file_uri, + content="updated", + ctx=ctx, + wait=True, + timeout=5.0, + ) + + assert result["queue_status"] == tracker.queue_status + assert tracker.registered_requests == [telemetry.telemetry_id] + assert tracker.wait_calls == [(telemetry.telemetry_id, 5.0)] + assert tracker.build_calls == [telemetry.telemetry_id] + assert tracker.cleaned == [telemetry.telemetry_id] + + +async def _return_true(handle, path): + del handle, path + return True + + +async def _return_none(handle): + del handle + return None diff --git a/tests/telemetry/test_request_wait_tracker.py b/tests/telemetry/test_request_wait_tracker.py new file mode 100644 index 000000000..9e5850e1e --- /dev/null +++ b/tests/telemetry/test_request_wait_tracker.py @@ -0,0 +1,38 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 + +from openviking.telemetry.request_wait_tracker import RequestWaitTracker + + +def test_request_wait_tracker_cleanup_prevents_state_recreation(): + tracker = RequestWaitTracker() + telemetry_id = "tm_cleanup" + + tracker.register_request(telemetry_id) + tracker.register_semantic_root(telemetry_id, "semantic-1") + tracker.cleanup(telemetry_id) + + tracker.mark_semantic_done(telemetry_id, "semantic-1") + tracker.mark_embedding_done(telemetry_id, "embedding-1") + + assert tracker.build_queue_status(telemetry_id) == { + "Semantic": {"processed": 0, "error_count": 0, "errors": []}, + "Embedding": {"processed": 0, "error_count": 0, "errors": []}, + } + + +def test_request_wait_tracker_cleanup_prevents_root_recreation(): + tracker = RequestWaitTracker() + telemetry_id = "tm_late_root" + + tracker.register_request(telemetry_id) + tracker.cleanup(telemetry_id) + + tracker.register_semantic_root(telemetry_id, "semantic-1") + tracker.register_embedding_root(telemetry_id, "embedding-1") + + assert tracker.is_complete(telemetry_id) is True + assert tracker.build_queue_status(telemetry_id) == { + "Semantic": {"processed": 0, "error_count": 0, "errors": []}, + "Embedding": {"processed": 0, "error_count": 0, "errors": []}, + } diff --git a/tests/test_telemetry_runtime.py b/tests/test_telemetry_runtime.py index 1973103cc..7c6dbe7fb 100644 --- a/tests/test_telemetry_runtime.py +++ b/tests/test_telemetry_runtime.py @@ -75,6 +75,13 @@ def test_telemetry_summary_breaks_down_llm_and_embedding_token_usage(): assert "errors" not in summary +def test_disabled_telemetry_still_has_request_id(): + telemetry = MemoryOperationTelemetry(operation="resources.add_resource", enabled=False) + + assert telemetry.telemetry_id + assert telemetry.telemetry_id.startswith("tm_") + + def test_telemetry_summary_uses_simplified_internal_metric_keys(): summary = MemoryOperationTelemetry( operation="search.find",