Fix skip-merge shuffle handle lifetime #15064

Open
amahussein wants to merge 3 commits into NVIDIA:main from amahussein:rapids-15018

Conversation

@amahussein

Collaborator

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>

Fixes #15018.

Description

This fixes a skip-merge shuffle lifetime bug where unregisterShuffle could close a partial shuffle
file handle while a retained buffer, input stream, or Netty file region was still reading from it.
That could surface as failed shuffle reads when cleanup raced with in-flight fetch consumers.

The fix adds reference-counted lifecycle tracking for partial shuffle file handles. Catalog cleanup
now removes metadata immediately but defers the physical handle close until all active retained
buffers, streams, and file regions release their leases. Lease cleanup is hardened so all retained
handles are released even if one release throws.
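
The deferred-close idea described above can be sketched roughly as follows. This is a minimal illustration, not the PR's `SpillablePartialFileHandle`: the class `DeferredCloseHandle` and the `physicalClose` callback are hypothetical, and the method and field names (`acquireRead`, `releaseRead`, `closeRequested`, `markCloseIfReady`) are borrowed from names used elsewhere in this thread. It also assumes that new leases are rejected only once the physical close has been decided, which may differ from the real implementation.

```scala
// Hypothetical sketch of a reference-counted handle with a deferred close.
final class DeferredCloseHandle(physicalClose: () => Unit) extends AutoCloseable {
  private var readRefCount = 0
  private var closeRequested = false
  private var closed = false

  /** Take a read lease. Assumption: only rejected once the close decision is made. */
  def acquireRead(): Unit = synchronized {
    if (closed) throw new IllegalStateException("handle already closed")
    readRefCount += 1
  }

  /** Drop a read lease; the last release runs the deferred close, if one was requested. */
  def releaseRead(): Unit = {
    val closeNow = synchronized {
      require(readRefCount > 0, "releaseRead without a matching acquireRead")
      readRefCount -= 1
      markCloseIfReady()
    }
    if (closeNow) physicalClose()
  }

  /** Idempotent close request: closes immediately only when no reader holds a lease. */
  override def close(): Unit = {
    val closeNow = synchronized {
      closeRequested = true
      markCloseIfReady()
    }
    if (closeNow) physicalClose()
  }

  // Must be called while holding the monitor; returns true at most once.
  private def markCloseIfReady(): Boolean =
    if (closeRequested && readRefCount == 0 && !closed) { closed = true; true } else false
}

object DeferredCloseDemo {
  def main(args: Array[String]): Unit = {
    var closes = 0
    val handle = new DeferredCloseHandle(() => closes += 1)
    handle.acquireRead()
    handle.close()        // a reader is active, so the physical close is deferred
    assert(closes == 0)
    handle.close()        // repeated close requests are harmless
    handle.releaseRead()  // last lease released: the physical close runs here
    assert(closes == 1)
  }
}
```

Running the physical close outside the monitor is a choice made for this sketch so that slow file cleanup does not block other callers; the PR may structure this differently.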

This also fixes an interrupt cleanup hole in SpillablePartialFileHandle.doClose(): if close is
interrupted while waiting for an in-progress spill, cleanup now still releases resources and deletes
the temp file before restoring the interrupt flag.
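
As a rough illustration of the interrupt handling described above, the sketch below waits for a spill to finish, runs cleanup even if the wait is interrupted, and restores the interrupt flag afterwards. The names `InterruptSafeClose`, `closeAfterSpill`, `spillDone`, and `cleanup` are hypothetical, and the sketch assumes the wait is simply abandoned on interrupt; the actual `doClose()` may handle the wait differently.

```scala
import java.util.concurrent.CountDownLatch

object InterruptSafeClose {
  /**
   * Waits for `spillDone`, then always runs `cleanup`. If the wait is interrupted,
   * cleanup still runs and the interrupt flag is restored before returning.
   */
  def closeAfterSpill(spillDone: CountDownLatch, cleanup: () => Unit): Unit = {
    var interrupted = false
    try {
      try {
        spillDone.await()
      } catch {
        case _: InterruptedException => interrupted = true // remember it, keep going
      }
      cleanup() // release resources and delete the temp file in the real code
    } finally {
      if (interrupted) Thread.currentThread().interrupt()
    }
  }

  def main(args: Array[String]): Unit = {
    var cleaned = false
    Thread.currentThread().interrupt()      // simulate an interrupt during close
    closeAfterSpill(new CountDownLatch(1), () => cleaned = true)
    assert(cleaned)                         // cleanup was not skipped
    assert(Thread.interrupted())            // flag was restored (and is cleared here)
  }
}
```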

Tests added or updated:

  • MultithreadedShuffleBufferCatalogSuite
    • retained skip-merge buffer remains readable across concurrent unregisterShuffle
    • convertToNetty file-region release closes the retained handle exactly once
  • SpillablePartialFileHandleSuite
    • interrupted close during an in-progress spill still completes cleanup and restores interrupt state

Validation run:

  • git diff --check
  • mvn install -pl sql-plugin -am -DskipTests -Dmaven.scaladoc.skip=true -Dbuildver=356
  • SpillablePartialFileHandleSuite passed, 18/18 tests
  • MultithreadedShuffleBufferCatalogSuite passed, 13/13 tests

Checklists

Documentation

  • Updated for new or modified user-facing features or behaviors
  • No user-facing change

Testing

  • Added or modified tests to cover new code paths
  • Covered by existing tests
    (Please provide the names of the existing tests in the PR description.)
  • Not required

Performance

  • Tests ran and results are added in the PR description
  • Issue filed with a link in the PR description
  • Not required

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>

Fixes NVIDIA#15018

Retain partial shuffle handles while managed buffers, input streams, and Netty file regions are active so `unregisterShuffle` cannot close data under in-flight readers.

Harden lease cleanup and partial-file close handling so releases are exception-safe and interrupted close waits still finish resource cleanup before restoring the interrupt flag.

Add regression coverage for retained-buffer reads, Netty file-region release, and interrupted partial-file cleanup.
@amahussein amahussein self-assigned this Jun 11, 2026
@amahussein amahussein added the bug Something isn't working label Jun 11, 2026
@greptile-apps

greptile-apps Bot commented Jun 11, 2026

Contributor

Greptile Summary

This PR fixes a skip-merge shuffle lifetime bug where unregisterShuffle could physically close a partial shuffle file handle while an in-flight fetch consumer (retained ManagedBuffer, MultiSegmentInputStream, or MultiSegmentFileRegion) was still reading from it. It also closes an interrupt-cleanup hole in SpillablePartialFileHandle.doClose().

  • Reference-counted lease system (ShuffleHandleLease): retain(), createInputStream(), and convertToNetty() now each acquire a read lease via acquireRead(). The catalog's unregisterShuffle path calls handle.close(), which sets closeRequested = true and defers doClose() until readRefCount reaches zero; the physical close then runs in the last releaseRead() via closeQuietly().
  • Interrupted-close hardening in doClose(): If the thread is interrupted while waiting for an in-progress spill to finish, the interrupt flag is suppressed, cleanup proceeds fully, and the flag is restored in finally — preventing resource leaks and temp-file accumulation on interrupted executor teardown.
  • Tests: MultithreadedShuffleBufferCatalogSuite gains a multithreaded race regression test and a deterministic convertToNetty/isPhysicallyClosed assertion; SpillablePartialFileHandleSuite gains full lease-lifecycle and interrupted-close unit tests.

Confidence Score: 5/5

Safe to merge. The deferred-close logic is correctly guarded: closed is set atomically inside the handle's monitor via markCloseIfReady(), preventing acquireRead from slipping in after the decision to close. The doClose() path correctly waits for any in-progress spill and restores the interrupt flag, and the lease rollback in ShuffleHandleLease.acquire() is complete.

The lock ordering is consistent (lease monitor then handle monitor, never reversed), doClose() is only ever dispatched once per handle (closed=true gates it), and the consumer release paths are all idempotent. The multithreaded regression test and the deterministic isPhysicallyClosed assertion give good coverage of the targeted race. No logic errors or resource-leak paths found.

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/MultithreadedShuffleBufferCatalog.scala | Adds ShuffleHandleLease (reference-counted read leases) and wires retain/release/createInputStream/convertToNetty to acquire/release leases; deferred close prevents use-after-free on concurrent unregisterShuffle. |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/spill/SpillablePartialFileHandle.scala | Overrides close() with deferred-close logic guarded by readRefCount; adds closeQuietly() for consumer release paths; fixes doClose() to survive interrupts during spill-wait without losing cleanup. |
| tests/src/test/scala/com/nvidia/spark/rapids/MultithreadedShuffleBufferCatalogSuite.scala | Adds multithreaded regression test for retained-buffer / concurrent-unregisterShuffle race and a deterministic test that convertToNetty closes the handle exactly once on region release. |
| tests/src/test/scala/com/nvidia/spark/rapids/spill/SpillablePartialFileHandleSuite.scala | Adds lease lifecycle unit tests (deferred close, repeated close, immediate close, imbalanced acquire/release) and an interrupted-close test that verifies cleanup completes and interrupt flag is restored. |

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Reducer
    participant Buffer as MultiBatchManagedBuffer
    participant Lease as ShuffleHandleLease
    participant Handle as SpillablePartialFileHandle
    participant Catalog as MultithreadedShuffleBufferCatalog

    Reducer->>Buffer: retain()
    Buffer->>Lease: acquire(handles)
    Lease->>Handle: "acquireRead() refCount=1"
    Buffer-->>Reducer: this (retain lease stored)

    Reducer->>Buffer: createInputStream()
    Buffer->>Lease: acquire(handles)
    Lease->>Handle: "acquireRead() refCount=2"
    Buffer-->>Reducer: MultiSegmentInputStream (holds lease)

    Note over Catalog: Concurrent unregisterShuffle
    Catalog->>Handle: close()
    Handle->>Handle: "closeRequested=true, refCount>0 defer"

    Reducer->>Buffer: stream.read() handle still open

    Reducer->>Buffer: stream.close()
    Buffer->>Lease: close()
    Lease->>Handle: "releaseRead() refCount=1"

    Reducer->>Buffer: release()
    Buffer->>Lease: close()
    Lease->>Handle: "releaseRead() refCount=0 closeRequested=true doClose()"
    Handle->>Handle: Physical close + file delete

Reviews (2): Last reviewed commit: "Merge branch 'main' into rapids-15018" | Re-trigger Greptile

@amahussein

Collaborator Author

build

amahussein added a commit to amahussein/spark-rapids that referenced this pull request Jun 12, 2026
…test changes

Databricks pre-merge CI is conditional: per jenkins/Jenkinsfile-blossom.premerge
it runs only when the PR title contains [databricks] or the diff touches a
Databricks-shim path (sql-plugin/src/main/...db/ or a path containing
"databricks"). The standard Linux pre-merge never runs Databricks.

This leaves a gap. A change can be correct on vanilla Spark yet behave
differently on the Databricks Spark fork without touching any auto-trigger
path -- e.g. integration tests that rely on filesystem/path semantics
(local vs DBFS/abfss, file:// scheme, os.walk/os.path) or that assert on
optimizer plan strings (alias names and plan rendering differ on DBR). Such a
test merges green because the only job that would have exercised it on
Databricks was never triggered, then surfaces as a failure later on an
unrelated PR that does carry [databricks] -- making an innocent PR look broken
and costing triage time.

To close the gap on the review side:

- .greptile/config.json: add the "databricks-ci-tag" rule (scoped to
  integration_tests/**, severity medium) so Greptile recommends adding
  [databricks] when an integration-test change looks Databricks-divergent and
  the PR title lacks the tag. It explicitly does not flag changes already under
  a *db* shim path (auto-covered) or doc-only changes, to avoid noise.
- .greptile/rules.md: split the vague [databricks] mention out of H7 into a
  focused H9 "Databricks coverage" checklist item.
- AGENTS.md: document when [databricks] is needed (not just how), so both
  humans and Greptile (whose instructions reference AGENTS.md) share one
  source of truth.

Scope is integration_tests/** only -- not the Scala unit-test dirs. The
Databricks pre-merge builds with -DskipTests and runs only the Python
integration tests (run_pyspark_from_build.sh); Scala unit tests never execute
on Databricks, so the [databricks] tag cannot validate them and recommending it
there would be misleading. Verified against the NVIDIA#15064 Databricks CI_PART1
console log: all shims built with -DskipTests, scalatest goal skipped (73x
"Tests are skipped", zero ScalaTest/Surefire run summaries), followed only by
run_pyspark_from_build.sh. The rule is advisory -- it nudges; it does not gate
merges.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>
@amahussein amahussein requested review from abellina and binmahone June 15, 2026 12:06
@amahussein

Collaborator Author

The CI/CD failure is an unrelated problem reported in #15073.

@amahussein amahussein marked this pull request as ready for review June 15, 2026 12:24
binmahone previously approved these changes Jun 16, 2026

@binmahone binmahone left a comment

Collaborator

LGTM

@abellina abellina left a comment

Collaborator

Thanks @amahussein. I'd like to ask if there is an alternative to this implementation. From the error you quoted, the problem is that the file channel is closed while we are reading (full stop). Did I understand that right? If the channel were closed before we started reading, we'd be OK?

If so, could we just make sure that when we are reading from the channel we consistently hold the file handle lock?

https://github.com/amahussein/spark-rapids/blob/0331aa9299d209d6d4d938b2ce7a2f883bf896d9/sql-plugin/src/main/scala/com/nvidia/spark/rapids/spill/SpillablePartialFileHandle.scala#L483-L518

Sometimes, in the code I quoted above, we read it under the lock, sometimes we don't.

Wouldn't that just fix the race between close and read?

@amahussein

Collaborator Author

Thanks @abellina. Yes, you read it right: the failure is the file channel getting closed while a read is in flight. If close always landed either strictly before any read started or only after all reads had finished, we'd be fine.

The problem with fixing it via a read-time lock is what the failing consumer actually is. A monitor only covers a single in-method readAt() call. But the consumer that breaks isn't a readAt() — it's the Netty MultiSegmentFileRegion, whose transferTo() runs on the event loop and is driven chunk-by-chunk at the remote reducer's pace. To make a lock cover it I'd have to hold the handle monitor for the whole (possibly throttled) network send, which serializes all reads and invites deadlock.

And the produce→read gap here is real and wide, not a tight CPU race — remote serving is two-phase:

  • OpenBlocks: getBlockData → getMergedBuffer (MultithreadedShuffleBufferCatalog.scala:276) returns the MultiBatchManagedBuffer, which Spark parks in OneForOneStreamManager. The producing method has returned — zero bytes read, no lease held yet.
  • later ChunkFetchRequest (a network round trip later, plus maxChunksBeingTransferred throttling): convertToNetty() builds the FileRegion and transferTo() finally reads.

unregisterShuffle runs on the independent rapids-shuffle-cleanup poller (ShuffleCleanupEndpoint.scala:142) and can fire anywhere in that gap. With skip-merge this is the common path, not an edge case — per the design doc §10.1, host-local fetches are forced remote and served from the catalog over the network.

This is also why I don't think we can lean on the cleanup timing instead. Doc §8.1 requires handles to stay alive "until all reducers have finished reading," but the trigger we actually have is SQL-execution-end + a 1s poll — a heuristic running on a different clock than the reads (speculative/zombie reduce attempts, multi-shuffle executions, driver-end vs. in-flight transfer all overlap). So the lifetime has to be enforced at the handle. Refcounting is already the documented mechanism (§6.3, "each open stream increments ref count on handles"); this PR just extends it to the other two consumers the serving path produces — Netty file regions and retained buffers.

On your specific observation: you're right that the read path is inconsistent — the memory read is under the lock, the file read at :491/:497 isn't. With a lease held that unlocked file read is safe by construction (the channel can't be physically closed while any lease is outstanding), but I'll add a comment there making that explicit so it doesn't read as an accidental omission.

@amahussein amahussein changed the title Fix skip-merge shuffle handle lifetime [databricks] Fix skip-merge shuffle handle lifetime Jun 22, 2026
@abellina

Collaborator

Thanks @amahussein
I think what we want is, instead of having a separate lease mechanism, fold all of it inside of the partial file handle. The ref count is held there. The handle controls everything about the lifecycle of the data, so I don't think we want a separate lease to manage it.

Then all the changes outside of the handle don't need to be made. Other than perhaps a call to a new method "acquire()" that incRefCounts the handle. The "close()" method would decRefCount until the lease is done.

Thoughts?

@amahussein

Collaborator Author

Thanks @abellina. I'm on board with moving the ref count into the handle. It lets PartitionSegment go back to just (handle, offset, length) and drops the ShuffleHandleReference wrapper. Good direction.

Two things I want to flag before I make the change:

  1. I'd rather not have close() itself do the decRefCount. SpillablePartialFileHandle.close() is the AutoCloseable "free it now" call, and it's already used that way outside the catalog: the writer cleanup path (RapidsShuffleInternalManagerBase.scala:922) and the merge path (:1037) both call handle.close() to physically release, and so does the spill store on eviction. If close() starts meaning "drop a ref", those paths stop freeing. So I'd add explicit acquire() / release() on the handle and let the last release() run the physical close internally. Same idea you described, just without overloading close().
  2. "No changes outside the handle" is mostly true but not all the way. A buffer, stream, or file region can span multiple handles in the multi-batch case, so a consumer still has to acquire all of them and release all of them exactly once even if one release throws. That's the part ShuffleHandleLease does today. It gets a lot smaller, but I'd keep a thin helper for it rather than copy the loop into the four consumers.
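
The "thin helper" mentioned in point 2 could look something like the sketch below. It is an illustration only: `ReadLeasable` and `MultiHandleLease` are hypothetical names, not the PR's `ShuffleHandleLease`. The sketch acquires every handle or rolls back the ones already taken, and on close it releases every handle exactly once, continuing past a failing release and rethrowing the first failure with the rest attached as suppressed exceptions.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical minimal view of a handle that supports read leases.
trait ReadLeasable {
  def acquireRead(): Unit
  def releaseRead(): Unit
}

final class MultiHandleLease private (handles: Seq[ReadLeasable]) extends AutoCloseable {
  private val released = new AtomicBoolean(false)

  /** Releases all handles exactly once; later calls are no-ops. */
  override def close(): Unit =
    if (released.compareAndSet(false, true)) MultiHandleLease.releaseAll(handles)
}

object MultiHandleLease {
  /** Acquires every handle, or rolls back the ones already taken if one acquire fails. */
  def acquire(handles: Seq[ReadLeasable]): MultiHandleLease = {
    var taken = List.empty[ReadLeasable]
    try {
      handles.foreach { h => h.acquireRead(); taken ::= h }
    } catch {
      case t: Throwable =>
        try releaseAll(taken) catch { case s: Throwable => t.addSuppressed(s) }
        throw t
    }
    new MultiHandleLease(handles)
  }

  // Attempts every release even if an earlier one throws.
  private def releaseAll(handles: Seq[ReadLeasable]): Unit = {
    var first: Throwable = null
    handles.foreach { h =>
      try h.releaseRead() catch {
        case t: Throwable => if (first == null) first = t else first.addSuppressed(t)
      }
    }
    if (first != null) throw first
  }
}

object MultiHandleLeaseDemo {
  def main(args: Array[String]): Unit = {
    var held = 0
    val handle = new ReadLeasable {
      def acquireRead(): Unit = held += 1
      def releaseRead(): Unit = held -= 1
    }
    val lease = MultiHandleLease.acquire(Seq(handle, handle))
    assert(held == 2)
    lease.close()
    lease.close() // second close does not release again
    assert(held == 0)
  }
}
```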

@abellina

Collaborator

If close() could check that refCount is == 1, throwing in that case, then ok. I don't think this is possible when you get to implementing it => since what we are talking about is that part of the code (whether it is the removal part or the reader) is going to actually release, it implies close() must not close, it must decRefCount.

@amahussein

Collaborator Author

Thanks @abellina. On the two points:

1. On materialize.

I looked at leaning on it instead of a handle refcount. It fits the in-memory case cleanly (incRefCount the host buffer, zero-copy) and we could drop our refcount there. It doesn't fit the file-backed case, which is the one this bug hits (repro is FILE_ONLY): materialize is whole-object, and the framework's disk store is sequential and possibly compressed (DiskHandle.materializeToHostMemoryBuffer / withInputWrappedStream, SpillFramework.scala:1251, :1222), so serving a block that way means copying it entirely into host memory per read, instead of the random-access readAt + 64 KB MultiSegmentFileRegion streaming skip-merge relies on. So it's an implementation choice for memory but a design mismatch for disk. Making it work for disk means adding random-access + hold-open-while-reading to the framework's disk handle, which just relocates this deferred close into the framework.

2. On close() doing the decRefCount.

We agree the decrement belongs on the handle (that's releaseRead()); the question is only whether to call it close(). I don't think we can: the spill store calls handle.close() on every tracked handle at shutdown (HandleStore.close, SpillFramework.scala:1349-1356) without holding a ref, so if close() decremented, that shutdown sweep would drop a ref it never took and free the handle under an active reader. So close() stays an idempotent close-request and acquireRead/releaseRead do the inc/dec. It's your refCount == 1 idea, except it defers instead of throwing when readers are active, which is what lets the store/writer/merge call close() without knowing about readers. Happy to rename the methods, just keep them off close().
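
To make the shutdown-sweep concern concrete, here is a small counter-example under assumed semantics. `CloseAsDecRef` is a hypothetical handle whose `close()` means "drop one reference", which is the variant argued against above; it is not code from the PR or from the spill framework.

```scala
// Hypothetical handle where close() is overloaded to mean "drop one reference".
final class CloseAsDecRef(initialRefs: Int) extends AutoCloseable {
  private var refs = initialRefs
  private var freed = false

  def isFreed: Boolean = synchronized(freed)

  override def close(): Unit = synchronized {
    if (!freed) {
      refs -= 1
      if (refs == 0) freed = true // physical release would happen here
    }
  }
}

object ShutdownSweepDemo {
  def main(args: Array[String]): Unit = {
    // The single reference belongs to a reader that is still mid-transfer.
    val handle = new CloseAsDecRef(initialRefs = 1)

    // A sweep that closes every tracked handle without having taken a reference.
    handle.close()

    // The handle is freed even though the reader never released its reference.
    assert(handle.isFreed)
  }
}
```

With a separate close request plus `acquireRead`/`releaseRead`, the same sweep would only mark the handle as closing, and the reader's final release would perform the physical close.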

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>

Fixes NVIDIA#15018

Reference-count partial shuffle handles on the handle itself so that while managed buffers,
input streams, and Netty file regions are active, unregisterShuffle (and any other close
caller) defers the physical close instead of freeing data under in-flight readers. close()
is an idempotent close-request; acquireRead/releaseRead track active readers and the last
release performs the deferred close.

Release is exception-safe, and an interrupted close while waiting on an in-progress spill
still finishes resource cleanup before restoring the interrupt flag.

Add regression coverage for the handle read-lease lifecycle (deferred close, repeated close
while a lease is held, immediate close with no leases), retained-buffer reads across
concurrent unregisterShuffle, Netty file-region release, and interrupted partial-file cleanup.
@amahussein

Collaborator Author

build

@amahussein amahussein requested review from abellina and binmahone June 25, 2026 02:44
Labels

bug Something isn't working

Development

Successfully merging this pull request may close these issues.

[BUG] SkipMerge shuffle closes catalog buffers during active reads

3 participants