From 878ee19bbab6f57b834fa8fa06022806b589e387 Mon Sep 17 00:00:00 2001 From: Dmitry Basin Date: Mon, 12 Jun 2017 16:32:08 +0300 Subject: [PATCH 1/2] Preliminary oak integration --- .../incremental/OakIncrementalIndex.java | 243 +++++++++++++++++ .../io/druid/segment/incremental/oak/Oak.java | 245 ++++++++++++++++++ 2 files changed, 488 insertions(+) create mode 100644 processing/src/main/java/io/druid/segment/incremental/OakIncrementalIndex.java create mode 100644 processing/src/main/java/io/druid/segment/incremental/oak/Oak.java diff --git a/processing/src/main/java/io/druid/segment/incremental/OakIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OakIncrementalIndex.java new file mode 100644 index 000000000000..c7c1fc9c5860 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/incremental/OakIncrementalIndex.java @@ -0,0 +1,243 @@ +package io.druid.segment.incremental; + +import com.google.common.base.Supplier; +import com.google.common.collect.Iterators; +import com.google.common.collect.Maps; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedRow; +import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.PostAggregator; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.DimensionHandler; +import io.druid.segment.DimensionIndexer; +import io.druid.segment.incremental.oak.Oak; + +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Created by dbasin on 6/11/17. + */ +public class OakIncrementalIndex extends IncrementalIndex { + /** + * Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that + * should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics. + * + * @param incrementalIndexSchema the schema to use for incremental index + * @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input + * value for aggregators that return metrics other than float. + * @param reportParseExceptions flag whether or not to report ParseExceptions that occur while extracting values + */ + private FactsHolder factsHolder; + private Oak rows; + + private volatile Map selectors; + //given a ByteBuffer and an offset where all aggregates for a row are stored + //offset + aggOffsetInBuffer[i] would give position in ByteBuffer where ith aggregate + //is stored + private volatile int[] aggOffsetInBuffer; + + + public OakIncrementalIndex(IncrementalIndexSchema incrementalIndexSchema, boolean deserializeComplexMetrics, boolean reportParseExceptions) { + super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions); + + factsHolder = new OakFactsHolder(); + } + + + class OakFactsHolder implements FactsHolder { + @Override + public int getPriorIndex(TimeAndDims key) { + throw new UnsupportedOperationException(); + } + + @Override + public long getMinTimeMillis() { + return 0; + } + + @Override + public long getMaxTimeMillis() { + return 0; + } + + @Override + public Iterator iterator(boolean descending) { + return descending ? rows.keySet().descendingIterator() : rows.keySet().iterator(); + } + + @Override + public Iterable timeRangeIterable(boolean descending, long timeStart, long timeEnd) { + return null; + } + + @Override + public Iterable keySet() { + return rows.keySet(); + } + + @Override + public int putIfAbsent(TimeAndDims key, int rowIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + + } + } + + @Override + public FactsHolder getFacts() { + return factsHolder; + } + + @Override + public boolean canAppendRow() { + return false; + } + + @Override + public String getOutOfRowsReason() { + return null; + } + + // copied from OffheapIncrementalIndex + @Override + protected BufferAggregator[] initAggs(AggregatorFactory[] metrics, Supplier rowSupplier, boolean deserializeComplexMetrics) { + selectors = Maps.newHashMap(); + aggOffsetInBuffer = new int[metrics.length]; + + for (int i = 0; i < metrics.length; i++) { + AggregatorFactory agg = metrics[i]; + + ColumnSelectorFactory columnSelectorFactory = makeColumnSelectorFactory( + agg, + rowSupplier, + deserializeComplexMetrics + ); + + selectors.put( + agg.getName(), + new OnheapIncrementalIndex.ObjectCachingColumnSelectorFactory(columnSelectorFactory) + ); + + if (i == 0) { + aggOffsetInBuffer[i] = 0; + } else { + aggOffsetInBuffer[i] = aggOffsetInBuffer[i-1] + metrics[i-1].getMaxIntermediateSize(); + } + } + + aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length - 1].getMaxIntermediateSize(); + + return new BufferAggregator[metrics.length]; + + } + + @Override + protected Integer addToFacts(AggregatorFactory[] metrics, boolean deserializeComplexMetrics, boolean reportParseExceptions, InputRow row, AtomicInteger numEntries, TimeAndDims key, ThreadLocal rowContainer, Supplier rowSupplier) throws IndexSizeExceededException { + if(metrics.length > 0 && getAggs()[0] == null) { + // init aggreagators lazily + } + + rows.compute(key, (k, v) -> v == null ? initBuffer(k, row) : aggregateValue(v, row) ); + return rows.size(); + } + + @Override + public int getLastRowIndex() { + throw new UnsupportedOperationException(); + } + + @Override + protected BufferAggregator[] getAggsForRow(int rowOffset) { + throw new UnsupportedOperationException(); + } + + @Override + protected Object getAggVal(BufferAggregator agg, int rowOffset, int aggPosition) { + return null; + } + + @Override + protected float getMetricFloatValue(int rowOffset, int aggOffset) { + throw new UnsupportedOperationException(); + } + + @Override + protected long getMetricLongValue(int rowOffset, int aggOffset) { + throw new UnsupportedOperationException(); + } + + @Override + protected Object getMetricObjectValue(int rowOffset, int aggOffset) { + throw new UnsupportedOperationException(); + } + + + // Override here to use oak entries iterator + @Override + public Iterable iterableWithPostAggregations(final List postAggs, final boolean descending) { + return new Iterable() + { + @Override + public Iterator iterator() + { + final List dimensions = getDimensions(); + + Iterator> entryIterator = descending ? rows.descendingMap().entrySet().iterator() : rows.entrySet().iterator(); + return Iterators.transform( + entryIterator, + entry -> { + TimeAndDims timeAndDims = entry.getKey(); + + + Object[] theDims = timeAndDims.getDims(); + + Map theVals = Maps.newLinkedHashMap(); + for (int i = 0; i < theDims.length; ++i) { + Object dim = theDims[i]; + DimensionDesc dimensionDesc = dimensions.get(i); + if (dimensionDesc == null) { + continue; + } + String dimensionName = dimensionDesc.getName(); + DimensionHandler handler = dimensionDesc.getHandler(); + if (dim == null || handler.getLengthOfEncodedKeyComponent(dim) == 0) { + theVals.put(dimensionName, null); + continue; + } + final DimensionIndexer indexer = dimensionDesc.getIndexer(); + Object rowVals = indexer.convertUnsortedEncodedKeyComponentToActualArrayOrList(dim, DimensionIndexer.LIST); + theVals.put(dimensionName, rowVals); + } + + BufferAggregator[] aggs = getAggs(); + for (int i = 0; i < aggs.length; ++i) { + theVals.put(getMetricNames().get(i), getAggVal(i, entry.getValue())); + } + + if (postAggs != null) { + for (PostAggregator postAgg : postAggs) { + theVals.put(postAgg.getName(), postAgg.compute(theVals)); + } + } + + return new MapBasedRow(timeAndDims.getTimestamp(), theVals); + } + ); + } + }; + + } + + private Object getAggVal(int aggIdx, ByteBuffer value) { + throw new UnsupportedOperationException(); + } +} diff --git a/processing/src/main/java/io/druid/segment/incremental/oak/Oak.java b/processing/src/main/java/io/druid/segment/incremental/oak/Oak.java new file mode 100644 index 000000000000..f48cd84932df --- /dev/null +++ b/processing/src/main/java/io/druid/segment/incremental/oak/Oak.java @@ -0,0 +1,245 @@ +package io.druid.segment.incremental.oak; + + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * Created by dbasin on 6/11/17. + */ +public class Oak implements ConcurrentNavigableMap { + + public Function getSerializer() { + return serializer; + } + + public Function getDeserializer() { + return deserializer; + } + + public Comparator getComparator() { + return comparator; + } + + Function serializer; + Function deserializer; + Comparator comparator; + + public Oak(Function serializer, Function deserializer, Comparator comparator) { + this.serializer = serializer; + this.deserializer = deserializer; + this.comparator = comparator; + } + + @Override + public int size() { + return 0; + } + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public boolean containsKey(Object key) { + return false; + } + + @Override + public boolean containsValue(Object value) { + return false; + } + + @Override + public ByteBuffer get(Object key) { + return null; + } + + @Override + public ByteBuffer put(K key, ByteBuffer value) { + return null; + } + + @Override + public ByteBuffer remove(Object key) { + return null; + } + + @Override + public void putAll(Map m) { + + } + + @Override + public void clear() { + + } + + @Override + public ConcurrentNavigableMap subMap(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) { + return null; + } + + @Override + public ConcurrentNavigableMap headMap(K toKey, boolean inclusive) { + return null; + } + + @Override + public ConcurrentNavigableMap tailMap(K fromKey, boolean inclusive) { + return null; + } + + @Override + public Comparator comparator() { + return null; + } + + @Override + public ConcurrentNavigableMap subMap(K fromKey, K toKey) { + return null; + } + + @Override + public ConcurrentNavigableMap headMap(K toKey) { + return null; + } + + @Override + public ConcurrentNavigableMap tailMap(K fromKey) { + return null; + } + + @Override + public K firstKey() { + return null; + } + + @Override + public K lastKey() { + return null; + } + + @Override + public Entry lowerEntry(K key) { + return null; + } + + @Override + public K lowerKey(K key) { + return null; + } + + @Override + public Entry floorEntry(K key) { + return null; + } + + @Override + public K floorKey(K key) { + return null; + } + + @Override + public Entry ceilingEntry(K key) { + return null; + } + + @Override + public K ceilingKey(K key) { + return null; + } + + @Override + public Entry higherEntry(K key) { + return null; + } + + @Override + public K higherKey(K key) { + return null; + } + + @Override + public Entry firstEntry() { + return null; + } + + @Override + public Entry lastEntry() { + return null; + } + + @Override + public Entry pollFirstEntry() { + return null; + } + + @Override + public Entry pollLastEntry() { + return null; + } + + @Override + public ConcurrentNavigableMap descendingMap() { + return null; + } + + @Override + public NavigableSet navigableKeySet() { + return null; + } + + @Override + public NavigableSet keySet() { + return null; + } + + + @Override + public NavigableSet descendingKeySet() { + return null; + } + + @Override + public Collection values() { + return null; + } + + @Override + public Set> entrySet() { + return null; + } + + @Override + public ByteBuffer putIfAbsent(K key, ByteBuffer value) { + return null; + } + + @Override + public boolean remove(Object key, Object value) { + return false; + } + + @Override + public boolean replace(K key, ByteBuffer oldValue, ByteBuffer newValue) { + return false; + } + + @Override + public ByteBuffer replace(K key, ByteBuffer value) { + return null; + } + + @Override + public ByteBuffer compute(K key, + BiFunction remappingFunction) { + throw new UnsupportedOperationException(); + } + +} From c3508b5bb0befcdd008211baea03bc74e6562c2e Mon Sep 17 00:00:00 2001 From: Dmitry Basin Date: Tue, 20 Jun 2017 17:43:33 +0300 Subject: [PATCH 2/2] Preliminary integration with Oak --- .../incremental/OakIncrementalIndex.java | 22 ++++++++-------- .../io/druid/segment/incremental/oak/Oak.java | 25 ++++++++++++++----- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/incremental/OakIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OakIncrementalIndex.java index c7c1fc9c5860..a4bd368b6ed2 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OakIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OakIncrementalIndex.java @@ -50,7 +50,7 @@ public OakIncrementalIndex(IncrementalIndexSchema incrementalIndexSchema, boolea } - class OakFactsHolder implements FactsHolder { + class OakFactsHolder implements FactsHolder { @Override public int getPriorIndex(TimeAndDims key) { throw new UnsupportedOperationException(); @@ -92,6 +92,17 @@ public void clear() { } } + + @Override + protected Integer addToFacts(AggregatorFactory[] metrics, boolean deserializeComplexMetrics, boolean reportParseExceptions, InputRow row, AtomicInteger numEntries, TimeAndDims key, ThreadLocal rowContainer, Supplier rowSupplier) throws IndexSizeExceededException { + if(metrics.length > 0 && getAggs()[0] == null) { + // init aggreagators lazily + } + + rows.compute(key, (k, v) -> v == null ? initBuffer(k, row) : aggregateValue(v, row) ); + return rows.size(); + } + @Override public FactsHolder getFacts() { return factsHolder; @@ -140,15 +151,6 @@ protected BufferAggregator[] initAggs(AggregatorFactory[] metrics, Supplier rowContainer, Supplier rowSupplier) throws IndexSizeExceededException { - if(metrics.length > 0 && getAggs()[0] == null) { - // init aggreagators lazily - } - - rows.compute(key, (k, v) -> v == null ? initBuffer(k, row) : aggregateValue(v, row) ); - return rows.size(); - } @Override public int getLastRowIndex() { diff --git a/processing/src/main/java/io/druid/segment/incremental/oak/Oak.java b/processing/src/main/java/io/druid/segment/incremental/oak/Oak.java index f48cd84932df..381abaeb2d36 100644 --- a/processing/src/main/java/io/druid/segment/incremental/oak/Oak.java +++ b/processing/src/main/java/io/druid/segment/incremental/oak/Oak.java @@ -3,7 +3,6 @@ import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.function.BiFunction; import java.util.function.Function; @@ -35,6 +34,25 @@ public Oak(Function serializer, Function deseriali this.comparator = comparator; } + /*** + * The function executes atomically provided as input updateFunction. + * @param key The key to be updated + * + * @param updateFunction The update function accepts reference to internal ByteBuffer and can update it in place + * and then return as a result of function call. + * If the key is not present in OakMap, updateFunction is expected to create + * a new on-heap ByteBuffer and return it as a result of function call. In this case, OakMap + * will create appropriate off-heap buffer internally and copy there the result. + * + * + */ + @Override + public ByteBuffer compute(K key, + BiFunction updateFunction) { + throw new UnsupportedOperationException(); + } + + @Override public int size() { return 0; @@ -236,10 +254,5 @@ public ByteBuffer replace(K key, ByteBuffer value) { return null; } - @Override - public ByteBuffer compute(K key, - BiFunction remappingFunction) { - throw new UnsupportedOperationException(); - } }