From e90d4ddf9c3cf4d61a1283bf894e7e558e6fda3a Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Mon, 6 Apr 2026 16:04:00 -0700
Subject: [PATCH] Add --clean_local_dir option to free disk space between queries

Spark's block manager shuffle files can accumulate and fill the disk
before a full power run completes. This adds a --clean_local_dir flag
that deletes shuffle files from Spark's block manager local directories
after each query, while preserving the subdirectory structure that
DiskBlockManager expects.

Deletion is best effort: a file that disappears or cannot be removed
while Spark is still working in those directories is skipped, so the
cleanup can never abort the power run it is meant to protect.

Co-Authored-By: Claude Opus 4.6 (1M context)
Signed-off-by: Jihoon Son
---
Review notes (this section is ignored by git am):
 * clean_block_manager_dirs() now guards getsize/remove with
   "except OSError" and only counts bytes of files it actually removed.
 * Hunk offsets and the diffstat were recomputed for the longer helper.
   The blob ids on the index lines were not recomputed, and the leading
   whitespace of context lines was restored by hand; regenerate the
   patch with git format-patch from a real tree before applying.

 nds-h/nds_h_power.py | 51 +++++++++++++++++++++++++++++++++++++++++++++++++--
 nds/nds_power.py     | 51 +++++++++++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 98 insertions(+), 4 deletions(-)

diff --git a/nds-h/nds_h_power.py b/nds-h/nds_h_power.py
index b6a4cff..a2fc8d4 100644
--- a/nds-h/nds_h_power.py
+++ b/nds-h/nds_h_power.py
@@ -32,6 +32,7 @@
 
 import argparse
 import csv
+import shutil
 import time
 from collections import OrderedDict
 from pyspark.sql import SparkSession
@@ -213,6 +214,44 @@ def run_one_query(spark_session,
                 f.write(plans[plan_type])
 
 
+def clean_block_manager_dirs(spark_session):
+    """Remove shuffle/block data from Spark's block manager local directories.
+
+    Uses Spark's internal DiskBlockManager to get the exact directories owned by
+    this application, then deletes files while preserving the directory structure
+    (Spark's DiskBlockManager pre-creates a hash-based subdirectory pool and
+    expects those subdirectories to exist).
+
+    Only the block manager of the JVM behind this session's Py4J gateway (the
+    driver) is reachable from here, so local dirs of executors running on other
+    hosts are left untouched.
+
+    Cleanup is best effort: a file that disappears or cannot be removed while
+    Spark is still working in these directories is skipped instead of aborting
+    the run.
+    """
+    jsc = spark_session.sparkContext._jsc
+    JArray = spark_session.sparkContext._gateway.jvm.java.lang.reflect.Array
+    local_dirs = jsc.sc().env().blockManager().diskBlockManager().localDirs()
+    total_freed = 0
+    for i in range(JArray.getLength(local_dirs)):
+        dir_path = JArray.get(local_dirs, i).getAbsolutePath()
+        if not os.path.isdir(dir_path):
+            continue
+        for dp, _, fns in os.walk(dir_path):
+            for f in fns:
+                fp = os.path.join(dp, f)
+                try:
+                    size = os.path.getsize(fp)
+                    os.remove(fp)
+                except OSError:
+                    # Spark may remove or still hold the file concurrently; skip it.
+                    continue
+                total_freed += size
+    if total_freed > 0:
+        print(f"Cleaned block manager local dirs, freed {total_freed / (1024**3):.2f} GB")
+
+
 def get_query_subset(query_dict, subset):
     """Get a subset of queries from query_dict.
     The subset is specified by a list of query names.
@@ -237,7 +276,8 @@ def run_query_stream(input_prefix,
                      save_plan_path=None,
                      skip_execution=False,
                      profiling_hook=None,
-                     app_name=None):
+                     app_name=None,
+                     clean_local_dir=False):
     """run SQL in Spark and record execution time log. The execution time log is saved as a CSV file
     for easy accessibility. TempView Creation time is also recorded.
 
@@ -315,6 +355,8 @@ def run_query_stream(input_prefix,
             else:
                 summary_prefix = os.path.join(json_summary_folder, '')
             q_report.write_summary(prefix=summary_prefix)
+        if clean_local_dir:
+            clean_block_manager_dirs(spark_session)
     clearQueryName(spark_session)
     power_end = int(time.time())
     power_elapse = int((power_end - power_start)*1000)
@@ -436,6 +478,10 @@ def load_properties(filename):
     parser.add_argument('--app_name',
                         help='The name of the application. If not specified, the default name will be "NDS-H - Power Run".',
                         default=None)
+    parser.add_argument('--clean_local_dir',
+                        action='store_true',
+                        help='Clean Spark block manager local directories after each query to free disk space. '
+                             'Useful for large scale factors where shuffle data can fill up the disk.')
     args = parser.parse_args()
     query_dict = gen_sql_from_stream(args.query_stream_file)
     run_query_stream(args.input_prefix,
@@ -454,4 +500,5 @@ def load_properties(filename):
                      args.save_plan_path,
                      args.skip_execution,
                      args.profiling_hook,
-                     args.app_name)
+                     args.app_name,
+                     args.clean_local_dir)
diff --git a/nds/nds_power.py b/nds/nds_power.py
index 8e82dd1..0d09395 100644
--- a/nds/nds_power.py
+++ b/nds/nds_power.py
@@ -34,6 +34,7 @@
 import csv
 import os
 import re
+import shutil
 import sys
 import time
 from collections import OrderedDict
@@ -331,6 +332,44 @@ def deduplicate(column_names):
     return df.toDF(*dedup_col_names)
 
 
+def clean_block_manager_dirs(spark_session):
+    """Remove shuffle/block data from Spark's block manager local directories.
+
+    Uses Spark's internal DiskBlockManager to get the exact directories owned by
+    this application, then deletes files while preserving the directory structure
+    (Spark's DiskBlockManager pre-creates a hash-based subdirectory pool and
+    expects those subdirectories to exist).
+
+    Only the block manager of the JVM behind this session's Py4J gateway (the
+    driver) is reachable from here, so local dirs of executors running on other
+    hosts are left untouched.
+
+    Cleanup is best effort: a file that disappears or cannot be removed while
+    Spark is still working in these directories is skipped instead of aborting
+    the run.
+    """
+    jsc = spark_session.sparkContext._jsc
+    JArray = spark_session.sparkContext._gateway.jvm.java.lang.reflect.Array
+    local_dirs = jsc.sc().env().blockManager().diskBlockManager().localDirs()
+    total_freed = 0
+    for i in range(JArray.getLength(local_dirs)):
+        dir_path = JArray.get(local_dirs, i).getAbsolutePath()
+        if not os.path.isdir(dir_path):
+            continue
+        for dp, _, fns in os.walk(dir_path):
+            for f in fns:
+                fp = os.path.join(dp, f)
+                try:
+                    size = os.path.getsize(fp)
+                    os.remove(fp)
+                except OSError:
+                    # Spark may remove or still hold the file concurrently; skip it.
+                    continue
+                total_freed += size
+    if total_freed > 0:
+        print(f"Cleaned block manager local dirs, freed {total_freed / (1024**3):.2f} GB")
+
+
 def get_query_subset(query_dict, subset):
     """Get a subset of queries from query_dict.
     The subset is specified by a list of query names.
@@ -375,7 +414,8 @@ def run_query_stream(input_prefix,
                      profiling_hook=None,
                      save_plan_path=None,
                      skip_execution=False,
-                     app_name=None):
+                     app_name=None,
+                     clean_local_dir=False):
     """run SQL in Spark and record execution time log. The execution time log is saved as a CSV file
     for easy accesibility. TempView Creation time is also recorded.
 
@@ -487,6 +527,8 @@ def run_query_stream(input_prefix,
             else:
                 summary_prefix = os.path.join(json_summary_folder, '')
             q_report.write_summary(prefix=summary_prefix)
+        if clean_local_dir:
+            clean_block_manager_dirs(spark_session)
     clearQueryName(spark_session)
     power_end = int(time.time())
     power_elapse = int((power_end - power_start)*1000)
@@ -644,6 +686,10 @@ def load_properties(filename):
                         help='The name of the application. If not specified, the default name will be "NDS - Power Run", '
                              'or "NDS - " when running a single query.',
                         default=None)
+    parser.add_argument('--clean_local_dir',
+                        action='store_true',
+                        help='Clean Spark block manager local directories after each query to free disk space. '
+                             'Useful for large scale factors where shuffle data can fill up the disk.')
     query_filter_group.add_argument('--sub_queries',
                         type=lambda s: [x.strip() for x in s.split(',')],
                         help='comma separated list of queries to run. If this is specified, sub_query_patterns should be empty. ' +
@@ -681,4 +727,5 @@ def load_properties(filename):
                      args.profiling_hook,
                      args.save_plan_path,
                      args.skip_execution,
-                     args.app_name)
+                     args.app_name,
+                     args.clean_local_dir)