Skip to content

Implement request-scoped wait for write APIs#1212

Open
zhoujh01 wants to merge 1 commit intomainfrom
feat/content-write
Open

Implement request-scoped wait for write APIs#1212
zhoujh01 wants to merge 1 commit intomainfrom
feat/content-write

Conversation

@zhoujh01
Copy link
Copy Markdown
Collaborator

@zhoujh01 zhoujh01 commented Apr 3, 2026

将写接口 wait 改为请求级完成

Summary

把 add-resource、add-skill、content/write 的 wait=True 从“等待全局队列清空”改成“等待当前
请求自己的异步处理链完成”。本次改造同时覆盖:

  • HTTP 接口
  • local SDK
  • async/sync SDK 对 local client 的封装调用

外部 API 不变,wait_processed() 继续保留全局等待语义。响应里继续返回 queue_status,但其语
义从“全局队列状态”改成“当前请求聚合状态”。

实现策略是最大化复用现有机制:

  • telemetry_id 作为请求归属键
  • EmbeddingTaskTracker 的归零回调机制
  • semantic / embedding 队列消息已有的 telemetry_id 透传链
  • telemetry summary 继续走现有统计逻辑

Key Changes

1. 新增 RequestWaitTracker

新增一个最小版请求级等待跟踪器,职责只有两件事:

  • 等待某个请求完成
  • 生成该请求的 queue_status

内部按 telemetry_id 维护:

  • pending_semantic_roots: set[str]
  • pending_embedding_roots: set[str]
  • semantic.processed
  • semantic.error_count
  • semantic.errors[]
  • embedding.processed
  • embedding.error_count
  • embedding.errors[]
  • 跨线程完成信号

最小接口:

  • register_request(telemetry_id)
  • register_semantic_root(telemetry_id, semantic_msg_id)
  • register_embedding_root(telemetry_id, root_id)
  • mark_semantic_done(telemetry_id, semantic_msg_id, processed_delta=1)
  • mark_semantic_failed(telemetry_id, semantic_msg_id, message)
  • mark_embedding_done(telemetry_id, root_id, processed_delta=1)
  • mark_embedding_failed(telemetry_id, root_id, message)
  • wait_for_request(telemetry_id, timeout)
  • build_queue_status(telemetry_id)
  • cleanup(telemetry_id)

输出格式保持与现有兼容:

{
"Semantic": {
"processed": 1,
"error_count": 0,
"errors": []
},
"Embedding": {
"processed": 3,
"error_count": 0,
"errors": []
}
}

2. 范围明确覆盖 HTTP 和 local SDK

当前两条公开调用链都会创建启用态 OperationTelemetry,因此都有可用 telemetry_id:

  • HTTP router 通过 run_operation()
  • local client 通过 run_with_telemetry()

因此本次范围明确包括:

  • /api/v1/resources
  • /api/v1/skills
  • /api/v1/content/write
  • LocalClient.add_resource()
  • LocalClient.add_skill()
  • LocalClient.write()
  • 上层 AsyncOpenViking 和 sync wrapper 的对应方法

本次不新增 request id / task id。

3. add_resource(wait=True) 改造

文件:

  • openviking/service/resource_service.py
  • openviking/utils/resource_processor.py

实现:

  • wait=True 时先 register_request(telemetry_id)
  • 资源导入最终 enqueue 根 SemanticMsg 时,调用 register_semantic_root(telemetry_id,
    msg.id)
  • 原来的 queue_manager.wait_complete(timeout=...) 替换为
    request_wait_tracker.wait_for_request(telemetry_id, timeout)
  • 完成后用 build_queue_status(telemetry_id) 写入 result["queue_status"]
  • 在 finally 中清理 request tracker 状态

结果:

  • add_resource(wait=True) 只等本次导入对应的 semantic root 完成
  • 不再被其他请求的 semantic / embedding 队列拖住

4. semantic 完成链复用现有 EmbeddingTaskTracker

文件:

  • openviking/storage/queuefs/semantic_dag.py
  • openviking/storage/queuefs/semantic_processor.py
  • openviking/storage/queuefs/embedding_tracker.py

当前已有:

  • semantic DAG 会把 embedding 子任务数注册到 EmbeddingTaskTracker
  • embedding 全部完成后,通过 on_complete 做 semantic 收尾

本次在这个现成链路末尾补请求级通知:

  • semantic 成功且无 embedding 子任务:
    • mark_semantic_done(...)
  • semantic 成功且有 embedding 子任务:
    • 在 EmbeddingTaskTracker.on_complete 回调末尾补 mark_semantic_done(...)
  • semantic 终态失败:
    • mark_semantic_failed(...)

不改变 EmbeddingTaskTracker 的原职责,它仍然只负责一个 semantic_msg_id 下 embedding 子任
务的归零与回调。

5. content/write(wait=True) 改造

文件:

  • openviking/storage/content_write.py
  • openviking/service/fs_service.py

实现:

  • enqueue semantic refresh 时,使用当前 telemetry_id 并
    register_semantic_root(telemetry_id, msg.id)
  • _wait_for_queues() 替换为 wait_for_request(telemetry_id, timeout)
  • 返回前继续附带请求级 queue_status

注意 memory 分支:

  • 先同步执行 _vectorize_single_file()
  • 再 enqueue memory refresh 的 SemanticMsg
  • wait=True 只等待这个 refresh root 结束
  • 不为同步那一步额外创建 root task

6. add_skill(wait=True) 按单 embedding root 实现

文件:

  • openviking/utils/skill_processor.py
  • openviking/service/resource_service.py
  • openviking/storage/collection_schemas.py

现状:

  • add_skill 不走 semantic queue
  • 它只 enqueue 一个 EmbeddingMsg

本次最小实现:

  • wait=True 时 register_request(telemetry_id)
  • _index_skill() enqueue 该 EmbeddingMsg 前,注册一个 embedding root
  • TextEmbeddingHandler 成功处理该消息后:
    • mark_embedding_done(...)
  • 终态失败时:
    • mark_embedding_failed(...)

不抽象通用 embedding batch。以后如果 skill 变成多条 embedding message,再扩展。

7. queue_status 与 telemetry summary 分离

文件:

  • openviking/telemetry/resource_summary.py

原则:

  • queue_status 由 RequestWaitTracker 生成
  • telemetry summary 继续由现有 consume_request_stats() / consume_dag_stats() 生成
  • 两边不共享同一份 destructive source

这样可以保证:

  • queue_status 继续返回,兼容现有调用方
  • telemetry summary 不会因为先读取 queue_status 而丢数据
  • 两边的计数都按同一个 telemetry_id 归属

8. re-enqueue 状态机

请求级等待必须正确处理 retry / re-enqueue:

  • re-enqueue 不算 done
  • re-enqueue 不算 failed
  • root 继续保留在 pending 集合
  • 只有终态成功或终态失败,才能从 pending 集合移除

这条规则适用于:

  • semantic queue
  • embedding queue

因此:

  • circuit breaker / retry 分支不能调用 mark_*_done/failed
  • 只在终态分支发请求级状态通知

9. queue_status 成本控制

本次继续返回 queue_status,但不再通过全局队列状态生成它,也不让它成为主要成本来源。

做法:

  • wait 的主要成本只来自“等待请求级 root 完成”
  • queue_status 本身只由 tracker 内存状态组装,O(当前请求错误数)
  • 不做额外全局 queue status 拉取
  • 不再调用全局 check_status() 作为返回值来源

10. 保持全局等待接口不变

文件:

  • openviking/service/resource_service.py
  • openviking/server/routers/system.py
  • openviking/client/local.py

不改:

  • wait_processed()
  • /api/v1/system/wait

它们继续调用 QueueManager.wait_complete(),保持全局 drain 语义。

Test Plan

HTTP 路径

  1. POST /api/v1/resources with wait=true
  • 当前请求完成后立即返回,不等待别的请求
  1. POST /api/v1/content/write with wait=true
  • 当前 refresh root 完成后返回
  1. POST /api/v1/skills with wait=true
  • 当前 skill 的 embedding 完成后返回

local SDK 路径

  1. LocalClient.add_resource(wait=True)
  • 行为与 HTTP 一致
  1. LocalClient.write(wait=True)
  • 行为与 HTTP 一致
  1. LocalClient.add_skill(wait=True)
  • 行为与 HTTP 一致

上层 SDK 路径

  1. AsyncOpenViking.add_resource/write/add_skill(wait=True)
  • 与 local client 一致
  1. sync wrapper 对应方法
  • 与 async/local 一致

状态机与错误

  1. semantic root 无 embedding
  • semantic 完成即请求完成
  1. semantic root 有 embedding
  • 必须等 embedding 全部完成才请求完成
  1. re-enqueue 场景
  • 请求保持 pending,不提前返回
  1. semantic 终态失败
  • queue_status["Semantic"]["error_count"] > 0
  • errors[] 包含错误消息
  1. embedding 终态失败
  • queue_status["Embedding"]["error_count"] > 0
  • errors[] 包含错误消息

回归

  1. queue_status 结构兼容
  • 仍有 processed/error_count/errors
  1. telemetry summary 回归
  • resources.add_resource
  • resources.add_skill
  • content.write
    summary 继续正确输出请求级统计

优先修改/新增测试:

  • tests/server/test_api_resources.py
  • tests/server/test_api_content_write.py
  • tests/client/test_resource_management.py
  • tests/server/test_content_write_service.py

建议新增:

  • local SDK 请求级等待测试
  • 请求级并发隔离测试
  • re-enqueue 状态机测试

Assumptions

  • 本次范围明确包含 local SDK,不仅是 HTTP
  • 公开写接口都经过 telemetry wrapper,因此能拿到可用 telemetry_id
  • 不新增外部 task_id 协议
  • add_skill 本次按单 embedding root 落地,不做通用 batch 抽象
  • queue_status 继续保留,但其语义改为请求级聚合状态

fix: request wait telemetry id

fix: register request wait before enqueue

add log
@github-actions
Copy link
Copy Markdown

github-actions bot commented Apr 3, 2026

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🏅 Score: 92
🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ No major issues detected

@github-actions
Copy link
Copy Markdown

github-actions bot commented Apr 3, 2026

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Use register_wait_telemetry in add_skill

Use register_wait_telemetry(wait) to get telemetry_id in add_skill, mirroring
add_resource. This ensures the telemetry handle is registered for async queue
consumers when enabled. Also, add unregister_wait_telemetry(telemetry_id) to the
finally block to clean up.

openviking/service/resource_service.py [407-445]

 self._ensure_initialized()
-telemetry_id = get_current_telemetry().telemetry_id
+telemetry_id = register_wait_telemetry(wait)
 request_wait_tracker = get_request_wait_tracker()
 if wait and telemetry_id:
     request_wait_tracker.register_request(telemetry_id)
 
 try:
     result = await self._skill_processor.process_skill(
         data=data,
         viking_fs=self._viking_fs,
         ctx=ctx,
         allow_local_path_resolution=allow_local_path_resolution,
     )
 
     if wait:
         wait_start = time.perf_counter()
         try:
             if telemetry_id:
                 await request_wait_tracker.wait_for_request(telemetry_id, timeout=timeout)
                 status = request_wait_tracker.build_queue_status(telemetry_id)
             else:
                 qm = get_queue_manager()
                 status = build_queue_status_payload(await qm.wait_complete(timeout=timeout))
         except TimeoutError as exc:
             get_current_telemetry().set_error(
                 "resource_service.wait_complete",
                 "DEADLINE_EXCEEDED",
                 str(exc),
             )
             raise DeadlineExceededError("queue processing", timeout) from exc
         get_current_telemetry().set(
             "queue.wait.duration_ms",
             round((time.perf_counter() - wait_start) * 1000, 3),
         )
         result["queue_status"] = status
 
     return result
 finally:
     request_wait_tracker.cleanup(telemetry_id)
+    unregister_wait_telemetry(telemetry_id)
Suggestion importance[1-10]: 6

__

Why: The suggestion aligns add_skill with add_resource by using register_wait_telemetry (to register telemetry for async queue consumers) and unregister_wait_telemetry for cleanup, improving consistency and correctness.

Low

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: Backlog

Development

Successfully merging this pull request may close these issues.

2 participants