Feat/log compaction #19
📒 Files selected for processing (13)
📝 Walkthrough
This PR adds Raft
Changes
Broker: Raft Snapshots, Storage Refactor, and Telemetry Server
React Dashboard: Live Telemetry Visualization
Sequence Diagram(s)
sequenceDiagram
participant LeaderRaftNode
participant SnapshotManager
participant MessageStore
participant RaftPeer
participant ClientHandler
participant FollowerRaftNode
rect rgba(59, 130, 246, 0.5)
Note over LeaderRaftNode,FollowerRaftNode: Snapshot creation and chunked send
LeaderRaftNode->>SnapshotManager: createSnapshot(lastApplied)
SnapshotManager->>MessageStore: lockForSnapshot(zipTask)
SnapshotManager-->>LeaderRaftNode: zipPath
loop each 256KB chunk
LeaderRaftNode->>RaftPeer: sendInstallSnapshot(chunk, offset, done)
RaftPeer->>ClientHandler: INSTALL_SNAPSHOT_REQUEST (TCP)
ClientHandler->>FollowerRaftNode: handleInstallSnapshot(request, async 10s)
FollowerRaftNode-->>ClientHandler: InstallSnapshotResponse(term)
ClientHandler-->>RaftPeer: INSTALL_SNAPSHOT_RESPONSE
end
end
rect rgba(16, 185, 129, 0.5)
Note over FollowerRaftNode: Snapshot apply on follower
FollowerRaftNode->>SnapshotManager: restoreSnapshot(zipFile)
SnapshotManager->>MessageStore: rebuild from extracted state
FollowerRaftNode->>FollowerRaftNode: raftLog compact/truncate, update lastApplied/commitIndex
end
sequenceDiagram
participant BrokerServer
participant TelemetryWebSocketServer
participant BrokerMetrics
participant WebSocketTelemetryProvider
participant Dashboard
BrokerServer->>TelemetryWebSocketServer: start(port+200)
loop every 1 second
TelemetryWebSocketServer->>BrokerMetrics: getCounterValueByType/getGaugeValue/getTimerMeanMs
TelemetryWebSocketServer->>TelemetryWebSocketServer: buildTelemetryPayload() → JSON
TelemetryWebSocketServer->>WebSocketTelemetryProvider: broadcast JSON frame
WebSocketTelemetryProvider->>WebSocketTelemetryProvider: merge(allFrames) by leader/term
WebSocketTelemetryProvider->>Dashboard: onData(mergedTelemetryState)
Dashboard->>Dashboard: re-render with live metrics/topology
end
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
Actionable comments posted: 16
Note
Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.
🟡 Minor comments (11)
drmq-broker/src/main/java/com/drmq/broker/raft/SnapshotManager.java-166-174 (1)
166-174: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Silent failure in `deleteDirectory` may leave partial state.
The `Files.delete(p)` call at Line 171 catches and silently ignores `IOException`. If deletion fails (e.g., file locked on Windows, permission issues), `restoreSnapshot` will proceed to extract the new snapshot on top of partially deleted old state, potentially causing corruption or inconsistent state.
Consider either propagating the exception or at least logging a warning.
Proposed fix
 private void deleteDirectory(Path path) throws IOException {
     if (!Files.exists(path)) return;
     try (Stream<Path> walk = Files.walk(path)) {
         walk.sorted(java.util.Comparator.reverseOrder())
             .forEach(p -> {
-                try { Files.delete(p); } catch (IOException ignored) {}
+                try {
+                    Files.delete(p);
+                } catch (IOException e) {
+                    logger.warn("Failed to delete {} during snapshot restore: {}", p, e.getMessage());
+                }
             });
     }
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-broker/src/main/java/com/drmq/broker/raft/SnapshotManager.java` around lines 166 - 174, in the deleteDirectory method, the catch block inside the forEach loop silently ignores IOException when Files.delete(p) fails, which allows partial deletion state to go undetected and cause inconsistencies during snapshot restoration. Replace the empty catch block with proper error handling by either propagating the exception so deleteDirectory throws IOException (letting the caller handle it), or logging a warning that includes the path and error details before continuing. This ensures deletion failures are not silently masked and the caller can determine whether the operation succeeded or failed.
run_cluster.sh-22-22 (1)
22-22: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Quote the working-directory argument in `open_tab`.
Line 22 uses `cd $dir` without quoting; paths containing spaces or special characters will fail.
Suggested fix
-gnome-terminal --tab --title="$title" -- bash -c "cd $dir && echo -e '\033]0;$title\007' && $cmd; exec bash"
+gnome-terminal --tab --title="$title" -- bash -c "cd \"$dir\" && echo -e '\033]0;$title\007' && $cmd; exec bash"
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@run_cluster.sh` at line 22, the working directory variable in the cd command is not quoted, which will cause the command to fail if the directory path contains spaces or special characters. In the line with the gnome-terminal command, change cd $dir to cd \"$dir\" (the quotes are escaped because the command sits inside a double-quoted bash -c string) so the directory path is treated as a single unit regardless of spaces or special characters it may contain.
load_test.sh-18-18 (1)
18-18: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Switch load test launcher to Maven wrapper.
Line 18 uses `mvn` directly; using `../mvnw -pl drmq-client` makes this script work in wrapper-only environments and aligns with the rest of this PR's tooling changes.
Suggested fix
-done | mvn exec:java -Dexec.mainClass="com.drmq.client.commandLineExample.ProducerApp" -Dexec.args="localhost:9092,localhost:9093,localhost:9094"
+done | ../mvnw -pl drmq-client exec:java -Dexec.mainClass="com.drmq.client.commandLineExample.ProducerApp" -Dexec.args="localhost:9092,localhost:9093,localhost:9094"
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@load_test.sh` at line 18, replace the direct `mvn` command with the Maven wrapper `../mvnw -pl drmq-client` to ensure the script works in wrapper-only environments and stays consistent with other tooling changes in the PR. Specifically, change `mvn exec:java` to `../mvnw -pl drmq-client exec:java` while keeping all other arguments (mainClass and exec args) unchanged.
run_cluster.sh-37-37 (1)
37-37: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Use the Maven wrapper for the consumer tab as well.
Line 37 invokes `mvn` directly, which breaks in environments that rely on the wrapper-only setup used elsewhere in this script.
Suggested fix
-open_tab "DRMQ Consumer" "$(pwd)/drmq-client" "echo -e 'subscribe load-test-topic\nstream' | mvn exec:java -Dexec.mainClass=\"com.drmq.client.commandLineExample.ConsumerApp\" -Dexec.args=\"localhost:9092,localhost:9093,localhost:9094 group-1\""
+open_tab "DRMQ Consumer" "$(pwd)/drmq-client" "echo -e 'subscribe load-test-topic\nstream' | ../mvnw -pl drmq-client exec:java -Dexec.mainClass=\"com.drmq.client.commandLineExample.ConsumerApp\" -Dexec.args=\"localhost:9092,localhost:9093,localhost:9094 group-1\""
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@run_cluster.sh` at line 37, the open_tab command for the DRMQ Consumer uses mvn directly instead of the Maven wrapper, which is inconsistent with the rest of the script and breaks in wrapper-only environments. Replace the mvn invocation in the ConsumerApp command with the Maven wrapper (`../mvnw -pl drmq-client` in the suggested fix, since the tab's working directory is drmq-client) so the wrapper is used consistently throughout the script.
drmq-dashboard/src/services/telemetry/WebSocketTelemetryProvider.ts-98-101 (1)
98-101: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
`merge()` has an invalid empty-state return path.
Line 100 returns `frames[0]` when `frames.length === 0`, which is `undefined`. Use an explicit failure or safe fallback state instead of returning an invalid telemetry object.
Suggested fix
-    if (frames.length === 0) return frames[0];
+    if (frames.length === 0) {
+      throw new Error('[DRMQ] merge() called without telemetry frames');
+    }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-dashboard/src/services/telemetry/WebSocketTelemetryProvider.ts` around lines 98 - 101, the merge() method has a logic error: it returns frames[0] when frames.length === 0, which is undefined because the array is empty. Fix this by replacing the return statement in the empty-frames branch so that it either throws an explicit error or returns a safe default TelemetryState object instead of accessing frames[0].
drmq-dashboard/src/useClusterTelemetry.ts-16-20 (1)
16-20: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Filter blank WebSocket URLs before provider initialization.
`split(',').map(trim)` can produce empty strings (common with trailing commas), which then trigger repeated failed reconnect attempts.
Suggested fix
-  const wsUrls = wsUrlsString.split(',').map((u: string) => u.trim());
+  const parsedWsUrls = wsUrlsString
+    .split(',')
+    .map((u: string) => u.trim())
+    .filter((u: string) => u.length > 0);
+  const wsUrls = parsedWsUrls.length > 0
+    ? parsedWsUrls
+    : defaultUrls.split(',').map((u: string) => u.trim());
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-dashboard/src/useClusterTelemetry.ts` around lines 16 - 20, the WebSocket URL parsing can produce empty strings when splitting and trimming URLs (due to trailing commas or consecutive delimiters), which then get passed to the WebSocketTelemetryProvider constructor and cause repeated failed reconnect attempts. After the split and trim operations on wsUrlsString, add a filter step to remove any empty strings before assigning the result to wsUrls and passing it to the new WebSocketTelemetryProvider constructor. This ensures only non-empty URLs are included in the provider initialization.
drmq-dashboard/src/services/telemetry/MockTelemetryProvider.ts-70-91 (1)
70-91: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Mock health never leaves `OPTIMAL` with the current error-rate logic.
Line 70 caps `errorRate` at 2, but Line 91 requires `> 5` for `DEGRADED`, so degradation is unreachable in simulation.
Suggested fix
-      health: errorRate > 5 ? 'DEGRADED' : 'OPTIMAL',
+      health: errorRate > 0 ? 'DEGRADED' : 'OPTIMAL',
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-dashboard/src/services/telemetry/MockTelemetryProvider.ts` around lines 70 - 91, the errorRate calculation in MockTelemetryProvider caps the maximum value at 2 (using Math.floor(Math.random() * 3)), but the health status check requires errorRate > 5 to transition to 'DEGRADED', making this state unreachable. Modify the errorRate calculation to use a higher multiplier (change Math.random() * 3 to a larger value) so that errorRate can exceed 5 and allow the health status to transition from 'OPTIMAL' to 'DEGRADED' based on the existing condition.
drmq-dashboard/src/pages/Documentation.tsx-259-271 (1)
259-271: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Use Maven wrapper commands consistently in docs.
This repo introduces `mvnw`; docs still show `mvn` in several snippets. Keeping wrapper usage consistent avoids requiring a globally installed Maven and keeps toolchain parity.
Suggested fix pattern
-  mvn clean install -DskipTests
+  ./mvnw clean install -DskipTests

-  mvn exec:java
+  ./mvnw exec:java
Also applies to: 280-290, 862-883
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-dashboard/src/pages/Documentation.tsx` around lines 259 - 271, replace all instances of the `mvn` command with `./mvnw` in the documentation code snippets to maintain consistency with the Maven wrapper toolchain. Specifically, update the bash code blocks containing `mvn clean install -DskipTests` and `mvn exec:java` to use `./mvnw` instead. Apply this change throughout the Documentation.tsx file wherever Maven commands appear in CodeBlock components, including the sections at lines 259-271, 280-290, and 862-883 mentioned in the comment, so developers use the wrapper and do not need a globally installed Maven.
drmq-dashboard/src/pages/Documentation.tsx-381-382 (1)
381-382: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Telemetry WebSocket port docs conflict with the implemented contract.
The page documents `9093`, but telemetry wiring and default dashboard URLs are based on `brokerPort + 200` (e.g., `9292/9293/9294` for brokers `9092/9093/9094`). This will mislead users during setup.
Suggested fix
-  ['TELEMETRY_WS_PORT', '9093', 'WebSocket port on each broker node that streams telemetry JSON frames to the dashboard.'],
+  ['TELEMETRY_WS_PORT', 'brokerPort + 200', 'WebSocket port on each broker node that streams telemetry JSON frames to the dashboard (e.g. 9292 when broker port is 9092).'],

-  Every DRMQ broker runs an embedded WebSocket server (port <code className="text-cyan-400 text-sm">9093</code> by default)
+  Every DRMQ broker runs an embedded WebSocket server (port <code className="text-cyan-400 text-sm">brokerPort + 200</code>, e.g. <code className="text-cyan-400 text-sm">9292</code> when broker port is 9092)
Also applies to: 770-771
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-dashboard/src/pages/Documentation.tsx` around lines 381 - 382, the TELEMETRY_WS_PORT documentation entry at line 381 shows a hardcoded port value of 9093, but the actual implementation uses brokerPort + 200 as the formula for calculating the telemetry port. Update the value in the array entry for TELEMETRY_WS_PORT to reflect the implemented formula instead of the hardcoded port number, and adjust the description to clarify how the port is derived from the broker port. Also check lines 770-771 for the same issue and apply the same fix there.
drmq-broker/src/test/java/com/drmq/broker/raft/SnapshotManagerTest.java-37-42 (1)
37-42: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Close `OffsetManager` in teardown as well.
`OffsetManager` is initialized in setup but never closed, which can leak resources across test runs.
Proposed fix
 @AfterEach
 void tearDown() throws IOException {
+    if (offsetManager != null) {
+        offsetManager.close();
+    }
     if (logManager != null) {
         logManager.close();
     }
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-broker/src/test/java/com/drmq/broker/raft/SnapshotManagerTest.java` around lines 37 - 42, in the tearDown method of SnapshotManagerTest, add a null check and close call for offsetManager, similar to how logManager is closed: an if statement that checks offsetManager is not null and calls offsetManager.close(), to prevent resource leaks across test runs.
drmq-broker/src/test/java/com/drmq/broker/raft/RaftLogTest.java-21-24 (1)
21-24: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Close `RaftLog` in test teardown to prevent file-handle leaks.
`RaftLog` opens a file-backed `RandomAccessFile`; without a close in teardown, tests can leak descriptors and cause flaky cleanup on some platforms.
Proposed fix
+    @AfterEach
+    void tearDown() throws IOException {
+        if (raftLog != null) {
+            raftLog.close();
+        }
+    }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-broker/src/test/java/com/drmq/broker/raft/RaftLogTest.java` around lines 21 - 24, the RaftLog instance created in the setUp method opens a file-backed RandomAccessFile but is never closed, causing file-handle leaks that can make tests flaky on some platforms. Add an `@AfterEach` teardown method that closes the raftLog instance so resources are released after each test, preventing descriptor leaks and keeping tests isolated.
🧹 Nitpick comments (6)
drmq-broker/src/main/java/com/drmq/broker/raft/RaftPeer.java (1)
175-207: 💤 Low value
Missing rate-limited logging for InstallSnapshot failures.
Other RPC methods (`sendRequestVote`, `sendPreVote`, `sendAppendEntries`) use `lastXxxFailureLogTime` fields to rate-limit failure logging. The new `sendInstallSnapshot` method logs every failure at Line 199, which could spam logs during network partitions.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-broker/src/main/java/com/drmq/broker/raft/RaftPeer.java` around lines 175 - 207, the sendInstallSnapshot method logs failures on every exception at line 199 without rate-limiting, which can spam logs during network issues. Add a lastInstallSnapshotFailureLogTime field to the RaftPeer class following the pattern of similar fields used in other RPC methods like sendRequestVote and sendAppendEntries. Then, in the catch block of sendInstallSnapshot, check this field before logging the debug message and only log if sufficient time has elapsed since the last failure was logged, using the same rate-limiting logic as the other RPC methods in this class.
drmq-broker/src/main/java/com/drmq/broker/raft/RaftNode.java (1)
857-858: 💤 Low value
Potential arithmetic issue with retention calculation.
`retentionLimit = lastApplied - (raftCompactThreshold * 100)` could become negative or cause unexpected behavior when `lastApplied` is small. For example, with the default `raftCompactThreshold = 1000`, the retention buffer is 100,000 entries. If `lastApplied < 100,000`, `retentionLimit` becomes negative, which is handled correctly by later `Math.max` calls, but the logic could be clearer.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-broker/src/main/java/com/drmq/broker/raft/RaftNode.java` around lines 857 - 858, the retention calculation for retentionLimit in the RaftNode class can produce negative values when lastApplied is smaller than the retention buffer (raftCompactThreshold * 100), which may lead to unexpected behavior even if later Math.max calls handle it. To make the logic clearer, apply Math.max directly in the retentionLimit calculation so that it never goes below zero. This clarifies the intent and makes the code more defensive without relying on downstream Math.max calls to correct invalid intermediate values.
drmq-broker/src/main/java/com/drmq/broker/MessageStore.java (2)
463-479: ⚖️ Poor tradeoff
Index cleanup uses stale `baseOffset` after segment removal.
After removing the segment from the map at Line 464, Line 472 queries `segments.higherKey(baseOffset)`. Although `baseOffset` is the key of the segment that was just removed, `higherKey(baseOffset)` will still correctly find the next key. The concern is that this accesses `segments` while `cleanupOldSegments()` is running, and `LogManager.getAllSegments()` returns references to the live map.
If another thread concurrently adds or removes segments, the `subMap().clear()` at Line 474 could throw `ConcurrentModificationException` or clear unexpected ranges. Consider holding a lock or working on a snapshot of the keys.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-broker/src/main/java/com/drmq/broker/MessageStore.java` around lines 463 - 479, the concurrent access to the `segments` map during index cleanup in the deletion loop creates a race condition. After removing a segment from the `segments` map, the subsequent call to `segments.higherKey(baseOffset)` and the `index.subMap(baseOffset, endOffset).clear()` operation are vulnerable to concurrent modifications by other threads. To fix this, acquire a lock on the `segments` map before entering the loop over `toDelete`, or alternatively create a snapshot of the keys before iterating and perform the higherKey lookup on that snapshot instead of the live map. Ensure the entire segment removal and index cleanup operation is atomic to prevent ConcurrentModificationException and unexpected behavior from concurrent segment additions or removals.
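The locking option mentioned above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `SegmentIndexCleaner`, its fields, and the tail-clearing branch for the last segment are hypothetical stand-ins, and the real `MessageStore` maps hold richer types.

```java
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for the per-topic segment and index maps. */
final class SegmentIndexCleaner {
    private final ReentrantLock lock = new ReentrantLock();
    // baseOffset -> segment name (a real store would hold a segment object)
    final TreeMap<Long, String> segments = new TreeMap<>();
    // message offset -> file position
    final TreeMap<Long, Long> index = new TreeMap<>();

    /** Removes the given segments and their index ranges under one lock. */
    void deleteSegments(List<Long> toDelete) {
        lock.lock();
        try {
            for (long baseOffset : toDelete) {
                if (segments.remove(baseOffset) == null) {
                    continue; // nothing to clean up for an unknown segment
                }
                // Upper bound of the removed range: the next surviving segment.
                Long endOffset = segments.higherKey(baseOffset);
                if (endOffset != null) {
                    index.subMap(baseOffset, endOffset).clear();
                } else {
                    // Assumption: with no later segment, drop the whole tail.
                    index.tailMap(baseOffset, true).clear();
                }
            }
        } finally {
            lock.unlock();
        }
    }
}
```

The lock only helps if every other writer to `segments` and `index` takes the same lock; otherwise a key snapshot is the safer of the two options.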
90-112: ⚡ Quick win
Recovery reads past segment boundary if segment file is corrupted or truncated.
In `recoverInternal()`, the loop at Line 94 reads while `position < segmentSize`, but if the last message is partially written (corrupted), `segment.read(position)` could throw an `IOException` that propagates up and aborts recovery of all remaining segments. Consider catching the exception per segment and continuing with subsequent segments, logging a warning for the corrupted one.
Suggested approach
-            } catch (IOException ioe) {
-                logger.error("Error recovering topic {} segment {}: {}", topic, segment.getFilePath(), ioe.getMessage(), ioe);
-                throw ioe;
+            } catch (IOException ioe) {
+                logger.warn("Error recovering topic {} segment {} (continuing with next segment): {}",
+                        topic, segment.getFilePath(), ioe.getMessage(), ioe);
+                // Continue with next segment rather than aborting entire recovery
             }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-broker/src/main/java/com/drmq/broker/MessageStore.java` around lines 90 - 112, the exception handling in the recoverInternal() method is too strict: when a corrupted or truncated message causes segment.read(position) to throw an IOException, the exception is immediately rethrown, which aborts recovery of all remaining segments. Instead, catch the IOException at the inner loop level where segment.read(position) is called, log a warning about the corrupted segment, and then break out of the message-reading loop to continue recovery with the next segment. This makes recovery resilient to individual segment corruption rather than failing the entire process.
drmq-broker/src/main/java/com/drmq/broker/BrokerServer.java (1)
133-133: 💤 Low value
Consider the memory implications of 256 MB max frame size.
The frame decoder now allows up to 256 MB per message, which is necessary for large snapshot transfers. However, this means each active channel can potentially allocate up to 256 MB for buffering. Under adversarial or bursty load, this could cause memory pressure.
This is acceptable given the snapshot use case, but consider documenting this limit or adding JVM heap sizing guidance for production deployments.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-broker/src/main/java/com/drmq/broker/BrokerServer.java` at line 133, the LengthFieldBasedFrameDecoder in the BrokerServer class is configured with a 256 MB maximum frame size, which could impact memory allocation per channel under heavy load. Add a code comment above the decoder configuration explaining that this limit is necessary to support large snapshot transfers, document the potential memory implications (each active channel can buffer up to 256 MB), and reference or create production deployment guidance that includes JVM heap sizing recommendations to account for this buffer allocation when multiple concurrent channels are active.
drmq-broker/src/test/java/com/drmq/broker/raft/RaftLogTest.java (1)
43-65: ⚡ Quick win
Add reopen assertions so compaction is verified as durable, not just in-memory.
This test currently checks post-`compact()` state on the same instance. Reopening `RaftLog` and reasserting `startIndex`/entry visibility will directly validate the on-disk compaction contract.
Proposed enhancement
     raftLog.compact(5);
     assertEquals(6, raftLog.getStartIndex());
     assertEquals(10, raftLog.getLastIndex());
@@
     assertEquals(6, raftLog.getEntry(6).getIndex());
     assertEquals(10, raftLog.getEntry(10).getIndex());
+
+    // Reopen and verify compaction persisted to disk
+    raftLog.close();
+    raftLog = new RaftLog(tempDir);
+    assertEquals(6, raftLog.getStartIndex());
+    assertEquals(10, raftLog.getLastIndex());
+    assertNull(raftLog.getEntry(5));
+    assertNotNull(raftLog.getEntry(6));
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-broker/src/test/java/com/drmq/broker/raft/RaftLogTest.java` around lines 43 - 65, the testDiskCompaction() method currently verifies compaction only in the in-memory state of the raftLog instance. To validate that compaction is durable and persisted to disk, add assertions after the existing post-compact checks that reopen the raftLog instance and reassert the startIndex, lastIndex, and entry availability. After the current assertions involving getStartIndex(), getLastIndex(), and getEntry() calls, close the current raftLog instance, create a new instance by reopening it from disk, and then repeat the same assertions to confirm that the compacted state (startIndex of 6, unavailability of entries 4-5, and availability of entries 6-10) persists across instances.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@.mvn/wrapper/maven-wrapper.properties`:
- Around line 1-3: The maven-wrapper.properties file is missing integrity
verification for the Maven distribution download. Add the distributionSha512Sum
property with the official Apache Maven 3.8.7 SHA512 checksum value to the
properties file alongside the existing distributionUrl and distributionType
entries. This ensures the downloaded Maven binary is verified against Apache's
official published checksum before use.
In `@drmq-broker/src/main/java/com/drmq/broker/BrokerConfig.java`:
- Around line 63-69: Add bounds validation to the setter methods for log and
compaction settings to ensure they only accept positive values. In the
setLogSegmentBytes method, add a check to verify that logSegmentBytes is greater
than 0 before assignment, throwing an appropriate exception if validation fails.
Apply the same validation pattern to the setLogRetentionMs method to ensure
logRetentionMs is positive. Additionally, apply identical validation logic to
the raft-compact-threshold setter method (referenced at lines 164-170) to ensure
consistent bounds checking across all long-valued configuration settings and
prevent zero or negative values that could cause storage instability.
- Line 61: The raftCompactThreshold configuration value is parsed and exposed
via the getter in BrokerConfig but is not actually applied when constructing the
RaftNode instance. In BrokerServer where the RaftNode is instantiated (around
the constructor call), you need to pass the raftCompactThreshold value from
BrokerConfig to the RaftNode constructor instead of allowing it to use the
hardcoded default value of 1000L. Either use an overloaded RaftNode constructor
that accepts the raftCompactThreshold parameter, or if no such constructor
exists, modify the existing constructor call to include this configuration value
as an argument.
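
The two BrokerConfig points above (positive-only setters, and passing the configured threshold into the node) might look like the following sketch. `CompactionSettings` and `CompactingNode` are hypothetical, trimmed-down stand-ins; the real `BrokerConfig` and `RaftNode` signatures are not shown in this review and may differ. Only the `1000L` default comes from the review text.

```java
/** Hypothetical, trimmed-down config holder; field names follow the review text. */
final class CompactionSettings {
    private long logSegmentBytes;
    private long raftCompactThreshold = 1000L; // default quoted in the review

    void setLogSegmentBytes(long value) {
        this.logSegmentBytes = requirePositive("logSegmentBytes", value);
    }

    void setRaftCompactThreshold(long value) {
        this.raftCompactThreshold = requirePositive("raftCompactThreshold", value);
    }

    long getLogSegmentBytes() {
        return logSegmentBytes;
    }

    long getRaftCompactThreshold() {
        return raftCompactThreshold;
    }

    /** Rejects zero and negative values before they reach storage code. */
    private static long requirePositive(String name, long value) {
        if (value <= 0) {
            throw new IllegalArgumentException(name + " must be > 0, got " + value);
        }
        return value;
    }
}

/** Hypothetical node that receives the threshold instead of hardcoding it. */
final class CompactingNode {
    private final long compactThreshold;

    CompactingNode(long compactThreshold) {
        this.compactThreshold = compactThreshold;
    }

    long compactThreshold() {
        return compactThreshold;
    }
}
```

A caller would then construct the node with `new CompactingNode(settings.getRaftCompactThreshold())`, so a configured value is no longer silently replaced by the default.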
In `@drmq-broker/src/main/java/com/drmq/broker/MessageStore.java`:
- Line 33: The cleanerScheduler ScheduledExecutorService field is initialized
but never shut down, causing thread leaks when MessageStore is closed or
recreated. Add a close() method to the MessageStore class that calls shutdown()
on the cleanerScheduler field to properly terminate the executor thread.
Consider implementing Closeable or AutoCloseable interface to ensure proper
resource management. Also review the code section at lines 57-65 mentioned in
the comment to identify any other resources that need cleanup in the close()
method.
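
A possible shape for the `close()` method described above, assuming the store owns a single-threaded scheduler. `CleanableStore`, the one-second period, and the five-second grace period are illustrative assumptions, not values from the PR.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a store that owns a background cleaner thread. */
final class CleanableStore implements AutoCloseable {
    private final ScheduledExecutorService cleanerScheduler =
            Executors.newSingleThreadScheduledExecutor();

    CleanableStore() {
        // Period is an illustrative value, not taken from the PR.
        cleanerScheduler.scheduleAtFixedRate(this::cleanup, 1, 1, TimeUnit.SECONDS);
    }

    private void cleanup() {
        // Segment cleanup would run here.
    }

    boolean isCleanerStopped() {
        return cleanerScheduler.isShutdown();
    }

    @Override
    public void close() {
        cleanerScheduler.shutdown(); // stop scheduling new runs
        try {
            // Give an in-flight cleanup a short grace period, then force it.
            if (!cleanerScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                cleanerScheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            cleanerScheduler.shutdownNow();
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
    }
}
```

Implementing `AutoCloseable` lets tests and callers use try-with-resources, which makes the leak harder to reintroduce.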
In `@drmq-broker/src/main/java/com/drmq/broker/persistence/LogSegment.java`:
- Around line 29-31: The LogSegment constructor parses the base offset from the
filename using Long.parseLong, which throws an unchecked NumberFormatException
if the filename is malformed. Since callers handle IOException during discovery
but not NumberFormatException, malformed segment files can crash recovery. Wrap
the filename parsing logic (the Long.parseLong call that extracts baseOffset
from the fileName substring) in a try-catch block that catches
NumberFormatException and rethrows it as an IOException with a descriptive
message indicating the filename was invalid.
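
As a sketch of the wrapping described above: the helper below converts the unchecked `NumberFormatException` into an `IOException`. `SegmentNames` is a hypothetical helper, and the assumption that the base offset is the part of the file name before the first dot is mine, not confirmed by the review.

```java
import java.io.IOException;

/** Hypothetical helper; the real parsing lives in the LogSegment constructor. */
final class SegmentNames {
    private SegmentNames() {
    }

    /**
     * Parses the base offset from a segment file name.
     * Assumption: the offset is the text before the first dot.
     */
    static long parseBaseOffset(String fileName) throws IOException {
        int dot = fileName.indexOf('.');
        String digits = dot >= 0 ? fileName.substring(0, dot) : fileName;
        try {
            return Long.parseLong(digits);
        } catch (NumberFormatException e) {
            // Surface malformed names through the checked path callers already handle.
            throw new IOException("Invalid segment file name: " + fileName, e);
        }
    }
}
```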
In `@drmq-broker/src/main/java/com/drmq/broker/raft/RaftLog.java`:
- Around line 234-238: In the file swap operation within RaftLog, modify the
Files.move call to include the ATOMIC_MOVE option instead of only using
REPLACE_EXISTING to ensure atomic file replacement and prevent data loss on
process crash. Additionally, wrap the RandomAccessFile initialization (new
RandomAccessFile at line 237) in a try-catch block to handle potential failures
gracefully, ensuring that if reopening fails, the error is caught and handled
appropriately rather than leaving the raf field in a closed but unrecovered
state.
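
The swap-and-reopen step above could be sketched like this. `LogFileSwap` is a hypothetical helper, and the non-atomic fallback is an assumption added for file systems that reject `ATOMIC_MOVE`. Note that the `Files.move` Javadoc leaves it implementation specific whether an atomic move replaces an existing target, so this is worth verifying on the deployment platform.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper illustrating the swap; not the RaftLog implementation. */
final class LogFileSwap {
    private LogFileSwap() {
    }

    /** Replaces target with the compacted file and returns a fresh handle on it. */
    static RandomAccessFile swapAndReopen(Path compacted, Path target) throws IOException {
        try {
            Files.move(compacted, target,
                    StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
        } catch (AtomicMoveNotSupportedException e) {
            // Assumed fallback: works everywhere but loses the crash-safety guarantee.
            Files.move(compacted, target, StandardCopyOption.REPLACE_EXISTING);
        }
        try {
            return new RandomAccessFile(target.toFile(), "rw");
        } catch (IOException e) {
            // Make the half-finished state explicit instead of leaving a closed handle.
            throw new IOException("Log swapped but could not be reopened: " + target, e);
        }
    }
}
```

Returning the new handle, rather than assigning a field inside the helper, keeps the caller in charge of what `raf` points to if the reopen fails.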
In `@drmq-broker/src/main/java/com/drmq/broker/raft/RaftNode.java`:
- Around line 1201-1212: The snapshot receive state initialization in the
RaftNode class only resets when request.getOffset() equals 0, but doesn't
account for snapshot transfers being interrupted by leader changes or term
changes with a different snapshot index. Add validation to check if the
request's lastIncludedIndex differs from the currently expected snapshot index
(track this in a field like expectedSnapshotIndex), and reset the
snapshotReceiveStream, snapshotTempFile, and snapshotReceiveOffset state not
only when offset is 0 but also when lastIncludedIndex changes, ensuring stale
state doesn't interfere with new snapshot transfers from different leaders or
different snapshot indices.
- Around line 683-688: The snapshotTerm assignment using
raftLog.getTermAt(snapshotIndex) returns 0 when the snapshot index has been
previously compacted because the corresponding log entry is no longer available
in the raftLog. To fix this, either store the term of lastApplied alongside it
in persistent state during snapshotting and restore both values together, or
retrieve the term from the last available log entry before compaction rather
than querying a term that no longer exists in raftLog. The key is ensuring
snapshotTerm reflects the actual term of the last included index, not 0, so that
when sendInstallSnapshotToPeer() is called, the correct lastIncludedTerm is sent
in the InstallSnapshot RPC.
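The first RaftNode finding above (resetting receive state when the snapshot index changes) could be sketched roughly as follows. `SnapshotReceiveState` is a hypothetical stand-in: it tracks only the expected index and byte offset, whereas the real code would also close `snapshotReceiveStream` and delete `snapshotTempFile` inside `reset()`. Term changes are not modeled here.

```java
// Hypothetical model of the follower-side receive state, not the real RaftNode code.
final class SnapshotReceiveState {
    private long expectedSnapshotIndex = -1; // -1 means no transfer in progress
    private long receiveOffset = 0;

    // Returns true if the chunk continues (or starts) the current transfer.
    boolean accept(long lastIncludedIndex, long offset, int chunkLength) {
        if (offset == 0 || lastIncludedIndex != expectedSnapshotIndex) {
            reset(); // drop stale state from an earlier or interrupted transfer
            if (offset != 0) {
                return false; // mid-stream chunk of a transfer we never started
            }
            expectedSnapshotIndex = lastIncludedIndex;
        }
        if (offset != receiveOffset) {
            return false; // gap or duplicate chunk
        }
        receiveOffset += chunkLength;
        return true;
    }

    void reset() {
        expectedSnapshotIndex = -1;
        receiveOffset = 0;
    }

    long receiveOffset() {
        return receiveOffset;
    }
}
```

Rejecting a non-zero offset for an unknown index forces the sender to restart from offset 0, which avoids appending new chunks onto a partial file from a different snapshot.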
In `@drmq-broker/src/main/java/com/drmq/broker/TelemetryWebSocketServer.java`:
- Line 31: The `throughputHistory` ArrayList field is accessed by multiple
threads without synchronization, causing potential
ConcurrentModificationException. Replace the ArrayList with a thread-safe
collection like CopyOnWriteArrayList, or alternatively add synchronized blocks
around all accesses to `throughputHistory` in both the `updateRates()` method
(where it's modified at lines 142-143) and the `buildTelemetryPayload()` method
(where it's iterated at lines 244-246). Ensure the same synchronization strategy
is applied consistently across all access points.
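A sketch of the `CopyOnWriteArrayList` option. `ThroughputHistory`, `MAX_POINTS`, and the single-writer assumption are illustrative; if more than one thread can append, the add-then-trim pair would need a lock, since it is not atomic as a unit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical wrapper around the history list, not the real TelemetryWebSocketServer code.
final class ThroughputHistory {
    private static final int MAX_POINTS = 60; // assumed window size
    private final List<Long> points = new CopyOnWriteArrayList<>();

    // Assumed to be called from a single rate-updating thread.
    void record(long messagesPerSecond) {
        points.add(messagesPerSecond);
        if (points.size() > MAX_POINTS) {
            points.remove(0); // remove(int index): drops the oldest sample
        }
    }

    // Safe from any thread: the copy is taken from a stable internal snapshot.
    List<Long> snapshot() {
        return new ArrayList<>(points);
    }
}
```

Copy-on-write suits this access pattern because writes are infrequent (one per rate tick) while reads iterate the whole list for each payload.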
In `@drmq-broker/src/test/java/com/drmq/broker/raft/RaftNodeTest.java`:
- Around line 116-157: The test handlesInstallSnapshotFromValidLeader uses fake
non-ZIP byte strings instead of real ZIP data, so the snapshot restoration fails
silently and the test doesn't actually verify successful snapshot application.
Replace the fake strings passed to setData() in chunk1 and chunk2 with a real
in-memory ZIP file that is properly split across the two chunks based on the
offset values (chunk1 at offset 0 with the first part, chunk2 at offset 20 with
the remaining part). This ensures the test exercises the actual snapshot
restoration and state advancement path rather than relying on exception
handling.
- Around line 185-197: The test method
testLogCompactionTriggersOnHighCommitIndex in the RaftNodeTest class is a
placeholder with only comments and no actual test logic or assertions, which
creates false confidence in test coverage. Either remove this test method
entirely or disable it using the `@Disabled` annotation if implementation is
planned for later, ensuring that only meaningful tests that verify actual
behavior are included in the test suite.
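For the `handlesInstallSnapshotFromValidLeader` finding above, a test fixture along these lines could produce real ZIP bytes split at offset 20. `SnapshotZipFixture` and the entry name passed in are hypothetical; the entries that `SnapshotManager` actually expects inside the archive are not shown in this review.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

// Hypothetical test fixture, not part of the DRMQ code base.
final class SnapshotZipFixture {
    private SnapshotZipFixture() {}

    // Builds a one-entry ZIP archive entirely in memory.
    static byte[] buildZip(String entryName, String content) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ZipOutputStream zip = new ZipOutputStream(bytes)) {
            zip.putNextEntry(new ZipEntry(entryName));
            zip.write(content.getBytes(StandardCharsets.UTF_8));
            zip.closeEntry();
        }
        return bytes.toByteArray();
    }

    // Splits the archive into [0, offset) and [offset, end) for two chunks.
    static byte[][] splitAt(byte[] data, int offset) {
        return new byte[][] {
            Arrays.copyOfRange(data, 0, offset),
            Arrays.copyOfRange(data, offset, data.length)
        };
    }

    // Reads the first entry back, to confirm the reassembled bytes are a valid ZIP.
    static String readFirstEntry(byte[] zipBytes) throws IOException {
        try (ZipInputStream zip = new ZipInputStream(new ByteArrayInputStream(zipBytes))) {
            if (zip.getNextEntry() == null) {
                throw new IOException("ZIP has no entries");
            }
            return new String(zip.readAllBytes(), StandardCharsets.UTF_8);
        }
    }
}
```

In the test, the two arrays from `splitAt(zip, 20)` would be passed to `setData()` on chunk1 (offset 0) and chunk2 (offset 20) respectively.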
In `@drmq-dashboard/src/App.tsx`:
- Around line 7-12: The Sidebar component (and similarly Dashboard at lines
58-67) is calling useClusterTelemetry() independently, which creates duplicate
WebSocket connections. Move the useClusterTelemetry() hook call to the parent
component (App) and pass the telemetryState as props to both Sidebar and
Dashboard components instead of each calling the hook separately. This ensures a
single telemetry subscription is shared between both components.
- Around line 29-42: The navigation links in the Link components (with to="/"
and to="/docs") hide their text labels on small screens using the "hidden
lg:block" class, making them icon-only and inaccessible to screen readers. Add
aria-label attributes to each Link component to provide explicit accessible
names that match the hidden text content - "Telemetry" for the first link and
"Documentation" for the second link - ensuring screen readers can properly
convey the navigation options regardless of viewport size.
In `@drmq-dashboard/src/pages/Dashboard.tsx`:
- Around line 125-135: The Dashboard component currently treats any null
telemetryState as a "connecting" state with no way to distinguish between an
active connection attempt and a permanent failure. Update the
useClusterTelemetry hook and WebSocketTelemetryProvider to track and expose an
explicit error or timeout state (e.g., a failed connection flag or error
message), then modify the Dashboard component's conditional logic to check for
this error state separately from the loading state, rendering an appropriate
error message when the connection has failed rather than spinning indefinitely.
- Line 1: Remove the unused icon imports from the lucide-react import statement
in the Dashboard.tsx file. Specifically, delete `LayoutDashboard`, `Database`,
`Network`, and `Server` from the import statement since these icons are not
referenced anywhere in the Dashboard component and will cause the noUnusedLocals
build check to fail.
In `@run_cluster.sh`:
- Around line 6-10: The pkill commands in the run_cluster.sh cleanup section use
overly broad patterns (such as "vite", "ConsumerApp", "ProducerApp",
"load_test.sh", and "drmq-broker") that can inadvertently terminate unrelated
processes running on the developer's machine. To fix this, scope each pkill
pattern to be more specific to this stack only—for example, by incorporating
stack-specific identifiers like environment variables, specific port numbers, or
unique process arguments that distinguish this stack's processes from other
unrelated ones running on the same machine. Modify each pkill -f command to use
more targeted patterns that reduce the risk of killing unrelated processes.
---
Minor comments:
In `@drmq-broker/src/main/java/com/drmq/broker/raft/SnapshotManager.java`:
- Around line 166-174: In the deleteDirectory method, the catch block at the
forEach loop is silently ignoring IOException when Files.delete(p) fails, which
allows partial deletion state to go undetected and cause inconsistencies during
snapshot restoration. Replace the empty catch block with proper error handling
by either propagating the exception so deleteDirectory throws IOException (and
letting the caller handle it), or log a warning message that includes the path
and error details before continuing. This ensures deletion failures are not
silently masked and the caller can determine if the operation succeeded or
failed.
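A sketch of the "propagate" option for `deleteDirectory`. `DirectoryCleaner` is a hypothetical class name, and the reverse-sorted `Files.walk` traversal is an assumption about how the existing method enumerates paths.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical helper, not the real SnapshotManager code.
final class DirectoryCleaner {
    private DirectoryCleaner() {}

    // Deletes root and everything under it; fails loudly on the first path that cannot be removed.
    static void deleteDirectory(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse order visits children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    // Lambdas cannot throw checked exceptions; tunnel it out.
                    throw new UncheckedIOException("Failed to delete " + p, e);
                }
            });
        } catch (UncheckedIOException e) {
            throw new IOException(e.getMessage(), e.getCause());
        }
    }
}
```

Propagating lets the snapshot restore path abort before it unpacks new files into a half-deleted directory; the logging alternative is lighter but leaves that decision to whoever reads the log.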
In `@drmq-broker/src/test/java/com/drmq/broker/raft/RaftLogTest.java`:
- Around line 21-24: The RaftLog instance created in the setUp method opens a
file-backed RandomAccessFile but is never closed, causing file handle leaks that
can cause test flakiness on some platforms. Add an `@AfterEach` teardown method
that properly closes the raftLog instance to ensure resources are released after
each test, preventing descriptor leaks and ensuring clean test isolation.
In `@drmq-broker/src/test/java/com/drmq/broker/raft/SnapshotManagerTest.java`:
- Around line 37-42: In the tearDown method of SnapshotManagerTest, add a null
check and close call for offsetManager, similar to how logManager is being
closed. After the logManager closure block, add another if statement to check if
offsetManager is not null and call offsetManager.close() to prevent resource
leaks across test runs.
In `@drmq-dashboard/src/pages/Documentation.tsx`:
- Around line 259-271: Replace all instances of the `mvn` command with `mvnw` in
the documentation code snippets to maintain consistency with the Maven wrapper
toolchain. Specifically, update the bash code blocks containing `mvn clean
install -DskipTests` and `mvn exec:java` commands to use `mvnw` instead. Apply
this change throughout the Documentation.tsx file wherever Maven commands appear
in CodeBlock components, including the sections at lines 259-271, 280-290, and
862-883 mentioned in the comment, to ensure developers use the wrapper and avoid
requiring globally installed Maven.
- Around line 381-382: The TELEMETRY_WS_PORT documentation entry at line 381
shows a hardcoded port value of 9093, but the actual implementation uses
brokerPort + 200 as the formula for calculating the telemetry port. Update the
value in the map array entry for TELEMETRY_WS_PORT to reflect the actual
implementation pattern instead of the hardcoded port number, and adjust the
description to clarify how the port is calculated based on the broker port. Also
check lines 770-771 for the same issue and apply the same fix there.
In `@drmq-dashboard/src/services/telemetry/MockTelemetryProvider.ts`:
- Around line 70-91: The errorRate calculation in MockTelemetryProvider caps the
maximum value at 2 (using Math.floor(Math.random() * 3)), but the health status
check requires errorRate > 5 to transition to 'DEGRADED', making this state
unreachable. Modify the errorRate calculation to use a higher multiplier (change
Math.random() * 3 to a larger value) so that errorRate can exceed 5 and allow
the health status to transition from 'OPTIMAL' to 'DEGRADED' based on the
existing condition.
In `@drmq-dashboard/src/services/telemetry/WebSocketTelemetryProvider.ts`:
- Around line 98-101: The merge() method has a logic error where it returns
frames[0] when frames.length === 0, which will be undefined since the array is
empty. Fix this by replacing the return statement in the empty frames condition
to return an explicit safe fallback state or default TelemetryState object
instead of attempting to access frames[0].
In `@drmq-dashboard/src/useClusterTelemetry.ts`:
- Around line 16-20: The WebSocket URL parsing in the useClusterTelemetry.ts
file can produce empty strings when splitting and trimming URLs (due to trailing
commas or consecutive delimiters), which then get passed to the
WebSocketTelemetryProvider constructor and cause repeated failed reconnect
attempts. After the split and trim operations on wsUrlsString, add a filter
method to remove any empty strings before assigning the result to wsUrls and
passing it to the new WebSocketTelemetryProvider constructor. This ensures only
valid, non-empty URLs are included in the provider initialization.
In `@load_test.sh`:
- Line 18: Replace the direct `mvn` command on line 18 with the Maven wrapper
`../mvnw -pl drmq-client` to ensure the script works in wrapper-only
environments and maintains consistency with other tooling changes in the PR.
Specifically, change `mvn exec:java` to `../mvnw -pl drmq-client exec:java`
while keeping all other arguments (mainClass and exec args) unchanged.
In `@run_cluster.sh`:
- Line 22: The working directory variable in the cd command is not quoted, which
will cause the command to fail if the directory path contains spaces or special
characters. In the line with the gnome-terminal command, change cd $dir to cd
"$dir" to properly quote the directory path argument so it is treated as a
single unit regardless of spaces or special characters it may contain.
- Line 37: The open_tab command for the DRMQ Consumer is using mvn directly
instead of the Maven wrapper, which is inconsistent with the rest of the script
and breaks on wrapper-only environments. Replace the mvn invocation with ./mvnw
in the ConsumerApp command to use the Maven wrapper consistently throughout the
script.
---
Nitpick comments:
In `@drmq-broker/src/main/java/com/drmq/broker/BrokerServer.java`:
- Line 133: The LengthFieldBasedFrameDecoder in the BrokerServer class is
configured with a 256 MB maximum frame size, which could impact memory
allocation per channel under heavy load. Add a code comment above the decoder
configuration explaining that this limit is necessary to support large snapshot
transfers, document the potential memory implications (each active channel can
buffer up to 256 MB), and reference or create production deployment guidance
that includes JVM heap sizing recommendations to account for this buffer
allocation when multiple concurrent channels are active.
In `@drmq-broker/src/main/java/com/drmq/broker/MessageStore.java`:
- Around line 463-479: The concurrent access to the `segments` map during index
cleanup in the deletion loop creates a race condition. After removing a segment
from the `segments` map, the subsequent call to `segments.higherKey(baseOffset)`
and the `index.subMap(baseOffset, endOffset).clear()` operation are vulnerable
to concurrent modifications by other threads. To fix this, acquire a lock on the
`segments` map before entering the loop over `toDelete`, or alternatively,
create a snapshot of the keys before iterating and perform the higherKey lookup
on that snapshot instead of the live map. Ensure the entire segment removal and
index cleanup operation is atomic to prevent ConcurrentModificationException and
unexpected behavior from concurrent segment additions or removals.
- Around line 90-112: The exception handling in the recoverInternal() method is
too strict - when a corrupted or truncated message causes segment.read(position)
to throw an IOException, the exception is immediately rethrown, which aborts
recovery of all remaining segments. Instead, modify the exception handling to
catch the IOException at the inner loop level where segment.read(position) is
called, log a warning message about the corrupted segment, and then break out of
the message-reading loop to continue recovery of the next segment. This allows
recovery to be resilient to individual segment corruption rather than failing
the entire recovery process.
In `@drmq-broker/src/main/java/com/drmq/broker/raft/RaftNode.java`:
- Around line 857-858: The retention calculation for retentionLimit in the
RaftNode class can produce negative values when lastApplied is smaller than the
retention buffer (raftCompactThreshold * 100), which may lead to unexpected
behavior even if later Math.max calls handle it. To make the logic clearer and
prevent negative retention limits, apply Math.max directly in the retentionLimit
calculation to ensure it never goes below zero, explicitly enforcing that the
retention buffer cannot be negative. This clarifies the intent and makes the
code more defensive without relying on downstream Math.max calls to correct
invalid intermediate values.
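The clamp could be as small as the sketch below. `RetentionMath` is a hypothetical holder class, and `long` is assumed for both values; the multiplier of 100 comes from the finding above.

```java
// Hypothetical helper, not the real RaftNode code.
final class RetentionMath {
    private RetentionMath() {}

    // Highest index that may be compacted away while keeping the retention buffer.
    static long retentionLimit(long lastApplied, long raftCompactThreshold) {
        long retentionBuffer = raftCompactThreshold * 100;
        // Clamp at zero so a young log never yields a negative limit.
        return Math.max(0L, lastApplied - retentionBuffer);
    }
}
```

For example, with a threshold of 10 the buffer is 1000 entries, so `lastApplied = 50` gives a limit of 0 and `lastApplied = 5000` gives 4000.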
In `@drmq-broker/src/main/java/com/drmq/broker/raft/RaftPeer.java`:
- Around line 175-207: The sendInstallSnapshot method logs failures on every
exception at line 199 without rate-limiting, which can spam logs during network
issues. Add a lastInstallSnapshotFailureLogTime field to the RaftPeer class
following the pattern of similar fields used in other RPC methods like
sendRequestVote and sendAppendEntries. Then in the catch block of
sendInstallSnapshot, check this field before logging the debug message and only
log if sufficient time has elapsed since the last failure was logged, using the
same rate-limiting logic as the other RPC methods in this class.
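A possible shape for the rate limiter, assuming a simple "at most one log line per interval" rule. The existing pattern in `sendRequestVote` and `sendAppendEntries` is not shown in this review, so `FailureLogLimiter`, its interval parameter, and the injectable clock are all hypothetical.

```java
import java.util.function.LongSupplier;

// Hypothetical helper, not the real RaftPeer code.
final class FailureLogLimiter {
    private final long intervalMillis;
    private final LongSupplier clock; // injectable for tests; e.g. System::currentTimeMillis
    private long lastLogTime;
    private boolean loggedBefore;

    FailureLogLimiter(long intervalMillis, LongSupplier clock) {
        this.intervalMillis = intervalMillis;
        this.clock = clock;
    }

    // Returns true when the caller should emit a log line for this failure.
    synchronized boolean shouldLog() {
        long now = clock.getAsLong();
        if (loggedBefore && now - lastLogTime < intervalMillis) {
            return false; // still inside the quiet period
        }
        loggedBefore = true;
        lastLogTime = now;
        return true;
    }
}
```

In the `catch` block of `sendInstallSnapshot`, the debug message would then be guarded by `if (limiter.shouldLog())`.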
In `@drmq-broker/src/test/java/com/drmq/broker/raft/RaftLogTest.java`:
- Around line 43-65: The testDiskCompaction() method currently verifies
compaction only in the in-memory state of the raftLog instance. To validate that
compaction is durable and persisted to disk, add assertions after the existing
post-compact checks that reopen the raftLog instance and reassert the
startIndex, lastIndex, and entry availability. After the current assertions
involving getStartIndex(), getLastIndex(), and getEntry() calls, close the
current raftLog instance, create a new instance by reopening it from disk, and
then repeat the same assertions to confirm that the compacted state (startIndex
of 6, unavailability of entries 4-5, and availability of entries 6-10) persists
across instances.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c80268f9-53df-4473-b639-36d3702e17f4
⛔ Files ignored due to path filters (6)
drmq-dashboard/package-lock.json is excluded by !**/package-lock.json
drmq-dashboard/public/favicon.svg is excluded by !**/*.svg
drmq-dashboard/public/icons.svg is excluded by !**/*.svg
drmq-dashboard/src/assets/hero.png is excluded by !**/*.png
drmq-dashboard/src/assets/react.svg is excluded by !**/*.svg
drmq-dashboard/src/assets/vite.svg is excluded by !**/*.svg
📒 Files selected for processing (45)
.mvn/wrapper/maven-wrapper.properties
README.md
drmq-broker/pom.xml
drmq-broker/src/main/java/com/drmq/broker/BrokerConfig.java
drmq-broker/src/main/java/com/drmq/broker/BrokerMetrics.java
drmq-broker/src/main/java/com/drmq/broker/BrokerServer.java
drmq-broker/src/main/java/com/drmq/broker/ClientHandler.java
drmq-broker/src/main/java/com/drmq/broker/ConsumerGroupCoordinator.java
drmq-broker/src/main/java/com/drmq/broker/MessageStore.java
drmq-broker/src/main/java/com/drmq/broker/OffsetManager.java
drmq-broker/src/main/java/com/drmq/broker/TelemetryWebSocketServer.java
drmq-broker/src/main/java/com/drmq/broker/persistence/LogManager.java
drmq-broker/src/main/java/com/drmq/broker/persistence/LogSegment.java
drmq-broker/src/main/java/com/drmq/broker/raft/RaftLog.java
drmq-broker/src/main/java/com/drmq/broker/raft/RaftNode.java
drmq-broker/src/main/java/com/drmq/broker/raft/RaftPeer.java
drmq-broker/src/main/java/com/drmq/broker/raft/SnapshotManager.java
drmq-broker/src/test/java/com/drmq/broker/ConsumerGroupCoordinatorTest.java
drmq-broker/src/test/java/com/drmq/broker/MessageStoreTest.java
drmq-broker/src/test/java/com/drmq/broker/raft/RaftLogTest.java
drmq-broker/src/test/java/com/drmq/broker/raft/RaftNodeTest.java
drmq-broker/src/test/java/com/drmq/broker/raft/SnapshotManagerTest.java
drmq-dashboard/.gitignore
drmq-dashboard/README.md
drmq-dashboard/eslint.config.js
drmq-dashboard/index.html
drmq-dashboard/package.json
drmq-dashboard/src/App.tsx
drmq-dashboard/src/index.css
drmq-dashboard/src/main.tsx
drmq-dashboard/src/pages/Dashboard.tsx
drmq-dashboard/src/pages/Documentation.tsx
drmq-dashboard/src/services/telemetry/MockTelemetryProvider.ts
drmq-dashboard/src/services/telemetry/WebSocketTelemetryProvider.ts
drmq-dashboard/src/types/telemetry.ts
drmq-dashboard/src/useClusterTelemetry.ts
drmq-dashboard/tsconfig.app.json
drmq-dashboard/tsconfig.json
drmq-dashboard/tsconfig.node.json
drmq-dashboard/vite.config.ts
drmq-protocol/src/main/proto/messages.proto
load_test.sh
mvnw
mvnw.cmd
run_cluster.sh
…ity, resource management, and path handling.
… robust broker configuration validation
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (6)
drmq-dashboard/src/services/telemetry/WebSocketTelemetryProvider.ts (1)
72-78: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Keep `merge()` from returning `undefined`.
Line 110 returns `undefined` while the method is typed as `TelemetryState`; if this path is reached, `onDataCallback` receives an invalid state object.
Proposed fix
```diff
 ws.onmessage = (event) => {
   if (!this.onDataCallback) return;
   try {
     const frame = JSON.parse(event.data) as TelemetryState;
     this.latestFrames.set(url, frame);
-    this.onDataCallback(this.merge());
+    const merged = this.merge();
+    if (merged) this.onDataCallback(merged);
   } catch (e) {
     console.error('[DRMQ] Failed to parse telemetry', e);
   }
 };
```
```diff
-  private merge(): TelemetryState {
+  private merge(): TelemetryState | null {
     const frames = Array.from(this.latestFrames.values());
-    if (frames.length === 0) return frames[0];
+    if (frames.length === 0) return null;
```
Also applies to: 108-110
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-dashboard/src/services/telemetry/WebSocketTelemetryProvider.ts` around lines 72 - 78, The merge() method in the WebSocketTelemetryProvider class can return undefined on line 110, but the method's return type is TelemetryState (not TelemetryState | undefined). This causes onDataCallback in the onmessage handler to receive an invalid state object when merge() returns undefined. Fix this by ensuring the merge() method always returns a valid TelemetryState object instead of undefined, such as by returning an empty or default TelemetryState when appropriate rather than allowing the undefined path to be reached.
drmq-broker/src/main/java/com/drmq/broker/raft/RaftNode.java (5)
121-129: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Validate `raftCompactThreshold` before using it in compaction math.
`applyCommitted()` assumes this value is positive. A negative value can produce a `finalCompactIndex` greater than `lastApplied`, risking deletion of entries that have not been applied.
Proposed fix
```diff
 this.offsetManager = offsetManager;
 this.dataDir = dataDir;
 this.snapshotManager = new SnapshotManager(dataDir, messageStore, offsetManager);
+if (raftCompactThreshold <= 0) {
+    throw new IllegalArgumentException("raftCompactThreshold must be positive");
+}
 this.raftCompactThreshold = raftCompactThreshold;
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-broker/src/main/java/com/drmq/broker/raft/RaftNode.java` around lines 121 - 129, Add validation in the RaftNode constructor to ensure the raftCompactThreshold parameter is positive before assigning it to the instance field. Check that raftCompactThreshold is greater than zero and throw an IllegalArgumentException with a descriptive message if it is not. This prevents negative values from being used in compaction calculations that could lead to incorrect finalCompactIndex calculations in methods like applyCommitted().
1125-1129: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Do not treat a compacted `prevLogTerm` as a wildcard.
`prevTerm == 0` means the follower cannot verify that prefix, but this code accepts any leader term. For compacted indexes, validate the snapshot boundary explicitly, e.g. only accept `prevLogIndex == lastApplied && request.getPrevLogTerm() == lastAppliedTerm`; otherwise reject so the leader sends a snapshot or backs off safely.
Proposed direction
```diff
 if (request.getPrevLogIndex() > 0) {
-    long prevTerm = raftLog.getTermAt(request.getPrevLogIndex());
-    if (request.getPrevLogIndex() > raftLog.getLastIndex() ||
-            (prevTerm != 0 && prevTerm != request.getPrevLogTerm())) {
+    long prevLogIndex = request.getPrevLogIndex();
+    long prevTerm = raftLog.getTermAt(prevLogIndex);
+    boolean compactedBoundaryMatches =
+            prevTerm == 0
+                    && prevLogIndex == lastApplied
+                    && request.getPrevLogTerm() == lastAppliedTerm;
+    if (prevLogIndex > raftLog.getLastIndex()
+            || (prevTerm == 0 && !compactedBoundaryMatches)
+            || (prevTerm != 0 && prevTerm != request.getPrevLogTerm())) {
         return AppendEntriesResponse.newBuilder()
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-broker/src/main/java/com/drmq/broker/raft/RaftNode.java` around lines 1125 - 1129, The condition in RaftNode that checks prevLogTerm is incorrectly treating prevTerm == 0 (compacted indexes where the term cannot be verified) as a wildcard that accepts any leader term. Replace the logic that currently allows any request.getPrevLogTerm() when prevTerm == 0 with explicit snapshot boundary validation: only accept the AppendEntries request if prevLogIndex equals lastApplied AND request.getPrevLogTerm() equals lastAppliedTerm for compacted indexes; otherwise return an AppendEntriesResponse rejection so the leader can safely send a snapshot or back off.
724-743: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift
Surface InstallSnapshot failures before advancing peer replication state.
`handleInstallSnapshot()` catches restore/write errors and returns a normal response, while the sender treats any same-term final response as success and updates `matchIndex`. Add an explicit success/error field to `InstallSnapshotResponse` and gate the `nextIndex`/`matchIndex` update on it, or let failures propagate so the sender retries instead of marking the peer caught up.
Also applies to: 1267-1269
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-broker/src/main/java/com/drmq/broker/raft/RaftNode.java` around lines 724 - 743, The code updates nextIndex and matchIndex for the peer without verifying that the InstallSnapshot operation actually succeeded. When isDone is true, the code assumes success and advances the peer's replication state, but the handler may have caught and silently handled restore or write errors internally. Add an explicit success or error field to the InstallSnapshotResponse class and check this field before updating nextIndex and matchIndex in the condition where isDone is true. Only advance the peer's replication indices when the response explicitly indicates success, preventing failed snapshot installations from being incorrectly treated as successful replication.
857-879: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Persist `lastApplied`/`lastAppliedTerm` before deleting Raft log entries.
Both paths can compact the log before the corresponding applied index is durable. A crash in that window leaves the state file pointing behind `raftLog.getStartIndex()`, so recovery can hit missing entries that were already discarded.
Proposed direction
```diff
 if (applied) {
     stateSaveNeeded = true;
+    savePersistentState();
     // Keep at least 100x the compact threshold as a retention buffer
```
For InstallSnapshot, update and persist the snapshot-applied state before compacting/removing local log entries:
```diff
-// Discard all local log entries up to the snapshot point
+lastApplied = snapshotIndex;
+lastAppliedTerm = request.getLastIncludedTerm();
+commitIndex = Math.max(commitIndex, snapshotIndex);
+stateSaveNeeded = true;
+savePersistentState();
+
+// Discard all local log entries up to the snapshot point
 if (raftLog.getLastIndex() > 0) {
     try {
         long compactUpTo = Math.min(snapshotIndex, raftLog.getLastIndex());
         raftLog.compact(compactUpTo);
@@
 }
 raftLog.setStartIndex(snapshotIndex + 1);
-lastApplied = snapshotIndex;
-lastAppliedTerm = request.getLastIncludedTerm();
-commitIndex = Math.max(commitIndex, snapshotIndex);
```
Also applies to: 1246-1258
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-broker/src/main/java/com/drmq/broker/raft/RaftNode.java` around lines 857 - 879, The raftLog.compact(finalCompactIndex) call in the RaftNode class is being executed after setting stateSaveNeeded to true, but the lastApplied and lastAppliedTerm state is not actually persisted to disk before the log entries are deleted. This creates a window where a crash could leave the state file pointing to an index that no longer exists in the log. Move the persistence of lastApplied and lastAppliedTerm to occur explicitly and synchronously BEFORE the raftLog.compact(finalCompactIndex) call, ensuring the applied state is durable on disk before any log entries are discarded. This prevents recovery from attempting to access entries that have already been compacted away.
682-695: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift
Capture snapshot metadata under the same state-machine snapshot boundary.
`snapshotIndex`/`snapshotTerm` are captured before `createSnapshot()` freezes the store. If `lastApplied` advances in between, the ZIP can include state newer than the advertised `lastIncludedIndex`, causing followers to restore data and then replay those same Raft entries.
Move the metadata capture into the same critical section used to freeze the state-machine snapshot, or have `SnapshotManager.createSnapshot(...)` return the exact included index/term after the snapshot lock is acquired.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-broker/src/main/java/com/drmq/broker/raft/RaftNode.java` around lines 682 - 695, The metadata capture for snapshotIndex and snapshotTerm occurs within the lock but createSnapshot is called after the lock is released, creating a race condition where lastApplied could advance between these two operations, causing the snapshot to contain state newer than the advertised index. Move the snapshotManager.createSnapshot(snapshotIndex) call inside the try block before lock.unlock() so that both metadata capture and snapshot freezing happen atomically within the same critical section, or alternatively modify createSnapshot to return the exact snapshot metadata after acquiring its own internal lock to ensure consistency between the advertised index and the actual snapshot contents.
🧹 Nitpick comments (2)
drmq-dashboard/src/pages/Dashboard.tsx (1)
120-120: Use concrete telemetry types instead of `any` on Lines 120 and 160.
This component already depends on the `TelemetryState`/`BrokerNode` contract. Typing these spots removes avoidable unsafe access in the main render path.
Proposed change
```diff
+import type { BrokerNode, TelemetryState } from '../types/telemetry';
-export default function Dashboard({ telemetryState, telemetryError }: { telemetryState: any, telemetryError?: string | null }) {
+export default function Dashboard({
+  telemetryState,
+  telemetryError,
+}: {
+  telemetryState: TelemetryState | null;
+  telemetryError?: string | null;
+}) {
@@
-  {nodes.find((n: any) => n.status === 'LEADER')?.name ?? 'No Leader'}
+  {nodes.find((n: BrokerNode) => n.status === 'LEADER')?.name ?? 'No Leader'}
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-dashboard/src/pages/Dashboard.tsx` at line 120, Replace the `any` type annotation for the `telemetryState` parameter in the Dashboard function signature with the concrete `TelemetryState` type (or appropriate type based on your codebase's telemetry contract). Similarly, locate the code on line 160 and replace any `any` type annotations there with the appropriate concrete type. This removes unsafe type access in the main render path and provides better type safety for the component.
drmq-dashboard/src/App.tsx (1)
7-7: Replace `any` with the telemetry contract type on Line 7.
`Sidebar` receives `TelemetryState | null` from `useClusterTelemetry`, so keeping this as `any` drops type-safety on a shared boundary.
Proposed change
```diff
+import type { TelemetryState } from './types/telemetry';
-function Sidebar({ telemetryState }: { telemetryState: any }) {
+function Sidebar({ telemetryState }: { telemetryState: TelemetryState | null }) {
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@drmq-dashboard/src/App.tsx` at line 7, Replace the `any` type annotation for the `telemetryState` parameter in the `Sidebar` function with the proper telemetry contract type `TelemetryState | null`. This will restore type-safety at the component boundary and ensure that the actual data shape from `useClusterTelemetry` is properly enforced throughout the component.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@drmq-dashboard/src/services/telemetry/WebSocketTelemetryProvider.ts`:
- Around line 85-91: The onclose handler in WebSocketTelemetryProvider is not
removing stale frame data for the closed URL from latestFrames, causing merge()
to continue displaying outdated broker information. In the onclose handler,
after setting the socket to null with this.sockets.set(url, null), delete or
remove the corresponding frame entry from latestFrames using the same URL key to
drop the stale data. Additionally, check if the disconnection is intentional (by
examining whether there is a flag or state indicating this was a deliberate
disconnect call) and suppress the onErrorCallback invocation in that case, since
error reporting should only occur for unintended connection failures.
---
Outside diff comments:
In `@drmq-broker/src/main/java/com/drmq/broker/raft/RaftNode.java`:
- Around line 121-129: Add validation in the RaftNode constructor to ensure the
raftCompactThreshold parameter is positive before assigning it to the instance
field. Check that raftCompactThreshold is greater than zero and throw an
IllegalArgumentException with a descriptive message if it is not. This prevents
zero or negative values from being used in compaction calculations, which could
lead to an incorrect finalCompactIndex in methods like applyCommitted().
- Around line 1125-1129: The condition in RaftNode that checks prevLogTerm is
incorrectly treating prevTerm == 0 (compacted indexes where the term cannot be
verified) as a wildcard that accepts any leader term. Replace the logic that
currently allows any request.getPrevLogTerm() when prevTerm == 0 with explicit
snapshot boundary validation: only accept the AppendEntries request if
prevLogIndex equals lastApplied AND request.getPrevLogTerm() equals
lastAppliedTerm for compacted indexes; otherwise return an AppendEntriesResponse
rejection so the leader can safely send a snapshot or back off.
- Around line 724-743: The code updates nextIndex and matchIndex for the peer
without verifying that the InstallSnapshot operation actually succeeded. When
isDone is true, the code assumes success and advances the peer's replication
state, but the handler may have caught and silently handled restore or write
errors internally. Add an explicit success or error field to the
InstallSnapshotResponse class and check this field before updating nextIndex and
matchIndex in the condition where isDone is true. Only advance the peer's
replication indices when the response explicitly indicates success, preventing
failed snapshot installations from being incorrectly treated as successful
replication.
- Around line 857-879: The raftLog.compact(finalCompactIndex) call in the
RaftNode class is being executed after setting stateSaveNeeded to true, but the
lastApplied and lastAppliedTerm state is not actually persisted to disk before
the log entries are deleted. This creates a window where a crash could leave the
state file pointing to an index that no longer exists in the log. Move the
persistence of lastApplied and lastAppliedTerm to occur explicitly and
synchronously BEFORE the raftLog.compact(finalCompactIndex) call, ensuring the
applied state is durable on disk before any log entries are discarded. This
prevents recovery from attempting to access entries that have already been
compacted away.
- Around line 682-695: The metadata capture for snapshotIndex and snapshotTerm
occurs within the lock but createSnapshot is called after the lock is released,
creating a race condition where lastApplied could advance between these two
operations, causing the snapshot to contain state newer than the advertised
index. Move the snapshotManager.createSnapshot(snapshotIndex) call inside the
try block before lock.unlock() so that both metadata capture and snapshot
freezing happen atomically within the same critical section, or alternatively
modify createSnapshot to return the exact snapshot metadata after acquiring its
own internal lock to ensure consistency between the advertised index and the
actual snapshot contents.
In `@drmq-dashboard/src/services/telemetry/WebSocketTelemetryProvider.ts`:
- Around line 72-78: The merge() method in the WebSocketTelemetryProvider class
can return undefined on line 110, but the method's return type is TelemetryState
(not TelemetryState | undefined). This causes onDataCallback in the onmessage
handler to receive an invalid state object when merge() returns undefined. Fix
this by ensuring the merge() method always returns a valid TelemetryState object
instead of undefined, such as by returning an empty or default TelemetryState
when appropriate rather than allowing the undefined path to be reached.
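For the RaftNode finding around lines 1125-1129 above, the decision logic can be modeled on its own. The broker is Java; the TypeScript below is only a language-neutral sketch of the check, not the project's code. `FollowerView`, `termAt`, and `prevEntryMatches` are hypothetical names, and the sketch assumes a term lookup of 0 means the entry is not available locally (compacted or missing).

```typescript
// Hypothetical view of the follower state named in the review comment.
interface FollowerView {
  lastApplied: number;      // snapshot boundary index
  lastAppliedTerm: number;  // term of the entry at the boundary
  // Assumed helper: returns 0 when the entry is not in the local log.
  termAt(index: number): number;
}

// Returns true only when the leader's (prevLogIndex, prevLogTerm) can be verified.
function prevEntryMatches(
  follower: FollowerView,
  prevLogIndex: number,
  prevLogTerm: number
): boolean {
  const localTerm = follower.termAt(prevLogIndex);
  if (localTerm !== 0) {
    // Entry is still in the log: ordinary Raft consistency check.
    return localTerm === prevLogTerm;
  }
  // Entry unavailable: accept only an exact match on the snapshot boundary,
  // never treat the unknown term as a wildcard.
  return (
    prevLogIndex === follower.lastApplied &&
    prevLogTerm === follower.lastAppliedTerm
  );
}
```

Any other request is rejected, which leaves the leader free to back off or fall back to sending a snapshot.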
---
Nitpick comments:
In `@drmq-dashboard/src/App.tsx`:
- Line 7: Replace the `any` type annotation for the `telemetryState` parameter
in the `Sidebar` function with the proper telemetry contract type
`TelemetryState | null`. This will restore type-safety at the component boundary
and ensure that the actual data shape from `useClusterTelemetry` is properly
enforced throughout the component.
In `@drmq-dashboard/src/pages/Dashboard.tsx`:
- Line 120: Replace the `any` type annotation for the `telemetryState` parameter
in the Dashboard function signature with the concrete `TelemetryState` type (or
appropriate type based on your codebase's telemetry contract). Similarly, locate
the code on line 160 and replace any `any` type annotations there with the
appropriate concrete type. This removes unsafe type access in the main render
path and provides better type safety for the component.
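As a rough illustration of what the typed boundary buys, the sketch below uses an assumed `TelemetryState` shape (the real contract is exported from `src/types/telemetry.ts` and may differ). `SidebarProps` and `sidebarSummary` are hypothetical stand-ins for the component props and render logic, written without JSX so the snippet runs on its own.

```typescript
// Assumed shape for illustration; import the real type in the dashboard code.
interface TelemetryNode {
  id: string;
  status: string;
  color: string;
}
interface TelemetryState {
  nodes: TelemetryNode[];
}

// Props typed with the contract instead of `any`.
interface SidebarProps {
  telemetryState: TelemetryState | null;
}

// Stand-in for the render path: the compiler forces the null case to be handled.
function sidebarSummary({ telemetryState }: SidebarProps): string {
  if (telemetryState === null) {
    return "Connecting...";
  }
  return `${telemetryState.nodes.length} node(s)`;
}
```

With `any`, accessing `telemetryState.nodes` before the first frame arrives would compile and then fail at runtime; with `TelemetryState | null` the missing null check becomes a compile error.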
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c28dde76-ffaf-4bc9-87df-4385d79e6528
📒 Files selected for processing (16)
- .mvn/wrapper/maven-wrapper.properties
- drmq-broker/src/main/java/com/drmq/broker/BrokerConfig.java
- drmq-broker/src/main/java/com/drmq/broker/BrokerServer.java
- drmq-broker/src/main/java/com/drmq/broker/MessageStore.java
- drmq-broker/src/main/java/com/drmq/broker/TelemetryWebSocketServer.java
- drmq-broker/src/main/java/com/drmq/broker/persistence/LogSegment.java
- drmq-broker/src/main/java/com/drmq/broker/raft/RaftLog.java
- drmq-broker/src/main/java/com/drmq/broker/raft/RaftNode.java
- drmq-broker/src/test/java/com/drmq/broker/raft/RaftNodeTest.java
- drmq-dashboard/src/App.tsx
- drmq-dashboard/src/pages/Dashboard.tsx
- drmq-dashboard/src/services/telemetry/MockTelemetryProvider.ts
- drmq-dashboard/src/services/telemetry/WebSocketTelemetryProvider.ts
- drmq-dashboard/src/types/telemetry.ts
- drmq-dashboard/src/useClusterTelemetry.ts
- run_cluster.sh
🚧 Files skipped from review as they are similar to previous changes (10)
- .mvn/wrapper/maven-wrapper.properties
- drmq-broker/src/main/java/com/drmq/broker/persistence/LogSegment.java
- drmq-broker/src/main/java/com/drmq/broker/BrokerConfig.java
- drmq-dashboard/src/types/telemetry.ts
- drmq-broker/src/test/java/com/drmq/broker/raft/RaftNodeTest.java
- drmq-dashboard/src/services/telemetry/MockTelemetryProvider.ts
- run_cluster.sh
- drmq-broker/src/main/java/com/drmq/broker/BrokerServer.java
- drmq-broker/src/main/java/com/drmq/broker/TelemetryWebSocketServer.java
- drmq-broker/src/main/java/com/drmq/broker/MessageStore.java
…to prevent ghosting during cluster telemetry merging
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@drmq-dashboard/src/services/telemetry/WebSocketTelemetryProvider.ts`:
- Around line 135-145: In the frame processing loop where you iterate through
frames and access frame.nodes[0], the variable local holds a reference to the
original cached object from latestFrames. When you mutate local.status and
local.color directly, you are modifying the cached frame object itself, which
causes subsequent merge calls to operate on already-modified data. Create a
shallow copy of the local node object before applying the status and color
mutations (when isStaleLeader is true) to ensure the original cached frames
remain unchanged and subsequent operations see consistent data.
- Line 111: The condition in the early return statement is logically inverted.
Currently, when frames.length === 0 (no frames available), it attempts to return
frames[0], which is undefined and violates the TelemetryState return type. Fix
this by inverting the condition to check if frames.length > 0 before returning
frames[0], or alternatively, return a safe empty TelemetryState object when the
condition frames.length === 0 is true. This ensures the method always returns a
valid TelemetryState object and prevents undefined from being accessed by
callers.
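Both findings above can be addressed in one pass over `merge()`. The following is a sketch under stated assumptions, not the provider's actual implementation: the `TelemetryState` shape, the `isStaleLeader` predicate passed as a parameter, and the `"STALE"` / `"gray"` values are hypothetical placeholders for whatever the real code uses.

```typescript
// Assumed shapes; the real types live in src/types/telemetry.ts.
interface TelemetryNode {
  id: string;
  status: string;
  color: string;
}
interface TelemetryState {
  nodes: TelemetryNode[];
}

function merge(
  latestFrames: Map<string, TelemetryState>,
  isStaleLeader: (node: TelemetryNode) => boolean
): TelemetryState {
  const frames = Array.from(latestFrames.values());
  // Guard: with no frames, return a valid empty state rather than frames[0].
  if (frames.length === 0) {
    return { nodes: [] };
  }
  const nodes: TelemetryNode[] = [];
  for (const frame of frames) {
    const cached = frame.nodes[0];
    if (!cached) continue;
    // Shallow copy so the cached frame in latestFrames is never mutated.
    const local: TelemetryNode = { ...cached };
    if (isStaleLeader(local)) {
      local.status = "STALE"; // hypothetical value
      local.color = "gray";   // hypothetical value
    }
    nodes.push(local);
  }
  return { nodes };
}
```

Because each call works on copies, repeated merges see the same cached input regardless of what earlier merges decided about leader staleness.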
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 4f337a18-9af9-4095-bdb8-0b01b96923d3
📒 Files selected for processing (1)
drmq-dashboard/src/services/telemetry/WebSocketTelemetryProvider.ts
Summary by CodeRabbit
Release Notes
New Features
Improvements
Tools