diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 5256d9b8..e79370c6 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -36,9 +36,9 @@ jobs:
     name: "pre-commit hooks"
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v2
-      - uses: actions/setup-python@v2
-      - uses: pre-commit/action@v2.0.0
+      - uses: actions/checkout@v3
+      - uses: actions/setup-python@v3
+      - uses: pre-commit/action@v3.0.1
 
   imports:
     runs-on: ubuntu-latest
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index a2c64c16..6a69e429 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,12 +1,12 @@
 repos:
   - repo: https://github.com/psf/black
-    rev: 23.10.1
+    rev: 25.1.0
     hooks:
       - id: black
         language_version: python3
         exclude: versioneer.py
   - repo: https://github.com/pycqa/flake8
-    rev: 6.1.0
+    rev: 7.2.0
     hooks:
       - id: flake8
         language_version: python3
diff --git a/dask_cloudprovider/aws/ec2.py b/dask_cloudprovider/aws/ec2.py
index 4e5c7adf..a843a746 100644
--- a/dask_cloudprovider/aws/ec2.py
+++ b/dask_cloudprovider/aws/ec2.py
@@ -128,9 +128,9 @@ async def create_vm(self):
             "InstanceInitiatedShutdownBehavior": "terminate",
             "NetworkInterfaces": [
                 {
-                    "AssociatePublicIpAddress": False
-                    if self.use_private_ip
-                    else True,
+                    "AssociatePublicIpAddress": (
+                        False if self.use_private_ip else True
+                    ),
                     "DeleteOnTermination": True,
                     "Description": "private" if self.use_private_ip else "public",
                     "DeviceIndex": 0,
diff --git a/dask_cloudprovider/aws/ecs.py b/dask_cloudprovider/aws/ecs.py
index a0a5a9b0..d747aa1d 100644
--- a/dask_cloudprovider/aws/ecs.py
+++ b/dask_cloudprovider/aws/ecs.py
@@ -224,9 +224,9 @@ async def start(self):
                 "awsvpcConfiguration": {
                     "subnets": self._vpc_subnets,
                     "securityGroups": self._security_groups,
-                    "assignPublicIp": "ENABLED"
-                    if self._use_public_ip
-                    else "DISABLED",
+                    "assignPublicIp": (
+                        "ENABLED" if self._use_public_ip else "DISABLED"
+                    ),
                 }
             },
         }
@@ -1223,14 +1223,18 @@ async def _create_scheduler_task_definition_arn(self):
                             "awslogs-create-group": "true",
                         },
                     },
-                    "mountPoints": self._mount_points
-                    if self._mount_points and self._mount_volumes_on_scheduler
-                    else [],
+                    "mountPoints": (
+                        self._mount_points
+                        if self._mount_points and self._mount_volumes_on_scheduler
+                        else []
+                    ),
                 }
             ],
-            volumes=self._volumes
-            if self._volumes and self._mount_volumes_on_scheduler
-            else [],
+            volumes=(
+                self._volumes
+                if self._volumes and self._mount_volumes_on_scheduler
+                else []
+            ),
             requiresCompatibilities=["FARGATE"] if self._fargate_scheduler else [],
             runtimePlatform={"cpuArchitecture": self._cpu_architecture},
             cpu=str(self._scheduler_cpu),
diff --git a/dask_cloudprovider/aws/helper.py b/dask_cloudprovider/aws/helper.py
index 58990955..13af51cd 100644
--- a/dask_cloudprovider/aws/helper.py
+++ b/dask_cloudprovider/aws/helper.py
@@ -1,4 +1,5 @@
 """Helper functions for working with AWS services."""
+
 from datetime import datetime
 
 DEFAULT_SECURITY_GROUP_NAME = "dask-default"
diff --git a/dask_cloudprovider/azure/azurevm.py b/dask_cloudprovider/azure/azurevm.py
index 0000b9f7..39f19573 100644
--- a/dask_cloudprovider/azure/azurevm.py
+++ b/dask_cloudprovider/azure/azurevm.py
@@ -174,15 +174,15 @@ async def create_vm(self):
             # Ref: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/create-or-update#create-a-vm-with-a-marketplace-image-plan.  # noqa
             # Creating a marketplace VM with a plan will override default vm_image values.
             vm_parameters["plan"] = self.marketplace_plan
-            vm_parameters["storage_profile"]["image_reference"][
-                "sku"
-            ] = self.marketplace_plan["name"]
-            vm_parameters["storage_profile"]["image_reference"][
-                "publisher"
-            ] = self.marketplace_plan["publisher"]
-            vm_parameters["storage_profile"]["image_reference"][
-                "offer"
-            ] = self.marketplace_plan["product"]
+            vm_parameters["storage_profile"]["image_reference"]["sku"] = (
+                self.marketplace_plan["name"]
+            )
+            vm_parameters["storage_profile"]["image_reference"]["publisher"] = (
+                self.marketplace_plan["publisher"]
+            )
+            vm_parameters["storage_profile"]["image_reference"]["offer"] = (
+                self.marketplace_plan["product"]
+            )
             vm_parameters["storage_profile"]["image_reference"]["version"] = "latest"
             self.cluster._log("Using Marketplace VM image with a Plan")
 
diff --git a/dask_cloudprovider/gcp/instances.py b/dask_cloudprovider/gcp/instances.py
index acdb6778..e5ac3959 100644
--- a/dask_cloudprovider/gcp/instances.py
+++ b/dask_cloudprovider/gcp/instances.py
@@ -635,39 +635,63 @@ def __init__(
         )
         self.machine_type = machine_type or self.config.get("machine_type")
         if machine_type is None:
-            self.scheduler_machine_type = scheduler_machine_type or self.config.get("scheduler_machine_type")
-            self.worker_machine_type = worker_machine_type or self.config.get("worker_machine_type")
+            self.scheduler_machine_type = scheduler_machine_type or self.config.get(
+                "scheduler_machine_type"
+            )
+            self.worker_machine_type = worker_machine_type or self.config.get(
+                "worker_machine_type"
+            )
             if self.scheduler_machine_type is None or self.worker_machine_type is None:
                 raise ValueError("machine_type and scheduler_machine_type must be set")
         else:
             if scheduler_machine_type is not None or worker_machine_type is not None:
-                raise ValueError("If you specify machine_type, you may not specify scheduler_machine_type or worker_machine_type")
+                raise ValueError(
+                    "If you specify machine_type, you may not specify scheduler_machine_type or worker_machine_type"
+                )
             self.scheduler_machine_type = machine_type
             self.worker_machine_type = machine_type
 
         self.ngpus = ngpus or self.config.get("ngpus")
         if not self.ngpus:
-            self.scheduler_ngpus = scheduler_ngpus if scheduler_ngpus is not None else self.config.get("scheduler_ngpus", 0)
-            self.worker_ngpus = worker_ngpus if worker_ngpus is not None else self.config.get("worker_ngpus", 0)
+            self.scheduler_ngpus = (
+                scheduler_ngpus
+                if scheduler_ngpus is not None
+                else self.config.get("scheduler_ngpus", 0)
+            )
+            self.worker_ngpus = (
+                worker_ngpus
+                if worker_ngpus is not None
+                else self.config.get("worker_ngpus", 0)
+            )
             if self.scheduler_ngpus == 0 and self.worker_ngpus == 0:
                 self._log("No GPU instances configured")
         else:
             if scheduler_ngpus is not None or worker_ngpus is not None:
-                raise ValueError("If you specify ngpus, you may not specify scheduler_ngpus or worker_ngpus")
+                raise ValueError(
+                    "If you specify ngpus, you may not specify scheduler_ngpus or worker_ngpus"
+                )
             self.scheduler_ngpus = self.ngpus
             self.worker_ngpus = self.ngpus
 
         self.gpu_type = gpu_type or self.config.get("gpu_type")
         if not self.gpu_type:
-            self.scheduler_gpu_type = scheduler_gpu_type or self.config.get("scheduler_gpu_type")
+            self.scheduler_gpu_type = scheduler_gpu_type or self.config.get(
+                "scheduler_gpu_type"
+            )
             self.worker_gpu_type = worker_gpu_type or self.config.get("worker_gpu_type")
             if self.scheduler_ngpus > 0 and self.scheduler_gpu_type is None:
-                raise ValueError("scheduler_gpu_type must be specified when scheduler_ngpus > 0")
+                raise ValueError(
+                    "scheduler_gpu_type must be specified when scheduler_ngpus > 0"
+                )
             if self.worker_ngpus > 0 and self.worker_gpu_type is None:
-                raise ValueError("worker_gpu_type must be specified when worker_ngpus > 0")
+                raise ValueError(
+                    "worker_gpu_type must be specified when worker_ngpus > 0"
+                )
         else:
             if scheduler_gpu_type is not None or worker_gpu_type is not None:
-                raise ValueError("If you specify gpu_type, you may not specify scheduler_gpu_type or worker_gpu_type")
+                raise ValueError(
+                    "If you specify gpu_type, you may not specify scheduler_gpu_type or worker_gpu_type"
+                )
             self.scheduler_gpu_type = self.gpu_type
             self.worker_gpu_type = self.gpu_type
 
diff --git a/dask_cloudprovider/ibm/code_engine.py b/dask_cloudprovider/ibm/code_engine.py
index 1c56ffb9..cf4ab340 100644
--- a/dask_cloudprovider/ibm/code_engine.py
+++ b/dask_cloudprovider/ibm/code_engine.py
@@ -82,7 +82,9 @@ def __init__(
         self.docker_password = docker_password
         self.docker_registry_name = docker_registry_name
 
-        self.authenticator = IAMAuthenticator(self.api_key, url="https://iam.cloud.ibm.com")
+        self.authenticator = IAMAuthenticator(
+            self.api_key, url="https://iam.cloud.ibm.com"
+        )
         self.authenticator.set_disable_ssl_verification(
             True
         )  # Disable SSL verification for the authenticator
@@ -97,9 +99,11 @@ def __init__(
 
     def _extract_k8s_config_details(self, project_id):
         delegated_refresh_token_payload = {
-            'grant_type': 'urn:ibm:params:oauth:grant-type:apikey', 'apikey': self.api_key,
-            'response_type': 'delegated_refresh_token', 'receiver_client_ids': 'ce',
-            'delegated_refresh_token_expiry': '3600'
+            "grant_type": "urn:ibm:params:oauth:grant-type:apikey",
+            "apikey": self.api_key,
+            "response_type": "delegated_refresh_token",
+            "receiver_client_ids": "ce",
+            "delegated_refresh_token_expiry": "3600",
         }
         token_manager = self.code_engine_service.authenticator.token_manager
         original_request_payload = token_manager.request_payload
@@ -109,24 +113,40 @@ def _extract_k8s_config_details(self, project_id):
         finally:
             token_manager.request_payload = original_request_payload
 
-        kc_resp = self.code_engine_service_v1.get_kubeconfig(iam_response['delegated_refresh_token'], project_id)
+        kc_resp = self.code_engine_service_v1.get_kubeconfig(
+            iam_response["delegated_refresh_token"], project_id
+        )
         kubeconfig_data = kc_resp.get_result()
-        current_context_name = kubeconfig_data['current-context']
-        context_details = next(c['context'] for c in kubeconfig_data['contexts'] if c['name'] == current_context_name)
+        current_context_name = kubeconfig_data["current-context"]
+        context_details = next(
+            c["context"]
+            for c in kubeconfig_data["contexts"]
+            if c["name"] == current_context_name
+        )
 
-        namespace = context_details.get('namespace', 'default')
-        server_url = next(c['cluster'] for c in kubeconfig_data['clusters'] if c['name'] == context_details['cluster'])['server']
+        namespace = context_details.get("namespace", "default")
+        server_url = next(
+            c["cluster"]
+            for c in kubeconfig_data["clusters"]
+            if c["name"] == context_details["cluster"]
+        )["server"]
 
         return namespace, server_url
 
     def create_registry_secret(self):
         # Set up the authenticator and service instance
-        self.code_engine_service_v1 = IbmCloudCodeEngineV1(authenticator=self.authenticator)
-        self.code_engine_service_v1.set_service_url("https://api." + self.region + ".codeengine.cloud.ibm.com/api/v1")
+        self.code_engine_service_v1 = IbmCloudCodeEngineV1(
+            authenticator=self.authenticator
+        )
+        self.code_engine_service_v1.set_service_url(
+            "https://api." + self.region + ".codeengine.cloud.ibm.com/api/v1"
+        )
         token = self.authenticator.token_manager.get_token()
 
         # Fetch K8s config details
-        namespace, k8s_api_server_url = self._extract_k8s_config_details(self.project_id)
+        namespace, k8s_api_server_url = self._extract_k8s_config_details(
+            self.project_id
+        )
 
         # Create a new configuration instance
         configuration = client.Configuration()
@@ -136,35 +156,51 @@ def create_registry_secret(self):
         core_api = client.CoreV1Api(api_client_instance)
 
         secret = client.V1Secret(
-            metadata=client.V1ObjectMeta(name=self.docker_registry_name, namespace=namespace),
+            metadata=client.V1ObjectMeta(
+                name=self.docker_registry_name, namespace=namespace
+            ),
             type="kubernetes.io/dockerconfigjson",
-            string_data={".dockerconfigjson": json.dumps({
-                "auths": {
-                    self.docker_server: {
-                        "username": self.docker_username,
-                        "password": self.docker_password,
+            string_data={
+                ".dockerconfigjson": json.dumps(
+                    {
+                        "auths": {
+                            self.docker_server: {
+                                "username": self.docker_username,
+                                "password": self.docker_password,
+                            }
+                        }
                     }
-                }
-            })}
+                )
+            },
         )
 
         try:
-            core_api.delete_namespaced_secret(self.docker_registry_name, namespace=namespace)
+            core_api.delete_namespaced_secret(
+                self.docker_registry_name, namespace=namespace
+            )
         except ApiException as e:
-            if e.status == 404: # Not Found, which is fine
+            if e.status == 404:  # Not Found, which is fine
                 pass
             else:
-                self.cluster._log(f"Error deleting existing registry secret {self.docker_registry_name} in {namespace}: {e}")
+                self.cluster._log(
+                    f"Error deleting existing registry secret {self.docker_registry_name} in {namespace}: {e}"
+                )
                 pass
 
         try:
             core_api.create_namespaced_secret(namespace, secret)
-            self.cluster._log(f"Successfully created registry secret '{self.docker_registry_name}'.")
+            self.cluster._log(
+                f"Successfully created registry secret '{self.docker_registry_name}'."
+            )
         except ApiException as e:
-            if e.status == 409: # Conflict, secret already exists
-                self.cluster._log(f"Registry secret '{self.docker_registry_name}' already exists.")
+            if e.status == 409:  # Conflict, secret already exists
+                self.cluster._log(
+                    f"Registry secret '{self.docker_registry_name}' already exists."
+                )
             else:
-                self.cluster._log(f"Error creating registry secret '{self.docker_registry_name}': {e}")
+                self.cluster._log(
+                    f"Error creating registry secret '{self.docker_registry_name}': {e}"
+                )
                 raise e
 
     async def create_vm(self):
@@ -307,7 +343,9 @@ def __init__(self, *args, **kwargs):
     async def start(self):
         if self.docker_server and self.docker_username and self.docker_password:
             self.docker_registry_name = "dask-" + self.docker_server.split(".")[0]
-            self.cluster._log(f"Creating registry secret for {self.docker_registry_name}")
+            self.cluster._log(
+                f"Creating registry secret for {self.docker_registry_name}"
+            )
             self.create_registry_secret()
 
         self.cluster._log(
@@ -375,7 +413,7 @@ def __init__(
             ),
         ]
 
-        # To work with Code Engine, we need to use the extra arguments 
+        # To work with Code Engine, we need to use the extra arguments
         if self.worker_command:
            custom_command_prefix = self.worker_command.split()
            original_command_suffix = self.command[3:]
@@ -445,7 +483,8 @@ class IBMCodeEngineCluster(VMCluster):
     docker_username: str
         The username for authenticating with the Docker registry. Required if using private Docker images.
     docker_password: str
-        The password or access token for authenticating with the Docker registry. Required if using private Docker images.
+        The password or access token for authenticating with the Docker registry.
+        Required if using private Docker images.
     debug: bool, optional
         More information will be printed when constructing clusters to enable debugging.
 
@@ -572,8 +611,12 @@ def __init__(
         self.scheduler_cpu = scheduler_cpu or self.config.get("scheduler_cpu")
         self.scheduler_mem = scheduler_mem or self.config.get("scheduler_mem")
         self.scheduler_disk = scheduler_disk or self.config.get("scheduler_disk")
-        self.scheduler_timeout = scheduler_timeout or self.config.get("scheduler_timeout")
-        self.scheduler_command = scheduler_command or self.config.get("scheduler_command")
+        self.scheduler_timeout = scheduler_timeout or self.config.get(
+            "scheduler_timeout"
+        )
+        self.scheduler_command = scheduler_command or self.config.get(
+            "scheduler_command"
+        )
         self.worker_cpu = worker_cpu or self.config.get("worker_cpu")
         self.worker_mem = worker_mem or self.config.get("worker_mem")
         self.worker_disk = worker_disk or self.config.get("worker_disk")
diff --git a/dask_cloudprovider/openstack/instances.py b/dask_cloudprovider/openstack/instances.py
index a12ba74c..30a2bd23 100644
--- a/dask_cloudprovider/openstack/instances.py
+++ b/dask_cloudprovider/openstack/instances.py
@@ -118,18 +118,17 @@ async def create_and_assign_floating_ip(self, conn):
 
         # Find the first port of the instance
         ports = await self.call_async(
-            conn.network.ports,
-            device_id=self.instance.id
+            conn.network.ports, device_id=self.instance.id
         )
         ports = list(ports)
         if not ports:
-            raise RuntimeError(f"No network ports found for instance {self.instance.id}")
+            raise RuntimeError(
+                f"No network ports found for instance {self.instance.id}"
+            )
 
         # Assign the floating IP to the instance's port
         await self.call_async(
-            conn.network.update_ip,
-            floating_ip,
-            port_id=ports[0].id
+            conn.network.update_ip, floating_ip, port_id=ports[0].id
         )
 
         return floating_ip.floating_ip_address