diff --git a/processing/src/main/java/io/druid/segment/incremental/OakIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OakIncrementalIndex.java
new file mode 100644
index 000000000000..a4bd368b6ed2
--- /dev/null
+++ b/processing/src/main/java/io/druid/segment/incremental/OakIncrementalIndex.java
@@ -0,0 +1,282 @@
+package io.druid.segment.incremental;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.MapBasedRow;
+import io.druid.data.input.Row;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.segment.ColumnSelectorFactory;
+import io.druid.segment.DimensionHandler;
+import io.druid.segment.DimensionIndexer;
+import io.druid.segment.incremental.oak.Oak;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Incremental index skeleton whose rows live in an {@link Oak} map keyed by {@code TimeAndDims}.
+ *
+ * NOTE(review): work in progress. {@code rows} is never assigned in this class, and the
+ * {@code initBuffer} / {@code aggregateValue} helpers used by {@code addToFacts} are not defined
+ * in this file.
+ *
+ * Created by dbasin on 6/11/17.
+ */
+public class OakIncrementalIndex extends IncrementalIndex<BufferAggregator> {
+  private final FactsHolder factsHolder;
+  private Oak<TimeAndDims> rows;
+
+  private volatile Map<String, ColumnSelectorFactory> selectors;
+  //given a ByteBuffer and an offset where all aggregates for a row are stored
+  //offset + aggOffsetInBuffer[i] would give position in ByteBuffer where ith aggregate
+  //is stored
+  private volatile int[] aggOffsetInBuffer;
+  //combined getMaxIntermediateSize() of all metrics; assigned by initAggs
+  private volatile int aggsTotalSize;
+
+  /**
+   * Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that
+   * should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics.
+   *
+   * @param incrementalIndexSchema    the schema to use for incremental index
+   * @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input
+   *                                  value for aggregators that return metrics other than float.
+   * @param reportParseExceptions     flag whether or not to report ParseExceptions that occur while extracting values
+   */
+  public OakIncrementalIndex(
+      IncrementalIndexSchema incrementalIndexSchema,
+      boolean deserializeComplexMetrics,
+      boolean reportParseExceptions
+  ) {
+    super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions);
+
+    factsHolder = new OakFactsHolder();
+  }
+
+  /**
+   * {@code FactsHolder} view over {@code rows}. Only the key iteration methods are backed by the
+   * map; the remaining methods are placeholders.
+   */
+  class OakFactsHolder implements FactsHolder {
+    @Override
+    public int getPriorIndex(TimeAndDims key) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getMinTimeMillis() {
+      return 0;
+    }
+
+    @Override
+    public long getMaxTimeMillis() {
+      return 0;
+    }
+
+    @Override
+    public Iterator<TimeAndDims> iterator(boolean descending) {
+      return descending ? rows.keySet().descendingIterator() : rows.keySet().iterator();
+    }
+
+    @Override
+    public Iterable<TimeAndDims> timeRangeIterable(boolean descending, long timeStart, long timeEnd) {
+      // NOTE(review): placeholder; callers that iterate the result will hit a NullPointerException
+      return null;
+    }
+
+    @Override
+    public Iterable<TimeAndDims> keySet() {
+      return rows.keySet();
+    }
+
+    @Override
+    public int putIfAbsent(TimeAndDims key, int rowIndex) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void clear() {
+
+    }
+  }
+
+  /**
+   * Adds {@code row} to the Oak map under {@code key}: a missing key is populated through
+   * {@code initBuffer}, an existing one is updated through {@code aggregateValue}.
+   *
+   * @return the value of {@code rows.size()} after the update
+   */
+  @Override
+  protected Integer addToFacts(
+      AggregatorFactory[] metrics,
+      boolean deserializeComplexMetrics,
+      boolean reportParseExceptions,
+      InputRow row,
+      AtomicInteger numEntries,
+      TimeAndDims key,
+      ThreadLocal<InputRow> rowContainer,
+      Supplier<InputRow> rowSupplier
+  ) throws IndexSizeExceededException {
+    if (metrics.length > 0 && getAggs()[0] == null) {
+      // TODO: init aggregators lazily
+    }
+
+    rows.compute(key, (k, v) -> v == null ? initBuffer(k, row) : aggregateValue(v, row));
+    return rows.size();
+  }
+
+  @Override
+  public FactsHolder getFacts() {
+    return factsHolder;
+  }
+
+  @Override
+  public boolean canAppendRow() {
+    return false;
+  }
+
+  @Override
+  public String getOutOfRowsReason() {
+    return null;
+  }
+
+  /**
+   * Records, for every metric, a column selector factory and the offset of its aggregate inside
+   * a row's buffer. Copied from OffheapIncrementalIndex.
+   *
+   * @return an array with one (still null) slot per metric
+   */
+  @Override
+  protected BufferAggregator[] initAggs(
+      AggregatorFactory[] metrics,
+      Supplier<InputRow> rowSupplier,
+      boolean deserializeComplexMetrics
+  ) {
+    selectors = Maps.newHashMap();
+    aggOffsetInBuffer = new int[metrics.length];
+
+    // Running offset: each aggregate starts where the previous one ends. It stays 0 when there
+    // are no metrics, so an empty metrics array no longer indexes position -1.
+    int nextOffset = 0;
+    for (int i = 0; i < metrics.length; i++) {
+      AggregatorFactory agg = metrics[i];
+
+      ColumnSelectorFactory columnSelectorFactory = makeColumnSelectorFactory(
+          agg,
+          rowSupplier,
+          deserializeComplexMetrics
+      );
+
+      selectors.put(
+          agg.getName(),
+          new OnheapIncrementalIndex.ObjectCachingColumnSelectorFactory(columnSelectorFactory)
+      );
+
+      aggOffsetInBuffer[i] = nextOffset;
+      nextOffset += agg.getMaxIntermediateSize();
+    }
+
+    aggsTotalSize = nextOffset;
+
+    return new BufferAggregator[metrics.length];
+  }
+
+  @Override
+  public int getLastRowIndex() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected BufferAggregator[] getAggsForRow(int rowOffset) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected Object getAggVal(BufferAggregator agg, int rowOffset, int aggPosition) {
+    return null;
+  }
+
+  @Override
+  protected float getMetricFloatValue(int rowOffset, int aggOffset) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected long getMetricLongValue(int rowOffset, int aggOffset) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected Object getMetricObjectValue(int rowOffset, int aggOffset) {
+    throw new UnsupportedOperationException();
+  }
+
+  // Override here to use oak entries iterator
+  @Override
+  public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> postAggs, final boolean descending) {
+    return new Iterable<Row>()
+    {
+      @Override
+      public Iterator<Row> iterator()
+      {
+        final List<DimensionDesc> dimensions = getDimensions();
+
+        Iterator<Map.Entry<TimeAndDims, ByteBuffer>> entryIterator =
+            descending ? rows.descendingMap().entrySet().iterator() : rows.entrySet().iterator();
+        return Iterators.transform(
+            entryIterator,
+            entry -> {
+              TimeAndDims timeAndDims = entry.getKey();
+
+              Object[] theDims = timeAndDims.getDims();
+
+              Map<String, Object> theVals = Maps.newLinkedHashMap();
+              for (int i = 0; i < theDims.length; ++i) {
+                Object dim = theDims[i];
+                DimensionDesc dimensionDesc = dimensions.get(i);
+                if (dimensionDesc == null) {
+                  continue;
+                }
+                String dimensionName = dimensionDesc.getName();
+                DimensionHandler handler = dimensionDesc.getHandler();
+                if (dim == null || handler.getLengthOfEncodedKeyComponent(dim) == 0) {
+                  theVals.put(dimensionName, null);
+                  continue;
+                }
+                final DimensionIndexer indexer = dimensionDesc.getIndexer();
+                Object rowVals = indexer.convertUnsortedEncodedKeyComponentToActualArrayOrList(
+                    dim,
+                    DimensionIndexer.LIST
+                );
+                theVals.put(dimensionName, rowVals);
+              }
+
+              BufferAggregator[] aggs = getAggs();
+              for (int i = 0; i < aggs.length; ++i) {
+                theVals.put(getMetricNames().get(i), getAggVal(i, entry.getValue()));
+              }
+
+              if (postAggs != null) {
+                for (PostAggregator postAgg : postAggs) {
+                  theVals.put(postAgg.getName(), postAgg.compute(theVals));
+                }
+              }
+
+              return new MapBasedRow(timeAndDims.getTimestamp(), theVals);
+            }
+        );
+      }
+    };
+  }
+
+  private Object getAggVal(int aggIdx, ByteBuffer value) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/processing/src/main/java/io/druid/segment/incremental/oak/Oak.java b/processing/src/main/java/io/druid/segment/incremental/oak/Oak.java
new file mode 100644
index 000000000000..381abaeb2d36
--- /dev/null
+++ b/processing/src/main/java/io/druid/segment/incremental/oak/Oak.java
@@ -0,0 +1,264 @@
+package io.druid.segment.incremental.oak;
+
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * Skeleton of a navigable concurrent map from keys of type {@code K} to {@link ByteBuffer} values.
+ *
+ * NOTE(review): work in progress. {@link #compute} throws UnsupportedOperationException and most
+ * other map operations return fixed placeholder values (null, false or 0).
+ *
+ * Created by dbasin on 6/11/17.
+ */
+public class Oak<K> implements ConcurrentNavigableMap<K, ByteBuffer> {
+
+  public Function<K, ByteBuffer> getSerializer() {
+    return serializer;
+  }
+
+  public Function<ByteBuffer, K> getDeserializer() {
+    return deserializer;
+  }
+
+  public Comparator<K> getComparator() {
+    return comparator;
+  }
+
+  // NOTE(review): type arguments below assume the serializer maps a key to its ByteBuffer form
+  // and the deserializer maps it back -- confirm against the callers.
+  Function<K, ByteBuffer> serializer;
+  Function<ByteBuffer, K> deserializer;
+  Comparator<K> comparator;
+
+  public Oak(Function<K, ByteBuffer> serializer, Function<ByteBuffer, K> deserializer, Comparator<K> comparator) {
+    this.serializer = serializer;
+    this.deserializer = deserializer;
+    this.comparator = comparator;
+  }
+
+  /**
+   * Atomically executes the given updateFunction for the key.
+   * NOTE(review): not implemented yet; currently always throws UnsupportedOperationException.
+   *
+   * @param key            The key to be updated
+   * @param updateFunction The update function accepts reference to internal ByteBuffer and can update it in place
+   *                       and then return it as the result of the function call.
+   *                       If the key is not present in OakMap, updateFunction is expected to create
+   *                       a new on-heap ByteBuffer and return it as the result of the function call. In this case,
+   *                       OakMap will create an appropriate off-heap buffer internally and copy the result there.
+   */
+  @Override
+  public ByteBuffer compute(
+      K key,
+      BiFunction<? super K, ? super ByteBuffer, ? extends ByteBuffer> updateFunction
+  ) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int size() {
+    return 0;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    // keep consistent with size(), which currently reports 0 entries
+    return size() == 0;
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return false;
+  }
+
+  @Override
+  public boolean containsValue(Object value) {
+    return false;
+  }
+
+  @Override
+  public ByteBuffer get(Object key) {
+    return null;
+  }
+
+  @Override
+  public ByteBuffer put(K key, ByteBuffer value) {
+    return null;
+  }
+
+  @Override
+  public ByteBuffer remove(Object key) {
+    return null;
+  }
+
+  @Override
+  public void putAll(Map<? extends K, ? extends ByteBuffer> m) {
+
+  }
+
+  @Override
+  public void clear() {
+
+  }
+
+  @Override
+  public ConcurrentNavigableMap<K, ByteBuffer> subMap(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) {
+    return null;
+  }
+
+  @Override
+  public ConcurrentNavigableMap<K, ByteBuffer> headMap(K toKey, boolean inclusive) {
+    return null;
+  }
+
+  @Override
+  public ConcurrentNavigableMap<K, ByteBuffer> tailMap(K fromKey, boolean inclusive) {
+    return null;
+  }
+
+  @Override
+  public Comparator<? super K> comparator() {
+    // report the comparator this map was constructed with, as SortedMap.comparator() requires
+    return comparator;
+  }
+
+  @Override
+  public ConcurrentNavigableMap<K, ByteBuffer> subMap(K fromKey, K toKey) {
+    return null;
+  }
+
+  @Override
+  public ConcurrentNavigableMap<K, ByteBuffer> headMap(K toKey) {
+    return null;
+  }
+
+  @Override
+  public ConcurrentNavigableMap<K, ByteBuffer> tailMap(K fromKey) {
+    return null;
+  }
+
+  @Override
+  public K firstKey() {
+    return null;
+  }
+
+  @Override
+  public K lastKey() {
+    return null;
+  }
+
+  @Override
+  public Entry<K, ByteBuffer> lowerEntry(K key) {
+    return null;
+  }
+
+  @Override
+  public K lowerKey(K key) {
+    return null;
+  }
+
+  @Override
+  public Entry<K, ByteBuffer> floorEntry(K key) {
+    return null;
+  }
+
+  @Override
+  public K floorKey(K key) {
+    return null;
+  }
+
+  @Override
+  public Entry<K, ByteBuffer> ceilingEntry(K key) {
+    return null;
+  }
+
+  @Override
+  public K ceilingKey(K key) {
+    return null;
+  }
+
+  @Override
+  public Entry<K, ByteBuffer> higherEntry(K key) {
+    return null;
+  }
+
+  @Override
+  public K higherKey(K key) {
+    return null;
+  }
+
+  @Override
+  public Entry<K, ByteBuffer> firstEntry() {
+    return null;
+  }
+
+  @Override
+  public Entry<K, ByteBuffer> lastEntry() {
+    return null;
+  }
+
+  @Override
+  public Entry<K, ByteBuffer> pollFirstEntry() {
+    return null;
+  }
+
+  @Override
+  public Entry<K, ByteBuffer> pollLastEntry() {
+    return null;
+  }
+
+  @Override
+  public ConcurrentNavigableMap<K, ByteBuffer> descendingMap() {
+    return null;
+  }
+
+  @Override
+  public NavigableSet<K> navigableKeySet() {
+    return null;
+  }
+
+  @Override
+  public NavigableSet<K> keySet() {
+    return null;
+  }
+
+  @Override
+  public NavigableSet<K> descendingKeySet() {
+    return null;
+  }
+
+  @Override
+  public Collection<ByteBuffer> values() {
+    return null;
+  }
+
+  @Override
+  public Set<Entry<K, ByteBuffer>> entrySet() {
+    return null;
+  }
+
+  @Override
+  public ByteBuffer putIfAbsent(K key, ByteBuffer value) {
+    return null;
+  }
+
+  @Override
+  public boolean remove(Object key, Object value) {
+    return false;
+  }
+
+  @Override
+  public boolean replace(K key, ByteBuffer oldValue, ByteBuffer newValue) {
+    return false;
+  }
+
+  @Override
+  public ByteBuffer replace(K key, ByteBuffer value) {
+    return null;
+  }
+}