From e09d7083382fcbd313a8523ae00d7833ad518e81 Mon Sep 17 00:00:00 2001 From: Silia Taider Date: Fri, 13 Feb 2026 17:57:29 +0100 Subject: [PATCH 1/3] [ML] Introduce asynchronous lazy data loading --- .../ROOT/_pythonization/_ml_dataloader.py | 6 +- tree/ml/inc/ROOT/ML/RBatchGenerator.hxx | 280 +++++++++++++----- tree/ml/inc/ROOT/ML/RBatchLoader.hxx | 107 +++++-- 3 files changed, 296 insertions(+), 97 deletions(-) diff --git a/bindings/pyroot/pythonizations/python/ROOT/_pythonization/_ml_dataloader.py b/bindings/pyroot/pythonizations/python/ROOT/_pythonization/_ml_dataloader.py index eeb88b2006d24..634568c771a2a 100644 --- a/bindings/pyroot/pythonizations/python/ROOT/_pythonization/_ml_dataloader.py +++ b/bindings/pyroot/pythonizations/python/ROOT/_pythonization/_ml_dataloader.py @@ -3,6 +3,7 @@ # Author: Nopphakorn Subsa-Ard, King Mongkut's University of Technology Thonburi (KMUTT) (TH) 08/2024 # Author: Vincenzo Eduardo Padulano, CERN 10/2024 # Author: Martin Føll, University of Oslo (UiO) & CERN 01/2026 +# Author: Silia Taider, CERN 02/2026 ################################################################################ # Copyright (C) 1995-2026, Rene Brun and Fons Rademakers. 
# @@ -501,6 +502,8 @@ def GetValidationBatch(self) -> Any: class LoadingThreadContext: def __init__(self, base_generator: BaseGenerator): self.base_generator = base_generator + # init loading thread + self.base_generator.Activate() # create training batches from the first chunk self.base_generator.CreateTrainBatches() @@ -593,6 +596,8 @@ def __call__(self) -> Any: class LoadingThreadContextVal: def __init__(self, base_generator: BaseGenerator): self.base_generator = base_generator + # init loading thread + self.base_generator.Activate() # create validation batches from the first chunk self.base_generator.CreateValidationBatches() @@ -668,7 +673,6 @@ def __call__(self) -> Any: while True: batch = self.base_generator.GetValidationBatch() if batch is None: - self.base_generator.DeActivateValidationEpoch() break yield self.conversion_function(batch) diff --git a/tree/ml/inc/ROOT/ML/RBatchGenerator.hxx b/tree/ml/inc/ROOT/ML/RBatchGenerator.hxx index 2fd74b6d76cbc..29d231fc5f66f 100644 --- a/tree/ml/inc/ROOT/ML/RBatchGenerator.hxx +++ b/tree/ml/inc/ROOT/ML/RBatchGenerator.hxx @@ -3,6 +3,7 @@ // Author: Nopphakorn Subsa-Ard, King Mongkut's University of Technology Thonburi (KMUTT) (TH) 08/2024 // Author: Vincenzo Eduardo Padulano, CERN 10/2024 // Author: Martin Føll, University of Oslo (UiO) & CERN 01/2026 +// Author: Silia Taider, CERN 02/2026 /************************************************************************* * Copyright (C) 1995-2026, Rene Brun and Fons Rademakers. 
* @@ -72,11 +73,11 @@ private: std::vector f_rdfs; std::unique_ptr fLoadingThread; + std::condition_variable fLoadingCondition; + std::mutex fLoadingMutex; - std::size_t fTrainingChunkNum; - std::size_t fValidationChunkNum; - - std::mutex fIsActiveMutex; + std::size_t fTrainingChunkNum{0}; + std::size_t fValidationChunkNum{0}; bool fDropRemainder; bool fShuffle; @@ -108,10 +109,8 @@ private: RFlat2DMatrix fSampledTrainingDataset; RFlat2DMatrix fSampledValidationDataset; - RFlat2DMatrix fTrainTensor; RFlat2DMatrix fTrainChunkTensor; - RFlat2DMatrix fValidationTensor; RFlat2DMatrix fValidationChunkTensor; public: @@ -186,10 +185,10 @@ public: fNumValidationChunks = fChunkLoader->GetNumValidationChunks(); } - fTrainingBatchLoader = - std::make_unique(fBatchSize, fCols, fVecSizes, fNumTrainingEntries, fDropRemainder); - fValidationBatchLoader = - std::make_unique(fBatchSize, fCols, fVecSizes, fNumValidationEntries, fDropRemainder); + fTrainingBatchLoader = std::make_unique(fBatchSize, fCols, fLoadingMutex, fLoadingCondition, + fVecSizes, fNumTrainingEntries, fDropRemainder); + fValidationBatchLoader = std::make_unique(fBatchSize, fCols, fLoadingMutex, fLoadingCondition, + fVecSizes, fNumValidationEntries, fDropRemainder); } ~RBatchGenerator() { DeActivate(); } @@ -197,54 +196,217 @@ public: void DeActivate() { { - std::lock_guard lock(fIsActiveMutex); + std::lock_guard lock(fLoadingMutex); + if (!fIsActive) + return; fIsActive = false; } - fTrainingBatchLoader->DeActivate(); - fValidationBatchLoader->DeActivate(); + fLoadingCondition.notify_all(); if (fLoadingThread) { if (fLoadingThread->joinable()) { fLoadingThread->join(); } } + + fLoadingThread.reset(); } - /// \brief Activate the loading process by starting the batchloader, and - /// spawning the loading thread. + /// \brief Activate the loading process by spawning the loading thread. 
void Activate() { - if (fIsActive) + { + std::lock_guard lock(fLoadingMutex); + if (fIsActive) + return; + + fIsActive = true; + } + + if (fLoadEager) { return; + } + + fLoadingThread = std::make_unique(&RBatchGenerator::LoadChunks, this); + } + + void ActivateEpoch() + { + std::lock_guard lock(fLoadingMutex); + fEpochActive = true; + } + + void DeActivateEpoch() + { + std::lock_guard lock(fLoadingMutex); + fEpochActive = false; + } + /// \brief Activate the training epoch by starting the batchloader. + void ActivateTrainingEpoch() + { { - std::lock_guard lock(fIsActiveMutex); - fIsActive = true; + std::lock_guard lock(fLoadingMutex); + fTrainingEpochActive = true; + fTrainingChunkNum = 0; } fTrainingBatchLoader->Activate(); - fValidationBatchLoader->Activate(); - // fLoadingThread = std::make_unique(&RBatchGenerator::LoadChunks, this); + fLoadingCondition.notify_all(); } - void ActivateEpoch() { fEpochActive = true; } + void DeActivateTrainingEpoch() + { + { + std::lock_guard lock(fLoadingMutex); + fTrainingEpochActive = false; + } - void DeActivateEpoch() { fEpochActive = false; } + fTrainingBatchLoader->Reset(); + fTrainingBatchLoader->DeActivate(); + fLoadingCondition.notify_all(); + } - void ActivateTrainingEpoch() { fTrainingEpochActive = true; } + void ActivateValidationEpoch() + { + { + std::lock_guard lock(fLoadingMutex); + fValidationEpochActive = true; + fValidationChunkNum = 0; + } - void DeActivateTrainingEpoch() { fTrainingEpochActive = false; } + fValidationBatchLoader->Activate(); + fLoadingCondition.notify_all(); + } - void ActivateValidationEpoch() { fValidationEpochActive = true; } + void DeActivateValidationEpoch() + { + { + std::lock_guard lock(fLoadingMutex); + fValidationEpochActive = false; + } - void DeActivateValidationEpoch() { fValidationEpochActive = false; } + fValidationBatchLoader->Reset(); + fValidationBatchLoader->DeActivate(); + fLoadingCondition.notify_all(); + } + + /// \brief Main loop for loading chunks and creating batches. 
+ /// The producer (loading thread) will keep loading chunks and creating batches until the end of the epoch is reached, or the generator is deactivated. + void LoadChunks() + { + // Set minimum number of batches to keep in the queue before producer goes to work. + // This is to ensure that the producer will get a chance to work if the consumer is too fast and drains the queue quickly. + // With this, the maximum queue size will be approximately fChunkSize*1.5. + // TODO(staider): improve this heuristic by taking into consideration a "maximum number of batches in memory" set by the user. + const std::size_t kMinQueuedBatches = std::max(1, (fChunkSize / fBatchSize) / 2); + + std::unique_lock lock(fLoadingMutex); + + while (true) { + // Wait until we have work or shutdown + fLoadingCondition.wait(lock, [&] { + return !fIsActive || (fTrainingEpochActive && fTrainingChunkNum < fNumTrainingChunks) || + (fValidationEpochActive && fValidationChunkNum < fNumValidationChunks); + }); + + if (!fIsActive) + break; + + // Helper: check if validation queue below watermark and needs the producer + auto validationEmpty = [&] { + if (!fValidationEpochActive || fValidationChunkNum >= fNumValidationChunks) + return false; + if (fValidationBatchLoader->isProducerDone()) + return false; + return fValidationBatchLoader->GetNumBatchQueue() < kMinQueuedBatches; + }; + + // -- TRAINING -- + if (fTrainingEpochActive) { + while (true) { + // Stop conditions (shutdown or epoch end) + if (!fIsActive || !fTrainingEpochActive) + break; + + // No more chunks to load: signal consumers + if (fTrainingChunkNum >= fNumTrainingChunks) { + fTrainingBatchLoader->MarkProducerDone(); + break; + } + + // In the case of training prefetching, we could start requesting data for the next training loop while validation is active and might need data. + // To avoid getting stuck in the training loop, we check if the validation queue is below watermark and if so, we break out of the training loop. 
+ if (validationEmpty()){ + break; + } + + // If queue is not empty, wait until it drains below watermark, or validation needs data, or we are + // deactivated. + if (fTrainingBatchLoader->GetNumBatchQueue() >= kMinQueuedBatches) { + fLoadingCondition.wait(lock, [&] { + return !fIsActive || !fTrainingEpochActive || + fTrainingBatchLoader->GetNumBatchQueue() < kMinQueuedBatches || validationEmpty(); + }); + continue; + } + + // Claim chunk under lock + const std::size_t chunkIdx = fTrainingChunkNum++; + const bool isLastTrainChunk = (chunkIdx == fNumTrainingChunks - 1); + + // Release lock while reading and loading data to allow the consumer to access the queue freely in parallel. + // The loading thread re-acquires the lock in CreateBatches when it needs to push batches to the queue. + lock.unlock(); + fChunkLoader->LoadTrainingChunk(fTrainChunkTensor, chunkIdx); + fTrainingBatchLoader->CreateBatches(fTrainChunkTensor, isLastTrainChunk); + lock.lock(); + } + } + + // -- VALIDATION -- + if (fValidationEpochActive) { + while (true) { + // Stop conditions (shutdown or epoch end) + if (!fIsActive || !fValidationEpochActive) + break; + + // No more chunks to load: signal consumers + if (fValidationChunkNum >= fNumValidationChunks) { + fValidationBatchLoader->MarkProducerDone(); + break; + } + + // If queue is not hungry, wait until it drains below watermark, or we are deactivated + if (fValidationBatchLoader->GetNumBatchQueue() >= kMinQueuedBatches) { + fLoadingCondition.wait(lock, [&] { + return !fIsActive || !fValidationEpochActive || + fValidationBatchLoader->GetNumBatchQueue() < kMinQueuedBatches; + }); + continue; + } + + // Claim chunk under lock + const std::size_t chunkIdx = fValidationChunkNum++; + const bool isLastValidationChunk = (chunkIdx == fNumValidationChunks - 1); + + // Release lock while working + lock.unlock(); + fChunkLoader->LoadValidationChunk(fValidationChunkTensor, chunkIdx); + fValidationBatchLoader->CreateBatches(fValidationChunkTensor, 
isLastValidationChunk); + lock.lock(); + } + } + } + } /// \brief Create training batches by first loading a chunk (see RChunkLoader) and split it into batches (see /// RBatchLoader) void CreateTrainBatches() { - fTrainingEpochActive = true; + fTrainingBatchLoader->Activate(); + if (fLoadEager) { if (fSampleType == "") { fTensorOperators->ShuffleTensor(fSampledTrainingDataset, fTrainingDataset); @@ -254,15 +416,10 @@ public: fTrainingSampler->Sampler(fSampledTrainingDataset); } - fTrainingBatchLoader->CreateBatches(fSampledTrainingDataset, 1); - } - - else { + fTrainingBatchLoader->CreateBatches(fSampledTrainingDataset, true); + fTrainingBatchLoader->MarkProducerDone(); + } else { fChunkLoader->CreateTrainingChunksIntervals(); - fTrainingChunkNum = 0; - fChunkLoader->LoadTrainingChunk(fTrainChunkTensor, fTrainingChunkNum); - fTrainingBatchLoader->CreateBatches(fTrainChunkTensor, fNumTrainingChunks); - fTrainingChunkNum++; } } @@ -270,7 +427,8 @@ public: /// (see RBatchLoader) void CreateValidationBatches() { - fValidationEpochActive = true; + fValidationBatchLoader->Activate(); + if (fLoadEager) { if (fSampleType == "") { fTensorOperators->ShuffleTensor(fSampledValidationDataset, fValidationDataset); @@ -280,32 +438,18 @@ public: fValidationSampler->Sampler(fSampledValidationDataset); } - fValidationBatchLoader->CreateBatches(fSampledValidationDataset, 1); + fValidationBatchLoader->CreateBatches(fSampledValidationDataset, true); + fValidationBatchLoader->MarkProducerDone(); } else { fChunkLoader->CreateValidationChunksIntervals(); - fValidationChunkNum = 0; - fChunkLoader->LoadValidationChunk(fValidationChunkTensor, fValidationChunkNum); - fValidationBatchLoader->CreateBatches(fValidationChunkTensor, fNumValidationChunks); - fValidationChunkNum++; } } /// \brief Loads a training batch from the queue RFlat2DMatrix GetTrainBatch() { - if (!fLoadEager) { - auto batchQueue = fTrainingBatchLoader->GetNumBatchQueue(); - - // load the next chunk if the queue is empty - if 
(batchQueue < 1 && fTrainingChunkNum < fNumTrainingChunks) { - fChunkLoader->LoadTrainingChunk(fTrainChunkTensor, fTrainingChunkNum); - std::size_t lastTrainingBatch = fNumTrainingChunks - fTrainingChunkNum; - fTrainingBatchLoader->CreateBatches(fTrainChunkTensor, lastTrainingBatch); - fTrainingChunkNum++; - } - } // Get next batch if available return fTrainingBatchLoader->GetBatch(); } @@ -313,17 +457,6 @@ public: /// \brief Loads a validation batch from the queue RFlat2DMatrix GetValidationBatch() { - if (!fLoadEager) { - auto batchQueue = fValidationBatchLoader->GetNumBatchQueue(); - - // load the next chunk if the queue is empty - if (batchQueue < 1 && fValidationChunkNum < fNumValidationChunks) { - fChunkLoader->LoadValidationChunk(fValidationChunkTensor, fValidationChunkNum); - std::size_t lastValidationBatch = fNumValidationChunks - fValidationChunkNum; - fValidationBatchLoader->CreateBatches(fValidationChunkTensor, lastValidationBatch); - fValidationChunkNum++; - } - } // Get next batch if available return fValidationBatchLoader->GetBatch(); } @@ -334,10 +467,23 @@ public: std::size_t TrainRemainderRows() { return fTrainingBatchLoader->GetNumRemainderRows(); } std::size_t ValidationRemainderRows() { return fValidationBatchLoader->GetNumRemainderRows(); } - bool IsActive() { return fIsActive; } - bool TrainingIsActive() { return fTrainingEpochActive; } - /// \brief Returns the next batch of validation data if available. - /// Returns empty RTensor otherwise. 
+ bool IsActive() + { + std::lock_guard lock(fLoadingMutex); + return fIsActive; + } + + bool IsTrainingActive() + { + std::lock_guard lock(fLoadingMutex); + return fTrainingEpochActive; + } + + bool IsValidationActive() + { + std::lock_guard lock(fLoadingMutex); + return fValidationEpochActive; + } }; } // namespace ROOT::Experimental::Internal::ML diff --git a/tree/ml/inc/ROOT/ML/RBatchLoader.hxx b/tree/ml/inc/ROOT/ML/RBatchLoader.hxx index 05ec5e59d416a..c6571d9fbb437 100644 --- a/tree/ml/inc/ROOT/ML/RBatchLoader.hxx +++ b/tree/ml/inc/ROOT/ML/RBatchLoader.hxx @@ -3,6 +3,7 @@ // Author: Nopphakorn Subsa-Ard, King Mongkut's University of Technology Thonburi (KMUTT) (TH) 08/2024 // Author: Vincenzo Eduardo Padulano, CERN 10/2024 // Author: Martin Føll, University of Oslo (UiO) & CERN 05/2025 +// Author: Silia Taider, CERN 02/2026 /************************************************************************* * Copyright (C) 1995-2025, Rene Brun and Fons Rademakers. * @@ -28,7 +29,7 @@ namespace ROOT::Experimental::Internal::ML { /** -\class ROOT::Experimental::Internal::ML::RBatchLoader + \class ROOT::Experimental::Internal::ML::RBatchLoader \brief Building and loading the batches from loaded chunks in RChunkLoader @@ -41,6 +42,8 @@ private: std::size_t fBatchSize; // needed for calculating the total number of batch columns when vectors columns are present std::vector fCols; + std::mutex &fLock; + std::condition_variable &fCV; std::vector fVecSizes; std::size_t fSumVecSizes; std::size_t fNumColumns; @@ -53,9 +56,7 @@ private: std::size_t fLeftoverBatchSize; bool fIsActive = false; - - std::mutex fBatchLock; - std::condition_variable fBatchCondition; + bool fProducerDone = true; // queues of flattened tensors (rows * cols) std::queue> fBatchQueue; @@ -68,11 +69,17 @@ private: std::unique_ptr fSecondaryLeftoverBatch; public: - RBatchLoader(std::size_t batchSize, const std::vector &cols, - const std::vector &vecSizes = {}, std::size_t numEntries = 0, bool dropRemainder = 
false) - : fBatchSize(batchSize), fCols(cols), fVecSizes(vecSizes), fNumEntries(numEntries), fDropRemainder(dropRemainder) + RBatchLoader(std::size_t batchSize, const std::vector &cols, std::mutex &sharedMutex, + std::condition_variable &sharedCV, const std::vector &vecSizes = {}, + std::size_t numEntries = 0, bool dropRemainder = false) + : fBatchSize(batchSize), + fCols(cols), + fLock(sharedMutex), + fCV(sharedCV), + fVecSizes(vecSizes), + fNumEntries(numEntries), + fDropRemainder(dropRemainder) { - fSumVecSizes = std::accumulate(fVecSizes.begin(), fVecSizes.end(), 0); fNumColumns = fCols.size() + fSumVecSizes - fVecSizes.size(); @@ -87,9 +94,7 @@ public: if (fDropRemainder) { fNumBatches = fNumFullBatches; - } - - else { + } else { fNumBatches = fNumFullBatches + fNumLeftoverBatches; } @@ -97,25 +102,32 @@ public: fSecondaryLeftoverBatch = std::make_unique(); } -public: + /// \brief Activate the batchloader. This means that batches can be created and loaded. void Activate() { { - std::lock_guard lock(fBatchLock); + std::lock_guard lock(fLock); + if (fIsActive) + return; fIsActive = true; + fProducerDone = false; } - fBatchCondition.notify_all(); + + fCV.notify_all(); } /// \brief DeActivate the batchloader. This means that no more batches are created. - /// Batches can still be returned if they are already loaded + /// Batches can still be returned if they are already loaded. void DeActivate() { { - std::lock_guard lock(fBatchLock); + std::lock_guard lock(fLock); + if (!fIsActive) + return; fIsActive = false; } - fBatchCondition.notify_all(); + + fCV.notify_all(); } /// \brief Return a batch of data as a unique pointer. @@ -132,26 +144,36 @@ public: return batch; } - /// \brief Loading the batch from the queue + /// \brief Loading the batch from the queue. 
/// \return Batch RFlat2DMatrix GetBatch() { + std::unique_lock lock(fLock); + + // Wait until: + // - there is data in the queue + // - or producer declares "done" + // - or we are deactivated + fCV.wait(lock, [&] { return !fBatchQueue.empty() || fProducerDone || !fIsActive; }); if (fBatchQueue.empty()) { + // producer done and no queued data -> end-of-epoch signal fCurrentBatch = std::make_unique(); return *fCurrentBatch; } fCurrentBatch = std::move(fBatchQueue.front()); fBatchQueue.pop(); + // Notify the loading thread that the queue has drained + fCV.notify_all(); return *fCurrentBatch; } /// \brief Creating the batches from a chunk and add them to the queue. /// \param[in] chunkTensor Tensor with the data from the chunk - /// \param[in] lastbatch Check if the batch in the chunk is the last one - void CreateBatches(RFlat2DMatrix &chunkTensor, std::size_t lastbatch) + /// \param[in] isLastBatch Check if the batch in the chunk is the last one + void CreateBatches(RFlat2DMatrix &chunkTensor, bool isLastBatch) { std::size_t ChunkSize = chunkTensor.GetRows(); std::size_t NumCols = chunkTensor.GetCols(); @@ -163,7 +185,6 @@ public: // fill the full batches from the chunk into a vector for (std::size_t i = 0; i < Batches; i++) { - // Fill a batch batches.emplace_back(CreateBatch(chunkTensor, i)); } @@ -183,7 +204,7 @@ public: std::copy(LeftoverBatch.GetData(), LeftoverBatch.GetData() + (LeftoverBatchSize * fNumColumns), fPrimaryLeftoverBatch->GetData() + (PrimaryLeftoverSize * NumCols)); - // copy LeftoverBatch to end of fPrimaryLeftoverBatch and add it to the batch vector + // copy LeftoverBatch to end of fPrimaryLeftoverBatch and add it to the batch if (emptySlots == LeftoverBatchSize) { auto copy = std::make_unique(fBatchSize, fNumColumns); std::copy(fPrimaryLeftoverBatch->GetData(), fPrimaryLeftoverBatch->GetData() + (fBatchSize * fNumColumns), @@ -198,7 +219,7 @@ public: // copy LeftoverBatch to both fPrimaryLeftoverBatch and fSecondaryLeftoverBatch else if 
(emptySlots < LeftoverBatchSize) { - // copy the first part of LeftoverBatch to end of fPrimaryLeftoverTrainingBatch + // copy the first part of LeftoverBatch to end of fPrimaryLeftoverTrainingBatch fPrimaryLeftoverBatch->Resize(fBatchSize, NumCols); std::copy(LeftoverBatch.GetData(), LeftoverBatch.GetData() + (emptySlots * NumCols), fPrimaryLeftoverBatch->GetData() + (PrimaryLeftoverSize * NumCols)); @@ -216,14 +237,12 @@ public: // exchange fPrimaryLeftoverBatch and fSecondaryLeftoverBatch *fPrimaryLeftoverBatch = *fSecondaryLeftoverBatch; - // reset fSecondaryLeftoverTrainingBatch fSecondaryLeftoverBatch = std::make_unique(); } // copy the content of fPrimaryLeftoverBatch to the leftover batch from the chunk - if (lastbatch == 1) { - + if (isLastBatch) { if (fDropRemainder == false && fLeftoverBatchSize > 0) { auto copy = std::make_unique(fLeftoverBatchSize, fNumColumns); std::copy(fPrimaryLeftoverBatch->GetData(), @@ -235,12 +254,42 @@ public: fSecondaryLeftoverBatch = std::make_unique(); } - // append the batches from the batch vector from the chunk to the training batch queue - for (std::size_t i = 0; i < batches.size(); i++) { - fBatchQueue.push(std::move(batches[i])); + { + std::lock_guard lock(fLock); + for (std::size_t i = 0; i < batches.size(); i++) { + fBatchQueue.push(std::move(batches[i])); + } } + + fCV.notify_all(); + } + + /// \brief Reset the batchloader state. + void Reset() + { + { + std::lock_guard lock(fLock); + + while (!fBatchQueue.empty()) { + fBatchQueue.pop(); + } + + fCurrentBatch.reset(); + fPrimaryLeftoverBatch = std::make_unique(); + fSecondaryLeftoverBatch = std::make_unique(); + } + + fCV.notify_all(); + } + + /// \brief Signal that the producer has finished pushing all batches for this epoch. 
+ void MarkProducerDone() + { + fProducerDone = true; + fCV.notify_all(); } + bool isProducerDone() { return fProducerDone; } std::size_t GetNumBatches() { return fNumBatches; } std::size_t GetNumEntries() { return fNumEntries; } std::size_t GetNumRemainderRows() { return fLeftoverBatchSize; } From f9cafd39c21e5f33b9a328284bc9d66fdeea53da Mon Sep 17 00:00:00 2001 From: Silia Taider Date: Tue, 24 Feb 2026 13:54:34 +0100 Subject: [PATCH 2/3] [ML][Python] Change CreateNumPyGenerators signature for consistency + refactoring --- .../ROOT/_pythonization/_ml_dataloader.py | 32 +++++++++++-------- .../pythonizations/test/ml_dataloader.py | 6 ++-- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/bindings/pyroot/pythonizations/python/ROOT/_pythonization/_ml_dataloader.py b/bindings/pyroot/pythonizations/python/ROOT/_pythonization/_ml_dataloader.py index 634568c771a2a..cee6721c7ff83 100644 --- a/bindings/pyroot/pythonizations/python/ROOT/_pythonization/_ml_dataloader.py +++ b/bindings/pyroot/pythonizations/python/ROOT/_pythonization/_ml_dataloader.py @@ -271,14 +271,17 @@ def __init__( atexit.register(self.DeActivate) @property - def is_active(self): + def isActive(self): return self.generator.IsActive() - def is_training_active(self): - return self.generator.TrainingIsActive() + def isTrainingActive(self): + return self.generator.IsTrainingActive() + + def isValidationActive(self): + return self.generator.IsValidationActive() def Activate(self): - """Initialize the generator to be used for a loop""" + """Initialize the generator to be used for a loop, this spawns the loading thread""" self.generator.Activate() def DeActivate(self): @@ -286,27 +289,27 @@ def DeActivate(self): self.generator.DeActivate() def ActivateTrainingEpoch(self): - """Activate the generator""" + """Activate the training epoch of the generator""" self.generator.ActivateTrainingEpoch() def ActivateValidationEpoch(self): - """Activate the generator""" + """Activate the validation epoch of 
the generator""" self.generator.ActivateValidationEpoch() def DeActivateTrainingEpoch(self): - """Deactivate the generator""" + """Deactivate the training epoch of the generator""" self.generator.DeActivateTrainingEpoch() def DeActivateValidationEpoch(self): - """Deactivate the generator""" + """Deactivate the validation epoch of the generator""" self.generator.DeActivateValidationEpoch() def CreateTrainBatches(self): - """Deactivate the generator""" + """Create the first training batches from the first chunk""" self.generator.CreateTrainBatches() def CreateValidationBatches(self): - """Deactivate the generator""" + """Create the first validation batches from the first chunk""" self.generator.CreateValidationBatches() def GetSample(self): @@ -509,10 +512,10 @@ def __init__(self, base_generator: BaseGenerator): def __enter__(self): self.base_generator.ActivateTrainingEpoch() + return self def __exit__(self, type, value, traceback): self.base_generator.DeActivateTrainingEpoch() - return True class TrainDataLoader: @@ -603,10 +606,10 @@ def __init__(self, base_generator: BaseGenerator): def __enter__(self): self.base_generator.ActivateValidationEpoch() + return self def __exit__(self, type, value, traceback): self.base_generator.DeActivateValidationEpoch() - return True class ValidationDataLoader: @@ -811,7 +814,7 @@ def CreateNumPyGenerators( train_generator = TrainDataLoader(base_generator, base_generator.ConvertBatchToNumpy) if validation_split == 0.0: - return train_generator, None + return train_generator validation_generator = ValidationDataLoader(base_generator, base_generator.ConvertBatchToNumpy) @@ -950,7 +953,6 @@ def CreateTFDatasets( ) train_generator = TrainDataLoader(base_generator, base_generator.ConvertBatchToTF) - validation_generator = ValidationDataLoader(base_generator, base_generator.ConvertBatchToTF) num_train_columns = len(train_generator.train_columns) num_target_columns = len(train_generator.target_columns) @@ -986,6 +988,8 @@ def 
CreateTFDatasets( if validation_split == 0.0: return ds_train + validation_generator = ValidationDataLoader(base_generator, base_generator.ConvertBatchToTF) + ds_validation = tf.data.Dataset.from_generator(validation_generator, output_signature=batch_signature) # Give access to the columns function of the validation set diff --git a/bindings/pyroot/pythonizations/test/ml_dataloader.py b/bindings/pyroot/pythonizations/test/ml_dataloader.py index 5445114f08125..40ffa0197c8b9 100644 --- a/bindings/pyroot/pythonizations/test/ml_dataloader.py +++ b/bindings/pyroot/pythonizations/test/ml_dataloader.py @@ -540,7 +540,7 @@ def test09_filtered_last_chunk(self): dff = df.Filter("b1 % 2 == 0", "name") - gen_train, _ = ROOT.Experimental.ML.CreateNumPyGenerators( + gen_train = ROOT.Experimental.ML.CreateNumPyGenerators( dff, batch_size=3, chunk_size=9, @@ -1622,7 +1622,7 @@ def test09_filtered_last_chunk(self): dff = df.Filter("b1 % 2 == 0", "name") - gen_train, _ = ROOT.Experimental.ML.CreateNumPyGenerators( + gen_train = ROOT.Experimental.ML.CreateNumPyGenerators( dff, batch_size=3, target="b2", validation_split=0, shuffle=False, drop_remainder=False, load_eager=True ) @@ -2784,7 +2784,7 @@ def test09_filtered_last_chunk(self): dff1 = df1.Filter("b1 % 2 == 0", "name") dff2 = df2.Filter("b1 % 2 == 0", "name") - gen_train, _ = ROOT.Experimental.ML.CreateNumPyGenerators( + gen_train = ROOT.Experimental.ML.CreateNumPyGenerators( [dff1, dff2], batch_size=3, target="b2", From d9e82ae9ba0d6f0d51ae43c46916480e9b32a9d4 Mon Sep 17 00:00:00 2001 From: Silia Taider Date: Mon, 23 Feb 2026 17:35:30 +0100 Subject: [PATCH 3/3] [ML] Clean up and formatting of the data loader headers --- tree/ml/inc/ROOT/ML/RBatchGenerator.hxx | 46 ++++++++----------- tree/ml/inc/ROOT/ML/RBatchLoader.hxx | 59 ++++++++++++------------- 2 files changed, 48 insertions(+), 57 deletions(-) diff --git a/tree/ml/inc/ROOT/ML/RBatchGenerator.hxx b/tree/ml/inc/ROOT/ML/RBatchGenerator.hxx index 
29d231fc5f66f..1cfb9256a0b21 100644 --- a/tree/ml/inc/ROOT/ML/RBatchGenerator.hxx +++ b/tree/ml/inc/ROOT/ML/RBatchGenerator.hxx @@ -40,7 +40,7 @@ namespace ROOT::Experimental::ML { namespace ROOT::Experimental::Internal::ML { /** -\class ROOT::Experimental::Internal::ML::RBatchGenerator + \class ROOT::Experimental::Internal::ML::RBatchGenerator \brief In this class, the processes of loading chunks (see RChunkLoader) and creating batches from those chunks (see @@ -70,7 +70,7 @@ private: std::unique_ptr fTensorOperators; - std::vector f_rdfs; + std::vector fRdfs; std::unique_ptr fLoadingThread; std::condition_variable fLoadingCondition; @@ -121,7 +121,7 @@ public: bool dropRemainder = true, const std::size_t setSeed = 0, bool loadEager = false, std::string sampleType = "", float sampleRatio = 1.0, bool replacement = false) - : f_rdfs(rdfs), + : fRdfs(rdfs), fCols(cols), fVecSizes(vecSizes), fChunkSize(chunkSize), @@ -141,7 +141,7 @@ public: fTensorOperators = std::make_unique(fShuffle, fSetSeed); if (fLoadEager) { - fDatasetLoader = std::make_unique>(f_rdfs, fValidationSplit, fCols, fVecSizes, + fDatasetLoader = std::make_unique>(fRdfs, fValidationSplit, fCols, fVecSizes, vecPadding, fShuffle, fSetSeed); // split the datasets and extract the training and validation datasets fDatasetLoader->SplitDatasets(); @@ -171,7 +171,7 @@ public: } else { - fChunkLoader = std::make_unique>(f_rdfs[0], fChunkSize, fBlockSize, fValidationSplit, + fChunkLoader = std::make_unique>(fRdfs[0], fChunkSize, fBlockSize, fValidationSplit, fCols, fVecSizes, vecPadding, fShuffle, fSetSeed); // split the dataset into training and validation sets @@ -231,18 +231,6 @@ public: fLoadingThread = std::make_unique(&RBatchGenerator::LoadChunks, this); } - void ActivateEpoch() - { - std::lock_guard lock(fLoadingMutex); - fEpochActive = true; - } - - void DeActivateEpoch() - { - std::lock_guard lock(fLoadingMutex); - fEpochActive = false; - } - /// \brief Activate the training epoch by starting the 
batchloader. void ActivateTrainingEpoch() { @@ -293,13 +281,15 @@ public: } /// \brief Main loop for loading chunks and creating batches. - /// The producer (loading thread) will keep loading chunks and creating batches until the end of the epoch is reached, or the generator is deactivated. + /// The producer (loading thread) will keep loading chunks and creating batches until the end of the epoch is + /// reached, or the generator is deactivated. void LoadChunks() { // Set minimum number of batches to keep in the queue before producer goes to work. - // This is to ensure that the producer will get a chance to work if the consumer is too fast and drains the queue quickly. - // With this, the maximum queue size will be approximately fChunkSize*1.5. - // TODO(staider): improve this heuristic by taking into consideration a "maximum number of batches in memory" set by the user. + // This is to ensure that the producer will get a chance to work if the consumer is too fast and drains the queue + // quickly. With this, the maximum queue size will be approximately fChunkSize*1.5. + // TODO(staider): improve this heuristic by taking into consideration a "maximum number of batches in memory" set + // by the user. const std::size_t kMinQueuedBatches = std::max(1, (fChunkSize / fBatchSize) / 2); std::unique_lock lock(fLoadingMutex); @@ -336,9 +326,10 @@ public: break; } - // In the case of training prefetching, we could start requesting data for the next training loop while validation is active and might need data. - // To avoid getting stuck in the training loop, we check if the validation queue is below watermark and if so, we break out of the training loop. - if (validationEmpty()){ + // In the case of training prefetching, we could start requesting data for the next training loop while + // validation is active and might need data. 
To avoid getting stuck in the training loop, we check if the + // validation queue is below watermark and if so, we break out of the training loop. + if (validationEmpty()) { break; } @@ -356,8 +347,9 @@ public: const std::size_t chunkIdx = fTrainingChunkNum++; const bool isLastTrainChunk = (chunkIdx == fNumTrainingChunks - 1); - // Release lock while reading and loading data to allow the consumer to access the queue freely in parallel. - // The loading thread re-acquires the lock in CreateBatches when it needs to push batches to the queue. + // Release lock while reading and loading data to allow the consumer to access the queue freely in + // parallel. The loading thread re-acquires the lock in CreateBatches when it needs to push batches to + // the queue. lock.unlock(); fChunkLoader->LoadTrainingChunk(fTrainChunkTensor, chunkIdx); fTrainingBatchLoader->CreateBatches(fTrainChunkTensor, isLastTrainChunk); @@ -488,4 +480,4 @@ public: } // namespace ROOT::Experimental::Internal::ML -#endif // ROOT_INTERNAL_ML_RBATCHGENERATOR +#endif // ROOT_INTERNAL_ML_RBATCHGENERATOR \ No newline at end of file diff --git a/tree/ml/inc/ROOT/ML/RBatchLoader.hxx b/tree/ml/inc/ROOT/ML/RBatchLoader.hxx index c6571d9fbb437..c167017a9b996 100644 --- a/tree/ml/inc/ROOT/ML/RBatchLoader.hxx +++ b/tree/ml/inc/ROOT/ML/RBatchLoader.hxx @@ -51,7 +51,6 @@ private: bool fDropRemainder; std::size_t fNumFullBatches; - std::size_t fNumLeftoverBatches; std::size_t fNumBatches; std::size_t fLeftoverBatchSize; @@ -90,12 +89,12 @@ public: fLeftoverBatchSize = fNumEntries % fBatchSize; fNumFullBatches = fNumEntries / fBatchSize; - fNumLeftoverBatches = fLeftoverBatchSize == 0 ? 0 : 1; + std::size_t numLeftoverBatches = fLeftoverBatchSize == 0 ? 
0 : 1; if (fDropRemainder) { fNumBatches = fNumFullBatches; } else { - fNumBatches = fNumFullBatches + fNumLeftoverBatches; + fNumBatches = fNumFullBatches + numLeftoverBatches; } fPrimaryLeftoverBatch = std::make_unique<RFlat2DMatrix>(); @@ -135,11 +134,11 @@ public: /// \param[in] chunkTensor Tensor with the data from the chunk /// \param[in] idxs Index of batch in the chunk /// \return Batch - std::unique_ptr<RFlat2DMatrix> CreateBatch(RFlat2DMatrix &chunTensor, std::size_t idxs) + std::unique_ptr<RFlat2DMatrix> CreateBatch(RFlat2DMatrix &chunkTensor, std::size_t idxs) { auto batch = std::make_unique<RFlat2DMatrix>(fBatchSize, fNumColumns); - std::copy(chunTensor.GetData() + (idxs * fBatchSize * fNumColumns), - chunTensor.GetData() + ((idxs + 1) * fBatchSize * fNumColumns), batch->GetData()); + std::copy(chunkTensor.GetData() + (idxs * fBatchSize * fNumColumns), + chunkTensor.GetData() + ((idxs + 1) * fBatchSize * fNumColumns), batch->GetData()); return batch; } @@ -175,23 +174,23 @@ public: /// \param[in] isLastBatch Check if the batch in the chunk is the last one void CreateBatches(RFlat2DMatrix &chunkTensor, bool isLastBatch) { - std::size_t ChunkSize = chunkTensor.GetRows(); - std::size_t NumCols = chunkTensor.GetCols(); - std::size_t Batches = ChunkSize / fBatchSize; - std::size_t LeftoverBatchSize = ChunkSize % fBatchSize; + std::size_t chunkSize = chunkTensor.GetRows(); + std::size_t numCols = chunkTensor.GetCols(); + std::size_t numBatches = chunkSize / fBatchSize; + std::size_t leftoverBatchSize = chunkSize % fBatchSize; // create a vector of batches std::vector<std::unique_ptr<RFlat2DMatrix>> batches; // fill the full batches from the chunk into a vector - for (std::size_t i = 0; i < Batches; i++) { + for (std::size_t i = 0; i < numBatches; i++) { batches.emplace_back(CreateBatch(chunkTensor, i)); } // copy the remaining entries from the chunk into a leftover batch - RFlat2DMatrix LeftoverBatch(LeftoverBatchSize, NumCols); - std::copy(chunkTensor.GetData() + (Batches * fBatchSize * NumCols), - chunkTensor.GetData() + (Batches * fBatchSize *
NumCols + LeftoverBatchSize * NumCols), + RFlat2DMatrix LeftoverBatch(leftoverBatchSize, numCols); + std::copy(chunkTensor.GetData() + (numBatches * fBatchSize * numCols), + chunkTensor.GetData() + (numBatches * fBatchSize * numCols + leftoverBatchSize * numCols), LeftoverBatch.GetData()); // calculate how many empty slots are left in fPrimaryLeftoverBatch @@ -199,13 +198,13 @@ public: std::size_t emptySlots = fBatchSize - PrimaryLeftoverSize; // copy LeftoverBatch to end of fPrimaryLeftoverBatch - if (emptySlots >= LeftoverBatchSize) { - fPrimaryLeftoverBatch->Resize(PrimaryLeftoverSize + LeftoverBatchSize, NumCols); - std::copy(LeftoverBatch.GetData(), LeftoverBatch.GetData() + (LeftoverBatchSize * fNumColumns), - fPrimaryLeftoverBatch->GetData() + (PrimaryLeftoverSize * NumCols)); + if (emptySlots >= leftoverBatchSize) { + fPrimaryLeftoverBatch->Resize(PrimaryLeftoverSize + leftoverBatchSize, numCols); + std::copy(LeftoverBatch.GetData(), LeftoverBatch.GetData() + (leftoverBatchSize * fNumColumns), + fPrimaryLeftoverBatch->GetData() + (PrimaryLeftoverSize * numCols)); // copy LeftoverBatch to end of fPrimaryLeftoverBatch and add it to the batch - if (emptySlots == LeftoverBatchSize) { + if (emptySlots == leftoverBatchSize) { auto copy = std::make_unique<RFlat2DMatrix>(fBatchSize, fNumColumns); std::copy(fPrimaryLeftoverBatch->GetData(), fPrimaryLeftoverBatch->GetData() + (fBatchSize * fNumColumns), copy->GetData()); @@ -218,16 +217,16 @@ public: } // copy LeftoverBatch to both fPrimaryLeftoverBatch and fSecondaryLeftoverBatch - else if (emptySlots < LeftoverBatchSize) { - // copy the first part of LeftoverBatch to end of fPrimaryLeftoverTrainingBatch - fPrimaryLeftoverBatch->Resize(fBatchSize, NumCols); - std::copy(LeftoverBatch.GetData(), LeftoverBatch.GetData() + (emptySlots * NumCols), - fPrimaryLeftoverBatch->GetData() + (PrimaryLeftoverSize * NumCols)); + else if (emptySlots < leftoverBatchSize) { + // copy the first part of LeftoverBatch to end of
fPrimaryLeftoverTrainingBatch + fPrimaryLeftoverBatch->Resize(fBatchSize, numCols); + std::copy(LeftoverBatch.GetData(), LeftoverBatch.GetData() + (emptySlots * numCols), + fPrimaryLeftoverBatch->GetData() + (PrimaryLeftoverSize * numCols)); // copy the last part of LeftoverBatch to the end of fSecondaryLeftoverBatch - fSecondaryLeftoverBatch->Resize(LeftoverBatchSize - emptySlots, NumCols); - std::copy(LeftoverBatch.GetData() + (emptySlots * NumCols), - LeftoverBatch.GetData() + (LeftoverBatchSize * NumCols), fSecondaryLeftoverBatch->GetData()); + fSecondaryLeftoverBatch->Resize(leftoverBatchSize - emptySlots, numCols); + std::copy(LeftoverBatch.GetData() + (emptySlots * numCols), + LeftoverBatch.GetData() + (leftoverBatchSize * numCols), fSecondaryLeftoverBatch->GetData()); // add fPrimaryLeftoverBatch to the batch vector auto copy = std::make_unique<RFlat2DMatrix>(fBatchSize, fNumColumns); @@ -243,7 +242,7 @@ public: // copy the content of fPrimaryLeftoverBatch to the leftover batch from the chunk if (isLastBatch) { - if (fDropRemainder == false && fLeftoverBatchSize > 0) { + if (!fDropRemainder && fLeftoverBatchSize > 0) { auto copy = std::make_unique<RFlat2DMatrix>(fLeftoverBatchSize, fNumColumns); std::copy(fPrimaryLeftoverBatch->GetData(), fPrimaryLeftoverBatch->GetData() + (fLeftoverBatchSize * fNumColumns), copy->GetData()); @@ -256,8 +255,8 @@ public: { std::lock_guard lock(fLock); - for (std::size_t i = 0; i < batches.size(); i++) { - fBatchQueue.push(std::move(batches[i])); + for (auto &batch : batches) { + fBatchQueue.push(std::move(batch)); } }