diff --git a/recipes/full_dpo_distributed.py b/recipes/full_dpo_distributed.py
index afad07a245..debbe08f96 100644
--- a/recipes/full_dpo_distributed.py
+++ b/recipes/full_dpo_distributed.py
@@ -36,6 +36,7 @@
 from torchtune.utils import get_world_size_and_rank
 from tqdm import tqdm
+from datetime import timedelta


 class FullDPORecipeDistributed(FTRecipeInterface):
     """
@@ -153,8 +154,9 @@ def __init__(self, cfg: DictConfig) -> None:
         self.distributed_backend = training.get_distributed_backend(
             cfg.device, offload_ops_to_cpu=True
         )
-        init_process_group(self.distributed_backend)
-        self._checkpoint_client = CheckpointClient(cfg)
+        # delay the ProcessGroupNCCL.cpp "Watchdog caught collective operation timeout" to 1 hour
+        init_process_group(backend=self.distributed_backend, timeout=timedelta(seconds=3600))
+        self._checkpoint_client = CheckpointClient(cfg)

         self.world_size, self.rank = get_world_size_and_rank()
         self._is_rank_zero = self.rank == 0
diff --git a/recipes/full_finetune_distributed.py b/recipes/full_finetune_distributed.py
index e31224142f..c373395264 100644
--- a/recipes/full_finetune_distributed.py
+++ b/recipes/full_finetune_distributed.py
@@ -49,6 +49,7 @@

 from tqdm import tqdm
+from datetime import timedelta


 class FullFinetuneRecipeDistributed(FTRecipeInterface):
     """
@@ -148,8 +149,9 @@ def __init__(self, cfg: DictConfig) -> None:
             offload_ops_to_cpu=self.fsdp_cpu_offload
             or self._enable_async_checkpointing,
         )
-        init_process_group(self.distributed_backend)
-
+        # delay the ProcessGroupNCCL.cpp "Watchdog caught collective operation timeout" to 1 hour
+        init_process_group(backend=self.distributed_backend, timeout=timedelta(seconds=3600))
+
         # Initialize distributed variables
         self.world_size, self.rank = utils.get_world_size_and_rank()
         self._is_rank_zero = self.rank == 0
diff --git a/recipes/knowledge_distillation_distributed.py b/recipes/knowledge_distillation_distributed.py
index 539c6c006c..5eb6c080fc 100644
--- a/recipes/knowledge_distillation_distributed.py
+++ b/recipes/knowledge_distillation_distributed.py
@@ -39,6 +39,8 @@

 from tqdm import tqdm

+from datetime import timedelta
+

 class KDRecipeDistributed(FTRecipeInterface):
     """
@@ -124,8 +126,8 @@ def __init__(self, cfg: DictConfig) -> None:
             offload_ops_to_cpu=self.fsdp_cpu_offload
             or self._enable_async_checkpointing,
         )
-        init_process_group(self.distributed_backend)
-
+        # delay the ProcessGroupNCCL.cpp "Watchdog caught collective operation timeout" to 1 hour
+        init_process_group(backend=self.distributed_backend, timeout=timedelta(seconds=3600))
         self.world_size, self.rank = utils.get_world_size_and_rank()
         self._is_rank_zero = self.rank == 0

diff --git a/recipes/lora_dpo_distributed.py b/recipes/lora_dpo_distributed.py
index e101ab4796..838aabed1f 100644
--- a/recipes/lora_dpo_distributed.py
+++ b/recipes/lora_dpo_distributed.py
@@ -41,6 +41,7 @@
 )
 from tqdm import tqdm
+from datetime import timedelta


 class LoRADPORecipeDistributed(FTRecipeInterface):
     """
@@ -143,7 +144,8 @@ def __init__(self, cfg: DictConfig) -> None:
             cfg.device, offload_ops_to_cpu=True
         )

-        init_process_group(self.distributed_backend)
+        # delay the ProcessGroupNCCL.cpp "Watchdog caught collective operation timeout" to 1 hour
+        init_process_group(backend=self.distributed_backend, timeout=timedelta(seconds=3600))

         self.world_size, self.rank = utils.get_world_size_and_rank()

diff --git a/recipes/lora_finetune_distributed.py b/recipes/lora_finetune_distributed.py
index 5e1766ed59..903c6795be 100644
--- a/recipes/lora_finetune_distributed.py
+++ b/recipes/lora_finetune_distributed.py
@@ -45,6 +45,8 @@
 )
 from tqdm import tqdm

+from datetime import timedelta
+

 class LoRAFinetuneRecipeDistributed(FTRecipeInterface):
     """
@@ -148,8 +150,9 @@ def __init__(self, cfg: DictConfig) -> None:
             offload_ops_to_cpu=self.fsdp_cpu_offload
             or self._enable_async_checkpointing,
         )
-        init_process_group(self.distributed_backend)
-
+        # delay the ProcessGroupNCCL.cpp "Watchdog caught collective operation timeout" to 1 hour
+        init_process_group(backend=self.distributed_backend, timeout=timedelta(seconds=3600))
+
         self.world_size, self.rank = utils.get_world_size_and_rank()
         self._is_rank_zero = self.rank == 0

diff --git a/recipes/qat_distributed.py b/recipes/qat_distributed.py
index 03c203cf38..d3c6ff8a81 100644
--- a/recipes/qat_distributed.py
+++ b/recipes/qat_distributed.py
@@ -44,6 +44,8 @@

 from tqdm import tqdm

+from datetime import timedelta
+

 class QATRecipeDistributed(FTRecipeInterface):
     """
@@ -157,7 +159,8 @@ def __init__(self, cfg: DictConfig) -> None:
             offload_ops_to_cpu=self.fsdp_cpu_offload
             or self._enable_async_checkpointing,
         )
-        init_process_group(self.distributed_backend)
+        # delay the ProcessGroupNCCL.cpp "Watchdog caught collective operation timeout" to 1 hour
+        init_process_group(backend=self.distributed_backend, timeout=timedelta(seconds=3600))

         # Initialize distributed variables
         self.world_size, self.rank = utils.get_world_size_and_rank()
diff --git a/recipes/qat_lora_finetune_distributed.py b/recipes/qat_lora_finetune_distributed.py
index 7796715526..114f004557 100644
--- a/recipes/qat_lora_finetune_distributed.py
+++ b/recipes/qat_lora_finetune_distributed.py
@@ -47,6 +47,8 @@
 from torchtune.training.quantization import swap_lora_linear_with_qat
 from tqdm import tqdm

+from datetime import timedelta
+

 class QATLoRAFinetuneRecipeDistributed(FTRecipeInterface):
     """
@@ -156,7 +158,8 @@ def __init__(self, cfg: DictConfig) -> None:
             offload_ops_to_cpu=self.fsdp_cpu_offload
             or self._enable_async_checkpointing,
         )
-        init_process_group(self.distributed_backend)
+        # delay the ProcessGroupNCCL.cpp "Watchdog caught collective operation timeout" to 1 hour
+        init_process_group(backend=self.distributed_backend, timeout=timedelta(seconds=3600))

         self.world_size, self.rank = utils.get_world_size_and_rank()
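
Note: every hunk above applies the same one-line pattern. An explicit timeout is passed to torch.distributed.init_process_group so that long-running collectives (for example, rank 0 saving a large checkpoint while the other ranks wait) are not aborted by the NCCL watchdog, whose default timeout is 10 minutes in recent PyTorch releases. Below is a minimal standalone sketch of the pattern, assuming a torchrun-style env:// launch (MASTER_ADDR, MASTER_PORT, RANK, and WORLD_SIZE set by the launcher); the helper name init_distributed is illustrative, not part of torchtune:

    from datetime import timedelta

    import torch.distributed as dist

    def init_distributed(backend: str = "nccl", timeout_s: int = 3600) -> None:
        # An explicit timeout overrides the backend default (10 minutes for NCCL),
        # so the ProcessGroupNCCL watchdog only aborts a collective after it has
        # stalled for a full hour.
        dist.init_process_group(backend=backend, timeout=timedelta(seconds=timeout_s))

Since init_process_group accepts the timeout directly, no recipe-level retry logic is needed; the trade-off is that a genuinely hung rank now takes up to an hour to surface as an error.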