diff --git a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/RowBinaryWriterTest.java b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/RowBinaryWriterTest.java new file mode 100644 index 000000000..bc240d1d0 --- /dev/null +++ b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/RowBinaryWriterTest.java @@ -0,0 +1,101 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) Open Application Platform Authors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package oap.logstream.disk; + +import oap.compression.Compression; +import oap.logstream.LogId; +import oap.logstream.formats.rowbinary.RowBinaryUtils; +import oap.template.Types; +import oap.testng.Fixtures; +import oap.testng.TestDirectoryFixture; +import oap.util.Dates; +import oap.util.Pair; +import org.joda.time.DateTime; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; + +import static oap.logstream.LogStreamProtocol.CURRENT_PROTOCOL_VERSION; +import static oap.logstream.Timestamp.BPH_12; +import static org.assertj.core.api.Assertions.assertThat; +import static org.joda.time.DateTimeZone.UTC; + +public class RowBinaryWriterTest extends Fixtures { + private static final String FILE_PATTERN = "${p}-file-${INTERVAL}-${LOG_VERSION}.rb.gz"; + private final TestDirectoryFixture testDirectoryFixture; + + public RowBinaryWriterTest() { + testDirectoryFixture = fixture( new TestDirectoryFixture() ); + } + + @Test + public void testWrite() throws IOException { + Dates.setTimeFixed( 2022, 3, 8, 21, 11 ); + + byte[] content1 = Compression.gzip( RowBinaryUtils.lines( List.of( + List.of( "s11", 21L, List.of( "1" ), new DateTime( 2022, 3, 11, 15, 16, 12, UTC ) ), + List.of( "s12", 22L, List.of( "1", "2" ), new DateTime( 2022, 3, 11, 15, 16, 13, UTC ) ) + ) ) ); + + byte[] content2 = Compression.gzip( RowBinaryUtils.lines( List.of( + List.of( "s111", 121L, List.of( "rr" ), new DateTime( 2022, 3, 11, 15, 16, 14, UTC ) ), + List.of( "s112", 122L, List.of( "zz", "66" ), new DateTime( 2022, 3, 11, 15, 16, 15, UTC ) ) + ) ) ); + + + String[] headers = new String[] { "COL1", "COL2", "COL3", "DATETIME" }; + byte[][] types = new byte[][] { new byte[] { Types.STRING.id }, + new byte[] { Types.LONG.id }, + new byte[] { Types.LIST.id, Types.STRING.id }, + new byte[] { Types.DATETIME.id } + }; + LogId logId = new LogId( "", "log", "log", + Map.of( "p", "1" ), headers, types ); + Path logs = testDirectoryFixture.testPath( "logs" ); + try( RowBinaryWriter writer = new RowBinaryWriter( logs, FILE_PATTERN, logId, 1024, BPH_12, 20 ) ) { + writer.write( CURRENT_PROTOCOL_VERSION, content1 ); + writer.write( CURRENT_PROTOCOL_VERSION, content2 ); + } + + Path path = logs.resolve( "1-file-02-4cd64dae-1.rb.gz" ); + + byte[] rb = Compression.ungzip( Files.readAllBytes( path ) ); + + Pair>, List> read = RowBinaryUtils.read( rb, headers, types, true ); + assertThat( read._2 ).isEqualTo( List.of( headers ) ); + assertThat( read._1 ) + .isEqualTo( List.of( + List.of( "s11", 21L, List.of( "1" ), new DateTime( 2022, 3, 11, 15, 16, 12, UTC ) ), + List.of( "s12", 22L, List.of( "1", "2" ), new DateTime( 2022, 3, 11, 15, 16, 13, UTC ) ), + + List.of( "s111", 121L, List.of( "rr" ), new DateTime( 2022, 3, 11, 15, 16, 14, UTC ) ), + List.of( "s112", 122L, List.of( "zz", "66" ), new DateTime( 2022, 3, 11, 15, 16, 15, UTC ) ) + ) ); + } +} diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractWriter.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractWriter.java index 59194cc52..5d052f1e5 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractWriter.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractWriter.java @@ -43,6 +43,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Objects; +import java.util.concurrent.locks.ReentrantLock; @Slf4j public abstract class AbstractWriter implements Closeable { @@ -54,6 +55,7 @@ public abstract class AbstractWriter implements Closeable { protected final int bufferSize; protected final Stopwatch stopwatch = new Stopwatch(); protected final int maxVersions; + protected final ReentrantLock lock = new ReentrantLock(); protected T out; protected Path outFilename; protected String lastPattern; @@ -79,7 +81,7 @@ protected AbstractWriter( LogFormat logFormat, Path logDirectory, String filePat @SneakyThrows static String currentPattern( LogFormat logFormat, String filePattern, LogId logId, Timestamp timestamp, int version, DateTime time ) { - var suffix = filePattern; + String suffix = filePattern; if( filePattern.startsWith( "/" ) && filePattern.endsWith( "/" ) ) suffix = suffix.substring( 1 ); else if( !filePattern.startsWith( "/" ) && !logId.filePrefixPattern.endsWith( "/" ) ) suffix = "/" + suffix; @@ -102,36 +104,41 @@ protected String currentPattern() { return currentPattern( logFormat, filePattern, logId, timestamp, fileVersion, Dates.nowUtc() ); } - public synchronized void write( ProtocolVersion protocolVersion, byte[] buffer ) throws LoggerException { + public void write( ProtocolVersion protocolVersion, byte[] buffer ) throws LoggerException { write( protocolVersion, buffer, 0, buffer.length ); } public abstract String write( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException; - public synchronized void refresh() { + public void refresh() { refresh( false ); } - public synchronized void refresh( boolean forceSync ) { - log.debug( "refresh {}...", lastPattern ); + public void refresh( boolean forceSync ) { + lock.lock(); + try { + log.debug( "refresh {}...", lastPattern ); - String currentPattern = currentPattern(); + String currentPattern = currentPattern(); - if( forceSync || !Objects.equals( this.lastPattern, currentPattern ) ) { - log.debug( "lastPattern {} currentPattern {} version {}", lastPattern, currentPattern, fileVersion ); + if( forceSync || !Objects.equals( this.lastPattern, currentPattern ) ) { + log.debug( "lastPattern {} currentPattern {} version {}", lastPattern, currentPattern, fileVersion ); - String patternWithPreviousVersion = currentPattern( fileVersion - 1 ); - if( !Objects.equals( patternWithPreviousVersion, this.lastPattern ) ) { - fileVersion = 1; - } - currentPattern = currentPattern(); + String patternWithPreviousVersion = currentPattern( fileVersion - 1 ); + if( !Objects.equals( patternWithPreviousVersion, this.lastPattern ) ) { + fileVersion = 1; + } + currentPattern = currentPattern(); - log.debug( "force {} change pattern from '{}' to '{}'", forceSync, this.lastPattern, currentPattern ); - closeOutput(); + log.debug( "force {} change pattern from '{}' to '{}'", forceSync, this.lastPattern, currentPattern ); + closeOutput(); - lastPattern = currentPattern; - } else { - log.debug( "refresh {}... SKIP", lastPattern ); + lastPattern = currentPattern; + } else { + log.debug( "refresh {}... SKIP", lastPattern ); + } + } finally { + lock.unlock(); } } @@ -143,7 +150,7 @@ protected void closeOutput() throws LoggerException { if( out != null ) try { stopwatch.count( out::close ); - var fileSize = Files.size( outFilename ); + long fileSize = Files.size( outFilename ); log.trace( "closing output {} ({} bytes)", this, fileSize ); Metrics.summary( "logstream_logging_server_bucket_size" ).record( fileSize ); Metrics.summary( "logstream_logging_server_bucket_time_seconds" ).record( Dates.nanosToSeconds( stopwatch.elapsed() ) ); @@ -156,10 +163,15 @@ protected void closeOutput() throws LoggerException { } @Override - public synchronized void close() { - log.debug( "closing {}", this ); - closed = true; - closeOutput(); + public void close() { + lock.lock(); + try { + log.debug( "closing {}", this ); + closed = true; + closeOutput(); + } finally { + lock.unlock(); + } } @Override diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/DiskLoggerBackend.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/DiskLoggerBackend.java index 55c4ceec5..4d4341e7e 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/DiskLoggerBackend.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/DiskLoggerBackend.java @@ -122,6 +122,8 @@ public AbstractWriter load( LogId id ) { writerConfiguration.parquet, bufferSize, timestamp, maxVersions ); case TSV_GZ, TSV_ZSTD -> new TsvWriter( logDirectory, fp.path, id, writerConfiguration.tsv, bufferSize, timestamp, maxVersions ); + case ROW_BINARY_GZ -> new RowBinaryWriter( logDirectory, fp.path, id, + bufferSize, timestamp, maxVersions ); }; } } ); @@ -161,7 +163,7 @@ private void filePatternValidation( String type, String filePattern ) { @Override @SneakyThrows public String log( ProtocolVersion protocolVersion, String hostName, String filePreffix, Map properties, String logType, - String[] headers, byte[][] types, byte[] buffer, int offset, int length ) { + String[] headers, byte[][] types, byte[] buffer, int offset, int length ) { if( closed ) { throw new LoggerException( "already closed!" ); } diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/LogFormat.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/LogFormat.java index caaf47649..84667039f 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/LogFormat.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/LogFormat.java @@ -31,6 +31,7 @@ public enum LogFormat { TSV_GZ( "tsv" + Encoding.GZIP.extension ), TSV_ZSTD( "tsv" + Encoding.ZSTD.extension ), + ROW_BINARY_GZ( "rb" + Encoding.GZIP.extension ), PARQUET( "parquet" ); public final String extension; @@ -41,7 +42,7 @@ public enum LogFormat { @Nonnull public static LogFormat parse( String filePattern ) { - for( var logFormat : values() ) { + for( LogFormat logFormat : values() ) { if( filePattern.endsWith( logFormat.extension ) ) { return logFormat; } diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/ParquetLogWriter.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/ParquetLogWriter.java index eb9612723..90ab24e3c 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/ParquetLogWriter.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/ParquetLogWriter.java @@ -59,7 +59,6 @@ import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; @@ -90,7 +89,6 @@ public class ParquetLogWriter extends AbstractWriter org.apache.parquet.schema.Types.required( BINARY ).as( LogicalTypeAnnotation.stringType() ) ); } - private final ReentrantLock lock = new ReentrantLock(); private final MessageType messageType; private final WriterConfiguration.ParquetConfiguration configuration; private final LinkedHashSet excludeFields = new LinkedHashSet<>(); diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/RowBinaryWriter.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/RowBinaryWriter.java new file mode 100644 index 000000000..d1924f177 --- /dev/null +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/RowBinaryWriter.java @@ -0,0 +1,79 @@ +package oap.logstream.disk; + +import lombok.extern.slf4j.Slf4j; +import oap.io.Files; +import oap.logstream.LogId; +import oap.logstream.LogIdTemplate; +import oap.logstream.LogStreamProtocol; +import oap.logstream.LoggerException; +import oap.logstream.Timestamp; +import oap.logstream.formats.rowbinary.RowBinaryOutputStream; +import oap.util.FastByteArrayOutputStream; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.List; +import java.util.zip.GZIPOutputStream; + +@Slf4j +public class RowBinaryWriter extends AbstractWriter { + protected RowBinaryWriter( Path logDirectory, String filePattern, LogId logId, int bufferSize, Timestamp timestamp, int maxVersions ) { + super( LogFormat.ROW_BINARY_GZ, logDirectory, filePattern, logId, bufferSize, timestamp, maxVersions ); + } + + @Override + public String write( LogStreamProtocol.ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException { + try { + refresh(); + Path filename = filename(); + if( out == null ) + if( !java.nio.file.Files.exists( filename ) ) { + log.debug( "[{}] open new file v{}", filename, fileVersion ); + outFilename = filename; + Files.ensureDirectory( filename.getParent() ); + out = FileChannel.open( filename, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE, StandardOpenOption.APPEND ); + LogIdTemplate logIdTemplate = new LogIdTemplate( logId ); + new LogMetadata( logId ).withProperty( "VERSION", logIdTemplate.getHashWithVersion( fileVersion ) ).writeFor( filename ); + + FastByteArrayOutputStream outputStream = new FastByteArrayOutputStream(); + GZIPOutputStream gzip = new GZIPOutputStream( outputStream ); + RowBinaryOutputStream rbOut = new RowBinaryOutputStream( gzip, List.of( logId.headers ) ); + rbOut.close(); + + ByteBuffer byteBuffer = ByteBuffer.wrap( outputStream.array, 0, outputStream.length ); + do { + out.write( byteBuffer ); + } while( byteBuffer.hasRemaining() ); + + log.trace( "[{}] write headers {}", filename, logId.headers ); + } else { + log.debug( "[{}] file exists v{}", filename, fileVersion ); + fileVersion += 1; + if( fileVersion > maxVersions ) throw new IllegalStateException( "version > " + maxVersions ); + return write( protocolVersion, buffer, offset, length ); + } + log.trace( "writing {} bytes to {}", length, this ); + + ByteBuffer byteBuffer = ByteBuffer.wrap( buffer, offset, length ); + do { + out.write( byteBuffer ); + } while( byteBuffer.hasRemaining() ); + out.force( true ); + + return filename.toString(); + + } catch( IOException e ) { + log.error( e.getMessage(), e ); + try { + closeOutput(); + } finally { + outFilename = null; + out = null; + } + throw new LoggerException( e ); + } + } +} diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/TsvWriter.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/TsvWriter.java index 366018ec1..d6eb2fd00 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/TsvWriter.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/TsvWriter.java @@ -27,7 +27,6 @@ import com.google.common.io.CountingOutputStream; import lombok.extern.slf4j.Slf4j; import oap.io.IoStreams; -import oap.logstream.InvalidProtocolVersionException; import oap.logstream.LogId; import oap.logstream.LogIdTemplate; import oap.logstream.LogStreamProtocol.ProtocolVersion; @@ -56,22 +55,22 @@ public TsvWriter( Path logDirectory, String filePattern, LogId logId, this.configuration = configuration; } - public synchronized void write( ProtocolVersion protocolVersion, byte[] buffer ) throws LoggerException { - write( protocolVersion, buffer, 0, buffer.length ); - } - @Override - public synchronized String write( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException { - if( closed ) { - throw new LoggerException( "writer is already closed!" ); - } + public String write( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException { + lock.lock(); + try { + if( closed ) { + throw new LoggerException( "writer is already closed!" ); + } - return switch( protocolVersion ) { - case TSV_V1 -> writeTsvV1( protocolVersion, buffer, offset, length ); - case BINARY_V2 -> writeBinaryV2( protocolVersion, buffer, offset, length ); - case ROW_BINARY_V3 -> writeBinaryV3( protocolVersion, buffer, offset, length ); - default -> throw new InvalidProtocolVersionException( "tsv", protocolVersion.version ); - }; + return switch( protocolVersion ) { + case TSV_V1 -> writeTsvV1( protocolVersion, buffer, offset, length ); + case BINARY_V2 -> writeBinaryV2( protocolVersion, buffer, offset, length ); + case ROW_BINARY_V3 -> writeBinaryV3( protocolVersion, buffer, offset, length ); + }; + } finally { + lock.unlock(); + } } private String writeTsvV1( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) { diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/formats/rowbinary/RowBinaryOutputStream.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/formats/rowbinary/RowBinaryOutputStream.java index 233d7afd2..c6ff018ab 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/formats/rowbinary/RowBinaryOutputStream.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/formats/rowbinary/RowBinaryOutputStream.java @@ -1,6 +1,7 @@ package oap.logstream.formats.rowbinary; import oap.dictionary.Dictionary; +import oap.util.Strings; import org.joda.time.DateTime; import java.io.IOException; @@ -133,7 +134,7 @@ public void writeString( String s ) throws IOException { if( s.isEmpty() ) { out.write( 0 ); } else { - byte[] bytes = s.getBytes( UTF_8 ); + byte[] bytes = ( Strings.UNKNOWN.equals( s ) ? "" : s ).getBytes( UTF_8 ); writeVarInt( bytes.length ); out.write( bytes ); } diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/formats/rowbinary/RowBinaryUtils.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/formats/rowbinary/RowBinaryUtils.java index 5c41ebd93..8ad1a5292 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/formats/rowbinary/RowBinaryUtils.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/formats/rowbinary/RowBinaryUtils.java @@ -1,18 +1,30 @@ package oap.logstream.formats.rowbinary; +import oap.util.Pair; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import static oap.util.Pair.__; + public class RowBinaryUtils { public static List> read( byte[] bytes, String[] headers, byte[][] types ) throws IOException { return read( bytes, 0, bytes.length, headers, types ); } + public static Pair>, List> read( byte[] bytes, String[] headers, byte[][] types, boolean readHeaders ) throws IOException { + return read( bytes, 0, bytes.length, headers, types, readHeaders ); + } + public static List> read( byte[] bytes, int offset, int length, String[] headers, byte[][] types ) throws IOException { - RowBinaryInputStream binaryInputStream = new RowBinaryInputStream( new ByteArrayInputStream( bytes, offset, length ), headers, types ); + return read( bytes, offset, length, headers, types, false )._1; + } + + public static Pair>, List> read( byte[] bytes, int offset, int length, String[] headers, byte[][] types, boolean readHeaders ) throws IOException { + RowBinaryInputStream binaryInputStream = new RowBinaryInputStream( new ByteArrayInputStream( bytes, offset, length ), readHeaders, headers, types ); ArrayList> res = new ArrayList<>(); @@ -23,7 +35,7 @@ public static List> read( byte[] bytes, int offset, int length, Str row = binaryInputStream.readRow(); } - return res; + return __( res, List.of( binaryInputStream.headers ) ); } public static byte[] lines( List> rows ) throws IOException { diff --git a/pom.xml b/pom.xml index 516605132..1b2d463bd 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ - 25.4.2 + 25.4.3 25.0.1 25.0.0