Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,14 @@ data/
data-*/
BROKER_CODE_EXPLAINED.md
CODE_REVIEW_FIXES_EXPLANATION.md
docs/
docs/

# Python SDK
drmq-python-client/venv/
drmq-python-client/__pycache__/
drmq-python-client/*.pyc

# TypeScript SDK
drmq-ts-client/node_modules/
drmq-ts-client/dist/
drmq-ts-client/build/
4 changes: 4 additions & 0 deletions .mvn/wrapper/maven-wrapper.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
wrapperVersion=3.3.4
distributionType=only-script
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.8.7/apache-maven-3.8.7-bin.zip
Comment thread
coderabbitai[bot] marked this conversation as resolved.
distributionSha512Sum=1109a1a89c93dd4195a6fcc8cba279fb08e4d3a017088b90c1f54b6fb7d3ceccf8539eecb08fccba6462bb8bba99af2f073bb8956bb201887eabcc843a60a7e6
68 changes: 64 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ mvn clean install
To run a standalone broker (useful for testing and development):

```bash
java -jar drmq-broker/target/drmq-broker-1.0.0-SNAPSHOT.jar --port 9092 --data-dir ./data-1
./mvnw -pl drmq-broker exec:java -Dexec.args="--port 9092 --data-dir ./data-1"
```

### Cluster Mode
Expand All @@ -59,19 +59,19 @@ To run a fault-tolerant cluster, you must start multiple broker instances and pr
**Node 1:**

```bash
java -jar drmq-broker/target/drmq-broker-1.0.0-SNAPSHOT.jar --node-id 1 --port 9092 --data-dir ./data-1 --peers 2:localhost:9093,3:localhost:9094
./mvnw -pl drmq-broker exec:java -Dexec.args="--node-id 1 --port 9092 --data-dir ./data-1 --peers 2:localhost:9093,3:localhost:9094"
```

**Node 2:**

```bash
java -jar drmq-broker/target/drmq-broker-1.0.0-SNAPSHOT.jar --node-id 2 --port 9093 --data-dir ./data-2 --peers 1:localhost:9092,3:localhost:9094
./mvnw -pl drmq-broker exec:java -Dexec.args="--node-id 2 --port 9093 --data-dir ./data-2 --peers 1:localhost:9092,3:localhost:9094"
```

**Node 3:**

```bash
java -jar drmq-broker/target/drmq-broker-1.0.0-SNAPSHOT.jar --node-id 3 --port 9094 --data-dir ./data-3 --peers 1:localhost:9092,2:localhost:9093
./mvnw -pl drmq-broker exec:java -Dexec.args="--node-id 3 --port 9094 --data-dir ./data-3 --peers 1:localhost:9092,2:localhost:9093"
```

## Usage Example
Expand Down Expand Up @@ -154,6 +154,66 @@ try (DRMQConsumer consumer = new DRMQConsumer("localhost:9092", "my-group")) {
}
```

### Python Client (SDK)

DRMQ supports cross-language communication via raw TCP and Protocol Buffers. A Python client implementation is provided in `drmq-python-client`.
The SDK features automatic leader failover, transparent retries, and offset auto-commit functionality identical to the Java client.

**Producer Example:**
```python
from drmq_client import DRMQProducer

producer = DRMQProducer("localhost:9092,localhost:9093")
producer.connect()
res = producer.send("python-topic", b"Hello from Python!")
print(f"Sent at offset {res.offset}")
```

**Consumer Example:**
```python
from drmq_client import DRMQConsumer

consumer = DRMQConsumer("localhost:9092,localhost:9093", group_id="python-workers")
consumer.auto_commit = True
consumer.connect()
consumer.subscribe("python-topic")

messages = consumer.poll(max_messages=10, timeout_ms=5000)
for msg in messages:
print(f"Received: {msg.payload.decode('utf-8')}")
```

### TypeScript Client (SDK)

A native Node.js/TypeScript client is provided in `drmq-ts-client`. Like the Python SDK, it natively supports cluster failovers and leader redirects.

**Producer Example:**
```typescript
import { DRMQProducer } from './client';

const producer = new DRMQProducer("localhost:9092,localhost:9093");
await producer.connect();

const payload = Buffer.from("Hello from TypeScript!");
const res = await producer.send("ts-topic", payload);
console.log(`Sent at offset ${res.offset}`);
```

**Consumer Example:**
```typescript
import { DRMQConsumer } from './client';

const consumer = new DRMQConsumer("localhost:9092,localhost:9093", "ts-workers");
consumer.autoCommit = true;
await consumer.connect();
await consumer.subscribe("ts-topic");

const messages = await consumer.poll(10, 5000);
for (const msg of messages) {
console.log(`Received: ${Buffer.from(msg.payload).toString('utf-8')}`);
}
```

## Interactive CLI

DRMQ provides an interactive command-line interface for both the producer and consumer. This is great for testing and debugging.
Expand Down
10 changes: 10 additions & 0 deletions drmq-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.3</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>
</dependencies>

<build>
Expand Down
59 changes: 54 additions & 5 deletions drmq-broker/src/main/java/com/drmq/broker/BrokerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,34 @@ public class BrokerConfig {
private final boolean metricsEnabled;
private final int metricsPort;
private final String metricsPath;
private long logSegmentBytes;
private long logRetentionMs;
private final long raftCompactThreshold;

public BrokerConfig(String nodeId, int port, String dataDir, List<PeerAddress> peers,
boolean metricsEnabled, int metricsPort, String metricsPath) {
boolean metricsEnabled, int metricsPort, String metricsPath,
long logSegmentBytes, long logRetentionMs, long raftCompactThreshold) {
this.nodeId = nodeId;
this.port = port;
this.dataDir = dataDir;
this.peers = peers != null ? peers : List.of();
this.metricsEnabled = metricsEnabled;
this.metricsPort = metricsPort;
this.metricsPath = metricsPath != null ? metricsPath : "/metrics";
this.logSegmentBytes = logSegmentBytes;
this.logRetentionMs = logRetentionMs;
this.raftCompactThreshold = raftCompactThreshold;
}

public BrokerConfig(String nodeId, int port, String dataDir, List<PeerAddress> peers) {
this(nodeId, port, dataDir, peers, true, 9096, "/metrics");
this(nodeId, port, dataDir, peers, true, 9096, "/metrics",
100 * 1024 * 1024L, 7L * 24 * 60 * 60 * 1000, 1000L);
}

/** Single-node config (backward compatible) */
public BrokerConfig(int port, String dataDir) {
this("standalone", port, dataDir, List.of(), true, 9096, "/metrics");
this("standalone", port, dataDir, List.of(), true, 9096, "/metrics",
100 * 1024 * 1024L, 7L * 24 * 60 * 60 * 1000, 1000L);
}

public String getNodeId() { return nodeId; }
Expand All @@ -47,6 +56,23 @@ public BrokerConfig(int port, String dataDir) {
public boolean isMetricsEnabled() { return metricsEnabled; }
public int getMetricsPort() { return metricsPort; }
public String getMetricsPath() { return metricsPath; }
public long getLogSegmentBytes() { return logSegmentBytes; }
public long getLogRetentionMs() { return logRetentionMs; }
public long getRaftCompactThreshold() { return raftCompactThreshold; }
Comment thread
coderabbitai[bot] marked this conversation as resolved.

public void setLogSegmentBytes(long logSegmentBytes) {
if (logSegmentBytes <= 0) {
throw new IllegalArgumentException("logSegmentBytes must be positive");
}
this.logSegmentBytes = logSegmentBytes;
}

public void setLogRetentionMs(long logRetentionMs) {
if (logRetentionMs <= 0) {
throw new IllegalArgumentException("logRetentionMs must be positive");
}
this.logRetentionMs = logRetentionMs;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/** True if this broker is part of a Raft cluster */
public boolean isClusterMode() {
Expand All @@ -65,6 +91,9 @@ public boolean isClusterMode() {
* --metrics-disabled
* --metrics-port <port>
* --metrics-path </metrics>
* --log-segment-bytes <bytes>
* --log-retention-ms <ms>
* --raft-compact-threshold <count>
*/
public static BrokerConfig fromArgs(String[] args) {
String nodeId = "standalone";
Expand All @@ -74,10 +103,13 @@ public static BrokerConfig fromArgs(String[] args) {
boolean metricsEnabled = true;
int metricsPort = 9096;
String metricsPath = "/metrics";
long logSegmentBytes = 100 * 1024 * 1024L; // 100MB
long logRetentionMs = 7L * 24 * 60 * 60 * 1000; // 7 days
long raftCompactThreshold = 1000L;

for (int i = 0; i < args.length; i++) {
switch (args[i]) {
case "--id" -> nodeId = args[++i];
case "--id", "--node-id" -> nodeId = args[++i];
case "--port" -> port = Integer.parseInt(args[++i]);
case "--data-dir" -> dataDir = args[++i];
case "--peers" -> {
Expand All @@ -90,6 +122,9 @@ public static BrokerConfig fromArgs(String[] args) {
case "--metrics-disabled" -> metricsEnabled = false;
case "--metrics-port" -> metricsPort = parsePortArg(args, ++i, "--metrics-port");
case "--metrics-path" -> metricsPath = parsePathArg(args, ++i, "--metrics-path");
case "--log-segment-bytes" -> logSegmentBytes = parseLongArg(args, ++i, "--log-segment-bytes");
case "--log-retention-ms" -> logRetentionMs = parseLongArg(args, ++i, "--log-retention-ms");
case "--raft-compact-threshold" -> raftCompactThreshold = parseLongArg(args, ++i, "--raft-compact-threshold");
default -> {
if (i == 0) {
try {
Expand All @@ -104,7 +139,8 @@ public static BrokerConfig fromArgs(String[] args) {
}
}

return new BrokerConfig(nodeId, port, dataDir, peers, metricsEnabled, metricsPort, metricsPath);
return new BrokerConfig(nodeId, port, dataDir, peers, metricsEnabled, metricsPort, metricsPath,
logSegmentBytes, logRetentionMs, raftCompactThreshold);
}

private static boolean parseBooleanArg(String[] args, int index, String flag) {
Expand All @@ -131,6 +167,19 @@ private static int parsePortArg(String[] args, int index, String flag) {
}
}

private static long parseLongArg(String[] args, int index, String flag) {
String value = requireValue(args, index, flag);
try {
long parsed = Long.parseLong(value);
if (parsed <= 0) {
throw new IllegalArgumentException(flag + " must be positive");
}
return parsed;
} catch (NumberFormatException e) {
throw new IllegalArgumentException(flag + " must be a valid long integer, got: " + value, e);
}
}

private static String parsePathArg(String[] args, int index, String flag) {
String value = requireValue(args, index, flag);
String normalized = value.trim();
Expand Down
37 changes: 37 additions & 0 deletions drmq-broker/src/main/java/com/drmq/broker/BrokerMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ public void registerBroker(java.util.function.Supplier<Integer> activeConnection
.register(registry);
Gauge.builder("drmq.broker.raft.is_leader", raftNode, node -> node.isLeader() ? 1 : 0)
.register(registry);
for (String peerId : raftNode.getPeerIds()) {
Gauge.builder("drmq.broker.raft.replication_lag", raftNode, node -> {
if (!node.isLeader()) return 0.0;
Long matchIdx = node.getMatchIndexMap().get(peerId);
if (matchIdx == null) return 0.0;
return (double) (node.getLastLogIndex() - matchIdx);
}).tags("peer_id", peerId).register(registry);
}
}
}

Expand Down Expand Up @@ -354,6 +362,35 @@ private void appendJsonLatency(StringBuilder json, String key, double meanSecond
json.append('}');
}

public double getCounterValueByType(String name, String type) {
if (!enabled || registry == null) {
return 0.0;
}
return counterValue(name, Tags.of("type", type));
}

/**
* Return the current value of a named gauge, or 0 if metrics are disabled or the gauge doesn't exist.
*/
public double getGaugeValue(String name) {
if (!enabled || registry == null) {
return 0.0;
}
var gauge = registry.find(name).gauge();
return gauge != null ? gauge.value() : 0.0;
}

/**
* Return the mean duration (in milliseconds) of a named timer tagged by type,
* or 0 if no samples have been recorded.
*/
public double getTimerMeanMs(String name, String type) {
if (!enabled || registry == null) {
return 0.0;
}
return timerMeanSeconds(name, Tags.of("type", type)) * 1000.0;
}

private double counterValue(String name, Tags tags) {
Counter counter = registry.find(name).tags(tags).counter();
return counter != null ? counter.count() : 0.0;
Expand Down
22 changes: 18 additions & 4 deletions drmq-broker/src/main/java/com/drmq/broker/BrokerServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class BrokerServer {
private final RaftNode raftNode;
private final List<RaftPeer> raftPeers;
private final BrokerMetrics metrics;
private TelemetryWebSocketServer telemetryServer;

private volatile boolean running = false;

Expand All @@ -52,8 +53,8 @@ public class BrokerServer {

public BrokerServer(BrokerConfig config) throws IOException {
this.config = config;
this.logManager = new LogManager(config.getDataDir());
this.messageStore = new MessageStore(logManager);
this.logManager = new LogManager(config);
this.messageStore = new MessageStore(logManager, config);
this.offsetManager = new OffsetManager(config.getDataDir());
this.raftPeers = new ArrayList<>();
this.metrics = BrokerMetrics.init(config);
Expand All @@ -65,7 +66,8 @@ public BrokerServer(BrokerConfig config) throws IOException {
config.getPeers(),
messageStore,
offsetManager,
Paths.get(config.getDataDir())
Paths.get(config.getDataDir()),
config.getRaftCompactThreshold()
);

for (BrokerConfig.PeerAddress peer : config.getPeers()) {
Expand All @@ -74,6 +76,7 @@ public BrokerServer(BrokerConfig config) throws IOException {
raftNode.registerVoteHandler(peer.id(), raftPeer::sendRequestVote);
raftNode.registerAppendHandler(peer.id(), raftPeer::sendAppendEntries);
raftNode.registerPreVoteHandler(peer.id(), raftPeer::sendPreVote);
raftNode.registerInstallSnapshotHandler(peer.id(), raftPeer::sendInstallSnapshot);
}

logger.info("Cluster mode: nodeId={}, peers={}", config.getNodeId(), config.getPeers());
Expand All @@ -100,6 +103,10 @@ public BrokerServer() throws IOException {
this(DEFAULT_PORT, DEFAULT_THREAD_POOL_SIZE);
}

public int getActiveChannelsCount() {
return activeChannels != null ? activeChannels.size() : 0;
}

public void start() throws IOException {
try {
messageStore.recover();
Expand All @@ -124,7 +131,7 @@ public void start() throws IOException {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new LengthFieldBasedFrameDecoder(10 * 1024 * 1024, 0, 4, 0, 4));
p.addLast(new LengthFieldBasedFrameDecoder(256 * 1024 * 1024, 0, 4, 0, 4));
p.addLast(new ByteArrayDecoder());
p.addLast(new LengthFieldPrepender(4));
p.addLast(new ByteArrayEncoder());
Expand All @@ -139,6 +146,10 @@ public void initChannel(SocketChannel ch) {
if (raftNode != null) {
raftNode.start();
}
int wsPort = config.getPort() + 200;
telemetryServer = new TelemetryWebSocketServer(wsPort, this);
telemetryServer.start();

logger.info("DRMQ Broker started on port {} with data directory {}",
config.getPort(), config.getDataDir());

Expand Down Expand Up @@ -179,6 +190,9 @@ public void shutdown() {
if (raftNode != null) {
raftNode.stop();
}
if (telemetryServer != null) {
telemetryServer.shutdown();
}

if (activeChannels != null) {
activeChannels.close().awaitUninterruptibly();
Expand Down
Loading