diff --git a/src/main/java/com/baidu/fsg/uid/buffer/BufferPaddingExecutor.java b/src/main/java/com/baidu/fsg/uid/buffer/BufferPaddingExecutor.java index 80c37ad..89b78b2 100644 --- a/src/main/java/com/baidu/fsg/uid/buffer/BufferPaddingExecutor.java +++ b/src/main/java/com/baidu/fsg/uid/buffer/BufferPaddingExecutor.java @@ -147,6 +147,27 @@ public void paddingBuffer() { } // fill the rest slots until to catch the cursor + putBuffer(); + } + /** + * Padding buffer fill the slots until to catch the cursor + */ + public Boolean booleanPaddingBuffer() { + LOGGER.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer); + + // is still running + if (!running.compareAndSet(false, true)) { + LOGGER.info("Padding buffer is still running. {}", ringBuffer); + return false; + } + + // fill the rest slots until to catch the cursor + putBuffer(); + return true; + } + + private void putBuffer() { + boolean isFullRingBuffer = false; while (!isFullRingBuffer) { List uidList = uidProvider.provide(lastSecond.incrementAndGet()); diff --git a/src/main/java/com/baidu/fsg/uid/buffer/RingBuffer.java b/src/main/java/com/baidu/fsg/uid/buffer/RingBuffer.java index 2b5ab62..a982e27 100644 --- a/src/main/java/com/baidu/fsg/uid/buffer/RingBuffer.java +++ b/src/main/java/com/baidu/fsg/uid/buffer/RingBuffer.java @@ -166,9 +166,16 @@ public long take() { // cursor catch the tail, means that there is no more available UID to take if (nextCursor == currentCursor) { - rejectedTakeHandler.rejectTakeBuffer(this); + Boolean running = false; + while (!running) { + running = bufferPaddingExecutor.booleanPaddingBuffer(); + } + nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1); + + if (nextCursor == currentCursor) { + rejectedTakeHandler.rejectTakeBuffer(this); + } } - // 1. check next slot flag is CAN_TAKE_FLAG int nextCursorIndex = calSlotIndex(nextCursor); Assert.isTrue(flags[nextCursorIndex].get() == CAN_TAKE_FLAG, "Curosr not in can take status");