Topic writers implementation update #622
Conversation
Codecov Report

❌ Patch coverage report; additional details and impacted files:

```
@@            Coverage Diff             @@
##             master     #622    +/-   ##
============================================
+ Coverage     69.59%   69.71%   +0.11%
- Complexity     3172     3197      +25
============================================
  Files           362      366       +4
  Lines         15455    15430      -25
  Branches       1628     1617      -11
============================================
+ Hits          10756    10757       +1
+ Misses         4033     4019      -14
+ Partials        666      654      -12
============================================
```
Pull request overview
This PR refactors the topic writer implementation by extracting queue/buffer management into new components and adjusting integration tests accordingly.
Changes:
- Introduces `WriterQueue` + `BufferManager` to manage in-flight buffering, compression readiness, seqNo, and flush behavior.
- Reworks `WriterImpl` send/ack flow to use `WriterQueue`, and updates `MessageSender` to send per-message protobufs from `SentMessage`.
- Updates integration tests around codec validation and writer buffer behavior/timeouts.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 12 comments.
| File | Description |
|---|---|
| topic/src/test/java/tech/ydb/topic/impl/YdbTopicsCodecIntegrationTest.java | Un-ignores codec error tests; adds per-test timeout rule; expects IllegalArgumentException for unsupported codec. |
| topic/src/test/java/tech/ydb/topic/impl/TopicWritersIntegrationTest.java | Modifies overflow test to stop asserting overflow behavior. |
| topic/src/main/java/tech/ydb/topic/write/impl/WriterQueue.java | New queue/seqNo/flush/compression orchestration for writer. |
| topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java | Refactors writer to delegate buffering/sending/ack handling to WriterQueue and MessageSender. |
| topic/src/main/java/tech/ydb/topic/write/impl/SyncWriterImpl.java | Routes sync writer sends through new queue-based APIs. |
| topic/src/main/java/tech/ydb/topic/write/impl/SentMessage.java | New immutable message wrapper used for sending protobuf requests and tracking ack futures. |
| topic/src/main/java/tech/ydb/topic/write/impl/MessageSender.java | Simplifies batching logic; computes overheads statically and flushes based on predicted size. |
| topic/src/main/java/tech/ydb/topic/write/impl/MessageMeta.java | New metadata holder extracted from Message. |
| topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java | Simplifies enqueued message representation to readiness + ack future + encoded data. |
| topic/src/main/java/tech/ydb/topic/write/impl/BufferManager.java | New semaphore-based buffer limiter for bytes + message count. |
| topic/src/main/java/tech/ydb/topic/write/impl/AsyncWriterImpl.java | Routes async writer sends through new queue-based APIs. |
```java
    SentMessage nextMessageToSend() {
        Iterator<EnqueuedMessage> it = queue.iterator();
        while (it.hasNext()) {
            EnqueuedMessage next = it.next();

            if (next.hasProblem()) {
                it.remove();
                buffer.releaseMessage(next.getBufferSize());
```
queue is a ConcurrentLinkedQueue, whose iterator does not support remove(). Calling it.remove() here will throw UnsupportedOperationException at runtime, breaking sending/ack handling. Use poll()/peek() (or queue.remove(next) with care) to dequeue elements instead of iterator removal.
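A head-draining variant along the lines the comment suggests might look like the sketch below. `Msg` and the `releasedBytes` counter are stand-ins for `EnqueuedMessage` and `buffer.releaseMessage(...)`; the sketch assumes a single consumer thread, as in the writer's send loop, so that `peek()` and the following `poll()` see the same head element.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch: drain broken messages from the head of the queue with peek()/poll()
// instead of iterator removal. "Msg" stands in for EnqueuedMessage.
class HeadDrainSketch {
    static final class Msg {
        final boolean hasProblem;
        final int bufferSize;

        Msg(boolean hasProblem, int bufferSize) {
            this.hasProblem = hasProblem;
            this.bufferSize = bufferSize;
        }
    }

    /** Returns the first healthy message, or null; records freed bytes in releasedBytes[0]. */
    static Msg nextMessageToSend(Queue<Msg> queue, int[] releasedBytes) {
        Msg head;
        while ((head = queue.peek()) != null) {
            if (!head.hasProblem) {
                return head;
            }
            queue.poll();                        // dequeue the broken head message
            releasedBytes[0] += head.bufferSize; // stand-in for buffer.releaseMessage(...)
        }
        return null;
    }
}
```

Note this only skips broken messages sitting at the head; if broken messages can be interleaved with healthy ones mid-queue, a different draining strategy is needed.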
```java
        int msgSize = Math.min(message.getData().length, buffer.getMaxSize());
        buffer.acquire(msgSize);
        return accept(message, tx, msgSize);
    }

    CompletableFuture<WriteAck> tryEnqueue(Message message, YdbTransaction tx) throws QueueOverflowException {
        int msgSize = Math.min(message.getData().length, buffer.getMaxSize());
        buffer.tryAcquire(msgSize);
        return accept(message, tx, msgSize);
    }

    CompletableFuture<WriteAck> tryEnqueue(Message message, YdbTransaction tx, long timeout, TimeUnit unit)
            throws QueueOverflowException, InterruptedException, TimeoutException {
        int msgSize = Math.min(message.getData().length, buffer.getMaxSize());
        buffer.tryAcquire(msgSize, timeout, unit);
        return accept(message, tx, msgSize);
    }
```
Message size accounting uses Math.min(message.getData().length, buffer.getMaxSize()), which under-accounts messages larger than the configured max. This defeats the buffer limit (and can lead to unbounded memory usage) while still sending the full payload. Reject oversize messages (or account for full size) instead of clamping the acquired size.
Suggested change:

```diff
-        int msgSize = Math.min(message.getData().length, buffer.getMaxSize());
+        int msgSize = getValidatedMessageSize(message);
         buffer.acquire(msgSize);
         return accept(message, tx, msgSize);
     }

     CompletableFuture<WriteAck> tryEnqueue(Message message, YdbTransaction tx) throws QueueOverflowException {
-        int msgSize = Math.min(message.getData().length, buffer.getMaxSize());
+        int msgSize = getValidatedMessageSize(message);
         buffer.tryAcquire(msgSize);
         return accept(message, tx, msgSize);
     }

     CompletableFuture<WriteAck> tryEnqueue(Message message, YdbTransaction tx, long timeout, TimeUnit unit)
             throws QueueOverflowException, InterruptedException, TimeoutException {
-        int msgSize = Math.min(message.getData().length, buffer.getMaxSize());
+        int msgSize = getValidatedMessageSize(message);
         buffer.tryAcquire(msgSize, timeout, unit);
         return accept(message, tx, msgSize);
     }
+
+    private int getValidatedMessageSize(Message message) throws QueueOverflowException {
+        int msgSize = message.getData().length;
+        if (msgSize > buffer.getMaxSize()) {
+            throw new QueueOverflowException("Message size " + msgSize
+                    + " exceeds maximum buffer size " + buffer.getMaxSize());
+        }
+        return msgSize;
+    }
```
```java
    private CompletableFuture<WriteAck> accept(Message message, YdbTransaction tx, int msgSize) {
        EnqueuedMessage msg = new EnqueuedMessage(new MessageMeta(message, tx), msgSize);
        queue.add(msg);

        if (codec.getId() == Codec.RAW) {
            // fast track without compression
            msg.setData(UnsafeByteOperations.unsafeWrap(message.getData()), msgSize);
            readyNotify.run();
            return msg.getAckFuture();
        }
```
flush() relies on lastAcceptedMessage, but the RAW fast-path returns before setting it. That makes flush() complete immediately even when RAW messages are still pending/awaiting acks, violating SyncWriter.flush() semantics.
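One way to preserve the `flush()` contract is to record the last accepted message's ack future on every accept path before any early return. The sketch below is illustrative, not the PR's actual code; `lastAcceptedAck` stands in for the writer's `lastAcceptedMessage` tracking.

```java
import java.util.concurrent.CompletableFuture;

// Sketch: record the last accepted ack future on *both* paths, including the
// RAW fast path, so flush() waits for pending RAW messages too.
class FlushTrackingSketch {
    private volatile CompletableFuture<Void> lastAcceptedAck =
            CompletableFuture.completedFuture(null);

    CompletableFuture<Void> accept(boolean rawFastPath) {
        CompletableFuture<Void> ack = new CompletableFuture<>();
        lastAcceptedAck = ack; // must happen before any early return
        if (rawFastPath) {
            return ack; // fast track without compression
        }
        // ... otherwise schedule compression, then sending ...
        return ack;
    }

    /** Completes only once the most recently accepted message has been acked. */
    CompletableFuture<Void> flush() {
        return lastAcceptedAck.thenRun(() -> { });
    }
}
```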
```java
            compressionExecutor.execute(() -> encode(message.getData(), msgSize, msg));
        } catch (Throwable ex) {
            logger.warn("[{}] Message wasn't sent because of processing error", id, ex);
            msg.setError(ex);
```
If compressionExecutor.execute(...) throws, the message is marked errored but readyNotify is never called. That can leave an errored message stuck in the queue with no trigger to drain it, potentially causing flush()/sending to hang. Ensure the error path also notifies/drains the queue.
Suggested change:

```diff
             msg.setError(ex);
+            readyNotify.run();
```
```java
    public BufferManager(String id, WriterSettings settings) {
        this.id = id;
        this.maxSize = (int) settings.getMaxSendBufferMemorySize();
        this.maxCount = settings.getMaxSendBufferMessagesCount();

        this.bytesAvailable = new Semaphore(maxSize, true);
        this.countAvailable = new Semaphore(maxCount, true);
    }
```
maxSendBufferMemorySize is a long in WriterSettings, but it is cast to int here. Values >2GB will overflow and create semaphores with an incorrect permit count. Either validate the setting fits into an int or keep the accounting in long (e.g., split into chunks / use long-based accounting).
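The validation option can be as simple as a range check before constructing the semaphore. A minimal sketch (the factory-method name is illustrative, not from the PR):

```java
import java.util.concurrent.Semaphore;

// Sketch: reject configurations whose maxSendBufferMemorySize (a long in
// WriterSettings) does not fit the int-based Semaphore, instead of silently
// narrowing it with a cast.
class BufferSizeValidation {
    static Semaphore newBytesSemaphore(long maxSendBufferMemorySize) {
        if (maxSendBufferMemorySize <= 0 || maxSendBufferMemorySize > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("maxSendBufferMemorySize must be in (0, "
                    + Integer.MAX_VALUE + "], got " + maxSendBufferMemorySize);
        }
        return new Semaphore((int) maxSendBufferMemorySize, true); // fair, as in the PR
    }
}
```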
```java
    public void tryAcquire(int messageSize, long timeout, TimeUnit unit) throws InterruptedException,
            QueueOverflowException, TimeoutException {
        if (!countAvailable.tryAcquire(timeout, unit)) {
            logger.warn("[{}] Rejecting a message due to reaching message queue in-flight limit of {}", id,
                    maxCount);
            throw new QueueOverflowException("Message queue in-flight limit of " + maxSize + " reached");
        }

        if (!bytesAvailable.tryAcquire(messageSize, timeout, unit)) {
            countAvailable.release();
            int size = maxCount - countAvailable.availablePermits();
            String errorMessage = "[" + id + "] Rejecting a message of " + messageSize +
                    " bytes: not enough space in message queue. Buffer currently has " + size +
                    " messages with " + bytesAvailable.availablePermits() + " / " + maxSize + " bytes available";
            logger.warn(errorMessage);
            throw new TimeoutException(errorMessage);
        }
```
This method applies the full timeout twice (first for countAvailable, then again for bytesAvailable), so the total blocking time can exceed the caller-provided timeout. Track remaining time between acquires (or acquire both resources under a single deadline) to honor the overall timeout.
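A deadline-based version of the two acquisitions could look like the sketch below. It is self-contained, so `TimeoutException` stands in for both the overflow and the timeout failure cases, and only the two semaphores are modeled:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch: compute one overall deadline up front and spend only the remaining
// time on each acquisition, so total blocking never exceeds the caller's timeout.
class DeadlineAcquireSketch {
    static void tryAcquire(Semaphore countAvailable, Semaphore bytesAvailable,
                           int messageSize, long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
        if (!countAvailable.tryAcquire(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS)) {
            throw new TimeoutException("in-flight message count limit reached");
        }
        // Only the *remaining* time is spent waiting for buffer bytes.
        if (!bytesAvailable.tryAcquire(messageSize,
                deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS)) {
            countAvailable.release(); // roll back the count permit on failure
            throw new TimeoutException("not enough space for " + messageSize + " bytes");
        }
    }
}
```

`Semaphore.tryAcquire` treats a non-positive timeout as "try once without waiting", so a deadline that has already passed degrades gracefully to a non-blocking attempt.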
```java
                // Unknown type of write ack
                return null;
```
mapAck() can return null for an unknown ack type, but confirmAck completes the user future with that value. This can lead to NPEs downstream and hides protocol errors. Prefer completing the future exceptionally (and logging) when the ack state is unrecognized.
Suggested change:

```diff
-                // Unknown type of write ack
-                return null;
+                IllegalStateException exception = new IllegalStateException(
+                        "Unknown write ack type: " + ack.getMessageWriteStatusCase());
+                logger.error("[{}] Received unsupported WriteAck state for seqNo {}: {}", streamId, ack.getSeqNo(),
+                        ack.getMessageWriteStatusCase(), exception);
+                throw exception;
```
```diff
     public void sendWriteRequest() {
         YdbTopic.StreamWriteMessage.WriteRequest.Builder req = YdbTopic.StreamWriteMessage.WriteRequest.newBuilder();
         if (currentTransaction != null) {
-            writeRequestBuilder.setTx(YdbTopic.TransactionIdentity.newBuilder()
+            req.setTx(YdbTopic.TransactionIdentity.newBuilder()
                     .setId(currentTransaction.getId())
                     .setSession(currentTransaction.getSessionId()));
         }

         req.setCodec(codecCode);
         req.addAllMessages(messages);

         YdbTopic.StreamWriteMessage.FromClient fromClient = YdbTopic.StreamWriteMessage.FromClient.newBuilder()
-                .setWriteRequest(writeRequestBuilder)
+                .setWriteRequest(req.build())
                 .build();

         if (logger.isDebugEnabled()) {
             logger.debug("Predicted request size: {} = {}(request overhead) + {}(all MessageData protos) " +
-                    "+ {}(message overheads)\nActual request size: {} bytes", getCurrentRequestSize(),
-                    requestOverheadBytes, totalMessageDataProtoSize, messageOverheadBytes * messageCount,
+                    "+ {}(message overheads) Actual request size: {} bytes", getCurrentRequestSize(),
+                    REQUEST_OVERHEAD, messagesPbSize, MESSAGE_OVERHEAD * messages.size(),
                     fromClient.getSerializedSize());
         }
-        if (fromClient.getSerializedSize() > MAX_GRPC_MESSAGE_SIZE) {
-            List<YdbTopic.StreamWriteMessage.WriteRequest.MessageData> messages = writeRequestBuilder.getMessagesList();
-            if (messages.size() > 1) {
-                int firstHalfMessagesCount = messages.size() / 2;
-                logger.debug("Failed to predict request total size. Total size is {} which exceeds the limit of {}. " +
-                        "Splitting {} messages into two requests of {} and {} messages",
-                        fromClient.getSerializedSize(), MAX_GRPC_MESSAGE_SIZE, messages.size(), firstHalfMessagesCount,
-                        messages.size() - firstHalfMessagesCount);
-
-                for (List<YdbTopic.StreamWriteMessage.WriteRequest.MessageData> sublist : Arrays.asList(
-                        messages.subList(0, firstHalfMessagesCount),
-                        messages.subList(firstHalfMessagesCount, messages.size())
-                )) {
-                    writeRequestBuilder = YdbTopic.StreamWriteMessage.WriteRequest.newBuilder()
-                            .setCodec(settings.getCodec());
-                    writeRequestBuilder.addAllMessages(sublist);
-                    YdbTopic.StreamWriteMessage.FromClient subRequest = YdbTopic.StreamWriteMessage.FromClient
-                            .newBuilder()
-                            .setWriteRequest(writeRequestBuilder)
-                            .build();
-                    logger.debug("Total sub-request size: {} bytes", subRequest.getSerializedSize());
-                    session.send(subRequest);
-                }
-                return;
-            }
-        }
-
         session.send(fromClient);
+        messages.clear();
+        messagesPbSize.set(0);
     }
```
The previous implementation had a safety check that split requests if the actual serialized size exceeded the gRPC limit. That fallback is gone, so any underestimation (e.g., codec varint size, tx fields, metadata growth) can result in sending oversized messages and gRPC failures. Consider restoring an actual-size guard/splitting strategy (and explicitly handling the single-message>limit case).
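A restored guard could keep the old halving strategy while also surfacing the single-oversized-message case as an error. The sketch below models messages by their serialized sizes only; `Sender` stands in for `session.send(...)` and the size measurement, so it illustrates the splitting logic, not the protobuf plumbing:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: recursively split a batch whenever its measured size exceeds the
// limit, and fail fast on a single message that can never fit.
class SizeGuardSketch {
    interface Sender {
        void send(List<Integer> batch);
    }

    static void sendGuarded(List<Integer> messageSizes, int maxSize, Sender sender) {
        int total = messageSizes.stream().mapToInt(Integer::intValue).sum();
        if (total <= maxSize) {
            sender.send(messageSizes);
            return;
        }
        if (messageSizes.size() == 1) {
            // A single message over the limit must surface as an error, not be sent blindly.
            throw new IllegalStateException("message of " + total
                    + " bytes exceeds gRPC limit of " + maxSize);
        }
        int half = messageSizes.size() / 2; // same halving strategy as the removed code
        sendGuarded(new ArrayList<>(messageSizes.subList(0, half)), maxSize, sender);
        sendGuarded(new ArrayList<>(messageSizes.subList(half, messageSizes.size())), maxSize, sender);
    }
}
```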
```diff
         byte[] msg1 = new byte[1000];
         byte[] msg2 = new byte[1001];
         Arrays.fill(msg1, (byte) 0x10);
         Arrays.fill(msg2, (byte) 0x11);

         writer.send(Message.of(msg1));
         writer.send(Message.of(msg1));
         writer.send(Message.of(msg1));
         writer.flush();

-        IllegalArgumentException ex = Assert.assertThrows(IllegalArgumentException.class,
-                () -> writer.send(Message.of(msg2))
-        );
-        Assert.assertEquals("Rejecting a message of 1001 bytes: not enough space in message queue. "
-                + "The maximum size of buffer is 1000 bytes", ex.getMessage());
+        writer.send(Message.of(msg2)); // this message is more that buffset limit
         writer.send(Message.of(msg1));
         writer.send(Message.of(msg1));
         writer.send(Message.of(msg1));

         writer.flush();
         writer.shutdown(10, TimeUnit.SECONDS);
```
This test no longer asserts the expected overflow behavior and will pass even if the buffer limit is not enforced. Please restore an assertion (exception or observable state) that verifies what should happen when a message exceeds maxSendBufferMemorySize.
```diff
-        Assert.assertEquals("Rejecting a message of 1001 bytes: not enough space in message queue. "
-                + "The maximum size of buffer is 1000 bytes", ex.getMessage());
+        writer.send(Message.of(msg2)); // this message is more that buffset limit
```
Typo in comment: "more that buffset limit" should be something like "more than buffer limit".
Suggested change:

```diff
-        writer.send(Message.of(msg2)); // this message is more that buffset limit
+        writer.send(Message.of(msg2)); // this message is more than buffer limit
```
No description provided.