From c40959bf9df280fc852795aa78f84c37ecc40a7f Mon Sep 17 00:00:00 2001 From: cmgzn Date: Thu, 14 May 2026 17:51:37 +0800 Subject: [PATCH 1/2] fix(checkpoint): resolve relative path issue in Ray checkpoint writer Ray's write_parquet is executed by Ray workers whose working directory may differ from the main process. When checkpoint_dir or work_dir is a relative path (e.g., './tmp/...'), Ray workers cannot resolve it correctly, resulting in empty checkpoint directories. Fix: convert work_dir and checkpoint_dir to absolute paths during PartitionedRayExecutor initialization. --- data_juicer/core/executor/ray_executor_partitioned.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/data_juicer/core/executor/ray_executor_partitioned.py b/data_juicer/core/executor/ray_executor_partitioned.py index c9bb55b50d..133d0f82b2 100644 --- a/data_juicer/core/executor/ray_executor_partitioned.py +++ b/data_juicer/core/executor/ray_executor_partitioned.py @@ -155,7 +155,7 @@ def __init__(self, cfg: Optional[Namespace] = None): super().__init__(cfg) self.executor_type = "ray_partitioned" - self.work_dir = self.cfg.work_dir + self.work_dir = os.path.abspath(self.cfg.work_dir) self.job_id = self.cfg.get("job_id", None) # Initialize temporary directory for Ray operations @@ -177,7 +177,9 @@ def __init__(self, cfg: Optional[Namespace] = None): # Checkpoint configuration and manager initialization checkpoint_cfg = getattr(self.cfg, "checkpoint", None) - checkpoint_dir = getattr(self.cfg, "checkpoint_dir", os.path.join(self.work_dir, "checkpoints")) + checkpoint_dir = os.path.abspath( + getattr(self.cfg, "checkpoint_dir", os.path.join(self.work_dir, "checkpoints")) + ) if checkpoint_cfg: # Use ConfigAccessor to handle both dict and object configurations From b803ba087042cdb86d3e5fffa226c8c7a59afe95 Mon Sep 17 00:00:00 2001 From: MeiXin Chen <85746275+cmgzn@users.noreply.github.com> Date: Thu, 14 May 2026 18:02:51 +0800 Subject: [PATCH 2/2] Apply suggestions from code review Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- data_juicer/core/executor/ray_executor_partitioned.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/data_juicer/core/executor/ray_executor_partitioned.py b/data_juicer/core/executor/ray_executor_partitioned.py index 133d0f82b2..0a0ca90557 100644 --- a/data_juicer/core/executor/ray_executor_partitioned.py +++ b/data_juicer/core/executor/ray_executor_partitioned.py @@ -155,7 +155,9 @@ def __init__(self, cfg: Optional[Namespace] = None): super().__init__(cfg) self.executor_type = "ray_partitioned" - self.work_dir = os.path.abspath(self.cfg.work_dir) + self.work_dir = self.cfg.work_dir + if self.work_dir and "://" not in self.work_dir: + self.work_dir = os.path.abspath(self.work_dir) self.job_id = self.cfg.get("job_id", None) # Initialize temporary directory for Ray operations @@ -177,9 +179,9 @@ def __init__(self, cfg: Optional[Namespace] = None): # Checkpoint configuration and manager initialization checkpoint_cfg = getattr(self.cfg, "checkpoint", None) - checkpoint_dir = os.path.abspath( - getattr(self.cfg, "checkpoint_dir", os.path.join(self.work_dir, "checkpoints")) - ) + checkpoint_dir = getattr(self.cfg, "checkpoint_dir", os.path.join(self.work_dir, "checkpoints")) + if checkpoint_dir and "://" not in checkpoint_dir: + checkpoint_dir = os.path.abspath(checkpoint_dir) if checkpoint_cfg: # Use ConfigAccessor to handle both dict and object configurations