diff --git a/src/xpk/commands/workload.py b/src/xpk/commands/workload.py index 81648d1eb..bc4f7e879 100644 --- a/src/xpk/commands/workload.py +++ b/src/xpk/commands/workload.py @@ -240,6 +240,21 @@ containers: - args: {pathways_worker_args} + env: + - name: PROJECT_ID + value: {args.project} + - name: LOCATION + value: {args.zone} + - name: CLUSTER_NAME + value: {args.cluster} + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: CONTAINER_NAME + value: "pathways-worker" + - name: NAMESPACE + value: "cloud_prod" image: {args.server_image} imagePullPolicy: Always name: pathways-worker @@ -295,6 +310,20 @@ value: $(JOBSET_NAME)-$(REPLICATED_JOB_NAME)-0-0.$(JOBSET_NAME) - name: TPU_SKIP_MDS_QUERY value: "true" + - name: PROJECT_ID + value: {args.project} + - name: LOCATION + value: {args.zone} + - name: CLUSTER_NAME + value: {args.cluster} + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: CONTAINER_NAME + value: "pathways-rm" + - name: NAMESPACE + value: "cloud_prod" image: {args.server_image} imagePullPolicy: Always name: pathways-rm @@ -329,6 +358,23 @@ containers: - args: {pathways_proxy_args} + env: + - name: PROJECT_ID + value: {args.project} + - name: LOCATION + value: {args.zone} + - name: CLUSTER_NAME + value: {args.cluster} + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: CONTAINER_NAME + value: "pathways-proxy" + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace image: {args.proxy_server_image} imagePullPolicy: Always name: pathways-proxy @@ -352,6 +398,13 @@ def workload_create_pathways(args) -> None: 0 if successful and 1 otherwise. """ args.use_pathways = True + if args.headless: + xpk_print( + 'Please use kubectl port forwarding to connect to the Pathways proxy.' 
+ ' kubectl get pods kubectl port-forward 29000:29000' + ' JAX_PLATFORMS=proxy JAX_BACKEND_TARGET=grpc://127.0.0.1:29000 python' + " -c 'import pathwaysutils; import jax; print(jax.devices())'" + ) workload_create(args) @@ -366,14 +419,6 @@ def workload_create(args) -> None: """ add_zone_and_project(args) - if args.headless: - xpk_print( - 'Please use kubectl port forwarding to connect to the Pathways proxy.' - ' kubectl get pods kubectl port-forward 29000:29000' - ' JAX_PLATFORMS=proxy JAX_BACKEND_TARGET=grpc://127.0.0.1:29000 python' - " -c 'import pathwaysutils; import jax; print(jax.devices())'" - ) - set_cluster_command_code = set_cluster_command(args) if set_cluster_command_code != 0: xpk_exit(set_cluster_command_code) diff --git a/src/xpk/core/pathways.py b/src/xpk/core/pathways.py index 71ed27ea1..f3b1976bc 100644 --- a/src/xpk/core/pathways.py +++ b/src/xpk/core/pathways.py @@ -44,6 +44,8 @@ def get_pathways_worker_args(args) -> str: - --resource_manager_address={rm_address} - --gcs_scratch_location={args.pathways_gcs_location}""" if args.use_pathways: + if args.custom_pathways_worker_args: + yaml = append_custom_pathways_args(yaml, args.custom_pathways_worker_args) return yaml.format(args=args, rm_address=get_rm_address(args)) else: return '' @@ -62,6 +64,10 @@ def get_pathways_proxy_args(args) -> str: - --gcs_scratch_location={args.pathways_gcs_location}""" if args.use_pathways: + if args.custom_pathways_proxy_server_args: + yaml = append_custom_pathways_args( + yaml, args.custom_pathways_proxy_server_args + ) return yaml.format(args=args, rm_address=get_rm_address(args)) else: return '' @@ -174,15 +180,12 @@ def ensure_pathways_workload_prerequisites(args, system) -> bool: def get_pathways_unified_query_link(args) -> str: """Get the unified query link for the pathways workload.""" - pw_suffixes = ['main', 'rm', 'proxy'] - pw_pod_names = [f'"{args.workload}-{suffix}-0"' for suffix in pw_suffixes] - pw_pod_names_query = '%20OR%20'.join(pw_pod_names + 
def append_custom_pathways_args(yaml, custom_args) -> str:
  """Append custom Pathways args to a YAML args list, matching its indentation.

  Every whitespace-separated token in `custom_args` becomes one additional
  `- <token>` list item on its own line. The indentation of the new items is
  copied from the second line of `yaml`, because the first line of the
  template is positioned by the surrounding manifest and carries no leading
  whitespace of its own.

  NOTE(review): the callers in this module run `str.format` on the returned
  string, so `{` / `}` inside a custom arg will be treated as format fields.

  Args:
    yaml (string): existing yaml containing args, one list item per line.
    custom_args (string): whitespace-separated custom args, for example
      '--arg_1=xxx --arg2=yyy'. May be empty or None.

  Returns:
    yaml (string): yaml with additional args appended. The input is returned
      unchanged when there is nothing to append, or when the indentation
      cannot be determined (the yaml has no second line, or it is empty).
  """
  # split() without a separator discards the empty tokens that repeated,
  # leading or trailing spaces would otherwise produce; each of those would
  # have been rendered as a bare '- ' (a null entry) in the args list.
  extra_args = custom_args.split() if custom_args else []
  if not extra_args:
    return yaml

  lines = yaml.split('\n')
  # With a single remaining arg there is no second line to measure, so the
  # indentation is unknown; leave the yaml untouched instead of raising.
  if len(lines) < 2 or not lines[1]:
    return yaml

  second_line = lines[1]
  # Calculate the indentation based on the second line of existing YAML.
  indentation = ' ' * (len(second_line) - len(second_line.lstrip()))
  return yaml + ''.join(f'\n{indentation}- {arg}' for arg in extra_args)
Please use `xpk cluster create-pathways`.' - ' Enable cluster to accept Pathways workloads.' + 'Please use `xpk cluster create-pathways` instead to' + ' enable cluster to accept Pathways workloads.' ), ) - ### Autoprovisioning arguments specific to "cluster create" cluster_create_autoprovisioning_arguments = ( cluster_create_parser.add_argument_group( diff --git a/src/xpk/parser/workload.py b/src/xpk/parser/workload.py index 023c2edf4..638af6d36 100644 --- a/src/xpk/parser/workload.py +++ b/src/xpk/parser/workload.py @@ -67,11 +67,7 @@ def set_workload_parsers(workload_parser): 'Arguments for configuring autoprovisioning.', ) ) - workload_pathways_workload_arguments = workload_create_parser.add_argument_group( - 'Pathways Image Arguments', - 'If --use-pathways is provided, user wants to set up a' - 'Pathways workload on xpk.', - ) + workload_vertex_tensorboard_arguments = ( workload_create_parser.add_argument_group( 'Vertex Tensorboard Arguments', @@ -151,6 +147,15 @@ def set_workload_parsers(workload_parser): ), ) + workload_create_parser_optional_arguments.add_argument( + '--use-pathways', + action='store_true', + help=( + 'Please use `xpk workload create-pathways` instead to' + ' create Pathways workloads.' + ), + ) + # Autoprovisioning workload arguments workload_create_autoprovisioning_arguments.add_argument( '--on-demand', @@ -178,16 +183,6 @@ def set_workload_parsers(workload_parser): ), ) - # Pathways workload arguments - workload_pathways_workload_arguments.add_argument( - '--use-pathways', - action='store_true', - help=( - 'DECRATING SOON!!! Please use `xpk workload create-pathways` instead.' - ' Provide this argument to create Pathways workloads.' - ), - ) - # "workload create-pathways" command parser. workload_create_pathways_parser = workload_subcommands.add_parser( 'create-pathways', help='Create a new job.' 
@@ -230,6 +225,45 @@ def set_workload_parsers(workload_parser): help='The tpu type to use, v5litepod-16, etc.', ) + ### "workload create-pathways" Optional arguments, specific to Pathways + workload_create_pathways_parser_optional_arguments.add_argument( + '--headless', + action='store_true', + help=( + 'Please provide this argument to create Pathways workloads in' + ' headless mode. This arg can only be used in `xpk workload' + ' create-pathways`.' + ), + ) + workload_create_pathways_parser_optional_arguments.add_argument( + '--proxy-server-image', + type=str, + default=( + 'us-docker.pkg.dev/cloud-tpu-v2-images/pathways/proxy_server:latest' + ), + help=( + 'Please provide the proxy server image for Pathways. This arg can' + ' only be used in `xpk workload create-pathways`.' + ), + ) + workload_create_pathways_parser_optional_arguments.add_argument( + '--server-image', + type=str, + default='us-docker.pkg.dev/cloud-tpu-v2-images/pathways/server:latest', + help=( + 'Please provide the server image for Pathways. This arg can only be' + ' used in `xpk workload create-pathways`.' + ), + ) + workload_create_pathways_parser_optional_arguments.add_argument( + '--pathways-gcs-location', + type=str, + default='gs://cloud-pathways-staging/tmp', + help=( + 'Please provide the GCS location to store Pathways artifacts. This' + ' arg can only be used in `xpk workload create-pathways`.' 
+ ), + ) workload_create_pathways_parser_optional_arguments.add_argument( '--command', type=str, @@ -244,6 +278,39 @@ def set_workload_parsers(workload_parser): required=False, ) + workload_create_pathways_parser_optional_arguments.add_argument( + '--custom-pathways-server-args', + type=str, + default=None, + help=( + 'Provide custom Pathways server args as follows -' + " --custom-pathways-server-args='--arg_1=xxx --arg2=yyy'" + ), + required=False, + ) + + workload_create_pathways_parser_optional_arguments.add_argument( + '--custom-pathways-proxy-server-args', + type=str, + default=None, + help=( + 'Provide custom Pathways proxy server args as follows -' + " --custom-pathways-proxy-server-args='--arg_1=xxx --arg2=yyy'" + ), + required=False, + ) + + workload_create_pathways_parser_optional_arguments.add_argument( + '--custom-pathways-worker-args', + type=str, + default=None, + help=( + 'Provide custom Pathways worker args as follows -' + " --custom-pathways-worker-args='--arg_1=xxx --arg2=yyy'" + ), + required=False, + ) + add_shared_workload_create_required_arguments([ workload_create_parser_required_arguments, workload_create_pathways_parser_required_arguments, @@ -522,51 +589,6 @@ def add_shared_workload_create_optional_arguments(args_parsers): ' default on Pathways workloads.' ), ) - custom_parser.add_argument( - '--headless', - action='store_true', - help=( - 'Please provide this argument to create Pathways workloads in' - ' headless mode. This arg can only be used in `xpk workload' - ' create-pathways`(preferred) or `xpk workload create' - ' --use-pathways.` (--use-pathways will be deprecated soon).' - ), - ) - custom_parser.add_argument( - '--proxy-server-image', - type=str, - default=( - 'us-docker.pkg.dev/cloud-tpu-v2-images/pathways/proxy_server:latest' - ), - help=( - 'Please provide the proxy server image for Pathways. 
This arg can' - ' only be used in `xpk workload create-pathways`(preferred) or `xpk' - ' workload create --use-pathways.` (--use-pathways will be' - ' deprecated soon).' - ), - ) - custom_parser.add_argument( - '--server-image', - type=str, - default='us-docker.pkg.dev/cloud-tpu-v2-images/pathways/server:latest', - help=( - 'Please provide the server image for Pathways. This arg can only be' - ' used in `xpk workload create-pathways`(preferred) or `xpk' - ' workload create --use-pathways.` (--use-pathways will be' - ' deprecated soon).' - ), - ) - custom_parser.add_argument( - '--pathways-gcs-location', - type=str, - default='gs://cloud-pathways-staging/tmp', - help=( - 'Please provide the GCS location to store Pathways artifacts. This' - ' arg can only be used in `xpk workload create-pathways`(preferred)' - ' or `xpk workload create --use-pathways.` (--use-pathways will be' - ' deprecated soon).' - ), - ) custom_parser.add_argument( '--ramdisk-directory', type=str,