Skip to content

Commit 19196c4

Browse files
Orinks and claude committed
feat(transfer): wire concurrent transfers setting into worker pool
Replace the single daemon worker thread in TransferService with a configurable pool sized to the concurrent_transfers setting. Each worker pulls from the same queue, naturally distributing jobs. The pool can be resized at runtime via set_max_workers() which is called when settings are saved. Closes #107 Co-Authored-By: Claude Opus 4.6 <[email protected]>
1 parent cfa97dc commit 19196c4

4 files changed

Lines changed: 116 additions & 11 deletions

File tree

src/portkeydrop/app.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,10 @@ def __init__(self) -> None:
118118
self.build_tag = os.environ.get("PORTKEYDROP_BUILD_TAG")
119119
self._auto_update_check_timer: wx.Timer | None = None
120120
self._site_manager = SiteManager()
121-
self._transfer_service = TransferService(notify_window=self)
121+
self._transfer_service = TransferService(
122+
notify_window=self,
123+
max_workers=self._settings.transfer.concurrent_transfers,
124+
)
122125
self._transfer_state_by_id: dict[str, str] = {}
123126
self._last_failed_transfer: str | None = None
124127
self._announcer = ScreenReaderAnnouncer()
@@ -1574,6 +1577,9 @@ def _on_settings(self, event: wx.CommandEvent) -> None:
15741577
self._settings = dlg.get_settings()
15751578
update_last_local_folder(self._settings, self._local_cwd)
15761579
save_settings(self._settings)
1580+
self._transfer_service.set_max_workers(
1581+
self._settings.transfer.concurrent_transfers,
1582+
)
15771583
self.update_check_updates_menu_label()
15781584
self._start_auto_update_checks()
15791585
self._populate_file_list(

src/portkeydrop/services/transfer_service.py

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,15 +88,19 @@ def from_dict(cls, data: dict) -> TransferJob:
8888

8989

9090
class TransferService:
91-
"""Owns the transfer queue and a single daemon worker thread."""
91+
"""Owns the transfer queue and a pool of daemon worker threads."""
9292

93-
def __init__(self, notify_window: Any | None = None) -> None:
93+
def __init__(self, notify_window: Any | None = None, max_workers: int = 1) -> None:
    """Set up queue, job bookkeeping, and start the daemon worker pool.

    Args:
        notify_window: Optional window object that receives transfer events.
        max_workers: Desired pool size; clamped to a minimum of one thread.
    """
    self._notify_window = notify_window
    self._queue: queue.Queue[TransferJob | None] = queue.Queue()
    self._jobs: list[TransferJob] = []
    self._lock = threading.Lock()
    # Clamp so the service always has at least one worker pulling jobs.
    self._max_workers = max(1, max_workers)
    self._workers: list[threading.Thread] = []
    while len(self._workers) < self._max_workers:
        worker = threading.Thread(target=self._worker_loop, daemon=True)
        worker.start()
        self._workers.append(worker)
100104

101105
# ------------------------------------------------------------------
102106
# Public API
@@ -199,6 +203,27 @@ def cancel(self, job_id: str) -> None:
199203
break
200204
self._post_event()
201205

206+
def set_max_workers(self, n: int) -> None:
    """Resize the worker pool to *n* threads (clamped to at least 1).

    Extra workers are drained via a ``None`` sentinel on the queue;
    missing workers are spawned immediately.

    Args:
        n: Requested pool size; values below 1 are treated as 1.
    """
    n = max(1, n)
    with self._lock:
        # Prune threads that have already exited.  A worker leaves its
        # loop when it consumes a ``None`` sentinel, so each dead thread
        # is taken to retire one previously queued stop request.
        before = len(self._workers)
        self._workers = [t for t in self._workers if t.is_alive()]
        died = before - len(self._workers)
        pending = max(0, getattr(self, "_pending_stops", 0) - died)
        # Size against sentinels still sitting in the queue: without
        # this, a shrink followed quickly by a grow (e.g. settings saved
        # twice) would leave the pool smaller than requested once the
        # stale sentinels are finally consumed.
        effective = len(self._workers) - pending
        if n > effective:
            for _ in range(n - effective):
                t = threading.Thread(target=self._worker_loop, daemon=True)
                t.start()
                self._workers.append(t)
        elif n < effective:
            for _ in range(effective - n):
                self._queue.put(None)  # sentinel to stop one worker
            pending += effective - n
        self._pending_stops = pending
        self._max_workers = n
226+
202227
# ------------------------------------------------------------------
203228
# Internal
204229
# ------------------------------------------------------------------
@@ -212,6 +237,8 @@ def _enqueue(self, job: TransferJob) -> None:
212237
def _worker_loop(self) -> None:
213238
while True:
214239
job = self._queue.get()
240+
if job is None: # shutdown sentinel
241+
break
215242
if job.cancel_event.is_set():
216243
job.status = TransferStatus.CANCELLED
217244
self._post_event()

tests/test_app.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ def _build_frame(module, tmp_path):
3535
sort_by="name",
3636
sort_ascending=True,
3737
)
38-
settings = SimpleNamespace(display=display)
38+
transfer = SimpleNamespace(concurrent_transfers=2)
39+
settings = SimpleNamespace(display=display, transfer=transfer)
3940
fake_manager = MagicMock(jobs=[])
4041
fake_site_manager = MagicMock()
4142

@@ -85,7 +86,7 @@ def _hydrate_frame(module):
8586
def test_main_frame_init_sets_transfer_state(tmp_path, app_module):
8687
frame, _, transfer_service_cls = _build_frame(app_module, tmp_path)
8788
assert frame._transfer_state_by_id == {}
88-
transfer_service_cls.assert_called_once_with(notify_window=frame)
89+
transfer_service_cls.assert_called_once_with(notify_window=frame, max_workers=2)
8990

9091

9192
def test_bind_events_hooks_transfer_update(app_module):
@@ -1177,6 +1178,7 @@ def test_on_settings_reconfigures_update_menu_and_timer(app_module):
11771178
frame._settings = SimpleNamespace(
11781179
app=SimpleNamespace(update_channel="stable"),
11791180
display=SimpleNamespace(show_hidden_files=True),
1181+
transfer=SimpleNamespace(concurrent_transfers=2),
11801182
)
11811183
frame._local_cwd = "/tmp"
11821184
frame.remote_file_list = MagicMock()
@@ -1192,6 +1194,7 @@ def test_on_settings_reconfigures_update_menu_and_timer(app_module):
11921194
updated_settings = SimpleNamespace(
11931195
app=SimpleNamespace(update_channel="nightly"),
11941196
display=SimpleNamespace(show_hidden_files=True),
1197+
transfer=SimpleNamespace(concurrent_transfers=4),
11951198
)
11961199
dialog = MagicMock(
11971200
ShowModal=MagicMock(return_value=fake_wx.ID_OK),
@@ -1215,6 +1218,7 @@ def test_on_settings_passes_check_updates_callback(app_module):
12151218
frame._settings = SimpleNamespace(
12161219
app=SimpleNamespace(update_channel="stable"),
12171220
display=SimpleNamespace(show_hidden_files=True),
1221+
transfer=SimpleNamespace(concurrent_transfers=2),
12181222
)
12191223
frame._local_cwd = "/tmp"
12201224
frame.remote_file_list = MagicMock()

tests/test_transfer_service.py

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,20 @@ def test_cancel_event_independent(self):
6969

7070

7171
class TestTransferServiceInit:
    """Construction-time behaviour: the worker pool is started eagerly."""

    def test_starts_daemon_worker_threads(self):
        # Default pool size is a single daemon thread.
        svc = TransferService(notify_window=None)
        assert len(svc._workers) == 1
        assert all(t.is_alive() for t in svc._workers)
        assert all(t.daemon for t in svc._workers)

    def test_starts_multiple_workers(self):
        # max_workers controls how many threads are spawned up front.
        svc = TransferService(notify_window=None, max_workers=3)
        assert len(svc._workers) == 3
        assert all(t.is_alive() for t in svc._workers)

    def test_max_workers_clamped_to_one(self):
        # Values below 1 are clamped up so at least one worker runs.
        svc = TransferService(notify_window=None, max_workers=0)
        assert len(svc._workers) == 1
7686

7787
def test_jobs_returns_snapshot(self):
7888
svc = TransferService(notify_window=None)
@@ -444,3 +454,61 @@ def test_status_values(self):
444454
assert TransferStatus.COMPLETE.value == "complete"
445455
assert TransferStatus.FAILED.value == "failed"
446456
assert TransferStatus.CANCELLED.value == "cancelled"
457+
458+
459+
# ---------------------------------------------------------------------------
# Concurrent worker pool
# ---------------------------------------------------------------------------


class TestConcurrentWorkers:
    """Behaviour of the multi-worker pool and runtime resizing."""

    def test_jobs_run_concurrently_with_multiple_workers(self):
        """Two slow jobs should overlap when max_workers >= 2."""
        # The barrier only releases once BOTH workers are inside the
        # download callback, proving the jobs overlapped in time.
        barrier = threading.Barrier(2, timeout=5)
        completed_order: list[str] = []
        lock = threading.Lock()

        mock_client = MagicMock()

        def slow_download(src, fh, callback=None):
            name = PurePosixPath(src).name
            barrier.wait()  # both workers must reach here before either proceeds
            with lock:
                completed_order.append(name)

        mock_client.download.side_effect = slow_download

        svc = TransferService(notify_window=None, max_workers=2)
        # Patch open() so no real local files are created for the downloads.
        with patch("builtins.open", return_value=MagicMock(spec=io.BufferedWriter)):
            j1 = svc.submit_download(mock_client, "/r/a.txt", "/tmp/a.txt")
            j2 = svc.submit_download(mock_client, "/r/b.txt", "/tmp/b.txt")
            _wait_for_terminal(j1)
            _wait_for_terminal(j2)

        assert j1.status == TransferStatus.COMPLETE
        assert j2.status == TransferStatus.COMPLETE
        assert len(completed_order) == 2

    def test_set_max_workers_increases_pool(self):
        # Growing the pool spawns new threads immediately.
        svc = TransferService(notify_window=None, max_workers=1)
        assert len([t for t in svc._workers if t.is_alive()]) == 1

        svc.set_max_workers(3)
        time.sleep(0.1)
        alive = [t for t in svc._workers if t.is_alive()]
        assert len(alive) == 3

    def test_set_max_workers_decreases_pool(self):
        # Shrinking is asynchronous: workers exit when they pull a sentinel.
        svc = TransferService(notify_window=None, max_workers=3)
        assert len(svc._workers) == 3

        svc.set_max_workers(1)
        # Give sentinels time to be consumed
        time.sleep(0.5)
        alive = [t for t in svc._workers if t.is_alive()]
        assert len(alive) == 1

    def test_set_max_workers_clamps_to_one(self):
        # Requesting zero workers still keeps one thread alive.
        svc = TransferService(notify_window=None, max_workers=2)
        svc.set_max_workers(0)
        assert svc._max_workers == 1

0 commit comments

Comments
 (0)