Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/filesync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ gtk = "0.18.2"
rfd = "0.17.2"
env_logger.workspace = true
dirs = "6.0.0"
fs2 = "0.4"
toml.workspace = true
tray-icon = "0.21.3"
iced = { version = "0.14.0", features = ["tokio"] }
Expand Down
89 changes: 88 additions & 1 deletion crates/filesync/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,50 @@ impl Client {
);
}

// ── Preemptive disk-space check (client) ──────────────────────────────
// Simulate the server's send-list computation (is_server = true) to
// predict exactly which bytes will arrive. We do this *before* sending
// our ManifestExchange so that, if we are out of space, the server is
// notified before it starts transmitting anything.
let bytes_incoming: u64 = manifest::compute_send_list(&remote, &local, true)
.iter()
.filter_map(|p| remote.files.get(p))
.filter(|m| !m.is_dir)
.map(|m| m.size)
.sum();
if bytes_incoming > 0 {
match common::check_disk_space(self.engine.root(), bytes_incoming) {
Ok(avail) => {
debug!(
"filesync session: disk-space check OK — {} B available, {} B required",
avail, bytes_incoming
);
}
Err(_) => {
let avail = common::available_disk_space(self.engine.root()).unwrap_or(0);
error!(
"filesync: not enough disk space on client — \
{} B available, {} B required; aborting sync",
avail, bytes_incoming
);
// Notify the server before closing so it can surface the
// real reason rather than a generic connection error.
let _ = conn.send(&Message::InsufficientDiskSpace {
available_bytes: avail,
required_bytes: bytes_incoming,
});
return Err(io::Error::new(
io::ErrorKind::StorageFull,
format!(
"filesync: client out of disk space: \
{} B available, {} B required",
avail, bytes_incoming
),
));
}
}
}

debug!("filesync session: sending local ManifestExchange");
conn.send(&Message::ManifestExchange(local.clone()))?;

Expand Down Expand Up @@ -463,7 +507,7 @@ impl Client {
b.bundle_id, n_files, n_dirs, bundle_bytes,
bytes_received + bundle_bytes
);
let n = self.engine.apply_bundle(&b)?;
let n = self.engine.apply_bundle(&b)?.written;
for fd in &b.files {
if fd.metadata.is_dir {
dirs_received += 1;
Expand Down Expand Up @@ -542,6 +586,21 @@ impl Client {
);
info!("filesync: initial sync large file committed {path:?}");
}
crate::sync_engine::FinishResult::CommittedWithConflict(ci) => {
files_received += 1;
if let Some(ref gs) = self.gui_state {
gs.write().files_received = files_received as u64;
}
debug!(
"filesync session: large file committed {path:?} (files_received={})",
files_received
);
warn!(
"filesync: initial sync large file committed {path:?} \
(conflict copy: {:?})",
ci.conflict_copy_path
);
}
crate::sync_engine::FinishResult::MissingChunks(indices) => {
warn!(
"filesync: initial sync {path:?} missing {} chunk(s), requesting retransmit",
Expand All @@ -565,6 +624,24 @@ impl Client {
);
break;
}
Message::InsufficientDiskSpace {
available_bytes,
required_bytes,
} => {
error!(
"filesync session: server reports insufficient disk space — \
{} B available, {} B required; aborting sync",
available_bytes, required_bytes
);
return Err(io::Error::new(
io::ErrorKind::StorageFull,
format!(
"filesync: server out of disk space: \
{} B available, {} B required",
available_bytes, required_bytes
),
));
}
other => {
warn!("filesync session: unexpected message during initial-sync recv phase");
debug!("filesync session: unexpected message variant: {other:?}");
Expand Down Expand Up @@ -806,6 +883,16 @@ fn recv_loop(engine: Arc<SyncEngine>, conn: Arc<Connection>, bus: Option<Arc<Mes
}
return;
}
Ok(Message::InsufficientDiskSpace {
available_bytes,
required_bytes,
}) => {
error!(
"{prefix}: server reports insufficient disk space — \
{available_bytes} B available, {required_bytes} B required; disconnecting"
);
return;
}
Ok(other) => {
warn!("{prefix}: unexpected message in live sync phase — possible protocol issue");
debug!("{prefix}: unexpected message variant: {other:?}");
Expand Down
109 changes: 105 additions & 4 deletions crates/filesync/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
//! Shared helpers used by both the filesync **client** and **server**.
//!
//! Includes preemptive disk-space checking via [`check_disk_space`] /
//! [`available_disk_space`], which are called after a manifest exchange to
//! abort the sync early when the local filesystem has insufficient room.
//!
//! Everything that was duplicated between `client.rs` and `server.rs` lives
//! here so that bug-fixes and behavioural changes only need to happen once.

use crate::protocol::*;
use crate::sync_engine::{ChunkResult, FinishResult, SyncEngine};
use crate::sync_engine::{ChunkResult, ConflictInfo, FinishResult, SyncEngine};
use crate::transport::Connection;
use crate::watcher::FsEvent;

Expand All @@ -17,6 +21,36 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;

// ── Disk-space helpers ────────────────────────────────────────────────────

/// Returns the number of bytes currently available on the filesystem that
/// contains `path`. Thin wrapper over [`fs2::available_space`].
pub fn available_disk_space(path: &Path) -> io::Result<u64> {
fs2::available_space(path)
}

/// Asserts that at least `required_bytes` are available on the filesystem
/// that contains `path`.
///
/// * `Ok(available)` – there is enough space; `available` is the free-byte
/// count at the time of the check.
/// * `Err(StorageFull)` – not enough space; the error message includes both
/// the available and required byte counts.
pub fn check_disk_space(path: &Path, required_bytes: u64) -> io::Result<u64> {
let available = available_disk_space(path)?;
if available < required_bytes {
Err(io::Error::new(
io::ErrorKind::StorageFull,
format!(
"insufficient disk space: {} B available, {} B required",
available, required_bytes
),
))
} else {
Ok(available)
}
}

// ── Pending-changes accumulator ──────────────────────────────────────────

const MAX_BATCH_MS: u64 = 2_000;
Expand Down Expand Up @@ -256,6 +290,8 @@ pub struct BundleApplied {
pub dirs_count: usize,
/// Total bytes across all entries in the bundle.
pub bytes: u64,
/// Conflict copies created while applying this bundle.
pub conflicts: Vec<ConflictInfo>,
}

/// Outcome of processing a `LargeFileChunk` message.
Expand Down Expand Up @@ -290,15 +326,36 @@ pub fn handle_recv_bundle(
bus: &Option<Arc<MessageBus>>,
log_prefix: &str,
) -> io::Result<BundleApplied> {
let applied = engine.apply_bundle(bundle)?;
let apply_result = engine.apply_bundle(bundle)?;

let files_count = bundle.files.iter().filter(|f| !f.metadata.is_dir).count();
let dirs_count = bundle.files.iter().filter(|f| f.metadata.is_dir).count();
let bytes: u64 = bundle.files.iter().map(|f| f.metadata.size).sum();

let applied = apply_result.written;

info!("{log_prefix}: +{applied} file(s) from {peer}");
publish_changed(bus, bundle, peer);

for ci in &apply_result.conflicts {
warn!(
"{log_prefix}: conflict copy from {peer}: {:?} → {:?}",
ci.original_path, ci.conflict_copy_path
);
if let Some(ref bus) = bus {
bus.publish(
"filesync",
"filesync.conflict_copy",
serde_json::json!({
"original": ci.original_path,
"conflict_copy": ci.conflict_copy_path,
"peer": peer,
"node": engine.node_id(),
}),
);
}
}

if files_count > 0 {
if let Some(ref bus) = bus {
bus.publish(
Expand All @@ -320,6 +377,7 @@ pub fn handle_recv_bundle(
files_count,
dirs_count,
bytes,
conflicts: apply_result.conflicts,
})
}

Expand Down Expand Up @@ -351,10 +409,19 @@ pub fn handle_recv_large_file_chunk(
) -> io::Result<ChunkOutcome> {
match engine.receive_large_file_chunk(path, chunk_index, data)? {
ChunkResult::ReadyToCommit(hash) => {
engine.commit_large_file(path, hash).map_err(|e| {
match engine.commit_large_file(path, hash).map_err(|e| {
error!("{log_prefix}: large file commit after retransmit from {peer}: {e}");
e
})?;
})? {
FinishResult::Committed => {}
FinishResult::CommittedWithConflict(ci) => {
warn!(
"{log_prefix}: conflict copy during retransmit commit: {:?} → {:?}",
ci.original_path, ci.conflict_copy_path
);
}
FinishResult::MissingChunks(_) => {}
}
info!("{log_prefix}: large file committed {path:?} from {peer} (after retransmit)");
Ok(ChunkOutcome::Committed)
}
Expand Down Expand Up @@ -392,6 +459,40 @@ pub fn handle_recv_large_file_end(
}
Ok(LargeFileEndOutcome::Committed)
}
FinishResult::CommittedWithConflict(ci) => {
info!(
"{log_prefix}: large file committed {path:?} from {peer} \
(conflict copy: {:?})",
ci.conflict_copy_path
);
if let Some(ref bus) = bus {
bus.publish(
"filesync",
"filesync.conflict_copy",
serde_json::json!({
"original": ci.original_path,
"conflict_copy": ci.conflict_copy_path,
"peer": peer,
"node": engine.node_id(),
}),
);
bus.publish(
"filesync",
"filesync.file_changed",
serde_json::json!({ "path": path, "node": peer }),
);
bus.publish(
"filesync",
"filesync.incremental_stats",
serde_json::json!({
"node": engine.node_id(),
"peer": peer,
"files_changed": 1,
}),
);
}
Ok(LargeFileEndOutcome::Committed)
}
FinishResult::MissingChunks(indices) => {
warn!(
"{log_prefix}: {path:?} from {peer} missing {} chunk(s), requesting retransmit",
Expand Down
10 changes: 10 additions & 0 deletions crates/filesync/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ pub enum Message {
from: PathBuf,
to: PathBuf,
},

/// Sent by either side when a preemptive disk-space check fails after
/// receiving the remote manifest. The receiver should abort the sync
/// and surface the error to the user / operator.
InsufficientDiskSpace {
/// Free bytes on the sender's filesystem at the time of the check.
available_bytes: u64,
/// Bytes that would have been needed to complete the sync.
required_bytes: u64,
},
}

pub fn serialise_message(msg: &Message) -> io::Result<Vec<u8>> {
Expand Down
Loading
Loading