diff --git a/tree/ntuple/inc/ROOT/RFieldBase.hxx b/tree/ntuple/inc/ROOT/RFieldBase.hxx index f82e83ea7061a..85e12aa012140 100644 --- a/tree/ntuple/inc/ROOT/RFieldBase.hxx +++ b/tree/ntuple/inc/ROOT/RFieldBase.hxx @@ -247,14 +247,15 @@ private: func(target); } - /// Translate an entry index to a column element index of the principal column and vice versa. These functions - /// take into account the role and number of repetitions on each level of the field hierarchy as follows: + /// Translate an entry index to a column element index of the principal column. This function + /// takes into account the role and number of repetitions on each level of the field hierarchy as follows: /// - Top level fields: element index == entry index /// - Record fields propagate their principal column index to the principal columns of direct descendant fields /// - Collection and variant fields set the principal column index of their children to 0 /// /// The column element index also depends on the number of repetitions of each field in the hierarchy, e.g., given a - /// field with type `std::array, 2>`, this function returns 8 for the innermost field. + /// field with type `std::array, 2>`, this function called with `globalIndex == 1` + /// returns 8 for the innermost field. 
ROOT::NTupleSize_t EntryToColumnElementIndex(ROOT::NTupleSize_t globalIndex) const; /// Flushes data from active columns diff --git a/tree/ntuple/inc/ROOT/RNTupleDescriptor.hxx b/tree/ntuple/inc/ROOT/RNTupleDescriptor.hxx index a5a736010c223..fa34b90f52c27 100644 --- a/tree/ntuple/inc/ROOT/RNTupleDescriptor.hxx +++ b/tree/ntuple/inc/ROOT/RNTupleDescriptor.hxx @@ -519,6 +519,12 @@ public: ROOT::NTupleSize_t GetFirstEntryIndex() const { return fFirstEntryIndex; } ROOT::NTupleSize_t GetNEntries() const { return fNEntries; } const RColumnRange &GetColumnRange(ROOT::DescriptorId_t physicalId) const { return fColumnRanges.at(physicalId); } + const RColumnRange *TryGetColumnRange(ROOT::DescriptorId_t physicalId) const + { + if (auto it = fColumnRanges.find(physicalId); it != fColumnRanges.end()) + return &it->second; + return nullptr; + } const RPageRange &GetPageRange(ROOT::DescriptorId_t physicalId) const { return fPageRanges.at(physicalId); } /// Returns an iterator over pairs { columnId, columnRange }. The iteration order is unspecified. 
RColumnRangeIterable GetColumnRangeIterable() const; diff --git a/tree/ntuple/inc/ROOT/RNTupleMerger.hxx b/tree/ntuple/inc/ROOT/RNTupleMerger.hxx index 10885c8406756..2ace1f9b21417 100644 --- a/tree/ntuple/inc/ROOT/RNTupleMerger.hxx +++ b/tree/ntuple/inc/ROOT/RNTupleMerger.hxx @@ -118,16 +118,16 @@ class RNTupleMerger final { std::unique_ptr fModel; [[nodiscard]] - ROOT::RResult MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, - const ROOT::RClusterDescriptor &clusterDesc, - std::span commonColumns, - const ROOT::Internal::RCluster::ColumnSet_t &commonColumnSet, - std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData, - const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc); + ROOT::RResult + MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, const ROOT::RClusterDescriptor &clusterDesc, + std::span commonColumns, + const ROOT::Internal::RCluster::ColumnSet_t &commonColumnSet, std::size_t nCommonColumnsInCluster, + RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData, + ROOT::Internal::RPageAllocator &pageAlloc); [[nodiscard]] ROOT::RResult - MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span commonColumns, + MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span commonColumns, std::span extraDstColumns, RNTupleMergeData &mergeData); /// Creates a RNTupleMerger with the given destination. diff --git a/tree/ntuple/inc/ROOT/RPageStorage.hxx b/tree/ntuple/inc/ROOT/RPageStorage.hxx index a7c0441e3597e..7df9f08a2dae6 100644 --- a/tree/ntuple/inc/ROOT/RPageStorage.hxx +++ b/tree/ntuple/inc/ROOT/RPageStorage.hxx @@ -544,6 +544,15 @@ public: [[nodiscard]] std::unique_ptr InitFromDescriptor(const ROOT::RNTupleDescriptor &descriptor, bool copyClusters); + /// Adds a new column representation to the given field. + /// \return The physical id of the first newly added column. 
+ ROOT::DescriptorId_t + AddColumnRepresentation(const ROOT::RFieldDescriptor &field, std::span newRepresentation); + + /// Adds a new alias column pointing to an existing column with the given physical id to the given field. + void AddAliasColumn(const ROOT::RNTupleDescriptor &desc, const ROOT::RFieldDescriptor &field, + ROOT::DescriptorId_t physicalId); + void CommitSuppressedColumn(ColumnHandle_t columnHandle) final; void CommitPage(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) final; void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final; diff --git a/tree/ntuple/src/RFieldBase.cxx b/tree/ntuple/src/RFieldBase.cxx index 38b70e5ad60b5..79479df716a96 100644 --- a/tree/ntuple/src/RFieldBase.cxx +++ b/tree/ntuple/src/RFieldBase.cxx @@ -668,14 +668,14 @@ void ROOT::RFieldBase::Attach(std::unique_ptr child, std::stri ROOT::NTupleSize_t ROOT::RFieldBase::EntryToColumnElementIndex(ROOT::NTupleSize_t globalIndex) const { - std::size_t result = globalIndex; + ROOT::NTupleSize_t result = globalIndex; for (auto f = this; f != nullptr; f = f->GetParent()) { auto parent = f->GetParent(); if (parent && (parent->GetStructure() == ROOT::ENTupleStructure::kCollection || parent->GetStructure() == ROOT::ENTupleStructure::kVariant)) { return 0U; } - result *= std::max(f->GetNRepetitions(), std::size_t{1U}); + result *= std::max(f->GetNRepetitions(), ROOT::NTupleSize_t{1U}); } return result; } diff --git a/tree/ntuple/src/RNTupleDescriptor.cxx b/tree/ntuple/src/RNTupleDescriptor.cxx index f4a8b72b62c8b..57463ff6324f7 100644 --- a/tree/ntuple/src/RNTupleDescriptor.cxx +++ b/tree/ntuple/src/RNTupleDescriptor.cxx @@ -961,8 +961,16 @@ ROOT::Internal::RClusterDescriptorBuilder::AddExtendedColumnRanges(const RNTuple // `ROOT::RFieldBase::EntryToColumnElementIndex()`, i.e. it is a principal column reachable from the // field zero excluding subfields of collection and variant fields. 
if (c.IsDeferredColumn()) { - columnRange.SetFirstElementIndex(fCluster.GetFirstEntryIndex() * nRepetitions); - columnRange.SetNElements(fCluster.GetNEntries() * nRepetitions); + if (c.GetRepresentationIndex() == 0) { + columnRange.SetFirstElementIndex(fCluster.GetFirstEntryIndex() * nRepetitions); + columnRange.SetNElements(fCluster.GetNEntries() * nRepetitions); + } else { + const auto &field = desc.GetFieldDescriptor(fieldId); + const auto firstReprColumnId = field.GetLogicalColumnIds()[c.GetIndex()]; + const auto &firstReprColumnRange = fCluster.fColumnRanges[firstReprColumnId]; + columnRange.SetFirstElementIndex(firstReprColumnRange.GetFirstElementIndex()); + columnRange.SetNElements(firstReprColumnRange.GetNElements()); + } if (!columnRange.IsSuppressed()) { auto &pageRange = fCluster.fPageRanges[physicalId]; pageRange.fPhysicalColumnId = physicalId; diff --git a/tree/ntuple/src/RNTupleMerger.cxx b/tree/ntuple/src/RNTupleMerger.cxx index 22eda7acfd09d..1a92067e775c1 100644 --- a/tree/ntuple/src/RNTupleMerger.cxx +++ b/tree/ntuple/src/RNTupleMerger.cxx @@ -270,12 +270,10 @@ try { } namespace { -// Functor used to change the compression of a page to `options.fCompressionSettings`. +// Functor used to change the compression of a page to `fCompressionSettings`. 
struct RChangeCompressionFunc { const RColumnElementBase &fSrcColElement; - const RColumnElementBase &fDstColElement; - const RNTupleMergeOptions &fMergeOptions; - + std::uint32_t fCompressionSettings; RPageStorage::RSealedPage &fSealedPage; ROOT::Internal::RPageAllocator &fPageAlloc; std::uint8_t *fBuffer; @@ -284,52 +282,25 @@ struct RChangeCompressionFunc { void operator()() const { - assert(fSrcColElement.GetIdentifier() == fDstColElement.GetIdentifier()); - fSealedPage.VerifyChecksumIfEnabled().ThrowOnError(); const auto bytesPacked = fSrcColElement.GetPackedSize(fSealedPage.GetNElements()); // TODO: this buffer could be kept and reused across pages + // TODO: if Zip is a no-op (compression == 0) we can unzip directly in fBuffer! auto unzipBuf = MakeUninitArray(bytesPacked); ROOT::Internal::RNTupleDecompressor::Unzip(fSealedPage.GetBuffer(), fSealedPage.GetDataSize(), bytesPacked, unzipBuf.get()); const auto checksumSize = fWriteOpts.GetEnablePageChecksums() * sizeof(std::uint64_t); assert(fBufSize >= bytesPacked + checksumSize); - auto nBytesZipped = ROOT::Internal::RNTupleCompressor::Zip(unzipBuf.get(), bytesPacked, - fMergeOptions.fCompressionSettings.value(), fBuffer); + auto nBytesZipped = + ROOT::Internal::RNTupleCompressor::Zip(unzipBuf.get(), bytesPacked, fCompressionSettings, fBuffer); fSealedPage = {fBuffer, nBytesZipped + checksumSize, fSealedPage.GetNElements(), fSealedPage.GetHasChecksum()}; fSealedPage.ChecksumIfEnabled(); } }; -struct RResealFunc { - const RColumnElementBase &fSrcColElement; - const RColumnElementBase &fDstColElement; - const RNTupleMergeOptions &fMergeOptions; - - RPageStorage::RSealedPage &fSealedPage; - ROOT::Internal::RPageAllocator &fPageAlloc; - std::uint8_t *fBuffer; - std::size_t fBufSize; - const ROOT::RNTupleWriteOptions &fWriteOpts; - - void operator()() const - { - auto page = RPageSource::UnsealPage(fSealedPage, fSrcColElement, fPageAlloc).Unwrap(); - RPageSink::RSealPageConfig sealConf; - sealConf.fElement = 
&fDstColElement; - sealConf.fPage = &page; - sealConf.fBuffer = fBuffer; - sealConf.fCompressionSettings = *fMergeOptions.fCompressionSettings; - sealConf.fWriteChecksum = fWriteOpts.GetEnablePageChecksums(); - assert(fBufSize >= fSealedPage.GetDataSize() + fSealedPage.GetHasChecksum() * sizeof(std::uint64_t)); - auto refSealedPage = RPageSink::SealPage(sealConf); - fSealedPage = refSealedPage; - } -}; - struct RTaskVisitor { std::optional &fGroup; @@ -350,23 +321,47 @@ struct RCommonField { RCommonField(const ROOT::RFieldDescriptor &src, const ROOT::RFieldDescriptor &dst) : fSrc(&src), fDst(&dst) {} }; +struct RColReprMapping { + std::uint32_t fSource; + std::uint32_t fDest; +}; + +struct RColReprExtension : RColReprMapping { + ROOT::RFieldBase::ColumnRepresentation_t fSourceRepr; +}; + +template +using FieldCollectionMap_t = std::unordered_map>; + +static std::optional +FindColumnReprMapping(const std::vector &mappings, std::uint32_t sourceReprIndex) +{ + for (const auto [src, dst] : mappings) + if (src == sourceReprIndex) + return dst; + return std::nullopt; +} + struct RDescriptorsComparison { std::vector fExtraDstFields; std::vector fExtraSrcFields; std::vector fCommonFields; + // For each field that has more than 1 column representation in the output model, + // maps the column representatives of the source field with those of the destination. + // The key is the destination field. 
+ FieldCollectionMap_t fColReprMappings; + FieldCollectionMap_t fColReprExtensions; }; struct RColumnOutInfo { - ROOT::DescriptorId_t fColumnId; - ENTupleColumnType fColumnType; + ROOT::DescriptorId_t fColumnId = ROOT::kInvalidDescriptorId; }; -// { fully.qualified.fieldName.colInputId => colOutputInfo } +// { ".fully.qualified.fieldName.colInputIndex.colOutputReprIndex" => colOutputInfo } using ColumnIdMap_t = std::unordered_map; struct RColumnInfoGroup { std::vector fExtraDstColumns; - // These are sorted by InputId std::vector fCommonColumns; }; @@ -380,14 +375,14 @@ struct RColumnMergeInfo { // e.g. "Muon.pt.x._0" std::string fColumnName; // The column id in the source RNTuple - ROOT::DescriptorId_t fInputId; + ROOT::DescriptorId_t fInputId = kInvalidDescriptorId; // The corresponding column id in the destination RNTuple (the mapping happens in AddColumnsFromField()) - ROOT::DescriptorId_t fOutputId; - ENTupleColumnType fColumnType; + ROOT::DescriptorId_t fOutputId = kInvalidDescriptorId; + std::uint16_t fOutputReprIndex = 0; // If nullopt, use the default in-memory type std::optional fInMemoryType; - const ROOT::RFieldDescriptor *fParentFieldDescriptor; - const ROOT::RNTupleDescriptor *fParentNTupleDescriptor; + const ROOT::RFieldDescriptor *fParentFieldDescriptor = nullptr; + const ROOT::RNTupleDescriptor *fParentNTupleDescriptor = nullptr; }; // Data related to a single call of RNTupleMerger::Merge() @@ -399,6 +394,7 @@ struct RNTupleMergeData { const ROOT::RNTupleDescriptor *fSrcDescriptor = nullptr; std::vector fColumns; + // Maps input column IDs to output IDs ColumnIdMap_t fColumnIdMap; ROOT::NTupleSize_t fNumDstEntries = 0; @@ -429,33 +425,6 @@ std::ostream &operator<<(std::ostream &os, const std::optional @@ -483,8 +452,9 @@ CompareDescriptorStructure(const ROOT::RNTupleDescriptor &dst, const ROOT::RNTup } for (const auto &srcField : src.GetTopLevelFields()) { const auto dstFieldId = dst.FindFieldId(srcField.GetFieldName()); - if (dstFieldId == 
ROOT::kInvalidDescriptorId) + if (dstFieldId == ROOT::kInvalidDescriptorId) { res.fExtraSrcFields.push_back(&srcField); + } } // Check compatibility of common fields @@ -571,55 +541,103 @@ CompareDescriptorStructure(const ROOT::RNTupleDescriptor &dst, const ROOT::RNTup } // Require that column representations match - const auto srcNCols = field.fSrc->GetLogicalColumnIds().size(); - const auto dstNCols = field.fDst->GetLogicalColumnIds().size(); - if (srcNCols != dstNCols) { - std::stringstream ss; - ss << "Field `" << field.fSrc->GetFieldName() - << "` has a different number of columns than previously-seen field with the same name (old: " << dstNCols - << ", new: " << srcNCols << ")"; - errors.push_back(ss.str()); - } else { - for (auto i = 0u; i < srcNCols; ++i) { - const auto srcColId = field.fSrc->GetLogicalColumnIds()[i]; - const auto dstColId = field.fDst->GetLogicalColumnIds()[i]; - const auto &srcCol = src.GetColumnDescriptor(srcColId); - const auto &dstCol = dst.GetColumnDescriptor(dstColId); - // TODO(gparolini): currently we refuse to merge columns of different types unless they are Split/non-Split - // version of the same type, because we know how to treat that specific case. We should also properly handle - // different but compatible types. 
- if (srcCol.GetType() != dstCol.GetType() && - !IsSplitOrUnsplitVersionOf(srcCol.GetType(), dstCol.GetType())) { - std::stringstream ss; - ss << i << "-th column of field `" << field.fSrc->GetFieldName() - << "` has a different column type of the same column on the previously-seen field with the same name " - "(old: " - << RColumnElementBase::GetColumnTypeName(srcCol.GetType()) - << ", new: " << RColumnElementBase::GetColumnTypeName(dstCol.GetType()) << ")"; - errors.push_back(ss.str()); - } - if (srcCol.GetBitsOnStorage() != dstCol.GetBitsOnStorage()) { - std::stringstream ss; - ss << i << "-th column of field `" << field.fSrc->GetFieldName() - << "` has a different number of bits of the same column on the previously-seen field with the same " - "name " - "(old: " - << srcCol.GetBitsOnStorage() << ", new: " << dstCol.GetBitsOnStorage() << ")"; - errors.push_back(ss.str()); - } - if (srcCol.GetValueRange() != dstCol.GetValueRange()) { - std::stringstream ss; - ss << i << "-th column of field `" << field.fSrc->GetFieldName() - << "` has a different value range of the same column on the previously-seen field with the same name " - "(old: " - << srcCol.GetValueRange() << ", new: " << dstCol.GetValueRange() << ")"; - errors.push_back(ss.str()); - } - if (srcCol.GetRepresentationIndex() > 0) { + if (!field.fSrc->IsProjectedField()) { + const auto &srcColumns = field.fSrc->GetLogicalColumnIds(); + const auto &dstColumns = field.fDst->GetLogicalColumnIds(); + const auto srcNCols = srcColumns.size(); + const auto dstNCols = dstColumns.size(); + if (srcNCols != dstNCols) { + std::stringstream ss; + ss << "Field `" << field.fSrc->GetFieldName() + << "` has a different number of columns than previously-seen field with the same name (old: " << dstNCols + << ", new: " << srcNCols << ")"; + errors.push_back(ss.str()); + } else { + const std::uint32_t srcColCardinality = field.fSrc->GetColumnCardinality(); + const std::uint32_t dstColCardinality = 
field.fDst->GetColumnCardinality(); + if (srcColCardinality != dstColCardinality) { std::stringstream ss; - ss << i << "-th column of field `" << field.fSrc->GetFieldName() - << "` has a representation index higher than 0. This is not supported yet by the merger."; + ss << "Field `" << field.fSrc->GetFieldName() + << "` has a different column cardinality than previously-seen field with the same name (old: " + << dstColCardinality << ", new: " << srcColCardinality << ")"; errors.push_back(ss.str()); + } else if (srcColCardinality > 0) { + const auto srcNColReprs = srcNCols / srcColCardinality; + const auto dstNColReprs = dstNCols / dstColCardinality; + + // For each column representation of the source, check if it matches one in the descriptor. + // If so, and if it doesn't match the destination's repr index, add a mapping for it. + // If nothing matches, schedule the column representation to be added later. + // NOTE: this has quadratic complexity but the numbers involved are small so it's fine. + for (auto srcReprIdx = 0u; srcReprIdx < srcNColReprs; ++srcReprIdx) { + std::int64_t matchingRepr = -1; + for (auto dstReprIdx = 0u; dstReprIdx < dstNColReprs; ++dstReprIdx) { + bool matches = true; + for (auto reprColIdx = 0u; reprColIdx < srcColCardinality; ++reprColIdx) { + const auto srcColId = srcColumns[srcReprIdx * srcColCardinality + reprColIdx]; + const auto &srcCol = src.GetColumnDescriptor(srcColId); + const auto dstColId = dstColumns[dstReprIdx * dstColCardinality + reprColIdx]; + const auto &dstCol = dst.GetColumnDescriptor(dstColId); + if (srcCol.GetType() != dstCol.GetType()) { + matches = false; + break; + } + } + + if (matches) { + // If this column representation matches by column type, we need to make sure that it also has + // matching column metadata. Since we currently do not support multiple column representations + // that only differ by such metadata, we forbid merging such columns (e.g. 
we cannot merge two + // Real32Trunc columns with different bit widths). This could technically be supported, but it + // would require significant effort, so we currently don't. + for (auto reprColIdx = 0u; reprColIdx < srcColCardinality; ++reprColIdx) { + const auto srcColId = srcColumns[srcReprIdx * srcColCardinality + reprColIdx]; + const auto &srcCol = src.GetColumnDescriptor(srcColId); + const auto dstColId = dstColumns[dstReprIdx * dstColCardinality + reprColIdx]; + const auto &dstCol = dst.GetColumnDescriptor(dstColId); + if (srcCol.GetBitsOnStorage() != dstCol.GetBitsOnStorage() || + srcCol.GetValueRange() != dstCol.GetValueRange()) { + std::stringstream ss; + ss << "Source field `" << field.fSrc->GetFieldName() + << "` has a matching column representation as its destination field, however one or " + "more " + "of its columns have different column metadata (bit width and/or value range). " + "Merging variable-sized columns is currently only supported if all metadata is " + "identical between source and destination columns." 
+ << "\n bit width src: " << srcCol.GetBitsOnStorage() + << ", dst: " << dstCol.GetBitsOnStorage() << "" + << "\n value range src: " << srcCol.GetValueRange() + << ", dst: " << dstCol.GetValueRange(); + errors.push_back(ss.str()); + break; + } + } + matchingRepr = dstReprIdx; + break; + } + } + + if (errors.empty()) { + if (matchingRepr >= 0 && matchingRepr != srcReprIdx) { + // a different matching representation was found + assert(matchingRepr < std::numeric_limits::max()); + res.fColReprMappings[field.fDst].push_back( + RColReprMapping{srcReprIdx, static_cast(matchingRepr)}); + } else if (matchingRepr < 0) { + // this representation was not found in the destination + assert(dstNColReprs < std::numeric_limits::max()); + ROOT::RFieldBase::ColumnRepresentation_t newRepr; + for (auto reprColIdx = 0u; reprColIdx < srcColCardinality; ++reprColIdx) { + const auto srcColId = srcColumns[srcReprIdx * srcColCardinality + reprColIdx]; + const auto &srcCol = src.GetColumnDescriptor(srcColId); + newRepr.push_back(srcCol.GetType()); + } + RColReprExtension extension{{srcReprIdx, static_cast(dstNColReprs)}, newRepr}; + res.fColReprExtensions[field.fDst].push_back(extension); + res.fColReprMappings[field.fDst].push_back(extension); + } + } + } } } } @@ -657,13 +675,13 @@ CompareDescriptorStructure(const ROOT::RNTupleDescriptor &dst, const ROOT::RNTup return ROOT::RResult(res); } -// Applies late model extension to `destination`, adding all `newFields` to it. +// Applies late model extension to `mergeData.fDestination`, adding all `descCmp.fExtraSrcFields` to it. 
[[nodiscard]] static ROOT::RResult -ExtendDestinationModel(std::span newFields, ROOT::RNTupleModel &dstModel, - RNTupleMergeData &mergeData, std::vector &commonFields) +ExtendDestinationModel(RDescriptorsComparison &descCmp, ROOT::RNTupleModel &dstModel, RNTupleMergeData &mergeData) { - assert(newFields.size() > 0); // no point in calling this with 0 new cols + const auto &newFields = descCmp.fExtraSrcFields; + auto &commonFields = descCmp.fCommonFields; dstModel.Unfreeze(); ROOT::Internal::RNTupleModelChangeset changeset{dstModel}; @@ -683,10 +701,19 @@ ExtendDestinationModel(std::span newFields, ROOT changeset.fAddedFields.reserve(newFields.size()); // First add all non-projected fields... for (const auto *fieldDesc : newFields) { - if (!fieldDesc->IsProjectedField()) { - auto field = fieldDesc->CreateField(*mergeData.fSrcDescriptor); - changeset.AddField(std::move(field)); + if (fieldDesc->IsProjectedField()) + continue; + + auto field = fieldDesc->CreateField(*mergeData.fSrcDescriptor); + // Explicitly set the field representatives. This prevents UpdateSchema() from changing our column + // representations via AutoAdjustColumnTypes. + ROOT::RFieldBase::ColumnRepresentation_t representatives; + for (const auto &colId : fieldDesc->GetLogicalColumnIds()) { + const auto &column = mergeData.fSrcDescriptor->GetColumnDescriptor(colId); + representatives.push_back(column.GetType()); } + field->SetColumnRepresentatives({representatives}); + changeset.AddField(std::move(field)); } // ...then add all projected fields. for (const auto *fieldDesc : newFields) { @@ -711,16 +738,39 @@ ExtendDestinationModel(std::span newFields, ROOT } dstModel.Freeze(); try { + // XXX: here we are connecting the new fields/columns to the sink! + // We should avoid doing that, as all other fields never get connected. + // NOTE: this calls AutoAdjustColumnTypes, but we have set the column representations of all fields + // explicitly, so it will not change it under the hood. 
mergeData.fDestination.UpdateSchema(changeset, mergeData.fNumDstEntries); } catch (const ROOT::RException &ex) { return R__FAIL(ex.GetError().GetReport()); } commonFields.reserve(commonFields.size() + newFields.size()); - for (const auto *field : newFields) { + // NOTE(gparolini): Insert the new fields at the beginning of `commonFields`. + // We need to make sure the extended fields appear before all other common fields for the following reason: + // in general, when we GatherColumnInfos we (potentially) assign new column output ids in field order; this + // assignment happens whenever we find new columns, which happens in 3 cases: + // 1. we are in the first source and we're adding the first set of (common) fields; + // 2. we are adding a new set of extended common fields (which come from this function); + // 3. we are adding new column representations for fields that we already had before processing this source. + // + // It's important that the output id assigned to the new columns is coherent with the order of the column descriptors + // as they appear in the header and footer. This is in turn determined by the order by which we append new columns to + // the dst descriptor during the merging process. + // Since we call ExtendDestinationModel (this function) *before* adding the new column representations, it is always + // the case that the dst descriptor gets updated with the new column descriptors coming from the extended fields + // (since they are added in UpdateSchema a few lines above) before it gets updated with the extended column + // representations (which happens later in sink->AddColumnRepresentation). + // However, the new column output ids are added sequentially in *field* order in GatherColumnInfos and the fields + // containing the new column representations are already in that list from earlier! So, to make sure the new output + // ids are assigned to our extended fields first, we push them to the front of the list so that they are visited first. 
+ for (auto it = newFields.rbegin(); it != newFields.rend(); ++it) { + const auto *field = *it; const auto newFieldInDstId = mergeData.fDstDescriptor.FindFieldId(field->GetFieldName()); const auto &newFieldInDst = mergeData.fDstDescriptor.GetFieldDescriptor(newFieldInDstId); - commonFields.emplace_back(*field, newFieldInDst); + commonFields.insert(commonFields.begin(), RCommonField{*field, newFieldInDst}); } return ROOT::RResult::Success(); @@ -764,10 +814,10 @@ GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::spanGetStructure(); if (structure == ROOT::ENTupleStructure::kStreamer) { - return R__FAIL( - "Destination RNTuple contains a streamer field (" + field->GetFieldName() + - ") that is not present in one of the sources. " - "Creating a default value for a streamer field is ill-defined, therefore the merging process will abort."); + return R__FAIL("Destination RNTuple contains a streamer field (" + field->GetFieldName() + + ") that is not present in one of the sources. " + "Creating a default value for a streamer field is ill-defined, therefore the merging " + "process will abort."); } // NOTE: we cannot have a Record here because it has no associated columns. @@ -814,7 +864,7 @@ GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, const ROOT::RClusterDescriptor &clusterDesc, - std::span commonColumns, + std::span commonColumns, const RCluster::ColumnSet_t &commonColumnSet, std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc) @@ -826,9 +876,11 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, const RCluster *cluster = clusterPool.GetCluster(clusterDesc.GetId(), commonColumnSet); // we expect the cluster pool to contain the requested set of columns, since they were - // validated by CompareDescriptorStructure(). 
+ // validated by CompareDescriptorStructure() and MergeSourceClusters(). assert(cluster); + const std::uint32_t outCompression = mergeData.fMergeOpts.fCompressionSettings.value(); + for (size_t colIdx = 0; colIdx < nCommonColumnsInCluster; ++colIdx) { const auto &column = commonColumns[colIdx]; const auto &columnId = column.fInputId; @@ -838,9 +890,6 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, const auto srcColElement = column.fInMemoryType ? ROOT::Internal::GenerateColumnElement(*column.fInMemoryType, columnDesc.GetType()) : RColumnElementBase::Generate(columnDesc.GetType()); - const auto dstColElement = column.fInMemoryType - ? ROOT::Internal::GenerateColumnElement(*column.fInMemoryType, column.fColumnType) - : RColumnElementBase::Generate(column.fColumnType); // Now get the pages for this column in this cluster const auto &pages = clusterDesc.GetPageRange(columnId); @@ -855,24 +904,21 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, // L1: compression and encoding of src and dest both match: we can simply copy the page // L2: compression of dest doesn't match the src but encoding does: we must recompress the page but can avoid // resealing it. - // L3: on-disk encoding doesn't match: we need to reseal the page, which implies decompressing and recompressing + // L3: on-disk encoding doesn't match: we need to reseal the page, which implies decompressing and + // recompressing // it. 
- const bool compressionIsDifferent = - colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings.value(); - const bool needsResealing = - srcColElement->GetIdentifier().fOnDiskType != dstColElement->GetIdentifier().fOnDiskType; - const bool needsRecompressing = compressionIsDifferent || needsResealing; + const bool compressionIsDifferent = colRangeCompressionSettings != outCompression; + const bool needsRecompressing = compressionIsDifferent; if (needsRecompressing && mergeData.fMergeOpts.fExtraVerbose) { - R__LOG_INFO(NTupleMergeLog()) - << (needsResealing ? "Resealing" : "Recompressing") << " column " << column.fColumnName - << ": { compression: " << colRangeCompressionSettings << " => " - << mergeData.fMergeOpts.fCompressionSettings.value() - << ", onDiskType: " << RColumnElementBase::GetColumnTypeName(srcColElement->GetIdentifier().fOnDiskType) - << " => " << RColumnElementBase::GetColumnTypeName(dstColElement->GetIdentifier().fOnDiskType); + R__LOG_INFO(NTupleMergeLog()) << "Recompressing column " << column.fColumnName + << ": { compression: " << colRangeCompressionSettings << " => " + << mergeData.fMergeOpts.fCompressionSettings.value() << ", onDiskType: " + << RColumnElementBase::GetColumnTypeName( + srcColElement->GetIdentifier().fOnDiskType); } - size_t pageBufferBaseIdx = sealedPageData.fBuffers.size(); + const size_t pageBufferBaseIdx = sealedPageData.fBuffers.size(); // If the column range already has the right compression we don't need to allocate any new buffer, so we don't // bother reserving memory for them. 
if (needsRecompressing) @@ -921,29 +967,15 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, buffer = MakeUninitArray(bufSize); // clang-format off - if (needsResealing) { - RTaskVisitor{fTaskGroup}(RResealFunc{ - *srcColElement, - *dstColElement, - mergeData.fMergeOpts, - sealedPage, - *fPageAlloc, - buffer.get(), - bufSize, - mergeData.fDestination.GetWriteOptions() - }); - } else { - RTaskVisitor{fTaskGroup}(RChangeCompressionFunc{ - *srcColElement, - *dstColElement, - mergeData.fMergeOpts, - sealedPage, - *fPageAlloc, - buffer.get(), - bufSize, - mergeData.fDestination.GetWriteOptions() - }); - } + RTaskVisitor{fTaskGroup}(RChangeCompressionFunc{ + *srcColElement, + outCompression, + sealedPage, + *fPageAlloc, + buffer.get(), + bufSize, + mergeData.fDestination.GetWriteOptions() + }); // clang-format on } @@ -967,9 +999,9 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, // the destination's schemas. // The pages may be "fast-merged" (i.e. simply copied with no decompression/recompression) if the target // compression is unspecified or matches the original compression settings. -ROOT::RResult -RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span commonColumns, - std::span extraDstColumns, RNTupleMergeData &mergeData) +ROOT::RResult RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span commonColumns, + std::span extraDstColumns, + RNTupleMergeData &mergeData) { ROOT::Internal::RClusterPool clusterPool{source}; @@ -984,31 +1016,60 @@ RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span 0); - // NOTE: just because a column is in `commonColumns` it doesn't mean that each cluster in the source contains it, - // as it may be a deferred column that only has real data in a future cluster. - // We need to figure out which columns are actually present in this cluster so we only merge their pages (the - // missing columns are handled by synthesizing zero pages - see below). 
- size_t nCommonColumnsInCluster = commonColumns.size(); - while (nCommonColumnsInCluster > 0) { - // Since `commonColumns` is sorted by column input id, we can simply traverse it from the back and stop as - // soon as we find a common column that appears in this cluster: we know that in that case all previous - // columns must appear as well. - if (clusterDesc.ContainsColumn(commonColumns[nCommonColumnsInCluster - 1].fInputId)) - break; - --nCommonColumnsInCluster; - } - + // Deduce which columns are suppressed (cluster by cluster) by exclusion, as: + // (columns in the columnIdMap) - (columns in commonColumns which are not suppressed). + // Note that some suppressed columns may not be in commonColumns because they might not appear at all in the + // current source. + FieldCollectionMap_t activeColumns; + + // NOTE: just because a column is in `commonColumns` it doesn't mean that each cluster in the source contains + // it, as it may be a deferred column that only has real data in a future cluster. We need to figure out which + // columns are actually present in this cluster so we only merge their pages (the missing columns are handled + // by synthesizing zero pages - see below). + size_t nCommonColumnsInCluster = 0; // Convert columns to a ColumnSet for the ClusterPool query RCluster::ColumnSet_t commonColumnSet; commonColumnSet.reserve(nCommonColumnsInCluster); - for (size_t i = 0; i < nCommonColumnsInCluster; ++i) - commonColumnSet.emplace(commonColumns[i].fInputId); + // Collect all common columns appearing in this cluster into commonColumnSet and reorganize commonColumns so + // that those columns are at the start of it (whereas missing columns are at its end). + // NOTE: it's fine if this scrambles the order of columns: the RNTupleSerializer will sort them by physical ID. 
+ std::partition(commonColumns.begin(), commonColumns.end(), [&](const auto &column) { + if (const auto *colRange = clusterDesc.TryGetColumnRange(column.fInputId)) { + if (!colRange->IsSuppressed()) { + ++nCommonColumnsInCluster; + commonColumnSet.emplace(column.fInputId); + activeColumns[column.fParentFieldDescriptor].push_back(column.fOutputId); + return true; + } + } + return false; + }); - // For each cluster, the "missing columns" are the union of the extraDstColumns and the common columns - // that are not present in the cluster. We generate zero pages for all of them. - missingColumns.resize(extraDstColumns.size()); - for (size_t i = nCommonColumnsInCluster; i < commonColumns.size(); ++i) - missingColumns.push_back(commonColumns[i]); + // Commit all suppressed columns. + // This is a fairly involved operation, as we need to commit all known columns that: + // a) do not appear in extraDstColumns (those are "missing", not suppressed), and + // b) do not appear in commonColumnSet (those are the active columns). + // Note that these may or may not appear in commonColumns as suppressed columns, since they may or may not be + // present in the current source. + // The only way to find all the columns is to go and get them from fColumnIdMap, which keeps track of every + // column we added to the destination so far. However, since it also contains the extraDstColumns, we need to + // specifically only query those columns that belong to a field that has at least 1 column in commonColumns + // (remember that commonColumns contains all columns associated to the common fields for this source).
+ for (const auto &[fieldDesc, activeIds] : activeColumns) { + const auto &fieldFQName = mergeData.fSrcDescriptor->GetQualifiedFieldName(fieldDesc->GetId()); + const auto cardinality = fieldDesc->GetColumnCardinality(); + for (auto i = 0u; i < fieldDesc->GetLogicalColumnIds().size(); ++i) { + const auto colIndex = i % cardinality; + const auto reprIndex = i / cardinality; + const auto colName = "." + fieldFQName + '.' + std::to_string(colIndex) + '.' + std::to_string(reprIndex); + const auto colIt = mergeData.fColumnIdMap.find(colName); + assert(colIt != mergeData.fColumnIdMap.end()); + const auto colOutId = colIt->second.fColumnId; + if (std::find(activeIds.begin(), activeIds.end(), colOutId) == activeIds.end()) { + mergeData.fDestination.CommitSuppressedColumn(ROOT::Internal::RPageStorage::ColumnHandle_t{colOutId}); + } + } + } RSealedPageMergeData sealedPageData; auto res = MergeCommonColumns(clusterPool, clusterDesc, commonColumns, commonColumnSet, nCommonColumnsInCluster, @@ -1016,6 +1077,14 @@ RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span ColumnInMemoryType(std::string_view fieldT return std::nullopt; } -// Given a field, fill `columns` and `mergeData.fColumnIdMap` with information about all columns belonging to it and its -// subfields. `mergeData.fColumnIdMap` is used to map matching columns from different sources to the same output column -// in the destination. We match columns by their "fully qualified name", which is the concatenation of their ancestor -// fields' names and the column index. By this point, since we called `CompareDescriptorStructure()` earlier, we should -// be guaranteed that two matching columns will have at least compatible representations. NOTE: srcFieldDesc and -// dstFieldDesc may alias. +// Given a field, fill `columns` and `mergeData.fColumnIdMap` with information about all columns belonging to it and +// its subfields. 
`mergeData.fColumnIdMap` is used to map matching columns from different sources to the same output +// column in the destination. We match columns by their "fully qualified name", which is the concatenation of their +// ancestor fields' names and the column index. By this point, since we called `CompareDescriptorStructure()` +// earlier, we should be guaranteed that two matching columns will have at least compatible representations. +// This function is recursive as it needs to call itself on the entire subfield hierarchy of the source field. +// NOTE: srcFieldDesc and dstFieldDesc may alias. static void AddColumnsFromField(std::vector &columns, const ROOT::RNTupleDescriptor &srcDesc, + const FieldCollectionMap_t &colReprMappings, RNTupleMergeData &mergeData, const ROOT::RFieldDescriptor &srcFieldDesc, const ROOT::RFieldDescriptor &dstFieldDesc, const std::string &prefix = "") { std::string name = prefix + '.' + srcFieldDesc.GetFieldName(); + // We don't want to try and merge alias columns + if (srcFieldDesc.IsProjectedField()) + return; + const auto &columnIds = srcFieldDesc.GetLogicalColumnIds(); columns.reserve(columns.size() + columnIds.size()); - // NOTE: here we can match the src and dst columns by column index because we forbid merging fields with - // different column representations. - for (auto i = 0u; i < srcFieldDesc.GetLogicalColumnIds().size(); ++i) { - // We don't want to try and merge alias columns - if (srcFieldDesc.IsProjectedField()) - continue; + for (auto i = 0u; i < srcFieldDesc.GetLogicalColumnIds().size(); ++i) { auto srcColumnId = srcFieldDesc.GetLogicalColumnIds()[i]; const auto &srcColumn = srcDesc.GetColumnDescriptor(srcColumnId); RColumnMergeInfo info{}; - info.fColumnName = name + '.' + std::to_string(srcColumn.GetIndex()); info.fInputId = srcColumn.GetPhysicalId(); // NOTE(gparolini): the parent field is used when synthesizing zero pages, which happens in 2 situations: // 1. 
when adding extra dst columns (in which case we need to synthesize zero pages for the incoming src), and // 2. when merging a deferred column into an existing column (in which case we need to fill the "hole" with - // zeroes). For the first case srcFieldDesc and dstFieldDesc are the same (see the calling site of this function), - // but for the second case they're not, and we need to pick the source field because we will then check the - // column's *input* id inside fParentFieldDescriptor to see if it's a suppressed column (see + // zeroes). For the first case srcFieldDesc and dstFieldDesc are the same (see the calling site of this + // function), but for the second case they're not, and we need to pick the source field because we will then + // check the column's *input* id inside fParentFieldDescriptor to see if it's a suppressed column (see // GenerateZeroPagesForColumns()). info.fParentFieldDescriptor = &srcFieldDesc; // Save the parent field descriptor since this may be either the source or destination descriptor depending on @@ -1108,22 +1177,34 @@ static void AddColumnsFromField(std::vector &columns, const RO // properly walk up the field hierarchy. info.fParentNTupleDescriptor = &srcDesc; + const auto mappingsIt = colReprMappings.find(&dstFieldDesc); + std::uint16_t reprIndex = srcColumn.GetRepresentationIndex(); + if (mappingsIt != colReprMappings.end()) { + if (auto outReprIdx = FindColumnReprMapping(mappingsIt->second, reprIndex); outReprIdx) + reprIndex = *outReprIdx; + } + + info.fColumnName = name + '.' + std::to_string(srcColumn.GetIndex()) + '.' + std::to_string(reprIndex); + + ENTupleColumnType columnType = ENTupleColumnType::kUnknown; + if (auto it = mergeData.fColumnIdMap.find(info.fColumnName); it != mergeData.fColumnIdMap.end()) { + // We had already added this column to the column id map: just copy its data. 
info.fOutputId = it->second.fColumnId; - info.fColumnType = it->second.fColumnType; + info.fOutputReprIndex = reprIndex; } else { + // New column: assign it the next output id. info.fOutputId = mergeData.fColumnIdMap.size(); - // NOTE(gparolini): map the type of src column to the type of dst column. - // This mapping is only relevant for common columns and it's done to ensure we keep a consistent - // on-disk representation of the same column. - // This is also important to do for first source when it is used to generate the destination sink, - // because even in that case their column representations may differ. - // e.g. if the destination has a different compression than the source, an integer column might be - // zigzag-encoded in the source but not in the destination. - auto dstColumnId = dstFieldDesc.GetLogicalColumnIds()[i]; + // NOTE(gparolini): map the representation index of src column to that of dst column. + // This mapping is only relevant for common columns and it's done to ensure we have the correct representation + // index in the output column metadata.
+ assert(dstFieldDesc.GetColumnCardinality() == srcFieldDesc.GetColumnCardinality()); + const auto dstColumnIndex = reprIndex * dstFieldDesc.GetColumnCardinality() + srcColumn.GetIndex(); + const auto dstColumnId = dstFieldDesc.GetLogicalColumnIds()[dstColumnIndex]; const auto &dstColumn = mergeData.fDstDescriptor.GetColumnDescriptor(dstColumnId); - info.fColumnType = dstColumn.GetType(); - mergeData.fColumnIdMap[info.fColumnName] = {info.fOutputId, info.fColumnType}; + columnType = dstColumn.GetType(); + info.fOutputReprIndex = reprIndex; + mergeData.fColumnIdMap[info.fColumnName] = RColumnOutInfo{info.fOutputId}; } if (mergeData.fMergeOpts.fExtraVerbose) { @@ -1131,12 +1212,12 @@ static void AddColumnsFromField(std::vector &columns, const RO << ", phys.id " << srcColumn.GetPhysicalId() << ", type " << RColumnElementBase::GetColumnTypeName(srcColumn.GetType()) << " -> log.id " << info.fOutputId << ", type " - << RColumnElementBase::GetColumnTypeName(info.fColumnType); + << RColumnElementBase::GetColumnTypeName(columnType); } // Since we disallow merging fields of different types, src and dstFieldDesc must have the same type name. 
assert(srcFieldDesc.GetTypeName() == dstFieldDesc.GetTypeName()); - info.fInMemoryType = ColumnInMemoryType(srcFieldDesc.GetTypeName(), info.fColumnType); + info.fInMemoryType = ColumnInMemoryType(srcFieldDesc.GetTypeName(), columnType); columns.emplace_back(info); } @@ -1146,7 +1227,7 @@ static void AddColumnsFromField(std::vector &columns, const RO for (auto i = 0u; i < srcChildrenIds.size(); ++i) { const auto &srcChild = srcDesc.GetFieldDescriptor(srcChildrenIds[i]); const auto &dstChild = mergeData.fDstDescriptor.GetFieldDescriptor(dstChildrenIds[i]); - AddColumnsFromField(columns, srcDesc, mergeData, srcChild, dstChild, name); + AddColumnsFromField(columns, srcDesc, colReprMappings, mergeData, srcChild, dstChild, name); } } @@ -1158,17 +1239,12 @@ static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, { RColumnInfoGroup res; for (const ROOT::RFieldDescriptor *field : descCmp.fExtraDstFields) { - AddColumnsFromField(res.fExtraDstColumns, mergeData.fDstDescriptor, mergeData, *field, *field); + AddColumnsFromField(res.fExtraDstColumns, mergeData.fDstDescriptor, descCmp.fColReprMappings, mergeData, *field, + *field); } for (const auto &[srcField, dstField] : descCmp.fCommonFields) { - AddColumnsFromField(res.fCommonColumns, srcDesc, mergeData, *srcField, *dstField); + AddColumnsFromField(res.fCommonColumns, srcDesc, descCmp.fColReprMappings, mergeData, *srcField, *dstField); } - - // Sort the commonColumns by ID so we can more easily tell how many common columns each cluster has - // (since each cluster must contain all columns of the previous cluster plus potentially some new ones) - std::sort(res.fCommonColumns.begin(), res.fCommonColumns.end(), - [](const auto &a, const auto &b) { return a.fInputId < b.fInputId; }); - return res; } @@ -1179,9 +1255,9 @@ static void PrefillColumnMap(const ROOT::RNTupleDescriptor &desc, const ROOT::RF for (const auto &colId : fieldDesc.GetLogicalColumnIds()) { const auto &colDesc = 
desc.GetColumnDescriptor(colId); RColumnOutInfo info{}; - const auto colName = name + '.' + std::to_string(colDesc.GetIndex()); info.fColumnId = colDesc.GetLogicalId(); - info.fColumnType = colDesc.GetType(); + const auto colName = + name + '.' + std::to_string(colDesc.GetIndex()) + '.' + std::to_string(colDesc.GetRepresentationIndex()); colIdMap[colName] = info; } @@ -1191,6 +1267,25 @@ static void PrefillColumnMap(const ROOT::RNTupleDescriptor &desc, const ROOT::RF } } +static void AddColumnExtensionsInFieldOrder( + const ROOT::RFieldDescriptor &field, const ROOT::RNTupleDescriptor &desc, + const FieldCollectionMap_t &extensions, + std::vector>> &outExtensions, + std::unordered_map> &outProjectionPointees) +{ + const auto it = extensions.find(&field); + if (it != extensions.end()) + outExtensions.emplace_back(it->first, it->second); + + if (field.IsProjectedField()) + outProjectionPointees[field.GetProjectionSourceId()].push_back(&field); + + for (auto childId : field.GetLinkIds()) { + const auto &child = desc.GetFieldDescriptor(childId); + AddColumnExtensionsInFieldOrder(child, desc, extensions, outExtensions, outProjectionPointees); + } +} + RNTupleMerger::RNTupleMerger(std::unique_ptr destination, std::unique_ptr model) // TODO(gparolini): consider using an arena allocator instead, since we know the precise lifetime @@ -1231,6 +1326,10 @@ ROOT::RResult RNTupleMerger::Merge(std::span sources, const } } + // Maps projection source fields to all their projections. + std::unordered_map> projectionPointees; + bool projectionPointeesInitialized = false; + // we should have a model if and only if the destination is initialized. 
if (!!fModel != fDestination->IsInitialized()) { return R__FAIL( @@ -1282,7 +1381,7 @@ ROOT::RResult RNTupleMerger::Merge(std::span sources, const } } - // Create sink from the input model if not initialized + // Create sink and model from the input descriptor if not initialized if (!fModel) { fModel = fDestination->InitFromDescriptor(srcDescriptor.GetRef(), false /* copyClusters */); } @@ -1308,10 +1407,10 @@ ROOT::RResult RNTupleMerger::Merge(std::span sources, const } // handle extra src fields - if (descCmp.fExtraSrcFields.size()) { + if (!descCmp.fExtraSrcFields.empty()) { if (mergeOpts.fMergingMode == ENTupleMergingMode::kUnion) { // late model extension for all fExtraSrcFields in Union mode - auto res = ExtendDestinationModel(descCmp.fExtraSrcFields, *fModel, mergeData, descCmp.fCommonFields); + auto res = ExtendDestinationModel(descCmp, *fModel, mergeData); if (!res) return R__FORWARD_ERROR(res); } else if (mergeOpts.fMergingMode == ENTupleMergingMode::kStrict) { @@ -1324,6 +1423,53 @@ ROOT::RResult RNTupleMerger::Merge(std::span sources, const } } + //// Extend columns if needed + if (!descCmp.fColReprExtensions.empty()) { + if (!projectionPointeesInitialized) { + for (const auto &field : descCmp.fExtraDstFields) { + if (field->IsProjectedField()) + projectionPointees[field->GetProjectionSourceId()].push_back(field); + } + projectionPointeesInitialized = true; + } + + // We need to extend the columns in the proper order, i.e. so that they appear in the same order as + // their first representation. This is to ensure that the pages we write to the cluster are in a consistent + // order as their column descriptors. The page creation order is determined by the order of + // columnInfos.fCommonColumns, which in turn depends on the common fields order (see GatherColumnInfos). + // XXX: do we need this separate sort step? Why not just create this vector directly in + // CompareDescriptorStructure? 
+ std::vector>> colExtensions; + colExtensions.reserve(descCmp.fColReprExtensions.size()); + for (const auto &commonField : descCmp.fCommonFields) { + const auto *field = commonField.fDst; + AddColumnExtensionsInFieldOrder(*field, mergeData.fDstDescriptor, descCmp.fColReprExtensions, colExtensions, + projectionPointees); + } + for (const auto &field : descCmp.fExtraSrcFields) { + if (field->IsProjectedField()) + projectionPointees[field->GetProjectionSourceId()].push_back(field); + } + + for (const auto &[fieldDesc, extensions] : colExtensions) { + auto &mappings = descCmp.fColReprMappings[fieldDesc]; + for (const auto &extension : extensions) { + const auto firstColumnId = fDestination->AddColumnRepresentation(*fieldDesc, extension.fSourceRepr); + + // When adding new column representations to an existing field which is the source of some projected + // fields, we need to also add new alias columns to those fields so that they can point to the proper + // representation. + if (auto it = projectionPointees.find(fieldDesc->GetId()); it != projectionPointees.end()) { + for (const auto &projection : it->second) { + for (auto colIdx = 0u; colIdx < extension.fSourceRepr.size(); ++colIdx) + fDestination->AddAliasColumn(mergeData.fDstDescriptor, *projection, firstColumnId + colIdx); + } + } + mappings.push_back(extension); + } + } + } + // handle extra dst fields & common fields auto columnInfos = GatherColumnInfos(descCmp, srcDescriptor.GetRef(), mergeData); auto res = MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData); diff --git a/tree/ntuple/src/RNTupleSerialize.cxx b/tree/ntuple/src/RNTupleSerialize.cxx index 8e5f38bd7ca91..f0c197ffa18bc 100644 --- a/tree/ntuple/src/RNTupleSerialize.cxx +++ b/tree/ntuple/src/RNTupleSerialize.cxx @@ -291,6 +291,7 @@ ROOT::RResult SerializeColumnsOfFields(const ROOT::RNTupleDescrip const auto *xHeader = !forHeaderExtension ? 
desc.GetHeaderExtension() : nullptr; + std::vector columnsToSerialize; for (auto parentId : fieldList) { // If we're serializing the non-extended header and we already have a header extension (which may happen if // we load an RNTuple for incremental merging), we need to skip all the extended fields, as they need to be @@ -299,14 +300,21 @@ ROOT::RResult SerializeColumnsOfFields(const ROOT::RNTupleDescrip continue; for (const auto &c : desc.GetColumnIterable(parentId)) { - if (c.IsAliasColumn() || (xHeader && xHeader->ContainsExtendedColumnRepresentation(c.GetLogicalId()))) - continue; + if (!c.IsAliasColumn() && !(xHeader && xHeader->ContainsExtendedColumnRepresentation(c.GetLogicalId()))) + columnsToSerialize.push_back(&c); + } + } - if (auto res = SerializePhysicalColumn(c, context, *where)) { - pos += res.Unwrap(); - } else { - return R__FORWARD_ERROR(res); - } + // Make sure the columns are sorted by physical ID + std::sort(columnsToSerialize.begin(), columnsToSerialize.end(), [&context](const auto *a, const auto *b) { + return context.GetOnDiskColumnId(a->GetPhysicalId()) < context.GetOnDiskColumnId(b->GetPhysicalId()); + }); + + for (const auto *c : columnsToSerialize) { + if (auto res = SerializePhysicalColumn(*c, context, *where)) { + pos += res.Unwrap(); + } else { + return R__FORWARD_ERROR(res); } } diff --git a/tree/ntuple/src/RPageStorage.cxx b/tree/ntuple/src/RPageStorage.cxx index f5be9ec70af18..1293b975934b4 100644 --- a/tree/ntuple/src/RPageStorage.cxx +++ b/tree/ntuple/src/RPageStorage.cxx @@ -1049,6 +1049,82 @@ ROOT::Internal::RPagePersistentSink::InitFromDescriptor(const ROOT::RNTupleDescr return model; } +ROOT::DescriptorId_t +ROOT::Internal::RPagePersistentSink::AddColumnRepresentation(const ROOT::RFieldDescriptor &field, + std::span newRepresentation) +{ + assert(!field.IsProjectedField()); + assert(field.GetColumnCardinality() > 0); + assert(!field.GetLogicalColumnIds().empty()); + assert(newRepresentation.size() == 
field.GetColumnCardinality()); + + const std::size_t firstPhysicalIndex = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns(); + const std::uint16_t reprIndex = field.GetLogicalColumnIds().size() / field.GetColumnCardinality(); + + fDescriptorBuilder.ShiftAliasColumns(newRepresentation.size()); + + std::uint16_t columnIndex = 0; // index into the representation + for (auto columnType : newRepresentation) { + // Extending columns with variable bit width is currently unsupported. + const auto [rangeMin, rangeMax] = ROOT::Internal::RColumnElementBase::GetValidBitRange(columnType); + R__ASSERT(rangeMin == rangeMax); + + const ROOT::DescriptorId_t firstReprColumnId = field.GetLogicalColumnIds()[columnIndex]; + const auto &firstReprColumnRange = fOpenColumnRanges.at(firstReprColumnId); + const ROOT::DescriptorId_t columnId = firstPhysicalIndex + columnIndex; + + RColumnDescriptorBuilder columnBuilder; + columnBuilder.LogicalColumnId(columnId) + .PhysicalColumnId(columnId) + .FieldId(field.GetId()) + .BitsOnStorage(rangeMax) + .Type(columnType) + .Index(columnIndex) + // NOTE: marking this column as suppressed with the minus sign + .FirstElementIndex(-firstReprColumnRange.GetFirstElementIndex()) + .RepresentationIndex(reprIndex); + fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap()); + + ROOT::RClusterDescriptor::RColumnRange columnRange; + columnRange.SetPhysicalColumnId(columnId); + columnRange.SetFirstElementIndex(firstReprColumnRange.GetFirstElementIndex()); + columnRange.SetNElements(0); + columnRange.SetCompressionSettings(GetWriteOptions().GetCompression()); + fOpenColumnRanges.emplace_back(columnRange); + + ROOT::RClusterDescriptor::RPageRange pageRange; + pageRange.SetPhysicalColumnId(columnId); + fOpenPageRanges.emplace_back(std::move(pageRange)); + + fSerializationContext.MapPhysicalColumnId(columnId); + + ++columnIndex; + } + + return firstPhysicalIndex; +} + +void ROOT::Internal::RPagePersistentSink::AddAliasColumn(const 
ROOT::RNTupleDescriptor &desc, + const ROOT::RFieldDescriptor &field, + ROOT::DescriptorId_t physicalId) +{ + const auto &pointedColumn = desc.GetColumnDescriptor(physicalId); + assert(!pointedColumn.IsAliasColumn()); + + const auto columnId = fDescriptorBuilder.GetDescriptor().GetNLogicalColumns(); + RColumnDescriptorBuilder columnBuilder; + columnBuilder.LogicalColumnId(columnId) + .PhysicalColumnId(physicalId) + .FieldId(field.GetId()) + .Type(pointedColumn.GetType()) + .Index(pointedColumn.GetIndex()) + .BitsOnStorage(pointedColumn.GetBitsOnStorage()) + .ValueRange(pointedColumn.GetValueRange()) + .FirstElementIndex(pointedColumn.GetFirstElementIndex()) + .RepresentationIndex(pointedColumn.GetRepresentationIndex()); + fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap()); +} + void ROOT::Internal::RPagePersistentSink::CommitSuppressedColumn(ColumnHandle_t columnHandle) { fOpenColumnRanges.at(columnHandle.fPhysicalId).SetIsSuppressed(true); diff --git a/tree/ntuple/test/ntuple_merger.cxx b/tree/ntuple/test/ntuple_merger.cxx index de27ead34d3f9..6bf768a956722 100644 --- a/tree/ntuple/test/ntuple_merger.cxx +++ b/tree/ntuple/test/ntuple_merger.cxx @@ -1011,40 +1011,44 @@ TEST(RNTupleMerger, MergeLateModelExtension) { // Write two test ntuples to be merged, with different models. // Use EMergingMode::kUnion so the output ntuple has all the fields of its inputs. 
- FileRaii fileGuard1("test_ntuple_merge_in_1.root"); + FileRaii fileGuard1("test_ntuple_merge_lmext_in_1.root"); { auto model = RNTupleModel::Create(); auto fieldFoo = model->MakeField>("foo"); - auto fieldVfoo = model->MakeField>("vfoo"); + auto fieldVfoo = model->MakeField[3]>("vfoo"); auto fieldBar = model->MakeField("bar"); auto ntuple = RNTupleWriter::Recreate(std::move(model), "ntuple", fileGuard1.GetPath(), RNTupleWriteOptions()); for (size_t i = 0; i < 10; ++i) { fieldFoo->insert(std::make_pair(std::to_string(i), i * 123)); - *fieldVfoo = {(int)i * 123}; + fieldVfoo[0] = {(int)i * 123}; + fieldVfoo[2] = {(int)i * 345}; *fieldBar = i * 321; ntuple->Fill(); } } - FileRaii fileGuard2("test_ntuple_merge_in_2.root"); + FileRaii fileGuard2("test_ntuple_merge_lmext_in_2.root"); { auto model = RNTupleModel::Create(); auto fieldBaz = model->MakeField("baz"); auto fieldFoo = model->MakeField>("foo"); - auto fieldVfoo = model->MakeField>("vfoo"); + auto fieldQux = model->MakeField("qux"); + auto fieldVfoo = model->MakeField[3]>("vfoo"); auto wopts = RNTupleWriteOptions(); wopts.SetCompression(0); auto ntuple = RNTupleWriter::Recreate(std::move(model), "ntuple", fileGuard2.GetPath(), wopts); for (size_t i = 0; i < 10; ++i) { *fieldBaz = i * 567; fieldFoo->insert(std::make_pair(std::to_string(i), i * 765)); - *fieldVfoo = {(int)i * 765}; + fieldVfoo[0] = {(int)i * 765}; + fieldVfoo[2] = {(int)i * 987}; + *fieldQux = i * 777; ntuple->Fill(); } } // Now merge the inputs - FileRaii fileGuard3("test_ntuple_merge_out.root"); + FileRaii fileGuard3("test_ntuple_merge_lmext_out.root"); { // Gather the input sources std::vector> sources; @@ -1072,23 +1076,28 @@ TEST(RNTupleMerger, MergeLateModelExtension) auto ntuple = RNTupleReader::Open("ntuple", fileGuard3.GetPath()); EXPECT_EQ(ntuple->GetNEntries(), 20); auto foo = ntuple->GetModel().GetDefaultEntry().GetPtr>("foo"); - auto vfoo = ntuple->GetModel().GetDefaultEntry().GetPtr>("vfoo"); + auto vfoo = 
ntuple->GetModel().GetDefaultEntry().GetPtr[3]>("vfoo"); auto bar = ntuple->GetModel().GetDefaultEntry().GetPtr("bar"); auto baz = ntuple->GetModel().GetDefaultEntry().GetPtr("baz"); + auto qux = ntuple->GetModel().GetDefaultEntry().GetPtr("qux"); for (int i = 0; i < 10; ++i) { ntuple->LoadEntry(i); ASSERT_EQ((*foo)[std::to_string(i)], i * 123); - ASSERT_EQ((*vfoo)[0], i * 123); + ASSERT_EQ(vfoo[0][0], i * 123); + ASSERT_EQ(vfoo[2][0], i * 345); ASSERT_EQ(*bar, i * 321); ASSERT_EQ(*baz, 0); + ASSERT_EQ(*qux, 0); } for (int i = 10; i < 20; ++i) { ntuple->LoadEntry(i); ASSERT_EQ((*foo)[std::to_string(i - 10)], (i - 10) * 765); - ASSERT_EQ((*vfoo)[0], (i - 10) * 765); + ASSERT_EQ(vfoo[0][0], (i - 10) * 765); + ASSERT_EQ(vfoo[2][0], (i - 10) * 987); ASSERT_EQ(*bar, 0); ASSERT_EQ(*baz, (i - 10) * 567); + ASSERT_EQ(*qux, (i - 10) * 777); } } } @@ -1176,8 +1185,10 @@ TEST(RNTupleMerger, DifferentCompatibleRepresentations) auto model = RNTupleModel::Create(); auto pFoo = model->MakeField("foo"); auto clonedModel = model->Clone(); + auto wopts = RNTupleWriteOptions(); + wopts.SetCompression(0); { - auto ntuple = RNTupleWriter::Recreate(std::move(model), "ntuple", fileGuard1.GetPath()); + auto ntuple = RNTupleWriter::Recreate(std::move(model), "ntuple", fileGuard1.GetPath(), wopts); for (size_t i = 0; i < 10; ++i) { *pFoo = i * 123; ntuple->Fill(); @@ -1189,12 +1200,12 @@ TEST(RNTupleMerger, DifferentCompatibleRepresentations) { auto &fieldFooDbl = clonedModel->GetMutableField("foo"); fieldFooDbl.SetColumnRepresentatives({{ROOT::ENTupleColumnType::kReal32}}); - auto ntuple = RNTupleWriter::Recreate(std::move(clonedModel), "ntuple", fileGuard2.GetPath()); + auto ntuple = RNTupleWriter::Recreate(std::move(clonedModel), "ntuple", fileGuard2.GetPath(), wopts); auto e = ntuple->CreateEntry(); auto pFoo2 = e->GetPtr("foo"); for (size_t i = 0; i < 10; ++i) { *pFoo2 = i * 567; - ntuple->Fill(); + ntuple->Fill(*e); } } @@ -1214,30 +1225,18 @@ TEST(RNTupleMerger, 
DifferentCompatibleRepresentations) auto sourcePtrs2 = sourcePtrs; { - auto wopts = RNTupleWriteOptions(); - wopts.SetCompression(0); auto destination = std::make_unique("ntuple", fileGuard3.GetPath(), wopts); auto opts = RNTupleMergeOptions(); opts.fCompressionSettings = 0; RNTupleMerger merger{std::move(destination)}; auto res = merger.Merge(sourcePtrs, opts); - // TODO(gparolini): we want to support this in the future - EXPECT_FALSE(bool(res)); - if (res.GetError()) { - EXPECT_THAT(res.GetError()->GetReport(), testing::HasSubstr("different column type")); - } - // EXPECT_TRUE(bool(res)); + EXPECT_TRUE(bool(res)); } { auto destination = std::make_unique("ntuple", fileGuard4.GetPath(), RNTupleWriteOptions()); RNTupleMerger merger{std::move(destination)}; auto res = merger.Merge(sourcePtrs); - // TODO(gparolini): we want to support this in the future - EXPECT_FALSE(bool(res)); - if (res.GetError()) { - EXPECT_THAT(res.GetError()->GetReport(), testing::HasSubstr("different column type")); - } - // EXPECT_TRUE(bool(res)); + EXPECT_TRUE(bool(res)); } } } @@ -1527,7 +1526,9 @@ TEST(RNTupleMerger, MergeProjectedFieldsOnlyFirst) { auto model = RNTupleModel::Create(); auto fieldFoo = model->MakeField("foo"); - auto ntuple = RNTupleWriter::Recreate(std::move(model), "ntuple", fileGuard2.GetPath()); + auto wopts = RNTupleWriteOptions(); + wopts.SetCompression(0); + auto ntuple = RNTupleWriter::Recreate(std::move(model), "ntuple", fileGuard2.GetPath(), wopts); for (size_t i = 0; i < 10; ++i) { *fieldFoo = i * 123; ntuple->Fill(); @@ -1561,16 +1562,18 @@ TEST(RNTupleMerger, MergeProjectedFieldsOnlyFirst) auto ntuple1 = RNTupleReader::Open("ntuple", fileGuard1.GetPath()); auto ntuple2 = RNTupleReader::Open("ntuple", fileGuard2.GetPath()); auto ntuple3 = RNTupleReader::Open("ntuple", fileGuardOut.GetPath()); - ASSERT_EQ(ntuple1->GetNEntries() + ntuple2->GetNEntries(), ntuple3->GetNEntries()); + EXPECT_EQ(ntuple1->GetNEntries() + ntuple2->GetNEntries(), ntuple3->GetNEntries()); 
const auto &desc1 = ntuple1->GetDescriptor(); const auto &desc2 = ntuple2->GetDescriptor(); const auto &desc3 = ntuple3->GetDescriptor(); const auto nAliasColumns1 = desc1.GetNLogicalColumns() - desc1.GetNPhysicalColumns(); const auto nAliasColumns2 = desc2.GetNLogicalColumns() - desc2.GetNPhysicalColumns(); const auto nAliasColumns3 = desc3.GetNLogicalColumns() - desc3.GetNPhysicalColumns(); - ASSERT_EQ(nAliasColumns1, 1); - ASSERT_EQ(nAliasColumns2, 0); - ASSERT_EQ(nAliasColumns3, 1); + EXPECT_EQ(nAliasColumns1, 1); + EXPECT_EQ(nAliasColumns2, 0); + // The output RNTuple has 2 alias columns because one was created by the merger to point to the extended + // column that was added to field "foo" (since source 2 had a different encoding than source 1). + EXPECT_EQ(nAliasColumns3, 2); auto foo1 = ntuple1->GetModel().GetDefaultEntry().GetPtr("foo"); auto foo2 = ntuple2->GetModel().GetDefaultEntry().GetPtr("foo"); @@ -1582,16 +1585,16 @@ TEST(RNTupleMerger, MergeProjectedFieldsOnlyFirst) for (auto i = 0u; i < ntuple1->GetNEntries(); ++i) { ntuple1->LoadEntry(i); ntuple3->LoadEntry(i); - ASSERT_EQ(*foo1, *foo3); - ASSERT_EQ(*bar1, *foo3); - ASSERT_EQ(*bar1, *bar3); + EXPECT_EQ(*foo1, *foo3); + EXPECT_EQ(*bar1, *foo3); + EXPECT_EQ(*bar1, *bar3); } for (auto i = 0u; i < ntuple2->GetNEntries(); ++i) { ntuple2->LoadEntry(i); ntuple3->LoadEntry(ntuple1->GetNEntries() + i); - ASSERT_EQ(*foo2, *foo3); + EXPECT_EQ(*foo2, *foo3); // we should be able to read the data from the second ntuple using the projection defined in the first. 
- ASSERT_EQ(*foo2, *bar3); + EXPECT_EQ(*foo2, *bar3); } } } @@ -2534,7 +2537,8 @@ TEST(RNTupleMerger, MergeDeferredAdvanced) auto model1 = RNTupleModel::Create(); auto wopts = RNTupleWriteOptions(); wopts.SetCompression(0); - auto writer1 = RNTupleWriter::Recreate(std::move(model1), "ntuple", fileGuard1.GetPath(), wopts); + auto tfile = TFile::Open((fileGuard1.GetPath() + "?reproducible").c_str(), "RECREATE"); + auto writer1 = RNTupleWriter::Append(std::move(model1), "ntuple", *tfile, wopts); auto updater = writer1->CreateModelUpdater(); updater->BeginUpdate(); updater->AddField(RFieldBase::Create("flt", "float").Unwrap()); @@ -2607,7 +2611,7 @@ TEST(RNTupleMerger, MergeDeferredAdvanced) auto pInt = reader->GetModel().GetDefaultEntry().GetPtr("int"); auto pFlt = reader->GetModel().GetDefaultEntry().GetPtr("flt"); - for (auto i = 0u; i < reader->GetNEntries(); ++i) { + for (auto i : reader->GetEntryRange()) { reader->LoadEntry(i); float expectedFlt = (i >= 10 && i < 15) ? 0 : i; EXPECT_FLOAT_EQ(*pFlt, expectedFlt);