diff --git a/cursus-client/build.gradle b/cursus-client/build.gradle index 132a0f8..e663163 100644 --- a/cursus-client/build.gradle +++ b/cursus-client/build.gradle @@ -1,6 +1,7 @@ dependencies { api "io.netty:netty-all:${nettyVersion}" api "org.slf4j:slf4j-api:${slf4jVersion}" + implementation 'com.fasterxml.jackson.core:jackson-databind:2.18.3' compileOnly 'io.micrometer:micrometer-core:1.16.5' testImplementation 'io.micrometer:micrometer-core:1.16.5' diff --git a/cursus-client/src/main/java/io/cursus/client/config/CursusConsumerConfig.java b/cursus-client/src/main/java/io/cursus/client/config/CursusConsumerConfig.java index de82655..06793a4 100644 --- a/cursus-client/src/main/java/io/cursus/client/config/CursusConsumerConfig.java +++ b/cursus-client/src/main/java/io/cursus/client/config/CursusConsumerConfig.java @@ -28,4 +28,5 @@ public class CursusConsumerConfig { private String tlsKeyPath; @Builder.Default private int maxRetries = 3; @Builder.Default private long maxBackoffMs = 10000; + @Builder.Default private long metadataRefreshIntervalMs = 30000; } diff --git a/cursus-client/src/main/java/io/cursus/client/connection/ConnectionManager.java b/cursus-client/src/main/java/io/cursus/client/connection/ConnectionManager.java index 933815d..baeff97 100644 --- a/cursus-client/src/main/java/io/cursus/client/connection/ConnectionManager.java +++ b/cursus-client/src/main/java/io/cursus/client/connection/ConnectionManager.java @@ -66,6 +66,18 @@ public CompletableFuture sendCommand(String command) { return send(command.getBytes(StandardCharsets.UTF_8)); } + public CompletableFuture sendToAddress(String address, String command) { + if (closed) { + return CompletableFuture.failedFuture( + new CursusConnectionException("ConnectionManager is closed")); + } + ManagedConnection conn = + connections.computeIfAbsent(address, addr -> connect(BrokerAddress.parse(addr))); + CompletableFuture responseFuture = conn.handler.addPendingRequest(); + conn.channel.writeAndFlush(Unpooled.wrappedBuffer(command.getBytes(StandardCharsets.UTF_8))); + return responseFuture; + } + public void connectPartition(int partitionId) { if (closed) { throw new CursusConnectionException("ConnectionManager is closed"); @@ -76,6 +88,16 @@ public void connectPartition(int partitionId) { log.info("Connected partition {} to {}:{}", partitionId, addr.host(), addr.port()); } + public void connectPartitionToAddress(int partitionId, String address) { + if (closed) { + throw new CursusConnectionException("ConnectionManager is closed"); + } + BrokerAddress addr = BrokerAddress.parse(address); + ManagedConnection conn = connect(addr); + partitionConnections.put(partitionId, conn); + log.info("Connected partition {} to leader {}:{}", partitionId, addr.host(), addr.port()); + } + public ManagedConnection getPartitionConnection(int partitionId) { ManagedConnection conn = partitionConnections.get(partitionId); if (conn == null) { @@ -95,6 +117,10 @@ public CompletableFuture sendOnPartition(int partitionId, byte[] data) { return responseFuture; } + public CompletableFuture sendCommandOnPartition(int partitionId, String command) { + return sendOnPartition(partitionId, command.getBytes(StandardCharsets.UTF_8)); + } + /** Returns the handler for a partition connection, allowing push-mode setup for streaming. */ public CursusClientHandler getPartitionHandler(int partitionId) { ManagedConnection conn = partitionConnections.get(partitionId); diff --git a/cursus-client/src/main/java/io/cursus/client/consumer/CursusConsumer.java b/cursus-client/src/main/java/io/cursus/client/consumer/CursusConsumer.java index f2dae5e..584657c 100644 --- a/cursus-client/src/main/java/io/cursus/client/consumer/CursusConsumer.java +++ b/cursus-client/src/main/java/io/cursus/client/consumer/CursusConsumer.java @@ -58,11 +58,14 @@ public class CursusConsumer implements AutoCloseable { private volatile String memberId; private volatile int generation; + private volatile String coordinatorAddr; + private final Map partitionLeaders = new ConcurrentHashMap<>(); private CursusConsumerMetrics metrics; // Store scheduled future refs so they can be cancelled on rejoin (I4 fix) private volatile ScheduledFuture commitFuture; private volatile ScheduledFuture heartbeatFuture; + private volatile ScheduledFuture metadataRefreshFuture; public CursusConsumer(CursusConsumerConfig config) { this(config, null); @@ -155,7 +158,7 @@ public void close() { if (memberId != null) { String command = CommandBuilder.leaveGroup(config.getTopic(), config.getGroupId(), memberId); - connectionManager.sendCommand(command).get(5000, TimeUnit.MILLISECONDS); + sendCoordinatorCommandSync(command); } } catch (Exception e) { log.warn("Failed to leave group: {}", e.getMessage()); @@ -171,17 +174,122 @@ public void close() { } } + private byte[] sendCoordinatorCommandSync(String command) throws Exception { + for (int attempt = 0; attempt < 3; attempt++) { + String addr = coordinatorAddr != null ? coordinatorAddr : config.getBrokers().get(0); + byte[] response = sendPlainSocket(addr, command); + String result = new String(response, StandardCharsets.UTF_8); + + if (result.contains("NOT_COORDINATOR")) { + log.info("NOT_COORDINATOR response, re-discovering coordinator"); + String host = null, port = null; + for (String part : result.split("\\s+")) { + if (part.startsWith("host=")) host = part.substring(5); + else if (part.startsWith("port=")) port = part.substring(5); + } + if (host != null && port != null) { + coordinatorAddr = host + ":" + port; + } else { + findCoordinator(); + } + continue; + } + return response; + } + String addr = coordinatorAddr != null ? coordinatorAddr : config.getBrokers().get(0); + return sendPlainSocket(addr, command); + } + + private void fetchMetadata() { + try { + String cmd = CommandBuilder.metadata(config.getTopic()); + String addr = config.getBrokers().get(0); + byte[] response = sendPlainSocket(addr, cmd); + String result = new String(response, StandardCharsets.UTF_8); + if (result.startsWith("OK")) { + for (String part : result.split("\\s+")) { + if (part.startsWith("leaders=")) { + String[] addrs = part.substring(8).split(","); + for (int i = 0; i < addrs.length; i++) { + String leaderAddr = addrs[i].trim(); + if (!leaderAddr.isEmpty()) { + partitionLeaders.put(i, leaderAddr); + } + } + } + } + log.info("Partition leaders: {}", partitionLeaders); + } + } catch (Exception e) { + log.debug("Metadata fetch failed (non-critical): {}", e.getMessage()); + } + } + + private byte[] sendPlainSocket(String addr, String command) throws Exception { + String[] parts = addr.split(":"); + String host = parts[0]; + int port = Integer.parseInt(parts[1]); + + try (java.net.Socket socket = new java.net.Socket(host, port)) { + socket.setSoTimeout(5000); + java.io.OutputStream out = socket.getOutputStream(); + java.io.InputStream in = socket.getInputStream(); + + byte[] cmdBytes = command.getBytes(StandardCharsets.UTF_8); + byte[] payload = new byte[2 + cmdBytes.length]; + System.arraycopy(cmdBytes, 0, payload, 2, cmdBytes.length); + + byte[] frame = new byte[4 + payload.length]; + frame[0] = (byte) (payload.length >> 24); + frame[1] = (byte) (payload.length >> 16); + frame[2] = (byte) (payload.length >> 8); + frame[3] = (byte) (payload.length); + System.arraycopy(payload, 0, frame, 4, payload.length); + out.write(frame); + out.flush(); + + byte[] lenBuf = in.readNBytes(4); + int respLen = + ((lenBuf[0] & 0xFF) << 24) + | ((lenBuf[1] & 0xFF) << 16) + | ((lenBuf[2] & 0xFF) << 8) + | (lenBuf[3] & 0xFF); + return in.readNBytes(respLen); + } + } + + private void findCoordinator() { + try { + String cmd = CommandBuilder.findCoordinator(config.getGroupId()); + byte[] response = connectionManager.sendCommand(cmd).get(5000, TimeUnit.MILLISECONDS); + String result = new String(response, StandardCharsets.UTF_8); + if (result.startsWith("OK")) { + String host = null, port = null; + for (String part : result.split("\\s+")) { + if (part.startsWith("host=")) host = part.substring(5); + else if (part.startsWith("port=")) port = part.substring(5); + } + if (host != null && port != null) { + coordinatorAddr = host + ":" + port; + log.info("Coordinator found: {}", coordinatorAddr); + } + } + } catch (Exception e) { + log.debug("Find coordinator failed (non-critical): {}", e.getMessage()); + } + } + private void joinGroupAndConsume(Consumer handler) throws Exception { // Cancel any existing scheduled tasks before re-joining (I4 fix) cancelScheduledTasks(); stopPartitionConsumers(); + // Step 0: Find coordinator + findCoordinator(); + // Step 1: JOIN_GROUP String joinCmd = CommandBuilder.joinGroup(config.getTopic(), config.getGroupId(), consumerId); - byte[] joinResponse = - connectionManager - .sendCommand(joinCmd) - .get(config.getSessionTimeoutMs(), TimeUnit.MILLISECONDS); + byte[] joinResponse = sendCoordinatorCommandSync(joinCmd); String joinResult = new String(joinResponse, StandardCharsets.UTF_8); log.info("Join group response: {}", joinResult); @@ -193,10 +301,7 @@ private void joinGroupAndConsume(Consumer handler) throws Excepti if (assignedPartitions.isEmpty()) { String syncCmd = CommandBuilder.syncGroup(config.getTopic(), config.getGroupId(), memberId, generation); - byte[] syncResponse = - connectionManager - .sendCommand(syncCmd) - .get(config.getSessionTimeoutMs(), TimeUnit.MILLISECONDS); + byte[] syncResponse = sendCoordinatorCommandSync(syncCmd); String syncResult = new String(syncResponse, StandardCharsets.UTF_8); log.info("Sync group response: {}", syncResult); assignedPartitions = parsePartitionAssignments(syncResult); @@ -209,12 +314,33 @@ private void joinGroupAndConsume(Consumer handler) throws Excepti return; } - // Step 3-4: Start per-partition consumers (each with own connection) + // Step 3: Fetch metadata for partition leaders + fetchMetadata(); + + if (metadataRefreshFuture != null) { + metadataRefreshFuture.cancel(false); + } + long refreshMs = config.getMetadataRefreshIntervalMs(); + if (refreshMs > 0) { + metadataRefreshFuture = + heartbeatScheduler.scheduleAtFixedRate( + this::fetchMetadata, refreshMs, refreshMs, TimeUnit.MILLISECONDS); + } + + // Step 4: Start per-partition consumers (each with own connection) // FETCH_OFFSET is done inside PartitionConsumer.start() for (int partition : assignedPartitions) { + String partitionLeaderAddr = partitionLeaders.get(partition); PartitionConsumer pc = new PartitionConsumer( - partition, config, connectionManager, config.getGroupId(), memberId, generation); + partition, + config, + connectionManager, + config.getGroupId(), + memberId, + generation, + coordinatorAddr, + partitionLeaderAddr); partitionConsumers.put(partition, pc); workerExecutor.submit(() -> pc.start(handler)); if (metrics != null) { @@ -315,7 +441,7 @@ private void sendHeartbeat() { try { String cmd = CommandBuilder.heartbeat(config.getTopic(), config.getGroupId(), memberId, generation); - byte[] response = connectionManager.sendCommand(cmd).get(5000, TimeUnit.MILLISECONDS); + byte[] response = sendCoordinatorCommandSync(cmd); String result = new String(response, StandardCharsets.UTF_8); if (ProtocolDecoder.isRebalanceRequired(result)) { log.info("Rebalance required via heartbeat"); @@ -348,7 +474,7 @@ private void commitAllOffsets() { String cmd = CommandBuilder.batchCommit( config.getTopic(), config.getGroupId(), generation, memberId, sb.toString()); - byte[] response = connectionManager.sendCommand(cmd).get(5000, TimeUnit.MILLISECONDS); + byte[] response = sendCoordinatorCommandSync(cmd); String result = new String(response, StandardCharsets.UTF_8); if (result.startsWith("OK")) { // Update committed offsets on success @@ -387,6 +513,10 @@ private void cancelScheduledTasks() { hf.cancel(false); heartbeatFuture = null; } + if (metadataRefreshFuture != null) { + metadataRefreshFuture.cancel(false); + metadataRefreshFuture = null; + } } private void stopPartitionConsumers() { diff --git a/cursus-client/src/main/java/io/cursus/client/consumer/PartitionConsumer.java b/cursus-client/src/main/java/io/cursus/client/consumer/PartitionConsumer.java index 2d5ebfc..2d518bb 100644 --- a/cursus-client/src/main/java/io/cursus/client/consumer/PartitionConsumer.java +++ b/cursus-client/src/main/java/io/cursus/client/consumer/PartitionConsumer.java @@ -42,10 +42,13 @@ public class PartitionConsumer { private final String group; private final String member; private final int generation; + private final String coordinatorAddr; + private volatile String partitionLeaderAddr; private final AtomicLong currentOffset = new AtomicLong(0); private final AtomicLong committedOffset = new AtomicLong(0); private final AtomicBoolean running = new AtomicBoolean(false); private final AtomicBoolean rebalanceRequired = new AtomicBoolean(false); + private byte[] response; public PartitionConsumer( int partitionId, @@ -53,13 +56,17 @@ public PartitionConsumer( ConnectionManager connectionManager, String group, String member, - int generation) { + int generation, + String coordinatorAddr, + String partitionLeaderAddr) { this.partitionId = partitionId; this.config = config; this.connectionManager = connectionManager; this.group = group; this.member = member; this.generation = generation; + this.coordinatorAddr = coordinatorAddr; + this.partitionLeaderAddr = partitionLeaderAddr; } /** @@ -122,12 +129,12 @@ private long fetchOffset() { for (int attempt = 0; attempt < 3; attempt++) { try { String cmd = CommandBuilder.fetchOffset(config.getTopic(), partitionId, group); - byte[] response = - connectionManager - .sendCommand(cmd) - .get(config.getSessionTimeoutMs(), TimeUnit.MILLISECONDS); + String target = coordinatorAddr != null ? coordinatorAddr : config.getBrokers().get(0); + response = sendPlainCommand(target, cmd); String result = new String(response, StandardCharsets.UTF_8).trim(); - if (result.isEmpty() || ProtocolDecoder.isErrorResponse(result)) { + if (result.isEmpty() + || ProtocolDecoder.isErrorResponse(result) + || result.startsWith("NOT_AUTHORIZED")) { throw new CursusProtocolException("Fetch offset error: " + result); } return Long.parseLong(result); @@ -157,7 +164,7 @@ private long fetchOffset() { "Failed to fetch offset for partition " + partitionId + " after 3 attempts"); } - /** Polling mode: send CONSUME, read one response, process messages, sleep, repeat. */ + /** Polling mode: send CONSUME via plain socket (like Python SDK), process messages, repeat. */ private void runPollingLoop(Consumer handler) { Backoff backoff = new Backoff(Duration.ofMillis(100), Duration.ofMillis(config.getMaxBackoffMs())); @@ -166,12 +173,23 @@ private void runPollingLoop(Consumer handler) { String command = CommandBuilder.consume( config.getTopic(), partitionId, currentOffset.get(), group, generation, member); - byte[] responseBytes = - connectionManager - .sendOnPartition(partitionId, command.getBytes(StandardCharsets.UTF_8)) - .get(config.getSessionTimeoutMs(), TimeUnit.MILLISECONDS); + String consumeTarget = + partitionLeaderAddr != null ? partitionLeaderAddr : config.getBrokers().get(0); + byte[] responseBytes = sendPlainCommand(consumeTarget, command); String response = new String(responseBytes, StandardCharsets.UTF_8); + if (response.contains("NOT_LEADER LEADER_IS")) { + String[] parts = response.split("\\s+"); + for (int i = 0; i < parts.length; i++) { + if ("LEADER_IS".equals(parts[i]) && i + 1 < parts.length) { + partitionLeaderAddr = parts[i + 1]; + log.info("Partition {} leader updated to {}", partitionId, partitionLeaderAddr); + break; + } + } + continue; + } + if (ProtocolDecoder.isErrorResponse(response)) { log.warn("Partition {} poll error: {}", partitionId, response); Thread.sleep(backoff.nextBackoff().toMillis()); @@ -184,6 +202,11 @@ private void runPollingLoop(Consumer handler) { } List messages = ProtocolDecoder.decodeBatchMessages(responseBytes); + log.debug( + "Partition {} poll: {} bytes, {} messages", + partitionId, + responseBytes.length, + messages.size()); if (messages.isEmpty()) { Thread.sleep(backoff.nextBackoff().toMillis()); continue; @@ -234,7 +257,11 @@ private void runStreamingLoop(Consumer handler) { String command = CommandBuilder.stream( config.getTopic(), partitionId, group, currentOffset.get(), generation, member); - connectionManager.sendOnPartition(partitionId, command.getBytes(StandardCharsets.UTF_8)); + if (coordinatorAddr != null) { + connectionManager.sendToAddress(coordinatorAddr, command); + } else { + connectionManager.sendCommandOnPartition(partitionId, command); + } while (running.get()) { try { @@ -270,8 +297,7 @@ private void runStreamingLoop(Consumer handler) { String restream = CommandBuilder.stream( config.getTopic(), partitionId, group, currentOffset.get(), generation, member); - connectionManager.sendOnPartition( - partitionId, restream.getBytes(StandardCharsets.UTF_8)); + connectionManager.sendCommandOnPartition(partitionId, restream); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); return; @@ -286,6 +312,41 @@ private void runStreamingLoop(Consumer handler) { } } + private byte[] sendPlainCommand(String addr, String command) throws Exception { + String[] parts = addr.split(":"); + String host = parts[0]; + int port = Integer.parseInt(parts[1]); + + try (java.net.Socket socket = new java.net.Socket(host, port)) { + socket.setSoTimeout((int) config.getSessionTimeoutMs()); + java.io.OutputStream out = socket.getOutputStream(); + java.io.InputStream in = socket.getInputStream(); + + // Wrap with encode_message("", cmd) — 2-byte topic length prefix (0x0000) + command + byte[] cmdBytes = command.getBytes(StandardCharsets.UTF_8); + byte[] payload = new byte[2 + cmdBytes.length]; + // payload[0] and payload[1] are already 0x00 (empty topic length) + System.arraycopy(cmdBytes, 0, payload, 2, cmdBytes.length); + + byte[] frame = new byte[4 + payload.length]; + frame[0] = (byte) (payload.length >> 24); + frame[1] = (byte) (payload.length >> 16); + frame[2] = (byte) (payload.length >> 8); + frame[3] = (byte) (payload.length); + System.arraycopy(payload, 0, frame, 4, payload.length); + out.write(frame); + out.flush(); + + byte[] lenBuf = in.readNBytes(4); + int respLen = + ((lenBuf[0] & 0xFF) << 24) + | ((lenBuf[1] & 0xFF) << 16) + | ((lenBuf[2] & 0xFF) << 8) + | (lenBuf[3] & 0xFF); + return in.readNBytes(respLen); + } + } + /** Commits the current offset for this partition using COMMIT_OFFSET on the leader connection. */ public void commitOffset() { long offset = currentOffset.get(); @@ -294,7 +355,8 @@ public void commitOffset() { String command = CommandBuilder.commitOffset( config.getTopic(), partitionId, group, offset, generation, member); - connectionManager.sendCommand(command).get(5000, TimeUnit.MILLISECONDS); + String target = coordinatorAddr != null ? coordinatorAddr : config.getBrokers().get(0); + sendPlainCommand(target, command); committedOffset.set(offset); } catch (Exception e) { log.warn( diff --git a/cursus-client/src/main/java/io/cursus/client/eventstore/AppendResult.java b/cursus-client/src/main/java/io/cursus/client/eventstore/AppendResult.java new file mode 100644 index 0000000..ab35e2e --- /dev/null +++ b/cursus-client/src/main/java/io/cursus/client/eventstore/AppendResult.java @@ -0,0 +1,16 @@ +package io.cursus.client.eventstore; + +import lombok.Getter; + +@Getter +public class AppendResult { + private final long version; + private final long offset; + private final int partition; + + public AppendResult(long version, long offset, int partition) { + this.version = version; + this.offset = offset; + this.partition = partition; + } +} diff --git a/cursus-client/src/main/java/io/cursus/client/eventstore/CursusEventStore.java b/cursus-client/src/main/java/io/cursus/client/eventstore/CursusEventStore.java new file mode 100644 index 0000000..1dcd335 --- /dev/null +++ b/cursus-client/src/main/java/io/cursus/client/eventstore/CursusEventStore.java @@ -0,0 +1,270 @@ +package io.cursus.client.eventstore; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.cursus.client.exception.CursusConnectionException; +import io.cursus.client.exception.CursusException; +import io.cursus.client.message.CursusMessage; +import io.cursus.client.protocol.ProtocolDecoder; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CursusEventStore implements AutoCloseable { + + private static final Logger log = LoggerFactory.getLogger(CursusEventStore.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private final String addr; + private final String topic; + private final String producerId; + private Socket socket; + + public CursusEventStore(String addr, String topic, String producerId) { + this.addr = addr; + this.topic = topic; + this.producerId = producerId; + } + + private Socket getSocket() throws Exception { + if (socket != null && !socket.isClosed()) return socket; + String[] parts = addr.split(":"); + socket = new Socket(parts[0], Integer.parseInt(parts[1])); + socket.setSoTimeout(10000); + return socket; + } + + private void resetSocket() { + if (socket != null) { + try { + socket.close(); + } catch (Exception ignored) { + } + socket = null; + } + } + + private String sendCommand(String command) throws Exception { + Socket s = getSocket(); + OutputStream out = s.getOutputStream(); + InputStream in = s.getInputStream(); + + byte[] cmdBytes = command.getBytes(StandardCharsets.UTF_8); + byte[] payload = new byte[2 + cmdBytes.length]; + System.arraycopy(cmdBytes, 0, payload, 2, cmdBytes.length); + + byte[] frame = new byte[4 + payload.length]; + frame[0] = (byte) (payload.length >> 24); + frame[1] = (byte) (payload.length >> 16); + frame[2] = (byte) (payload.length >> 8); + frame[3] = (byte) (payload.length); + System.arraycopy(payload, 0, frame, 4, payload.length); + out.write(frame); + out.flush(); + + byte[] lenBuf = in.readNBytes(4); + int respLen = + ((lenBuf[0] & 0xFF) << 24) + | ((lenBuf[1] & 0xFF) << 16) + | ((lenBuf[2] & 0xFF) << 8) + | (lenBuf[3] & 0xFF); + byte[] resp = in.readNBytes(respLen); + return new String(resp, StandardCharsets.UTF_8); + } + + private byte[] readFrame() throws Exception { + Socket s = getSocket(); + InputStream in = s.getInputStream(); + byte[] lenBuf = in.readNBytes(4); + int respLen = + ((lenBuf[0] & 0xFF) << 24) + | ((lenBuf[1] & 0xFF) << 16) + | ((lenBuf[2] & 0xFF) << 8) + | (lenBuf[3] & 0xFF); + return in.readNBytes(respLen); + } + + public void createTopic(int partitions) { + try { + String resp = + sendCommand( + "CREATE topic=" + topic + " partitions=" + partitions + " event_sourcing=true"); + if (resp.startsWith("ERROR")) { + throw new CursusException("createTopic: " + resp); + } + log.info("EventStore topic '{}' created with {} partitions", topic, partitions); + } catch (CursusException e) { + throw e; + } catch (Exception e) { + resetSocket(); + throw new CursusConnectionException("createTopic failed", e); + } + } + + public AppendResult append(String key, long expectedVersion, Event event) { + try { + int sv = event.getSchemaVersion() > 0 ? event.getSchemaVersion() : 1; + StringBuilder cmd = new StringBuilder(); + cmd.append("APPEND_STREAM topic=") + .append(topic) + .append(" key=") + .append(key) + .append(" version=") + .append(expectedVersion) + .append(" event_type=") + .append(event.getType()) + .append(" schema_version=") + .append(sv) + .append(" producerId=") + .append(producerId); + if (event.getMetadata() != null && !event.getMetadata().isEmpty()) { + cmd.append(" metadata=").append(event.getMetadata()); + } + cmd.append(" message=").append(event.getPayload()); + + String resp = sendCommand(cmd.toString()); + if (resp.startsWith("ERROR")) { + throw new CursusException("append: " + resp); + } + return parseAppendResponse(resp); + } catch (CursusException e) { + throw e; + } catch (Exception e) { + resetSocket(); + throw new CursusConnectionException("append failed", e); + } + } + + private AppendResult parseAppendResponse(String resp) { + long version = 0, offset = 0; + int partition = 0; + for (String part : resp.split("\\s+")) { + String[] kv = part.split("=", 2); + if (kv.length != 2) continue; + switch (kv[0]) { + case "version" -> version = Long.parseLong(kv[1]); + case "offset" -> offset = Long.parseLong(kv[1]); + case "partition" -> partition = Integer.parseInt(kv[1]); + default -> {} + } + } + return new AppendResult(version, offset, partition); + } + + public StreamData readStream(String key) { + return readStream(key, 0); + } + + public StreamData readStream(String key, long fromVersion) { + try { + String cmd = "READ_STREAM topic=" + topic + " key=" + key; + if (fromVersion > 0) cmd += " from_version=" + fromVersion; + + Socket s = getSocket(); + OutputStream out = s.getOutputStream(); + + byte[] cmdBytes = cmd.getBytes(StandardCharsets.UTF_8); + byte[] payload = new byte[2 + cmdBytes.length]; + System.arraycopy(cmdBytes, 0, payload, 2, cmdBytes.length); + byte[] frame = new byte[4 + payload.length]; + frame[0] = (byte) (payload.length >> 24); + frame[1] = (byte) (payload.length >> 16); + frame[2] = (byte) (payload.length >> 8); + frame[3] = (byte) (payload.length); + System.arraycopy(payload, 0, frame, 4, payload.length); + out.write(frame); + out.flush(); + + // Frame 1: JSON envelope + byte[] envData = readFrame(); + JsonNode envelope = MAPPER.readTree(envData); + + Snapshot snapshot = null; + JsonNode snapNode = envelope.get("snapshot"); + if (snapNode != null && !snapNode.isNull()) { + snapshot = new Snapshot(snapNode.get("version").asLong(), snapNode.get("payload").asText()); + } + + // Frame 2: Binary batch + byte[] batchData = readFrame(); + List events = new ArrayList<>(); + if (batchData.length > 0) { + List messages = ProtocolDecoder.decodeBatchMessages(batchData); + for (CursusMessage m : messages) { + events.add( + new StreamEvent( + m.getAggregateVersion(), m.getOffset(), + m.getEventType(), (int) m.getSchemaVersion(), + m.getPayload(), m.getMetadata())); + } + } + + return new StreamData(snapshot, events); + } catch (Exception e) { + resetSocket(); + throw new CursusConnectionException("readStream failed", e); + } + } + + public void saveSnapshot(String key, long version, String payload) { + try { + String resp = + sendCommand( + "SAVE_SNAPSHOT topic=" + + topic + + " key=" + + key + + " version=" + + version + + " message=" + + payload); + if (resp.startsWith("ERROR")) { + throw new CursusException("saveSnapshot: " + resp); + } + } catch (CursusException e) { + throw e; + } catch (Exception e) { + resetSocket(); + throw new CursusConnectionException("saveSnapshot failed", e); + } + } + + public Snapshot readSnapshot(String key) { + try { + String resp = sendCommand("READ_SNAPSHOT topic=" + topic + " key=" + key); + if ("NULL".equals(resp) || resp.contains("NOT_FOUND")) return null; + if (resp.startsWith("ERROR")) { + throw new CursusException("readSnapshot: " + resp); + } + JsonNode obj = MAPPER.readTree(resp); + return new Snapshot(obj.get("version").asLong(), obj.get("payload").asText()); + } catch (CursusException e) { + throw e; + } catch (Exception e) { + resetSocket(); + throw new CursusConnectionException("readSnapshot failed", e); + } + } + + public long streamVersion(String key) { + try { + String resp = sendCommand("STREAM_VERSION topic=" + topic + " key=" + key); + return Long.parseLong(resp.trim()); + } catch (NumberFormatException e) { + throw new CursusException("streamVersion: invalid response"); + } catch (Exception e) { + resetSocket(); + throw new CursusConnectionException("streamVersion failed", e); + } + } + + @Override + public void close() { + resetSocket(); + } +} diff --git a/cursus-client/src/main/java/io/cursus/client/eventstore/Event.java b/cursus-client/src/main/java/io/cursus/client/eventstore/Event.java new file mode 100644 index 0000000..ceae100 --- /dev/null +++ b/cursus-client/src/main/java/io/cursus/client/eventstore/Event.java @@ -0,0 +1,13 @@ +package io.cursus.client.eventstore; + +import lombok.Builder; +import lombok.Getter; + +@Getter +@Builder +public class Event { + private final String type; + private final String payload; + @Builder.Default private final int schemaVersion = 1; + @Builder.Default private final String metadata = ""; +} diff --git a/cursus-client/src/main/java/io/cursus/client/eventstore/Snapshot.java b/cursus-client/src/main/java/io/cursus/client/eventstore/Snapshot.java new file mode 100644 index 0000000..649f758 --- /dev/null +++ b/cursus-client/src/main/java/io/cursus/client/eventstore/Snapshot.java @@ -0,0 +1,14 @@ +package io.cursus.client.eventstore; + +import lombok.Getter; + +@Getter +public class Snapshot { + private final long version; + private final String payload; + + public Snapshot(long version, String payload) { + this.version = version; + this.payload = payload; + } +} diff --git a/cursus-client/src/main/java/io/cursus/client/eventstore/StreamData.java b/cursus-client/src/main/java/io/cursus/client/eventstore/StreamData.java new file mode 100644 index 0000000..7489b9b --- /dev/null +++ b/cursus-client/src/main/java/io/cursus/client/eventstore/StreamData.java @@ -0,0 +1,15 @@ +package io.cursus.client.eventstore; + +import java.util.List; +import lombok.Getter; + +@Getter +public class StreamData { + private final Snapshot snapshot; + private final List events; + + public StreamData(Snapshot snapshot, List events) { + this.snapshot = snapshot; + this.events = events; + } +} diff --git a/cursus-client/src/main/java/io/cursus/client/eventstore/StreamEvent.java b/cursus-client/src/main/java/io/cursus/client/eventstore/StreamEvent.java new file mode 100644 index 0000000..3e214a7 --- /dev/null +++ b/cursus-client/src/main/java/io/cursus/client/eventstore/StreamEvent.java @@ -0,0 +1,23 @@ +package io.cursus.client.eventstore; + +import lombok.Getter; + +@Getter +public class StreamEvent { + private final long version; + private final long offset; + private final String type; + private final int schemaVersion; + private final String payload; + private final String metadata; + + public StreamEvent( + long version, long offset, String type, int schemaVersion, String payload, String metadata) { + this.version = version; + this.offset = offset; + this.type = type; + this.schemaVersion = schemaVersion; + this.payload = payload; + this.metadata = metadata; + } +} diff --git a/cursus-client/src/main/java/io/cursus/client/producer/CursusProducer.java b/cursus-client/src/main/java/io/cursus/client/producer/CursusProducer.java index 9d480ac..9a45e83 100644 --- a/cursus-client/src/main/java/io/cursus/client/producer/CursusProducer.java +++ b/cursus-client/src/main/java/io/cursus/client/producer/CursusProducer.java @@ -14,6 +14,7 @@ import io.cursus.client.util.Backoff; import io.cursus.client.util.ExecutorFactory; import io.cursus.client.util.FnvHash; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.*; @@ -43,6 +44,7 @@ public class CursusProducer implements AutoCloseable { private final Map batchStates = new ConcurrentHashMap<>(); private final Set> inflightFutures = ConcurrentHashMap.newKeySet(); private final AtomicLong batchIdGenerator = new AtomicLong(0); + private final Map partitionLeaders = new ConcurrentHashMap<>(); private CursusProducerMetrics metrics; private static final Pattern PARTITION_COUNT_PATTERN = Pattern.compile("partitions=(\\d+)"); @@ -87,8 +89,15 @@ public CursusProducer(CursusProducerConfig config, Object metricsRegistry) { partitionBuffers[i] = new PartitionBuffer(i, config.getBatchSize(), config.getBufferSize()); } + fetchMetadata(); + for (int i = 0; i < verifiedPartitions; i++) { - connectionManager.connectPartition(i); + String leaderAddr = partitionLeaders.get(i); + if (leaderAddr != null) { + connectionManager.connectPartitionToAddress(i, leaderAddr); + } else { + connectionManager.connectPartition(i); + } } log.info( @@ -159,6 +168,61 @@ private int verifyPartitionCount(String topic, int configured) { } } + private void fetchMetadata() { + try { + String cmd = CommandBuilder.metadata(config.getTopic()); + List brokers = config.getBrokers(); + if (brokers == null || brokers.isEmpty()) return; + String addr = brokers.get(0); + + String[] parts = addr.split(":"); + try (java.net.Socket socket = new java.net.Socket(parts[0], Integer.parseInt(parts[1]))) { + socket.setSoTimeout(5000); + java.io.OutputStream out = socket.getOutputStream(); + java.io.InputStream in = socket.getInputStream(); + + byte[] cmdBytes = cmd.getBytes(StandardCharsets.UTF_8); + byte[] payload = new byte[2 + cmdBytes.length]; + System.arraycopy(cmdBytes, 0, payload, 2, cmdBytes.length); + + byte[] frame = new byte[4 + payload.length]; + frame[0] = (byte) (payload.length >> 24); + frame[1] = (byte) (payload.length >> 16); + frame[2] = (byte) (payload.length >> 8); + frame[3] = (byte) (payload.length); + System.arraycopy(payload, 0, frame, 4, payload.length); + out.write(frame); + out.flush(); + + byte[] lenBuf = in.readNBytes(4); + int respLen = + ((lenBuf[0] & 0xFF) << 24) + | ((lenBuf[1] & 0xFF) << 16) + | ((lenBuf[2] & 0xFF) << 8) + | (lenBuf[3] & 0xFF); + byte[] respBytes = in.readNBytes(respLen); + String result = new String(respBytes, StandardCharsets.UTF_8); + + if (result.startsWith("OK")) { + for (String part : result.split("\\s+")) { + if (part.startsWith("leaders=")) { + String[] addrs = part.substring(8).split(","); + for (int i = 0; i < addrs.length; i++) { + String leaderAddr = addrs[i].trim(); + if (!leaderAddr.isEmpty()) { + partitionLeaders.put(i, leaderAddr); + } + } + } + } + log.info("Producer partition leaders: {}", partitionLeaders); + } + } + } catch (IOException e) { + log.debug("Producer metadata fetch failed (non-critical): {}", e.getMessage()); + } + } + public long send(String payload) { return send(payload, null); } diff --git a/cursus-client/src/main/java/io/cursus/client/protocol/CommandBuilder.java b/cursus-client/src/main/java/io/cursus/client/protocol/CommandBuilder.java index 4835a49..565d4b4 100644 --- a/cursus-client/src/main/java/io/cursus/client/protocol/CommandBuilder.java +++ b/cursus-client/src/main/java/io/cursus/client/protocol/CommandBuilder.java @@ -123,4 +123,12 @@ public static String fetchOffset(String topic, int partition, String group) { public static String leaveGroup(String topic, String group, String member) { return "LEAVE_GROUP topic=" + topic + " group=" + group + " member=" + member; } + + public static String findCoordinator(String group) { + return "FIND_COORDINATOR group=" + group; + } + + public static String metadata(String topic) { + return "METADATA topic=" + topic; + } } diff --git a/cursus-client/src/main/java/io/cursus/client/protocol/ProtocolDecoder.java b/cursus-client/src/main/java/io/cursus/client/protocol/ProtocolDecoder.java index cda1994..59d0121 100644 --- a/cursus-client/src/main/java/io/cursus/client/protocol/ProtocolDecoder.java +++ b/cursus-client/src/main/java/io/cursus/client/protocol/ProtocolDecoder.java @@ -57,7 +57,7 @@ public static List decodeBatchMessages(byte[] data) { } public static boolean isErrorResponse(String response) { - return response != null && response.startsWith("ERROR:"); + return response != null && (response.startsWith("ERROR:") || response.startsWith("ERROR ")); } public static boolean isNotLeaderResponse(String response) { diff --git a/cursus-examples/standalone/src/main/java/io/cursus/examples/standalone/EventSourcingExample.java b/cursus-examples/standalone/src/main/java/io/cursus/examples/standalone/EventSourcingExample.java new file mode 100644 index 0000000..49c2553 --- /dev/null +++ b/cursus-examples/standalone/src/main/java/io/cursus/examples/standalone/EventSourcingExample.java @@ -0,0 +1,55 @@ +package io.cursus.examples.standalone; + +import io.cursus.client.eventstore.*; + +public class EventSourcingExample { + + public static void main(String[] args) { + String broker = args.length > 0 ? args[0] : "localhost:9000"; + + try (CursusEventStore es = new CursusEventStore(broker, "orders-es", "order-svc")) { + es.createTopic(4); + + String key = "order-" + System.currentTimeMillis(); + + AppendResult r1 = + es.append( + key, + 1, + Event.builder() + .type("OrderCreated") + .payload("{\"item\":\"widget\",\"qty\":5}") + .build()); + System.out.printf("Append 1: version=%d offset=%d%n", r1.getVersion(), r1.getOffset()); + + AppendResult r2 = + es.append( + key, + 2, + Event.builder().type("OrderShipped").payload("{\"tracking\":\"ABC123\"}").build()); + System.out.printf("Append 2: version=%d offset=%d%n", r2.getVersion(), r2.getOffset()); + + long ver = es.streamVersion(key); + System.out.printf("StreamVersion: %d%n", ver); + + StreamData stream = es.readStream(key); + System.out.printf("%nStream for %s (%d events):%n", key, stream.getEvents().size()); + for (StreamEvent evt : stream.getEvents()) { + System.out.printf(" v%d: %s -- %s%n", evt.getVersion(), evt.getType(), evt.getPayload()); + } + + es.saveSnapshot(key, ver, "{\"state\":\"shipped\"}"); + Snapshot snap = es.readSnapshot(key); + if (snap != null) { + System.out.printf("Snapshot: v%d payload=%s%n", snap.getVersion(), snap.getPayload()); + } + + // Version conflict test + try { + es.append(key, 1, Event.builder().type("Stale").payload("{}").build()); + } catch (Exception e) { + System.out.printf("Version conflict (expected): %s%n", e.getMessage()); + } + } + } +} diff --git a/cursus-examples/standalone/src/main/java/io/cursus/examples/standalone/ProduceAndConsume.java b/cursus-examples/standalone/src/main/java/io/cursus/examples/standalone/ProduceAndConsume.java index 0628305..3447731 100644 --- a/cursus-examples/standalone/src/main/java/io/cursus/examples/standalone/ProduceAndConsume.java +++ b/cursus-examples/standalone/src/main/java/io/cursus/examples/standalone/ProduceAndConsume.java @@ -15,13 +15,14 @@ public class ProduceAndConsume { public static void main(String[] args) throws Exception { + String broker = args.length > 0 ? args[0] : "localhost:9000"; String topic = "e2e-test-" + System.currentTimeMillis(); int messageCount = 5; // === Produce === CursusProducerConfig prodConfig = CursusProducerConfig.builder() - .brokers(List.of("localhost:9000")) + .brokers(List.of(broker)) .topic(topic) .partitions(1) .acks(Acks.ONE) @@ -41,9 +42,9 @@ public static void main(String[] args) throws Exception { // === Consume === CursusConsumerConfig consConfig = CursusConsumerConfig.builder() - .brokers(List.of("localhost:9000")) + .brokers(List.of(broker)) .topic(topic) - .groupId("e2e-group") + .groupId("e2e-group-" + System.currentTimeMillis()) .consumerMode(ConsumerMode.POLLING) .maxPollRecords(100) .build(); diff --git a/docs/architecture.md b/docs/architecture.md index bdc31e5..76a40a8 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -29,100 +29,101 @@ cursus-java (root Gradle project) ## Layer Diagram -``` -┌───────────────────────────────────────────────────────┐ -│ Public API │ -│ CursusProducer · CursusConsumer · @CursusListener│ -├───────────────────────────────────────────────────────┤ -│ Configuration Layer │ -│ CursusProducerConfig · CursusConsumerConfig │ -│ CursusProperties (Spring) │ -├───────────────────────────────────────────────────────┤ -│ Protocol Layer │ -│ ProtocolEncoder · ProtocolDecoder · CommandBuilder│ -├───────────────────────────────────────────────────────┤ -│ Connection Layer │ -│ ConnectionManager · CursusClientHandler │ -├───────────────────────────────────────────────────────┤ -│ Netty Pipeline │ -│ CursusFrameDecoder (4-byte length prefix) │ -│ CursusFrameEncoder (4-byte length prefix) │ -├───────────────────────────────────────────────────────┤ -│ Transport: TCP / NioEventLoopGroup │ -├───────────────────────────────────────────────────────┤ -│ Cursus Broker (Go) port 9000 │ -└───────────────────────────────────────────────────────┘ +```mermaid +flowchart TB + A["Public API\nCursusProducer · CursusConsumer · @CursusListener"] + B["Configuration Layer\nCursusProducerConfig · CursusConsumerConfig · CursusProperties (Spring)"] + C["Protocol Layer\nProtocolEncoder · ProtocolDecoder · CommandBuilder"] + D["Connection Layer\nConnectionManager · CursusClientHandler"] + E["Netty Pipeline\nCursusFrameDecoder (4-byte length prefix)\nCursusFrameEncoder (4-byte length prefix)"] + F["Transport: TCP / NioEventLoopGroup"] + G["Cursus Broker (Go) port 9000"] + + A --> B --> C --> D --> E --> F --> G ``` ## Producer Data Flow -``` -Application calls producer.send(payload, key) - │ - ▼ -Partition selection - key != null → FnvHash.partition(key, numPartitions) [FNV-1a mod] - key == null → roundRobinCounter % numPartitions - │ - ▼ -PartitionBuffer.add(payload, key) returns seqNum - │ - ▼ -Buffer reaches batchSize? ──yes──► PartitionBuffer.drain() -lingerMs timer fires? ──yes──► PartitionBuffer.forceFlush() -producer.flush() called? ──yes──► all buffers forceFlush() - │ - ▼ -ProtocolEncoder.encodeBatchMessages(...) - magic 0xBA7C + header (topic, partition, acks, idempotent, - seqStart, seqEnd, count) + per-message fields - │ - ▼ -compressionType != "none"? ──yes──► CursusCompressor.compress(bytes) - │ - ▼ -ConnectionManager.send(bytes) - → resolveLeader() → NioSocketChannel.writeAndFlush() - │ - ▼ (async CompletableFuture) -CursusClientHandler receives response bytes - │ - ▼ -ProtocolDecoder.decodeAckResponse(bytes) → AckResponse - status == "OK" → uniqueAckCount += batch.size() - NOT_LEADER → updateLeader(null), retry with backoff - error → retry up to maxRetries +```mermaid +sequenceDiagram + participant App as Application + participant P as CursusProducer + participant PB as PartitionBuffer + participant PE as ProtocolEncoder + participant CC as CursusCompressor + participant CM as ConnectionManager + participant B as Cursus Broker + + App->>P: send(payload, key) + Note over P: Partition selection
key != null → FnvHash.partition(key, N)
key == null → roundRobin % N + P->>PB: add(payload, key) → seqNum + P-->>App: returns seqNum + + alt batchSize reached + PB->>PB: drain() + else lingerMs timer fires + PB->>PB: forceFlush() + else producer.flush() called + PB->>PB: forceFlush() all buffers + end + + PB->>PE: encodeBatchMessages(...) + Note over PE: magic 0xBA7C + header
(topic, partition, acks,
idempotent, seqStart, seqEnd,
count) + per-message fields + + alt compressionType != "none" + PE->>CC: compress(bytes) + CC-->>PE: compressed bytes + end + + PE->>CM: send(bytes) + CM->>CM: resolveLeader() + CM->>B: NioSocketChannel.writeAndFlush() + + B-->>CM: ACK response bytes (async) + CM->>P: ProtocolDecoder.decodeAckResponse(bytes) + + alt status == "OK" + P->>P: uniqueAckCount += batch.size() + else NOT_LEADER + P->>P: updateLeader(null), retry with backoff + else error + P->>P: retry up to maxRetries + end ``` ## Consumer Data Flow -``` -Broker ──TCP──► CursusFrameDecoder ──► CursusClientHandler - (strips 4-byte length prefix, max 64 MB) - │ - ▼ - CompletableFuture resolved - │ - PartitionConsumer - │ - ┌──────────────┴──────────────┐ - │ STREAMING mode │ POLLING mode - │ STREAM topic part offset │ CONSUME topic part offset - │ broker pushes frames │ client polls each loop iter - └──────────────┬──────────────┘ - │ - ProtocolDecoder.decodeBatchMessages(bytes) - │ - ▼ - for each CursusMessage → handler.accept(msg) - │ - ▼ - currentOffset = msg.getOffset() + 1 - │ - commitScheduler (autoCommitInterval) - │ - CommandBuilder.commit(topic, groupId, partition, offset) - ConnectionManager.sendCommand(cmd) +```mermaid +sequenceDiagram + participant B as Cursus Broker + participant FD as CursusFrameDecoder + participant CH as CursusClientHandler + participant PC as PartitionConsumer + participant PD as ProtocolDecoder + participant H as Message Handler + participant CS as Commit Scheduler + participant CM as ConnectionManager + + B->>FD: TCP frame (4-byte length prefix + payload, max 64 MB) + FD->>CH: stripped payload bytes + CH->>CH: CompletableFuture resolved + CH->>PC: bytes + + alt STREAMING mode + Note over PC,B: STREAM topic partition offset
Broker pushes frames continuously + else POLLING mode + Note over PC,B: CONSUME topic partition offset
Client polls each loop iteration + end + + PC->>PD: decodeBatchMessages(bytes) + loop for each CursusMessage + PD->>H: handler.accept(msg) + PC->>PC: currentOffset = msg.getOffset() + 1 + end + + CS->>CS: autoCommitInterval fires + CS->>CM: CommandBuilder.commit(topic, groupId, partition, offset) + CM->>B: COMMIT command ``` ## Go SDK Mapping diff --git a/docs/cluster-consumer.md b/docs/cluster-consumer.md new file mode 100644 index 0000000..41167fa --- /dev/null +++ b/docs/cluster-consumer.md @@ -0,0 +1,126 @@ +# Cluster Consumer Architecture + +FindCoordinator-based routing architecture for consuming messages in a clustered environment. + +## Overview + +Following the same pattern as Kafka, consumer group commands are routed to the coordinator broker while data commands are routed to the partition leader. + +## Command Routing + +```mermaid +flowchart TB + Consumer[Consumer SDK] + + subgraph "1. Coordinator Discovery" + ANY[Any Broker] + FC[FIND_COORDINATOR] + Consumer -->|"①"| ANY + ANY -->|"group=my-group"| FC + FC -->|"OK host=H port=P"| Consumer + end + + subgraph "2. Group Commands → Coordinator" + COORD[Coordinator Broker] + Consumer -->|"②"| COORD + COORD --- JOIN[JOIN_GROUP] + COORD --- SYNC[SYNC_GROUP] + COORD --- HB[HEARTBEAT] + COORD --- LEAVE[LEAVE_GROUP] + COORD --- FETCH[FETCH_OFFSET] + COORD --- COMMIT[COMMIT_OFFSET] + COORD --- BATCH[BATCH_COMMIT] + end + + subgraph "3. Data Commands → Partition Leader" + META[METADATA] + Consumer -->|"③"| ANY + ANY -->|"leaders=H1:P1,H2:P2"| META + end + + subgraph "4. Consume from Leaders" + L1[Partition 0 Leader] + L2[Partition 1 Leader] + Consumer -->|"CONSUME P0"| L1 + Consumer -->|"CONSUME P1"| L2 + end +``` + +## Command Routing Table + +| Command | Target | Error Handling | +|---|---|---| +| `FIND_COORDINATOR` | Any broker | Retry with next broker | +| `JOIN_GROUP` | Coordinator | NOT_COORDINATOR → re-discover | +| `SYNC_GROUP` | Coordinator | NOT_COORDINATOR → re-discover | +| `LEAVE_GROUP` | Coordinator | NOT_COORDINATOR → re-discover | +| `HEARTBEAT` | Coordinator | NOT_COORDINATOR → re-discover | +| `FETCH_OFFSET` | Coordinator | NOT_COORDINATOR → re-discover | +| `COMMIT_OFFSET` | Coordinator | NOT_COORDINATOR → re-discover | +| `BATCH_COMMIT` | Coordinator | NOT_COORDINATOR → re-discover | +| `CONSUME` | Partition Leader | NOT_LEADER → update leader cache | +| `STREAM` | Partition Leader | NOT_LEADER → update leader cache | + +## Consumer Lifecycle (Cluster) + +```mermaid +sequenceDiagram + participant C as Consumer + participant B as Any Broker + participant CO as Coordinator + participant PL as Partition Leader + + C->>B: FIND_COORDINATOR group=G + B-->>C: OK host=H port=P + + C->>CO: JOIN_GROUP topic=T group=G member=M + CO-->>C: OK generation=1 member=M-1234 assignments=[0,1] + + C->>CO: FETCH_OFFSET topic=T partition=0 group=G + CO-->>C: 0 + + C->>B: METADATA topic=T + B-->>C: OK leaders=H1:P1,H2:P2 + + loop Poll Loop + C->>PL: CONSUME topic=T partition=0 offset=0 member=M-1234 group=G generation=1 + PL-->>C: batch(messages) + end + + par Heartbeat (every 3s) + C->>CO: HEARTBEAT topic=T group=G member=M-1234 generation=1 + CO-->>C: OK + end +``` + +## Error Recovery + +```mermaid +flowchart TD + SEND[Send Command] + SEND --> CHECK{Response?} + + CHECK -->|NOT_COORDINATOR| REDISC[Re-discover Coordinator] + REDISC --> PARSE{host/port in response?} + PARSE -->|Yes| UPDATE[Update coordinator addr] + PARSE -->|No| FIND[FIND_COORDINATOR] + UPDATE --> RETRY[Retry command] + FIND --> RETRY + + CHECK -->|NOT_LEADER LEADER_IS addr| ULEAD[Update partition leader cache] + ULEAD --> RETRY + + CHECK -->|OK / data| DONE[Process response] +``` + +## Implementation Details + +### Java SDK + +- `CursusConsumer.findCoordinator()` — sends `FIND_COORDINATOR` via `sendPlainSocket` +- `CursusConsumer.sendCoordinatorCommandSync(cmd)` — sends to coordinator, retries on NOT_COORDINATOR +- `CursusConsumer.fetchMetadata()` — sends `METADATA`, populates `partitionLeaders` map +- `PartitionConsumer.sendPlainCommand(addr, cmd)` — per-request TCP socket (like Python SDK) +- `PartitionConsumer.runPollingLoop()` — CONSUME to partition leader, handles NOT_LEADER redirect +- `CommandBuilder.findCoordinator(group)` / `CommandBuilder.metadata(topic)` — new commands +- `ProtocolDecoder.isErrorResponse()` — now matches both `"ERROR:"` and `"ERROR "` prefixes diff --git a/docs/consumer-guide.md b/docs/consumer-guide.md index 02d908d..d18efd2 100644 --- a/docs/consumer-guide.md +++ b/docs/consumer-guide.md @@ -64,6 +64,26 @@ The consumer sends a `STREAM topic partition offset` command and then the broker The consumer sends a `CONSUME topic partition offset` command for each poll cycle. After receiving a batch it loops immediately; if the broker returns an empty response it backs off before retrying. Use polling when you want pull-based flow control or when integrating with a framework that has its own polling loop. +### Mode Comparison + +```mermaid +flowchart LR + subgraph STREAMING + direction TB + S1["STREAM topic partition offset"] --> S2["Broker pushes batches\ncontinuously"] + S2 --> S3["Long-lived connection\nauto-reconnects"] + S3 --> S4["Low latency\nHigh throughput"] + end + subgraph POLLING + direction TB + P1["CONSUME topic partition offset"] --> P2{Empty\nresponse?} + P2 -- yes --> P3["Back off, retry"] + P3 --> P1 + P2 -- no --> P4["Process batch,\nloop immediately"] + P4 --> P1 + end +``` + ## Consumer Groups Set `groupId` to make the consumer a member of a named group. The broker assigns a subset of the topic's partitions to each group member using modulo-based assignment. @@ -75,6 +95,34 @@ The consumer performs the full group lifecycle automatically: 3. **Heartbeat** — every `heartbeatIntervalMs` the consumer sends `HEARTBEAT` to keep the session alive. If the broker replies with `REBALANCE_REQUIRED` or `GEN_MISMATCH`, the consumer stops its partition consumers and rejoins the group. 4. **LEAVE_GROUP** — sent automatically when `close()` is called. +```mermaid +sequenceDiagram + participant C as CursusConsumer + participant B as Cursus Broker + + C->>B: JOIN_GROUP topic groupId consumerId + B-->>C: membership info (memberId, generation) + + C->>B: SYNC_GROUP topic groupId memberId generation + B-->>C: assigned partition list + + loop every heartbeatIntervalMs + C->>B: HEARTBEAT topic groupId consumerId memberId generation + alt normal + B-->>C: OK + else rebalance needed + B-->>C: REBALANCE_REQUIRED or GEN_MISMATCH + C->>C: stop PartitionConsumers + C->>B: JOIN_GROUP (rejoin) + end + end + + Note over C,B: consuming messages... + + C->>B: LEAVE_GROUP topic groupId consumerId + B-->>C: partitions reassigned to remaining members +``` + Multiple consumers in the same group can be started as separate processes or as separate `CursusConsumer` instances within the same JVM: ```java @@ -99,6 +147,24 @@ for (int i = 0; i < 2; i++) { Offsets are committed automatically on a periodic schedule controlled by `autoCommitInterval` (default 5 seconds). Internally the commit scheduler calls `COMMIT topic groupId partition offset` for each active partition consumer. +```mermaid +sequenceDiagram + participant B as Cursus Broker + participant PC as PartitionConsumer + participant H as Message Handler + participant CS as Commit Scheduler + + B->>PC: batch of messages + loop for each message + PC->>H: handler.accept(msg) + PC->>PC: currentOffset = msg.getOffset() + 1 + end + + Note over CS: autoCommitInterval elapses (default 5s) + CS->>B: COMMIT topic groupId partition currentOffset + B-->>CS: OK +``` + You do not need to call any commit method manually unless you want finer control. If you require lower commit latency, reduce `autoCommitInterval`: ```java diff --git a/docs/getting-started.md b/docs/getting-started.md index 31dfeca..87d7eb2 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -133,6 +133,21 @@ Run it in a second terminal while the producer is running: ./gradlew :cursus-examples:standalone:run --main-class io.cursus.examples.standalone.SimpleConsumer ``` +## Quick Start Flow + +```mermaid +flowchart TB + A["Add dependency\ncursus-client or\ncursus-spring-boot-starter"] --> B["Start Cursus broker\ndocker run -p 9000:9000 cursusio/cursus:latest"] + B --> C["Build CursusProducerConfig\n.brokers() .topic() .partitions() .acks()"] + C --> D["Construct CursusProducer\ntry-with-resources or shutdown hook"] + D --> E["producer.send(payload)"] + E --> F["producer.flush()\nwait for ACKs"] + F --> G["Build CursusConsumerConfig\n.brokers() .topic() .groupId() .consumerMode()"] + G --> H["Construct CursusConsumer\nregister shutdown hook"] + H --> I["consumer.start(handler)\nblocks — receives messages"] + I --> J["Ctrl+C → shutdown hook\nconsumer.close() → LEAVE_GROUP"] +``` + ## Next steps - [Producer Guide](producer-guide.md) — batching, compression, idempotency, monitoring diff --git a/docs/producer-guide.md b/docs/producer-guide.md index 056a2a3..40803d7 100644 --- a/docs/producer-guide.md +++ b/docs/producer-guide.md @@ -47,6 +47,16 @@ producer.send("payload", "customer-123"); // partition = Math.abs(FNV-1a("customer-123") % numPartitions) ``` +```mermaid +flowchart TB + A["producer.send(payload, key?)"] --> B{key provided?} + B -- yes --> C["FnvHash.partition(key, numPartitions)\nFNV-1a mod — deterministic"] + B -- no --> D["roundRobinCounter++ % numPartitions\nRound-robin distribution"] + C --> E["PartitionBuffer[N].add(payload, key)"] + D --> E + E --> F["returns seqNum"] +``` + The FNV-1a hash is the same algorithm used by the Go SDK (`hash/fnv`), so a given key always maps to the same partition number regardless of which SDK produced the message. Messages with the same key are always routed to the same partition, preserving ordering for that key. ## Batching @@ -56,6 +66,23 @@ Messages accumulate in a per-partition buffer and are flushed as a batch when ei - **`batchSize`** — the buffer holds this many messages before flushing automatically. - **`lingerMs`** — a background scheduler fires every `lingerMs` milliseconds and flushes all non-empty buffers. +```mermaid +flowchart TB + A["Message added to PartitionBuffer"] --> B{batchSize\nreached?} + B -- yes --> D["PartitionBuffer.drain()\nauto-flush"] + B -- no --> C{lingerMs\ntimer fires?} + C -- yes --> E["PartitionBuffer.forceFlush()"] + C -- no --> F{producer.flush()\ncalled?} + F -- yes --> G["All buffers forceFlush()"] + F -- no --> A + + D --> H["ProtocolEncoder.encodeBatchMessages()"] + E --> H + G --> H + H --> I["ConnectionManager.send(bytes)"] + I --> J["Broker ACK → uniqueAckCount++"] +``` + You can also flush all partitions synchronously: ```java diff --git a/docs/protocol.md b/docs/protocol.md index 437913f..d63f0c4 100644 --- a/docs/protocol.md +++ b/docs/protocol.md @@ -24,6 +24,18 @@ Every message sent and received over the TCP connection is wrapped in a length-p └──────────────────────────────────────────────────────────────┘ ``` +```mermaid +flowchart LR + subgraph Frame + direction LR + A["4 bytes\nuint32 big-endian\nFrame length (N)"] + B["N bytes\nPayload\n(Command or batch body)"] + A --> B + end + C["CursusFrameEncoder\n(outbound)"] --> Frame + Frame --> D["CursusFrameDecoder\n(inbound)"] +``` + The Netty pipeline handles framing transparently via `CursusFrameDecoder` (inbound) and `CursusFrameEncoder` (outbound). Application code never sees the length prefix. ## Commands @@ -45,6 +57,23 @@ Text commands are UTF-8 strings. Fields are separated by a single space. The ent | `BATCH_COMMIT` | `BATCH_COMMIT ` | Commit offsets for multiple partitions in one round-trip | | `SYNC_GROUP` | `SYNC_GROUP ` | Fetch partition assignment after joining a group | +### Command Routing + +```mermaid +flowchart TB + A[Outbound frame payload] --> B{Starts with\n0xBA7C magic?} + B -- yes --> C[Batch Message\nbinary encoding] + B -- no --> D[Text Command\nUTF-8 string] + + C --> E[Producer path:\nProtocolEncoder.encodeBatchMessages] + D --> F{Command type} + + F --> G[Topic management\nCREATE / DELETE / LIST] + F --> H[Consumer group\nJOIN_GROUP / LEAVE_GROUP\nSYNC_GROUP / SUBSCRIBE] + F --> I[Streaming & polling\nSTREAM / CONSUME] + F --> J[Offset & heartbeat\nCOMMIT / BATCH_COMMIT\nHEARTBEAT] +``` + ## Batch Message Encoding Producers send messages in binary batch format. The batch body (after the frame length prefix) has the following layout: diff --git a/docs/spring-boot-integration.md b/docs/spring-boot-integration.md index a3874a1..30622d6 100644 --- a/docs/spring-boot-integration.md +++ b/docs/spring-boot-integration.md @@ -52,6 +52,22 @@ cursus: See [Configuration Reference](configuration-reference.md) for a complete list with types and defaults. +## Auto-Configuration Flow + +```mermaid +flowchart TB + A["Spring Boot application starts"] --> B["Auto-configuration loaded\nvia spring.factories / AutoConfiguration.imports"] + B --> C["CursusAutoConfiguration evaluates\n@ConditionalOnProperty conditions"] + C --> D{cursus.producer.topic\npresent?} + C --> E{cursus.consumer.topic\npresent?} + D -- yes --> F["CursusProducer bean created\nfrom CursusProducerConfig"] + D -- no --> G["Producer bean skipped"] + E -- yes --> H["CursusConsumer bean created\nfrom CursusConsumerConfig"] + E -- no --> I["Consumer bean skipped"] + B --> J["CursusListenerRegistrar\n(BeanPostProcessor) scans all beans\nfor @CursusListener methods"] + J --> K["For each @CursusListener method:\ncreate CursusConsumer instance\nrun on cursus-listener thread pool"] +``` + ## Auto-Configured Beans ### CursusProducer @@ -97,6 +113,31 @@ A `CursusConsumer` bean is created when `cursus.consumer.topic` is set. This bea Annotate any `void` method on a Spring-managed bean (`@Service`, `@Component`, etc.) with `@CursusListener`. The method must accept a single `CursusMessage` parameter. +### @CursusListener Processing Flow + +```mermaid +sequenceDiagram + participant SC as Spring Context + participant LR as CursusListenerRegistrar + participant CC as CursusConsumer + participant B as Cursus Broker + participant M as @CursusListener method + + SC->>LR: postProcessAfterInitialization(bean) + LR->>LR: scan bean methods for @CursusListener + LR->>CC: new CursusConsumer(config from annotation + cursus.brokers) + LR->>CC: start(msg -> method.invoke(bean, msg))\non cursus-listener thread pool + + loop receiving messages + B->>CC: push/poll batch + CC->>M: invoke(bean, message) + end + + SC->>LR: destroy() — application shutdown + LR->>CC: close() for each managed consumer + CC->>B: LEAVE_GROUP +``` + ```java import io.cursus.client.message.CursusMessage; import io.cursus.spring.annotation.CursusListener;