Skip to content

Revert the PR changes 5122 #5134

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 33 commits into from
Apr 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
25f16ef
change: Allow telemetry only in supported regions
Jan 29, 2025
0ed85d6
change: Allow telemetry only in supported regions
Jan 29, 2025
b69ffcb
change: Allow telemetry only in supported regions
Jan 29, 2025
8d7f4a8
change: Allow telemetry only in supported regions
Jan 29, 2025
9321367
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Jan 29, 2025
f972222
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Jan 30, 2025
dadbb22
change: Allow telemetry only in supported regions
Jan 30, 2025
28b3fe8
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Feb 23, 2025
fe64f82
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Feb 24, 2025
7775c63
documentation: Removed a line about python version requirements of tr…
Feb 24, 2025
acc861a
Merge branch 'master' into rsareddy-dev
rsareddy0329 Feb 24, 2025
16dc02b
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Mar 10, 2025
06597c6
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Mar 11, 2025
249872d
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Mar 12, 2025
58f8746
feature: Enabled update_endpoint through model_builder
Mar 12, 2025
c6bad70
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Mar 12, 2025
0bf6404
fix: fix unit test, black-check, pylint errors
Mar 12, 2025
c67d7df
fix: fix black-check, pylint errors
Mar 12, 2025
1f84662
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Mar 13, 2025
ea1810b
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Mar 14, 2025
6079269
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Mar 14, 2025
c9fcefb
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Mar 17, 2025
16b6f0c
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Apr 8, 2025
89e18a9
fix:Added handler for pipeline variable while creating process job
Apr 8, 2025
10d4c4f
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Apr 9, 2025
7f15e19
fix: Added handler for pipeline variable while creating process job
Apr 9, 2025
7440f4d
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Apr 14, 2025
de11c91
Revert the PR changes: #5122, due to issue https://t.corp.amazon.com/…
Apr 14, 2025
7acccdb
Fix: fix the issue, https://t.corp.amazon.com/P223568185/communication
Apr 14, 2025
89c2b27
Merge branch 'master' into rsareddy-dev
rsareddy0329 Apr 15, 2025
b24fe0c
Merge branch 'aws:master' into rsareddy-dev
rsareddy0329 Apr 17, 2025
e68a4b9
Revert PR 5122 changes, due to issues with other processor codeflows
Apr 17, 2025
10b0782
Merge branch 'master' into rsareddy-dev
zhaoqizqwang Apr 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions src/sagemaker/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
and interpretation on Amazon SageMaker.
"""
from __future__ import absolute_import
import json
import logging
import os
import pathlib
Expand Down Expand Up @@ -315,16 +314,6 @@ def _normalize_args(
+ "rather than a pipeline variable"
)

if arguments is not None:
processed_arguments = []
for arg in arguments:
if isinstance(arg, PipelineVariable):
processed_value = json.dumps(arg.expr)
processed_arguments.append(processed_value)
else:
processed_arguments.append(arg)
arguments = processed_arguments

self._current_job_name = self._generate_current_job_name(job_name=job_name)

inputs_with_code = self._include_code_in_inputs(inputs, code, kms_key)
Expand Down
18 changes: 2 additions & 16 deletions tests/unit/sagemaker/workflow/test_processing_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -825,13 +825,6 @@ def test_spark_processor(spark_processor, processing_input, pipeline_session):
processor.sagemaker_session = pipeline_session
processor.role = ROLE

arguments_output = [
"--input",
"input-data-uri",
"--output",
'{"Get": "Parameters.MyArgOutput"}',
]

run_inputs["inputs"] = processing_input

step_args = processor.run(**run_inputs)
Expand All @@ -842,7 +835,7 @@ def test_spark_processor(spark_processor, processing_input, pipeline_session):

step_args = get_step_args_helper(step_args, "Processing")

assert step_args["AppSpecification"]["ContainerArguments"] == arguments_output
assert step_args["AppSpecification"]["ContainerArguments"] == run_inputs["arguments"]

entry_points = step_args["AppSpecification"]["ContainerEntrypoint"]
entry_points_expr = []
Expand Down Expand Up @@ -1027,13 +1020,6 @@ def test_spark_processor_local_code(spark_processor, processing_input, pipeline_
processor.sagemaker_session = pipeline_session
processor.role = ROLE

arguments_output = [
"--input",
"input-data-uri",
"--output",
'{"Get": "Parameters.MyArgOutput"}',
]

run_inputs["inputs"] = processing_input

step_args = processor.run(**run_inputs)
Expand All @@ -1044,7 +1030,7 @@ def test_spark_processor_local_code(spark_processor, processing_input, pipeline_

step_args = get_step_args_helper(step_args, "Processing")

assert step_args["AppSpecification"]["ContainerArguments"] == arguments_output
assert step_args["AppSpecification"]["ContainerArguments"] == run_inputs["arguments"]

entry_points = step_args["AppSpecification"]["ContainerEntrypoint"]
entry_points_expr = []
Expand Down
249 changes: 1 addition & 248 deletions tests/unit/test_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@
from sagemaker.fw_utils import UploadedCode
from sagemaker.workflow.pipeline_context import PipelineSession, _PipelineConfig
from sagemaker.workflow.functions import Join
from sagemaker.workflow.execution_variables import ExecutionVariable, ExecutionVariables
from sagemaker.workflow.execution_variables import ExecutionVariables
from tests.unit import SAGEMAKER_CONFIG_PROCESSING_JOB
from sagemaker.workflow.parameters import ParameterString

BUCKET_NAME = "mybucket"
REGION = "us-west-2"
Expand Down Expand Up @@ -1718,249 +1717,3 @@ def _get_describe_response_inputs_and_ouputs():
"ProcessingInputs": _get_expected_args_all_parameters(None)["inputs"],
"ProcessingOutputConfig": _get_expected_args_all_parameters(None)["output_config"],
}


# Parameters
def _get_data_inputs_with_parameters():
    """Return a single-element list of processing inputs whose S3 source is a
    pipeline ``ParameterString`` (default ``s3://dummy-bucket/input``)."""
    parameterized_source = ParameterString(
        name="input_data", default_value="s3://dummy-bucket/input"
    )
    processing_input = ProcessingInput(
        input_name="input-1",
        source=parameterized_source,
        destination="/opt/ml/processing/input",
    )
    return [processing_input]


def _get_data_outputs_with_parameters():
    """Return a single-element list of processing outputs whose S3 destination
    is a pipeline ``ParameterString`` (default ``s3://dummy-bucket/output``)."""
    parameterized_destination = ParameterString(
        name="output_data", default_value="s3://dummy-bucket/output"
    )
    processing_output = ProcessingOutput(
        output_name="output-1",
        source="/opt/ml/processing/output",
        destination=parameterized_destination,
    )
    return [processing_output]


def _get_expected_args_with_parameters(job_name):
return {
"inputs": [
{
"InputName": "input-1",
"S3Input": {
"S3Uri": "s3://dummy-bucket/input",
"LocalPath": "/opt/ml/processing/input",
"S3DataType": "S3Prefix",
"S3InputMode": "File",
"S3DataDistributionType": "FullyReplicated",
"S3CompressionType": "None",
},
}
],
"output_config": {
"Outputs": [
{
"OutputName": "output-1",
"S3Output": {
"S3Uri": "s3://dummy-bucket/output",
"LocalPath": "/opt/ml/processing/output",
"S3UploadMode": "EndOfJob",
},
}
]
},
"job_name": job_name,
"resources": {
"ClusterConfig": {
"InstanceType": "ml.m4.xlarge",
"InstanceCount": 1,
"VolumeSizeInGB": 100,
"VolumeKmsKeyId": "arn:aws:kms:us-west-2:012345678901:key/volume-kms-key",
}
},
"stopping_condition": {"MaxRuntimeInSeconds": 3600},
"app_specification": {
"ImageUri": "custom-image-uri",
"ContainerArguments": [
"--input-data",
"s3://dummy-bucket/input-param",
"--output-path",
"s3://dummy-bucket/output-param",
],
"ContainerEntrypoint": ["python3"],
},
"environment": {"my_env_variable": "my_env_variable_value"},
"network_config": {
"EnableNetworkIsolation": True,
"EnableInterContainerTrafficEncryption": True,
"VpcConfig": {
"Subnets": ["my_subnet_id"],
"SecurityGroupIds": ["my_security_group_id"],
},
},
"role_arn": "dummy/role",
"tags": [{"Key": "my-tag", "Value": "my-tag-value"}],
"experiment_config": {"ExperimentName": "AnExperiment"},
}


@patch("os.path.exists", return_value=True)
@patch("os.path.isfile", return_value=True)
@patch("sagemaker.utils.repack_model")
@patch("sagemaker.utils.create_tar_file")
@patch("sagemaker.session.Session.upload_data")
def test_script_processor_with_parameter_string(
upload_data_mock,
create_tar_file_mock,
repack_model_mock,
exists_mock,
isfile_mock,
sagemaker_session,
):
"""Test ScriptProcessor with ParameterString arguments"""
upload_data_mock.return_value = "s3://mocked_s3_uri_from_upload_data"

# Setup processor
processor = ScriptProcessor(
role="arn:aws:iam::012345678901:role/SageMakerRole", # Updated role ARN
image_uri="custom-image-uri",
command=["python3"],
instance_type="ml.m4.xlarge",
instance_count=1,
volume_size_in_gb=100,
volume_kms_key="arn:aws:kms:us-west-2:012345678901:key/volume-kms-key",
output_kms_key="arn:aws:kms:us-west-2:012345678901:key/output-kms-key",
max_runtime_in_seconds=3600,
base_job_name="test_processor",
env={"my_env_variable": "my_env_variable_value"},
tags=[{"Key": "my-tag", "Value": "my-tag-value"}],
network_config=NetworkConfig(
subnets=["my_subnet_id"],
security_group_ids=["my_security_group_id"],
enable_network_isolation=True,
encrypt_inter_container_traffic=True,
),
sagemaker_session=sagemaker_session,
)

input_param = ParameterString(name="input_param", default_value="s3://dummy-bucket/input-param")
output_param = ParameterString(
name="output_param", default_value="s3://dummy-bucket/output-param"
)
exec_var = ExecutionVariable(name="ExecutionTest")
join_var = Join(on="/", values=["s3://bucket", "prefix", "file.txt"])
dummy_str_var = "test-variable"

# Define expected arguments
expected_args = {
"inputs": [
{
"InputName": "input-1",
"AppManaged": False,
"S3Input": {
"S3Uri": ParameterString(
name="input_data", default_value="s3://dummy-bucket/input"
),
"LocalPath": "/opt/ml/processing/input",
"S3DataType": "S3Prefix",
"S3InputMode": "File",
"S3DataDistributionType": "FullyReplicated",
"S3CompressionType": "None",
},
},
{
"InputName": "code",
"AppManaged": False,
"S3Input": {
"S3Uri": "s3://mocked_s3_uri_from_upload_data",
"LocalPath": "/opt/ml/processing/input/code",
"S3DataType": "S3Prefix",
"S3InputMode": "File",
"S3DataDistributionType": "FullyReplicated",
"S3CompressionType": "None",
},
},
],
"output_config": {
"Outputs": [
{
"OutputName": "output-1",
"AppManaged": False,
"S3Output": {
"S3Uri": ParameterString(
name="output_data", default_value="s3://dummy-bucket/output"
),
"LocalPath": "/opt/ml/processing/output",
"S3UploadMode": "EndOfJob",
},
}
],
"KmsKeyId": "arn:aws:kms:us-west-2:012345678901:key/output-kms-key",
},
"job_name": "test_job",
"resources": {
"ClusterConfig": {
"InstanceType": "ml.m4.xlarge",
"InstanceCount": 1,
"VolumeSizeInGB": 100,
"VolumeKmsKeyId": "arn:aws:kms:us-west-2:012345678901:key/volume-kms-key",
}
},
"stopping_condition": {"MaxRuntimeInSeconds": 3600},
"app_specification": {
"ImageUri": "custom-image-uri",
"ContainerArguments": [
"--input-data",
'{"Get": "Parameters.input_param"}',
"--output-path",
'{"Get": "Parameters.output_param"}',
"--exec-arg",
'{"Get": "Execution.ExecutionTest"}',
"--join-arg",
'{"Std:Join": {"On": "/", "Values": ["s3://bucket", "prefix", "file.txt"]}}',
"--string-param",
"test-variable",
],
"ContainerEntrypoint": ["python3", "/opt/ml/processing/input/code/processing_code.py"],
},
"environment": {"my_env_variable": "my_env_variable_value"},
"network_config": {
"EnableNetworkIsolation": True,
"EnableInterContainerTrafficEncryption": True,
"VpcConfig": {
"SecurityGroupIds": ["my_security_group_id"],
"Subnets": ["my_subnet_id"],
},
},
"role_arn": "arn:aws:iam::012345678901:role/SageMakerRole",
"tags": [{"Key": "my-tag", "Value": "my-tag-value"}],
"experiment_config": {"ExperimentName": "AnExperiment"},
}

# Run processor
processor.run(
code="/local/path/to/processing_code.py",
inputs=_get_data_inputs_with_parameters(),
outputs=_get_data_outputs_with_parameters(),
arguments=[
"--input-data",
input_param,
"--output-path",
output_param,
"--exec-arg",
exec_var,
"--join-arg",
join_var,
"--string-param",
dummy_str_var,
],
wait=True,
logs=False,
job_name="test_job",
experiment_config={"ExperimentName": "AnExperiment"},
)

# Assert
sagemaker_session.process.assert_called_with(**expected_args)
assert "test_job" in processor._current_job_name