Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions dask_ctl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from distributed.core import Status

from . import __version__
from .utils import loop
from .utils import run_sync
from .discovery import (
discover_clusters,
discover_cluster_names,
Expand All @@ -38,7 +38,7 @@ async def _autocomplete_cluster_names():
if incomplete in cluster
]

return loop.run_sync(_autocomplete_cluster_names)
return run_sync(_autocomplete_cluster_names)


@click.group()
Expand Down Expand Up @@ -144,7 +144,7 @@ async def _list():

console.print(table)

loop.run_sync(_list)
run_sync(_list)


@cluster.command()
Expand Down Expand Up @@ -292,7 +292,7 @@ async def _list_discovery():
)
console.print(table)

loop.run_sync(_list_discovery)
run_sync(_list_discovery)


@discovery.command(name="enable")
Expand Down
14 changes: 7 additions & 7 deletions dask_ctl/lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from distributed.deploy.cluster import Cluster
from .discovery import discover_cluster_names, discover_clusters
from .spec import load_spec
from .utils import loop
from .utils import run_sync


def create_cluster(spec_path: str) -> Cluster:
Expand Down Expand Up @@ -38,7 +38,7 @@ def create_cluster(spec_path: str) -> Cluster:

"""

return loop.run_sync(_create_cluster, spec_path)
return run_sync(_create_cluster, spec_path)


async def _create_cluster(spec_path: str) -> Cluster:
Expand Down Expand Up @@ -75,7 +75,7 @@ def list_clusters() -> List[Cluster]:

"""

return loop.run_sync(_list_clusters)
return run_sync(_list_clusters)


async def _list_clusters() -> List[Cluster]:
Expand Down Expand Up @@ -110,7 +110,7 @@ def get_cluster(name: str) -> Cluster:

"""

return loop.run_sync(_get_cluster, name)
return run_sync(_get_cluster, name)


async def _get_cluster(name: str) -> Cluster:
Expand Down Expand Up @@ -148,7 +148,7 @@ def get_snippet(name: str) -> str:
client = Client(cluster)

"""
return loop.run_sync(_get_snippet, name)
return run_sync(_get_snippet, name)


async def _get_snippet(name: str) -> str:
Expand Down Expand Up @@ -184,7 +184,7 @@ def scale_cluster(name: str, n_workers: int) -> None:
>>> scale_cluster("mycluster", 10) # doctest: +SKIP

"""
return loop.run_sync(_scale_cluster, name, n_workers)
return run_sync(_scale_cluster, name, n_workers)


async def _scale_cluster(name: str, n_workers: int) -> None:
Expand All @@ -211,7 +211,7 @@ def delete_cluster(name: str) -> None:
>>> delete_cluster("mycluster") # doctest: +SKIP

"""
return loop.run_sync(_delete_cluster, name)
return run_sync(_delete_cluster, name)


async def _delete_cluster(name: str) -> None:
Expand Down
12 changes: 7 additions & 5 deletions dask_ctl/utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import asyncio

from tornado.ioloop import IOLoop
from distributed.cli.utils import install_signal_handlers


loop = IOLoop.current()
install_signal_handlers(loop)
def run_sync(f, *args, **kwargs):
loop = asyncio.get_event_loop()
try:
return loop.run_until_complete(f(*args, **kwargs))
except RuntimeError:
f = asyncio.run_coroutine_threadsafe(f(*args, **kwargs), loop)
return f.result()


class _AsyncTimedIterator:
Expand Down