This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Emphasize the necessity of remote result storage in README #82

Closed
wants to merge 3 commits into from

j-tr
Contributor

@j-tr j-tr commented Apr 20, 2023

The current README recommends remote result storage only when Prefect features that depend on it, such as caching, are used.
However, in edge cases, running without remote result storage can cause errors that are very hard to debug, even when no Prefect feature that requires remote result persistence is in use. The README should therefore emphasize that remote result storage is necessary.

Explanation:

Tasks manage their own state in Prefect Cloud. Right before a task ends and returns its result, it calls the Prefect API to signal that it is Completed. Consequently, for a brief moment the task is technically still running but already marked as Completed. Problems arise if Ray kills the worker running this task in that window, after the task was marked as Completed but before it returned its result (e.g. due to an OOM, which may even be caused by another remote function on the same node). Ray then reruns the task, which results in the forbidden state transition Completed -> Running. When it encounters such a forbidden transition (Abort exception caught in https://github.com/PrefectHQ/prefect/blob/main/src/prefect/engine.py#L1386), Prefect does one final check with the Cloud API to find out whether the task has already finished. If it has, Prefect continues by retrieving the result from storage, which is empty if remote result persistence is not enabled and the task ran on another node.
This leads to a MissingResult error (if result persistence was off) or a ValueError reporting that the storage path was not found (if result persistence is on).
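The sequence described above can be modeled with a small, self-contained toy simulation. This is only a sketch under simplifying assumptions, not Prefect's or Ray's actual implementation: `ToyApi`, `WorkerKilled`, `run_task`, and `simulate` are hypothetical names, the local `MissingResult` class is a stand-in for the real exception, plain dicts stand in for result storage, and the rerun is assumed to land on a different node.

```python
from dataclasses import dataclass, field
from typing import Dict


class MissingResult(Exception):
    """Toy stand-in: the run is Completed but no result was persisted."""


class WorkerKilled(Exception):
    """Toy stand-in for the worker being killed (for example by an OOM)."""


@dataclass
class ToyApi:
    """Hypothetical orchestration API that only tracks task-run states."""
    states: Dict[str, str] = field(default_factory=dict)

    def propose(self, run_id: str, new_state: str) -> bool:
        # Completed is terminal, so Completed -> Running is rejected.
        if self.states.get(run_id) == "Completed" and new_state == "Running":
            return False
        self.states[run_id] = new_state
        return True


def run_task(api, run_id, storage, persist, crash_before_return):
    """One execution attempt; `storage` is the dict visible to this node."""
    if not api.propose(run_id, "Running"):
        # The run already finished elsewhere, so the value can only be
        # recovered by reading it back from storage.
        if not persist:
            raise MissingResult(run_id)
        if run_id not in storage:
            raise ValueError(f"storage path not found: {run_id}")
        return storage[run_id]
    value = 42
    if persist:
        storage[run_id] = value
    api.propose(run_id, "Completed")  # marked Completed before returning
    if crash_before_return:
        raise WorkerKilled(run_id)    # killed inside the critical window
    return value


def simulate(persist: bool, shared_storage: bool):
    api = ToyApi()
    remote: Dict[str, int] = {}
    # With shared storage both nodes see the same dict; otherwise each
    # node only has its own local dict.
    node_a = remote if shared_storage else {}
    node_b = remote if shared_storage else {}
    try:
        run_task(api, "t1", node_a, persist, crash_before_return=True)
    except WorkerKilled:
        pass  # the scheduler reruns the task on another node
    try:
        return run_task(api, "t1", node_b, persist, crash_before_return=False)
    except (MissingResult, ValueError) as exc:
        return type(exc).__name__


if __name__ == "__main__":
    print(simulate(persist=False, shared_storage=False))
    print(simulate(persist=True, shared_storage=False))
    print(simulate(persist=True, shared_storage=True))
```

In this toy model, only the combination of persistence and storage visible from every node lets the rerun recover the value; the other two combinations end in the two error types mentioned above.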
While this failure is extremely flaky and hard to reproduce in the wild, because the OOM has to hit at exactly the right moment, it can be reproduced with the following example code, which triggers an OOM error in the on_completion hook (the hook runs after the task is marked as Completed but before it returns).

from prefect import flow, task
from prefect_ray import RayTaskRunner
from prefect_ray.context import remote_options


def task_completion_hook(task, task_run, state):
    # Allocate memory without bound to simulate an OOM kill of the worker.
    x = []
    while True:
        x += [1] * 1024**2


# Named `identity` so the function does not shadow the imported `task` decorator.
@task(persist_result=True, on_completion=[task_completion_hook])
def identity(x=None):
    return x


@flow(task_runner=RayTaskRunner)
def main() -> None:
    # Force the task onto the worker node that advertises the custom
    # resource "worker_node_1".
    with remote_options(resources={"worker_node_1": 0.001}):
        future = identity.submit(1)

    print(future.result())


if __name__ == "__main__":
    main()

Checklist

  • References any related issue by including "Closes #" or "Closes ".
    • If no issue exists and your change is not a small fix, please create an issue first.
  • Includes tests or only affects documentation.
  • Passes pre-commit checks.
    • Run pre-commit install && pre-commit run --all locally for formatting and linting.
  • Includes screenshots of documentation updates.
    • Run mkdocs serve to view documentation locally.
  • Summarizes PR's changes in CHANGELOG.md

@j-tr j-tr requested a review from zanieb as a code owner April 20, 2023 13:21
@j-tr j-tr requested a review from a team April 20, 2023 13:21
@desertaxle
Member

Thank you for your willingness to contribute to prefect-ray! The code for this library has been moved to https://github.com/PrefectHQ/prefect/tree/main/src/integrations/prefect-ray. Please reopen this PR there if this functionality is still needed!

@desertaxle desertaxle closed this Apr 26, 2024