Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,11 @@ private boolean sendAsync( boolean shutdown ) {
}

@Override
public void log( ProtocolVersion version, String hostName, String filePreffix, Map<String, String> properties, String logType,
String[] headers, byte[][] types, byte[] buffer, int offset, int length ) {
buffers.put( new LogId( filePreffix, logType, hostName, properties, headers, types ), version, buffer, offset, length );
public String log( ProtocolVersion version, String hostName, String filePreffix, Map<String, String> properties, String logType,
String[] headers, byte[][] types, byte[] buffer, int offset, int length ) {
LogId logId = new LogId( filePreffix, logType, hostName, properties, headers, types );
buffers.put( logId, version, buffer, offset, length );
return logId.toString();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,11 @@ public void testSaveLoad() {
LogMetadata newLm = LogMetadata.readFor( file );
assertThat( newLm.getDateTime( "time" ) ).isEqualTo( dt );
}

@Test
public void testPathForDataFromMetadata() {
Path metadataFilePath = testDirectoryFixture.testPath( "file.gz.metadata.yaml" );

assertThat( LogMetadata.pathForDataFromMetadata( metadataFilePath ) ).isEqualTo( testDirectoryFixture.testPath( "file.gz" ) );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@
import java.util.Map;

public abstract class AbstractLoggerBackend implements Closeable {
public void log( ProtocolVersion version, String hostName, String filePreffix, Map<String, String> properties, String logType,
String[] headers, byte[][] types, byte[] buffer ) {
log( version, hostName, filePreffix, properties, logType, headers, types, buffer, 0, buffer.length );
public String log( ProtocolVersion version, String hostName, String filePreffix, Map<String, String> properties, String logType,
String[] headers, byte[][] types, byte[] buffer ) {
return log( version, hostName, filePreffix, properties, logType, headers, types, buffer, 0, buffer.length );
}

public abstract void log( ProtocolVersion version, String hostName, String filePreffix, Map<String, String> properties, String logType,
String[] headers, byte[][] types, byte[] buffer, int offset, int length );
/**
* @return log id ( file path, etc )
*/
public abstract String log( ProtocolVersion version, String hostName, String filePreffix, Map<String, String> properties, String logType,
String[] headers, byte[][] types, byte[] buffer, int offset, int length );

public abstract void close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ public Logger( AbstractLoggerBackend backend ) {
this( backend, CURRENT_PROTOCOL_VERSION );
}

public void log( String filePreffix, Map<String, String> properties, String logType,
public String log( String filePreffix, Map<String, String> properties, String logType,
String[] headers, byte[][] types, byte[] row ) {
backend.log( protocolVersion, Inet.HOSTNAME, filePreffix, properties, logType, headers, types, row );
return backend.log( protocolVersion, Inet.HOSTNAME, filePreffix, properties, logType, headers, types, row );
}

public boolean isLoggingAvailable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,14 @@ public class MemoryLoggerBackend extends AbstractLoggerBackend {
private final LinkedHashMap<LogId, ByteArrayOutputStream> outputs = new LinkedHashMap<>();

@Override
public synchronized void log( ProtocolVersion version, String hostName, String filePreffix, Map<String, String> properties, String logType,
public synchronized String log( ProtocolVersion version, String hostName, String filePreffix, Map<String, String> properties, String logType,
String[] headers, byte[][] types, byte[] buffer, int offset, int length ) {
LogId logId = new LogId( filePreffix, logType, hostName, properties, headers, types );
outputs
.computeIfAbsent( new LogId( filePreffix, logType, hostName, properties, headers, types ), fn -> new ByteArrayOutputStream() )
.computeIfAbsent( logId, fn -> new ByteArrayOutputStream() )
.write( buffer, offset, length );

return logId.toString();
}

@Deprecated( forRemoval = true )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@

public class NullLoggerBackend extends AbstractLoggerBackend {
@Override
public void log( ProtocolVersion protocolVersion, String hostName, String filePreffix, Map<String, String> properties, String logType,
String[] headers, byte[][] types, byte[] row, int offset, int length ) {
public String log( ProtocolVersion protocolVersion, String hostName, String filePreffix, Map<String, String> properties, String logType,
String[] headers, byte[][] types, byte[] row, int offset, int length ) {
LogId logId = new LogId( filePreffix, logType, hostName, properties, headers, types );
return logId.toString();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public synchronized void write( ProtocolVersion protocolVersion, byte[] buffer )
write( protocolVersion, buffer, 0, buffer.length );
}

public abstract void write( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException;
public abstract String write( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException;

public synchronized void refresh() {
refresh( false );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,17 @@ public class DiskLoggerBackend extends AbstractLoggerBackend implements Cloneabl
public static final long DEFAULT_FREE_SPACE_REQUIRED = 2000000000L;
public final LinkedHashMap<String, FilePatternConfiguration> filePatternByType = new LinkedHashMap<>();
public final WriterConfiguration writerConfiguration;
private final Path logDirectory;
private final Timestamp timestamp;
private final int bufferSize;
private final LoadingCache<LogId, AbstractWriter<? extends Closeable>> writers;
private final ScheduledExecutorService pool;
public final Path logDirectory;
public final Timestamp timestamp;
public final int bufferSize;
public final LoadingCache<LogId, AbstractWriter<? extends Closeable>> writers;
public final ScheduledExecutorService pool;
public String filePattern = "/${YEAR}-${MONTH}/${DAY}/${LOG_TYPE}_v${LOG_VERSION}_${CLIENT_HOST}-${YEAR}-${MONTH}-${DAY}-${HOUR}-${INTERVAL}.tsv.gz";
public long requiredFreeSpace = DEFAULT_FREE_SPACE_REQUIRED;
public int maxVersions = 20;
public long refreshInitDelay = Dates.s( 10 );
public long refreshPeriod = Dates.s( 10 );
private volatile boolean closed;
public volatile boolean closed;

public DiskLoggerBackend( Path logDirectory, Timestamp timestamp, int bufferSize ) {
this( logDirectory, new WriterConfiguration(), timestamp, bufferSize );
Expand Down Expand Up @@ -160,7 +160,7 @@ private void filePatternValidation( String type, String filePattern ) {

@Override
@SneakyThrows
public void log( ProtocolVersion protocolVersion, String hostName, String filePreffix, Map<String, String> properties, String logType,
public String log( ProtocolVersion protocolVersion, String hostName, String filePreffix, Map<String, String> properties, String logType,
String[] headers, byte[][] types, byte[] buffer, int offset, int length ) {
if( closed ) {
throw new LoggerException( "already closed!" );
Expand All @@ -172,7 +172,7 @@ public void log( ProtocolVersion protocolVersion, String hostName, String filePr

log.trace( "logging {} bytes to {}", length, writer );
try {
writer.write( protocolVersion, buffer, offset, length );
return writer.write( protocolVersion, buffer, offset, length );
} catch( Exception e ) {
var headersWithTypes = new ArrayList<String>();
for( int i = 0; i < headers.length; i++ ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.base.Preconditions;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import oap.io.Files;
Expand All @@ -14,6 +15,7 @@
import org.joda.time.DateTime;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -64,6 +66,17 @@ public static Path pathFor( Path file ) {
return Path.of( file.toString() + EXTENSION );
}

public static Path pathFor( String file ) {
return Path.of( file + EXTENSION );
}

public static Path pathForDataFromMetadata( Path metadataPath ) {
Preconditions.checkArgument( isMetadata( metadataPath ) );

String metadataPathString = metadataPath.toString();
return Paths.get( metadataPathString.substring( 0, metadataPathString.indexOf( EXTENSION ) ) );
}

public static boolean isMetadata( Path filename ) {
return filename.toString().endsWith( EXTENSION );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
Expand Down Expand Up @@ -89,6 +90,7 @@ public class ParquetLogWriter extends AbstractWriter<org.apache.parquet.hadoop.P
// types.put( Types.ENUM.id, _ -> org.apache.parquet.schema.Types.required( BINARY ).as( LogicalTypeAnnotation.stringType() ) );
}

private final ReentrantLock lock = new ReentrantLock();
private final MessageType messageType;
private final WriterConfiguration.ParquetConfiguration configuration;
private final LinkedHashSet<String> excludeFields = new LinkedHashSet<>();
Expand Down Expand Up @@ -168,14 +170,15 @@ private static void addValue( int col, Object obj, byte[] colType, int typeIdx,
}

@Override
public synchronized void write( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException {
public String write( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException {
if( protocolVersion.version < ProtocolVersion.BINARY_V2.version ) {
throw new InvalidProtocolVersionException( "parquet", protocolVersion.version );
}

if( closed ) {
throw new LoggerException( "writer is already closed!" );
}
lock.lock();
try {
refresh();
Path filename = filename();
Expand All @@ -200,8 +203,7 @@ public synchronized void write( ProtocolVersion protocolVersion, byte[] buffer,
log.info( "[{}] file exists v{}", filename, fileVersion );
fileVersion += 1;
if( fileVersion > maxVersions ) throw new IllegalStateException( "version > " + maxVersions );
write( protocolVersion, buffer, offset, length );
return;
return write( protocolVersion, buffer, offset, length );
}
log.trace( "writing {} bytes to {}", length, this );
if( protocolVersion == ProtocolVersion.BINARY_V2 ) {
Expand All @@ -211,6 +213,8 @@ public synchronized void write( ProtocolVersion protocolVersion, byte[] buffer,
} else {
throw new IllegalArgumentException( "Unknown protocol version: " + protocolVersion );
}

return filename.toString();
} catch( IOException e ) {
log.error( e.getMessage(), e );
try {
Expand All @@ -220,6 +224,8 @@ public synchronized void write( ProtocolVersion protocolVersion, byte[] buffer,
out = null;
}
throw new LoggerException( e );
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,20 @@ public synchronized void write( ProtocolVersion protocolVersion, byte[] buffer )
}

@Override
public synchronized void write( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException {
public synchronized String write( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException {
if( closed ) {
throw new LoggerException( "writer is already closed!" );
}

switch( protocolVersion ) {
return switch( protocolVersion ) {
case TSV_V1 -> writeTsvV1( protocolVersion, buffer, offset, length );
case BINARY_V2 -> writeBinaryV2( protocolVersion, buffer, offset, length );
case ROW_BINARY_V3 -> writeBinaryV3( protocolVersion, buffer, offset, length );
default -> throw new InvalidProtocolVersionException( "tsv", protocolVersion.version );
}
};
}

private void writeTsvV1( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) {
private String writeTsvV1( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) {
try {
refresh();
Path filename = filename();
Expand All @@ -93,13 +93,14 @@ private void writeTsvV1( ProtocolVersion protocolVersion, byte[] buffer, int off
log.debug( "[{}] file exists v{}", filename, fileVersion );
fileVersion += 1;
if( fileVersion > maxVersions ) throw new IllegalStateException( "version > " + maxVersions );
write( protocolVersion, buffer, offset, length );
return;
return write( protocolVersion, buffer, offset, length );
}
log.trace( "writing {} bytes to {}", length, this );

out.write( buffer, offset, length );

return filename.toString();

} catch( IOException e ) {
log.error( e.getMessage(), e );
try {
Expand All @@ -113,7 +114,7 @@ private void writeTsvV1( ProtocolVersion protocolVersion, byte[] buffer, int off

}

private void writeBinaryV2( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) {
private String writeBinaryV2( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) {
try {
refresh();
Path filename = filename();
Expand All @@ -132,13 +133,13 @@ private void writeBinaryV2( ProtocolVersion protocolVersion, byte[] buffer, int
log.debug( "[{}] file exists v{}", filename, fileVersion );
fileVersion += 1;
if( fileVersion > maxVersions ) throw new IllegalStateException( "version > " + maxVersions );
write( protocolVersion, buffer, offset, length );
return;
return write( protocolVersion, buffer, offset, length );
}
log.trace( "writing {} bytes to {}", length, this );

convertToTsvV2( buffer, offset, length, line -> out.write( line ) );

return filename.toString();
} catch( IOException e ) {
log.error( e.getMessage(), e );
try {
Expand All @@ -151,7 +152,7 @@ private void writeBinaryV2( ProtocolVersion protocolVersion, byte[] buffer, int
}
}

private void writeBinaryV3( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) {
private String writeBinaryV3( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) {
try {
refresh();
Path filename = filename();
Expand All @@ -170,13 +171,13 @@ private void writeBinaryV3( ProtocolVersion protocolVersion, byte[] buffer, int
log.debug( "[{}] file exists v{}", filename, fileVersion );
fileVersion += 1;
if( fileVersion > maxVersions ) throw new IllegalStateException( "version > " + maxVersions );
write( protocolVersion, buffer, offset, length );
return;
return write( protocolVersion, buffer, offset, length );
}
log.trace( "writing {} bytes to {}", length, this );

convertToTsvV3( buffer, offset, length, line -> out.write( line ), logId.headers, logId.types );

return filename.toString();
} catch( IOException e ) {
log.error( e.getMessage(), e );
try {
Expand Down
4 changes: 3 additions & 1 deletion oap-stdlib/src/main/java/oap/json/Binder.java
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,10 @@ public <T> T unmarshalFromAny( Class<T> clazz, Object any ) throws JsonException
public <T> T unmarshal( Class<T> clazz, Path path ) throws JsonException {
try( var in = IoStreams.in( path ) ) {
return unmarshal( clazz, in );
} catch( JsonException e ) {
throw new JsonException( "Cannot deserialize to class: " + clazz.getCanonicalName() + " from path " + path, e.getCause() );
} catch( IOException e ) {
throw new JsonException( "Cannot deserialize to class: " + clazz.getCanonicalName(), e );
throw new JsonException( "Cannot deserialize to class: " + clazz.getCanonicalName() + " from path " + path, e );
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import oap.util.CircularFifoQueue;
import oap.util.Lists;
import org.joda.time.DateTimeUtils;
import org.jspecify.annotations.NonNull;

import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -90,7 +90,7 @@ public ReplicationResult<Id, Metadata<T>> updatedSince( long timestamp, long has
}
}

private @NonNull ReplicationResult<Id, Metadata<T>> fullSync( Set<Map.Entry<Id, Metadata<T>>> fullData, long t ) {
private @Nonnull ReplicationResult<Id, Metadata<T>> fullSync( Set<Map.Entry<Id, Metadata<T>>> fullData, long t ) {
return new ReplicationResult<>(
this.timestamp.longValue(),
this.hash,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package oap.storage;

import oap.util.Lists;
import org.jspecify.annotations.NonNull;

import javax.annotation.Nonnull;
import java.util.Map;
import java.util.Set;

Expand All @@ -27,7 +27,7 @@ public ReplicationResult<Id, Metadata<T>> updatedSince( long timestamp, long has
throw new IllegalStateException();
}

private @NonNull ReplicationResult<Id, Metadata<T>> fullSync( Set<Map.Entry<Id, Metadata<T>>> fullData ) {
private @Nonnull ReplicationResult<Id, Metadata<T>> fullSync( Set<Map.Entry<Id, Metadata<T>>> fullData ) {
return new ReplicationResult<>( -1, 0L, ReplicationResult.ReplicationStatusType.FULL_SYNC, Lists.map( fullData, d -> new Transaction<>( 0L, Operation.UPDATE, d.getKey(), d.getValue() ) ) );
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
</distributionManagement>

<properties>
<oap.project.version>25.2.0</oap.project.version>
<oap.project.version>25.2.1</oap.project.version>

<oap.deps.config.version>25.0.1</oap.deps.config.version>
<oap.deps.oap-teamcity.version>25.0.0</oap.deps.oap-teamcity.version>
Expand Down