Add --clean_local_dir option to free disk space between queries #253
jihoonson wants to merge 1 commit into NVIDIA:dev
Conversation
Spark's block manager shuffle files can accumulate and fill the disk before a full power run completes. This adds a --clean_local_dir flag that deletes shuffle files from Spark's block manager local directories after each query, while preserving the subdirectory structure that DiskBlockManager expects.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Jihoon Son <ghoonson@gmail.com>
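A minimal sketch of how such an opt-in flag is typically registered with argparse; the parser object, help text, and default shown here are illustrative assumptions, not the PR's exact wiring:

import argparse

parser = argparse.ArgumentParser()  # the benchmark scripts build their own parser
parser.add_argument(
    '--clean_local_dir',
    action='store_true',
    help='Delete shuffle files from Spark block manager local dirs after each query.')
args = parser.parse_args()
# args.clean_local_dir is then passed down to the query-stream loop (see the
# sequence diagram below).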
Greptile Summary
This PR adds an opt-in --clean_local_dir flag that removes shuffle files from Spark's block manager local directories after each query, so a full power run does not fill the local disk.
Confidence Score: 5/5
Safe to merge: the flag is opt-in, the logic is correct for local mode, and all findings are non-blocking P2 style or hardening suggestions (unused import, missing try/except, cluster-mode caveat). Both nds/nds_power.py and nds-h/nds_h_power.py carry the same unused shutil import; nds/nds_power.py lines 352-355 and nds-h/nds_h_power.py lines 234-237 lack error handling.

Important Files Changed
Sequence Diagram

sequenceDiagram
participant CLI as CLI (--clean_local_dir)
participant RQS as run_query_stream()
participant ROQ as run_one_query()
participant Spark as Spark Engine
participant CBD as clean_block_manager_dirs()
participant FS as Local Filesystem
CLI->>RQS: clean_local_dir=True
loop For each query
RQS->>ROQ: execute query
ROQ->>Spark: spark.sql(query)
Spark-->>ROQ: DataFrame result
ROQ-->>RQS: q_report
RQS->>CBD: clean_block_manager_dirs(spark_session)
CBD->>Spark: diskBlockManager().localDirs()
Spark-->>CBD: local_dirs[]
CBD->>FS: os.remove(shuffle files)
FS-->>CBD: freed bytes
CBD-->>RQS: print freed GB
end
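A minimal Python sketch of the per-query hook the diagram describes; the signatures of run_query_stream and run_one_query are simplified assumptions, and only the clean-after-each-query flow is taken from the PR:

def run_query_stream(spark, queries, clean_local_dir=False):
    # Illustrative loop only; the real function also handles logging,
    # report collection, and failure tracking.
    reports = []
    for name, sql_text in queries:
        reports.append(run_one_query(spark, name, sql_text))
        if clean_local_dir:
            # Free shuffle space on local disk before the next query starts.
            clean_block_manager_dirs(spark)
    return reports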
Reviews (1): Last reviewed commit: "Add --clean_local_dir option to free dis..."
import csv
import os
import re
import shutil

import argparse
import csv
import shutil
for f in fns:
    fp = os.path.join(dp, f)
    total_freed += os.path.getsize(fp)
    os.remove(fp)
No error handling on file size + delete
If Spark's own deferred cleanup removes a file between the os.path.getsize(fp) and os.remove(fp) calls, an unhandled FileNotFoundError will propagate and abort the entire benchmark run. Wrapping the pair in try/except OSError keeps the run alive if a file disappears mid-cleanup.
 for f in fns:
     fp = os.path.join(dp, f)
-    total_freed += os.path.getsize(fp)
-    os.remove(fp)
+    try:
+        total_freed += os.path.getsize(fp)
+        os.remove(fp)
+    except OSError:
+        pass
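An equivalent, slightly more compact form of the same guard using contextlib.suppress; this is an alternative sketch (dir_path stands in for one block manager local directory), not part of the reviewer's suggestion:

import contextlib
import os

total_freed = 0
for dp, _, fns in os.walk(dir_path):
    for f in fns:
        fp = os.path.join(dp, f)
        # Silently skip files that Spark's own deferred cleanup removed
        # between the stat and the delete.
        with contextlib.suppress(OSError):
            total_freed += os.path.getsize(fp)
            os.remove(fp)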
for f in fns:
    fp = os.path.join(dp, f)
    total_freed += os.path.getsize(fp)
    os.remove(fp)
No error handling on file size + delete
Same race as nds/nds_power.py: if Spark concurrently removes a file between os.path.getsize and os.remove, the unhandled FileNotFoundError aborts the benchmark. A try/except OSError around the pair makes cleanup resilient.
 for f in fns:
     fp = os.path.join(dp, f)
-    total_freed += os.path.getsize(fp)
-    os.remove(fp)
+    try:
+        total_freed += os.path.getsize(fp)
+        os.remove(fp)
+    except OSError:
+        pass
def clean_block_manager_dirs(spark_session):
    """Remove shuffle/block data from Spark's block manager local directories.

    Uses Spark's internal DiskBlockManager to get the exact directories owned by
    this application, then deletes files while preserving the directory structure
    (Spark's DiskBlockManager pre-creates a hash-based subdirectory pool and
    expects those subdirectories to exist).
    """
    jsc = spark_session.sparkContext._jsc
    JArray = spark_session.sparkContext._gateway.jvm.java.lang.reflect.Array
    local_dirs = jsc.sc().env().blockManager().diskBlockManager().localDirs()
    total_freed = 0
    for i in range(JArray.getLength(local_dirs)):
        dir_path = JArray.get(local_dirs, i).getAbsolutePath()
        if not os.path.isdir(dir_path):
            continue
        for dp, _, fns in os.walk(dir_path):
            for f in fns:
                fp = os.path.join(dp, f)
                total_freed += os.path.getsize(fp)
                os.remove(fp)
    if total_freed > 0:
        print(f"Cleaned block manager local dirs, freed {total_freed / (1024**3):.2f} GB")
Driver-only cleanup silently no-ops in cluster mode
os.walk / os.remove execute on the driver process. In cluster deployments (YARN, Kubernetes, Spark Standalone), shuffle files live on executor nodes' local filesystems — diskBlockManager().localDirs() on the driver returns directories that hold no shuffle data in those topologies, so cleanup silently does nothing. Adding a note to the docstring or --help text that this flag is only effective in local/single-node mode would prevent confusion.
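One way the reviewer's note could be carried out; the exact wording and its placement (a docstring addendum is shown here) are assumptions, not part of the PR:

def clean_block_manager_dirs(spark_session):
    """Remove shuffle/block data from Spark's block manager local directories.

    Note: deletion runs on the driver's local filesystem, so this is only
    effective in local/single-node mode. On YARN, Kubernetes, or Standalone
    clusters the shuffle files live on executor nodes and are not touched.
    """
    ...  # body unchanged from the PR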
Spark's block manager shuffle files can accumulate and fill the disk before a full power run completes. This adds a --clean_local_dir flag that deletes shuffle files from Spark's block manager local directories after each query, while preserving the subdirectory structure that DiskBlockManager expects. This change has been manually tested as below.