
Adds FutureAdapter that delegates executions to a threadpool for parallelization #1264

Merged 6 commits on Jan 3, 2025.
42 changes: 42 additions & 0 deletions docs/concepts/parallel-task.rst
@@ -1,6 +1,48 @@
Dynamic DAGs/Parallel Execution
----------------------------------

There are two approaches to parallel execution in Hamilton:

1. Using an adapter that submits each node/function to a system that handles execution, e.g. Ray, Dask, async, or a threadpool.
2. Using the `Parallelizable[]` and `Collect[]` types + delegating to an executor.

Using an Adapter
================
The adapter approach effectively farms out the execution of each node/function to a system that can handle resolving
futures. That is, Hamilton walks the DAG and submits each node to the adapter, which then submits the node for execution;
internally, the execution resolves any Futures from previously submitted nodes.
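Conceptually, this mechanism can be sketched with the standard library alone (an illustration of the idea, not Hamilton's actual implementation; ``submit`` and the node names here are hypothetical):

.. code-block:: python

    from concurrent.futures import Future, ThreadPoolExecutor

    executor = ThreadPoolExecutor(max_workers=4)
    futures: dict = {}

    def submit(name, fn, *deps):
        # block on upstream futures inside the pool, then run this node
        def task():
            args = [futures[d].result() for d in deps]
            return fn(*args)
        futures[name] = executor.submit(task)

    submit("a", lambda: "a")
    submit("b", lambda: "b")
    submit("c", lambda a, b: a + " " + b, "a", "b")
    print(futures["c"].result())  # a b

Because each node blocks only on the futures of its direct dependencies, independent nodes run concurrently.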

To make use of this, the general pattern is to apply an adapter to the driver; you don't need to touch your Hamilton functions:

.. code-block:: python

    from hamilton import driver
    from hamilton.execution import executors
    from hamilton.plugins.h_threadpool import FutureAdapter
    # from hamilton.plugins.h_ray import RayGraphAdapter
    # from hamilton.plugins.h_dask import DaskGraphAdapter

    dr = (
        driver.Builder()
        .with_modules(foo_module)
        .with_adapter(FutureAdapter())
        .build()
    )

    dr.execute(["my_variable"], inputs={...}, overrides={...})

The code above will execute the DAG, submitting each node to a `ThreadPoolExecutor` (see :doc:`../reference/graph-adapters/ThreadPoolFutureAdapter`),
which is great if you're doing a lot of I/O-bound work, e.g. making API calls, reading from a database, etc.

See this `Threadpool based example <https://github.com/dagworks-inc/hamilton/blob/main/examples/parallelism/lazy_threadpool_execution/>`_ for a complete example.

Other adapters, e.g. Ray (:doc:`../reference/graph-adapters/RayGraphAdapter`) and Dask (:doc:`../reference/graph-adapters/DaskGraphAdapter`), will submit to their respective executors, but involve object serialization
(see caveats below).

Using the `Parallelizable[]` and `Collect[]` types
==================================================


Hamilton now has pluggable execution, which allows for the following:

1. Grouping of nodes into "tasks" (discrete execution unit between serialization boundaries)
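As a preview, the fan-out/fan-in shape looks like this (a minimal sketch; ``url``, ``content_length``, and ``total_length`` are hypothetical functions, and the driver must be built with ``.enable_dynamic_execution(allow_experimental_mode=True)``):

.. code-block:: python

    from hamilton.htypes import Collect, Parallelizable

    def url(urls: list) -> Parallelizable[str]:
        for u in urls:  # each yielded URL becomes its own parallel branch
            yield u

    def content_length(url: str) -> int:
        return len(url)  # stand-in for actually fetching the URL

    def total_length(content_length: Collect[int]) -> int:
        return sum(content_length)  # fan the branches back in

Each value yielded between ``Parallelizable`` and ``Collect`` is run as its own task on the configured executor.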
11 changes: 11 additions & 0 deletions docs/reference/graph-adapters/ThreadPoolFutureAdapter.rst
@@ -0,0 +1,11 @@
==========================
h_threadpool.FutureAdapter
==========================

This adapter delegates execution of the individual nodes in a Hamilton graph to a threadpool,
which is useful when a graph has many nodes that can be executed in parallel.

.. autoclass:: hamilton.plugins.h_threadpool.FutureAdapter
:special-members: __init__
:members:
:inherited-members:
1 change: 1 addition & 0 deletions docs/reference/graph-adapters/index.rst
@@ -15,6 +15,7 @@ Reference
SimplePythonGraphAdapter
HamiltonGraphAdapter
AsyncGraphAdapter
ThreadPoolFutureAdapter
CachingGraphAdapter
DaskGraphAdapter
PySparkUDFGraphAdapter
53 changes: 53 additions & 0 deletions examples/parallelism/lazy_threadpool_execution/README.md
@@ -0,0 +1,53 @@
# Lazy threadpool execution

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/dagworks-inc/hamilton/blob/main/examples/parallelism/lazy_threadpool_execution/notebook.ipynb)

This example differs from the other examples under `/parallelism/` in that
it demonstrates how to use an adapter that submits each
function to a threadpool, enabling lazy DAG evaluation and parallel
execution. This is useful when you have many
I/O-bound functions and want to speed up your program, e.g. lots of
HTTP requests, reading/writing to disk, LLM API calls, etc.

> Note: this adapter does not support DAGs with `Parallelizable` and `Collect` functions; open an issue if you need this feature.

![DAG](my_functions.png)

The above image shows the DAG that will be executed. You can see from the structure
that the DAG can be parallelized, i.e. the leftmost nodes can be executed in parallel.

When you execute `run.py`, you will see output that shows:

1. The DAG running in parallel -- check the image against what is printed.
2. The DAG logging to the Hamilton UI -- adjust this for your project.
3. The DAG running without the adapter -- this shows the difference in execution time.
4. An async version of the DAG running in parallel -- this shows that the performance of the two approaches is similar.
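The parallel speedup is bounded by the DAG's critical path. A quick sketch computing it for this example (dependencies transcribed by hand from `my_functions.py`, where every node sleeps ~3 seconds):

```python
# dependencies mirror my_functions.py; every node takes ~3 seconds
deps = {
    "a": [], "b": [], "c": ["a", "b"], "d": [],
    "e": ["c", "d"], "z": [], "y": [], "x": ["z", "y"],
    "s": ["x", "e"],
}

def finish_time(node: str) -> int:
    # a node can finish 3 seconds after its slowest dependency finishes
    return 3 + max((finish_time(d) for d in deps[node]), default=0)

parallel_time = max(finish_time(n) for n in deps)  # critical path
sequential_time = 3 * len(deps)                    # one node at a time
print(parallel_time, sequential_time)  # 12 27
```

So with the adapter the run takes roughly 12 seconds instead of 27.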

```bash
python run.py
```

To use this adapter:

```python
from hamilton import driver
from hamilton.plugins import h_threadpool

# import your hamilton functions
import my_functions

# Create the adapter
adapter = h_threadpool.FutureAdapter()

# Create a driver
dr = (
    driver.Builder()
    .with_modules(my_functions)
    .with_adapters(adapter)
    .build()
)
# execute
dr.execute(["s", "x", "a"]) # if the DAG can be parallelized it will be

```
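
The threadpool helps because threads can overlap I/O waits. A stdlib-only sketch of the effect (illustrative; not part of this example's files):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def io_task(name: str) -> str:
    time.sleep(0.2)  # stand-in for an HTTP request or disk read
    return name

names = ["a", "b", "c", "d"]

start = time.time()
sequential_results = [io_task(n) for n in names]
sequential = time.time() - start  # ~0.8s: sleeps run back to back

start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    parallel_results = list(pool.map(io_task, names))
parallel = time.time() - start  # ~0.2s: sleeps overlap

print(f"sequential={sequential:.2f}s parallel={parallel:.2f}s")
```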
55 changes: 55 additions & 0 deletions examples/parallelism/lazy_threadpool_execution/my_functions.py
@@ -0,0 +1,55 @@
import time


def a() -> str:
    print("a")
    time.sleep(3)
    return "a"


def b() -> str:
    print("b")
    time.sleep(3)
    return "b"


def c(a: str, b: str) -> str:
    print("c")
    time.sleep(3)
    return a + " " + b


def d() -> str:
    print("d")
    time.sleep(3)
    return "d"


def e(c: str, d: str) -> str:
    print("e")
    time.sleep(3)
    return c + " " + d


def z() -> str:
    print("z")
    time.sleep(3)
    return "z"


def y() -> str:
    print("y")
    time.sleep(3)
    return "y"


def x(z: str, y: str) -> str:
    print("x")
    time.sleep(3)
    return z + " " + y


def s(x: str, e: str) -> str:
    print("s")
    time.sleep(3)
    return x + " " + e
@@ -0,0 +1,55 @@
import asyncio


async def a() -> str:
    print("a")
    await asyncio.sleep(3)
    return "a"


async def b() -> str:
    print("b")
    await asyncio.sleep(3)
    return "b"


async def c(a: str, b: str) -> str:
    print("c")
    await asyncio.sleep(3)
    return a + " " + b


async def d() -> str:
    print("d")
    await asyncio.sleep(3)
    return "d"


async def e(c: str, d: str) -> str:
    print("e")
    await asyncio.sleep(3)
    return c + " " + d


async def z() -> str:
    print("z")
    await asyncio.sleep(3)
    return "z"


async def y() -> str:
    print("y")
    await asyncio.sleep(3)
    return "y"


async def x(z: str, y: str) -> str:
    print("x")
    await asyncio.sleep(3)
    return z + " " + y


async def s(x: str, e: str) -> str:
    print("s")
    await asyncio.sleep(3)
    return x + " " + e