32 changes: 32 additions & 0 deletions src/deimkit/engine/data/transforms/_transforms.py
@@ -70,6 +70,12 @@ def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
fill = self._fill[type(inpt)]
padding = params['padding']
return F.pad(inpt, padding=padding, fill=fill, padding_mode=self.padding_mode) # type: ignore[arg-type]

# added override for torchvision >=0.21
def transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
fill = self._fill[type(inpt)]
padding = params['padding']
return F.pad(inpt, padding=padding, fill=fill, padding_mode=self.padding_mode) # type: ignore[arg-type]

def __call__(self, *inputs: Any) -> Any:
outputs = super().forward(*inputs)
@@ -113,6 +119,19 @@ def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any:

return inpt

# added override for torchvision >=0.21
def transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
spatial_size = getattr(inpt, _boxes_keys[1])
if self.fmt:
in_fmt = inpt.format.value.lower()
inpt = torchvision.ops.box_convert(inpt, in_fmt=in_fmt, out_fmt=self.fmt.lower())
inpt = convert_to_tv_tensor(inpt, key='boxes', box_format=self.fmt.upper(), spatial_size=spatial_size)

if self.normalize:
inpt = inpt / torch.tensor(spatial_size[::-1]).tile(2)[None]

return inpt


@register()
class ConvertPILImage(T.Transform):
@@ -135,3 +154,16 @@ def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
inpt = Image(inpt)

return inpt

# added override for torchvision >=0.21
def transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
inpt = F.pil_to_tensor(inpt)
if self.dtype == 'float32':
inpt = inpt.float()

if self.scale:
inpt = inpt / 255.

inpt = Image(inpt)

return inpt
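
The three overrides added above share one pattern: the body of each existing `_transform` is duplicated under the public name `transform`, which, per the PR's own comments, is the hook torchvision >= 0.21 dispatches to. Below is a minimal sketch of how that duplication could be avoided by routing both hooks through a single helper; it assumes only that one of the two hooks gets called, and the class and helper names are illustrative rather than part of this PR.

```python
# Sketch only: write the conversion logic once and expose it under both
# dispatch hooks. Assumes torchvision >= 0.21 calls `transform()` while
# older releases call `_transform()`, as the PR comments indicate.
from typing import Any, Dict

import torchvision.transforms.v2 as T
from torchvision.transforms.v2 import functional as F
from torchvision.tv_tensors import Image


class ConvertPILImageCompat(T.Transform):
    """PIL -> tensor conversion with the core logic written once."""

    def __init__(self, dtype: str = "float32", scale: bool = True) -> None:
        super().__init__()
        self.dtype = dtype
        self.scale = scale

    def _apply(self, inpt: Any, params: Dict[str, Any]) -> Any:
        out = F.pil_to_tensor(inpt)
        if self.dtype == "float32":
            out = out.float()
        if self.scale:
            out = out / 255.0
        return Image(out)

    # torchvision < 0.21 dispatches to the private hook ...
    def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
        return self._apply(inpt, params)

    # ... torchvision >= 0.21 dispatches to the public one.
    def transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
        return self._apply(inpt, params)
```

The same wrapper shape would apply to the `Pad` and `ConvertBoxes` overrides. As an aside, the `inpt / torch.tensor(spatial_size[::-1]).tile(2)[None]` step in `ConvertBoxes` divides the four box coordinates by (W, H, W, H), normalizing them to [0, 1].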
92 changes: 54 additions & 38 deletions src/deimkit/trainer.py
@@ -11,6 +11,7 @@
from .engine.optim.lr_scheduler import FlatCosineLRScheduler
from .engine.solver import TASKS
from .engine.solver.det_engine import evaluate, train_one_epoch
from .engine.misc import dist_utils


class Trainer:
@@ -44,7 +45,7 @@ def __init__(self, config: Config):
self.output_dir = None
self.last_epoch = -1

self.distributed_initialized = False
self.distributed_initialized = False

# Initialize process group early
self._init_process_group()
@@ -54,50 +55,65 @@ def _init_process_group(self) -> None:
if self.distributed_initialized:
return

logger.info("Initializing process group for single-process training")

# Set environment variables for distributed training
os.environ["WORLD_SIZE"] = "1"
os.environ["RANK"] = "0"
os.environ["LOCAL_RANK"] = "0"
os.environ["MASTER_ADDR"] = "127.0.0.1" # Required for env:// initialization
os.environ["MASTER_PORT"] = "29500" # Required for env:// initialization

# Initialize process group
if not torch.distributed.is_initialized():
try:
# Use TCP initialization, which is reliable for single-process training
torch.distributed.init_process_group(
backend="gloo",
init_method="tcp://127.0.0.1:29500",
world_size=1,
rank=0,
)
logger.info("Process group initialized successfully")
except Exception as e:
logger.warning(f"Failed to initialize process group: {e}")
# Script executed without torchrun
if "TORCHELASTIC_RUN_ID" not in os.environ:

# Try an alternative approach using file store
try:
logger.info("Trying alternative initialization approach")
import tempfile
logger.info("Initializing process group for single-process training")

temp_dir = tempfile.mkdtemp()
file_path = os.path.join(temp_dir, "shared_file")
# Set environment variables for distributed training
os.environ["WORLD_SIZE"] = "1"
os.environ["RANK"] = "0"
os.environ["LOCAL_RANK"] = "0"
os.environ["MASTER_ADDR"] = "127.0.0.1" # Required for env:// initialization
os.environ["MASTER_PORT"] = "29500" # Required for env:// initialization

if not torch.distributed.is_initialized():

store = torch.distributed.FileStore(file_path, 1)
try:
# Use TCP initialization, which is reliable for single-process training
torch.distributed.init_process_group(
backend="gloo", store=store, rank=0, world_size=1
backend="gloo",
init_method="tcp://127.0.0.1:29500",
world_size=1,
rank=0,
)
logger.info("Process group initialized successfully with FileStore")
except Exception as e2:
logger.error(f"All initialization attempts failed: {e2}")
logger.info("Process group initialized successfully")
except Exception as e:
logger.warning(f"Failed to initialize process group: {e}")

# Try an alternative approach using file store
try:
logger.info("Trying alternative initialization approach")
import tempfile

temp_dir = tempfile.mkdtemp()
file_path = os.path.join(temp_dir, "shared_file")

store = torch.distributed.FileStore(file_path, 1)
torch.distributed.init_process_group(
backend="gloo", store=store, rank=0, world_size=1
)
logger.info("Process group initialized successfully with FileStore")
except Exception as e2:
logger.error(f"All initialization attempts failed: {e2}")

# Last resort: monkey patch torch.distributed
logger.warning("Using monkey patching as last resort")
self._monkey_patch_distributed()

self.distributed_initialized = True

# Script executed with torchrun
else:

logger.info(f"Initializing process group for multi-process training")
self.distributed_initialized = dist_utils.setup_distributed()

# Last resort: monkey patch torch.distributed
logger.warning("Using monkey patching as last resort")
self._monkey_patch_distributed()
rank = torch.distributed.get_rank()
if rank != 0:
logger.remove()

self.distributed_initialized = True
logger.info(f"Distributed initialization successful: {self.distributed_initialized}")

def _monkey_patch_distributed(self):
"""Monkey patch torch.distributed functions as a last resort."""
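
For context on the trainer change: `_init_process_group` now branches on the `TORCHELASTIC_RUN_ID` environment variable, which torchrun/torchelastic sets for each worker it spawns. Without it, the trainer builds a one-rank gloo group itself (TCP init first, a FileStore fallback, monkey-patching as a last resort); with it, the trainer defers to `dist_utils.setup_distributed()` and removes the loguru sink on non-zero ranks so only rank 0 logs. Below is a minimal, standalone sketch of that detection and rank-aware logging, assuming only torch and loguru; every helper name in it is illustrative and not part of the PR.

```python
# Sketch: torchrun detection plus rank-aware logging, mirroring the branch
# structure of _init_process_group above. Helper names are illustrative.
import os

import torch.distributed as dist
from loguru import logger


def launched_with_torchrun() -> bool:
    # torchrun / torchelastic sets this variable for every spawned worker.
    return "TORCHELASTIC_RUN_ID" in os.environ


def init_single_process_group(port: int = 29500) -> None:
    """Create a one-rank gloo group so DDP-style code still runs."""
    os.environ.setdefault("WORLD_SIZE", "1")
    os.environ.setdefault("RANK", "0")
    os.environ.setdefault("LOCAL_RANK", "0")
    if not dist.is_initialized():
        dist.init_process_group(
            backend="gloo",
            init_method=f"tcp://127.0.0.1:{port}",
            world_size=1,
            rank=0,
        )


def silence_non_zero_ranks() -> None:
    """Keep only rank 0 logging, as the trainer does after torchrun setup."""
    if dist.is_initialized() and dist.get_rank() != 0:
        logger.remove()


if __name__ == "__main__":
    if launched_with_torchrun():
        # Multi-process path; the trainer delegates this step to
        # dist_utils.setup_distributed(). env:// reads the variables torchrun set.
        dist.init_process_group(backend="gloo", init_method="env://")
        silence_non_zero_ranks()
    else:
        init_single_process_group()
    logger.info(f"rank {dist.get_rank()} / world size {dist.get_world_size()} ready")
```

Running a script such as this with `python script.py` exercises the single-process branch, while `torchrun --nproc_per_node=2 script.py` exercises the multi-process one.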