Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,9 @@ def _check_if_model_exists(self, model_name: str, describe_func: Callable[[str],
def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> dict[str, dict]:
validated_event = validate_execute_complete_event(event)

if validated_event["status"] != "success":
raise RuntimeError(f"Error while running transform job: {validated_event}")

self.log.info("SageMaker job %s completed.", validated_event["job_name"])
return self.serialize_result(validated_event["job_name"])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,33 @@ def test_operator_failed_before_defer(self, _, mock_transform, mock_describe_tra
self.sagemaker.execute(context=None)
assert not mock_defer.called

@mock.patch.object(sagemaker, "serialize", return_value="")
@mock.patch.object(SageMakerHook, "describe_model", return_value={"ModelName": "model_name"})
@mock.patch.object(
SageMakerHook,
"describe_transform_job",
return_value={
"ModelName": "model_name",
"TransformJobStatus": "Failed",
"FailureReason": "it failed",
},
)
def test_execute_complete_raises_when_job_failed_during_deferred_wait(
self, mock_describe_transform_job, mock_describe_model, mock_serialize
):
# When the transform job fails during the deferred wait, the trigger (an
# AwsBaseWaiterTrigger) yields {"status": "error", ...} instead of raising, so
# execute_complete must reject a non-success status rather than report the task as
# successful — matching every other SageMaker operator's execute_complete.
event = {
"status": "error",
"message": "Error while waiting for transform job: terminal failure",
"job_name": "job_name",
}

with pytest.raises(RuntimeError, match="Error while running transform job"):
self.sagemaker.execute_complete(context=None, event=event)

@mock.patch("airflow.providers.amazon.aws.operators.sagemaker.SageMakerTransformOperator.defer")
@mock.patch.object(SageMakerHook, "describe_model")
@mock.patch.object(
Expand Down
Loading