Merged
Changes from 7 commits
33 changes: 22 additions & 11 deletions sdks/python/apache_beam/pipeline.py
@@ -354,7 +354,7 @@ def _replace_if_needed(self, original_transform_node):
     if replacement_transform is original_transform_node.transform:
       return
     replacement_transform.side_inputs = tuple(
-        original_transform_node.transform.side_inputs)
+        getattr(original_transform_node.transform, 'side_inputs', ()))
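
For reference, the guarded access pattern introduced here is plain Python getattr with a default, which is safe whether or not the attribute exists. A minimal standalone demonstration (names are illustrative):

# getattr with a default never raises AttributeError; it returns the
# fallback value when the attribute is absent.
class _BareTransform(object):
  pass

print(getattr(_BareTransform(), 'side_inputs', ()))  # prints: ()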
Contributor
Rather than doing all of these attribute checks, can we just set these properties to empty values when we initialize the object?

def __init__(self, label=None):
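
A minimal sketch of what this suggestion could look like (hypothetical code, not the actual apache_beam PTransform source; the attributes and defaults shown are assumptions):

# Sketch: initialize attributes with empty defaults at construction
# time, so downstream code never needs hasattr/getattr guards.
class PTransform(object):
  def __init__(self, label=None):
    self.label = label
    self.side_inputs = ()  # always present and iterable, even if unused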

Contributor Author

The issue is not about setting these properties; it is caused by the nested MaybeReshuffle. Any fix in MaybeReshuffle could cause an update-compatibility issue. That is why we did #36238

Contributor

I don't quite follow - are you saying adding these properties to the PTransform class would cause update incompatibility?

Contributor Author

Check https://github.com/apache/beam/pull/36184/files#r2359516983: MaybeReshuffle is defined dynamically (inside Create.expand?), which is affecting the inheritance.

The fields would be there if MaybeReshuffle were not nested.
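
For illustration, a hedged sketch of the pattern being described (class names and bodies are assumptions, not the actual Create.expand source): the inner class only exists once the outer expand runs, which is the nesting referred to above.

import apache_beam as beam

class OuterCreate(beam.PTransform):  # illustrative stand-in for Create
  def expand(self, pbegin):
    # Defined dynamically on each call to expand(), like the reported
    # MaybeReshuffle: the class object is local to this function body.
    class MaybeReshuffle(beam.PTransform):
      def expand(self, pcoll):
        return pcoll  # placeholder body for the sketch

    return pbegin | MaybeReshuffle()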

Contributor

Thanks, chimed in on that thread. I think we should fix the core label issue that is causing this problem.

Contributor Author

I think this fix is much better since it can handle other nested transforms.

Contributor

So, would updating

def __init__(self, label=None):

really be update-incompatible? The reason https://github.com/apache/beam/pull/36184/files#r2359516983 was breaking is because:

  1. There were transforms that didn't have explicit labels.
  2. Those transforms get auto-assigned names that include the line number, for example Map(<lambda at bigquery_file_loads.py:1157>) in https://github.com/apache/beam/pull/34807/files.
  3. When we change the file, the line number that those transforms land on is no longer the same.

So if we:

  1. Explicitly name the transform that is getting assigned a name with a line number (see the example below), and
  2. Add these properties to def __init__(self, label=None):,

then we should fix this issue while avoiding any breaking changes.
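
For step 1, a small hedged example of the labeling behavior (file name and lambdas are illustrative): an unnamed transform gets a label derived from its source line, while an explicit label stays stable when the file changes.

import apache_beam as beam

with beam.Pipeline() as p:
  rows = p | beam.Create([1, 2, 3])

  # Auto-labeled roughly as Map(<lambda at my_file.py:7>); the label
  # changes whenever the lambda moves to a different line.
  _ = rows | beam.Map(lambda x: x + 1)

  # Explicitly labeled: stable across edits to the file.
  _ = rows | 'IncrementRow' >> beam.Map(lambda x: x + 1)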

Contributor Author

I don't know what you mean here. The nested transform is missing many fields (check the rest of my PR), which are not needed when the transform is nested. My PR makes sure any future nested transform will work.

Contributor

> The nested transform is missing many fields (check the rest of my PR), which are not needed when the transform is nested. My PR makes sure any future nested transform will work.

I agree your PR works. But it is quite messy - for example, we check for the existence of a side_inputs property 3 times when the object is always a PTransform object. It seems much cleaner to just guarantee that this property will always exist on PTransform objects. This also means that if we use these properties elsewhere (now or in the future), we don't need to do more of these kinds of checks.

It seems reasonable to me that PTransform should have these fields in all cases. An alternative would be for PTransform to provide functions to get these properties if they exist (a sketch follows).
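
A hedged sketch of that alternative (hypothetical helper, not an existing Beam API): a single accessor on PTransform centralizes the fallback, so call sites stop repeating getattr.

class PTransform(object):  # sketch only, not the real class
  def get_side_inputs(self):
    # Fall back to an empty tuple for transforms that never acquired
    # the attribute (e.g. dynamically defined nested ones).
    return getattr(self, 'side_inputs', ())

Call sites such as the side-input loop in visit_transform could then iterate transform_node.transform.get_side_inputs() unconditionally.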

Contributor

Regardless, this is a minor code quality issue and not a correctness one. It doesn't need to block the PR if you disagree.


     replacement_transform_node = AppliedPTransform(
         original_transform_node.parent,
@@ -1027,7 +1027,9 @@ def visit_transform(self, transform_node):
       # type: (AppliedPTransform) -> None
       if not transform_node.transform:
         return
-      if transform_node.transform.runner_api_requires_keyed_input():
+      if hasattr(
+          transform_node.transform, 'runner_api_requires_keyed_input'
+      ) and transform_node.transform.runner_api_requires_keyed_input():
         pcoll = transform_node.inputs[0]
         pcoll.element_type = typehints.coerce_to_kv_type(
             pcoll.element_type, transform_node.full_label)
@@ -1046,7 +1048,7 @@ def visit_transform(self, transform_node):
                 == output.element_type.tuple_types[0]):
               output.requires_deterministic_key_coder = (
                   deterministic_key_coders and transform_node.full_label)
-      for side_input in transform_node.transform.side_inputs:
+      for side_input in getattr(transform_node.transform, 'side_inputs', []):
         if side_input.requires_keyed_input():
           side_input.pvalue.element_type = typehints.coerce_to_kv_type(
               side_input.pvalue.element_type,
@@ -1240,10 +1242,10 @@ def __init__(
     # once environment is a first-class citizen in Beam graph and we have
     # access to actual environment, not just an id.
     self.resource_hints = dict(
-        transform.get_resource_hints()) if transform else {
-        } # type: Dict[str, bytes]
+        transform.get_resource_hints()) if transform and hasattr(
+        transform, 'get_resource_hints') else {} # type: Dict[str, bytes]

-    if transform:
+    if transform and hasattr(transform, 'annotations'):
       annotations = {
           **(annotations or {}), **encode_annotations(transform.annotations())
       }
@@ -1399,8 +1401,11 @@ def named_inputs(self):
       assert not self.main_inputs and not self.side_inputs
       return {}
     else:
-      named_inputs = self.transform._named_inputs(
-          self.main_inputs, self.side_inputs)
+      if hasattr(self.transform, '_named_inputs'):
+        named_inputs = self.transform._named_inputs(
+            self.main_inputs, self.side_inputs)
+      else:
+        named_inputs = {}
       if not self.parts:
         for name, pc_out in self.outputs.items():
           if pc_out.producer is not self and pc_out not in named_inputs.values(
@@ -1414,7 +1419,10 @@ def named_outputs(self):
       assert not self.outputs
       return {}
     else:
-      return self.transform._named_outputs(self.outputs)
+      if hasattr(self.transform, '_named_outputs'):
+        return self.transform._named_outputs(self.outputs)
+      else:
+        return {}

   def to_runner_api(self, context):
     # type: (PipelineContext) -> beam_runner_api_pb2.PTransform
@@ -1441,7 +1449,9 @@ def transform_to_runner_api(
           context,
           has_parts=bool(self.parts),
           named_inputs=self.named_inputs())
-      return transform.to_runner_api(context, has_parts=bool(self.parts))
+    elif hasattr(transform, 'to_runner_api'):
+      return transform.to_runner_api(context, has_parts=bool(self.parts))
+    return None

   # Iterate over inputs and outputs by sorted key order, so that ids are
   # consistently generated for multiple runs of the same pipeline.
@@ -1527,7 +1537,8 @@ def from_runner_api(
         environment_id=None,
         annotations=proto.annotations)

-    if result.transform and result.transform.side_inputs:
+    if result.transform and hasattr(
+        result.transform, 'side_inputs') and result.transform.side_inputs:
       for si, pcoll in zip(result.transform.side_inputs, side_inputs):
         si.pvalue = pcoll
       result.side_inputs = tuple(result.transform.side_inputs)