
Enable submission of tasks and flows to remote infrastructure without previous registration #17194

Open
desertaxle opened this issue Feb 19, 2025 · 0 comments
Labels
enhancement An improvement of an existing feature


Describe the current behavior

Today, when users write Prefect workflows that need to run on multiple pieces of infrastructure, they are forced to create a deployment for each workflow and use run_deployment to invoke it. This can lead to a sprawl of deployments that hold little business meaning on their own but are listed alongside meaningful workflows, solely to conform to our current operating model.

Here’s what a multi-infra workflow would look like today with Prefect:

from prefect import flow
from prefect.deployments import run_deployment

@flow
def parent_flow():
    ... # do some stuff
    flow_run = run_deployment(
        name="my-vertex-flow/my-vertex-deployment",
        parameters={"foo": "bar"},
    )
    vertex_result = flow_run.state.result()
    ... # do some more stuff
    flow_run = run_deployment(
        name="my-high-cpu-k8s-flow/my-k8s-high-cpu-deployment",
        parameters={"foo": vertex_result},
    )
    k8s_high_cpu_result = flow_run.state.result()
    ... # finish up


@flow
def my_vertex_flow():
    ... # does things that need GPUs


@flow
def my_high_cpu_k8s_flow():
    ... # really works those CPUs

This code is hard to grok and easy to mess up.

Outside of this code, the author would also need to create the necessary work pools, create a deployment for each flow, and run a worker for each work pool; a minimal sketch of that setup follows this paragraph. The parent flow is only loosely coupled to its infrastructure-dependent child flows, which makes execution of the parent flow brittle to changes in server-side configuration (e.g. deployment name changes).
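
For illustration, here is roughly what that registration looks like today, using the flows defined above. The work pool and image names are hypothetical, and a worker must also be kept running for each pool (e.g. prefect worker start --pool vertex-pool).

my_vertex_flow.deploy(
    name="my-vertex-deployment",
    work_pool_name="vertex-pool",  # hypothetical pool name
    image="my-registry/prefect-flows:latest",  # hypothetical image
)
my_high_cpu_k8s_flow.deploy(
    name="my-k8s-high-cpu-deployment",
    work_pool_name="k8s-high-cpu-pool",  # hypothetical pool name
    image="my-registry/prefect-flows:latest",  # hypothetical image
)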

Additionally, it’s highly likely that my_vertex_flow and my_high_cpu_k8s_flow will fail if called directly because they rely on specialized infrastructure, but the flow author cannot enforce that dependency in Prefect today.

Describe the proposed behavior

We want to enable users to run workflows on ad-hoc infrastructure without needing to first create a deployment or run a persistent worker so that they can write multi-infrastructure workflows that are portable, easy to understand, and resilient.

Note that some setup may still be required outside of Prefect to support workflows written against the SDK proposed here (e.g. a Kubernetes cluster, or a GCP account with the necessary APIs enabled). This design could be extended to take on responsibility for infrastructure setup as well, but that is considered out of scope.

Implementing this would involve introducing two new APIs:

  1. Flow submission to workers
  2. Infrastructure decorators for tasks

The first API would be lower-level and allow greater control over the submission process. The second would allow decorating a function to bind it to execution in a given environment.

Example Use

Bind to infrastructure via a decorator

from prefect import flow
from prefect_kubernetes import kubernetes

@kubernetes(
    work_pool_name="olympic", 
    memory=64000,
)
@flow
def my_k8s_flow():
    return "hello"

future = my_k8s_flow() # Runs in the "olympic" work pool
print(future.result())  # prints "hello"

Because my_k8s_flow is decorated with @kubernetes, it will run via a Kubernetes job each time it is invoked. For this API, worker creation and flow submission are handled under the hood. Any additional kwargs (like memory in the above example) will be used as job variables to customize the infrastructure the flow runs on.
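
As a further illustration of this proposed decorator API (not a finalized interface), any number of job variables could be passed the same way. The cpu kwarg below is hypothetical and assumes the work pool's base job template exposes such a variable.

from prefect import flow
from prefect_kubernetes import kubernetes

@kubernetes(
    work_pool_name="olympic",
    memory=64000,
    cpu=8,  # hypothetical job variable
)
@flow
def my_other_k8s_flow(name: str):
    return f"hello {name}"

future = my_other_k8s_flow("world")  # runs in the "olympic" work pool
print(future.result())  # prints "hello world"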

Submit to a worker

from prefect import flow
from prefect_kubernetes import KubernetesWorker

@flow
def my_k8s_flow():
    return "hello"

with KubernetesWorker(
    work_pool_name="olympic"
) as worker:
    future = worker.submit(my_k8s_flow, job_variables={"memory": 64000})
    print(future.result())  # prints "hello"

When this code is executed, a Kubernetes worker will start up locally, query the configured work pool for its bundle storage location, bundle the submitted flow, and create a Kubernetes job responsible for executing the flow. The bundle_storage configured on the work pool defines where the worker uploads the bundle for the submitted flow so that it can be downloaded and executed in the execution environment (i.e. the Kubernetes job).
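
Assuming submission returns without blocking (it returns a PrefectFuture, per the next paragraph), the same worker could be reused to fan out multiple runs. The sketch below is an illustrative extension of the proposed API, not a committed interface; in particular, the parameters kwarg is hypothetical.

from prefect import flow
from prefect_kubernetes import KubernetesWorker

@flow
def double(n: int):
    return n * 2

with KubernetesWorker(work_pool_name="olympic") as worker:
    # Each submission becomes its own Kubernetes job; results are
    # gathered once all runs complete.
    futures = [
        worker.submit(double, parameters={"n": i})  # hypothetical `parameters` kwarg
        for i in range(3)
    ]
    print([f.result() for f in futures])  # prints [0, 2, 4]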

Upon submission, the user gets back a PrefectFuture object that can be used to track the execution of that flow run and retrieve the result once it completes. This pattern closely mirrors the one used by task runners.
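
For comparison, here is the existing task runner pattern that this proposal mirrors; unlike the other examples in this issue, this is working Prefect code today.

from prefect import flow, task

@task
def say_hello():
    return "hello"

@flow
def my_flow():
    future = say_hello.submit()  # returns a PrefectFuture immediately
    print(future.result())  # blocks until the task finishes; prints "hello"

my_flow()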

Additional context

No response
