Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cursus-client/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
dependencies {
api "io.netty:netty-all:${nettyVersion}"
api "org.slf4j:slf4j-api:${slf4jVersion}"
implementation 'com.fasterxml.jackson.core:jackson-databind:2.18.3'

compileOnly 'io.micrometer:micrometer-core:1.16.5'
testImplementation 'io.micrometer:micrometer-core:1.16.5'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ public class CursusConsumerConfig {
private String tlsKeyPath;
@Builder.Default private int maxRetries = 3;
@Builder.Default private long maxBackoffMs = 10000;
@Builder.Default private long metadataRefreshIntervalMs = 30000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ public CompletableFuture<byte[]> sendCommand(String command) {
return send(command.getBytes(StandardCharsets.UTF_8));
}

public CompletableFuture<byte[]> sendToAddress(String address, String command) {
if (closed) {
return CompletableFuture.failedFuture(
new CursusConnectionException("ConnectionManager is closed"));
}
ManagedConnection conn =
connections.computeIfAbsent(address, addr -> connect(BrokerAddress.parse(addr)));
CompletableFuture<byte[]> responseFuture = conn.handler.addPendingRequest();
conn.channel.writeAndFlush(Unpooled.wrappedBuffer(command.getBytes(StandardCharsets.UTF_8)));
return responseFuture;
}

public void connectPartition(int partitionId) {
if (closed) {
throw new CursusConnectionException("ConnectionManager is closed");
Expand All @@ -76,6 +88,16 @@ public void connectPartition(int partitionId) {
log.info("Connected partition {} to {}:{}", partitionId, addr.host(), addr.port());
}

public void connectPartitionToAddress(int partitionId, String address) {
if (closed) {
throw new CursusConnectionException("ConnectionManager is closed");
}
BrokerAddress addr = BrokerAddress.parse(address);
ManagedConnection conn = connect(addr);
partitionConnections.put(partitionId, conn);
log.info("Connected partition {} to leader {}:{}", partitionId, addr.host(), addr.port());
}

public ManagedConnection getPartitionConnection(int partitionId) {
ManagedConnection conn = partitionConnections.get(partitionId);
if (conn == null) {
Expand All @@ -95,6 +117,10 @@ public CompletableFuture<byte[]> sendOnPartition(int partitionId, byte[] data) {
return responseFuture;
}

public CompletableFuture<byte[]> sendCommandOnPartition(int partitionId, String command) {
return sendOnPartition(partitionId, command.getBytes(StandardCharsets.UTF_8));
}

/** Returns the handler for a partition connection, allowing push-mode setup for streaming. */
public CursusClientHandler getPartitionHandler(int partitionId) {
ManagedConnection conn = partitionConnections.get(partitionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,14 @@ public class CursusConsumer implements AutoCloseable {

private volatile String memberId;
private volatile int generation;
private volatile String coordinatorAddr;
private final Map<Integer, String> partitionLeaders = new ConcurrentHashMap<>();
private CursusConsumerMetrics metrics;

// Store scheduled future refs so they can be cancelled on rejoin (I4 fix)
private volatile ScheduledFuture<?> commitFuture;
private volatile ScheduledFuture<?> heartbeatFuture;
private volatile ScheduledFuture<?> metadataRefreshFuture;

public CursusConsumer(CursusConsumerConfig config) {
this(config, null);
Expand Down Expand Up @@ -155,7 +158,7 @@ public void close() {
if (memberId != null) {
String command =
CommandBuilder.leaveGroup(config.getTopic(), config.getGroupId(), memberId);
connectionManager.sendCommand(command).get(5000, TimeUnit.MILLISECONDS);
sendCoordinatorCommandSync(command);
}
} catch (Exception e) {
log.warn("Failed to leave group: {}", e.getMessage());
Expand All @@ -171,17 +174,122 @@ public void close() {
}
}

private byte[] sendCoordinatorCommandSync(String command) throws Exception {
for (int attempt = 0; attempt < 3; attempt++) {
String addr = coordinatorAddr != null ? coordinatorAddr : config.getBrokers().get(0);
byte[] response = sendPlainSocket(addr, command);
String result = new String(response, StandardCharsets.UTF_8);

if (result.contains("NOT_COORDINATOR")) {
log.info("NOT_COORDINATOR response, re-discovering coordinator");
String host = null, port = null;
for (String part : result.split("\\s+")) {
if (part.startsWith("host=")) host = part.substring(5);
else if (part.startsWith("port=")) port = part.substring(5);
}
if (host != null && port != null) {
coordinatorAddr = host + ":" + port;
} else {
findCoordinator();
}
continue;
}
return response;
}
String addr = coordinatorAddr != null ? coordinatorAddr : config.getBrokers().get(0);
return sendPlainSocket(addr, command);
}

private void fetchMetadata() {
try {
String cmd = CommandBuilder.metadata(config.getTopic());
String addr = config.getBrokers().get(0);
byte[] response = sendPlainSocket(addr, cmd);
String result = new String(response, StandardCharsets.UTF_8);
if (result.startsWith("OK")) {
for (String part : result.split("\\s+")) {
if (part.startsWith("leaders=")) {
String[] addrs = part.substring(8).split(",");
for (int i = 0; i < addrs.length; i++) {
String leaderAddr = addrs[i].trim();
if (!leaderAddr.isEmpty()) {
partitionLeaders.put(i, leaderAddr);
}
}
}
}
log.info("Partition leaders: {}", partitionLeaders);
}
} catch (Exception e) {
log.debug("Metadata fetch failed (non-critical): {}", e.getMessage());
}
}

private byte[] sendPlainSocket(String addr, String command) throws Exception {
String[] parts = addr.split(":");
String host = parts[0];
int port = Integer.parseInt(parts[1]);

try (java.net.Socket socket = new java.net.Socket(host, port)) {
socket.setSoTimeout(5000);
java.io.OutputStream out = socket.getOutputStream();
java.io.InputStream in = socket.getInputStream();

byte[] cmdBytes = command.getBytes(StandardCharsets.UTF_8);
byte[] payload = new byte[2 + cmdBytes.length];
System.arraycopy(cmdBytes, 0, payload, 2, cmdBytes.length);

byte[] frame = new byte[4 + payload.length];
frame[0] = (byte) (payload.length >> 24);
frame[1] = (byte) (payload.length >> 16);
frame[2] = (byte) (payload.length >> 8);
frame[3] = (byte) (payload.length);
System.arraycopy(payload, 0, frame, 4, payload.length);
out.write(frame);
out.flush();

byte[] lenBuf = in.readNBytes(4);
int respLen =
((lenBuf[0] & 0xFF) << 24)
| ((lenBuf[1] & 0xFF) << 16)
| ((lenBuf[2] & 0xFF) << 8)
| (lenBuf[3] & 0xFF);
return in.readNBytes(respLen);
}
}

private void findCoordinator() {
try {
String cmd = CommandBuilder.findCoordinator(config.getGroupId());
byte[] response = connectionManager.sendCommand(cmd).get(5000, TimeUnit.MILLISECONDS);
String result = new String(response, StandardCharsets.UTF_8);
if (result.startsWith("OK")) {
String host = null, port = null;
for (String part : result.split("\\s+")) {
if (part.startsWith("host=")) host = part.substring(5);
else if (part.startsWith("port=")) port = part.substring(5);
}
if (host != null && port != null) {
coordinatorAddr = host + ":" + port;
log.info("Coordinator found: {}", coordinatorAddr);
}
}
} catch (Exception e) {
log.debug("Find coordinator failed (non-critical): {}", e.getMessage());
}
}

private void joinGroupAndConsume(Consumer<CursusMessage> handler) throws Exception {
// Cancel any existing scheduled tasks before re-joining (I4 fix)
cancelScheduledTasks();
stopPartitionConsumers();

// Step 0: Find coordinator
findCoordinator();

// Step 1: JOIN_GROUP
String joinCmd = CommandBuilder.joinGroup(config.getTopic(), config.getGroupId(), consumerId);
byte[] joinResponse =
connectionManager
.sendCommand(joinCmd)
.get(config.getSessionTimeoutMs(), TimeUnit.MILLISECONDS);
byte[] joinResponse = sendCoordinatorCommandSync(joinCmd);
String joinResult = new String(joinResponse, StandardCharsets.UTF_8);
log.info("Join group response: {}", joinResult);

Expand All @@ -193,10 +301,7 @@ private void joinGroupAndConsume(Consumer<CursusMessage> handler) throws Excepti
if (assignedPartitions.isEmpty()) {
String syncCmd =
CommandBuilder.syncGroup(config.getTopic(), config.getGroupId(), memberId, generation);
byte[] syncResponse =
connectionManager
.sendCommand(syncCmd)
.get(config.getSessionTimeoutMs(), TimeUnit.MILLISECONDS);
byte[] syncResponse = sendCoordinatorCommandSync(syncCmd);
String syncResult = new String(syncResponse, StandardCharsets.UTF_8);
log.info("Sync group response: {}", syncResult);
assignedPartitions = parsePartitionAssignments(syncResult);
Expand All @@ -209,12 +314,33 @@ private void joinGroupAndConsume(Consumer<CursusMessage> handler) throws Excepti
return;
}

// Step 3-4: Start per-partition consumers (each with own connection)
// Step 3: Fetch metadata for partition leaders
fetchMetadata();

if (metadataRefreshFuture != null) {
metadataRefreshFuture.cancel(false);
}
long refreshMs = config.getMetadataRefreshIntervalMs();
if (refreshMs > 0) {
metadataRefreshFuture =
heartbeatScheduler.scheduleAtFixedRate(
this::fetchMetadata, refreshMs, refreshMs, TimeUnit.MILLISECONDS);
}

// Step 4: Start per-partition consumers (each with own connection)
// FETCH_OFFSET is done inside PartitionConsumer.start()
for (int partition : assignedPartitions) {
String partitionLeaderAddr = partitionLeaders.get(partition);
PartitionConsumer pc =
new PartitionConsumer(
partition, config, connectionManager, config.getGroupId(), memberId, generation);
partition,
config,
connectionManager,
config.getGroupId(),
memberId,
generation,
coordinatorAddr,
partitionLeaderAddr);
partitionConsumers.put(partition, pc);
workerExecutor.submit(() -> pc.start(handler));
if (metrics != null) {
Expand Down Expand Up @@ -315,7 +441,7 @@ private void sendHeartbeat() {
try {
String cmd =
CommandBuilder.heartbeat(config.getTopic(), config.getGroupId(), memberId, generation);
byte[] response = connectionManager.sendCommand(cmd).get(5000, TimeUnit.MILLISECONDS);
byte[] response = sendCoordinatorCommandSync(cmd);
String result = new String(response, StandardCharsets.UTF_8);
if (ProtocolDecoder.isRebalanceRequired(result)) {
log.info("Rebalance required via heartbeat");
Expand Down Expand Up @@ -348,7 +474,7 @@ private void commitAllOffsets() {
String cmd =
CommandBuilder.batchCommit(
config.getTopic(), config.getGroupId(), generation, memberId, sb.toString());
byte[] response = connectionManager.sendCommand(cmd).get(5000, TimeUnit.MILLISECONDS);
byte[] response = sendCoordinatorCommandSync(cmd);
String result = new String(response, StandardCharsets.UTF_8);
if (result.startsWith("OK")) {
// Update committed offsets on success
Expand Down Expand Up @@ -387,6 +513,10 @@ private void cancelScheduledTasks() {
hf.cancel(false);
heartbeatFuture = null;
}
if (metadataRefreshFuture != null) {
metadataRefreshFuture.cancel(false);
metadataRefreshFuture = null;
}
}

private void stopPartitionConsumers() {
Expand Down
Loading
Loading