Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 125 additions & 23 deletions src/main/java/network/crypta/client/async/ClientLayerPersister.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,18 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.StreamCorruptedException;
import java.nio.file.Files;
import java.security.SecureRandom;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
import network.crypta.clients.fcp.ClientRequest;
import network.crypta.clients.fcp.RequestIdentifier;
import network.crypta.crypt.AEADVerificationFailedException;
import network.crypta.crypt.CRCChecksumChecker;
import network.crypta.crypt.ChecksumChecker;
import network.crypta.crypt.ChecksumFailedException;
Expand Down Expand Up @@ -181,18 +187,44 @@ private void loadAllVariants(
File clientDatBakCrypt = new File(dir, baseName + EXT_BAK + EXT_CRYPT);

if (clientDat.exists()) {
innerLoad(loaded, makeBucket(dir, baseName, false, null), noSerialize, requestStarters);
innerLoad(
loaded,
makeBucket(dir, baseName, false, null),
clientDat,
false,
false,
noSerialize,
requestStarters);
}
if (clientDatCrypt.exists() && loaded.needsMore()) {
innerLoad(
loaded, makeBucket(dir, baseName, false, encryptionKey), noSerialize, requestStarters);
loaded,
makeBucket(dir, baseName, false, encryptionKey),
clientDatCrypt,
true,
false,
noSerialize,
requestStarters);
}
if (clientDatBak.exists()) {
innerLoad(loaded, makeBucket(dir, baseName, true, null), noSerialize, requestStarters);
innerLoad(
loaded,
makeBucket(dir, baseName, true, null),
clientDatBak,
false,
true,
noSerialize,
requestStarters);
}
if (clientDatBakCrypt.exists() && loaded.needsMore()) {
innerLoad(
loaded, makeBucket(dir, baseName, true, encryptionKey), noSerialize, requestStarters);
loaded,
makeBucket(dir, baseName, true, encryptionKey),
clientDatBakCrypt,
true,
true,
noSerialize,
requestStarters);
}
}

Expand Down Expand Up @@ -526,38 +558,108 @@ public boolean doneSomething() {
}

private void innerLoad(
PartialLoad loaded, Bucket bucket, boolean noSerialize, RequestStarterGroup requestStarters) {
PartialLoad loaded,
Bucket bucket,
File variantFile,
boolean encryptedVariant,
boolean backupVariant,
boolean noSerialize,
RequestStarterGroup requestStarters) {
long length = bucket.size();
try (InputStream fis = bucket.getInputStream()) {
doLoadFromStream(
innerLoad(
loaded,
fis,
length,
!noSerialize && !loaded.doneSomething(),
requestStarters,
noSerialize,
bucket);
} catch (IOException e) {
noSerialize);
} catch (Exception e) {
// Mark this variant as failed so callers continue probing other backups/variants.
loaded.setSomethingFailed();
LOG.warn("I/O error reading persistence bucket {}: {}", bucket, e.toString());
logVariantReadFailure(variantFile, encryptedVariant, backupVariant, e);
}
}

private void doLoadFromStream(
PartialLoad loaded,
InputStream fis,
long length,
boolean latest,
RequestStarterGroup requestStarters,
boolean noSerialize,
Bucket bucket) {
try {
innerLoad(loaded, fis, length, latest, requestStarters, noSerialize);
} catch (Exception t) {
LOG.error("Failed to deserialize persistent requests from {}: {}", bucket, t, t);
loaded.setSomethingFailed();
private void logVariantReadFailure(
File variantFile, boolean encryptedVariant, boolean backupVariant, Exception failure) {
String variantType = backupVariant ? "backup" : "primary";
String path = variantFile.getAbsolutePath();
if (isExpectedUnreadableVariantFailure(failure, encryptedVariant)) {
if (LOG.isWarnEnabled()) {
String unreadableReason = summarizeUnreadableVariantFailure(failure, encryptedVariant);
LOG.warn(
"Skipping unreadable {} persistence variant {} (encrypted={}): {}",
variantType,
path,
encryptedVariant,
unreadableReason);
}
return;
}
if (failure instanceof IOException) {
if (LOG.isWarnEnabled()) {
String ioFailure = failure.toString();
LOG.warn(
"I/O error reading {} persistence variant {} (encrypted={}): {}",
variantType,
path,
encryptedVariant,
ioFailure);
}
return;
}
LOG.error(
"Failed to deserialize {} persistence variant {} (encrypted={}): {}",
variantType,
path,
encryptedVariant,
failure,
failure);
}

static boolean isExpectedUnreadableVariantFailure(Throwable failure, boolean encryptedVariant) {
if (failure == null) return false;
if (findThrowableInTree(failure, StreamCorruptedException.class) != null) {
return true;
}
return encryptedVariant
&& findThrowableInTree(failure, AEADVerificationFailedException.class) != null;
}

private static String summarizeUnreadableVariantFailure(
Throwable failure, boolean encryptedVariant) {
Throwable streamCorrupted = findThrowableInTree(failure, StreamCorruptedException.class);
if (streamCorrupted != null) return streamCorrupted.toString();
if (encryptedVariant) {
Throwable authFailure = findThrowableInTree(failure, AEADVerificationFailedException.class);
if (authFailure != null) return authFailure.toString();
}
return failure.toString();
}

private static <T extends Throwable> T findThrowableInTree(
Throwable failure, Class<T> targetType) {
Set<Throwable> visited = Collections.newSetFromMap(new IdentityHashMap<>());
ArrayDeque<Throwable> queue = new ArrayDeque<>();
queue.add(failure);
while (!queue.isEmpty()) {
Throwable current = queue.removeFirst();
if (!visited.add(current)) continue;
if (targetType.isInstance(current)) {
return targetType.cast(current);
}
Throwable cause = current.getCause();
if (cause != null) {
queue.addLast(cause);
}
for (Throwable suppressed : current.getSuppressed()) {
if (suppressed != null) {
queue.addLast(suppressed);
}
}
}
return null;
}

private void innerLoad(
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/network/crypta/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -465,9 +465,12 @@ private void writeNodeFile(File orig, File backup) {

try (FileOutputStream fos = new FileOutputStream(backup)) {
fs.writeTo(fos);
FileUtil.moveTo(backup, orig);
} catch (IOException ioe) {
LOG.error("IOE :{}", ioe.getMessage(), ioe);
return;
}
if (!FileUtil.moveTo(backup, orig)) {
LOG.error("Failed to replace node file {} with backup {}", orig, backup);
}
}

Expand Down
29 changes: 17 additions & 12 deletions src/main/java/network/crypta/node/OpennetManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public class OpennetManager {
private final Announcer announcer;

private final SeedAnnounceTracker seedTracker = new SeedAnnounceTracker();
private final Object writeFileSync = new Object();

/* The routing table is split into "buckets" by distance, each of which has a separate LRU
* list. For now there are only 2 buckets; the PETS paper suggested many buckets, but this
Expand Down Expand Up @@ -516,16 +517,17 @@ public OpennetManager(
/**
* Persist current opennet crypto state to disk.
*
* <p>The state is written to {@code opennet-<port>} using a temporary backup file. Failures are
* intentionally swallowed to preserve runtime behavior, so callers should treat persistence as
* best-effort. This method does not synchronize on the manager; invoke it from safe contexts that
* do not race with crypto replacement or concurrent file writes.
* <p>The state is written to {@code opennet-<port>} using a temporary backup file, then moved
* into place once the writer is closed. Failures are logged and the method returns without
* throwing, so callers should treat persistence as best-effort.
*/
public void writeFile() {
File nodeFile = node.nodeDir().file(OPENNET_FILE_PREFIX + crypto.getPortNumber());
File backupNodeFile =
node.nodeDir().file(OPENNET_FILE_PREFIX + crypto.getPortNumber() + ".bak");
writeFile(nodeFile, backupNodeFile);
synchronized (writeFileSync) {
File nodeFile = node.nodeDir().file(OPENNET_FILE_PREFIX + crypto.getPortNumber());
File backupNodeFile =
node.nodeDir().file(OPENNET_FILE_PREFIX + crypto.getPortNumber() + ".bak");
writeFile(nodeFile, backupNodeFile);
}
}

private void writeFile(File orig, File backup) {
Expand All @@ -543,10 +545,13 @@ private void writeFile(File orig, File backup) {
OutputStreamWriter osr = new OutputStreamWriter(fos, StandardCharsets.UTF_8);
BufferedWriter bw = new BufferedWriter(osr)) {
fs.writeTo(bw);
bw.flush(); // Ensure data is written before moving the file
FileUtil.moveTo(backup, orig);
} catch (IOException _) {
// Resources are automatically closed by try-with-resources
bw.flush();
} catch (IOException e) {
LOG.error("Failed writing opennet backup {}: {}", backup, e, e);
return;
}
if (!FileUtil.moveTo(backup, orig)) {
LOG.error("Failed to replace opennet file {} with backup {}", orig, backup);
}
}

Expand Down
88 changes: 84 additions & 4 deletions src/main/java/network/crypta/support/io/FileUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public final class FileUtil {
*/
public static final int BUFFER_SIZE = 32 * 1024;

private static final int REPLACE_MOVE_MAX_ATTEMPTS = 5;
private static final long REPLACE_MOVE_BASE_BACKOFF_MILLIS = 50;
private static final long REPLACE_MOVE_MAX_BACKOFF_MILLIS = 400;

private static final Random SEED_GENERATOR =
MersenneTwister.createSynchronized(NodeStarter.getGlobalSecureRandom().generateSeed(32));

Expand Down Expand Up @@ -507,6 +511,13 @@ public static boolean moveTo(File orig, File dest, boolean overwrite) {
public static boolean moveTo(File orig, File dest) {
Path source = orig.toPath();
Path target = dest.toPath();
if (tryAtomicMove(source, target, orig, dest)) {
return true;
}
return moveWithReplaceRetries(source, target, orig, dest);
}

private static boolean tryAtomicMove(Path source, Path target, File orig, File dest) {
try {
Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
return true;
Expand All @@ -517,17 +528,86 @@ public static boolean moveTo(File orig, File dest) {
} catch (IOException e) {
// On Windows this frequently fails when replacing an existing file; retry with
// REPLACE_EXISTING before giving up.
LOG.warn("Atomic move failed for {} -> {}, retrying non-atomically: {}", orig, dest, e);
if (LOG.isWarnEnabled()) {
String atomicFailure = e.toString();
LOG.warn(
"Atomic move failed for {} -> {}, retrying non-atomically: {}",
orig,
dest,
atomicFailure);
}
}
return false;
}

private static boolean moveWithReplaceRetries(Path source, Path target, File orig, File dest) {
IOException lastFailure = null;
for (int attempt = 1; attempt <= REPLACE_MOVE_MAX_ATTEMPTS; attempt++) {
try {
Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
return true;
} catch (IOException e) {
lastFailure = e;
if (attempt == 1) {
LOG.warn(
"Replace-existing move failed for {} -> {} (attempt {}/{}), retrying: {}",
orig,
dest,
attempt,
REPLACE_MOVE_MAX_ATTEMPTS,
e.toString());
} else if (attempt < REPLACE_MOVE_MAX_ATTEMPTS && LOG.isDebugEnabled()) {
LOG.debug(
"Replace-existing move retry failed for {} -> {} (attempt {}/{}): {}",
orig,
dest,
attempt,
REPLACE_MOVE_MAX_ATTEMPTS,
e.toString());
}
if (attempt == REPLACE_MOVE_MAX_ATTEMPTS) break;
if (!sleepBeforeReplaceMoveRetry(orig, dest, attempt)) {
return false;
}
}
}
LOG.error(
"Replace-existing move failed for {} -> {} after {} attempts: {}",
orig,
dest,
REPLACE_MOVE_MAX_ATTEMPTS,
lastFailure,
lastFailure);
return false;
}

private static boolean sleepBeforeReplaceMoveRetry(File orig, File dest, int attempt) {
long backoffMillis = retryBackoffMillis(attempt);
try {
Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
Thread.sleep(backoffMillis);
return true;
} catch (IOException e) {
LOG.error("Replace-existing move failed for {} -> {}: {}", orig, dest, e, e);
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
LOG.error(
"Interrupted while retrying replace-existing move for {} -> {} (attempt {}/{}).",
orig,
dest,
attempt,
REPLACE_MOVE_MAX_ATTEMPTS,
interrupted);
return false;
}
}

private static long retryBackoffMillis(int failedAttempt) {
long backoff = REPLACE_MOVE_BASE_BACKOFF_MILLIS;
int shifts = failedAttempt - 1;
if (shifts > 0) {
backoff <<= Math.min(shifts, 30);
}
return Math.min(backoff, REPLACE_MOVE_MAX_BACKOFF_MILLIS);
}

/**
* Produces a safe filename for the specified target operating system.
*
Expand Down
Loading
Loading