Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,083 changes: 282 additions & 1,801 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ members = [
"pegainfer-qwen3-4b",
"pegainfer-qwen35-4b",
"pegainfer-kv-cache",
"pegainfer-kv-offload",
# ---- pegainfer-comm (EP all-to-all) ----
"pegainfer-comm",
"pegainfer-comm/crates/pegainfer-comm-build-utils",
Expand All @@ -40,12 +41,9 @@ members = [
"kvbm/dynamo-kv-hashing",
"kvbm/dynamo-kv-router",
"kvbm/kvbm-common",
"kvbm/kvbm-config",
"kvbm/kvbm-consolidator",
"kvbm/kvbm-engine",
"kvbm/kvbm-kernels",
"kvbm/kvbm-logical",
"kvbm/kvbm-physical",
]

# Inherited by dynamo-ported crates that use `edition.workspace = true` etc.
Expand All @@ -66,12 +64,9 @@ dynamo-memory = { path = "kvbm/dynamo-memory" }
dynamo-kv-hashing = { path = "kvbm/dynamo-kv-hashing" }
dynamo-kv-router = { path = "kvbm/dynamo-kv-router", features = ["metrics"] }
kvbm-common = { path = "kvbm/kvbm-common" }
kvbm-config = { path = "kvbm/kvbm-config" }
kvbm-consolidator = { path = "kvbm/kvbm-consolidator" }
kvbm-engine = { path = "kvbm/kvbm-engine" }
kvbm-kernels = { path = "kvbm/kvbm-kernels" }
kvbm-logical = { path = "kvbm/kvbm-logical" }
kvbm-physical = { path = "kvbm/kvbm-physical" }
# ---- third-party ----
anyhow = "1.0"
async-nats = { version = "0.45.0", features = ["service"] }
Expand All @@ -96,6 +91,11 @@ cudarc = { version = "0.19.7", features = [
"cublas",
"f16",
"nccl",
# nvrtc: embedded pegaflow-core's transfer/kernel.rs references the nvrtc
# bindings unconditionally (its KernelBackend JIT-compiles the copy kernel).
# Lazy per-symbol loading (0.19.5+) keeps this off the runtime driver floor,
# so it stays compatible with the cuda-12090 binding level (issue #263).
"nvrtc",
] }
cxx = "1.0.187"
cxx-build = "1.0.187"
Expand Down Expand Up @@ -129,6 +129,7 @@ parking_lot = "0.12.5"
pegainfer-bench = { path = "pegainfer-bench" }
pegainfer-core = { path = "pegainfer-core" }
pegainfer-kv-cache = { path = "pegainfer-kv-cache" }
pegainfer-kv-offload = { path = "pegainfer-kv-offload" }
pegainfer-cupti = { path = "pegainfer-cupti" }
pegainfer-deepseek-v4 = { path = "pegainfer-deepseek-v4" }
pegainfer-engine = { path = "pegainfer-engine" }
Expand Down
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ Organized by domain (model line / subsystem / playbook / lesson) instead of by l
| --- | --- |
| `subsystems/runtime/runtime.md` | Runtime complexity is controlled by a shared `pegainfer-core` that owns the generation contract and orchestration; per-model crates implement `ModelForward` so prefill/decode and hybrid attention stay hidden from the caller. State (`&mut`) is separated from weights (`&self`) for future bs > 1. |
| `subsystems/runtime/kv-cache-design.md` | Dynamo 式 logical/physical 分层 KV cache:BlockManager 管 block 生命周期和 admission,PhysicalBackend trait 管 GPU 内存和布局(FullAttention / MLA)。支持 TP / DP。基于 vLLM/Dynamo/pegaflow 调研。 |
| `subsystems/runtime/pegaflow-offload-integration.md` | 把 `pegaflow-core` 当进程内 Rust 库做 KV 卸载物理后端(HBM→DRAM/SSD/RDMA),补 kvbm 没写的卸载层。**Qwen3-4B full-attn 首发,端到端已在真实 GPU 跑通并验证**(async SAVE+LOAD 接进 executor/scheduler,纯 CPU-hit 与 GPU+CPU 组合 hit 恢复后 logits 与冷算一致)。pegaflow 经 git rev pin(#331+#333)。默认关,未接 server CLI。linear 排除,sparse 暂缓。 |

## subsystems / scheduler

Expand Down
157 changes: 157 additions & 0 deletions docs/subsystems/runtime/pegaflow-offload-integration.md

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions kvbm/kvbm-logical/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ impl<T: BlockMetadata + Sync> BlockManager<T> {
Ok(())
}

/// Evict every cached-but-unused block: drain the inactive pool back to the
/// reset pool. Active blocks (held by a request or an external strong ref,
/// e.g. a leaked padding reservation) are untouched. Unlike
/// [`reset_inactive_pool`](Self::reset_inactive_pool) this makes no
/// assertion about the resulting free count, so it is safe to call on a
/// pool that still has pinned blocks — a cold-cache flush, not a reset.
pub fn evict_inactive(&self) {
drop(self.store.drain_inactive_to_mutable());
}

/// Register a batch of completed blocks.
pub fn register_blocks(&self, blocks: Vec<CompleteBlock<T>>) -> Vec<ImmutableBlock<T>> {
blocks
Expand Down
14 changes: 13 additions & 1 deletion pegainfer-kv-cache/src/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use cudarc::driver::{CudaSlice, CudaStream};
use cudarc::driver::{CudaSlice, CudaStream, DevicePtr};
use half::bf16;

use crate::KvLayout;
Expand Down Expand Up @@ -51,6 +51,18 @@ impl KvBuffer {
&self.inner.buffer
}

/// Base device address of the fused KV buffer.
///
/// Stable for the buffer's lifetime — cudarc allocations don't move — so
/// the KV-offload connector registers this once with pegaflow and the
/// page-first [`KvLayout`] strides reach every (layer, block, K/V) segment
/// from it. The returned address outlives the transient stream-ordering
/// guard precisely because the `Arc<Inner>` keeps the slice alive.
pub fn device_ptr(&self, stream: &CudaStream) -> u64 {
let (ptr, _guard) = self.inner.buffer.device_ptr(stream);
ptr
}

pub fn num_blocks(&self) -> usize {
self.inner.num_blocks
}
Expand Down
2 changes: 1 addition & 1 deletion pegainfer-kv-cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod view;
pub use buffer::KvBuffer;
pub use layout::KvLayout;
pub use manager::KvCacheManager;
pub use pool::{BlockPool, RequestKv};
pub use pool::{BlockPool, KvBlockGuard, LoadReservation, PrefixProbe, RequestKv};
pub use view::{KvView, KvViewDesc};

pub use kvbm_logical;
235 changes: 235 additions & 0 deletions pegainfer-kv-cache/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use kvbm_logical::SequenceHash;
use kvbm_logical::blocks::{ImmutableBlock, MutableBlock};
use kvbm_logical::integrations::{DecodeOutcome, SchedulableSequence, ScheduleError};
use kvbm_logical::manager::BlockManager;
use kvbm_logical::pools::BlockDuplicationPolicy;
Expand Down Expand Up @@ -78,6 +79,14 @@ impl BlockPool {
self.block_manager.total_blocks().saturating_sub(1)
}

/// Evict every cached-but-unused block from the GPU prefix cache (drain the
/// inactive pool). In-use blocks are untouched. A cold-cache flush — and,
/// for the offload path, the way to force a prefix out of HBM so a
/// subsequent request must restore it from the CPU tier.
pub fn evict_inactive(&self) {
self.block_manager.evict_inactive();
}

/// `lora_name` scopes the prefix cache: blocks registered under one
/// adapter (or the base model, `None`) never match a request running
/// under a different adapter — the name is folded into the block-hash
Expand All @@ -100,6 +109,139 @@ impl BlockPool {
);
RequestKv { seq }
}

// ── KV-offload prefetch (CPU-tier load before prefill) ─────────────

/// Resolve `prompt_tokens` against the GPU prefix cache *without* creating
/// a request, returning a [`PrefixProbe`] that holds the GPU-hit prefix
/// blocks alive so an async CPU-tier load can extend it. The connector
/// queries the probe's [`PrefixProbe::cpu_query_hashes`] against the host
/// tier, then [`reserve_loaded_blocks`](Self::reserve_loaded_blocks) +
/// load + [`commit_loaded_blocks`](Self::commit_loaded_blocks).
///
/// `lora_name` must match the request's adapter — it salts the block
/// hashes, so probing under the wrong adapter would query unrelated keys.
pub fn probe_prefix(&self, prompt_tokens: Vec<u32>, lora_name: Option<&str>) -> PrefixProbe {
let num_input = prompt_tokens.len();
let rkv = self.new_request(prompt_tokens, 0, lora_name);
let seq_hashes = rkv.seq.inner().sequence().all_sequence_hashes();
// match_and_add_prefix leaves >=1 prompt token uncached, so a request
// can reuse at most this many leading blocks — the CPU load must not
// exceed it, or the trailing loaded block would never be re-matched.
let cacheable = num_input.saturating_sub(1) / self.block_size;
let gpu_guard = self.block_manager.match_blocks(&seq_hashes);
let gpu_hit = gpu_guard.len();
PrefixProbe {
seq_hashes,
gpu_hit,
cacheable,
held: gpu_guard,
}
}

/// Reserve `count` mutable blocks as the GPU destinations for a CPU→GPU
/// load. Returns `None` under block pressure (the caller then skips the
/// prefetch and prefills from scratch). The reservation's
/// [`LoadReservation::page_ids`] feed the connector's load; on completion
/// hand it to [`commit_loaded_blocks`](Self::commit_loaded_blocks).
pub fn reserve_loaded_blocks(&self, count: usize) -> Option<LoadReservation> {
let blocks = self.block_manager.allocate_blocks(count)?;
Some(LoadReservation { blocks })
}

/// Stage + register the freshly-loaded blocks under the probe's
/// continuation hashes (`seq_hashes[gpu_hit .. gpu_hit + reserved]`) and
/// fold them into the probe's held set, so a following
/// `new_request().match_and_add_prefix()` reuses the full GPU+CPU prefix.
///
/// The probe keeps holding every registered block until the request
/// prefills, closing the eviction window between registration and re-match.
pub fn commit_loaded_blocks(&self, probe: &mut PrefixProbe, reservation: LoadReservation) {
let start = probe.gpu_hit;
for (i, block) in reservation.blocks.into_iter().enumerate() {
let hash = probe.seq_hashes[start + i];
let complete = block
.stage(hash, self.block_size)
.expect("loaded block stage: block_size invariant violated");
probe.held.push(self.block_manager.register_block(complete));
}
}
}

/// A prompt's prefix resolved against the GPU cache, ready to drive a CPU-tier
/// prefetch. Holds every GPU-hit (and, after commit, CPU-loaded) block so they
/// can't be evicted while the load is in flight and before the request prefills.
pub struct PrefixProbe {
/// Content hashes of every complete prompt block, in order (native form).
seq_hashes: Vec<SequenceHash>,
/// Length of the contiguous GPU-resident prefix.
gpu_hit: usize,
/// Reuse cap: blocks past this are never matched (the final chunk forwards).
cacheable: usize,
/// Strong refs keeping matched/loaded blocks resident until prefill.
held: Vec<ImmutableBlock<()>>,
}

impl PrefixProbe {
/// Blocks already resident in GPU HBM (the existing prefix-cache hit).
pub fn gpu_hit_blocks(&self) -> usize {
self.gpu_hit
}

/// Total blocks this probe holds: the GPU-hit prefix plus any committed from
/// a CPU-tier load. They are already out of the free pool and become the
/// request's cached prefix at prefill, so admission credits them against the
/// request's block need (avoiding a double-count against `available_blocks`).
pub fn held_blocks(&self) -> usize {
self.held.len()
}

/// Content hashes to query the CPU tier with: the blocks past the GPU hit,
/// capped at the reuse boundary. Empty when the GPU hit already covers
/// every reusable block (nothing to load — prefill normally).
pub fn cpu_query_hashes(&self) -> Vec<Vec<u8>> {
if self.gpu_hit >= self.cacheable {
return Vec::new();
}
self.seq_hashes[self.gpu_hit..self.cacheable]
.iter()
.map(|h| sequence_hash_bytes(h).to_vec())
.collect()
}
}

/// An opaque strong pin on one registered KV block. While held it keeps the
/// block in the active pool (out of the free/inactive pools), so the physical
/// slot cannot be reallocated. Used to hold a block stable across an in-flight
/// async offload save; cheap to clone/drop (one `Arc` bump). See
/// [`RequestKv::assigned_block_guards`].
///
/// The inner guard is never read — it exists purely for its `Drop`, which
/// releases the pin. Holding the value *is* the contract.
pub struct KvBlockGuard(#[allow(dead_code)] ImmutableBlock<()>);

/// GPU destination blocks reserved for a CPU→GPU load, consumed by
/// [`BlockPool::commit_loaded_blocks`] once the DMA lands.
pub struct LoadReservation {
blocks: Vec<MutableBlock<()>>,
}

impl LoadReservation {
/// Physical page ids the connector loads the leased CPU blocks into, in
/// lease order (the i-th leased block lands in `page_ids()[i]`).
pub fn page_ids(&self) -> Vec<i32> {
self.blocks.iter().map(|b| b.block_id() as i32).collect()
}

/// Number of reserved destination blocks.
pub fn len(&self) -> usize {
self.blocks.len()
}

/// True when no destinations were reserved.
pub fn is_empty(&self) -> bool {
self.blocks.is_empty()
}
}

/// Per-request KV state wrapping `SchedulableSequence`.
Expand Down Expand Up @@ -229,12 +371,105 @@ impl RequestKv {
pages.truncate(kv_tokens.div_ceil(self.seq.block_size()));
pages
}

// ── KV offload bridge ──────────────────────────────────────────────

/// Content hashes of every *full* prompt block, in prompt order.
///
/// These are the keys the KV-offload connector queries the CPU tier with,
/// so they must be identical across any two requests sharing a prefix.
/// They are kvbm's lineage-based [`SequenceHash`], which is exactly that:
/// position + content + parent fragment, so block `i` of prompt `P` hashes
/// the same no matter which request computed it.
pub fn prompt_block_hashes(&self) -> Vec<[u8; 16]> {
self.seq
.inner()
.sequence()
.all_sequence_hashes()
.iter()
.map(sequence_hash_bytes)
.collect()
}

/// `(page_id, content_hash)` for every block currently assigned to this
/// request, in prompt order. Drives the offload save once a block seals;
/// the first [`prefix_matched_blocks`](Self::prefix_matched_blocks) entries
/// are GPU-hit reuse (already resident) and are normally skipped.
pub fn assigned_block_hashes(&self) -> Vec<(i32, [u8; 16])> {
self.seq
.inner()
.assignments()
.assigned_iter()
.map(|(&id, block)| (id as i32, sequence_hash_bytes(&block.sequence_hash())))
.collect()
}

/// Strong pins for every block currently assigned to this request, aligned
/// 1:1 (same order) with [`assigned_block_hashes`](Self::assigned_block_hashes).
///
/// An offload save's GPU→CPU copy runs asynchronously after the save is
/// submitted; holding the matching [`KvBlockGuard`] keeps that block out of
/// the free/inactive pool until the copy lands, so a later request can't be
/// allocated the same slot and overwrite it mid-copy. Drop the guard once
/// the save reports done.
pub fn assigned_block_guards(&self) -> Vec<KvBlockGuard> {
self.seq
.inner()
.assignments()
.assigned_iter()
.map(|(_, block)| KvBlockGuard(block.clone()))
.collect()
}

/// Number of leading blocks reused from the GPU prefix cache.
pub fn prefix_matched_blocks(&self) -> usize {
self.seq.inner().prefix_matched_blocks()
}
}

/// Pack a kvbm [`SequenceHash`] (lineage hash) into the 16-byte content key the
/// offload tier addresses blocks by. Big-endian for a stable on-wire ordering.
fn sequence_hash_bytes(hash: &SequenceHash) -> [u8; 16] {
hash.as_u128().to_be_bytes()
}

#[cfg(test)]
mod tests {
use super::*;

/// The offload CPU-tier query keys are `prompt_block_hashes`. The whole
/// load path is built on these being identical for any two requests that
/// share a prefix (and diverging the moment content does) — otherwise a
/// warm block saved by one request would never match the next. Guard it.
#[test]
fn prompt_block_hashes_stable_across_shared_prefix() {
let pool = BlockPool::new(16, 256).unwrap();
let shared: Vec<u32> = (0..48).map(|i| 1000 + i).collect(); // 3 full blocks
let mut a_tokens = shared.clone();
a_tokens.extend((0..16).map(|i| 7000 + i)); // 4th block diverges
let mut b_tokens = shared.clone();
b_tokens.extend((0..16).map(|i| 9000 + i));

let a = pool.new_request(a_tokens, 8, None);
let b = pool.new_request(b_tokens, 8, None);
let ha = a.prompt_block_hashes();
let hb = b.prompt_block_hashes();

assert_eq!(ha.len(), 4, "64 tokens / 16 = 4 full blocks");
assert_eq!(hb.len(), 4);
assert_eq!(ha[..3], hb[..3], "shared prefix must hash identically");
assert_ne!(ha[3], hb[3], "divergent block must hash differently");
assert!(ha.iter().all(|h| *h != [0u8; 16]), "hashes are non-trivial");

// A different LoRA salt must poison the match — same tokens, new keys.
let c = pool.new_request(shared, 8, Some("adapter-x"));
assert_ne!(
c.prompt_block_hashes()[0],
ha[0],
"salt (lora) must scope the prefix cache"
);
}

/// kvbm's `schedule_decode` allocates the next generation block when the
/// appended token fills the current block (`need = pending + 1`), so the
/// raw `page_indices()` exceeds `ceil(kv_tokens / block_size)` at every
Expand Down
Loading