
RuntimeError when sending graph with dask.array.ufunc.ufuncs to scheduler and importing the wrapped numpy function #8442

Open
hendrikmakait opened this issue Jan 8, 2024 · 5 comments
Labels
bug Something is broken

Comments

@hendrikmakait
Member

Describe the issue:

There's an odd pickle error that occurs when a numpy function that dask.array wraps in dask.array.ufunc.ufunc is also imported directly into the script:

Minimal Complete Verifiable Example:

```python
import dask
from numpy import exp
import dask.array as da
from distributed import Client, SubprocessCluster

if __name__ == "__main__":
    client = Client(SubprocessCluster())
    da.exp(da.from_array([1,2,3])).compute()
```

Traceback

```
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 4659, in update_graph
    graph = deserialize(graph_header, graph_frames).data
^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/protocol/serialize.py", line 439, in deserialize
    return loads(header, frames)
      ^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/protocol/serialize.py", line 101, in pickle_loads
    return pickle.loads(x, buffers=buffers)
      ^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/protocol/pickle.py", line 94, in loads
    return pickle.loads(x, buffers=buffers)
^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'exp' on <module '__main__' from '/opt/homebrew/Caskroom/mambaforge/base/envs/dask-distributed/bin/dask'>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/sandbox.py", line 8, in <module>
    da.exp(da.from_array([1,2,3])).compute()
  File "/opt/homebrew/Caskroom/mambaforge/base/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py", line 358, in compute
    (result,) = compute(self, traverse=False, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/mambaforge/base/envs/dask-distributed/lib/python3.12/site-packages/dask/base.py", line 644, in compute
    results = schedule(dsk, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/client.py", line 2243, in _gather
    raise exception.with_traceback(traceback)
RuntimeError: Error during deserialization of the task graph. This frequently
occurs if the Scheduler and Client have different environments.
For more information, see
https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments
```

This seems to be reproducible with all ufuncs; for example, try replacing exp with absolute or sin. The SubprocessCluster is not needed: the scheduler just has to run in a different process (i.e., not a process-local LocalCluster).

Anything else we need to know?:

This example starts working if we do either of the following:

  • Remove import dask
  • Remove from numpy import exp
  • Replace da.exp(...) with a different ufunc that is not imported, e.g., da.sin(...)

Environment:

  • Dask version: main
  • Python version: 3.12
  • Operating System: macOS 14.2.1
  • Install method (conda, pip, source): source
@hendrikmakait hendrikmakait added the bug Something is broken label Jan 8, 2024
@hendrikmakait
Member Author

Note that the pickled bytestring looks like b'\x80\x05\x95\x85\x02\x00\x00\x00\x00\x00\x00\x8c\x1edistributed.protocol.serialize\x94\x8c\x08ToPickle\x94\x93\x94)\x81\x94}\x94\x8c\x04data\x94\x8c\x13dask.highlevelgraph\x94\x8c\x0eHighLevelGraph\x94\x93\x94)\x81\x94}\x94(\x8c\x0cdependencies\x94}\x94\x8a\x05\xc0\x1a&d\x01\x8f\x94s\x8c\x10key_dependencies\x94}\x94\x8c\x06layers\x94}\x94\x8a\x05\xc0\x1a&d\x01h\x06\x8c\x11MaterializedLayer\x94\x93\x94)\x81\x94}\x94(\x8c\x0bannotations\x94N\x8c\x16collection_annotations\x94N\x8c\x07mapping\x94}\x94(\x8c$exp-c9304f2f1cbc4206553f4d39554f4b0a\x94K\x00\x86\x94\x8c\x11dask.optimization\x94\x8c\x10SubgraphCallable\x94\x93\x94(}\x94h\x1a\x8c\x0b__mp_main__\x94\x8c\x03exp\x94\x93\x94\x8c\x13__dask_blockwise__0\x94\x86\x94sh\x1a\x8c\x13__dask_blockwise__0\x94\x85\x94\x8c6subgraph_callable-3f595432-e747-4c00-a2df-22ef695e08a7\x94t\x94R\x94\x8c&array-c1869b3304668e2d86c1b4d53128b6c9\x94K\x00\x86\x94\x86\x94h*K\x00\x86\x94\x8c\x12numpy.core.numeric\x94\x8c\x0b_frombuffer\x94\x93\x94(\x97\x8c\x05numpy\x94\x8c\x05dtype\x94\x93\x94\x8c\x02i8\x94\x89\x88\x87\x94R\x94(K\x03\x8c\x01<\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94bK\x03\x85\x94\x8c\x01C\x94t\x94R\x94uubsubsb.' 
when it fails, and like b'\x80\x05\x95\x96\x02\x00\x00\x00\x00\x00\x00\x8c\x1edistributed.protocol.serialize\x94\x8c\x08ToPickle\x94\x93\x94)\x81\x94}\x94\x8c\x04data\x94\x8c\x13dask.highlevelgraph\x94\x8c\x0eHighLevelGraph\x94\x93\x94)\x81\x94}\x94(\x8c\x0cdependencies\x94}\x94\x8a\x05\x80$\xf8?\x01\x8f\x94s\x8c\x10key_dependencies\x94}\x94\x8c\x06layers\x94}\x94\x8a\x05\x80$\xf8?\x01h\x06\x8c\x11MaterializedLayer\x94\x93\x94)\x81\x94}\x94(\x8c\x0bannotations\x94N\x8c\x16collection_annotations\x94N\x8c\x07mapping\x94}\x94(\x8c$exp-c9304f2f1cbc4206553f4d39554f4b0a\x94K\x00\x86\x94\x8c\x11dask.optimization\x94\x8c\x10SubgraphCallable\x94\x93\x94(}\x94h\x1a\x8c\x1cnumpy.core._multiarray_umath\x94\x8c\x03exp\x94\x93\x94\x8c\x13__dask_blockwise__0\x94\x86\x94sh\x1a\x8c\x13__dask_blockwise__0\x94\x85\x94\x8c6subgraph_callable-3b083291-ae5f-471e-8bf4-a07d7a088c72\x94t\x94R\x94\x8c&array-c1869b3304668e2d86c1b4d53128b6c9\x94K\x00\x86\x94\x86\x94h*K\x00\x86\x94\x8c\x12numpy.core.numeric\x94\x8c\x0b_frombuffer\x94\x93\x94(\x97\x8c\x05numpy\x94\x8c\x05dtype\x94\x93\x94\x8c\x02i8\x94\x89\x88\x87\x94R\x94(K\x03\x8c\x01<\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94bK\x03\x85\x94\x8c\x01C\x94t\x94R\x94uubsubsb.'.

Notably, the former contains __mp_main__\x94\x8c\x03exp while the latter contains numpy.core._multiarray_umath\x94\x8c\x03exp.
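One way to compare such streams without reading raw bytes is to list the global references a pickle contains. In this sketch, `global_refs` is a hypothetical helper, not part of dask or distributed. It pickles with protocol 3, where each global is written as a single `GLOBAL` opcode that `pickletools` decodes to a `"module name"` string; protocols 4 and 5, used in the bytestrings above, emit `STACK_GLOBAL` instead, whose operands sit on the stack and are harder to pick out.

```python
import json
import pickle
import pickletools


def global_refs(obj) -> list[str]:
    """Return the "module name" strings of all globals referenced by a pickle of obj.

    Hypothetical helper. Protocol 3 is used deliberately: it encodes every global
    as one GLOBAL opcode, and pickletools.genops reports its argument as
    "module name" (joined by a single space).
    """
    data = pickle.dumps(obj, protocol=3)
    return [arg for opcode, arg, _pos in pickletools.genops(data) if opcode.name == "GLOBAL"]


# A plain function is pickled by reference to the module that defines it.
print(global_refs(json.dumps))                # ['json dumps']
print(global_refs([json.dumps, json.loads]))  # ['json dumps', 'json loads']
```

Substituting numpy's `exp` for `json.dumps` should surface the same `__mp_main__` vs. `numpy.core._multiarray_umath` difference, assuming the protocol version does not change how pickle looks up the module.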

@fjetter
Member

fjetter commented Jan 9, 2024

A slightly easier way to reproduce this is without the cluster:

```python
import dask
from numpy import array, exp
import dask.array as da

import pickle
from dask.base import collections_to_dsk
graph = collections_to_dsk(da.exp(da.from_array([1,2,3], chunks=(-1,))))
print(pickle.dumps(graph))
```

This script produces the same bytes as above, including the __mp_main__ vs. numpy.core._multiarray difference.

In fact, the graph isn't needed at all; pickling exp directly already shows the same __mp_main__ reference:

```python
import pickle
import dask
from numpy import exp

print(pickle.dumps(exp))
```

The dask import somehow forces __mp_main__ into the pickle stream.

@fjetter
Member

fjetter commented Jan 9, 2024

It looks like import dask triggers an import chain that eventually runs import multiprocessing.pool here and here,

which mutates sys.modules by registering the __main__ module under the alias __mp_main__, see
https://github.com/python/cpython/blob/2e17cad2b8899126eb2024bf75db331b871bd5bc/Lib/multiprocessing/__init__.py#L36-L37

so the new reproducer is:

```python
import pickle
import multiprocessing
from numpy import exp

print(pickle.dumps(exp))
```
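The side effect this reproducer relies on can be checked with the standard library alone. This minimal sketch only inspects `sys.modules`. When the file is run directly with `python`, both names are expected to refer to the same module object, so anything imported into the script's namespace (like `exp` above) is also reachable as an attribute of `__mp_main__`. Tools that replace `__main__` after multiprocessing has already been imported (for example `runpy`) may break that identity.

```python
import sys

# Imported only for its side effect on sys.modules.
import multiprocessing  # noqa: F401

main_module = sys.modules["__main__"]
mp_main_module = sys.modules.get("__mp_main__")

# multiprocessing registers the running script a second time, under "__mp_main__".
print("__mp_main__ registered:", mp_main_module is not None)
# When run directly as a script, this is expected to print True.
print("same module object as __main__:", mp_main_module is main_module)
```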

@hendrikmakait
Member Author

hendrikmakait commented Jan 9, 2024

Like @fjetter, I went down this rabbit hole. Here are a few observations:

dill faced a similar issue a while back: uqfoundation/dill#392. This appears to have been fixed with python/cpython#23403. Indeed, when I use dill instead of pickle, the bytestring does not contain __mp_main__: b'\x80\x04\x95(\x00\x00\x00\x00\x00\x00\x00\x8c\x1cnumpy.core._multiarray_umath\x94\x8c\x03exp\x94\x93\x94.'
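A toy model of the module lookup may make the role of import order, and of python/cpython#23403, more concrete. It assumes, as a simplification, that pickle resolves an object saved by name alone by scanning `sys.modules` in insertion order and returning the first module that exposes the same object under that name, skipping `__main__`. As far as we can tell, the bpo-42406 change to the pure-Python `pickle.whichmodule` additionally skips `__mp_main__`. This is not CPython's code: the `SimpleNamespace` objects stand in for modules, and `numpy_like` is a made-up module name.

```python
from types import SimpleNamespace


def which_module(modules: dict, name: str, obj, skip: tuple[str, ...] = ("__main__",)) -> str:
    """Toy stand-in for pickle's module lookup (a simplification, not CPython's code).

    Returns the first module, in insertion order, whose attribute `name` is `obj`.
    """
    for module_name, module in modules.items():
        if module_name in skip:
            continue
        if getattr(module, name, None) is obj:
            return module_name
    return "__main__"


def exp(x):  # placeholder for numpy.exp
    return x


script = SimpleNamespace(exp=exp)      # the script after `from numpy import exp`
numpy_like = SimpleNamespace(exp=exp)  # hypothetical module that really defines exp

# `import multiprocessing` ran before numpy was imported, so the alias comes first.
modules = {"__main__": script, "__mp_main__": script, "numpy_like": numpy_like}
print(which_module(modules, "exp", exp))                                    # __mp_main__
print(which_module(modules, "exp", exp, skip=("__main__", "__mp_main__")))  # numpy_like

# If numpy had been imported first, the real module would win even without the skip.
reordered = {"__main__": script, "numpy_like": numpy_like, "__mp_main__": script}
print(which_module(reordered, "exp", exp))                                  # numpy_like
```

In this model the result hinges on whether the alias entry was created before the defining module was imported; it is only a model, not a description of pickle's internals.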

When the pickle stream references __main__, dask/distributed currently falls back to cloudpickle:

```python
if b"__main__" in result or (
    CLOUDPICKLE_GE_20
    and getattr(inspect.getmodule(x), "__name__", None)
    in cloudpickle.list_registry_pickle_by_value()
):
    if len(result) < 1000 or not _always_use_pickle_for(x):
        buffers.clear()
        result = cloudpickle.dumps(x, **dump_kwargs)
```
Given the above issues, I'm fairly certain that we should extend this check to __mp_main__ as well.
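The existing check misses this case because `b"__main__"` is not a substring of `b"__mp_main__"` (the characters before `main` differ), so the fallback never fires for streams that only reference the alias. The helper below is a hypothetical sketch of the suggested extension: `references_main_module` and `MAIN_MODULE_NAMES` are illustrative names, not distributed's API, and the cloudpickle-registry part of the real condition is left out. The byte fragments are taken from the streams quoted earlier in this thread.

```python
# Hypothetical: both names under which the running script can appear in a pickle.
MAIN_MODULE_NAMES = (b"__main__", b"__mp_main__")


def references_main_module(pickled: bytes) -> bool:
    """Return True if the pickle stream mentions the script's module under either name."""
    return any(name in pickled for name in MAIN_MODULE_NAMES)


failing_fragment = b"\x8c\x0b__mp_main__\x94\x8c\x03exp"
working_fragment = b"\x8c\x1cnumpy.core._multiarray_umath\x94\x8c\x03exp"

print(b"__main__" in failing_fragment)            # False: the current check misses it
print(references_main_module(failing_fragment))  # True
print(references_main_module(working_fragment))  # False
```

As with the existing check, a plain substring test can also match unrelated data that happens to contain these bytes, which would presumably just trigger an unnecessary cloudpickle fallback.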

At the moment, cloudpickle lacks a mechanism akin to python/cpython#23403. I've filed cloudpipe/cloudpickle#529 to fix that.

Unfortunately, simply combining those two fixes won't solve our problem; using dill might, but I'm not sure, and it could also break other things. For now, I'm done with my investigation. From what I can tell, this issue must have been around for a long time without coming up until recently, so I assume its impact is low.

@hendrikmakait
Member Author

hendrikmakait commented Jan 9, 2024

A few last notes:

Simply switching to dill doesn't work either: it raises TypeError("cannot pickle 'generator' object"), which is caught, and serialization falls back to other options.

As for why dill does a better job here, it's probably because it handles numpy.ufunc very explicitly: https://github.com/uqfoundation/dill/blob/5ed40d5090b6a60559935c1f23f86583d04097c6/dill/_dill.py#L366C21-L407
