-
Notifications
You must be signed in to change notification settings - Fork 133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds FutureAdapter that delegates executions to a threadpool for parallelization #1264
Conversation
Taking inspiration from #1263, I implemented a similar adapter to how async works. We get away with this because we don't encounter SERDE boundaries. If you run the example DAG you'll see that: 1. it is parallelized as it should be 2. you can use caching and the tracking adapter Rough edges: - haven't tested this extensively, but seems to just work. - need to add tests for it & docs, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mark as experimental IMO
:return: the result of the execution of the graph. | ||
""" | ||
for k, v in outputs.items(): | ||
if isinstance(v, Future): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you want concurrent.futures.as_completed(...)
. Can't convince myselfe that we're not deadlocking in certain cases, but I think the fact that we're doing topological order should be good enough...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this matters here. Since this wont block anything executing in a thread...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's IMO slightly cleaner, but yeah, it'll not return until the slowest does regardless. Might also not want to mutate the outputs dictionary (copying is cleaner). But yes, nits.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Looks good to me! Reviewed everything up to 037c915 in 1 minute and 12 seconds
More details
- Looked at
501
lines of code in9
files - Skipped
2
files when reviewing. - Skipped posting
7
drafted comments based on config settings.
1. tests/plugins/test_h_threadpool.py:5
- Draft comment:
Consider adding tests for edge cases and error handling, such as: - Submitting a function that raises an exception.
- Handling of non-Future objects in
build_result
. - Using
FutureAdapter
with nomax_workers
specified. - Reason this comment was not posted:
Confidence changes required:50%
The test file is missing tests for edge cases and error handling.
2. examples/parallelism/lazy_threadpool_execution/run.py:11
- Draft comment:
Typo in filename: "my_funtions.png" should be "my_functions.png". - Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable:
While this is a real typo, it's a very minor issue in an example file. The typo would be immediately obvious when the code runs since the generated file would have a misspelled name. It's not a logic error or code quality issue. According to the rules, we shouldn't make comments about obvious or unimportant issues.
The typo could potentially cause confusion for users following the example, and inconsistent filenames across examples might be worth fixing for documentation purposes.
While consistency is good, this is still a very minor cosmetic issue that would be immediately apparent when running the code, and fixing it isn't critical to the code's functionality.
The comment should be deleted as it points out an obvious and unimportant issue that doesn't affect code functionality or quality.
3. examples/parallelism/lazy_threadpool_execution/run.py:20
- Draft comment:
Consider makingproject_id
andusername
configurable or document that these need to be changed by the user. - Reason this comment was not posted:
Confidence changes required:50%
Therun.py
script uses a hardcoded project ID and username, which might not be suitable for all users.
4. examples/parallelism/lazy_threadpool_execution/run.py:17
- Draft comment:
Ensure thathamilton_sdk
is a required dependency or provide instructions for its installation. - Reason this comment was not posted:
Confidence changes required:50%
Therun.py
script importsadapters
fromhamilton_sdk
, which might not be available in all environments.
5. hamilton/plugins/h_threadpool.py:71
- Draft comment:
Consider adding exception handling for the function execution to prevent unhandled exceptions from crashing the thread pool. - Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable:
ThreadPoolExecutor automatically captures exceptions in Future objects, which can be retrieved via Future.exception(). The code already uses Future.result() which will raise any captured exceptions. Adding additional exception handling would be redundant since the executor already provides this functionality. The comment seems to misunderstand how ThreadPoolExecutor works.
Maybe there are specific types of exceptions that should be handled differently or logged in a particular way for this application.
While custom exception handling could be added, it should be driven by specific requirements rather than general concerns about unhandled exceptions, which are already managed by ThreadPoolExecutor.
The comment should be deleted as it suggests adding unnecessary exception handling that's already provided by ThreadPoolExecutor.
6. examples/parallelism/lazy_threadpool_execution/my_functions.py:4
- Draft comment:
Function names likea
,b
,c
, etc., are not descriptive. Consider using more descriptive names to improve code readability. This applies to all similar functions in this file and inmy_functions_async.py
. - Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable:
Since this is in an examples/parallelism directory, these functions appear to be deliberately simplified to demonstrate a concept about parallel execution. The single-letter names might actually be beneficial here to keep the example focused on the parallelism patterns rather than domain-specific logic. The file name and directory structure suggest this is educational/example code rather than production code.
The comment raises a valid point about code readability in general, but I might be overlooking the pedagogical value of keeping example code extremely simple.
In example code demonstrating technical concepts like parallelism, overly descriptive names could actually detract from the learning objective by adding unnecessary complexity.
Delete the comment. While descriptive names are generally good practice, in this specific context of example code demonstrating parallelism concepts, the simple single-letter names serve a valid pedagogical purpose.
7. examples/parallelism/lazy_threadpool_execution/run.py:20
- Draft comment:
Avoid hardcoding sensitive information likeproject_id
andusername
. Consider using environment variables or configuration files to manage such data securely. This applies to all similar instances in this file. - Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable:
While hardcoded credentials are generally a security concern, this is in an examples directory and appears to be demo code. The project_id is just "21" and has a comment "modify this as needed". The email address appears to be a real email but is likely just for demonstration. Example code often includes placeholder values to show the expected format.
The comment raises a valid security best practice. Hardcoded credentials could be copied by users into production code.
Since this is clearly example code and the values are meant to be replaced, adding configuration management would make the example more complex and harder to understand.
Delete the comment. While the security practice is valid, this is example code where simple placeholder values are more appropriate for demonstration purposes.
Workflow ID: wflow_jgnKlIGIxZMkXrlc
You can customize Ellipsis with 👍 / 👎 feedback, review rules, user-specific overrides, quiet
mode, and more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Looks good to me! Incremental review on cecc050 in 24 seconds
More details
- Looked at
30
lines of code in2
files - Skipped
0
files when reviewing. - Skipped posting
1
drafted comments based on config settings.
1. examples/parallelism/lazy_threadpool_execution/run.py:11
- Draft comment:
There's a typo in the filename. It should bemy_functions.png
instead ofmy_funtions.png
. - Reason this comment was not posted:
Comment looked like it was already resolved.
Workflow ID: wflow_7kOHRbqPlOa0bhhM
You can customize Ellipsis with 👍 / 👎 feedback, review rules, user-specific overrides, quiet
mode, and more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Looks good to me! Incremental review on 4ac52f1 in 4 seconds
More details
- Looked at
9
lines of code in1
files - Skipped
0
files when reviewing. - Skipped posting
2
drafted comments based on config settings.
1. examples/parallelism/lazy_threadpool_execution/requirements.txt:1
- Draft comment:
It's recommended to specify exact versions for dependencies to ensure reproducibility. - Reason this comment was not posted:
Confidence changes required:50%
The requirements file should specify exact versions to ensure reproducibility.
2. examples/parallelism/lazy_threadpool_execution/requirements.txt:3
- Draft comment:
Remove the empty line at the end of the file for consistency. - Reason this comment was not posted:
Confidence changes required:10%
The requirements file should not have an empty line at the end. This is a minor issue but should be corrected for consistency.
Workflow ID: wflow_TnxeLrcDmtuh7gwY
You can customize Ellipsis with 👍 / 👎 feedback, review rules, user-specific overrides, quiet
mode, and more.
So that people can adjust the result accordingly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Looks good to me! Incremental review on 4f136d4 in 45 seconds
More details
- Looked at
133
lines of code in2
files - Skipped
0
files when reviewing. - Skipped posting
6
drafted comments based on config settings.
1. tests/plugins/test_h_threadpool.py:4
- Draft comment:
The importfrom hamilton import lifecycle
is unused and can be removed to clean up the code. - Reason this comment was not posted:
Confidence changes required:10%
The import statement forlifecycle
is not used in the test file. Removing it will clean up the code.
2. hamilton/plugins/h_threadpool.py:72
- Draft comment:
Returning[Any]
frominput_types
is not very informative. Consider returning an empty list or a more specific type if applicable. - Reason this comment was not posted:
Confidence changes required:50%
Theinput_types
method inFutureAdapter
returns a list withAny
, which is not very informative. It might be better to return an empty list or a more specific type if possible.
3. hamilton/plugins/h_threadpool.py:80
- Draft comment:
ReturningAny
fromoutput_type
is not very informative. Consider returning a more specific type if applicable. - Reason this comment was not posted:
Confidence changes required:50%
Theoutput_type
method inFutureAdapter
returnsAny
whenresult_builder
is not provided. It might be better to return a more specific type if possible.
4. hamilton/plugins/h_threadpool.py:82
- Draft comment:
Thedo_remote_execute
method inFutureAdapter
is not following the single responsibility principle as it both submits a function to the executor and wraps it with_new_fn
. Consider splitting this into two methods for clarity. - Reason this comment was not posted:
Comment was on unchanged code.
5. hamilton/plugins/h_threadpool.py:48
- Draft comment:
TheFutureAdapter
class is handling two responsibilities: managing thread execution and building results. Consider separating these concerns to adhere to the Single Responsibility Principle. - Reason this comment was not posted:
Comment was not on a valid diff hunk.
6. hamilton/plugins/h_threadpool.py:64
- Draft comment:
The methodsinput_types
andoutput_type
inFutureAdapter
could have more descriptive names to better convey their purpose. - Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable:
- These method names are part of an interface (ResultBuilder) implementation. 2. The names are clear and descriptive for their purpose. 3. Changing interface method names would break compatibility. 4. The methods have good docstrings explaining their purpose. 5. The names follow standard Python type annotation patterns.
The names could potentially be more specific about their role in the FutureAdapter context. However, they're interface methods.
Interface method names should be consistent across implementations, and these names are already clear and well-documented.
The comment should be deleted as it suggests renaming interface methods that are already well-named and documented.
Workflow ID: wflow_PfXblcHcDhyI1ynw
You can customize Ellipsis with 👍 / 👎 feedback, review rules, user-specific overrides, quiet
mode, and more.
Taking inspiration from #1263, I implemented
a similar adapter to how async works.
We get away with this because we don't encounter
SERDE boundaries.
If you run the example DAG you'll see that:
Rough edges:
Changes
How I tested this
See this image from the tracker showing that things are running in parallel:
Notes - TODO:
- [ ] example uses materializers & caching to exercise things.Checklist