diff --git a/optimus/engines/dask/engine.py b/optimus/engines/dask/engine.py
index 4ca3065d9..ca5d4aeac 100644
--- a/optimus/engines/dask/engine.py
+++ b/optimus/engines/dask/engine.py
@@ -16,18 +16,15 @@ class DaskEngine(BaseEngine):
     # Using procces or threads https://stackoverflow.com/questions/51099685/best-practices-in-setting-number-of-dask-workers
     def __init__(self, session=None, address=None, n_workers=None, threads_per_worker=None, processes=False,
-                 memory_limit='4GB', verbose=False, coiled_token=None, *args, **kwargs):
+                 memory_limit=None, verbose=False, coiled_token=None, *args, **kwargs):
 
-        if n_workers is None:
+        if n_workers is None and not coiled_token:
             import psutil
             threads_per_worker = psutil.cpu_count() * 4
 
         self.verbose(verbose)
 
-        use_remote = kwargs.get("use_remote", coiled_token is not None)
-
-        if kwargs.get("use_remote", None):
-            del kwargs["use_remote"]
+        use_remote = kwargs.pop("use_remote", False)
 
         if coiled_token:
             import coiled
@@ -39,14 +36,15 @@ def __init__(self, session=None, address=None, n_workers=None, threads_per_worke
             idle_timeout = kwargs.get("idle_timeout", None)
 
+            memory_limit = memory_limit or '16 GiB'
+
             cluster = coiled.Cluster(
                 name=kwargs.get("name"),
                 worker_options={
-                    **({"nthreads": threads_per_worker} if threads_per_worker else {}),
-                    **({"memory_limit": memory_limit} if memory_limit else {})
+                    **({"nthreads": threads_per_worker} if threads_per_worker else {})
                 },
-                n_workers=n_workers,
-                worker_memory='15GiB',
+                n_workers=n_workers if n_workers else 4,
+                worker_memory=memory_limit,
                 scheduler_options={
                     **({"idle_timeout": idle_timeout} if idle_timeout else {})
                 },
@@ -67,6 +65,7 @@ def __init__(self, session=None, address=None, n_workers=None, threads_per_worke
             try:
                 self.client = get_client()
             except ValueError:
+                memory_limit = memory_limit or '4GB'
                 self.client = Client(address=address, n_workers=n_workers, threads_per_worker=threads_per_worker,
                                      processes=processes, memory_limit=memory_limit, *args, **kwargs)
diff --git a/optimus/engines/dask_cudf/engine.py b/optimus/engines/dask_cudf/engine.py
index b9697ed38..2bb7b7c38 100644
--- a/optimus/engines/dask_cudf/engine.py
+++ b/optimus/engines/dask_cudf/engine.py
@@ -12,7 +12,7 @@ class DaskCUDFEngine(BaseEngine):
     def __init__(self, session=None, address=None, n_workers=1, threads_per_worker=8, processes=False,
-                 memory_limit='4GB', verbose=False, coiled_token=None, *args, **kwargs):
+                 memory_limit=None, verbose=False, coiled_token=None, *args, **kwargs):
         """
@@ -42,16 +42,17 @@ def __init__(self, session=None, address=None, n_workers=1, threads_per_worker=8
             idle_timeout = kwargs.get("idle_timeout", None)
 
+            memory_limit = memory_limit or '16 GiB'
+
             cluster = coiled.Cluster(
                 name=kwargs.get("name"),
                 worker_options={
-                    **({"nthreads": threads_per_worker} if threads_per_worker else {}),
-                    **({"memory_limit": memory_limit} if memory_limit else {})
+                    **({"nthreads": threads_per_worker} if threads_per_worker else {})
                 },
                 worker_gpu=1,
                 worker_class='dask_cuda.CUDAWorker',
-                n_workers=n_workers,
-                worker_memory='15GiB',
+                n_workers=n_workers if n_workers else 4,
+                worker_memory=memory_limit,
                 backend_options={
                     "region": kwargs.get("backend_region", "us-east-1")
                 },
@@ -91,7 +92,8 @@ def __init__(self, session=None, address=None, n_workers=1, threads_per_worker=8
                 device_memory_limit=memoryTotal * 0.8  # Spill to RAM when 80% memory is full
             )
 
-            self.client = Client(cluster, *args, **kwargs)
+            memory_limit = memory_limit or '4GB'
+            self.client = Client(cluster, memory_limit=memory_limit, *args, **kwargs)
 
             if use_remote:
                 self.remote = RemoteOptimusInterface(self.client, Engine.DASK_CUDF.value)
diff --git a/requirements/dask-requirements.txt b/requirements/dask-requirements.txt
index 475a684ac..d6997200b 100644
--- a/requirements/dask-requirements.txt
+++ b/requirements/dask-requirements.txt
@@ -2,4 +2,4 @@ dask[complete]==2021.9.0
 distributed==2021.9.0
 dask-ml>=1.9.0
 pyarrow==1.0.1
-coiled>=0.0.30
\ No newline at end of file
+coiled>=0.0.52
\ No newline at end of file
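
Reviewer note on the behaviour these hunks introduce, with a small sketch. Two things change for callers of both engines: memory_limit now defaults to None and is resolved per backend (16 GiB per worker on a Coiled cluster, the old 4GB when a Client is created directly), and use_remote is popped from kwargs in a single step, so passing coiled_token alone no longer implies remote execution. The sketch below only mirrors that fallback logic; resolve_memory_limit is an illustrative name, not part of the Optimus API.

    # A minimal sketch of the memory_limit fallback introduced by this patch
    # (illustrative helper, not code from the repository).
    def resolve_memory_limit(memory_limit=None, coiled=False):
        if coiled:
            # Coiled clusters now default to 16 GiB per worker
            # (previously hard-coded as worker_memory='15GiB').
            return memory_limit or '16 GiB'
        # Direct Client creation keeps the old 4GB default.
        return memory_limit or '4GB'

    assert resolve_memory_limit() == '4GB'
    assert resolve_memory_limit(coiled=True) == '16 GiB'
    assert resolve_memory_limit('8GB', coiled=True) == '8GB'

    # use_remote is now read and removed in one call; remote execution must be
    # requested explicitly instead of being inferred from coiled_token.
    kwargs = {"use_remote": True, "name": "my-cluster"}
    use_remote = kwargs.pop("use_remote", False)  # True, and the key is consumed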