Skip to content

Commit b0b713f

Browse files
authored
Add the background and foreground mode for runners (flagos-ai#1134)
### PR Category <!-- One of [ Train | Inference | Compress | Serve | RL | Core | Hardware | CICD | Tools | Others ] --> Core ### PR Types <!-- One of [ User Experience | New Features | Bug Fixes | Improvements | Performance | Breaking Change| Deprecations | Test Case | Docs | Others ] --> Improvements ### PR Description <!-- Describe what you’ve done --> This pull request refactors how test/background execution is handled in the `flagscale` runner and its backends. The main change is the removal of the `with_test` parameter in favor of a unified `background` flag, simplifying script generation and making execution behavior more consistent across different backends. Additionally, the foreground execution logic is improved to use `tee` for real-time output streaming and better error handling. Key changes include: **Unified Execution Control and API Simplification:** - Removed the `with_test` parameter from all backend `generate_run_script` methods and replaced it with a single `background` parameter, standardizing how foreground/background execution is specified throughout the codebase. [[1]](diffhunk://#diff-bfd13ebd35f064c032620db5685d478276b27ebfbb251bc2062ae00f151e0e20L139-R139) [[2]](diffhunk://#diff-631edcb28a8f12e6d60166e341b21846774b505e6413603fa05b2c7a42e44b25L39-R39) [[3]](diffhunk://#diff-4dc79b3be860d48c2997c3532bd0d494700d921cb5322aa1a37fdd9f9d73d728L74-R74) [[4]](diffhunk://#diff-6016265db35be235693fbe7e8fd21914c73f72a71c2d136f5cdc756eaa9d9302L150-R150) [[5]](diffhunk://#diff-ba3e05e32ab54259c27405787a43efee535c22c1ff26413775c1fbe50c5ff0f7L152-R152) [[6]](diffhunk://#diff-be8844e4e44f2a3baf2831150b013f9cba6051a21f4ded2adcd33de2b187bf40L63-R63) [[7]](diffhunk://#diff-5bca94ed0f2b711b68873de0fc56e2e67a70e0da0c305c3b4eb37cdcb9bc8965L370-R370) [[8]](diffhunk://#diff-d8e24839736a7e8e228cd881f20818f23584d1b8ca510f318653403d23421ee3L36-R36) **Improved Foreground Execution Output Handling:** - Changed foreground execution in generated run scripts to use `set -o pipefail` and pipe output through `tee -a` to the log file, ensuring real-time output streaming and improved error propagation. [[1]](diffhunk://#diff-bfd13ebd35f064c032620db5685d478276b27ebfbb251bc2062ae00f151e0e20L196-R202) [[2]](diffhunk://#diff-631edcb28a8f12e6d60166e341b21846774b505e6413603fa05b2c7a42e44b25L111-R116) [[3]](diffhunk://#diff-4dc79b3be860d48c2997c3532bd0d494700d921cb5322aa1a37fdd9f9d73d728L116-R122) [[4]](diffhunk://#diff-6016265db35be235693fbe7e8fd21914c73f72a71c2d136f5cdc756eaa9d9302L353-R354) [[5]](diffhunk://#diff-ba3e05e32ab54259c27405787a43efee535c22c1ff26413775c1fbe50c5ff0f7L388-R389) [[6]](diffhunk://#diff-be8844e4e44f2a3baf2831150b013f9cba6051a21f4ded2adcd33de2b187bf40L115-R119) [[7]](diffhunk://#diff-5bca94ed0f2b711b68873de0fc56e2e67a70e0da0c305c3b4eb37cdcb9bc8965L407-R413) [[8]](diffhunk://#diff-5bca94ed0f2b711b68873de0fc56e2e67a70e0da0c305c3b4eb37cdcb9bc8965L598-R595) [[9]](diffhunk://#diff-5bca94ed0f2b711b68873de0fc56e2e67a70e0da0c305c3b4eb37cdcb9bc8965L980-R978) [[10]](diffhunk://#diff-d8e24839736a7e8e228cd881f20818f23584d1b8ca510f318653403d23421ee3L108-R113) **Launcher Refactoring:** - Updated launcher methods (`launcher_cloud.py` and `launcher_ssh.py`) to use the new `background` parameter instead of `with_test`, aligning their APIs with backend changes and clarifying execution intent. [[1]](diffhunk://#diff-1c2c242252fa9e9bfec772758217afdbd3a99097593b3882ab8e157c8f449e21L32-R48) [[2]](diffhunk://#diff-1c2c242252fa9e9bfec772758217afdbd3a99097593b3882ab8e157c8f449e21L70-R70) [[3]](diffhunk://#diff-bf783cfb880f39690d7e8c9a101aa46035482edd5987af835dbd113fffb839e1L144-R147) [[4]](diffhunk://#diff-bf783cfb880f39690d7e8c9a101aa46035482edd5987af835dbd113fffb839e1L169-R172) [[5]](diffhunk://#diff-bf783cfb880f39690d7e8c9a101aa46035482edd5987af835dbd113fffb839e1L199-R202) [[6]](diffhunk://#diff-bf783cfb880f39690d7e8c9a101aa46035482edd5987af835dbd113fffb839e1L270-R273) [[7]](diffhunk://#diff-bf783cfb880f39690d7e8c9a101aa46035482edd5987af835dbd113fffb839e1L281-R288) **Code Clean-up:** - Removed all logic branches and code paths related to `with_test`, reducing complexity and eliminating redundant code. [[1]](diffhunk://#diff-bfd13ebd35f064c032620db5685d478276b27ebfbb251bc2062ae00f151e0e20L196-R202) [[2]](diffhunk://#diff-631edcb28a8f12e6d60166e341b21846774b505e6413603fa05b2c7a42e44b25L111-R116) [[3]](diffhunk://#diff-4dc79b3be860d48c2997c3532bd0d494700d921cb5322aa1a37fdd9f9d73d728L116-R122) [[4]](diffhunk://#diff-6016265db35be235693fbe7e8fd21914c73f72a71c2d136f5cdc756eaa9d9302L353-R354) [[5]](diffhunk://#diff-ba3e05e32ab54259c27405787a43efee535c22c1ff26413775c1fbe50c5ff0f7L388-R389) [[6]](diffhunk://#diff-be8844e4e44f2a3baf2831150b013f9cba6051a21f4ded2adcd33de2b187bf40L115-R119) [[7]](diffhunk://#diff-5bca94ed0f2b711b68873de0fc56e2e67a70e0da0c305c3b4eb37cdcb9bc8965L407-R413) [[8]](diffhunk://#diff-5bca94ed0f2b711b68873de0fc56e2e67a70e0da0c305c3b4eb37cdcb9bc8965L598-R595) [[9]](diffhunk://#diff-5bca94ed0f2b711b68873de0fc56e2e67a70e0da0c305c3b4eb37cdcb9bc8965L980-R978) [[10]](diffhunk://#diff-d8e24839736a7e8e228cd881f20818f23584d1b8ca510f318653403d23421ee3L108-R113) **Minor Improvements:** - Minor code style improvements, such as using unpacking in command construction and removing unused variables. [[1]](diffhunk://#diff-1c2c242252fa9e9bfec772758217afdbd3a99097593b3882ab8e157c8f449e21L32-R48) [[2]](diffhunk://#diff-1c2c242252fa9e9bfec772758217afdbd3a99097593b3882ab8e157c8f449e21L121) [[3]](diffhunk://#diff-bf783cfb880f39690d7e8c9a101aa46035482edd5987af835dbd113fffb839e1R19-R22) [[4]](diffhunk://#diff-bf783cfb880f39690d7e8c9a101aa46035482edd5987af835dbd113fffb839e1R31) These changes make the codebase cleaner, easier to maintain, and improve the reliability of log handling for foreground tasks.
1 parent 4bb7e97 commit b0b713f

16 files changed

Lines changed: 636 additions & 456 deletions

flagscale/run.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,13 @@ def execute_action(runner, action: str, task_type: str, config: DictConfig) -> N
116116
elif action == "dryrun":
117117
runner.run(dryrun=True)
118118
elif action == "test":
119-
runner.run(with_test=True)
119+
# Serve tasks are long-running daemons validated externally, so they
120+
# must start in the background even during tests. Other tasks
121+
# (train, inference) run in the foreground so output streams directly.
122+
if task_type == "serve":
123+
runner.run()
124+
else:
125+
runner.run(background=False)
120126
elif action == "stop":
121127
runner.stop()
122128
elif action == "query":

flagscale/runner/backend/backend_llama_cpp.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ def _prepare(self):
136136
logger.info("\n************** LlamaCpp Configuration **************")
137137
logger.info(f"\n{OmegaConf.to_yaml(self.config)}")
138138

139-
def generate_run_script(self, config, host, node_rank, cmd, background=True, with_test=False):
139+
def generate_run_script(self, config, host, node_rank, cmd, background=False):
140140
logging_config = config.logging
141141

142142
no_shared_fs = config.experiment.runner.get("no_shared_fs", False)
@@ -193,15 +193,13 @@ def generate_run_script(self, config, host, node_rank, cmd, background=True, wit
193193
f.write("\n")
194194
f.write("echo '=========== launch task (LlamaCpp) ==========='\n")
195195

196-
if with_test:
197-
f.write(f'bash -c "$cmd; sync" >> {host_output_file} \n')
196+
if background:
197+
f.write(
198+
f'nohup bash -c "$cmd; sync" >> {host_output_file} 2>&1 & echo $! > {host_pid_file}\n'
199+
)
198200
else:
199-
if background:
200-
f.write(
201-
f'nohup bash -c "$cmd; sync" >> {host_output_file} 2>&1 & echo $! > {host_pid_file}\n'
202-
)
203-
else:
204-
f.write(f'bash -c "$cmd; sync" >> {host_output_file} 2>&1\n')
201+
f.write("set -o pipefail\n")
202+
f.write(f'bash -c "$cmd; sync" 2>&1 | tee -a {host_output_file}\n')
205203

206204
f.write("\n")
207205
f.flush()

flagscale/runner/backend/backend_megatron.py

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@ def generate_run_script(
3636
host,
3737
node_rank,
3838
cmd,
39-
background=True,
40-
with_test=False,
39+
background=False,
4140
pkg_dir=None,
4241
enable_monitoring=False,
4342
):
@@ -108,17 +107,13 @@ def generate_run_script(
108107
)
109108
f.write("\n")
110109

111-
if with_test:
112-
f.write('bash -c "$cmd; sync" \n')
110+
if background:
111+
f.write(
112+
f'nohup bash -c "$cmd; sync" >> {host_output_file} 2>&1 & echo $! > {host_pid_file}\n'
113+
)
113114
else:
114-
# TODO: need a option to control whether to append or overwrite the output file
115-
# Now, it always appends to the output file
116-
if background:
117-
f.write(
118-
f'nohup bash -c "$cmd; sync" >> {host_output_file} 2>&1 & echo $! > {host_pid_file}\n'
119-
)
120-
else:
121-
f.write(f'bash -c "$cmd; sync" >> {host_output_file} 2>&1\n')
115+
f.write("set -o pipefail\n")
116+
f.write(f'bash -c "$cmd; sync" 2>&1 | tee -a {host_output_file}\n')
122117
f.write("\n")
123118
f.flush()
124119
os.fsync(f.fileno())

flagscale/runner/backend/backend_native_compress.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ def _prepare(self):
7171
logger.info("\n************** configuration **************")
7272
logger.info(f"\n{OmegaConf.to_yaml(self.config)}")
7373

74-
def generate_run_script(self, config, host, node_rank, cmd, background=True, with_test=False):
74+
def generate_run_script(self, config, host, node_rank, cmd, background=False):
7575
system_config = config.compress.system
7676
logging_config = config.compress.system.logging
7777

@@ -113,17 +113,13 @@ def generate_run_script(self, config, host, node_rank, cmd, background=True, wit
113113
f.write("\n")
114114
f.write(f'cmd="{cmd}"\n')
115115
f.write("\n")
116-
if with_test:
117-
f.write('bash -c "$cmd; sync" \n')
116+
if background:
117+
f.write(
118+
f'nohup bash -c "$cmd; sync" >> {host_output_file} 2>&1 & echo $! > {host_pid_file}\n'
119+
)
118120
else:
119-
# TODO: need a option to control whether to append or overwrite the output file
120-
# Now, it always appends to the output file
121-
if background:
122-
f.write(
123-
f'nohup bash -c "$cmd; sync" >> {host_output_file} 2>&1 & echo $! > {host_pid_file}\n'
124-
)
125-
else:
126-
f.write(f'bash -c "$cmd; sync" >> {host_output_file} 2>&1\n')
121+
f.write("set -o pipefail\n")
122+
f.write(f'bash -c "$cmd; sync" 2>&1 | tee -a {host_output_file}\n')
127123
f.write("\n")
128124
f.flush()
129125
os.fsync(f.fileno())

flagscale/runner/backend/backend_native_serve.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ def _prepare(self):
147147
logger.info("\n************** Ray Configuration **************")
148148
logger.info(f"\n{OmegaConf.to_yaml(self.config)}")
149149

150-
def generate_run_script(self, config, host, node_rank, cmd, background=True, with_test=False):
150+
def generate_run_script(self, config, host, node_rank, cmd, background=False):
151151
nodes = config.get("nodes", None)
152152
logging_config = config.logging
153153

@@ -350,7 +350,8 @@ def generate_run_script(self, config, host, node_rank, cmd, background=True, wit
350350
f'nohup bash -c "$cmd; sync" >> {host_output_file} 2>&1 & echo $! > {host_pid_file}\n'
351351
)
352352
else:
353-
f.write(f'bash -c "$cmd; sync" >> {host_output_file} 2>&1\n')
353+
f.write("set -o pipefail\n")
354+
f.write(f'bash -c "$cmd; sync" 2>&1 | tee -a {host_output_file}\n')
354355

355356
f.write("\n")
356357
f.flush()

flagscale/runner/backend/backend_native_train.py

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@ def generate_run_script(
3333
host,
3434
node_rank,
3535
cmd,
36-
background=True,
37-
with_test=False,
36+
background=False,
3837
pkg_dir=None,
3938
enable_monitoring=False,
4039
):
@@ -105,17 +104,13 @@ def generate_run_script(
105104
)
106105
f.write("\n")
107106

108-
if with_test:
109-
f.write('bash -c "$cmd; sync" \n')
107+
if background:
108+
f.write(
109+
f'nohup bash -c "$cmd; sync" >> {host_output_file} 2>&1 & echo $! > {host_pid_file}\n'
110+
)
110111
else:
111-
# TODO: need a option to control whether to append or overwrite the output file
112-
# Now, it always appends to the output file
113-
if background:
114-
f.write(
115-
f'nohup bash -c "$cmd; sync" >> {host_output_file} 2>&1 & echo $! > {host_pid_file}\n'
116-
)
117-
else:
118-
f.write(f'bash -c "$cmd; sync" >> {host_output_file} 2>&1\n')
112+
f.write("set -o pipefail\n")
113+
f.write(f'bash -c "$cmd; sync" 2>&1 | tee -a {host_output_file}\n')
119114
f.write("\n")
120115
f.flush()
121116
os.fsync(f.fileno())

flagscale/runner/backend/backend_sglang.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ def _prepare(self):
149149
logger.info("\n************** Sglang Configuration **************")
150150
logger.info(f"\n{OmegaConf.to_yaml(self.config)}")
151151

152-
def generate_run_script(self, config, host, node_rank, cmd, background=True, with_test=False):
152+
def generate_run_script(self, config, host, node_rank, cmd, background=False):
153153
nodes = config.get("nodes", None)
154154
logging_config = config.logging
155155

@@ -385,7 +385,8 @@ def generate_run_script(self, config, host, node_rank, cmd, background=True, wit
385385
f'nohup bash -c "$cmd; sync" >> {host_output_file} 2>&1 & echo $! > {host_pid_file}\n'
386386
)
387387
else:
388-
f.write(f'bash -c "$cmd; sync" >> {host_output_file} 2>&1\n')
388+
f.write("set -o pipefail\n")
389+
f.write(f'bash -c "$cmd; sync" 2>&1 | tee -a {host_output_file}\n')
389390
f.write("\n")
390391
f.flush()
391392
os.fsync(f.fileno())

flagscale/runner/backend/backend_verl.py

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,7 @@ def _prepare(self):
6060
logger.info("\n************** configuration **************")
6161
logger.info(f"\n{OmegaConf.to_yaml(self.config)}")
6262

63-
def generate_run_script(
64-
self, config, host, node_rank, cmd, background=True, with_test=False, resources=None
65-
):
63+
def generate_run_script(self, config, host, node_rank, cmd, background=False, resources=None):
6664
system_config = config.system
6765
logging_config = config.system.logging
6866

@@ -112,17 +110,13 @@ def generate_run_script(
112110
f.write("\n")
113111
f.write(f'cmd="{cmd}"\n')
114112
f.write("\n")
115-
if with_test:
116-
f.write(f'bash -c "$cmd; sync" >> {host_output_file} \n')
113+
if background:
114+
f.write(
115+
f'nohup bash -c "$cmd; sync" >> {host_output_file} 2>&1 & echo $! > {host_pid_file}\n'
116+
)
117117
else:
118-
# TODO: need a option to control whether to append or overwrite the output file
119-
# Now, it always appends to the output file
120-
if background:
121-
f.write(
122-
f'nohup bash -c "$cmd; sync" >> {host_output_file} 2>&1 & echo $! > {host_pid_file}\n'
123-
)
124-
else:
125-
f.write(f'bash -c "$cmd; sync" >> {host_output_file} 2>&1\n')
118+
f.write("set -o pipefail\n")
119+
f.write(f'bash -c "$cmd; sync" 2>&1 | tee -a {host_output_file}\n')
126120
f.write("\n")
127121
f.flush()
128122
os.fsync(f.fileno())

flagscale/runner/backend/backend_vllm.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ def _prepare(self):
367367
logger.info("\n************** configuration **************")
368368
logger.info(f"\n{OmegaConf.to_yaml(self.config)}")
369369

370-
def generate_run_script(self, config, host, node_rank, cmd, background=True, with_test=False):
370+
def generate_run_script(self, config, host, node_rank, cmd, background=False):
371371
if self.task_type == "inference":
372372
logging_config = config.inference.logging
373373

@@ -404,17 +404,13 @@ def generate_run_script(self, config, host, node_rank, cmd, background=True, wit
404404
f.write("\n")
405405
f.write(f'cmd="{cmd}"\n')
406406
f.write("\n")
407-
if with_test:
408-
f.write(f'bash -c "$cmd; sync" >> {host_output_file} \n')
407+
if background:
408+
f.write(
409+
f'nohup bash -c "$cmd; sync" >> {host_output_file} 2>&1 & echo $! > {host_pid_file}\n'
410+
)
409411
else:
410-
# TODO: need a option to control whether to append or overwrite the output file
411-
# Now, it always appends to the output file
412-
if background:
413-
f.write(
414-
f'nohup bash -c "$cmd; sync" >> {host_output_file} 2>&1 & echo $! > {host_pid_file}\n'
415-
)
416-
else:
417-
f.write(f'bash -c "$cmd; sync" >> {host_output_file} 2>&1\n')
412+
f.write("set -o pipefail\n")
413+
f.write(f'bash -c "$cmd; sync" 2>&1 | tee -a {host_output_file}\n')
418414
f.write("\n")
419415
f.flush()
420416
os.fsync(f.fileno())
@@ -595,7 +591,8 @@ def generate_run_script(self, config, host, node_rank, cmd, background=True, wit
595591
f'nohup bash -c "$cmd; sync" >> {host_output_file} 2>&1 & echo $! > {host_pid_file}\n'
596592
)
597593
else:
598-
f.write(f'bash -c "$cmd; sync" >> {host_output_file} 2>&1\n')
594+
f.write("set -o pipefail\n")
595+
f.write(f'bash -c "$cmd; sync" 2>&1 | tee -a {host_output_file}\n')
599596
f.write("\n")
600597
f.flush()
601598
os.fsync(f.fileno())
@@ -977,7 +974,8 @@ def generate_run_script(self, config, host, node_rank, cmd, background=True, wit
977974
f'nohup bash -c "$cmd; sync" >> {host_output_file} 2>&1 & echo $! > {host_pid_file}\n'
978975
)
979976
else:
980-
f.write(f'bash -c "$cmd; sync" >> {host_output_file} 2>&1\n')
977+
f.write("set -o pipefail\n")
978+
f.write(f'bash -c "$cmd; sync" 2>&1 | tee -a {host_output_file}\n')
981979
f.write("\n")
982980
f.flush()
983981
os.fsync(f.fileno())

flagscale/runner/launcher/launcher_cloud.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,23 @@ def _run_each(
2929
nnodes,
3030
node_rank,
3131
nproc_per_node,
32-
with_test=False,
32+
background=True,
3333
dryrun=False,
3434
):
3535
export_cmd = []
3636
for k, v in self.user_envs.items():
3737
export_cmd += [f"{k}={v}"]
3838

39-
cmd = shlex.join(export_cmd + ["python"] + [self.user_script] + self.user_args)
39+
cmd = shlex.join([*export_cmd, "python", self.user_script, *self.user_args])
4040

4141
host_run_script_file = self.backend.generate_run_script(
42-
self.config, host, node_rank, cmd, background=True, with_test=with_test
42+
self.config, host, node_rank, cmd, background=background
4343
)
4444

4545
run_local_command(f"bash {host_run_script_file}", dryrun)
4646

4747
def run(
48-
self, with_test=False, dryrun=False, monitor=False, interval=10, enable_monitoring=None
48+
self, background=True, dryrun=False, monitor=False, interval=10, enable_monitoring=None
4949
):
5050
num_visible_devices = None
5151
visible_devices = self.user_envs.get("CUDA_VISIBLE_DEVICES", None)
@@ -67,7 +67,7 @@ def run(
6767
1,
6868
0,
6969
nproc_per_node,
70-
with_test=with_test,
70+
background=background,
7171
dryrun=dryrun,
7272
)
7373
self.host = available_addr
@@ -118,7 +118,6 @@ def _generate_query_script(self, host, node_rank):
118118
def _query_each(self, host, node_rank):
119119
"Query each node status."
120120
host_query_script_file = self._generate_query_script(host, node_rank)
121-
logging_config = self.config.logging
122121
result = ""
123122
try:
124123
result = run_local_command(f"bash {host_query_script_file}", query=True)

0 commit comments

Comments
 (0)