From c1a817e587cf84d2e97fedde0a428f9db6b43124 Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Mon, 7 Aug 2023 10:53:24 -0700 Subject: [PATCH 1/2] bunch of removals Signed-off-by: Ayush Kamat --- latch/__init__.py | 24 -- latch/functions/operators.py | 188 --------------- latch/types/__init__.py | 16 -- latch/verified/__init__.py | 5 - latch_cli/main.py | 63 +---- latch_cli/services/deprecated/__init__.py | 3 - latch_cli/services/deprecated/mkdir.py | 49 ---- latch_cli/services/deprecated/rm.py | 45 ---- latch_cli/services/deprecated/touch.py | 47 ---- latch_cli/services/execute.py | 272 ---------------------- 10 files changed, 1 insertion(+), 711 deletions(-) delete mode 100644 latch/functions/operators.py delete mode 100644 latch_cli/services/deprecated/__init__.py delete mode 100644 latch_cli/services/deprecated/mkdir.py delete mode 100644 latch_cli/services/deprecated/rm.py delete mode 100644 latch_cli/services/deprecated/touch.py delete mode 100644 latch_cli/services/execute.py diff --git a/latch/__init__.py b/latch/__init__.py index f01f732a..fe4be1a8 100644 --- a/latch/__init__.py +++ b/latch/__init__.py @@ -3,27 +3,3 @@ A commandline toolchain to define and register serverless workflows with the Latch platform. """ - -from latch.functions.messages import message -from latch.functions.operators import ( - combine, - group_tuple, - inner_join, - latch_filter, - left_join, - outer_join, - right_join, -) -from latch.resources.conditional import create_conditional_section -from latch.resources.map_tasks import map_task -from latch.resources.reference_workflow import workflow_reference -from latch.resources.tasks import ( - custom_task, - custom_memory_optimized_task, - large_gpu_task, - large_task, - medium_task, - small_gpu_task, - small_task, -) -from latch.resources.workflow import workflow diff --git a/latch/functions/operators.py b/latch/functions/operators.py deleted file mode 100644 index db622f0a..00000000 --- a/latch/functions/operators.py +++ /dev/null @@ -1,188 +0,0 @@ -""" -DEPRECATED - -Mimics channel operators from Nextflow, using the correspondence Channel --> Python Dictionary -""" - -import re -from itertools import product -from typing import Any, Callable, Dict, List, Optional, Tuple, Union - - -def _combine(item1: Any, item2: Any): - """ - Combines two items for use in *_join functions. The rules followed are: - - - If both items are lists, the lists are concatenated - - If one of the items is a list, the other is appended to that list - - Otherwise, the output is a new list containing both items - - This is so that composition of joins works as expected. We also use list - addition so as to not modify the input items and instead return a new copy. 
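For concreteness, these rules give the following (a doctest-style sketch with illustrative values; the join operators it references are defined just below):

    >>> _combine([1, 2], [3, 4])   # both lists: concatenated
    [1, 2, 3, 4]
    >>> _combine([1, 2], 3)        # one list: the other item is appended
    [1, 2, 3]
    >>> _combine(1, 2)             # neither: a new two-element list
    [1, 2]
    >>> inner_join(inner_join({"k": 1}, {"k": 2}), {"k": 3})
    {'k': [1, 2, 3]}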
- """ - - if isinstance(item1, List) and isinstance(item2, List): - return item1 + item2 - elif isinstance(item1, List): - return item1 + [item2] - elif isinstance(item2, List): - return [item1] + item2 - else: - return [item1, item2] - - -def left_join(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]: - """A standard left join of two dictionaries, joining on their keys""" - output = {} - for key in left: - if key in right: - output[key] = _combine(left[key], right[key]) - else: - output[key] = left[key] - return output - - -def right_join(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]: - """A standard right join of two dictionaries, joining on their keys""" - output = {} - for key in right: - if key in left: - output[key] = _combine(left[key], right[key]) - else: - output[key] = right[key] - return output - - -def inner_join(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]: - """A standard inner join of two dictionaries, joining on their keys""" - output = {} - for key in left: - if key in right: - output[key] = _combine(left[key], right[key]) - return output - - -def outer_join(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]: - """A standard outer join of two dictionaries, joining on their keys""" - output = {} - for key in left: - if key in right: - output[key] = _combine(left[key], right[key]) - else: - output[key] = left[key] - for key in right: - if key not in output: - output[key] = right[key] - return output - - -def group_tuple(channel: List[Tuple], key_index: Optional[int] = None) -> List[Tuple]: - """ - Operator to mimic the `groupTuple` construct from Nextflow: - - The `groupTuple` operator collects tuples (or lists) of values emitted - by the source channel grouping together the elements that share the same - key. Finally it emits a new tuple object for each distinct key collected. 
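With tuples wider than the 2-tuples shown in the Example below, every non-key position is collected into its own list while the key keeps its original slot; an illustrative sketch:

    >>> group_tuple([("s1", "a", 1), ("s1", "b", 2), ("s2", "c", 3)])
    [('s1', ['a', 'b'], [1, 2]), ('s2', ['c'], [3])]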
-
-    Args:
-        channel: A list of tuples to be grouped by key_index
-        key_index: Which index of the tuple to match against - if not provided,
-            defaults to 0
-
-    Example:
-        >>> channel = [(1,'A'), (1,'B'), (2,'C'), (3, 'B'), (1,'C'), (2, 'A'), (3, 'D')]
-        >>> group_tuple(channel) # key_index defaults to grouping by the first element (index 0)
-        [(1, ['A', 'B', 'C']), (2, ['C', 'A']), (3, ['B', 'D'])]
-    """
-
-    output = {}
-    if key_index is None:
-        key_index = 0
-    for element in channel:
-        if not (0 <= key_index < len(element)):
-            raise ValueError(f"Key Index {key_index} too large for element {element}")
-        key = element[key_index]
-        if key not in output:
-            output[key] = tuple(
-                [[k] if i != key_index else k for i, k in enumerate(element)]
-            )
-        else:
-            for i, k in enumerate(element):
-                if i == key_index:
-                    continue
-                output[key][i].append(k)
-    return list(output.values())
-
-
-def latch_filter(
-    channel: List[Any],
-    predicate: Union[Callable, re.Pattern, type, None],
-) -> List[Any]:
-    """Filters a given list with either a predicate, a regex, or a type"""
-    if isinstance(predicate, Callable):
-        return list(filter(predicate, channel))
-    elif isinstance(predicate, re.Pattern):
-
-        def filter_func(list_item: Any):
-            if isinstance(list_item, str):
-                return predicate.match(list_item)
-            return False
-
-        return list(filter(filter_func, channel))
-    elif isinstance(predicate, type):
-
-        def filter_func(list_item: Any):
-            return isinstance(list_item, predicate)
-
-        return list(filter(filter_func, channel))
-    else:
-        return channel
-
-
-def combine(
-    channel_0: List[Any],
-    channel_1: List[Any],
-    by: Optional[int] = None,
-) -> List:
-    """
-    Creates a Cartesian product of the two provided channels, with the option to
-    first group the elements of the channels by a certain index and then take
-    individual products within each group.
-
-    Args:
-        channel_0: A list. If by is provided, all elements must be tuples of the same length
-        channel_1: A list. If by is provided, all elements must be tuples of the same length
-            as channel_0
-        by: If provided, which index to group by first.
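The Example below shows the plain product case; when by is given, the behavior is as in this sketch (illustrative values). Note that a key present in only one channel drops out entirely, since its product with an empty group is empty:

    >>> c0 = [(1, 'x'), (1, 'y'), (2, 'z')]
    >>> c1 = [(1, 'a'), (3, 'b')]
    >>> combine(c0, c1, by=0)
    [(1, 'x', 'a'), (1, 'y', 'a')]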
- - Example: - >>> c0 = ['hello', 'ciao'] - >>> c1 = [1, 2, 3] - >>> combine(c0, c1) - [('hello', 1), ('hello', 2), ('hello', 3), ('ciao', 1), ('ciao', 2), ('ciao', 3)] - """ - if by is not None: - output = {} - for element in channel_0: - if not isinstance(element, tuple): - raise ValueError(f"`by` is provided, but {element} is not a tuple.") - if not (0 <= by < len(element)): - raise ValueError(f"Combine index {by} too large for element {element}") - if element[by] not in output: - output[element[by]] = [[], []] - output[element[by]][0].append(element[:by] + element[by + 1 :]) - for element in channel_1: - if not isinstance(element, tuple): - raise ValueError(f"`by` is provided, but {element} is not a tuple.") - if not (0 <= by < len(element)): - raise ValueError(f"Combine index {by} too large for element {element}") - if element[by] not in output: - output[element[by]] = [[], []] - output[element[by]][1].append(element[:by] + element[by + 1 :]) - final_output = [] - for key in output: - prod = list(product(*output[key])) - for p1, p2 in prod: - final_output.append((key,) + p1 + p2) - return final_output - return list(product(channel_0, channel_1)) diff --git a/latch/types/__init__.py b/latch/types/__init__.py index fca24155..e69de29b 100644 --- a/latch/types/__init__.py +++ b/latch/types/__init__.py @@ -1,16 +0,0 @@ -from latch.types.directory import LatchDir, LatchOutputDir -from latch.types.file import LatchFile, LatchOutputFile -from latch.types.glob import file_glob -from latch.types.metadata import ( - Fork, - ForkBranch, - LatchAppearanceType, - LatchAuthor, - LatchMetadata, - LatchParameter, - LatchRule, - Params, - Section, - Spoiler, - Text, -) diff --git a/latch/verified/__init__.py b/latch/verified/__init__.py index 2d281a3c..e69de29b 100644 --- a/latch/verified/__init__.py +++ b/latch/verified/__init__.py @@ -1,5 +0,0 @@ -from latch.verified.deseq2 import deseq2_wf -from latch.verified.mafft import mafft -from latch.verified.pathway import gene_ontology_pathway_analysis -from latch.verified.rnaseq import rnaseq -from latch.verified.trim_galore import trim_galore diff --git a/latch_cli/main.py b/latch_cli/main.py index 136a2df9..93e4f239 100644 --- a/latch_cli/main.py +++ b/latch_cli/main.py @@ -459,6 +459,7 @@ def get_params(wf_name: Union[str, None], version: Union[str, None] = None): ) +# todo(ayush): rewrite with gql @main.command("get-wf") @click.option( "--name", @@ -504,68 +505,6 @@ def open_remote_file(remote_file: str): click.secho(f"Successfully opened {remote_file}.", fg="green") -@main.command("rm") -@click.argument("remote_path", nargs=1, type=str) -def rm(remote_path: str): - """Deletes a remote entity.""" - crash_handler.message = f"Unable to delete {remote_path}" - crash_handler.pkg_root = str(Path.cwd()) - - from latch_cli.services.deprecated.rm import rm - - click.secho( - f"Warning: `latch rm` is deprecated and will be removed soon.", fg="yellow" - ) - rm(remote_path) - click.secho(f"Successfully deleted {remote_path}.", fg="green") - - -@main.command("mkdir") -@click.argument("remote_directory", nargs=1, type=str) -def mkdir(remote_directory: str): - """Creates a new remote directory.""" - crash_handler.message = f"Unable to create directory {remote_directory}" - crash_handler.pkg_root = str(Path.cwd()) - - from latch_cli.services.deprecated.mkdir import mkdir - - click.secho( - f"Warning: `latch mkdir` is deprecated and will be removed soon.", - fg="yellow", - ) - mkdir(remote_directory) - click.secho(f"Successfully created directory {remote_directory}.", 
fg="green") - - -@main.command("touch") -@click.argument("remote_file", nargs=1, type=str) -def touch(remote_file: str): - """Creates an empty text file.""" - crash_handler.message = f"Unable to create {remote_file}" - crash_handler.pkg_root = str(Path.cwd()) - - from latch_cli.services.deprecated.touch import touch - - click.secho( - f"Warning: `latch touch` is deprecated and will be removed soon.", - fg="yellow", - ) - touch(remote_file) - click.secho(f"Successfully touched {remote_file}.", fg="green") - - -@main.command("exec") -@click.argument("task_name", nargs=1, type=str) -def execute(task_name: str): - """Drops the user into an interactive shell from within a task.""" - crash_handler.message = f"Unable to exec into {task_name}" - crash_handler.pkg_root = str(Path.cwd()) - - from latch_cli.services.execute import execute - - execute(task_name) - - @main.command("preview") @click.argument("pkg_root", nargs=1, type=click.Path(exists=True, path_type=Path)) def preview(pkg_root: Path): diff --git a/latch_cli/services/deprecated/__init__.py b/latch_cli/services/deprecated/__init__.py deleted file mode 100644 index 32805624..00000000 --- a/latch_cli/services/deprecated/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -""" -Deprecated functions that will be removed in the near future -""" diff --git a/latch_cli/services/deprecated/mkdir.py b/latch_cli/services/deprecated/mkdir.py deleted file mode 100644 index 94f58c3f..00000000 --- a/latch_cli/services/deprecated/mkdir.py +++ /dev/null @@ -1,49 +0,0 @@ -from latch_sdk_config.latch import config - -import latch_cli.tinyrequests as tinyrequests -from latch_cli.utils import _normalize_remote_path, current_workspace, retrieve_or_login - - -def mkdir(remote_directory): - """Creates an empty directory on Latch - - Args: - remote_directory: A valid path to a remote destination, of the form - - [latch://] [/] dir_1/dir_2/.../dir_n/dir_name, - - where dir_name is the name of the new directory to be created. - Every directory in the path (dir_i) must already exist. - - This function will create a directory at the specified path in Latch. Will error if - the path is invalid or if an upstream directory does not exist. If a directory with the - same name already exists, this will make a new directory with an indexed name (see below). - - Example: :: - - mkdir("sample") # sample doesn't already exist - - Creates a new empty directory visible in Latch Console called sample, located in - the root of the user's Latch filesystem - - mkdir("latch:///dir1/dir2/sample") # dir1/dir2/sample already exists - - Creates a new directory visible in Latch Console called "sample\ 1" (note the - escaped space), located in the nested directory dir1/dir2/. - - mkdir("/dir1/doesnt_exist/dir2/sample.txt") # doesnt_exist doesn't exist - - Will throw an error, as this operation tries to create a directory - inside of a directory that doesn't exist. 
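For reference, the endpoint contract the implementation below depends on, as a minimal sketch (plain requests stands in for the tinyrequests helper the real code uses; the endpoint name and response shape are taken from the code that follows):

    import requests
    from latch_sdk_config.latch import config

    def mkdir_raw(token: str, remote_directory: str, ws_account_id: str) -> None:
        # POST the target path and workspace id with a bearer token; the endpoint
        # answers {"success": bool, "error": {"data": {"message": str}}}
        response = requests.post(
            config.api.data.mkdir,
            headers={"Authorization": f"Bearer {token}"},
            json={"directory": remote_directory, "ws_account_id": ws_account_id},
        )
        body = response.json()
        if not body["success"]:
            raise ValueError(body["error"]["data"]["message"])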
- """ - token = retrieve_or_login() - remote_directory = _normalize_remote_path(remote_directory) - - headers = {"Authorization": f"Bearer {token}"} - data = {"directory": remote_directory, "ws_account_id": current_workspace()} - - response = tinyrequests.post(config.api.data.mkdir, headers=headers, json=data) - json_data = response.json() - - if not json_data["success"]: - raise ValueError(json_data["error"]["data"]["message"]) diff --git a/latch_cli/services/deprecated/rm.py b/latch_cli/services/deprecated/rm.py deleted file mode 100644 index a88d26c1..00000000 --- a/latch_cli/services/deprecated/rm.py +++ /dev/null @@ -1,45 +0,0 @@ -import requests -from latch_sdk_config.latch import config - -from latch_cli.utils import _normalize_remote_path, current_workspace, retrieve_or_login - - -def rm(remote_path: str): - """Deletes an entity on Latch - - Args: - remote_path: A valid path to a remote destination, of the form - - [latch://] [/] dir_1/dir_2/.../dir_n/entity_name, - - where entity_name is the name of the entity to be removed. - - This function will remove the entity at the remote path specified recursively - (like rm -r on POSIX systems), and will error if the remote path specified is - invalid or if the entity doesn't exist. - - Example: :: - - rm("sample.txt") # sample.txt exists - - Removes the existing file sample.txt from Latch. - - rm("latch:///dir1/dir2") # dir1/dir2/ exists and is nonempty - - Removes the directory dir1/dir2 along with all of its contents. - - rm("/dir1/dir3/dir2/doesnt_exist.txt") # doesnt_exist.txt doesn't exist - - Will throw an error, as this operation tries to remove a file - that doesn't exist. - """ - token = retrieve_or_login() - remote_path = _normalize_remote_path(remote_path) - - data = {"filename": remote_path, "ws_account_id": current_workspace()} - headers = {"Authorization": f"Bearer {token}"} - response = requests.post(config.api.data.remove, headers=headers, json=data) - - data = response.json() - if not data["success"]: - raise ValueError(data["error"]["data"]["message"]) diff --git a/latch_cli/services/deprecated/touch.py b/latch_cli/services/deprecated/touch.py deleted file mode 100644 index 05aca798..00000000 --- a/latch_cli/services/deprecated/touch.py +++ /dev/null @@ -1,47 +0,0 @@ -import requests -from latch_sdk_config.latch import config - -from latch_cli.utils import _normalize_remote_path, current_workspace, retrieve_or_login - - -def touch(remote_file: str): - """Creates an empty text file on Latch - - Args: - remote_file: A valid path to a remote destination, of the form - - [latch://] [/] dir_1/dir_2/.../dir_n/filename, - - where filename is the name of the new file to be created. - Every directory in the path (dir_i) must already exist. - - This function will create a node at the specified path in Latch, and directly - create an empty file in AWS S3 using the boto3. It will error if the remote_path - is invalid (i.e. if it contains a directory which doesn't exist). - - Example: :: - - touch("sample.txt") - - Creates a new empty file visible in Latch Console called sample.txt, located in - the root of the user's Latch filesystem - - touch("latch:///dir1/dir2/sample.txt") - - Creates a new file visible in Latch Console called sample.fa, located in - the nested directory /dir1/dir2/ - - touch("/dir1/doesnt_exist/dir2/sample.txt") # doesnt_exist doesn't exist - - Will throw an error, as this operation tries to create a file inside of a - directory that doesn't exist. 
- """ - remote_file = _normalize_remote_path(remote_file) - token = retrieve_or_login() - headers = {"Authorization": f"Bearer {token}"} - data = {"filename": remote_file, "ws_account_id": current_workspace()} - - response = requests.post(config.api.data.touch, json=data, headers=headers) - json_data = response.json() - if not json_data["success"]: - raise ValueError(data["error"]["data"]["message"]) diff --git a/latch_cli/services/execute.py b/latch_cli/services/execute.py deleted file mode 100644 index e4f22a64..00000000 --- a/latch_cli/services/execute.py +++ /dev/null @@ -1,272 +0,0 @@ -import json -import os -import select -import sys -import termios -import textwrap -from pathlib import Path -from tty import setraw -from typing import Tuple - -import kubernetes -import requests -import websocket -from kubernetes.client.api import core_v1_api -from kubernetes.stream import stream -from latch_sdk_config.latch import config - -from latch_cli.utils import account_id_from_token, current_workspace, retrieve_or_login - - -def _construct_kubeconfig( - cert_auth_data: str, - cluster_endpoint: str, - account_id: str, - access_key: str, - secret_key: str, - session_token: str, -) -> str: - open_brack = "{" - close_brack = "}" - region_code = "us-west-2" - cluster_name = "prion-prod" - - return textwrap.dedent(f"""apiVersion: v1 -clusters: -- cluster: - certificate-authority-data: {cert_auth_data} - server: {cluster_endpoint} - name: arn:aws:eks:{region_code}:{account_id}:cluster/{cluster_name} -contexts: -- context: - cluster: arn:aws:eks:{region_code}:{account_id}:cluster/{cluster_name} - user: arn:aws:eks:{region_code}:{account_id}:cluster/{cluster_name} - name: arn:aws:eks:{region_code}:{account_id}:cluster/{cluster_name} -current-context: arn:aws:eks:{region_code}:{account_id}:cluster/{cluster_name} -kind: Config -preferences: {open_brack}{close_brack} -users: -- name: arn:aws:eks:{region_code}:{account_id}:cluster/{cluster_name} - user: - exec: - apiVersion: client.authentication.k8s.io/v1beta1 - command: aws - args: - - --region - - {region_code} - - eks - - get-token - - --cluster-name - - {cluster_name} - env: - - name: 'AWS_ACCESS_KEY_ID' - value: '{access_key}' - - name: 'AWS_SECRET_ACCESS_KEY' - value: '{secret_key}' - - name: 'AWS_SESSION_TOKEN' - value: '{session_token}'""") - - -def _fetch_pod_info(token: str, task_name: str) -> Tuple[str, str, str]: - headers = {"Authorization": f"Bearer {token}"} - data = {"task_name": task_name, "ws_account_id": current_workspace()} - - response = requests.post(config.api.execution.exec, headers=headers, json=data) - - try: - response = response.json() - access_key = response["tmp_access_key"] - secret_key = response["tmp_secret_key"] - session_token = response["tmp_session_token"] - cert_auth_data = response["cert_auth_data"] - cluster_endpoint = response["cluster_endpoint"] - namespace = response["namespace"] - aws_account_id = response["aws_account_id"] - except KeyError as err: - raise ValueError(f"malformed response: {response}") from err - - return ( - access_key, - secret_key, - session_token, - cert_auth_data, - cluster_endpoint, - namespace, - aws_account_id, - ) - - -def execute(task_name: str): - """Allows a user to start an interactive shell session in the remote machine - that a task is running on. - - When running a workflow on Latch, its often helpful while debugging to have - a direct way of interacting with the machines on which tasks are run. 
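In outline, the implementation exchanges the user's Latch token for temporary AWS credentials, writes a scoped kubeconfig, attaches to the task's pod over the Kubernetes exec websocket, and then pumps raw TTY bytes across it. A minimal sketch of just the attach step, with hypothetical pod coordinates:

    import kubernetes
    from kubernetes.client.api import core_v1_api
    from kubernetes.stream import stream

    kubernetes.config.load_kube_config("config")  # the kubeconfig the function writes
    core_v1 = core_v1_api.CoreV1Api()

    # _preload_content=False keeps the raw websocket client so the caller can
    # pump stdin/stdout frames itself, as the select loop further down does
    ws_client = stream(
        core_v1.connect_get_namespaced_pod_exec,
        "example-pod",        # hypothetical; the real code passes the task name
        "example-namespace",  # hypothetical; comes from the pod-info endpoint
        command=["/bin/sh"],
        stdin=True,
        stdout=True,
        stderr=True,
        tty=True,
        _preload_content=False,
    )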
Using - `execute`, a user can easily get a shell into the machine on which the - specified task is running. - - Args: - task_name: The name of the running task you want a shell into. This is a - hash that can be found in the sidebar in the browser display of the - running workflow. - - Example: - >>> execute("abcd1234-n0") - root@1.2.3.4:~$ - - """ - - token = retrieve_or_login() - ( - access_key, - secret_key, - session_token, - cert_auth_data, - cluster_endpoint, - namespace, - aws_account_id, - ) = _fetch_pod_info(token, task_name) - - account_id = account_id_from_token(token) - if int(account_id) < 10: - account_id = f"x{account_id}" - - config_data = _construct_kubeconfig( - cert_auth_data, - cluster_endpoint, - aws_account_id, - access_key, - secret_key, - session_token, - ) - config_file = Path("config").resolve() - - with open(config_file, "w") as c: - c.write(config_data) - - kubernetes.config.load_kube_config("config") - - core_v1 = core_v1_api.CoreV1Api() - - # TODO - pod_name = task_name - - stdin_channel = bytes([kubernetes.stream.ws_client.STDIN_CHANNEL]) - stdout_channel = kubernetes.stream.ws_client.STDOUT_CHANNEL - stderr_channel = kubernetes.stream.ws_client.STDERR_CHANNEL - - class WSStream: - def __init__(self): - self._wssock = stream( - core_v1.connect_get_namespaced_pod_exec, - pod_name, - namespace, - command=["/bin/sh"], - stderr=True, - stdin=True, - stdout=True, - tty=True, - _preload_content=False, - ).sock - - def send(self, chunk: bytes): - self._wssock.send(stdin_channel + chunk, websocket.ABNF.OPCODE_BINARY) - - def get_frame( - self, - ) -> Tuple[int, websocket.ABNF]: - return self._wssock.recv_data_frame(True) - - @property - def socket(self): - return self._wssock.sock - - def close(self): - self._wssock.close() - - class TTY: - def __init__( - self, - in_stream: int, - out_stream: int, - err_stream: int, - raw: bool = True, - ): - if raw: - setraw(sys.stdin.fileno()) - - self._stdin = in_stream - self._stdout = out_stream - self._stderr = err_stream - - def flush(self) -> bytes: - return os.read(self._stdin, 32 * 1024) - - def write_out(self, chunk: bytes): - os.write(self._stdout, chunk) - - def write_err(self, chunk: bytes): - os.write(self._stderr, chunk) - - @property - def in_stream(self): - return self._stdin - - try: - old_settings = termios.tcgetattr(sys.stdin.fileno()) - - tty_ = TTY( - sys.stdin.fileno(), - sys.stdout.fileno(), - sys.stderr.fileno(), - ) - try: - wsstream = WSStream() - except kubernetes.client.rest.ApiException: - raise ValueError( - "Unable to find requested task name - make sure that you are in the" - " correct workspace." 
- ) - - rlist = [wsstream.socket, tty_.in_stream] - while True: - rs, _, _ = select.select(rlist, [], []) - - if tty_.in_stream in rs: - chunk = tty_.flush() - if len(chunk) > 0: - wsstream.send(chunk) - - if wsstream.socket in rs: - opcode, frame = wsstream.get_frame() - if opcode == websocket.ABNF.OPCODE_CLOSE: - rlist.remove(wsstream.socket) - - elif opcode == websocket.ABNF.OPCODE_BINARY: - channel = frame.data[0] - chunk = frame.data[1:] - if channel in (stdout_channel, stderr_channel): - if len(chunk): - if channel == stdout_channel: - tty_.write_out(chunk) - else: - tty_.write_err(chunk) - elif channel == kubernetes.stream.ws_client.ERROR_CHANNEL: - wsstream.close() - error = json.loads(chunk) - if error["status"] == "Success": - break - raise websocket.WebSocketException( - f"Status: {error['status']} - Message: {error['message']}" - ) - else: - raise websocket.WebSocketException( - f"Unexpected channel: {channel}" - ) - - else: - raise websocket.WebSocketException( - f"Unexpected websocket opcode: {opcode}" - ) - finally: - termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, old_settings) From f68dbb0088e7e5760a30e750de80e4cc69ca7e34 Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Mon, 7 Aug 2023 11:26:05 -0700 Subject: [PATCH 2/2] remove launch + get_params Signed-off-by: Ayush Kamat --- latch_cli/main.py | 49 ---- latch_cli/services/get_params.py | 377 ------------------------------- latch_cli/services/launch.py | 229 ------------------- tests/test_launch.py | 102 --------- 4 files changed, 757 deletions(-) delete mode 100644 latch_cli/services/get_params.py delete mode 100644 latch_cli/services/launch.py delete mode 100644 tests/test_launch.py diff --git a/latch_cli/main.py b/latch_cli/main.py index 93e4f239..f663daca 100644 --- a/latch_cli/main.py +++ b/latch_cli/main.py @@ -410,55 +410,6 @@ def pad_styled(x: str, l: int, align_right=False): ) -@main.command("launch") -@click.argument("params_file", nargs=1, type=click.Path(exists=True)) -@click.option( - "--version", - default=None, - help="The version of the workflow to launch. Defaults to latest.", -) -def launch(params_file: Path, version: Union[str, None] = None): - """Launch a workflow using a python parameter map.""" - - crash_handler.message = f"Unable to launch workflow" - crash_handler.pkg_root = str(Path.cwd()) - - from latch_cli.services.launch import launch - - wf_name = launch(params_file, version) - if version is None: - version = "latest" - - click.secho( - f"Successfully launched workflow named {wf_name} with version {version}.", - fg="green", - ) - - -@main.command("get-params") -@click.argument("wf_name", nargs=1) -@click.option( - "--version", - default=None, - help="The version of the workflow. 
Defaults to latest.", -) -def get_params(wf_name: Union[str, None], version: Union[str, None] = None): - """Generate a python parameter map for a workflow.""" - crash_handler.message = "Unable to generate param map for workflow" - crash_handler.pkg_root = str(Path.cwd()) - - from latch_cli.services.get_params import get_params - - get_params(wf_name, version) - if version is None: - version = "latest" - click.secho( - f"Successfully generated python param map named {wf_name}.params.py with" - f" version {version}\n Run `latch launch {wf_name}.params.py` to launch it.", - fg="green", - ) - - # todo(ayush): rewrite with gql @main.command("get-wf") @click.option( diff --git a/latch_cli/services/get_params.py b/latch_cli/services/get_params.py deleted file mode 100644 index 2aaf5320..00000000 --- a/latch_cli/services/get_params.py +++ /dev/null @@ -1,377 +0,0 @@ -try: - from typing import get_args, get_origin -except ImportError: - from typing_extensions import get_args, get_origin - -import enum -import json -import keyword -import typing -from typing import Optional - -import google.protobuf.json_format as gpjson -from flyteidl.core.literals_pb2 import Literal as _Literal -from flyteidl.core.types_pb2 import LiteralType as _LiteralType -from flytekit.models.literals import Literal -from flytekit.models.types import LiteralType - -from latch.types.directory import LatchDir -from latch.types.file import LatchFile -from latch_cli.services.launch import _get_workflow_interface -from latch_cli.utils import retrieve_or_login - - -class _Unsupported: - ... - - -_simple_table = { - 0: type(None), - 1: int, - 2: float, - 3: str, - 4: bool, - 5: _Unsupported, - 6: _Unsupported, - 7: _Unsupported, - 8: _Unsupported, - 9: _Unsupported, -} - -_primitive_table = { - type(None): None, - int: 0, - float: 0.0, - str: "foo", - bool: False, -} - -# TODO(ayush): fix this to -# (1) support records, -# (2) support fully qualified workflow names, -# (note from kenny) - pretty sure you intend to support the opposite, -# fqn are supported by default, address when you get to this todo -# (3) show a message indicating the generated filename, -# (4) optionally specify the output filename - - -def get_params(wf_name: str, wf_version: Optional[str] = None): - """Constructs a parameter map for a workflow given its name and an optional - version. - - This function creates a python parameter file that can be used by `launch`. - You can specify the specific parameters by editing the file, and then launch - an execution on Latch using those parameters with `launch`. - - Args: - wf_name: The unique name of the workflow. - wf_version: An optional workflow version. If this argument is not given, - `get_params` will default to generating a parameter map of the most - recent version of the workflow. - - Example: - >>> get_params("wf.__init__.alphafold_wf") - # creates a file called `wf.__init__.alphafold_wf.params.py` that - # contains a template parameter map. 
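Concretely, the generated file has roughly this shape (illustrative only; the workflow and parameter names are hypothetical, and each trailing comment carries the inferred type plus a DEFAULT marker when a default value was recovered):

    """Run `latch launch wf.__init__.alphafold_wf.params.py` to launch this workflow"""

    from latch.types import LatchFile

    params = {
        "_name": "wf.__init__.alphafold_wf",  # Don't edit this value.
        "fasta_file": LatchFile("latch:///foobar"),  # <class 'latch.types.file.LatchFile'>
    }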
- """ - - token = retrieve_or_login() - wf_id, wf_interface, wf_default_params = _get_workflow_interface( - token, wf_name, wf_version - ) - - params = {} - wf_vars = wf_interface["variables"] - default_wf_vars = wf_default_params["parameters"] - for key, value in wf_vars.items(): - try: - description_json = json.loads(value["description"]) - param_name = description_json["name"] - except (json.decoder.JSONDecodeError, KeyError) as e: - raise ValueError( - f"Parameter description json for workflow {wf_name} is malformed" - ) from e - - literal_type_json = value["type"] - literal_type = gpjson.ParseDict(literal_type_json, _LiteralType()) - - python_type = _guess_python_type( - LiteralType.from_flyte_idl(literal_type), param_name - ) - - default = True - if default_wf_vars[param_name].get("required") is not True: - literal_json = default_wf_vars[param_name].get("default") - literal = gpjson.ParseDict(literal_json, _Literal()) - val = _guess_python_val(Literal.from_flyte_idl(literal), python_type) - else: - default = False - val = _best_effort_default_val(python_type) - - params[param_name] = (python_type, val, default) - - import_statements = { - LatchFile: "from latch.types import LatchFile", - LatchDir: "from latch.types import LatchDir", - enum.Enum: "from enum import Enum", - } - - import_types = [] - enum_literals = [] - param_map_str = "" - param_map_str += "\nparams = {" - param_map_str += f'\n "_name": "{wf_name}", # Don\'t edit this value.' - for param_name, value in params.items(): - python_type, python_val, default = value - - # Check for imports. - - def _check_and_import(python_type: typing.T): - if python_type in import_statements and python_type not in import_types: - import_types.append(python_type) - - def _handle_enum(python_type: typing.T): - if type(python_type) is enum.EnumMeta: - if enum.Enum not in import_types: - import_types.append(enum.Enum) - - variants = python_type._variants - name = python_type._name - - _enum_literal = f"class {name}(Enum):" - for variant in variants: - if variant in keyword.kwlist: - variant_name = f"_{variant}" - else: - variant_name = variant - _enum_literal += f"\n {variant_name} = '{variant}'" - enum_literals.append(_enum_literal) - - # Parse collection, union types for potential imports and dependent - # objects, eg. enum class construction. - if get_origin(python_type) is not None: - if get_origin(python_type) is list: - _check_and_import(get_args(python_type)[0]) - _handle_enum(get_args(python_type)[0]) - elif get_origin(python_type) is typing.Union: - for variant in get_args(python_type): - _check_and_import(variant) - _handle_enum(variant) - else: - _check_and_import(python_type) - _handle_enum(python_type) - - python_val, python_type = _get_code_literal(python_val, python_type) - - if default is True: - default = "DEFAULT. 
" - else: - default = "" - - param_map_str += f'\n "{param_name}": {python_val}, # {default}{python_type}' - param_map_str += "\n}" - - with open(f"{wf_name}.params.py", "w") as f: - f.write( - f'"""Run `latch launch {wf_name}.params.py` to launch this workflow"""\n' - ) - - for t in import_types: - f.write(f"\n{import_statements[t]}") - for e in enum_literals: - f.write(f"\n\n{e}\n") - - f.write("\n") - f.write(param_map_str) - - -def _get_code_literal(python_val: any, python_type: typing.T): - """Construct value that is executable python when templated into a code - block.""" - - if python_type is str or (type(python_val) is str and str in get_args(python_type)): - return f'"{python_val}"', python_type - - if type(python_type) is enum.EnumMeta: - name = python_type._name - return python_val, f"" - - if get_origin(python_type) is typing.Union: - variants = get_args(python_type) - type_repr = "typing.Union[" - for i, variant in enumerate(variants): - if i < len(variants) - 1: - delimiter = ", " - else: - delimiter = "" - type_repr += f"{_get_code_literal(python_val, variant)[1]}{delimiter}" - type_repr += "]" - return python_val, type_repr - - if get_origin(python_type) is list: - if python_val is None: - _, type_repr = _get_code_literal(None, get_args(python_type)[0]) - return None, f"typing.List[{type_repr}]" - else: - collection_literal = "[" - if len(python_val) > 0: - for i, item in enumerate(python_val): - item_literal, type_repr = _get_code_literal( - item, get_args(python_type)[0] - ) - - if i < len(python_val) - 1: - delimiter = "," - else: - delimiter = "" - - collection_literal += f"{item_literal}{delimiter}" - else: - list_t = get_args(python_type)[0] - _, type_repr = _get_code_literal( - _best_effort_default_val(list_t), list_t - ) - - collection_literal += "]" - return collection_literal, f"typing.List[{type_repr}]" - - return python_val, python_type - - -def _guess_python_val(literal: _Literal, python_type: typing.T): - """Transform flyte literal value to native python value.""" - - if literal.scalar is not None: - if literal.scalar.none_type is not None: - return None - - if literal.scalar.primitive is not None: - primitive = literal.scalar.primitive - - if primitive.string_value is not None: - if type(python_type) is enum.EnumMeta: - return f"{python_type._name}.{str(primitive.string_value)}" - return str(primitive.string_value) - - if primitive.integer is not None: - return int(primitive.integer) - if primitive.float_value is not None: - return float(primitive.float_value) - if primitive.boolean is not None: - return bool(primitive.boolean) - - if literal.scalar.blob is not None: - blob = literal.scalar.blob - dim = blob.metadata.type.dimensionality - if dim == 0: - return LatchFile(blob.uri) - else: - return LatchDir(blob.uri) - - # collection - if literal.collection is not None: - p_list = [] - for item in literal.collection.literals: - p_list.append(_guess_python_val(item, get_args(python_type)[0])) - return p_list - - # sum - - # enum - - raise NotImplementedError( - f"The flyte literal {literal} cannot be transformed to a python type." 
- ) - - -def _guess_python_type(literal: LiteralType, param_name: str): - """Transform flyte type literal to native python type.""" - - if literal.simple is not None: - return _simple_table[literal.simple] - - if literal.collection_type is not None: - return typing.List[_guess_python_type(literal.collection_type, param_name)] - - if literal.blob is not None: - # flyteidl BlobType message for reference: - # enum BlobDimensionality { - # SINGLE = 0; - # MULTIPART = 1; - # } - - dim = literal.blob.dimensionality - if dim == 0: - return LatchFile - else: - return LatchDir - - if literal.union_type is not None: - variant_types = [ - _guess_python_type(variant, param_name) - for variant in literal.union_type.variants - ] - - # Trying to directly construct set of types will throw error if list is - # included as 'list' is not hashable. - unique_variants = [] - for t in variant_types: - if t not in unique_variants: - unique_variants.append(t) - - return typing.Union[tuple(variant_types)] - - if literal.enum_type is not None: - # We can hold the variants a proxy class that is also type 'Enum', s.t. - # we can parse the variants and define the object in the param map - # code. - - class _VariantCarrier(enum.Enum): - ... - - _VariantCarrier._variants = literal.enum_type.values - # Use param name to uniquely identify each enum - _VariantCarrier._name = param_name - return _VariantCarrier - - raise NotImplementedError( - f"The flyte literal {literal} cannot be transformed to a python type." - ) - - -def _best_effort_default_val(t: typing.T): - """Produce a "best-effort" default value given a python type.""" - - if t in _primitive_table: - return _primitive_table[t] - - if t is list: - return [] - - file_like_table = { - LatchDir: LatchDir("latch:///foobar"), - LatchFile: LatchFile("latch:///foobar"), - } - if t in file_like_table: - return file_like_table[t] - - if type(t) is enum.EnumMeta: - return f"{t._name}.{t._variants[0]}" - - if get_origin(t) is None: - raise NotImplementedError( - f"Unable to produce a best-effort value for the python type {t}" - ) - - if get_origin(t) is list: - list_args = get_args(t) - if len(list_args) == 0: - return [] - return [_best_effort_default_val(arg) for arg in list_args] - - if get_origin(t) is typing.Union: - return _best_effort_default_val(get_args(t)[0]) - - raise NotImplementedError( - f"Unable to produce a best-effort value for the python type {t}" - ) diff --git a/latch_cli/services/launch.py b/latch_cli/services/launch.py deleted file mode 100644 index 64a8d88f..00000000 --- a/latch_cli/services/launch.py +++ /dev/null @@ -1,229 +0,0 @@ -"""Service to launch a workflow.""" - -import importlib.util -import typing -from pathlib import Path -from typing import Optional, Tuple, Union - -import google.protobuf.json_format as gpjson -import requests -from flyteidl.core.types_pb2 import LiteralType -from flytekit.core.context_manager import FlyteContextManager -from flytekit.core.type_engine import TypeEngine -from latch_sdk_config.latch import config -from requests.adapters import HTTPAdapter -from urllib3.util.retry import Retry - -from latch_cli.utils import current_workspace, retrieve_or_login - - -def launch(params_file: Path, version: Optional[str] = None) -> str: - """Launches a (versioned) workflow with parameters specified in python. - - Using a parameter map written in python (this can be generated for you with - `get_params`), this function will launch the workflow specified in the file - using the parameters therein. 
This function also accepts an optional - `version` parameter to further specify the workflow to be run. If it is not - provided, this function will default to running the latest version of the - specified workflow. - - Args: - params_file: A path pointing to a python parameter file containing a - function call that represents the workflow execution with valid - parameter values. - version: An optional workflow version to launch, defaulting to the - latest if not provided. - - Returns: - The name of the workflow. - - Example: - >>> launch(Path("wf.__init__.assemble_and_sort.params.py")) - # Launches an execution of `wf.__init__.assemble_and_sort` with the - # parameters specified in the referenced file. - """ - - token = retrieve_or_login() - - with open(params_file, "r") as pf: - param_code = pf.read() - spec = importlib.util.spec_from_loader("wf_params", loader=None) - param_module = importlib.util.module_from_spec(spec) - exec(param_code, param_module.__dict__) - - module_vars = vars(param_module) - try: - wf_params = module_vars["params"] - except KeyError as e: - raise ValueError( - f"Execution file {params_file.name} needs to have" - " a parameter value dictionary named 'params'" - ) from e - - wf_name = wf_params.get("_name") - if wf_name is None: - raise ValueError( - f"The dictionary of parameters in the launch file lacks the" - f" _name key used to identify the workflow. Make sure a _name" - f" key with the workflow name exists in the dictionary." - ) - - wf_id, wf_interface, _ = _get_workflow_interface(token, wf_name, version) - - wf_vars = wf_interface["variables"] - wf_literals = {} - for key, value in wf_vars.items(): - ctx = FlyteContextManager.current_context() - literal_type_json = value["type"] - literal_type = gpjson.ParseDict(literal_type_json, LiteralType()) - - if key in wf_params: - python_value = wf_params[key] - # Recover parameterized generics for TypeTransformer. - python_type = _guess_python_type(python_value) - - python_type_literal = TypeEngine.to_literal( - ctx, python_value, python_type, literal_type - ) - - wf_literals[key] = gpjson.MessageToDict(python_type_literal.to_flyte_idl()) - - _launch_workflow(token, wf_id, wf_literals) - return wf_name - - -def _guess_python_type(v: any) -> typing.T: - """Python literal guesser. - - We will attempt to construct the correct python type representation from the - value and JSON type representation and rely on the TypeTransformer to produce - the correct flyte literal for execution (FlyteIDL representation of the value). - This is essentially how flytekit does it. - - Using the type() function alone is not sufficient because flyte interprets - the python list literal as a generic collection type and needs a - parameterization. - - For example: - - .. - >> type(["AUG", "AAA"]) = list - - - Becomes List[str] s.t. - - .. - >> TypeEngine.to_literal(ctx, ["AUG", "AAA"], List[str], type_literal) - - Returns our desired flyte literal. - """ - - if type(v) is list: - if len(v) == 0: - return typing.List[None] - elif type(v[0]) is list: - return typing.List[_guess_python_type(v[0])] - else: - return typing.List[type(v[0])] - - # TODO: maps, Records, future complex types - - return type(v) - - -def _get_workflow_interface( - token: str, wf_name: str, version: Union[None, str] -) -> Tuple[int, dict]: - """Retrieves the set of idl parameter values for a given workflow by name. - - Returns workflow id + interface as JSON string. 
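One portability note on the retry setup used below: urllib3 renamed Retry's method_whitelist argument to allowed_methods (deprecated in 1.26, removed in 2.0), so on current urllib3 the equivalent construction is:

    import requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    session = requests.Session()
    # allowed_methods=None retries on any HTTP verb, matching the old
    # method_whitelist=False behavior used in the code below
    retry = Retry(total=5, read=5, connect=5, allowed_methods=None)
    session.mount("http://", HTTPAdapter(max_retries=retry))
    session.mount("https://", HTTPAdapter(max_retries=retry))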
- """ - - headers = {"Authorization": f"Bearer {token}"} - _interface_request = { - "workflow_name": wf_name, - "version": version, - "ws_account_id": current_workspace(), - } - - url = config.api.workflow.interface - - # TODO(ayush) - figure out why timeout within this endpoint only. - session = requests.Session() - retries = 5 - retry = Retry( - total=retries, - read=retries, - connect=retries, - method_whitelist=False, - ) - adapter = HTTPAdapter(max_retries=retry) - session.mount("http://", adapter) - session.mount("https://", adapter) - - response = session.post(url, headers=headers, json=_interface_request) - - wf_interface_resp = response.json() - - wf_id, wf_interface, wf_default_params = ( - wf_interface_resp.get("id"), - wf_interface_resp.get("interface"), - wf_interface_resp.get("default_params"), - ) - if wf_interface is None: - raise ValueError( - "Could not find interface. Nucleus returned a malformed JSON response -" - f" {wf_interface_resp}" - ) - if wf_id is None: - raise ValueError( - "Could not find wf ID. Nucleus returned a malformed JSON response -" - f" {wf_interface_resp}" - ) - if wf_default_params is None: - raise ValueError( - "Could not find wf default parameters. Nucleus returned a malformed JSON" - f" response - {wf_interface_resp}" - ) - - return int(wf_id), wf_interface, wf_default_params - - -def _launch_workflow(token: str, wf_id: str, params: dict) -> bool: - """Launch the workflow of given id with parameter map. - - Return True if success - """ - - # TODO (kenny) - pull out to consolidated requests class - # Server sometimes stalls on requests with python user-agent - headers = { - "Authorization": f"Bearer {token}", - "User-Agent": ( - "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like" - " Gecko) Chrome/72.0.3626.119 Safari/537.36" - ), - } - - _interface_request = { - "workflow_id": str(wf_id), - "params": params, - "ws_account_id": current_workspace(), - } - url = config.api.execution.create - - response = requests.post(url, headers=headers, json=_interface_request) - - if response.status_code == 403: - raise PermissionError( - "You need access to the latch sdk beta ~ join the waitlist @" - " https://latch.bio/sdk" - ) - elif response.status_code == 401: - raise ValueError( - "your token has expired - please run latch login to refresh your token and" - " try again." 
- ) - wf_interface_resp = response.json() - - return wf_interface_resp.get("success") is True diff --git a/tests/test_launch.py b/tests/test_launch.py deleted file mode 100644 index a6bd20fa..00000000 --- a/tests/test_launch.py +++ /dev/null @@ -1,102 +0,0 @@ -""" -test.test_launch -~~~ - - - -""" - -from tempfile import NamedTemporaryFile - -from latch_cli.services.launch import launch - -simple_plan = """from latch.types import LatchFile - -params = { - "_name": "wf.__init__.assemble_and_sort", - "read1": LatchFile("latch:///read1"), - "read2": LatchFile("latch:///read2"), -}""" - -crispresso_plan = """from latch.types import LatchFile, LatchDir - - -params = { - "_name": "wf.__init__.crispresso2_wf", - "output_folder": LatchDir("latch:///CRISPResso2_output/"), - "fastq_r1": LatchFile("s3://latch-public/welcome/CRISPResso2/nhej.r1.fastq.gz"), - "fastq_r2": LatchFile("s3://latch-public/welcome/CRISPResso2/nhej.r2.fastq.gz"), - "amplicon_seq": [ - "AATGTCCCCCAATGGGAAGTTCATCTGGCACTGCCCACAGGTGAGGAGGTCATGATCCCCTTCTGGAGCTCCCAACGGGCCGTGGTCTGGTTCATCATCTGTAAGAATGGCTTCAAGAGGCTCGGCTGTGGTT" - ], - "name": "nhej", -}""" - -rnaseq_plan = """from latch.types import LatchFile, LatchDir -from enum import Enum - -class Strandedness(Enum): - reverse = "reverse" - forward = "forward" - -params = { - "_name": "wf.__init__.nf_rnaseq_wf", - "sample_ids": [ - "WT_REP1", - "RAP1_UNINDUCED_REP1", - "RAP1_IAA_30M_REP1", - ], - "samples": [ - [ - LatchFile("s3://latch-public/welcome/nf_rnaseq/SRR6357070_1.fastq.gz"), - LatchFile("s3://latch-public/welcome/nf_rnaseq/SRR6357070_2.fastq.gz"), - ], - [ - LatchFile("s3://latch-public/welcome/nf_rnaseq/SRR6357073_1.fastq.gz"), - ], - [ - LatchFile("s3://latch-public/welcome/nf_rnaseq/SRR6357076_1.fastq.gz"), - LatchFile("s3://latch-public/welcome/nf_rnaseq/SRR6357076_2.fastq.gz"), - ], - ], - "strandedness": [ - Strandedness.reverse, - Strandedness.reverse, - Strandedness.reverse, - ], - "fasta": LatchFile("s3://latch-public/welcome/nf_rnaseq/genome.fa.gz"), - "gtf": LatchFile("s3://latch-public/welcome/nf_rnaseq/genes.gtf.gz"), - "gene_bed": LatchFile("s3://latch-public/welcome/nf_rnaseq/genes.bed"), - "output_dir": LatchDir("latch://nf_rnaseq_results/"), -}""" - -# NOTE (kenny) ~ This is a poor test for the moment , but without mocking out -# the connection to Latch nucleus, we can rely on the boolean response as -# success. - - -# def test_execute_previous_versions(): -# with NamedTemporaryFile("w+") as tf: -# tf.write(simple_plan) -# tf.seek(0) - -# assert launch(tf.name) == "wf.__init__.assemble_and_sort" -# assert launch(tf.name, "barrackobama") == "wf.__init__.assemble_and_sort" - - -# def test_execute_rnaseq(): -# with NamedTemporaryFile("w+") as tf: -# tf.write(rnaseq_plan) -# tf.seek(0) - -# assert launch(tf.name) == "wf.__init__.nf_rnaseq_wf" - - -# TODO(ayush, kenny): fix this test - -# def test_execute_crispresso(): - -# with NamedTemporaryFile("w+") as tf: -# tf.write(crispresso_plan) -# tf.seek(0) - -# assert launch(tf.name) == "wf.__init__.crispresso2_wf"