38 changes: 36 additions & 2 deletions nds-h/nds_h_power.py
@@ -32,6 +32,7 @@
 
 import argparse
 import csv
+import shutil
P2 Unused shutil import

shutil is imported but never called — clean_block_manager_dirs uses only os.remove and os.walk. This import can be removed.

Suggested change:
-import shutil

 import time
 from collections import OrderedDict
 from pyspark.sql import SparkSession
@@ -213,6 +214,31 @@ def run_one_query(spark_session,
             f.write(plans[plan_type])
 
 
+def clean_block_manager_dirs(spark_session):
+    """Remove shuffle/block data from Spark's block manager local directories.
+
+    Uses Spark's internal DiskBlockManager to get the exact directories owned by
+    this application, then deletes files while preserving the directory structure
+    (Spark's DiskBlockManager pre-creates a hash-based subdirectory pool and
+    expects those subdirectories to exist).
+    """
+    jsc = spark_session.sparkContext._jsc
+    JArray = spark_session.sparkContext._gateway.jvm.java.lang.reflect.Array
+    local_dirs = jsc.sc().env().blockManager().diskBlockManager().localDirs()
+    total_freed = 0
+    for i in range(JArray.getLength(local_dirs)):
+        dir_path = JArray.get(local_dirs, i).getAbsolutePath()
+        if not os.path.isdir(dir_path):
+            continue
+        for dp, _, fns in os.walk(dir_path):
+            for f in fns:
+                fp = os.path.join(dp, f)
+                total_freed += os.path.getsize(fp)
+                os.remove(fp)
Comment on lines +234 to +237

P2 No error handling on file size + delete

Same race as nds/nds_power.py: if Spark concurrently removes a file between os.path.getsize and os.remove, the unhandled FileNotFoundError aborts the benchmark. A try/except OSError around the pair makes cleanup resilient.

Suggested change:
             for f in fns:
                 fp = os.path.join(dp, f)
-                total_freed += os.path.getsize(fp)
-                os.remove(fp)
+                try:
+                    total_freed += os.path.getsize(fp)
+                    os.remove(fp)
+                except OSError:
+                    pass

+    if total_freed > 0:
+        print(f"Cleaned block manager local dirs, freed {total_freed / (1024**3):.2f} GB")


 def get_query_subset(query_dict, subset):
     """Get a subset of queries from query_dict.
     The subset is specified by a list of query names.
@@ -237,7 +263,8 @@ def run_query_stream(input_prefix,
                      save_plan_path=None,
                      skip_execution=False,
                      profiling_hook=None,
-                     app_name=None):
+                     app_name=None,
+                     clean_local_dir=False):
     """run SQL in Spark and record execution time log. The execution time log is saved as a CSV file
     for easy accessibility. TempView Creation time is also recorded.
 
@@ -315,6 +342,8 @@
             else:
                 summary_prefix = os.path.join(json_summary_folder, '')
             q_report.write_summary(prefix=summary_prefix)
+        if clean_local_dir:
+            clean_block_manager_dirs(spark_session)
         clearQueryName(spark_session)
     power_end = int(time.time())
     power_elapse = int((power_end - power_start)*1000)
@@ -436,6 +465,10 @@ def load_properties(filename):
     parser.add_argument('--app_name',
                         help='The name of the application. If not specified, the default name will be "NDS-H - Power Run".',
                         default=None)
+    parser.add_argument('--clean_local_dir',
+                        action='store_true',
+                        help='Clean Spark block manager local directories after each query to free disk space. '
+                             'Useful for large scale factors where shuffle data can fill up the disk.')
     args = parser.parse_args()
     query_dict = gen_sql_from_stream(args.query_stream_file)
     run_query_stream(args.input_prefix,
@@ -454,4 +487,5 @@ def load_properties(filename):
                      args.save_plan_path,
                      args.skip_execution,
                      args.profiling_hook,
-                     args.app_name)
+                     args.app_name,
+                     args.clean_local_dir)
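As a usage sketch, the new flag is appended to an otherwise normal power run; the same flag is added to nds/nds_power.py below. The positional arguments here are abbreviated assumptions, so consult the script's --help for the authoritative list:

python nds_h_power.py <input_prefix> <query_stream_file> <time_log.csv> --clean_local_dir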
38 changes: 36 additions & 2 deletions nds/nds_power.py
@@ -34,6 +34,7 @@
 import csv
 import os
 import re
+import shutil
P2 Unused shutil import

shutil is imported but never called — clean_block_manager_dirs uses only os.remove and os.walk. This import can be removed.

Suggested change:
-import shutil

 import sys
 import time
 from collections import OrderedDict
@@ -331,6 +332,31 @@ def deduplicate(column_names):
     return df.toDF(*dedup_col_names)
 
 
+def clean_block_manager_dirs(spark_session):
+    """Remove shuffle/block data from Spark's block manager local directories.
+
+    Uses Spark's internal DiskBlockManager to get the exact directories owned by
+    this application, then deletes files while preserving the directory structure
+    (Spark's DiskBlockManager pre-creates a hash-based subdirectory pool and
+    expects those subdirectories to exist).
+    """
+    jsc = spark_session.sparkContext._jsc
+    JArray = spark_session.sparkContext._gateway.jvm.java.lang.reflect.Array
+    local_dirs = jsc.sc().env().blockManager().diskBlockManager().localDirs()
+    total_freed = 0
+    for i in range(JArray.getLength(local_dirs)):
+        dir_path = JArray.get(local_dirs, i).getAbsolutePath()
+        if not os.path.isdir(dir_path):
+            continue
+        for dp, _, fns in os.walk(dir_path):
+            for f in fns:
+                fp = os.path.join(dp, f)
+                total_freed += os.path.getsize(fp)
+                os.remove(fp)
Comment on lines +352 to +355

P2 No error handling on file size + delete

If Spark's own deferred cleanup removes a file between the os.path.getsize(fp) and os.remove(fp) calls, an unhandled FileNotFoundError will propagate and abort the entire benchmark run. Wrapping the pair in try/except OSError keeps the run alive if a file disappears mid-cleanup.

Suggested change:
             for f in fns:
                 fp = os.path.join(dp, f)
-                total_freed += os.path.getsize(fp)
-                os.remove(fp)
+                try:
+                    total_freed += os.path.getsize(fp)
+                    os.remove(fp)
+                except OSError:
+                    pass

+    if total_freed > 0:
+        print(f"Cleaned block manager local dirs, freed {total_freed / (1024**3):.2f} GB")
Comment on lines +335 to +357

P2 Driver-only cleanup silently no-ops in cluster mode

os.walk / os.remove execute on the driver process. In cluster deployments (YARN, Kubernetes, Spark Standalone), shuffle files live on executor nodes' local filesystems — diskBlockManager().localDirs() on the driver returns directories that hold no shuffle data in those topologies, so cleanup silently does nothing. Adding a note to the docstring or --help text that this flag is only effective in local/single-node mode would prevent confusion.
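A minimal sketch of a guard along these lines, assuming one wants to skip the cleanup outright rather than only document it. The startswith('local') check is an assumption about how local-mode master URLs are spelled (local, local[4], local[*]), and the helper name is hypothetical:

def is_local_mode(spark_session):
    # Shuffle files sit on the driver's own filesystem only when executors
    # run inside the driver process, i.e. local mode.
    return spark_session.sparkContext.master.startswith('local')

# In run_query_stream, instead of calling clean_block_manager_dirs unconditionally:
if clean_local_dir and is_local_mode(spark_session):
    clean_block_manager_dirs(spark_session)
elif clean_local_dir:
    print('Skipping block manager cleanup: executors are remote, '
          'so driver-side deletion would be a no-op.')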



 def get_query_subset(query_dict, subset):
     """Get a subset of queries from query_dict.
     The subset is specified by a list of query names.
@@ -375,7 +401,8 @@ def run_query_stream(input_prefix,
                      profiling_hook=None,
                      save_plan_path=None,
                      skip_execution=False,
-                     app_name=None):
+                     app_name=None,
+                     clean_local_dir=False):
     """run SQL in Spark and record execution time log. The execution time log is saved as a CSV file
     for easy accessibility. TempView Creation time is also recorded.
 
@@ -487,6 +514,8 @@
             else:
                 summary_prefix = os.path.join(json_summary_folder, '')
             q_report.write_summary(prefix=summary_prefix)
+        if clean_local_dir:
+            clean_block_manager_dirs(spark_session)
         clearQueryName(spark_session)
     power_end = int(time.time())
     power_elapse = int((power_end - power_start)*1000)
@@ -644,6 +673,10 @@ def load_properties(filename):
                         help='The name of the application. If not specified, the default name will be "NDS - Power Run", '
                              'or "NDS - <query_name>" when running a single query.',
                         default=None)
+    parser.add_argument('--clean_local_dir',
+                        action='store_true',
+                        help='Clean Spark block manager local directories after each query to free disk space. '
+                             'Useful for large scale factors where shuffle data can fill up the disk.')
     query_filter_group.add_argument('--sub_queries',
                                     type=lambda s: [x.strip() for x in s.split(',')],
                                     help='comma separated list of queries to run. If this is specified, sub_query_patterns should be empty. ' +
@@ -681,4 +714,5 @@ def load_properties(filename):
                      args.profiling_hook,
                      args.save_plan_path,
                      args.skip_execution,
-                     args.app_name)
+                     args.app_name,
+                     args.clean_local_dir)