diff --git a/oap-storage/oap-storage-mongo-test/src/test/java/oap/storage/MongoPersistenceTest.java b/oap-storage/oap-storage-mongo-test/src/test/java/oap/storage/MongoPersistenceTest.java index 66e50c025..f4dcc67c9 100644 --- a/oap-storage/oap-storage-mongo-test/src/test/java/oap/storage/MongoPersistenceTest.java +++ b/oap-storage/oap-storage-mongo-test/src/test/java/oap/storage/MongoPersistenceTest.java @@ -114,7 +114,7 @@ public void delete() { Bean bean1 = storage.store( new Bean( "test1" ), Storage.MODIFIED_BY_SYSTEM ); storage.store( new Bean( "test2" ), Storage.MODIFIED_BY_SYSTEM ); - storage.delete( bean1.id, Storage.MODIFIED_BY_SYSTEM ); + storage.delete( bean1.id ); // one bean is removed, one is left assertEventually( 100, 100, () -> assertThat( persistence.collection.countDocuments() ).isEqualTo( 1 ) ); } diff --git a/oap-storage/oap-storage-mongo/src/main/java/oap/storage/MongoPersistence.java b/oap-storage/oap-storage-mongo/src/main/java/oap/storage/MongoPersistence.java index 9e8589586..0d8f75bef 100644 --- a/oap-storage/oap-storage-mongo/src/main/java/oap/storage/MongoPersistence.java +++ b/oap-storage/oap-storage-mongo/src/main/java/oap/storage/MongoPersistence.java @@ -154,7 +154,7 @@ public void fsync() { updatedSince.data.forEach( t -> { updated.incrementAndGet(); - if( t.operation == TransactionLog.Operation.DELETE || t.object.isDeleted() ) { + if( t.operation == TransactionLog.Operation.DELETE ) { deletedIds.add( t.id ); list.add( new DeleteOneModel<>( eq( "_id", t.id ) ) ); } else { @@ -175,7 +175,7 @@ private void persist( List deletedIds, List>> list ) { if( list.isEmpty() ) return; try { collection.bulkWrite( list, new BulkWriteOptions().ordered( false ) ); - deletedIds.forEach( storage.memory::removePermanently ); + deletedIds.forEach( storage.memory::delete ); list.clear(); deletedIds.clear(); } catch( Exception e ) { diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/AbstractPersistance.java b/oap-storage/oap-storage/src/main/java/oap/storage/AbstractPersistance.java index 2758ef61b..57db45ee8 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/AbstractPersistance.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/AbstractPersistance.java @@ -104,7 +104,7 @@ public void preStart() { } protected Optional deleteById( String id ) { - return storage.delete( storage.identifier.fromString( id ), Storage.MODIFIED_BY_SYSTEM ); + return storage.delete( storage.identifier.fromString( id ) ); } protected abstract void load(); diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/MemoryStorage.java b/oap-storage/oap-storage/src/main/java/oap/storage/MemoryStorage.java index 8114d2e91..4b874facd 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/MemoryStorage.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/MemoryStorage.java @@ -28,7 +28,6 @@ import oap.storage.Storage.DataListener.IdObject; import oap.util.BiStream; import oap.util.Lists; -import oap.util.Pair; import oap.util.Stream; import org.apache.commons.lang3.mutable.MutableObject; @@ -72,24 +71,12 @@ public MemoryStorage( Identifier identifier, Lock lock, int transactio @Override public Stream select() { - return select( true ); - } - - public Stream select( boolean liveOnly ) { - return selectMetadata( liveOnly ).map( metadata -> metadata.object ); - } - - public Stream> selectMetadata( boolean liveOnly ) { - return ( liveOnly ? memory.selectLive() : memory.selectAll() ).map( p -> p._2 ); + return selectMetadata().map( metadata -> metadata.object ); } @Override public Stream> selectMetadata() { - return selectMetadata( true ); - } - - Stream selectAll() { - return memory.selectAll().map( p -> p._2.object ); + return memory.selectAll().map( p -> p._2 ); } @Override @@ -209,37 +196,22 @@ public Data get( Id id, @Nonnull Supplier init, String modifiedBy ) { @Override public void deleteAll() { - fireDeleted( Lists.map( memory.markDeletedAll(), p -> IdObject.__io( p._1, p._2 ) ) ); + HashMap> map = memory.deleteAll(); + fireDeleted( Lists.map( map.entrySet(), p -> IdObject.__io( p.getKey(), p.getValue() ) ) ); } @Override - public Optional delete( @Nonnull Id id, String modifiedBy ) { - return deleteMetadata( id, modifiedBy ).map( m -> m.object ); + public Optional delete( @Nonnull Id id ) { + return deleteMetadata( id ).map( m -> m.object ); } @Override - public Optional> deleteMetadata( @Nonnull Id id, String modifiedBy ) { - requireNonNull( id ); - Optional> old = memory.markDeleted( id, modifiedBy ); + public Optional> deleteMetadata( @Nonnull Id id ) { + Optional> old = memory.delete( id ); old.ifPresent( o -> fireDeleted( id, o ) ); return old; } - @Override - public Optional permanentlyDelete( @Nonnull Id id ) { - requireNonNull( id ); - Optional> old = memory.removePermanently( id ); - old.ifPresent( o -> firePermanentlyDeleted( id, o ) ); - return old.map( m -> m.object ); - } - - @Override - public void permanentlyDelete() { - HashMap> oldData = memory.removePermanently(); - - oldData.forEach( this::firePermanentlyDeleted ); - } - @Override public long size() { return memory.selectLiveIds().count(); @@ -356,10 +328,6 @@ public Memory( Lock lock ) { this.lock = lock; } - public BiStream> selectLive() { - return BiStream.of( data ).filter( ( _, m ) -> !m.isDeleted() ); - } - public BiStream> selectAll() { return BiStream.of( data ); } @@ -369,18 +337,12 @@ public BiStream> selectUpdatedSince( long since ) { } public Optional> get( @Nonnull I id ) { - return Optional.ofNullable( data.get( id ) ) - .filter( m -> !m.isDeleted() ); + return Optional.ofNullable( data.get( id ) ); } @Nullable public Metadata getNullable( @Nonnull I id ) { - Metadata metadata = data.get( id ); - if( metadata != null && !metadata.isDeleted() ) { - return metadata; - } - - return null; + return data.get( id ); } public boolean put( @Nonnull I id, @Nonnull Metadata m ) { @@ -443,27 +405,11 @@ public Optional> tryRemap( I id, Function tryUpdate, String mo } ); } - public List>> markDeletedAll() { - List>> ms = selectLive().toList(); - ms.forEach( p -> p._2.delete( Storage.MODIFIED_BY_SYSTEM ) ); - return ms; - } - - public Optional> markDeleted( @Nonnull I id, String modifiedBy ) { - return lock.synchronizedOn( id, () -> { - Metadata metadata = data.get( id ); - if( metadata != null ) { - metadata.delete( modifiedBy ); - return Optional.of( metadata ); - } else return Optional.empty(); - } ); - } - - public Optional> removePermanently( @Nonnull I id ) { + public Optional> delete( @Nonnull I id ) { return Optional.ofNullable( data.remove( id ) ); } - public HashMap> removePermanently() { + public HashMap> deleteAll() { HashMap> oldData = new HashMap<>( data ); data.clear(); @@ -475,7 +421,7 @@ public void clear() { } public Stream selectLiveIds() { - return selectLive().mapToObj( ( id, _ ) -> id ); + return selectAll().mapToObj( ( id, _ ) -> id ); } } } diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/Metadata.java b/oap-storage/oap-storage/src/main/java/oap/storage/Metadata.java index 0c7b32b83..092bb01fc 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/Metadata.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/Metadata.java @@ -28,7 +28,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver; import lombok.EqualsAndHashCode; -import lombok.Getter; import oap.json.TypeIdFactory; import org.joda.time.DateTimeUtils; @@ -47,8 +46,6 @@ public class Metadata implements Serializable { @JsonTypeIdResolver( TypeIdFactory.class ) @JsonTypeInfo( use = JsonTypeInfo.Id.CUSTOM, include = JsonTypeInfo.As.EXTERNAL_PROPERTY, property = "object:type" ) public T object; - @Getter - private boolean deleted = false; @JsonCreator protected Metadata( T object, String createdBy ) { @@ -69,7 +66,6 @@ public static Metadata from( Metadata metadata ) { public Metadata update( T t, String modifiedBy ) { this.object = t; - this.deleted = false; if( this.createdBy == null ) { this.createdBy = modifiedBy; } @@ -82,12 +78,6 @@ public void refresh() { this.modified = DateTimeUtils.currentTimeMillis(); } - public void delete( String modifiedBy ) { - this.deleted = true; - this.modifiedBy = modifiedBy; - refresh(); - } - public boolean looksUnmodified( Metadata metadata ) { return modified == metadata.modified; } @@ -98,7 +88,6 @@ public String toString() { + "created=" + created + ", modified=" + modified + ", object=" + object - + ", deleted=" + deleted + ')'; } } diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/Replicator.java b/oap-storage/oap-storage/src/main/java/oap/storage/Replicator.java index 16766cac5..0685494a8 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/Replicator.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/Replicator.java @@ -58,12 +58,11 @@ public class Replicator implements Closeable { final AtomicLong replicatorCounterPartialSync = new AtomicLong(); final AtomicLong replicatorSizeFullSync = new AtomicLong(); final AtomicLong replicatorSizePartialSync = new AtomicLong(); - + final ReentrantLock lock = new ReentrantLock(); private final MemoryStorage slave; private final ReplicationMaster master; - final ReentrantLock lock = new ReentrantLock(); - private String uniqueName = Cuid.UNIQUE.next(); private final Scheduled scheduled; + private String uniqueName = Cuid.UNIQUE.next(); private transient long timestamp = -1L; private transient long hash = -1L; @@ -129,7 +128,7 @@ public Pair replicate( long timestamp ) { long lastUpdate = updatedSince.timestamp; if( updatedSince.type == FULL_SYNC ) { - slave.memory.removePermanently(); + slave.memory.clear(); } for( TransactionLog.Transaction> transaction : updatedSince.data ) { @@ -144,21 +143,16 @@ public Pair replicate( long timestamp ) { slave.memory.put( id, metadata ); } case UPDATE -> { - if( metadata.isDeleted() ) { - deleted.add( __io( id, metadata ) ); - slave.memory.removePermanently( id ); + if( updatedSince.type == FULL_SYNC ) { + added.add( __io( id, metadata ) ); } else { - if( updatedSince.type == FULL_SYNC ) { - added.add( __io( id, metadata ) ); - } else { - updated.add( __io( id, metadata ) ); - } - slave.memory.put( id, metadata ); + updated.add( __io( id, metadata ) ); } + slave.memory.put( id, metadata ); } case DELETE -> { deleted.add( __io( id, metadata ) ); - slave.memory.removePermanently( id ); + slave.memory.delete( id ); } } } diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/Storage.java b/oap-storage/oap-storage/src/main/java/oap/storage/Storage.java index 03714c2a8..73de00da6 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/Storage.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/Storage.java @@ -72,13 +72,9 @@ public interface Storage extends Iterable { boolean tryUpdate( @Nonnull Id id, @Nonnull Function tryUpdate, String modifiedBy ); - Optional delete( @Nonnull Id id, String modifiedBy ); + Optional delete( @Nonnull Id id ); - Optional> deleteMetadata( @Nonnull Id id, String modifiedBy ); - - Optional permanentlyDelete( @Nonnull Id id ); - - void permanentlyDelete(); + Optional> deleteMetadata( @Nonnull Id id ); void deleteAll(); diff --git a/oap-storage/oap-storage/src/test/java/oap/storage/ReplicatorTest.java b/oap-storage/oap-storage/src/test/java/oap/storage/ReplicatorTest.java index 468f99781..4e6f94ea6 100644 --- a/oap-storage/oap-storage/src/test/java/oap/storage/ReplicatorTest.java +++ b/oap-storage/oap-storage/src/test/java/oap/storage/ReplicatorTest.java @@ -90,7 +90,7 @@ public void changed( List> added, deletions.set( 0 ); updates.set( 0 ); addons.set( 0 ); - master.delete( "111", Storage.MODIFIED_BY_SYSTEM ); + master.delete( "111" ); master.store( new Bean( "222", "xyz" ), Storage.MODIFIED_BY_SYSTEM ); master.store( new Bean( "333", "ccc" ), Storage.MODIFIED_BY_SYSTEM ); assertEventually( 100, 50, () -> { @@ -217,13 +217,13 @@ public void testAddDeleteUpdateDeleteAdd() { replicator.replicateNow(); assertThat( slave.getNullable( "1" ) ).isNotNull(); - master.permanentlyDelete( "1" ); + master.delete( "1" ); master.store( new Bean( "1" ), Storage.MODIFIED_BY_SYSTEM ); master.update( "1", b -> { b.s = "2"; return b; }, Storage.MODIFIED_BY_SYSTEM ); - master.permanentlyDelete( "1" ); + master.delete( "1" ); master.store( new Bean( "1" ), Storage.MODIFIED_BY_SYSTEM ); replicator.replicateNow(); diff --git a/pom.xml b/pom.xml index ff2906be5..2b132d22c 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ - 25.1.4 + 25.2.0 25.0.1 25.0.0