From ab72f5898d3baf064d91e01bafa02ce968da8d13 Mon Sep 17 00:00:00 2001 From: liferoad Date: Tue, 23 Sep 2025 14:16:29 -0400 Subject: [PATCH 1/6] fix(sdk:python): Avoid AttributeError for transforms without hints The AppliedPTransform initializer would unconditionally attempt to call `get_resource_hints()` on a transform object. This could cause an AttributeError if a PTransform implementation does not define this method. This change adds an `hasattr` check to verify the existence of the `get_resource_hints` method before calling it, preventing the potential crash and making the pipeline construction more robust. --- sdks/python/apache_beam/pipeline.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index caed03943e19..b34363eeed1e 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -1240,8 +1240,8 @@ def __init__( # once environment is a first-class citizen in Beam graph and we have # access to actual environment, not just an id. self.resource_hints = dict( - transform.get_resource_hints()) if transform else { - } # type: Dict[str, bytes] + transform.get_resource_hints()) if transform and hasattr( + transform, 'get_resource_hints') else {} # type: Dict[str, bytes] if transform: annotations = { From da45b26bd4b608e5611308812b71f3dd9430263b Mon Sep 17 00:00:00 2001 From: liferoad Date: Tue, 23 Sep 2025 16:05:05 -0400 Subject: [PATCH 2/6] fix annotations --- sdks/python/apache_beam/pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index b34363eeed1e..fbe2ba20c220 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -1243,7 +1243,7 @@ def __init__( transform.get_resource_hints()) if transform and hasattr( transform, 'get_resource_hints') else {} # type: Dict[str, bytes] - if transform: + if transform and hasattr(transform, 'annotations'): annotations = { **(annotations or {}), **encode_annotations(transform.annotations()) } From 7bf0a95df6b6b3e77beb9ac5964c49ff8f863930 Mon Sep 17 00:00:00 2001 From: liferoad Date: Tue, 23 Sep 2025 21:34:46 -0400 Subject: [PATCH 3/6] fixed more --- sdks/python/apache_beam/pipeline.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index fbe2ba20c220..016be9e1d09f 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -354,7 +354,7 @@ def _replace_if_needed(self, original_transform_node): if replacement_transform is original_transform_node.transform: return replacement_transform.side_inputs = tuple( - original_transform_node.transform.side_inputs) + getattr(original_transform_node.transform, 'side_inputs', ())) replacement_transform_node = AppliedPTransform( original_transform_node.parent, @@ -1046,7 +1046,7 @@ def visit_transform(self, transform_node): == output.element_type.tuple_types[0]): output.requires_deterministic_key_coder = ( deterministic_key_coders and transform_node.full_label) - for side_input in transform_node.transform.side_inputs: + for side_input in getattr(transform_node.transform, 'side_inputs', []): if side_input.requires_keyed_input(): side_input.pvalue.element_type = typehints.coerce_to_kv_type( side_input.pvalue.element_type, @@ -1527,7 +1527,8 @@ def from_runner_api( environment_id=None, annotations=proto.annotations) - if result.transform and result.transform.side_inputs: + if result.transform and hasattr( + result.transform, 'side_inputs') and result.transform.side_inputs: for si, pcoll in zip(result.transform.side_inputs, side_inputs): si.pvalue = pcoll result.side_inputs = tuple(result.transform.side_inputs) From 15974f822a0020c61ef8c8b39c6ca37300296154 Mon Sep 17 00:00:00 2001 From: liferoad Date: Wed, 24 Sep 2025 09:42:58 -0400 Subject: [PATCH 4/6] fixed more --- sdks/python/apache_beam/pipeline.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 016be9e1d09f..8bbb8ade3b0c 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -1027,7 +1027,9 @@ def visit_transform(self, transform_node): # type: (AppliedPTransform) -> None if not transform_node.transform: return - if transform_node.transform.runner_api_requires_keyed_input(): + if hasattr( + transform_node.transform, 'runner_api_requires_keyed_input' + ) and transform_node.transform.runner_api_requires_keyed_input(): pcoll = transform_node.inputs[0] pcoll.element_type = typehints.coerce_to_kv_type( pcoll.element_type, transform_node.full_label) From 17b7ab66b274734905ccf9c0ea6da53fc2255bd8 Mon Sep 17 00:00:00 2001 From: liferoad Date: Wed, 24 Sep 2025 11:55:09 -0400 Subject: [PATCH 5/6] more fixes --- sdks/python/apache_beam/pipeline.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 8bbb8ade3b0c..baf0783150cd 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -1443,7 +1443,9 @@ def transform_to_runner_api( context, has_parts=bool(self.parts), named_inputs=self.named_inputs()) - return transform.to_runner_api(context, has_parts=bool(self.parts)) + elif hasattr(transform, 'to_runner_api'): + return transform.to_runner_api(context, has_parts=bool(self.parts)) + return None # Iterate over inputs and outputs by sorted key order, so that ids are # consistently generated for multiple runs of the same pipeline. From 6413d296f4f60ab518824a0f24a9ada93225d59d Mon Sep 17 00:00:00 2001 From: liferoad Date: Wed, 24 Sep 2025 19:49:23 -0400 Subject: [PATCH 6/6] one more --- sdks/python/apache_beam/pipeline.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index baf0783150cd..0540bd7a2274 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -1401,8 +1401,11 @@ def named_inputs(self): assert not self.main_inputs and not self.side_inputs return {} else: - named_inputs = self.transform._named_inputs( - self.main_inputs, self.side_inputs) + if hasattr(self.transform, '_named_inputs'): + named_inputs = self.transform._named_inputs( + self.main_inputs, self.side_inputs) + else: + named_inputs = {} if not self.parts: for name, pc_out in self.outputs.items(): if pc_out.producer is not self and pc_out not in named_inputs.values( @@ -1416,7 +1419,10 @@ def named_outputs(self): assert not self.outputs return {} else: - return self.transform._named_outputs(self.outputs) + if hasattr(self.transform, '_named_outputs'): + return self.transform._named_outputs(self.outputs) + else: + return {} def to_runner_api(self, context): # type: (PipelineContext) -> beam_runner_api_pb2.PTransform