diff --git a/nds/Dockerfile.k8s-test b/nds/Dockerfile.k8s-test
new file mode 100644
index 00000000..0bf5e604
--- /dev/null
+++ b/nds/Dockerfile.k8s-test
@@ -0,0 +1,16 @@
+# Base image must be built first from your Spark distribution:
+#   cd $SPARK_HOME && ./bin/docker-image-tool.sh -t <tag> \
+#     -p kubernetes/dockerfiles/spark/bindings/python/Dockerfile build
+# Then update the FROM line below to match your image name and tag.
+ARG BASE_IMAGE=spark-py:spark-3.5.6
+FROM ${BASE_IMAGE}
+
+# Redeclare after FROM; build args do not carry over from the base image.
+ARG spark_uid=185
+
+USER root
+
+COPY nds_gen_data_spark.py /opt/spark/work-dir/
+COPY tpcds-gen/target/lib/dsdgen.tar.gz /opt/spark/work-dir/
+
+USER ${spark_uid}
diff --git a/nds/Dockerfile.spark-k8s b/nds/Dockerfile.spark-k8s
new file mode 100644
index 00000000..e9a56377
--- /dev/null
+++ b/nds/Dockerfile.spark-k8s
@@ -0,0 +1,52 @@
+# Standalone Spark + PySpark image for K8s deployment.
+# Builds a complete Spark image from a Spark distribution directory.
+# Build context must be $SPARK_HOME (the unpacked Spark distribution).
+#
+# Usage:
+#   cd $SPARK_HOME
+#   docker build -f <repo>/nds/Dockerfile.spark-k8s -t spark-custom:<tag> .
+#
+# For a simpler setup, use Dockerfile.k8s-test instead, which layers on top
+# of the official pre-built Spark Python image.
+FROM ubuntu:24.04
+
+ARG spark_uid=185
+
+# Install Java 17 + Python 3.12 (default on 24.04)
+RUN apt-get update && \
+    apt-get install -y --no-install-recommends \
+        openjdk-17-jre-headless \
+        python3 \
+        python3-pip \
+        tini \
+        procps \
+    && rm -rf /var/cache/apt/* /var/lib/apt/lists/*
+
+# Set up Spark directory structure
+ENV SPARK_HOME=/opt/spark
+ENV PATH="${SPARK_HOME}/bin:${PATH}"
+ENV PYTHONPATH="${SPARK_HOME}/python:${SPARK_HOME}/python/lib/py4j-src.zip"
+
+# Copy Spark distribution
+COPY jars ${SPARK_HOME}/jars
+COPY bin ${SPARK_HOME}/bin
+COPY sbin ${SPARK_HOME}/sbin
+COPY python ${SPARK_HOME}/python
+COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/
+COPY kubernetes/dockerfiles/spark/decom.sh /opt/
+
+RUN chmod +x /opt/entrypoint.sh /opt/decom.sh && \
+    mkdir -p ${SPARK_HOME}/work-dir && \
+    chmod g+w ${SPARK_HOME}/work-dir && \
+    # Create a stable symlink so PYTHONPATH works across py4j versions
+    PY4J_ZIP=$(ls ${SPARK_HOME}/python/lib/py4j-*-src.zip 2>/dev/null | head -1) && \
+    [ -n "$PY4J_ZIP" ] || { echo "ERROR: no py4j-*-src.zip found in ${SPARK_HOME}/python/lib/"; exit 1; } && \
+    ln -sf "$PY4J_ZIP" ${SPARK_HOME}/python/lib/py4j-src.zip && \
+    # Create spark user
+    useradd -u ${spark_uid} -r -g root -d ${SPARK_HOME} -s /sbin/nologin spark
+
+WORKDIR ${SPARK_HOME}/work-dir
+
+ENTRYPOINT [ "/opt/entrypoint.sh" ]
+
+USER ${spark_uid}
diff --git a/nds/README.md b/nds/README.md
index 91f5e880..0308181d 100644
--- a/nds/README.md
+++ b/nds/README.md
@@ -138,6 +138,247 @@ Example command:
 python nds_gen_data.py hdfs 100 100 /data/raw_sf100 --overwrite_output
 ```
 
+### Generate data with Spark (recommended for K8s / no MapReduce)
+
+`nds_gen_data_spark.py` is a PySpark application that replaces the Hadoop MapReduce
+approach. It distributes the `dsdgen` binary across Spark executors via `--archives`,
+runs data generation in parallel, and writes output to any Hadoop-compatible filesystem
+(HDFS, S3, GCS, ABFS, or local). It works with any Spark cluster manager: **K8s, YARN,
+Standalone, or local**.
+
+**Prerequisites:** build tpcds-gen as before (`cd tpcds-gen && make`), then package the
+dsdgen tools as a tar.gz archive:
+
+```bash
+cd tpcds-gen/target && tar czf lib/dsdgen.tar.gz tools/
+```
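+
+Before submitting, you can sanity-check the archive layout with a few lines of Python
+(a quick local check, not part of the NDS pipeline; the path below is the default
+archive location from the command above):
+
+```python
+import tarfile
+
+# nds_gen_data_spark.py expects the dsdgen binary at tools/dsdgen inside the
+# archive, next to its auxiliary files (*.dst, *.idx).
+with tarfile.open("tpcds-gen/target/lib/dsdgen.tar.gz") as tar:
+    names = tar.getnames()
+assert "tools/dsdgen" in names, "dsdgen binary missing from archive"
+print(f"{len(names)} entries, tools/dsdgen present")
+```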
+
+#### Prerequisites for K8s + HDFS runs
+
+Before submitting `nds_gen_data_spark.py` to Spark on K8s, make sure all of the
+following conditions are true:
+
+1. Driver and executor images include Hadoop client configuration (`core-site.xml`
+   and `hdfs-site.xml`) so both sides can access the same HDFS cluster.
+2. `output_dir` is an explicit HDFS path (for example, `hdfs:///data/raw_sf1000`
+   or `hdfs://<namenode-host>:8020/data/raw_sf1000`) instead of a local path.
+3. The `dsdgen` archive is provided with `--archives`, using a shell-quoted value
+   when needed (zsh users should quote the `#`):
+   `--archives 'tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen'`.
+4. Python minor versions are consistent between the driver and executor environments
+   (for example, both are Python 3.12; see the sketch after this list).
+5. For incremental generation (`--range`), do not rerun overlapping child ranges,
+   otherwise duplicate data will be produced.
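+
+For item 4, a minimal PySpark sketch (an ad-hoc check, not shipped with NDS) can
+confirm that executors run the same Python minor version as the driver:
+
+```python
+import sys
+from pyspark.sql import SparkSession
+
+spark = SparkSession.builder.appName("python-version-check").getOrCreate()
+driver_ver = f"{sys.version_info.major}.{sys.version_info.minor}"
+
+def report_version(_):
+    import sys
+    yield f"{sys.version_info.major}.{sys.version_info.minor}"
+
+# Run one tiny task per partition and collect the executor-side versions.
+executor_vers = set(
+    spark.sparkContext.parallelize(range(2), 2).mapPartitions(report_version).collect()
+)
+assert executor_vers == {driver_ver}, f"driver {driver_ver} vs executors {executor_vers}"
+spark.stop()
+```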
+
+Directly with spark-submit (K8s example):
+
+```bash
+spark-submit --master k8s://https://<k8s-apiserver-host>:<port> \
+    --deploy-mode cluster \
+    --conf spark.kubernetes.container.image=<image> \
+    --conf spark.executor.instances=10 \
+    --archives tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen \
+    nds_gen_data_spark.py 1000 200 hdfs:///data/raw_sf1000 --overwrite
+```
+
+For incremental generation (split across multiple spark-submit runs):
+
+```bash
+# Run 1: children 1-100
+spark-submit --archives tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen \
+    nds_gen_data_spark.py 1000 200 hdfs:///data/raw_sf1000 --range 1,100
+
+# Run 2: children 101-200
+spark-submit --archives tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen \
+    nds_gen_data_spark.py 1000 200 hdfs:///data/raw_sf1000 --range 101,200
+```
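+
+If you split a large run into more than two invocations, the child ranges must tile
+`1..parallel` exactly once. A small helper (hypothetical, not part of NDS) that
+computes non-overlapping ranges:
+
+```python
+def child_ranges(parallel: int, runs: int):
+    """Yield (start, end) pairs that cover 1..parallel exactly once."""
+    per_run = parallel // runs
+    start = 1
+    for i in range(runs):
+        # The last run absorbs the remainder when parallel % runs != 0.
+        end = parallel if i == runs - 1 else start + per_run - 1
+        yield start, end
+        start = end + 1
+
+print(list(child_ranges(200, 2)))  # [(1, 100), (101, 200)], as in the example above
+```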
+
+Arguments:
+
+```text
+positional arguments:
+  scale                 Data scale factor in GB.
+  parallel              Number of parallel dsdgen children (must be >= 2).
+  output_dir            Output directory (hdfs://..., s3a://..., gs://..., or local path).
+
+optional arguments:
+  --range START,END     Generate only this child range (inclusive).
+                        Uses append mode automatically for incremental runs.
+  --overwrite           Overwrite existing output directory.
+  --update N            Generate update/maintenance dataset N.
+  --num_executors N     Hint for number of Spark partitions (default: one per child).
+```
+
+You can also use the provided `datagen_submit.template` with `spark-submit-template`:
+
+```bash
+./spark-submit-template datagen_submit.template \
+    nds_gen_data_spark.py 100 100 hdfs:///data/raw_sf100 --overwrite
+```
+
+**Note for zsh users:** quote the `--archives` value because `#` may be interpreted by
+the shell. For example: `--archives 'tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen'`.
+
+#### Cross-compiling dsdgen for Linux (K8s)
+
+The dsdgen binary in the archive must be compiled for the same OS and architecture as
+the Spark executor nodes (typically Linux x86_64 or Linux ARM64). If you build on macOS
+but run on K8s (Linux), use the provided `tpcds-gen/Dockerfile.dsdgen` to build dsdgen
+inside a Linux container:
+
+```bash
+cd tpcds-gen
+
+# Build dsdgen for Linux (automatically handles gcc -fcommon for GCC 10+)
+docker build -f Dockerfile.dsdgen -t dsdgen-builder .
+
+# Extract the built tools directory
+CID=$(docker create dsdgen-builder) && \
+docker cp "$CID":/tools/ /tmp/dsdgen-tools-linux/ && \
+docker rm "$CID"
+
+# Package as dsdgen.tar.gz (<repo> is the path to your checkout of this repository)
+mkdir -p target/lib
+cd /tmp && mkdir -p dsdgen-package && cp -r dsdgen-tools-linux dsdgen-package/tools
+cd dsdgen-package && tar czf <repo>/nds/tpcds-gen/target/lib/dsdgen.tar.gz tools/
+```
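+
+To confirm the packaged binary really targets Linux before pushing it to the cluster,
+you can inspect its magic bytes (a local sanity check, not part of the pipeline; a
+macOS build would be Mach-O and fail it):
+
+```python
+import tarfile
+
+with tarfile.open("tpcds-gen/target/lib/dsdgen.tar.gz") as tar:
+    with tar.extractfile("tools/dsdgen") as member:
+        magic = member.read(4)
+# Linux executables start with the 4-byte ELF magic; Mach-O binaries do not.
+assert magic == b"\x7fELF", f"dsdgen is not a Linux ELF binary: {magic!r}"
+```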
+
+#### Building the Spark K8s container image
+
+Two Dockerfiles are provided for different use cases:
+
+| Dockerfile | Base | Build context | Use case |
+|---|---|---|---|
+| `Dockerfile.k8s-test` | Official `spark-py` image | `nds/` | Quick setup — layers datagen scripts on top of a pre-built Spark image |
+| `Dockerfile.spark-k8s` | `ubuntu:24.04` | `$SPARK_HOME` | Standalone build — constructs a full Spark + PySpark image from a Spark distribution (no pre-built image needed) |
+
+**Option A: `Dockerfile.k8s-test` (recommended)**
+
+Requires building the official Spark Python image first:
+
+```bash
+# Step 1: Build the Spark Python base image (from your Spark distribution)
+cd $SPARK_HOME
+./bin/docker-image-tool.sh -t <tag> \
+    -p kubernetes/dockerfiles/spark/bindings/python/Dockerfile build
+
+# Step 2: Build the data generation image
+cd <repo>/nds
+docker build -f Dockerfile.k8s-test \
+    --build-arg BASE_IMAGE=spark-py:<tag> \
+    -t nds-datagen:<tag> .
+```
+
+**Option B: `Dockerfile.spark-k8s`**
+
+Builds everything from scratch using a Spark distribution directory as build context.
+No pre-built Spark image needed, but the build context **must** be `$SPARK_HOME`:
+
+```bash
+cd $SPARK_HOME
+docker build -f <repo>/nds/Dockerfile.spark-k8s -t spark-custom:<tag> .
+```
+
+Then copy datagen scripts into the image:
+
+```bash
+cd <repo>/nds
+docker build -f Dockerfile.k8s-test \
+    --build-arg BASE_IMAGE=spark-custom:<tag> \
+    -t nds-datagen:<tag> .
+```
+
+Both options produce an image with `/opt/spark/work-dir/nds_gen_data_spark.py` and
+`/opt/spark/work-dir/dsdgen.tar.gz`, ready for `spark-submit` with:
+
+```bash
+--archives local:///opt/spark/work-dir/dsdgen.tar.gz#dsdgen
+```
+
+#### K8s local filesystem caveat
+
+When `output_dir` is a local path (for example `/tmp/...`) and Spark executors run in
+K8s pods, each pod writes to its own container filesystem by default. You will not see
+merged results on the driver host unless a shared volume is configured.
+
+For production runs, prefer remote/shared storage such as HDFS, S3, GCS, or ABFS.
+For local smoke tests on minikube, use a PersistentVolumeClaim mounted into the pod.
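+
+To check which filesystem an `output_dir` actually resolves to, you can ask the same
+Hadoop `FileSystem` API the generator uses internally (a short diagnostic sketch; the
+path is an example):
+
+```python
+from pyspark.sql import SparkSession
+
+spark = SparkSession.builder.getOrCreate()
+jvm = spark._jvm
+hconf = spark._jsc.hadoopConfiguration()
+path = jvm.org.apache.hadoop.fs.Path("/tmp/nds_test")
+fs = jvm.org.apache.hadoop.fs.FileSystem.get(path.toUri(), hconf)
+# file:/// means pod-local storage on K8s; prefer hdfs://, s3a://, gs://, or abfs://
+print(fs.getUri())
+```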
+
+#### Local K8s testing with minikube
+
+To verify data generation on a local K8s cluster:
+
+```bash
+# 1. Start minikube (requires a Docker runtime, e.g. Colima or Docker Desktop)
+minikube start --driver=docker --cpus=4 --memory=6g
+
+# 2. Load the container image into minikube
+minikube image load nds-datagen:<tag>
+
+# 3. Create Spark RBAC
+kubectl create serviceaccount spark
+kubectl create clusterrolebinding spark-role \
+    --clusterrole=edit --serviceaccount=default:spark
+
+# 4. Create a PersistentVolumeClaim for output
+kubectl apply -f - <<EOF
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: nds-data
+spec:
+  accessModes:
+    - ReadWriteOnce
+  resources:
+    requests:
+      storage: 2Gi   # small size for the scale-1 smoke test; adjust as needed
+EOF
+
+# 5. Run data generation in a single pod (scale 1, 2 children, local[2])
+kubectl run nds-test --image=nds-datagen:<tag> --image-pull-policy=Never \
+    --restart=Never --overrides='{
+  "spec": {
+    "serviceAccountName": "spark",
+    "volumes": [{"name": "data", "persistentVolumeClaim": {"claimName": "nds-data"}}],
+    "containers": [{
+      "name": "nds-test",
+      "image": "nds-datagen:<tag>",
+      "imagePullPolicy": "Never",
+      "command": ["/opt/spark/bin/spark-submit", "--master", "local[2]",
+                  "--archives", "/opt/spark/work-dir/dsdgen.tar.gz#dsdgen",
+                  "/opt/spark/work-dir/nds_gen_data_spark.py",
+                  "1", "2", "/data/nds_test", "--overwrite"],
+      "resources": {"requests": {"memory": "2Gi", "cpu": "2"}},
+      "volumeMounts": [{"name": "data", "mountPath": "/data"}]
+    }]
+  }
+}'
+
+# 6. Wait for completion and check logs
+kubectl wait --for=condition=Ready=false pod/nds-test --timeout=300s
+kubectl logs nds-test | tail -5
+# Expected: "=== Data generation complete: /data/nds_test ==="
+
+# 7. Verify output (25 TPC-DS tables)
+kubectl run nds-check --image=nds-datagen:<tag> --image-pull-policy=Never \
+    --restart=Never --overrides='{
+  "spec": {
+    "volumes": [{"name": "data", "persistentVolumeClaim": {"claimName": "nds-data"}}],
+    "containers": [{
+      "name": "check", "image": "nds-datagen:<tag>", "imagePullPolicy": "Never",
+      "command": ["ls", "/data/nds_test/"],
+      "volumeMounts": [{"name": "data", "mountPath": "/data"}]
+    }]
+  }
+}'
+sleep 10 && kubectl logs nds-check
+# Should list: call_center, catalog_page, ..., web_site (25 directories)
+
+# 8. Cleanup
+kubectl delete pod nds-test nds-check --ignore-not-found
+kubectl delete pvc nds-data --ignore-not-found
+```
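+
+Beyond listing directories, you can spot-check row counts by reading the generated
+text output back with Spark (a hedged example; paths assume the minikube walkthrough
+above, run from inside a pod that mounts the PVC):
+
+```python
+from pyspark.sql import SparkSession
+
+spark = SparkSession.builder.master("local[2]").getOrCreate()
+# Each table directory holds plain-text rows written by nds_gen_data_spark.py.
+for table in ("call_center", "store_sales", "web_site"):
+    rows = spark.read.text(f"/data/nds_test/{table}").count()
+    print(f"{table}: {rows} rows")
+spark.stop()
+```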
" + "Make sure --archives #dsdgen is set." + ) + + # Ensure the binary is executable (archive extraction may lose permissions) + if not os.access(dsdgen_bin, os.X_OK): + os.chmod(dsdgen_bin, 0o755) + + # Temp directory for dsdgen output; each task gets its own + work_dir = tempfile.mkdtemp(prefix=f"dsdgen_c{child_index}_") + + try: + cmd = [ + dsdgen_bin, + "-dir", work_dir, + "-force", "Y", + "-scale", str(scale), + "-parallel", str(parallel), + "-child", str(child_index), + ] + if update is not None: + cmd += ["-update", str(update)] + + proc = subprocess.run(cmd, cwd=tools_dir, capture_output=True, text=True) + if proc.returncode != 0: + raise RuntimeError( + f"dsdgen failed for child {child_index} (exit {proc.returncode}): " + f"{proc.stderr}" + ) + + # Read .dat files and yield (table_name, line) pairs + regular_suffix = f"_{child_index}_{parallel}.dat" + + for fname in sorted(os.listdir(work_dir)): + filepath = os.path.join(work_dir, fname) + if not os.path.isfile(filepath): + continue + + table_name = None + + if fname.endswith(regular_suffix): + # Regular table: call_center_1_100.dat → table "call_center" + table_name = fname[: -len(regular_suffix)] + elif fname.endswith(".dat"): + # Delete / inventory_delete tables have different naming: + # delete_.dat, inventory_delete_.dat + for special in ("inventory_delete", "delete"): + if fname.startswith(special): + table_name = special + break + + if table_name is None: + continue + + with open(filepath, "r", encoding="utf-8") as f: + for line in f: + stripped = line.rstrip("\n\r") + if stripped: + yield (table_name, stripped) + + os.remove(filepath) + finally: + # Best-effort cleanup of temp directory (may contain leftover files) + shutil.rmtree(work_dir, ignore_errors=True) + + +def rename_partition_dirs(spark, output_dir, table_names): + """Rename Hive-style 'table_name=xxx' directories to plain 'xxx'. + + Spark's partitionBy writes to directories like output_dir/table_name=call_center/. + The rest of the NDS pipeline expects output_dir/call_center/. This function + performs fast HDFS-level renames (metadata only, no data copy) via the Hadoop + FileSystem Java API on the driver. 
+ """ + jvm = spark._jvm + hadoop_conf = spark._jsc.hadoopConfiguration() + Path = jvm.org.apache.hadoop.fs.Path + + base_path = Path(output_dir) + fs = jvm.org.apache.hadoop.fs.FileSystem.get(base_path.toUri(), hadoop_conf) + + def _rename_or_raise(src_path, dst_path): + if fs.exists(dst_path): + raise FileExistsError( + f"Refusing to overwrite existing path during rename: {dst_path}" + ) + if not fs.rename(src_path, dst_path): + raise RuntimeError(f"HDFS rename failed: {src_path} -> {dst_path}") + + for table in table_names: + hive_dir = Path(output_dir, f"table_name={table}") + target_dir = Path(output_dir, table) + + if not fs.exists(hive_dir): + continue + + if fs.exists(target_dir): + statuses = fs.listStatus(hive_dir) + for status in statuses: + src_path = status.getPath() + if src_path.getName().startswith("_"): + continue + dst_path = Path(target_dir, src_path.getName()) + _rename_or_raise(src_path, dst_path) + if not fs.delete(hive_dir, True): + raise RuntimeError(f"Failed to delete temporary partition dir: {hive_dir}") + else: + _rename_or_raise(hive_dir, target_dir) + + # Clean up Hive-style _SUCCESS at root if present + success_file = Path(output_dir, "_SUCCESS") + if fs.exists(success_file): + if not fs.delete(success_file, False): + print(f"WARNING: failed to delete {success_file}", file=sys.stderr) + + +def main(): + parser = argparse.ArgumentParser( + description="Spark-based TPC-DS data generation (replaces MapReduce)." + ) + parser.add_argument("scale", type=int, + help="Data scale factor in GB.") + parser.add_argument("parallel", type=int, + help="Number of parallel dsdgen children (must be >= 2).") + parser.add_argument("output_dir", + help="Output directory (hdfs://..., s3a://..., gs://..., or local path).") + parser.add_argument("--range", + help='Child range "start,end" (inclusive). ' + 'Default: generate all children 1..parallel. ' + 'Useful for splitting work across multiple spark-submit invocations.') + parser.add_argument("--overwrite", action="store_true", + help="Overwrite existing output directory.") + parser.add_argument("--update", type=int, default=None, + help="Generate update/maintenance dataset .") + parser.add_argument("--num_executors", type=int, default=None, + help="Hint for number of Spark partitions. 
" + "Default: one partition per child (optimal parallelism).") + args = parser.parse_args() + + if args.parallel < 2: + print("ERROR: parallel must be >= 2", file=sys.stderr) + sys.exit(1) + + range_start = 1 + range_end = args.parallel + if args.range: + parts = args.range.split(",") + if len(parts) != 2: + print("ERROR: --range must be 'start,end'", file=sys.stderr) + sys.exit(1) + try: + range_start, range_end = int(parts[0]), int(parts[1]) + except ValueError as e: + print(f"ERROR: --range values must be integers: {e}", file=sys.stderr) + sys.exit(1) + if range_start < 1 or range_end > args.parallel or range_start > range_end: + print("ERROR: range must satisfy 1 <= start <= end <= parallel", file=sys.stderr) + sys.exit(1) + + children = list(range(range_start, range_end + 1)) + num_children = len(children) + num_partitions = args.num_executors if args.num_executors else num_children + table_names = MAINTENANCE_TABLE_NAMES if args.update else SOURCE_TABLE_NAMES + + # Capture args for closure (avoid serializing argparse Namespace) + scale = args.scale + parallel = args.parallel + update = args.update + + spark = SparkSession.builder \ + .appName(f"NDS_DataGen_sf{scale}_p{parallel}") \ + .getOrCreate() + sc = spark.sparkContext + + print(f"=== NDS Spark Data Generation ===") + print(f" Scale: {scale} GB") + print(f" Parallel: {parallel}") + print(f" Range: {range_start}..{range_end} ({num_children} children)") + print(f" Output: {args.output_dir}") + print(f" Overwrite: {args.overwrite}") + print(f" Update: {update}") + print(f" Partitions: {num_partitions}") + + # Create RDD: one element per child index. + # Each Spark task runs dsdgen for its child, reads output line by line, + # and yields (table_name, line) pairs — no data accumulates in memory. + children_rdd = sc.parallelize(children, numSlices=num_partitions) + + all_data_rdd = children_rdd.flatMap( + lambda child: run_dsdgen_and_read(child, scale, parallel, update) + ) + + # Convert to DataFrame: [table_name: string, value: string] + # partitionBy("table_name") writes separate directories per table WITHOUT shuffle — + # each task independently splits its output into per-table files. + df = spark.createDataFrame(all_data_rdd, ["table_name", "value"]) + + # Determine write mode: + # --range → append (incremental generation across multiple spark-submit runs) + # --overwrite → overwrite (fresh start, wipe existing data) + # default → errorifexists (fail if output already exists) + if args.range: + write_mode = "append" + elif args.overwrite: + write_mode = "overwrite" + else: + write_mode = "errorifexists" + df.write.partitionBy("table_name").mode(write_mode).text(args.output_dir) + + # Rename Hive-style "table_name=xxx" dirs to plain "xxx" for NDS pipeline compat + rename_partition_dirs(spark, args.output_dir, table_names) + + print(f"=== Data generation complete: {args.output_dir} ===") + spark.stop() + + +if __name__ == "__main__": + main() diff --git a/nds/tpcds-gen/Dockerfile.dsdgen b/nds/tpcds-gen/Dockerfile.dsdgen new file mode 100644 index 00000000..f1c33717 --- /dev/null +++ b/nds/tpcds-gen/Dockerfile.dsdgen @@ -0,0 +1,8 @@ +FROM ubuntu:22.04 AS builder +RUN apt-get update && apt-get install -y build-essential byacc flex && rm -rf /var/lib/apt/lists/* +COPY target/tools /build/tools +WORKDIR /build/tools +RUN make clean && make LINUX_CFLAGS="-g -Wall -fcommon" +FROM ubuntu:22.04 +COPY --from=builder /build/tools /tools +CMD ["true"]