diff --git a/oap-formats/oap-logstream/oap-logstream-net-server/src/main/java/oap/logstream/net/server/SocketLoggerServer.java b/oap-formats/oap-logstream/oap-logstream-net-server/src/main/java/oap/logstream/net/server/SocketLoggerServer.java index bab91c105..7502ba281 100644 --- a/oap-formats/oap-logstream/oap-logstream-net-server/src/main/java/oap/logstream/net/server/SocketLoggerServer.java +++ b/oap-formats/oap-logstream/oap-logstream-net-server/src/main/java/oap/logstream/net/server/SocketLoggerServer.java @@ -23,15 +23,12 @@ */ package oap.logstream.net.server; -import it.unimi.dsi.fastutil.io.FastByteArrayInputStream; -import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream; import lombok.extern.slf4j.Slf4j; import oap.logstream.AbstractLoggerBackend; import oap.logstream.LogStreamProtocol; import oap.logstream.LogStreamProtocol.ProtocolVersion; import oap.logstream.LoggerException; import oap.message.server.MessageListener; -import org.apache.commons.io.IOUtils; import java.io.ByteArrayInputStream; import java.io.Closeable; @@ -39,7 +36,6 @@ import java.io.EOFException; import java.io.IOException; import java.util.LinkedHashMap; -import java.util.zip.GZIPInputStream; import static oap.logstream.LogStreamProtocol.MESSAGE_TYPE; @@ -119,12 +115,7 @@ private void readBinaryV3( ProtocolVersion version, String hostName, DataInputSt byte[] compressedBuffer = new byte[length]; in.readFully( compressedBuffer, 0, length ); - FastByteArrayOutputStream buffer = new FastByteArrayOutputStream( length ); - try( GZIPInputStream gzip = new GZIPInputStream( new FastByteArrayInputStream( compressedBuffer ) ) ) { - IOUtils.copy( gzip, buffer ); - } - - backend.log( version, clientHostname, filePreffix, properties, logType, headers, types, buffer.array, 0, buffer.length ); + backend.log( version, clientHostname, filePreffix, properties, logType, headers, types, compressedBuffer, 0, length ); } @Override diff --git a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/LoggerTest.java b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/LoggerTest.java index 85b69e599..f384866c2 100644 --- a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/LoggerTest.java +++ b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/LoggerTest.java @@ -106,11 +106,11 @@ public void net() throws IOException { Path controlStatePath = testDirectoryFixture.testPath( "controlStatePath.st" ); List lineData1 = List.of( new DateTime( 2015, 10, 10, 1, 0, UTC ), "12345678", "12345678" ); - byte[] line1 = Compression.gzip( RowBinaryUtils.line( lineData1 ) ); + byte[] line1 = RowBinaryUtils.line( lineData1 ); String[] headers1 = new String[] { "TIMESTAMP", "REQUEST_ID", "REQUEST_ID2" }; byte[][] types1 = new byte[][] { new byte[] { Types.DATETIME.id }, new byte[] { Types.STRING.id }, new byte[] { Types.STRING.id } }; List lineData2 = List.of( new DateTime( 2015, 10, 10, 1, 0, UTC ), "12345678" ); - byte[] line2 = Compression.gzip( RowBinaryUtils.line( lineData2 ) ); + byte[] line2 = RowBinaryUtils.line( lineData2 ); String[] headers2 = new String[] { "TIMESTAMP", "REQUEST_ID2" }; byte[][] types2 = new byte[][] { new byte[] { Types.DATETIME.id }, new byte[] { Types.STRING.id } }; diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/LogMetadata.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/LogMetadata.java index e7abf425c..b651ee0d9 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/LogMetadata.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/LogMetadata.java @@ -14,8 +14,12 @@ import oap.util.Maps; import org.joda.time.DateTime; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -27,7 +31,8 @@ @ToString @EqualsAndHashCode( exclude = "clientHostname" ) public class LogMetadata { - public static final String EXTENSION = ".metadata.yaml"; + public static final String EXTENSION_LOG_METADATA = ".metadata.yaml"; + public static final String EXTENSION_LOG_TRANSACTION = ".metadata.transaction"; public final String type; public final String clientHostname; @@ -62,23 +67,49 @@ public static LogMetadata readFor( Path file ) { return Binder.yaml.unmarshal( LogMetadata.class, pathFor( file ) ); } + public static Path pathFor( Path file, String extension ) { + return Path.of( file.toString() + extension ); + } + public static Path pathFor( Path file ) { - return Path.of( file.toString() + EXTENSION ); + return pathFor( file, EXTENSION_LOG_METADATA ); + } + + public static Path pathFor( String file, String extension ) { + return Path.of( file + extension ); } public static Path pathFor( String file ) { - return Path.of( file + EXTENSION ); + return pathFor( file, EXTENSION_LOG_METADATA ); } public static Path pathForDataFromMetadata( Path metadataPath ) { Preconditions.checkArgument( isMetadata( metadataPath ) ); String metadataPathString = metadataPath.toString(); - return Paths.get( metadataPathString.substring( 0, metadataPathString.indexOf( EXTENSION ) ) ); + return Paths.get( metadataPathString.substring( 0, metadataPathString.indexOf( EXTENSION_LOG_METADATA ) ) ); + } + + public static Path pathForTransactionFromMetadata( Path metadataPath ) { + return Paths.get( pathForDataFromMetadata( metadataPath ) + EXTENSION_LOG_TRANSACTION ); + } + + public static Path pathForDataFromTransaction( Path transactionPath ) { + + String transactionPathString = transactionPath.toString(); + return Paths.get( transactionPathString.substring( 0, transactionPathString.indexOf( EXTENSION_LOG_TRANSACTION ) ) ); + } + + public static Path pathForMetadataFromTransaction( Path transactionPath ) { + return Paths.get( pathForDataFromTransaction( transactionPath ) + EXTENSION_LOG_METADATA ); } public static boolean isMetadata( Path filename ) { - return filename.toString().endsWith( EXTENSION ); + return filename.toString().endsWith( EXTENSION_LOG_METADATA ); + } + + public static boolean isTransaction( Path filename ) { + return filename.toString().endsWith( EXTENSION_LOG_TRANSACTION ); } public static void rename( Path filename, Path newFile ) { @@ -93,6 +124,45 @@ public static void addProperty( Path path, String name, String value ) { metadata.writeFor( path ); } + public static long beginTransaction( Path file ) throws IOException { + Path path = pathFor( file, EXTENSION_LOG_TRANSACTION ); + + long dataSize; + if( java.nio.file.Files.exists( path ) ) { + dataSize = Long.parseLong( java.nio.file.Files.readString( path, StandardCharsets.UTF_8 ) ); + } else { + dataSize = 0; + } + + return dataSize; + } + + public static void commitTransaction( Path file, int length ) throws IOException { + Path path = pathFor( file, EXTENSION_LOG_TRANSACTION ); + Path tmpPath = pathFor( file, EXTENSION_LOG_TRANSACTION + ".tmp" ); + + long dataSize; + if( java.nio.file.Files.exists( path ) ) { + dataSize = Long.parseLong( java.nio.file.Files.readString( path, StandardCharsets.UTF_8 ) ); + } else { + dataSize = 0; + } + + + dataSize += length; + + java.nio.file.Files.writeString( tmpPath, String.valueOf( dataSize ), StandardCharsets.UTF_8, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE ); + java.nio.file.Files.move( tmpPath, path, StandardCopyOption.REPLACE_EXISTING ); + } + + public static void deleteTransaction( Path file ) throws IOException { + java.nio.file.Files.delete( pathFor( file, EXTENSION_LOG_TRANSACTION ) ); + } + + public static void deleteLogMetadata( Path file ) throws IOException { + java.nio.file.Files.delete( pathFor( file, EXTENSION_LOG_METADATA ) ); + } + @JsonAnyGetter public Map getProperties() { return properties; @@ -104,7 +174,11 @@ public void setProperty( String name, String value ) { } public void writeFor( Path file ) { - Binder.yaml.marshal( pathFor( file ), this ); + Path path = pathFor( file ); + Path tmpFile = Path.of( path + ".tmp" ); + Binder.yaml.marshal( tmpFile, this ); + + Files.rename( tmpFile, path ); } public DateTime getDateTime( String name ) { diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/RowBinaryWriter.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/RowBinaryWriter.java index 52157d527..84b70bc11 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/RowBinaryWriter.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/RowBinaryWriter.java @@ -48,6 +48,9 @@ public String write( LogStreamProtocol.ProtocolVersion protocolVersion, byte[] b do { out.write( byteBuffer ); } while( byteBuffer.hasRemaining() ); + out.force( true ); + + LogMetadata.commitTransaction( filename, outputStream.length ); log.trace( "[{}] write headers {}", filename, logId.headers ); } else { @@ -58,12 +61,17 @@ public String write( LogStreamProtocol.ProtocolVersion protocolVersion, byte[] b } log.trace( "writing {} bytes to {}", length, this ); + long position = LogMetadata.beginTransaction( filename ); + ByteBuffer byteBuffer = ByteBuffer.wrap( buffer, offset, length ); do { + out.position( position ); out.write( byteBuffer ); } while( byteBuffer.hasRemaining() ); out.force( true ); + LogMetadata.commitTransaction( filename, length ); + return filename.toString(); } catch( IOException e ) { diff --git a/pom.xml b/pom.xml index 0282b2823..d6484d7ac 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ - 25.5.0 + 25.5.1 25.0.1 25.0.0