From 91e7350764f300a071d205e0c0208e57bda963da Mon Sep 17 00:00:00 2001 From: "igor.petrenko" Date: Wed, 17 Dec 2025 13:21:46 +0200 Subject: [PATCH 1/3] CE-124 oap-storage: run full sync when restarting the master --- .../java/oap/storage/MongoPersistence.java | 3 ++- .../java/oap/storage/AbstractPersistance.java | 1 + .../main/java/oap/storage/MemoryStorage.java | 4 ++-- .../java/oap/storage/ReplicationMaster.java | 2 +- .../src/main/java/oap/storage/Replicator.java | 22 +++++++++++-------- .../main/java/oap/storage/TransactionLog.java | 6 +++-- .../java/oap/storage/TransactionLogImpl.java | 15 ++++++++----- .../java/oap/storage/TransactionLogZero.java | 4 ++-- pom.xml | 2 +- 9 files changed, 36 insertions(+), 23 deletions(-) diff --git a/oap-storage/oap-storage-mongo/src/main/java/oap/storage/MongoPersistence.java b/oap-storage/oap-storage-mongo/src/main/java/oap/storage/MongoPersistence.java index c77e77dfb..9e8589586 100644 --- a/oap-storage/oap-storage-mongo/src/main/java/oap/storage/MongoPersistence.java +++ b/oap-storage/oap-storage-mongo/src/main/java/oap/storage/MongoPersistence.java @@ -150,7 +150,7 @@ public void fsync() { var list = new ArrayList>>( batchSize ); var deletedIds = new ArrayList( batchSize ); AtomicInteger updated = new AtomicInteger(); - TransactionLog.ReplicationResult> updatedSince = storage.updatedSince( lastTimestamp ); + TransactionLog.ReplicationResult> updatedSince = storage.updatedSince( lastTimestamp, hash ); updatedSince.data.forEach( t -> { updated.incrementAndGet(); @@ -167,6 +167,7 @@ public void fsync() { log.trace( "fsyncing, last: {}, updated objects in storage: {}, total in storage: {}", lastTimestamp, updated.get(), storage.size() ); persist( deletedIds, list ); lastTimestamp = updatedSince.timestamp; + hash = updatedSince.hash; } ); } diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/AbstractPersistance.java b/oap-storage/oap-storage/src/main/java/oap/storage/AbstractPersistance.java index b043735e0..2758ef61b 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/AbstractPersistance.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/AbstractPersistance.java @@ -65,6 +65,7 @@ public abstract class AbstractPersistance implements Closeable, AutoClosea protected ScheduledExecutorService scheduler; protected int batchSize = 100; protected volatile long lastTimestamp = -1; + protected volatile long hash = -1; protected volatile boolean stopped = false; public AbstractPersistance( MemoryStorage storage, String tableName, long delay, Path crashDumpPath ) { diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/MemoryStorage.java b/oap-storage/oap-storage/src/main/java/oap/storage/MemoryStorage.java index 1659c0961..c2a148e0e 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/MemoryStorage.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/MemoryStorage.java @@ -327,10 +327,10 @@ public void forEach( Consumer action ) { } @Override - public TransactionLog.ReplicationResult> updatedSince( long timestamp ) { + public TransactionLog.ReplicationResult> updatedSince( long timestamp, long hash ) { log.trace( "requested updated objects timestamp {}", timestamp ); - return transactionLog.updatedSince( timestamp, memory.data.entrySet() ); + return transactionLog.updatedSince( timestamp, hash, memory.data.entrySet() ); } protected static class Memory { diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/ReplicationMaster.java b/oap-storage/oap-storage/src/main/java/oap/storage/ReplicationMaster.java index 913074544..d04fccd1e 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/ReplicationMaster.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/ReplicationMaster.java @@ -25,5 +25,5 @@ package oap.storage; public interface ReplicationMaster { - TransactionLog.ReplicationResult> updatedSince( long timestamp ); + TransactionLog.ReplicationResult> updatedSince( long timestamp, long hash ); } diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/Replicator.java b/oap-storage/oap-storage/src/main/java/oap/storage/Replicator.java index 476bd4541..4210793d8 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/Replicator.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/Replicator.java @@ -33,6 +33,7 @@ import oap.io.Closeables; import oap.storage.Storage.DataListener.IdObject; import oap.util.Cuid; +import oap.util.Pair; import java.io.Closeable; import java.io.UncheckedIOException; @@ -41,6 +42,7 @@ import static oap.storage.Storage.DataListener.IdObject.__io; import static oap.storage.TransactionLog.ReplicationResult.ReplicationStatusType.FULL_SYNC; +import static oap.util.Pair.__; /** * Replicator works on the MemoryStorage internals. It's intentional. @@ -54,6 +56,7 @@ public class Replicator implements Closeable { private String uniqueName = Cuid.UNIQUE.next(); private Scheduled scheduled; private transient long timestamp = -1L; + private transient long hash = -1L; public Replicator( MemoryStorage slave, ReplicationMaster master, long interval ) { @@ -62,10 +65,11 @@ public Replicator( MemoryStorage slave, ReplicationMaster master, lo this.slave = slave; this.master = master; this.scheduled = Scheduler.scheduleWithFixedDelay( getClass(), interval, i -> { - long newTimestamp = replicate( timestamp ); + Pair newTimestamp = replicate( timestamp ); log.trace( "[{}] newTimestamp {}, lastModified {}", uniqueName, newTimestamp, timestamp ); - timestamp = newTimestamp; + timestamp = newTimestamp._1; + hash = newTimestamp._2; } ); } @@ -79,12 +83,12 @@ public void replicateAllNow() { replicateNow(); } - public synchronized long replicate( long timestamp ) { - log.trace( "replicate service {} timestamp {}", uniqueName, timestamp ); + public synchronized Pair replicate( long timestamp ) { + log.trace( "replicate service {} timestamp {} hash {}", uniqueName, timestamp, hash ); try { - log.trace( "[{}] replicate {} to {} timestamp: {}", master, slave, timestamp, uniqueName ); - TransactionLog.ReplicationResult> updatedSince = master.updatedSince( timestamp ); + log.trace( "[{}] replicate {} to {} timestamp {} hash {}", uniqueName, master, slave, timestamp, hash ); + TransactionLog.ReplicationResult> updatedSince = master.updatedSince( timestamp, hash ); log.trace( "[{}] type {} updated objects {}", uniqueName, updatedSince.type, updatedSince.data.size() ); Metrics.counter( "replicator", Tags.of( "name", uniqueName, "type", updatedSince.type.name() ) ).increment(); @@ -135,14 +139,14 @@ public synchronized long replicate( long timestamp ) { slave.fireChanged( added, updated, deleted ); } - return lastUpdate; + return __( lastUpdate, updatedSince.hash ); } catch( UncheckedIOException e ) { log.error( e.getCause().getMessage() ); - return timestamp; + return __( timestamp, hash ); } catch( Exception e ) { if( e.getCause() instanceof SocketException ) { log.error( e.getCause().getMessage() ); - return timestamp; + return __( timestamp, hash ); } throw e; } diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLog.java b/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLog.java index 40044a44d..4f9e0f7ea 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLog.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLog.java @@ -15,7 +15,7 @@ public interface TransactionLog { void delete( Id id, Metadata data ); - ReplicationResult> updatedSince( long timestamp, Set>> fullData ); + ReplicationResult> updatedSince( long timestamp, long hash, Set>> fullData ); enum Operation { INSERT, @@ -45,11 +45,13 @@ class ReplicationResult implements Serializable { private static final long serialVersionUID = 467235422368462526L; public final long timestamp; + public final long hash; public final ReplicationStatusType type; public final List> data; - public ReplicationResult( long timestamp, ReplicationStatusType type, List> data ) { + public ReplicationResult( long timestamp, long hash, ReplicationStatusType type, List> data ) { this.timestamp = timestamp; + this.hash = hash; this.type = type; this.data = data; } diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogImpl.java b/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogImpl.java index 3bd6233fa..aca0b82b9 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogImpl.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogImpl.java @@ -13,6 +13,7 @@ public class TransactionLogImpl implements TransactionLog { public final CircularFifoQueue>> transactions; public final AtomicLong timestamp = new AtomicLong(); + public final long hash = System.currentTimeMillis(); public TransactionLogImpl( int transactionLogSize ) { this.transactions = new CircularFifoQueue<>( transactionLogSize ); @@ -34,15 +35,15 @@ public synchronized void delete( Id id, Metadata data ) { } @Override - public synchronized ReplicationResult> updatedSince( long timestamp, Set>> fullData ) { + public synchronized ReplicationResult> updatedSince( long timestamp, long hash, Set>> fullData ) { int size = transactions.size(); - if( size == 0 && timestamp < 0 ) { // first sync && no modification + if( this.hash != hash || size == 0 && timestamp < 0 ) { // first sync && no modification return fullSync( fullData, this.timestamp.longValue() ); } Transaction> older = transactions.peek(); if( older == null ) { - return new ReplicationResult<>( this.timestamp.longValue(), ReplicationResult.ReplicationStatusType.CHANGES, List.of() ); + return new ReplicationResult<>( this.timestamp.longValue(), this.hash, ReplicationResult.ReplicationStatusType.CHANGES, List.of() ); } if( older.timestamp > timestamp ) { return fullSync( fullData, this.timestamp.longValue() ); @@ -56,10 +57,14 @@ public synchronized ReplicationResult> updatedSince( long timest } } - return new ReplicationResult<>( this.timestamp.longValue(), ReplicationResult.ReplicationStatusType.CHANGES, list ); + return new ReplicationResult<>( this.timestamp.longValue(), this.hash, ReplicationResult.ReplicationStatusType.CHANGES, list ); } private @NonNull ReplicationResult> fullSync( Set>> fullData, long t ) { - return new ReplicationResult<>( this.timestamp.longValue(), ReplicationResult.ReplicationStatusType.FULL_SYNC, Lists.map( fullData, d -> new Transaction<>( t, Operation.UPDATE, d.getKey(), d.getValue() ) ) ); + return new ReplicationResult<>( + this.timestamp.longValue(), + this.hash, + ReplicationResult.ReplicationStatusType.FULL_SYNC, + Lists.map( fullData, d -> new Transaction<>( t, Operation.UPDATE, d.getKey(), d.getValue() ) ) ); } } diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogZero.java b/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogZero.java index 8c0bfd329..a3d773a9b 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogZero.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogZero.java @@ -23,11 +23,11 @@ public void delete( Id id, Metadata data ) { } @Override - public ReplicationResult> updatedSince( long timestamp, Set>> fullData ) { + public ReplicationResult> updatedSince( long timestamp, long hash, Set>> fullData ) { throw new IllegalStateException(); } private @NonNull ReplicationResult> fullSync( Set>> fullData ) { - return new ReplicationResult<>( -1, ReplicationResult.ReplicationStatusType.FULL_SYNC, Lists.map( fullData, d -> new Transaction<>( 0L, Operation.UPDATE, d.getKey(), d.getValue() ) ) ); + return new ReplicationResult<>( -1, 0L, ReplicationResult.ReplicationStatusType.FULL_SYNC, Lists.map( fullData, d -> new Transaction<>( 0L, Operation.UPDATE, d.getKey(), d.getValue() ) ) ); } } diff --git a/pom.xml b/pom.xml index 04d3872b4..3e8850cc7 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ - 25.1.0 + 25.1.1 25.0.0 25.0.0 From 9c87ab7b919677663768f95e52885c22435a1faf Mon Sep 17 00:00:00 2001 From: "igor.petrenko" Date: Wed, 17 Dec 2025 14:12:15 +0200 Subject: [PATCH 2/3] CE-124 oap-storage: run full sync when restarting the master --- .../src/main/java/oap/storage/Replicator.java | 131 +++++++++++------- .../java/oap/storage/TransactionLogImpl.java | 9 +- .../test/java/oap/storage/ReplicatorTest.java | 40 ++++++ 3 files changed, 129 insertions(+), 51 deletions(-) diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/Replicator.java b/oap-storage/oap-storage/src/main/java/oap/storage/Replicator.java index 4210793d8..16766cac5 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/Replicator.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/Replicator.java @@ -39,8 +39,11 @@ import java.io.UncheckedIOException; import java.net.SocketException; import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import static oap.storage.Storage.DataListener.IdObject.__io; +import static oap.storage.TransactionLog.ReplicationResult.ReplicationStatusType.CHANGES; import static oap.storage.TransactionLog.ReplicationResult.ReplicationStatusType.FULL_SYNC; import static oap.util.Pair.__; @@ -51,19 +54,25 @@ */ @Slf4j public class Replicator implements Closeable { + final AtomicLong replicatorCounterFullSync = new AtomicLong(); + final AtomicLong replicatorCounterPartialSync = new AtomicLong(); + final AtomicLong replicatorSizeFullSync = new AtomicLong(); + final AtomicLong replicatorSizePartialSync = new AtomicLong(); + private final MemoryStorage slave; private final ReplicationMaster master; + final ReentrantLock lock = new ReentrantLock(); private String uniqueName = Cuid.UNIQUE.next(); - private Scheduled scheduled; + private final Scheduled scheduled; private transient long timestamp = -1L; private transient long hash = -1L; public Replicator( MemoryStorage slave, ReplicationMaster master, long interval ) { - Preconditions.checkArgument( slave.transactionLog instanceof TransactionLogZero ); this.slave = slave; this.master = master; + this.scheduled = Scheduler.scheduleWithFixedDelay( getClass(), interval, i -> { Pair newTimestamp = replicate( timestamp ); log.trace( "[{}] newTimestamp {}, lastModified {}", uniqueName, newTimestamp, timestamp ); @@ -73,6 +82,14 @@ public Replicator( MemoryStorage slave, ReplicationMaster master, lo } ); } + public void start() { + Metrics.gauge( "replicator", Tags.of( "name", uniqueName, "type", FULL_SYNC.name() ), this, _ -> replicatorCounterFullSync.doubleValue() ); + Metrics.gauge( "replicator", Tags.of( "name", uniqueName, "type", CHANGES.name() ), this, _ -> replicatorCounterPartialSync.doubleValue() ); + + Metrics.gauge( "replicator_size", Tags.of( "name", uniqueName, "type", FULL_SYNC.name() ), this, _ -> replicatorSizeFullSync.doubleValue() ); + Metrics.gauge( "replicator_size", Tags.of( "name", uniqueName, "type", CHANGES.name() ), this, _ -> replicatorSizePartialSync.doubleValue() ); + } + public void replicateNow() { log.trace( "[{}] forcing replication...", uniqueName ); scheduled.triggerNow(); @@ -83,72 +100,86 @@ public void replicateAllNow() { replicateNow(); } - public synchronized Pair replicate( long timestamp ) { - log.trace( "replicate service {} timestamp {} hash {}", uniqueName, timestamp, hash ); - + public Pair replicate( long timestamp ) { + lock.lock(); try { - log.trace( "[{}] replicate {} to {} timestamp {} hash {}", uniqueName, master, slave, timestamp, hash ); - TransactionLog.ReplicationResult> updatedSince = master.updatedSince( timestamp, hash ); - log.trace( "[{}] type {} updated objects {}", uniqueName, updatedSince.type, updatedSince.data.size() ); + log.trace( "replicate service {} timestamp {} hash {}", uniqueName, timestamp, hash ); - Metrics.counter( "replicator", Tags.of( "name", uniqueName, "type", updatedSince.type.name() ) ).increment(); - Metrics.counter( "replicator_size", Tags.of( "name", uniqueName, "type", updatedSince.type.name() ) ).increment( updatedSince.data.size() ); + try { + log.trace( "[{}] replicate {} to {} timestamp {} hash {}", uniqueName, master, slave, timestamp, hash ); + TransactionLog.ReplicationResult> updatedSince = master.updatedSince( timestamp, hash ); + log.trace( "[{}] type {} updated objects {}", uniqueName, updatedSince.type, updatedSince.data.size() ); - ArrayList> added = new ArrayList<>(); - ArrayList> updated = new ArrayList<>(); - ArrayList> deleted = new ArrayList<>(); + switch( updatedSince.type ) { + case FULL_SYNC -> { + replicatorCounterFullSync.incrementAndGet(); + replicatorSizeFullSync.addAndGet( updatedSince.data.size() ); + } + case CHANGES -> { + replicatorCounterPartialSync.incrementAndGet(); + replicatorSizePartialSync.addAndGet( updatedSince.data.size() ); + } + default -> throw new IllegalStateException( "Unknown replicator type: " + updatedSince.type ); + } - long lastUpdate = updatedSince.timestamp; + ArrayList> added = new ArrayList<>(); + ArrayList> updated = new ArrayList<>(); + ArrayList> deleted = new ArrayList<>(); - if( updatedSince.type == FULL_SYNC ) { - slave.memory.removePermanently(); - } + long lastUpdate = updatedSince.timestamp; - for( TransactionLog.Transaction> transaction : updatedSince.data ) { - log.trace( "[{}] replicate {}", transaction, uniqueName ); + if( updatedSince.type == FULL_SYNC ) { + slave.memory.removePermanently(); + } - Metadata metadata = transaction.object; - I id = transaction.id; + for( TransactionLog.Transaction> transaction : updatedSince.data ) { + log.trace( "[{}] replicate {}", transaction, uniqueName ); - switch( transaction.operation ) { - case INSERT -> { - added.add( __io( id, metadata ) ); - slave.memory.put( id, metadata ); - } - case UPDATE -> { - if( metadata.isDeleted() ) { - deleted.add( __io( id, metadata ) ); - slave.memory.removePermanently( id ); - } else { - if( updatedSince.type == FULL_SYNC ) { - added.add( __io( id, metadata ) ); + Metadata metadata = transaction.object; + I id = transaction.id; + + switch( transaction.operation ) { + case INSERT -> { + added.add( __io( id, metadata ) ); + slave.memory.put( id, metadata ); + } + case UPDATE -> { + if( metadata.isDeleted() ) { + deleted.add( __io( id, metadata ) ); + slave.memory.removePermanently( id ); } else { - updated.add( __io( id, metadata ) ); + if( updatedSince.type == FULL_SYNC ) { + added.add( __io( id, metadata ) ); + } else { + updated.add( __io( id, metadata ) ); + } + slave.memory.put( id, metadata ); } - slave.memory.put( id, metadata ); } - } - case DELETE -> { - deleted.add( __io( id, metadata ) ); - slave.memory.removePermanently( id ); + case DELETE -> { + deleted.add( __io( id, metadata ) ); + slave.memory.removePermanently( id ); + } } } - } - if( !added.isEmpty() || !updated.isEmpty() || !deleted.isEmpty() ) { - slave.fireChanged( added, updated, deleted ); - } + if( !added.isEmpty() || !updated.isEmpty() || !deleted.isEmpty() ) { + slave.fireChanged( added, updated, deleted ); + } - return __( lastUpdate, updatedSince.hash ); - } catch( UncheckedIOException e ) { - log.error( e.getCause().getMessage() ); - return __( timestamp, hash ); - } catch( Exception e ) { - if( e.getCause() instanceof SocketException ) { + return __( lastUpdate, updatedSince.hash ); + } catch( UncheckedIOException e ) { log.error( e.getCause().getMessage() ); return __( timestamp, hash ); + } catch( Exception e ) { + if( e.getCause() instanceof SocketException ) { + log.error( e.getCause().getMessage() ); + return __( timestamp, hash ); + } + throw e; } - throw e; + } finally { + lock.unlock(); } } diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogImpl.java b/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogImpl.java index aca0b82b9..6ca71aaf2 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogImpl.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogImpl.java @@ -2,6 +2,7 @@ import oap.util.Lists; import org.apache.commons.collections4.queue.CircularFifoQueue; +import org.joda.time.DateTimeUtils; import org.jspecify.annotations.NonNull; import java.util.ArrayList; @@ -13,7 +14,7 @@ public class TransactionLogImpl implements TransactionLog { public final CircularFifoQueue>> transactions; public final AtomicLong timestamp = new AtomicLong(); - public final long hash = System.currentTimeMillis(); + public long hash = DateTimeUtils.currentTimeMillis(); public TransactionLogImpl( int transactionLogSize ) { this.transactions = new CircularFifoQueue<>( transactionLogSize ); @@ -67,4 +68,10 @@ public synchronized ReplicationResult> updatedSince( long timest ReplicationResult.ReplicationStatusType.FULL_SYNC, Lists.map( fullData, d -> new Transaction<>( t, Operation.UPDATE, d.getKey(), d.getValue() ) ) ); } + + public void reset() { + hash = DateTimeUtils.currentTimeMillis(); + timestamp.set( 0 ); + transactions.clear(); + } } diff --git a/oap-storage/oap-storage/src/test/java/oap/storage/ReplicatorTest.java b/oap-storage/oap-storage/src/test/java/oap/storage/ReplicatorTest.java index 4a2a0e6e6..24d0660d9 100644 --- a/oap-storage/oap-storage/src/test/java/oap/storage/ReplicatorTest.java +++ b/oap-storage/oap-storage/src/test/java/oap/storage/ReplicatorTest.java @@ -28,6 +28,7 @@ import oap.json.TypeIdFactory; import oap.testng.Fixtures; import oap.testng.SystemTimerFixture; +import oap.util.Dates; import org.joda.time.DateTimeUtils; import org.testng.annotations.Test; @@ -172,4 +173,43 @@ public void changed( List> added, List slave = new MemoryStorage<>( Identifier.forId( b -> b.id ).build(), SERIALIZED ); + MemoryStorage master = new MemoryStorage<>( Identifier.forId( b -> b.id ).build(), SERIALIZED, 100 ); + try( Replicator replicator = new Replicator<>( slave, master, 100000 ) ) { + master.store( new Bean( "1" ), Storage.MODIFIED_BY_SYSTEM ); + + replicator.replicateNow(); + assertEventually( 100, 100, () -> { + assertThat( replicator.replicatorSizeFullSync ).hasValue( 1L ); + assertThat( replicator.replicatorSizePartialSync ).hasValue( 0L ); + } ); + + Dates.incFixed( 10 ); + replicator.replicateNow(); + replicator.replicateNow(); + assertEventually( 100, 100, () -> { + assertThat( replicator.replicatorCounterFullSync ).hasValue( 1L ); + assertThat( replicator.replicatorCounterPartialSync ).hasValue( 2L ); + } ); + + + // restart + Dates.incFixed( 10 ); + ( ( TransactionLogImpl ) master.transactionLog ).reset(); + + master.store( new Bean( "2" ), Storage.MODIFIED_BY_SYSTEM ); + + replicator.replicateNow(); + assertEventually( 100, 100, () -> { + assertThat( replicator.replicatorCounterFullSync ).hasValue( 2L ); + assertThat( replicator.replicatorCounterPartialSync ).hasValue( 2L ); + } ); + } + + } + } From 5d184837ae7dc56f3d28406c15c40404bacf6af2 Mon Sep 17 00:00:00 2001 From: "igor.petrenko" Date: Wed, 17 Dec 2025 15:03:55 +0200 Subject: [PATCH 3/3] CE-124 oap-storage: run full sync when restarting the master --- .../src/main/java/oap/storage/TransactionLogImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogImpl.java b/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogImpl.java index 6ca71aaf2..927ffae83 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogImpl.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogImpl.java @@ -38,7 +38,7 @@ public synchronized void delete( Id id, Metadata data ) { @Override public synchronized ReplicationResult> updatedSince( long timestamp, long hash, Set>> fullData ) { int size = transactions.size(); - if( this.hash != hash || size == 0 && timestamp < 0 ) { // first sync && no modification + if( this.hash != hash || timestamp < 0 ) { // first sync && no modification return fullSync( fullData, this.timestamp.longValue() ); }