Add Spark-based data generation for K8s (replaces MapReduce) #248
wjxiz1992 wants to merge 7 commits into NVIDIA:dev from
Conversation
Signed-off-by: Allen Xu <allxu@nvidia.com>
Greptile Summary

This PR adds K8s-native TPC-DS data generation support via a new PySpark-based approach (`nds_gen_data_spark.py`). Key additions:
Review findings: Confidence Score: 4/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant User
    participant Driver as Spark Driver<br/>(K8s Pod)
    participant Executor as Spark Executor<br/>(K8s Pod)
    participant FS as Filesystem<br/>(HDFS/S3/etc)
    User->>Driver: spark-submit with --archives dsdgen.tar.gz#dsdgen
    Driver->>Driver: Parse args (scale, parallel, range)
    Driver->>Driver: Create RDD with child indices
    Driver->>Executor: Distribute archive + tasks
    Executor->>Executor: Extract dsdgen.tar.gz to SparkFiles
    Executor->>Executor: run_dsdgen_and_read(child_index)
    Executor->>Executor: subprocess.run(dsdgen -child N)
    Executor->>Executor: Stream .dat files line-by-line
    Executor->>FS: Write partitioned data (table_name=xxx/)
    Executor-->>Driver: Task complete
    Driver->>Driver: All tasks finished
    Driver->>FS: rename_partition_dirs()
    Driver->>FS: Rename table_name=xxx to xxx/
    FS-->>Driver: Rename complete
    Driver->>User: Data generation complete
```
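The `subprocess.run(dsdgen -child N)` step in the diagram amounts to building one dsdgen command line per child index. A minimal sketch of that argument construction — `build_dsdgen_cmd` is a hypothetical helper name, not necessarily the one used in the PR; the flags follow standard TPC-DS dsdgen usage:

```python
# Sketch of the per-child dsdgen command line built on each executor.
# "build_dsdgen_cmd" is a hypothetical helper name; the flags follow
# standard TPC-DS dsdgen usage, not necessarily the PR's exact code.
def build_dsdgen_cmd(binary_dir, scale, parallel, child, out_dir):
    return [
        f"{binary_dir}/dsdgen",
        "-scale", str(scale),        # scale factor in GB
        "-parallel", str(parallel),  # total number of data chunks
        "-child", str(child),        # which chunk this task generates
        "-dir", out_dir,             # where the .dat files land
        "-force", "Y",               # overwrite any existing files
    ]

cmd = build_dsdgen_cmd("/tmp/dsdgen", 100, 10, 3, "/tmp/out")
print(" ".join(cmd))
```

Each Spark task would then pass such a list to `subprocess.run` and stream the resulting `.dat` files back out.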
Last reviewed commit: 3160717
Pull request overview
This PR introduces a new Spark-based approach for TPC-DS data generation that works with Kubernetes and other Spark cluster managers, complementing the existing Hadoop MapReduce-based approach. The implementation distributes the dsdgen binary via Spark's --archives mechanism and generates data in parallel across Spark executors.
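The parallel fan-out described above can be sketched as follows; `plan_children` and `generate` are illustrative names (not the PR's actual identifiers), and the executor-side work is left as a placeholder:

```python
# Illustrative sketch of the driver-side fan-out: turn the requested chunk
# range into child indices and parallelize them, one Spark task per index.
# "plan_children" and "generate" are hypothetical names for illustration.
def plan_children(parallel, range_start=None, range_end=None):
    """Return the 1-based dsdgen child indices to generate."""
    start = range_start if range_start is not None else 1
    end = range_end if range_end is not None else parallel
    return list(range(start, end + 1))

def generate(spark_context, parallel, range_start=None, range_end=None):
    children = plan_children(parallel, range_start, range_end)
    # One partition per child so each executor task runs exactly one
    # dsdgen invocation against its assigned chunk.
    rdd = spark_context.parallelize(children, len(children))
    rdd.foreach(lambda child: None)  # placeholder for run_dsdgen_and_read(child)

print(plan_children(10))        # full run: all ten chunks
print(plan_children(10, 3, 5))  # incremental run via --range 3,5
```

The one-partition-per-child layout is what lets an incremental `--range` run regenerate only a subset of chunks without touching the rest.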
Changes:
- Added `nds_gen_data_spark.py`: a PySpark application that replaces the MapReduce approach for data generation, supporting K8s, YARN, Standalone, and local modes
- Updated README with documentation and examples for the new Spark-based data generation workflow
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| nds/nds_gen_data_spark.py | New PySpark script implementing distributed TPC-DS data generation via Spark executors with support for incremental generation and multiple filesystem backends |
| nds/README.md | Added comprehensive documentation section explaining Spark-based data generation with usage examples for K8s and other cluster managers |
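The rename step mentioned for `nds_gen_data_spark.py` (dropping the `table_name=` prefix so downstream NDS steps see plain table directories) can be sketched for the local-filesystem case; `rename_partition_dirs` here borrows the name visible in the PR but with simplified logic, and the real script would also handle HDFS/S3 backends:

```python
import os
import tempfile

def rename_partition_dirs(output_dir):
    """Rename table_name=xxx partition dirs to plain xxx (local-FS sketch).

    Simplified from the PR's version, which also covers remote filesystems;
    each rename is checked so failures are not silent.
    """
    for entry in os.listdir(output_dir):
        if entry.startswith("table_name="):
            src = os.path.join(output_dir, entry)
            dst = os.path.join(output_dir, entry.split("=", 1)[1])
            os.rename(src, dst)
            if not os.path.isdir(dst):
                raise RuntimeError(f"rename failed: {src} -> {dst}")

# Demo against a throwaway directory layout:
root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "table_name=store_sales"))
rename_partition_dirs(root)
print(os.listdir(root))  # ['store_sales']
```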
Signed-off-by: Allen Xu <allxu@nvidia.com>
Signed-off-by: Allen Xu <allxu@nvidia.com>
- Use dsdgen.tar.gz instead of dsdgen.jar for archive format consistency
- Add return value checks in rename_partition_dirs to prevent silent failures
- Use shutil.rmtree for robust temp directory cleanup
- Add encoding="utf-8" to file open calls
- Fix Partitions print alignment

Made-with: Cursor
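The `shutil.rmtree` cleanup mentioned in the commit above is the standard pattern for removing a whole temp tree; a minimal sketch (paths are throwaway, not the PR's actual directories):

```python
import os
import shutil
import tempfile

# shutil.rmtree removes a non-empty directory tree in one call, unlike
# os.rmdir, which fails unless the directory is already empty.
tmp = tempfile.mkdtemp()
os.makedirs(os.path.join(tmp, "nested"))
with open(os.path.join(tmp, "nested", "part.dat"), "w", encoding="utf-8") as f:
    f.write("placeholder row\n")

# ignore_errors=True keeps cleanup from raising if the tree is already gone.
shutil.rmtree(tmp, ignore_errors=True)
print(os.path.exists(tmp))  # False
```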
```python
if len(parts) != 2:
    print("ERROR: --range must be 'start,end'", file=sys.stderr)
    sys.exit(1)
range_start, range_end = int(parts[0]), int(parts[1])
```
Missing exception handling for the `int()` conversion: passing non-numeric values like `--range abc,def` crashes with an unclear `ValueError`.
Suggested change:

```python
try:
    range_start, range_end = int(parts[0]), int(parts[1])
except ValueError as e:
    print(f"ERROR: --range values must be integers: {e}", file=sys.stderr)
    sys.exit(1)
```
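Beyond the integer conversion, a bounds check keeps `--range` inside `1..parallel`; a sketch of a complete validator under that assumption (`parse_range` is a hypothetical helper name, not the PR's code):

```python
import sys

def parse_range(arg, parallel):
    """Validate a 'start,end' --range string against 1..parallel (sketch).

    Hypothetical helper combining the suggested try/except with a bounds
    check; error messages mirror the PR's style.
    """
    parts = arg.split(",")
    if len(parts) != 2:
        print("ERROR: --range must be 'start,end'", file=sys.stderr)
        sys.exit(1)
    try:
        start, end = int(parts[0]), int(parts[1])
    except ValueError as e:
        print(f"ERROR: --range values must be integers: {e}", file=sys.stderr)
        sys.exit(1)
    if not (1 <= start <= end <= parallel):
        print(f"ERROR: --range must satisfy 1 <= start <= end <= {parallel}",
              file=sys.stderr)
        sys.exit(1)
    return start, end

print(parse_range("3,5", parallel=10))  # (3, 5)
```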
- Add tpcds-gen/Dockerfile.dsdgen for cross-compiling dsdgen on Linux
- Add Dockerfile.k8s-test for building Spark K8s image with datagen
- Remove k8s_datagen_smoketest.sh script
- Update README with cross-compilation, image build, and minikube testing instructions

Made-with: Cursor
```dockerfile
# Set up Spark directory structure
ENV SPARK_HOME=/opt/spark
ENV PATH="${SPARK_HOME}/bin:${PATH}"
ENV PYTHONPATH="${SPARK_HOME}/python:${SPARK_HOME}/python/lib/py4j-0.10.9.7-src.zip:${PYTHONPATH}"
```
The hardcoded `py4j-0.10.9.7-src.zip` assumes Spark 3.5.x and will break on Spark versions that bundle a different py4j.
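One version-agnostic way to locate the bundled py4j zip at runtime is a glob over `$SPARK_HOME/python/lib` — a sketch only; the PR itself addresses this in the Dockerfile via a symlink:

```python
import glob
import os
import tempfile

def find_py4j_zip(spark_home):
    """Locate the bundled py4j source zip without hardcoding its version."""
    matches = glob.glob(os.path.join(spark_home, "python", "lib",
                                     "py4j-*-src.zip"))
    if not matches:
        raise FileNotFoundError(f"no py4j zip under {spark_home}/python/lib")
    return matches[0]

# Demo against a fake SPARK_HOME layout (the zip here is an empty stand-in):
home = tempfile.mkdtemp()
lib = os.path.join(home, "python", "lib")
os.makedirs(lib)
open(os.path.join(lib, "py4j-0.10.9.7-src.zip"), "w").close()
print(os.path.basename(find_py4j_zip(home)))  # py4j-0.10.9.7-src.zip
```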
- Add try/except for --range int() parsing with clear error message
- Remove hardcoded py4j version in Dockerfile.spark-k8s using symlink
- Add base image build instructions and ARG to Dockerfile.k8s-test
- Add kubectl wait for PVC binding in README minikube guide

Made-with: Cursor
- Add guard in Dockerfile.spark-k8s to fail build if no py4j zip found
- Add usage instructions and build context note to Dockerfile.spark-k8s
- Document both Dockerfile options (k8s-test vs spark-k8s) in README with comparison table and step-by-step instructions

Made-with: Cursor
@copilot code review
Summary
This PR adds Kubernetes-native support for TPC-DS data generation by introducing a new PySpark-based approach (`nds_gen_data_spark.py`) that replaces the Hadoop MapReduce method. The dsdgen binary is distributed to Spark executors via `--archives`, enabling parallel data generation on any Spark cluster manager (K8s, YARN, Standalone, local) without requiring YARN or MapReduce.

Changes

Core

- `nds/nds_gen_data_spark.py` — New PySpark application that:
  - Distributes the `dsdgen` binary across Spark executors via `--archives`
  - Renames `table_name=xxx` directories to plain `xxx` for NDS pipeline compatibility
  - Supports `--range` for incremental generation, plus `--overwrite` and `--update` for maintenance datasets

Dockerfiles

- `nds/tpcds-gen/Dockerfile.dsdgen` — Multi-stage build for cross-compiling `dsdgen` on Linux (handles the GCC 10+ `-fcommon` issue)
- `nds/Dockerfile.k8s-test` — Layers datagen scripts onto the official Spark Python image (recommended, uses `ARG BASE_IMAGE`)
- `nds/Dockerfile.spark-k8s` — Standalone full Spark + PySpark image built from a Spark distribution (build context = `$SPARK_HOME`)

Configuration

- `nds/datagen_submit.template` — spark-submit template for use with the `spark-submit-template` utility

Documentation

- `nds/README.md` — Comprehensive new sections, including `Dockerfile.dsdgen`

Testing

Verified on local minikube (Docker driver, ARM64):

- `Dockerfile.dsdgen`
- `Dockerfile.k8s-test`
- `nds_gen_data_spark.py` with `local[2]` mode inside a K8s pod (scale=1, parallel=2)

Migration Notes

- `nds_gen_data.py` (MapReduce-based) is unchanged — this PR adds a parallel path, not a replacement
- `nds_gen_data_spark.py`