From 950932cf18a93febc8317b18577e9045defc1358 Mon Sep 17 00:00:00 2001 From: silverweed Date: Tue, 31 Mar 2026 08:39:02 +0200 Subject: [PATCH 1/8] [ntuple] Small improvement in RNTupleMerger Instead of calling continue multiple times in the AddColumnFromField loop, just early return in case of projected fields. --- tree/ntuple/src/RNTupleMerger.cxx | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tree/ntuple/src/RNTupleMerger.cxx b/tree/ntuple/src/RNTupleMerger.cxx index 22eda7acfd09d..d01ab52680ea9 100644 --- a/tree/ntuple/src/RNTupleMerger.cxx +++ b/tree/ntuple/src/RNTupleMerger.cxx @@ -1006,7 +1006,7 @@ RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span &columns, const RO { std::string name = prefix + '.' + srcFieldDesc.GetFieldName(); + // We don't want to try and merge alias columns + if (srcFieldDesc.IsProjectedField()) + return; + const auto &columnIds = srcFieldDesc.GetLogicalColumnIds(); columns.reserve(columns.size() + columnIds.size()); // NOTE: here we can match the src and dst columns by column index because we forbid merging fields with // different column representations. for (auto i = 0u; i < srcFieldDesc.GetLogicalColumnIds().size(); ++i) { - // We don't want to try and merge alias columns - if (srcFieldDesc.IsProjectedField()) - continue; - auto srcColumnId = srcFieldDesc.GetLogicalColumnIds()[i]; const auto &srcColumn = srcDesc.GetColumnDescriptor(srcColumnId); From bd8ce57d97b25b8ebe1ac43ad084cbf586157921 Mon Sep 17 00:00:00 2001 From: silverweed Date: Wed, 15 Apr 2026 16:08:30 +0200 Subject: [PATCH 2/8] [ntuple] Serializer: sort columns by ID before serializing them We are currently serializing columns per-field, but in case of late column extension this might result in inconsistent sorting of the columns in the serialized footer. e.g. assume you have fields "A" and "B", both late model extended, both with a single column: - col 0 -> field A, repr 0 - col 1 -> field B, repr 0 Now you add a new column representation to field "A"; this new column has id 2: - col 2 -> field A, repr 1 When serializing this RNTuple, all columns are written in the footer by RNTupleSerialize::SerializeColumnsForFields(). Before this change, they would end up on disk in order: [0, 2, 1]. This would corrupt the data by swapping the pages for columns 2 and 1. After this change, they get written as [0, 1, 2] which is the correct order. Note that this exact case is tested in ntuple_merger in the unit test MergeDeferredAdvanced. --- tree/ntuple/src/RNTupleSerialize.cxx | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/tree/ntuple/src/RNTupleSerialize.cxx b/tree/ntuple/src/RNTupleSerialize.cxx index 8e5f38bd7ca91..f0c197ffa18bc 100644 --- a/tree/ntuple/src/RNTupleSerialize.cxx +++ b/tree/ntuple/src/RNTupleSerialize.cxx @@ -291,6 +291,7 @@ ROOT::RResult SerializeColumnsOfFields(const ROOT::RNTupleDescrip const auto *xHeader = !forHeaderExtension ? desc.GetHeaderExtension() : nullptr; + std::vector columnsToSerialize; for (auto parentId : fieldList) { // If we're serializing the non-extended header and we already have a header extension (which may happen if // we load an RNTuple for incremental merging), we need to skip all the extended fields, as they need to be @@ -299,14 +300,21 @@ ROOT::RResult SerializeColumnsOfFields(const ROOT::RNTupleDescrip continue; for (const auto &c : desc.GetColumnIterable(parentId)) { - if (c.IsAliasColumn() || (xHeader && xHeader->ContainsExtendedColumnRepresentation(c.GetLogicalId()))) - continue; + if (!c.IsAliasColumn() && !(xHeader && xHeader->ContainsExtendedColumnRepresentation(c.GetLogicalId()))) + columnsToSerialize.push_back(&c); + } + } - if (auto res = SerializePhysicalColumn(c, context, *where)) { - pos += res.Unwrap(); - } else { - return R__FORWARD_ERROR(res); - } + // Make sure the columns are sorted by physical ID + std::sort(columnsToSerialize.begin(), columnsToSerialize.end(), [&context](const auto *a, const auto *b) { + return context.GetOnDiskColumnId(a->GetPhysicalId()) < context.GetOnDiskColumnId(b->GetPhysicalId()); + }); + + for (const auto *c : columnsToSerialize) { + if (auto res = SerializePhysicalColumn(*c, context, *where)) { + pos += res.Unwrap(); + } else { + return R__FORWARD_ERROR(res); } } From 46a7e6f5cf2308112673bd026228d69deadf7962 Mon Sep 17 00:00:00 2001 From: silverweed Date: Fri, 17 Apr 2026 11:08:30 +0200 Subject: [PATCH 3/8] [ntuple] update merger test to make sure we test nRepetitions --- tree/ntuple/test/ntuple_merger.cxx | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/tree/ntuple/test/ntuple_merger.cxx b/tree/ntuple/test/ntuple_merger.cxx index de27ead34d3f9..29942409a1d3f 100644 --- a/tree/ntuple/test/ntuple_merger.cxx +++ b/tree/ntuple/test/ntuple_merger.cxx @@ -1015,12 +1015,13 @@ TEST(RNTupleMerger, MergeLateModelExtension) { auto model = RNTupleModel::Create(); auto fieldFoo = model->MakeField>("foo"); - auto fieldVfoo = model->MakeField>("vfoo"); + auto fieldVfoo = model->MakeField[3]>("vfoo"); auto fieldBar = model->MakeField("bar"); auto ntuple = RNTupleWriter::Recreate(std::move(model), "ntuple", fileGuard1.GetPath(), RNTupleWriteOptions()); for (size_t i = 0; i < 10; ++i) { fieldFoo->insert(std::make_pair(std::to_string(i), i * 123)); - *fieldVfoo = {(int)i * 123}; + fieldVfoo[0] = {(int)i * 123}; + fieldVfoo[2] = {(int)i * 345}; *fieldBar = i * 321; ntuple->Fill(); } @@ -1031,14 +1032,15 @@ TEST(RNTupleMerger, MergeLateModelExtension) auto model = RNTupleModel::Create(); auto fieldBaz = model->MakeField("baz"); auto fieldFoo = model->MakeField>("foo"); - auto fieldVfoo = model->MakeField>("vfoo"); + auto fieldVfoo = model->MakeField[3]>("vfoo"); auto wopts = RNTupleWriteOptions(); wopts.SetCompression(0); auto ntuple = RNTupleWriter::Recreate(std::move(model), "ntuple", fileGuard2.GetPath(), wopts); for (size_t i = 0; i < 10; ++i) { *fieldBaz = i * 567; fieldFoo->insert(std::make_pair(std::to_string(i), i * 765)); - *fieldVfoo = {(int)i * 765}; + fieldVfoo[0] = {(int)i * 765}; + fieldVfoo[2] = {(int)i * 987}; ntuple->Fill(); } } @@ -1072,21 +1074,23 @@ TEST(RNTupleMerger, MergeLateModelExtension) auto ntuple = RNTupleReader::Open("ntuple", fileGuard3.GetPath()); EXPECT_EQ(ntuple->GetNEntries(), 20); auto foo = ntuple->GetModel().GetDefaultEntry().GetPtr>("foo"); - auto vfoo = ntuple->GetModel().GetDefaultEntry().GetPtr>("vfoo"); + auto vfoo = ntuple->GetModel().GetDefaultEntry().GetPtr[3]>("vfoo"); auto bar = ntuple->GetModel().GetDefaultEntry().GetPtr("bar"); auto baz = ntuple->GetModel().GetDefaultEntry().GetPtr("baz"); for (int i = 0; i < 10; ++i) { ntuple->LoadEntry(i); ASSERT_EQ((*foo)[std::to_string(i)], i * 123); - ASSERT_EQ((*vfoo)[0], i * 123); + ASSERT_EQ(vfoo[0][0], i * 123); + ASSERT_EQ(vfoo[2][0], i * 345); ASSERT_EQ(*bar, i * 321); ASSERT_EQ(*baz, 0); } for (int i = 10; i < 20; ++i) { ntuple->LoadEntry(i); ASSERT_EQ((*foo)[std::to_string(i - 10)], (i - 10) * 765); - ASSERT_EQ((*vfoo)[0], (i - 10) * 765); + ASSERT_EQ(vfoo[0][0], (i - 10) * 765); + ASSERT_EQ(vfoo[2][0], (i - 10) * 987); ASSERT_EQ(*bar, 0); ASSERT_EQ(*baz, (i - 10) * 567); } From bba9d4f8079c40b7280357d2c89686a947e779fc Mon Sep 17 00:00:00 2001 From: silverweed Date: Wed, 22 Apr 2026 14:47:11 +0200 Subject: [PATCH 4/8] [ntuple] Clarify a bit RFieldBase::EntryToColumnElementIndex Also fix the type of result --- tree/ntuple/inc/ROOT/RFieldBase.hxx | 7 ++++--- tree/ntuple/src/RFieldBase.cxx | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/tree/ntuple/inc/ROOT/RFieldBase.hxx b/tree/ntuple/inc/ROOT/RFieldBase.hxx index f82e83ea7061a..85e12aa012140 100644 --- a/tree/ntuple/inc/ROOT/RFieldBase.hxx +++ b/tree/ntuple/inc/ROOT/RFieldBase.hxx @@ -247,14 +247,15 @@ private: func(target); } - /// Translate an entry index to a column element index of the principal column and vice versa. These functions - /// take into account the role and number of repetitions on each level of the field hierarchy as follows: + /// Translate an entry index to a column element index of the principal column. This function + /// takes into account the role and number of repetitions on each level of the field hierarchy as follows: /// - Top level fields: element index == entry index /// - Record fields propagate their principal column index to the principal columns of direct descendant fields /// - Collection and variant fields set the principal column index of their children to 0 /// /// The column element index also depends on the number of repetitions of each field in the hierarchy, e.g., given a - /// field with type `std::array, 2>`, this function returns 8 for the innermost field. + /// field with type `std::array, 2>`, this function called with `globalIndex == 1` + /// returns 8 for the innermost field. ROOT::NTupleSize_t EntryToColumnElementIndex(ROOT::NTupleSize_t globalIndex) const; /// Flushes data from active columns diff --git a/tree/ntuple/src/RFieldBase.cxx b/tree/ntuple/src/RFieldBase.cxx index 38b70e5ad60b5..79479df716a96 100644 --- a/tree/ntuple/src/RFieldBase.cxx +++ b/tree/ntuple/src/RFieldBase.cxx @@ -668,14 +668,14 @@ void ROOT::RFieldBase::Attach(std::unique_ptr child, std::stri ROOT::NTupleSize_t ROOT::RFieldBase::EntryToColumnElementIndex(ROOT::NTupleSize_t globalIndex) const { - std::size_t result = globalIndex; + ROOT::NTupleSize_t result = globalIndex; for (auto f = this; f != nullptr; f = f->GetParent()) { auto parent = f->GetParent(); if (parent && (parent->GetStructure() == ROOT::ENTupleStructure::kCollection || parent->GetStructure() == ROOT::ENTupleStructure::kVariant)) { return 0U; } - result *= std::max(f->GetNRepetitions(), std::size_t{1U}); + result *= std::max(f->GetNRepetitions(), ROOT::NTupleSize_t{1U}); } return result; } From a1b4a347ab197ada7a546379cf42600bec9f9d1e Mon Sep 17 00:00:00 2001 From: silverweed Date: Wed, 22 Apr 2026 14:48:46 +0200 Subject: [PATCH 5/8] [ntuple] Add RClusterDescriptor::TryGetColumnRange --- tree/ntuple/inc/ROOT/RNTupleDescriptor.hxx | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tree/ntuple/inc/ROOT/RNTupleDescriptor.hxx b/tree/ntuple/inc/ROOT/RNTupleDescriptor.hxx index a5a736010c223..fa34b90f52c27 100644 --- a/tree/ntuple/inc/ROOT/RNTupleDescriptor.hxx +++ b/tree/ntuple/inc/ROOT/RNTupleDescriptor.hxx @@ -519,6 +519,12 @@ public: ROOT::NTupleSize_t GetFirstEntryIndex() const { return fFirstEntryIndex; } ROOT::NTupleSize_t GetNEntries() const { return fNEntries; } const RColumnRange &GetColumnRange(ROOT::DescriptorId_t physicalId) const { return fColumnRanges.at(physicalId); } + const RColumnRange *TryGetColumnRange(ROOT::DescriptorId_t physicalId) const + { + if (auto it = fColumnRanges.find(physicalId); it != fColumnRanges.end()) + return &it->second; + return nullptr; + } const RPageRange &GetPageRange(ROOT::DescriptorId_t physicalId) const { return fPageRanges.at(physicalId); } /// Returns an iterator over pairs { columnId, columnRange }. The iteration order is unspecified. RColumnRangeIterable GetColumnRangeIterable() const; From 49bd9bbb7edeaddb569bd0aea5481e09e5ad5a5c Mon Sep 17 00:00:00 2001 From: silverweed Date: Wed, 22 Apr 2026 14:51:04 +0200 Subject: [PATCH 6/8] [ntuple] Add RPagePersistentSink::AddColumnRepresentation Internal functionality to be used by the Merger --- tree/ntuple/inc/ROOT/RPageStorage.hxx | 3 ++ tree/ntuple/src/RPageStorage.cxx | 51 +++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/tree/ntuple/inc/ROOT/RPageStorage.hxx b/tree/ntuple/inc/ROOT/RPageStorage.hxx index a7c0441e3597e..fa6fc80102e77 100644 --- a/tree/ntuple/inc/ROOT/RPageStorage.hxx +++ b/tree/ntuple/inc/ROOT/RPageStorage.hxx @@ -544,6 +544,9 @@ public: [[nodiscard]] std::unique_ptr InitFromDescriptor(const ROOT::RNTupleDescriptor &descriptor, bool copyClusters); + void + AddColumnRepresentation(const ROOT::RFieldDescriptor &field, std::span newRepresentation); + void CommitSuppressedColumn(ColumnHandle_t columnHandle) final; void CommitPage(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) final; void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final; diff --git a/tree/ntuple/src/RPageStorage.cxx b/tree/ntuple/src/RPageStorage.cxx index f5be9ec70af18..8e2d2e1d72016 100644 --- a/tree/ntuple/src/RPageStorage.cxx +++ b/tree/ntuple/src/RPageStorage.cxx @@ -1049,6 +1049,57 @@ ROOT::Internal::RPagePersistentSink::InitFromDescriptor(const ROOT::RNTupleDescr return model; } +void ROOT::Internal::RPagePersistentSink::AddColumnRepresentation(const ROOT::RFieldDescriptor &field, + std::span newRepresentation) +{ + R__ASSERT(field.GetColumnCardinality() > 0); + R__ASSERT(!field.GetLogicalColumnIds().empty()); + R__ASSERT(newRepresentation.size() == field.GetColumnCardinality()); + + const auto firstPhysicalIndex = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns(); + const auto reprIndex = field.GetLogicalColumnIds().size() / field.GetColumnCardinality(); + + fDescriptorBuilder.ShiftAliasColumns(newRepresentation.size()); + + std::uint16_t columnIndex = 0; // index into the representation + for (auto columnType : newRepresentation) { + // Extending columns with variable bit width is currently unsupported. + const auto [rangeMin, rangeMax] = ROOT::Internal::RColumnElementBase::GetValidBitRange(columnType); + R__ASSERT(rangeMin == rangeMax); + + const auto firstReprColumnId = field.GetLogicalColumnIds()[columnIndex]; + const auto &firstReprColumnRange = fOpenColumnRanges.at(firstReprColumnId); + const auto columnId = firstPhysicalIndex + columnIndex; + + RColumnDescriptorBuilder columnBuilder; + columnBuilder.LogicalColumnId(columnId) + .PhysicalColumnId(columnId) + .FieldId(field.GetId()) + .BitsOnStorage(rangeMax) + .Type(columnType) + .Index(columnIndex) + // NOTE: marking this column as suppressed with the minus sign + .FirstElementIndex(-firstReprColumnRange.GetFirstElementIndex()) + .RepresentationIndex(reprIndex); + fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap()); + + ROOT::RClusterDescriptor::RColumnRange columnRange; + columnRange.SetPhysicalColumnId(columnId); + columnRange.SetFirstElementIndex(firstReprColumnRange.GetFirstElementIndex()); + columnRange.SetNElements(0); + columnRange.SetCompressionSettings(GetWriteOptions().GetCompression()); + fOpenColumnRanges.emplace_back(columnRange); + + ROOT::RClusterDescriptor::RPageRange pageRange; + pageRange.SetPhysicalColumnId(columnId); + fOpenPageRanges.emplace_back(std::move(pageRange)); + + fSerializationContext.MapPhysicalColumnId(columnId); + + ++columnIndex; + } +} + void ROOT::Internal::RPagePersistentSink::CommitSuppressedColumn(ColumnHandle_t columnHandle) { fOpenColumnRanges.at(columnHandle.fPhysicalId).SetIsSuppressed(true); From bcb1b321b76dde2f297f8a29b742c9da6e0f86cb Mon Sep 17 00:00:00 2001 From: silverweed Date: Wed, 22 Apr 2026 14:56:52 +0200 Subject: [PATCH 7/8] [ntuple] Fix AddExtendedColumnRanges to account for ExtendColumns --- tree/ntuple/src/RNTupleDescriptor.cxx | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/tree/ntuple/src/RNTupleDescriptor.cxx b/tree/ntuple/src/RNTupleDescriptor.cxx index f4a8b72b62c8b..57463ff6324f7 100644 --- a/tree/ntuple/src/RNTupleDescriptor.cxx +++ b/tree/ntuple/src/RNTupleDescriptor.cxx @@ -961,8 +961,16 @@ ROOT::Internal::RClusterDescriptorBuilder::AddExtendedColumnRanges(const RNTuple // `ROOT::RFieldBase::EntryToColumnElementIndex()`, i.e. it is a principal column reachable from the // field zero excluding subfields of collection and variant fields. if (c.IsDeferredColumn()) { - columnRange.SetFirstElementIndex(fCluster.GetFirstEntryIndex() * nRepetitions); - columnRange.SetNElements(fCluster.GetNEntries() * nRepetitions); + if (c.GetRepresentationIndex() == 0) { + columnRange.SetFirstElementIndex(fCluster.GetFirstEntryIndex() * nRepetitions); + columnRange.SetNElements(fCluster.GetNEntries() * nRepetitions); + } else { + const auto &field = desc.GetFieldDescriptor(fieldId); + const auto firstReprColumnId = field.GetLogicalColumnIds()[c.GetIndex()]; + const auto &firstReprColumnRange = fCluster.fColumnRanges[firstReprColumnId]; + columnRange.SetFirstElementIndex(firstReprColumnRange.GetFirstElementIndex()); + columnRange.SetNElements(firstReprColumnRange.GetNElements()); + } if (!columnRange.IsSuppressed()) { auto &pageRange = fCluster.fPageRanges[physicalId]; pageRange.fPhysicalColumnId = physicalId; From 3e9cdf7e5253c322c2b7ae687b56352da5cc94be Mon Sep 17 00:00:00 2001 From: silverweed Date: Wed, 22 Apr 2026 14:57:47 +0200 Subject: [PATCH 8/8] WIP: merger column ext --- tree/ntuple/inc/ROOT/RNTupleMerger.hxx | 14 +- tree/ntuple/inc/ROOT/RPageStorage.hxx | 8 +- tree/ntuple/src/RNTupleMerger.cxx | 616 +++++++++++++++---------- tree/ntuple/src/RPageStorage.cxx | 43 +- tree/ntuple/test/ntuple_merger.cxx | 64 +-- 5 files changed, 461 insertions(+), 284 deletions(-) diff --git a/tree/ntuple/inc/ROOT/RNTupleMerger.hxx b/tree/ntuple/inc/ROOT/RNTupleMerger.hxx index 10885c8406756..2ace1f9b21417 100644 --- a/tree/ntuple/inc/ROOT/RNTupleMerger.hxx +++ b/tree/ntuple/inc/ROOT/RNTupleMerger.hxx @@ -118,16 +118,16 @@ class RNTupleMerger final { std::unique_ptr fModel; [[nodiscard]] - ROOT::RResult MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, - const ROOT::RClusterDescriptor &clusterDesc, - std::span commonColumns, - const ROOT::Internal::RCluster::ColumnSet_t &commonColumnSet, - std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData, - const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc); + ROOT::RResult + MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, const ROOT::RClusterDescriptor &clusterDesc, + std::span commonColumns, + const ROOT::Internal::RCluster::ColumnSet_t &commonColumnSet, std::size_t nCommonColumnsInCluster, + RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData, + ROOT::Internal::RPageAllocator &pageAlloc); [[nodiscard]] ROOT::RResult - MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span commonColumns, + MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span commonColumns, std::span extraDstColumns, RNTupleMergeData &mergeData); /// Creates a RNTupleMerger with the given destination. diff --git a/tree/ntuple/inc/ROOT/RPageStorage.hxx b/tree/ntuple/inc/ROOT/RPageStorage.hxx index fa6fc80102e77..7df9f08a2dae6 100644 --- a/tree/ntuple/inc/ROOT/RPageStorage.hxx +++ b/tree/ntuple/inc/ROOT/RPageStorage.hxx @@ -544,9 +544,15 @@ public: [[nodiscard]] std::unique_ptr InitFromDescriptor(const ROOT::RNTupleDescriptor &descriptor, bool copyClusters); - void + /// Adds a new column representation to the given field. + /// \return The physical id of the first newly added column. + ROOT::DescriptorId_t AddColumnRepresentation(const ROOT::RFieldDescriptor &field, std::span newRepresentation); + /// Adds a new alias column pointing to an existing column with the given physical id to the given field. + void AddAliasColumn(const ROOT::RNTupleDescriptor &desc, const ROOT::RFieldDescriptor &field, + ROOT::DescriptorId_t physicalId); + void CommitSuppressedColumn(ColumnHandle_t columnHandle) final; void CommitPage(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) final; void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final; diff --git a/tree/ntuple/src/RNTupleMerger.cxx b/tree/ntuple/src/RNTupleMerger.cxx index d01ab52680ea9..1a92067e775c1 100644 --- a/tree/ntuple/src/RNTupleMerger.cxx +++ b/tree/ntuple/src/RNTupleMerger.cxx @@ -270,12 +270,10 @@ try { } namespace { -// Functor used to change the compression of a page to `options.fCompressionSettings`. +// Functor used to change the compression of a page to `fCompressionSettings`. struct RChangeCompressionFunc { const RColumnElementBase &fSrcColElement; - const RColumnElementBase &fDstColElement; - const RNTupleMergeOptions &fMergeOptions; - + std::uint32_t fCompressionSettings; RPageStorage::RSealedPage &fSealedPage; ROOT::Internal::RPageAllocator &fPageAlloc; std::uint8_t *fBuffer; @@ -284,52 +282,25 @@ struct RChangeCompressionFunc { void operator()() const { - assert(fSrcColElement.GetIdentifier() == fDstColElement.GetIdentifier()); - fSealedPage.VerifyChecksumIfEnabled().ThrowOnError(); const auto bytesPacked = fSrcColElement.GetPackedSize(fSealedPage.GetNElements()); // TODO: this buffer could be kept and reused across pages + // TODO: if Zip is a no-op (compression == 0) we can unzip directly in fBuffer! auto unzipBuf = MakeUninitArray(bytesPacked); ROOT::Internal::RNTupleDecompressor::Unzip(fSealedPage.GetBuffer(), fSealedPage.GetDataSize(), bytesPacked, unzipBuf.get()); const auto checksumSize = fWriteOpts.GetEnablePageChecksums() * sizeof(std::uint64_t); assert(fBufSize >= bytesPacked + checksumSize); - auto nBytesZipped = ROOT::Internal::RNTupleCompressor::Zip(unzipBuf.get(), bytesPacked, - fMergeOptions.fCompressionSettings.value(), fBuffer); + auto nBytesZipped = + ROOT::Internal::RNTupleCompressor::Zip(unzipBuf.get(), bytesPacked, fCompressionSettings, fBuffer); fSealedPage = {fBuffer, nBytesZipped + checksumSize, fSealedPage.GetNElements(), fSealedPage.GetHasChecksum()}; fSealedPage.ChecksumIfEnabled(); } }; -struct RResealFunc { - const RColumnElementBase &fSrcColElement; - const RColumnElementBase &fDstColElement; - const RNTupleMergeOptions &fMergeOptions; - - RPageStorage::RSealedPage &fSealedPage; - ROOT::Internal::RPageAllocator &fPageAlloc; - std::uint8_t *fBuffer; - std::size_t fBufSize; - const ROOT::RNTupleWriteOptions &fWriteOpts; - - void operator()() const - { - auto page = RPageSource::UnsealPage(fSealedPage, fSrcColElement, fPageAlloc).Unwrap(); - RPageSink::RSealPageConfig sealConf; - sealConf.fElement = &fDstColElement; - sealConf.fPage = &page; - sealConf.fBuffer = fBuffer; - sealConf.fCompressionSettings = *fMergeOptions.fCompressionSettings; - sealConf.fWriteChecksum = fWriteOpts.GetEnablePageChecksums(); - assert(fBufSize >= fSealedPage.GetDataSize() + fSealedPage.GetHasChecksum() * sizeof(std::uint64_t)); - auto refSealedPage = RPageSink::SealPage(sealConf); - fSealedPage = refSealedPage; - } -}; - struct RTaskVisitor { std::optional &fGroup; @@ -350,23 +321,47 @@ struct RCommonField { RCommonField(const ROOT::RFieldDescriptor &src, const ROOT::RFieldDescriptor &dst) : fSrc(&src), fDst(&dst) {} }; +struct RColReprMapping { + std::uint32_t fSource; + std::uint32_t fDest; +}; + +struct RColReprExtension : RColReprMapping { + ROOT::RFieldBase::ColumnRepresentation_t fSourceRepr; +}; + +template +using FieldCollectionMap_t = std::unordered_map>; + +static std::optional +FindColumnReprMapping(const std::vector &mappings, std::uint32_t sourceReprIndex) +{ + for (const auto [src, dst] : mappings) + if (src == sourceReprIndex) + return dst; + return std::nullopt; +} + struct RDescriptorsComparison { std::vector fExtraDstFields; std::vector fExtraSrcFields; std::vector fCommonFields; + // For each field that has more than 1 column representation in the output model, + // maps the column representatives of the source field with those of the destination. + // The key is the destination field. + FieldCollectionMap_t fColReprMappings; + FieldCollectionMap_t fColReprExtensions; }; struct RColumnOutInfo { - ROOT::DescriptorId_t fColumnId; - ENTupleColumnType fColumnType; + ROOT::DescriptorId_t fColumnId = ROOT::kInvalidDescriptorId; }; -// { fully.qualified.fieldName.colInputId => colOutputInfo } +// { ".fully.qualified.fieldName.colInputIndex.colOutputReprIndex" => colOutputInfo } using ColumnIdMap_t = std::unordered_map; struct RColumnInfoGroup { std::vector fExtraDstColumns; - // These are sorted by InputId std::vector fCommonColumns; }; @@ -380,14 +375,14 @@ struct RColumnMergeInfo { // e.g. "Muon.pt.x._0" std::string fColumnName; // The column id in the source RNTuple - ROOT::DescriptorId_t fInputId; + ROOT::DescriptorId_t fInputId = kInvalidDescriptorId; // The corresponding column id in the destination RNTuple (the mapping happens in AddColumnsFromField()) - ROOT::DescriptorId_t fOutputId; - ENTupleColumnType fColumnType; + ROOT::DescriptorId_t fOutputId = kInvalidDescriptorId; + std::uint16_t fOutputReprIndex = 0; // If nullopt, use the default in-memory type std::optional fInMemoryType; - const ROOT::RFieldDescriptor *fParentFieldDescriptor; - const ROOT::RNTupleDescriptor *fParentNTupleDescriptor; + const ROOT::RFieldDescriptor *fParentFieldDescriptor = nullptr; + const ROOT::RNTupleDescriptor *fParentNTupleDescriptor = nullptr; }; // Data related to a single call of RNTupleMerger::Merge() @@ -399,6 +394,7 @@ struct RNTupleMergeData { const ROOT::RNTupleDescriptor *fSrcDescriptor = nullptr; std::vector fColumns; + // Maps input column IDs to output IDs ColumnIdMap_t fColumnIdMap; ROOT::NTupleSize_t fNumDstEntries = 0; @@ -429,33 +425,6 @@ std::ostream &operator<<(std::ostream &os, const std::optional @@ -483,8 +452,9 @@ CompareDescriptorStructure(const ROOT::RNTupleDescriptor &dst, const ROOT::RNTup } for (const auto &srcField : src.GetTopLevelFields()) { const auto dstFieldId = dst.FindFieldId(srcField.GetFieldName()); - if (dstFieldId == ROOT::kInvalidDescriptorId) + if (dstFieldId == ROOT::kInvalidDescriptorId) { res.fExtraSrcFields.push_back(&srcField); + } } // Check compatibility of common fields @@ -571,55 +541,103 @@ CompareDescriptorStructure(const ROOT::RNTupleDescriptor &dst, const ROOT::RNTup } // Require that column representations match - const auto srcNCols = field.fSrc->GetLogicalColumnIds().size(); - const auto dstNCols = field.fDst->GetLogicalColumnIds().size(); - if (srcNCols != dstNCols) { - std::stringstream ss; - ss << "Field `" << field.fSrc->GetFieldName() - << "` has a different number of columns than previously-seen field with the same name (old: " << dstNCols - << ", new: " << srcNCols << ")"; - errors.push_back(ss.str()); - } else { - for (auto i = 0u; i < srcNCols; ++i) { - const auto srcColId = field.fSrc->GetLogicalColumnIds()[i]; - const auto dstColId = field.fDst->GetLogicalColumnIds()[i]; - const auto &srcCol = src.GetColumnDescriptor(srcColId); - const auto &dstCol = dst.GetColumnDescriptor(dstColId); - // TODO(gparolini): currently we refuse to merge columns of different types unless they are Split/non-Split - // version of the same type, because we know how to treat that specific case. We should also properly handle - // different but compatible types. - if (srcCol.GetType() != dstCol.GetType() && - !IsSplitOrUnsplitVersionOf(srcCol.GetType(), dstCol.GetType())) { - std::stringstream ss; - ss << i << "-th column of field `" << field.fSrc->GetFieldName() - << "` has a different column type of the same column on the previously-seen field with the same name " - "(old: " - << RColumnElementBase::GetColumnTypeName(srcCol.GetType()) - << ", new: " << RColumnElementBase::GetColumnTypeName(dstCol.GetType()) << ")"; - errors.push_back(ss.str()); - } - if (srcCol.GetBitsOnStorage() != dstCol.GetBitsOnStorage()) { - std::stringstream ss; - ss << i << "-th column of field `" << field.fSrc->GetFieldName() - << "` has a different number of bits of the same column on the previously-seen field with the same " - "name " - "(old: " - << srcCol.GetBitsOnStorage() << ", new: " << dstCol.GetBitsOnStorage() << ")"; - errors.push_back(ss.str()); - } - if (srcCol.GetValueRange() != dstCol.GetValueRange()) { - std::stringstream ss; - ss << i << "-th column of field `" << field.fSrc->GetFieldName() - << "` has a different value range of the same column on the previously-seen field with the same name " - "(old: " - << srcCol.GetValueRange() << ", new: " << dstCol.GetValueRange() << ")"; - errors.push_back(ss.str()); - } - if (srcCol.GetRepresentationIndex() > 0) { + if (!field.fSrc->IsProjectedField()) { + const auto &srcColumns = field.fSrc->GetLogicalColumnIds(); + const auto &dstColumns = field.fDst->GetLogicalColumnIds(); + const auto srcNCols = srcColumns.size(); + const auto dstNCols = dstColumns.size(); + if (srcNCols != dstNCols) { + std::stringstream ss; + ss << "Field `" << field.fSrc->GetFieldName() + << "` has a different number of columns than previously-seen field with the same name (old: " << dstNCols + << ", new: " << srcNCols << ")"; + errors.push_back(ss.str()); + } else { + const std::uint32_t srcColCardinality = field.fSrc->GetColumnCardinality(); + const std::uint32_t dstColCardinality = field.fDst->GetColumnCardinality(); + if (srcColCardinality != dstColCardinality) { std::stringstream ss; - ss << i << "-th column of field `" << field.fSrc->GetFieldName() - << "` has a representation index higher than 0. This is not supported yet by the merger."; + ss << "Field `" << field.fSrc->GetFieldName() + << "` has a different column cardinality than previously-seen field with the same name (old: " + << dstColCardinality << ", new: " << srcColCardinality << ")"; errors.push_back(ss.str()); + } else if (srcColCardinality > 0) { + const auto srcNColReprs = srcNCols / srcColCardinality; + const auto dstNColReprs = dstNCols / dstColCardinality; + + // For each column representation of the source, check if it matches one in the descriptor. + // If so, and if it doesn't match the destination's repr index, add a mapping for it. + // If nothing matches, schedule the column representation to be added later. + // NOTE: this has quadratic complexity but the numbers involved are small so it's fine. + for (auto srcReprIdx = 0u; srcReprIdx < srcNColReprs; ++srcReprIdx) { + std::int64_t matchingRepr = -1; + for (auto dstReprIdx = 0u; dstReprIdx < dstNColReprs; ++dstReprIdx) { + bool matches = true; + for (auto reprColIdx = 0u; reprColIdx < srcColCardinality; ++reprColIdx) { + const auto srcColId = srcColumns[srcReprIdx * srcColCardinality + reprColIdx]; + const auto &srcCol = src.GetColumnDescriptor(srcColId); + const auto dstColId = dstColumns[dstReprIdx * dstColCardinality + reprColIdx]; + const auto &dstCol = dst.GetColumnDescriptor(dstColId); + if (srcCol.GetType() != dstCol.GetType()) { + matches = false; + break; + } + } + + if (matches) { + // If this column representation matches by column type, we need to make sure that it also has + // matching column metadata. Since we currently do not support multiple column representations + // that only differ by such metadata, we forbid merging such columns (e.g. we cannot merge two + // Real32Trunc columns with different bit widths). This could technically be supported, but it + // would require significant effort, so we currently don't. + for (auto reprColIdx = 0u; reprColIdx < srcColCardinality; ++reprColIdx) { + const auto srcColId = srcColumns[srcReprIdx * srcColCardinality + reprColIdx]; + const auto &srcCol = src.GetColumnDescriptor(srcColId); + const auto dstColId = dstColumns[dstReprIdx * dstColCardinality + reprColIdx]; + const auto &dstCol = dst.GetColumnDescriptor(dstColId); + if (srcCol.GetBitsOnStorage() != dstCol.GetBitsOnStorage() || + srcCol.GetValueRange() != dstCol.GetValueRange()) { + std::stringstream ss; + ss << "Source field `" << field.fSrc->GetFieldName() + << "` has a matching column representation as its destination field, however one or " + "more " + "of its columns have different column metadata (bit width and/or value range). " + "Merging variable-sized columns is currently only supported if all metadata is " + "identical between source and destination columns." + << "\n bit width src: " << srcCol.GetBitsOnStorage() + << ", dst: " << dstCol.GetBitsOnStorage() << "" + << "\n value range src: " << srcCol.GetValueRange() + << ", dst: " << dstCol.GetValueRange(); + errors.push_back(ss.str()); + break; + } + } + matchingRepr = dstReprIdx; + break; + } + } + + if (errors.empty()) { + if (matchingRepr >= 0 && matchingRepr != srcReprIdx) { + // a different matching representation was found + assert(matchingRepr < std::numeric_limits::max()); + res.fColReprMappings[field.fDst].push_back( + RColReprMapping{srcReprIdx, static_cast(matchingRepr)}); + } else if (matchingRepr < 0) { + // this representation was not found in the destination + assert(dstNColReprs < std::numeric_limits::max()); + ROOT::RFieldBase::ColumnRepresentation_t newRepr; + for (auto reprColIdx = 0u; reprColIdx < srcColCardinality; ++reprColIdx) { + const auto srcColId = srcColumns[srcReprIdx * srcColCardinality + reprColIdx]; + const auto &srcCol = src.GetColumnDescriptor(srcColId); + newRepr.push_back(srcCol.GetType()); + } + RColReprExtension extension{{srcReprIdx, static_cast(dstNColReprs)}, newRepr}; + res.fColReprExtensions[field.fDst].push_back(extension); + res.fColReprMappings[field.fDst].push_back(extension); + } + } + } } } } @@ -657,13 +675,13 @@ CompareDescriptorStructure(const ROOT::RNTupleDescriptor &dst, const ROOT::RNTup return ROOT::RResult(res); } -// Applies late model extension to `destination`, adding all `newFields` to it. +// Applies late model extension to `mergeData.fDestination`, adding all `descCmp.fExtraSrcFields` to it. [[nodiscard]] static ROOT::RResult -ExtendDestinationModel(std::span newFields, ROOT::RNTupleModel &dstModel, - RNTupleMergeData &mergeData, std::vector &commonFields) +ExtendDestinationModel(RDescriptorsComparison &descCmp, ROOT::RNTupleModel &dstModel, RNTupleMergeData &mergeData) { - assert(newFields.size() > 0); // no point in calling this with 0 new cols + const auto &newFields = descCmp.fExtraSrcFields; + auto &commonFields = descCmp.fCommonFields; dstModel.Unfreeze(); ROOT::Internal::RNTupleModelChangeset changeset{dstModel}; @@ -683,10 +701,19 @@ ExtendDestinationModel(std::span newFields, ROOT changeset.fAddedFields.reserve(newFields.size()); // First add all non-projected fields... for (const auto *fieldDesc : newFields) { - if (!fieldDesc->IsProjectedField()) { - auto field = fieldDesc->CreateField(*mergeData.fSrcDescriptor); - changeset.AddField(std::move(field)); + if (fieldDesc->IsProjectedField()) + continue; + + auto field = fieldDesc->CreateField(*mergeData.fSrcDescriptor); + // Explicitly set the field representatives. This prevents UpdateSchema() from changing our column + // representations via AutoAdjustColumnTypes. + ROOT::RFieldBase::ColumnRepresentation_t representatives; + for (const auto &colId : fieldDesc->GetLogicalColumnIds()) { + const auto &column = mergeData.fSrcDescriptor->GetColumnDescriptor(colId); + representatives.push_back(column.GetType()); } + field->SetColumnRepresentatives({representatives}); + changeset.AddField(std::move(field)); } // ...then add all projected fields. for (const auto *fieldDesc : newFields) { @@ -711,16 +738,39 @@ ExtendDestinationModel(std::span newFields, ROOT } dstModel.Freeze(); try { + // XXX: here we are connecting the new fields/columns to the sink! + // We should avoid doing that, as all other fields never get connected. + // NOTE: this calls AutoAdjustColumnTypes, but we have set the column representations of all fields + // explicitly, so it will not change it under the hood. mergeData.fDestination.UpdateSchema(changeset, mergeData.fNumDstEntries); } catch (const ROOT::RException &ex) { return R__FAIL(ex.GetError().GetReport()); } commonFields.reserve(commonFields.size() + newFields.size()); - for (const auto *field : newFields) { + // NOTE(gparolini): Insert the new fields at the beginning of `commonFields`. + // We need to make sure the extended fields appear before all other common fields for the following reason: + // in general, when we GatherColumnInfos we (potentially) assign new column output ids in field order; this + // assignment happens whenever we find new columns, which happens in 3 cases: + // 1. we are in the first source and we're adding the first set of (common) fields; + // 2. we are adding a new set of extended common fields (which come from this function); + // 3. we are adding new column representations for fields that we already had before processing this source. + // + // It's important that the output id assigned to the new columns is coherent with the order of the column descriptors + // as they appear in the header and footer. This is in turn determined by the order by which we append new columns to + // the dst descriptor during the merging process. + // Since we call ExtendDestinationModel (this function) *before* adding the new column representations, it is always + // the case that the dst descriptor gets updated with the new column descriptors coming from the extended fields + // (since they are added in UpdateSchema a few lines above) before it gets updated with the extended column + // representations (which happens later in sink->AddColumnRepresentation). + // However, the new column output ids are added sequentially in *field* order in GatherColumnInfos and the fields + // containing the new column representations are already in that list from earlier! So, to make sure the new output + // ids are assigned to our extended fields first, we push them in from on the list so that they are visited first. + for (auto it = newFields.rbegin(); it != newFields.rend(); ++it) { + const auto *field = *it; const auto newFieldInDstId = mergeData.fDstDescriptor.FindFieldId(field->GetFieldName()); const auto &newFieldInDst = mergeData.fDstDescriptor.GetFieldDescriptor(newFieldInDstId); - commonFields.emplace_back(*field, newFieldInDst); + commonFields.insert(commonFields.begin(), RCommonField{*field, newFieldInDst}); } return ROOT::RResult::Success(); @@ -764,10 +814,10 @@ GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::spanGetStructure(); if (structure == ROOT::ENTupleStructure::kStreamer) { - return R__FAIL( - "Destination RNTuple contains a streamer field (" + field->GetFieldName() + - ") that is not present in one of the sources. " - "Creating a default value for a streamer field is ill-defined, therefore the merging process will abort."); + return R__FAIL("Destination RNTuple contains a streamer field (" + field->GetFieldName() + + ") that is not present in one of the sources. " + "Creating a default value for a streamer field is ill-defined, therefore the merging " + "process will abort."); } // NOTE: we cannot have a Record here because it has no associated columns. @@ -814,7 +864,7 @@ GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, const ROOT::RClusterDescriptor &clusterDesc, - std::span commonColumns, + std::span commonColumns, const RCluster::ColumnSet_t &commonColumnSet, std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc) @@ -826,9 +876,11 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, const RCluster *cluster = clusterPool.GetCluster(clusterDesc.GetId(), commonColumnSet); // we expect the cluster pool to contain the requested set of columns, since they were - // validated by CompareDescriptorStructure(). + // validated by CompareDescriptorStructure() and MergeSourceClusters(). assert(cluster); + const std::uint32_t outCompression = mergeData.fMergeOpts.fCompressionSettings.value(); + for (size_t colIdx = 0; colIdx < nCommonColumnsInCluster; ++colIdx) { const auto &column = commonColumns[colIdx]; const auto &columnId = column.fInputId; @@ -838,9 +890,6 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, const auto srcColElement = column.fInMemoryType ? ROOT::Internal::GenerateColumnElement(*column.fInMemoryType, columnDesc.GetType()) : RColumnElementBase::Generate(columnDesc.GetType()); - const auto dstColElement = column.fInMemoryType - ? ROOT::Internal::GenerateColumnElement(*column.fInMemoryType, column.fColumnType) - : RColumnElementBase::Generate(column.fColumnType); // Now get the pages for this column in this cluster const auto &pages = clusterDesc.GetPageRange(columnId); @@ -855,24 +904,21 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, // L1: compression and encoding of src and dest both match: we can simply copy the page // L2: compression of dest doesn't match the src but encoding does: we must recompress the page but can avoid // resealing it. - // L3: on-disk encoding doesn't match: we need to reseal the page, which implies decompressing and recompressing + // L3: on-disk encoding doesn't match: we need to reseal the page, which implies decompressing and + // recompressing // it. - const bool compressionIsDifferent = - colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings.value(); - const bool needsResealing = - srcColElement->GetIdentifier().fOnDiskType != dstColElement->GetIdentifier().fOnDiskType; - const bool needsRecompressing = compressionIsDifferent || needsResealing; + const bool compressionIsDifferent = colRangeCompressionSettings != outCompression; + const bool needsRecompressing = compressionIsDifferent; if (needsRecompressing && mergeData.fMergeOpts.fExtraVerbose) { - R__LOG_INFO(NTupleMergeLog()) - << (needsResealing ? "Resealing" : "Recompressing") << " column " << column.fColumnName - << ": { compression: " << colRangeCompressionSettings << " => " - << mergeData.fMergeOpts.fCompressionSettings.value() - << ", onDiskType: " << RColumnElementBase::GetColumnTypeName(srcColElement->GetIdentifier().fOnDiskType) - << " => " << RColumnElementBase::GetColumnTypeName(dstColElement->GetIdentifier().fOnDiskType); + R__LOG_INFO(NTupleMergeLog()) << "Recompressing column " << column.fColumnName + << ": { compression: " << colRangeCompressionSettings << " => " + << mergeData.fMergeOpts.fCompressionSettings.value() << ", onDiskType: " + << RColumnElementBase::GetColumnTypeName( + srcColElement->GetIdentifier().fOnDiskType); } - size_t pageBufferBaseIdx = sealedPageData.fBuffers.size(); + const size_t pageBufferBaseIdx = sealedPageData.fBuffers.size(); // If the column range already has the right compression we don't need to allocate any new buffer, so we don't // bother reserving memory for them. if (needsRecompressing) @@ -921,29 +967,15 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, buffer = MakeUninitArray(bufSize); // clang-format off - if (needsResealing) { - RTaskVisitor{fTaskGroup}(RResealFunc{ - *srcColElement, - *dstColElement, - mergeData.fMergeOpts, - sealedPage, - *fPageAlloc, - buffer.get(), - bufSize, - mergeData.fDestination.GetWriteOptions() - }); - } else { - RTaskVisitor{fTaskGroup}(RChangeCompressionFunc{ - *srcColElement, - *dstColElement, - mergeData.fMergeOpts, - sealedPage, - *fPageAlloc, - buffer.get(), - bufSize, - mergeData.fDestination.GetWriteOptions() - }); - } + RTaskVisitor{fTaskGroup}(RChangeCompressionFunc{ + *srcColElement, + outCompression, + sealedPage, + *fPageAlloc, + buffer.get(), + bufSize, + mergeData.fDestination.GetWriteOptions() + }); // clang-format on } @@ -967,9 +999,9 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, // the destination's schemas. // The pages may be "fast-merged" (i.e. simply copied with no decompression/recompression) if the target // compression is unspecified or matches the original compression settings. -ROOT::RResult -RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span commonColumns, - std::span extraDstColumns, RNTupleMergeData &mergeData) +ROOT::RResult RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span commonColumns, + std::span extraDstColumns, + RNTupleMergeData &mergeData) { ROOT::Internal::RClusterPool clusterPool{source}; @@ -984,31 +1016,60 @@ RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span 0); - // NOTE: just because a column is in `commonColumns` it doesn't mean that each cluster in the source contains it, - // as it may be a deferred column that only has real data in a future cluster. - // We need to figure out which columns are actually present in this cluster so we only merge their pages (the - // missing columns are handled by synthesizing zero pages - see below). - size_t nCommonColumnsInCluster = commonColumns.size(); - while (nCommonColumnsInCluster > 0) { - // Since `commonColumns` is sorted by column input id, we can simply traverse it from the back and stop as - // soon as we find a common column that appears in this cluster: we know that in that case all previous - // columns must appear as well. - if (clusterDesc.ContainsColumn(commonColumns[nCommonColumnsInCluster - 1].fInputId)) - break; - --nCommonColumnsInCluster; - } - + // Deduce which columns are suppressed (cluster by cluster) by exclusion, as: + // (columns in the columnIdMap) - (columns in commonColumns which are not suppressed). + // Note that some suppressed columns may not be in commonColumns because they might not appear at all in the + // current source. + FieldCollectionMap_t activeColumns; + + // NOTE: just because a column is in `commonColumns` it doesn't mean that each cluster in the source contains + // it, as it may be a deferred column that only has real data in a future cluster. We need to figure out which + // columns are actually present in this cluster so we only merge their pages (the missing columns are handled + // by synthesizing zero pages - see below). + size_t nCommonColumnsInCluster = 0; // Convert columns to a ColumnSet for the ClusterPool query RCluster::ColumnSet_t commonColumnSet; commonColumnSet.reserve(nCommonColumnsInCluster); - for (size_t i = 0; i < nCommonColumnsInCluster; ++i) - commonColumnSet.emplace(commonColumns[i].fInputId); + // Collect all common columns appearing in this cluster into commonColumnSet and reorganize commonColumns so + // that those columns are at the start of it (whereas missing columns are at its end). + // NOTE: it's fine if this scrambles the order of columns: the RNTupleSerializer will sort them by physical ID. + std::partition(commonColumns.begin(), commonColumns.end(), [&](const auto &column) { + if (const auto *colRange = clusterDesc.TryGetColumnRange(column.fInputId)) { + if (!colRange->IsSuppressed()) { + ++nCommonColumnsInCluster; + commonColumnSet.emplace(column.fInputId); + activeColumns[column.fParentFieldDescriptor].push_back(column.fOutputId); + return true; + } + } + return false; + }); - // For each cluster, the "missing columns" are the union of the extraDstColumns and the common columns - // that are not present in the cluster. We generate zero pages for all of them. - missingColumns.resize(extraDstColumns.size()); // NOTE: this clears all common columns of the previous cluster - for (size_t i = nCommonColumnsInCluster; i < commonColumns.size(); ++i) - missingColumns.push_back(commonColumns[i]); + // Commit all suppressed columns. + // This is a fairly involved operation, as we need to commit all known columns that: + // a) do not appear in extraDstColumns (those are "missing", not suppressed), and + // b) do not appear in commonColumnSet (those are the active columns). + // Not that these may or may not appear in commonColumns as suppressed columns, since they may or may not be + // present in the current source. + // The only way to find all the columns is to go and get them from fColumnIdMap, which keeps track of every + // column we added to the destination so far. However, since it also contains the extraDstColumns, we need to + // specifically only query those columns that belong to a field that has at least 1 column in commonColumns + // (remember that commonColumns contains all columns associated to the common fields for this source). + for (const auto &[fieldDesc, activeIds] : activeColumns) { + const auto &fieldFQName = mergeData.fSrcDescriptor->GetQualifiedFieldName(fieldDesc->GetId()); + const auto cardinality = fieldDesc->GetColumnCardinality(); + for (auto i = 0u; i < fieldDesc->GetLogicalColumnIds().size(); ++i) { + const auto colIndex = i % cardinality; + const auto reprIndex = i / cardinality; + const auto colName = "." + fieldFQName + '.' + std::to_string(colIndex) + '.' + std::to_string(reprIndex); + const auto colIt = mergeData.fColumnIdMap.find(colName); + assert(colIt != mergeData.fColumnIdMap.end()); + const auto colOutId = colIt->second.fColumnId; + if (std::find(activeIds.begin(), activeIds.end(), colOutId) == activeIds.end()) { + mergeData.fDestination.CommitSuppressedColumn(ROOT::Internal::RPageStorage::ColumnHandle_t{colOutId}); + } + } + } RSealedPageMergeData sealedPageData; auto res = MergeCommonColumns(clusterPool, clusterDesc, commonColumns, commonColumnSet, nCommonColumnsInCluster, @@ -1016,6 +1077,14 @@ RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span ColumnInMemoryType(std::string_view fieldT return std::nullopt; } -// Given a field, fill `columns` and `mergeData.fColumnIdMap` with information about all columns belonging to it and its -// subfields. `mergeData.fColumnIdMap` is used to map matching columns from different sources to the same output column -// in the destination. We match columns by their "fully qualified name", which is the concatenation of their ancestor -// fields' names and the column index. By this point, since we called `CompareDescriptorStructure()` earlier, we should -// be guaranteed that two matching columns will have at least compatible representations. NOTE: srcFieldDesc and -// dstFieldDesc may alias. +// Given a field, fill `columns` and `mergeData.fColumnIdMap` with information about all columns belonging to it and +// its subfields. `mergeData.fColumnIdMap` is used to map matching columns from different sources to the same output +// column in the destination. We match columns by their "fully qualified name", which is the concatenation of their +// ancestor fields' names and the column index. By this point, since we called `CompareDescriptorStructure()` +// earlier, we should be guaranteed that two matching columns will have at least compatible representations. +// This function is recursive as it needs to call itself on the entire subfield hierarchy of the source field. +// NOTE: srcFieldDesc and dstFieldDesc may alias. static void AddColumnsFromField(std::vector &columns, const ROOT::RNTupleDescriptor &srcDesc, + const FieldCollectionMap_t &colReprMappings, RNTupleMergeData &mergeData, const ROOT::RFieldDescriptor &srcFieldDesc, const ROOT::RFieldDescriptor &dstFieldDesc, const std::string &prefix = "") { @@ -1086,21 +1157,19 @@ static void AddColumnsFromField(std::vector &columns, const RO const auto &columnIds = srcFieldDesc.GetLogicalColumnIds(); columns.reserve(columns.size() + columnIds.size()); - // NOTE: here we can match the src and dst columns by column index because we forbid merging fields with - // different column representations. + for (auto i = 0u; i < srcFieldDesc.GetLogicalColumnIds().size(); ++i) { auto srcColumnId = srcFieldDesc.GetLogicalColumnIds()[i]; const auto &srcColumn = srcDesc.GetColumnDescriptor(srcColumnId); RColumnMergeInfo info{}; - info.fColumnName = name + '.' + std::to_string(srcColumn.GetIndex()); info.fInputId = srcColumn.GetPhysicalId(); // NOTE(gparolini): the parent field is used when synthesizing zero pages, which happens in 2 situations: // 1. when adding extra dst columns (in which case we need to synthesize zero pages for the incoming src), and // 2. when merging a deferred column into an existing column (in which case we need to fill the "hole" with - // zeroes). For the first case srcFieldDesc and dstFieldDesc are the same (see the calling site of this function), - // but for the second case they're not, and we need to pick the source field because we will then check the - // column's *input* id inside fParentFieldDescriptor to see if it's a suppressed column (see + // zeroes). For the first case srcFieldDesc and dstFieldDesc are the same (see the calling site of this + // function), but for the second case they're not, and we need to pick the source field because we will then + // check the column's *input* id inside fParentFieldDescriptor to see if it's a suppressed column (see // GenerateZeroPagesForColumns()). info.fParentFieldDescriptor = &srcFieldDesc; // Save the parent field descriptor since this may be either the source or destination descriptor depending on @@ -1108,22 +1177,34 @@ static void AddColumnsFromField(std::vector &columns, const RO // properly walk up the field hierarchy. info.fParentNTupleDescriptor = &srcDesc; + const auto mappingsIt = colReprMappings.find(&dstFieldDesc); + std::uint16_t reprIndex = srcColumn.GetRepresentationIndex(); + if (mappingsIt != colReprMappings.end()) { + if (auto outReprIdx = FindColumnReprMapping(mappingsIt->second, reprIndex); outReprIdx) + reprIndex = *outReprIdx; + } + + info.fColumnName = name + '.' + std::to_string(srcColumn.GetIndex()) + '.' + std::to_string(reprIndex); + + ENTupleColumnType columnType = ENTupleColumnType::kUnknown; + if (auto it = mergeData.fColumnIdMap.find(info.fColumnName); it != mergeData.fColumnIdMap.end()) { + // We had already added this column to the column id map: just copy its data. info.fOutputId = it->second.fColumnId; - info.fColumnType = it->second.fColumnType; + info.fOutputReprIndex = reprIndex; } else { + // New column: assign it the next ouput id. info.fOutputId = mergeData.fColumnIdMap.size(); - // NOTE(gparolini): map the type of src column to the type of dst column. - // This mapping is only relevant for common columns and it's done to ensure we keep a consistent - // on-disk representation of the same column. - // This is also important to do for first source when it is used to generate the destination sink, - // because even in that case their column representations may differ. - // e.g. if the destination has a different compression than the source, an integer column might be - // zigzag-encoded in the source but not in the destination. - auto dstColumnId = dstFieldDesc.GetLogicalColumnIds()[i]; + // NOTE(gparolini): map the representation index of src column to that of dst column. + // This mapping is only relevant for common columns and it's done to ensure we have the correct representation + // index in the output column metadata. + assert(dstFieldDesc.GetColumnCardinality() == srcFieldDesc.GetColumnCardinality()); + const auto dstColumnIndex = reprIndex * dstFieldDesc.GetColumnCardinality() + srcColumn.GetIndex(); + const auto dstColumnId = dstFieldDesc.GetLogicalColumnIds()[dstColumnIndex]; const auto &dstColumn = mergeData.fDstDescriptor.GetColumnDescriptor(dstColumnId); - info.fColumnType = dstColumn.GetType(); - mergeData.fColumnIdMap[info.fColumnName] = {info.fOutputId, info.fColumnType}; + columnType = dstColumn.GetType(); + info.fOutputReprIndex = reprIndex; + mergeData.fColumnIdMap[info.fColumnName] = RColumnOutInfo{info.fOutputId}; } if (mergeData.fMergeOpts.fExtraVerbose) { @@ -1131,12 +1212,12 @@ static void AddColumnsFromField(std::vector &columns, const RO << ", phys.id " << srcColumn.GetPhysicalId() << ", type " << RColumnElementBase::GetColumnTypeName(srcColumn.GetType()) << " -> log.id " << info.fOutputId << ", type " - << RColumnElementBase::GetColumnTypeName(info.fColumnType); + << RColumnElementBase::GetColumnTypeName(columnType); } // Since we disallow merging fields of different types, src and dstFieldDesc must have the same type name. assert(srcFieldDesc.GetTypeName() == dstFieldDesc.GetTypeName()); - info.fInMemoryType = ColumnInMemoryType(srcFieldDesc.GetTypeName(), info.fColumnType); + info.fInMemoryType = ColumnInMemoryType(srcFieldDesc.GetTypeName(), columnType); columns.emplace_back(info); } @@ -1146,7 +1227,7 @@ static void AddColumnsFromField(std::vector &columns, const RO for (auto i = 0u; i < srcChildrenIds.size(); ++i) { const auto &srcChild = srcDesc.GetFieldDescriptor(srcChildrenIds[i]); const auto &dstChild = mergeData.fDstDescriptor.GetFieldDescriptor(dstChildrenIds[i]); - AddColumnsFromField(columns, srcDesc, mergeData, srcChild, dstChild, name); + AddColumnsFromField(columns, srcDesc, colReprMappings, mergeData, srcChild, dstChild, name); } } @@ -1158,17 +1239,12 @@ static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, { RColumnInfoGroup res; for (const ROOT::RFieldDescriptor *field : descCmp.fExtraDstFields) { - AddColumnsFromField(res.fExtraDstColumns, mergeData.fDstDescriptor, mergeData, *field, *field); + AddColumnsFromField(res.fExtraDstColumns, mergeData.fDstDescriptor, descCmp.fColReprMappings, mergeData, *field, + *field); } for (const auto &[srcField, dstField] : descCmp.fCommonFields) { - AddColumnsFromField(res.fCommonColumns, srcDesc, mergeData, *srcField, *dstField); + AddColumnsFromField(res.fCommonColumns, srcDesc, descCmp.fColReprMappings, mergeData, *srcField, *dstField); } - - // Sort the commonColumns by ID so we can more easily tell how many common columns each cluster has - // (since each cluster must contain all columns of the previous cluster plus potentially some new ones) - std::sort(res.fCommonColumns.begin(), res.fCommonColumns.end(), - [](const auto &a, const auto &b) { return a.fInputId < b.fInputId; }); - return res; } @@ -1179,9 +1255,9 @@ static void PrefillColumnMap(const ROOT::RNTupleDescriptor &desc, const ROOT::RF for (const auto &colId : fieldDesc.GetLogicalColumnIds()) { const auto &colDesc = desc.GetColumnDescriptor(colId); RColumnOutInfo info{}; - const auto colName = name + '.' + std::to_string(colDesc.GetIndex()); info.fColumnId = colDesc.GetLogicalId(); - info.fColumnType = colDesc.GetType(); + const auto colName = + name + '.' + std::to_string(colDesc.GetIndex()) + '.' + std::to_string(colDesc.GetRepresentationIndex()); colIdMap[colName] = info; } @@ -1191,6 +1267,25 @@ static void PrefillColumnMap(const ROOT::RNTupleDescriptor &desc, const ROOT::RF } } +static void AddColumnExtensionsInFieldOrder( + const ROOT::RFieldDescriptor &field, const ROOT::RNTupleDescriptor &desc, + const FieldCollectionMap_t &extensions, + std::vector>> &outExtensions, + std::unordered_map> &outProjectionPointees) +{ + const auto it = extensions.find(&field); + if (it != extensions.end()) + outExtensions.emplace_back(it->first, it->second); + + if (field.IsProjectedField()) + outProjectionPointees[field.GetProjectionSourceId()].push_back(&field); + + for (auto childId : field.GetLinkIds()) { + const auto &child = desc.GetFieldDescriptor(childId); + AddColumnExtensionsInFieldOrder(child, desc, extensions, outExtensions, outProjectionPointees); + } +} + RNTupleMerger::RNTupleMerger(std::unique_ptr destination, std::unique_ptr model) // TODO(gparolini): consider using an arena allocator instead, since we know the precise lifetime @@ -1231,6 +1326,10 @@ ROOT::RResult RNTupleMerger::Merge(std::span sources, const } } + // Maps projection source fields to all their projections. + std::unordered_map> projectionPointees; + bool projectionPointeesInitialized = false; + // we should have a model if and only if the destination is initialized. if (!!fModel != fDestination->IsInitialized()) { return R__FAIL( @@ -1282,7 +1381,7 @@ ROOT::RResult RNTupleMerger::Merge(std::span sources, const } } - // Create sink from the input model if not initialized + // Create sink and model from the input descriptor if not initialized if (!fModel) { fModel = fDestination->InitFromDescriptor(srcDescriptor.GetRef(), false /* copyClusters */); } @@ -1308,10 +1407,10 @@ ROOT::RResult RNTupleMerger::Merge(std::span sources, const } // handle extra src fields - if (descCmp.fExtraSrcFields.size()) { + if (!descCmp.fExtraSrcFields.empty()) { if (mergeOpts.fMergingMode == ENTupleMergingMode::kUnion) { // late model extension for all fExtraSrcFields in Union mode - auto res = ExtendDestinationModel(descCmp.fExtraSrcFields, *fModel, mergeData, descCmp.fCommonFields); + auto res = ExtendDestinationModel(descCmp, *fModel, mergeData); if (!res) return R__FORWARD_ERROR(res); } else if (mergeOpts.fMergingMode == ENTupleMergingMode::kStrict) { @@ -1324,6 +1423,53 @@ ROOT::RResult RNTupleMerger::Merge(std::span sources, const } } + //// Extend columns if needed + if (!descCmp.fColReprExtensions.empty()) { + if (!projectionPointeesInitialized) { + for (const auto &field : descCmp.fExtraDstFields) { + if (field->IsProjectedField()) + projectionPointees[field->GetProjectionSourceId()].push_back(field); + } + projectionPointeesInitialized = true; + } + + // We need to extend the columns in the proper order, i.e. so that they appear in the same order as + // their first representation. This is to ensure that the pages we write to the cluster are in a consistent + // order as their column descriptors. The page creation order is determined by the order of + // columnInfos.fCommonColumns, which in turn depends on the common fields order (see GatherColumnInfos). + // XXX: do we need this separate sort step? Why not just create this vector directly in + // CompareDescriptorStructure? + std::vector>> colExtensions; + colExtensions.reserve(descCmp.fColReprExtensions.size()); + for (const auto &commonField : descCmp.fCommonFields) { + const auto *field = commonField.fDst; + AddColumnExtensionsInFieldOrder(*field, mergeData.fDstDescriptor, descCmp.fColReprExtensions, colExtensions, + projectionPointees); + } + for (const auto &field : descCmp.fExtraSrcFields) { + if (field->IsProjectedField()) + projectionPointees[field->GetProjectionSourceId()].push_back(field); + } + + for (const auto &[fieldDesc, extensions] : colExtensions) { + auto &mappings = descCmp.fColReprMappings[fieldDesc]; + for (const auto &extension : extensions) { + const auto firstColumnId = fDestination->AddColumnRepresentation(*fieldDesc, extension.fSourceRepr); + + // When adding new column representations to an existing field which is the source of some projected + // fields, we need to also add new alias columns to those fields so that they can point to the proper + // representation. + if (auto it = projectionPointees.find(fieldDesc->GetId()); it != projectionPointees.end()) { + for (const auto &projection : it->second) { + for (auto colIdx = 0u; colIdx < extension.fSourceRepr.size(); ++colIdx) + fDestination->AddAliasColumn(mergeData.fDstDescriptor, *projection, firstColumnId + colIdx); + } + } + mappings.push_back(extension); + } + } + } + // handle extra dst fields & common fields auto columnInfos = GatherColumnInfos(descCmp, srcDescriptor.GetRef(), mergeData); auto res = MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData); diff --git a/tree/ntuple/src/RPageStorage.cxx b/tree/ntuple/src/RPageStorage.cxx index 8e2d2e1d72016..1293b975934b4 100644 --- a/tree/ntuple/src/RPageStorage.cxx +++ b/tree/ntuple/src/RPageStorage.cxx @@ -1049,15 +1049,17 @@ ROOT::Internal::RPagePersistentSink::InitFromDescriptor(const ROOT::RNTupleDescr return model; } -void ROOT::Internal::RPagePersistentSink::AddColumnRepresentation(const ROOT::RFieldDescriptor &field, - std::span newRepresentation) +ROOT::DescriptorId_t +ROOT::Internal::RPagePersistentSink::AddColumnRepresentation(const ROOT::RFieldDescriptor &field, + std::span newRepresentation) { - R__ASSERT(field.GetColumnCardinality() > 0); - R__ASSERT(!field.GetLogicalColumnIds().empty()); - R__ASSERT(newRepresentation.size() == field.GetColumnCardinality()); + assert(!field.IsProjectedField()); + assert(field.GetColumnCardinality() > 0); + assert(!field.GetLogicalColumnIds().empty()); + assert(newRepresentation.size() == field.GetColumnCardinality()); - const auto firstPhysicalIndex = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns(); - const auto reprIndex = field.GetLogicalColumnIds().size() / field.GetColumnCardinality(); + const std::size_t firstPhysicalIndex = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns(); + const std::uint16_t reprIndex = field.GetLogicalColumnIds().size() / field.GetColumnCardinality(); fDescriptorBuilder.ShiftAliasColumns(newRepresentation.size()); @@ -1067,9 +1069,9 @@ void ROOT::Internal::RPagePersistentSink::AddColumnRepresentation(const ROOT::RF const auto [rangeMin, rangeMax] = ROOT::Internal::RColumnElementBase::GetValidBitRange(columnType); R__ASSERT(rangeMin == rangeMax); - const auto firstReprColumnId = field.GetLogicalColumnIds()[columnIndex]; + const ROOT::DescriptorId_t firstReprColumnId = field.GetLogicalColumnIds()[columnIndex]; const auto &firstReprColumnRange = fOpenColumnRanges.at(firstReprColumnId); - const auto columnId = firstPhysicalIndex + columnIndex; + const ROOT::DescriptorId_t columnId = firstPhysicalIndex + columnIndex; RColumnDescriptorBuilder columnBuilder; columnBuilder.LogicalColumnId(columnId) @@ -1098,6 +1100,29 @@ void ROOT::Internal::RPagePersistentSink::AddColumnRepresentation(const ROOT::RF ++columnIndex; } + + return firstPhysicalIndex; +} + +void ROOT::Internal::RPagePersistentSink::AddAliasColumn(const ROOT::RNTupleDescriptor &desc, + const ROOT::RFieldDescriptor &field, + ROOT::DescriptorId_t physicalId) +{ + const auto &pointedColumn = desc.GetColumnDescriptor(physicalId); + assert(!pointedColumn.IsAliasColumn()); + + const auto columnId = fDescriptorBuilder.GetDescriptor().GetNLogicalColumns(); + RColumnDescriptorBuilder columnBuilder; + columnBuilder.LogicalColumnId(columnId) + .PhysicalColumnId(physicalId) + .FieldId(field.GetId()) + .Type(pointedColumn.GetType()) + .Index(pointedColumn.GetIndex()) + .BitsOnStorage(pointedColumn.GetBitsOnStorage()) + .ValueRange(pointedColumn.GetValueRange()) + .FirstElementIndex(pointedColumn.GetFirstElementIndex()) + .RepresentationIndex(pointedColumn.GetRepresentationIndex()); + fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap()); } void ROOT::Internal::RPagePersistentSink::CommitSuppressedColumn(ColumnHandle_t columnHandle) diff --git a/tree/ntuple/test/ntuple_merger.cxx b/tree/ntuple/test/ntuple_merger.cxx index 29942409a1d3f..6bf768a956722 100644 --- a/tree/ntuple/test/ntuple_merger.cxx +++ b/tree/ntuple/test/ntuple_merger.cxx @@ -1011,7 +1011,7 @@ TEST(RNTupleMerger, MergeLateModelExtension) { // Write two test ntuples to be merged, with different models. // Use EMergingMode::kUnion so the output ntuple has all the fields of its inputs. - FileRaii fileGuard1("test_ntuple_merge_in_1.root"); + FileRaii fileGuard1("test_ntuple_merge_lmext_in_1.root"); { auto model = RNTupleModel::Create(); auto fieldFoo = model->MakeField>("foo"); @@ -1027,11 +1027,12 @@ TEST(RNTupleMerger, MergeLateModelExtension) } } - FileRaii fileGuard2("test_ntuple_merge_in_2.root"); + FileRaii fileGuard2("test_ntuple_merge_lmext_in_2.root"); { auto model = RNTupleModel::Create(); auto fieldBaz = model->MakeField("baz"); auto fieldFoo = model->MakeField>("foo"); + auto fieldQux = model->MakeField("qux"); auto fieldVfoo = model->MakeField[3]>("vfoo"); auto wopts = RNTupleWriteOptions(); wopts.SetCompression(0); @@ -1041,12 +1042,13 @@ TEST(RNTupleMerger, MergeLateModelExtension) fieldFoo->insert(std::make_pair(std::to_string(i), i * 765)); fieldVfoo[0] = {(int)i * 765}; fieldVfoo[2] = {(int)i * 987}; + *fieldQux = i * 777; ntuple->Fill(); } } // Now merge the inputs - FileRaii fileGuard3("test_ntuple_merge_out.root"); + FileRaii fileGuard3("test_ntuple_merge_lmext_out.root"); { // Gather the input sources std::vector> sources; @@ -1077,6 +1079,7 @@ TEST(RNTupleMerger, MergeLateModelExtension) auto vfoo = ntuple->GetModel().GetDefaultEntry().GetPtr[3]>("vfoo"); auto bar = ntuple->GetModel().GetDefaultEntry().GetPtr("bar"); auto baz = ntuple->GetModel().GetDefaultEntry().GetPtr("baz"); + auto qux = ntuple->GetModel().GetDefaultEntry().GetPtr("qux"); for (int i = 0; i < 10; ++i) { ntuple->LoadEntry(i); @@ -1085,6 +1088,7 @@ TEST(RNTupleMerger, MergeLateModelExtension) ASSERT_EQ(vfoo[2][0], i * 345); ASSERT_EQ(*bar, i * 321); ASSERT_EQ(*baz, 0); + ASSERT_EQ(*qux, 0); } for (int i = 10; i < 20; ++i) { ntuple->LoadEntry(i); @@ -1093,6 +1097,7 @@ TEST(RNTupleMerger, MergeLateModelExtension) ASSERT_EQ(vfoo[2][0], (i - 10) * 987); ASSERT_EQ(*bar, 0); ASSERT_EQ(*baz, (i - 10) * 567); + ASSERT_EQ(*qux, (i - 10) * 777); } } } @@ -1180,8 +1185,10 @@ TEST(RNTupleMerger, DifferentCompatibleRepresentations) auto model = RNTupleModel::Create(); auto pFoo = model->MakeField("foo"); auto clonedModel = model->Clone(); + auto wopts = RNTupleWriteOptions(); + wopts.SetCompression(0); { - auto ntuple = RNTupleWriter::Recreate(std::move(model), "ntuple", fileGuard1.GetPath()); + auto ntuple = RNTupleWriter::Recreate(std::move(model), "ntuple", fileGuard1.GetPath(), wopts); for (size_t i = 0; i < 10; ++i) { *pFoo = i * 123; ntuple->Fill(); @@ -1193,12 +1200,12 @@ TEST(RNTupleMerger, DifferentCompatibleRepresentations) { auto &fieldFooDbl = clonedModel->GetMutableField("foo"); fieldFooDbl.SetColumnRepresentatives({{ROOT::ENTupleColumnType::kReal32}}); - auto ntuple = RNTupleWriter::Recreate(std::move(clonedModel), "ntuple", fileGuard2.GetPath()); + auto ntuple = RNTupleWriter::Recreate(std::move(clonedModel), "ntuple", fileGuard2.GetPath(), wopts); auto e = ntuple->CreateEntry(); auto pFoo2 = e->GetPtr("foo"); for (size_t i = 0; i < 10; ++i) { *pFoo2 = i * 567; - ntuple->Fill(); + ntuple->Fill(*e); } } @@ -1218,30 +1225,18 @@ TEST(RNTupleMerger, DifferentCompatibleRepresentations) auto sourcePtrs2 = sourcePtrs; { - auto wopts = RNTupleWriteOptions(); - wopts.SetCompression(0); auto destination = std::make_unique("ntuple", fileGuard3.GetPath(), wopts); auto opts = RNTupleMergeOptions(); opts.fCompressionSettings = 0; RNTupleMerger merger{std::move(destination)}; auto res = merger.Merge(sourcePtrs, opts); - // TODO(gparolini): we want to support this in the future - EXPECT_FALSE(bool(res)); - if (res.GetError()) { - EXPECT_THAT(res.GetError()->GetReport(), testing::HasSubstr("different column type")); - } - // EXPECT_TRUE(bool(res)); + EXPECT_TRUE(bool(res)); } { auto destination = std::make_unique("ntuple", fileGuard4.GetPath(), RNTupleWriteOptions()); RNTupleMerger merger{std::move(destination)}; auto res = merger.Merge(sourcePtrs); - // TODO(gparolini): we want to support this in the future - EXPECT_FALSE(bool(res)); - if (res.GetError()) { - EXPECT_THAT(res.GetError()->GetReport(), testing::HasSubstr("different column type")); - } - // EXPECT_TRUE(bool(res)); + EXPECT_TRUE(bool(res)); } } } @@ -1531,7 +1526,9 @@ TEST(RNTupleMerger, MergeProjectedFieldsOnlyFirst) { auto model = RNTupleModel::Create(); auto fieldFoo = model->MakeField("foo"); - auto ntuple = RNTupleWriter::Recreate(std::move(model), "ntuple", fileGuard2.GetPath()); + auto wopts = RNTupleWriteOptions(); + wopts.SetCompression(0); + auto ntuple = RNTupleWriter::Recreate(std::move(model), "ntuple", fileGuard2.GetPath(), wopts); for (size_t i = 0; i < 10; ++i) { *fieldFoo = i * 123; ntuple->Fill(); @@ -1565,16 +1562,18 @@ TEST(RNTupleMerger, MergeProjectedFieldsOnlyFirst) auto ntuple1 = RNTupleReader::Open("ntuple", fileGuard1.GetPath()); auto ntuple2 = RNTupleReader::Open("ntuple", fileGuard2.GetPath()); auto ntuple3 = RNTupleReader::Open("ntuple", fileGuardOut.GetPath()); - ASSERT_EQ(ntuple1->GetNEntries() + ntuple2->GetNEntries(), ntuple3->GetNEntries()); + EXPECT_EQ(ntuple1->GetNEntries() + ntuple2->GetNEntries(), ntuple3->GetNEntries()); const auto &desc1 = ntuple1->GetDescriptor(); const auto &desc2 = ntuple2->GetDescriptor(); const auto &desc3 = ntuple3->GetDescriptor(); const auto nAliasColumns1 = desc1.GetNLogicalColumns() - desc1.GetNPhysicalColumns(); const auto nAliasColumns2 = desc2.GetNLogicalColumns() - desc2.GetNPhysicalColumns(); const auto nAliasColumns3 = desc3.GetNLogicalColumns() - desc3.GetNPhysicalColumns(); - ASSERT_EQ(nAliasColumns1, 1); - ASSERT_EQ(nAliasColumns2, 0); - ASSERT_EQ(nAliasColumns3, 1); + EXPECT_EQ(nAliasColumns1, 1); + EXPECT_EQ(nAliasColumns2, 0); + // The output RNTuple has 2 alias columns because one was created by the merger to point to the extended + // column that was added to field "foo" (since source 2 had a different encoding than source 1). + EXPECT_EQ(nAliasColumns3, 2); auto foo1 = ntuple1->GetModel().GetDefaultEntry().GetPtr("foo"); auto foo2 = ntuple2->GetModel().GetDefaultEntry().GetPtr("foo"); @@ -1586,16 +1585,16 @@ TEST(RNTupleMerger, MergeProjectedFieldsOnlyFirst) for (auto i = 0u; i < ntuple1->GetNEntries(); ++i) { ntuple1->LoadEntry(i); ntuple3->LoadEntry(i); - ASSERT_EQ(*foo1, *foo3); - ASSERT_EQ(*bar1, *foo3); - ASSERT_EQ(*bar1, *bar3); + EXPECT_EQ(*foo1, *foo3); + EXPECT_EQ(*bar1, *foo3); + EXPECT_EQ(*bar1, *bar3); } for (auto i = 0u; i < ntuple2->GetNEntries(); ++i) { ntuple2->LoadEntry(i); ntuple3->LoadEntry(ntuple1->GetNEntries() + i); - ASSERT_EQ(*foo2, *foo3); + EXPECT_EQ(*foo2, *foo3); // we should be able to read the data from the second ntuple using the projection defined in the first. - ASSERT_EQ(*foo2, *bar3); + EXPECT_EQ(*foo2, *bar3); } } } @@ -2538,7 +2537,8 @@ TEST(RNTupleMerger, MergeDeferredAdvanced) auto model1 = RNTupleModel::Create(); auto wopts = RNTupleWriteOptions(); wopts.SetCompression(0); - auto writer1 = RNTupleWriter::Recreate(std::move(model1), "ntuple", fileGuard1.GetPath(), wopts); + auto tfile = TFile::Open((fileGuard1.GetPath() + "?reproducible").c_str(), "RECREATE"); + auto writer1 = RNTupleWriter::Append(std::move(model1), "ntuple", *tfile, wopts); auto updater = writer1->CreateModelUpdater(); updater->BeginUpdate(); updater->AddField(RFieldBase::Create("flt", "float").Unwrap()); @@ -2611,7 +2611,7 @@ TEST(RNTupleMerger, MergeDeferredAdvanced) auto pInt = reader->GetModel().GetDefaultEntry().GetPtr("int"); auto pFlt = reader->GetModel().GetDefaultEntry().GetPtr("flt"); - for (auto i = 0u; i < reader->GetNEntries(); ++i) { + for (auto i : reader->GetEntryRange()) { reader->LoadEntry(i); float expectedFlt = (i >= 10 && i < 15) ? 0 : i; EXPECT_FLOAT_EQ(*pFlt, expectedFlt);