Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added array optimimzation fuse notebook #89

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ _build/
mydask.png
dataframes/data
.idea/
*.zarr
240 changes: 240 additions & 0 deletions applications/array-optimization.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Array Scheduling Optimization\n",
"\n",
"In this example, we'll perform a typical workflow\n",
"\n",
"* Load batches of data\n",
"* Stack batches into a single Dask Array\n",
"* Rechunk the data for downstream processing\n",
"* Write rechunked data to disk, using [Zarr](https://zarr.readthedocs.io)\n",
"\n",
"Depending on the rechunking pattern, this is an embarassingly parallel opertion. However,\n",
"the Dask scheduler doesn't necessarily know that, which may result in sub-optimal scheduling. We'll configure the optimization settings to ensure that we have ideal scheduling to minimize data transfer across the cluster."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!conda install -y -c conda-forge zarr"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import Client\n",
"import dask.array as da\n",
"import zarr\n",
"\n",
"client = Client(processes=False, threads_per_worker=4,\n",
" n_workers=1, memory_limit='2GB')\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Rather than loading data from disk, we'll generate random data in memory.\n",
"We'll have 25 batches, each of which has shape `(2,000,000 x 90,000)`.\n",
"\n",
"We rechunk along just the first axis (preserving the chunking on the second axis)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"inputs = [da.random.random(size=2_000_000, chunks=90_000)\n",
" for _ in range(25)]\n",
"inputs_stacked = da.vstack(inputs)\n",
"inputs_rechunked = inputs_stacked.rechunk((50, 90_000))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"inputs_stacked"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"inputs_rechunked"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"And we'll set up the writing to Zarr."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"store = zarr.DirectoryStore('spike.zarr')\n",
"root = zarr.group(store, overwrite=True)\n",
"dest = root.empty_like(name='dest', data=inputs_rechunked, \n",
" chunks=inputs_rechunked.chunksize,\n",
" overwrite=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Examining the structure of the stacking and rechunking, we can see that the problem is embarassingly parallel. We'll take a look at the task graph for two of the blocks."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"inputs_rechunked.blocks[0, :2].visualize(optimize_graph=True)"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice trick, didn't know about this :-)

]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"These different chains of computation -- from data loading to stacking to rechunking (and eventually writing) -- are completely independent. There's no shared dependencies between chains.\n",
"\n",
"Dask prefers to execute graphs depth first. If you zoom in on the visualization below, which is colored by the order Dask will prefer to execute tasks in, you'll notice that Dask wants to execute all the tasks from the first chain, then all the tasks from the second, and so on (in practice, Dask will notice that the data-generating tasks from the second chain are ready to execute before tasks concatenate or rechunk tasks in the first chain)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"inputs_rechunked.blocks[0, :2].visualize(color='order', optimize_graph=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"What does this imply for the distribution of tasks across a cluster of machines?\n",
"\n",
"Because Dask would like to complete all the tasks from a single chain as quickly as possible, they'll end up being scheduled on different machines. For the (perhaps more typical) non-embarassingly parallel workload, this is fine. But for embarassingly parallel workloads it's not optimal. The rechunk / reduction step may end up needing data from two different machines, requiring a data transfer. But we know that shouldn't be necessary in this case, since it's an embarassingly parallel problem at a higher level.\n",
"\n",
"Let's actually write the data, and monitor the dashboard while writing."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%time _ = inputs_rechunked.store(dest, lock=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this case we end up with many small tasks. Depending on various things (the ratio of machines to cores on the cluster, the memory available on the cluster, bandwidth between machines, network latency), tasks from a single chain may end up on different machines, requiring transfer (highlighted in red). See https://github.com/dask/dask/issues/5105 for an example.\n",
"\n",
"We can acheive the desired scheduling by having Dask *fuse* chains of tasks. See :ref:`dask.optimize.fuse` for more, but the default values aren't aggressive enough for this computation. We want to ensure that the 25 original inputs (we used `da.random.random`) are fused into a single task. This ensures they'll be executed on a single machine, so the following rechunking will happen on the same machine."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"with dask.config.set(fuse_ave_width=25):\n",
" display(inputs_rechunked.blocks[0, :2].visualize(optimize_graph=True))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's perform the write, and again observe the dashboard."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"with dask.config.set(fuse_ave_width=25):\n",
" write = inputs_rechunked.store(dest, lock=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Overall, we notice larger chunks, and no communcation. Each of these contributes to a faster runtime.\n",
"\n",
"Fusing tasks is not an unambiguously good thing. As the docs for :ref:`dask.optimize.fuse` state\n",
"\n",
"> This trades parallelism opportunities for faster scheduling by making tasks less granular.\n",
"\n",
"In this case, *we* know that we can already acheive ideal parallelism by the coarse-grained embarassinly parallel nature of the problem. We're happy to have less fine-grained parllelism since we know we'll still be able to saturate the cluster."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
7 changes: 7 additions & 0 deletions conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
# ones.
extensions = [
"sphinx.ext.mathjax",
"sphinx.ext.intersphinx",
'nbsphinx',
]

Expand All @@ -56,6 +57,12 @@
"""


intersphinx_mapping = {
"dask": ("https://docs.dask.org/en/latest/", None),
"distributed": ("https://distributed.dask.org/en/latest/", None),
}


# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']

Expand Down
1 change: 1 addition & 0 deletions index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ You can run these examples in a live session here: |Binder|
applications/json-data-on-the-web
applications/async-await
applications/embarrassingly-parallel
applications/array-optimization
applications/image-processing
applications/prefect-etl
applications/satellite-imagery-geotiff
Expand Down