diff --git a/src/main/java/network/crypta/client/async/ClientLayerPersister.java b/src/main/java/network/crypta/client/async/ClientLayerPersister.java index 5aa981f77d..8c1dc88aa2 100644 --- a/src/main/java/network/crypta/client/async/ClientLayerPersister.java +++ b/src/main/java/network/crypta/client/async/ClientLayerPersister.java @@ -13,12 +13,18 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.OutputStream; +import java.io.StreamCorruptedException; import java.nio.file.Files; import java.security.SecureRandom; +import java.util.ArrayDeque; +import java.util.Collections; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.Map; +import java.util.Set; import network.crypta.clients.fcp.ClientRequest; import network.crypta.clients.fcp.RequestIdentifier; +import network.crypta.crypt.AEADVerificationFailedException; import network.crypta.crypt.CRCChecksumChecker; import network.crypta.crypt.ChecksumChecker; import network.crypta.crypt.ChecksumFailedException; @@ -181,18 +187,44 @@ private void loadAllVariants( File clientDatBakCrypt = new File(dir, baseName + EXT_BAK + EXT_CRYPT); if (clientDat.exists()) { - innerLoad(loaded, makeBucket(dir, baseName, false, null), noSerialize, requestStarters); + innerLoad( + loaded, + makeBucket(dir, baseName, false, null), + clientDat, + false, + false, + noSerialize, + requestStarters); } if (clientDatCrypt.exists() && loaded.needsMore()) { innerLoad( - loaded, makeBucket(dir, baseName, false, encryptionKey), noSerialize, requestStarters); + loaded, + makeBucket(dir, baseName, false, encryptionKey), + clientDatCrypt, + true, + false, + noSerialize, + requestStarters); } if (clientDatBak.exists()) { - innerLoad(loaded, makeBucket(dir, baseName, true, null), noSerialize, requestStarters); + innerLoad( + loaded, + makeBucket(dir, baseName, true, null), + clientDatBak, + false, + true, + noSerialize, + requestStarters); } if (clientDatBakCrypt.exists() && loaded.needsMore()) { innerLoad( - loaded, makeBucket(dir, baseName, true, encryptionKey), noSerialize, requestStarters); + loaded, + makeBucket(dir, baseName, true, encryptionKey), + clientDatBakCrypt, + true, + true, + noSerialize, + requestStarters); } } @@ -526,38 +558,108 @@ public boolean doneSomething() { } private void innerLoad( - PartialLoad loaded, Bucket bucket, boolean noSerialize, RequestStarterGroup requestStarters) { + PartialLoad loaded, + Bucket bucket, + File variantFile, + boolean encryptedVariant, + boolean backupVariant, + boolean noSerialize, + RequestStarterGroup requestStarters) { long length = bucket.size(); try (InputStream fis = bucket.getInputStream()) { - doLoadFromStream( + innerLoad( loaded, fis, length, !noSerialize && !loaded.doneSomething(), requestStarters, - noSerialize, - bucket); - } catch (IOException e) { + noSerialize); + } catch (Exception e) { // Mark this variant as failed so callers continue probing other backups/variants. loaded.setSomethingFailed(); - LOG.warn("I/O error reading persistence bucket {}: {}", bucket, e.toString()); + logVariantReadFailure(variantFile, encryptedVariant, backupVariant, e); } } - private void doLoadFromStream( - PartialLoad loaded, - InputStream fis, - long length, - boolean latest, - RequestStarterGroup requestStarters, - boolean noSerialize, - Bucket bucket) { - try { - innerLoad(loaded, fis, length, latest, requestStarters, noSerialize); - } catch (Exception t) { - LOG.error("Failed to deserialize persistent requests from {}: {}", bucket, t, t); - loaded.setSomethingFailed(); + private void logVariantReadFailure( + File variantFile, boolean encryptedVariant, boolean backupVariant, Exception failure) { + String variantType = backupVariant ? "backup" : "primary"; + String path = variantFile.getAbsolutePath(); + if (isExpectedUnreadableVariantFailure(failure, encryptedVariant)) { + if (LOG.isWarnEnabled()) { + String unreadableReason = summarizeUnreadableVariantFailure(failure, encryptedVariant); + LOG.warn( + "Skipping unreadable {} persistence variant {} (encrypted={}): {}", + variantType, + path, + encryptedVariant, + unreadableReason); + } + return; + } + if (failure instanceof IOException) { + if (LOG.isWarnEnabled()) { + String ioFailure = failure.toString(); + LOG.warn( + "I/O error reading {} persistence variant {} (encrypted={}): {}", + variantType, + path, + encryptedVariant, + ioFailure); + } + return; + } + LOG.error( + "Failed to deserialize {} persistence variant {} (encrypted={}): {}", + variantType, + path, + encryptedVariant, + failure, + failure); + } + + static boolean isExpectedUnreadableVariantFailure(Throwable failure, boolean encryptedVariant) { + if (failure == null) return false; + if (findThrowableInTree(failure, StreamCorruptedException.class) != null) { + return true; + } + return encryptedVariant + && findThrowableInTree(failure, AEADVerificationFailedException.class) != null; + } + + private static String summarizeUnreadableVariantFailure( + Throwable failure, boolean encryptedVariant) { + Throwable streamCorrupted = findThrowableInTree(failure, StreamCorruptedException.class); + if (streamCorrupted != null) return streamCorrupted.toString(); + if (encryptedVariant) { + Throwable authFailure = findThrowableInTree(failure, AEADVerificationFailedException.class); + if (authFailure != null) return authFailure.toString(); + } + return failure.toString(); + } + + private static T findThrowableInTree( + Throwable failure, Class targetType) { + Set visited = Collections.newSetFromMap(new IdentityHashMap<>()); + ArrayDeque queue = new ArrayDeque<>(); + queue.add(failure); + while (!queue.isEmpty()) { + Throwable current = queue.removeFirst(); + if (!visited.add(current)) continue; + if (targetType.isInstance(current)) { + return targetType.cast(current); + } + Throwable cause = current.getCause(); + if (cause != null) { + queue.addLast(cause); + } + for (Throwable suppressed : current.getSuppressed()) { + if (suppressed != null) { + queue.addLast(suppressed); + } + } } + return null; } private void innerLoad( diff --git a/src/main/java/network/crypta/node/Node.java b/src/main/java/network/crypta/node/Node.java index b357482611..11705a72a9 100644 --- a/src/main/java/network/crypta/node/Node.java +++ b/src/main/java/network/crypta/node/Node.java @@ -465,9 +465,12 @@ private void writeNodeFile(File orig, File backup) { try (FileOutputStream fos = new FileOutputStream(backup)) { fs.writeTo(fos); - FileUtil.moveTo(backup, orig); } catch (IOException ioe) { LOG.error("IOE :{}", ioe.getMessage(), ioe); + return; + } + if (!FileUtil.moveTo(backup, orig)) { + LOG.error("Failed to replace node file {} with backup {}", orig, backup); } } diff --git a/src/main/java/network/crypta/node/OpennetManager.java b/src/main/java/network/crypta/node/OpennetManager.java index d8f5b9adf6..0e2ae07bbb 100644 --- a/src/main/java/network/crypta/node/OpennetManager.java +++ b/src/main/java/network/crypta/node/OpennetManager.java @@ -112,6 +112,7 @@ public class OpennetManager { private final Announcer announcer; private final SeedAnnounceTracker seedTracker = new SeedAnnounceTracker(); + private final Object writeFileSync = new Object(); /* The routing table is split into "buckets" by distance, each of which has a separate LRU * list. For now there are only 2 buckets; the PETS paper suggested many buckets, but this @@ -516,16 +517,17 @@ public OpennetManager( /** * Persist current opennet crypto state to disk. * - *

The state is written to {@code opennet-} using a temporary backup file. Failures are - * intentionally swallowed to preserve runtime behavior, so callers should treat persistence as - * best-effort. This method does not synchronize on the manager; invoke it from safe contexts that - * do not race with crypto replacement or concurrent file writes. + *

The state is written to {@code opennet-} using a temporary backup file, then moved + * into place once the writer is closed. Failures are logged and the method returns without + * throwing, so callers should treat persistence as best-effort. */ public void writeFile() { - File nodeFile = node.nodeDir().file(OPENNET_FILE_PREFIX + crypto.getPortNumber()); - File backupNodeFile = - node.nodeDir().file(OPENNET_FILE_PREFIX + crypto.getPortNumber() + ".bak"); - writeFile(nodeFile, backupNodeFile); + synchronized (writeFileSync) { + File nodeFile = node.nodeDir().file(OPENNET_FILE_PREFIX + crypto.getPortNumber()); + File backupNodeFile = + node.nodeDir().file(OPENNET_FILE_PREFIX + crypto.getPortNumber() + ".bak"); + writeFile(nodeFile, backupNodeFile); + } } private void writeFile(File orig, File backup) { @@ -543,10 +545,13 @@ private void writeFile(File orig, File backup) { OutputStreamWriter osr = new OutputStreamWriter(fos, StandardCharsets.UTF_8); BufferedWriter bw = new BufferedWriter(osr)) { fs.writeTo(bw); - bw.flush(); // Ensure data is written before moving the file - FileUtil.moveTo(backup, orig); - } catch (IOException _) { - // Resources are automatically closed by try-with-resources + bw.flush(); + } catch (IOException e) { + LOG.error("Failed writing opennet backup {}: {}", backup, e, e); + return; + } + if (!FileUtil.moveTo(backup, orig)) { + LOG.error("Failed to replace opennet file {} with backup {}", orig, backup); } } diff --git a/src/main/java/network/crypta/support/io/FileUtil.java b/src/main/java/network/crypta/support/io/FileUtil.java index 182df9e6b0..ecfcd42c7a 100644 --- a/src/main/java/network/crypta/support/io/FileUtil.java +++ b/src/main/java/network/crypta/support/io/FileUtil.java @@ -50,6 +50,10 @@ public final class FileUtil { */ public static final int BUFFER_SIZE = 32 * 1024; + private static final int REPLACE_MOVE_MAX_ATTEMPTS = 5; + private static final long REPLACE_MOVE_BASE_BACKOFF_MILLIS = 50; + private static final long REPLACE_MOVE_MAX_BACKOFF_MILLIS = 400; + private static final Random SEED_GENERATOR = MersenneTwister.createSynchronized(NodeStarter.getGlobalSecureRandom().generateSeed(32)); @@ -507,6 +511,13 @@ public static boolean moveTo(File orig, File dest, boolean overwrite) { public static boolean moveTo(File orig, File dest) { Path source = orig.toPath(); Path target = dest.toPath(); + if (tryAtomicMove(source, target, orig, dest)) { + return true; + } + return moveWithReplaceRetries(source, target, orig, dest); + } + + private static boolean tryAtomicMove(Path source, Path target, File orig, File dest) { try { Files.move(source, target, StandardCopyOption.ATOMIC_MOVE); return true; @@ -517,17 +528,86 @@ public static boolean moveTo(File orig, File dest) { } catch (IOException e) { // On Windows this frequently fails when replacing an existing file; retry with // REPLACE_EXISTING before giving up. - LOG.warn("Atomic move failed for {} -> {}, retrying non-atomically: {}", orig, dest, e); + if (LOG.isWarnEnabled()) { + String atomicFailure = e.toString(); + LOG.warn( + "Atomic move failed for {} -> {}, retrying non-atomically: {}", + orig, + dest, + atomicFailure); + } + } + return false; + } + + private static boolean moveWithReplaceRetries(Path source, Path target, File orig, File dest) { + IOException lastFailure = null; + for (int attempt = 1; attempt <= REPLACE_MOVE_MAX_ATTEMPTS; attempt++) { + try { + Files.move(source, target, StandardCopyOption.REPLACE_EXISTING); + return true; + } catch (IOException e) { + lastFailure = e; + if (attempt == 1) { + LOG.warn( + "Replace-existing move failed for {} -> {} (attempt {}/{}), retrying: {}", + orig, + dest, + attempt, + REPLACE_MOVE_MAX_ATTEMPTS, + e.toString()); + } else if (attempt < REPLACE_MOVE_MAX_ATTEMPTS && LOG.isDebugEnabled()) { + LOG.debug( + "Replace-existing move retry failed for {} -> {} (attempt {}/{}): {}", + orig, + dest, + attempt, + REPLACE_MOVE_MAX_ATTEMPTS, + e.toString()); + } + if (attempt == REPLACE_MOVE_MAX_ATTEMPTS) break; + if (!sleepBeforeReplaceMoveRetry(orig, dest, attempt)) { + return false; + } + } } + LOG.error( + "Replace-existing move failed for {} -> {} after {} attempts: {}", + orig, + dest, + REPLACE_MOVE_MAX_ATTEMPTS, + lastFailure, + lastFailure); + return false; + } + + private static boolean sleepBeforeReplaceMoveRetry(File orig, File dest, int attempt) { + long backoffMillis = retryBackoffMillis(attempt); try { - Files.move(source, target, StandardCopyOption.REPLACE_EXISTING); + Thread.sleep(backoffMillis); return true; - } catch (IOException e) { - LOG.error("Replace-existing move failed for {} -> {}: {}", orig, dest, e, e); + } catch (InterruptedException interrupted) { + Thread.currentThread().interrupt(); + LOG.error( + "Interrupted while retrying replace-existing move for {} -> {} (attempt {}/{}).", + orig, + dest, + attempt, + REPLACE_MOVE_MAX_ATTEMPTS, + interrupted); return false; } } + private static long retryBackoffMillis(int failedAttempt) { + long backoff = REPLACE_MOVE_BASE_BACKOFF_MILLIS; + int shifts = failedAttempt - 1; + if (shifts > 0) { + backoff <<= Math.min(shifts, 30); + } + return Math.min(backoff, REPLACE_MOVE_MAX_BACKOFF_MILLIS); + } + /** * Produces a safe filename for the specified target operating system. * diff --git a/src/test/java/network/crypta/client/async/ClientLayerPersisterTest.java b/src/test/java/network/crypta/client/async/ClientLayerPersisterTest.java index 3f2c427bab..de431d0f69 100644 --- a/src/test/java/network/crypta/client/async/ClientLayerPersisterTest.java +++ b/src/test/java/network/crypta/client/async/ClientLayerPersisterTest.java @@ -10,7 +10,9 @@ import java.io.InputStream; import java.io.ObjectInputStream; import java.io.OutputStream; +import java.io.StreamCorruptedException; import network.crypta.clients.fcp.ClientRequest; +import network.crypta.crypt.AEADVerificationFailedException; import network.crypta.io.comm.IOStatisticCollector; import network.crypta.node.MasterKeysWrongPasswordException; import network.crypta.node.Node; @@ -74,7 +76,7 @@ private ClientLayerPersister newPersister() { // Temp buckets used by checksum writers/readers try { - when(tempBucketFactory.makeBucket(anyLong())).thenAnswer(inv -> new InMemoryBucket()); + when(tempBucketFactory.makeBucket(anyLong())).thenAnswer(_ -> new InMemoryBucket()); } catch (IOException e) { throw new RuntimeException(e); } @@ -105,12 +107,12 @@ void setFilesAndLoad_whenNoExisting_unencrypted_writesFile_and_setsSalt() throws null, // encryptionKey requestStarters); - // Wait for async checkpoint to complete + // Wait for the async checkpoint to complete persister.waitForNotWriting(); assertEquals(base, persister.getWriteFilename()); assertTrue(base.exists(), "main persistence file should exist after first save"); - // A backup may or may not be present depending on timing; only require main file. + // A backup may or may not be present depending on timing; only require the main file. // Header sanity: MAGIC then VERSION try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(base))) { @@ -145,7 +147,8 @@ void secondCheckpoint_whenWritten_movesPreviousToBackup() throws Exception { // Either a backup exists (expected when rotating), or the main file was rewritten // in place with a newer timestamp. boolean rotated = backupFile.exists(); - boolean rewritten = mainFile.lastModified() >= 0; // trivially true; rely on rotate in practice + boolean rewritten = + mainFile.lastModified() >= 0; // trivially true; rely on rotating in practice assertTrue(rotated || rewritten, "backup created or file rewritten"); } @@ -192,11 +195,36 @@ void setFilesAndLoad_whenEncryptedWithoutKey_throws() { requestStarters)); } + @Test + void isExpectedUnreadableVariantFailure_whenEncryptedAndAuthFails_expectTrue() { + AEADVerificationFailedException failure = new AEADVerificationFailedException(); + + assertTrue(ClientLayerPersister.isExpectedUnreadableVariantFailure(failure, true)); + assertFalse(ClientLayerPersister.isExpectedUnreadableVariantFailure(failure, false)); + } + + @Test + void isExpectedUnreadableVariantFailure_whenSuppressedStreamCorruption_expectTrue() { + IOException failure = new IOException("outer"); + failure.addSuppressed(new StreamCorruptedException("invalid stream header: 00000000")); + + assertTrue(ClientLayerPersister.isExpectedUnreadableVariantFailure(failure, true)); + assertTrue(ClientLayerPersister.isExpectedUnreadableVariantFailure(failure, false)); + } + + @Test + void isExpectedUnreadableVariantFailure_whenUnrelatedError_expectFalse() { + IOException failure = new IOException("boom"); + + assertFalse(ClientLayerPersister.isExpectedUnreadableVariantFailure(failure, true)); + assertFalse(ClientLayerPersister.isExpectedUnreadableVariantFailure(failure, false)); + } + // ---------- helpers ---------- private static void touch(File f) throws IOException { try (OutputStream os = new FileOutputStream(f)) { - // Opening the stream is sufficient to create/truncate the file; flush to satisfy + // Opening the stream is enough to create/truncate the file; flush to satisfy // static analysis so the try block isn’t empty. os.flush(); } @@ -236,6 +264,7 @@ public int getWaitingThreadsCount() { } /** Ticker that runs tasks immediately on the provided executor. */ + @SuppressWarnings("ClassCanBeRecord") private static final class InlineTicker implements Ticker { private final PriorityAwareExecutor exec; diff --git a/src/test/java/network/crypta/support/io/FileUtilTest.java b/src/test/java/network/crypta/support/io/FileUtilTest.java index 98a4fcde74..6c99cbeed7 100644 --- a/src/test/java/network/crypta/support/io/FileUtilTest.java +++ b/src/test/java/network/crypta/support/io/FileUtilTest.java @@ -304,6 +304,95 @@ void moveTo_whenIOExceptionOnAtomic_expectFallbackReplaceSucceeds(@TempDir Path } } + @Test + @DisplayName("moveTo_whenFallbackFailsTransiently_thenRetriesAndSucceeds") + void moveTo_whenFallbackFailsTransiently_thenRetriesAndSucceeds(@TempDir Path tmp) + throws Exception { + // Arrange + Path src = tmp.resolve("from-retry.bin"); + Path dst = tmp.resolve("to-retry.bin"); + Files.write(src, List.of("retry")); + + try (MockedStatic files = mockStatic(Files.class, Answers.CALLS_REAL_METHODS)) { + files + .when(() -> Files.move(eq(src), eq(dst), eq(StandardCopyOption.ATOMIC_MOVE))) + .thenThrow(new java.nio.file.AtomicMoveNotSupportedException("a", "b", "nope")); + files + .when(() -> Files.move(eq(src), eq(dst), eq(StandardCopyOption.REPLACE_EXISTING))) + .thenThrow(new IOException("locked-1")) + .thenThrow(new IOException("locked-2")) + .then(_ -> dst); + + // Act + boolean ok = FileUtil.moveTo(src.toFile(), dst.toFile()); + + // Assert + assertTrue(ok); + files.verify(() -> Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING), times(3)); + } + } + + @Test + @DisplayName("moveTo_whenFallbackFailsAllRetries_expectFalse") + void moveTo_whenFallbackFailsAllRetries_expectFalse(@TempDir Path tmp) throws Exception { + // Arrange + Path src = tmp.resolve("from-fail.bin"); + Path dst = tmp.resolve("to-fail.bin"); + Files.write(src, List.of("fail")); + + try (MockedStatic files = mockStatic(Files.class, Answers.CALLS_REAL_METHODS)) { + files + .when(() -> Files.move(eq(src), eq(dst), eq(StandardCopyOption.ATOMIC_MOVE))) + .thenThrow(new java.nio.file.AtomicMoveNotSupportedException("a", "b", "nope")); + files + .when(() -> Files.move(eq(src), eq(dst), eq(StandardCopyOption.REPLACE_EXISTING))) + .thenThrow(new IOException("locked-1")) + .thenThrow(new IOException("locked-2")) + .thenThrow(new IOException("locked-3")) + .thenThrow(new IOException("locked-4")) + .thenThrow(new IOException("locked-5")); + + // Act + boolean ok = FileUtil.moveTo(src.toFile(), dst.toFile()); + + // Assert + assertFalse(ok); + files.verify(() -> Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING), times(5)); + } + } + + @Test + @DisplayName("moveTo_whenInterruptedDuringRetryBackoff_expectFalseAndInterruptPreserved") + void moveTo_whenInterruptedDuringRetryBackoff_expectFalseAndInterruptPreserved(@TempDir Path tmp) + throws Exception { + // Arrange + Path src = tmp.resolve("from-interrupt.bin"); + Path dst = tmp.resolve("to-interrupt.bin"); + Files.write(src, List.of("interrupt")); + + try (MockedStatic files = mockStatic(Files.class, Answers.CALLS_REAL_METHODS)) { + files + .when(() -> Files.move(eq(src), eq(dst), eq(StandardCopyOption.ATOMIC_MOVE))) + .thenThrow(new java.nio.file.AtomicMoveNotSupportedException("a", "b", "nope")); + files + .when(() -> Files.move(eq(src), eq(dst), eq(StandardCopyOption.REPLACE_EXISTING))) + .thenThrow(new IOException("locked")); + + Thread.currentThread().interrupt(); + try { + // Act + boolean ok = FileUtil.moveTo(src.toFile(), dst.toFile()); + + // Assert + assertFalse(ok); + assertTrue(Thread.currentThread().isInterrupted()); + files.verify(() -> Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING), times(1)); + } finally { + network.crypta.testsupport.SpotBugsTestSupport.ignoreValue(Thread.interrupted()); + } + } + } + @Test @DisplayName("moveTo_overwriteFalseAndDestExists_expectFalse") void moveTo_overwriteFalseAndDestExists_expectFalse(@TempDir Path tmp) throws Exception {