Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* The MIT License (MIT)
*
* Copyright (c) Open Application Platform Authors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package oap.logstream.disk;

import oap.compression.Compression;
import oap.logstream.LogId;
import oap.logstream.formats.rowbinary.RowBinaryUtils;
import oap.template.Types;
import oap.testng.Fixtures;
import oap.testng.TestDirectoryFixture;
import oap.util.Dates;
import oap.util.Pair;
import org.joda.time.DateTime;
import org.testng.annotations.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;

import static oap.logstream.LogStreamProtocol.CURRENT_PROTOCOL_VERSION;
import static oap.logstream.Timestamp.BPH_12;
import static org.assertj.core.api.Assertions.assertThat;
import static org.joda.time.DateTimeZone.UTC;

/**
 * Round-trip test for {@link RowBinaryWriter}: two gzip-compressed RowBinary batches are written
 * through the writer, then the resulting file is decompressed and parsed back, and both the header
 * line and the rows of the two batches (in write order) are compared with the input.
 */
public class RowBinaryWriterTest extends Fixtures {
    private static final String FILE_PATTERN = "${p}-file-${INTERVAL}-${LOG_VERSION}.rb.gz";
    private final TestDirectoryFixture testDirectoryFixture;

    public RowBinaryWriterTest() {
        testDirectoryFixture = fixture( new TestDirectoryFixture() );
    }

    /**
     * Builds the UTC timestamp 2022-03-11 15:16:SS used by every row of this test.
     *
     * @param second the seconds component of the timestamp
     */
    private static DateTime at( int second ) {
        return new DateTime( 2022, 3, 11, 15, 16, second, UTC );
    }

    @Test
    public void testWrite() throws IOException {
        // The clock is pinned so the interval part of the generated file name is stable.
        Dates.setTimeFixed( 2022, 3, 8, 21, 11 );

        String[] headers = { "COL1", "COL2", "COL3", "DATETIME" };
        byte[][] types = {
            { Types.STRING.id },
            { Types.LONG.id },
            { Types.LIST.id, Types.STRING.id },
            { Types.DATETIME.id }
        };

        byte[] firstBatch = Compression.gzip( RowBinaryUtils.lines( List.of(
            List.of( "s11", 21L, List.of( "1" ), at( 12 ) ),
            List.of( "s12", 22L, List.of( "1", "2" ), at( 13 ) )
        ) ) );

        byte[] secondBatch = Compression.gzip( RowBinaryUtils.lines( List.of(
            List.of( "s111", 121L, List.of( "rr" ), at( 14 ) ),
            List.of( "s112", 122L, List.of( "zz", "66" ), at( 15 ) )
        ) ) );

        LogId logId = new LogId( "", "log", "log", Map.of( "p", "1" ), headers, types );
        Path logsDirectory = testDirectoryFixture.testPath( "logs" );

        try( RowBinaryWriter rowBinaryWriter = new RowBinaryWriter( logsDirectory, FILE_PATTERN, logId, 1024, BPH_12, 20 ) ) {
            rowBinaryWriter.write( CURRENT_PROTOCOL_VERSION, firstBatch );
            rowBinaryWriter.write( CURRENT_PROTOCOL_VERSION, secondBatch );
        }

        Path writtenFile = logsDirectory.resolve( "1-file-02-4cd64dae-1.rb.gz" );
        byte[] rawRowBinary = Compression.ungzip( Files.readAllBytes( writtenFile ) );

        Pair<List<List<Object>>, List<String>> parsed = RowBinaryUtils.read( rawRowBinary, headers, types, true );

        assertThat( parsed._2 ).isEqualTo( List.of( headers ) );
        assertThat( parsed._1 ).isEqualTo( List.of(
            List.of( "s11", 21L, List.of( "1" ), at( 12 ) ),
            List.of( "s12", 22L, List.of( "1", "2" ), at( 13 ) ),

            List.of( "s111", 121L, List.of( "rr" ), at( 14 ) ),
            List.of( "s112", 122L, List.of( "zz", "66" ), at( 15 ) )
        ) );
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public abstract class AbstractWriter<T extends Closeable> implements Closeable {
Expand All @@ -54,6 +55,7 @@ public abstract class AbstractWriter<T extends Closeable> implements Closeable {
protected final int bufferSize;
protected final Stopwatch stopwatch = new Stopwatch();
protected final int maxVersions;
protected final ReentrantLock lock = new ReentrantLock();
protected T out;
protected Path outFilename;
protected String lastPattern;
Expand All @@ -79,7 +81,7 @@ protected AbstractWriter( LogFormat logFormat, Path logDirectory, String filePat

@SneakyThrows
static String currentPattern( LogFormat logFormat, String filePattern, LogId logId, Timestamp timestamp, int version, DateTime time ) {
var suffix = filePattern;
String suffix = filePattern;
if( filePattern.startsWith( "/" ) && filePattern.endsWith( "/" ) ) suffix = suffix.substring( 1 );
else if( !filePattern.startsWith( "/" ) && !logId.filePrefixPattern.endsWith( "/" ) ) suffix = "/" + suffix;

Expand All @@ -102,36 +104,41 @@ protected String currentPattern() {
return currentPattern( logFormat, filePattern, logId, timestamp, fileVersion, Dates.nowUtc() );
}

public synchronized void write( ProtocolVersion protocolVersion, byte[] buffer ) throws LoggerException {
public void write( ProtocolVersion protocolVersion, byte[] buffer ) throws LoggerException {
write( protocolVersion, buffer, 0, buffer.length );
}

public abstract String write( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException;

public synchronized void refresh() {
public void refresh() {
refresh( false );
}

public synchronized void refresh( boolean forceSync ) {
log.debug( "refresh {}...", lastPattern );
public void refresh( boolean forceSync ) {
lock.lock();
try {
log.debug( "refresh {}...", lastPattern );

String currentPattern = currentPattern();
String currentPattern = currentPattern();

if( forceSync || !Objects.equals( this.lastPattern, currentPattern ) ) {
log.debug( "lastPattern {} currentPattern {} version {}", lastPattern, currentPattern, fileVersion );
if( forceSync || !Objects.equals( this.lastPattern, currentPattern ) ) {
log.debug( "lastPattern {} currentPattern {} version {}", lastPattern, currentPattern, fileVersion );

String patternWithPreviousVersion = currentPattern( fileVersion - 1 );
if( !Objects.equals( patternWithPreviousVersion, this.lastPattern ) ) {
fileVersion = 1;
}
currentPattern = currentPattern();
String patternWithPreviousVersion = currentPattern( fileVersion - 1 );
if( !Objects.equals( patternWithPreviousVersion, this.lastPattern ) ) {
fileVersion = 1;
}
currentPattern = currentPattern();

log.debug( "force {} change pattern from '{}' to '{}'", forceSync, this.lastPattern, currentPattern );
closeOutput();
log.debug( "force {} change pattern from '{}' to '{}'", forceSync, this.lastPattern, currentPattern );
closeOutput();

lastPattern = currentPattern;
} else {
log.debug( "refresh {}... SKIP", lastPattern );
lastPattern = currentPattern;
} else {
log.debug( "refresh {}... SKIP", lastPattern );
}
} finally {
lock.unlock();
}
}

Expand All @@ -143,7 +150,7 @@ protected void closeOutput() throws LoggerException {
if( out != null ) try {
stopwatch.count( out::close );

var fileSize = Files.size( outFilename );
long fileSize = Files.size( outFilename );
log.trace( "closing output {} ({} bytes)", this, fileSize );
Metrics.summary( "logstream_logging_server_bucket_size" ).record( fileSize );
Metrics.summary( "logstream_logging_server_bucket_time_seconds" ).record( Dates.nanosToSeconds( stopwatch.elapsed() ) );
Expand All @@ -156,10 +163,15 @@ protected void closeOutput() throws LoggerException {
}

@Override
public synchronized void close() {
log.debug( "closing {}", this );
closed = true;
closeOutput();
public void close() {
lock.lock();
try {
log.debug( "closing {}", this );
closed = true;
closeOutput();
} finally {
lock.unlock();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ public AbstractWriter<? extends Closeable> load( LogId id ) {
writerConfiguration.parquet, bufferSize, timestamp, maxVersions );
case TSV_GZ, TSV_ZSTD -> new TsvWriter( logDirectory, fp.path, id,
writerConfiguration.tsv, bufferSize, timestamp, maxVersions );
case ROW_BINARY_GZ -> new RowBinaryWriter( logDirectory, fp.path, id,
bufferSize, timestamp, maxVersions );
};
}
} );
Expand Down Expand Up @@ -161,7 +163,7 @@ private void filePatternValidation( String type, String filePattern ) {
@Override
@SneakyThrows
public String log( ProtocolVersion protocolVersion, String hostName, String filePreffix, Map<String, String> properties, String logType,
String[] headers, byte[][] types, byte[] buffer, int offset, int length ) {
String[] headers, byte[][] types, byte[] buffer, int offset, int length ) {
if( closed ) {
throw new LoggerException( "already closed!" );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
public enum LogFormat {
TSV_GZ( "tsv" + Encoding.GZIP.extension ),
TSV_ZSTD( "tsv" + Encoding.ZSTD.extension ),
ROW_BINARY_GZ( "rb" + Encoding.GZIP.extension ),
PARQUET( "parquet" );

public final String extension;
Expand All @@ -41,7 +42,7 @@ public enum LogFormat {

@Nonnull
public static LogFormat parse( String filePattern ) {
for( var logFormat : values() ) {
for( LogFormat logFormat : values() ) {
if( filePattern.endsWith( logFormat.extension ) ) {
return logFormat;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
Expand Down Expand Up @@ -90,7 +89,6 @@ public class ParquetLogWriter extends AbstractWriter<org.apache.parquet.hadoop.P
// types.put( Types.ENUM.id, _ -> org.apache.parquet.schema.Types.required( BINARY ).as( LogicalTypeAnnotation.stringType() ) );
}

private final ReentrantLock lock = new ReentrantLock();
private final MessageType messageType;
private final WriterConfiguration.ParquetConfiguration configuration;
private final LinkedHashSet<String> excludeFields = new LinkedHashSet<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package oap.logstream.disk;

import lombok.extern.slf4j.Slf4j;
import oap.io.Files;
import oap.logstream.LogId;
import oap.logstream.LogIdTemplate;
import oap.logstream.LogStreamProtocol;
import oap.logstream.LoggerException;
import oap.logstream.Timestamp;
import oap.logstream.formats.rowbinary.RowBinaryOutputStream;
import oap.util.FastByteArrayOutputStream;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.zip.GZIPOutputStream;

@Slf4j
public class RowBinaryWriter extends AbstractWriter<FileChannel> {
protected RowBinaryWriter( Path logDirectory, String filePattern, LogId logId, int bufferSize, Timestamp timestamp, int maxVersions ) {
super( LogFormat.ROW_BINARY_GZ, logDirectory, filePattern, logId, bufferSize, timestamp, maxVersions );
}

@Override
public String write( LogStreamProtocol.ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException {
try {
refresh();
Path filename = filename();
if( out == null )
if( !java.nio.file.Files.exists( filename ) ) {
log.debug( "[{}] open new file v{}", filename, fileVersion );
outFilename = filename;
Files.ensureDirectory( filename.getParent() );
out = FileChannel.open( filename, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE, StandardOpenOption.APPEND );
LogIdTemplate logIdTemplate = new LogIdTemplate( logId );
new LogMetadata( logId ).withProperty( "VERSION", logIdTemplate.getHashWithVersion( fileVersion ) ).writeFor( filename );

FastByteArrayOutputStream outputStream = new FastByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream( outputStream );
RowBinaryOutputStream rbOut = new RowBinaryOutputStream( gzip, List.of( logId.headers ) );
rbOut.close();

ByteBuffer byteBuffer = ByteBuffer.wrap( outputStream.array, 0, outputStream.length );
do {
out.write( byteBuffer );
} while( byteBuffer.hasRemaining() );

log.trace( "[{}] write headers {}", filename, logId.headers );
} else {
log.debug( "[{}] file exists v{}", filename, fileVersion );
fileVersion += 1;
if( fileVersion > maxVersions ) throw new IllegalStateException( "version > " + maxVersions );
return write( protocolVersion, buffer, offset, length );
}
log.trace( "writing {} bytes to {}", length, this );

ByteBuffer byteBuffer = ByteBuffer.wrap( buffer, offset, length );
do {
out.write( byteBuffer );
} while( byteBuffer.hasRemaining() );
out.force( true );

return filename.toString();

} catch( IOException e ) {
log.error( e.getMessage(), e );
try {
closeOutput();
} finally {
outFilename = null;
out = null;
}
throw new LoggerException( e );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.google.common.io.CountingOutputStream;
import lombok.extern.slf4j.Slf4j;
import oap.io.IoStreams;
import oap.logstream.InvalidProtocolVersionException;
import oap.logstream.LogId;
import oap.logstream.LogIdTemplate;
import oap.logstream.LogStreamProtocol.ProtocolVersion;
Expand Down Expand Up @@ -56,22 +55,22 @@ public TsvWriter( Path logDirectory, String filePattern, LogId logId,
this.configuration = configuration;
}

public synchronized void write( ProtocolVersion protocolVersion, byte[] buffer ) throws LoggerException {
write( protocolVersion, buffer, 0, buffer.length );
}

@Override
public synchronized String write( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException {
if( closed ) {
throw new LoggerException( "writer is already closed!" );
}
public String write( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException {
lock.lock();
try {
if( closed ) {
throw new LoggerException( "writer is already closed!" );
}

return switch( protocolVersion ) {
case TSV_V1 -> writeTsvV1( protocolVersion, buffer, offset, length );
case BINARY_V2 -> writeBinaryV2( protocolVersion, buffer, offset, length );
case ROW_BINARY_V3 -> writeBinaryV3( protocolVersion, buffer, offset, length );
default -> throw new InvalidProtocolVersionException( "tsv", protocolVersion.version );
};
return switch( protocolVersion ) {
case TSV_V1 -> writeTsvV1( protocolVersion, buffer, offset, length );
case BINARY_V2 -> writeBinaryV2( protocolVersion, buffer, offset, length );
case ROW_BINARY_V3 -> writeBinaryV3( protocolVersion, buffer, offset, length );
};
} finally {
lock.unlock();
}
}

private String writeTsvV1( ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) {
Expand Down
Loading