From ce45bfd2f204b91cc7824cf22de8cee4a19a340e Mon Sep 17 00:00:00 2001
From: Tom Augspurger
Date: Thu, 18 Jul 2019 09:08:48 -0500
Subject: [PATCH 1/5] Added array optimization fuse notebook

---
 .gitignore                            |   1 +
 applications/array-optimization.ipynb | 231 ++++++++++++++++++++++++++
 conf.py                               |   7 +
 3 files changed, 239 insertions(+)
 create mode 100644 applications/array-optimization.ipynb

diff --git a/.gitignore b/.gitignore
index 70702445..6c37994b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,4 @@ _build/
 mydask.png
 dataframes/data
 .idea/
+*.zarr

diff --git a/applications/array-optimization.ipynb b/applications/array-optimization.ipynb
new file mode 100644
index 00000000..a7627106
--- /dev/null
+++ b/applications/array-optimization.ipynb
@@ -0,0 +1,231 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Array Scheduling Optimization\n",
+    "\n",
+    "In this example, we'll perform a typical workflow:\n",
+    "\n",
+    "* Load batches of data\n",
+    "* Stack batches into a single Dask Array\n",
+    "* Rechunk the data for downstream processing\n",
+    "* Write rechunked data to disk, using Zarr\n",
+    "\n",
+    "Depending on the rechunking pattern, this is an embarrassingly parallel operation. However,\n",
+    "the Dask scheduler doesn't necessarily know that, which may result in sub-optimal scheduling. We'll configure the optimization settings to ensure that we have ideal scheduling to minimize data transfer across the cluster."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from dask.distributed import Client\n",
+    "import dask.array as da\n",
+    "import zarr\n",
+    "\n",
+    "client = Client(processes=False, threads_per_worker=4,\n",
+    "                n_workers=1, memory_limit='2GB')\n",
+    "client"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Rather than loading data from disk, we'll generate random data in memory.\n",
+    "We'll have 25 batches, each of which has shape `(2,000,000 x 90,000)`.\n",
+    "\n",
+    "We rechunk along just the first axis (preserving the chunking on the second axis)."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "inputs = [da.random.random(size=2_000_000, chunks=90_000)\n",
+    "          for _ in range(25)]\n",
+    "inputs_stacked = da.vstack(inputs)\n",
+    "inputs_rechunked = inputs_stacked.rechunk((50, 90_000))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "inputs_stacked"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "inputs_rechunked"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "And we'll set up the Zarr array we'll write to."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "store = zarr.DirectoryStore('spike.zarr')\n",
+    "root = zarr.group(store, overwrite=True)\n",
+    "dest = root.empty_like(name='dest', data=inputs_rechunked,\n",
+    "                       chunks=inputs_rechunked.chunksize,\n",
+    "                       overwrite=True)"
+   ]
+  },
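+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "As a quick sanity check, we can inspect the target array. (A sketch: recent `zarr` versions expose an `info` summary; the plain `repr` of `dest` works too.)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Confirm the on-disk shape and chunking match what we configured above.\n",
+    "dest.info"
+   ]
+  },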
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Examining the structure of the stacking and rechunking, we can see that the problem is embarrassingly parallel. We'll take a look at the task graph for two of the blocks."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "inputs_rechunked.blocks[0, :2].visualize(optimize_graph=True)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "These different chains of computation -- from data loading to stacking to rechunking (and eventually writing) -- are completely independent. There are no shared dependencies between chains.\n",
+    "\n",
+    "Dask prefers to execute graphs depth first. If you zoom in on the visualization below, which is colored by the order Dask will prefer to execute tasks in, you'll notice that Dask wants to execute all the tasks from the first chain, then all the tasks from the second, and so on (in practice, Dask will notice that the data-generating tasks from the second chain are ready to execute before the concatenate or rechunk tasks in the first chain)."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "inputs_rechunked.blocks[0, :2].visualize(color='order', optimize_graph=True)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "What does this imply for the distribution of tasks across a cluster of machines?\n",
+    "\n",
+    "Because Dask would like to complete all the tasks from a single chain as quickly as possible, those tasks may end up scheduled on different machines. For the (perhaps more typical) non-embarrassingly parallel workload, this is fine. But for embarrassingly parallel workloads it's not optimal. The rechunk / reduction step may end up needing data from two different machines, requiring a data transfer. But we know that shouldn't be necessary in this case, since it's an embarrassingly parallel problem at a higher level.\n",
+    "\n",
+    "Let's actually write the data, and monitor the dashboard while writing."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%time _ = inputs_rechunked.store(dest, lock=False)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "In this case we end up with many small tasks. Depending on various things (the ratio of machines to cores on the cluster, the memory available on the cluster, bandwidth between machines, network latency), tasks from a single chain may end up on different machines, requiring transfer (highlighted in red). See https://github.com/dask/dask/issues/5105 for an example.\n",
+    "\n",
+    "We can acheive the desired scheduling by having Dask *fuse* chains of tasks. See :ref:`dask.optimize.fuse` for more, but the default values aren't aggressive enough for this computation. We want to ensure that the 25 original inputs (we used `da.random.random`) are fused into a single task. This ensures they'll be executed on a single machine, so the following rechunking will happen on the same machine."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import dask"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "with dask.config.set(fuse_ave_width=25):\n",
+    "    display(inputs_rechunked.blocks[0, :2].visualize(optimize_graph=True))"
+   ]
+  },
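+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "To get a rough sense of the effect, we can count the tasks in the optimized graph with and without the more aggressive fusion. (This is only a sketch: `n_tasks` is a little helper defined here, not a Dask API, and the exact counts will vary across Dask versions.)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def n_tasks(x):\n",
+    "    # Count the tasks in the optimized graph -- a rough proxy for scheduling overhead.\n",
+    "    (optimized,) = dask.optimize(x)\n",
+    "    return len(dict(optimized.__dask_graph__()))\n",
+    "\n",
+    "default_count = n_tasks(inputs_rechunked)\n",
+    "with dask.config.set(fuse_ave_width=25):\n",
+    "    fused_count = n_tasks(inputs_rechunked)\n",
+    "default_count, fused_count"
+   ]
+  },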
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Let's perform the write, and again observe the dashboard."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%time\n",
+    "\n",
+    "with dask.config.set(fuse_ave_width=25):\n",
+    "    write = inputs_rechunked.store(dest, lock=False)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Overall, we notice larger chunks, and no communcation. Each of these contributes to a faster runtime.\n",
+    "\n",
+    "Fusing tasks is not an unambiguously good thing. As the docs for :ref:`dask.optimize.fuse` state:\n",
+    "\n",
+    "> This trades parallelism opportunities for faster scheduling by making tasks less granular.\n",
+    "\n",
+    "In this case, *we* know that we can already acheive ideal parallelism by the coarse-grained embarassinly parallel nature of the problem. We're happy to have less fine-grained parllelism since we know we'll still be able to saturate the cluster."
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.7.3"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}

diff --git a/conf.py b/conf.py
index 484152da..3bfcc613 100644
--- a/conf.py
+++ b/conf.py
@@ -40,6 +40,7 @@
 # ones.
 extensions = [
     "sphinx.ext.mathjax",
+    "sphinx.ext.intersphinx",
     'nbsphinx',
 ]
 
@@ -56,6 +57,12 @@
 """
 
+intersphinx_mapping = {
+    "dask": ("https://docs.dask.org/en/latest/", None),
+    "distributed": ("https://distributed.dask.org/en/latest/", None),
+}
+
+
 # Add any paths that contain templates here, relative to this directory.
 templates_path = ['_templates']

From d3a0095f6d2c6901896a9f7440f7d9057f9b0fa8 Mon Sep 17 00:00:00 2001
From: Tom Augspurger
Date: Thu, 18 Jul 2019 09:49:44 -0500
Subject: [PATCH 2/5] install zarr

---
 applications/array-optimization.ipynb | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/applications/array-optimization.ipynb b/applications/array-optimization.ipynb
index a7627106..722d9454 100644
--- a/applications/array-optimization.ipynb
+++ b/applications/array-optimization.ipynb
@@ -11,7 +11,7 @@
     "* Load batches of data\n",
     "* Stack batches into a single Dask Array\n",
     "* Rechunk the data for downstream processing\n",
-    "* Write rechunked data to disk, using Zarr\n",
+    "* Write rechunked data to disk, using [Zarr](https://zarr.readthedocs.io)\n",
     "\n",
     "Depending on the rechunking pattern, this is an embarrassingly parallel operation. However,\n",
     "the Dask scheduler doesn't necessarily know that, which may result in sub-optimal scheduling. We'll configure the optimization settings to ensure that we have ideal scheduling to minimize data transfer across the cluster."
@@ -79,6 +79,15 @@
    "And we'll set up the Zarr array we'll write to."
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "!conda install -y -c conda-forge zarr"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": null,

From f6199e0543c9d30e70f3dcdf47b17feda4961acc Mon Sep 17 00:00:00 2001
From: Tom Augspurger
Date: Thu, 18 Jul 2019 10:07:24 -0500
Subject: [PATCH 3/5] install zarr

---
 applications/array-optimization.ipynb | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/applications/array-optimization.ipynb b/applications/array-optimization.ipynb
index 722d9454..706f3f1a 100644
--- a/applications/array-optimization.ipynb
+++ b/applications/array-optimization.ipynb
@@ -17,6 +17,15 @@
     "the Dask scheduler doesn't necessarily know that, which may result in sub-optimal scheduling. We'll configure the optimization settings to ensure that we have ideal scheduling to minimize data transfer across the cluster."
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "!conda install -y -c conda-forge zarr"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": null,
@@ -79,15 +88,6 @@
    "And we'll set up the Zarr array we'll write to."
    ]
   },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "!conda install -y -c conda-forge zarr"
-   ]
-  },
   {
    "cell_type": "code",
    "execution_count": null,

From 97fda16dfee447a8e8c4d8698467fe7cae2ba0c6 Mon Sep 17 00:00:00 2001
From: Tom Augspurger
Date: Thu, 18 Jul 2019 12:16:09 -0500
Subject: [PATCH 4/5] add to index

---
 index.rst | 1 +
 1 file changed, 1 insertion(+)

diff --git a/index.rst b/index.rst
index 8b5b84cb..177c5899 100644
--- a/index.rst
+++ b/index.rst
@@ -53,6 +53,7 @@ You can run these examples in a live session here: |Binder|
    applications/json-data-on-the-web
    applications/async-await
    applications/embarrassingly-parallel
+   applications/array-optimization
    applications/image-processing
    applications/prefect-etl
    applications/satellite-imagery-geotiff

From 4affee9d31bccd327205af90dd495347c8f2f7f7 Mon Sep 17 00:00:00 2001
From: Tom Augspurger
Date: Fri, 19 Jul 2019 15:38:39 -0500
Subject: [PATCH 5/5] updates

---
 applications/array-optimization.ipynb | 103 +++++++++++++++++++++++++-
 1 file changed, 99 insertions(+), 4 deletions(-)

diff --git a/applications/array-optimization.ipynb b/applications/array-optimization.ipynb
index 706f3f1a..f9b8c8f5 100644
--- a/applications/array-optimization.ipynb
+++ b/applications/array-optimization.ipynb
@@ -46,7 +46,7 @@
    "metadata": {},
    "source": [
     "Rather than loading data from disk, we'll generate random data in memory.\n",
-    "We'll have 25 batches, each of which has shape `(2,000,000 x 90,000)`.\n",
+    "We'll have 25 batches, each of which has shape `(2,000,000,)` split into chunks of 90,000 each.\n",
     "\n",
     "We rechunk along just the first axis (preserving the chunking on the second axis)."
    ]
@@ -161,7 +161,7 @@
    "source": [
     "In this case we end up with many small tasks. Depending on various things (the ratio of machines to cores on the cluster, the memory available on the cluster, bandwidth between machines, network latency), tasks from a single chain may end up on different machines, requiring transfer (highlighted in red). See https://github.com/dask/dask/issues/5105 for an example.\n",
     "\n",
-    "We can acheive the desired scheduling by having Dask *fuse* chains of tasks. See :ref:`dask.optimize.fuse` for more, but the default values aren't aggressive enough for this computation. We want to ensure that the 25 original inputs (we used `da.random.random`) are fused into a single task. This ensures they'll be executed on a single machine, so the following rechunking will happen on the same machine."
+    "We can achieve the desired scheduling by having Dask *fuse* chains of tasks. See :ref:`dask.optimize.fuse` for more, but the default values aren't aggressive enough for this computation. We want to ensure that the 25 original inputs (we used `da.random.random`) are fused into a single task. This ensures they'll be executed on a single machine, so the following rechunking will happen on the same machine."
    ]
   },
   {
@@ -206,13 +206,108 @@
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "Overall, we notice larger chunks, and no communcation. Each of these contributes to a faster runtime.\n",
+    "Overall, we notice larger chunks, and no communication. Each of these contributes to a faster runtime.\n",
     "\n",
+    "The necessary value for `fuse_ave_width` depends strongly on the computation. If we increase the number of batches to, say, 75, we need to increase `fuse_ave_width` accordingly."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "inputs = [da.random.random(size=2_000_000, chunks=90_000)\n",
+    "          for _ in range(75)]  # increased from 25 -> 75\n",
+    "inputs_stacked = da.vstack(inputs)\n",
+    "inputs_rechunked = inputs_stacked.rechunk((50, 90_000))\n",
+    "\n",
+    "store = zarr.DirectoryStore('spike.zarr')\n",
+    "root = zarr.group(store, overwrite=True)\n",
+    "dest = root.empty_like(name='dest', data=inputs_rechunked,\n",
+    "                       chunks=inputs_rechunked.chunksize,\n",
+    "                       overwrite=True)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "inputs_stacked"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "inputs_rechunked"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "In this case, the widest fan-in is the 50 input blocks that `inputs_stacked.rechunk((50, 90_000))` combines into each output chunk, so we need to increase `fuse_ave_width` to at least 50."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "with dask.config.set(fuse_ave_width=50):\n",
+    "    display(inputs_rechunked.blocks[0, :2].visualize(optimize_graph=True))"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Without fusion."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%time write = inputs_rechunked.store(dest, lock=False)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "With fusion."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%time\n",
+    "\n",
+    "with dask.config.set(fuse_ave_width=50):\n",
+    "    write = inputs_rechunked.store(dest, lock=False)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
     "Fusing tasks is not an unambiguously good thing. As the docs for :ref:`dask.optimize.fuse` state:\n",
     "\n",
     "> This trades parallelism opportunities for faster scheduling by making tasks less granular.\n",
     "\n",
-    "In this case, *we* know that we can already acheive ideal parallelism by the coarse-grained embarassinly parallel nature of the problem. We're happy to have less fine-grained parllelism since we know we'll still be able to saturate the cluster."
+    "In this case, *we* know that we can already achieve ideal parallelism by the coarse-grained embarrassingly parallel nature of the problem. We're happy to have less fine-grained parallelism since we know we'll still be able to saturate the cluster."
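+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "As an aside, the same fusion can be applied by hand at the task-graph level. This is only a sketch of roughly what the optimizer is doing for us: we call `dask.optimization.fuse` directly on the low-level graph, passing `ave_width` explicitly rather than through the `fuse_ave_width` config value."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from dask.optimization import fuse\n",
+    "\n",
+    "# Fuse the raw task graph directly (a sketch; normally the optimizer does this for us).\n",
+    "dsk = dict(inputs_rechunked.__dask_graph__())\n",
+    "fused_dsk, dependencies = fuse(dsk, keys=inputs_rechunked.__dask_keys__(),\n",
+    "                               ave_width=50)\n",
+    "len(dsk), len(fused_dsk)  # fewer, larger tasks after fusing"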
    ]
   }
  ],