Skip to content

Commit c84f28f

Browse files
authored
Add option to pickle relative filepaths in cloudpickle. (#36300)
* Add option to pickle relative filepaths in cloudpickle. * Use relative filepaths for deterministic coder pickling. * Make filepath interceptor, add docstrings to CloudPickleConfig, revert coder changes (they need to be guarded by update compat flag).
1 parent 50e14ac commit c84f28f

File tree

1 file changed

+92
-24
lines changed

1 file changed

+92
-24
lines changed

sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py

Lines changed: 92 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import itertools
6767
import logging
6868
import opcode
69+
import os
6970
import pickle
7071
from pickle import _getattribute as _pickle_getattribute
7172
import platform
@@ -108,9 +109,28 @@ def uuid_generator(_):
108109

109110
@dataclasses.dataclass
110111
class CloudPickleConfig:
111-
"""Configuration for cloudpickle behavior."""
112+
"""Configuration for cloudpickle behavior.
113+
114+
This class controls various aspects of how cloudpickle serializes objects.
115+
116+
Attributes:
117+
id_generator: Callable that generates unique identifiers for dynamic
118+
types. Controls isinstance semantics preservation. If None,
119+
disables type tracking and isinstance relationships are not
120+
preserved across pickle/unpickle cycles. If callable, generates
121+
unique IDs to maintain object identity.
122+
Default: uuid_generator (generates UUID hex strings).
123+
124+
skip_reset_dynamic_type_state: Whether to skip resetting state when
125+
reconstructing dynamic types. If True, skips state reset for
126+
already-reconstructed types.
127+
128+
filepath_interceptor: Used to modify filepaths in `co_filename` and
129+
function.__globals__['__file__'].
130+
"""
112131
id_generator: typing.Optional[callable] = uuid_generator
113132
skip_reset_dynamic_type_state: bool = False
133+
filepath_interceptor: typing.Optional[callable] = None
114134

115135

116136
DEFAULT_CONFIG = CloudPickleConfig()
@@ -396,6 +416,27 @@ def func():
396416
return subimports
397417

398418

419+
def get_relative_path(path):
420+
"""Returns the path of a filename relative to the longest matching directory
421+
in sys.path.
422+
Args:
423+
path: The path to the file.
424+
"""
425+
abs_path = os.path.abspath(path)
426+
longest_match = ""
427+
428+
for dir_path in sys.path:
429+
if not dir_path.endswith(os.path.sep):
430+
dir_path += os.path.sep
431+
432+
if abs_path.startswith(dir_path) and len(dir_path) > len(longest_match):
433+
longest_match = dir_path
434+
435+
if not longest_match:
436+
return path
437+
return os.path.relpath(abs_path, longest_match)
438+
439+
399440
# relevant opcodes
400441
STORE_GLOBAL = opcode.opmap["STORE_GLOBAL"]
401442
DELETE_GLOBAL = opcode.opmap["DELETE_GLOBAL"]
@@ -608,7 +649,7 @@ def _make_typevar(
608649
return _lookup_class_or_track(class_tracker_id, tv)
609650

610651

611-
def _decompose_typevar(obj, config):
652+
def _decompose_typevar(obj, config: CloudPickleConfig):
612653
return (
613654
obj.__name__,
614655
obj.__bound__,
@@ -619,7 +660,7 @@ def _decompose_typevar(obj, config):
619660
)
620661

621662

622-
def _typevar_reduce(obj, config):
663+
def _typevar_reduce(obj, config: CloudPickleConfig):
623664
# TypeVar instances require the module information hence why we
624665
# are not using the _should_pickle_by_reference directly
625666
module_and_name = _lookup_module_and_qualname(obj, name=obj.__name__)
@@ -671,7 +712,7 @@ def _make_dict_items(obj, is_ordered=False):
671712
# -------------------------------------------------
672713

673714

674-
def _class_getnewargs(obj, config):
715+
def _class_getnewargs(obj, config: CloudPickleConfig):
675716
type_kwargs = {}
676717
if "__module__" in obj.__dict__:
677718
type_kwargs["__module__"] = obj.__module__
@@ -690,7 +731,7 @@ def _class_getnewargs(obj, config):
690731
)
691732

692733

693-
def _enum_getnewargs(obj, config):
734+
def _enum_getnewargs(obj, config: CloudPickleConfig):
694735
members = {e.name: e.value for e in obj}
695736
return (
696737
obj.__bases__,
@@ -831,7 +872,7 @@ def _enum_getstate(obj):
831872
# these holes".
832873

833874

834-
def _code_reduce(obj):
875+
def _code_reduce(obj, config: CloudPickleConfig):
835876
"""code object reducer."""
836877
# If you are not sure about the order of arguments, take a look at help
837878
# of the specific type from types, for example:
@@ -850,6 +891,11 @@ def _code_reduce(obj):
850891
co_varnames = tuple(name for name in obj.co_varnames)
851892
co_freevars = tuple(name for name in obj.co_freevars)
852893
co_cellvars = tuple(name for name in obj.co_cellvars)
894+
895+
co_filename = obj.co_filename
896+
if (config and config.filepath_interceptor):
897+
co_filename = config.filepath_interceptor(co_filename)
898+
853899
if hasattr(obj, "co_exceptiontable"):
854900
# Python 3.11 and later: there are some new attributes
855901
# related to the enhanced exceptions.
@@ -864,7 +910,7 @@ def _code_reduce(obj):
864910
obj.co_consts,
865911
co_names,
866912
co_varnames,
867-
obj.co_filename,
913+
co_filename,
868914
co_name,
869915
obj.co_qualname,
870916
obj.co_firstlineno,
@@ -887,7 +933,7 @@ def _code_reduce(obj):
887933
obj.co_consts,
888934
co_names,
889935
co_varnames,
890-
obj.co_filename,
936+
co_filename,
891937
co_name,
892938
obj.co_firstlineno,
893939
obj.co_linetable,
@@ -908,7 +954,7 @@ def _code_reduce(obj):
908954
obj.co_code,
909955
obj.co_consts,
910956
co_varnames,
911-
obj.co_filename,
957+
co_filename,
912958
co_name,
913959
obj.co_firstlineno,
914960
obj.co_lnotab,
@@ -932,7 +978,7 @@ def _code_reduce(obj):
932978
obj.co_consts,
933979
co_names,
934980
co_varnames,
935-
obj.co_filename,
981+
co_filename,
936982
co_name,
937983
obj.co_firstlineno,
938984
obj.co_lnotab,
@@ -1043,7 +1089,7 @@ def _weakset_reduce(obj):
10431089
return weakref.WeakSet, (list(obj), )
10441090

10451091

1046-
def _dynamic_class_reduce(obj, config):
1092+
def _dynamic_class_reduce(obj, config: CloudPickleConfig):
10471093
"""Save a class that can't be referenced as a module attribute.
10481094
10491095
This method is used to serialize classes that are defined inside
@@ -1074,7 +1120,7 @@ def _dynamic_class_reduce(obj, config):
10741120
)
10751121

10761122

1077-
def _class_reduce(obj, config):
1123+
def _class_reduce(obj, config: CloudPickleConfig):
10781124
"""Select the reducer depending on the dynamic nature of the class obj."""
10791125
if obj is type(None): # noqa
10801126
return type, (None, )
@@ -1169,7 +1215,7 @@ def _function_setstate(obj, state):
11691215
setattr(obj, k, v)
11701216

11711217

1172-
def _class_setstate(obj, state, skip_reset_dynamic_type_state):
1218+
def _class_setstate(obj, state, skip_reset_dynamic_type_state=False):
11731219
# Lock while potentially modifying class state.
11741220
with _DYNAMIC_CLASS_TRACKER_LOCK:
11751221
if skip_reset_dynamic_type_state and obj in _DYNAMIC_CLASS_STATE_TRACKER_BY_CLASS:
@@ -1240,7 +1286,6 @@ class Pickler(pickle.Pickler):
12401286
_dispatch_table[property] = _property_reduce
12411287
_dispatch_table[staticmethod] = _classmethod_reduce
12421288
_dispatch_table[CellType] = _cell_reduce
1243-
_dispatch_table[types.CodeType] = _code_reduce
12441289
_dispatch_table[types.GetSetDescriptorType] = _getset_descriptor_reduce
12451290
_dispatch_table[types.ModuleType] = _module_reduce
12461291
_dispatch_table[types.MethodType] = _method_reduce
@@ -1300,9 +1345,15 @@ def _function_getnewargs(self, func):
13001345
base_globals = self.globals_ref.setdefault(id(func.__globals__), {})
13011346

13021347
if base_globals == {}:
1348+
if "__file__" in func.__globals__:
1349+
# Apply normalization ONLY to the __file__ attribute
1350+
file_path = func.__globals__["__file__"]
1351+
if self.config.filepath_interceptor:
1352+
file_path = self.config.filepath_interceptor(file_path)
1353+
base_globals["__file__"] = file_path
13031354
# Add module attributes used to resolve relative imports
13041355
# instructions inside func.
1305-
for k in ["__package__", "__name__", "__path__", "__file__"]:
1356+
for k in ["__package__", "__name__", "__path__"]:
13061357
if k in func.__globals__:
13071358
base_globals[k] = func.__globals__[k]
13081359

@@ -1318,15 +1369,16 @@ def _function_getnewargs(self, func):
13181369
def dump(self, obj):
13191370
try:
13201371
return super().dump(obj)
1321-
except RuntimeError as e:
1322-
if len(e.args) > 0 and "recursion" in e.args[0]:
1323-
msg = "Could not pickle object as excessively deep recursion required."
1324-
raise pickle.PicklingError(msg) from e
1325-
else:
1326-
raise
1372+
except RecursionError as e:
1373+
msg = "Could not pickle object as excessively deep recursion required."
1374+
raise pickle.PicklingError(msg) from e
13271375

13281376
def __init__(
1329-
self, file, protocol=None, buffer_callback=None, config=DEFAULT_CONFIG):
1377+
self,
1378+
file,
1379+
protocol=None,
1380+
buffer_callback=None,
1381+
config: CloudPickleConfig = DEFAULT_CONFIG):
13301382
if protocol is None:
13311383
protocol = DEFAULT_PROTOCOL
13321384
super().__init__(file, protocol=protocol, buffer_callback=buffer_callback)
@@ -1405,6 +1457,8 @@ def reducer_override(self, obj):
14051457
return _class_reduce(obj, self.config)
14061458
elif isinstance(obj, typing.TypeVar): # Add this check
14071459
return _typevar_reduce(obj, self.config)
1460+
elif isinstance(obj, types.CodeType):
1461+
return _code_reduce(obj, self.config)
14081462
elif isinstance(obj, types.FunctionType):
14091463
return self._function_reduce(obj)
14101464
else:
@@ -1487,6 +1541,11 @@ def save_typevar(self, obj, name=None):
14871541

14881542
dispatch[typing.TypeVar] = save_typevar
14891543

1544+
def save_code(self, obj, name=None):
1545+
return self.save_reduce(*_code_reduce(obj, self.config), obj=obj)
1546+
1547+
dispatch[types.CodeType] = save_code
1548+
14901549
def save_function(self, obj, name=None):
14911550
"""Registered with the dispatch to handle all function types.
14921551
@@ -1532,7 +1591,12 @@ def save_pypy_builtin_func(self, obj):
15321591
# Shorthands similar to pickle.dump/pickle.dumps
15331592

15341593

1535-
def dump(obj, file, protocol=None, buffer_callback=None, config=DEFAULT_CONFIG):
1594+
def dump(
1595+
obj,
1596+
file,
1597+
protocol=None,
1598+
buffer_callback=None,
1599+
config: CloudPickleConfig = DEFAULT_CONFIG):
15361600
"""Serialize obj as bytes streamed into file
15371601
15381602
protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to
@@ -1550,7 +1614,11 @@ def dump(obj, file, protocol=None, buffer_callback=None, config=DEFAULT_CONFIG):
15501614
config=config).dump(obj)
15511615

15521616

1553-
def dumps(obj, protocol=None, buffer_callback=None, config=DEFAULT_CONFIG):
1617+
def dumps(
1618+
obj,
1619+
protocol=None,
1620+
buffer_callback=None,
1621+
config: CloudPickleConfig = DEFAULT_CONFIG):
15541622
"""Serialize obj as a string of bytes allocated in memory
15551623
15561624
protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to

0 commit comments

Comments
 (0)