diff --git a/CHANGELOG.md b/CHANGELOG.md
index c37cbc5..01d179c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,30 @@
 All notable changes to this project will be documented in this file.
 
 ## [Unreleased]
 
+## [0.2.0] - 2026-03-10
+
+### Added
+- Activity log console panel with Prism screen reader announcements, F6 pane cycling, Ctrl+1/2/3 shortcuts, and Tab navigation (#94)
+- Decoupled transfer logic from dialog — transfers now run in the background (#95)
+- One-click retry for failed transfers (#101)
+- Persist transfer queue across app sessions — restored jobs survive crashes and restarts (#100)
+- Queue additional files during an active transfer (#103)
+- Resume interrupted downloads from byte offset instead of restarting (#109)
+- Concurrent transfers setting wired into worker pool — honors max parallel transfers from settings (#110)
+- Dedicated Updates tab in settings (#90)
+
+### Fixed
+- Reset progress display to 0% immediately on retry
+- Announce transfer cancellation immediately with clear messaging (#86, #92)
+- Add cancel/close button to Site Manager dialog (#80)
+- Add colons to file list and toolbar field labels for screen reader clarity (#108)
+- Associate StaticText labels with file lists via SetLabelFor
+- Use SetLabel() for ListCtrl and name= for ListBox accessible names
+- Resolve Tab focus trap in activity log panel
+- Switch activity log to TextCtrl with HSCROLL for reliable NVDA reading (#104)
+- Read version and build info from _build_meta when available (#105)
+
+
 ---
 
diff --git a/pyproject.toml b/pyproject.toml
index 1269e54..b62b1b1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "portkeydrop"
-version = "0.1.1"
+version = "0.2.0"
 description = "A keyboard-driven file transfer client for FTP, SFTP, FTPS, SCP, and WebDAV"
 requires-python = ">=3.11,<3.13"
 dependencies = [
diff --git a/src/portkeydrop/__init__.py b/src/portkeydrop/__init__.py
index cdf7499..d8a09e8 100644
--- a/src/portkeydrop/__init__.py
+++ b/src/portkeydrop/__init__.py
@@ -3,4 +3,4 @@
 try:
     from portkeydrop._build_meta import __version__  # type: ignore[import]
 except ImportError:
-    __version__ = "0.1.1"
+    __version__ = "0.2.0"
diff --git a/src/portkeydrop/services/transfer_service.py b/src/portkeydrop/services/transfer_service.py
index 67a102d..b6d9654 100644
--- a/src/portkeydrop/services/transfer_service.py
+++ b/src/portkeydrop/services/transfer_service.py
@@ -91,20 +91,25 @@ def from_dict(cls, data: dict) -> TransferJob:
         )
 
 
+@dataclass
+class _Worker:
+    """Tracks a worker thread and its stop signal for pool replacement."""
+
+    thread: threading.Thread
+    stop_event: threading.Event
+
+
 class TransferService:
     """Owns the transfer queue and a pool of daemon worker threads."""
 
     def __init__(self, notify_window: Any | None = None, max_workers: int = 1) -> None:
         self._notify_window = notify_window
-        self._queue: queue.Queue[TransferJob | None] = queue.Queue()
+        self._queue: queue.Queue[TransferJob] = queue.Queue()
         self._jobs: list[TransferJob] = []
-        self._lock = threading.Lock()
+        self._lock = threading.RLock()
         self._max_workers = max(1, max_workers)
-        self._workers: list[threading.Thread] = []
-        for _ in range(self._max_workers):
-            t = threading.Thread(target=self._worker_loop, daemon=True)
-            t.start()
-            self._workers.append(t)
+        self._workers: list[_Worker] = []
+        self._replace_worker_pool(self._max_workers)
 
     # ------------------------------------------------------------------
     # Public API
@@ -212,23 +217,13 @@ def cancel(self, job_id: str) -> None:
     def set_max_workers(self, n: int) -> None:
         """Resize the worker pool to *n* threads.
 
-        Extra workers are drained via a ``None`` sentinel on the queue;
-        missing workers are spawned immediately.
+        Existing workers finish at most their current job before exiting;
+        replacement workers start immediately against the same shared queue.
         """
         n = max(1, n)
         with self._lock:
-            # Prune threads that have already exited
-            self._workers = [t for t in self._workers if t.is_alive()]
-            current = len(self._workers)
-            if n > current:
-                for _ in range(n - current):
-                    t = threading.Thread(target=self._worker_loop, daemon=True)
-                    t.start()
-                    self._workers.append(t)
-            elif n < current:
-                for _ in range(current - n):
-                    self._queue.put(None)  # sentinel to stop one worker
             self._max_workers = n
+            self._replace_worker_pool(n)
 
     # ------------------------------------------------------------------
     # Internal
@@ -240,17 +235,49 @@ def _enqueue(self, job: TransferJob) -> None:
         self._queue.put(job)
         self._post_event()
 
-    def _worker_loop(self) -> None:
+    def _replace_worker_pool(self, size: int) -> None:
+        with self._lock:
+            old_workers = self._workers
+            self._workers = []
+            for _worker in old_workers:
+                _worker.stop_event.set()
+            for _ in range(size):
+                stop_event = threading.Event()
+                thread = threading.Thread(
+                    target=self._worker_loop,
+                    args=(stop_event,),
+                    daemon=True,
+                )
+                thread.start()
+                self._workers.append(_Worker(thread=thread, stop_event=stop_event))
+            self._workers = [worker for worker in self._workers if worker.thread.is_alive()]
+
+        for worker in old_workers:
+            worker.thread.join(timeout=1)
+
+        with self._lock:
+            self._workers = [worker for worker in self._workers if worker.thread.is_alive()]
+
+    def _worker_loop(self, stop_event: threading.Event) -> None:
         while True:
-            job = self._queue.get()
-            if job is None:  # shutdown sentinel
-                break
+            if stop_event.is_set():
+                return
+            try:
+                job = self._queue.get(timeout=0.1)
+            except queue.Empty:
+                continue
+            if stop_event.is_set():
+                self._queue.put(job)
+                return
+
             if job.cancel_event.is_set():
-                job.status = TransferStatus.CANCELLED
+                with self._lock:
+                    job.status = TransferStatus.CANCELLED
                 self._post_event()
                 continue
             try:
-                job.status = TransferStatus.IN_PROGRESS
+                with self._lock:
+                    job.status = TransferStatus.IN_PROGRESS
                 self._post_event()
                 if job.direction == TransferDirection.DOWNLOAD:
                     if job._recursive:
@@ -262,15 +289,19 @@ def _worker_loop(self) -> None:
                         self._run_recursive_upload(job)
                     else:
                         self._run_upload(job)
-                if job.status == TransferStatus.IN_PROGRESS:
-                    job.status = TransferStatus.COMPLETE
+                with self._lock:
+                    if job.status == TransferStatus.IN_PROGRESS:
+                        job.status = TransferStatus.COMPLETE
             except InterruptedError:
-                job.status = TransferStatus.CANCELLED
+                with self._lock:
+                    job.status = TransferStatus.CANCELLED
             except Exception as exc:
-                job.status = TransferStatus.FAILED
-                job.error = str(exc)
+                with self._lock:
+                    job.status = TransferStatus.FAILED
+                    job.error = str(exc)
                 logger.exception("Transfer failed: %s", job.id)
-            self._update_progress(job)
+            with self._lock:
+                self._update_progress(job)
             self._post_event()
 
     # --- single-file transfers ---
@@ -290,10 +321,11 @@ def _run_download(self, job: TransferJob) -> None:
         def _cb(transferred: int, total: int) -> None:
             if job.cancel_event.is_set():
                 raise InterruptedError("Transfer cancelled")
-            job.transferred_bytes = offset + transferred
-            if total > 0:
-                job.total_bytes = total
-            self._update_progress(job)
+            with self._lock:
+                job.transferred_bytes = offset + transferred
+                if total > 0:
+                    job.total_bytes = total
+                self._update_progress(job)
             self._post_event()
 
         client.download(job.source, f, callback=_cb, offset=offset)
@@ -305,10 +337,11 @@ def _run_upload(self, job: TransferJob) -> None:
         def _cb(transferred: int, total: int) -> None:
             if job.cancel_event.is_set():
                 raise InterruptedError("Transfer cancelled")
-            job.transferred_bytes = transferred
-            if total > 0:
-                job.total_bytes = total
-            self._update_progress(job)
+            with self._lock:
+                job.transferred_bytes = transferred
+                if total > 0:
+                    job.total_bytes = total
+                self._update_progress(job)
             self._post_event()
 
         job._client.upload(f, job.destination, callback=_cb)
@@ -329,9 +362,10 @@ def _run_recursive_download(self, job: TransferJob) -> None:
                 file_queue[i] = (remote_file, local_file, real_size)
             except Exception:
                 pass
-        job.total_bytes = sum(s for _, _, s in file_queue)
-        job.transferred_bytes = 0
-        self._update_progress(job)
+        with self._lock:
+            job.total_bytes = sum(s for _, _, s in file_queue)
+            job.transferred_bytes = 0
+            self._update_progress(job)
         self._post_event()
 
         for remote_file, local_file, _size in file_queue:
@@ -344,8 +378,9 @@ def _run_recursive_download(self, job: TransferJob) -> None:
             def _cb(transferred: int, total: int, _base=base) -> None:
                 if job.cancel_event.is_set():
                     raise InterruptedError("Transfer cancelled")
-                job.transferred_bytes = _base + transferred
-                self._update_progress(job)
+                with self._lock:
+                    job.transferred_bytes = _base + transferred
+                    self._update_progress(job)
                 self._post_event()
 
             client.download(remote_file, f, callback=_cb)
@@ -355,9 +390,10 @@ def _run_recursive_upload(self, job: TransferJob) -> None:
         client = job._client
         file_queue: list[tuple[str, str, int]] = []
         self._collect_local_files(job.source, job.destination, file_queue)
-        job.total_bytes = sum(s for _, _, s in file_queue)
-        job.transferred_bytes = 0
-        self._update_progress(job)
+        with self._lock:
+            job.total_bytes = sum(s for _, _, s in file_queue)
+            job.transferred_bytes = 0
+            self._update_progress(job)
         self._post_event()
 
         # Create directories
@@ -382,8 +418,9 @@ def _run_recursive_upload(self, job: TransferJob) -> None:
             def _cb(transferred: int, total: int, _base=base) -> None:
                 if job.cancel_event.is_set():
                     raise InterruptedError("Transfer cancelled")
-                job.transferred_bytes = _base + transferred
-                self._update_progress(job)
+                with self._lock:
+                    job.transferred_bytes = _base + transferred
+                    self._update_progress(job)
                 self._post_event()
 
             client.upload(f, remote_file, callback=_cb)
@@ -411,8 +448,9 @@ def _resolve_download_offset(self, job: TransferJob, client: TransferClient) ->
             local_size = os.path.getsize(job.destination)
         except OSError:
             logger.info("Resume: partial file missing, restarting %s", job.id)
-            job.transferred_bytes = 0
-            job.progress = 0
+            with self._lock:
+                job.transferred_bytes = 0
+                job.progress = 0
             self._snapshot_remote_metadata(job, client)
             return 0
 
@@ -423,8 +461,9 @@ def _resolve_download_offset(self, job: TransferJob, client: TransferClient) ->
                 job.transferred_bytes,
                 job.id,
             )
-            job.transferred_bytes = 0
-            job.progress = 0
+            with self._lock:
+                job.transferred_bytes = 0
+                job.progress = 0
             self._snapshot_remote_metadata(job, client)
             return 0
 
@@ -433,8 +472,9 @@ def _resolve_download_offset(self, job: TransferJob, client: TransferClient) ->
             remote_info = client.stat(job.source)
         except Exception:
             logger.info("Resume: cannot stat remote file, restarting %s", job.id)
-            job.transferred_bytes = 0
-            job.progress = 0
+            with self._lock:
+                job.transferred_bytes = 0
+                job.progress = 0
             return 0
 
         remote_size = remote_info.size
@@ -447,9 +487,10 @@ def _resolve_download_offset(self, job: TransferJob, client: TransferClient) ->
                 remote_size,
                 job.id,
             )
-            job.transferred_bytes = 0
-            job.progress = 0
-            job._remote_mtime = remote_mtime
+            with self._lock:
+                job.transferred_bytes = 0
+                job.progress = 0
+                job._remote_mtime = remote_mtime
             return 0
 
         if (
@@ -458,24 +499,25 @@ def _resolve_download_offset(self, job: TransferJob, client: TransferClient) ->
             and remote_mtime != job._remote_mtime
         ):
             logger.info("Resume: remote mtime changed, restarting %s", job.id)
-            job.transferred_bytes = 0
-            job.progress = 0
-            job._remote_mtime = remote_mtime
+            with self._lock:
+                job.transferred_bytes = 0
+                job.progress = 0
+                job._remote_mtime = remote_mtime
             return 0
 
         # All checks passed — resume from offset
         logger.info("Resume: resuming %s from byte %d", job.id, job.transferred_bytes)
         return job.transferred_bytes
 
-    @staticmethod
-    def _snapshot_remote_metadata(job: TransferJob, client: TransferClient) -> None:
+    def _snapshot_remote_metadata(self, job: TransferJob, client: TransferClient) -> None:
         """Record remote file size/mtime on the job for later resume validation."""
         try:
             info = client.stat(job.source)
-            if info.modified:
-                job._remote_mtime = info.modified.timestamp()
-            if info.size > 0 and job.total_bytes == 0:
-                job.total_bytes = info.size
+            with self._lock:
+                if info.modified:
+                    job._remote_mtime = info.modified.timestamp()
+                if info.size > 0 and job.total_bytes == 0:
+                    job.total_bytes = info.size
         except Exception:
             pass
 
diff --git a/tests/test_transfer_service.py b/tests/test_transfer_service.py
index 25f4616..5b5cb48 100644
--- a/tests/test_transfer_service.py
+++ b/tests/test_transfer_service.py
@@ -72,13 +72,13 @@ class TestTransferServiceInit:
     def test_starts_daemon_worker_threads(self):
         svc = TransferService(notify_window=None)
         assert len(svc._workers) == 1
-        assert all(t.is_alive() for t in svc._workers)
-        assert all(t.daemon for t in svc._workers)
+        assert all(worker.thread.is_alive() for worker in svc._workers)
+        assert all(worker.thread.daemon for worker in svc._workers)
 
     def test_starts_multiple_workers(self):
         svc = TransferService(notify_window=None, max_workers=3)
         assert len(svc._workers) == 3
-        assert all(t.is_alive() for t in svc._workers)
+        assert all(worker.thread.is_alive() for worker in svc._workers)
 
     def test_max_workers_clamped_to_one(self):
         svc = TransferService(notify_window=None, max_workers=0)
@@ -493,11 +493,11 @@ def slow_download(src, fh, callback=None, offset=0):
 
     def test_set_max_workers_increases_pool(self):
         svc = TransferService(notify_window=None, max_workers=1)
-        assert len([t for t in svc._workers if t.is_alive()]) == 1
+        assert len([worker for worker in svc._workers if worker.thread.is_alive()]) == 1
         svc.set_max_workers(3)
         time.sleep(0.1)
-        alive = [t for t in svc._workers if t.is_alive()]
+        alive = [worker for worker in svc._workers if worker.thread.is_alive()]
         assert len(alive) == 3
 
     def test_set_max_workers_decreases_pool(self):
@@ -505,12 +505,45 @@ def test_set_max_workers_decreases_pool(self):
         assert len(svc._workers) == 3
         svc.set_max_workers(1)
-        # Give sentinels time to be consumed
-        time.sleep(0.5)
-        alive = [t for t in svc._workers if t.is_alive()]
+        time.sleep(0.2)
+        alive = [worker for worker in svc._workers if worker.thread.is_alive()]
         assert len(alive) == 1
 
     def test_set_max_workers_clamps_to_one(self):
         svc = TransferService(notify_window=None, max_workers=2)
         svc.set_max_workers(0)
         assert svc._max_workers == 1
+
+    def test_set_max_workers_replaces_pool_while_jobs_are_active(self):
+        started = threading.Event()
+        release = threading.Event()
+        completed: list[str] = []
+        completion_lock = threading.Lock()
+        mock_client = MagicMock()
+
+        def slow_download(src, fh, callback=None, offset=0):
+            started.set()
+            release.wait(timeout=5)
+            with completion_lock:
+                completed.append(PurePosixPath(src).name)
+
+        mock_client.download.side_effect = slow_download
+
+        svc = TransferService(notify_window=None, max_workers=1)
+        with patch("builtins.open", return_value=MagicMock(spec=io.BufferedWriter)):
+            j1 = svc.submit_download(mock_client, "/r/a.txt", "/tmp/a.txt")
+            started.wait(timeout=5)
+
+            old_workers = svc._workers.copy()
+            svc.set_max_workers(2)
+            j2 = svc.submit_download(mock_client, "/r/b.txt", "/tmp/b.txt")
+            release.set()
+
+            _wait_for_terminal(j1)
+            _wait_for_terminal(j2)
+
+        assert j1.status == TransferStatus.COMPLETE
+        assert j2.status == TransferStatus.COMPLETE
+        assert len([worker for worker in svc._workers if worker.thread.is_alive()]) == 2
+        assert all(worker.stop_event.is_set() for worker in old_workers)
+        assert sorted(completed) == ["a.txt", "b.txt"]
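
Note (illustrative sketch, not part of the patch): the resize path above retires old workers with per-worker stop events instead of queue sentinels, and each worker polls the shared queue with a short timeout so the stop flag is re-checked. A minimal standalone Python version of that pattern, where handle() is a hypothetical stand-in for the per-job transfer work:

    import queue
    import threading
    import time

    def handle(item: str) -> None:
        # Hypothetical stand-in for the per-job transfer work.
        print("processed", item)

    def worker_loop(q: "queue.Queue[str]", stop_event: threading.Event) -> None:
        # Same shape as the patched _worker_loop: poll with a short timeout so the
        # stop flag is re-checked, and hand a claimed item back to the queue if this
        # worker was retired between get() and processing.
        while True:
            if stop_event.is_set():
                return
            try:
                item = q.get(timeout=0.1)
            except queue.Empty:
                continue
            if stop_event.is_set():
                q.put(item)
                return
            handle(item)

    q: "queue.Queue[str]" = queue.Queue()
    stop = threading.Event()
    worker = threading.Thread(target=worker_loop, args=(q, stop), daemon=True)
    worker.start()
    q.put("a.txt")
    q.put("b.txt")
    time.sleep(0.5)  # let the worker drain the queue
    stop.set()       # retire the worker; no sentinel item needed
    worker.join(timeout=1)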