Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,30 @@
All notable changes to this project will be documented in this file.

## [Unreleased]

## [0.2.0] - 2026-03-10

### Added
- Activity log console panel with Prism screen reader announcements, F6 pane cycling, Ctrl+1/2/3 shortcuts, and Tab navigation (#94)
- Decoupled transfer logic from dialog — transfers now run in the background (#95)
- One-click retry for failed transfers (#101)
- Persist transfer queue across app sessions — restored jobs survive crashes and restarts (#100)
- Queue additional files during an active transfer (#103)
- Resume interrupted downloads from byte offset instead of restarting (#109)
- Concurrent transfers setting wired into worker pool — honors max parallel transfers from settings (#110)
- Dedicated Updates tab in settings (#90)

### Fixed
- Reset progress display to 0% immediately on retry
- Announce transfer cancellation immediately with clear messaging (#86, #92)
- Add cancel/close button to Site Manager dialog (#80)
- Add colons to file list and toolbar field labels for screen reader clarity (#108)
- Associate StaticText labels with file lists via SetLabelFor
- Use SetLabel() for ListCtrl and name= for ListBox accessible names
- Resolve Tab focus trap in activity log panel
- Switch activity log to TextCtrl with HSCROLL for reliable NVDA reading (#104)
- Read version and build info from _build_meta when available (#105)



---

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "portkeydrop"
version = "0.1.1"
version = "0.2.0"
description = "A keyboard-driven file transfer client for FTP, SFTP, FTPS, SCP, and WebDAV"
requires-python = ">=3.11,<3.13"
dependencies = [
Expand Down
2 changes: 1 addition & 1 deletion src/portkeydrop/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
try:
from portkeydrop._build_meta import __version__ # type: ignore[import]
except ImportError:
__version__ = "0.1.1"
__version__ = "0.2.0"
178 changes: 110 additions & 68 deletions src/portkeydrop/services/transfer_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,25 @@ def from_dict(cls, data: dict) -> TransferJob:
)


@dataclass
class _Worker:
    """Tracks a worker thread and its stop signal for pool replacement."""

    # The daemon thread executing TransferService._worker_loop.
    thread: threading.Thread
    # Set to ask the thread to exit after (at most) its current job.
    stop_event: threading.Event


class TransferService:
"""Owns the transfer queue and a pool of daemon worker threads."""

def __init__(self, notify_window: Any | None = None, max_workers: int = 1) -> None:
    """Create the service and start the initial worker pool.

    Args:
        notify_window: Optional window that receives queue-change events
            via ``_post_event`` (presumably a wx window; may be ``None``).
        max_workers: Desired number of worker threads; clamped to >= 1.
    """
    self._notify_window = notify_window
    # Jobs are handed to workers through this shared FIFO queue; workers
    # stop via per-worker stop events, not a None sentinel.
    self._queue: queue.Queue[TransferJob] = queue.Queue()
    self._jobs: list[TransferJob] = []
    # Reentrant lock: pool-management helpers may acquire it while a
    # caller already holds it.
    self._lock = threading.RLock()
    self._max_workers = max(1, max_workers)
    self._workers: list[_Worker] = []
    self._replace_worker_pool(self._max_workers)

# ------------------------------------------------------------------
# Public API
Expand Down Expand Up @@ -212,23 +217,13 @@ def cancel(self, job_id: str) -> None:
def set_max_workers(self, n: int) -> None:
    """Resize the worker pool to *n* threads.

    Existing workers finish at most their current job before exiting;
    replacement workers start immediately against the same shared queue.

    Args:
        n: Desired pool size; values below 1 are clamped to 1.
    """
    n = max(1, n)
    with self._lock:
        self._max_workers = n
    # Swap the pool outside the critical section so old workers that need
    # the lock to finish their current job are not blocked while we wait.
    self._replace_worker_pool(n)

# ------------------------------------------------------------------
# Internal
Expand All @@ -240,17 +235,49 @@ def _enqueue(self, job: TransferJob) -> None:
self._queue.put(job)
self._post_event()

def _replace_worker_pool(self, size: int) -> None:
    """Signal all current workers to stop and start *size* fresh ones.

    Old workers observe their ``stop_event`` between queue polls, so each
    finishes at most the job it is already running. New workers share the
    same job queue, so pending jobs carry over to the new pool.
    """
    with self._lock:
        old_workers = self._workers
        self._workers = []
        # Ask every old worker to exit at its next stop-event check.
        for old in old_workers:
            old.stop_event.set()
        for _ in range(size):
            stop_event = threading.Event()
            thread = threading.Thread(
                target=self._worker_loop,
                args=(stop_event,),
                daemon=True,
            )
            thread.start()
            self._workers.append(_Worker(thread=thread, stop_event=stop_event))

    # Join outside the lock: an old worker finishing its job may need the
    # lock to update job status, so holding it here could stall the join.
    for old in old_workers:
        old.thread.join(timeout=1)

    with self._lock:
        # Prune any new entries whose threads have already exited.
        self._workers = [w for w in self._workers if w.thread.is_alive()]

def _worker_loop(self, stop_event: threading.Event) -> None:
while True:
job = self._queue.get()
if job is None: # shutdown sentinel
break
if stop_event.is_set():
return
try:
job = self._queue.get(timeout=0.1)
except queue.Empty:
continue
if stop_event.is_set():
self._queue.put(job)
return

if job.cancel_event.is_set():
job.status = TransferStatus.CANCELLED
with self._lock:
job.status = TransferStatus.CANCELLED
self._post_event()
continue
try:
job.status = TransferStatus.IN_PROGRESS
with self._lock:
job.status = TransferStatus.IN_PROGRESS
self._post_event()
if job.direction == TransferDirection.DOWNLOAD:
if job._recursive:
Expand All @@ -262,15 +289,19 @@ def _worker_loop(self) -> None:
self._run_recursive_upload(job)
else:
self._run_upload(job)
if job.status == TransferStatus.IN_PROGRESS:
job.status = TransferStatus.COMPLETE
with self._lock:
if job.status == TransferStatus.IN_PROGRESS:
job.status = TransferStatus.COMPLETE
except InterruptedError:
job.status = TransferStatus.CANCELLED
with self._lock:
job.status = TransferStatus.CANCELLED
except Exception as exc:
job.status = TransferStatus.FAILED
job.error = str(exc)
with self._lock:
job.status = TransferStatus.FAILED
job.error = str(exc)
logger.exception("Transfer failed: %s", job.id)
self._update_progress(job)
with self._lock:
self._update_progress(job)
self._post_event()

# --- single-file transfers ---
Expand All @@ -290,10 +321,11 @@ def _run_download(self, job: TransferJob) -> None:
def _cb(transferred: int, total: int) -> None:
if job.cancel_event.is_set():
raise InterruptedError("Transfer cancelled")
job.transferred_bytes = offset + transferred
if total > 0:
job.total_bytes = total
self._update_progress(job)
with self._lock:
job.transferred_bytes = offset + transferred
if total > 0:
job.total_bytes = total
self._update_progress(job)
self._post_event()

client.download(job.source, f, callback=_cb, offset=offset)
Expand All @@ -305,10 +337,11 @@ def _run_upload(self, job: TransferJob) -> None:
def _cb(transferred: int, total: int) -> None:
if job.cancel_event.is_set():
raise InterruptedError("Transfer cancelled")
job.transferred_bytes = transferred
if total > 0:
job.total_bytes = total
self._update_progress(job)
with self._lock:
job.transferred_bytes = transferred
if total > 0:
job.total_bytes = total
self._update_progress(job)
self._post_event()

job._client.upload(f, job.destination, callback=_cb)
Expand All @@ -329,9 +362,10 @@ def _run_recursive_download(self, job: TransferJob) -> None:
file_queue[i] = (remote_file, local_file, real_size)
except Exception:
pass
job.total_bytes = sum(s for _, _, s in file_queue)
job.transferred_bytes = 0
self._update_progress(job)
with self._lock:
job.total_bytes = sum(s for _, _, s in file_queue)
job.transferred_bytes = 0
self._update_progress(job)
self._post_event()

for remote_file, local_file, _size in file_queue:
Expand All @@ -344,8 +378,9 @@ def _run_recursive_download(self, job: TransferJob) -> None:
def _cb(transferred: int, total: int, _base=base) -> None:
if job.cancel_event.is_set():
raise InterruptedError("Transfer cancelled")
job.transferred_bytes = _base + transferred
self._update_progress(job)
with self._lock:
job.transferred_bytes = _base + transferred
self._update_progress(job)
self._post_event()

client.download(remote_file, f, callback=_cb)
Expand All @@ -355,9 +390,10 @@ def _run_recursive_upload(self, job: TransferJob) -> None:
client = job._client
file_queue: list[tuple[str, str, int]] = []
self._collect_local_files(job.source, job.destination, file_queue)
job.total_bytes = sum(s for _, _, s in file_queue)
job.transferred_bytes = 0
self._update_progress(job)
with self._lock:
job.total_bytes = sum(s for _, _, s in file_queue)
job.transferred_bytes = 0
self._update_progress(job)
self._post_event()

# Create directories
Expand All @@ -382,8 +418,9 @@ def _run_recursive_upload(self, job: TransferJob) -> None:
def _cb(transferred: int, total: int, _base=base) -> None:
if job.cancel_event.is_set():
raise InterruptedError("Transfer cancelled")
job.transferred_bytes = _base + transferred
self._update_progress(job)
with self._lock:
job.transferred_bytes = _base + transferred
self._update_progress(job)
self._post_event()

client.upload(f, remote_file, callback=_cb)
Expand Down Expand Up @@ -411,8 +448,9 @@ def _resolve_download_offset(self, job: TransferJob, client: TransferClient) ->
local_size = os.path.getsize(job.destination)
except OSError:
logger.info("Resume: partial file missing, restarting %s", job.id)
job.transferred_bytes = 0
job.progress = 0
with self._lock:
job.transferred_bytes = 0
job.progress = 0
self._snapshot_remote_metadata(job, client)
return 0

Expand All @@ -423,8 +461,9 @@ def _resolve_download_offset(self, job: TransferJob, client: TransferClient) ->
job.transferred_bytes,
job.id,
)
job.transferred_bytes = 0
job.progress = 0
with self._lock:
job.transferred_bytes = 0
job.progress = 0
self._snapshot_remote_metadata(job, client)
return 0

Expand All @@ -433,8 +472,9 @@ def _resolve_download_offset(self, job: TransferJob, client: TransferClient) ->
remote_info = client.stat(job.source)
except Exception:
logger.info("Resume: cannot stat remote file, restarting %s", job.id)
job.transferred_bytes = 0
job.progress = 0
with self._lock:
job.transferred_bytes = 0
job.progress = 0
return 0

remote_size = remote_info.size
Expand All @@ -447,9 +487,10 @@ def _resolve_download_offset(self, job: TransferJob, client: TransferClient) ->
remote_size,
job.id,
)
job.transferred_bytes = 0
job.progress = 0
job._remote_mtime = remote_mtime
with self._lock:
job.transferred_bytes = 0
job.progress = 0
job._remote_mtime = remote_mtime
return 0

if (
Expand All @@ -458,24 +499,25 @@ def _resolve_download_offset(self, job: TransferJob, client: TransferClient) ->
and remote_mtime != job._remote_mtime
):
logger.info("Resume: remote mtime changed, restarting %s", job.id)
job.transferred_bytes = 0
job.progress = 0
job._remote_mtime = remote_mtime
with self._lock:
job.transferred_bytes = 0
job.progress = 0
job._remote_mtime = remote_mtime
return 0

# All checks passed — resume from offset
logger.info("Resume: resuming %s from byte %d", job.id, job.transferred_bytes)
return job.transferred_bytes

def _snapshot_remote_metadata(self, job: TransferJob, client: TransferClient) -> None:
    """Record remote file size/mtime on the job for later resume validation.

    Best-effort by design: any stat failure is swallowed so an
    unstattable remote file never aborts the transfer itself — a later
    resume attempt will simply restart from byte 0.
    """
    try:
        info = client.stat(job.source)
        # Take the lock so concurrent workers/readers see a consistent
        # (mtime, total_bytes) pair on the job.
        with self._lock:
            if info.modified:
                job._remote_mtime = info.modified.timestamp()
            # Only adopt the remote size when no total is known yet, so
            # an in-progress total is never clobbered.
            if info.size > 0 and job.total_bytes == 0:
                job.total_bytes = info.size
    except Exception:
        pass

Expand Down
Loading
Loading