4 changes: 4 additions & 0 deletions .gitignore
@@ -0,0 +1,4 @@
__pycache__
.vscode
target
*.zip
2 changes: 1 addition & 1 deletion nds/PysparkBenchReport.py
@@ -37,7 +37,7 @@
from typing import Callable
from pyspark.sql import SparkSession

import python_listener
from . import python_listener

class PysparkBenchReport:
"""Class to generate json summary report for a benchmark
4 changes: 4 additions & 0 deletions nds/README.md
@@ -363,6 +363,10 @@ time.csv \
--output_format parquet
```

### Power Run on Notebook
Users can also [run the Power Run from a notebook](configs_in_zip/readme.md) when working on EKS, Databricks or another
cloud environment.
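
In brief, the notebook flow looks roughly like the following sketch (the zip path is a placeholder; see the linked document for the full setup):

```
# Rough sketch, assuming the zipped nds folder has already been uploaded to S3 (placeholder path below).
spark.sparkContext.addPyFile("s3://path/to/nds.zip")
from nds import nds_power
# Reads the parameters, Spark properties and query stream bundled inside the zip.
nds_power.run_query_stream_in_zip()
```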

### Throughput Run

Throughput Run simulates the scenario that multiple query sessions are running simultaneously in
31 changes: 31 additions & 0 deletions nds/__init__.py
@@ -0,0 +1,31 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# -----
#
# Certain portions of the contents of this file are derived from TPC-DS version 3.2.0
# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp).
# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”)
# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also
# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”).
#
# You may not use this file except in compliance with the TPC EULA.
# DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results
# obtained using this file are not comparable to published TPC-DS Benchmark results, as the results
# obtained from using this file do not comply with the TPC-DS Benchmark.
#
31 changes: 31 additions & 0 deletions nds/configs_in_zip/__init__.py
@@ -0,0 +1,31 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# -----
#
# Certain portions of the contents of this file are derived from TPC-DS version 3.2.0
# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp).
# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”)
# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also
# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”).
#
# You may not use this file except in compliance with the TPC EULA.
# DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results
# obtained using this file are not comparable to published TPC-DS Benchmark results, as the results
# obtained from using this file do not comply with the TPC-DS Benchmark.
#
1 change: 1 addition & 0 deletions nds/configs_in_zip/parameter
@@ -0,0 +1 @@
s3://ndsv2-data/parquet_sf3000 time-123.csv --json_summary_folder 500 --keep_sc
16 changes: 16 additions & 0 deletions nds/configs_in_zip/query_0.sql
@@ -0,0 +1,16 @@
-- start query 1 in stream 0 using template query96.tpl
select count(*)
from store_sales
,household_demographics
,time_dim, store
where ss_sold_time_sk = time_dim.t_time_sk
and ss_hdemo_sk = household_demographics.hd_demo_sk
and ss_store_sk = s_store_sk
and time_dim.t_hour = 8
and time_dim.t_minute >= 30
and household_demographics.hd_dep_count = 5
and store.s_store_name = 'ese'
order by count(*)
LIMIT 100;

-- end query 1 in stream 0 using template query96.tpl
57 changes: 57 additions & 0 deletions nds/configs_in_zip/readme.md
@@ -0,0 +1,57 @@
This document describes how to do a [Power Run](../README.md#power-run) from a notebook.
The approach is to bundle all the required parameters into a zip file and then execute that zip file from the notebook.
First configure the files in [configs_in_zip](../configs_in_zip),
then compress the [nds folder](../../nds) into a zip file,
and finally run `nds_power.py` from the zip on the notebook.
# Configure the files in configs_in_zip
## parameter file
For example:
```
s3://ndsv2-data/parquet_sf1000 time-123.csv --output_prefix s3://chongg/test/tmp --json_summary_folder 500 --keep_sc
```
- `s3://ndsv2-data/parquet_sf1000` is the value of the `input_prefix` parameter.
- `time-123.csv` is the value of the `time_log` parameter.
- `s3://chongg/test/tmp` is the value of the `output_prefix` parameter.
- `500` is the value of the `json_summary_folder` parameter.

Note: do not specify `property_file`; refer to the next section to configure the Spark property file.
For more details, refer to the parameter descriptions in the [Power Run](../README.md#power-run) section.
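
For reference, the entry point used inside the zip (`run_query_stream_in_zip` in `nds_power.py`) parses this single line the same way `nds_power.py` parses its command line, minus the positional `query_stream_file` argument. A minimal sketch of that idea, assuming the nds zip has already been added to the session via `addPyFile`:

```
# Sketch only: the one line from the `parameter` file is split on whitespace and handed to
# the same argparse parser that nds_power.py builds (without the query_stream_file positional).
from nds import nds_power

parameters = "s3://ndsv2-data/parquet_sf1000 time-123.csv --json_summary_folder 500 --keep_sc"
parser = nds_power.get_parser(use_local_stream_file=False)
args = parser.parse_args(parameters.split())
print(args.input_prefix, args.time_log, args.json_summary_folder)
```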
## spark.properties file
Specify the Spark properties, one `key=value` pair per line.
For example:
```
spark.executor.memoryOverhead=512M
```
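As a rough sketch of how these properties are consumed (simplified from `load_properties_from_lines` and the session setup in `nds_power.py`; the local file path below is a hypothetical copy of this file):

```
from pyspark.sql import SparkSession

def load_properties_from_lines(lines):
    # Each line is split on the first '=' into a key/value pair; whitespace is trimmed.
    props = {}
    for line in lines:
        name, value = line.partition("=")[::2]
        props[name.strip()] = value.strip()
    return props

with open("spark.properties") as f:  # hypothetical local copy of the properties file
    spark_properties = load_properties_from_lines(f.readlines())

builder = SparkSession.builder.appName("NDS - Power Run")
for k, v in spark_properties.items():
    builder = builder.config(k, v)  # every property is applied before the session is created
spark = builder.getOrCreate()
```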
## query_0.sql
This file is the query stream file.
Put all the queries to be run into this file.
For how to generate a stream file, refer to the [README](../README.md).
# Create the NDS zip file and upload it to S3
Run the following commands to compress the nds folder into a zip file and upload it to S3.
```
cd spark-rapids-benchmarks
zip -r nds.zip nds
aws s3 cp nds.zip s3://path/to/this/zip
```
# Run nds_power on the notebook
```
spark.sparkContext.addPyFile("s3://path/to/this/zip")
from nds import nds_power
nds_power.run_query_stream_in_zip()
```
# How to run another Power Run with different parameters
Updating the zip file in place in S3 does not take effect because Spark caches the zip file.
Instead, upload another zip file under a different name to S3.
Do the following:
```
cd spark-rapids-benchmarks
cp -r nds <another_nds_name>
# update the parameter files
zip -r <another_nds_name>.zip <another_nds_name>
aws s3 cp <another_nds_name>.zip s3://<another nds zip path>
```
On the notebook, run:
```
spark.sparkContext.addPyFile("s3://<another nds zip path>")
from <another_nds_name> import nds_power
nds_power.run_query_stream_in_zip()
```
Empty file.
2 changes: 1 addition & 1 deletion nds/nds_gen_query_stream.py
@@ -35,7 +35,7 @@
import subprocess
import sys

from check import check_build, check_version, get_abs_path
from .check import check_build, check_version, get_abs_path

check_version()

101 changes: 75 additions & 26 deletions nds/nds_power.py
@@ -34,14 +34,15 @@
import csv
import os
import time
import uuid
from collections import OrderedDict
from pyspark.sql import SparkSession
from PysparkBenchReport import PysparkBenchReport
from .PysparkBenchReport import PysparkBenchReport
from pyspark.sql import DataFrame

from check import check_json_summary_folder, check_query_subset_exists, check_version
from nds_gen_query_stream import split_special_query
from nds_schema import get_schemas
from .check import check_json_summary_folder, check_query_subset_exists, check_version
from .nds_gen_query_stream import split_special_query
from .nds_schema import get_schemas

check_version()

@@ -57,6 +58,9 @@ def gen_sql_from_stream(query_stream_file_path):
"""
with open(query_stream_file_path, 'r') as f:
stream = f.read()
return gen_sql_from_stream_text(stream)

def gen_sql_from_stream_text(stream):
all_queries = stream.split('-- start')[1:]
# split query in query14, query23, query24, query39
extended_queries = OrderedDict()
@@ -141,9 +145,9 @@ def is_column_part(char):
return char.isalpha() or char.isdigit() or char == '_'

def is_valid(column_name):
return len(column_name) > 0 and is_column_start(column_name[0]) and all(
len(column_name) > 0 and is_column_start(column_name[0]) and all(
[is_column_part(char) for char in column_name[1:]])

def make_valid(column_name):
# To simplify: replace all invalid char with '_'
valid_name = ''
@@ -157,7 +161,7 @@ def make_valid(column_name):
else:
valid_name += char
return valid_name

def deduplicate(column_names):
# In some queries like q35, it's possible to get columns with the same name. Append a number
# suffix to resolve this problem.
@@ -182,6 +186,7 @@ def get_query_subset(query_dict, subset):

def run_query_stream(input_prefix,
property_file,
spark_properties,
query_dict,
time_log_output_path,
extra_time_log_output_path,
@@ -198,7 +203,9 @@
for easy accessibility. TempView creation time is also recorded.

Args:
input_prefix (str): path of input data or warehouse if input_format is "iceberg" or hive_external=True.
input_prefix (str): path of input data or warehouse if input_format is "iceberg".
property_file (str): the path of the Spark property file
spark_properties (Dict): the property dict loaded from the property_file above
query_dict (OrderedDict): ordered dict {query_name: query_content} of all TPC-DS queries runnable in Spark
time_log_output_path (str): path of the log that contains query execution time, both local
and HDFS path are supported.
@@ -216,26 +223,18 @@
app_name = "NDS - " + list(query_dict.keys())[0]
else:
app_name = "NDS - Power Run"
# Execute Power Run or Specific query in Spark
# Execute Power Run or Specific query in Spark
# build Spark Session
session_builder = SparkSession.builder
if property_file:
spark_properties = load_properties(property_file)
for k,v in spark_properties.items():
session_builder = session_builder.config(k,v)
for k,v in spark_properties.items():
session_builder = session_builder.config(k,v)
if input_format == 'iceberg':
session_builder.config("spark.sql.catalog.spark_catalog.warehouse", input_prefix)
if input_format == 'delta' and not delta_unmanaged:
session_builder.config("spark.sql.warehouse.dir", input_prefix)
session_builder.enableHiveSupport()
if hive_external:
session_builder.enableHiveSupport()

session_builder.config("spark.sql.catalogImplementation", "hive")
spark_session = session_builder.appName(
app_name).getOrCreate()
if hive_external:
spark_session.catalog.setCurrentDatabase(input_prefix)

if input_format == 'delta' and delta_unmanaged:
# Register tables for Delta Lake. This is only needed for unmanaged tables.
execution_time_list = register_delta_tables(spark_session, input_prefix, execution_time_list)
@@ -304,14 +303,54 @@ def run_query_stream(input_prefix,
time_df.coalesce(1).write.csv(extra_time_log_output_path)

def load_properties(filename):
myvars = {}
with open(filename) as myfile:
for line in myfile:
name, var = line.partition("=")[::2]
myvars[name.strip()] = var.strip()
lines = myfile.readlines()
return load_properties_from_lines(lines)

def load_properties_from_lines(lines):
myvars = {}
for line in lines:
name, var = line.partition("=")[::2]
myvars[name.strip()] = var.strip()
return myvars

if __name__ == "__main__":
def run_query_stream_in_zip():
try:
import importlib.resources as pkg_resources
except ImportError:
# Try backported to PY<37 `importlib_resources`.
import importlib_resources as pkg_resources
from . import configs_in_zip # relative-import the *package* containing the templates
stream = pkg_resources.read_text(configs_in_zip, 'query_0.sql')
parameters = pkg_resources.read_text(configs_in_zip, 'parameter')
parser = get_parser(use_local_stream_file = False)
args = parser.parse_args(parameters.split())
query_dict = gen_sql_from_stream_text(stream)

if args.property_file is not None:
raise Exception("Please do not specify --property_file, " +
"instead update the spark.properies file in NDS zip file, " +
"path is nds/configs_in_zip/spark.properies")
spark_properties_text = pkg_resources.read_text(configs_in_zip, 'spark.properties')
spark_properties = load_properties_from_lines(spark_properties_text.splitlines())

run_query_stream(args.input_prefix,
'spark.properties',
spark_properties,
query_dict,
args.time_log,
args.extra_time_log,
args.sub_queries,
args.input_format,
not args.floats,
args.output_prefix,
args.output_format,
args.json_summary_folder + str(uuid.uuid1()), # append uuid to avoid folder conflict
args.delta_unmanaged,
args.keep_sc,
args.hive)

def get_parser(use_local_stream_file = True):
parser = argparse.ArgumentParser()
parser.add_argument('input_prefix',
help='text to prepend to every input file path (e.g., "hdfs:///ds-generated-data"). ' +
@@ -320,7 +359,8 @@ def load_properties(filename):
'session name "spark_catalog" is supported now, customized catalog is not ' +
'yet supported. Note if this points to a Delta Lake table, the path must be ' +
'absolute. Issue: https://github.com/delta-io/delta/issues/555')
parser.add_argument('query_stream_file',
if use_local_stream_file:
parser.add_argument('query_stream_file',
help='query stream file that contains NDS queries in specific order')
parser.add_argument('time_log',
help='path to execution time log, only support local path.',
@@ -371,10 +411,19 @@ def load_properties(filename):
'in the stream file will be run. e.g. "query1,query2,query3". Note, use ' +
'"_part1" and "_part2" suffix for the following query names: ' +
'query14, query23, query24, query39. e.g. query14_part1, query39_part2')
return parser

if __name__ == "__main__":
parser = get_parser()
args = parser.parse_args()
query_dict = gen_sql_from_stream(args.query_stream_file)
if args.property_file:
spark_properties = load_properties(args.property_file)
else:
spark_properties = {}
run_query_stream(args.input_prefix,
args.property_file,
spark_properties,
query_dict,
args.time_log,
args.extra_time_log,
2 changes: 1 addition & 1 deletion nds/nds_transcode.py
@@ -39,7 +39,7 @@

from pyspark.sql.types import *
from pyspark.sql.functions import col
from nds_schema import *
from .nds_schema import *

# Note the specific partitioning is applied when saving the parquet data files.
TABLE_PARTITIONING = {