3 changes: 1 addition & 2 deletions distributed/cfexecutor.py
@@ -125,8 +125,7 @@ def map(self, fn, *iterables, **kwargs):
         if timeout is not None:
             timeout = parse_timedelta(timeout)
             end_time = timeout + time()
-        if "chunksize" in kwargs:
-            del kwargs["chunksize"]
+        kwargs.pop("chunksize", None)
         if kwargs:
             raise TypeError("unexpected arguments to map(): %s" % sorted(kwargs))

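Note (illustration, not part of the diff): dict.pop(key, default) removes the key when present and quietly returns the default otherwise, so it collapses the membership test plus del into a single call. A minimal standalone sketch, not the executor code itself:

    kwargs = {"chunksize": 8, "timeout": 3}

    # Old pattern: explicit membership test before deleting.
    if "chunksize" in kwargs:
        del kwargs["chunksize"]

    # New pattern: one call, no KeyError if the key is absent.
    kwargs.pop("chunksize", None)
    kwargs.pop("chunksize", None)  # calling again is safe and returns None

    assert "chunksize" not in kwargs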

7 changes: 3 additions & 4 deletions distributed/client.py
@@ -2297,8 +2297,8 @@ def map(
                 keys = [list(element) for element in partition_all(batch_size, key)]
             else:
                 keys = [key for _ in range(len(batches))]
-            return sum(
-                (
+            return list(
+                flatten(
                     self.map(
                         func,
                         *batch,
@@ -2315,8 +2315,7 @@ def map(
                         **kwargs,
                     )
                     for key, batch in zip(keys, batches)
-                ),
-                [],
+                )
             )

         key = key or funcname(func)

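Note (illustration, not part of the diff): sum(list_of_lists, []) rebuilds an intermediate list for every batch, which is quadratic in the number of batches, whereas list(flatten(...)) makes a single pass. Assuming dask.core.flatten keeps its usual behaviour of recursing into lists and yielding everything else unchanged, the two forms return the same flat list; the same substitution appears in distributed/utils.py further down.

    from dask.core import flatten

    batches = [[1, 2], [3], [4, 5, 6]]

    # Old: repeated list concatenation via sum() with an empty-list start value.
    old = sum((batch for batch in batches), [])

    # New: one pass over the generator of per-batch lists.
    new = list(flatten(batch for batch in batches))

    assert old == new == [1, 2, 3, 4, 5, 6]
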
2 changes: 1 addition & 1 deletion distributed/core.py
@@ -520,7 +520,7 @@ async def start(self):
         timeout = getattr(self, "death_timeout", None)

         async def _close_on_failure(exc: Exception) -> None:
-            await self.close(reason=f"failure-to-start-{str(type(exc))}")
+            await self.close(reason=f"failure-to-start-{type(exc)}")
             self.status = Status.failed
             self.__startup_exc = exc

2 changes: 1 addition & 1 deletion distributed/deploy/local.py
@@ -211,7 +211,7 @@ def __init__(
             n_workers = max(1, CPU_COUNT // threads_per_worker) if processes else 1
         if n_workers and threads_per_worker is None:
             # Overcommit threads per worker, rather than undercommit
-            threads_per_worker = max(1, int(math.ceil(CPU_COUNT / n_workers)))
+            threads_per_worker = max(1, math.ceil(CPU_COUNT / n_workers))
         if n_workers and "memory_limit" not in worker_kwargs:
             worker_kwargs["memory_limit"] = parse_memory_limit(
                 "auto", 1, n_workers, logger=logger

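Background (not part of the diff): in Python 3, math.ceil and math.floor already return int for float arguments, so the int() wrapper was a no-op. The same cleanup recurs in distributed/deploy/spec.py, distributed/deploy/subprocess.py, and the shuffle test helpers below; the int(2**26)-style casts dropped from the serialization test are redundant for the same reason, since ** on two ints is already an int. A quick check:

    import math

    # math.ceil / math.floor return int in Python 3, even for float input.
    assert isinstance(math.ceil(7 / 2), int) and math.ceil(7 / 2) == 4
    assert isinstance(math.floor(7 / 2), int) and math.floor(7 / 2) == 3

    # So int(math.ceil(x)) just wraps an int in int().
    assert int(math.ceil(7 / 2)) == math.ceil(7 / 2) == 4
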
4 changes: 2 additions & 2 deletions distributed/deploy/spec.py
@@ -514,10 +514,10 @@ def _memory_per_worker(self) -> int:

     def scale(self, n=0, memory=None, cores=None):
         if memory is not None:
-            n = max(n, int(math.ceil(parse_bytes(memory) / self._memory_per_worker())))
+            n = max(n, math.ceil(parse_bytes(memory) / self._memory_per_worker()))

         if cores is not None:
-            n = max(n, int(math.ceil(cores / self._threads_per_worker())))
+            n = max(n, math.ceil(cores / self._threads_per_worker()))

         if len(self.worker_spec) > n:
             not_yet_launched = set(self.worker_spec) - {

2 changes: 1 addition & 1 deletion distributed/deploy/subprocess.py
@@ -241,7 +241,7 @@ def SubprocessCluster(
        n_workers = max(1, CPU_COUNT // threads_per_worker)
    if n_workers and threads_per_worker is None:
        # Overcommit threads per worker, rather than undercommit
-        threads_per_worker = max(1, int(math.ceil(CPU_COUNT / n_workers)))
+        threads_per_worker = max(1, math.ceil(CPU_COUNT / n_workers))
    if n_workers and "memory_limit" not in worker_kwargs:
        worker_kwargs["memory_limit"] = parse_memory_limit(
            "auto", 1, n_workers, logger=logger

2 changes: 1 addition & 1 deletion distributed/protocol/serialize.py
@@ -126,7 +126,7 @@ def import_allowed_module(name):
        return _cached_allowed_modules[name]
    else:
        raise RuntimeError(
-            f"Importing {repr(name)} is not allowed, please add it to the list of "
+            f"Importing {name!r} is not allowed, please add it to the list of "
            "allowed modules the scheduler can import via the "
            "distributed.scheduler.allowed-imports configuration setting."
        )

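Aside (example of mine, not from the diff): inside an f-string, {name!r} is the built-in spelling of repr(name), and plain {value} already formats through str(), which is also why the str(...) wrapper could be dropped in distributed/core.py above.

    name = "os.path"

    # The !r conversion applies repr() during formatting.
    assert f"Importing {name!r} is not allowed" == f"Importing {repr(name)} is not allowed"

    # Plain interpolation already calls str(), so str(...) adds nothing.
    exc = ValueError("boom")
    assert f"failure-to-start-{type(exc)}" == f"failure-to-start-{str(type(exc))}"
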
4 changes: 2 additions & 2 deletions distributed/protocol/tests/test_serialize.py
@@ -284,8 +284,8 @@ def test_serialize_bytes(kwargs):
        1,
        "abc",
        b"ab" * int(40e6),
-        int(2**26) * b"ab",
-        (int(2**25) * b"ab", int(2**25) * b"ab"),
+        (2**26) * b"ab",
+        ((2**25) * b"ab", (2**25) * b"ab"),
    ]:
        b = serialize_bytes(x, **kwargs)
        assert isinstance(b, bytes)

2 changes: 1 addition & 1 deletion distributed/shuffle/tests/test_buffer.py
@@ -15,7 +15,7 @@


 def gen_bytes(percentage: float, limit: int) -> bytes:
-    num_bytes = int(math.floor(percentage * limit))
+    num_bytes = math.floor(percentage * limit)
     return b"0" * num_bytes


2 changes: 1 addition & 1 deletion distributed/shuffle/tests/test_comm_buffer.py
@@ -94,7 +94,7 @@ async def send(address, shards):


 def gen_bytes(percentage: float, memory_limit: int) -> bytes:
-    num_bytes = int(math.floor(percentage * memory_limit))
+    num_bytes = math.floor(percentage * memory_limit)
     return b"0" * num_bytes


2 changes: 1 addition & 1 deletion distributed/stealing.py
@@ -293,7 +293,7 @@ def steal_time_ratio(self, ts: TaskState) -> tuple[float, int] | tuple[None, None]:
         transfer_time = nbytes / self.scheduler.bandwidth + LATENCY
         cost_multiplier = transfer_time / compute_time

-        level = int(round(log2(cost_multiplier) + 6))
+        level = round(log2(cost_multiplier) + 6)

         if level < 1:
             level = 1

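Related note (illustration only): single-argument round() on a float already returns an int, so the outer int() was redundant here as well.

    from math import log2

    cost_multiplier = 3.7
    level = round(log2(cost_multiplier) + 6)

    # round(x) without an ndigits argument returns int for float input.
    assert isinstance(level, int)
    assert level == int(round(log2(cost_multiplier) + 6)) == 8
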
3 changes: 2 additions & 1 deletion distributed/tests/test_metrics.py
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import itertools
 import math
 import pickle
 import threading
@@ -19,7 +20,7 @@ def test_wall_clock(name):
     t = getattr(time, name)()
     samples = [getattr(metrics, name)() for _ in range(100)]
     # Resolution
-    deltas = [sj - si for si, sj in zip(samples[:-1], samples[1:])]
+    deltas = [sj - si for si, sj in itertools.pairwise(samples)]
     assert min(deltas) >= 0.0, deltas
     assert max(deltas) <= 0.005, deltas
     assert any(0.0 < d < 0.0001 for d in deltas), deltas

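For reference (sketch of mine): itertools.pairwise, available since Python 3.10, lazily yields consecutive overlapping pairs and replaces the zip(seq, seq[1:]) / zip(seq[:-1], seq[1:]) idioms without slicing a copy of the sequence; the same substitution is made in distributed/tests/test_worker_memory.py below.

    import itertools

    samples = [1, 3, 6, 10]

    # pairwise(s) gives the same pairs as zipping the sequence against itself shifted by one.
    assert list(itertools.pairwise(samples)) == list(zip(samples[:-1], samples[1:]))

    deltas = [sj - si for si, sj in itertools.pairwise(samples)]
    assert deltas == [2, 3, 4]
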
4 changes: 2 additions & 2 deletions distributed/tests/test_scheduler.py
@@ -5063,7 +5063,7 @@ async def test_fan_out_pattern_deadlock(c, s, a):
     await block_hb.set()
     await block_f.clear()

-    # Remove the new instance of the 'b' worker while it processes 'f'
+    # Remove the new instance of the 'b' worker while it processes 'f'
     # to trigger an transition for 'f' to 'erred'
     async with Worker(s.address, nthreads=1, resources={"b": 1}) as b:
         await in_f.wait()
@@ -5118,7 +5118,7 @@ async def test_stimulus_from_erred_task(c, s, a):
     await s.remove_worker(b1.address, stimulus_id="remove_b1")
     await block_f.clear()

-    # Remove the new instance of the 'b' worker while it processes 'f'
+    # Remove the new instance of the 'b' worker while it processes 'f'
     # to trigger a transition for 'f' to 'erred'
     async with Worker(s.address, nthreads=1, resources={"b": 1}) as b2:
         await in_f.wait()

3 changes: 2 additions & 1 deletion distributed/tests/test_worker_memory.py
@@ -2,6 +2,7 @@

 import asyncio
 import glob
+import itertools
 import logging
 import os
 import signal
@@ -1033,7 +1034,7 @@ def __reduce__(self):
     # (this is because everything is pickled twice:
     # https://github.com/dask/distributed/issues/1371).
     # We should regain control of the event loop every 0.5s.
-    c = Counter(round(t1 - t0, 1) for t0, t1 in zip(ts, ts[1:]))
+    c = Counter(round(t1 - t0, 1) for t0, t1 in itertools.pairwise(ts))
     # Depending on the implementation of WorkerMemoryMonitor._maybe_spill:
     # if it calls sleep(0) every 0.5s:
     #   {0.0: 315, 0.5: 4}

5 changes: 3 additions & 2 deletions distributed/utils.py
@@ -66,6 +66,7 @@
 from tornado.ioloop import IOLoop

 import dask
+from dask.core import flatten
 from dask.utils import _deprecated, key_split
 from dask.utils import ensure_bytes as _ensure_bytes
 from dask.utils import parse_timedelta as _parse_timedelta
@@ -1473,8 +1474,8 @@ def convert_value(v):
             out = '"' + out + '"'
         return out

-    return sum(
-        (["--" + k.replace("_", "-"), convert_value(v)] for k, v in d.items()), []
+    return list(
+        flatten(["--" + k.replace("_", "-"), convert_value(v)] for k, v in d.items())
     )


13 changes: 13 additions & 0 deletions pyproject.toml
@@ -82,6 +82,7 @@ extend-select = [
     "TID",
     "I",
     "UP",
+    "RUF",
 ]
 ignore = [
     "B011", # Do not `assert False`, raise `AssertionError()`
@@ -95,6 +96,18 @@ ignore = [
     "E721", # Use `is` and `is not` for type comparisons, or `isinstance()` for isinstance checks
     "E741", # Ambiguous variable name: `l`
     "UP031", # TODO: apply this rule
+    "RUF001", # String contains ambiguous letter
+    "RUF002", # Docstring contains ambiguous letter
+    "RUF005", # Consider unpacking operator instead of concatenation
+    "RUF006", # Store a reference to the return value of `asyncio.ensure_future`
+    "RUF012", # Mutable class attributes should be annotated with `typing.ClassVar`
+    "RUF015", # Prefer `next(iter(...))` over single element slice
+    "RUF018", # Avoid assignment expressions in `assert` statements
+    "RUF021", # Parenthesize `a and b` expressions when chaining `and` and `or` together, to make the precedence clear
+    "RUF022", # `__all__` is not sorted
+    "RUF023", # `__slots__` is not sorted
+    "RUF043", # Pattern passed to `match=` contains metacharacters but is neither escaped nor raw
+    "RUF059", # Unpacked variable is never used
 ]

 [tool.ruff.lint.extend-per-file-ignores]

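To make the new ignore entries concrete, here is a hypothetical snippet (not from the codebase) of the kind of code RUF005 and RUF015 would otherwise flag; running ruff check distributed/ locally reports the RUF diagnostics that are not suppressed above.

    rest = [3, 4]

    # RUF005 would suggest [1, 2, *rest] rather than concatenating onto a literal.
    combined = [1, 2] + rest

    # RUF015 would suggest next(iter(combined)) rather than building a list just for element 0.
    head = list(combined)[0]

    assert combined == [1, 2, 3, 4] and head == 1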