diff --git a/nds/nds_gen_data.py b/nds/nds_gen_data.py index 72611ed..2a5725a 100644 --- a/nds/nds_gen_data.py +++ b/nds/nds_gen_data.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- # -# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2022-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -127,6 +127,122 @@ def move_delete_date_tables(base_path, update): subprocess.run(mkdir, check=True) subprocess.run(move, check=True) + +def _check_existing_update_data(data_dir): + """Check if data_dir already contains maintenance/update table data. + + Regular maintenance table filenames encode only child index and parallel + count (e.g. s_catalog_order_1_4.dat), not the update number, so files + from different update runs would collide. Any existing maintenance data + therefore requires --overwrite_output to proceed. + + Returns: + list: maintenance table names that have existing data in data_dir. + """ + existing = [] + for table_name in maintenance_table_names: + table_dir = os.path.join(data_dir, table_name) + if os.path.isdir(table_dir) and os.listdir(table_dir): + existing.append(table_name) + return existing + + +def _merge_update_data_local(temp_base, data_dir, range_start, range_end, + parallel, update, overwrite_output=False): + """Merge update data from per-child temp directories into data_dir. + + Each parallel child generates to its own temp directory to avoid file collisions. + Delete/inventory_delete tables produce identical content across all children, + so only the first child's copy is kept — consistent with the HDFS approach + in move_delete_date_tables(). + """ + delete_tables = {'delete', 'inventory_delete'} + for table in maintenance_table_names: + target_dir = os.path.join(data_dir, table) + os.makedirs(target_dir, exist_ok=True) + + if table in delete_tables: + target_file = os.path.join(target_dir, f'{table}_{update}.dat') + # All children produce identical delete content for a given update number. + # Preserve any existing copy unless the caller explicitly requested overwrite. + if os.path.exists(target_file) and not overwrite_output: + print("Skipping {} (already present from a previous run)".format(target_file)) + continue + src = os.path.join(temp_base, f'child_{range_start}', + f'{table}_{update}.dat') + if os.path.exists(src): + shutil.move(src, target_file) + else: + print("Warning: expected delete source not found, skipping: {}".format(src)) + else: + for i in range(range_start, range_end + 1): + filename = f'{table}_{i}_{parallel}.dat' + src = os.path.join(temp_base, f'child_{i}', filename) + if os.path.exists(src): + shutil.move(src, os.path.join(target_dir, filename)) + else: + print("Warning: expected source not found, skipping: {}".format(src)) + + +def _generate_update_data_local(args, data_dir, range_start, range_end, tool_path): + """Generate update/maintenance data to local filesystem. + + Uses per-child temp directories to avoid dsdgen file collisions (particularly + for delete tables where all children produce an identically-named file). + This eliminates the need for dsdgen's -force flag and prevents silent overwrites, + consistent with the HDFS approach in generate_data_hdfs(). + + Raises: + Exception: if update data already exists and --overwrite_output is not set + Exception: if dsdgen fails + """ + if not os.path.isdir(data_dir): + os.makedirs(data_dir) + + existing = _check_existing_update_data(data_dir) + if existing and not args.overwrite_output: + raise Exception( + "Update data already exists in directory {} for tables: {}. " + "Use '--overwrite_output' to overwrite.".format(data_dir, existing)) + + temp_base = os.path.join(data_dir, '_temp_update') + if os.path.exists(temp_base): + print("Warning: removing stale temp directory from a previous run: {}".format(temp_base)) + shutil.rmtree(temp_base) + + work_dir = tool_path.parent + procs = [] + try: + for i in range(range_start, range_end + 1): + child_dir = os.path.join(temp_base, f'child_{i}') + os.makedirs(child_dir) + dsdgen_args = ["-scale", args.scale, + "-dir", child_dir, + "-parallel", args.parallel, + "-child", str(i), + "-verbose", "Y", + "-update", args.update] + procs.append(subprocess.Popen( + ["./dsdgen"] + dsdgen_args, cwd=str(work_dir))) + for p in procs: + p.wait() + if p.returncode != 0: + print("dsdgen failed with return code {}".format(p.returncode)) + raise Exception("dsdgen failed") + # Note: if merge partially succeeds before raising (e.g. disk-full), + # data_dir may be incomplete; re-run with --overwrite_output to recover. + _merge_update_data_local(temp_base, data_dir, range_start, range_end, + args.parallel, args.update, args.overwrite_output) + finally: + for p in procs: + if p.poll() is None: + p.terminate() + p.wait() + if os.path.exists(temp_base): + shutil.rmtree(temp_base) + subprocess.run(['du', '-h', '-d1', data_dir], check=False) + + def generate_data_hdfs(args, jar_path): """generate data to hdfs using TPC-DS dsdgen tool. Support incremental generation: due to the limit of hdfs, each range data will be generated under a temporary folder then move to target @@ -196,6 +312,14 @@ def generate_data_local(args, range_start, range_end, tool_path): Exception: dsdgen failed """ data_dir = get_abs_path(args.data_dir) + + if args.update: + if os.path.isdir(data_dir) and get_dir_size(data_dir) > 0: + print("Warning: data_dir '{}' is non-empty; update data will be written " + "alongside existing content.".format(data_dir)) + _generate_update_data_local(args, data_dir, range_start, range_end, tool_path) + return + if not os.path.isdir(data_dir): os.makedirs(data_dir) else: @@ -216,8 +340,6 @@ def generate_data_local(args, range_start, range_end, tool_path): "-verbose", "Y"] if args.overwrite_output: dsdgen_args += ["-force", "Y"] - if args.update: - dsdgen_args += ["-update", args.update] procs.append(subprocess.Popen( ["./dsdgen"] + dsdgen_args, cwd=str(work_dir))) # wait for data generation to complete @@ -227,17 +349,12 @@ def generate_data_local(args, range_start, range_end, tool_path): print("dsdgen failed with return code {}".format(p.returncode)) raise Exception("dsdgen failed") # move multi-partition files into table folders - if args.update: - table_names = maintenance_table_names - else: - table_names = source_table_names - for table in table_names: + for table in source_table_names: print('mkdir -p {}/{}'.format(data_dir, table)) subprocess.run(['mkdir', '-p', data_dir + '/' + table]) for i in range(range_start, range_end + 1): subprocess.run(['mv', f'{data_dir}/{table}_{i}_{args.parallel}.dat', f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL) - # delete date file has no parallel number suffix in the file name, move separately subprocess.run(['mv', f'{data_dir}/{table}_1.dat', f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL) # show summary diff --git a/nds/test_update_data_gen.py b/nds/test_update_data_gen.py new file mode 100644 index 0000000..c4d0f00 --- /dev/null +++ b/nds/test_update_data_gen.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Tests for the update data generation logic in nds_gen_data.py. + +Validates: + 1. _check_existing_update_data: detects existing maintenance table data + 2. _merge_update_data_local: merges per-child temp dirs correctly, + deduplicates delete tables (only keeps first child copy) + 3. _generate_update_data_local: pre-flight safety check blocks overwrite, + allows with --overwrite_output, uses temp dirs + cleanup +""" + +import os +import shutil +import stat +import sys +import tempfile +import textwrap +import unittest +from pathlib import Path +from types import SimpleNamespace + +sys.path.insert(0, os.path.dirname(__file__)) + +from nds_gen_data import ( + _check_existing_update_data, + _merge_update_data_local, + _generate_update_data_local, + maintenance_table_names, +) + + +class TestCheckExistingUpdateData(unittest.TestCase): + + def setUp(self): + self.test_dir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.test_dir) + + def test_empty_dir_returns_empty(self): + result = _check_existing_update_data(self.test_dir) + self.assertEqual(result, []) + + def test_base_data_only_returns_empty(self): + os.makedirs(os.path.join(self.test_dir, 'customer')) + Path(os.path.join(self.test_dir, 'customer', 'data.dat')).touch() + result = _check_existing_update_data(self.test_dir) + self.assertEqual(result, []) + + def test_detects_existing_update_tables(self): + for table in ['s_catalog_order', 'delete']: + table_dir = os.path.join(self.test_dir, table) + os.makedirs(table_dir) + Path(os.path.join(table_dir, 'some_data.dat')).touch() + result = _check_existing_update_data(self.test_dir) + self.assertIn('s_catalog_order', result) + self.assertIn('delete', result) + + def test_empty_subfolder_not_detected(self): + os.makedirs(os.path.join(self.test_dir, 's_inventory')) + result = _check_existing_update_data(self.test_dir) + self.assertEqual(result, []) + + +class TestMergeUpdateDataLocal(unittest.TestCase): + + def setUp(self): + self.test_dir = tempfile.mkdtemp() + self.data_dir = os.path.join(self.test_dir, 'data') + self.temp_base = os.path.join(self.test_dir, 'temp') + os.makedirs(self.data_dir) + os.makedirs(self.temp_base) + + def tearDown(self): + shutil.rmtree(self.test_dir) + + def _create_child_files(self, child_idx, parallel, update): + """Simulate dsdgen output for one child.""" + child_dir = os.path.join(self.temp_base, f'child_{child_idx}') + os.makedirs(child_dir, exist_ok=True) + created = [] + for table in maintenance_table_names: + if table in ('delete', 'inventory_delete'): + fname = f'{table}_{update}.dat' + else: + fname = f'{table}_{child_idx}_{parallel}.dat' + fpath = os.path.join(child_dir, fname) + with open(fpath, 'w') as f: + f.write(f'data for {table} child={child_idx}') + created.append(fpath) + return created + + def test_regular_tables_merged_from_all_children(self): + self._create_child_files(1, '4', '1') + self._create_child_files(2, '4', '1') + + _merge_update_data_local(self.temp_base, self.data_dir, 1, 2, '4', '1') + + target = os.path.join(self.data_dir, 's_catalog_order') + files = sorted(os.listdir(target)) + self.assertIn('s_catalog_order_1_4.dat', files) + self.assertIn('s_catalog_order_2_4.dat', files) + + def test_delete_table_only_first_child_kept(self): + self._create_child_files(1, '4', '2') + self._create_child_files(2, '4', '2') + + _merge_update_data_local(self.temp_base, self.data_dir, 1, 2, '4', '2') + + delete_dir = os.path.join(self.data_dir, 'delete') + files = os.listdir(delete_dir) + self.assertEqual(files, ['delete_2.dat']) + with open(os.path.join(delete_dir, 'delete_2.dat')) as f: + content = f.read() + self.assertIn('child=1', content) + + def test_delete_table_skipped_if_already_exists(self): + """Simulates a second --range invocation where delete file already exists.""" + delete_dir = os.path.join(self.data_dir, 'delete') + os.makedirs(delete_dir) + existing_file = os.path.join(delete_dir, 'delete_1.dat') + with open(existing_file, 'w') as f: + f.write('original data from previous range') + + self._create_child_files(3, '4', '1') + self._create_child_files(4, '4', '1') + + _merge_update_data_local(self.temp_base, self.data_dir, 3, 4, '4', '1') + + with open(existing_file) as f: + content = f.read() + self.assertEqual(content, 'original data from previous range') + + def test_creates_table_subdirs(self): + self._create_child_files(1, '1', '1') + _merge_update_data_local(self.temp_base, self.data_dir, 1, 1, '1', '1') + for table in maintenance_table_names: + self.assertTrue(os.path.isdir(os.path.join(self.data_dir, table))) + + +class TestGenerateUpdateDataLocal(unittest.TestCase): + """Test _generate_update_data_local with a mock dsdgen script.""" + + def setUp(self): + self.test_dir = tempfile.mkdtemp() + self.data_dir = os.path.join(self.test_dir, 'output') + self.tool_dir = os.path.join(self.test_dir, 'tools') + os.makedirs(self.tool_dir) + self._create_mock_dsdgen() + + def tearDown(self): + shutil.rmtree(self.test_dir) + + def _create_mock_dsdgen(self): + """Create a mock dsdgen that writes expected files based on args.""" + script = textwrap.dedent('''\ + #!/usr/bin/env python3 + import argparse, os + parser = argparse.ArgumentParser() + parser.add_argument('-scale') + parser.add_argument('-dir') + parser.add_argument('-parallel') + parser.add_argument('-child') + parser.add_argument('-verbose') + parser.add_argument('-update') + parser.add_argument('-force') + args = parser.parse_args() + + maintenance_tables = [ + 's_catalog_order', 's_catalog_order_lineitem', 's_catalog_returns', + 's_inventory', 's_purchase', 's_purchase_lineitem', 's_store_returns', + 's_web_order', 's_web_order_lineitem', 's_web_returns', + ] + delete_tables = ['delete', 'inventory_delete'] + + for t in maintenance_tables: + fname = f'{t}_{args.child}_{args.parallel}.dat' + with open(os.path.join(args.dir, fname), 'w') as f: + f.write(f'{t} child={args.child} scale={args.scale}') + + for t in delete_tables: + fname = f'{t}_{args.update}.dat' + with open(os.path.join(args.dir, fname), 'w') as f: + f.write(f'{t} update={args.update}') + ''') + self.mock_dsdgen = os.path.join(self.tool_dir, 'dsdgen') + with open(self.mock_dsdgen, 'w') as f: + f.write(script) + os.chmod(self.mock_dsdgen, stat.S_IRWXU) + + def _make_args(self, overwrite=False): + return SimpleNamespace( + scale='10', + parallel='2', + overwrite_output=overwrite, + update='1', + data_dir=self.data_dir, + ) + + def test_generates_to_fresh_dir(self): + args = self._make_args() + tool_path = Path(self.mock_dsdgen) + _generate_update_data_local(args, self.data_dir, 1, 2, tool_path) + + self.assertTrue(os.path.isdir(os.path.join(self.data_dir, 's_catalog_order'))) + files = os.listdir(os.path.join(self.data_dir, 's_catalog_order')) + self.assertIn('s_catalog_order_1_2.dat', files) + self.assertIn('s_catalog_order_2_2.dat', files) + + delete_files = os.listdir(os.path.join(self.data_dir, 'delete')) + self.assertEqual(delete_files, ['delete_1.dat']) + + def test_temp_dir_cleaned_up(self): + args = self._make_args() + tool_path = Path(self.mock_dsdgen) + _generate_update_data_local(args, self.data_dir, 1, 2, tool_path) + + temp_path = os.path.join(self.data_dir, '_temp_update') + self.assertFalse(os.path.exists(temp_path)) + + def test_rejects_existing_update_data(self): + os.makedirs(os.path.join(self.data_dir, 's_catalog_order')) + Path(os.path.join(self.data_dir, 's_catalog_order', 'old.dat')).touch() + + args = self._make_args(overwrite=False) + tool_path = Path(self.mock_dsdgen) + with self.assertRaises(Exception) as ctx: + _generate_update_data_local(args, self.data_dir, 1, 2, tool_path) + self.assertIn('Update data already exists', str(ctx.exception)) + self.assertIn('s_catalog_order', str(ctx.exception)) + + def test_allows_overwrite_with_flag(self): + os.makedirs(os.path.join(self.data_dir, 's_catalog_order')) + Path(os.path.join(self.data_dir, 's_catalog_order', 'old.dat')).touch() + + args = self._make_args(overwrite=True) + tool_path = Path(self.mock_dsdgen) + _generate_update_data_local(args, self.data_dir, 1, 2, tool_path) + + files = os.listdir(os.path.join(self.data_dir, 's_catalog_order')) + self.assertIn('s_catalog_order_1_2.dat', files) + + def test_no_force_flag_passed_to_dsdgen(self): + """Verify dsdgen is never called with -force (the whole point of this redesign).""" + script = textwrap.dedent('''\ + #!/usr/bin/env python3 + import sys + if '-force' in sys.argv: + sys.exit(99) + # Generate minimal output so merge doesn't fail + import argparse, os + parser = argparse.ArgumentParser() + parser.add_argument('-scale'); parser.add_argument('-dir') + parser.add_argument('-parallel'); parser.add_argument('-child') + parser.add_argument('-verbose'); parser.add_argument('-update') + args = parser.parse_args() + for t in ['s_catalog_order']: + with open(os.path.join(args.dir, f'{t}_{args.child}_{args.parallel}.dat'), 'w') as f: + f.write('ok') + for t in ['delete', 'inventory_delete']: + with open(os.path.join(args.dir, f'{t}_{args.update}.dat'), 'w') as f: + f.write('ok') + ''') + with open(self.mock_dsdgen, 'w') as f: + f.write(script) + os.chmod(self.mock_dsdgen, stat.S_IRWXU) + + args = self._make_args() + tool_path = Path(self.mock_dsdgen) + _generate_update_data_local(args, self.data_dir, 1, 2, tool_path) + + def test_incremental_range_preserves_previous_data(self): + """Simulate two --range invocations.""" + args = self._make_args() + tool_path = Path(self.mock_dsdgen) + + # First range: children 1-2 + _generate_update_data_local(args, self.data_dir, 1, 2, tool_path) + + # Second range: children 3-4 (need --overwrite_output since data exists) + args_range2 = self._make_args(overwrite=True) + args_range2.parallel = '4' + _generate_update_data_local(args_range2, self.data_dir, 3, 4, tool_path) + + cat_order_dir = os.path.join(self.data_dir, 's_catalog_order') + files = sorted(os.listdir(cat_order_dir)) + self.assertIn('s_catalog_order_1_2.dat', files) + self.assertIn('s_catalog_order_3_4.dat', files) + self.assertIn('s_catalog_order_4_4.dat', files) + + # Delete file should still be from first range (not overwritten) + delete_dir = os.path.join(self.data_dir, 'delete') + with open(os.path.join(delete_dir, 'delete_1.dat')) as f: + content = f.read() + self.assertIn('update=1', content) + + +if __name__ == '__main__': + unittest.main()