diff --git a/src/deimkit/engine/data/transforms/_transforms.py b/src/deimkit/engine/data/transforms/_transforms.py
index 31588df5..4914418a 100644
--- a/src/deimkit/engine/data/transforms/_transforms.py
+++ b/src/deimkit/engine/data/transforms/_transforms.py
@@ -70,6 +70,12 @@ def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
         fill = self._fill[type(inpt)]
         padding = params['padding']
         return F.pad(inpt, padding=padding, fill=fill, padding_mode=self.padding_mode)  # type: ignore[arg-type]
+
+    # added override for torchvision >=0.21
+    def transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
+        fill = self._fill[type(inpt)]
+        padding = params['padding']
+        return F.pad(inpt, padding=padding, fill=fill, padding_mode=self.padding_mode)  # type: ignore[arg-type]
 
     def __call__(self, *inputs: Any) -> Any:
         outputs = super().forward(*inputs)
@@ -113,6 +119,19 @@ def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
 
         return inpt
 
+    # added override for torchvision >=0.21
+    def transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
+        spatial_size = getattr(inpt, _boxes_keys[1])
+        if self.fmt:
+            in_fmt = inpt.format.value.lower()
+            inpt = torchvision.ops.box_convert(inpt, in_fmt=in_fmt, out_fmt=self.fmt.lower())
+            inpt = convert_to_tv_tensor(inpt, key='boxes', box_format=self.fmt.upper(), spatial_size=spatial_size)
+
+        if self.normalize:
+            inpt = inpt / torch.tensor(spatial_size[::-1]).tile(2)[None]
+
+        return inpt
+
 
 @register()
 class ConvertPILImage(T.Transform):
@@ -135,3 +154,16 @@ def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
         inpt = Image(inpt)
 
         return inpt
+
+    # added override for torchvision >=0.21
+    def transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
+        inpt = F.pil_to_tensor(inpt)
+        if self.dtype == 'float32':
+            inpt = inpt.float()
+
+        if self.scale:
+            inpt = inpt / 255.
+
+        inpt = Image(inpt)
+
+        return inpt
diff --git a/src/deimkit/trainer.py b/src/deimkit/trainer.py
index 9fc96cb5..1ff8b1f5 100644
--- a/src/deimkit/trainer.py
+++ b/src/deimkit/trainer.py
@@ -11,6 +11,7 @@ from .engine.optim.lr_scheduler import FlatCosineLRScheduler
 from .engine.solver import TASKS
 from .engine.solver.det_engine import evaluate, train_one_epoch
+from .engine.misc import dist_utils
 
 
 class Trainer:
@@ -44,7 +45,7 @@ def __init__(self, config: Config):
 
         self.output_dir = None
         self.last_epoch = -1
-        self.distributed_initialized = False
+        self.distributed_initialized = False
 
         # Initialize process group early
         self._init_process_group()
@@ -54,50 +55,65 @@ def _init_process_group(self) -> None:
         if self.distributed_initialized:
             return
 
-        logger.info("Initializing process group for single-process training")
-
-        # Set environment variables for distributed training
-        os.environ["WORLD_SIZE"] = "1"
-        os.environ["RANK"] = "0"
-        os.environ["LOCAL_RANK"] = "0"
-        os.environ["MASTER_ADDR"] = "127.0.0.1"  # Required for env:// initialization
-        os.environ["MASTER_PORT"] = "29500"  # Required for env:// initialization
-
-        # Initialize process group
-        if not torch.distributed.is_initialized():
-            try:
-                # Use file:// initialization which is more reliable for single-process
-                torch.distributed.init_process_group(
-                    backend="gloo",
-                    init_method="tcp://127.0.0.1:29500",
-                    world_size=1,
-                    rank=0,
-                )
-                logger.info("Process group initialized successfully")
-            except Exception as e:
-                logger.warning(f"Failed to initialize process group: {e}")
+        # Script executed without torchrun
+        if "TORCHELASTIC_RUN_ID" not in os.environ:
 
-                # Try an alternative approach using file store
-                try:
-                    logger.info("Trying alternative initialization approach")
-                    import tempfile
+            logger.info("Initializing process group for single-process training")
 
-                    temp_dir = tempfile.mkdtemp()
-                    file_path = os.path.join(temp_dir, "shared_file")
+            # Set environment variables for distributed training
+            os.environ["WORLD_SIZE"] = "1"
+            os.environ["RANK"] = "0"
+            os.environ["LOCAL_RANK"] = "0"
+            os.environ["MASTER_ADDR"] = "127.0.0.1"  # Required for env:// initialization
+            os.environ["MASTER_PORT"] = "29500"  # Required for env:// initialization
+
+            if not torch.distributed.is_initialized():
 
-                    store = torch.distributed.FileStore(file_path, 1)
+                try:
+                    # Use file:// initialization which is more reliable for single-process
                     torch.distributed.init_process_group(
-                        backend="gloo", store=store, rank=0, world_size=1
+                        backend="gloo",
+                        init_method="tcp://127.0.0.1:29500",
+                        world_size=1,
+                        rank=0,
                     )
-                    logger.info("Process group initialized successfully with FileStore")
-                except Exception as e2:
-                    logger.error(f"All initialization attempts failed: {e2}")
+                    logger.info("Process group initialized successfully")
+                except Exception as e:
+                    logger.warning(f"Failed to initialize process group: {e}")
+
+                    # Try an alternative approach using file store
+                    try:
+                        logger.info("Trying alternative initialization approach")
+                        import tempfile
+
+                        temp_dir = tempfile.mkdtemp()
+                        file_path = os.path.join(temp_dir, "shared_file")
+
+                        store = torch.distributed.FileStore(file_path, 1)
+                        torch.distributed.init_process_group(
+                            backend="gloo", store=store, rank=0, world_size=1
+                        )
+                        logger.info("Process group initialized successfully with FileStore")
+                    except Exception as e2:
+                        logger.error(f"All initialization attempts failed: {e2}")
+
+                        # Last resort: monkey patch torch.distributed
+                        logger.warning("Using monkey patching as last resort")
+                        self._monkey_patch_distributed()
+
+            self.distributed_initialized = True
+
+        # Script executed with torchrun
+        else:
+
+            logger.info(f"Initializing process group for multi-process training")
+            self.distributed_initialized = dist_utils.setup_distributed()
 
-                    # Last resort: monkey patch torch.distributed
-                    logger.warning("Using monkey patching as last resort")
-                    self._monkey_patch_distributed()
+            rank = torch.distributed.get_rank()
+            if rank != 0:
+                logger.remove()
 
-        self.distributed_initialized = True
+        logger.info(f"Distributed initialization successful: {self.distributed_initialized}")
 
     def _monkey_patch_distributed(self):
         """Monkey patch torch.distributed functions as a last resort."""
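
Note on the _transforms.py hunks: the inline comments say the new methods target torchvision >=0.21, where the v2 Transform classes dispatch per-input work through a public transform() hook rather than the older private _transform(), so each custom transform now carries both. The same effect could, as far as I can tell, also be had without duplicating the body by aliasing one hook to the other. The sketch below is illustrative only (ScaleToUnit is a made-up class, not part of deimkit) and assumes older torchvision calls _transform() while >=0.21 calls transform():

    # Illustrative sketch, not deimkit code: define the body once and expose it under
    # both hook names so the transform runs on torchvision before and after 0.21.
    from typing import Any, Dict

    import torch
    import torchvision.transforms.v2 as T


    class ScaleToUnit(T.Transform):
        """Toy transform: scale uint8 image tensors to [0, 1] floats."""

        def transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
            # called by torchvision >= 0.21
            if isinstance(inpt, torch.Tensor) and inpt.dtype == torch.uint8:
                return inpt.float() / 255.0
            return inpt

        # older torchvision calls the underscore-prefixed hook; alias it to the same body
        _transform = transform


    img = torch.randint(0, 256, (3, 32, 32), dtype=torch.uint8)
    out = ScaleToUnit()(img)  # float tensor in [0, 1] on either torchvision version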
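Note on the trainer.py hunks: torchrun (torch.distributed.elastic) exports TORCHELASTIC_RUN_ID, along with RANK, WORLD_SIZE, LOCAL_RANK and MASTER_ADDR/MASTER_PORT, for every worker it spawns, so its presence is used to choose between the existing single-process fallback and dist_utils.setup_distributed(); logger.remove() then silences loguru on non-zero ranks so only rank 0 logs. A stripped-down version of that branching, outside the Trainer class, might look like the sketch below (the helper name launched_with_torchrun and the script name train.py are illustrative, not deimkit API):

    # Illustrative sketch of the detection logic, not deimkit code.
    import os

    import torch
    import torch.distributed as dist


    def launched_with_torchrun() -> bool:
        # torchrun sets this (plus RANK/WORLD_SIZE/LOCAL_RANK/MASTER_*) for each worker
        return "TORCHELASTIC_RUN_ID" in os.environ


    if launched_with_torchrun():
        # rank, world size and rendezvous info come from the torchrun environment
        dist.init_process_group(backend="nccl" if torch.cuda.is_available() else "gloo")
    else:
        # plain `python train.py`: create a 1-rank gloo group so dist-aware code still runs
        os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
        os.environ.setdefault("MASTER_PORT", "29500")
        dist.init_process_group(backend="gloo", init_method="env://", world_size=1, rank=0)

    print(f"rank {dist.get_rank()} of {dist.get_world_size()}")

With this kind of guard the same entry point can be started either directly with python or via torchrun --nproc_per_node=<gpus> train.py, which appears to be the intent of the change.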