13 changes: 13 additions & 0 deletions nds/Dockerfile.k8s-test
@@ -0,0 +1,13 @@
# Base image must be built first from your Spark distribution:
# cd $SPARK_HOME && ./bin/docker-image-tool.sh -t <tag> \
# -p kubernetes/dockerfiles/spark/bindings/python/Dockerfile build
# Then update the FROM line below to match your image name and tag.
ARG BASE_IMAGE=spark-py:spark-3.5.6
FROM ${BASE_IMAGE}

USER root

COPY nds_gen_data_spark.py /opt/spark/work-dir/
COPY tpcds-gen/target/lib/dsdgen.tar.gz /opt/spark/work-dir/

# spark_uid matches the default UID used by the official Spark images.
ARG spark_uid=185
USER ${spark_uid}
52 changes: 52 additions & 0 deletions nds/Dockerfile.spark-k8s
@@ -0,0 +1,52 @@
# Standalone Spark + PySpark image for K8s deployment.
# Builds a complete Spark image from a Spark distribution directory.
# Build context must be $SPARK_HOME (the unpacked Spark distribution).
#
# Usage:
# cd $SPARK_HOME
# docker build -f <repo>/nds/Dockerfile.spark-k8s -t spark-custom:<tag> .
#
# For a simpler setup, use Dockerfile.k8s-test instead, which layers on top
# of the official pre-built Spark Python image.
FROM ubuntu:24.04

ARG spark_uid=185

# Install Java 17 + Python 3.12 (default on 24.04)
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
        openjdk-17-jre-headless \
        python3 \
        python3-pip \
        tini \
        procps \
    && rm -rf /var/cache/apt/* /var/lib/apt/lists/*

# Set up Spark directory structure
ENV SPARK_HOME=/opt/spark
ENV PATH="${SPARK_HOME}/bin:${PATH}"
ENV PYTHONPATH="${SPARK_HOME}/python:${SPARK_HOME}/python/lib/py4j-src.zip"

# Copy Spark distribution
COPY jars ${SPARK_HOME}/jars
COPY bin ${SPARK_HOME}/bin
COPY sbin ${SPARK_HOME}/sbin
COPY python ${SPARK_HOME}/python
COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/
COPY kubernetes/dockerfiles/spark/decom.sh /opt/

RUN chmod +x /opt/entrypoint.sh /opt/decom.sh && \
    mkdir -p ${SPARK_HOME}/work-dir && \
    chmod g+w ${SPARK_HOME}/work-dir && \
    # Create a stable symlink so PYTHONPATH works across py4j versions
    PY4J_ZIP=$(ls ${SPARK_HOME}/python/lib/py4j-*-src.zip 2>/dev/null | head -1) && \
    [ -n "$PY4J_ZIP" ] || { echo "ERROR: no py4j-*-src.zip found in ${SPARK_HOME}/python/lib/"; exit 1; } && \
    ln -sf "$PY4J_ZIP" ${SPARK_HOME}/python/lib/py4j-src.zip && \
    # Create spark user
    useradd -u ${spark_uid} -r -g root -d ${SPARK_HOME} -s /sbin/nologin spark

WORKDIR ${SPARK_HOME}/work-dir

ENTRYPOINT [ "/opt/entrypoint.sh" ]

USER ${spark_uid}
241 changes: 241 additions & 0 deletions nds/README.md
@@ -138,6 +138,247 @@ Example command:
python nds_gen_data.py hdfs 100 100 /data/raw_sf100 --overwrite_output
```

### Generate data with Spark (recommended for K8s / no MapReduce)

`nds_gen_data_spark.py` is a PySpark application that replaces the Hadoop MapReduce
approach. It distributes the `dsdgen` binary across Spark executors via `--archives`,
runs data generation in parallel, and writes output to any Hadoop-compatible filesystem
(HDFS, S3, GCS, ABFS, or local). It works with any Spark cluster manager: **K8s, YARN,
Standalone, or local**.

**Prerequisites:** build tpcds-gen as before (`cd tpcds-gen && make`), then package the
dsdgen tools as a tar.gz archive:

```bash
cd tpcds-gen/target && tar czf lib/dsdgen.tar.gz tools/
```
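
Before moving to a cluster, the archive and script can be sanity-checked with a small local-mode run (paths relative to the `nds/` directory; `spark-submit` assumed on `PATH`):

```shell
# Local smoke test: scale=1 GB, 2 dsdgen children, output to a temp dir.
spark-submit --master 'local[2]' \
    --archives tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen \
    nds_gen_data_spark.py 1 2 /tmp/nds_smoke --overwrite

# One subdirectory per TPC-DS table should appear on success.
ls /tmp/nds_smoke
```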

#### Required prerequisites for K8s + HDFS runs

Before submitting `nds_gen_data_spark.py` to Spark on K8s, make sure all of the
following conditions are true:

1. Driver and executor images include Hadoop client configuration (`core-site.xml`
   and `hdfs-site.xml`) so both sides can access the same HDFS cluster.
2. `output_dir` is an explicit HDFS path (for example, `hdfs:///data/raw_sf1000`
   or `hdfs://<namenode-host>:8020/data/raw_sf1000`) instead of a local path.
3. The `dsdgen` archive is provided with `--archives` using a shell-quoted value
   when needed (zsh users should quote `#`):
   `--archives 'tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen'`.
4. Python minor versions are consistent between driver and executor environments
   (for example, both are Python 3.12).
5. For incremental generation (`--range`), do not rerun overlapping child ranges;
   otherwise duplicate data will be produced.
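
A quick way to verify prerequisite 4 before submitting is to compare the driver-side Python with the Python inside the container image (the image name below is a placeholder; adjust to your registry):

```shell
# Driver-side Python minor version
python3 -c 'import sys; print("driver: %d.%d" % sys.version_info[:2])'

# Executor-side Python minor version, taken from the Spark container image
docker run --rm <spark-image> \
    python3 -c 'import sys; print("executor: %d.%d" % sys.version_info[:2])'
```

If the two minor versions differ, PySpark will typically fail at task deserialization time, so this check is cheaper than a failed cluster run.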

Directly with spark-submit (K8s example):

```bash
spark-submit --master k8s://https://<k8s-api-server> \
    --deploy-mode cluster \
    --conf spark.kubernetes.container.image=<spark-image> \
    --conf spark.executor.instances=10 \
    --archives tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen \
    nds_gen_data_spark.py 1000 200 hdfs:///data/raw_sf1000 --overwrite
```

For incremental generation (split across multiple spark-submit runs):

```bash
# Run 1: children 1-100
spark-submit --archives tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen \
    nds_gen_data_spark.py 1000 200 hdfs:///data/raw_sf1000 --range 1,100

# Run 2: children 101-200
spark-submit --archives tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen \
    nds_gen_data_spark.py 1000 200 hdfs:///data/raw_sf1000 --range 101,200
```
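
Since overlapping ranges duplicate data, it can be worth validating a planned set of `--range` values before submitting anything. This small stand-alone helper (not part of the repo) checks that a list of inclusive child ranges covers `1..parallel` exactly once:

```python
def ranges_cover(parallel, ranges):
    """Return True if the inclusive (start, end) child ranges cover
    1..parallel exactly once, with no gaps and no overlaps."""
    expect = 1
    for start, end in sorted(ranges):
        if start != expect or end < start:
            return False
        expect = end + 1
    return expect == parallel + 1

# The two submissions above: children 1-100 and 101-200 of a 200-way run.
print(ranges_cover(200, [(1, 100), (101, 200)]))  # True
print(ranges_cover(200, [(1, 100), (90, 200)]))   # overlap -> False
```

Running it against a plan before the first `spark-submit` catches both gaps (missing children) and overlaps (duplicated rows).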

Arguments:

```text
positional arguments:
  scale               Data scale factor in GB.
  parallel            Number of parallel dsdgen children (must be >= 2).
  output_dir          Output directory (hdfs://..., s3a://..., gs://..., or local path).

optional arguments:
  --range START,END   Generate only this child range (inclusive).
                      Uses append mode automatically for incremental runs.
  --overwrite         Overwrite existing output directory.
  --update N          Generate update/maintenance dataset N.
  --num_executors N   Hint for number of Spark partitions (default: one per child).
```
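
As one more example combining the optional flags above, a refresh-dataset run with an explicit partition hint might look like this (flag names are taken from the usage text; the paths are placeholders):

```shell
# Generate update/maintenance dataset 1 with a partition hint of 10.
spark-submit --archives tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen \
    nds_gen_data_spark.py 1000 200 hdfs:///data/raw_sf1000 \
    --update 1 --num_executors 10
```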

You can also use the provided `datagen_submit.template` with `spark-submit-template`:

```bash
./spark-submit-template datagen_submit.template \
    nds_gen_data_spark.py 100 100 hdfs:///data/raw_sf100 --overwrite
```

**Note for zsh users:** quote the `--archives` value because `#` may be interpreted by the shell.
For example: `--archives 'tpcds-gen/target/lib/dsdgen.tar.gz#dsdgen'`.

#### Cross-compiling dsdgen for Linux (K8s)

The dsdgen binary in the archive must be compiled for the same OS/architecture as the Spark
executor nodes (typically Linux x86_64 or Linux ARM64). If you build on macOS but run on
K8s (Linux), use the provided `tpcds-gen/Dockerfile.dsdgen` to build dsdgen inside a Linux
container:

```bash
cd tpcds-gen

# Build dsdgen for Linux (automatically handles gcc -fcommon for GCC 10+)
docker build -f Dockerfile.dsdgen -t dsdgen-builder .

# Extract the built tools directory
CID=$(docker create dsdgen-builder) && \
    docker cp "$CID":/tools/ /tmp/dsdgen-tools-linux/ && \
    docker rm "$CID"

# Package as dsdgen.tar.gz
mkdir -p target/lib
cd /tmp && mkdir -p dsdgen-package && cp -r dsdgen-tools-linux dsdgen-package/tools
cd dsdgen-package && tar czf <path-to-repo>/nds/tpcds-gen/target/lib/dsdgen.tar.gz tools/
```

#### Building the Spark K8s container image

Two Dockerfiles are provided for different use cases:

| Dockerfile | Base | Build context | Use case |
|---|---|---|---|
| `Dockerfile.k8s-test` | Official `spark-py` image | `nds/` | Quick setup — layers datagen scripts on top of a pre-built Spark image |
| `Dockerfile.spark-k8s` | `ubuntu:24.04` | `$SPARK_HOME` | Standalone build — constructs a full Spark + PySpark image from a Spark distribution (no pre-built image needed) |

**Option A: `Dockerfile.k8s-test` (recommended)**

Requires building the official Spark Python image first:

```bash
# Step 1: Build the Spark Python base image (from your Spark distribution)
cd $SPARK_HOME
./bin/docker-image-tool.sh -t <tag> \
    -p kubernetes/dockerfiles/spark/bindings/python/Dockerfile build

# Step 2: Build the data generation image
cd <path-to-repo>/nds
docker build -f Dockerfile.k8s-test \
    --build-arg BASE_IMAGE=spark-py:<tag> \
    -t nds-datagen:<tag> .
```

**Option B: `Dockerfile.spark-k8s`**

Builds everything from scratch using a Spark distribution directory as build context.
No pre-built Spark image needed, but the build context **must** be `$SPARK_HOME`:

```bash
cd $SPARK_HOME
docker build -f <path-to-repo>/nds/Dockerfile.spark-k8s -t spark-custom:<tag> .
```

Then copy datagen scripts into the image:

```bash
cd <path-to-repo>/nds
docker build -f Dockerfile.k8s-test \
    --build-arg BASE_IMAGE=spark-custom:<tag> \
    -t nds-datagen:<tag> .
```

Both options produce an image with `/opt/spark/work-dir/nds_gen_data_spark.py` and
`/opt/spark/work-dir/dsdgen.tar.gz`, ready for `spark-submit` with:

```bash
--archives local:///opt/spark/work-dir/dsdgen.tar.gz#dsdgen
```

#### K8s local filesystem caveat

When `output_dir` is a local path (for example `/tmp/...`) and Spark executors run in K8s pods,
each pod writes to its own container filesystem by default. You will not see merged results on
the driver host unless a shared volume is configured.

For production runs, prefer remote/shared storage such as HDFS, S3, GCS, or ABFS.
For local smoke tests on minikube, use a PersistentVolumeClaim mounted into the pod.
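
For a cluster-mode submission, the same claim can be mounted into both the driver and executor pods with Spark's Kubernetes volume properties (a sketch assuming the `nds-data` claim from the minikube walkthrough below; volume name `data` is arbitrary):

```shell
spark-submit --master k8s://https://<k8s-api-server> \
    --deploy-mode cluster \
    --conf spark.kubernetes.container.image=nds-datagen:<tag> \
    --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.claimName=nds-data \
    --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path=/data \
    --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName=nds-data \
    --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path=/data \
    --archives local:///opt/spark/work-dir/dsdgen.tar.gz#dsdgen \
    local:///opt/spark/work-dir/nds_gen_data_spark.py 1 2 /data/nds_test --overwrite
```

Note that `ReadWriteOnce` claims only support multiple pods when they land on the same node; multi-node runs need a `ReadWriteMany`-capable storage class or remote storage.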

#### Local K8s testing with minikube

To verify data generation on a local K8s cluster:

```bash
# 1. Start minikube (requires Docker runtime, e.g. Colima, Docker Desktop)
minikube start --driver=docker --cpus=4 --memory=6g

# 2. Load the container image into minikube
minikube image load nds-datagen:<tag>

# 3. Create Spark RBAC
kubectl create serviceaccount spark
kubectl create clusterrolebinding spark-role \
    --clusterrole=edit --serviceaccount=default:spark

# 4. Create a PersistentVolumeClaim for output
kubectl apply -f - <<EOF
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: nds-data
spec:
  accessModes: [ReadWriteOnce]
  resources:
    requests:
      storage: 2Gi
EOF
kubectl wait --for=jsonpath='{.status.phase}'=Bound pvc/nds-data --timeout=60s

# 5. Run a small-scale data generation (scale=1, parallel=2)
kubectl run nds-test --image=nds-datagen:<tag> --image-pull-policy=Never \
  --restart=Never --overrides='{
    "spec": {
      "serviceAccountName": "spark",
      "volumes": [{"name": "data", "persistentVolumeClaim": {"claimName": "nds-data"}}],
      "containers": [{
        "name": "nds-test",
        "image": "nds-datagen:<tag>",
        "imagePullPolicy": "Never",
        "command": ["/opt/spark/bin/spark-submit", "--master", "local[2]",
                    "--archives", "/opt/spark/work-dir/dsdgen.tar.gz#dsdgen",
                    "/opt/spark/work-dir/nds_gen_data_spark.py",
                    "1", "2", "/data/nds_test", "--overwrite"],
        "resources": {"requests": {"memory": "2Gi", "cpu": "2"}},
        "volumeMounts": [{"name": "data", "mountPath": "/data"}]
      }]
    }
  }'

# 6. Wait for completion and check logs
kubectl wait --for=condition=Ready=false pod/nds-test --timeout=300s
kubectl logs nds-test | tail -5
# Expected: "=== Data generation complete: /data/nds_test ==="

# 7. Verify output (25 TPC-DS tables)
kubectl run nds-check --image=nds-datagen:<tag> --image-pull-policy=Never \
  --restart=Never --overrides='{
    "spec": {
      "volumes": [{"name": "data", "persistentVolumeClaim": {"claimName": "nds-data"}}],
      "containers": [{
        "name": "check", "image": "nds-datagen:<tag>", "imagePullPolicy": "Never",
        "command": ["ls", "/data/nds_test/"],
        "volumeMounts": [{"name": "data", "mountPath": "/data"}]
      }]
    }
  }'
sleep 10 && kubectl logs nds-check
# Should list: call_center, catalog_page, ..., web_site (25 directories)

# 8. Cleanup
kubectl delete pod nds-test nds-check --ignore-not-found
kubectl delete pvc nds-data --ignore-not-found
```

### Convert CSV to Parquet or Other data sources

To do the data conversion, the `nds_transcode.py` script needs to be submitted as a Spark job. Users can leverage
36 changes: 36 additions & 0 deletions nds/datagen_submit.template
@@ -0,0 +1,36 @@
#
# SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Template for Spark-based TPC-DS data generation (nds_gen_data_spark.py).
# This replaces the Hadoop MapReduce approach and works on YARN, K8s, or Standalone.
#
# Usage:
# ./spark-submit-template datagen_submit.template \
# nds_gen_data_spark.py 100 100 hdfs:///data/raw_sf100 --overwrite

source base.template

# Path to the dsdgen archive (built by: cd tpcds-gen && make, then tar czf)
export DSDGEN_ARCHIVE=${DSDGEN_ARCHIVE:-tpcds-gen/target/lib/dsdgen.tar.gz}

export SPARK_CONF=("--master" "${SPARK_MASTER}"
                   "--deploy-mode" "client"
                   "--conf" "spark.driver.memory=${DRIVER_MEMORY}"
                   "--conf" "spark.executor.cores=${EXECUTOR_CORES}"
                   "--conf" "spark.executor.instances=${NUM_EXECUTORS}"
                   "--conf" "spark.executor.memory=${EXECUTOR_MEMORY}"
                   "--archives" "${DSDGEN_ARCHIVE}#dsdgen")