Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void delete() {
Bean bean1 = storage.store( new Bean( "test1" ), Storage.MODIFIED_BY_SYSTEM );
storage.store( new Bean( "test2" ), Storage.MODIFIED_BY_SYSTEM );

storage.delete( bean1.id, Storage.MODIFIED_BY_SYSTEM );
storage.delete( bean1.id );
// one bean is removed, one is left
assertEventually( 100, 100, () -> assertThat( persistence.collection.countDocuments() ).isEqualTo( 1 ) );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void fsync() {

updatedSince.data.forEach( t -> {
updated.incrementAndGet();
if( t.operation == TransactionLog.Operation.DELETE || t.object.isDeleted() ) {
if( t.operation == TransactionLog.Operation.DELETE ) {
deletedIds.add( t.id );
list.add( new DeleteOneModel<>( eq( "_id", t.id ) ) );
} else {
Expand All @@ -175,7 +175,7 @@ private void persist( List<I> deletedIds, List<WriteModel<Metadata<T>>> list ) {
if( list.isEmpty() ) return;
try {
collection.bulkWrite( list, new BulkWriteOptions().ordered( false ) );
deletedIds.forEach( storage.memory::removePermanently );
deletedIds.forEach( storage.memory::delete );
list.clear();
deletedIds.clear();
} catch( Exception e ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void preStart() {
}

protected Optional<T> deleteById( String id ) {
return storage.delete( storage.identifier.fromString( id ), Storage.MODIFIED_BY_SYSTEM );
return storage.delete( storage.identifier.fromString( id ) );
}

protected abstract void load();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import oap.storage.Storage.DataListener.IdObject;
import oap.util.BiStream;
import oap.util.Lists;
import oap.util.Pair;
import oap.util.Stream;
import org.apache.commons.lang3.mutable.MutableObject;

Expand Down Expand Up @@ -72,24 +71,12 @@ public MemoryStorage( Identifier<Id, Data> identifier, Lock lock, int transactio

@Override
public Stream<Data> select() {
return select( true );
}

public Stream<Data> select( boolean liveOnly ) {
return selectMetadata( liveOnly ).map( metadata -> metadata.object );
}

public Stream<Metadata<Data>> selectMetadata( boolean liveOnly ) {
return ( liveOnly ? memory.selectLive() : memory.selectAll() ).map( p -> p._2 );
return selectMetadata().map( metadata -> metadata.object );
}

@Override
public Stream<Metadata<Data>> selectMetadata() {
return selectMetadata( true );
}

Stream<Data> selectAll() {
return memory.selectAll().map( p -> p._2.object );
return memory.selectAll().map( p -> p._2 );
}

@Override
Expand Down Expand Up @@ -209,37 +196,22 @@ public Data get( Id id, @Nonnull Supplier<Data> init, String modifiedBy ) {

@Override
public void deleteAll() {
fireDeleted( Lists.map( memory.markDeletedAll(), p -> IdObject.__io( p._1, p._2 ) ) );
HashMap<Id, Metadata<Data>> map = memory.deleteAll();
fireDeleted( Lists.map( map.entrySet(), p -> IdObject.__io( p.getKey(), p.getValue() ) ) );
}

@Override
public Optional<Data> delete( @Nonnull Id id, String modifiedBy ) {
return deleteMetadata( id, modifiedBy ).map( m -> m.object );
public Optional<Data> delete( @Nonnull Id id ) {
return deleteMetadata( id ).map( m -> m.object );
}

@Override
public Optional<Metadata<Data>> deleteMetadata( @Nonnull Id id, String modifiedBy ) {
requireNonNull( id );
Optional<Metadata<Data>> old = memory.markDeleted( id, modifiedBy );
public Optional<Metadata<Data>> deleteMetadata( @Nonnull Id id ) {
Optional<Metadata<Data>> old = memory.delete( id );
old.ifPresent( o -> fireDeleted( id, o ) );
return old;
}

@Override
public Optional<Data> permanentlyDelete( @Nonnull Id id ) {
requireNonNull( id );
Optional<Metadata<Data>> old = memory.removePermanently( id );
old.ifPresent( o -> firePermanentlyDeleted( id, o ) );
return old.map( m -> m.object );
}

@Override
public void permanentlyDelete() {
HashMap<Id, Metadata<Data>> oldData = memory.removePermanently();

oldData.forEach( this::firePermanentlyDeleted );
}

@Override
public long size() {
return memory.selectLiveIds().count();
Expand Down Expand Up @@ -356,10 +328,6 @@ public Memory( Lock lock ) {
this.lock = lock;
}

public BiStream<I, Metadata<T>> selectLive() {
return BiStream.of( data ).filter( ( _, m ) -> !m.isDeleted() );
}

public BiStream<I, Metadata<T>> selectAll() {
return BiStream.of( data );
}
Expand All @@ -369,18 +337,12 @@ public BiStream<I, Metadata<T>> selectUpdatedSince( long since ) {
}

public Optional<Metadata<T>> get( @Nonnull I id ) {
return Optional.ofNullable( data.get( id ) )
.filter( m -> !m.isDeleted() );
return Optional.ofNullable( data.get( id ) );
}

@Nullable
public Metadata<T> getNullable( @Nonnull I id ) {
Metadata<T> metadata = data.get( id );
if( metadata != null && !metadata.isDeleted() ) {
return metadata;
}

return null;
return data.get( id );
}

public boolean put( @Nonnull I id, @Nonnull Metadata<T> m ) {
Expand Down Expand Up @@ -443,27 +405,11 @@ public Optional<Metadata<T>> tryRemap( I id, Function<T, T> tryUpdate, String mo
} );
}

public List<Pair<I, Metadata<T>>> markDeletedAll() {
List<Pair<I, Metadata<T>>> ms = selectLive().toList();
ms.forEach( p -> p._2.delete( Storage.MODIFIED_BY_SYSTEM ) );
return ms;
}

public Optional<Metadata<T>> markDeleted( @Nonnull I id, String modifiedBy ) {
return lock.synchronizedOn( id, () -> {
Metadata<T> metadata = data.get( id );
if( metadata != null ) {
metadata.delete( modifiedBy );
return Optional.of( metadata );
} else return Optional.empty();
} );
}

public Optional<Metadata<T>> removePermanently( @Nonnull I id ) {
public Optional<Metadata<T>> delete( @Nonnull I id ) {
return Optional.ofNullable( data.remove( id ) );
}

public HashMap<I, Metadata<T>> removePermanently() {
public HashMap<I, Metadata<T>> deleteAll() {
HashMap<I, Metadata<T>> oldData = new HashMap<>( data );
data.clear();

Expand All @@ -475,7 +421,7 @@ public void clear() {
}

public Stream<I> selectLiveIds() {
return selectLive().mapToObj( ( id, _ ) -> id );
return selectAll().mapToObj( ( id, _ ) -> id );
}
}
}
11 changes: 0 additions & 11 deletions oap-storage/oap-storage/src/main/java/oap/storage/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import oap.json.TypeIdFactory;
import org.joda.time.DateTimeUtils;

Expand All @@ -47,8 +46,6 @@ public class Metadata<T> implements Serializable {
@JsonTypeIdResolver( TypeIdFactory.class )
@JsonTypeInfo( use = JsonTypeInfo.Id.CUSTOM, include = JsonTypeInfo.As.EXTERNAL_PROPERTY, property = "object:type" )
public T object;
@Getter
private boolean deleted = false;

@JsonCreator
protected Metadata( T object, String createdBy ) {
Expand All @@ -69,7 +66,6 @@ public static <T> Metadata<T> from( Metadata<T> metadata ) {

public Metadata<T> update( T t, String modifiedBy ) {
this.object = t;
this.deleted = false;
if( this.createdBy == null ) {
this.createdBy = modifiedBy;
}
Expand All @@ -82,12 +78,6 @@ public void refresh() {
this.modified = DateTimeUtils.currentTimeMillis();
}

public void delete( String modifiedBy ) {
this.deleted = true;
this.modifiedBy = modifiedBy;
refresh();
}

public boolean looksUnmodified( Metadata<T> metadata ) {
return modified == metadata.modified;
}
Expand All @@ -98,7 +88,6 @@ public String toString() {
+ "created=" + created
+ ", modified=" + modified
+ ", object=" + object
+ ", deleted=" + deleted
+ ')';
}
}
22 changes: 8 additions & 14 deletions oap-storage/oap-storage/src/main/java/oap/storage/Replicator.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,11 @@ public class Replicator<I, T> implements Closeable {
final AtomicLong replicatorCounterPartialSync = new AtomicLong();
final AtomicLong replicatorSizeFullSync = new AtomicLong();
final AtomicLong replicatorSizePartialSync = new AtomicLong();

final ReentrantLock lock = new ReentrantLock();
private final MemoryStorage<I, T> slave;
private final ReplicationMaster<I, T> master;
final ReentrantLock lock = new ReentrantLock();
private String uniqueName = Cuid.UNIQUE.next();
private final Scheduled scheduled;
private String uniqueName = Cuid.UNIQUE.next();
private transient long timestamp = -1L;
private transient long hash = -1L;

Expand Down Expand Up @@ -129,7 +128,7 @@ public Pair<Long, Long> replicate( long timestamp ) {
long lastUpdate = updatedSince.timestamp;

if( updatedSince.type == FULL_SYNC ) {
slave.memory.removePermanently();
slave.memory.clear();
}

for( TransactionLog.Transaction<I, Metadata<T>> transaction : updatedSince.data ) {
Expand All @@ -144,21 +143,16 @@ public Pair<Long, Long> replicate( long timestamp ) {
slave.memory.put( id, metadata );
}
case UPDATE -> {
if( metadata.isDeleted() ) {
deleted.add( __io( id, metadata ) );
slave.memory.removePermanently( id );
if( updatedSince.type == FULL_SYNC ) {
added.add( __io( id, metadata ) );
} else {
if( updatedSince.type == FULL_SYNC ) {
added.add( __io( id, metadata ) );
} else {
updated.add( __io( id, metadata ) );
}
slave.memory.put( id, metadata );
updated.add( __io( id, metadata ) );
}
slave.memory.put( id, metadata );
}
case DELETE -> {
deleted.add( __io( id, metadata ) );
slave.memory.removePermanently( id );
slave.memory.delete( id );
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,9 @@ public interface Storage<Id, Data> extends Iterable<Data> {

boolean tryUpdate( @Nonnull Id id, @Nonnull Function<Data, Data> tryUpdate, String modifiedBy );

Optional<Data> delete( @Nonnull Id id, String modifiedBy );
Optional<Data> delete( @Nonnull Id id );

Optional<Metadata<Data>> deleteMetadata( @Nonnull Id id, String modifiedBy );

Optional<Data> permanentlyDelete( @Nonnull Id id );

void permanentlyDelete();
Optional<Metadata<Data>> deleteMetadata( @Nonnull Id id );

void deleteAll();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void changed( List<Storage.DataListener.IdObject<String, Bean>> added,
deletions.set( 0 );
updates.set( 0 );
addons.set( 0 );
master.delete( "111", Storage.MODIFIED_BY_SYSTEM );
master.delete( "111" );
master.store( new Bean( "222", "xyz" ), Storage.MODIFIED_BY_SYSTEM );
master.store( new Bean( "333", "ccc" ), Storage.MODIFIED_BY_SYSTEM );
assertEventually( 100, 50, () -> {
Expand Down Expand Up @@ -217,13 +217,13 @@ public void testAddDeleteUpdateDeleteAdd() {
replicator.replicateNow();
assertThat( slave.getNullable( "1" ) ).isNotNull();

master.permanentlyDelete( "1" );
master.delete( "1" );
master.store( new Bean( "1" ), Storage.MODIFIED_BY_SYSTEM );
master.update( "1", b -> {
b.s = "2";
return b;
}, Storage.MODIFIED_BY_SYSTEM );
master.permanentlyDelete( "1" );
master.delete( "1" );
master.store( new Bean( "1" ), Storage.MODIFIED_BY_SYSTEM );

replicator.replicateNow();
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
</distributionManagement>

<properties>
<oap.project.version>25.1.4</oap.project.version>
<oap.project.version>25.2.0</oap.project.version>

<oap.deps.config.version>25.0.1</oap.deps.config.version>
<oap.deps.oap-teamcity.version>25.0.0</oap.deps.oap-teamcity.version>
Expand Down