Skip to content

Commit 37d4e32

Browse files
authored
Implement request-scoped wait for write APIs (#1212)
fix: request wait telemetry id; fix: register request wait before enqueue; add log
1 parent 89c3aa3 commit 37d4e32

19 files changed

+957
-56
lines changed

openviking/service/resource_service.py

Lines changed: 50 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from openviking.storage.queuefs import get_queue_manager
1616
from openviking.storage.viking_fs import VikingFS
1717
from openviking.telemetry import get_current_telemetry
18+
from openviking.telemetry.request_wait_tracker import get_request_wait_tracker
1819
from openviking.telemetry.resource_summary import (
1920
build_queue_status_payload,
2021
record_resource_wait_metrics,
@@ -150,6 +151,9 @@ async def add_resource(
150151
request_start = time.perf_counter()
151152
telemetry = get_current_telemetry()
152153
telemetry_id = register_wait_telemetry(wait)
154+
request_wait_tracker = get_request_wait_tracker()
155+
if wait and telemetry_id:
156+
request_wait_tracker.register_request(telemetry_id)
153157
watch_manager = self._get_watch_manager()
154158
watch_enabled = bool(
155159
watch_manager and to and not skip_watch_management and watch_interval > 0
@@ -194,11 +198,19 @@ async def add_resource(
194198
)
195199

196200
if wait:
197-
qm = get_queue_manager()
198201
wait_start = time.perf_counter()
199202
try:
200203
with telemetry.measure("resource.wait"):
201-
status = await qm.wait_complete(timeout=timeout)
204+
if telemetry_id:
205+
await request_wait_tracker.wait_for_request(
206+
telemetry_id, timeout=timeout
207+
)
208+
status = request_wait_tracker.build_queue_status(telemetry_id)
209+
else:
210+
qm = get_queue_manager()
211+
status = build_queue_status_payload(
212+
await qm.wait_complete(timeout=timeout)
213+
)
202214
except TimeoutError as exc:
203215
telemetry.set_error(
204216
"resource_service.wait_complete",
@@ -207,7 +219,7 @@ async def add_resource(
207219
)
208220
raise DeadlineExceededError("queue processing", timeout) from exc
209221
queue_wait_duration_ms = round((time.perf_counter() - wait_start) * 1000, 3)
210-
result["queue_status"] = build_queue_status_payload(status)
222+
result["queue_status"] = status
211223
record_resource_wait_metrics(
212224
telemetry_id=telemetry_id,
213225
queue_status=status,
@@ -257,6 +269,7 @@ async def add_resource(
257269
"resource.request.duration_ms",
258270
round((time.perf_counter() - request_start) * 1000, 3),
259271
)
272+
get_request_wait_tracker().cleanup(telemetry_id)
260273
unregister_wait_telemetry(telemetry_id)
261274

262275
async def _handle_watch_task_creation(
@@ -392,33 +405,44 @@ async def add_skill(
392405
Processing result
393406
"""
394407
self._ensure_initialized()
408+
telemetry_id = get_current_telemetry().telemetry_id
409+
request_wait_tracker = get_request_wait_tracker()
410+
if wait and telemetry_id:
411+
request_wait_tracker.register_request(telemetry_id)
395412

396-
result = await self._skill_processor.process_skill(
397-
data=data,
398-
viking_fs=self._viking_fs,
399-
ctx=ctx,
400-
allow_local_path_resolution=allow_local_path_resolution,
401-
)
413+
try:
414+
result = await self._skill_processor.process_skill(
415+
data=data,
416+
viking_fs=self._viking_fs,
417+
ctx=ctx,
418+
allow_local_path_resolution=allow_local_path_resolution,
419+
)
402420

403-
if wait:
404-
qm = get_queue_manager()
405-
wait_start = time.perf_counter()
406-
try:
407-
status = await qm.wait_complete(timeout=timeout)
408-
except TimeoutError as exc:
409-
get_current_telemetry().set_error(
410-
"resource_service.wait_complete",
411-
"DEADLINE_EXCEEDED",
412-
str(exc),
421+
if wait:
422+
wait_start = time.perf_counter()
423+
try:
424+
if telemetry_id:
425+
await request_wait_tracker.wait_for_request(telemetry_id, timeout=timeout)
426+
status = request_wait_tracker.build_queue_status(telemetry_id)
427+
else:
428+
qm = get_queue_manager()
429+
status = build_queue_status_payload(await qm.wait_complete(timeout=timeout))
430+
except TimeoutError as exc:
431+
get_current_telemetry().set_error(
432+
"resource_service.wait_complete",
433+
"DEADLINE_EXCEEDED",
434+
str(exc),
435+
)
436+
raise DeadlineExceededError("queue processing", timeout) from exc
437+
get_current_telemetry().set(
438+
"queue.wait.duration_ms",
439+
round((time.perf_counter() - wait_start) * 1000, 3),
413440
)
414-
raise DeadlineExceededError("queue processing", timeout) from exc
415-
get_current_telemetry().set(
416-
"queue.wait.duration_ms",
417-
round((time.perf_counter() - wait_start) * 1000, 3),
418-
)
419-
result["queue_status"] = build_queue_status_payload(status)
441+
result["queue_status"] = status
420442

421-
return result
443+
return result
444+
finally:
445+
request_wait_tracker.cleanup(telemetry_id)
422446

423447
async def build_index(
424448
self, resource_uris: List[str], ctx: RequestContext, **kwargs

openviking/storage/collection_schemas.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from openviking.storage.queuefs.named_queue import DequeueHandlerBase
2323
from openviking.storage.viking_vector_index_backend import VikingVectorIndexBackend
2424
from openviking.telemetry import bind_telemetry, resolve_telemetry
25+
from openviking.telemetry.request_wait_tracker import get_request_wait_tracker
2526
from openviking.utils.circuit_breaker import (
2627
CircuitBreaker,
2728
CircuitBreakerOpen,
@@ -229,6 +230,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
229230
collector = None
230231
report_success = False
231232
report_error_args: Optional[tuple[str, Optional[Dict[str, Any]]]] = None
233+
request_failed_message: Optional[str] = None
232234
try:
233235
queue_data = json.loads(data["data"])
234236
# Parse EmbeddingMsg from data
@@ -241,13 +243,15 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
241243
if self._vikingdb.is_closing:
242244
logger.debug("Skip embedding dequeue during shutdown")
243245
self._merge_request_stats(embedding_msg.telemetry_id, processed=1)
246+
self._record_request_success(embedding_msg)
244247
report_success = True
245248
return None
246249

247250
# Only process string messages
248251
if not isinstance(embedding_msg.message, str):
249252
logger.debug(f"Skipping non-string message type: {type(embedding_msg.message)}")
250253
self._merge_request_stats(embedding_msg.telemetry_id, processed=1)
254+
self._record_request_success(embedding_msg)
251255
report_success = True
252256
return data
253257

@@ -266,6 +270,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
266270
report_success = True
267271
return None
268272
# No queue manager — cannot re-enqueue, drop with error
273+
request_failed_message = "Circuit breaker open and no queue manager"
269274
report_error_args = ("Circuit breaker open and no queue manager", data)
270275
return None
271276

@@ -306,6 +311,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
306311
logger.critical(error_msg)
307312
self._circuit_breaker.record_failure(embed_err)
308313
self._merge_request_stats(embedding_msg.telemetry_id, error_count=1)
314+
request_failed_message = error_msg
309315
report_error_args = (error_msg, data)
310316
return None
311317

@@ -324,6 +330,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
324330
logger.error(f"Failed to re-enqueue message: {requeue_err}")
325331

326332
self._merge_request_stats(embedding_msg.telemetry_id, error_count=1)
333+
request_failed_message = error_msg
327334
report_error_args = (error_msg, data)
328335
return None
329336

@@ -335,6 +342,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
335342
error_msg = f"Dense vector dimension mismatch: expected {self._vector_dim}, got {len(result.dense_vector)}"
336343
logger.error(error_msg)
337344
self._merge_request_stats(embedding_msg.telemetry_id, error_count=1)
345+
request_failed_message = error_msg
338346
report_error_args = (error_msg, data)
339347
return None
340348

@@ -348,6 +356,7 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
348356
error_msg = "Embedder not initialized, skipping vector generation"
349357
logger.warning(error_msg)
350358
self._merge_request_stats(embedding_msg.telemetry_id, error_count=1)
359+
request_failed_message = error_msg
351360
report_error_args = (error_msg, data)
352361
return None
353362

@@ -377,27 +386,32 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
377386
if self._vikingdb.is_closing:
378387
logger.debug(f"Skip embedding write during shutdown: {db_err}")
379388
self._merge_request_stats(embedding_msg.telemetry_id, processed=1)
389+
self._record_request_success(embedding_msg)
380390
report_success = True
381391
return None
382392
logger.error(f"Failed to write to vector database: {db_err}")
383393
self._merge_request_stats(embedding_msg.telemetry_id, error_count=1)
394+
request_failed_message = str(db_err)
384395
report_error_args = (str(db_err), data)
385396
return None
386397
except Exception as db_err:
387398
if self._vikingdb.is_closing:
388399
logger.debug(f"Skip embedding write during shutdown: {db_err}")
389400
self._merge_request_stats(embedding_msg.telemetry_id, processed=1)
401+
self._record_request_success(embedding_msg)
390402
report_success = True
391403
return None
392404
logger.error(f"Failed to write to vector database: {db_err}")
393405
import traceback
394406

395407
traceback.print_exc()
396408
self._merge_request_stats(embedding_msg.telemetry_id, error_count=1)
409+
request_failed_message = str(db_err)
397410
report_error_args = (str(db_err), data)
398411
return None
399412

400413
self._merge_request_stats(embedding_msg.telemetry_id, processed=1)
414+
self._record_request_success(embedding_msg)
401415
report_success = True
402416
self._circuit_breaker.record_success()
403417
return inserted_data
@@ -409,9 +423,12 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
409423
traceback.print_exc()
410424
if embedding_msg is not None:
411425
self._merge_request_stats(embedding_msg.telemetry_id, error_count=1)
426+
request_failed_message = str(e)
412427
report_error_args = (str(e), data)
413428
return None
414429
finally:
430+
if embedding_msg is not None and request_failed_message is not None:
431+
self._record_request_failure(embedding_msg, request_failed_message)
415432
if embedding_msg and embedding_msg.semantic_msg_id:
416433
from openviking.storage.queuefs.embedding_tracker import EmbeddingTaskTracker
417434

@@ -424,3 +441,19 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
424441
self.report_error(*report_error_args)
425442
elif report_success:
426443
self.report_success()
444+
445+
@staticmethod
def _record_request_success(embedding_msg: EmbeddingMsg) -> None:
    """Report a successfully handled embedding message to the request wait tracker.

    Messages without a ``semantic_msg_id`` are marked done individually,
    keyed by the message's ``telemetry_id`` and ``id``. Messages that carry
    a ``semantic_msg_id`` are only counted as processed for their
    ``telemetry_id``.

    Args:
        embedding_msg: The embedding message that finished processing.
    """
    wait_tracker = get_request_wait_tracker()
    if not embedding_msg.semantic_msg_id:
        # No semantic message attached: complete this embedding by its own id.
        wait_tracker.mark_embedding_done(embedding_msg.telemetry_id, embedding_msg.id)
        return
    wait_tracker.record_embedding_processed(embedding_msg.telemetry_id)
452+
453+
@staticmethod
def _record_request_failure(embedding_msg: EmbeddingMsg, message: str) -> None:
    """Report a failed embedding message to the request wait tracker.

    Messages without a ``semantic_msg_id`` are marked failed individually,
    keyed by the message's ``telemetry_id`` and ``id``. Messages that carry
    a ``semantic_msg_id`` only have the error recorded against their
    ``telemetry_id``.

    Args:
        embedding_msg: The embedding message whose processing failed.
        message: Human-readable description of the failure.
    """
    wait_tracker = get_request_wait_tracker()
    if not embedding_msg.semantic_msg_id:
        # No semantic message attached: fail this embedding by its own id.
        wait_tracker.mark_embedding_failed(
            embedding_msg.telemetry_id, embedding_msg.id, message
        )
        return
    wait_tracker.record_embedding_error(embedding_msg.telemetry_id, message)

0 commit comments

Comments
 (0)