15 changes: 14 additions & 1 deletion recipe/fully_async_policy/fsdp_workers.py
@@ -135,8 +135,11 @@ def sync_rollout_weights_by_checkpoint(self, sync_group_name="actor_rollout"):
        assert (self._is_actor or self._is_rollout) and not self.config.hybrid_engine
        assert hasattr(self, "_weights_info") and self._weights_info is not None

        # Load model to GPU
        load_start_time = time.time()
        if self._is_actor and self._is_offload_param:
            load_fsdp_model_to_gpu(self.actor_module_fsdp)
        load_duration = time.time() - load_start_time
Comment on lines +139 to +142
Contributor

high

The current implementation for measuring load_duration is imprecise. When self._is_actor and self._is_offload_param is false, it measures the time for the conditional check, resulting in a small non-zero value instead of zero. When true, it includes the overhead of the check. To measure the duration accurately, the timing logic should be contained entirely within the conditional block, and load_duration should be initialized to 0.0.

Suggested change
-        load_start_time = time.time()
-        if self._is_actor and self._is_offload_param:
-            load_fsdp_model_to_gpu(self.actor_module_fsdp)
-        load_duration = time.time() - load_start_time
+        load_duration = 0.0
+        if self._is_actor and self._is_offload_param:
+            load_start_time = time.time()
+            load_fsdp_model_to_gpu(self.actor_module_fsdp)
+            load_duration = time.time() - load_start_time
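One way to keep this pattern from regressing at future call sites would be a small timing context manager, so a duration is recorded only around the guarded call and stays zero otherwise. A minimal sketch, assuming nothing beyond the standard library; the gpu_timer helper and the durations dict are illustrative, not part of this PR:

import time
from contextlib import contextmanager

@contextmanager
def gpu_timer(durations: dict, key: str):
    # Record the wall-clock time of the enclosed block into durations[key].
    start = time.time()
    try:
        yield
    finally:
        durations[key] = time.time() - start

Applied to the block above, the measured duration covers only the load call, and it remains 0.0 whenever the branch is skipped:

durations = {"load": 0.0}
if self._is_actor and self._is_offload_param:
    with gpu_timer(durations, "load"):
        load_fsdp_model_to_gpu(self.actor_module_fsdp)
load_duration = durations["load"]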


        from ray.util.collective import collective

@@ -172,14 +175,24 @@ def sync_rollout_weights_by_checkpoint(self, sync_group_name="actor_rollout"):
        update_end_time = time.time()
        update_duration = update_end_time - update_start_time

        collective.barrier(group_name=sync_group_name)
        offload_start_time = time.time()
        if self._is_actor and self._is_offload_param:
            offload_fsdp_model_to_cpu(self.actor_module_fsdp)
        offload_duration = time.time() - offload_start_time
Comment on lines +178 to +181
Contributor

high

Similar to load_duration, the measurement for offload_duration is imprecise. It should be calculated only when the offload operation is performed to avoid including the conditional check overhead and to ensure it is zero when no offload occurs.

Suggested change
-        offload_start_time = time.time()
-        if self._is_actor and self._is_offload_param:
-            offload_fsdp_model_to_cpu(self.actor_module_fsdp)
-        offload_duration = time.time() - offload_start_time
+        offload_duration = 0.0
+        if self._is_actor and self._is_offload_param:
+            offload_start_time = time.time()
+            offload_fsdp_model_to_cpu(self.actor_module_fsdp)
+            offload_duration = time.time() - offload_start_time


        print(
            f"sync_rollout_weights_by_checkpoint finish!, rank:{torch.distributed.get_rank()},"
            f" is_actor:{self._is_actor}, is_rollout:{self._is_rollout},"
            f" total cost:{update_end_time - cache_start_time} seconds, while cache cost {cache_duration} seconds, "
            f" register cost {register_duration} seconds, update cost {update_duration} seconds"
        )

        if self._is_actor and self._is_offload_param:
            print(
                f"sync_rollout_weights_by_checkpoint load model to gpu cost {load_duration} seconds,"
                f" offload model to cpu cost {offload_duration} seconds"
            )


class DetachActorWorker(DetachNcclSync):
    def _get_actor_params(self):
17 changes: 17 additions & 0 deletions recipe/fully_async_policy/megatron_worker.py
@@ -140,6 +140,12 @@ def sync_rollout_weights_by_checkpoint(self, sync_group_name="actor_rollout"):
        assert (self._is_actor or self._is_rollout) and not self.config.hybrid_engine
        assert hasattr(self, "_weights_info") and self._weights_info is not None

        # Load model to GPU
        load_start_time = time.time()
        if self._is_actor and self._is_offload_param:
            load_megatron_model_to_gpu(self.actor_module)
        load_duration = time.time() - load_start_time
Comment on lines +144 to +147
Contributor

high

The measurement for load_duration is imprecise. It includes the overhead of the conditional check and will not be zero if the condition is false. For accurate profiling, the timing should only wrap the load_megatron_model_to_gpu call, and load_duration should be initialized to 0.0.

Suggested change
-        load_start_time = time.time()
-        if self._is_actor and self._is_offload_param:
-            load_megatron_model_to_gpu(self.actor_module)
-        load_duration = time.time() - load_start_time
+        load_duration = 0.0
+        if self._is_actor and self._is_offload_param:
+            load_start_time = time.time()
+            load_megatron_model_to_gpu(self.actor_module)
+            load_duration = time.time() - load_start_time


        from ray.util.collective import collective

        # Cache actor weights to CPU and measure the time taken
@@ -174,13 +180,24 @@ def sync_rollout_weights_by_checkpoint(self, sync_group_name="actor_rollout"):
        update_end_time = time.time()
        update_duration = update_end_time - update_start_time

        offload_start_time = time.time()
        if self._is_actor and self._is_offload_param:
            offload_megatron_model_to_cpu(self.actor_module)
        offload_duration = time.time() - offload_start_time
Comment on lines +183 to +186
Contributor

high

The measurement for offload_duration is imprecise for the same reasons as load_duration. To ensure correctness, the timing logic should be moved inside the conditional block.

Suggested change
-        offload_start_time = time.time()
-        if self._is_actor and self._is_offload_param:
-            offload_megatron_model_to_cpu(self.actor_module)
-        offload_duration = time.time() - offload_start_time
+        offload_duration = 0.0
+        if self._is_actor and self._is_offload_param:
+            offload_start_time = time.time()
+            offload_megatron_model_to_cpu(self.actor_module)
+            offload_duration = time.time() - offload_start_time
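For symmetry, the hypothetical gpu_timer sketch shown after the first comment would cover both Megatron call sites and keep both durations zero-initialized by construction. Again illustrative only, not part of this PR:

durations = {"load": 0.0, "offload": 0.0}
if self._is_actor and self._is_offload_param:
    with gpu_timer(durations, "load"):
        load_megatron_model_to_gpu(self.actor_module)
# ... weight sync via checkpoint happens here ...
if self._is_actor and self._is_offload_param:
    with gpu_timer(durations, "offload"):
        offload_megatron_model_to_cpu(self.actor_module)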


        print(
            f"sync_rollout_weights_by_checkpoint finish!, rank:{torch.distributed.get_rank()},"
            f" is_actor:{self._is_actor}, is_rollout:{self._is_rollout},"
            f" total cost:{update_end_time - cache_start_time} seconds, while cache cost {cache_duration} seconds, "
            f" register cost {register_duration} seconds, update cost {update_duration} seconds"
        )

        if self._is_actor and self._is_offload_param:
            print(
                f"sync_rollout_weights_by_checkpoint load model to gpu cost {load_duration} seconds,"
                f" offload model to cpu cost {offload_duration} seconds"
            )


class DetachActorWorker(DetachNcclSync):
    def _get_actor_params_generator(self):