Caching for async driver #1263
Comments
@nemilentsau thanks! Currently this is a known limitation. What's your use case / context? There could be something we could help you do in the meantime with some more details. |
@skrawcz I am building a pipeline to process documents with complex layouts that involves multiple calls to various LLMs. Certain calls can run in parallel because they extract complementary yet independent information from the documents. I'd like to cache those calls so as not to re-run them while I am developing. In principle overrides could do the trick, although that gets cumbersome with a large graph. It also seems that materialization doesn't work with async_driver. Do you have any means to automate saving node outputs that would work for async?
dr = await (
    async_driver.Builder().with_modules(pdf_processor_async).with_materializers(*materializers).build()
)
ValueError Traceback (most recent call last)
/home/sagemaker-user/ai-experiment/llm-frameworks/hamilton/test_hamilton_async.ipynb Cell 12 line 2
1 dr = await (
----> 2 async_driver.Builder().with_modules(pdf_processor_async).with_materializers(*materializers).build()
3 )
File /opt/conda/lib/python3.10/site-packages/hamilton/async_driver.py:472, in Builder.with_materializers(self, *materializers)
469 def with_materializers(
470 self, *materializers: typing.Union[ExtractorFactory, MaterializerFactory]
471 ) -> "Builder":
--> 472 self._not_supported("with_materializers")
File /opt/conda/lib/python3.10/site-packages/hamilton/async_driver.py:462, in Builder._not_supported(self, method_name, additional_message)
461 def _not_supported(self, method_name: str, additional_message: str = ""):
--> 462 raise ValueError(
463 f"Builder().{method_name}() is not supported for the async driver. {additional_message}"
464 )
ValueError: Builder().with_materializers() is not supported for the async driver.
Ah okay. So this is for development, and you've seen overrides, good; you can request the outputs you want cached and then feed them back in as overrides on the next cycle... but yes, that can get a little messy when the graph gets large. Question: is this happening in a web service, or somewhere else? Just curious about the async Python requirement - Hamilton has a synchronous way to do parallelization that does work with caching.
Yes, unfortunately the async driver doesn't have full parity with the synchronous Hamilton driver (for materializers we just need to implement an async version). Short term, I think you could write a quick async decorator that checks the filesystem for a serialized result for that function and only runs the function when none is found? E.g. make a decorator that does something similar to this?
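For illustration, a minimal sketch of the kind of decorator suggested above. It is not part of Hamilton: the name `disk_cached`, the default cache directory, and the pickle-based key are all hypothetical choices, and it assumes both the arguments and the result can be pickled. Whether the async driver accepts the wrapped function unchanged should be verified; `functools.wraps` is used so the original name and signature stay visible to introspection.

```python
import functools
import hashlib
import pickle
from pathlib import Path


def disk_cached(cache_dir=".async_node_cache"):
    """Hypothetical helper: cache an async function's result on disk.

    The cache key is built from the function's module, qualified name and
    arguments, so a call with different arguments is executed again.
    """
    cache_root = Path(cache_dir)

    def decorator(fn):
        @functools.wraps(fn)  # keep the original name and signature visible
        async def wrapper(*args, **kwargs):
            # Assumption: args and kwargs are picklable and pickle deterministically.
            key_bytes = pickle.dumps(
                (fn.__module__, fn.__qualname__, args, sorted(kwargs.items()))
            )
            digest = hashlib.sha256(key_bytes).hexdigest()[:16]
            path = cache_root / f"{fn.__name__}_{digest}.pkl"
            if path.exists():
                # Cache hit: skip the expensive call entirely.
                return pickle.loads(path.read_bytes())
            result = await fn(*args, **kwargs)
            cache_root.mkdir(parents=True, exist_ok=True)
            # Blocking file I/O; acceptable for a development-time cache.
            path.write_bytes(pickle.dumps(result))
            return result

        return wrapper

    return decorator
```

Applied as `@disk_cached()` above an `async def` node, the first run pays for the LLM call and later runs read the pickle. Changing the function body does not invalidate the cache in this sketch, so the cache directory has to be cleared by hand after code changes.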
Yeah, I've implemented parallelization as well and it works. But I feel that it makes the code less clear. Conceptually, I am trying to run a document through text-only and multimodal LLMs and compare the results. If I parallelize execution of the nodes with async, I get a graph that is much easier to read and understand (and hence maintain in the future) compared to using map-reduce style parallelization. I actually implemented the same graph with Burr, but unfortunately Burr doesn't seem to allow running several branches in parallel, only conditional branching.
Makes sense -- looks like you have a few good approaches. FWIW Burr does allow parallelism -- this is a new but very powerful feature. Works with async too.
That said, it's also the "map/reduce" style of parallelism, and doesn't have the multiple branches, but it's pretty good for this kind of stuff. IMO Hamilton is a bit more natural for non-map-reduce in your case, but agreed regarding async there's some work to do with caching/reusing results... @skrawcz's idea for decorators might be a good approach |
Agree. I think implementing the async decorator might be a good solution for my use case. I did play with parallelism in Burr. It's pretty nice. It's just that in my case I would prefer a non-map-reduce style implementation. Thank you for your help, guys.
Yep, makes sense! Let us know how your decorator goes -- feel free to post code samples back that we can share out/provide feedback on if that's helpful! |
Thanks for raising the issue. Sorry we don't have anything first class yet for async. Just to summarize your situation:
Your ideal solution would:
Is that right? |
Yes, when it comes to async. It's a bit more complex than that when it comes to caching. I do want to materialize certain assets throughout the pipeline for debugging/analysis purposes: for example, the outputs of all the intermediate LLM calls, OCR engine outputs, etc. Moving forward, I might want to re-run my pipeline in production from certain steps if I detect extraction/hallucination issues, or more likely use a different extraction scenario based on the type of issue. But I have other issues to handle before I get to implementing that functionality. At the moment my pipeline simply throws an error in such a case, so I don't need full caching capabilities in production.
Have you tried using the Hamilton UI here? It supports async and logs outputs; we'd love more input here. E.g. maybe we should expose an API to pull back out what was tracked for a run? On the roadmap is to converge caching with what the Hamilton UI exposes... Another idea:
async def execute_wrapper(dr, cache_path, requested_outputs, ...):
    nodes = dr.list_available_variables(...)
    # find what's in the cache
    cached = _whats_in_the_cache(cache_path, nodes)
    # inject
    return await dr.execute(requested_outputs, inputs=..., overrides=cached)
Taking inspiration from #1263, I implemented a similar adapter to how async works. We get away with this because we don't encounter SERDE boundaries. If you run the example DAG you'll see that:
1. it is parallelized as it should be
2. you can use caching and the tracking adapter
Rough edges:
- haven't tested this extensively, but seems to just work.
- need to add tests for it & docs, etc.
@nemilentsau happy new year -- want to give this branch a try? What I did was make regular Hamilton behave similar to the async driver but using multithreading. It seems to work as expected, along with caching and the Hamilton UI working out of the box... See the example in the PR. I didn't try materializers but they should also just work. |
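To make the idea concrete, here is a generic sketch of running independent branches of a DAG on a thread pool by passing futures between nodes. It is not the code from the PR and does not use Hamilton at all; `run_dag_threaded` and the shape of the `nodes` mapping are hypothetical. It assumes the nodes are listed in topological order (dependencies before dependents).

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Tuple


def run_dag_threaded(
    nodes: Dict[str, Tuple[Callable[..., Any], List[str]]],
    max_workers: int = 4,
) -> Dict[str, Any]:
    """Run each node on a thread pool; `nodes` maps name -> (function, dependency names).

    Assumption: `nodes` is in topological order, so every dependency's future
    already exists when a node is submitted.
    """
    futures: Dict[str, Future] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for name, (fn, deps) in nodes.items():
            dep_futures = [futures[dep] for dep in deps]

            def task(fn=fn, dep_futures=dep_futures):
                # Block inside the worker until upstream results are ready.
                return fn(*[future.result() for future in dep_futures])

            futures[name] = pool.submit(task)
        # Resolve everything before the pool shuts down.
        return {name: future.result() for name, future in futures.items()}
```

Nodes without a dependency between them, such as a text-only and a multimodal LLM call, are submitted back to back and run concurrently, while the graph keeps its plain branch structure instead of a map-reduce shape. Since everything stays in one process, results are passed between nodes as ordinary Python objects with no serialization step.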
@skrawcz Happy new year! I did run this branch and it works! Thank you so much! This is exactly what I've been looking for |
I haven't tried the UI yet. I'll share my feedback after I play with it. But generally speaking, having API access to logged outputs would be beneficial. For example, I might want to run a script that does a batch analysis on the logs to see what is happening at certain nodes in my pipeline.
Awesome. I can try to get this released this week / early next week. If you wouldn't mind testing it, that would be great please... |
Perfect! This would be great. I will test it out then and let you know if I find any issues |
Would you mind taking |
@skrawcz Sure thing. I will check it out |
Okay merged #1264, but will properly release tomorrow. |
Awesome! |
Okay it's out as |
I am getting a value error when trying to use caching with an async_driver
Current behavior
dr = await (
    async_driver.Builder().with_modules(pdf_processor_async).with_cache().build()
)
ValueError Traceback (most recent call last)
/home/sagemaker-user/ai-experiment/llm-frameworks/hamilton/test_hamilton_async.ipynb Cell 6 line 1
----> 1 dr = await (
2 async_driver.Builder().with_modules(pdf_processor_async).with_cache().build()
3 )
ValueError: Multiple adapters cannot (currently) implement the same lifecycle method. Sync methods: ['do_node_execute']. Async methods: []
Stack Traces
ValueError Traceback (most recent call last)
/home/sagemaker-user/ai-experiment/llm-frameworks/hamilton/test_hamilton_async.ipynb Cell 6 line 1
----> 1 dr = await (
2 async_driver.Builder().with_modules(pdf_processor_async).with_cache().build()
3 )
File /opt/conda/lib/python3.10/site-packages/hamilton/async_driver.py:512, in Builder.build(self)
504 async def build(self):
505 """Builds the async driver. This also initializes it, hence the async definition.
506 If you don't want to use async, you can use `build_without_init` and call `ainit` later,
507 but we recommend using this in an asynchronous lifespan management function (E.G. in fastAPI),
(...)
510 :return: The fully
511 """
--> 512 dr = self.build_without_init()
513 return await dr.ainit()
File /opt/conda/lib/python3.10/site-packages/hamilton/async_driver.py:496, in Builder.build_without_init(self)
494 result_builders = [adapter for adapter in adapters if isinstance(adapter, base.ResultMixin)]
495 specified_result_builder = base.DictResult() if len(result_builders) == 0 else None
--> 496 return AsyncDriver(
497 self.config,
498 *self.modules,
499 adapters=self.adapters,
500 result_builder=specified_result_builder,
501 allow_module_overrides=self._allow_module_overrides,
502 )
File /opt/conda/lib/python3.10/site-packages/hamilton/async_driver.py:237, in AsyncDriver.__init__(self, config, result_builder, adapters, allow_module_overrides, *modules)
235 # it will be defaulted by the graph adapter
236 result_builder = result_builders[0] if len(result_builders) == 1 else None
--> 237 super(AsyncDriver, self).__init__(
238 config,
239 *modules,
240 adapter=[
241 # We pass in the async adapters here as this can call node-level hooks
242 # Otherwise we trust the driver/fn graph to call sync adapters
243 AsyncGraphAdapter(
244 result_builder=result_builder,
245 async_lifecycle_adapters=lifecycle_base.LifecycleAdapterSet(*async_adapters),
246 ),
247 # We pass in the sync adapters here as this can call
248 *sync_adapters,
249 *async_adapters, # note async adapters will not be called during synchronous execution -- this is for access later
250 ],
251 allow_module_overrides=allow_module_overrides,
252 )
253 self.initialized = False
File /opt/conda/lib/python3.10/site-packages/hamilton/driver.py:434, in Driver.__init__(self, config, adapter, allow_module_overrides, _materializers, _graph_executor, _use_legacy_adapter, *modules)
413 """Constructor: creates a DAG given the configuration & modules to crawl.
414
415 :param config: This is a dictionary of initial data & configuration.
(...)
430
431 """
433 self.driver_run_id = uuid.uuid4()
--> 434 adapter = self.normalize_adapter_input(adapter, use_legacy_adapter=_use_legacy_adapter)
435 if adapter.does_hook("pre_do_anything", is_async=False):
436 adapter.call_all_lifecycle_hooks_sync("pre_do_anything")
File /opt/conda/lib/python3.10/site-packages/hamilton/driver.py:341, in Driver.normalize_adapter_input(adapter, use_legacy_adapter)
339 if use_legacy_adapter:
340 adapter.append(base.PandasDataFrameResult())
--> 341 return lifecycle_base.LifecycleAdapterSet(*adapter)
File /opt/conda/lib/python3.10/site-packages/hamilton/lifecycle/base.py:770, in LifecycleAdapterSet.__init__(self, *adapters)
768 self._adapters = self._uniqify_adapters(adapters)
769 self.sync_hooks, self.async_hooks = self._get_lifecycle_hooks()
--> 770 self.sync_methods, self.async_methods = self._get_lifecycle_methods()
771 self.sync_validators = self._get_lifecycle_validators()
File /opt/conda/lib/python3.10/site-packages/hamilton/lifecycle/base.py:838, in LifecycleAdapterSet._get_lifecycle_methods(self)
834 multiple_implementations_async = [
835 method for method, adapters in async_methods.items() if len(adapters) > 1
836 ]
837 if len(multiple_implementations_sync) > 0 or len(multiple_implementations_async) > 0:
--> 838 raise ValueError(
839 f"Multiple adapters cannot (currently) implement the same lifecycle method. "
840 f"Sync methods: {multiple_implementations_sync}. "
841 f"Async methods: {multiple_implementations_async}"
842 )
843 return (
844 {method: list(adapters) for method, adapters in sync_methods.items()},
845 {method: list(adapters) for method, adapters in async_methods.items()},
846 )
ValueError: Multiple adapters cannot (currently) implement the same lifecycle method. Sync methods: ['do_node_execute']. Async methods: []
Steps to replicate behavior
dr = await (
    async_driver.Builder().with_modules(pdf_processor_async).with_cache().build()
)
Library & System Information
python=3.10.14
hamilton=1.85.1