6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
@@ -36,9 +36,9 @@ jobs:
     name: "pre-commit hooks"
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v2
-      - uses: actions/setup-python@v2
-      - uses: pre-commit/action@v2.0.0
+      - uses: actions/checkout@v3
+      - uses: actions/setup-python@v3
+      - uses: pre-commit/action@v3.0.1

   imports:
     runs-on: ubuntu-latest
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -1,12 +1,12 @@
 repos:
   - repo: https://github.com/psf/black
-    rev: 23.10.1
+    rev: 25.1.0
     hooks:
       - id: black
        language_version: python3
        exclude: versioneer.py
  - repo: https://github.com/pycqa/flake8
-   rev: 6.1.0
+   rev: 7.2.0
    hooks:
      - id: flake8
        language_version: python3
6 changes: 3 additions & 3 deletions dask_cloudprovider/aws/ec2.py
@@ -128,9 +128,9 @@ async def create_vm(self):
             "InstanceInitiatedShutdownBehavior": "terminate",
             "NetworkInterfaces": [
                 {
-                    "AssociatePublicIpAddress": False
-                    if self.use_private_ip
-                    else True,
+                    "AssociatePublicIpAddress": (
+                        False if self.use_private_ip else True
+                    ),
                     "DeleteOnTermination": True,
                     "Description": "private" if self.use_private_ip else "public",
                     "DeviceIndex": 0,
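Note: this hunk (and the matching ones in ecs.py below) appears to follow black's newer stable style, which wraps a multi-line conditional expression in parentheses rather than letting it dangle bare after the dictionary key. A minimal runnable sketch of the new shape, using an illustrative use_private_ip flag rather than the module's actual surrounding code; note in passing that "False if x else True" is simply "not x":

use_private_ip = True

# Newer black parenthesizes the conditional as one unit.
params = {
    "AssociatePublicIpAddress": (
        False if use_private_ip else True
    ),
}
assert params["AssociatePublicIpAddress"] == (not use_private_ip)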
22 changes: 13 additions & 9 deletions dask_cloudprovider/aws/ecs.py
@@ -224,9 +224,9 @@ async def start(self):
                 "awsvpcConfiguration": {
                     "subnets": self._vpc_subnets,
                     "securityGroups": self._security_groups,
-                    "assignPublicIp": "ENABLED"
-                    if self._use_public_ip
-                    else "DISABLED",
+                    "assignPublicIp": (
+                        "ENABLED" if self._use_public_ip else "DISABLED"
+                    ),
                 }
             },
         }
@@ -1223,14 +1223,18 @@ async def _create_scheduler_task_definition_arn(self):
                         "awslogs-create-group": "true",
                     },
                 },
-                "mountPoints": self._mount_points
-                if self._mount_points and self._mount_volumes_on_scheduler
-                else [],
+                "mountPoints": (
+                    self._mount_points
+                    if self._mount_points and self._mount_volumes_on_scheduler
+                    else []
+                ),
             }
         ],
-        volumes=self._volumes
-        if self._volumes and self._mount_volumes_on_scheduler
-        else [],
+        volumes=(
+            self._volumes
+            if self._volumes and self._mount_volumes_on_scheduler
+            else []
+        ),
         requiresCompatibilities=["FARGATE"] if self._fargate_scheduler else [],
         runtimePlatform={"cpuArchitecture": self._cpu_architecture},
         cpu=str(self._scheduler_cpu),
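The two ecs.py hunks apply the same parenthesized-conditional style to a fall-back-to-empty pattern: a configured value is passed through only when its feature flag is enabled, otherwise an empty list. A reduced, hypothetical sketch (the names are illustrative, not the module's API):

mount_points = [{"sourceVolume": "data", "containerPath": "/data"}]
mount_volumes_on_scheduler = False

# Use the mount points only if they are defined and enabled for the scheduler.
effective_mount_points = (
    mount_points if mount_points and mount_volumes_on_scheduler else []
)
assert effective_mount_points == []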
1 change: 1 addition & 0 deletions dask_cloudprovider/aws/helper.py
@@ -1,4 +1,5 @@
 """Helper functions for working with AWS services."""
+
 from datetime import datetime

 DEFAULT_SECURITY_GROUP_NAME = "dask-default"
18 changes: 9 additions & 9 deletions dask_cloudprovider/azure/azurevm.py
@@ -174,15 +174,15 @@ async def create_vm(self):
             # Ref: https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines/create-or-update#create-a-vm-with-a-marketplace-image-plan. # noqa
             # Creating a marketplace VM with a plan will override default vm_image values.
             vm_parameters["plan"] = self.marketplace_plan
-            vm_parameters["storage_profile"]["image_reference"][
-                "sku"
-            ] = self.marketplace_plan["name"]
-            vm_parameters["storage_profile"]["image_reference"][
-                "publisher"
-            ] = self.marketplace_plan["publisher"]
-            vm_parameters["storage_profile"]["image_reference"][
-                "offer"
-            ] = self.marketplace_plan["product"]
+            vm_parameters["storage_profile"]["image_reference"]["sku"] = (
+                self.marketplace_plan["name"]
+            )
+            vm_parameters["storage_profile"]["image_reference"]["publisher"] = (
+                self.marketplace_plan["publisher"]
+            )
+            vm_parameters["storage_profile"]["image_reference"]["offer"] = (
+                self.marketplace_plan["product"]
+            )
             vm_parameters["storage_profile"]["image_reference"]["version"] = "latest"
             self.cluster._log("Using Marketplace VM image with a Plan")

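The Azure hunk shows the companion formatting rule: when the assignment target is a long subscript chain, newer black parenthesizes the right-hand side instead of breaking inside the subscript on the left. A standalone sketch with illustrative data, not the real vm_parameters structure:

vm_parameters = {"storage_profile": {"image_reference": {}}}
marketplace_plan = {"name": "plan-sku", "publisher": "plan-pub", "product": "plan-offer"}

# The RHS is wrapped in parentheses; the subscripted LHS stays on one line.
vm_parameters["storage_profile"]["image_reference"]["sku"] = (
    marketplace_plan["name"]
)
assert vm_parameters["storage_profile"]["image_reference"]["sku"] == "plan-sku"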
44 changes: 34 additions & 10 deletions dask_cloudprovider/gcp/instances.py
@@ -635,39 +635,63 @@ def __init__(
         )
         self.machine_type = machine_type or self.config.get("machine_type")
         if machine_type is None:
-            self.scheduler_machine_type = scheduler_machine_type or self.config.get("scheduler_machine_type")
-            self.worker_machine_type = worker_machine_type or self.config.get("worker_machine_type")
+            self.scheduler_machine_type = scheduler_machine_type or self.config.get(
+                "scheduler_machine_type"
+            )
+            self.worker_machine_type = worker_machine_type or self.config.get(
+                "worker_machine_type"
+            )
             if self.scheduler_machine_type is None or self.worker_machine_type is None:
                 raise ValueError("machine_type and scheduler_machine_type must be set")
         else:
             if scheduler_machine_type is not None or worker_machine_type is not None:
-                raise ValueError("If you specify machine_type, you may not specify scheduler_machine_type or worker_machine_type")
+                raise ValueError(
+                    "If you specify machine_type, you may not specify scheduler_machine_type or worker_machine_type"
+                )
             self.scheduler_machine_type = machine_type
             self.worker_machine_type = machine_type

         self.ngpus = ngpus or self.config.get("ngpus")
         if not self.ngpus:
-            self.scheduler_ngpus = scheduler_ngpus if scheduler_ngpus is not None else self.config.get("scheduler_ngpus", 0)
-            self.worker_ngpus = worker_ngpus if worker_ngpus is not None else self.config.get("worker_ngpus", 0)
+            self.scheduler_ngpus = (
+                scheduler_ngpus
+                if scheduler_ngpus is not None
+                else self.config.get("scheduler_ngpus", 0)
+            )
+            self.worker_ngpus = (
+                worker_ngpus
+                if worker_ngpus is not None
+                else self.config.get("worker_ngpus", 0)
+            )
             if self.scheduler_ngpus == 0 and self.worker_ngpus == 0:
                 self._log("No GPU instances configured")
         else:
             if scheduler_ngpus is not None or worker_ngpus is not None:
-                raise ValueError("If you specify ngpus, you may not specify scheduler_ngpus or worker_ngpus")
+                raise ValueError(
+                    "If you specify ngpus, you may not specify scheduler_ngpus or worker_ngpus"
+                )
             self.scheduler_ngpus = self.ngpus
             self.worker_ngpus = self.ngpus

         self.gpu_type = gpu_type or self.config.get("gpu_type")
         if not self.gpu_type:
-            self.scheduler_gpu_type = scheduler_gpu_type or self.config.get("scheduler_gpu_type")
+            self.scheduler_gpu_type = scheduler_gpu_type or self.config.get(
+                "scheduler_gpu_type"
+            )
             self.worker_gpu_type = worker_gpu_type or self.config.get("worker_gpu_type")
             if self.scheduler_ngpus > 0 and self.scheduler_gpu_type is None:
-                raise ValueError("scheduler_gpu_type must be specified when scheduler_ngpus > 0")
+                raise ValueError(
+                    "scheduler_gpu_type must be specified when scheduler_ngpus > 0"
+                )
             if self.worker_ngpus > 0 and self.worker_gpu_type is None:
-                raise ValueError("worker_gpu_type must be specified when worker_ngpus > 0")
+                raise ValueError(
+                    "worker_gpu_type must be specified when worker_ngpus > 0"
+                )
         else:
             if scheduler_gpu_type is not None or worker_gpu_type is not None:
-                raise ValueError("If you specify gpu_type, you may not specify scheduler_gpu_type or worker_gpu_type")
+                raise ValueError(
+                    "If you specify gpu_type, you may not specify scheduler_gpu_type or worker_gpu_type"
+                )
             self.scheduler_gpu_type = self.gpu_type
             self.worker_gpu_type = self.gpu_type

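Beyond the re-wrapping, the GCP hunks all exercise one validation pattern: a shared parameter (machine_type, ngpus, gpu_type) is mutually exclusive with its per-role scheduler_*/worker_* overrides. A condensed, hypothetical sketch of that pattern; resolve_setting and its names are illustrative, not dask-cloudprovider's API:

def resolve_setting(shared, scheduler, worker, config, key, default=None):
    # A shared value and per-role overrides are mutually exclusive.
    if shared is not None:
        if scheduler is not None or worker is not None:
            raise ValueError(
                f"If you specify {key}, you may not specify "
                f"scheduler_{key} or worker_{key}"
            )
        return shared, shared
    # Fall back to config (then a default) for each role independently.
    scheduler = scheduler if scheduler is not None else config.get(f"scheduler_{key}", default)
    worker = worker if worker is not None else config.get(f"worker_{key}", default)
    return scheduler, worker

# A shared value fans out to both roles:
assert resolve_setting("n1-standard-8", None, None, {}, "machine_type") == (
    "n1-standard-8",
    "n1-standard-8",
)
# Per-role values pass through when no shared value is given:
assert resolve_setting(None, "n1-highmem-8", "n1-standard-16", {}, "machine_type") == (
    "n1-highmem-8",
    "n1-standard-16",
)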
107 changes: 75 additions & 32 deletions dask_cloudprovider/ibm/code_engine.py
@@ -82,7 +82,9 @@ def __init__(
         self.docker_password = docker_password
         self.docker_registry_name = docker_registry_name

-        self.authenticator = IAMAuthenticator(self.api_key, url="https://iam.cloud.ibm.com")
+        self.authenticator = IAMAuthenticator(
+            self.api_key, url="https://iam.cloud.ibm.com"
+        )
         self.authenticator.set_disable_ssl_verification(
             True
         )  # Disable SSL verification for the authenticator
@@ -97,9 +99,11 @@ def __init__(

     def _extract_k8s_config_details(self, project_id):
         delegated_refresh_token_payload = {
-            'grant_type': 'urn:ibm:params:oauth:grant-type:apikey', 'apikey': self.api_key,
-            'response_type': 'delegated_refresh_token', 'receiver_client_ids': 'ce',
-            'delegated_refresh_token_expiry': '3600'
+            "grant_type": "urn:ibm:params:oauth:grant-type:apikey",
+            "apikey": self.api_key,
+            "response_type": "delegated_refresh_token",
+            "receiver_client_ids": "ce",
+            "delegated_refresh_token_expiry": "3600",
         }
         token_manager = self.code_engine_service.authenticator.token_manager
         original_request_payload = token_manager.request_payload
@@ -109,24 +113,40 @@ def _extract_k8s_config_details(self, project_id):
         finally:
             token_manager.request_payload = original_request_payload

-        kc_resp = self.code_engine_service_v1.get_kubeconfig(iam_response['delegated_refresh_token'], project_id)
+        kc_resp = self.code_engine_service_v1.get_kubeconfig(
+            iam_response["delegated_refresh_token"], project_id
+        )
         kubeconfig_data = kc_resp.get_result()

-        current_context_name = kubeconfig_data['current-context']
-        context_details = next(c['context'] for c in kubeconfig_data['contexts'] if c['name'] == current_context_name)
+        current_context_name = kubeconfig_data["current-context"]
+        context_details = next(
+            c["context"]
+            for c in kubeconfig_data["contexts"]
+            if c["name"] == current_context_name
+        )

-        namespace = context_details.get('namespace', 'default')
-        server_url = next(c['cluster'] for c in kubeconfig_data['clusters'] if c['name'] == context_details['cluster'])['server']
+        namespace = context_details.get("namespace", "default")
+        server_url = next(
+            c["cluster"]
+            for c in kubeconfig_data["clusters"]
+            if c["name"] == context_details["cluster"]
+        )["server"]
         return namespace, server_url

     def create_registry_secret(self):
         # Set up the authenticator and service instance
-        self.code_engine_service_v1 = IbmCloudCodeEngineV1(authenticator=self.authenticator)
-        self.code_engine_service_v1.set_service_url("https://api." + self.region + ".codeengine.cloud.ibm.com/api/v1")
+        self.code_engine_service_v1 = IbmCloudCodeEngineV1(
+            authenticator=self.authenticator
+        )
+        self.code_engine_service_v1.set_service_url(
+            "https://api." + self.region + ".codeengine.cloud.ibm.com/api/v1"
+        )
         token = self.authenticator.token_manager.get_token()

         # Fetch K8s config details
-        namespace, k8s_api_server_url = self._extract_k8s_config_details(self.project_id)
+        namespace, k8s_api_server_url = self._extract_k8s_config_details(
+            self.project_id
+        )

         # Create a new configuration instance
         configuration = client.Configuration()
@@ -136,35 +156,51 @@ def create_registry_secret(self):
         core_api = client.CoreV1Api(api_client_instance)

         secret = client.V1Secret(
-            metadata=client.V1ObjectMeta(name=self.docker_registry_name, namespace=namespace),
+            metadata=client.V1ObjectMeta(
+                name=self.docker_registry_name, namespace=namespace
+            ),
             type="kubernetes.io/dockerconfigjson",
-            string_data={".dockerconfigjson": json.dumps({
-                "auths": {
-                    self.docker_server: {
-                        "username": self.docker_username,
-                        "password": self.docker_password,
+            string_data={
+                ".dockerconfigjson": json.dumps(
+                    {
+                        "auths": {
+                            self.docker_server: {
+                                "username": self.docker_username,
+                                "password": self.docker_password,
+                            }
+                        }
                     }
-                }
-            })}
+                )
+            },
         )

         try:
-            core_api.delete_namespaced_secret(self.docker_registry_name, namespace=namespace)
+            core_api.delete_namespaced_secret(
+                self.docker_registry_name, namespace=namespace
+            )
         except ApiException as e:
-            if e.status == 404: # Not Found, which is fine
+            if e.status == 404:  # Not Found, which is fine
                 pass
             else:
-                self.cluster._log(f"Error deleting existing registry secret {self.docker_registry_name} in {namespace}: {e}")
+                self.cluster._log(
+                    f"Error deleting existing registry secret {self.docker_registry_name} in {namespace}: {e}"
+                )
                 pass

         try:
             core_api.create_namespaced_secret(namespace, secret)
-            self.cluster._log(f"Successfully created registry secret '{self.docker_registry_name}'.")
+            self.cluster._log(
+                f"Successfully created registry secret '{self.docker_registry_name}'."
+            )
         except ApiException as e:
-            if e.status == 409: # Conflict, secret already exists
-                self.cluster._log(f"Registry secret '{self.docker_registry_name}' already exists.")
+            if e.status == 409:  # Conflict, secret already exists
+                self.cluster._log(
+                    f"Registry secret '{self.docker_registry_name}' already exists."
+                )
             else:
-                self.cluster._log(f"Error creating registry secret '{self.docker_registry_name}': {e}")
+                self.cluster._log(
+                    f"Error creating registry secret '{self.docker_registry_name}': {e}"
+                )
                 raise e

     async def create_vm(self):
@@ -307,7 +343,9 @@ def __init__(self, *args, **kwargs):
     async def start(self):
         if self.docker_server and self.docker_username and self.docker_password:
             self.docker_registry_name = "dask-" + self.docker_server.split(".")[0]
-            self.cluster._log(f"Creating registry secret for {self.docker_registry_name}")
+            self.cluster._log(
+                f"Creating registry secret for {self.docker_registry_name}"
+            )
             self.create_registry_secret()

         self.cluster._log(
@@ -375,7 +413,7 @@ def __init__(
             ),
         ]

-        # To work with Code Engine, we need to use the extra arguments 
+        # To work with Code Engine, we need to use the extra arguments
         if self.worker_command:
             custom_command_prefix = self.worker_command.split()
             original_command_suffix = self.command[3:]
@@ -445,7 +483,8 @@ class IBMCodeEngineCluster(VMCluster):
     docker_username: str
         The username for authenticating with the Docker registry. Required if using private Docker images.
     docker_password: str
-        The password or access token for authenticating with the Docker registry. Required if using private Docker images.
+        The password or access token for authenticating with the Docker registry.
+        Required if using private Docker images.
     debug: bool, optional
         More information will be printed when constructing clusters to enable debugging.

@@ -572,8 +611,12 @@ def __init__(
         self.scheduler_cpu = scheduler_cpu or self.config.get("scheduler_cpu")
         self.scheduler_mem = scheduler_mem or self.config.get("scheduler_mem")
         self.scheduler_disk = scheduler_disk or self.config.get("scheduler_disk")
-        self.scheduler_timeout = scheduler_timeout or self.config.get("scheduler_timeout")
-        self.scheduler_command = scheduler_command or self.config.get("scheduler_command")
+        self.scheduler_timeout = scheduler_timeout or self.config.get(
+            "scheduler_timeout"
+        )
+        self.scheduler_command = scheduler_command or self.config.get(
+            "scheduler_command"
+        )
         self.worker_cpu = worker_cpu or self.config.get("worker_cpu")
         self.worker_mem = worker_mem or self.config.get("worker_mem")
         self.worker_disk = worker_disk or self.config.get("worker_disk")
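For context on the secret being re-indented above: the string_data payload follows the standard kubernetes.io/dockerconfigjson format. A minimal standalone sketch of just that payload, with placeholder registry host and credentials:

import json

docker_server = "us.icr.io"  # placeholder registry host
docker_username = "iamapikey"  # placeholder credentials
docker_password = "<api-key>"  # placeholder; never hard-code real secrets

# The JSON document Kubernetes expects under the ".dockerconfigjson" key
# of a Secret whose type is "kubernetes.io/dockerconfigjson".
dockerconfigjson = json.dumps(
    {
        "auths": {
            docker_server: {
                "username": docker_username,
                "password": docker_password,
            }
        }
    }
)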
11 changes: 5 additions & 6 deletions dask_cloudprovider/openstack/instances.py
@@ -118,18 +118,17 @@ async def create_and_assign_floating_ip(self, conn):

         # Find the first port of the instance
         ports = await self.call_async(
-            conn.network.ports,
-            device_id=self.instance.id
+            conn.network.ports, device_id=self.instance.id
         )
         ports = list(ports)
         if not ports:
-            raise RuntimeError(f"No network ports found for instance {self.instance.id}")
+            raise RuntimeError(
+                f"No network ports found for instance {self.instance.id}"
+            )

         # Assign the floating IP to the instance's port
         await self.call_async(
-            conn.network.update_ip,
-            floating_ip,
-            port_id=ports[0].id
+            conn.network.update_ip, floating_ip, port_id=ports[0].id
         )

         return floating_ip.floating_ip_address