From 69609a4f956ab611d2ff030bb032d2faf5081a03 Mon Sep 17 00:00:00 2001 From: "igor.petrenko" Date: Wed, 17 Dec 2025 19:01:56 +0200 Subject: [PATCH 1/2] CE-126 oap-storage: fix multiple changes --- .../main/java/oap/util/CircularFifoQueue.java | 291 ++++++++++++++++++ .../main/java/oap/storage/MemoryStorage.java | 26 +- .../src/main/java/oap/storage/Storage.java | 4 + .../main/java/oap/storage/TransactionLog.java | 6 +- .../java/oap/storage/TransactionLogImpl.java | 80 +++-- .../test/java/oap/storage/ReplicatorTest.java | 67 ++-- 6 files changed, 420 insertions(+), 54 deletions(-) create mode 100644 oap-stdlib/src/main/java/oap/util/CircularFifoQueue.java diff --git a/oap-stdlib/src/main/java/oap/util/CircularFifoQueue.java b/oap-stdlib/src/main/java/oap/util/CircularFifoQueue.java new file mode 100644 index 000000000..6f22d8120 --- /dev/null +++ b/oap-stdlib/src/main/java/oap/util/CircularFifoQueue.java @@ -0,0 +1,291 @@ +package oap.util; + +import org.apache.commons.lang3.NotImplementedException; + +import javax.annotation.Nonnull; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serial; +import java.io.Serializable; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Queue; + +public class CircularFifoQueue extends AbstractCollection implements Queue, Serializable { + @Serial + private static final long serialVersionUID = 7342311080952645978L; + private final int maxElements; + private transient T[] elements; + private transient int start; + private transient int end; + private transient boolean full; + + public CircularFifoQueue( Collection coll ) { + this( coll.size() ); + this.addAll( coll ); + } + + @SuppressWarnings( "unchecked" ) + public CircularFifoQueue( int size ) { + if( size <= 0 ) { + throw new IllegalArgumentException( "The size must be greater than 0" ); + } else { + this.elements = ( T[] ) ( new Object[size] ); + this.maxElements = this.elements.length; + } + } + + public boolean add( T element ) { + Objects.requireNonNull( element, "element" ); + + if( this.isAtFullCapacity() ) { + this.remove(); + } + + this.elements[this.end++] = element; + if( this.end >= this.maxElements ) { + this.end = 0; + } + + if( this.end == this.start ) { + this.full = true; + } + + return true; + } + + public void clear() { + this.full = false; + this.start = 0; + this.end = 0; + Arrays.fill( this.elements, ( Object ) null ); + } + + @SuppressWarnings( "checkstyle:ParameterAssignment" ) + private int decrement( int index ) { + --index; + if( index < 0 ) { + index = this.maxElements - 1; + } + + return index; + } + + public T element() { + if( this.isEmpty() ) { + throw new NoSuchElementException( "queue is empty" ); + } else { + return ( T ) this.peek(); + } + } + + public T get( int index ) { + int sz = this.size(); + if( index >= 0 && index < sz ) { + int idx = ( this.start + index ) % this.maxElements; + return ( T ) this.elements[idx]; + } else { + throw new NoSuchElementException( String.format( "The specified index %1$d is outside the available range [0, %2$d)", index, sz ) ); + } + } + + @SuppressWarnings( "checkstyle:ParameterAssignment" ) + private int increment( int index ) { + ++index; + if( index >= this.maxElements ) { + index = 0; + } + + return index; + } + + public boolean isAtFullCapacity() { + return this.size() == this.maxElements; + } + + public boolean isEmpty() { + return this.size() == 0; + } + + public boolean isFull() { + return false; + } + + public int maxSize() { + return this.maxElements; + } + + public boolean offer( T element ) { + return this.add( element ); + } + + public T peek() { + return ( T ) ( this.isEmpty() ? null : this.elements[this.start] ); + } + + public T poll() { + return ( T ) ( this.isEmpty() ? null : this.remove() ); + } + + @SuppressWarnings( "unchecked" ) + private void readObject( ObjectInputStream in ) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + this.elements = ( T[] ) ( new Object[this.maxElements] ); + int size = in.readInt(); + + for( int i = 0; i < size; ++i ) { + this.elements[i] = ( T ) in.readObject(); + } + + this.start = 0; + this.full = size == this.maxElements; + if( this.full ) { + this.end = 0; + } else { + this.end = size; + } + + } + + public T remove() { + if( this.isEmpty() ) { + throw new NoSuchElementException( "queue is empty" ); + } else { + T element = this.elements[this.start]; + if( null != element ) { + this.elements[this.start++] = null; + if( this.start >= this.maxElements ) { + this.start = 0; + } + + this.full = false; + } + + return element; + } + } + + public int size() { + int size; + if( this.end < this.start ) { + size = this.maxElements - this.start + this.end; + } else if( this.end == this.start ) { + size = this.full ? this.maxElements : 0; + } else { + size = this.end - this.start; + } + + return size; + } + + @Serial + private void writeObject( ObjectOutputStream out ) throws IOException { + out.defaultWriteObject(); + out.writeInt( this.size() ); + + for( T e : this ) { + out.writeObject( e ); + } + + } + + @Nonnull + @Override + public Iterator iterator() { + return new Iterator<>() { + private int index; + private int lastReturnedIndex; + private boolean isFirst; + + { + this.index = start; + this.lastReturnedIndex = -1; + this.isFirst = full; + } + + public boolean hasNext() { + return this.isFirst || this.index != end; + } + + public T next() { + if( !this.hasNext() ) { + throw new NoSuchElementException(); + } else { + this.isFirst = false; + this.lastReturnedIndex = this.index; + this.index = increment( this.index ); + return elements[this.lastReturnedIndex]; + } + } + + public void remove() { + if( this.lastReturnedIndex == -1 ) { + throw new IllegalStateException(); + } else if( this.lastReturnedIndex == start ) { + remove(); + this.lastReturnedIndex = -1; + } else { + int pos = this.lastReturnedIndex + 1; + if( start < this.lastReturnedIndex && pos < end ) { + System.arraycopy( elements, pos, elements, this.lastReturnedIndex, end - pos ); + } else { + while( pos != end ) { + if( pos >= maxElements ) { + elements[pos - 1] = elements[0]; + pos = 0; + } else { + elements[decrement( pos )] = elements[pos]; + pos = increment( pos ); + } + } + } + + this.lastReturnedIndex = -1; + end = decrement( end ); + elements[end] = null; + full = false; + this.index = decrement( this.index ); + } + } + }; + } + + @Nonnull + public Iterator reverseIterator() { + return new Iterator<>() { + private int index; + private int lastReturnedIndex; + private boolean isLast; + + { + this.index = end; + this.lastReturnedIndex = -1; + this.isLast = full; + } + + public boolean hasNext() { + return this.isLast || this.index != start; + } + + public T next() { + if( !this.hasNext() ) { + throw new NoSuchElementException(); + } else { + this.isLast = false; + this.lastReturnedIndex = decrement( this.index ); + this.index = decrement( this.index ); + return elements[this.lastReturnedIndex]; + } + } + + public void remove() { + throw new NotImplementedException(); + } + }; + } +} diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/MemoryStorage.java b/oap-storage/oap-storage/src/main/java/oap/storage/MemoryStorage.java index c2a148e0e..8114d2e91 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/MemoryStorage.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/MemoryStorage.java @@ -181,6 +181,21 @@ public Optional> getMetadata( @Nonnull Id id ) { return memory.get( id ); } + @Nullable + public Metadata getMetadataNullable( @Nonnull Id id ) { + return memory.getNullable( id ); + } + + @Nullable + @Override + public Data getNullable( @Nonnull Id id ) { + Metadata metadata = getMetadataNullable( id ); + if( metadata != null ) { + return metadata.object; + } + return null; + } + @Override public Optional get( @Nonnull Id id ) { return getMetadata( id ).map( m -> m.object ); @@ -354,11 +369,20 @@ public BiStream> selectUpdatedSince( long since ) { } public Optional> get( @Nonnull I id ) { - requireNonNull( id ); return Optional.ofNullable( data.get( id ) ) .filter( m -> !m.isDeleted() ); } + @Nullable + public Metadata getNullable( @Nonnull I id ) { + Metadata metadata = data.get( id ); + if( metadata != null && !metadata.isDeleted() ) { + return metadata; + } + + return null; + } + public boolean put( @Nonnull I id, @Nonnull Metadata m ) { requireNonNull( id ); requireNonNull( m ); diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/Storage.java b/oap-storage/oap-storage/src/main/java/oap/storage/Storage.java index 5596c341b..03714c2a8 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/Storage.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/Storage.java @@ -30,6 +30,7 @@ import oap.util.Stream; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.Serial; import java.io.Serializable; import java.util.Collection; @@ -50,6 +51,9 @@ public interface Storage extends Iterable { List> listMetadata(); + @Nullable + Data getNullable( @Nonnull Id id ); + Optional get( @Nonnull Id id ); Data get( Id id, @Nonnull Supplier init, String modifiedBy ); diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLog.java b/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLog.java index 4f9e0f7ea..28e527867 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLog.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLog.java @@ -4,7 +4,7 @@ import java.io.Serial; import java.io.Serializable; -import java.util.List; +import java.util.Collection; import java.util.Map; import java.util.Set; @@ -47,9 +47,9 @@ class ReplicationResult implements Serializable { public final long timestamp; public final long hash; public final ReplicationStatusType type; - public final List> data; + public final Collection> data; - public ReplicationResult( long timestamp, long hash, ReplicationStatusType type, List> data ) { + public ReplicationResult( long timestamp, long hash, ReplicationStatusType type, Collection> data ) { this.timestamp = timestamp; this.hash = hash; this.type = type; diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogImpl.java b/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogImpl.java index 927ffae83..d177d64fe 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogImpl.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogImpl.java @@ -1,19 +1,22 @@ package oap.storage; +import oap.util.CircularFifoQueue; import oap.util.Lists; -import org.apache.commons.collections4.queue.CircularFifoQueue; import org.joda.time.DateTimeUtils; import org.jspecify.annotations.NonNull; -import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; public class TransactionLogImpl implements TransactionLog { public final CircularFifoQueue>> transactions; public final AtomicLong timestamp = new AtomicLong(); + public final ReentrantLock lock = new ReentrantLock(); public long hash = DateTimeUtils.currentTimeMillis(); public TransactionLogImpl( int transactionLogSize ) { @@ -21,44 +24,69 @@ public TransactionLogImpl( int transactionLogSize ) { } @Override - public synchronized void insert( Id id, Metadata data ) { - transactions.add( new Transaction<>( timestamp.incrementAndGet(), Operation.INSERT, id, data ) ); + public void insert( Id id, Metadata data ) { + lock.lock(); + try { + transactions.add( new Transaction<>( timestamp.incrementAndGet(), Operation.INSERT, id, data ) ); + } finally { + lock.unlock(); + } } @Override - public synchronized void update( Id id, Metadata data ) { - transactions.add( new Transaction<>( timestamp.incrementAndGet(), Operation.UPDATE, id, data ) ); + public void update( Id id, Metadata data ) { + lock.lock(); + try { + transactions.add( new Transaction<>( timestamp.incrementAndGet(), Operation.UPDATE, id, data ) ); + } finally { + lock.unlock(); + } } @Override - public synchronized void delete( Id id, Metadata data ) { - transactions.add( new Transaction<>( timestamp.incrementAndGet(), Operation.DELETE, id, data ) ); + public void delete( Id id, Metadata data ) { + lock.lock(); + try { + transactions.add( new Transaction<>( timestamp.incrementAndGet(), Operation.DELETE, id, data ) ); + } finally { + lock.unlock(); + } } @Override - public synchronized ReplicationResult> updatedSince( long timestamp, long hash, Set>> fullData ) { - int size = transactions.size(); - if( this.hash != hash || timestamp < 0 ) { // first sync && no modification - return fullSync( fullData, this.timestamp.longValue() ); - } + public ReplicationResult> updatedSince( long timestamp, long hash, Set>> fullData ) { + lock.lock(); + try { + int size = transactions.size(); + if( this.hash != hash || timestamp < 0 ) { // first sync && no modification + return fullSync( fullData, this.timestamp.longValue() ); + } - Transaction> older = transactions.peek(); - if( older == null ) { - return new ReplicationResult<>( this.timestamp.longValue(), this.hash, ReplicationResult.ReplicationStatusType.CHANGES, List.of() ); - } - if( older.timestamp > timestamp ) { - return fullSync( fullData, this.timestamp.longValue() ); - } + Transaction> older = transactions.peek(); + if( older == null ) { + return new ReplicationResult<>( this.timestamp.longValue(), this.hash, ReplicationResult.ReplicationStatusType.CHANGES, List.of() ); + } + if( older.timestamp > timestamp ) { + return fullSync( fullData, this.timestamp.longValue() ); + } - ArrayList>> list = new ArrayList<>( size ); + LinkedHashMap>> list = new LinkedHashMap<>( size ); - for( Transaction> transaction : transactions ) { - if( transaction.timestamp > timestamp ) { - list.add( transaction ); + Iterator>> reverseIterator = transactions.reverseIterator(); + while( reverseIterator.hasNext() ) { + Transaction> transaction = reverseIterator.next(); + + if( transaction.timestamp > timestamp ) { + list.putIfAbsent( transaction.id, transaction ); + } else { + break; + } } - } - return new ReplicationResult<>( this.timestamp.longValue(), this.hash, ReplicationResult.ReplicationStatusType.CHANGES, list ); + return new ReplicationResult<>( this.timestamp.longValue(), this.hash, ReplicationResult.ReplicationStatusType.CHANGES, list.values() ); + } finally { + lock.unlock(); + } } private @NonNull ReplicationResult> fullSync( Set>> fullData, long t ) { diff --git a/oap-storage/oap-storage/src/test/java/oap/storage/ReplicatorTest.java b/oap-storage/oap-storage/src/test/java/oap/storage/ReplicatorTest.java index 24d0660d9..468f99781 100644 --- a/oap-storage/oap-storage/src/test/java/oap/storage/ReplicatorTest.java +++ b/oap-storage/oap-storage/src/test/java/oap/storage/ReplicatorTest.java @@ -113,6 +113,7 @@ public void replicateNow() { try( Replicator replicator = new Replicator<>( slave, master, 5000 ) ) { master.store( new Bean( "1" ), Storage.MODIFIED_BY_SYSTEM ); master.store( new Bean( "2" ), Storage.MODIFIED_BY_SYSTEM ); + replicator.replicateNow(); assertThat( slave.list() ).containsOnly( new Bean( "1" ), new Bean( "2" ) ); } @@ -140,34 +141,35 @@ public void changed( List> added, List { - assertThat( replicator.replicatorSizeFullSync ).hasValue( 1L ); - assertThat( replicator.replicatorSizePartialSync ).hasValue( 0L ); - } ); + assertThat( replicator.replicatorSizeFullSync ).hasValue( 1L ); + assertThat( replicator.replicatorSizePartialSync ).hasValue( 0L ); Dates.incFixed( 10 ); replicator.replicateNow(); replicator.replicateNow(); - assertEventually( 100, 100, () -> { - assertThat( replicator.replicatorCounterFullSync ).hasValue( 1L ); - assertThat( replicator.replicatorCounterPartialSync ).hasValue( 2L ); - } ); + assertThat( replicator.replicatorCounterFullSync ).hasValue( 1L ); + assertThat( replicator.replicatorCounterPartialSync ).hasValue( 2L ); // restart @@ -204,12 +202,33 @@ public void testFullResyncIfMasterRestarted() { master.store( new Bean( "2" ), Storage.MODIFIED_BY_SYSTEM ); replicator.replicateNow(); - assertEventually( 100, 100, () -> { - assertThat( replicator.replicatorCounterFullSync ).hasValue( 2L ); - assertThat( replicator.replicatorCounterPartialSync ).hasValue( 2L ); - } ); + assertThat( replicator.replicatorCounterFullSync ).hasValue( 2L ); + assertThat( replicator.replicatorCounterPartialSync ).hasValue( 2L ); } + } + + @Test + public void testAddDeleteUpdateDeleteAdd() { + MemoryStorage slave = new MemoryStorage<>( Identifier.forId( b -> b.id ).build(), SERIALIZED ); + MemoryStorage master = new MemoryStorage<>( Identifier.forId( b -> b.id ).build(), SERIALIZED, 100 ); + try( Replicator replicator = new Replicator<>( slave, master, 100000 ) ) { + master.store( new Bean( "1" ), Storage.MODIFIED_BY_SYSTEM ); + + replicator.replicateNow(); + assertThat( slave.getNullable( "1" ) ).isNotNull(); + + master.permanentlyDelete( "1" ); + master.store( new Bean( "1" ), Storage.MODIFIED_BY_SYSTEM ); + master.update( "1", b -> { + b.s = "2"; + return b; + }, Storage.MODIFIED_BY_SYSTEM ); + master.permanentlyDelete( "1" ); + master.store( new Bean( "1" ), Storage.MODIFIED_BY_SYSTEM ); + replicator.replicateNow(); + assertThat( slave.getNullable( "1" ) ).isEqualTo( new Bean( "1" ) ); + } } } From c68f06e891397360b77a847dc401c8b087ba97f9 Mon Sep 17 00:00:00 2001 From: "igor.petrenko" Date: Wed, 17 Dec 2025 19:02:09 +0200 Subject: [PATCH 2/2] CE-126 oap-storage: fix multiple changes --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 3e8850cc7..52ae63948 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ - 25.1.1 + 25.1.2 25.0.0 25.0.0