File tree 4 files changed +14
-1
lines changed
4 files changed +14
-1
lines changed Original file line number Diff line number Diff line change 35
35
from metaflow .multicore_utils import parallel_map
36
36
from metaflow .datatools .s3util import aws_retry , read_in_chunks , get_timestamp
37
37
38
- NUM_WORKERS_DEFAULT = 64
38
+ NUM_WORKERS_DEFAULT = int ( os . environ . get ( "METAFLOW_S3OP_NUM_WORKERS" , 64 ))
39
39
40
40
DOWNLOAD_FILE_THRESHOLD = 2 * TransferConfig ().multipart_threshold
41
41
DOWNLOAD_MAX_CHUNK = 2 * 1024 * 1024 * 1024 - 1
Original file line number Diff line number Diff line change @@ -1488,6 +1488,8 @@ def _create_metaflow_step_op(
1488
1488
metaflow_execution_cmd += f" --namespace { flow_variables .namespace } "
1489
1489
if step_variables .is_split_index :
1490
1490
metaflow_execution_cmd += " --is_split_index"
1491
+ if node .type == "join" :
1492
+ metaflow_execution_cmd += " --is-join-step"
1491
1493
1492
1494
metaflow_execution_cmd += ' --preceding_component_outputs_dict "'
1493
1495
for key in preceding_component_outputs_dict :
Original file line number Diff line number Diff line change 28
28
STDERR_PATH = os .path .join (LOGS_DIR , STDERR_FILE )
29
29
30
30
AIP_CLI_DEFAULT_RETRY = 3
31
+
32
+ AIP_JOIN_METAFLOW_S3OP_NUM_WORKERS = 5
Original file line number Diff line number Diff line change 22
22
STDOUT_PATH ,
23
23
STEP_ENVIRONMENT_VARIABLES ,
24
24
TASK_ID_ENV_NAME ,
25
+ AIP_JOIN_METAFLOW_S3OP_NUM_WORKERS ,
25
26
)
26
27
27
28
from ... import R
@@ -282,6 +283,7 @@ def _command(
282
283
@click .option ("--user_code_retries" , type = int )
283
284
@click .option ("--workflow_name" )
284
285
@click .option ("--is-interruptible/--not-interruptible" , default = False )
286
+ @click .option ("--is-join-step" , is_flag = True , default = False )
285
287
def aip_metaflow_step (
286
288
volume_dir : str ,
287
289
environment : str ,
@@ -306,6 +308,7 @@ def aip_metaflow_step(
306
308
user_code_retries : int ,
307
309
workflow_name : str ,
308
310
is_interruptible : bool ,
311
+ is_join_step : bool ,
309
312
) -> None :
310
313
"""
311
314
(1) Renders and runs the Metaflow package_commands and Metaflow step
@@ -373,6 +376,12 @@ def aip_metaflow_step(
373
376
):
374
377
metaflow_configs_new ["METAFLOW_USER" ] = "aip-user"
375
378
379
+ if is_join_step and "METAFLOW_S3OP_NUM_WORKERS" not in os .environ :
380
+ # AIP-7487: Metaflow joins steps require lots of memory
381
+ os .environ ["METAFLOW_S3OP_NUM_WORKERS" ] = str (
382
+ AIP_JOIN_METAFLOW_S3OP_NUM_WORKERS
383
+ )
384
+
376
385
env : Dict [str , str ] = {
377
386
** os .environ ,
378
387
** metaflow_configs_new ,
You can’t perform that action at this time.
0 commit comments