diff --git a/oap-formats/oap-logstream/oap-logstream-net-client/src/main/java/oap/logstream/net/client/SocketLoggerBackend.java b/oap-formats/oap-logstream/oap-logstream-net-client/src/main/java/oap/logstream/net/client/SocketLoggerBackend.java index 93ac25ed5b..d0008a343a 100644 --- a/oap-formats/oap-logstream/oap-logstream-net-client/src/main/java/oap/logstream/net/client/SocketLoggerBackend.java +++ b/oap-formats/oap-logstream/oap-logstream-net-client/src/main/java/oap/logstream/net/client/SocketLoggerBackend.java @@ -106,9 +106,11 @@ private boolean sendAsync( boolean shutdown ) { } @Override - public void log( ProtocolVersion version, String hostName, String filePreffix, Map properties, String logType, - String[] headers, byte[][] types, byte[] buffer, int offset, int length ) { - buffers.put( new LogId( filePreffix, logType, hostName, properties, headers, types ), version, buffer, offset, length ); + public String log( ProtocolVersion version, String hostName, String filePreffix, Map properties, String logType, + String[] headers, byte[][] types, byte[] buffer, int offset, int length ) { + LogId logId = new LogId( filePreffix, logType, hostName, properties, headers, types ); + buffers.put( logId, version, buffer, offset, length ); + return logId.toString(); } @Override diff --git a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/LogMetadataTest.java b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/LogMetadataTest.java index b9764de410..d8d51cd098 100644 --- a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/LogMetadataTest.java +++ b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/LogMetadataTest.java @@ -73,4 +73,11 @@ public void testSaveLoad() { LogMetadata newLm = LogMetadata.readFor( file ); assertThat( newLm.getDateTime( "time" ) ).isEqualTo( dt ); } + + @Test + public void testPathForDataFromMetadata() { + Path metadataFilePath = testDirectoryFixture.testPath( "file.gz.metadata.yaml" ); + + assertThat( LogMetadata.pathForDataFromMetadata( metadataFilePath ) ).isEqualTo( testDirectoryFixture.testPath( "file.gz" ) ); + } } diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/AbstractLoggerBackend.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/AbstractLoggerBackend.java index 44caffba95..bc01854a70 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/AbstractLoggerBackend.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/AbstractLoggerBackend.java @@ -30,13 +30,16 @@ import java.util.Map; public abstract class AbstractLoggerBackend implements Closeable { - public void log( ProtocolVersion version, String hostName, String filePreffix, Map properties, String logType, - String[] headers, byte[][] types, byte[] buffer ) { - log( version, hostName, filePreffix, properties, logType, headers, types, buffer, 0, buffer.length ); + public String log( ProtocolVersion version, String hostName, String filePreffix, Map properties, String logType, + String[] headers, byte[][] types, byte[] buffer ) { + return log( version, hostName, filePreffix, properties, logType, headers, types, buffer, 0, buffer.length ); } - public abstract void log( ProtocolVersion version, String hostName, String filePreffix, Map properties, String logType, - String[] headers, byte[][] types, byte[] buffer, int offset, int length ); + /** + * @return log id ( file path, etc ) + */ + public abstract String log( ProtocolVersion version, String hostName, String filePreffix, Map properties, String logType, + String[] headers, byte[][] types, byte[] buffer, int offset, int length ); public abstract void close(); diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/Logger.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/Logger.java index 185b953087..5bdb84f49a 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/Logger.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/Logger.java @@ -42,9 +42,9 @@ public Logger( AbstractLoggerBackend backend ) { this( backend, CURRENT_PROTOCOL_VERSION ); } - public void log( String filePreffix, Map properties, String logType, + public String log( String filePreffix, Map properties, String logType, String[] headers, byte[][] types, byte[] row ) { - backend.log( protocolVersion, Inet.HOSTNAME, filePreffix, properties, logType, headers, types, row ); + return backend.log( protocolVersion, Inet.HOSTNAME, filePreffix, properties, logType, headers, types, row ); } public boolean isLoggingAvailable() { diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/MemoryLoggerBackend.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/MemoryLoggerBackend.java index 5796697619..72d1a41a26 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/MemoryLoggerBackend.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/MemoryLoggerBackend.java @@ -47,11 +47,14 @@ public class MemoryLoggerBackend extends AbstractLoggerBackend { private final LinkedHashMap outputs = new LinkedHashMap<>(); @Override - public synchronized void log( ProtocolVersion version, String hostName, String filePreffix, Map properties, String logType, + public synchronized String log( ProtocolVersion version, String hostName, String filePreffix, Map properties, String logType, String[] headers, byte[][] types, byte[] buffer, int offset, int length ) { + LogId logId = new LogId( filePreffix, logType, hostName, properties, headers, types ); outputs - .computeIfAbsent( new LogId( filePreffix, logType, hostName, properties, headers, types ), fn -> new ByteArrayOutputStream() ) + .computeIfAbsent( logId, fn -> new ByteArrayOutputStream() ) .write( buffer, offset, length ); + + return logId.toString(); } @Deprecated( forRemoval = true ) diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/NullLoggerBackend.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/NullLoggerBackend.java index 8eb77d9056..fb7b04fbf5 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/NullLoggerBackend.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/NullLoggerBackend.java @@ -9,8 +9,10 @@ public class NullLoggerBackend extends AbstractLoggerBackend { @Override - public void log( ProtocolVersion protocolVersion, String hostName, String filePreffix, Map properties, String logType, - String[] headers, byte[][] types, byte[] row, int offset, int length ) { + public String log( ProtocolVersion protocolVersion, String hostName, String filePreffix, Map properties, String logType, + String[] headers, byte[][] types, byte[] row, int offset, int length ) { + LogId logId = new LogId( filePreffix, logType, hostName, properties, headers, types ); + return logId.toString(); } @Override diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractWriter.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractWriter.java index 5daf9ed235..59194cc521 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractWriter.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractWriter.java @@ -106,7 +106,7 @@ public synchronized void write( ProtocolVersion protocolVersion, byte[] buffer ) write( protocolVersion, buffer, 0, buffer.length ); } - public abstract void write( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException; + public abstract String write( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException; public synchronized void refresh() { refresh( false ); diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/DiskLoggerBackend.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/DiskLoggerBackend.java index 3769ef9a91..55c4ceec5f 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/DiskLoggerBackend.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/DiskLoggerBackend.java @@ -75,17 +75,17 @@ public class DiskLoggerBackend extends AbstractLoggerBackend implements Cloneabl public static final long DEFAULT_FREE_SPACE_REQUIRED = 2000000000L; public final LinkedHashMap filePatternByType = new LinkedHashMap<>(); public final WriterConfiguration writerConfiguration; - private final Path logDirectory; - private final Timestamp timestamp; - private final int bufferSize; - private final LoadingCache> writers; - private final ScheduledExecutorService pool; + public final Path logDirectory; + public final Timestamp timestamp; + public final int bufferSize; + public final LoadingCache> writers; + public final ScheduledExecutorService pool; public String filePattern = "/${YEAR}-${MONTH}/${DAY}/${LOG_TYPE}_v${LOG_VERSION}_${CLIENT_HOST}-${YEAR}-${MONTH}-${DAY}-${HOUR}-${INTERVAL}.tsv.gz"; public long requiredFreeSpace = DEFAULT_FREE_SPACE_REQUIRED; public int maxVersions = 20; public long refreshInitDelay = Dates.s( 10 ); public long refreshPeriod = Dates.s( 10 ); - private volatile boolean closed; + public volatile boolean closed; public DiskLoggerBackend( Path logDirectory, Timestamp timestamp, int bufferSize ) { this( logDirectory, new WriterConfiguration(), timestamp, bufferSize ); @@ -160,7 +160,7 @@ private void filePatternValidation( String type, String filePattern ) { @Override @SneakyThrows - public void log( ProtocolVersion protocolVersion, String hostName, String filePreffix, Map properties, String logType, + public String log( ProtocolVersion protocolVersion, String hostName, String filePreffix, Map properties, String logType, String[] headers, byte[][] types, byte[] buffer, int offset, int length ) { if( closed ) { throw new LoggerException( "already closed!" ); @@ -172,7 +172,7 @@ public void log( ProtocolVersion protocolVersion, String hostName, String filePr log.trace( "logging {} bytes to {}", length, writer ); try { - writer.write( protocolVersion, buffer, offset, length ); + return writer.write( protocolVersion, buffer, offset, length ); } catch( Exception e ) { var headersWithTypes = new ArrayList(); for( int i = 0; i < headers.length; i++ ) { diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/LogMetadata.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/LogMetadata.java index ba7a3aad66..e7abf425c3 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/LogMetadata.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/LogMetadata.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonGetter; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.base.Preconditions; import lombok.EqualsAndHashCode; import lombok.ToString; import oap.io.Files; @@ -14,6 +15,7 @@ import org.joda.time.DateTime; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -64,6 +66,17 @@ public static Path pathFor( Path file ) { return Path.of( file.toString() + EXTENSION ); } + public static Path pathFor( String file ) { + return Path.of( file + EXTENSION ); + } + + public static Path pathForDataFromMetadata( Path metadataPath ) { + Preconditions.checkArgument( isMetadata( metadataPath ) ); + + String metadataPathString = metadataPath.toString(); + return Paths.get( metadataPathString.substring( 0, metadataPathString.indexOf( EXTENSION ) ) ); + } + public static boolean isMetadata( Path filename ) { return filename.toString().endsWith( EXTENSION ); } diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/ParquetLogWriter.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/ParquetLogWriter.java index 1bf2e6d5c4..eb96127231 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/ParquetLogWriter.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/ParquetLogWriter.java @@ -59,6 +59,7 @@ import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; @@ -89,6 +90,7 @@ public class ParquetLogWriter extends AbstractWriter org.apache.parquet.schema.Types.required( BINARY ).as( LogicalTypeAnnotation.stringType() ) ); } + private final ReentrantLock lock = new ReentrantLock(); private final MessageType messageType; private final WriterConfiguration.ParquetConfiguration configuration; private final LinkedHashSet excludeFields = new LinkedHashSet<>(); @@ -168,7 +170,7 @@ private static void addValue( int col, Object obj, byte[] colType, int typeIdx, } @Override - public synchronized void write( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException { + public String write( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException { if( protocolVersion.version < ProtocolVersion.BINARY_V2.version ) { throw new InvalidProtocolVersionException( "parquet", protocolVersion.version ); } @@ -176,6 +178,7 @@ public synchronized void write( ProtocolVersion protocolVersion, byte[] buffer, if( closed ) { throw new LoggerException( "writer is already closed!" ); } + lock.lock(); try { refresh(); Path filename = filename(); @@ -200,8 +203,7 @@ public synchronized void write( ProtocolVersion protocolVersion, byte[] buffer, log.info( "[{}] file exists v{}", filename, fileVersion ); fileVersion += 1; if( fileVersion > maxVersions ) throw new IllegalStateException( "version > " + maxVersions ); - write( protocolVersion, buffer, offset, length ); - return; + return write( protocolVersion, buffer, offset, length ); } log.trace( "writing {} bytes to {}", length, this ); if( protocolVersion == ProtocolVersion.BINARY_V2 ) { @@ -211,6 +213,8 @@ public synchronized void write( ProtocolVersion protocolVersion, byte[] buffer, } else { throw new IllegalArgumentException( "Unknown protocol version: " + protocolVersion ); } + + return filename.toString(); } catch( IOException e ) { log.error( e.getMessage(), e ); try { @@ -220,6 +224,8 @@ public synchronized void write( ProtocolVersion protocolVersion, byte[] buffer, out = null; } throw new LoggerException( e ); + } finally { + lock.unlock(); } } diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/TsvWriter.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/TsvWriter.java index 81912d3818..366018ec1a 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/TsvWriter.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/TsvWriter.java @@ -61,20 +61,20 @@ public synchronized void write( ProtocolVersion protocolVersion, byte[] buffer ) } @Override - public synchronized void write( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException { + public synchronized String write( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException { if( closed ) { throw new LoggerException( "writer is already closed!" ); } - switch( protocolVersion ) { + return switch( protocolVersion ) { case TSV_V1 -> writeTsvV1( protocolVersion, buffer, offset, length ); case BINARY_V2 -> writeBinaryV2( protocolVersion, buffer, offset, length ); case ROW_BINARY_V3 -> writeBinaryV3( protocolVersion, buffer, offset, length ); default -> throw new InvalidProtocolVersionException( "tsv", protocolVersion.version ); - } + }; } - private void writeTsvV1( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) { + private String writeTsvV1( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) { try { refresh(); Path filename = filename(); @@ -93,13 +93,14 @@ private void writeTsvV1( ProtocolVersion protocolVersion, byte[] buffer, int off log.debug( "[{}] file exists v{}", filename, fileVersion ); fileVersion += 1; if( fileVersion > maxVersions ) throw new IllegalStateException( "version > " + maxVersions ); - write( protocolVersion, buffer, offset, length ); - return; + return write( protocolVersion, buffer, offset, length ); } log.trace( "writing {} bytes to {}", length, this ); out.write( buffer, offset, length ); + return filename.toString(); + } catch( IOException e ) { log.error( e.getMessage(), e ); try { @@ -113,7 +114,7 @@ private void writeTsvV1( ProtocolVersion protocolVersion, byte[] buffer, int off } - private void writeBinaryV2( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) { + private String writeBinaryV2( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) { try { refresh(); Path filename = filename(); @@ -132,13 +133,13 @@ private void writeBinaryV2( ProtocolVersion protocolVersion, byte[] buffer, int log.debug( "[{}] file exists v{}", filename, fileVersion ); fileVersion += 1; if( fileVersion > maxVersions ) throw new IllegalStateException( "version > " + maxVersions ); - write( protocolVersion, buffer, offset, length ); - return; + return write( protocolVersion, buffer, offset, length ); } log.trace( "writing {} bytes to {}", length, this ); convertToTsvV2( buffer, offset, length, line -> out.write( line ) ); + return filename.toString(); } catch( IOException e ) { log.error( e.getMessage(), e ); try { @@ -151,7 +152,7 @@ private void writeBinaryV2( ProtocolVersion protocolVersion, byte[] buffer, int } } - private void writeBinaryV3( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) { + private String writeBinaryV3( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) { try { refresh(); Path filename = filename(); @@ -170,13 +171,13 @@ private void writeBinaryV3( ProtocolVersion protocolVersion, byte[] buffer, int log.debug( "[{}] file exists v{}", filename, fileVersion ); fileVersion += 1; if( fileVersion > maxVersions ) throw new IllegalStateException( "version > " + maxVersions ); - write( protocolVersion, buffer, offset, length ); - return; + return write( protocolVersion, buffer, offset, length ); } log.trace( "writing {} bytes to {}", length, this ); convertToTsvV3( buffer, offset, length, line -> out.write( line ), logId.headers, logId.types ); + return filename.toString(); } catch( IOException e ) { log.error( e.getMessage(), e ); try { diff --git a/oap-stdlib/src/main/java/oap/json/Binder.java b/oap-stdlib/src/main/java/oap/json/Binder.java index 0082066ca3..b56bcac876 100644 --- a/oap-stdlib/src/main/java/oap/json/Binder.java +++ b/oap-stdlib/src/main/java/oap/json/Binder.java @@ -435,8 +435,10 @@ public T unmarshalFromAny( Class clazz, Object any ) throws JsonException public T unmarshal( Class clazz, Path path ) throws JsonException { try( var in = IoStreams.in( path ) ) { return unmarshal( clazz, in ); + } catch( JsonException e ) { + throw new JsonException( "Cannot deserialize to class: " + clazz.getCanonicalName() + " from path " + path, e.getCause() ); } catch( IOException e ) { - throw new JsonException( "Cannot deserialize to class: " + clazz.getCanonicalName(), e ); + throw new JsonException( "Cannot deserialize to class: " + clazz.getCanonicalName() + " from path " + path, e ); } } diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogImpl.java b/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogImpl.java index 0505f91dd7..46ada41ca6 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogImpl.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogImpl.java @@ -3,8 +3,8 @@ import oap.util.CircularFifoQueue; import oap.util.Lists; import org.joda.time.DateTimeUtils; -import org.jspecify.annotations.NonNull; +import javax.annotation.Nonnull; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedHashMap; @@ -90,7 +90,7 @@ public ReplicationResult> updatedSince( long timestamp, long has } } - private @NonNull ReplicationResult> fullSync( Set>> fullData, long t ) { + private @Nonnull ReplicationResult> fullSync( Set>> fullData, long t ) { return new ReplicationResult<>( this.timestamp.longValue(), this.hash, diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogZero.java b/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogZero.java index a3d773a9bc..845c18dd49 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogZero.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/TransactionLogZero.java @@ -1,8 +1,8 @@ package oap.storage; import oap.util.Lists; -import org.jspecify.annotations.NonNull; +import javax.annotation.Nonnull; import java.util.Map; import java.util.Set; @@ -27,7 +27,7 @@ public ReplicationResult> updatedSince( long timestamp, long has throw new IllegalStateException(); } - private @NonNull ReplicationResult> fullSync( Set>> fullData ) { + private @Nonnull ReplicationResult> fullSync( Set>> fullData ) { return new ReplicationResult<>( -1, 0L, ReplicationResult.ReplicationStatusType.FULL_SYNC, Lists.map( fullData, d -> new Transaction<>( 0L, Operation.UPDATE, d.getKey(), d.getValue() ) ) ); } } diff --git a/pom.xml b/pom.xml index 2b132d22cc..322fda5136 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ - 25.2.0 + 25.2.1 25.0.1 25.0.0