From 2dee81e62d40fb4e200697bd89c1e550c6758125 Mon Sep 17 00:00:00 2001 From: Valeria D'Amante Date: Thu, 19 Mar 2026 16:57:53 +0100 Subject: [PATCH 1/9] inclusion of 2025 --- .github/copilot-instructions.md | 2 +- .github/workflows/cross-section-check.yaml | 2 +- .github/workflows/ds-consistency-check.yaml | 4 +- config/Run3_2025/global.yaml | 14 ++++ config/dataset_exceptions.yaml | 72 +++++++++++++++++++-- include/AnalysisTools.h | 3 +- include/HHbTagScores.h | 3 + 7 files changed, 90 insertions(+), 10 deletions(-) create mode 100644 config/Run3_2025/global.yaml diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index e462eae2..09e51db9 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -63,7 +63,7 @@ These run automatically on PRs to `main`: 2. **repo-sanity-checks.yaml**: Checks for binary files (must use git LFS), calculates repo size delta 3. **ds-consistency-check.yaml**: Validates `config/*/samples.yaml` files with: ```bash - python3 test/checkDatasetConfigConsistency.py --exception config/dataset_exceptions.yaml Run3_2022 Run3_2022EE Run3_2023 Run3_2023BPix Run3_2024 + python3 test/checkDatasetConfigConsistency.py --exception config/dataset_exceptions.yaml Run3_2022 Run3_2022EE Run3_2023 Run3_2023BPix Run3_2024 Run3_2025 ``` 4. 
**trigger-flaf-integration.yaml**: Triggers GitLab integration pipeline via `@cms-flaf-bot test` comments diff --git a/.github/workflows/cross-section-check.yaml b/.github/workflows/cross-section-check.yaml index 732a7907..9c8e2060 100644 --- a/.github/workflows/cross-section-check.yaml +++ b/.github/workflows/cross-section-check.yaml @@ -47,4 +47,4 @@ jobs: - name: Check cross-sections if: ${{ steps.changed_files.outputs.has_changes == 'true' }} - run: python3 test/checkCrossSections.py Run3_2022 Run3_2022EE Run3_2023 Run3_2023BPix Run3_2024 + run: python3 test/checkCrossSections.py Run3_2022 Run3_2022EE Run3_2023 Run3_2023BPix Run3_2024 Run3_2025 diff --git a/.github/workflows/ds-consistency-check.yaml b/.github/workflows/ds-consistency-check.yaml index 0fed409c..6e2c4fa8 100644 --- a/.github/workflows/ds-consistency-check.yaml +++ b/.github/workflows/ds-consistency-check.yaml @@ -45,8 +45,8 @@ jobs: - name: Check dataset configs consistency if: ${{ steps.changed_files.outputs.has_ds_configs == 'true' }} - run: python3 test/checkDatasetConfigConsistency.py --exception config/dataset_exceptions.yaml Run3_2022 Run3_2022EE Run3_2023 Run3_2023BPix Run3_2024 + run: python3 test/checkDatasetConfigConsistency.py --exception config/dataset_exceptions.yaml Run3_2022 Run3_2022EE Run3_2023 Run3_2023BPix Run3_2024 Run3_2025 - name: Check dataset naming if: ${{ steps.changed_files.outputs.has_ds_configs == 'true' }} - run: python3 test/checkDatasetNaming.py --rules config/dataset_naming_rules.yaml Run3_2022 Run3_2022EE Run3_2023 Run3_2023BPix Run3_2024 + run: python3 test/checkDatasetNaming.py --rules config/dataset_naming_rules.yaml Run3_2022 Run3_2022EE Run3_2023 Run3_2023BPix Run3_2024 Run3_2025 diff --git a/config/Run3_2025/global.yaml b/config/Run3_2025/global.yaml new file mode 100644 index 00000000..ee73b8f3 --- /dev/null +++ b/config/Run3_2025/global.yaml @@ -0,0 +1,14 @@ +era: Run3_2025 +luminosity: 110730. 
# from https://twiki.cern.ch/twiki/bin/view/CMS/PdmVRun3Analysis#Year_2025 +nano_version: v15 +crossSectionsFile: FLAF/config/crossSections13p6TeV.yaml +MET_flags: # https://twiki.cern.ch/twiki/bin/view/CMS/MissingETOptionalFiltersRun2#Run_3_2024_data_and_MC_Recommend + - Flag_goodVertices + - Flag_globalSuperTightHalo2016Filter + - Flag_EcalDeadCellTriggerPrimitiveFilter + - Flag_BadPFMuonFilter + - Flag_BadPFMuonDzFilter + - Flag_hfNoisyHitsFilter + - Flag_eeBadScFilter + - Flag_ecalBadCalibFilter +lumiFile: Corrections/data/golden_json/Cert_Collisions2025_391658_398903_Golden.json diff --git a/config/dataset_exceptions.yaml b/config/dataset_exceptions.yaml index 3b4a2411..3a5fc000 100644 --- a/config/dataset_exceptions.yaml +++ b/config/dataset_exceptions.yaml @@ -1,17 +1,34 @@ +# no MC for 2025: +# ^[A-Z].*: +# - Run3_2025 # DY +^DYto2(E|Mu|Tau)_M_50_[012]J_amcatnloFXFX: + - Run3_2025 +^DYto2(E|Mu|Tau)_M_(10to50|50)_amcatnloFXFX: + - Run3_2025 +^DYto2(E|Mu|Tau)_MLL_.*: + - Run3_2025 +^DYto2L_M_50_PTLL_.*: + - Run3_2025 ^DYto2L_M_(10to50|50)_amcatnloFXFX: - Run3_2024 + - Run3_2025 ^DYto2L_M_50_[012]J_amcatnloFXFX: - Run3_2024 + - Run3_2025 ^DYto2(E|Mu)_M_50_PTLL_.*_amcatnloFXFX: - Run3_2024 + - Run3_2025 ^DYto2.*_M_50_amcatnloFXFX_ext1: - Run3_2023 - Run3_2023BPix - Run3_2024 ^DYto2Tau_M_50_(0|1|2)J_Filtered_amcatnloFXFX: - Run3_2024 + - Run3_2025 # H +^GluGluH.*: + - Run3_2025 GluGluHto2B_M125_ext1: - Run3_2022 - Run3_2022EE @@ -20,7 +37,8 @@ GluGluHto2B_M125_ext1: GluGluHto2Tau_M125: - Run3_2022EE - Run3_2023 - +^ggZH_.*: + - Run3_2025 ^ggZH_.*_ext1: - Run3_2023 - Run3_2023BPix @@ -30,6 +48,8 @@ GluGluHto2Tau_M125: - Run3_2024 GluGlutoContinto2Zto2Mu2Tau: - Run3_2023 +^VBFH.*: + - Run3_2025 VBFHto2B_M125_ext1: - Run3_2022 - Run3_2022EE @@ -40,6 +60,8 @@ VBFHto2B_M125_ext1: - Run3_2023 - Run3_2023BPix - Run3_2024 +^ZH.*: + - Run3_2025 ^ZH_Hto2B.*_ext1: - Run3_2023 - Run3_2023BPix @@ -56,8 +78,11 @@ QCD_PT_300_EMEnriched: - Run3_2023 - Run3_2023BPix - Run3_2022
-^^QCD_PT_.*: +^QCD_PT_.*: - Run3_2024 + - Run3_2025 +^QCD_HT.*: + - Run3_2025 # ST ^T(bar|)W.*_ext1: - Run3_2023 @@ -67,11 +92,13 @@ TT: - Run3_2022EE - Run3_2023 - Run3_2024 + - Run3_2025 TT_ext1: - Run3_2022EE - Run3_2023 - Run3_2023BPix - Run3_2024 + - Run3_2025 TTto2L2Nu_ext1: - Run3_2023 - Run3_2023BPix @@ -84,26 +111,41 @@ TTtoLNu2Q_ext1: - Run3_2023 - Run3_2023BPix - Run3_2024 - +^TTto(2L2Nu|LNu2Q|4Q): + - Run3_2025 TTZ_Zto2Q: - Run3_2023 - Run3_2023BPix + - Run3_2025 TTWZ: - Run3_2022 + - Run3_2025 ^TTZ(Z|H)_Z(Z|H)to4B: - Run3_2022 - Run3_2022EE - Run3_2023 - Run3_2023BPix + - Run3_2025 TTZZ: - Run3_2024 + - Run3_2025 TTZH: - Run3_2024 + - Run3_2025 + +^TT(WH|WW): + - Run3_2025 + +^TTHto.*: + - Run3_2025 + # VBF VBFHto2Tau_M125: - Run3_2022EE # W +^W(minus|plus|to).*: + - Run3_2025 WtoLNu_madgraphMLM_ext1: - Run3_2023 - Run3_2023BPix @@ -115,6 +157,10 @@ WtoLNu_amcatnloFXFX: ^WtoLNu_(0|1)J_amcatnloFXFX: - Run3_2024 # VV +ZZ: + - Run3_2025 +^ZZto.*: + - Run3_2025 ^WW.*_ext1: - Run3_2023 - Run3_2023BPix @@ -127,7 +173,9 @@ WtoLNu_amcatnloFXFX: - Run3_2023 - Run3_2023BPix - Run3_2024 - +# VVV +ZZZ: + - Run3_2025 # change name in W samples..
WtoENu_amcatnloFXFX: - Run3_2022 @@ -155,14 +203,28 @@ WtoTauNu_amcatnloFXFX: - Run3_2022EE - Run3_2023 - Run3_2023BPix + - Run3_2025 +^T(barB|Bbar)toL(minus|plus)Nu(B|Bbar)_s_channel_4FS: + - Run3_2025 +^T(barBQ|BbarQ)_t_channel_4FS: + - Run3_2025 + +^T(barWplus|Wminus)to(2L2Nu|4Q|LNu2Q): + - Run3_2025 ZHto2Tau_UncorrelatedDecay_UnFiltered: - - Run3_2024 # tmp missing + - Run3_2024 ^Zto2Q_HT_(400to600|600to800|800toInf|200to400): - Run3_2024 + - Run3_2025 ^Zto2Q_HT_(100to400|400to800|800to1500|1500to2500|2500toInf): - Run3_2022 - Run3_2022EE - Run3_2023 - Run3_2023BPix + - Run3_2025 + +^Zto2Nu_HT*.: + - Run3_2025 + diff --git a/include/AnalysisTools.h b/include/AnalysisTools.h index 61672d98..3a4b49b0 100644 --- a/include/AnalysisTools.h +++ b/include/AnalysisTools.h @@ -32,7 +32,8 @@ enum class Period : int { Run3_2022EE = 6, Run3_2023 = 7, Run3_2023BPix = 8, - Run3_2024 = 9 + Run3_2024 = 9, + Run3_2025 = 10 }; enum class SampleType : int { diff --git a/include/HHbTagScores.h b/include/HHbTagScores.h index 57a40b61..b92a12a8 100644 --- a/include/HHbTagScores.h +++ b/include/HHbTagScores.h @@ -21,6 +21,7 @@ inline int PeriodToHHbTagInput(int version, Period period) { {{1, Period::Run3_2023}, 2018}, {{1, Period::Run3_2023BPix}, 2018}, {{1, Period::Run3_2024}, 2018}, + {{1, Period::Run3_2025}, 2018}, // v2 {{2, Period::Run2_2016_HIPM}, 2016}, {{2, Period::Run2_2016}, 2016}, @@ -31,6 +32,7 @@ inline int PeriodToHHbTagInput(int version, Period period) { {{2, Period::Run3_2023}, 2018}, {{2, Period::Run3_2023BPix}, 2018}, {{2, Period::Run3_2024}, 2018}, + {{2, Period::Run3_2025}, 2018}, // v3 {{3, Period::Run2_2016_HIPM}, 0}, {{3, Period::Run2_2016}, 0}, @@ -41,6 +43,7 @@ inline int PeriodToHHbTagInput(int version, Period period) { {{3, Period::Run3_2023}, 2}, {{3, Period::Run3_2023BPix}, 3}, {{3, Period::Run3_2024}, 3}, + {{3, Period::Run3_2025}, 3}, }; auto iter = periodHHBtag.find(std::make_pair(version, period)); if (iter == periodHHBtag.end()) { From
60765e9abcd3b7cb20ec5ec8c9e6d7589cc4e1bc Mon Sep 17 00:00:00 2001 From: Valeria D'Amante Date: Thu, 19 Mar 2026 20:22:48 +0100 Subject: [PATCH 2/9] yaml formatting 0 --- config/dataset_exceptions.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/config/dataset_exceptions.yaml b/config/dataset_exceptions.yaml index 3a5fc000..224cf891 100644 --- a/config/dataset_exceptions.yaml +++ b/config/dataset_exceptions.yaml @@ -227,4 +227,3 @@ ZHto2Tau_UncorrelatedDecay_UnFiltered: ^Zto2Nu_HT*.: - Run3_2025 - From 87ee4a8d9111394facb18ab34fecdc232af8a060 Mon Sep 17 00:00:00 2001 From: Valeria D'Amante Date: Wed, 1 Apr 2026 16:37:13 +0200 Subject: [PATCH 3/9] fixed 2025 lumi --- config/Run3_2025/global.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/Run3_2025/global.yaml b/config/Run3_2025/global.yaml index ee73b8f3..06801a58 100644 --- a/config/Run3_2025/global.yaml +++ b/config/Run3_2025/global.yaml @@ -1,5 +1,5 @@ era: Run3_2025 -luminosity: 110730. # from https://twiki.cern.ch/twiki/bin/view/CMS/PdmVRun3Analysis#Year_2025 +luminosity: 110730.86 # pb-1 (brilcalc raw: 110730859275.72 /ub), https://twiki.cern.ch/twiki/bin/view/CMS/PdmVRun3Analysis#DATA_AN2 brilcalc lumi -b "STABLE BEAMS" -i Cert_Collisions2025_391658_398903_Golden.json (no normtag available) nano_version: v15 crossSectionsFile: FLAF/config/crossSections13p6TeV.yaml MET_flags: # https://twiki.cern.ch/twiki/bin/view/CMS/MissingETOptionalFiltersRun2#Run_3_2024_data_and_MC_Recommend - Flag_goodVertices - Flag_globalSuperTightHalo2016Filter - Flag_EcalDeadCellTriggerPrimitiveFilter - Flag_BadPFMuonFilter - Flag_BadPFMuonDzFilter - Flag_hfNoisyHitsFilter - Flag_eeBadScFilter - Flag_ecalBadCalibFilter -lumiFile: Corrections/data/golden_json/Cert_Collisions2025_391658_398903_Golden.json +lumiFile: Corrections/data/golden_json/Cert_Collisions2025_391658_398903_Golden.json From 2d5c7732626b198ef4602d36cac2c8ac180a53ba Mon Sep 17 00:00:00 2001 From: Valeria D'Amante Date: Wed, 1 Apr 2026 16:56:32 +0200 Subject: [PATCH 4/9] fixed small bugs --- Analysis/tasks.py
| 1809 ++++++++++++-------------------------- run_tools/mk_flaf_env.sh | 2 +- 2 files changed, 575 insertions(+), 1236 deletions(-) diff --git a/Analysis/tasks.py b/Analysis/tasks.py index aff01635..97b41994 100644 --- a/Analysis/tasks.py +++ b/Analysis/tasks.py @@ -1,1063 +1,567 @@ -import law -import os import contextlib +import json +import law import luigi +import os import shutil - - -from FLAF.RunKit.run_tools import ps_call -from FLAF.run_tools.law_customizations import ( - Task, - HTCondorWorkflow, - copy_param, -) -from FLAF.AnaProd.tasks import ( - AnaTupleFileListTask, - AnaTupleMergeTask, +import re +import yaml +from pathlib import Path + +from FLAF.RunKit.run_tools import ( + ps_call, + PsCallError, + natural_sort, + check_root_file_integrity, ) +from FLAF.run_tools.law_customizations import Task, HTCondorWorkflow, copy_param from FLAF.Common.Utilities import getCustomisationSplit, ServiceThread +from .AnaTupleFileList import CreateMergePlan +from .MergeAnaTuples import mergeAnaTuples -class HistTupleProducerTask(Task, HTCondorWorkflow, law.LocalWorkflow): - max_runtime = copy_param(HTCondorWorkflow.max_runtime, 5.0) - n_cpus = copy_param(HTCondorWorkflow.n_cpus, 4) - - def workflow_requires(self): - merge_organization_complete = AnaTupleFileListTask.req( - self, branches=() - ).complete() - if not merge_organization_complete: - req_dict = { - "AnaTupleFileListTask": AnaTupleFileListTask.req( - self, - branches=(), - max_runtime=AnaTupleFileListTask.max_runtime._default, - n_cpus=AnaTupleFileListTask.n_cpus._default, - ), - "AnaTupleMergeTask": AnaTupleMergeTask.req( - self, - branches=(), - max_runtime=AnaTupleMergeTask.max_runtime._default, - n_cpus=AnaTupleMergeTask.n_cpus._default, - ), - } - req_dict["AnalysisCacheTask"] = [] - var_produced_by = self.setup.var_producer_map - - flatten_vars = set() - for var in self.global_params["variables"]: - if isinstance(var, dict) and "vars" in var: - for v in var["vars"]: - flatten_vars.add(v) - else: - 
flatten_vars.add(var) - - for var_name in flatten_vars: - producer_to_run = var_produced_by.get(var_name, None) - if producer_to_run is not None: - req_dict["AnalysisCacheTask"].append( - AnalysisCacheTask.req( - self, - branches=(), - customisations=self.customisations, - producer_to_run=producer_to_run, - ) - ) - - return req_dict - - branch_set = set() - branch_set_cache = set() - producer_set = set() - for idx, ( - dataset, - br, - need_cache_global, - producer_list, - input_index, - ) in self.branch_map.items(): - branch_set.add(br) - if need_cache_global: - branch_set_cache.add(idx) - for producer_name in (p for p in producer_list if p is not None): - producer_set.add(producer_name) - reqs = {} - - if len(branch_set) > 0: - reqs["anaTuple"] = AnaTupleMergeTask.req( - self, - branches=tuple(branch_set), - customisations=self.customisations, - max_runtime=AnaTupleMergeTask.max_runtime._default, - n_cpus=AnaTupleMergeTask.n_cpus._default, - ) - - if len(branch_set_cache) > 0: - reqs["analysisCache"] = [] - for producer_name in (p for p in producer_set if p is not None): - reqs["analysisCache"].append( - AnalysisCacheTask.req( - self, - branches=tuple(branch_set_cache), - customisations=self.customisations, - producer_to_run=producer_name, - ) - ) - - return reqs - - def requires(self): - dataset_name, prod_br, need_cache_global, producer_list, input_index = ( - self.branch_data - ) - deps = { - "anaTuple": AnaTupleMergeTask.req( - self, - max_runtime=AnaTupleMergeTask.max_runtime._default, - branch=prod_br, - branches=(prod_br,), - customisations=self.customisations, - ) - } - if need_cache_global: - anaCaches = {} - for producer_name in (p for p in producer_list if p is not None): - if producer_name not in deps: - anaCaches[producer_name] = AnalysisCacheTask.req( - self, - max_runtime=AnalysisCacheTask.max_runtime._default, - branch=self.branch, - branches=(self.branch,), - customisations=self.customisations, - producer_to_run=producer_name, - ) - if 
len(anaCaches) > 0: - deps["anaCaches"] = anaCaches - - producers_to_aggregate = [] - process_group = ( - self.datasets[dataset_name]["process_group"] - if dataset_name != "data" - else "data" - ) - for producer_name in producer_list: - if producer_name: - payload_producers = self.global_params.get("payload_producers") - if not payload_producers: - continue - producer_cfg = payload_producers[producer_name] - needs_aggregation = producer_cfg.get("needs_aggregation", False) - target_groups = producer_cfg.get("target_groups", None) - applies_for_group = ( - target_groups is None or process_group in target_groups - ) - if needs_aggregation: - if applies_for_group: - producers_to_aggregate.append(producer_name) - - if producers_to_aggregate: - aggrAnaCaches = {} - for producer_name in producers_to_aggregate: - aggr_task_branch_map = AnalysisCacheAggregationTask.req( - self, - branch=-1, - producer_to_aggregate=producer_name, - ).create_branch_map() - - # find which branch of AnalysisCacheAggregationTask is needed for this producer and dataset - branch_idx = -1 - for aggr_br_idx, (aggr_dataset_name, _) in aggr_task_branch_map.items(): - if aggr_dataset_name == dataset_name: - branch_idx = aggr_br_idx - break - - assert branch_idx >= 0, "Must find correct branch" - - aggrAnaCaches[producer_name] = AnalysisCacheAggregationTask.req( - self, - branch=branch_idx, - producer_to_aggregate=producer_name, - ) - - if aggrAnaCaches: - deps["aggrAnaCaches"] = aggrAnaCaches - - return deps - - @law.dynamic_workflow_condition - def workflow_condition(self): - return AnaTupleFileListTask.req(self, branch=-1, branches=()).complete() +class InputFileTask(Task, law.LocalWorkflow): + def __init__(self, *args, **kwargs): + kwargs["workflow"] = "local" + super(InputFileTask, self).__init__(*args, **kwargs) - @workflow_condition.create_branch_map def create_branch_map(self): - var_produced_by = self.setup.var_producer_map - - n = 0 branches = {} - anaProd_branch_map = AnaTupleMergeTask.req( - 
self, branch=-1, branches=() - ).create_branch_map() - - datasets_to_consider = [ - key - for key in self.datasets.keys() - if self.datasets[key]["process_group"] != "data" - ] - datasets_to_consider.append("data") - - flatten_vars = set() - for var in self.global_params["variables"]: - if isinstance(var, dict) and "vars" in var: - for v in var["vars"]: - flatten_vars.add(v) - else: - flatten_vars.add(var) - - need_cache_list = [ - (var_name in var_produced_by, var_produced_by.get(var_name, None)) - # for var_name in self.global_params["variables"] - for var_name in flatten_vars - ] - producer_list = [] - need_cache_global = any(item[0] for item in need_cache_list) - # for var_name in self.global_params["variables"]: - for var_name in flatten_vars: - need_cache = True if var_name in var_produced_by else False - producer_to_run = ( - var_produced_by[var_name] if var_name in var_produced_by else None - ) - need_cache_list.append(need_cache) - producer_list.append(producer_to_run) - - payload_producers = self.global_params.get("payload_producers") - if payload_producers: - for producer_name, producer_cfg in payload_producers.items(): - is_global = producer_cfg.get("is_global", False) - not_present = producer_name not in producer_list - if not_present and is_global: - producer_list.append(producer_name) - - for prod_br, ( - dataset_name, - process_group, - ds_branch, - dataset_dependencies, - input_file_list, - output_file_list, - skip_future_tasks, - ) in anaProd_branch_map.items(): - if skip_future_tasks: - continue - if dataset_name not in datasets_to_consider: - continue - - for input_index in range(len(output_file_list)): - producers_to_run = [] - if payload_producers: - for prod in producer_list: - cfg = payload_producers.get(prod, None) - is_configurable = cfg is not None - if not is_configurable: - producers_to_run.append(prod) - continue - - target_groups = cfg.get("target_groups", None) - applies_for_group = ( - target_groups is None or process_group in 
target_groups - ) - - if applies_for_group: - producers_to_run.append(prod) - - branches[n] = ( - dataset_name, - prod_br, - need_cache_global, - producers_to_run, - input_index, - ) - else: - branches[n] = ( - dataset_name, - prod_br, - need_cache_global, - producer_list, - input_index, - ) - n += 1 + for dataset_id, dataset_name in self.iter_datasets(): + branches[dataset_id] = dataset_name return branches - @workflow_condition.output def output(self): - dataset_name, prod_br, need_cache_global, producer_list, input_index = ( - self.branch_data - ) - input = self.input()["anaTuple"][input_index] - input_name = os.path.basename(input.path) - outFileName = ( - f"histTuple_" + os.path.basename(input.path).split("_", 1)[1] - if input_name.startswith("anaTuple_") - else input_name - ) - output_path = os.path.join( - self.version, "HistTuples", self.period, dataset_name, outFileName - ) - return self.remote_target(output_path, fs=self.fs_HistTuple) + dataset_name = self.branch_data + return self.local_target(f"{dataset_name}.json") def run(self): - dataset_name, prod_br, need_cache_global, producer_list, input_index = ( - self.branch_data + dataset_name = self.branch_data + print(f"{dataset_name}: creating input file list into {self.output().path}") + dataset = self.datasets[dataset_name] + process_group = dataset["process_group"] + ignore_missing = self.global_params.get("ignore_missing_nanoAOD_files", {}).get( + process_group, False ) - input_file = self.input()["anaTuple"][input_index] - customisation_dict = getCustomisationSplit(self.customisations) - channels = customisation_dict.get( - "channels", self.global_params["channelSelection"] - ) - if type(channels) == list: - channels = ",".join(channels) - - print(f"input file is {input_file.path}") - histTupleDef = os.path.join(self.ana_path(), self.global_params["histTupleDef"]) - HistTupleProducer = os.path.join( - self.ana_path(), "FLAF", "Analysis", "HistTupleProducer.py" - ) - outFile = self.output().path - 
print(f"output file is {outFile}") - compute_unc_histograms = ( - customisation_dict.get("compute_unc_histograms") == "True" - if "compute_unc_histograms" in customisation_dict - else self.global_params.get("compute_unc_histograms", False) + fs_nanoAOD, folder_name, include_folder_name = self.get_fs_nanoAOD(dataset_name) + nano_version = self.get_nano_version(dataset_name) + pattern_dict = self.datasets[dataset_name].get("fileNamePattern", {}) + pattern = pattern_dict.get(nano_version, r".*\.root$") + input_files = [] + inactive_files = [] + for file in fs_nanoAOD.listdir(folder_name): + if not re.match(pattern, file): + continue + file_path = os.path.join(folder_name, file) if include_folder_name else file + if hasattr(fs_nanoAOD.file_interface, "is_available"): + if not fs_nanoAOD.file_interface.is_available( + folder_name, file, verbose=1 + ): + if ignore_missing: + print( + f"{file_path}: will be ignored because no sites are found." + ) + inactive_files.append(file_path) + continue + else: + raise RuntimeError(f"No sites found for {file_path}") + input_files.append(file_path) + + if len(input_files) == 0: + raise RuntimeError(f"No input files found for {dataset_name}") + + input_files = natural_sort(input_files) + output = { + "input_files": input_files, + "inactive_files": inactive_files, + } + with self.output().localize("w") as out_local_file: + with open(out_local_file.path, "w") as f: + json.dump(output, f, indent=2) + + print(f"{dataset_name}: {len(input_files)} input files are found.") + + input_file_cache = {} + + @staticmethod + def load_input_files(input_file_list, test=False): + if input_file_list not in InputFileTask.input_file_cache: + with open(input_file_list, "r") as f: + input_files = json.load(f)["input_files"] + InputFileTask.input_file_cache[input_file_list] = input_files + input_files = InputFileTask.input_file_cache[input_file_list] + active_files = ( + [input_files[0]] if test and len(input_files) > 0 else input_files ) - job_home, 
remove_job_home = self.law_job_home() - with contextlib.ExitStack() as stack: - local_input = stack.enter_context((input_file).localize("r")) - tmpFile = os.path.join( - job_home, f"HistTupleProducerTask_{input_index}.root" - ) - print(f"tmpfile is {tmpFile}") - HistTupleProducer_cmd = [ - "python3", - HistTupleProducer, - "--inFile", - local_input.path, - "--outFile", - tmpFile, - "--dataset", - dataset_name, - "--histTupleDef", - histTupleDef, - "--period", - self.period, - "--channels", - channels, - "--LAWrunVersion", - self.version, - ] - if compute_unc_histograms: - HistTupleProducer_cmd.extend( - [ - "--compute_rel_weights", - "True", - "--compute_unc_variations", - "True", - ] - ) - if self.customisations: - HistTupleProducer_cmd.extend([f"--customisations", self.customisations]) - if need_cache_global: - local_anacaches = {} - for producer_name, cache_file in self.input()["anaCaches"].items(): - local_anacaches[producer_name] = stack.enter_context( - cache_file.localize("r") - ).path - local_anacaches_str = ",".join( - f"{producer}:{path}" - for producer, path in local_anacaches.items() - if path.endswith("root") - ) - HistTupleProducer_cmd.extend(["--cacheFile", local_anacaches_str]) + return active_files - ps_call(HistTupleProducer_cmd, verbose=1) + WF = None + WF_complete_ = False - with self.output().localize("w") as local_output: - out_local_path = local_output.path - shutil.move(tmpFile, out_local_path) - if remove_job_home: - shutil.rmtree(job_home) + @staticmethod + def WF_complete(ref_task): + if InputFileTask.WF_complete_: + return True + if InputFileTask.WF is None: + InputFileTask.WF = InputFileTask.req(ref_task, branch=-1, branches=()) + InputFileTask.WF_complete_ = InputFileTask.WF.complete() + return InputFileTask.WF_complete_ -class HistFromNtupleProducerTask(Task, HTCondorWorkflow, law.LocalWorkflow): - max_runtime = copy_param(HTCondorWorkflow.max_runtime, 10.0) +class AnaTupleFileTask(Task, HTCondorWorkflow, law.LocalWorkflow): + 
max_runtime = copy_param(HTCondorWorkflow.max_runtime, 40.0) n_cpus = copy_param(HTCondorWorkflow.n_cpus, 2) def workflow_requires(self): - merge_organization_complete = AnaTupleFileListTask.req( - self, branches=() - ).complete() - if not merge_organization_complete: - req_dict = {} - req_dict["AnaTupleFileListTask"] = AnaTupleFileListTask.req( - self, - branches=(), - max_runtime=AnaTupleFileListTask.max_runtime._default, - n_cpus=AnaTupleFileListTask.n_cpus._default, - ) - req_dict["HistTupleProducerTask"] = HistTupleProducerTask.req( - self, branches=(), customisations=self.customisations - ) - return req_dict - branch_set = set() - for br_idx, (var, prod_br_list, dataset_names) in self.branch_map.items(): - if var in self.global_params["variables"]: - branch_set.update(prod_br_list) - branches = tuple(branch_set) - req_dict = { - "HistTupleProducerTask": HistTupleProducerTask.req( - self, branches=branches, customisations=self.customisations - ) + return { + "inputFile": InputFileTask.req(self, branches=()), } - return req_dict def requires(self): - var, prod_br_list, dataset_name = self.branch_data - reqs = [] - reqs.append( - HistTupleProducerTask.req( - self, - max_runtime=HistTupleProducerTask.max_runtime._default, - branch=prod_br, - customisations=self.customisations, - ) - for prod_br in prod_br_list - ) - return reqs + return [] + + _req_params = None + + @classmethod + def req(cls, inst, **kwargs): + if cls._req_params is None: + cls._req_params = cls.req_params(inst, **kwargs) + for param_name in ["branch", "branches"]: + param_value = kwargs.get(param_name, getattr(inst, param_name)) + cls._req_params[param_name] = param_value + return cls(**cls._req_params) @law.dynamic_workflow_condition def workflow_condition(self): - return AnaTupleFileListTask.req(self, branch=-1, branches=()).complete() + return InputFileTask.WF_complete(self) @workflow_condition.create_branch_map def create_branch_map(self): + branch_idx = 0 branches = {} - prod_br_list = [] 
- current_dataset = None - n = 0 - - dataset_to_branches = {} - HistTupleBranchMap = HistTupleProducerTask.req( - self, branches=() - ).create_branch_map() - for prod_br, ( - histTuple_dataset_name, - histTuple_prod_br, - need_cache_global, - producer_list, - input_index, - ) in HistTupleBranchMap.items(): - dataset_to_branches.setdefault(histTuple_dataset_name, []).append(prod_br) - - for dataset_name, prod_br_list in dataset_to_branches.items(): - for var_name in self.global_params["variables"]: - branches[n] = (var_name, prod_br_list, dataset_name) - n += 1 + for dataset_id, dataset_name in self.iter_datasets(): + input_file_list = ( + InputFileTask.req(self, branch=dataset_id, branches=()).output().path + ) + input_files = InputFileTask.load_input_files( + input_file_list, test=self.test > 0 + ) + for input_file_idx, input_file in enumerate(input_files): + output_name = f"anaTupleFile_{input_file_idx}" + branches[branch_idx] = ( + dataset_name, + input_file, + output_name, + ) + branch_idx += 1 return branches @workflow_condition.output def output(self): - var, prod_br, dataset_name = self.branch_data - if isinstance(var, dict): - var = var["name"] + dataset_name, _, output_name = self.branch_data output_path = os.path.join( - self.version, "Hists_split", self.period, var, f"{dataset_name}.root" + self.version, "AnaTuples_split", self.period, dataset_name ) - return self.remote_target(output_path, fs=self.fs_HistTuple) + root_output = os.path.join(output_path, f"{output_name}.root") + report_output = os.path.join(output_path, f"{output_name}.json") + return { + "root": self.remote_target(root_output, fs=self.fs_anaTuple), + "report": self.remote_target(report_output, fs=self.fs_anaTuple), + } def run(self): - var, prod_br, dataset_name = self.branch_data - job_home, remove_job_home = self.law_job_home() - customisation_dict = getCustomisationSplit(self.customisations) - channels = ( - customisation_dict["channels"] - if "channels" in customisation_dict.keys() - 
else self.global_params["channelSelection"] - ) - # Channels from the yaml are a list, but the format we need for the ps_call later is 'ch1,ch2,ch3', basically join into a string separated by comma - if type(channels) == list: - channels = ",".join(channels) - compute_unc_histograms = ( - customisation_dict["compute_unc_histograms"] == "True" - if "compute_unc_histograms" in customisation_dict.keys() - else self.global_params.get("compute_unc_histograms", False) - ) - HistFromNtupleProducer = os.path.join( - self.ana_path(), "FLAF", "Analysis", "HistProducerFromNTuple.py" - ) - input_list_remote_target = [inp for inp in self.input()[0]] - with contextlib.ExitStack() as stack: - local_inputs = [ - stack.enter_context((inp).localize("r")).path for inp in self.input()[0] - ] + with ServiceThread() as service_thread: + dataset_name, input_file_name, output_name = self.branch_data + dataset = self.datasets[dataset_name] + process_group = dataset["process_group"] + producer_anatuples = os.path.join( + self.ana_path(), "FLAF", "AnaProd", "anaTupleProducer.py" + ) - var = var if type(var) != dict else var["name"] - tmpFile = os.path.join(job_home, f"HistFromNtuple_{var}.root") - - HistFromNtupleProducer_cmd = [ - "python3", - HistFromNtupleProducer, - "--period", - self.period, - "--outFile", - tmpFile, - "--channels", - channels, - "--var", - var, - "--dataset_name", - dataset_name, - "--LAWrunVersion", - self.version, - ] - if compute_unc_histograms: - HistFromNtupleProducer_cmd.extend( - [ - "--compute_rel_weights", - "True", - "--compute_unc_variations", - "True", - ] - ) - if self.customisations: - HistFromNtupleProducer_cmd.extend( - [f"--customisations", self.customisations] - ) + customisation_dict = getCustomisationSplit(self.customisations) + channels = ( + customisation_dict["channels"] + if "channels" in customisation_dict.keys() + else self.global_params["channelSelection"] + ) + if type(channels) == list: + channels = ",".join(channels) + store_noncentral = ( 
+ customisation_dict["store_noncentral"] == "True" + if "store_noncentral" in customisation_dict.keys() + else self.global_params.get("store_noncentral", False) + ) + compute_unc_variations = ( + customisation_dict["compute_unc_variations"] == "True" + if "compute_unc_variations" in customisation_dict.keys() + else self.global_params.get("compute_unc_variations", False) + ) - HistFromNtupleProducer_cmd.extend(local_inputs) - ps_call(HistFromNtupleProducer_cmd, verbose=1) + fs_nanoAOD, _, _ = self.get_fs_nanoAOD(dataset_name) + input_file = self.remote_target(input_file_name, fs=fs_nanoAOD) - with (self.output()).localize("w") as tmp_local_file: - out_local_path = tmp_local_file.path - shutil.move(tmpFile, out_local_path) + job_home, remove_job_home = self.law_job_home() + print(f"dataset_name: {dataset_name}") + print(f"process_group: {process_group}") + print(f"input_file = {input_file.uri()}") + + print("step 1: nanoAOD -> raw anaTuples") + outdir_anatuples = os.path.join(job_home, "rawAnaTuples") + anaTupleDef = os.path.join( + self.ana_path(), self.global_params["anaTupleDef"] + ) + reportFileName = "report.json" + rawReportPath = os.path.join(outdir_anatuples, reportFileName) + input_ok = True + with contextlib.ExitStack() as stack: + local_input = stack.enter_context(input_file.localize("r")).path + inFileName = os.path.basename(input_file.path) + print(f"inFileName {inFileName}") + anatuple_cmd = [ + "python3", + "-u", + producer_anatuples, + "--period", + self.period, + "--inFile", + local_input, + "--outDir", + outdir_anatuples, + "--dataset", + dataset_name, + "--anaTupleDef", + anaTupleDef, + "--channels", + channels, + "--inFileName", + inFileName, + "--reportOutput", + rawReportPath, + "--LAWrunVersion", + self.version, + "--output-name", + output_name, + ] + if compute_unc_variations: + anatuple_cmd.append("--compute-unc-variations") + if store_noncentral: + anatuple_cmd.append("--store-noncentral") + + if self.test > 0: + 
anatuple_cmd.extend(["--nEvents", str(self.test)]) + env = None + if self.global_params.get("use_cmssw_env_AnaTupleProduction", False): + env = self.cmssw_env + try: + ps_call(anatuple_cmd, env=env, verbose=1) + except PsCallError as e: + print(f"anaTupleProducer failed: {e}") + print("Checking input file integrity...") + input_ok = check_root_file_integrity(local_input, verbose=1) + if input_ok: + raise RuntimeError("anaTupleProducer failed.") + print( + "Input file is corrupted. Will create empty anaTuple and report." + ) - delete_after_merge = False # var == self.global_config["variables"][-1] --> find more robust condition - if delete_after_merge: - print(f"Finished HistogramProducer, lets delete remote targets") - for remote_target in input_list_remote_target: - remote_target.remove() - with remote_target.localize("w") as tmp_local_file: - tmp_local_file.touch() # Create a dummy to avoid dependency crashes + producer_fuseTuples = os.path.join( + self.ana_path(), "FLAF", "AnaProd", "FuseAnaTuples.py" + ) + outdir_fusedTuples = os.path.join(job_home, "fusedAnaTuples") + outFileName = os.path.basename(input_file.path) + outFilePath = os.path.join(outdir_fusedTuples, outFileName) + finalReportPath = os.path.join(outdir_fusedTuples, reportFileName) + if input_ok: + print("step 2: raw anaTuples -> fused anaTuples") + verbosity = "1" + fuseTuple_cmd = [ + "python", + "-u", + producer_fuseTuples, + "--input-config", + rawReportPath, + "--work-dir", + outdir_fusedTuples, + "--tuple-output", + outFileName, + "--report-output", + reportFileName, + "--verbose", + verbosity, + ] + ps_call(fuseTuple_cmd, verbose=1) + else: + os.makedirs(outdir_fusedTuples, exist_ok=True) + Path(outFilePath).touch() + report = { + "valid": False, + "nano_file_name": inFileName, + "anaTuple_file_name": output_name, + "dataset_name": dataset_name, + } + with open(finalReportPath, "w") as f: + json.dump(report, f, indent=2) - if remove_job_home: - shutil.rmtree(job_home) + with 
self.output()["root"].localize("w") as local_file: + shutil.move(outFilePath, local_file.path) + with self.output()["report"].localize("w") as local_file: + shutil.move(finalReportPath, local_file.path) + if remove_job_home: + shutil.rmtree(job_home) -class HistMergerTask(Task, HTCondorWorkflow, law.LocalWorkflow): - max_runtime = copy_param(HTCondorWorkflow.max_runtime, 5.0) - n_cpus = copy_param(HTCondorWorkflow.n_cpus, 2) - def workflow_requires(self): - branch_map = self.create_branch_map() +class AnaTupleFileListBuilderTask(Task, HTCondorWorkflow, law.LocalWorkflow): + max_runtime = copy_param(HTCondorWorkflow.max_runtime, 24.0) + n_cpus = copy_param(HTCondorWorkflow.n_cpus, 1) - merge_organization_complete = AnaTupleFileListTask.req( - self, branches=() - ).complete() - if not merge_organization_complete: + def workflow_requires(self): + input_file_task_complete = InputFileTask.WF_complete(self) + if not input_file_task_complete: return { - "AnaTupleFileListTask": AnaTupleFileListTask.req( - self, - branches=(), - max_runtime=AnaTupleFileListTask.max_runtime._default, - n_cpus=AnaTupleFileListTask.n_cpus._default, - ), - "HistFromNtupleProducerTask": HistFromNtupleProducerTask.req( - self, - branches=(), - ), + "anaTuple": AnaTupleFileTask.req(self, branches=()), + "inputFile": InputFileTask.req(self, branches=()), } + AnaTuple_map = AnaTupleFileTask.req( + self, branch=-1, branches=() + ).create_branch_map() branch_set = set() - all_datasets = {} - for br_idx, (var, prod_br_list, dataset_names) in self.branch_map.items(): - all_datasets[var] = prod_br_list - - new_branchset = set() - for var in all_datasets.keys(): - new_branchset.update(all_datasets[var]) + for idx, (dataset_name, process_group) in self.branch_map.items(): + for br_idx, (anaTuple_dataset_name, _, _) in AnaTuple_map.items(): + match = dataset_name == anaTuple_dataset_name + if not match and process_group == "data": + anaTuple_dataset = self.datasets[anaTuple_dataset_name] + 
anaTuple_process_group = anaTuple_dataset["process_group"] + match = anaTuple_process_group == "data" + if match: + branch_set.add(br_idx) - return { - "HistFromNtupleProducerTask": HistFromNtupleProducerTask.req( - self, branches=list(new_branchset) + deps = { + "AnaTupleFileTask": AnaTupleFileTask.req( + self, + branches=tuple(branch_set), + max_runtime=AnaTupleFileTask.max_runtime._default, + n_cpus=AnaTupleFileTask.n_cpus._default, ) } + return deps def requires(self): - var_name, br_indices, datasets = self.branch_data + dataset_name, process_group = self.branch_data + AnaTuple_map = AnaTupleFileTask.req( + self, branch=-1, branches=() + ).create_branch_map() + branch_set = set() + for br_idx, (anaTuple_dataset_name, _, _) in AnaTuple_map.items(): + match = dataset_name == anaTuple_dataset_name + if not match and process_group == "data": + anaTuple_dataset = self.datasets[anaTuple_dataset_name] + anaTuple_process_group = anaTuple_dataset["process_group"] + match = anaTuple_process_group == "data" + if match: + branch_set.add(br_idx) + reqs = [ - HistFromNtupleProducerTask.req( + AnaTupleFileTask.req( self, - max_runtime=HistFromNtupleProducerTask.max_runtime._default, + max_runtime=AnaTupleFileTask.max_runtime._default, branch=prod_br, - customisations=self.customisations, + branches=(prod_br,), ) - for prod_br in tuple(set(br_indices)) + for prod_br in tuple(branch_set) ] - return reqs - @law.dynamic_workflow_condition - def workflow_condition(self): - return AnaTupleFileListTask.req(self, branch=-1, branches=()).complete() - - @workflow_condition.create_branch_map def create_branch_map(self): - HistFromNtupleProducerTask_branch_map = HistFromNtupleProducerTask.req( - self, branches=() - ).create_branch_map() - all_datasets = {} branches = {} k = 0 - for br_idx, ( - var_name, - prod_br_list, - current_dataset, - ) in HistFromNtupleProducerTask_branch_map.items(): - var_name = ( - var_name.get("name", var_name) - if isinstance(var_name, dict) - else var_name - 
) - if var_name not in all_datasets.keys(): - all_datasets[var_name] = [] - all_datasets[var_name].append((br_idx, current_dataset)) - for var_name, br_list in all_datasets.items(): - br_indices = [] - datasets = [] - for key in br_list: - idx, dataset_name = key - br_indices.append(idx) - datasets.append(dataset_name) - branches[k] = (var_name, br_indices, datasets) + data_done = False + for dataset_id, dataset_name in self.iter_datasets(): + dataset = self.datasets[dataset_name] + process_group = dataset["process_group"] + if process_group == "data": + if data_done: + continue # Will have multiple data datasets, but only need one branch + dataset_name = "data" + data_done = True + branches[k] = (dataset_name, process_group) k += 1 return branches - @workflow_condition.output + def get_output_path(self, dataset_name, output_name): + output_file = f"{dataset_name}.json" + base_name = "AnaTupleFileList" + if output_name != "plan": + base_name += f"_{output_name}" + return os.path.join(self.version, base_name, self.period, output_file) + def output(self): - var_name, br_indices, datasets = self.branch_data - output_path = os.path.join(self.version, "Hists_merged", self.period, var_name) - output_file_name = os.path.join(output_path, f"{var_name}.root") - return self.remote_target(output_file_name, fs=self.fs_HistTuple) + dataset_name, process_group = self.branch_data + outputs = {} + for output_name in ["plan", "reports"]: + output_path = self.get_output_path(dataset_name, output_name) + outputs[output_name] = self.remote_target(output_path, fs=self.fs_anaTuple) + return outputs def run(self): - var_name, br_indices, datasets = self.branch_data - customisation_dict = getCustomisationSplit(self.customisations) - - channels = ( - customisation_dict["channels"] - if "channels" in customisation_dict.keys() - else self.global_params["channelSelection"] - ) - # Channels from the yaml are a list, but the format we need for the ps_call later is 'ch1,ch2,ch3', basically join 
into a string separated by comma - if type(channels) == list: - channels = ",".join(channels) - - uncNames = ["Central"] - unc_cfg_dict = self.setup.weights_config - uncs_to_exclude = ( - self.global_params["uncs_to_exclude"][self.period] - if "uncs_to_exclude" in self.global_params.keys() - else [] - ) - compute_unc_histograms = ( - customisation_dict["compute_unc_histograms"] == "True" - if "compute_unc_histograms" in customisation_dict.keys() - else self.global_params.get("compute_unc_histograms", False) - ) - if compute_unc_histograms: - for uncName in list(unc_cfg_dict["norm"].keys()) + list( - unc_cfg_dict["shape"].keys() - ): - if uncName in uncs_to_exclude: - continue - uncNames.append(uncName) - - MergerProducer = os.path.join( - self.ana_path(), "FLAF", "Analysis", "HistMergerFromHists.py" - ) - HaddMergedHistsProducer = os.path.join( - self.ana_path(), "FLAF", "Analysis", "hadd_merged_hists.py" - ) - - all_datasets = [] - local_inputs = [] + dataset_name, process_group = self.branch_data with contextlib.ExitStack() as stack: - for inp in self.input(): - dataset_name = os.path.basename(inp.path) - all_datasets.append(dataset_name.strip(".root")) - local_inputs.append(stack.enter_context(inp.localize("r")).path) - dataset_names = ",".join(smpl for smpl in all_datasets) - all_outputs_merged = [] - if len(uncNames) == 1: - with self.output().localize("w") as outFile: - MergerProducer_cmd = [ - "python3", - MergerProducer, - "--outFile", - outFile.path, - "--var", - var_name, - "--dataset_names", - dataset_names, - "--uncSource", - uncNames[0], - "--channels", - channels, - "--period", - self.period, - "--LAWrunVersion", - self.version, - ] - MergerProducer_cmd.extend(local_inputs) - ps_call(MergerProducer_cmd, verbose=1) - else: - job_home, remove_job_home = self.law_job_home() - for uncName in uncNames: - final_histname = f"{var_name}_{uncName}.root" - tmp_outfile_merge = os.path.join(job_home, final_histname) - MergerProducer_cmd = [ - "python3", - 
MergerProducer, - "--outFile", - tmp_outfile_merge, - "--var", - var_name, - "--dataset_names", - dataset_names, - "--uncSource", - uncName, - "--channels", - channels, - "--period", - self.period, - "--LAWrunVersion", - self.version, - ] - MergerProducer_cmd.extend(local_inputs) - ps_call(MergerProducer_cmd, verbose=1) - all_outputs_merged.append(tmp_outfile_merge) - with self.output().localize("w") as outFile: - HaddMergedHistsProducer_cmd = [ - "python3", - HaddMergedHistsProducer, - "--outFile", - outFile.path, - "--var", - var_name, - ] - HaddMergedHistsProducer_cmd.extend(all_outputs_merged) - ps_call(HaddMergedHistsProducer_cmd, verbose=1) - if remove_job_home: - shutil.rmtree(job_home) - - -class AnalysisCacheTask(Task, HTCondorWorkflow, law.LocalWorkflow): - max_runtime = copy_param(HTCondorWorkflow.max_runtime, 2.0) - n_cpus = copy_param(HTCondorWorkflow.n_cpus, 1) - producer_to_run = luigi.Parameter() - - # Need to override this from HTCondorWorkflow to have separate data pathways for different cache tasks - def htcondor_output_directory(self): - return law.LocalDirectoryTarget(self.local_path(self.producer_to_run)) - def __init__(self, *args, **kwargs): - # Needed to get the config and ht_condor_pathways figured out - super(AnalysisCacheTask, self).__init__(*args, **kwargs) - self.n_cpus = self.global_params["payload_producers"][self.producer_to_run].get( - "n_cpus", 1 - ) - self.max_runtime = self.global_params["payload_producers"][ - self.producer_to_run - ].get("max_runtime", 2.0) - self.output_file_extension = self.global_params["payload_producers"][ - self.producer_to_run - ].get("save_as", "root") - - def workflow_requires(self): - merge_organization_complete = AnaTupleFileListTask.req( - self, branches=() - ).complete() - if not merge_organization_complete: - req_dict = { - "AnaTupleFileListTask": AnaTupleFileListTask.req( - self, - branches=(), - max_runtime=AnaTupleFileListTask.max_runtime._default, - 
n_cpus=AnaTupleFileListTask.n_cpus._default, - ), - "AnaTupleMergeTask": AnaTupleMergeTask.req( - self, - branches=(), - max_runtime=AnaTupleMergeTask.max_runtime._default, - n_cpus=AnaTupleMergeTask.n_cpus._default, - ), - } - # Get all the producers to require for this dummy branch - producer_requires_set = set() - producer_dependencies = self.global_params["payload_producers"][ - self.producer_to_run - ]["dependencies"] - if producer_dependencies: - for dependency in producer_dependencies: - producer_requires_set.add(dependency) - req_dict["AnalysisCacheTask"] = [ - AnalysisCacheTask.req( - self, - branches=(), - customisations=self.customisations, - producer_to_run=producer_name, - ) - for producer_name in list(producer_requires_set) - if producer_name is not None + print("Localizing inputs") + local_inputs = [ + stack.enter_context(inp["report"].localize("r")).path + for inp in self.input() ] - return req_dict + print(f"Localized {len(local_inputs)} inputs") - workflow_dict = {} - workflow_dict["anaTuple"] = { - br_idx: AnaTupleMergeTask.req( - self, - branch=prod_br, - branches=(), - max_runtime=AnaTupleMergeTask.max_runtime._default, - n_cpus=AnaTupleMergeTask.n_cpus._default, - ) - for br_idx, ( - dataset_name, - prod_br, - need_cache_global, - producer_list, - input_index, - ) in self.branch_map.items() - } - producer_dependencies = self.global_params["payload_producers"][ - self.producer_to_run - ]["dependencies"] - if producer_dependencies: - for dependency in producer_dependencies: - workflow_dict[dependency] = { - br_idx: AnalysisCacheTask.req( - self, - branch=br_idx, - branches=(), - customisations=self.customisations, - producer_to_run=dependency, - ) - for br_idx, _ in self.branch_map.items() - } - return workflow_dict + job_home, remove_job_home = self.law_job_home() - def requires(self): - dataset_name, prod_br, need_cache_global, producer_list, input_index = ( - self.branch_data - ) - producer_dependencies = 
self.global_params["payload_producers"][ - self.producer_to_run - ]["dependencies"] - requirements = { - "anaTuple": AnaTupleMergeTask.req( - self, - branch=prod_br, - max_runtime=AnaTupleMergeTask.max_runtime._default, - branches=(), + nEventsPerFile = self.setup.global_params.get( + "nEventsPerFile", {"data": 1_000_000} ) - } - anaCaches = {} - if producer_dependencies: - for dependency in producer_dependencies: - anaCaches[dependency] = AnalysisCacheTask.req( - self, producer_to_run=dependency - ) - requirements["anaCaches"] = anaCaches + if isinstance(nEventsPerFile, dict): + nEventsPerFile = nEventsPerFile.get(process_group, 100_000) + is_data = process_group == "data" - return requirements + result = CreateMergePlan(self.setup, local_inputs, nEventsPerFile, is_data) - @law.dynamic_workflow_condition - def workflow_condition(self): - return AnaTupleFileListTask.req(self, branch=-1, branches=()).complete() + for output_name, output_remote in self.output().items(): + output_path_tmp = os.path.join(job_home, f"{output_name}_tmp.json") + with open(output_path_tmp, "w") as f: + json.dump(result[output_name], f, indent=2) + with output_remote.localize("w") as output_localized: + shutil.move(output_path_tmp, output_localized.path) - @workflow_condition.create_branch_map - def create_branch_map(self): - branches = HistTupleProducerTask.req( - self, branch=-1, branches=() - ).create_branch_map() - return branches + if remove_job_home: + shutil.rmtree(job_home) - @workflow_condition.output - def output(self): - dataset_name, _, _, _, input_index = self.branch_data - inputFilePath = self.input()["anaTuple"][input_index].path - outFileNameWithoutExtension = os.path.basename(inputFilePath).split(".")[0] - outFileName = f"{outFileNameWithoutExtension}.{self.output_file_extension}" - output_path = os.path.join( - self.version, - "AnalysisCache", - self.producer_to_run, - self.period, - dataset_name, - outFileName, - ) - return self.remote_target(output_path, 
fs=self.fs_anaCacheTuple) - def run(self): - with ServiceThread() as service_thread: - dataset_name, prod_br, need_cache_global, producer_list, input_index = ( - self.branch_data - ) - analysis_cache_producer = os.path.join( - self.ana_path(), "FLAF", "Analysis", "AnalysisCacheProducer.py" - ) - customisation_dict = getCustomisationSplit(self.customisations) - channels = ( - customisation_dict["channels"] - if "channels" in customisation_dict.keys() - else self.global_params["channelSelection"] - ) - # Channels from the yaml are a list, but the format we need for the ps_call later is 'ch1,ch2,ch3', basically join into a string separated by comma - if type(channels) == list: - channels = ",".join(channels) - job_home, remove_job_home = self.law_job_home() - print(f"At job_home {job_home}") +class AnaTupleFileListTask(AnaTupleFileListBuilderTask): + def workflow_requires(self): + return {"AnaTupleFileListBuilderTask": AnaTupleFileListBuilderTask.req(self)} - with contextlib.ExitStack() as stack: - # Enter a stack to maybe load the analysis cache files - input_file = self.input()["anaTuple"][input_index] - if len(self.input()["anaCaches"]) > 0: - local_anacaches = {} - for producer_name, cache_files in self.input()["anaCaches"].items(): - local_anacaches[producer_name] = stack.enter_context( - cache_files[input_index].localize("r") - ).path - local_anacaches_str = ",".join( - f"{producer}:{path}" - for producer, path in local_anacaches.items() - ) - print(f"Task has cache input files {local_anacaches_str}") - else: - local_anacaches_str = "" - - output_file = self.output() - print(f"considering dataset {dataset_name}, and file {input_file.path}") - customisation_dict = getCustomisationSplit(self.customisations) - tmpFile = os.path.join( - job_home, f"AnalysisCacheTask.{self.output_file_extension}" - ) - with input_file.localize("r") as local_input: - analysisCacheProducer_cmd = [ - "python3", - analysis_cache_producer, - "--period", - self.period, - "--inFile", - 
local_input.path, - "--outFile", - tmpFile, - "--dataset", - dataset_name, - "--channels", - channels, - "--producer", - self.producer_to_run, - "--workingDir", - job_home, - "--LAWrunVersion", - self.version, - ] - if ( - self.global_params["store_noncentral"] - and dataset_name != "data" - ): - analysisCacheProducer_cmd.extend( - ["--compute_unc_variations", "True"] - ) - if len(local_anacaches_str) > 0: - analysisCacheProducer_cmd.extend( - ["--cacheFiles", local_anacaches_str] - ) - # Check if cmssw env is required - prod_env = ( - self.cmssw_env - if self.global_params["payload_producers"][ - self.producer_to_run - ].get("cmssw_env", False) - else None - ) + def requires(self): + return AnaTupleFileListBuilderTask.req(self) - histTupleDef = os.path.join( - self.ana_path(), self.global_params["histTupleDef"] - ) - analysisCacheProducer_cmd.extend(["--histTupleDef", histTupleDef]) + def output(self): + dataset_name, process_group = self.branch_data + return self.local_target(self.get_output_path(dataset_name, "plan")) - ps_call(analysisCacheProducer_cmd, env=prod_env, verbose=1) - print( - f"Finished producing payload for producer={self.producer_to_run} with name={dataset_name}, file={input_file.path}" - ) - with output_file.localize("w") as tmp_local_file: - out_local_path = tmp_local_file.path - shutil.move(tmpFile, out_local_path) - if remove_job_home: - shutil.rmtree(job_home) + def run(self): + with self.input()["plan"].localize("r") as input_local: + self.output().makedirs() + shutil.copy(input_local.path, self.output().path) -class HistPlotTask(Task, HTCondorWorkflow, law.LocalWorkflow): - max_runtime = copy_param(HTCondorWorkflow.max_runtime, 2.0) - n_cpus = copy_param(HTCondorWorkflow.n_cpus, 1) +class AnaTupleMergeTask(Task, HTCondorWorkflow, law.LocalWorkflow): + max_runtime = copy_param(HTCondorWorkflow.max_runtime, 48.0) + n_cpus = copy_param(HTCondorWorkflow.n_cpus, 2) + delete_inputs_after_merge = luigi.BoolParameter(default=False) def 
workflow_requires(self): merge_organization_complete = AnaTupleFileListTask.req( self, branches=() ).complete() if not merge_organization_complete: - req_dict = {} - req_dict["HistMergerTask"] = HistMergerTask.req( - self, branches=(), customisations=self.customisations - ) - req_dict["AnaTupleFileListTask"] = AnaTupleFileListTask.req( - self, - branches=(), - max_runtime=AnaTupleFileListTask.max_runtime._default, - n_cpus=AnaTupleFileListTask.n_cpus._default, - ) - return req_dict - merge_map = HistMergerTask.req( - self, branch=-1, branches=(), customisations=self.customisations - ).create_branch_map() + return { + "AnaTupleFileListTask": AnaTupleFileListTask.req( + self, + branches=(), + max_runtime=AnaTupleFileListTask.max_runtime._default, + n_cpus=AnaTupleFileListTask.n_cpus._default, + ), + } branch_set = set() - for br_idx, (var) in self.branch_map.items(): - for br, (v, _, _) in merge_map.items(): - if v == var: - branch_set.add(br) + for _, ( + _, + _, + ds_branch, + dataset_dependencies, + _, + _, + _, + ) in self.branch_map.items(): + branch_set.add(ds_branch) + branch_set.update(dataset_dependencies.values()) return { - "merge": HistMergerTask.req( + "AnaTupleFileListTask": AnaTupleFileListTask.req( self, branches=tuple(branch_set), - customisations=self.customisations, + max_runtime=AnaTupleFileListTask.max_runtime._default, + n_cpus=AnaTupleFileListTask.n_cpus._default, ) } def requires(self): - var = self.branch_data - - merge_map = HistMergerTask.req( - self, branch=-1, branches=(), customisations=self.customisations + # Need both the AnaTupleFileTask for the input ROOT file, and the AnaTupleFileListTask for the json structure + ( + dataset_name, + process_group, + ds_branch, + dataset_dependencies, + input_file_list, + _, + skip_future_tasks, + ) = self.branch_data + anaTuple_branch_map = AnaTupleFileTask.req( + self, branch=-1, branches=() ).create_branch_map() - merge_branch = next(br for br, (v, _, _) in merge_map.items() if v == var) + 
required_branches = {"root": {}} + for prod_br, ( + anaTuple_dataset_name, + anaTuple_input_file, + anaTuple_output_name, + ) in anaTuple_branch_map.items(): + match = dataset_name == anaTuple_dataset_name + if not match and process_group == "data": + anaTuple_dataset = self.datasets[anaTuple_dataset_name] + anaTuple_process_group = anaTuple_dataset["process_group"] + match = anaTuple_process_group == "data" + dependency_type = None + if match: + key = f"{anaTuple_dataset_name}/{anaTuple_output_name}" + if key in input_file_list: + dependency_type = "root" + if dependency_type: + if anaTuple_dataset_name not in required_branches[dependency_type]: + required_branches[dependency_type][anaTuple_dataset_name] = [] + required_branches[dependency_type][anaTuple_dataset_name].append( + AnaTupleFileTask.req( + self, + max_runtime=AnaTupleFileTask.max_runtime._default, + branch=prod_br, + branches=(prod_br,), + ) + ) - return HistMergerTask.req( - self, - branch=merge_branch, - customisations=self.customisations, - max_runtime=HistMergerTask.max_runtime._default, - ) + required_branches["json"] = {} + if process_group != "data": + anaTupleFileListBuilder_branch_map = AnaTupleFileListBuilderTask.req( + self, branch=-1, branches=() + ).create_branch_map() + + for builder_branch, ( + builder_dataset_name, + _, + ) in anaTupleFileListBuilder_branch_map.items(): + if ( + builder_dataset_name == dataset_name + or builder_dataset_name in dataset_dependencies + ): + required_branches["json"][builder_dataset_name] = ( + AnaTupleFileListBuilderTask.req( + self, + max_runtime=AnaTupleFileListBuilderTask.max_runtime._default, + branch=builder_branch, + branches=(builder_branch,), + ) + ) + + return required_branches @law.dynamic_workflow_condition def workflow_condition(self): @@ -1066,304 +570,139 @@ def workflow_condition(self): @workflow_condition.create_branch_map def create_branch_map(self): branches = {} - merge_map = HistMergerTask.req( - self, branch=-1, branches=(), 
customisations=self.customisations + nBranch = 0 + ds_branch_map = AnaTupleFileListTask.req( + self, branch=-1, branches=() ).create_branch_map() - var_dict = {} - for var in self.global_params["variables"]: - var_name = var if isinstance(var, str) else var["name"] - var_dict[var_name] = var - for k, (_, (var, _, _)) in enumerate(merge_map.items()): - # Check if we want to plot this var in the global config - if isinstance(var_dict[var], dict): - if var_dict[var].get("plot_task", True): - branches[k] = var - else: - branches[k] = var - return branches - - @workflow_condition.output - def output(self): - var = self.branch_data - outputs = {} - customisation_dict = getCustomisationSplit(self.customisations) - channels = customisation_dict.get( - "channels", self.global_params["channelSelection"] - ) - if isinstance(channels, str): - channels = channels.split(",") - - base_cats = self.global_params.get("categories") or [] - boosted_cats = self.global_params.get("boosted_categories") or [] - categories = base_cats + boosted_cats - if isinstance(categories, str): - categories = categories.split(",") - - custom_region_name = self.global_params.get("custom_regions") - - custom_regions = customisation_dict.get( - custom_region_name, self.global_params[custom_region_name] - ) - - for ch in channels: - for cat in categories: - for custom_region in custom_regions: - rel_path = os.path.join( - self.version, - "Plots", - self.period, - var, - custom_region, - cat, - f"{ch}_{var}.pdf", - ) - outputs[f"{ch}:{cat}:{custom_region}"] = self.remote_target( - rel_path, fs=self.fs_plots - ) - return outputs + ds_branches = {} + for ds_branch, (dataset_name, process_group) in ds_branch_map.items(): + if dataset_name in ds_branches: + raise RuntimeError( + f"Dataset {dataset_name} appears multiple times in AnaTupleFileListTask branch map!" 
+ ) + ds_branches[dataset_name] = ds_branch - def run(self): - var = self.branch_data - era = self.period - ver = self.version - customisation_dict = getCustomisationSplit(self.customisations) - - plotter = os.path.join(self.ana_path(), "FLAF", "Analysis", "HistPlotter.py") - - def bool_flag(key, default): - return ( - customisation_dict.get( - key, str(self.global_params.get(key, default)) - ).lower() - == "true" + for ds_branch, (dataset_name, process_group) in ds_branch_map.items(): + dataset_dependencies = self.collect_extra_dependencies( + dataset_name, ds_branches, process_group ) - - plot_unc = bool_flag("plot_unc", True) - plot_wantData = bool_flag(f"plot_wantData_{var}", True) - plot_wantSignals = bool_flag("plot_wantSignals", True) - plot_wantQCD = bool_flag("plot_wantQCD", False) - plot_rebin = bool_flag("plot_rebin", True) - plot_analysis = customisation_dict.get( - "plot_analysis", self.global_params.get("plot_analysis", "") - ) - - with self.input().localize("r") as local_input: - infile = local_input.path - print("Loading fname", infile) - - # Create list of all keys and all targets - key_list = [] - output_list = [] - for output_key, output_target in self.output().items(): - if (output_target).exists(): - print(f"Output for {var} {output_target} already exists! 
Continue") - continue - key_list.append(output_key) - output_list.append(output_target) - - # Now localize all output_targets - with contextlib.ExitStack() as stack: - local_outputs = [ - stack.enter_context((output).localize("w")).path - for output in output_list - ] - cmd = [ - "python3", - plotter, - "--inFile", - infile, - "--all_outFiles", - ",".join(local_outputs), - "--globalConfig", - os.path.join( - self.ana_path(), - self.global_params["analysis_config_area"], - "global.yaml", - ), - "--var", - var, - "--all_keys", - ",".join(key_list), - "--year", - era, - "--analysis", - plot_analysis, - "--ana_path", - self.ana_path(), - "--period", - self.period, - "--LAWrunVersion", - self.version, - ] - if plot_wantData: - cmd.append("--wantData") - if plot_wantSignals: - cmd.append("--wantSignals") - if plot_wantQCD: - cmd += ["--wantQCD", "true"] - if plot_rebin: - cmd += ["--rebin", "true"] - ps_call(cmd, verbose=1) - - -class AnalysisCacheAggregationTask(Task, HTCondorWorkflow, law.LocalWorkflow): - max_runtime = copy_param(HTCondorWorkflow.max_runtime, 2.0) - n_cpus = copy_param(HTCondorWorkflow.n_cpus, 1) - producer_to_aggregate = luigi.Parameter() - - def __init__(self, *args, **kwargs): - super(AnalysisCacheAggregationTask, self).__init__(*args, **kwargs) - - @law.dynamic_workflow_condition - def workflow_condition(self): - return AnaTupleFileListTask.req(self, branch=-1, branches=()).complete() - - def workflow_requires(self): - merge_organization_complete = AnaTupleFileListTask.req( - self, branches=() - ).complete() - payload_producers = self.global_params["payload_producers"] - if not merge_organization_complete: - deps = { - "AnaTupleFileListTask": AnaTupleFileListTask.req( - self, - branches=(), - max_runtime=AnaTupleFileListTask.max_runtime._default, - n_cpus=AnaTupleFileListTask.n_cpus._default, - ), - } - - deps["AnalysisCacheTask"] = AnalysisCacheTask.req( - self, - branches=(), - max_runtime=AnalysisCacheTask.max_runtime._default, - 
n_cpus=AnalysisCacheTask.n_cpus._default, - customisations=self.customisations, - producer_to_run=self.producer_to_aggregate, + this_dataset_dict = self.setup.getAnaTupleFileList( + dataset_name, + AnaTupleFileListTask.req(self, branch=ds_branch, branches=()).output(), ) - return deps - - deps = {} - producers_cache_branch_map = AnalysisCacheTask.req( - self, branch=-1, branches=(), producer_to_run=self.producer_to_aggregate - ).create_branch_map() - branches = [b for b in producers_cache_branch_map.keys()] - deps["AnalysisCacheTask"] = AnalysisCacheTask.req( - self, - branches=tuple(branches), - max_runtime=AnalysisCacheTask.max_runtime._default, - n_cpus=AnalysisCacheTask.n_cpus._default, - customisations=self.customisations, - producer_to_run=self.producer_to_aggregate, - ) - return deps + for this_dict in this_dataset_dict: + input_file_list = this_dict["inputs"] + output_file_list = this_dict["outputs"] + skip_future_tasks = this_dict["n_events"] == 0 + branches[nBranch] = ( + dataset_name, + process_group, + ds_branch, + dataset_dependencies, + input_file_list, + output_file_list, + skip_future_tasks, + ) + nBranch += 1 + return branches - def requires(self): - # I don't need to check here that this producer applies to target group - # the reason is that if its in the branch map - it already was checked - sample_name, list_of_producer_cache_keys = self.branch_data - reqs = [ - AnalysisCacheTask.req( - self, - max_runtime=AnalysisCacheTask.max_runtime._default, - branch=prod_br, - customisations=self.customisations, - producer_to_run=self.producer_to_aggregate, + def collect_extra_dependencies(self, dataset_name, ds_branches, process_group): + other_datasets = {} + if process_group != "data": + dataset = self.datasets[dataset_name] + processors = self.setup.get_processors( + dataset["process_name"], stage="AnaTupleMerge" ) - for prod_br in list_of_producer_cache_keys - ] - return reqs - - @workflow_condition.create_branch_map - def create_branch_map(self): - # 
structure of branch map - # ---- name of sample, - # ---- list of branch indices of the AnalysisCacheTask(producer_to_run=producer_name) - - branches = {} - branch_idx = 0 - - payload_producers = self.global_params["payload_producers"] - producer_cfg = payload_producers[self.producer_to_aggregate] - producer_cache_branch_map = AnalysisCacheTask.req( - self, branch=-1, branches=(), producer_to_run=self.producer_to_aggregate - ).create_branch_map() - - # find which branches of this producer correspond to each sample - sample_branch_map = {} - for producer_cache_branch_idx, ( - sample_name, - _, - _, - _, - _, - ) in producer_cache_branch_map.items(): - if sample_name not in sample_branch_map: - sample_branch_map[sample_name] = [] - sample_branch_map[sample_name].append(producer_cache_branch_idx) - - target_groups = producer_cfg.get("target_groups", None) - - for sample_name, list_of_producer_cache_keys in sample_branch_map.items(): - process_group = ( - self.datasets[sample_name]["process_group"] - if sample_name != "data" - else "data" + require_whole_process = any( + p.get("dependency_level", {}).get("AnaTupleMerge", "file") == "process" + for p in processors ) - applies_for_group = target_groups is None or process_group in target_groups - if applies_for_group: - branches[branch_idx] = (sample_name, list_of_producer_cache_keys) - branch_idx += 1 - - return branches + if require_whole_process: + process = self.setup.base_processes[dataset["process_name"]] + for p_dataset_name in process.get("datasets", []): + if p_dataset_name != dataset_name: + other_datasets[p_dataset_name] = ds_branches[p_dataset_name] + return other_datasets @workflow_condition.output def output(self): - sample_name, _ = self.branch_data - extension = self.global_params["payload_producers"][ - self.producer_to_aggregate - ].get("save_as", "root") - output_name = f"aggregatedCache.{extension}" - return self.local_target(sample_name, self.producer_to_aggregate, output_name) + ( + dataset_name, + 
process_group, + ds_branch, + dataset_dependencies, + input_file_list, + output_file_list, + skip_future_tasks, + ) = self.branch_data + output_dir = os.path.join(self.version, "AnaTuples", self.period, dataset_name) + outputs = [os.path.join(output_dir, out_file) for out_file in output_file_list] + return [ + self.remote_target(out_path, fs=self.fs_anaTuple) for out_path in outputs + ] def run(self): - sample_name, _ = self.branch_data - producers = self.global_params["payload_producers"] - cacheAggregator = os.path.join( - self.ana_path(), "FLAF", "Analysis", "AnalysisCacheAggregator.py" - ) + ( + dataset_name, + process_group, + ds_branch, + dataset_dependencies, + input_file_list, + output_file_list, + skip_future_tasks, + ) = self.branch_data + is_data = process_group == "data" + job_home, remove_job_home = self.law_job_home() + tmpFiles = [ + os.path.join(job_home, f"AnaTupleMergeTask_{dataset_name}_{i}.root") + for i in range(len(self.output())) + ] + print(f"dataset: {dataset_name}") with contextlib.ExitStack() as stack: - local_output = self.output() - inputs = self.input() - local_inputs = [ - stack.enter_context(inp.localize("r")).path for inp in inputs - ] - assert local_inputs, "`local_inputs` must be a non-empty list" - producer_cfg = producers[self.producer_to_aggregate] - ext = producer_cfg.get("save_as", "root") - job_home, remove_job_home = self.law_job_home() - tmpFile = os.path.join(job_home, f"aggregatedCache_tmp.{ext}") - aggregate_cmd = [ - "python3", - cacheAggregator, - "--outFile", - tmpFile, - "--period", - self.period, - "--producer", - self.producer_to_aggregate, - "--LAWrunVersion", - self.version, - ] - aggregate_cmd.append("--inputFiles") - aggregate_cmd.extend(local_inputs) - ps_call(aggregate_cmd, verbose=1) - - # For local target: ensure parent directory exists and move directly - out_local_path = local_output.path - local_output.parent.touch() # Creates parent directories if needed - shutil.move(tmpFile, out_local_path) - print( 
- f"Creating aggregated cache for producer {self.producer_to_aggregate} and dataset {sample_name} at {out_local_path}" + + print("Localizing root inputs") + local_root_inputs = [] + for ds_name, files in self.input()["root"].items(): + for file_list in files: + local_input = stack.enter_context( + file_list["root"].localize("r") + ).path + local_root_inputs.append(local_input) + print(f"Localized {len(local_root_inputs)} root inputs") + + print("Localizing reports") + reports = {} + for ds_name, file_list in self.input()["json"].items(): + report_file = stack.enter_context( + file_list["reports"].localize("r") + ).path + with open(report_file, "r") as f: + ds_reports = yaml.safe_load(f) + reports[ds_name] = list(ds_reports.values()) + print(f"Localized {len(reports)} reports") + + mergeAnaTuples( + setup=self.setup, + dataset_name=dataset_name, + is_data=is_data, + work_dir=job_home, + input_reports=reports, + input_roots=local_root_inputs, + root_outputs=tmpFiles, ) + + for outFile, tmpFile in zip(self.output(), tmpFiles): + with outFile.localize("w") as tmp_local_file: + out_local_path = tmp_local_file.path + shutil.move(tmpFile, out_local_path) + + if self.delete_inputs_after_merge: + print(f"Finished merging, lets delete remote AnaTupleFile targets") + for ds_name, files in self.input()["root"].items(): + for remote_targets in files: + for target in remote_targets: + target.remove() + + if remove_job_home: + shutil.rmtree(job_home) diff --git a/run_tools/mk_flaf_env.sh b/run_tools/mk_flaf_env.sh index 8e4e6713..62c74e2a 100755 --- a/run_tools/mk_flaf_env.sh +++ b/run_tools/mk_flaf_env.sh @@ -28,7 +28,7 @@ install() { echo "Installing packages in $env_base" run_cmd source $env_base/bin/activate run_cmd pip install --upgrade pip - run_cmd pip install law scinum + run_cmd pip install luigi==3.7.3 law scinum run_cmd pip install https://github.com/riga/plotlib/archive/refs/heads/master.zip run_cmd pip install fastcrc run_cmd pip install bayesian-optimization From 
b01021611426ef504dab585627c8fdca8bea7c7f Mon Sep 17 00:00:00 2001 From: Valeria D'Amante Date: Wed, 1 Apr 2026 17:02:05 +0200 Subject: [PATCH 5/9] fixed units for 2025 lumi --- config/Run3_2025/global.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/Run3_2025/global.yaml b/config/Run3_2025/global.yaml index 06801a58..4cfd91df 100644 --- a/config/Run3_2025/global.yaml +++ b/config/Run3_2025/global.yaml @@ -1,5 +1,5 @@ era: Run3_2025 -luminosity: 110730859275.72 # pb-1, https://twiki.cern.ch/twiki/bin/view/CMS/PdmVRun3Analysis#DATA_AN2 brilcalc lumi -b "STABLE BEAMS" -i Cert_Collisions2025_391658_398903_Golden.json (no normtag available) +luminosity: 110730.86 # pb-1, https://twiki.cern.ch/twiki/bin/view/CMS/PdmVRun3Analysis#DATA_AN2 brilcalc lumi -b "STABLE BEAMS" -i Cert_Collisions2025_391658_398903_Golden.json (no normtag available) nano_version: v15 crossSectionsFile: FLAF/config/crossSections13p6TeV.yaml MET_flags: # https://twiki.cern.ch/twiki/bin/view/CMS/MissingETOptionalFiltersRun2#Run_3_2024_data_and_MC_Recommend From a330febdee13253f22c85faa5123a03393264b2e Mon Sep 17 00:00:00 2001 From: Valeria D'Amante Date: Wed, 1 Apr 2026 17:05:08 +0200 Subject: [PATCH 6/9] fixed small bug for merger task --- Analysis/tasks.py | 1809 +++++++++++++++++++++++++++++++-------------- 1 file changed, 1235 insertions(+), 574 deletions(-) diff --git a/Analysis/tasks.py b/Analysis/tasks.py index 97b41994..51b3ceef 100644 --- a/Analysis/tasks.py +++ b/Analysis/tasks.py @@ -1,567 +1,1063 @@ -import contextlib -import json import law -import luigi import os +import contextlib +import luigi import shutil -import re -import yaml -from pathlib import Path - -from FLAF.RunKit.run_tools import ( - ps_call, - PsCallError, - natural_sort, - check_root_file_integrity, + + +from FLAF.RunKit.run_tools import ps_call +from FLAF.run_tools.law_customizations import ( + Task, + HTCondorWorkflow, + copy_param, +) +from FLAF.AnaProd.tasks import ( + 
AnaTupleFileListTask, + AnaTupleMergeTask, ) -from FLAF.run_tools.law_customizations import Task, HTCondorWorkflow, copy_param from FLAF.Common.Utilities import getCustomisationSplit, ServiceThread -from .AnaTupleFileList import CreateMergePlan -from .MergeAnaTuples import mergeAnaTuples -class InputFileTask(Task, law.LocalWorkflow): - def __init__(self, *args, **kwargs): - kwargs["workflow"] = "local" - super(InputFileTask, self).__init__(*args, **kwargs) +class HistTupleProducerTask(Task, HTCondorWorkflow, law.LocalWorkflow): + max_runtime = copy_param(HTCondorWorkflow.max_runtime, 5.0) + n_cpus = copy_param(HTCondorWorkflow.n_cpus, 4) + + def workflow_requires(self): + merge_organization_complete = AnaTupleFileListTask.req( + self, branches=() + ).complete() + if not merge_organization_complete: + req_dict = { + "AnaTupleFileListTask": AnaTupleFileListTask.req( + self, + branches=(), + max_runtime=AnaTupleFileListTask.max_runtime._default, + n_cpus=AnaTupleFileListTask.n_cpus._default, + ), + "AnaTupleMergeTask": AnaTupleMergeTask.req( + self, + branches=(), + max_runtime=AnaTupleMergeTask.max_runtime._default, + n_cpus=AnaTupleMergeTask.n_cpus._default, + ), + } + req_dict["AnalysisCacheTask"] = [] + var_produced_by = self.setup.var_producer_map + + flatten_vars = set() + for var in self.global_params["variables"]: + if isinstance(var, dict) and "vars" in var: + for v in var["vars"]: + flatten_vars.add(v) + else: + flatten_vars.add(var) + + for var_name in flatten_vars: + producer_to_run = var_produced_by.get(var_name, None) + if producer_to_run is not None: + req_dict["AnalysisCacheTask"].append( + AnalysisCacheTask.req( + self, + branches=(), + customisations=self.customisations, + producer_to_run=producer_to_run, + ) + ) + + return req_dict + + branch_set = set() + branch_set_cache = set() + producer_set = set() + for idx, ( + dataset, + br, + need_cache_global, + producer_list, + input_index, + ) in self.branch_map.items(): + branch_set.add(br) + if 
need_cache_global: + branch_set_cache.add(idx) + for producer_name in (p for p in producer_list if p is not None): + producer_set.add(producer_name) + reqs = {} + + if len(branch_set) > 0: + reqs["anaTuple"] = AnaTupleMergeTask.req( + self, + branches=tuple(branch_set), + customisations=self.customisations, + max_runtime=AnaTupleMergeTask.max_runtime._default, + n_cpus=AnaTupleMergeTask.n_cpus._default, + ) + + if len(branch_set_cache) > 0: + reqs["analysisCache"] = [] + for producer_name in (p for p in producer_set if p is not None): + reqs["analysisCache"].append( + AnalysisCacheTask.req( + self, + branches=tuple(branch_set_cache), + customisations=self.customisations, + producer_to_run=producer_name, + ) + ) + + return reqs + + def requires(self): + dataset_name, prod_br, need_cache_global, producer_list, input_index = ( + self.branch_data + ) + deps = { + "anaTuple": AnaTupleMergeTask.req( + self, + max_runtime=AnaTupleMergeTask.max_runtime._default, + branch=prod_br, + branches=(prod_br,), + customisations=self.customisations, + ) + } + if need_cache_global: + anaCaches = {} + for producer_name in (p for p in producer_list if p is not None): + if producer_name not in deps: + anaCaches[producer_name] = AnalysisCacheTask.req( + self, + max_runtime=AnalysisCacheTask.max_runtime._default, + branch=self.branch, + branches=(self.branch,), + customisations=self.customisations, + producer_to_run=producer_name, + ) + if len(anaCaches) > 0: + deps["anaCaches"] = anaCaches + + producers_to_aggregate = [] + process_group = ( + self.datasets[dataset_name]["process_group"] + if dataset_name != "data" + else "data" + ) + for producer_name in producer_list: + if producer_name: + payload_producers = self.global_params.get("payload_producers") + if not payload_producers: + continue + producer_cfg = payload_producers[producer_name] + needs_aggregation = producer_cfg.get("needs_aggregation", False) + target_groups = producer_cfg.get("target_groups", None) + applies_for_group = ( 
+ target_groups is None or process_group in target_groups + ) + if needs_aggregation: + if applies_for_group: + producers_to_aggregate.append(producer_name) + + if producers_to_aggregate: + aggrAnaCaches = {} + for producer_name in producers_to_aggregate: + aggr_task_branch_map = AnalysisCacheAggregationTask.req( + self, + branch=-1, + producer_to_aggregate=producer_name, + ).create_branch_map() + + # find which branch of AnalysisCacheAggregationTask is needed for this producer and dataset + branch_idx = -1 + for aggr_br_idx, (aggr_dataset_name, _) in aggr_task_branch_map.items(): + if aggr_dataset_name == dataset_name: + branch_idx = aggr_br_idx + break + + assert branch_idx >= 0, "Must find correct branch" + + aggrAnaCaches[producer_name] = AnalysisCacheAggregationTask.req( + self, + branch=branch_idx, + producer_to_aggregate=producer_name, + ) + + if aggrAnaCaches: + deps["aggrAnaCaches"] = aggrAnaCaches + return deps + + @law.dynamic_workflow_condition + def workflow_condition(self): + return AnaTupleFileListTask.req(self, branch=-1, branches=()).complete() + + @workflow_condition.create_branch_map def create_branch_map(self): + var_produced_by = self.setup.var_producer_map + + n = 0 branches = {} - for dataset_id, dataset_name in self.iter_datasets(): - branches[dataset_id] = dataset_name + anaProd_branch_map = AnaTupleMergeTask.req( + self, branch=-1, branches=() + ).create_branch_map() + + datasets_to_consider = [ + key + for key in self.datasets.keys() + if self.datasets[key]["process_group"] != "data" + ] + datasets_to_consider.append("data") + + flatten_vars = set() + for var in self.global_params["variables"]: + if isinstance(var, dict) and "vars" in var: + for v in var["vars"]: + flatten_vars.add(v) + else: + flatten_vars.add(var) + + need_cache_list = [ + (var_name in var_produced_by, var_produced_by.get(var_name, None)) + # for var_name in self.global_params["variables"] + for var_name in flatten_vars + ] + producer_list = [] + need_cache_global = 
any(item[0] for item in need_cache_list) + # for var_name in self.global_params["variables"]: + for var_name in flatten_vars: + need_cache = True if var_name in var_produced_by else False + producer_to_run = ( + var_produced_by[var_name] if var_name in var_produced_by else None + ) + need_cache_list.append(need_cache) + producer_list.append(producer_to_run) + + payload_producers = self.global_params.get("payload_producers") + if payload_producers: + for producer_name, producer_cfg in payload_producers.items(): + is_global = producer_cfg.get("is_global", False) + not_present = producer_name not in producer_list + if not_present and is_global: + producer_list.append(producer_name) + + for prod_br, ( + dataset_name, + process_group, + ds_branch, + dataset_dependencies, + input_file_list, + output_file_list, + skip_future_tasks, + ) in anaProd_branch_map.items(): + if skip_future_tasks: + continue + if dataset_name not in datasets_to_consider: + continue + + for input_index in range(len(output_file_list)): + producers_to_run = [] + if payload_producers: + for prod in producer_list: + cfg = payload_producers.get(prod, None) + is_configurable = cfg is not None + if not is_configurable: + producers_to_run.append(prod) + continue + + target_groups = cfg.get("target_groups", None) + applies_for_group = ( + target_groups is None or process_group in target_groups + ) + + if applies_for_group: + producers_to_run.append(prod) + + branches[n] = ( + dataset_name, + prod_br, + need_cache_global, + producers_to_run, + input_index, + ) + else: + branches[n] = ( + dataset_name, + prod_br, + need_cache_global, + producer_list, + input_index, + ) + n += 1 return branches + @workflow_condition.output def output(self): - dataset_name = self.branch_data - return self.local_target(f"{dataset_name}.json") + dataset_name, prod_br, need_cache_global, producer_list, input_index = ( + self.branch_data + ) + input = self.input()["anaTuple"][input_index] + input_name = 
os.path.basename(input.path) + outFileName = ( + f"histTuple_" + os.path.basename(input.path).split("_", 1)[1] + if input_name.startswith("anaTuple_") + else input_name + ) + output_path = os.path.join( + self.version, "HistTuples", self.period, dataset_name, outFileName + ) + return self.remote_target(output_path, fs=self.fs_HistTuple) def run(self): - dataset_name = self.branch_data - print(f"{dataset_name}: creating input file list into {self.output().path}") - dataset = self.datasets[dataset_name] - process_group = dataset["process_group"] - ignore_missing = self.global_params.get("ignore_missing_nanoAOD_files", {}).get( - process_group, False + dataset_name, prod_br, need_cache_global, producer_list, input_index = ( + self.branch_data ) - fs_nanoAOD, folder_name, include_folder_name = self.get_fs_nanoAOD(dataset_name) - nano_version = self.get_nano_version(dataset_name) - pattern_dict = self.datasets[dataset_name].get("fileNamePattern", {}) - pattern = pattern_dict.get(nano_version, r".*\.root$") - input_files = [] - inactive_files = [] - for file in fs_nanoAOD.listdir(folder_name): - if not re.match(pattern, file): - continue - file_path = os.path.join(folder_name, file) if include_folder_name else file - if hasattr(fs_nanoAOD.file_interface, "is_available"): - if not fs_nanoAOD.file_interface.is_available( - folder_name, file, verbose=1 - ): - if ignore_missing: - print( - f"{file_path}: will be ignored because no sites are found." 
- ) - inactive_files.append(file_path) - continue - else: - raise RuntimeError(f"No sites found for {file_path}") - input_files.append(file_path) - - if len(input_files) == 0: - raise RuntimeError(f"No input files found for {dataset_name}") - - input_files = natural_sort(input_files) - output = { - "input_files": input_files, - "inactive_files": inactive_files, - } - with self.output().localize("w") as out_local_file: - with open(out_local_file.path, "w") as f: - json.dump(output, f, indent=2) - - print(f"{dataset_name}: {len(input_files)} input files are found.") - - input_file_cache = {} - - @staticmethod - def load_input_files(input_file_list, test=False): - if input_file_list not in InputFileTask.input_file_cache: - with open(input_file_list, "r") as f: - input_files = json.load(f)["input_files"] - InputFileTask.input_file_cache[input_file_list] = input_files - input_files = InputFileTask.input_file_cache[input_file_list] - active_files = ( - [input_files[0]] if test and len(input_files) > 0 else input_files + input_file = self.input()["anaTuple"][input_index] + customisation_dict = getCustomisationSplit(self.customisations) + channels = customisation_dict.get( + "channels", self.global_params["channelSelection"] + ) + if type(channels) == list: + channels = ",".join(channels) + + print(f"input file is {input_file.path}") + histTupleDef = os.path.join(self.ana_path(), self.global_params["histTupleDef"]) + HistTupleProducer = os.path.join( + self.ana_path(), "FLAF", "Analysis", "HistTupleProducer.py" ) - return active_files + outFile = self.output().path + print(f"output file is {outFile}") + compute_unc_histograms = ( + customisation_dict.get("compute_unc_histograms") == "True" + if "compute_unc_histograms" in customisation_dict + else self.global_params.get("compute_unc_histograms", False) + ) + job_home, remove_job_home = self.law_job_home() + with contextlib.ExitStack() as stack: + local_input = stack.enter_context((input_file).localize("r")) + tmpFile = 
os.path.join( + job_home, f"HistTupleProducerTask_{input_index}.root" + ) + print(f"tmpfile is {tmpFile}") + HistTupleProducer_cmd = [ + "python3", + HistTupleProducer, + "--inFile", + local_input.path, + "--outFile", + tmpFile, + "--dataset", + dataset_name, + "--histTupleDef", + histTupleDef, + "--period", + self.period, + "--channels", + channels, + "--LAWrunVersion", + self.version, + ] + if compute_unc_histograms: + HistTupleProducer_cmd.extend( + [ + "--compute_rel_weights", + "True", + "--compute_unc_variations", + "True", + ] + ) + if self.customisations: + HistTupleProducer_cmd.extend([f"--customisations", self.customisations]) + if need_cache_global: + local_anacaches = {} + for producer_name, cache_file in self.input()["anaCaches"].items(): + local_anacaches[producer_name] = stack.enter_context( + cache_file.localize("r") + ).path + local_anacaches_str = ",".join( + f"{producer}:{path}" + for producer, path in local_anacaches.items() + if path.endswith("root") + ) + HistTupleProducer_cmd.extend(["--cacheFile", local_anacaches_str]) - WF = None - WF_complete_ = False + ps_call(HistTupleProducer_cmd, verbose=1) - @staticmethod - def WF_complete(ref_task): - if InputFileTask.WF_complete_: - return True - if InputFileTask.WF is None: - InputFileTask.WF = InputFileTask.req(ref_task, branch=-1, branches=()) - InputFileTask.WF_complete_ = InputFileTask.WF.complete() - return InputFileTask.WF_complete_ + with self.output().localize("w") as local_output: + out_local_path = local_output.path + shutil.move(tmpFile, out_local_path) + if remove_job_home: + shutil.rmtree(job_home) -class AnaTupleFileTask(Task, HTCondorWorkflow, law.LocalWorkflow): - max_runtime = copy_param(HTCondorWorkflow.max_runtime, 40.0) +class HistFromNtupleProducerTask(Task, HTCondorWorkflow, law.LocalWorkflow): + max_runtime = copy_param(HTCondorWorkflow.max_runtime, 10.0) n_cpus = copy_param(HTCondorWorkflow.n_cpus, 2) def workflow_requires(self): - return { - "inputFile": 
InputFileTask.req(self, branches=()), + merge_organization_complete = AnaTupleFileListTask.req( + self, branches=() + ).complete() + if not merge_organization_complete: + req_dict = {} + req_dict["AnaTupleFileListTask"] = AnaTupleFileListTask.req( + self, + branches=(), + max_runtime=AnaTupleFileListTask.max_runtime._default, + n_cpus=AnaTupleFileListTask.n_cpus._default, + ) + req_dict["HistTupleProducerTask"] = HistTupleProducerTask.req( + self, branches=(), customisations=self.customisations + ) + return req_dict + branch_set = set() + for br_idx, (var, prod_br_list, dataset_names) in self.branch_map.items(): + if var in self.global_params["variables"]: + branch_set.update(prod_br_list) + branches = tuple(branch_set) + req_dict = { + "HistTupleProducerTask": HistTupleProducerTask.req( + self, branches=branches, customisations=self.customisations + ) } + return req_dict def requires(self): - return [] - - _req_params = None - - @classmethod - def req(cls, inst, **kwargs): - if cls._req_params is None: - cls._req_params = cls.req_params(inst, **kwargs) - for param_name in ["branch", "branches"]: - param_value = kwargs.get(param_name, getattr(inst, param_name)) - cls._req_params[param_name] = param_value - return cls(**cls._req_params) + var, prod_br_list, dataset_name = self.branch_data + reqs = [] + reqs.append( + HistTupleProducerTask.req( + self, + max_runtime=HistTupleProducerTask.max_runtime._default, + branch=prod_br, + customisations=self.customisations, + ) + for prod_br in prod_br_list + ) + return reqs @law.dynamic_workflow_condition def workflow_condition(self): - return InputFileTask.WF_complete(self) + return AnaTupleFileListTask.req(self, branch=-1, branches=()).complete() @workflow_condition.create_branch_map def create_branch_map(self): - branch_idx = 0 branches = {} - for dataset_id, dataset_name in self.iter_datasets(): - input_file_list = ( - InputFileTask.req(self, branch=dataset_id, branches=()).output().path - ) - input_files = 
InputFileTask.load_input_files( - input_file_list, test=self.test > 0 - ) + prod_br_list = [] + current_dataset = None + n = 0 + + dataset_to_branches = {} + HistTupleBranchMap = HistTupleProducerTask.req( + self, branches=() + ).create_branch_map() + for prod_br, ( + histTuple_dataset_name, + histTuple_prod_br, + need_cache_global, + producer_list, + input_index, + ) in HistTupleBranchMap.items(): + dataset_to_branches.setdefault(histTuple_dataset_name, []).append(prod_br) + + for dataset_name, prod_br_list in dataset_to_branches.items(): + for var_name in self.global_params["variables"]: + branches[n] = (var_name, prod_br_list, dataset_name) + n += 1 - for input_file_idx, input_file in enumerate(input_files): - output_name = f"anaTupleFile_{input_file_idx}" - branches[branch_idx] = ( - dataset_name, - input_file, - output_name, - ) - branch_idx += 1 return branches @workflow_condition.output def output(self): - dataset_name, _, output_name = self.branch_data + var, prod_br, dataset_name = self.branch_data + if isinstance(var, dict): + var = var["name"] output_path = os.path.join( - self.version, "AnaTuples_split", self.period, dataset_name + self.version, "Hists_split", self.period, var, f"{dataset_name}.root" ) - root_output = os.path.join(output_path, f"{output_name}.root") - report_output = os.path.join(output_path, f"{output_name}.json") - return { - "root": self.remote_target(root_output, fs=self.fs_anaTuple), - "report": self.remote_target(report_output, fs=self.fs_anaTuple), - } + return self.remote_target(output_path, fs=self.fs_HistTuple) def run(self): - with ServiceThread() as service_thread: - dataset_name, input_file_name, output_name = self.branch_data - dataset = self.datasets[dataset_name] - process_group = dataset["process_group"] - producer_anatuples = os.path.join( - self.ana_path(), "FLAF", "AnaProd", "anaTupleProducer.py" - ) + var, prod_br, dataset_name = self.branch_data + job_home, remove_job_home = self.law_job_home() + customisation_dict 
= getCustomisationSplit(self.customisations) + channels = ( + customisation_dict["channels"] + if "channels" in customisation_dict.keys() + else self.global_params["channelSelection"] + ) + # Channels from the yaml are a list, but the format we need for the ps_call later is 'ch1,ch2,ch3', basically join into a string separated by comma + if type(channels) == list: + channels = ",".join(channels) + compute_unc_histograms = ( + customisation_dict["compute_unc_histograms"] == "True" + if "compute_unc_histograms" in customisation_dict.keys() + else self.global_params.get("compute_unc_histograms", False) + ) + HistFromNtupleProducer = os.path.join( + self.ana_path(), "FLAF", "Analysis", "HistProducerFromNTuple.py" + ) + input_list_remote_target = [inp for inp in self.input()[0]] + with contextlib.ExitStack() as stack: + local_inputs = [ + stack.enter_context((inp).localize("r")).path for inp in self.input()[0] + ] - customisation_dict = getCustomisationSplit(self.customisations) - channels = ( - customisation_dict["channels"] - if "channels" in customisation_dict.keys() - else self.global_params["channelSelection"] - ) - if type(channels) == list: - channels = ",".join(channels) - store_noncentral = ( - customisation_dict["store_noncentral"] == "True" - if "store_noncentral" in customisation_dict.keys() - else self.global_params.get("store_noncentral", False) - ) - compute_unc_variations = ( - customisation_dict["compute_unc_variations"] == "True" - if "compute_unc_variations" in customisation_dict.keys() - else self.global_params.get("compute_unc_variations", False) - ) + var = var if type(var) != dict else var["name"] + tmpFile = os.path.join(job_home, f"HistFromNtuple_{var}.root") + + HistFromNtupleProducer_cmd = [ + "python3", + HistFromNtupleProducer, + "--period", + self.period, + "--outFile", + tmpFile, + "--channels", + channels, + "--var", + var, + "--dataset_name", + dataset_name, + "--LAWrunVersion", + self.version, + ] + if compute_unc_histograms: + 
HistFromNtupleProducer_cmd.extend( + [ + "--compute_rel_weights", + "True", + "--compute_unc_variations", + "True", + ] + ) + if self.customisations: + HistFromNtupleProducer_cmd.extend( + [f"--customisations", self.customisations] + ) - fs_nanoAOD, _, _ = self.get_fs_nanoAOD(dataset_name) - input_file = self.remote_target(input_file_name, fs=fs_nanoAOD) + HistFromNtupleProducer_cmd.extend(local_inputs) + ps_call(HistFromNtupleProducer_cmd, verbose=1) - job_home, remove_job_home = self.law_job_home() - print(f"dataset_name: {dataset_name}") - print(f"process_group: {process_group}") - print(f"input_file = {input_file.uri()}") - - print("step 1: nanoAOD -> raw anaTuples") - outdir_anatuples = os.path.join(job_home, "rawAnaTuples") - anaTupleDef = os.path.join( - self.ana_path(), self.global_params["anaTupleDef"] - ) - reportFileName = "report.json" - rawReportPath = os.path.join(outdir_anatuples, reportFileName) - input_ok = True - with contextlib.ExitStack() as stack: - local_input = stack.enter_context(input_file.localize("r")).path - inFileName = os.path.basename(input_file.path) - print(f"inFileName {inFileName}") - anatuple_cmd = [ - "python3", - "-u", - producer_anatuples, - "--period", - self.period, - "--inFile", - local_input, - "--outDir", - outdir_anatuples, - "--dataset", - dataset_name, - "--anaTupleDef", - anaTupleDef, - "--channels", - channels, - "--inFileName", - inFileName, - "--reportOutput", - rawReportPath, - "--LAWrunVersion", - self.version, - "--output-name", - output_name, - ] - if compute_unc_variations: - anatuple_cmd.append("--compute-unc-variations") - if store_noncentral: - anatuple_cmd.append("--store-noncentral") - - if self.test > 0: - anatuple_cmd.extend(["--nEvents", str(self.test)]) - env = None - if self.global_params.get("use_cmssw_env_AnaTupleProduction", False): - env = self.cmssw_env - try: - ps_call(anatuple_cmd, env=env, verbose=1) - except PsCallError as e: - print(f"anaTupleProducer failed: {e}") - print("Checking input 
file integrity...") - input_ok = check_root_file_integrity(local_input, verbose=1) - if input_ok: - raise RuntimeError("anaTupleProducer failed.") - print( - "Input file is corrupted. Will create empty anaTuple and report." - ) - - producer_fuseTuples = os.path.join( - self.ana_path(), "FLAF", "AnaProd", "FuseAnaTuples.py" - ) - outdir_fusedTuples = os.path.join(job_home, "fusedAnaTuples") - outFileName = os.path.basename(input_file.path) - outFilePath = os.path.join(outdir_fusedTuples, outFileName) - finalReportPath = os.path.join(outdir_fusedTuples, reportFileName) - if input_ok: - print("step 2: raw anaTuples -> fused anaTuples") - verbosity = "1" - fuseTuple_cmd = [ - "python", - "-u", - producer_fuseTuples, - "--input-config", - rawReportPath, - "--work-dir", - outdir_fusedTuples, - "--tuple-output", - outFileName, - "--report-output", - reportFileName, - "--verbose", - verbosity, - ] - ps_call(fuseTuple_cmd, verbose=1) - else: - os.makedirs(outdir_fusedTuples, exist_ok=True) - Path(outFilePath).touch() - report = { - "valid": False, - "nano_file_name": inFileName, - "anaTuple_file_name": output_name, - "dataset_name": dataset_name, - } - with open(finalReportPath, "w") as f: - json.dump(report, f, indent=2) + with (self.output()).localize("w") as tmp_local_file: + out_local_path = tmp_local_file.path + shutil.move(tmpFile, out_local_path) - with self.output()["root"].localize("w") as local_file: - shutil.move(outFilePath, local_file.path) - with self.output()["report"].localize("w") as local_file: - shutil.move(finalReportPath, local_file.path) + delete_after_merge = False # var == self.global_config["variables"][-1] --> find more robust condition + if delete_after_merge: + print(f"Finished HistogramProducer, lets delete remote targets") + for remote_target in input_list_remote_target: + remote_target.remove() + with remote_target.localize("w") as tmp_local_file: + tmp_local_file.touch() # Create a dummy to avoid dependency crashes - if remove_job_home: - 
shutil.rmtree(job_home) + if remove_job_home: + shutil.rmtree(job_home) -class AnaTupleFileListBuilderTask(Task, HTCondorWorkflow, law.LocalWorkflow): - max_runtime = copy_param(HTCondorWorkflow.max_runtime, 24.0) - n_cpus = copy_param(HTCondorWorkflow.n_cpus, 1) +class HistMergerTask(Task, HTCondorWorkflow, law.LocalWorkflow): + max_runtime = copy_param(HTCondorWorkflow.max_runtime, 5.0) + n_cpus = copy_param(HTCondorWorkflow.n_cpus, 2) def workflow_requires(self): - input_file_task_complete = InputFileTask.WF_complete(self) - if not input_file_task_complete: + branch_map = self.create_branch_map() + + merge_organization_complete = AnaTupleFileListTask.req( + self, branches=() + ).complete() + if not merge_organization_complete: return { - "anaTuple": AnaTupleFileTask.req(self, branches=()), - "inputFile": InputFileTask.req(self, branches=()), + "AnaTupleFileListTask": AnaTupleFileListTask.req( + self, + branches=(), + max_runtime=AnaTupleFileListTask.max_runtime._default, + n_cpus=AnaTupleFileListTask.n_cpus._default, + ), + "HistFromNtupleProducerTask": HistFromNtupleProducerTask.req( + self, + branches=(), + ), } - AnaTuple_map = AnaTupleFileTask.req( - self, branch=-1, branches=() - ).create_branch_map() branch_set = set() - for idx, (dataset_name, process_group) in self.branch_map.items(): - for br_idx, (anaTuple_dataset_name, _, _) in AnaTuple_map.items(): - match = dataset_name == anaTuple_dataset_name - if not match and process_group == "data": - anaTuple_dataset = self.datasets[anaTuple_dataset_name] - anaTuple_process_group = anaTuple_dataset["process_group"] - match = anaTuple_process_group == "data" - if match: - branch_set.add(br_idx) + all_datasets = {} + for br_idx, (var, prod_br_list, dataset_names) in self.branch_map.items(): + all_datasets[var] = prod_br_list - deps = { - "AnaTupleFileTask": AnaTupleFileTask.req( - self, - branches=tuple(branch_set), - max_runtime=AnaTupleFileTask.max_runtime._default, - n_cpus=AnaTupleFileTask.n_cpus._default, + 
new_branchset = set() + for var in all_datasets.keys(): + new_branchset.update(all_datasets[var]) + + return { + "HistFromNtupleProducerTask": HistFromNtupleProducerTask.req( + self, branches=list(new_branchset) ) } - return deps def requires(self): - dataset_name, process_group = self.branch_data - AnaTuple_map = AnaTupleFileTask.req( - self, branch=-1, branches=() - ).create_branch_map() - branch_set = set() - for br_idx, (anaTuple_dataset_name, _, _) in AnaTuple_map.items(): - match = dataset_name == anaTuple_dataset_name - if not match and process_group == "data": - anaTuple_dataset = self.datasets[anaTuple_dataset_name] - anaTuple_process_group = anaTuple_dataset["process_group"] - match = anaTuple_process_group == "data" - if match: - branch_set.add(br_idx) - + var_name, br_indices, datasets = self.branch_data reqs = [ - AnaTupleFileTask.req( + HistFromNtupleProducerTask.req( self, - max_runtime=AnaTupleFileTask.max_runtime._default, + max_runtime=HistFromNtupleProducerTask.max_runtime._default, branch=prod_br, - branches=(prod_br,), + customisations=self.customisations, ) - for prod_br in tuple(branch_set) + for prod_br in tuple(set(br_indices)) ] + return reqs + @law.dynamic_workflow_condition + def workflow_condition(self): + return AnaTupleFileListTask.req(self, branch=-1, branches=()).complete() + + @workflow_condition.create_branch_map def create_branch_map(self): + HistFromNtupleProducerTask_branch_map = HistFromNtupleProducerTask.req( + self, branches=() + ).create_branch_map() + all_datasets = {} branches = {} k = 0 - data_done = False - for dataset_id, dataset_name in self.iter_datasets(): - dataset = self.datasets[dataset_name] - process_group = dataset["process_group"] - if process_group == "data": - if data_done: - continue # Will have multiple data datasets, but only need one branch - dataset_name = "data" - data_done = True - branches[k] = (dataset_name, process_group) + for br_idx, ( + var_name, + prod_br_list, + current_dataset, + ) in 
HistFromNtupleProducerTask_branch_map.items(): + var_name = ( + var_name.get("name", var_name) + if isinstance(var_name, dict) + else var_name + ) + if var_name not in all_datasets.keys(): + all_datasets[var_name] = [] + all_datasets[var_name].append((br_idx, current_dataset)) + for var_name, br_list in all_datasets.items(): + br_indices = [] + datasets = [] + for key in br_list: + idx, dataset_name = key + br_indices.append(idx) + datasets.append(dataset_name) + branches[k] = (var_name, br_indices, datasets) k += 1 return branches - def get_output_path(self, dataset_name, output_name): - output_file = f"{dataset_name}.json" - base_name = "AnaTupleFileList" - if output_name != "plan": - base_name += f"_{output_name}" - return os.path.join(self.version, base_name, self.period, output_file) - + @workflow_condition.output def output(self): - dataset_name, process_group = self.branch_data - outputs = {} - for output_name in ["plan", "reports"]: - output_path = self.get_output_path(dataset_name, output_name) - outputs[output_name] = self.remote_target(output_path, fs=self.fs_anaTuple) - return outputs + var_name, br_indices, datasets = self.branch_data + output_path = os.path.join(self.version, "Hists_merged", self.period, var_name) + output_file_name = os.path.join(output_path, f"{var_name}.root") + return self.remote_target(output_file_name, fs=self.fs_HistTuple) def run(self): - dataset_name, process_group = self.branch_data + var_name, br_indices, datasets = self.branch_data + customisation_dict = getCustomisationSplit(self.customisations) + + channels = ( + customisation_dict["channels"] + if "channels" in customisation_dict.keys() + else self.global_params["channelSelection"] + ) + # Channels from the yaml are a list, but the format we need for the ps_call later is 'ch1,ch2,ch3', basically join into a string separated by comma + if type(channels) == list: + channels = ",".join(channels) + + uncNames = ["Central"] + unc_cfg_dict = self.setup.weights_config + 
uncs_to_exclude = ( + self.global_params["uncs_to_exclude"][self.period] + if "uncs_to_exclude" in self.global_params.keys() + else [] + ) + compute_unc_histograms = ( + customisation_dict["compute_unc_histograms"] == "True" + if "compute_unc_histograms" in customisation_dict.keys() + else self.global_params.get("compute_unc_histograms", False) + ) + if compute_unc_histograms: + for uncName in list(unc_cfg_dict["norm"].keys()) + list( + unc_cfg_dict["shape"].keys() + ): + if uncName in uncs_to_exclude: + continue + uncNames.append(uncName) + + MergerProducer = os.path.join( + self.ana_path(), "FLAF", "Analysis", "HistMergerFromHists.py" + ) + HaddMergedHistsProducer = os.path.join( + self.ana_path(), "FLAF", "Analysis", "hadd_merged_hists.py" + ) + + all_datasets = [] + local_inputs = [] with contextlib.ExitStack() as stack: + for inp in self.input(): + dataset_name = os.path.basename(inp.path) + all_datasets.append(dataset_name.split(".")[0]) + local_inputs.append(stack.enter_context(inp.localize("r")).path) + dataset_names = ",".join(smpl for smpl in all_datasets) + all_outputs_merged = [] + if len(uncNames) == 1: + with self.output().localize("w") as outFile: + MergerProducer_cmd = [ + "python3", + MergerProducer, + "--outFile", + outFile.path, + "--var", + var_name, + "--dataset_names", + dataset_names, + "--uncSource", + uncNames[0], + "--channels", + channels, + "--period", + self.period, + "--LAWrunVersion", + self.version, + ] + MergerProducer_cmd.extend(local_inputs) + ps_call(MergerProducer_cmd, verbose=1) + else: + job_home, remove_job_home = self.law_job_home() + for uncName in uncNames: + final_histname = f"{var_name}_{uncName}.root" + tmp_outfile_merge = os.path.join(job_home, final_histname) + MergerProducer_cmd = [ + "python3", + MergerProducer, + "--outFile", + tmp_outfile_merge, + "--var", + var_name, + "--dataset_names", + dataset_names, + "--uncSource", + uncName, + "--channels", + channels, + "--period", + self.period, + "--LAWrunVersion", + 
self.version, + ] + MergerProducer_cmd.extend(local_inputs) + ps_call(MergerProducer_cmd, verbose=1) + all_outputs_merged.append(tmp_outfile_merge) + with self.output().localize("w") as outFile: + HaddMergedHistsProducer_cmd = [ + "python3", + HaddMergedHistsProducer, + "--outFile", + outFile.path, + "--var", + var_name, + ] + HaddMergedHistsProducer_cmd.extend(all_outputs_merged) + ps_call(HaddMergedHistsProducer_cmd, verbose=1) + if remove_job_home: + shutil.rmtree(job_home) + + +class AnalysisCacheTask(Task, HTCondorWorkflow, law.LocalWorkflow): + max_runtime = copy_param(HTCondorWorkflow.max_runtime, 2.0) + n_cpus = copy_param(HTCondorWorkflow.n_cpus, 1) + producer_to_run = luigi.Parameter() - print("Localizing inputs") - local_inputs = [ - stack.enter_context(inp["report"].localize("r")).path - for inp in self.input() + # Need to override this from HTCondorWorkflow to have separate data pathways for different cache tasks + def htcondor_output_directory(self): + return law.LocalDirectoryTarget(self.local_path(self.producer_to_run)) + + def __init__(self, *args, **kwargs): + # Needed to get the config and ht_condor_pathways figured out + super(AnalysisCacheTask, self).__init__(*args, **kwargs) + self.n_cpus = self.global_params["payload_producers"][self.producer_to_run].get( + "n_cpus", 1 + ) + self.max_runtime = self.global_params["payload_producers"][ + self.producer_to_run + ].get("max_runtime", 2.0) + self.output_file_extension = self.global_params["payload_producers"][ + self.producer_to_run + ].get("save_as", "root") + + def workflow_requires(self): + merge_organization_complete = AnaTupleFileListTask.req( + self, branches=() + ).complete() + if not merge_organization_complete: + req_dict = { + "AnaTupleFileListTask": AnaTupleFileListTask.req( + self, + branches=(), + max_runtime=AnaTupleFileListTask.max_runtime._default, + n_cpus=AnaTupleFileListTask.n_cpus._default, + ), + "AnaTupleMergeTask": AnaTupleMergeTask.req( + self, + branches=(), + 
max_runtime=AnaTupleMergeTask.max_runtime._default, + n_cpus=AnaTupleMergeTask.n_cpus._default, + ), + } + # Get all the producers to require for this dummy branch + producer_requires_set = set() + producer_dependencies = self.global_params["payload_producers"][ + self.producer_to_run + ]["dependencies"] + if producer_dependencies: + for dependency in producer_dependencies: + producer_requires_set.add(dependency) + req_dict["AnalysisCacheTask"] = [ + AnalysisCacheTask.req( + self, + branches=(), + customisations=self.customisations, + producer_to_run=producer_name, + ) + for producer_name in list(producer_requires_set) + if producer_name is not None ] - print(f"Localized {len(local_inputs)} inputs") + return req_dict - job_home, remove_job_home = self.law_job_home() + workflow_dict = {} + workflow_dict["anaTuple"] = { + br_idx: AnaTupleMergeTask.req( + self, + branch=prod_br, + branches=(), + max_runtime=AnaTupleMergeTask.max_runtime._default, + n_cpus=AnaTupleMergeTask.n_cpus._default, + ) + for br_idx, ( + dataset_name, + prod_br, + need_cache_global, + producer_list, + input_index, + ) in self.branch_map.items() + } + producer_dependencies = self.global_params["payload_producers"][ + self.producer_to_run + ]["dependencies"] + if producer_dependencies: + for dependency in producer_dependencies: + workflow_dict[dependency] = { + br_idx: AnalysisCacheTask.req( + self, + branch=br_idx, + branches=(), + customisations=self.customisations, + producer_to_run=dependency, + ) + for br_idx, _ in self.branch_map.items() + } + return workflow_dict - nEventsPerFile = self.setup.global_params.get( - "nEventsPerFile", {"data": 1_000_000} + def requires(self): + dataset_name, prod_br, need_cache_global, producer_list, input_index = ( + self.branch_data + ) + producer_dependencies = self.global_params["payload_producers"][ + self.producer_to_run + ]["dependencies"] + requirements = { + "anaTuple": AnaTupleMergeTask.req( + self, + branch=prod_br, + 
max_runtime=AnaTupleMergeTask.max_runtime._default, + branches=(), ) - if isinstance(nEventsPerFile, dict): - nEventsPerFile = nEventsPerFile.get(process_group, 100_000) - is_data = process_group == "data" + } + anaCaches = {} + if producer_dependencies: + for dependency in producer_dependencies: + anaCaches[dependency] = AnalysisCacheTask.req( + self, producer_to_run=dependency + ) + requirements["anaCaches"] = anaCaches - result = CreateMergePlan(self.setup, local_inputs, nEventsPerFile, is_data) + return requirements - for output_name, output_remote in self.output().items(): - output_path_tmp = os.path.join(job_home, f"{output_name}_tmp.json") - with open(output_path_tmp, "w") as f: - json.dump(result[output_name], f, indent=2) - with output_remote.localize("w") as output_localized: - shutil.move(output_path_tmp, output_localized.path) + @law.dynamic_workflow_condition + def workflow_condition(self): + return AnaTupleFileListTask.req(self, branch=-1, branches=()).complete() - if remove_job_home: - shutil.rmtree(job_home) + @workflow_condition.create_branch_map + def create_branch_map(self): + branches = HistTupleProducerTask.req( + self, branch=-1, branches=() + ).create_branch_map() + return branches + @workflow_condition.output + def output(self): + dataset_name, _, _, _, input_index = self.branch_data + inputFilePath = self.input()["anaTuple"][input_index].path + outFileNameWithoutExtension = os.path.basename(inputFilePath).split(".")[0] + outFileName = f"{outFileNameWithoutExtension}.{self.output_file_extension}" + output_path = os.path.join( + self.version, + "AnalysisCache", + self.producer_to_run, + self.period, + dataset_name, + outFileName, + ) + return self.remote_target(output_path, fs=self.fs_anaCacheTuple) -class AnaTupleFileListTask(AnaTupleFileListBuilderTask): - def workflow_requires(self): - return {"AnaTupleFileListBuilderTask": AnaTupleFileListBuilderTask.req(self)} + def run(self): + with ServiceThread() as service_thread: + dataset_name, 
prod_br, need_cache_global, producer_list, input_index = ( + self.branch_data + ) + analysis_cache_producer = os.path.join( + self.ana_path(), "FLAF", "Analysis", "AnalysisCacheProducer.py" + ) + customisation_dict = getCustomisationSplit(self.customisations) + channels = ( + customisation_dict["channels"] + if "channels" in customisation_dict.keys() + else self.global_params["channelSelection"] + ) + # Channels from the yaml are a list, but the format we need for the ps_call later is 'ch1,ch2,ch3', basically join into a string separated by comma + if type(channels) == list: + channels = ",".join(channels) + job_home, remove_job_home = self.law_job_home() + print(f"At job_home {job_home}") - def requires(self): - return AnaTupleFileListBuilderTask.req(self) + with contextlib.ExitStack() as stack: + # Enter a stack to maybe load the analysis cache files + input_file = self.input()["anaTuple"][input_index] + if len(self.input()["anaCaches"]) > 0: + local_anacaches = {} + for producer_name, cache_files in self.input()["anaCaches"].items(): + local_anacaches[producer_name] = stack.enter_context( + cache_files[input_index].localize("r") + ).path + local_anacaches_str = ",".join( + f"{producer}:{path}" + for producer, path in local_anacaches.items() + ) + print(f"Task has cache input files {local_anacaches_str}") + else: + local_anacaches_str = "" + + output_file = self.output() + print(f"considering dataset {dataset_name}, and file {input_file.path}") + customisation_dict = getCustomisationSplit(self.customisations) + tmpFile = os.path.join( + job_home, f"AnalysisCacheTask.{self.output_file_extension}" + ) + with input_file.localize("r") as local_input: + analysisCacheProducer_cmd = [ + "python3", + analysis_cache_producer, + "--period", + self.period, + "--inFile", + local_input.path, + "--outFile", + tmpFile, + "--dataset", + dataset_name, + "--channels", + channels, + "--producer", + self.producer_to_run, + "--workingDir", + job_home, + "--LAWrunVersion", + 
self.version, + ] + if ( + self.global_params["store_noncentral"] + and dataset_name != "data" + ): + analysisCacheProducer_cmd.extend( + ["--compute_unc_variations", "True"] + ) + if len(local_anacaches_str) > 0: + analysisCacheProducer_cmd.extend( + ["--cacheFiles", local_anacaches_str] + ) + # Check if cmssw env is required + prod_env = ( + self.cmssw_env + if self.global_params["payload_producers"][ + self.producer_to_run + ].get("cmssw_env", False) + else None + ) - def output(self): - dataset_name, process_group = self.branch_data - return self.local_target(self.get_output_path(dataset_name, "plan")) + histTupleDef = os.path.join( + self.ana_path(), self.global_params["histTupleDef"] + ) + analysisCacheProducer_cmd.extend(["--histTupleDef", histTupleDef]) - def run(self): - with self.input()["plan"].localize("r") as input_local: - self.output().makedirs() - shutil.copy(input_local.path, self.output().path) + ps_call(analysisCacheProducer_cmd, env=prod_env, verbose=1) + print( + f"Finished producing payload for producer={self.producer_to_run} with name={dataset_name}, file={input_file.path}" + ) + with output_file.localize("w") as tmp_local_file: + out_local_path = tmp_local_file.path + shutil.move(tmpFile, out_local_path) + if remove_job_home: + shutil.rmtree(job_home) -class AnaTupleMergeTask(Task, HTCondorWorkflow, law.LocalWorkflow): - max_runtime = copy_param(HTCondorWorkflow.max_runtime, 48.0) - n_cpus = copy_param(HTCondorWorkflow.n_cpus, 2) - delete_inputs_after_merge = luigi.BoolParameter(default=False) +class HistPlotTask(Task, HTCondorWorkflow, law.LocalWorkflow): + max_runtime = copy_param(HTCondorWorkflow.max_runtime, 2.0) + n_cpus = copy_param(HTCondorWorkflow.n_cpus, 1) def workflow_requires(self): merge_organization_complete = AnaTupleFileListTask.req( self, branches=() ).complete() if not merge_organization_complete: - return { - "AnaTupleFileListTask": AnaTupleFileListTask.req( - self, - branches=(), - 
max_runtime=AnaTupleFileListTask.max_runtime._default, - n_cpus=AnaTupleFileListTask.n_cpus._default, - ), - } + req_dict = {} + req_dict["HistMergerTask"] = HistMergerTask.req( + self, branches=(), customisations=self.customisations + ) + req_dict["AnaTupleFileListTask"] = AnaTupleFileListTask.req( + self, + branches=(), + max_runtime=AnaTupleFileListTask.max_runtime._default, + n_cpus=AnaTupleFileListTask.n_cpus._default, + ) + return req_dict + merge_map = HistMergerTask.req( + self, branch=-1, branches=(), customisations=self.customisations + ).create_branch_map() branch_set = set() - for _, ( - _, - _, - ds_branch, - dataset_dependencies, - _, - _, - _, - ) in self.branch_map.items(): - branch_set.add(ds_branch) - branch_set.update(dataset_dependencies.values()) + for br_idx, (var) in self.branch_map.items(): + for br, (v, _, _) in merge_map.items(): + if v == var: + branch_set.add(br) return { - "AnaTupleFileListTask": AnaTupleFileListTask.req( + "merge": HistMergerTask.req( self, branches=tuple(branch_set), - max_runtime=AnaTupleFileListTask.max_runtime._default, - n_cpus=AnaTupleFileListTask.n_cpus._default, + customisations=self.customisations, ) } def requires(self): - # Need both the AnaTupleFileTask for the input ROOT file, and the AnaTupleFileListTask for the json structure - ( - dataset_name, - process_group, - ds_branch, - dataset_dependencies, - input_file_list, - _, - skip_future_tasks, - ) = self.branch_data - anaTuple_branch_map = AnaTupleFileTask.req( - self, branch=-1, branches=() - ).create_branch_map() - required_branches = {"root": {}} - for prod_br, ( - anaTuple_dataset_name, - anaTuple_input_file, - anaTuple_output_name, - ) in anaTuple_branch_map.items(): - match = dataset_name == anaTuple_dataset_name - if not match and process_group == "data": - anaTuple_dataset = self.datasets[anaTuple_dataset_name] - anaTuple_process_group = anaTuple_dataset["process_group"] - match = anaTuple_process_group == "data" - dependency_type = None - if 
match: - key = f"{anaTuple_dataset_name}/{anaTuple_output_name}" - if key in input_file_list: - dependency_type = "root" - if dependency_type: - if anaTuple_dataset_name not in required_branches[dependency_type]: - required_branches[dependency_type][anaTuple_dataset_name] = [] - required_branches[dependency_type][anaTuple_dataset_name].append( - AnaTupleFileTask.req( - self, - max_runtime=AnaTupleFileTask.max_runtime._default, - branch=prod_br, - branches=(prod_br,), - ) - ) + var = self.branch_data - required_branches["json"] = {} - if process_group != "data": - anaTupleFileListBuilder_branch_map = AnaTupleFileListBuilderTask.req( - self, branch=-1, branches=() - ).create_branch_map() - - for builder_branch, ( - builder_dataset_name, - _, - ) in anaTupleFileListBuilder_branch_map.items(): - if ( - builder_dataset_name == dataset_name - or builder_dataset_name in dataset_dependencies - ): - required_branches["json"][builder_dataset_name] = ( - AnaTupleFileListBuilderTask.req( - self, - max_runtime=AnaTupleFileListBuilderTask.max_runtime._default, - branch=builder_branch, - branches=(builder_branch,), - ) - ) + merge_map = HistMergerTask.req( + self, branch=-1, branches=(), customisations=self.customisations + ).create_branch_map() + merge_branch = next(br for br, (v, _, _) in merge_map.items() if v == var) - return required_branches + return HistMergerTask.req( + self, + branch=merge_branch, + customisations=self.customisations, + max_runtime=HistMergerTask.max_runtime._default, + ) @law.dynamic_workflow_condition def workflow_condition(self): @@ -570,139 +1066,304 @@ def workflow_condition(self): @workflow_condition.create_branch_map def create_branch_map(self): branches = {} - nBranch = 0 - ds_branch_map = AnaTupleFileListTask.req( - self, branch=-1, branches=() + merge_map = HistMergerTask.req( + self, branch=-1, branches=(), customisations=self.customisations ).create_branch_map() + var_dict = {} + for var in self.global_params["variables"]: + var_name = var if 
isinstance(var, str) else var["name"] + var_dict[var_name] = var + for k, (_, (var, _, _)) in enumerate(merge_map.items()): + # Check if we want to plot this var in the global config + if isinstance(var_dict[var], dict): + if var_dict[var].get("plot_task", True): + branches[k] = var + else: + branches[k] = var + return branches - ds_branches = {} - for ds_branch, (dataset_name, process_group) in ds_branch_map.items(): - if dataset_name in ds_branches: - raise RuntimeError( - f"Dataset {dataset_name} appears multiple times in AnaTupleFileListTask branch map!" - ) - ds_branches[dataset_name] = ds_branch + @workflow_condition.output + def output(self): + var = self.branch_data + outputs = {} + customisation_dict = getCustomisationSplit(self.customisations) - for ds_branch, (dataset_name, process_group) in ds_branch_map.items(): - dataset_dependencies = self.collect_extra_dependencies( - dataset_name, ds_branches, process_group + channels = customisation_dict.get( + "channels", self.global_params["channelSelection"] + ) + if isinstance(channels, str): + channels = channels.split(",") + + base_cats = self.global_params.get("categories") or [] + boosted_cats = self.global_params.get("boosted_categories") or [] + categories = base_cats + boosted_cats + if isinstance(categories, str): + categories = categories.split(",") + + custom_region_name = self.global_params.get("custom_regions") + + custom_regions = customisation_dict.get( + custom_region_name, self.global_params[custom_region_name] + ) + + for ch in channels: + for cat in categories: + for custom_region in custom_regions: + rel_path = os.path.join( + self.version, + "Plots", + self.period, + var, + custom_region, + cat, + f"{ch}_{var}.pdf", + ) + outputs[f"{ch}:{cat}:{custom_region}"] = self.remote_target( + rel_path, fs=self.fs_plots + ) + return outputs + + def run(self): + var = self.branch_data + era = self.period + ver = self.version + customisation_dict = getCustomisationSplit(self.customisations) + + plotter 
= os.path.join(self.ana_path(), "FLAF", "Analysis", "HistPlotter.py") + + def bool_flag(key, default): + return ( + customisation_dict.get( + key, str(self.global_params.get(key, default)) + ).lower() + == "true" ) - this_dataset_dict = self.setup.getAnaTupleFileList( - dataset_name, - AnaTupleFileListTask.req(self, branch=ds_branch, branches=()).output(), + + plot_unc = bool_flag("plot_unc", True) + plot_wantData = bool_flag(f"plot_wantData_{var}", True) + plot_wantSignals = bool_flag("plot_wantSignals", True) + plot_wantQCD = bool_flag("plot_wantQCD", False) + plot_rebin = bool_flag("plot_rebin", True) + plot_analysis = customisation_dict.get( + "plot_analysis", self.global_params.get("plot_analysis", "") + ) + + with self.input().localize("r") as local_input: + infile = local_input.path + print("Loading fname", infile) + + # Create list of all keys and all targets + key_list = [] + output_list = [] + for output_key, output_target in self.output().items(): + if (output_target).exists(): + print(f"Output for {var} {output_target} already exists! 
Continue") + continue + key_list.append(output_key) + output_list.append(output_target) + + # Now localize all output_targets + with contextlib.ExitStack() as stack: + local_outputs = [ + stack.enter_context((output).localize("w")).path + for output in output_list + ] + cmd = [ + "python3", + plotter, + "--inFile", + infile, + "--all_outFiles", + ",".join(local_outputs), + "--globalConfig", + os.path.join( + self.ana_path(), + self.global_params["analysis_config_area"], + "global.yaml", + ), + "--var", + var, + "--all_keys", + ",".join(key_list), + "--year", + era, + "--analysis", + plot_analysis, + "--ana_path", + self.ana_path(), + "--period", + self.period, + "--LAWrunVersion", + self.version, + ] + if plot_wantData: + cmd.append("--wantData") + if plot_wantSignals: + cmd.append("--wantSignals") + if plot_wantQCD: + cmd += ["--wantQCD", "true"] + if plot_rebin: + cmd += ["--rebin", "true"] + ps_call(cmd, verbose=1) + + +class AnalysisCacheAggregationTask(Task, HTCondorWorkflow, law.LocalWorkflow): + max_runtime = copy_param(HTCondorWorkflow.max_runtime, 2.0) + n_cpus = copy_param(HTCondorWorkflow.n_cpus, 1) + producer_to_aggregate = luigi.Parameter() + + def __init__(self, *args, **kwargs): + super(AnalysisCacheAggregationTask, self).__init__(*args, **kwargs) + + @law.dynamic_workflow_condition + def workflow_condition(self): + return AnaTupleFileListTask.req(self, branch=-1, branches=()).complete() + + def workflow_requires(self): + merge_organization_complete = AnaTupleFileListTask.req( + self, branches=() + ).complete() + payload_producers = self.global_params["payload_producers"] + if not merge_organization_complete: + deps = { + "AnaTupleFileListTask": AnaTupleFileListTask.req( + self, + branches=(), + max_runtime=AnaTupleFileListTask.max_runtime._default, + n_cpus=AnaTupleFileListTask.n_cpus._default, + ), + } + + deps["AnalysisCacheTask"] = AnalysisCacheTask.req( + self, + branches=(), + max_runtime=AnalysisCacheTask.max_runtime._default, + 
n_cpus=AnalysisCacheTask.n_cpus._default, + customisations=self.customisations, + producer_to_run=self.producer_to_aggregate, ) - for this_dict in this_dataset_dict: - input_file_list = this_dict["inputs"] - output_file_list = this_dict["outputs"] - skip_future_tasks = this_dict["n_events"] == 0 - branches[nBranch] = ( - dataset_name, - process_group, - ds_branch, - dataset_dependencies, - input_file_list, - output_file_list, - skip_future_tasks, - ) - nBranch += 1 - return branches + return deps + + deps = {} + producers_cache_branch_map = AnalysisCacheTask.req( + self, branch=-1, branches=(), producer_to_run=self.producer_to_aggregate + ).create_branch_map() + branches = [b for b in producers_cache_branch_map.keys()] + deps["AnalysisCacheTask"] = AnalysisCacheTask.req( + self, + branches=tuple(branches), + max_runtime=AnalysisCacheTask.max_runtime._default, + n_cpus=AnalysisCacheTask.n_cpus._default, + customisations=self.customisations, + producer_to_run=self.producer_to_aggregate, + ) + return deps - def collect_extra_dependencies(self, dataset_name, ds_branches, process_group): - other_datasets = {} - if process_group != "data": - dataset = self.datasets[dataset_name] - processors = self.setup.get_processors( - dataset["process_name"], stage="AnaTupleMerge" + def requires(self): + # I don't need to check here that this producer applies to target group + # the reason is that if its in the branch map - it already was checked + sample_name, list_of_producer_cache_keys = self.branch_data + reqs = [ + AnalysisCacheTask.req( + self, + max_runtime=AnalysisCacheTask.max_runtime._default, + branch=prod_br, + customisations=self.customisations, + producer_to_run=self.producer_to_aggregate, ) - require_whole_process = any( - p.get("dependency_level", {}).get("AnaTupleMerge", "file") == "process" - for p in processors + for prod_br in list_of_producer_cache_keys + ] + return reqs + + @workflow_condition.create_branch_map + def create_branch_map(self): + # structure of 
branch map + # ---- name of sample, + # ---- list of branch indices of the AnalysisCacheTask(producer_to_run=producer_name) + + branches = {} + branch_idx = 0 + + payload_producers = self.global_params["payload_producers"] + producer_cfg = payload_producers[self.producer_to_aggregate] + producer_cache_branch_map = AnalysisCacheTask.req( + self, branch=-1, branches=(), producer_to_run=self.producer_to_aggregate + ).create_branch_map() + + # find which branches of this producer correspond to each sample + sample_branch_map = {} + for producer_cache_branch_idx, ( + sample_name, + _, + _, + _, + _, + ) in producer_cache_branch_map.items(): + if sample_name not in sample_branch_map: + sample_branch_map[sample_name] = [] + sample_branch_map[sample_name].append(producer_cache_branch_idx) + + target_groups = producer_cfg.get("target_groups", None) + + for sample_name, list_of_producer_cache_keys in sample_branch_map.items(): + process_group = ( + self.datasets[sample_name]["process_group"] + if sample_name != "data" + else "data" ) - if require_whole_process: - process = self.setup.base_processes[dataset["process_name"]] - for p_dataset_name in process.get("datasets", []): - if p_dataset_name != dataset_name: - other_datasets[p_dataset_name] = ds_branches[p_dataset_name] - return other_datasets + applies_for_group = target_groups is None or process_group in target_groups + if applies_for_group: + branches[branch_idx] = (sample_name, list_of_producer_cache_keys) + branch_idx += 1 + + return branches @workflow_condition.output def output(self): - ( - dataset_name, - process_group, - ds_branch, - dataset_dependencies, - input_file_list, - output_file_list, - skip_future_tasks, - ) = self.branch_data - output_dir = os.path.join(self.version, "AnaTuples", self.period, dataset_name) - outputs = [os.path.join(output_dir, out_file) for out_file in output_file_list] - return [ - self.remote_target(out_path, fs=self.fs_anaTuple) for out_path in outputs - ] + sample_name, _ = 
self.branch_data + extension = self.global_params["payload_producers"][ + self.producer_to_aggregate + ].get("save_as", "root") + output_name = f"aggregatedCache.{extension}" + return self.local_target(sample_name, self.producer_to_aggregate, output_name) def run(self): - ( - dataset_name, - process_group, - ds_branch, - dataset_dependencies, - input_file_list, - output_file_list, - skip_future_tasks, - ) = self.branch_data - is_data = process_group == "data" - job_home, remove_job_home = self.law_job_home() - tmpFiles = [ - os.path.join(job_home, f"AnaTupleMergeTask_{dataset_name}_{i}.root") - for i in range(len(self.output())) - ] - print(f"dataset: {dataset_name}") + sample_name, _ = self.branch_data + producers = self.global_params["payload_producers"] + cacheAggregator = os.path.join( + self.ana_path(), "FLAF", "Analysis", "AnalysisCacheAggregator.py" + ) with contextlib.ExitStack() as stack: - - print("Localizing root inputs") - local_root_inputs = [] - for ds_name, files in self.input()["root"].items(): - for file_list in files: - local_input = stack.enter_context( - file_list["root"].localize("r") - ).path - local_root_inputs.append(local_input) - print(f"Localized {len(local_root_inputs)} root inputs") - - print("Localizing reports") - reports = {} - for ds_name, file_list in self.input()["json"].items(): - report_file = stack.enter_context( - file_list["reports"].localize("r") - ).path - with open(report_file, "r") as f: - ds_reports = yaml.safe_load(f) - reports[ds_name] = list(ds_reports.values()) - print(f"Localized {len(reports)} reports") - - mergeAnaTuples( - setup=self.setup, - dataset_name=dataset_name, - is_data=is_data, - work_dir=job_home, - input_reports=reports, - input_roots=local_root_inputs, - root_outputs=tmpFiles, + local_output = self.output() + inputs = self.input() + local_inputs = [ + stack.enter_context(inp.localize("r")).path for inp in inputs + ] + assert local_inputs, "`local_inputs` must be a non-empty list" + producer_cfg = 
producers[self.producer_to_aggregate] + ext = producer_cfg.get("save_as", "root") + job_home, remove_job_home = self.law_job_home() + tmpFile = os.path.join(job_home, f"aggregatedCache_tmp.{ext}") + aggregate_cmd = [ + "python3", + cacheAggregator, + "--outFile", + tmpFile, + "--period", + self.period, + "--producer", + self.producer_to_aggregate, + "--LAWrunVersion", + self.version, + ] + aggregate_cmd.append("--inputFiles") + aggregate_cmd.extend(local_inputs) + ps_call(aggregate_cmd, verbose=1) + + # For local target: ensure parent directory exists and move directly + out_local_path = local_output.path + local_output.parent.touch() # Creates parent directories if needed + shutil.move(tmpFile, out_local_path) + print( + f"Creating aggregated cache for producer {self.producer_to_aggregate} and dataset {sample_name} at {out_local_path}" ) - - for outFile, tmpFile in zip(self.output(), tmpFiles): - with outFile.localize("w") as tmp_local_file: - out_local_path = tmp_local_file.path - shutil.move(tmpFile, out_local_path) - - if self.delete_inputs_after_merge: - print(f"Finished merging, lets delete remote AnaTupleFile targets") - for ds_name, files in self.input()["root"].items(): - for remote_targets in files: - for target in remote_targets: - target.remove() - - if remove_job_home: - shutil.rmtree(job_home) From ff203edabbb9d39d669e962dd5983781ae6381ba Mon Sep 17 00:00:00 2001 From: Valeria D'Amante Date: Wed, 1 Apr 2026 21:21:41 +0200 Subject: [PATCH 7/9] fixed golden json file --- config/Run3_2025/global.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/Run3_2025/global.yaml b/config/Run3_2025/global.yaml index 4cfd91df..36e30b2b 100644 --- a/config/Run3_2025/global.yaml +++ b/config/Run3_2025/global.yaml @@ -11,4 +11,4 @@ MET_flags: # https://twiki.cern.ch/twiki/bin/view/CMS/MissingETOptionalFiltersR - Flag_hfNoisyHitsFilter - Flag_eeBadScFilter - Flag_ecalBadCalibFilter -lumiFile: 
Corrections/data/golden_json/Cert_Collisions2024_378981_386951_Golden.json +lumiFile: Corrections/data/golden_json/Cert_Collisions2025_391658_398903_Golden.json From 4f775a774e475b46e65f317e2dbd0261e54fbbf8 Mon Sep 17 00:00:00 2001 From: Valeria D'Amante Date: Wed, 1 Apr 2026 21:32:09 +0200 Subject: [PATCH 8/9] restored 2024 and added 2025 for HHBtagScores --- include/HHbTagScores.h | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/include/HHbTagScores.h b/include/HHbTagScores.h index b92a12a8..dfc64b6e 100644 --- a/include/HHbTagScores.h +++ b/include/HHbTagScores.h @@ -21,7 +21,9 @@ inline int PeriodToHHbTagInput(int version, Period period) { {{1, Period::Run3_2023}, 2018}, {{1, Period::Run3_2023BPix}, 2018}, {{1, Period::Run3_2024}, 2018}, - {{1, Period::Run3_2024},52018}, + {{1, Period::Run3_2024},2018}, + {{1, Period::Run3_2025},2018}, + // v2 {{2, Period::Run2_2016_HIPM}, 2016}, {{2, Period::Run2_2016}, 2016}, @@ -32,7 +34,9 @@ inline int PeriodToHHbTagInput(int version, Period period) { {{2, Period::Run3_2023}, 2018}, {{2, Period::Run3_2023BPix}, 2018}, {{2, Period::Run3_2024}, 2018}, - {{2, Period::Run3_2024},52018}, + {{2, Period::Run3_2024},2018}, + {{2, Period::Run3_2025},2018}, + // v3 {{3, Period::Run2_2016_HIPM}, 0}, {{3, Period::Run2_2016}, 0}, From 1a763ca4fae3c9b11dc99f98d12f71f6b9d7cb6b Mon Sep 17 00:00:00 2001 From: Valeria D'Amante Date: Wed, 1 Apr 2026 21:39:47 +0200 Subject: [PATCH 9/9] removed duplicated 2024 entry --- include/HHbTagScores.h | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/include/HHbTagScores.h b/include/HHbTagScores.h index dfc64b6e..7386964e 100644 --- a/include/HHbTagScores.h +++ b/include/HHbTagScores.h @@ -21,8 +21,7 @@ inline int PeriodToHHbTagInput(int version, Period period) { {{1, Period::Run3_2023}, 2018}, {{1, Period::Run3_2023BPix}, 2018}, {{1, Period::Run3_2024}, 2018}, - {{1, Period::Run3_2024},2018}, - {{1, Period::Run3_2025},2018}, + {{1, Period::Run3_2025}, 
2018}, // v2 {{2, Period::Run2_2016_HIPM}, 2016}, @@ -34,8 +33,7 @@ inline int PeriodToHHbTagInput(int version, Period period) { {{2, Period::Run3_2023}, 2018}, {{2, Period::Run3_2023BPix}, 2018}, {{2, Period::Run3_2024}, 2018}, - {{2, Period::Run3_2024},2018}, - {{2, Period::Run3_2025},2018}, + {{2, Period::Run3_2025}, 2018}, // v3 {{3, Period::Run2_2016_HIPM}, 0},