Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
9cc92f6
Draft
thorjohnsen Sep 3, 2025
28d6dd1
Code simplification
thorjohnsen Sep 9, 2025
8a06a28
Simplify eviction policy into true priority based eviction
thorjohnsen Sep 9, 2025
f30fa42
Remove personal info
thorjohnsen Sep 9, 2025
9d8c420
Remove superfluous method, add better comments
thorjohnsen Sep 9, 2025
6ceed6b
Fixes
thorjohnsen Sep 10, 2025
64f4504
Fixes
thorjohnsen Sep 10, 2025
5eaf9d5
Implement some lookup node methods
thorjohnsen Sep 10, 2025
800b05d
Move some method(s) to make PR diff easier to read
thorjohnsen Sep 10, 2025
21643e3
More fixes
thorjohnsen Sep 15, 2025
e0499cf
Faster and more flexible findNewContextBlock method
thorjohnsen Sep 23, 2025
4840a98
Fix last compile issue
thorjohnsen Sep 23, 2025
8647828
Bug fixes
thorjohnsen Sep 26, 2025
91445c5
Bug fixes
thorjohnsen Sep 26, 2025
76f317e
Turn debug printf into TLLM_LOG_DEBUG
thorjohnsen Oct 1, 2025
ca48af9
Bug fixes
thorjohnsen Oct 3, 2025
3399798
Add multi-node scanning for partial match
thorjohnsen Oct 8, 2025
78bcf4d
Bug fixes
thorjohnsen Oct 9, 2025
8836822
Add lookupBlocks method
thorjohnsen Oct 9, 2025
86ed0f8
Fix priority eviction, add lots of debug printouts
thorjohnsen Oct 10, 2025
42fe938
Fix unit test that relied on leaf block only eviction
thorjohnsen Oct 10, 2025
cef3334
Bug fixes and unit test adjustments
thorjohnsen Oct 14, 2025
ad2aa8f
Fix last unit test
thorjohnsen Oct 16, 2025
cfe0609
Remove superfluous arguments
thorjohnsen Oct 16, 2025
02abe39
Remove superfluous member variable
thorjohnsen Oct 17, 2025
f025cfb
Split setLookupNode into two methods to improve readability of code
thorjohnsen Oct 17, 2025
735fe3c
Move stuff around to simplify diff
thorjohnsen Oct 17, 2025
a678b91
Resolve merge conflicts
thorjohnsen Oct 17, 2025
787b681
Resolve build issue
thorjohnsen Oct 18, 2025
50a6f3c
Merge remote-tracking branch 'upstream/main' into user/tjohnsen/restr…
thorjohnsen Oct 18, 2025
a487b55
Fix remaining merge issues
thorjohnsen Oct 21, 2025
78c6253
Merge remote-tracking branch 'upstream/main' into user/tjohnsen/restr…
thorjohnsen Oct 21, 2025
bb421b9
precommit run
thorjohnsen Oct 21, 2025
f3e5c13
Manual precommit fix
thorjohnsen Oct 21, 2025
d427948
Address issue identified by coderabbit
thorjohnsen Oct 21, 2025
d57c3b4
Bug fixes
thorjohnsen Oct 22, 2025
3f696f6
Bug fixes, more debug printouts
thorjohnsen Oct 24, 2025
c8177d3
Fix last unit test failures after merge with main
thorjohnsen Oct 24, 2025
3802864
precommit run
thorjohnsen Oct 24, 2025
e3a1921
Fix spelling errors
thorjohnsen Oct 24, 2025
24dfa89
Merge remote-tracking branch 'upstream/main' into user/tjohnsen/restr…
thorjohnsen Oct 24, 2025
61f9702
Fix race condition that has been in code for months
thorjohnsen Oct 27, 2025
984a4d2
precommit run
thorjohnsen Oct 27, 2025
23c28fb
Merge remote-tracking branch 'upstream/main' into user/tjohnsen/restr…
thorjohnsen Oct 27, 2025
1014a3e
Update unit test to reflect last bug fix
thorjohnsen Oct 27, 2025
a8ad970
Merge remote-tracking branch 'upstream/main' into user/tjohnsen/restr…
thorjohnsen Oct 27, 2025
39e81e1
Merge remote-tracking branch 'upstream/main' into user/tjohnsen/restr…
thorjohnsen Oct 27, 2025
64ef5f3
Bug fix
thorjohnsen Oct 27, 2025
fd364d9
Bug fix
thorjohnsen Oct 29, 2025
3f22026
Merge remote-tracking branch 'upstream/main' into user/tjohnsen/restr…
thorjohnsen Oct 29, 2025
9367bbe
Fix merge conflicts
thorjohnsen Oct 30, 2025
98c0d29
Merge remote-tracking branch 'upstream/main' into user/tjohnsen/restr…
thorjohnsen Nov 3, 2025
a63d0b6
Add refresh blocks
Tabrizian Nov 3, 2025
12da9ae
Merge remote-tracking branch 'upstream/main' into user/tjohnsen/restr…
thorjohnsen Nov 4, 2025
f2c1d9a
Fix transfer manager synchronization issues
thorjohnsen Nov 6, 2025
0ee0004
Fix some comments
thorjohnsen Nov 10, 2025
b98a2fd
precommit run
thorjohnsen Nov 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cpp/include/tensorrt_llm/batch_manager/kvCacheManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,9 @@ class WindowBlockManager
return mBufferManager;
}

//! \brief Sync internal streams used by transfer manager with buffer manager stream
void syncTransferManagerWithBufferManager();

//! \brief Perform per-request bookkeeping
void refreshBlocks();

Expand Down Expand Up @@ -1435,6 +1438,9 @@ class BlockManager
//! \brief Store newest block for reuse
void storeNewBlock(GenerationRequest& sequence, OptionalRef<LlmRequest const> llmRequest);

//! \brief Sync internal streams used by transfer manager with buffer manager stream
void syncTransferManagerWithBufferManager();

//! \brief Perform per-request bookkeeping
void refreshBlocks();

Expand Down Expand Up @@ -1667,6 +1673,7 @@ class BaseKVCacheManager
[[nodiscard]] virtual runtime::ITensor::SharedPtr getIndexerKCachePool() const = 0;
[[nodiscard]] virtual SizeType32 getPoolLayerIdx(SizeType32 layer_idx) const = 0;

virtual void syncTransferManagerWithBufferManager() = 0;
virtual void refreshBlocks() = 0;
virtual void flushIterationEvents() = 0;

Expand Down Expand Up @@ -2032,6 +2039,11 @@ class KVCacheManager : public BaseKVCacheManager
return mBlockManager.getPoolLayerIdx(layer_idx);
}

void syncTransferManagerWithBufferManager() override
{
mBlockManager.syncTransferManagerWithBufferManager();
}

//! \brief Perform per-iteration bookkeeping
void refreshBlocks() override
{
Expand Down
12 changes: 9 additions & 3 deletions cpp/include/tensorrt_llm/batch_manager/kvCacheTransferManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ class KVCacheTransferManager
int numTokensToCopy = 0, executor::KvCacheTransferMode mode = executor::KvCacheTransferMode::DRAM,
std::string const& directory = "");

//! \brief Synchronize the offload/onboard streams with the bufferManager stream.
//! \brief Synchronize internal streams with bufferManager stream.
//! \details The buffer manager uses the same stream as the prefill and decode kernels. This method ensures that the internal kernels used for offloading and onboarding will wait for prefill and decode kernels before performing any block copies. This method must be called before the first call to KVCacheManager::addSequence in every step.
void syncWithBufferManager();

//! \brief Synchronize bufferManager stream with internal streams. This method ensures that prefill and decode kernels for next step will wait for offloading and onboarding work that has already been scheduled. This method must be called after last call to KVCacheManager::addSequence in every step.
void syncTransfers();

private:
Expand Down Expand Up @@ -75,8 +79,10 @@ class KVCacheTransferManager
runtime::BufferManager mOnboardManager;
runtime::BufferManager mOffloadManager;

// Track the block ids offloaded in this iteration.
std::unordered_map<int32_t, tr::CudaEvent> mPendingOffloads;
// Track reads and writes for blocks. Note that it is the memory pool index that
// identifies the raw memory blocks involved in I/O, not the block Id.
std::unordered_map<kernels::KVCacheIndex::UnderlyingType, tr::CudaEvent> mPendingReads;
std::unordered_map<kernels::KVCacheIndex::UnderlyingType, tr::CudaEvent> mPendingWrites;
// Reference to parent loopback agent
std::shared_ptr<kvc::BaseLoopbackAgent> mLoopbackAgent;
int mDeviceId;
Expand Down
2 changes: 2 additions & 0 deletions cpp/tensorrt_llm/batch_manager/allocateKvCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ void tensorrt_llm::batch_manager::AllocateKvCache::operator()(BaseKVCacheManager
TLLM_LOG_TRACE("%s start", __PRETTY_FUNCTION__);
NVTX3_SCOPED_RANGE(allocateKvCache);

kvCacheManager.syncTransferManagerWithBufferManager();

for (auto const& llmReq : contextRequests)
{
if (llmReq->isFirstContextChunk())
Expand Down
13 changes: 13 additions & 0 deletions cpp/tensorrt_llm/batch_manager/kvCacheManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1968,6 +1968,19 @@ SizeType32 WindowBlockManager::loadOrAllocateBlocks(

return numMatchedTokens;
}

void BlockManager::syncTransferManagerWithBufferManager()
{
for (auto& [_, manager] : mWindowBlockManagers)
{
manager.syncTransferManagerWithBufferManager();
}
}

void WindowBlockManager::syncTransferManagerWithBufferManager()
{
mTransferManager->syncWithBufferManager();
}

void BlockManager::refreshBlocks()
{
Expand Down
119 changes: 106 additions & 13 deletions cpp/tensorrt_llm/batch_manager/kvCacheTransferManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,47 +207,140 @@ void KVCacheTransferManager::copyBlock(BlockPtr const& src, BlockPtr const& dst,
}
}

//
// Note about recording events to wait for cudaMempyAsync calls between blocks:
// The memory copy involves raw memory blocks, which are pointed to by the
// memory pool block index. When recording events, you must use getMemoryPoolBlockIndex()
// as the raw memory block identifier. Earlier versions of this code used getBlockId()
// when recording events, this is just wrong. getBlockId() returns the logical block id,
// which has nothing to do with the raw memory block pointers involved in a cudaMemcpy.
//

//
// Notes about need for synchronization:
//
// Earlier versions of this code relied on decoder implicitly syncing GPU with CPU.
// This is inherently dangerous, it is not given that decoder will always explicitly sync
// GPU with CPU for every step, a major design goal of ongoing work is to avoid this.
// To make the code future proof, we introdduce a new method SyncWithBufferManager()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Type-o, introdduce

// that ensures that internal copy streams will wait for prefill and decode kernels
// that have already been scheduled.
//
// Earlier versions of this code did not account for all possible cases were a new block copy
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

were -> where

// needed to wait for a previously scheduled copy to finish. For instance, it is possible
// that two primary blocks are offloaded to the same secondary block in a single step,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rightfully the secondary block provided by the policy should have the popped block removed from the secondary free block queue. Should the problem be inside eviction policy that this happens?

// scheduling the second offloading without waiting for the first one to finish leads to
// a corrupted block after offloading. It is possible that partial reuse will copy
// from a block that is currently being onboarded, scheduling the partial copy without
// waiting for the onboarding to finish will lead to a corrupted block. To handle all
// possible cases needing synchronization we record separate events for reads and writes
// to a block. When a new block copy is scheduled, we wait for all writes to the source
// block and all reads and writes to a destination block.
//
// As before, syncTransfers() must be called after last call to KVCacheManager::addSequence.
// Failing to do so will lead to corrupted blocks eventually.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a mechanism to fool-proof this?

//

void KVCacheTransferManager::onboard(BlockPtr const& offloadBlock, BlockPtr const& block,
std::vector<KVCacheBlockPool> const& pools, int numTokensToCopy, executor::KvCacheTransferMode mode,
std::string const& directory)
{
if (mode != executor::KvCacheTransferMode::DRAM
&& mPendingOffloads.find(offloadBlock->getBlockId()) == mPendingOffloads.end())
// Wait for any pending writes before reading from offloadBlock
auto offloadBlockPendingWriteItr = mPendingWrites.find(offloadBlock->getMemoryPoolBlockIndex());
if (offloadBlockPendingWriteItr != mPendingWrites.end())
{
TLLM_LOG_DEBUG("Skipping onboard for block %d because it was never previously offloaded to disk",
offloadBlock->getBlockId());
return;
mOnboardManager.getStream().wait(offloadBlockPendingWriteItr->second);
// Don't erase, we are not changing state of offloadBlock
}

if (mPendingOffloads.find(offloadBlock->getBlockId()) != mPendingOffloads.end())
// Wait for any pending reads before overwriting block
auto blockPendingReadItr = mPendingReads.find(block->getMemoryPoolBlockIndex());
if (blockPendingReadItr != mPendingReads.end())
{
mOnboardManager.getStream().wait(mPendingOffloads[offloadBlock->getBlockId()]);
mOnboardManager.getStream().wait(blockPendingReadItr->second);
mPendingReads.erase(blockPendingReadItr);
}
// Wait for any pending writes before overwriting block
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to have a pending read depnding on the pending write here?

auto blockPendingWriteItr = mPendingWrites.find(block->getMemoryPoolBlockIndex());
if (blockPendingWriteItr != mPendingWrites.end())
{
mOnboardManager.getStream().wait(blockPendingWriteItr->second);
mPendingWrites.erase(blockPendingWriteItr);
}

copyBlock(offloadBlock, block, pools, false, numTokensToCopy, mode, directory);

// Record new pending read from offloadBlock
mPendingReads[offloadBlock->getMemoryPoolBlockIndex()] = tr::CudaEvent();
mOnboardManager.getStream().record(mPendingReads[offloadBlock->getMemoryPoolBlockIndex()]);
// Record new pending write to block
mPendingWrites[block->getMemoryPoolBlockIndex()] = tr::CudaEvent();
mOnboardManager.getStream().record(mPendingWrites[block->getMemoryPoolBlockIndex()]);
}

void KVCacheTransferManager::offload(BlockPtr const& block, BlockPtr const& offloadBlock,
std::vector<KVCacheBlockPool> const& pools, int numTokensToCopy, executor::KvCacheTransferMode mode,
std::string const& directory)
{
mPendingOffloads[block->getBlockId()] = tr::CudaEvent();
// Wait for any pending writes before reading from block
auto blockPendingWriteItr = mPendingWrites.find(block->getMemoryPoolBlockIndex());
if (blockPendingWriteItr != mPendingWrites.end())
{
mOffloadManager.getStream().wait(blockPendingWriteItr->second);
// Don't erase, we are not changing state of block
}
// Wait for any pending reads before overwriting offloadBlock
auto offloadBlockPendingReadItr = mPendingReads.find(offloadBlock->getMemoryPoolBlockIndex());
if (offloadBlockPendingReadItr != mPendingReads.end())
{
mOffloadManager.getStream().wait(offloadBlockPendingReadItr->second);
mPendingReads.erase(offloadBlockPendingReadItr);
}
// Wait for any pending writes before overwriting offloadBlock
auto offloadBlockPendingWriteItr = mPendingWrites.find(offloadBlock->getMemoryPoolBlockIndex());
if (offloadBlockPendingWriteItr != mPendingWrites.end())
{
mOffloadManager.getStream().wait(offloadBlockPendingWriteItr->second);
mPendingWrites.erase(offloadBlockPendingWriteItr);
}

copyBlock(block, offloadBlock, pools, true, numTokensToCopy, mode, directory);
mOffloadManager.getStream().record(mPendingOffloads[block->getBlockId()]);

// Record new pending read from block
mPendingReads[block->getMemoryPoolBlockIndex()] = tr::CudaEvent();
mOffloadManager.getStream().record(mPendingReads[block->getMemoryPoolBlockIndex()]);
// Record new pending write to offloadBlock
mPendingWrites[offloadBlock->getMemoryPoolBlockIndex()] = tr::CudaEvent();
mOffloadManager.getStream().record(mPendingWrites[offloadBlock->getMemoryPoolBlockIndex()]);
}

void KVCacheTransferManager::syncWithBufferManager()
{
tr::CudaEvent readyForOffloadEvent;
mBufferManager.getStream().record(readyForOffloadEvent);
mOffloadManager.getStream().wait(readyForOffloadEvent);

tr::CudaEvent readyForOnboardEvent;
mBufferManager.getStream().record(readyForOnboardEvent);
mOnboardManager.getStream().wait(readyForOnboardEvent);

// Once we synchronize, clear our list of pending thransfers.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type-o, transfer

mPendingReads.clear();
mPendingWrites.clear();
}

void KVCacheTransferManager::syncTransfers()
{
tr::CudaEvent offloadEvent;
mOffloadManager.getStream().record(offloadEvent);
mBufferManager.getStream().wait(offloadEvent);

tr::CudaEvent onboardEvent;
mOnboardManager.getStream().record(onboardEvent);

mBufferManager.getStream().wait(offloadEvent);
mBufferManager.getStream().wait(onboardEvent);

// Once we synchronize, clear our list of pending thransfers.
mPendingOffloads.clear();
mPendingReads.clear();
mPendingWrites.clear();
}

} // namespace tensorrt_llm::batch_manager::kv_cache_manager
6 changes: 6 additions & 0 deletions cpp/tensorrt_llm/nanobind/batch_manager/kvCacheManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ class PyKvCacheManager : public tbk::BaseKVCacheManager
NB_OVERRIDE_PURE(getPoolLayerIdx, layer_idx);
}

void syncTransferManagerWithBufferManager() override
{
NB_OVERRIDE_PURE(syncTransferManagerWithBufferManager);
}

void refreshBlocks() override
{
NB_OVERRIDE_PURE(refreshBlocks);
Expand Down Expand Up @@ -483,6 +488,7 @@ void tb::kv_cache_manager::KVCacheManagerBindings::initBindings(nb::module_& m)
nb::call_guard<nb::gil_scoped_release>())
.def("get_last_block_id", &BaseKVCacheManager::getLastBlockId, nb::call_guard<nb::gil_scoped_release>())
.def("unpin_blocks_by_id", &BaseKVCacheManager::unpinBlocksById, nb::call_guard<nb::gil_scoped_release>())
.def("sync_transfer_manager_with_buffer_manager", &BaseKVCacheManager::syncTransferManagerWithBufferManager, nb::call_guard<nb::gil_scoped_release>())
.def("refresh_blocks", &BaseKVCacheManager::refreshBlocks, nb::call_guard<nb::gil_scoped_release>());

nb::bind_vector<CacheBlockIds>(m, "CacheBlockIds")
Expand Down
6 changes: 6 additions & 0 deletions cpp/tensorrt_llm/pybind/batch_manager/kvCacheManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ class PyKvCacheManager : public tbk::BaseKVCacheManager
PYBIND11_OVERLOAD_PURE(SizeType32, tbk::BaseKVCacheManager, getPoolLayerIdx, layer_idx);
}

void syncTransferManagerWithBufferManager() override
{
NB_OVERRIDE_PURE(syncTransferManagerWithBufferManager);
}

void refreshBlocks() override
{
PYBIND11_OVERLOAD_PURE(void, tbk::BaseKVCacheManager, refreshBlocks);
Expand Down Expand Up @@ -487,6 +492,7 @@ void tb::kv_cache_manager::KVCacheManagerBindings::initBindings(py::module_& m)
py::call_guard<py::gil_scoped_release>())
.def("get_last_block_id", &BaseKVCacheManager::getLastBlockId, py::call_guard<py::gil_scoped_release>())
.def("unpin_blocks_by_id", &BaseKVCacheManager::unpinBlocksById, py::call_guard<py::gil_scoped_release>())
.def("sync_transfer_manager_with_buffer_manager", &BaseKVCacheManager::syncTransferManagerWithBufferManager, nb::call_guard<nb::gil_scoped_release>())
.def("refresh_blocks", &BaseKVCacheManager::refreshBlocks, py::call_guard<py::gil_scoped_release>());

py::enum_<tbk::CacheType>(m, "CacheType")
Expand Down
2 changes: 1 addition & 1 deletion tensorrt_llm/_torch/pyexecutor/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ def prepare_resources(self, scheduled_batch: ScheduledRequests):
context_batch = scheduled_batch.context_requests
generation_batch = scheduled_batch.generation_requests
# allocate KV Cache
self.impl.sync_transfer_manager_with_buffer_manager()
for req in context_batch:
req_beam_width = req.sampling_config.beam_width
if 'cp_type' in self.mapping.cp_config and CpType.STAR == self.mapping.cp_config[
Expand Down Expand Up @@ -436,7 +437,6 @@ def prepare_resources(self, scheduled_batch: ScheduledRequests):
block_ids = self.get_cache_indices(req)
self.kv_connector_manager.update_state_after_alloc(
req, block_ids)

self.impl.refresh_blocks()

for req in generation_batch:
Expand Down