From bb496fc0447a15102d09af87dd3f02185b072182 Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Fri, 13 Feb 2026 14:27:05 +0800 Subject: [PATCH 1/7] init support for k8s data gen Signed-off-by: Allen Xu --- nds/README.md | 59 +++++++ nds/nds_gen_data_spark.py | 317 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 376 insertions(+) create mode 100644 nds/nds_gen_data_spark.py diff --git a/nds/README.md b/nds/README.md index 91f5e88..095b7a8 100644 --- a/nds/README.md +++ b/nds/README.md @@ -138,6 +138,65 @@ Example command: python nds_gen_data.py hdfs 100 100 /data/raw_sf100 --overwrite_output ``` +### Generate data with Spark (recommended for K8s / no MapReduce) + +`nds_gen_data_spark.py` is a PySpark application that replaces the Hadoop MapReduce +approach. It distributes the `dsdgen` binary across Spark executors via `--archives`, +runs data generation in parallel, and writes output to any Hadoop-compatible filesystem +(HDFS, S3, GCS, ABFS, or local). It works with any Spark cluster manager: **K8s, YARN, +Standalone, or local**. + +**Prerequisites:** build tpcds-gen as before (`cd tpcds-gen && make`). + +Using the spark-submit-template: + +```bash +./spark-submit-template datagen_submit.template \ + nds_gen_data_spark.py 100 100 hdfs:///data/raw_sf100 --overwrite +``` + +Or directly with spark-submit (K8s example): + +```bash +spark-submit --master k8s://https:// \ + --deploy-mode cluster \ + --conf spark.kubernetes.container.image= \ + --conf spark.executor.instances=10 \ + --archives tpcds-gen/target/lib/dsdgen.jar#dsdgen \ + nds_gen_data_spark.py 1000 200 hdfs:///data/raw_sf1000 --overwrite +``` + +For incremental generation (split across multiple spark-submit runs): + +```bash +# Run 1: children 1-100 +spark-submit --archives tpcds-gen/target/lib/dsdgen.jar#dsdgen \ + nds_gen_data_spark.py 1000 200 hdfs:///data/raw_sf1000 --range 1,100 + +# Run 2: children 101-200 +spark-submit --archives tpcds-gen/target/lib/dsdgen.jar#dsdgen \ + nds_gen_data_spark.py 1000 200 hdfs:///data/raw_sf1000 --range 101,200 +``` + +Arguments: + +```text +positional arguments: + scale Data scale factor in GB. + parallel Number of parallel dsdgen children (must be >= 2). + output_dir Output directory (hdfs://..., s3a://..., gs://..., or local path). + +optional arguments: + --range START,END Generate only this child range (inclusive). + --overwrite Overwrite existing output directory. + --update N Generate update/maintenance dataset N. + --num_executors N Hint for number of Spark partitions (default: one per child). +``` + +**Note:** The dsdgen binary in the archive must be compiled for the same OS/architecture +as the Spark executor nodes (typically Linux x86_64). If you build on macOS but run on +K8s (Linux), you need to cross-compile or build inside a Linux container. + ### Convert CSV to Parquet or Other data sources To do the data conversion, the `nds_transcode.py` need to be submitted as a Spark job. User can leverage diff --git a/nds/nds_gen_data_spark.py b/nds/nds_gen_data_spark.py new file mode 100644 index 0000000..430611b --- /dev/null +++ b/nds/nds_gen_data_spark.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# SPDX-FileCopyrightText: Copyright (c) 2022-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ----- +# +# Certain portions of the contents of this file are derived from TPC-DS version 3.2.0 +# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). +# Such portions are subject to copyrights held by Transaction Processing Performance Council ("TPC") +# and licensed under the TPC EULA (a copy of which accompanies this file as "TPC EULA" and is also +# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the "TPC EULA"). +# +# You may not use this file except in compliance with the TPC EULA. +# DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results +# obtained using this file are not comparable to published TPC-DS Benchmark results, as the results +# obtained from using this file do not comply with the TPC-DS Benchmark. +# + +""" +Spark-based TPC-DS data generation — replaces the Hadoop MapReduce approach. + +Distributes dsdgen across Spark executors, writes output to any Hadoop-compatible +filesystem (HDFS, S3, GCS, ABFS, local). Works with any Spark cluster manager +(K8s, YARN, Standalone, local). + +Prerequisites: + 1. Build tpcds-gen (cd tpcds-gen && make) + 2. The build produces target/lib/dsdgen.jar which is a jar archive containing + the tools/ directory (dsdgen binary + *.dst files). + +Usage: + spark-submit [--master k8s://... | yarn | ...] \\ + --archives tpcds-gen/target/lib/dsdgen.jar#dsdgen \\ + nds_gen_data_spark.py \\ + [options] + + The --archives flag distributes and extracts the dsdgen toolset to every executor. 
+ +Example (K8s): + spark-submit --master k8s://https:// \\ + --deploy-mode cluster \\ + --conf spark.kubernetes.container.image= \\ + --archives tpcds-gen/target/lib/dsdgen.jar#dsdgen \\ + nds_gen_data_spark.py 100 100 hdfs:///data/raw_sf100 --overwrite + +Example (YARN): + spark-submit --master yarn \\ + --archives tpcds-gen/target/lib/dsdgen.jar#dsdgen \\ + nds_gen_data_spark.py 1000 200 hdfs:///data/raw_sf1000 + +Example (local testing): + spark-submit --master 'local[4]' \\ + --archives tpcds-gen/target/lib/dsdgen.jar#dsdgen \\ + nds_gen_data_spark.py 1 2 /tmp/nds_test_data --overwrite +""" + +import argparse +import os +import subprocess +import sys +import tempfile + +from pyspark.sql import SparkSession + +# All source (qualification/power-run) table names in TPC-DS +SOURCE_TABLE_NAMES = [ + 'call_center', 'catalog_page', 'catalog_returns', 'catalog_sales', + 'customer', 'customer_address', 'customer_demographics', 'date_dim', + 'dbgen_version', 'household_demographics', 'income_band', 'inventory', + 'item', 'promotion', 'reason', 'ship_mode', 'store', 'store_returns', + 'store_sales', 'time_dim', 'warehouse', 'web_page', 'web_returns', + 'web_sales', 'web_site', +] + +# Maintenance (data-update) table names +MAINTENANCE_TABLE_NAMES = [ + 's_catalog_order', 's_catalog_order_lineitem', 's_catalog_returns', + 's_inventory', 's_purchase', 's_purchase_lineitem', 's_store_returns', + 's_web_order', 's_web_order_lineitem', 's_web_returns', + 'delete', 'inventory_delete', +] + + +def run_dsdgen_and_read(child_index, scale, parallel, update=None): + """Execute dsdgen for one child partition, yield (table_name, line) pairs. + + This function runs inside a Spark executor task. The dsdgen binary and its + auxiliary files (*.dst) are expected under SparkFiles root, extracted from + the archive passed via --archives dsdgen.jar#dsdgen. + + Each generated .dat file is read line-by-line and yielded as (table_name, line), + which Spark then writes to the target filesystem partitioned by table_name. + Memory usage is bounded: only one line is held in memory at a time. + """ + from pyspark import SparkFiles + + # Locate dsdgen binary from the extracted archive + archive_root = SparkFiles.getRootDirectory() + tools_dir = os.path.join(archive_root, "dsdgen", "tools") + dsdgen_bin = os.path.join(tools_dir, "dsdgen") + + if not os.path.isfile(dsdgen_bin): + raise FileNotFoundError( + f"dsdgen binary not found at {dsdgen_bin}. " + "Make sure --archives tpcds-gen/target/lib/dsdgen.jar#dsdgen is set." 
+ ) + + # Ensure the binary is executable (archive extraction may lose permissions) + if not os.access(dsdgen_bin, os.X_OK): + os.chmod(dsdgen_bin, 0o755) + + # Temp directory for dsdgen output; each task gets its own + work_dir = tempfile.mkdtemp(prefix=f"dsdgen_c{child_index}_") + + try: + cmd = [ + dsdgen_bin, + "-dir", work_dir, + "-force", "Y", + "-scale", str(scale), + "-parallel", str(parallel), + "-child", str(child_index), + ] + if update is not None: + cmd += ["-update", str(update)] + + proc = subprocess.run(cmd, cwd=tools_dir, capture_output=True, text=True) + if proc.returncode != 0: + raise RuntimeError( + f"dsdgen failed for child {child_index} (exit {proc.returncode}): " + f"{proc.stderr}" + ) + + # Read .dat files and yield (table_name, line) pairs + regular_suffix = f"_{child_index}_{parallel}.dat" + + for fname in sorted(os.listdir(work_dir)): + filepath = os.path.join(work_dir, fname) + if not os.path.isfile(filepath): + continue + + table_name = None + + if fname.endswith(regular_suffix): + # Regular table: call_center_1_100.dat → table "call_center" + table_name = fname[: -len(regular_suffix)] + elif fname.endswith(".dat"): + # Delete / inventory_delete tables have different naming: + # delete_.dat, inventory_delete_.dat + for special in ("inventory_delete", "delete"): + if fname.startswith(special): + table_name = special + break + + if table_name is None: + continue + + with open(filepath, "r") as f: + for line in f: + stripped = line.rstrip("\n\r") + if stripped: + yield (table_name, stripped) + + os.remove(filepath) + finally: + # Best-effort cleanup of temp directory + try: + os.rmdir(work_dir) + except OSError: + pass + + +def rename_partition_dirs(spark, output_dir, table_names): + """Rename Hive-style 'table_name=xxx' directories to plain 'xxx'. + + Spark's partitionBy writes to directories like output_dir/table_name=call_center/. + The rest of the NDS pipeline expects output_dir/call_center/. This function + performs fast HDFS-level renames (metadata only, no data copy) via the Hadoop + FileSystem Java API on the driver. + """ + jvm = spark._jvm + hadoop_conf = spark._jsc.hadoopConfiguration() + Path = jvm.org.apache.hadoop.fs.Path + + base_path = Path(output_dir) + fs = jvm.org.apache.hadoop.fs.FileSystem.get(base_path.toUri(), hadoop_conf) + + for table in table_names: + hive_dir = Path(output_dir, f"table_name={table}") + target_dir = Path(output_dir, table) + + if not fs.exists(hive_dir): + continue + + if fs.exists(target_dir): + # Merge into existing directory (e.g., incremental range generation) + statuses = fs.listStatus(hive_dir) + for status in statuses: + src_path = status.getPath() + # Skip _SUCCESS / hidden files + if src_path.getName().startswith("_"): + continue + dst_path = Path(target_dir, src_path.getName()) + fs.rename(src_path, dst_path) + fs.delete(hive_dir, True) + else: + fs.rename(hive_dir, target_dir) + + # Clean up Hive-style _SUCCESS at root if present + success_file = Path(output_dir, "_SUCCESS") + if fs.exists(success_file): + fs.delete(success_file, False) + + +def main(): + parser = argparse.ArgumentParser( + description="Spark-based TPC-DS data generation (replaces MapReduce)." 
+ ) + parser.add_argument("scale", type=int, + help="Data scale factor in GB.") + parser.add_argument("parallel", type=int, + help="Number of parallel dsdgen children (must be >= 2).") + parser.add_argument("output_dir", + help="Output directory (hdfs://..., s3a://..., gs://..., or local path).") + parser.add_argument("--range", + help='Child range "start,end" (inclusive). ' + 'Default: generate all children 1..parallel. ' + 'Useful for splitting work across multiple spark-submit invocations.') + parser.add_argument("--overwrite", action="store_true", + help="Overwrite existing output directory.") + parser.add_argument("--update", type=int, default=None, + help="Generate update/maintenance dataset .") + parser.add_argument("--num_executors", type=int, default=None, + help="Hint for number of Spark partitions. " + "Default: one partition per child (optimal parallelism).") + args = parser.parse_args() + + if args.parallel < 2: + print("ERROR: parallel must be >= 2", file=sys.stderr) + sys.exit(1) + + range_start = 1 + range_end = args.parallel + if args.range: + parts = args.range.split(",") + if len(parts) != 2: + print("ERROR: --range must be 'start,end'", file=sys.stderr) + sys.exit(1) + range_start, range_end = int(parts[0]), int(parts[1]) + if range_start < 1 or range_end > args.parallel or range_start > range_end: + print("ERROR: range must satisfy 1 <= start <= end <= parallel", file=sys.stderr) + sys.exit(1) + + children = list(range(range_start, range_end + 1)) + num_children = len(children) + num_partitions = args.num_executors if args.num_executors else num_children + table_names = MAINTENANCE_TABLE_NAMES if args.update else SOURCE_TABLE_NAMES + + # Capture args for closure (avoid serializing argparse Namespace) + scale = args.scale + parallel = args.parallel + update = args.update + + spark = SparkSession.builder \ + .appName(f"NDS_DataGen_sf{scale}_p{parallel}") \ + .getOrCreate() + sc = spark.sparkContext + + print(f"=== NDS Spark Data Generation ===") + print(f" Scale: {scale} GB") + print(f" Parallel: {parallel}") + print(f" Range: {range_start}..{range_end} ({num_children} children)") + print(f" Output: {args.output_dir}") + print(f" Overwrite: {args.overwrite}") + print(f" Update: {update}") + print(f" Partitions:{num_partitions}") + + # Create RDD: one element per child index. + # Each Spark task runs dsdgen for its child, reads output line by line, + # and yields (table_name, line) pairs — no data accumulates in memory. + children_rdd = sc.parallelize(children, numSlices=num_partitions) + + all_data_rdd = children_rdd.flatMap( + lambda child: run_dsdgen_and_read(child, scale, parallel, update) + ) + + # Convert to DataFrame: [table_name: string, value: string] + # partitionBy("table_name") writes separate directories per table WITHOUT shuffle — + # each task independently splits its output into per-table files. 
+ df = spark.createDataFrame(all_data_rdd, ["table_name", "value"]) + + write_mode = "overwrite" if args.overwrite else "errorifexists" + df.write.partitionBy("table_name").mode(write_mode).text(args.output_dir) + + # Rename Hive-style "table_name=xxx" dirs to plain "xxx" for NDS pipeline compat + rename_partition_dirs(spark, args.output_dir, table_names) + + print(f"=== Data generation complete: {args.output_dir} ===") + spark.stop() + + +if __name__ == "__main__": + main() From ae55e43267cc2433c9c45ebe28a155bec0ad7e91 Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Fri, 13 Feb 2026 16:41:37 +0800 Subject: [PATCH 2/7] add hdfs related readme Signed-off-by: Allen Xu --- nds/Dockerfile.spark-k8s | 40 +++++++++++ nds/README.md | 54 +++++++++++++- nds/nds_gen_data_spark.py | 49 ++++++++++--- scripts/k8s_datagen_smoketest.sh | 116 +++++++++++++++++++++++++++++++ 4 files changed, 245 insertions(+), 14 deletions(-) create mode 100644 nds/Dockerfile.spark-k8s create mode 100755 scripts/k8s_datagen_smoketest.sh diff --git a/nds/Dockerfile.spark-k8s b/nds/Dockerfile.spark-k8s new file mode 100644 index 0000000..1cd5be9 --- /dev/null +++ b/nds/Dockerfile.spark-k8s @@ -0,0 +1,40 @@ +# Custom Spark + PySpark image for K8s data generation testing. +# Based on Ubuntu 24.04 to match the host Python 3.12 version. +FROM ubuntu:24.04 + +ARG spark_uid=185 + +# Install Java 17 + Python 3.12 (default on 24.04) +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + openjdk-17-jre-headless \ + python3 \ + python3-pip \ + tini \ + procps \ + && rm -rf /var/cache/apt/* /var/lib/apt/lists/* + +# Set up Spark directory structure +ENV SPARK_HOME=/opt/spark +ENV PATH="${SPARK_HOME}/bin:${PATH}" +ENV PYTHONPATH="${SPARK_HOME}/python:${SPARK_HOME}/python/lib/py4j-0.10.9.7-src.zip:${PYTHONPATH}" + +# Copy Spark distribution +COPY jars ${SPARK_HOME}/jars +COPY bin ${SPARK_HOME}/bin +COPY sbin ${SPARK_HOME}/sbin +COPY python ${SPARK_HOME}/python +COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/ +COPY kubernetes/dockerfiles/spark/decom.sh /opt/ + +RUN chmod +x /opt/entrypoint.sh /opt/decom.sh && \ + mkdir -p ${SPARK_HOME}/work-dir && \ + chmod g+w ${SPARK_HOME}/work-dir && \ + # Create spark user + useradd -u ${spark_uid} -r -g root -d ${SPARK_HOME} -s /sbin/nologin spark + +WORKDIR ${SPARK_HOME}/work-dir + +ENTRYPOINT [ "/opt/entrypoint.sh" ] + +USER ${spark_uid} diff --git a/nds/README.md b/nds/README.md index 095b7a8..a640a6c 100644 --- a/nds/README.md +++ b/nds/README.md @@ -148,6 +148,23 @@ Standalone, or local**. **Prerequisites:** build tpcds-gen as before (`cd tpcds-gen && make`). +#### Required prerequisites for K8s + HDFS runs + +Before submitting `nds_gen_data_spark.py` to Spark on K8s, make sure all of the +following conditions are true: + +1. Driver and executor images include Hadoop client configuration (`core-site.xml` + and `hdfs-site.xml`) so both sides can access the same HDFS cluster. +2. `output_dir` is an explicit HDFS path (for example, `hdfs:///data/raw_sf1000` + or `hdfs://:8020/data/raw_sf1000`) instead of a local path. +3. The `dsdgen` archive is provided with `--archives` using a shell-quoted value + when needed (zsh users should quote `#`): + `--archives 'tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen'`. +4. Python minor versions are consistent between driver and executor environments + (for example, both are Python 3.12). +5. For incremental generation (`--range`), do not rerun overlapping child ranges, + otherwise duplicate data will be produced. 
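+
+As a sketch for prerequisite item 1 (one possible approach, not the only one): Spark on
+Kubernetes can mount Hadoop client configuration from a ConfigMap via
+`spark.kubernetes.hadoop.configMapName`. The ConfigMap name `hadoop-conf` below is an
+assumed example name, not something this repo creates for you:
+
+```bash
+# Publish the Hadoop client config as a ConfigMap in the target namespace
+kubectl create configmap hadoop-conf \
+  --from-file=core-site.xml --from-file=hdfs-site.xml
+
+# Then let Spark mount it on driver and executors by adding to spark-submit:
+#   --conf spark.kubernetes.hadoop.configMapName=hadoop-conf
+```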
+ Using the spark-submit-template: ```bash @@ -162,7 +179,7 @@ spark-submit --master k8s://https:// \ --deploy-mode cluster \ --conf spark.kubernetes.container.image= \ --conf spark.executor.instances=10 \ - --archives tpcds-gen/target/lib/dsdgen.jar#dsdgen \ + --archives tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen \ nds_gen_data_spark.py 1000 200 hdfs:///data/raw_sf1000 --overwrite ``` @@ -170,11 +187,11 @@ For incremental generation (split across multiple spark-submit runs): ```bash # Run 1: children 1-100 -spark-submit --archives tpcds-gen/target/lib/dsdgen.jar#dsdgen \ +spark-submit --archives tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen \ nds_gen_data_spark.py 1000 200 hdfs:///data/raw_sf1000 --range 1,100 # Run 2: children 101-200 -spark-submit --archives tpcds-gen/target/lib/dsdgen.jar#dsdgen \ +spark-submit --archives tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen \ nds_gen_data_spark.py 1000 200 hdfs:///data/raw_sf1000 --range 101,200 ``` @@ -188,6 +205,7 @@ positional arguments: optional arguments: --range START,END Generate only this child range (inclusive). + Uses append mode automatically for incremental runs. --overwrite Overwrite existing output directory. --update N Generate update/maintenance dataset N. --num_executors N Hint for number of Spark partitions (default: one per child). @@ -197,6 +215,36 @@ optional arguments: as the Spark executor nodes (typically Linux x86_64). If you build on macOS but run on K8s (Linux), you need to cross-compile or build inside a Linux container. +**Note for zsh users:** quote the `--archives` value because `#` may be interpreted by shell. +For example: `--archives 'tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen'`. + +#### K8s local filesystem caveat + +When `output_dir` is a local path (for example `/tmp/...`) and Spark executors run in K8s pods, +each pod writes to its own container filesystem by default. You will not see merged results on +the driver host unless a shared volume is configured. + +For production runs, prefer remote/shared storage such as HDFS, S3, GCS, or ABFS. +For local smoke tests on minikube, mount a shared host path and mount it into executor pods. + +#### One-command K8s smoke test + +Use the helper script: + +```bash +chmod +x ../scripts/k8s_datagen_smoketest.sh +../scripts/k8s_datagen_smoketest.sh +``` + +What the script does: + +- starts minikube if needed +- configures Spark service account/role +- mounts `/tmp/nds_shared` into minikube with Spark-compatible UID/GID +- builds Spark Python image from `nds/Dockerfile.spark-k8s` +- submits `nds_gen_data_spark.py` on K8s +- verifies output (`25` source table folders and non-empty `store_sales`) + ### Convert CSV to Parquet or Other data sources To do the data conversion, the `nds_transcode.py` need to be submitted as a Spark job. User can leverage diff --git a/nds/nds_gen_data_spark.py b/nds/nds_gen_data_spark.py index 430611b..51ade1b 100644 --- a/nds/nds_gen_data_spark.py +++ b/nds/nds_gen_data_spark.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- # -# SPDX-FileCopyrightText: Copyright (c) 2022-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -70,6 +70,7 @@ import argparse import os +import shutil import subprocess import sys import tempfile @@ -108,15 +109,35 @@ def run_dsdgen_and_read(child_index, scale, parallel, update=None): """ from pyspark import SparkFiles - # Locate dsdgen binary from the extracted archive - archive_root = SparkFiles.getRootDirectory() + # Locate dsdgen binary from the extracted archive. + # The archive (dsdgen.tar.gz or dsdgen.jar) contains a tools/ directory. + # With --archives '#dsdgen', Spark extracts contents under a + # directory named 'dsdgen' in SparkFiles root. + # Note: SparkFiles.getRootDirectory() may return a relative path (e.g. ".") + # in K8s mode, so we must resolve to absolute paths to avoid issues when + # subprocess.run() changes cwd before resolving the executable path. + archive_root = os.path.abspath(SparkFiles.getRootDirectory()) tools_dir = os.path.join(archive_root, "dsdgen", "tools") dsdgen_bin = os.path.join(tools_dir, "dsdgen") if not os.path.isfile(dsdgen_bin): + # Provide detailed debug info for troubleshooting archive extraction + import glob + dsdgen_dir = os.path.join(archive_root, "dsdgen") + debug_info = ( + f"archive_root={archive_root}, " + f"dsdgen_dir exists={os.path.exists(dsdgen_dir)}, " + f"dsdgen_dir isdir={os.path.isdir(dsdgen_dir)}" + ) + if os.path.isdir(dsdgen_dir): + contents = glob.glob(os.path.join(dsdgen_dir, "**"), recursive=True)[:30] + debug_info += f", contents={contents}" + elif os.path.isdir(archive_root): + contents = glob.glob(os.path.join(archive_root, "**"), recursive=True)[:30] + debug_info += f", root_contents={contents}" raise FileNotFoundError( - f"dsdgen binary not found at {dsdgen_bin}. " - "Make sure --archives tpcds-gen/target/lib/dsdgen.jar#dsdgen is set." + f"dsdgen binary not found at {dsdgen_bin}. {debug_info}. " + "Make sure --archives #dsdgen is set." ) # Ensure the binary is executable (archive extraction may lose permissions) @@ -177,11 +198,8 @@ def run_dsdgen_and_read(child_index, scale, parallel, update=None): os.remove(filepath) finally: - # Best-effort cleanup of temp directory - try: - os.rmdir(work_dir) - except OSError: - pass + # Best-effort cleanup of temp directory (may contain leftover files) + shutil.rmtree(work_dir, ignore_errors=True) def rename_partition_dirs(spark, output_dir, table_names): @@ -303,7 +321,16 @@ def main(): # each task independently splits its output into per-table files. df = spark.createDataFrame(all_data_rdd, ["table_name", "value"]) - write_mode = "overwrite" if args.overwrite else "errorifexists" + # Determine write mode: + # --range → append (incremental generation across multiple spark-submit runs) + # --overwrite → overwrite (fresh start, wipe existing data) + # default → errorifexists (fail if output already exists) + if args.range: + write_mode = "append" + elif args.overwrite: + write_mode = "overwrite" + else: + write_mode = "errorifexists" df.write.partitionBy("table_name").mode(write_mode).text(args.output_dir) # Rename Hive-style "table_name=xxx" dirs to plain "xxx" for NDS pipeline compat diff --git a/scripts/k8s_datagen_smoketest.sh b/scripts/k8s_datagen_smoketest.sh new file mode 100755 index 0000000..90670cc --- /dev/null +++ b/scripts/k8s_datagen_smoketest.sh @@ -0,0 +1,116 @@ +#!/usr/bin/env bash +set -euo pipefail + +# One-command smoke test for nds_gen_data_spark.py on Minikube. +# It verifies Spark-on-K8s submission and checks generated table outputs. 
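+# Configurable via environment variables (defaults are set below): SPARK_HOME, K8S_IMAGE,
+# OUTPUT_ROOT, MINIKUBE_CPUS, MINIKUBE_MEMORY_MB.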
+ +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)" +NDS_DIR="${REPO_ROOT}/nds" + +SPARK_HOME="${SPARK_HOME:-$HOME/local/spark}" +SPARK_SUBMIT="${SPARK_HOME}/bin/spark-submit" +K8S_IMAGE="${K8S_IMAGE:-spark-py:v3.5.4-py312}" +ARCHIVE_PATH="${NDS_DIR}/tpcds-gen/target/lib/dsdgen.tar.gz" +OUTPUT_ROOT="${OUTPUT_ROOT:-/tmp/nds_shared}" +OUTPUT_DIR="${OUTPUT_ROOT}/output" +MOUNT_PID_FILE="${OUTPUT_ROOT}/.minikube_mount.pid" +MINIKUBE_CPUS="${MINIKUBE_CPUS:-4}" +MINIKUBE_MEMORY_MB="${MINIKUBE_MEMORY_MB:-8192}" + +require_cmd() { + if ! command -v "$1" >/dev/null 2>&1; then + echo "ERROR: command not found: $1" >&2 + exit 1 + fi +} + +require_file() { + if [[ ! -f "$1" ]]; then + echo "ERROR: file not found: $1" >&2 + exit 1 + fi +} + +echo "[1/7] Checking prerequisites..." +require_cmd minikube +require_cmd docker +require_cmd "${SPARK_SUBMIT}" +require_file "${NDS_DIR}/nds_gen_data_spark.py" +require_file "${NDS_DIR}/Dockerfile.spark-k8s" + +if [[ ! -f "${ARCHIVE_PATH}" ]]; then + echo "ERROR: ${ARCHIVE_PATH} not found." >&2 + echo "Build and package dsdgen first:" >&2 + echo " cd nds/tpcds-gen" >&2 + echo " make clean all LINUX_CC='gcc -fcommon'" >&2 + echo " cd target && tar czf lib/dsdgen.tar.gz tools/" >&2 + exit 1 +fi + +if ! docker ps >/dev/null 2>&1; then + echo "ERROR: docker is not accessible for current user." >&2 + echo "Run: sudo usermod -aG docker \$USER && newgrp docker" >&2 + exit 1 +fi + +export PATH="${SPARK_HOME}/bin:${PATH}" + +echo "[2/7] Starting minikube (if needed)..." +if ! minikube status >/dev/null 2>&1; then + minikube start --cpus="${MINIKUBE_CPUS}" --memory="${MINIKUBE_MEMORY_MB}" --driver=docker +fi + +echo "[3/7] Ensuring Spark service account/role..." +minikube kubectl -- create serviceaccount spark >/dev/null 2>&1 || true +minikube kubectl -- create clusterrolebinding spark-role \ + --clusterrole=edit --serviceaccount=default:spark >/dev/null 2>&1 || true + +echo "[4/7] Starting shared mount for local output..." +mkdir -p "${OUTPUT_ROOT}" +chmod 777 "${OUTPUT_ROOT}" +if [[ -f "${MOUNT_PID_FILE}" ]]; then + old_pid="$(cat "${MOUNT_PID_FILE}" || true)" + if [[ -n "${old_pid}" ]] && kill -0 "${old_pid}" >/dev/null 2>&1; then + kill "${old_pid}" >/dev/null 2>&1 || true + fi +fi +nohup minikube mount "${OUTPUT_ROOT}:${OUTPUT_ROOT}" --uid 185 --gid 0 \ + >"${OUTPUT_ROOT}/minikube_mount.log" 2>&1 & +echo $! > "${MOUNT_PID_FILE}" +sleep 3 + +echo "[5/7] Building Spark image inside minikube docker daemon..." +eval "$(minikube docker-env)" +docker build -t "${K8S_IMAGE}" -f "${NDS_DIR}/Dockerfile.spark-k8s" "${SPARK_HOME}" + +echo "[6/7] Running Spark data generation smoke test..." +rm -rf "${OUTPUT_DIR}" +"${SPARK_SUBMIT}" \ + --master "k8s://$(minikube kubectl -- config view --minify -o jsonpath='{.clusters[0].cluster.server}')" \ + --deploy-mode client \ + --conf "spark.kubernetes.container.image=${K8S_IMAGE}" \ + --conf "spark.kubernetes.authenticate.driver.serviceAccountName=spark" \ + --conf "spark.executor.instances=2" \ + --conf "spark.kubernetes.container.image.pullPolicy=Never" \ + --conf "spark.pyspark.python=python3" \ + --conf "spark.pyspark.driver.python=python3" \ + --conf "spark.kubernetes.executor.volumes.hostPath.nds-data.mount.path=${OUTPUT_ROOT}" \ + --conf "spark.kubernetes.executor.volumes.hostPath.nds-data.options.path=${OUTPUT_ROOT}" \ + --archives "${ARCHIVE_PATH}#dsdgen" \ + "${NDS_DIR}/nds_gen_data_spark.py" 1 2 "${OUTPUT_DIR}" --overwrite + +echo "[7/7] Verifying generated output..." 
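+# 25 = number of TPC-DS source tables produced by dsdgen
+# (see SOURCE_TABLE_NAMES in nds_gen_data_spark.py).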
+table_count="$(ls -d "${OUTPUT_DIR}"/*/ 2>/dev/null | wc -l | tr -d ' ')" +if [[ "${table_count}" != "25" ]]; then + echo "ERROR: expected 25 source table directories, got ${table_count}" >&2 + exit 1 +fi +if ! head -1 "${OUTPUT_DIR}/store_sales/"*.txt >/dev/null 2>&1; then + echo "ERROR: store_sales data not found or empty." >&2 + exit 1 +fi + +echo "Smoke test passed." +echo "Output dir: ${OUTPUT_DIR}" +echo "Table count: ${table_count}" From 0cf1830a10e5bc960e1094933326c049c7e113be Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Fri, 13 Feb 2026 17:08:08 +0800 Subject: [PATCH 3/7] resolve comments Signed-off-by: Allen Xu --- nds/README.md | 9 +-------- nds/nds_gen_data_spark.py | 36 +++++++++++++++++++++++------------- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/nds/README.md b/nds/README.md index a640a6c..5f8443a 100644 --- a/nds/README.md +++ b/nds/README.md @@ -165,14 +165,7 @@ following conditions are true: 5. For incremental generation (`--range`), do not rerun overlapping child ranges, otherwise duplicate data will be produced. -Using the spark-submit-template: - -```bash -./spark-submit-template datagen_submit.template \ - nds_gen_data_spark.py 100 100 hdfs:///data/raw_sf100 --overwrite -``` - -Or directly with spark-submit (K8s example): +Directly with spark-submit (K8s example): ```bash spark-submit --master k8s://https:// \ diff --git a/nds/nds_gen_data_spark.py b/nds/nds_gen_data_spark.py index 51ade1b..5787e59 100644 --- a/nds/nds_gen_data_spark.py +++ b/nds/nds_gen_data_spark.py @@ -39,12 +39,12 @@ Prerequisites: 1. Build tpcds-gen (cd tpcds-gen && make) - 2. The build produces target/lib/dsdgen.jar which is a jar archive containing + 2. The build produces target/lib/dsdgen.tar.gz containing the tools/ directory (dsdgen binary + *.dst files). Usage: spark-submit [--master k8s://... | yarn | ...] \\ - --archives tpcds-gen/target/lib/dsdgen.jar#dsdgen \\ + --archives tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen \\ nds_gen_data_spark.py \\ [options] @@ -54,17 +54,17 @@ spark-submit --master k8s://https:// \\ --deploy-mode cluster \\ --conf spark.kubernetes.container.image= \\ - --archives tpcds-gen/target/lib/dsdgen.jar#dsdgen \\ + --archives tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen \\ nds_gen_data_spark.py 100 100 hdfs:///data/raw_sf100 --overwrite Example (YARN): spark-submit --master yarn \\ - --archives tpcds-gen/target/lib/dsdgen.jar#dsdgen \\ + --archives tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen \\ nds_gen_data_spark.py 1000 200 hdfs:///data/raw_sf1000 Example (local testing): spark-submit --master 'local[4]' \\ - --archives tpcds-gen/target/lib/dsdgen.jar#dsdgen \\ + --archives tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen \\ nds_gen_data_spark.py 1 2 /tmp/nds_test_data --overwrite """ @@ -101,7 +101,7 @@ def run_dsdgen_and_read(child_index, scale, parallel, update=None): This function runs inside a Spark executor task. The dsdgen binary and its auxiliary files (*.dst) are expected under SparkFiles root, extracted from - the archive passed via --archives dsdgen.jar#dsdgen. + the archive passed via --archives dsdgen.tar.gz#dsdgen. Each generated .dat file is read line-by-line and yielded as (table_name, line), which Spark then writes to the target filesystem partitioned by table_name. @@ -110,7 +110,7 @@ def run_dsdgen_and_read(child_index, scale, parallel, update=None): from pyspark import SparkFiles # Locate dsdgen binary from the extracted archive. - # The archive (dsdgen.tar.gz or dsdgen.jar) contains a tools/ directory. 
+ # The archive (dsdgen.tar.gz) contains a tools/ directory. # With --archives '#dsdgen', Spark extracts contents under a # directory named 'dsdgen' in SparkFiles root. # Note: SparkFiles.getRootDirectory() may return a relative path (e.g. ".") @@ -190,7 +190,7 @@ def run_dsdgen_and_read(child_index, scale, parallel, update=None): if table_name is None: continue - with open(filepath, "r") as f: + with open(filepath, "r", encoding="utf-8") as f: for line in f: stripped = line.rstrip("\n\r") if stripped: @@ -217,6 +217,14 @@ def rename_partition_dirs(spark, output_dir, table_names): base_path = Path(output_dir) fs = jvm.org.apache.hadoop.fs.FileSystem.get(base_path.toUri(), hadoop_conf) + def _rename_or_raise(src_path, dst_path): + if fs.exists(dst_path): + raise FileExistsError( + f"Refusing to overwrite existing path during rename: {dst_path}" + ) + if not fs.rename(src_path, dst_path): + raise RuntimeError(f"HDFS rename failed: {src_path} -> {dst_path}") + for table in table_names: hive_dir = Path(output_dir, f"table_name={table}") target_dir = Path(output_dir, table) @@ -233,15 +241,17 @@ def rename_partition_dirs(spark, output_dir, table_names): if src_path.getName().startswith("_"): continue dst_path = Path(target_dir, src_path.getName()) - fs.rename(src_path, dst_path) - fs.delete(hive_dir, True) + _rename_or_raise(src_path, dst_path) + if not fs.delete(hive_dir, True): + raise RuntimeError(f"Failed to delete temporary partition dir: {hive_dir}") else: - fs.rename(hive_dir, target_dir) + _rename_or_raise(hive_dir, target_dir) # Clean up Hive-style _SUCCESS at root if present success_file = Path(output_dir, "_SUCCESS") if fs.exists(success_file): - fs.delete(success_file, False) + if not fs.delete(success_file, False): + print(f"WARNING: failed to delete {success_file}", file=sys.stderr) def main(): @@ -305,7 +315,7 @@ def main(): print(f" Output: {args.output_dir}") print(f" Overwrite: {args.overwrite}") print(f" Update: {update}") - print(f" Partitions:{num_partitions}") + print(f" Partitions: {num_partitions}") # Create RDD: one element per child index. # Each Spark task runs dsdgen for its child, reads output line by line, From 7a520eb72942be5a4f26405edd8a5f200b272d98 Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Thu, 26 Feb 2026 14:45:09 +0800 Subject: [PATCH 4/7] Fix PR review comments for Spark-based data generation - Use dsdgen.tar.gz instead of dsdgen.jar for archive format consistency - Add return value checks in rename_partition_dirs to prevent silent failures - Use shutil.rmtree for robust temp directory cleanup - Add encoding="utf-8" to file open calls - Fix Partitions print alignment Made-with: Cursor --- nds/README.md | 7 ++++++- nds/datagen_submit.template | 36 ++++++++++++++++++++++++++++++++++++ nds/nds_gen_data_spark.py | 6 ++---- 3 files changed, 44 insertions(+), 5 deletions(-) create mode 100644 nds/datagen_submit.template diff --git a/nds/README.md b/nds/README.md index 5f8443a..3c3789a 100644 --- a/nds/README.md +++ b/nds/README.md @@ -146,7 +146,12 @@ runs data generation in parallel, and writes output to any Hadoop-compatible fil (HDFS, S3, GCS, ABFS, or local). It works with any Spark cluster manager: **K8s, YARN, Standalone, or local**. -**Prerequisites:** build tpcds-gen as before (`cd tpcds-gen && make`). 
+**Prerequisites:** build tpcds-gen as before (`cd tpcds-gen && make`), then package the +dsdgen tools as a tar.gz archive: + +```bash +cd tpcds-gen/target && tar czf lib/dsdgen.tar.gz tools/ +``` #### Required prerequisites for K8s + HDFS runs diff --git a/nds/datagen_submit.template b/nds/datagen_submit.template new file mode 100644 index 0000000..85f3631 --- /dev/null +++ b/nds/datagen_submit.template @@ -0,0 +1,36 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Template for Spark-based TPC-DS data generation (nds_gen_data_spark.py). +# This replaces the Hadoop MapReduce approach and works on YARN, K8s, or Standalone. +# +# Usage: +# ./spark-submit-template datagen_submit.template \ +# nds_gen_data_spark.py 100 100 hdfs:///data/raw_sf100 --overwrite + +source base.template + +# Path to the dsdgen archive (built by: cd tpcds-gen && make, then tar czf) +export DSDGEN_ARCHIVE=${DSDGEN_ARCHIVE:-tpcds-gen/target/lib/dsdgen.tar.gz} + +export SPARK_CONF=("--master" "${SPARK_MASTER}" + "--deploy-mode" "client" + "--conf" "spark.driver.memory=${DRIVER_MEMORY}" + "--conf" "spark.executor.cores=${EXECUTOR_CORES}" + "--conf" "spark.executor.instances=${NUM_EXECUTORS}" + "--conf" "spark.executor.memory=${EXECUTOR_MEMORY}" + "--archives" "${DSDGEN_ARCHIVE}#dsdgen") diff --git a/nds/nds_gen_data_spark.py b/nds/nds_gen_data_spark.py index 5787e59..69907fc 100644 --- a/nds/nds_gen_data_spark.py +++ b/nds/nds_gen_data_spark.py @@ -39,8 +39,8 @@ Prerequisites: 1. Build tpcds-gen (cd tpcds-gen && make) - 2. The build produces target/lib/dsdgen.tar.gz containing - the tools/ directory (dsdgen binary + *.dst files). + 2. Package the tools directory as a tar.gz archive: + cd tpcds-gen/target && tar czf lib/dsdgen.tar.gz tools/ Usage: spark-submit [--master k8s://... | yarn | ...] 
\\ @@ -233,11 +233,9 @@ def _rename_or_raise(src_path, dst_path): continue if fs.exists(target_dir): - # Merge into existing directory (e.g., incremental range generation) statuses = fs.listStatus(hive_dir) for status in statuses: src_path = status.getPath() - # Skip _SUCCESS / hidden files if src_path.getName().startswith("_"): continue dst_path = Path(target_dir, src_path.getName()) From d8b186490728dcb8598c92868b051ce185b43b8e Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Fri, 27 Feb 2026 12:04:55 +0800 Subject: [PATCH 5/7] Add K8s Dockerfiles and update data generation docs - Add tpcds-gen/Dockerfile.dsdgen for cross-compiling dsdgen on Linux - Add Dockerfile.k8s-test for building Spark K8s image with datagen - Remove k8s_datagen_smoketest.sh script - Update README with cross-compilation, image build, and minikube testing instructions Made-with: Cursor --- nds/Dockerfile.k8s-test | 8 ++ nds/README.md | 142 +++++++++++++++++++++++++++---- nds/tpcds-gen/Dockerfile.dsdgen | 8 ++ scripts/k8s_datagen_smoketest.sh | 116 ------------------------- 4 files changed, 141 insertions(+), 133 deletions(-) create mode 100644 nds/Dockerfile.k8s-test create mode 100644 nds/tpcds-gen/Dockerfile.dsdgen delete mode 100755 scripts/k8s_datagen_smoketest.sh diff --git a/nds/Dockerfile.k8s-test b/nds/Dockerfile.k8s-test new file mode 100644 index 0000000..f437595 --- /dev/null +++ b/nds/Dockerfile.k8s-test @@ -0,0 +1,8 @@ +FROM spark-py:spark-3.5.6 + +USER root + +COPY nds_gen_data_spark.py /opt/spark/work-dir/ +COPY tpcds-gen/target/lib/dsdgen.tar.gz /opt/spark/work-dir/ + +USER ${spark_uid} diff --git a/nds/README.md b/nds/README.md index 3c3789a..48518fa 100644 --- a/nds/README.md +++ b/nds/README.md @@ -209,13 +209,65 @@ optional arguments: --num_executors N Hint for number of Spark partitions (default: one per child). ``` -**Note:** The dsdgen binary in the archive must be compiled for the same OS/architecture -as the Spark executor nodes (typically Linux x86_64). If you build on macOS but run on -K8s (Linux), you need to cross-compile or build inside a Linux container. +You can also use the provided `datagen_submit.template` with `spark-submit-template`: + +```bash +./spark-submit-template datagen_submit.template \ + nds_gen_data_spark.py 100 100 hdfs:///data/raw_sf100 --overwrite +``` **Note for zsh users:** quote the `--archives` value because `#` may be interpreted by shell. For example: `--archives 'tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen'`. +#### Cross-compiling dsdgen for Linux (K8s) + +The dsdgen binary in the archive must be compiled for the same OS/architecture as the Spark +executor nodes (typically Linux x86_64 or Linux ARM64). If you build on macOS but run on +K8s (Linux), use the provided `tpcds-gen/Dockerfile.dsdgen` to build dsdgen inside a Linux +container: + +```bash +cd tpcds-gen + +# Build dsdgen for Linux (automatically handles gcc -fcommon for GCC 10+) +docker build -f Dockerfile.dsdgen -t dsdgen-builder . + +# Extract the built tools directory +CID=$(docker create dsdgen-builder) && \ +docker cp "$CID":/tools/ /tmp/dsdgen-tools-linux/ && \ +docker rm "$CID" + +# Package as dsdgen.tar.gz +mkdir -p target/lib +cd /tmp && mkdir -p dsdgen-package && cp -r dsdgen-tools-linux dsdgen-package/tools +cd dsdgen-package && tar czf /nds/tpcds-gen/target/lib/dsdgen.tar.gz tools/ +``` + +#### Building the Spark K8s container image + +For K8s deployment, you need a Docker image that contains Spark, the data generation script, +and the dsdgen archive. 
The provided `Dockerfile.k8s-test` builds such an image on top of the +official Spark Python image: + +```bash +# Step 1: Build the Spark Python base image (from your Spark distribution) +cd $SPARK_HOME +./bin/docker-image-tool.sh -t \ + -p kubernetes/dockerfiles/spark/bindings/python/Dockerfile build + +# Step 2: Build the data generation image +cd /nds +# Edit Dockerfile.k8s-test if needed to match your Spark image tag +docker build -f Dockerfile.k8s-test -t nds-datagen: . +``` + +The resulting image includes `/opt/spark/work-dir/nds_gen_data_spark.py` and +`/opt/spark/work-dir/dsdgen.tar.gz`, ready for `spark-submit` with: + +```bash +--archives local:///opt/spark/work-dir/dsdgen.tar.gz#dsdgen +``` + #### K8s local filesystem caveat When `output_dir` is a local path (for example `/tmp/...`) and Spark executors run in K8s pods, @@ -223,26 +275,82 @@ each pod writes to its own container filesystem by default. You will not see mer the driver host unless a shared volume is configured. For production runs, prefer remote/shared storage such as HDFS, S3, GCS, or ABFS. -For local smoke tests on minikube, mount a shared host path and mount it into executor pods. +For local smoke tests on minikube, use a PersistentVolumeClaim mounted into the pod. -#### One-command K8s smoke test +#### Local K8s testing with minikube -Use the helper script: +To verify data generation on a local K8s cluster: ```bash -chmod +x ../scripts/k8s_datagen_smoketest.sh -../scripts/k8s_datagen_smoketest.sh +# 1. Start minikube (requires Docker runtime, e.g. Colima, Docker Desktop) +minikube start --driver=docker --cpus=4 --memory=6g + +# 2. Load the container image into minikube +minikube image load nds-datagen: + +# 3. Create Spark RBAC +kubectl create serviceaccount spark +kubectl create clusterrolebinding spark-role \ + --clusterrole=edit --serviceaccount=default:spark + +# 4. Create a PersistentVolumeClaim for output +kubectl apply -f - < --image-pull-policy=Never \ + --restart=Never --overrides='{ + "spec": { + "serviceAccountName": "spark", + "volumes": [{"name": "data", "persistentVolumeClaim": {"claimName": "nds-data"}}], + "containers": [{ + "name": "nds-test", + "image": "nds-datagen:", + "imagePullPolicy": "Never", + "command": ["/opt/spark/bin/spark-submit", "--master", "local[2]", + "--archives", "/opt/spark/work-dir/dsdgen.tar.gz#dsdgen", + "/opt/spark/work-dir/nds_gen_data_spark.py", + "1", "2", "/data/nds_test", "--overwrite"], + "resources": {"requests": {"memory": "2Gi", "cpu": "2"}}, + "volumeMounts": [{"name": "data", "mountPath": "/data"}] + }] + } +}' + +# 6. Wait for completion and check logs +kubectl wait --for=condition=Ready=false pod/nds-test --timeout=300s +kubectl logs nds-test | tail -5 +# Expected: "=== Data generation complete: /data/nds_test ===" + +# 7. Verify output (25 TPC-DS tables) +kubectl run nds-check --image=nds-datagen: --image-pull-policy=Never \ + --restart=Never --overrides='{ + "spec": { + "volumes": [{"name": "data", "persistentVolumeClaim": {"claimName": "nds-data"}}], + "containers": [{ + "name": "check", "image": "nds-datagen:", "imagePullPolicy": "Never", + "command": ["ls", "/data/nds_test/"], + "volumeMounts": [{"name": "data", "mountPath": "/data"}] + }] + } +}' +sleep 10 && kubectl logs nds-check +# Should list: call_center, catalog_page, ..., web_site (25 directories) + +# 8. 
Cleanup +kubectl delete pod nds-test nds-check --ignore-not-found +kubectl delete pvc nds-data --ignore-not-found ``` -What the script does: - -- starts minikube if needed -- configures Spark service account/role -- mounts `/tmp/nds_shared` into minikube with Spark-compatible UID/GID -- builds Spark Python image from `nds/Dockerfile.spark-k8s` -- submits `nds_gen_data_spark.py` on K8s -- verifies output (`25` source table folders and non-empty `store_sales`) - ### Convert CSV to Parquet or Other data sources To do the data conversion, the `nds_transcode.py` need to be submitted as a Spark job. User can leverage diff --git a/nds/tpcds-gen/Dockerfile.dsdgen b/nds/tpcds-gen/Dockerfile.dsdgen new file mode 100644 index 0000000..f1c3371 --- /dev/null +++ b/nds/tpcds-gen/Dockerfile.dsdgen @@ -0,0 +1,8 @@ +FROM ubuntu:22.04 AS builder +RUN apt-get update && apt-get install -y build-essential byacc flex && rm -rf /var/lib/apt/lists/* +COPY target/tools /build/tools +WORKDIR /build/tools +RUN make clean && make LINUX_CFLAGS="-g -Wall -fcommon" +FROM ubuntu:22.04 +COPY --from=builder /build/tools /tools +CMD ["true"] diff --git a/scripts/k8s_datagen_smoketest.sh b/scripts/k8s_datagen_smoketest.sh deleted file mode 100755 index 90670cc..0000000 --- a/scripts/k8s_datagen_smoketest.sh +++ /dev/null @@ -1,116 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -# One-command smoke test for nds_gen_data_spark.py on Minikube. -# It verifies Spark-on-K8s submission and checks generated table outputs. - -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)" -NDS_DIR="${REPO_ROOT}/nds" - -SPARK_HOME="${SPARK_HOME:-$HOME/local/spark}" -SPARK_SUBMIT="${SPARK_HOME}/bin/spark-submit" -K8S_IMAGE="${K8S_IMAGE:-spark-py:v3.5.4-py312}" -ARCHIVE_PATH="${NDS_DIR}/tpcds-gen/target/lib/dsdgen.tar.gz" -OUTPUT_ROOT="${OUTPUT_ROOT:-/tmp/nds_shared}" -OUTPUT_DIR="${OUTPUT_ROOT}/output" -MOUNT_PID_FILE="${OUTPUT_ROOT}/.minikube_mount.pid" -MINIKUBE_CPUS="${MINIKUBE_CPUS:-4}" -MINIKUBE_MEMORY_MB="${MINIKUBE_MEMORY_MB:-8192}" - -require_cmd() { - if ! command -v "$1" >/dev/null 2>&1; then - echo "ERROR: command not found: $1" >&2 - exit 1 - fi -} - -require_file() { - if [[ ! -f "$1" ]]; then - echo "ERROR: file not found: $1" >&2 - exit 1 - fi -} - -echo "[1/7] Checking prerequisites..." -require_cmd minikube -require_cmd docker -require_cmd "${SPARK_SUBMIT}" -require_file "${NDS_DIR}/nds_gen_data_spark.py" -require_file "${NDS_DIR}/Dockerfile.spark-k8s" - -if [[ ! -f "${ARCHIVE_PATH}" ]]; then - echo "ERROR: ${ARCHIVE_PATH} not found." >&2 - echo "Build and package dsdgen first:" >&2 - echo " cd nds/tpcds-gen" >&2 - echo " make clean all LINUX_CC='gcc -fcommon'" >&2 - echo " cd target && tar czf lib/dsdgen.tar.gz tools/" >&2 - exit 1 -fi - -if ! docker ps >/dev/null 2>&1; then - echo "ERROR: docker is not accessible for current user." >&2 - echo "Run: sudo usermod -aG docker \$USER && newgrp docker" >&2 - exit 1 -fi - -export PATH="${SPARK_HOME}/bin:${PATH}" - -echo "[2/7] Starting minikube (if needed)..." -if ! minikube status >/dev/null 2>&1; then - minikube start --cpus="${MINIKUBE_CPUS}" --memory="${MINIKUBE_MEMORY_MB}" --driver=docker -fi - -echo "[3/7] Ensuring Spark service account/role..." -minikube kubectl -- create serviceaccount spark >/dev/null 2>&1 || true -minikube kubectl -- create clusterrolebinding spark-role \ - --clusterrole=edit --serviceaccount=default:spark >/dev/null 2>&1 || true - -echo "[4/7] Starting shared mount for local output..." 
-mkdir -p "${OUTPUT_ROOT}" -chmod 777 "${OUTPUT_ROOT}" -if [[ -f "${MOUNT_PID_FILE}" ]]; then - old_pid="$(cat "${MOUNT_PID_FILE}" || true)" - if [[ -n "${old_pid}" ]] && kill -0 "${old_pid}" >/dev/null 2>&1; then - kill "${old_pid}" >/dev/null 2>&1 || true - fi -fi -nohup minikube mount "${OUTPUT_ROOT}:${OUTPUT_ROOT}" --uid 185 --gid 0 \ - >"${OUTPUT_ROOT}/minikube_mount.log" 2>&1 & -echo $! > "${MOUNT_PID_FILE}" -sleep 3 - -echo "[5/7] Building Spark image inside minikube docker daemon..." -eval "$(minikube docker-env)" -docker build -t "${K8S_IMAGE}" -f "${NDS_DIR}/Dockerfile.spark-k8s" "${SPARK_HOME}" - -echo "[6/7] Running Spark data generation smoke test..." -rm -rf "${OUTPUT_DIR}" -"${SPARK_SUBMIT}" \ - --master "k8s://$(minikube kubectl -- config view --minify -o jsonpath='{.clusters[0].cluster.server}')" \ - --deploy-mode client \ - --conf "spark.kubernetes.container.image=${K8S_IMAGE}" \ - --conf "spark.kubernetes.authenticate.driver.serviceAccountName=spark" \ - --conf "spark.executor.instances=2" \ - --conf "spark.kubernetes.container.image.pullPolicy=Never" \ - --conf "spark.pyspark.python=python3" \ - --conf "spark.pyspark.driver.python=python3" \ - --conf "spark.kubernetes.executor.volumes.hostPath.nds-data.mount.path=${OUTPUT_ROOT}" \ - --conf "spark.kubernetes.executor.volumes.hostPath.nds-data.options.path=${OUTPUT_ROOT}" \ - --archives "${ARCHIVE_PATH}#dsdgen" \ - "${NDS_DIR}/nds_gen_data_spark.py" 1 2 "${OUTPUT_DIR}" --overwrite - -echo "[7/7] Verifying generated output..." -table_count="$(ls -d "${OUTPUT_DIR}"/*/ 2>/dev/null | wc -l | tr -d ' ')" -if [[ "${table_count}" != "25" ]]; then - echo "ERROR: expected 25 source table directories, got ${table_count}" >&2 - exit 1 -fi -if ! head -1 "${OUTPUT_DIR}/store_sales/"*.txt >/dev/null 2>&1; then - echo "ERROR: store_sales data not found or empty." >&2 - exit 1 -fi - -echo "Smoke test passed." -echo "Output dir: ${OUTPUT_DIR}" -echo "Table count: ${table_count}" From ede7d6d6dd6c8de4e50e209afff0cd0906a14ace Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Fri, 27 Feb 2026 15:10:03 +0800 Subject: [PATCH 6/7] Address remaining PR #248 review comments - Add try/except for --range int() parsing with clear error message - Remove hardcoded py4j version in Dockerfile.spark-k8s using symlink - Add base image build instructions and ARG to Dockerfile.k8s-test - Add kubectl wait for PVC binding in README minikube guide Made-with: Cursor --- nds/Dockerfile.k8s-test | 7 ++++++- nds/Dockerfile.spark-k8s | 5 ++++- nds/README.md | 1 + nds/nds_gen_data_spark.py | 6 +++++- 4 files changed, 16 insertions(+), 3 deletions(-) diff --git a/nds/Dockerfile.k8s-test b/nds/Dockerfile.k8s-test index f437595..0bf5e60 100644 --- a/nds/Dockerfile.k8s-test +++ b/nds/Dockerfile.k8s-test @@ -1,4 +1,9 @@ -FROM spark-py:spark-3.5.6 +# Base image must be built first from your Spark distribution: +# cd $SPARK_HOME && ./bin/docker-image-tool.sh -t \ +# -p kubernetes/dockerfiles/spark/bindings/python/Dockerfile build +# Then update the FROM line below to match your image name and tag. 
+ARG BASE_IMAGE=spark-py:spark-3.5.6 +FROM ${BASE_IMAGE} USER root diff --git a/nds/Dockerfile.spark-k8s b/nds/Dockerfile.spark-k8s index 1cd5be9..7a43208 100644 --- a/nds/Dockerfile.spark-k8s +++ b/nds/Dockerfile.spark-k8s @@ -17,7 +17,7 @@ RUN apt-get update && \ # Set up Spark directory structure ENV SPARK_HOME=/opt/spark ENV PATH="${SPARK_HOME}/bin:${PATH}" -ENV PYTHONPATH="${SPARK_HOME}/python:${SPARK_HOME}/python/lib/py4j-0.10.9.7-src.zip:${PYTHONPATH}" +ENV PYTHONPATH="${SPARK_HOME}/python:${SPARK_HOME}/python/lib/py4j-src.zip" # Copy Spark distribution COPY jars ${SPARK_HOME}/jars @@ -30,6 +30,9 @@ COPY kubernetes/dockerfiles/spark/decom.sh /opt/ RUN chmod +x /opt/entrypoint.sh /opt/decom.sh && \ mkdir -p ${SPARK_HOME}/work-dir && \ chmod g+w ${SPARK_HOME}/work-dir && \ + # Create a stable symlink so PYTHONPATH works across py4j versions + ln -sf $(ls ${SPARK_HOME}/python/lib/py4j-*-src.zip | head -1) \ + ${SPARK_HOME}/python/lib/py4j-src.zip && \ # Create spark user useradd -u ${spark_uid} -r -g root -d ${SPARK_HOME} -s /sbin/nologin spark diff --git a/nds/README.md b/nds/README.md index 48518fa..235b7ee 100644 --- a/nds/README.md +++ b/nds/README.md @@ -305,6 +305,7 @@ spec: requests: storage: 2Gi EOF +kubectl wait --for=jsonpath='{.status.phase}'=Bound pvc/nds-data --timeout=60s # 5. Run a small-scale data generation (scale=1, parallel=2) kubectl run nds-test --image=nds-datagen: --image-pull-policy=Never \ diff --git a/nds/nds_gen_data_spark.py b/nds/nds_gen_data_spark.py index 69907fc..24a3e7f 100644 --- a/nds/nds_gen_data_spark.py +++ b/nds/nds_gen_data_spark.py @@ -286,7 +286,11 @@ def main(): if len(parts) != 2: print("ERROR: --range must be 'start,end'", file=sys.stderr) sys.exit(1) - range_start, range_end = int(parts[0]), int(parts[1]) + try: + range_start, range_end = int(parts[0]), int(parts[1]) + except ValueError as e: + print(f"ERROR: --range values must be integers: {e}", file=sys.stderr) + sys.exit(1) if range_start < 1 or range_end > args.parallel or range_start > range_end: print("ERROR: range must satisfy 1 <= start <= end <= parallel", file=sys.stderr) sys.exit(1) From 3160717d71cfaa6e4fa654e8ea4e0ba68f8be68e Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Fri, 27 Feb 2026 15:52:16 +0800 Subject: [PATCH 7/7] Fix py4j symlink error handling and document Dockerfile.spark-k8s - Add guard in Dockerfile.spark-k8s to fail build if no py4j zip found - Add usage instructions and build context note to Dockerfile.spark-k8s - Document both Dockerfile options (k8s-test vs spark-k8s) in README with comparison table and step-by-step instructions Made-with: Cursor --- nds/Dockerfile.spark-k8s | 17 +++++++++++++---- nds/README.md | 39 +++++++++++++++++++++++++++++++++------ 2 files changed, 46 insertions(+), 10 deletions(-) diff --git a/nds/Dockerfile.spark-k8s b/nds/Dockerfile.spark-k8s index 7a43208..e9a5637 100644 --- a/nds/Dockerfile.spark-k8s +++ b/nds/Dockerfile.spark-k8s @@ -1,5 +1,13 @@ -# Custom Spark + PySpark image for K8s data generation testing. -# Based on Ubuntu 24.04 to match the host Python 3.12 version. +# Standalone Spark + PySpark image for K8s deployment. +# Builds a complete Spark image from a Spark distribution directory. +# Build context must be $SPARK_HOME (the unpacked Spark distribution). +# +# Usage: +# cd $SPARK_HOME +# docker build -f /nds/Dockerfile.spark-k8s -t spark-custom: . +# +# For a simpler setup, use Dockerfile.k8s-test instead, which layers on top +# of the official pre-built Spark Python image. 
FROM ubuntu:24.04 ARG spark_uid=185 @@ -31,8 +39,9 @@ RUN chmod +x /opt/entrypoint.sh /opt/decom.sh && \ mkdir -p ${SPARK_HOME}/work-dir && \ chmod g+w ${SPARK_HOME}/work-dir && \ # Create a stable symlink so PYTHONPATH works across py4j versions - ln -sf $(ls ${SPARK_HOME}/python/lib/py4j-*-src.zip | head -1) \ - ${SPARK_HOME}/python/lib/py4j-src.zip && \ + PY4J_ZIP=$(ls ${SPARK_HOME}/python/lib/py4j-*-src.zip 2>/dev/null | head -1) && \ + [ -n "$PY4J_ZIP" ] || { echo "ERROR: no py4j-*-src.zip found in ${SPARK_HOME}/python/lib/"; exit 1; } && \ + ln -sf "$PY4J_ZIP" ${SPARK_HOME}/python/lib/py4j-src.zip && \ # Create spark user useradd -u ${spark_uid} -r -g root -d ${SPARK_HOME} -s /sbin/nologin spark diff --git a/nds/README.md b/nds/README.md index 235b7ee..0308181 100644 --- a/nds/README.md +++ b/nds/README.md @@ -245,9 +245,16 @@ cd dsdgen-package && tar czf /nds/tpcds-gen/target/lib/dsdgen.tar. #### Building the Spark K8s container image -For K8s deployment, you need a Docker image that contains Spark, the data generation script, -and the dsdgen archive. The provided `Dockerfile.k8s-test` builds such an image on top of the -official Spark Python image: +Two Dockerfiles are provided for different use cases: + +| Dockerfile | Base | Build context | Use case | +|---|---|---|---| +| `Dockerfile.k8s-test` | Official `spark-py` image | `nds/` | Quick setup — layers datagen scripts on top of a pre-built Spark image | +| `Dockerfile.spark-k8s` | `ubuntu:24.04` | `$SPARK_HOME` | Standalone build — constructs a full Spark + PySpark image from a Spark distribution (no pre-built image needed) | + +**Option A: `Dockerfile.k8s-test` (recommended)** + +Requires building the official Spark Python image first: ```bash # Step 1: Build the Spark Python base image (from your Spark distribution) @@ -257,11 +264,31 @@ cd $SPARK_HOME # Step 2: Build the data generation image cd /nds -# Edit Dockerfile.k8s-test if needed to match your Spark image tag -docker build -f Dockerfile.k8s-test -t nds-datagen: . +docker build -f Dockerfile.k8s-test \ + --build-arg BASE_IMAGE=spark-py: \ + -t nds-datagen: . +``` + +**Option B: `Dockerfile.spark-k8s`** + +Builds everything from scratch using a Spark distribution directory as build context. +No pre-built Spark image needed, but the build context **must** be `$SPARK_HOME`: + +```bash +cd $SPARK_HOME +docker build -f /nds/Dockerfile.spark-k8s -t spark-custom: . +``` + +Then copy datagen scripts into the image: + +```bash +cd /nds +docker build -f Dockerfile.k8s-test \ + --build-arg BASE_IMAGE=spark-custom: \ + -t nds-datagen: . ``` -The resulting image includes `/opt/spark/work-dir/nds_gen_data_spark.py` and +Both options produce an image with `/opt/spark/work-dir/nds_gen_data_spark.py` and `/opt/spark/work-dir/dsdgen.tar.gz`, ready for `spark-submit` with: ```bash