@zhengchenyu (Contributor) commented Jul 8, 2025

What changes were proposed in this pull request?

If a save_to_storage operation was issued earlier, save_to_memory must wait for that save_to_storage to complete before it runs.
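
The waiting behaviour described above can be sketched roughly as follows. This is a minimal single-process illustration and not the DLRover implementation: `ToyEngine`, `ToySaver`, `pending_step`, and the use of `queue.Queue` plus a thread in place of the shared-memory queues and the agent process are all assumptions made for the example.

```python
import queue
import threading
import time


class ToySaver(threading.Thread):
    """Hypothetical stand-in for the agent-side saver that persists shards."""

    def __init__(self, shared, event_queue, notify_queue):
        super().__init__(daemon=True)
        self.shared = shared
        self.event_queue = event_queue
        self.notify_queue = notify_queue
        self.persisted = []

    def run(self):
        while True:
            step = self.event_queue.get()
            if step is None:  # sentinel: stop the saver
                return
            time.sleep(0.05)  # pretend that writing to storage is slow
            # The step in memory must still match the step in the event.
            assert self.shared["step"] == step
            self.persisted.append(step)
            self.notify_queue.put(step)  # tell the trainer the save is done


class ToyEngine:
    """Hypothetical trainer-side engine."""

    def __init__(self):
        self.shared = {"step": 0, "state": None}
        self.event_queue = queue.Queue()
        self.notify_queue = queue.Queue()
        self.pending_step = 0  # step of the last save_to_storage, 0 if none
        self.saver = ToySaver(self.shared, self.event_queue, self.notify_queue)
        self.saver.start()

    def _wait_for_pending_storage_save(self):
        if self.pending_step > 0:
            self.notify_queue.get()  # blocks until the saver acknowledges
            self.pending_step = 0

    def save_to_memory(self, step, state):
        self._wait_for_pending_storage_save()
        self.shared.update(step=step, state=state)

    def save_to_storage(self, step, state):
        self.save_to_memory(step, state)
        self.event_queue.put(step)
        self.pending_step = step

    def close(self):
        self._wait_for_pending_storage_save()
        self.event_queue.put(None)
        self.saver.join()


engine = ToyEngine()
engine.save_to_storage(100, {"w": 1.0})
engine.save_to_memory(101, {"w": 1.1})  # waits until step 100 is persisted
engine.close()
print(engine.saver.persisted)
```

In this sketch the second call cannot overwrite the buffer until the saver has acknowledged step 100, so the saver never sees a mismatched step.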

Why are the changes needed?

If save_to_memory is called so frequently that the next save_to_memory is triggered before the previous save_to_storage has completed, that save_to_storage fails. The error is as follows:

[2025-07-08 11:24:07,066] [ERROR] [ckpt_saver.py:702:_save_shard] The step 100 in event is no equal to step 101 in memory.
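
The interleaving behind this error can be reproduced in a simplified, deterministic form. The snippet below is only an illustration under assumptions: the shared-memory buffer and the event queue are modelled with a plain dict and a `deque`, and the function names mirror the PR description rather than the real DLRover signatures.

```python
from collections import deque

# Hypothetical stand-ins for the shared-memory buffer and the event queue.
shared_memory = {"step": 0, "state": None}
event_queue = deque()


def save_to_memory(step, state):
    shared_memory["step"] = step
    shared_memory["state"] = state


def save_to_storage(step, state):
    save_to_memory(step, state)
    event_queue.append(step)  # ask the saver to persist this step later


def persist_next_event():
    """Return an error message on a step mismatch, otherwise None."""
    event_step = event_queue.popleft()
    memory_step = shared_memory["step"]
    if event_step != memory_step:
        return f"event step {event_step} != memory step {memory_step}"
    return None


save_to_storage(100, {"w": 1.0})  # the saver has not run yet
save_to_memory(101, {"w": 1.1})   # overwrites the buffer too early
error = persist_next_event()      # the saver finally handles step 100
print(error)
```

Here the saver handles the step-100 event only after the buffer already holds step 101, which is the mismatch reported in the log line above.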

Since #1529 recommends using the Megatron checkpoint directly, this PR does not modify the Megatron checkpoint code.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests and a real DDP job.

@zhengchenyu changed the title from "Avoid save_to_memory running too fast then causing save_to_storage to fail" to "Avoid save_to_memory running too fast then causing save_to_storage to fail." Jul 8, 2025
codecov bot commented Jul 8, 2025

Codecov Report

❌ Patch coverage is 13.04348% with 20 lines in your changes missing coverage. Please review.
✅ Project coverage is 19.86%. Comparing base (935f268) to head (6bd9397).
⚠️ Report is 73 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...trainer/torch/flash_checkpoint/deepspeed_engine.py | 0.00% | 5 Missing ⚠️ |
| ...over/trainer/torch/flash_checkpoint/fsdp_engine.py | 0.00% | 5 Missing ⚠️ |
| ...trainer/torch/flash_checkpoint/full_ckpt_engine.py | 0.00% | 5 Missing ⚠️ |
| dlrover/trainer/torch/flash_checkpoint/engine.py | 0.00% | 3 Missing ⚠️ |
| dlrover/python/elastic_agent/torch/ckpt_saver.py | 60.00% | 2 Missing ⚠️ |

❌ Your patch status has failed because the patch coverage (13.04%) is below the target coverage (90.00%). You can increase the patch coverage or adjust the target coverage.
❌ Your project status has failed because the head coverage (19.86%) is below the target coverage (83.00%). You can increase the head coverage or adjust the target coverage.
❌ Your project status has failed because you have indirect coverage changes.

❗ There is a different number of reports uploaded between BASE (935f268) and HEAD (6bd9397).

HEAD has 4 fewer uploads than BASE: BASE (935f268) has 5 and HEAD (6bd9397) has 1.
Additional details and impacted files
@@             Coverage Diff             @@
##           master    #1585       +/-   ##
===========================================
- Coverage   84.03%   19.86%   -64.17%     
===========================================
  Files         285      285               
  Lines       29847    29870       +23     
===========================================
- Hits        25082     5934    -19148     
- Misses       4765    23936    +19171     


@zhengchenyu marked this pull request as draft July 8, 2025 10:17
@zhengchenyu marked this pull request as ready for review July 8, 2025 10:43
@BalaBalaYi added the enhancement (New feature or request) label Jul 16, 2025
    create=False,
)
self._notify_queue = SharedQueue(
    name=CheckpointSharedObjPrefix.SAVE_STEP_QNAME
Collaborator

Implement this magic concatenation as a method of ckpt_saver.

if self._local_rank != self.local_shard_id:
    return False

if self._checkpoint_event_step > 0:
Collaborator

Move this part into the parent class (engine).
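
One way to read this suggestion is a shared helper on the parent class that every engine's save_to_memory calls before copying state. The sketch below is hypothetical: `BaseEngine`, `ToyFullEngine`, `_wait_for_pending_storage_save`, and `_copy_to_memory` are names invented for illustration, and a plain `queue.Queue` stands in for the shared notify queue.

```python
import queue


class BaseEngine:
    """Hypothetical parent class, not the real DLRover engine."""

    def __init__(self):
        # Assumption: a plain queue.Queue stands in for the shared notify queue.
        self._notify_queue = queue.Queue()
        self._checkpoint_event_step = -1

    def _wait_for_pending_storage_save(self):
        """Block until the last save_to_storage has been acknowledged."""
        if self._checkpoint_event_step > 0:
            self._notify_queue.get()
            self._checkpoint_event_step = -1

    def save_to_memory(self, step, state_dict):
        self._wait_for_pending_storage_save()  # shared by every subclass
        return self._copy_to_memory(step, state_dict)

    def _copy_to_memory(self, step, state_dict):
        raise NotImplementedError


class ToyFullEngine(BaseEngine):
    """Hypothetical subclass that only implements the copy step."""

    def __init__(self):
        super().__init__()
        self.buffer = None

    def _copy_to_memory(self, step, state_dict):
        self.buffer = (step, dict(state_dict))
        return True


engine = ToyFullEngine()
engine._checkpoint_event_step = 100  # pretend step 100 is being persisted
engine._notify_queue.put(100)        # pretend the saver has just finished
saved = engine.save_to_memory(101, {"w": 1.1})
print(saved, engine.buffer)
```

With this layout the wait logic lives in one place, and each concrete engine supplies only its own copy step.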

"""
if self._checkpoint_event_step > 0:
    notify_event = self._notify_queue.get()
    assert notify_event.step == self._checkpoint_event_step
@BalaBalaYi (Collaborator) commented Jul 16, 2025

What is your intention if the assert fails?
In my opinion, this situation should be handled softly, for example by skipping this step and logging an explanation. The previous implementation, which failed directly, was not much of a problem in that respect. At most, its "not equal" log message made it hard to understand why the steps differed.

@BalaBalaYi (Collaborator)

If this part requires code changes, they should still be applied comprehensively across all affected code, despite #1529.

self._event_queue.put(event)
self._checkpoint_event_step = step
if success:
    self.latest_step = step
Contributor

All ranks should expect a notify event so that they drain their local shard queue:

        if self._local_rank == 0 and success:
            event = CheckpointEvent(type=CheckpointEventType.SAVE, step=step)
            self._event_queue.put(event)
        if success:
            self._checkpoint_event_step = step
            self.latest_step = step
