135 changes: 126 additions & 9 deletions nds/nds_gen_data.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-FileCopyrightText: Copyright (c) 2022-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -127,6 +127,122 @@ def move_delete_date_tables(base_path, update):
        subprocess.run(mkdir, check=True)
        subprocess.run(move, check=True)


def _check_existing_update_data(data_dir):
    """Check if data_dir already contains maintenance/update table data.

    Regular maintenance table filenames encode only child index and parallel
    count (e.g. s_catalog_order_1_4.dat), not the update number, so files
    from different update runs would collide. Any existing maintenance data
    therefore requires --overwrite_output to proceed.

    Returns:
        list: maintenance table names that have existing data in data_dir.
    """
    existing = []
    for table_name in maintenance_table_names:
        table_dir = os.path.join(data_dir, table_name)
        if os.path.isdir(table_dir) and os.listdir(table_dir):
            existing.append(table_name)
    return existing
Comment on lines +131 to +147
Pre-flight check blocks valid multi-update-set accumulation

_check_existing_update_data flags any non-empty maintenance table directory as a blocker, regardless of which update number was already generated. At first glance this seems to prevent a common workflow where multiple update sets (e.g., update 1, then update 2) accumulate in the same output directory, since delete-table filenames do differ across runs (delete_1.dat vs delete_2.dat). But the regular table filename pattern on line 172, f'{table}_{i}_{parallel}.dat', encodes only the child index and parallel count, not the update number: child 1 of update 1 and child 1 of update 2 both produce s_catalog_order_1_4.dat. So the current pre-flight check is actually correct in blocking the second run without --overwrite_output. However, it's worth adding a comment explaining this reasoning so it isn't seen as overly conservative:

def _check_existing_update_data(data_dir):
    """Check if data_dir already contains maintenance/update table data.

    Note: regular maintenance table filenames encode only child index and parallel
    count (e.g. s_catalog_order_1_4.dat), not the update number, so files from
    different update runs would collide. Any existing maintenance data therefore
    requires --overwrite_output to proceed.

    Returns:
        list: maintenance table names that have existing data in data_dir.
    """

Collaborator Author
Fixed: docstring now explains that regular maintenance table filenames don't encode the update number, so files from different update runs collide.
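
To make the collision concrete, a small sketch of the two filename patterns involved (the table name and parallel count are illustrative values taken from the diff):

    # Delete tables encode the update number, so two update runs do not collide:
    for update in (1, 2):
        print(f'delete_{update}.dat')          # delete_1.dat, delete_2.dat

    # Regular maintenance tables encode only child index and parallel count,
    # so child 1 of update 1 and child 1 of update 2 write the same file name:
    table, child, parallel = 's_catalog_order', 1, 4
    for update in (1, 2):
        print(f'update {update} ->', f'{table}_{child}_{parallel}.dat')
    # both iterations print: s_catalog_order_1_4.dat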



def _merge_update_data_local(temp_base, data_dir, range_start, range_end,
                             parallel, update, overwrite_output=False):
    """Merge update data from per-child temp directories into data_dir.

    Each parallel child generates to its own temp directory to avoid file collisions.
    Delete/inventory_delete tables produce identical content across all children,
    so only the first child's copy is kept, consistent with the HDFS approach
    in move_delete_date_tables().
    """
    delete_tables = {'delete', 'inventory_delete'}
    for table in maintenance_table_names:
        target_dir = os.path.join(data_dir, table)
        os.makedirs(target_dir, exist_ok=True)

        if table in delete_tables:
            target_file = os.path.join(target_dir, f'{table}_{update}.dat')
            # All children produce identical delete content for a given update number.
            # Preserve any existing copy unless the caller explicitly requested overwrite.
            if os.path.exists(target_file) and not overwrite_output:
                print("Skipping {} (already present from a previous run)".format(target_file))
                continue
            src = os.path.join(temp_base, f'child_{range_start}',
                               f'{table}_{update}.dat')
            if os.path.exists(src):
                shutil.move(src, target_file)
Comment on lines +164 to +174

Delete-table overwrite skips even when destination exists with wrong update number

The guard if os.path.exists(target_file) and not overwrite_output: continue uses target_file = f'{table}_{update}.dat', which encodes the update number. This means if a user previously generated delete_1.dat and is now running --update 2, target_file is delete_2.dat — it doesn't exist yet — so no skip occurs, and the merge proceeds correctly.

However the logic becomes subtly wrong if overwrite_output=True is passed after a previous identical update run: the condition not overwrite_output is False, so the skip is bypassed and shutil.move replaces the file with identical content. This is harmless functionally, but the PR description states "delete/delete_1.dat kept from first range, not overwritten" for Test 4, which contradicts the actual code path when --overwrite_output is set. Consider adding a comment clarifying the intended semantics:

        if table in delete_tables:
            target_file = os.path.join(target_dir, f'{table}_{update}.dat')
            # All children produce identical delete content for a given update number.
            # Preserve any existing copy unless the caller explicitly requested overwrite.
            if os.path.exists(target_file) and not overwrite_output:

Collaborator Author

Fixed: added a comment clarifying the intended semantics — preserve existing copy unless caller explicitly requested overwrite.
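
To spell out the resulting behaviour, a small truth-table sketch of the guard (the helper below is illustrative only, not part of the PR):

    def delete_merge_action(target_exists, overwrite_output):
        # Mirrors: if os.path.exists(target_file) and not overwrite_output: skip
        return 'skip' if (target_exists and not overwrite_output) else 'move'

    # target exists?  overwrite_output?  action
    assert delete_merge_action(True, False) == 'skip'   # identical copy preserved
    assert delete_merge_action(True, True) == 'move'    # harmless rewrite of identical content
    assert delete_merge_action(False, False) == 'move'  # first merge for this update number
    assert delete_merge_action(False, True) == 'move'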

            else:
                print("Warning: expected delete source not found, skipping: {}".format(src))
        else:
            for i in range(range_start, range_end + 1):
                filename = f'{table}_{i}_{parallel}.dat'
                src = os.path.join(temp_base, f'child_{i}', filename)
                if os.path.exists(src):
                    shutil.move(src, os.path.join(target_dir, filename))
                else:
                    print("Warning: expected source not found, skipping: {}".format(src))


def _generate_update_data_local(args, data_dir, range_start, range_end, tool_path):

Unit tests not committed to the repository

The PR description lists 14 unit tests covering _check_existing_update_data, _merge_update_data_local, and _generate_update_data_local, but no test file appears in the diff or anywhere in the repository (a search for test*gen_data* returns no results). Without the test file in the repository, CI cannot automatically run or enforce the tests, and they are not available to future contributors.

Consider committing the test module (e.g. nds/test_nds_gen_data.py) as part of this PR to ensure regression coverage is preserved.

Collaborator Author
Acknowledged. The tests were used for local validation with a mock dsdgen. This project doesn't have an existing test framework or CI for Python unit tests yet, so committing them would add files without integration. Happy to add if maintainers want them included.
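
For reference, a minimal sketch of what one such test could look like, assuming pytest and a hypothetical nds/test_nds_gen_data.py that imports this module (neither exists in the PR):

    import nds_gen_data

    def test_check_existing_update_data(tmp_path, monkeypatch):
        # Limit the check to a single maintenance table for the test.
        monkeypatch.setattr(nds_gen_data, 'maintenance_table_names', ['s_catalog_order'])
        table_dir = tmp_path / 's_catalog_order'
        table_dir.mkdir()
        # An empty table directory is not treated as existing update data.
        assert nds_gen_data._check_existing_update_data(str(tmp_path)) == []
        # Any file inside the directory flags the table.
        (table_dir / 's_catalog_order_1_4.dat').write_text('row\n')
        assert nds_gen_data._check_existing_update_data(str(tmp_path)) == ['s_catalog_order']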

"""Generate update/maintenance data to local filesystem.

Uses per-child temp directories to avoid dsdgen file collisions (particularly
for delete tables where all children produce an identically-named file).
This eliminates the need for dsdgen's -force flag and prevents silent overwrites,
consistent with the HDFS approach in generate_data_hdfs().

Raises:
Exception: if update data already exists and --overwrite_output is not set
Exception: if dsdgen fails
"""
if not os.path.isdir(data_dir):
os.makedirs(data_dir)

existing = _check_existing_update_data(data_dir)
if existing and not args.overwrite_output:
raise Exception(
"Update data already exists in directory {} for tables: {}. "
"Use '--overwrite_output' to overwrite.".format(data_dir, existing))

temp_base = os.path.join(data_dir, '_temp_update')
if os.path.exists(temp_base):
print("Warning: removing stale temp directory from a previous run: {}".format(temp_base))
shutil.rmtree(temp_base)
Comment on lines +208 to +211

No diagnostic when stale temp directory is silently removed

If a previous run crashed after creating _temp_update/, the leftover directory is quietly deleted here. A user debugging a data consistency problem would have no indication that orphaned temp data was discarded. Adding a log line makes this behaviour explicit without changing any logic:

Suggested change
    temp_base = os.path.join(data_dir, '_temp_update')
    if os.path.exists(temp_base):
        shutil.rmtree(temp_base)
    temp_base = os.path.join(data_dir, '_temp_update')
    if os.path.exists(temp_base):
        print(f"Warning: removing stale temp directory from a previous run: {temp_base}")
        shutil.rmtree(temp_base)

Collaborator Author

Fixed: a warning is now printed before removing a stale _temp_update directory.


    work_dir = tool_path.parent
    procs = []
    try:
        for i in range(range_start, range_end + 1):
            child_dir = os.path.join(temp_base, f'child_{i}')
            os.makedirs(child_dir)
            dsdgen_args = ["-scale", args.scale,
                           "-dir", child_dir,
                           "-parallel", args.parallel,
                           "-child", str(i),
                           "-verbose", "Y",
                           "-update", args.update]
            procs.append(subprocess.Popen(
                ["./dsdgen"] + dsdgen_args, cwd=str(work_dir)))
        for p in procs:
            p.wait()
            if p.returncode != 0:
                print("dsdgen failed with return code {}".format(p.returncode))
                raise Exception("dsdgen failed")
Comment on lines +227 to +231
Copilot AI Mar 3, 2026

When one dsdgen process returns a non-zero exit code, the code raises an exception inside the for p in procs loop without waiting for or terminating the remaining still-running child processes. The finally block then immediately calls shutil.rmtree(temp_base), which may fail or produce unpredictable results if some dsdgen processes are still actively writing to that directory.

To fix this, before raising the exception (or as part of the finally block), iterate over the remaining processes and call p.terminate() (or p.kill()) followed by p.wait() to ensure all child processes have stopped before deleting the temp directory.

Collaborator Author

Fixed: the finally block now terminates all still-running child processes via p.terminate() and reaps all children via p.wait() before deleting the temp directory.

Comment on lines +227 to +231

Early exception leaves later processes un-waited before finally

The p.wait() loop checks processes in order. If the process at index k exits with a non-zero code, the exception is raised before p.wait() is called for processes k+1, k+2, .... The finally block terminates any process whose p.poll() is still None, but it never waits on the processes it terminates, so those children linger as zombies until the Python interpreter exits.

Consider waiting on all started processes in the finally block regardless of whether they needed termination:

    finally:
        for p in procs:
            if p.poll() is None:
                p.terminate()
            p.wait()  # reap all children to avoid zombies
        if os.path.exists(temp_base):
            shutil.rmtree(temp_base)

Collaborator Author

Fixed: p.wait() is now called unconditionally for all processes in the finally block, not just terminated ones.

        # Note: if merge partially succeeds before raising (e.g. disk-full),
        # data_dir may be incomplete; re-run with --overwrite_output to recover.
        _merge_update_data_local(temp_base, data_dir, range_start, range_end,
                                 args.parallel, args.update, args.overwrite_output)
Comment on lines +234 to +235

Partial merge leaves data_dir in inconsistent state on failure

_merge_update_data_local is called inside the try block, and the finally block cleans up temp_base. However, if the merge itself partially succeeds (some shutil.move calls complete) and then throws (e.g., disk-full, permission error), the already-moved files remain in data_dir while temp_base is deleted — leaving data_dir in a partially-merged, unrecoverable state without re-running with --overwrite_output.

Since there is no rollback for the partial merge, consider adding a warning comment before the merge call to make this behaviour explicit, or move the merge outside the try block and handle its failures separately:

        _merge_update_data_local(temp_base, data_dir, range_start, range_end,
                                 args.parallel, args.update, args.overwrite_output)
    # Note: if merge partially succeeded before raising, data_dir may be incomplete;
    # re-run with --overwrite_output to recover.
    finally:

Collaborator Author

Fixed: added a comment before the merge call noting that partial failure leaves data_dir incomplete and requires --overwrite_output to recover.

    finally:
        for p in procs:
            if p.poll() is None:
                p.terminate()
            p.wait()
        if os.path.exists(temp_base):
            shutil.rmtree(temp_base)
Comment on lines +227 to +242

Orphaned child processes on dsdgen failure

When one dsdgen process fails and the exception is raised at line 218, the remaining processes in procs that haven't been waited on yet are still running. The finally block then calls shutil.rmtree(temp_base), deleting the per-child directories underneath those still-running processes. On Linux, processes with already-open file handles can continue writing to deleted inodes, but any subsequent file creations will fail silently, producing incomplete data and wasting CPU/memory until the orphaned processes eventually terminate on their own.

The fix is to terminate remaining processes inside the finally block before removing the temp tree:

    finally:
        for p in procs:
            if p.poll() is None:
                p.terminate()
        if os.path.exists(temp_base):
            shutil.rmtree(temp_base)

Collaborator Author

Fixed: finally block terminates and waits on all child processes before cleanup.

    subprocess.run(['du', '-h', '-d1', data_dir], check=False)


def generate_data_hdfs(args, jar_path):
"""generate data to hdfs using TPC-DS dsdgen tool. Support incremental generation: due to the
limit of hdfs, each range data will be generated under a temporary folder then move to target
@@ -196,6 +312,14 @@ def generate_data_local(args, range_start, range_end, tool_path):
        Exception: dsdgen failed
    """
    data_dir = get_abs_path(args.data_dir)

    if args.update:
        if os.path.isdir(data_dir) and get_dir_size(data_dir) > 0:
            print("Warning: data_dir '{}' is non-empty; update data will be written "
                  "alongside existing content.".format(data_dir))
        _generate_update_data_local(args, data_dir, range_start, range_end, tool_path)
        return
Comment on lines +316 to +321

Update path skips overwrite_output check for pre-existing base data

When args.update is truthy the function immediately delegates to _generate_update_data_local and returns, bypassing the existing get_dir_size guard that protects base data. This means that if data_dir is already populated with base (source) data, a user accidentally running --update against it will silently proceed and start writing maintenance table directories alongside the base data without any warning.

_check_existing_update_data only looks at maintenance_table_names, so it provides no protection here. Consider adding a check that warns (or fails) when source table directories already exist in data_dir before generating update data:

    if args.update:
        if not args.overwrite_output and get_dir_size(data_dir) > 0:
            # Only warn; don't block — source and update data can coexist,
            # but the user should know they're writing into an existing directory.
            print("Warning: data_dir '{}' is non-empty; update data will be written alongside "
                  "existing content.".format(data_dir))
        _generate_update_data_local(args, data_dir, range_start, range_end, tool_path)
        return

Collaborator Author

Fixed: a warning is now printed when --update targets a non-empty data_dir.


    if not os.path.isdir(data_dir):
        os.makedirs(data_dir)
    else:
@@ -216,8 +340,6 @@ def generate_data_local(args, range_start, range_end, tool_path):
"-verbose", "Y"]
if args.overwrite_output:
dsdgen_args += ["-force", "Y"]
if args.update:
dsdgen_args += ["-update", args.update]
procs.append(subprocess.Popen(
["./dsdgen"] + dsdgen_args, cwd=str(work_dir)))
    # wait for data generation to complete
@@ -227,17 +349,12 @@ def generate_data_local(args, range_start, range_end, tool_path):
print("dsdgen failed with return code {}".format(p.returncode))
raise Exception("dsdgen failed")
# move multi-partition files into table folders
if args.update:
table_names = maintenance_table_names
else:
table_names = source_table_names
for table in table_names:
for table in source_table_names:
print('mkdir -p {}/{}'.format(data_dir, table))
subprocess.run(['mkdir', '-p', data_dir + '/' + table])
for i in range(range_start, range_end + 1):
subprocess.run(['mv', f'{data_dir}/{table}_{i}_{args.parallel}.dat',
f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL)
# delete date file has no parallel number suffix in the file name, move separately
subprocess.run(['mv', f'{data_dir}/{table}_1.dat',
f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL)
# show summary