From a99ab179fcf89cc2030437c4db8a329a9d7b432a Mon Sep 17 00:00:00 2001 From: "igor.petrenko" Date: Wed, 18 Mar 2026 18:21:56 +0200 Subject: [PATCH 1/2] fix: MemoryLoggerBackend#asRowBinary --- .../oap/logstream/MemoryLoggerBackend.java | 26 ++++++++++++++----- pom.xml | 2 +- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/MemoryLoggerBackend.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/MemoryLoggerBackend.java index 1d03ee776..8ec6a9db5 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/MemoryLoggerBackend.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/MemoryLoggerBackend.java @@ -24,6 +24,8 @@ package oap.logstream; +import com.google.common.base.Preconditions; +import it.unimi.dsi.fastutil.ints.IntArrayList; import lombok.SneakyThrows; import oap.io.Closeables; import oap.logstream.LogStreamProtocol.ProtocolVersion; @@ -48,7 +50,7 @@ public class MemoryLoggerBackend extends AbstractLoggerBackend { @Override public synchronized String log( ProtocolVersion version, String hostName, String filePreffix, Map properties, String logType, - String[] headers, byte[][] types, byte[] buffer, int offset, int length ) { + String[] headers, byte[][] types, byte[] buffer, int offset, int length ) { LogId logId = new LogId( filePreffix, logType, hostName, properties, headers, types ); outputs .computeIfAbsent( logId, fn -> new ByteArrayOutputStream() ) @@ -132,6 +134,21 @@ public List> asRowBinary( Predicate filter, String... header List> ret = new ArrayList<>(); for( LogId id : outputs.keySet() ) { + IntArrayList rows = new IntArrayList(); + if( headers.length == 0 ) { + for( int i = 0; i < id.headers.length; i++ ) { + rows.add( i ); + } + } else { + for( String header : headers ) { + int index = ArrayUtils.indexOf( id.headers, header ); + + Preconditions.checkArgument( index >= 0, "header " + header + " not found" ); + + rows.add( index ); + } + } + if( filter.test( id ) ) { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream( outputs.getOrDefault( id, new ByteArrayOutputStream() ).toByteArray() ); RowBinaryInputStream rowBinaryInputStream = new RowBinaryInputStream( byteArrayInputStream, id.headers, id.types ); @@ -139,13 +156,10 @@ public List> asRowBinary( Predicate filter, String... header List objects; while( ( objects = rowBinaryInputStream.readRow() ) != null ) { ArrayList filtered = new ArrayList<>(); - for( int i = 0; i < id.headers.length; i++ ) { - if( headers.length == 0 || ArrayUtils.contains( headers, id.headers[i] ) ) { - filtered.add( objects.get( i ) ); - } + for( int i : rows ) { + filtered.add( objects.get( i ) ); } - ret.add( filtered ); } } diff --git a/pom.xml b/pom.xml index 8a804ac5b..380b7af61 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ - 25.4.7 + 25.4.8 25.0.1 25.0.0 From a8be034617d91cc5f269cf85e5bae634f30cfcd5 Mon Sep 17 00:00:00 2001 From: "igor.petrenko" Date: Wed, 18 Mar 2026 18:33:51 +0200 Subject: [PATCH 2/2] fix: MemoryLoggerBackend#asRowBinary --- .../oap/logstream/MemoryLoggerBackend.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/MemoryLoggerBackend.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/MemoryLoggerBackend.java index 8ec6a9db5..2621414c7 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/MemoryLoggerBackend.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/MemoryLoggerBackend.java @@ -134,22 +134,22 @@ public List> asRowBinary( Predicate filter, String... header List> ret = new ArrayList<>(); for( LogId id : outputs.keySet() ) { - IntArrayList rows = new IntArrayList(); - if( headers.length == 0 ) { - for( int i = 0; i < id.headers.length; i++ ) { - rows.add( i ); - } - } else { - for( String header : headers ) { - int index = ArrayUtils.indexOf( id.headers, header ); + if( filter.test( id ) ) { + IntArrayList rows = new IntArrayList(); + if( headers.length == 0 ) { + for( int i = 0; i < id.headers.length; i++ ) { + rows.add( i ); + } + } else { + for( String header : headers ) { + int index = ArrayUtils.indexOf( id.headers, header ); - Preconditions.checkArgument( index >= 0, "header " + header + " not found" ); + Preconditions.checkArgument( index >= 0, "header " + header + " not found" ); - rows.add( index ); + rows.add( index ); + } } - } - if( filter.test( id ) ) { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream( outputs.getOrDefault( id, new ByteArrayOutputStream() ).toByteArray() ); RowBinaryInputStream rowBinaryInputStream = new RowBinaryInputStream( byteArrayInputStream, id.headers, id.types );