From aad7cdbd6ca9c79f41848a7fb4f82e7475043702 Mon Sep 17 00:00:00 2001 From: rkpattnaik780 Date: Fri, 15 Mar 2024 00:04:34 +0530 Subject: [PATCH 1/5] chore: bump version of kfp sdk --- Makefile | 2 +- elyra/metadata/schemasproviders.py | 22 +-- elyra/pipeline/kfp/PipelineConf.py | 157 ++++++++++++++++ elyra/pipeline/kfp/kfp_authentication.py | 6 +- elyra/pipeline/kfp/processor_kfp.py | 77 +++----- ...neric_component_definition_template.jinja2 | 24 +++ .../kubeflow/v2/python_dsl_template.jinja2 | 167 ++++++++++++++++++ pyproject.toml | 8 +- 8 files changed, 386 insertions(+), 77 deletions(-) create mode 100644 elyra/pipeline/kfp/PipelineConf.py create mode 100644 elyra/templates/kubeflow/v2/generic_component_definition_template.jinja2 create mode 100644 elyra/templates/kubeflow/v2/python_dsl_template.jinja2 diff --git a/Makefile b/Makefile index 9a68fd52c..3528c703e 100644 --- a/Makefile +++ b/Makefile @@ -179,7 +179,7 @@ uninstall-server-package: @$(PYTHON_PIP) uninstall elyra -y install-server-package: uninstall-server-package - $(PYTHON_PIP) install --upgrade --upgrade-strategy $(UPGRADE_STRATEGY) "$(shell find dist -name "elyra-*-py3-none-any.whl")[kfp-tekton]" + $(PYTHON_PIP) install --upgrade --upgrade-strategy $(UPGRADE_STRATEGY) "$(shell find dist -name "elyra-*-py3-none-any.whl")" install-server: build-dependencies lint-server build-server install-server-package ## Build and install backend diff --git a/elyra/metadata/schemasproviders.py b/elyra/metadata/schemasproviders.py index 03fdbb7e9..ab135d263 100644 --- a/elyra/metadata/schemasproviders.py +++ b/elyra/metadata/schemasproviders.py @@ -23,12 +23,6 @@ import entrypoints from traitlets import log # noqa H306 -try: - from kfp_tekton import TektonClient -except ImportError: - # We may not have kfp-tekton available and that's okay! 
- TektonClient = None - from elyra.metadata.schema import SchemasProvider from elyra.metadata.schemaspaces import CodeSnippets from elyra.metadata.schemaspaces import ComponentCatalogs @@ -93,16 +87,12 @@ def get_schemas(self) -> List[Dict]: ) if kfp_schema_present: # Update the kfp engine enum to reflect current packages... - # If TektonClient package is missing, navigate to the engine property - # and remove 'tekton' entry if present and return updated result. - if not TektonClient: - # locate the schema and update the enum - for schema in runtime_schemas: - if schema["name"] == "kfp": - engine_enum: list = schema["properties"]["metadata"]["properties"]["engine"]["enum"] - if "Tekton" in engine_enum: - engine_enum.remove("Tekton") - schema["properties"]["metadata"]["properties"]["engine"]["enum"] = engine_enum + for schema in runtime_schemas: + if schema["name"] == "kfp": + engine_enum: list = schema["properties"]["metadata"]["properties"]["engine"]["enum"] + if "Tekton" in engine_enum: + engine_enum.remove("Tekton") + schema["properties"]["metadata"]["properties"]["engine"]["enum"] = engine_enum # For KFP schemas replace placeholders: # - properties.metadata.properties.auth_type.enum ({AUTH_PROVIDER_PLACEHOLDERS}) diff --git a/elyra/pipeline/kfp/PipelineConf.py b/elyra/pipeline/kfp/PipelineConf.py new file mode 100644 index 000000000..1e4f1aae6 --- /dev/null +++ b/elyra/pipeline/kfp/PipelineConf.py @@ -0,0 +1,157 @@ +from typing import Union +from kubernetes.client.models import V1PodDNSConfig + +class PipelineConf(): + """PipelineConf contains pipeline level settings.""" + + def __init__(self): + self.image_pull_secrets = [] + self.timeout = 0 + self.ttl_seconds_after_finished = -1 + self._pod_disruption_budget_min_available = None + self.op_transformers = [] + self.default_pod_node_selector = {} + self.image_pull_policy = None + self.parallelism = None + self._data_passing_method = None + self.dns_config = None + + def set_image_pull_secrets(self, 
image_pull_secrets): + """Configures the pipeline level imagepullsecret. + + Args: + image_pull_secrets: a list of Kubernetes V1LocalObjectReference For + detailed description, check Kubernetes V1LocalObjectReference definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1LocalObjectReference.md + """ + self.image_pull_secrets = image_pull_secrets + return self + + def set_timeout(self, seconds: int): + """Configures the pipeline level timeout. + + Args: + seconds: number of seconds for timeout + """ + self.timeout = seconds + return self + + def set_parallelism(self, max_num_pods: int): + """Configures the max number of total parallel pods that can execute at + the same time in a workflow. + + Args: + max_num_pods: max number of total parallel pods. + """ + if max_num_pods < 1: + raise ValueError( + 'Pipeline max_num_pods set to < 1, allowed values are > 0') + + self.parallelism = max_num_pods + return self + + def set_ttl_seconds_after_finished(self, seconds: int): + """Configures the ttl after the pipeline has finished. + + Args: + seconds: number of seconds for the workflow to be garbage collected after + it is finished. + """ + self.ttl_seconds_after_finished = seconds + return self + + def set_pod_disruption_budget(self, min_available: Union[int, str]): + """PodDisruptionBudget holds the number of concurrent disruptions that + you allow for pipeline Pods. + + Args: + min_available (Union[int, str]): An eviction is allowed if at least + "minAvailable" pods selected by "selector" will still be available after + the eviction, i.e. even in the absence of the evicted pod. So for + example you can prevent all voluntary evictions by specifying "100%". + "minAvailable" can be either an absolute number or a percentage. + """ + self._pod_disruption_budget_min_available = min_available + return self + + def set_default_pod_node_selector(self, label_name: str, value: str): + """Add a constraint for nodeSelector for a pipeline. 
+ + Each constraint is a key-value pair label. + + For the container to be eligible to run on a node, the node must have each + of the constraints appeared as labels. + + Args: + label_name: The name of the constraint label. + value: The value of the constraint label. + """ + self.default_pod_node_selector[label_name] = value + return self + + def set_image_pull_policy(self, policy: str): + """Configures the default image pull policy. + + Args: + policy: the pull policy, has to be one of: Always, Never, IfNotPresent. + For more info: + https://github.com/kubernetes-client/python/blob/10a7f95435c0b94a6d949ba98375f8cc85a70e5a/kubernetes/docs/V1Container.md + """ + self.image_pull_policy = policy + return self + + def add_op_transformer(self, transformer): + """Configures the op_transformers which will be applied to all ops in + the pipeline. The ops can be ResourceOp, VolumeOp, or ContainerOp. + + Args: + transformer: A function that takes a kfp Op as input and returns a kfp Op + """ + self.op_transformers.append(transformer) + + def set_dns_config(self, dns_config: V1PodDNSConfig): + """Set the dnsConfig to be given to each pod. + + Args: + dns_config: Kubernetes V1PodDNSConfig For detailed description, check + Kubernetes V1PodDNSConfig definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1PodDNSConfig.md + + Example: + :: + + import kfp + from kubernetes.client.models import V1PodDNSConfig, V1PodDNSConfigOption + pipeline_conf = kfp.dsl.PipelineConf() + pipeline_conf.set_dns_config(dns_config=V1PodDNSConfig( + nameservers=["1.2.3.4"], + options=[V1PodDNSConfigOption(name="ndots", value="2")], + )) + """ + self.dns_config = dns_config + + @property + def data_passing_method(self): + return self._data_passing_method + + @data_passing_method.setter + def data_passing_method(self, value): + """Sets the object representing the method used for intermediate data + passing. 
+ + Example: + :: + + from kfp.dsl import PipelineConf, data_passing_methods + from kubernetes.client.models import V1Volume, V1PersistentVolumeClaimVolumeSource + pipeline_conf = PipelineConf() + pipeline_conf.data_passing_method = + data_passing_methods.KubernetesVolume( + volume=V1Volume( + name='data', + persistent_volume_claim=V1PersistentVolumeClaimVolumeSource('data-volume'), + ), + path_prefix='artifact_data/', + ) + """ + self._data_passing_method = value \ No newline at end of file diff --git a/elyra/pipeline/kfp/kfp_authentication.py b/elyra/pipeline/kfp/kfp_authentication.py index 3ca44519a..c369fc3eb 100644 --- a/elyra/pipeline/kfp/kfp_authentication.py +++ b/elyra/pipeline/kfp/kfp_authentication.py @@ -27,9 +27,9 @@ from typing import Tuple from urllib.parse import urlsplit -from kfp.auth import KF_PIPELINES_SA_TOKEN_ENV -from kfp.auth import KF_PIPELINES_SA_TOKEN_PATH -from kfp.auth import ServiceAccountTokenVolumeCredentials +from kfp.client import KF_PIPELINES_SA_TOKEN_ENV +from kfp.client import KF_PIPELINES_SA_TOKEN_PATH +from kfp.client import ServiceAccountTokenVolumeCredentials import requests diff --git a/elyra/pipeline/kfp/processor_kfp.py b/elyra/pipeline/kfp/processor_kfp.py index 4e7f9be57..d259b9186 100644 --- a/elyra/pipeline/kfp/processor_kfp.py +++ b/elyra/pipeline/kfp/processor_kfp.py @@ -39,19 +39,11 @@ from kfp import Client as ArgoClient from kfp import compiler as kfp_argo_compiler from kfp import components as components -from kfp.dsl import PipelineConf -from kfp.dsl import RUN_ID_PLACEHOLDER from kubernetes import client as k8s_client from traitlets import default from traitlets import Unicode -try: - from kfp_tekton import compiler as kfp_tekton_compiler - from kfp_tekton import TektonClient -except ImportError: - # We may not have kfp-tekton available and that's okay! 
- kfp_tekton_compiler = None - TektonClient = None +RUN_ID_PLACEHOLDER = "random-placeholder" from elyra._version import __version__ from elyra.metadata.schemaspaces import RuntimeImages @@ -81,6 +73,8 @@ from elyra.util.kubernetes import sanitize_label_value from elyra.util.path import get_absolute_path +from elyra.pipeline.kfp.PipelineConf import PipelineConf + @unique class WorkflowEngineType(Enum): @@ -113,8 +107,6 @@ def get_instance_by_value(value: str) -> "WorkflowEngineType": CRIO_VOL_MOUNT_PATH = "/opt/app-root/src" CRIO_VOL_WORKDIR_PATH = f"{CRIO_VOL_MOUNT_PATH}/jupyter-work-dir" CRIO_VOL_PYTHON_PATH = f"{CRIO_VOL_WORKDIR_PATH}/python3" - - class KfpPipelineProcessor(RuntimePipelineProcessor): _type = RuntimeProcessorType.KUBEFLOW_PIPELINES _name = "kfp" @@ -173,11 +165,6 @@ def process(self, pipeline): api_password = runtime_configuration.metadata.get("api_password") user_namespace = runtime_configuration.metadata.get("user_namespace") workflow_engine = WorkflowEngineType.get_instance_by_value(runtime_configuration.metadata.get("engine", "argo")) - if workflow_engine == WorkflowEngineType.TEKTON and not TektonClient: - raise ValueError( - "Python package `kfp-tekton` is not installed. " - "Please install using `elyra[kfp-tekton]` to use Tekton engine." 
- ) # unpack Cloud Object Storage configs cos_endpoint = runtime_configuration.metadata["cos_endpoint"] @@ -206,22 +193,13 @@ def process(self, pipeline): # Create Kubeflow Client ############# try: - if workflow_engine == WorkflowEngineType.TEKTON: - client = TektonClient( - host=api_endpoint, - cookies=auth_info.get("cookies", None), - credentials=auth_info.get("credentials", None), - existing_token=auth_info.get("existing_token", None), - namespace=user_namespace, - ) - else: - client = ArgoClient( - host=api_endpoint, - cookies=auth_info.get("cookies", None), - credentials=auth_info.get("credentials", None), - existing_token=auth_info.get("existing_token", None), - namespace=user_namespace, - ) + client = ArgoClient( + host=api_endpoint, + cookies=auth_info.get("cookies", None), + credentials=auth_info.get("credentials", None), + existing_token=auth_info.get("existing_token", None), + namespace=user_namespace, + ) except Exception as ex: # a common cause of these errors is forgetting to include `/pipeline` or including it with an 's' api_endpoint_obj = urlsplit(api_endpoint) @@ -275,7 +253,7 @@ def process(self, pipeline): with tempfile.TemporaryDirectory() as temp_dir: self.log.debug(f"Created temporary directory at: {temp_dir}") - pipeline_path = os.path.join(temp_dir, f"{pipeline_name}.tar.gz") + pipeline_path = os.path.join(temp_dir, f"{pipeline_name}.yaml") ############# # Get Pipeline ID @@ -351,11 +329,15 @@ def process(self, pipeline): ) # extract the ID of the pipeline we created - pipeline_id = kfp_pipeline.id + pipeline_id = kfp_pipeline.pipeline_id # the initial "pipeline version" has the same id as the pipeline itself - version_id = pipeline_id - + version_details = client.list_pipeline_versions(pipeline_id=pipeline_id) + version_list = version_details.pipeline_versions + if isinstance(version_list, list): + version_id = version_list[0].pipeline_version_id + else: + version_id = None # CASE 2: pipeline already exists else: # upload the "pipeline 
version" @@ -366,7 +348,7 @@ def process(self, pipeline): ) # extract the id of the "pipeline version" that was created - version_id = kfp_pipeline.id + version_id = kfp_pipeline.pipeline_version_id except Exception as ex: # a common cause of these errors is forgetting to include `/pipeline` or including it with an 's' @@ -416,7 +398,7 @@ def process(self, pipeline): # create pipeline run (or specified pipeline version) run = client.run_pipeline( - experiment_id=experiment.id, job_name=job_name, pipeline_id=pipeline_id, version_id=version_id + experiment_id=experiment.experiment_id, job_name=job_name, pipeline_id=pipeline_id, version_id=version_id ) except Exception as ex: @@ -435,7 +417,7 @@ def process(self, pipeline): self.log_pipeline_info( pipeline_name, - f"pipeline submitted: {public_api_endpoint}/#/runs/details/{run.id}", + f"pipeline submitted: {public_api_endpoint}/#/runs/details/{run.run_id}", duration=time.time() - t0, ) @@ -450,8 +432,8 @@ def process(self, pipeline): object_storage_path = None return KfpPipelineProcessorResponse( - run_id=run.id, - run_url=f"{public_api_endpoint}/#/runs/details/{run.id}", + run_id=run.run_id, + run_url=f"{public_api_endpoint}/#/runs/details/{run.run_id}", object_storage_url=object_storage_url, object_storage_path=object_storage_path, ) @@ -494,8 +476,6 @@ def export( ) workflow_engine = WorkflowEngineType.get_instance_by_value(runtime_configuration.metadata.get("engine", "argo")) - if workflow_engine == WorkflowEngineType.TEKTON and not TektonClient: - raise ValueError("kfp-tekton not installed. 
Please install using elyra[kfp-tekton] to use Tekton engine.") if Path(absolute_pipeline_export_path).exists() and not overwrite: raise ValueError("File " + absolute_pipeline_export_path + " already exists.") @@ -565,7 +545,7 @@ def _generate_pipeline_dsl( code_generation_options = {} # Load Kubeflow Pipelines Python DSL template - loader = PackageLoader("elyra", "templates/kubeflow/v1") + loader = PackageLoader("elyra", "templates/kubeflow/v2") template_env = Environment(loader=loader) # Add filter that produces a Python-safe variable name template_env.filters["python_safe"] = lambda x: re.sub(r"[" + re.escape(string.punctuation) + "\\s]", "_", x) @@ -668,12 +648,7 @@ def _compile_pipeline_dsl( # in the generated Python DSL "generated_pipeline" pipeline_function = getattr(mod, "generated_pipeline") # compile the DSL - if workflow_engine == WorkflowEngineType.TEKTON: - kfp_tekton_compiler.TektonCompiler().compile( - pipeline_function, output_file, pipeline_conf=pipeline_conf - ) - else: - kfp_argo_compiler.Compiler().compile(pipeline_function, output_file, pipeline_conf=pipeline_conf) + kfp_argo_compiler.Compiler().compile(pipeline_function, output_file) except Exception as ex: raise RuntimeError( f"Failed to compile pipeline with workflow_engine '{workflow_engine.value}' to '{output_file}'" @@ -729,7 +704,7 @@ def _generate_workflow_tasks( pipeline.pipeline_properties.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id ) # - load the generic component definition template - template_env = Environment(loader=PackageLoader("elyra", "templates/kubeflow/v1")) + template_env = Environment(loader=PackageLoader("elyra", "templates/kubeflow/v2")) generic_component_template = template_env.get_template("generic_component_definition_template.jinja2") # Add filter that escapes the " character in strings template_env.filters["string_delimiter_safe"] = lambda string: re.sub('"', '\\\\\\\\"', string) diff --git 
a/elyra/templates/kubeflow/v2/generic_component_definition_template.jinja2 b/elyra/templates/kubeflow/v2/generic_component_definition_template.jinja2 new file mode 100644 index 000000000..85c8eb086 --- /dev/null +++ b/elyra/templates/kubeflow/v2/generic_component_definition_template.jinja2 @@ -0,0 +1,24 @@ +name: Run a file +description: Run a Jupyter notebook or Python/R script +{% if task_parameters %} +inputs: +{%- for parameter in task_parameters %} +- {name: {{ parameter.name }}, type: {{ parameter.input_type.component_input_type }}{% if parameter.description %}, description: "{{ parameter.description | string_delimiter_safe}}"{% endif %}{% if parameter.default_value is not none %}, default: {% if parameter.selected_type == 'String' %} "{{ parameter.default_value|string_delimiter_safe }}"{% else %}{{ parameter.default_value }}{% endif %}{% endif %}, optional: {{ (not parameter.required)|tojson }}} +{%- endfor %} +{% endif %} +implementation: + container: + image: {{ container_image }} + command: [sh, -c] + args: + - | + {%- for parameter in task_parameters %} + {{ parameter.name }}="${{ loop.index0 }}" + {%- endfor %} + {%- for command in command_args %} + sh -c "{{command}}" + {%- endfor %} + + {%- for parameter in task_parameters %} + - {inputValue: {{ parameter.name }}} + {%- endfor %} diff --git a/elyra/templates/kubeflow/v2/python_dsl_template.jinja2 b/elyra/templates/kubeflow/v2/python_dsl_template.jinja2 new file mode 100644 index 000000000..505843a48 --- /dev/null +++ b/elyra/templates/kubeflow/v2/python_dsl_template.jinja2 @@ -0,0 +1,167 @@ +# +# Generated by Elyra {{ elyra_version }} +# +import kfp +from kubernetes.client import * +from kubernetes.client.models import * + +{# Load statements for custom components -#} +{# component_hash = """""" -#} +{# factory_hash = kfp.components.load_component_from_text(component_hash) -#} +{% for hash, component_definition in component_definitions.items() %} +component_def_{{ hash | python_safe }} = """ +{{ 
component_definition }} +""" + +factory_{{ hash | python_safe }} = kfp.components.load_component_from_text(component_def_{{ hash | python_safe }}) +{% endfor %} + +{# Define pipeline -#} +{% if pipeline_description %} +@kfp.dsl.pipeline(name="{{ pipeline_name }}", description="{{ pipeline_description | string_delimiter_safe }}") +{% else %} +@kfp.dsl.pipeline(name="{{ pipeline_name }}") +{% endif %} +def generated_pipeline( +{% if pipeline_parameters %} +{% for parameter in pipeline_parameters %} + {{ parameter.name }}{% if parameter.input_type.type_hint %}: {{ parameter.input_type.type_hint }}{% endif %} = {{ parameter|param_val_to_python_var }}, +{% endfor %} +{% endif %} +): +{% for workflow_task in workflow_tasks.values() %} + {% set task_name = "task_" + workflow_task.escaped_task_id %} + # Task for node '{{ workflow_task.name }}' + {{ task_name }} = factory_{{ workflow_task.component_definition_hash | python_safe }}( +{% for task_input_name, task_input_spec in workflow_task.task_inputs.items() %} +{% if task_input_spec.task_output_reference %} + {{ task_input_name }}=task_{{ task_input_spec.task_output_reference.task_id }}.outputs["{{ task_input_spec.task_output_reference.output_id }}"], +{% elif task_input_spec.pipeline_parameter_reference %} + {{ task_input_name }}={{ task_input_spec.pipeline_parameter_reference }}, +{% elif task_input_spec.requires_quoted_rendering %} + {{ task_input_name }}="""{{ task_input_spec.value | string_delimiter_safe }}""", +{% else %} + {{ task_input_name }}={{ task_input_spec.value }}, +{% endif %} +{% endfor %} + ) +{% if workflow_task.task_modifiers.image_pull_policy %} + {{ task_name }}.container.set_image_pull_policy("{{ workflow_task.task_modifiers.image_pull_policy }}") +{% endif %} +{% if workflow_task.task_modifiers.object_storage_secret %} + {{ task_name }}.apply(kfp.aws.use_aws_secret("{{ workflow_task.task_modifiers.object_storage_secret }}")) +{% endif %} + {{ task_name }}.set_display_name("{{ workflow_task.name | 
string_delimiter_safe }}") +{% if workflow_task.task_modifiers.cpu_request %} + {{ task_name }}.container.set_cpu_request(cpu="{{ workflow_task.task_modifiers.cpu_request }}") +{% endif %} +{% if workflow_task.task_modifiers.mem_request and workflow_task.task_modifiers.mem_request.size %} + {{ task_name }}.container.set_memory_request(memory="{{ workflow_task.task_modifiers.mem_request.size }}{{ workflow_task.task_modifiers.mem_request.units }}") +{% endif %} +{% if workflow_task.task_modifiers.cpu_limit %} + {{ task_name }}.container.set_cpu_limit(cpu="{{ workflow_task.task_modifiers.cpu_limit }}") +{% endif %} +{% if workflow_task.task_modifiers.memory_limit and workflow_task.task_modifiers.memory_limit.size %} + {{ task_name }}.container.set_memory_limit(memory="{{ workflow_task.task_modifiers.memory_limit.size }}{{ workflow_task.task_modifiers.memory_limit.units }}") +{% endif %} +{% if workflow_task.task_modifiers.gpu_limit and workflow_task.task_modifiers.gpu_limit.size %} + {{ task_name }}.container.add_resource_limit(resource_name="{{ workflow_task.task_modifiers.gpu_limit.vendor }}", value="{{ workflow_task.task_modifiers.gpu_limit.size }}") +{% endif %} +{% if workflow_task.task_modifiers.env_variables %} +{% for env_var_name, env_var_value in workflow_task.task_modifiers.env_variables.items() %} + {{ task_name }}.set_env_variable(name="{{ env_var_name }}", value="{{ env_var_value | string_delimiter_safe }}") +{% endfor %} +{% endif %} +{% if workflow_task.task_modifiers.set_run_name %} +{% if workflow_engine == "tekton" %} + {{ task_name }}.set_env_variable(name="ELYRA_RUN_NAME", value=V1ObjectFieldSelector(field_path="metadata.annotations['pipelines.kubeflow.org/run_name']")) +{% else %} + {{ task_name }}.set_env_variable(name="ELYRA_RUN_NAME", value="{{ workflow_task.task_modifiers.set_run_name }}") +{% endif %} +{% endif %} +{% if workflow_task.task_modifiers.disable_node_caching %} + {{ task_name 
}}.execution_options.caching_strategy.max_cache_staleness = "P0D" +{% endif %} +{% if workflow_task.task_modifiers.kubernetes_secrets %} +{% for env_var, secret_dict in workflow_task.task_modifiers.kubernetes_secrets.items() %} + {{ task_name }}.container.set_env_variable( + name="{{ env_var }}", + value=V1EnvVarSource(secret_key_ref=V1SecretKeySelector(name="{{ secret_dict.name }}", key="{{ secret_dict.key }}")), + ) +{% endfor %} +{% endif %} +{% if workflow_task.task_modifiers.kubernetes_volumes %} +{% for volume_path, volume_dict in workflow_task.task_modifiers.kubernetes_volumes.items() %} + {{ task_name }}.add_volume( + V1Volume( + name="{{ volume_dict.pvc_name}}", + persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name="{{ volume_dict.pvc_name }}",), + )) + {{ task_name }}.container.add_volume_mount( + V1VolumeMount( + mount_path="{{ volume_path }}", + name="{{ volume_dict.pvc_name }}", +{% if volume_dict.sub_path %} + sub_path="{{ volume_dict.sub_path }}", +{% endif %} + read_only={{ volume_dict.read_only }}, + )) +{% endfor %} +{% endif %} +{% if workflow_task.task_modifiers.kubernetes_tolerations %} +{% for toleration_dict in workflow_task.task_modifiers.kubernetes_tolerations.values() %} + {{ task_name }}.add_toleration( + V1Toleration( +{% if toleration_dict.effect %} + effect="{{ toleration_dict.effect }}", +{% else %} + effect=None, +{% endif %} +{% if toleration_dict.key %} + key="{{ toleration_dict.key }}", +{% else %} + key=None, +{% endif %} + operator="{{ toleration_dict.operator }}", +{% if toleration_dict.value %} + value="{{ toleration_dict.value | string_delimiter_safe }}", +{% else %} + value=None, +{% endif %} + )) +{% endfor %} +{% endif %} +{% if workflow_task.task_modifiers.kubernetes_shared_mem_size %} + {{ task_name }}.add_volume(V1Volume( + name="shm", + empty_dir=V1EmptyDirVolumeSource(medium="Memory", size_limit="{{ workflow_task.task_modifiers.kubernetes_shared_mem_size.size }}{{ 
workflow_task.task_modifiers.kubernetes_shared_mem_size.units }}"), + )) + {{ task_name }}.container.add_volume_mount(V1VolumeMount(mount_path="/dev/shm", name="shm")) +{% endif %} +{% if workflow_task.task_modifiers.crio_runtime %} + {{ task_name }}.add_volume(V1Volume( + name="{{ workflow_task.task_modifiers.crio_runtime.emptydir_volume_name }}", + empty_dir=V1EmptyDirVolumeSource(medium="{{ workflow_task.task_modifiers.crio_runtime.emptydir_volume_medium }}", size_limit="{{ workflow_task.task_modifiers.crio_runtime.emptydir_volume_size }}"), + )) + {{ task_name }}.container.add_volume_mount(V1VolumeMount(mount_path="{{ workflow_task.task_modifiers.crio_runtime.emptydir_mount_path }}", name="{{ workflow_task.task_modifiers.crio_runtime.emptydir_volume_name }}")) +{% endif %} +{# declare upstream dependencies -#} +{% if workflow_task.upstream_workflow_task_ids %} +{% for upstream_workflow_task_id in workflow_task.upstream_workflow_task_ids %} + {{ task_name }}.after(task_{{ upstream_workflow_task_id | python_safe }}) +{% endfor %} +{% endif %} +{% endfor %} + +if __name__ == "__main__": + from pathlib import Path +{% if workflow_engine.lower() == "tekton" %} + from kfp_tekton import compiler + + compiler.TektonCompiler().compile( +{% else %} + kfp.compiler.Compiler().compile( +{% endif %} + pipeline_func=generated_pipeline, + package_path=Path(__file__).with_suffix(".yaml").name, + ) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index f8445c26d..a7814674c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,7 +50,8 @@ dependencies = [ "yaspin", # see: https://stackoverflow.com/questions/76175487/sudden-importerror-cannot-import-name-appengine-from-requests-packages-urlli "appengine-python-standard", - "kfp>=1.7.0,<2.0,!=1.7.2", # We cap the SDK to <2.0 due to possible breaking changes + "kfp>=2.0.0", + "kfp-kubernetes>=1.1.0", "pygithub", "black>=22.8.0", ] @@ -85,10 +86,6 @@ test = [ "pytest_virtualenv", "requests-mock", 
"requests-unixsocket", - "kfp-tekton" ] -kfp-tekton = [ - "kfp-tekton>=1.5.2" # requires kfp >= 1.8.19, which contains fix for Jupyterlab ] kfp-examples = [ "elyra-examples-kfp-catalog" ] gitlab = [ "python-gitlab" ] # The following is a collection of "non-test" extra dependencies from above. all = [ - "kfp-tekton>=1.5.2", "elyra-examples-kfp-catalog", "python-gitlab", ] From 10434ebdc4793b502930b6f8682ab9b32985c644 Mon Sep 17 00:00:00 2001 From: Harshad Reddy Nalla Date: Fri, 15 Mar 2024 13:36:50 -0400 Subject: [PATCH 2/5] Cap kfp version for avoiding protobuf version dependency Signed-off-by: Harshad Reddy Nalla --- pyproject.toml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index a7814674c..8efc51be9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,14 +26,14 @@ dependencies = [ "jupyter-packaging>=0.10", "jupyter_server>=1.7.0", "jupyterlab>=3.4.6,<4.0", # comment out to use local jupyterlab - "jupyterlab-lsp>=3.8.0", # comment out to use local jupyterlab - "jupyterlab-git~=0.32", # Avoid breaking 1.x changes + "jupyterlab-lsp<=4.2.0", # comment out to use local jupyterlab + "jupyterlab-git<=0.44.0", # Avoid breaking 1.x changes "jupyter-resource-usage>=0.5.1,<1", "MarkupSafe>=2.1", "minio>=7.0.0,!=7.2.1", "nbclient>=0.5.1", "nbconvert>=6.5.1", - "nbdime~=3.1", # Cap from jupyterlab-git + "nbdime~=3.2.1", # Cap from jupyterlab-git "nbformat>=5.1.2", "networkx>=2.5.1", "papermill>=2.3.4", @@ -50,8 +50,8 @@ dependencies = [ "yaspin", # see: https://stackoverflow.com/questions/76175487/sudden-importerror-cannot-import-name-appengine-from-requests-packages-urlli "appengine-python-standard", - "kfp>=2.0.0", - "kfp-kubernetes>=1.1.0", + "kfp>=2.0.0", # Cap kfp for protobuf compatibility + "kfp-kubernetes>=1.0.0", "pygithub", "black>=22.8.0", ] From 1a878078f347c185bcac4e1db3fdbe120c571307 Mon Sep 17 00:00:00 2001 From: rkpattnaik780 Date: Thu, 4 Apr 2024 20:33:39 +0530 Subject: [PATCH 3/5] fix: 
update container properties according to kfpv2 --- .../kubeflow/v2/python_dsl_template.jinja2 | 167 ++++++++++++------ 1 file changed, 109 insertions(+), 58 deletions(-) diff --git a/elyra/templates/kubeflow/v2/python_dsl_template.jinja2 b/elyra/templates/kubeflow/v2/python_dsl_template.jinja2 index 505843a48..e7f80fe7c 100644 --- a/elyra/templates/kubeflow/v2/python_dsl_template.jinja2 +++ b/elyra/templates/kubeflow/v2/python_dsl_template.jinja2 @@ -4,6 +4,89 @@ import kfp from kubernetes.client import * from kubernetes.client.models import * +from kfp.kubernetes import secret, volume + +from typing import Optional + +# ------------------------------------------------------------------ +# kfp-kubernetes 1.1.0 is missing these functions, explicitly using them, +# TODO: remove these functions once a new release of kfp-kubernetes is made. +# ------------------------------------------------------------------ + +from google.protobuf import json_format +from kfp.dsl import PipelineTask +from kfp.kubernetes import common +from kfp.kubernetes import kubernetes_executor_config_pb2 as pb + +try: + from typing import Literal +except ImportError: + from typing_extensions import Literal + +def add_toleration( + task: PipelineTask, + key: Optional[str] = None, + operator: Optional[Literal["Equal", "Exists"]] = None, + value: Optional[str] = None, + effect: Optional[Literal["NoExecute", "NoSchedule", "PreferNoSchedule"]] = None, + toleration_seconds: Optional[int] = None, +): + + msg = common.get_existing_kubernetes_config_as_message(task) + msg.tolerations.append( + pb.Toleration( + key=key, + operator=operator, + value=value, + effect=effect, + toleration_seconds=toleration_seconds, + ) + ) + task.platform_config["kubernetes"] = json_format.MessageToDict(msg) + + return task + + +def add_pod_label( + task: PipelineTask, + label_key: str, + label_value: str, +) -> PipelineTask: + + msg = common.get_existing_kubernetes_config_as_message(task) + 
msg.pod_metadata.labels.update({label_key: label_value}) + task.platform_config['kubernetes'] = json_format.MessageToDict(msg) + + return task + + +def add_pod_annotation( + task: PipelineTask, + annotation_key: str, + annotation_value: str, +) -> PipelineTask: + + msg = common.get_existing_kubernetes_config_as_message(task) + msg.pod_metadata.annotations.update({annotation_key: annotation_value}) + task.platform_config['kubernetes'] = json_format.MessageToDict(msg) + + return task + +def set_image_pull_policy(task: PipelineTask, policy: str) -> PipelineTask: + + if policy not in ['Always', 'Never', 'IfNotPresent']: + raise ValueError( + 'Invalid imagePullPolicy. Must be one of `Always`, `Never`, `IfNotPresent`.' + ) + msg = common.get_existing_kubernetes_config_as_message(task) + msg.image_pull_policy = policy + task.platform_config['kubernetes'] = json_format.MessageToDict(msg) + + return task + +# ------------------------------------------------------------------ +# end of missing functions +# ------------------------------------------------------------------ {# Load statements for custom components -#} {# component_hash = """""" -#} @@ -46,26 +129,29 @@ def generated_pipeline( {% endfor %} ) {% if workflow_task.task_modifiers.image_pull_policy %} - {{ task_name }}.container.set_image_pull_policy("{{ workflow_task.task_modifiers.image_pull_policy }}") + set_image_pull_policy({{ task_name }}, "{{ workflow_task.task_modifiers.image_pull_policy }}") {% endif %} {% if workflow_task.task_modifiers.object_storage_secret %} - {{ task_name }}.apply(kfp.aws.use_aws_secret("{{ workflow_task.task_modifiers.object_storage_secret }}")) + secret.use_secret_as_env({{ task_name }}, "{{ workflow_task.task_modifiers.object_storage_secret }}", { "AWS_ACCESS_KEY_ID": "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY": "AWS_SECRET_ACCESS_KEY" }) {% endif %} {{ task_name }}.set_display_name("{{ workflow_task.name | string_delimiter_safe }}") +{% if workflow_task.doc %} + 
add_pod_annotation({{ task_name }}, "elyra/node-user-doc","""{{ workflow_task.doc| string_delimiter_safe }}""") +{% endif %} {% if workflow_task.task_modifiers.cpu_request %} - {{ task_name }}.container.set_cpu_request(cpu="{{ workflow_task.task_modifiers.cpu_request }}") + {{ task_name }}.set_cpu_request(cpu="{{ workflow_task.task_modifiers.cpu_request }}") {% endif %} {% if workflow_task.task_modifiers.mem_request and workflow_task.task_modifiers.mem_request.size %} - {{ task_name }}.container.set_memory_request(memory="{{ workflow_task.task_modifiers.mem_request.size }}{{ workflow_task.task_modifiers.mem_request.units }}") + {{ task_name }}.set_memory_request(memory="{{ workflow_task.task_modifiers.mem_request.size }}{{ workflow_task.task_modifiers.mem_request.units }}") {% endif %} {% if workflow_task.task_modifiers.cpu_limit %} - {{ task_name }}.container.set_cpu_limit(cpu="{{ workflow_task.task_modifiers.cpu_limit }}") + {{ task_name }}.set_cpu_limit(cpu="{{ workflow_task.task_modifiers.cpu_limit }}") {% endif %} {% if workflow_task.task_modifiers.memory_limit and workflow_task.task_modifiers.memory_limit.size %} - {{ task_name }}.container.set_memory_limit(memory="{{ workflow_task.task_modifiers.memory_limit.size }}{{ workflow_task.task_modifiers.memory_limit.units }}") + {{ task_name }}.set_memory_limit(memory="{{ workflow_task.task_modifiers.memory_limit.size }}{{ workflow_task.task_modifiers.memory_limit.units }}") {% endif %} {% if workflow_task.task_modifiers.gpu_limit and workflow_task.task_modifiers.gpu_limit.size %} - {{ task_name }}.container.add_resource_limit(resource_name="{{ workflow_task.task_modifiers.gpu_limit.vendor }}", value="{{ workflow_task.task_modifiers.gpu_limit.size }}") + {{ task_name }}.set_gpu_limit("{{ workflow_task.task_modifiers.gpu_limit.size }}") {% endif %} {% if workflow_task.task_modifiers.env_variables %} {% for env_var_name, env_var_value in workflow_task.task_modifiers.env_variables.items() %} @@ -73,45 +159,35 @@ def 
generated_pipeline( {% endfor %} {% endif %} {% if workflow_task.task_modifiers.set_run_name %} -{% if workflow_engine == "tekton" %} - {{ task_name }}.set_env_variable(name="ELYRA_RUN_NAME", value=V1ObjectFieldSelector(field_path="metadata.annotations['pipelines.kubeflow.org/run_name']")) -{% else %} {{ task_name }}.set_env_variable(name="ELYRA_RUN_NAME", value="{{ workflow_task.task_modifiers.set_run_name }}") -{% endif %} {% endif %} {% if workflow_task.task_modifiers.disable_node_caching %} {{ task_name }}.execution_options.caching_strategy.max_cache_staleness = "P0D" {% endif %} +{% if workflow_task.task_modifiers.pod_labels %} +{% for pod_label_key, pod_label_value in workflow_task.task_modifiers.pod_labels.items() %} + add_pod_label({{ task_name }}, "{{ pod_label_key }}", "{{ pod_label_value }}") +{% endfor %} +{% endif %} +{% if workflow_task.task_modifiers.pod_annotations %} +{% for pod_annotation_key, pod_annotation_value in workflow_task.task_modifiers.pod_annotations.items() %} + add_pod_annotation({{ task_name }}, "{{ pod_annotation_key }}" , """{{ pod_annotation_value | string_delimiter_safe }}""") +{% endfor %} +{% endif %} {% if workflow_task.task_modifiers.kubernetes_secrets %} {% for env_var, secret_dict in workflow_task.task_modifiers.kubernetes_secrets.items() %} - {{ task_name }}.container.set_env_variable( - name="{{ env_var }}", - value=V1EnvVarSource(secret_key_ref=V1SecretKeySelector(name="{{ secret_dict.name }}", key="{{ secret_dict.key }}")), - ) + secret.use_secret_as_env({{ task_name }}, "{{ secret_dict.name }}", { "{{ secret_dict.key }}" : "{{ env_var }}" }) {% endfor %} {% endif %} {% if workflow_task.task_modifiers.kubernetes_volumes %} {% for volume_path, volume_dict in workflow_task.task_modifiers.kubernetes_volumes.items() %} - {{ task_name }}.add_volume( - V1Volume( - name="{{ volume_dict.pvc_name}}", - persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name="{{ volume_dict.pvc_name }}",), - )) - {{ task_name 
}}.container.add_volume_mount( - V1VolumeMount( - mount_path="{{ volume_path }}", - name="{{ volume_dict.pvc_name }}", -{% if volume_dict.sub_path %} - sub_path="{{ volume_dict.sub_path }}", -{% endif %} - read_only={{ volume_dict.read_only }}, - )) + volume.mount_pvc({{ task_name }}, "{{ volume_dict.pvc_name }}", "{{ volume_path }}") {% endfor %} {% endif %} {% if workflow_task.task_modifiers.kubernetes_tolerations %} {% for toleration_dict in workflow_task.task_modifiers.kubernetes_tolerations.values() %} - {{ task_name }}.add_toleration( - V1Toleration( + add_toleration( + {{ task_name }}, {% if toleration_dict.effect %} effect="{{ toleration_dict.effect }}", {% else %} @@ -122,29 +198,10 @@ def generated_pipeline( {% else %} key=None, {% endif %} - operator="{{ toleration_dict.operator }}", -{% if toleration_dict.value %} - value="{{ toleration_dict.value | string_delimiter_safe }}", -{% else %} - value=None, -{% endif %} - )) + operator="{{ toleration_dict.operator }}", + ) {% endfor %} {% endif %} -{% if workflow_task.task_modifiers.kubernetes_shared_mem_size %} - {{ task_name }}.add_volume(V1Volume( - name="shm", - empty_dir=V1EmptyDirVolumeSource(medium="Memory", size_limit="{{ workflow_task.task_modifiers.kubernetes_shared_mem_size.size }}{{ workflow_task.task_modifiers.kubernetes_shared_mem_size.units }}"), - )) - {{ task_name }}.container.add_volume_mount(V1VolumeMount(mount_path="/dev/shm", name="shm")) -{% endif %} -{% if workflow_task.task_modifiers.crio_runtime %} - {{ task_name }}.add_volume(V1Volume( - name="{{ workflow_task.task_modifiers.crio_runtime.emptydir_volume_name }}", - empty_dir=V1EmptyDirVolumeSource(medium="{{ workflow_task.task_modifiers.crio_runtime.emptydir_volume_medium }}", size_limit="{{ workflow_task.task_modifiers.crio_runtime.emptydir_volume_size }}"), - )) - {{ task_name }}.container.add_volume_mount(V1VolumeMount(mount_path="{{ workflow_task.task_modifiers.crio_runtime.emptydir_mount_path }}", name="{{ 
workflow_task.task_modifiers.crio_runtime.emptydir_volume_name }}")) -{% endif %} {# declare upstream dependencies -#} {% if workflow_task.upstream_workflow_task_ids %} {% for upstream_workflow_task_id in workflow_task.upstream_workflow_task_ids %} @@ -155,13 +212,7 @@ def generated_pipeline( if __name__ == "__main__": from pathlib import Path -{% if workflow_engine.lower() == "tekton" %} - from kfp_tekton import compiler - - compiler.TektonCompiler().compile( -{% else %} kfp.compiler.Compiler().compile( -{% endif %} pipeline_func=generated_pipeline, package_path=Path(__file__).with_suffix(".yaml").name, ) \ No newline at end of file From c69c791966b019f65d54fb0aa85ee4141029f81d Mon Sep 17 00:00:00 2001 From: rkpattnaik780 Date: Tue, 9 Apr 2024 22:54:29 +0530 Subject: [PATCH 4/5] fix: remove helper method to set image pull policy --- .../kubeflow/v2/python_dsl_template.jinja2 | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/elyra/templates/kubeflow/v2/python_dsl_template.jinja2 b/elyra/templates/kubeflow/v2/python_dsl_template.jinja2 index e7f80fe7c..45272bf89 100644 --- a/elyra/templates/kubeflow/v2/python_dsl_template.jinja2 +++ b/elyra/templates/kubeflow/v2/python_dsl_template.jinja2 @@ -72,18 +72,6 @@ def add_pod_annotation( return task -def set_image_pull_policy(task: PipelineTask, policy: str) -> PipelineTask: - - if policy not in ['Always', 'Never', 'IfNotPresent']: - raise ValueError( - 'Invalid imagePullPolicy. Must be one of `Always`, `Never`, `IfNotPresent`.' 
- ) - msg = common.get_existing_kubernetes_config_as_message(task) - msg.image_pull_policy = policy - task.platform_config['kubernetes'] = json_format.MessageToDict(msg) - - return task - # ------------------------------------------------------------------ # end of missing functions # ------------------------------------------------------------------ @@ -128,9 +116,6 @@ def generated_pipeline( {% endif %} {% endfor %} ) -{% if workflow_task.task_modifiers.image_pull_policy %} - set_image_pull_policy({{ task_name }}, "{{ workflow_task.task_modifiers.image_pull_policy }}") -{% endif %} {% if workflow_task.task_modifiers.object_storage_secret %} secret.use_secret_as_env({{ task_name }}, "{{ workflow_task.task_modifiers.object_storage_secret }}", { "AWS_ACCESS_KEY_ID": "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY": "AWS_SECRET_ACCESS_KEY" }) {% endif %} From bafd92770a12f34c7abe789d8fa4e8254c8d08be Mon Sep 17 00:00:00 2001 From: rkpattnaik780 Date: Wed, 10 Apr 2024 17:44:08 +0530 Subject: [PATCH 5/5] fix: remove helper method for add_toleration --- .../kubeflow/v2/python_dsl_template.jinja2 | 39 ------------------- 1 file changed, 39 deletions(-) diff --git a/elyra/templates/kubeflow/v2/python_dsl_template.jinja2 b/elyra/templates/kubeflow/v2/python_dsl_template.jinja2 index 45272bf89..f01fb2961 100644 --- a/elyra/templates/kubeflow/v2/python_dsl_template.jinja2 +++ b/elyra/templates/kubeflow/v2/python_dsl_template.jinja2 @@ -23,29 +23,6 @@ try: except ImportError: from typing_extensions import Literal -def add_toleration( - task: PipelineTask, - key: Optional[str] = None, - operator: Optional[Literal["Equal", "Exists"]] = None, - value: Optional[str] = None, - effect: Optional[Literal["NoExecute", "NoSchedule", "PreferNoSchedule"]] = None, - toleration_seconds: Optional[int] = None, -): - - msg = common.get_existing_kubernetes_config_as_message(task) - msg.tolerations.append( - pb.Toleration( - key=key, - operator=operator, - value=value, - effect=effect, - 
toleration_seconds=toleration_seconds, - ) - ) - task.platform_config["kubernetes"] = json_format.MessageToDict(msg) - - return task - def add_pod_label( task: PipelineTask, @@ -170,22 +147,6 @@ def generated_pipeline( {% endfor %} {% endif %} {% if workflow_task.task_modifiers.kubernetes_tolerations %} -{% for toleration_dict in workflow_task.task_modifiers.kubernetes_tolerations.values() %} - add_toleration( - {{ task_name }}, -{% if toleration_dict.effect %} - effect="{{ toleration_dict.effect }}", -{% else %} - effect=None, -{% endif %} -{% if toleration_dict.key %} - key="{{ toleration_dict.key }}", -{% else %} - key=None, -{% endif %} - operator="{{ toleration_dict.operator }}", - ) -{% endfor %} {% endif %} {# declare upstream dependencies -#} {% if workflow_task.upstream_workflow_task_ids %}