From 3249d2e479db84d0a16e8ff90c559b22e23d5e0f Mon Sep 17 00:00:00 2001 From: EnriqueGlv Date: Fri, 14 Mar 2025 14:37:56 +0100 Subject: [PATCH 1/4] Added torchvision.transforms.v2.Transform.transform overrides to enable torchvision>=0.21 support --- .../engine/data/transforms/_transforms.py | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/deimkit/engine/data/transforms/_transforms.py b/src/deimkit/engine/data/transforms/_transforms.py index 31588df5..4914418a 100644 --- a/src/deimkit/engine/data/transforms/_transforms.py +++ b/src/deimkit/engine/data/transforms/_transforms.py @@ -70,6 +70,12 @@ def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any: fill = self._fill[type(inpt)] padding = params['padding'] return F.pad(inpt, padding=padding, fill=fill, padding_mode=self.padding_mode) # type: ignore[arg-type] + + # added override for torchvision >=0.21 + def transform(self, inpt: Any, params: Dict[str, Any]) -> Any: + fill = self._fill[type(inpt)] + padding = params['padding'] + return F.pad(inpt, padding=padding, fill=fill, padding_mode=self.padding_mode) # type: ignore[arg-type] def __call__(self, *inputs: Any) -> Any: outputs = super().forward(*inputs) @@ -113,6 +119,19 @@ def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any: return inpt + # added override for torchvision >=0.21 + def transform(self, inpt: Any, params: Dict[str, Any]) -> Any: + spatial_size = getattr(inpt, _boxes_keys[1]) + if self.fmt: + in_fmt = inpt.format.value.lower() + inpt = torchvision.ops.box_convert(inpt, in_fmt=in_fmt, out_fmt=self.fmt.lower()) + inpt = convert_to_tv_tensor(inpt, key='boxes', box_format=self.fmt.upper(), spatial_size=spatial_size) + + if self.normalize: + inpt = inpt / torch.tensor(spatial_size[::-1]).tile(2)[None] + + return inpt + @register() class ConvertPILImage(T.Transform): @@ -135,3 +154,16 @@ def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any: inpt = Image(inpt) return inpt + + # added override for 
torchvision >=0.21 + def transform(self, inpt: Any, params: Dict[str, Any]) -> Any: + inpt = F.pil_to_tensor(inpt) + if self.dtype == 'float32': + inpt = inpt.float() + + if self.scale: + inpt = inpt / 255. + + inpt = Image(inpt) + + return inpt From 9e390c177d042ec9032e8aaee642520671e56607 Mon Sep 17 00:00:00 2001 From: EnriqueGlv Date: Tue, 18 Mar 2025 17:07:21 +0100 Subject: [PATCH 2/4] Fixed distributed training for Multi-GPU support --- src/deimkit/trainer.py | 53 +++++++----------------------------------- 1 file changed, 9 insertions(+), 44 deletions(-) diff --git a/src/deimkit/trainer.py b/src/deimkit/trainer.py index 9fc96cb5..e5288deb 100644 --- a/src/deimkit/trainer.py +++ b/src/deimkit/trainer.py @@ -11,6 +11,7 @@ from .engine.optim.lr_scheduler import FlatCosineLRScheduler from .engine.solver import TASKS from .engine.solver.det_engine import evaluate, train_one_epoch +from .engine.misc import dist_utils class Trainer: @@ -54,50 +55,14 @@ def _init_process_group(self) -> None: if self.distributed_initialized: return - logger.info("Initializing process group for single-process training") - - # Set environment variables for distributed training - os.environ["WORLD_SIZE"] = "1" - os.environ["RANK"] = "0" - os.environ["LOCAL_RANK"] = "0" - os.environ["MASTER_ADDR"] = "127.0.0.1" # Required for env:// initialization - os.environ["MASTER_PORT"] = "29500" # Required for env:// initialization - - # Initialize process group - if not torch.distributed.is_initialized(): - try: - # Use file:// initialization which is more reliable for single-process - torch.distributed.init_process_group( - backend="gloo", - init_method="tcp://127.0.0.1:29500", - world_size=1, - rank=0, - ) - logger.info("Process group initialized successfully") - except Exception as e: - logger.warning(f"Failed to initialize process group: {e}") - - # Try an alternative approach using file store - try: - logger.info("Trying alternative initialization approach") - import tempfile - - temp_dir = 
tempfile.mkdtemp() - file_path = os.path.join(temp_dir, "shared_file") - - store = torch.distributed.FileStore(file_path, 1) - torch.distributed.init_process_group( - backend="gloo", store=store, rank=0, world_size=1 - ) - logger.info("Process group initialized successfully with FileStore") - except Exception as e2: - logger.error(f"All initialization attempts failed: {e2}") - - # Last resort: monkey patch torch.distributed - logger.warning("Using monkey patching as last resort") - self._monkey_patch_distributed() - - self.distributed_initialized = True + rank = torch.distributed.get_rank() + if rank != 0: + logger.remove() + + logger.info(f"Initializing process group for multi-process training") + self.distributed_initialized = dist_utils.setup_distributed() + + logger.info(f"Distributed initialization successful: {self.distributed_initialized}") def _monkey_patch_distributed(self): """Monkey patch torch.distributed functions as a last resort.""" From 620b4612048001382c6de41cb7431e4a7b9f603d Mon Sep 17 00:00:00 2001 From: EnriqueGlv Date: Wed, 19 Mar 2025 09:50:15 +0100 Subject: [PATCH 3/4] Get rank after torch.distributed initialization --- src/deimkit/trainer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/deimkit/trainer.py b/src/deimkit/trainer.py index e5288deb..5a880a12 100644 --- a/src/deimkit/trainer.py +++ b/src/deimkit/trainer.py @@ -55,13 +55,13 @@ def _init_process_group(self) -> None: if self.distributed_initialized: return + logger.info(f"Initializing process group for multi-process training") + self.distributed_initialized = dist_utils.setup_distributed() + rank = torch.distributed.get_rank() if rank != 0: logger.remove() - logger.info(f"Initializing process group for multi-process training") - self.distributed_initialized = dist_utils.setup_distributed() - logger.info(f"Distributed initialization successful: {self.distributed_initialized}") def _monkey_patch_distributed(self): From 
82306ae955ff5b5a39084095fd00b443edf4f78c Mon Sep 17 00:00:00 2001 From: EnriqueGlv Date: Wed, 19 Mar 2025 10:39:00 +0100 Subject: [PATCH 4/4] Added back support for running without torchrun --- src/deimkit/trainer.py | 65 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 58 insertions(+), 7 deletions(-) diff --git a/src/deimkit/trainer.py b/src/deimkit/trainer.py index 5a880a12..1ff8b1f5 100644 --- a/src/deimkit/trainer.py +++ b/src/deimkit/trainer.py @@ -45,7 +45,7 @@ def __init__(self, config: Config): self.output_dir = None self.last_epoch = -1 - self.distributed_initialized = False + self.distributed_initialized = False # Initialize process group early self._init_process_group() @@ -55,14 +55,65 @@ def _init_process_group(self) -> None: if self.distributed_initialized: return - logger.info(f"Initializing process group for multi-process training") - self.distributed_initialized = dist_utils.setup_distributed() + # Script executed without torchrun + if "TORCHELASTIC_RUN_ID" not in os.environ: + + logger.info("Initializing process group for single-process training") + + # Set environment variables for distributed training + os.environ["WORLD_SIZE"] = "1" + os.environ["RANK"] = "0" + os.environ["LOCAL_RANK"] = "0" + os.environ["MASTER_ADDR"] = "127.0.0.1" # Required for env:// initialization + os.environ["MASTER_PORT"] = "29500" # Required for env:// initialization + + if not torch.distributed.is_initialized(): + + try: + # Try tcp:// initialization on localhost first; the FileStore fallback below is used if it fails + torch.distributed.init_process_group( + backend="gloo", + init_method="tcp://127.0.0.1:29500", + world_size=1, + rank=0, + ) + logger.info("Process group initialized successfully") + except Exception as e: + logger.warning(f"Failed to initialize process group: {e}") + + # Try an alternative approach using file store + try: + logger.info("Trying alternative initialization approach") + import tempfile + + temp_dir = tempfile.mkdtemp() + file_path = 
os.path.join(temp_dir, "shared_file") + + store = torch.distributed.FileStore(file_path, 1) + torch.distributed.init_process_group( + backend="gloo", store=store, rank=0, world_size=1 + ) + logger.info("Process group initialized successfully with FileStore") + except Exception as e2: + logger.error(f"All initialization attempts failed: {e2}") + + # Last resort: monkey patch torch.distributed + logger.warning("Using monkey patching as last resort") + self._monkey_patch_distributed() + + self.distributed_initialized = True + + # Script executed with torchrun + else: + + logger.info(f"Initializing process group for multi-process training") + self.distributed_initialized = dist_utils.setup_distributed() - rank = torch.distributed.get_rank() - if rank != 0: - logger.remove() + rank = torch.distributed.get_rank() + if rank != 0: + logger.remove() - logger.info(f"Distributed initialization successful: {self.distributed_initialized}") + logger.info(f"Distributed initialization successful: {self.distributed_initialized}") def _monkey_patch_distributed(self): """Monkey patch torch.distributed functions as a last resort."""