Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
291 changes: 291 additions & 0 deletions oap-stdlib/src/main/java/oap/util/CircularFifoQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
package oap.util;

import org.apache.commons.lang3.NotImplementedException;

import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serial;
import java.io.Serializable;
import java.util.AbstractCollection;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;

public class CircularFifoQueue<T> extends AbstractCollection<T> implements Queue<T>, Serializable {
@Serial
private static final long serialVersionUID = 7342311080952645978L;
private final int maxElements;
private transient T[] elements;
private transient int start;
private transient int end;
private transient boolean full;

public CircularFifoQueue( Collection<? extends T> coll ) {
this( coll.size() );
this.addAll( coll );
}

@SuppressWarnings( "unchecked" )
public CircularFifoQueue( int size ) {
if( size <= 0 ) {
throw new IllegalArgumentException( "The size must be greater than 0" );
} else {
this.elements = ( T[] ) ( new Object[size] );
this.maxElements = this.elements.length;
}
}

public boolean add( T element ) {
Objects.requireNonNull( element, "element" );

if( this.isAtFullCapacity() ) {
this.remove();
}

this.elements[this.end++] = element;
if( this.end >= this.maxElements ) {
this.end = 0;
}

if( this.end == this.start ) {
this.full = true;
}

return true;
}

public void clear() {
this.full = false;
this.start = 0;
this.end = 0;
Arrays.fill( this.elements, ( Object ) null );
}

@SuppressWarnings( "checkstyle:ParameterAssignment" )
private int decrement( int index ) {
--index;
if( index < 0 ) {
index = this.maxElements - 1;
}

return index;
}

public T element() {
if( this.isEmpty() ) {
throw new NoSuchElementException( "queue is empty" );
} else {
return ( T ) this.peek();
}
}

public T get( int index ) {
int sz = this.size();
if( index >= 0 && index < sz ) {
int idx = ( this.start + index ) % this.maxElements;
return ( T ) this.elements[idx];
} else {
throw new NoSuchElementException( String.format( "The specified index %1$d is outside the available range [0, %2$d)", index, sz ) );
}
}

@SuppressWarnings( "checkstyle:ParameterAssignment" )
private int increment( int index ) {
++index;
if( index >= this.maxElements ) {
index = 0;
}

return index;
}

public boolean isAtFullCapacity() {
return this.size() == this.maxElements;
}

public boolean isEmpty() {
return this.size() == 0;
}

public boolean isFull() {
return false;
}

public int maxSize() {
return this.maxElements;
}

public boolean offer( T element ) {
return this.add( element );
}

public T peek() {
return ( T ) ( this.isEmpty() ? null : this.elements[this.start] );
}

public T poll() {
return ( T ) ( this.isEmpty() ? null : this.remove() );
}

@SuppressWarnings( "unchecked" )
private void readObject( ObjectInputStream in ) throws IOException, ClassNotFoundException {
in.defaultReadObject();
this.elements = ( T[] ) ( new Object[this.maxElements] );
int size = in.readInt();

for( int i = 0; i < size; ++i ) {
this.elements[i] = ( T ) in.readObject();
}

this.start = 0;
this.full = size == this.maxElements;
if( this.full ) {
this.end = 0;
} else {
this.end = size;
}

}

public T remove() {
if( this.isEmpty() ) {
throw new NoSuchElementException( "queue is empty" );
} else {
T element = this.elements[this.start];
if( null != element ) {
this.elements[this.start++] = null;
if( this.start >= this.maxElements ) {
this.start = 0;
}

this.full = false;
}

return element;
}
}

public int size() {
int size;
if( this.end < this.start ) {
size = this.maxElements - this.start + this.end;
} else if( this.end == this.start ) {
size = this.full ? this.maxElements : 0;
} else {
size = this.end - this.start;
}

return size;
}

@Serial
private void writeObject( ObjectOutputStream out ) throws IOException {
out.defaultWriteObject();
out.writeInt( this.size() );

for( T e : this ) {
out.writeObject( e );
}

}

@Nonnull
@Override
public Iterator<T> iterator() {
return new Iterator<>() {
private int index;
private int lastReturnedIndex;
private boolean isFirst;

{
this.index = start;
this.lastReturnedIndex = -1;
this.isFirst = full;
}

public boolean hasNext() {
return this.isFirst || this.index != end;
}

public T next() {
if( !this.hasNext() ) {
throw new NoSuchElementException();
} else {
this.isFirst = false;
this.lastReturnedIndex = this.index;
this.index = increment( this.index );
return elements[this.lastReturnedIndex];
}
}

public void remove() {
if( this.lastReturnedIndex == -1 ) {
throw new IllegalStateException();
} else if( this.lastReturnedIndex == start ) {
remove();
this.lastReturnedIndex = -1;
} else {
int pos = this.lastReturnedIndex + 1;
if( start < this.lastReturnedIndex && pos < end ) {
System.arraycopy( elements, pos, elements, this.lastReturnedIndex, end - pos );
} else {
while( pos != end ) {
if( pos >= maxElements ) {
elements[pos - 1] = elements[0];
pos = 0;
} else {
elements[decrement( pos )] = elements[pos];
pos = increment( pos );
}
}
}

this.lastReturnedIndex = -1;
end = decrement( end );
elements[end] = null;
full = false;
this.index = decrement( this.index );
}
}
};
}

@Nonnull
public Iterator<T> reverseIterator() {
return new Iterator<>() {
private int index;
private int lastReturnedIndex;
private boolean isLast;

{
this.index = end;
this.lastReturnedIndex = -1;
this.isLast = full;
}

public boolean hasNext() {
return this.isLast || this.index != start;
}

public T next() {
if( !this.hasNext() ) {
throw new NoSuchElementException();
} else {
this.isLast = false;
this.lastReturnedIndex = decrement( this.index );
this.index = decrement( this.index );
return elements[this.lastReturnedIndex];
}
}

public void remove() {
throw new NotImplementedException();
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,21 @@ public Optional<Metadata<Data>> getMetadata( @Nonnull Id id ) {
return memory.get( id );
}

@Nullable
public Metadata<Data> getMetadataNullable( @Nonnull Id id ) {
return memory.getNullable( id );
}

@Nullable
@Override
public Data getNullable( @Nonnull Id id ) {
Metadata<Data> metadata = getMetadataNullable( id );
if( metadata != null ) {
return metadata.object;
}
return null;
}

@Override
public Optional<Data> get( @Nonnull Id id ) {
return getMetadata( id ).map( m -> m.object );
Expand Down Expand Up @@ -354,11 +369,20 @@ public BiStream<I, Metadata<T>> selectUpdatedSince( long since ) {
}

public Optional<Metadata<T>> get( @Nonnull I id ) {
requireNonNull( id );
return Optional.ofNullable( data.get( id ) )
.filter( m -> !m.isDeleted() );
}

@Nullable
public Metadata<T> getNullable( @Nonnull I id ) {
Metadata<T> metadata = data.get( id );
if( metadata != null && !metadata.isDeleted() ) {
return metadata;
}

return null;
}

public boolean put( @Nonnull I id, @Nonnull Metadata<T> m ) {
requireNonNull( id );
requireNonNull( m );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import oap.util.Stream;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.Serial;
import java.io.Serializable;
import java.util.Collection;
Expand All @@ -50,6 +51,9 @@ public interface Storage<Id, Data> extends Iterable<Data> {

List<Metadata<Data>> listMetadata();

@Nullable
Data getNullable( @Nonnull Id id );

Optional<Data> get( @Nonnull Id id );

Data get( Id id, @Nonnull Supplier<Data> init, String modifiedBy );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import java.io.Serial;
import java.io.Serializable;
import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -47,9 +47,9 @@ class ReplicationResult<Id, T> implements Serializable {
public final long timestamp;
public final long hash;
public final ReplicationStatusType type;
public final List<Transaction<Id, T>> data;
public final Collection<Transaction<Id, T>> data;

public ReplicationResult( long timestamp, long hash, ReplicationStatusType type, List<Transaction<Id, T>> data ) {
public ReplicationResult( long timestamp, long hash, ReplicationStatusType type, Collection<Transaction<Id, T>> data ) {
this.timestamp = timestamp;
this.hash = hash;
this.type = type;
Expand Down
Loading