Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 50 additions & 26 deletions openviking/service/resource_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from openviking.storage.queuefs import get_queue_manager
from openviking.storage.viking_fs import VikingFS
from openviking.telemetry import get_current_telemetry
from openviking.telemetry.request_wait_tracker import get_request_wait_tracker
from openviking.telemetry.resource_summary import (
build_queue_status_payload,
record_resource_wait_metrics,
Expand Down Expand Up @@ -150,6 +151,9 @@ async def add_resource(
request_start = time.perf_counter()
telemetry = get_current_telemetry()
telemetry_id = register_wait_telemetry(wait)
request_wait_tracker = get_request_wait_tracker()
if wait and telemetry_id:
request_wait_tracker.register_request(telemetry_id)
watch_manager = self._get_watch_manager()
watch_enabled = bool(
watch_manager and to and not skip_watch_management and watch_interval > 0
Expand Down Expand Up @@ -194,11 +198,19 @@ async def add_resource(
)

if wait:
qm = get_queue_manager()
wait_start = time.perf_counter()
try:
with telemetry.measure("resource.wait"):
status = await qm.wait_complete(timeout=timeout)
if telemetry_id:
await request_wait_tracker.wait_for_request(
telemetry_id, timeout=timeout
)
status = request_wait_tracker.build_queue_status(telemetry_id)
else:
qm = get_queue_manager()
status = build_queue_status_payload(
await qm.wait_complete(timeout=timeout)
)
except TimeoutError as exc:
telemetry.set_error(
"resource_service.wait_complete",
Expand All @@ -207,7 +219,7 @@ async def add_resource(
)
raise DeadlineExceededError("queue processing", timeout) from exc
queue_wait_duration_ms = round((time.perf_counter() - wait_start) * 1000, 3)
result["queue_status"] = build_queue_status_payload(status)
result["queue_status"] = status
record_resource_wait_metrics(
telemetry_id=telemetry_id,
queue_status=status,
Expand Down Expand Up @@ -257,6 +269,7 @@ async def add_resource(
"resource.request.duration_ms",
round((time.perf_counter() - request_start) * 1000, 3),
)
get_request_wait_tracker().cleanup(telemetry_id)
unregister_wait_telemetry(telemetry_id)

async def _handle_watch_task_creation(
Expand Down Expand Up @@ -392,33 +405,44 @@ async def add_skill(
Processing result
"""
self._ensure_initialized()
telemetry_id = get_current_telemetry().telemetry_id
request_wait_tracker = get_request_wait_tracker()
if wait and telemetry_id:
request_wait_tracker.register_request(telemetry_id)

result = await self._skill_processor.process_skill(
data=data,
viking_fs=self._viking_fs,
ctx=ctx,
allow_local_path_resolution=allow_local_path_resolution,
)
try:
result = await self._skill_processor.process_skill(
data=data,
viking_fs=self._viking_fs,
ctx=ctx,
allow_local_path_resolution=allow_local_path_resolution,
)

if wait:
qm = get_queue_manager()
wait_start = time.perf_counter()
try:
status = await qm.wait_complete(timeout=timeout)
except TimeoutError as exc:
get_current_telemetry().set_error(
"resource_service.wait_complete",
"DEADLINE_EXCEEDED",
str(exc),
if wait:
wait_start = time.perf_counter()
try:
if telemetry_id:
await request_wait_tracker.wait_for_request(telemetry_id, timeout=timeout)
status = request_wait_tracker.build_queue_status(telemetry_id)
else:
qm = get_queue_manager()
status = build_queue_status_payload(await qm.wait_complete(timeout=timeout))
except TimeoutError as exc:
get_current_telemetry().set_error(
"resource_service.wait_complete",
"DEADLINE_EXCEEDED",
str(exc),
)
raise DeadlineExceededError("queue processing", timeout) from exc
get_current_telemetry().set(
"queue.wait.duration_ms",
round((time.perf_counter() - wait_start) * 1000, 3),
)
raise DeadlineExceededError("queue processing", timeout) from exc
get_current_telemetry().set(
"queue.wait.duration_ms",
round((time.perf_counter() - wait_start) * 1000, 3),
)
result["queue_status"] = build_queue_status_payload(status)
result["queue_status"] = status

return result
return result
finally:
request_wait_tracker.cleanup(telemetry_id)

async def build_index(
self, resource_uris: List[str], ctx: RequestContext, **kwargs
Expand Down
33 changes: 33 additions & 0 deletions openviking/storage/collection_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from openviking.storage.queuefs.named_queue import DequeueHandlerBase
from openviking.storage.viking_vector_index_backend import VikingVectorIndexBackend
from openviking.telemetry import bind_telemetry, resolve_telemetry
from openviking.telemetry.request_wait_tracker import get_request_wait_tracker
from openviking.utils.circuit_breaker import (
CircuitBreaker,
CircuitBreakerOpen,
Expand Down Expand Up @@ -229,6 +230,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
collector = None
report_success = False
report_error_args: Optional[tuple[str, Optional[Dict[str, Any]]]] = None
request_failed_message: Optional[str] = None
try:
queue_data = json.loads(data["data"])
# Parse EmbeddingMsg from data
Expand All @@ -241,13 +243,15 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
if self._vikingdb.is_closing:
logger.debug("Skip embedding dequeue during shutdown")
self._merge_request_stats(embedding_msg.telemetry_id, processed=1)
self._record_request_success(embedding_msg)
report_success = True
return None

# Only process string messages
if not isinstance(embedding_msg.message, str):
logger.debug(f"Skipping non-string message type: {type(embedding_msg.message)}")
self._merge_request_stats(embedding_msg.telemetry_id, processed=1)
self._record_request_success(embedding_msg)
report_success = True
return data

Expand All @@ -266,6 +270,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
report_success = True
return None
# No queue manager — cannot re-enqueue, drop with error
request_failed_message = "Circuit breaker open and no queue manager"
report_error_args = ("Circuit breaker open and no queue manager", data)
return None

Expand Down Expand Up @@ -306,6 +311,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
logger.critical(error_msg)
self._circuit_breaker.record_failure(embed_err)
self._merge_request_stats(embedding_msg.telemetry_id, error_count=1)
request_failed_message = error_msg
report_error_args = (error_msg, data)
return None

Expand All @@ -324,6 +330,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
logger.error(f"Failed to re-enqueue message: {requeue_err}")

self._merge_request_stats(embedding_msg.telemetry_id, error_count=1)
request_failed_message = error_msg
report_error_args = (error_msg, data)
return None

Expand All @@ -335,6 +342,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
error_msg = f"Dense vector dimension mismatch: expected {self._vector_dim}, got {len(result.dense_vector)}"
logger.error(error_msg)
self._merge_request_stats(embedding_msg.telemetry_id, error_count=1)
request_failed_message = error_msg
report_error_args = (error_msg, data)
return None

Expand All @@ -348,6 +356,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
error_msg = "Embedder not initialized, skipping vector generation"
logger.warning(error_msg)
self._merge_request_stats(embedding_msg.telemetry_id, error_count=1)
request_failed_message = error_msg
report_error_args = (error_msg, data)
return None

Expand Down Expand Up @@ -377,27 +386,32 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
if self._vikingdb.is_closing:
logger.debug(f"Skip embedding write during shutdown: {db_err}")
self._merge_request_stats(embedding_msg.telemetry_id, processed=1)
self._record_request_success(embedding_msg)
report_success = True
return None
logger.error(f"Failed to write to vector database: {db_err}")
self._merge_request_stats(embedding_msg.telemetry_id, error_count=1)
request_failed_message = str(db_err)
report_error_args = (str(db_err), data)
return None
except Exception as db_err:
if self._vikingdb.is_closing:
logger.debug(f"Skip embedding write during shutdown: {db_err}")
self._merge_request_stats(embedding_msg.telemetry_id, processed=1)
self._record_request_success(embedding_msg)
report_success = True
return None
logger.error(f"Failed to write to vector database: {db_err}")
import traceback

traceback.print_exc()
self._merge_request_stats(embedding_msg.telemetry_id, error_count=1)
request_failed_message = str(db_err)
report_error_args = (str(db_err), data)
return None

self._merge_request_stats(embedding_msg.telemetry_id, processed=1)
self._record_request_success(embedding_msg)
report_success = True
self._circuit_breaker.record_success()
return inserted_data
Expand All @@ -409,9 +423,12 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
traceback.print_exc()
if embedding_msg is not None:
self._merge_request_stats(embedding_msg.telemetry_id, error_count=1)
request_failed_message = str(e)
report_error_args = (str(e), data)
return None
finally:
if embedding_msg is not None and request_failed_message is not None:
self._record_request_failure(embedding_msg, request_failed_message)
if embedding_msg and embedding_msg.semantic_msg_id:
from openviking.storage.queuefs.embedding_tracker import EmbeddingTaskTracker

Expand All @@ -424,3 +441,19 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
self.report_error(*report_error_args)
elif report_success:
self.report_success()

@staticmethod
def _record_request_success(embedding_msg: EmbeddingMsg) -> None:
    """Notify the request-wait tracker that this embedding message completed.

    Semantic messages (those carrying a ``semantic_msg_id``) are counted as
    processed against the originating request's telemetry id; plain messages
    are marked done individually by their own message id.
    """
    tracker = get_request_wait_tracker()
    if not embedding_msg.semantic_msg_id:
        tracker.mark_embedding_done(embedding_msg.telemetry_id, embedding_msg.id)
    else:
        tracker.record_embedding_processed(embedding_msg.telemetry_id)

@staticmethod
def _record_request_failure(embedding_msg: EmbeddingMsg, message: str) -> None:
    """Propagate a processing failure for this embedding message to the tracker.

    Mirrors ``_record_request_success``: semantic messages report an error
    against the request's telemetry id, while plain messages are marked
    failed individually, carrying the human-readable ``message``.
    """
    tracker = get_request_wait_tracker()
    if not embedding_msg.semantic_msg_id:
        tracker.mark_embedding_failed(
            embedding_msg.telemetry_id, embedding_msg.id, message
        )
    else:
        tracker.record_embedding_error(embedding_msg.telemetry_id, message)
Loading
Loading