diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index caed03943e19..0540bd7a2274 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -354,7 +354,7 @@ def _replace_if_needed(self, original_transform_node): if replacement_transform is original_transform_node.transform: return replacement_transform.side_inputs = tuple( - original_transform_node.transform.side_inputs) + getattr(original_transform_node.transform, 'side_inputs', ())) replacement_transform_node = AppliedPTransform( original_transform_node.parent, @@ -1027,7 +1027,9 @@ def visit_transform(self, transform_node): # type: (AppliedPTransform) -> None if not transform_node.transform: return - if transform_node.transform.runner_api_requires_keyed_input(): + if hasattr( + transform_node.transform, 'runner_api_requires_keyed_input' + ) and transform_node.transform.runner_api_requires_keyed_input(): pcoll = transform_node.inputs[0] pcoll.element_type = typehints.coerce_to_kv_type( pcoll.element_type, transform_node.full_label) @@ -1046,7 +1048,7 @@ def visit_transform(self, transform_node): == output.element_type.tuple_types[0]): output.requires_deterministic_key_coder = ( deterministic_key_coders and transform_node.full_label) - for side_input in transform_node.transform.side_inputs: + for side_input in getattr(transform_node.transform, 'side_inputs', []): if side_input.requires_keyed_input(): side_input.pvalue.element_type = typehints.coerce_to_kv_type( side_input.pvalue.element_type, @@ -1240,10 +1242,10 @@ def __init__( # once environment is a first-class citizen in Beam graph and we have # access to actual environment, not just an id. self.resource_hints = dict( - transform.get_resource_hints()) if transform else { - } # type: Dict[str, bytes] + transform.get_resource_hints()) if transform and hasattr( + transform, 'get_resource_hints') else {} # type: Dict[str, bytes] - if transform: + if transform and hasattr(transform, 'annotations'): annotations = { **(annotations or {}), **encode_annotations(transform.annotations()) } @@ -1399,8 +1401,11 @@ def named_inputs(self): assert not self.main_inputs and not self.side_inputs return {} else: - named_inputs = self.transform._named_inputs( - self.main_inputs, self.side_inputs) + if hasattr(self.transform, '_named_inputs'): + named_inputs = self.transform._named_inputs( + self.main_inputs, self.side_inputs) + else: + named_inputs = {} if not self.parts: for name, pc_out in self.outputs.items(): if pc_out.producer is not self and pc_out not in named_inputs.values( @@ -1414,7 +1419,10 @@ def named_outputs(self): assert not self.outputs return {} else: - return self.transform._named_outputs(self.outputs) + if hasattr(self.transform, '_named_outputs'): + return self.transform._named_outputs(self.outputs) + else: + return {} def to_runner_api(self, context): # type: (PipelineContext) -> beam_runner_api_pb2.PTransform @@ -1441,7 +1449,9 @@ def transform_to_runner_api( context, has_parts=bool(self.parts), named_inputs=self.named_inputs()) - return transform.to_runner_api(context, has_parts=bool(self.parts)) + elif hasattr(transform, 'to_runner_api'): + return transform.to_runner_api(context, has_parts=bool(self.parts)) + return None # Iterate over inputs and outputs by sorted key order, so that ids are # consistently generated for multiple runs of the same pipeline. @@ -1527,7 +1537,8 @@ def from_runner_api( environment_id=None, annotations=proto.annotations) - if result.transform and result.transform.side_inputs: + if result.transform and hasattr( + result.transform, 'side_inputs') and result.transform.side_inputs: for si, pcoll in zip(result.transform.side_inputs, side_inputs): si.pvalue = pcoll result.side_inputs = tuple(result.transform.side_inputs)