Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,19 @@
*/
package oap.logstream.net.server;

import it.unimi.dsi.fastutil.io.FastByteArrayInputStream;
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream;
import lombok.extern.slf4j.Slf4j;
import oap.logstream.AbstractLoggerBackend;
import oap.logstream.LogStreamProtocol;
import oap.logstream.LogStreamProtocol.ProtocolVersion;
import oap.logstream.LoggerException;
import oap.message.server.MessageListener;
import org.apache.commons.io.IOUtils;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.zip.GZIPInputStream;

import static oap.logstream.LogStreamProtocol.MESSAGE_TYPE;

Expand Down Expand Up @@ -119,12 +115,7 @@ private void readBinaryV3( ProtocolVersion version, String hostName, DataInputSt
byte[] compressedBuffer = new byte[length];
in.readFully( compressedBuffer, 0, length );

FastByteArrayOutputStream buffer = new FastByteArrayOutputStream( length );
try( GZIPInputStream gzip = new GZIPInputStream( new FastByteArrayInputStream( compressedBuffer ) ) ) {
IOUtils.copy( gzip, buffer );
}

backend.log( version, clientHostname, filePreffix, properties, logType, headers, types, buffer.array, 0, buffer.length );
backend.log( version, clientHostname, filePreffix, properties, logType, headers, types, compressedBuffer, 0, length );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,11 @@ public void net() throws IOException {
Path controlStatePath = testDirectoryFixture.testPath( "controlStatePath.st" );

List<Object> lineData1 = List.of( new DateTime( 2015, 10, 10, 1, 0, UTC ), "12345678", "12345678" );
byte[] line1 = Compression.gzip( RowBinaryUtils.line( lineData1 ) );
byte[] line1 = RowBinaryUtils.line( lineData1 );
String[] headers1 = new String[] { "TIMESTAMP", "REQUEST_ID", "REQUEST_ID2" };
byte[][] types1 = new byte[][] { new byte[] { Types.DATETIME.id }, new byte[] { Types.STRING.id }, new byte[] { Types.STRING.id } };
List<Object> lineData2 = List.of( new DateTime( 2015, 10, 10, 1, 0, UTC ), "12345678" );
byte[] line2 = Compression.gzip( RowBinaryUtils.line( lineData2 ) );
byte[] line2 = RowBinaryUtils.line( lineData2 );
String[] headers2 = new String[] { "TIMESTAMP", "REQUEST_ID2" };
byte[][] types2 = new byte[][] { new byte[] { Types.DATETIME.id }, new byte[] { Types.STRING.id } };

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@
import oap.util.Maps;
import org.joda.time.DateTime;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -27,7 +31,8 @@
@ToString
@EqualsAndHashCode( exclude = "clientHostname" )
public class LogMetadata {
public static final String EXTENSION = ".metadata.yaml";
public static final String EXTENSION_LOG_METADATA = ".metadata.yaml";
public static final String EXTENSION_LOG_TRANSACTION = ".metadata.transaction";

public final String type;
public final String clientHostname;
Expand Down Expand Up @@ -62,23 +67,49 @@ public static LogMetadata readFor( Path file ) {
return Binder.yaml.unmarshal( LogMetadata.class, pathFor( file ) );
}

public static Path pathFor( Path file, String extension ) {
return Path.of( file.toString() + extension );
}

public static Path pathFor( Path file ) {
return Path.of( file.toString() + EXTENSION );
return pathFor( file, EXTENSION_LOG_METADATA );
}

public static Path pathFor( String file, String extension ) {
return Path.of( file + extension );
}

public static Path pathFor( String file ) {
return Path.of( file + EXTENSION );
return pathFor( file, EXTENSION_LOG_METADATA );
}

public static Path pathForDataFromMetadata( Path metadataPath ) {
Preconditions.checkArgument( isMetadata( metadataPath ) );

String metadataPathString = metadataPath.toString();
return Paths.get( metadataPathString.substring( 0, metadataPathString.indexOf( EXTENSION ) ) );
return Paths.get( metadataPathString.substring( 0, metadataPathString.indexOf( EXTENSION_LOG_METADATA ) ) );
}

public static Path pathForTransactionFromMetadata( Path metadataPath ) {
return Paths.get( pathForDataFromMetadata( metadataPath ) + EXTENSION_LOG_TRANSACTION );
}

public static Path pathForDataFromTransaction( Path transactionPath ) {

String transactionPathString = transactionPath.toString();
return Paths.get( transactionPathString.substring( 0, transactionPathString.indexOf( EXTENSION_LOG_TRANSACTION ) ) );
}

public static Path pathForMetadataFromTransaction( Path transactionPath ) {
return Paths.get( pathForDataFromTransaction( transactionPath ) + EXTENSION_LOG_METADATA );
}

public static boolean isMetadata( Path filename ) {
return filename.toString().endsWith( EXTENSION );
return filename.toString().endsWith( EXTENSION_LOG_METADATA );
}

public static boolean isTransaction( Path filename ) {
return filename.toString().endsWith( EXTENSION_LOG_TRANSACTION );
}

public static void rename( Path filename, Path newFile ) {
Expand All @@ -93,6 +124,45 @@ public static void addProperty( Path path, String name, String value ) {
metadata.writeFor( path );
}

public static long beginTransaction( Path file ) throws IOException {
Path path = pathFor( file, EXTENSION_LOG_TRANSACTION );

long dataSize;
if( java.nio.file.Files.exists( path ) ) {
dataSize = Long.parseLong( java.nio.file.Files.readString( path, StandardCharsets.UTF_8 ) );
} else {
dataSize = 0;
}

return dataSize;
}

public static void commitTransaction( Path file, int length ) throws IOException {
Path path = pathFor( file, EXTENSION_LOG_TRANSACTION );
Path tmpPath = pathFor( file, EXTENSION_LOG_TRANSACTION + ".tmp" );

long dataSize;
if( java.nio.file.Files.exists( path ) ) {
dataSize = Long.parseLong( java.nio.file.Files.readString( path, StandardCharsets.UTF_8 ) );
} else {
dataSize = 0;
}


dataSize += length;

java.nio.file.Files.writeString( tmpPath, String.valueOf( dataSize ), StandardCharsets.UTF_8, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE );
java.nio.file.Files.move( tmpPath, path, StandardCopyOption.REPLACE_EXISTING );
}

public static void deleteTransaction( Path file ) throws IOException {
java.nio.file.Files.delete( pathFor( file, EXTENSION_LOG_TRANSACTION ) );
}

public static void deleteLogMetadata( Path file ) throws IOException {
java.nio.file.Files.delete( pathFor( file, EXTENSION_LOG_METADATA ) );
}

@JsonAnyGetter
public Map<String, String> getProperties() {
return properties;
Expand All @@ -104,7 +174,11 @@ public void setProperty( String name, String value ) {
}

public void writeFor( Path file ) {
Binder.yaml.marshal( pathFor( file ), this );
Path path = pathFor( file );
Path tmpFile = Path.of( path + ".tmp" );
Binder.yaml.marshal( tmpFile, this );

Files.rename( tmpFile, path );
}

public DateTime getDateTime( String name ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ public String write( LogStreamProtocol.ProtocolVersion protocolVersion, byte[] b
do {
out.write( byteBuffer );
} while( byteBuffer.hasRemaining() );
out.force( true );

LogMetadata.commitTransaction( filename, outputStream.length );

log.trace( "[{}] write headers {}", filename, logId.headers );
} else {
Expand All @@ -58,12 +61,17 @@ public String write( LogStreamProtocol.ProtocolVersion protocolVersion, byte[] b
}
log.trace( "writing {} bytes to {}", length, this );

long position = LogMetadata.beginTransaction( filename );

ByteBuffer byteBuffer = ByteBuffer.wrap( buffer, offset, length );
do {
out.position( position );
out.write( byteBuffer );
} while( byteBuffer.hasRemaining() );
out.force( true );

LogMetadata.commitTransaction( filename, length );

return filename.toString();

} catch( IOException e ) {
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
</distributionManagement>

<properties>
<oap.project.version>25.5.0</oap.project.version>
<oap.project.version>25.5.1</oap.project.version>

<oap.deps.config.version>25.0.1</oap.deps.config.version>
<oap.deps.oap-teamcity.version>25.0.0</oap.deps.oap-teamcity.version>
Expand Down
Loading