An experimental client for Apache Spark Connect written in TypeScript. This library allows JavaScript and TypeScript applications to interact with Apache Spark using the Spark Connect protocol over gRPC.
⚠️ Experimental: This library is in active development. APIs may change without notice.
📚 API Documentation - Comprehensive API reference with examples
The API documentation is automatically generated from JSDoc comments and TypeScript definitions.
- 🚀 TypeScript-first: Full TypeScript support with comprehensive type definitions
- 🔗 Spark Connect Protocol: Uses the official Spark Connect gRPC protocol
- 📊 DataFrame API: Familiar DataFrame operations (select, filter, groupBy, join, etc.)
- 💾 Multiple Data Formats: Support for CSV, JSON, Parquet, ORC, and more
- 🔍 SQL Support: Execute SQL queries directly
- 📝 Catalog API: Metadata operations (databases, tables, functions)
- ⚡ Apache Arrow: Efficient data transfer using Apache Arrow
- 🎯 Type-safe: Built-in type system for Spark data types
- Node.js: v20 or higher
- Apache Spark: 4.0+ with Spark Connect enabled
- For development/testing: Docker is used to run a Spark Connect server
Install from npm:
```bash
npm install spark.js
```

Or clone the repository for development:

```bash
git clone https://github.com/yaooqinn/spark.js.git
cd spark.js
npm install
```

Here's a minimal example to get started:
```typescript
import { SparkSession } from 'spark.js';

async function main() {
  // Create a SparkSession connected to a Spark Connect server
  const spark = await SparkSession.builder()
    .appName('MyApp')
    .remote('sc://localhost:15002')
    .getOrCreate();

  // Create a simple DataFrame
  const df = spark.range(1, 100)
    .selectExpr('id', 'id * 2 as doubled');

  // Show the first 10 rows
  await df.show(10);

  // Perform aggregation
  const count = await df.count();
  console.log(`Total rows: ${count}`);
}

main().catch(console.error);
```

The SparkSession is the entry point for all Spark operations:
```typescript
import { SparkSession } from 'spark.js';

// Connect to a remote Spark Connect server
const spark = await SparkSession.builder()
  .appName('MyApplication')
  .remote('sc://localhost:15002') // Spark Connect endpoint
  .getOrCreate();

// Get Spark version
const version = await spark.version();
console.log(`Spark version: ${version}`);
```

Use the DataFrameReader to load data from various sources:
```typescript
// Read CSV file
const csvDF = spark.read
  .option('header', true)
  .option('delimiter', ';')
  .csv('path/to/people.csv');

// Read JSON file
const jsonDF = spark.read.json('path/to/data.json');

// Read Parquet file
const parquetDF = spark.read.parquet('path/to/data.parquet');

// Read with schema inference
const df = spark.read
  .option('inferSchema', true)
  .option('header', true)
  .csv('data.csv');
```

Perform transformations and actions on DataFrames:
```typescript
import { functions } from 'spark.js';

const { col, lit } = functions;

// Select columns
const selected = df.select('name', 'age');

// Filter rows
const filtered = df.filter(col('age').gt(21));

// Add/modify columns
const transformed = df
  .withColumn('age_plus_one', col('age').plus(lit(1)))
  .withColumnRenamed('name', 'full_name');

// Group by and aggregate
const aggregated = df
  .groupBy('department')
  .agg({ salary: 'avg', age: 'max' });

// Join DataFrames
const joined = df1.join(df2, df1.col('id').equalTo(df2.col('user_id')), 'inner');

// Sort
const sorted = df.orderBy(col('age').desc());

// Limit
const limited = df.limit(100);

// Collect results
const rows = await df.collect();
rows.forEach(row => console.log(row.toJSON()));
```

Execute SQL queries directly:
```typescript
// Register DataFrame as temporary view
df.createOrReplaceTempView('people');

// Execute SQL
const resultDF = await spark.sql(`
  SELECT name, age, department
  FROM people
  WHERE age > 30
  ORDER BY age DESC
`);

await resultDF.show();
```

Save DataFrames to various formats:
```typescript
// Write as Parquet (default mode: error if exists)
await df.write.parquet('output/data.parquet');

// Write as CSV with options
await df.write
  .option('header', true)
  .option('delimiter', '|')
  .mode('overwrite')
  .csv('output/data.csv');

// Write as JSON
await df.write
  .mode('append')
  .json('output/data.json');

// Partition by column
await df.write
  .partitionBy('year', 'month')
  .parquet('output/partitioned_data');

// V2 Writer API (advanced)
await df.writeTo('my_table')
  .using('parquet')
  .partitionBy('year', 'month')
  .tableProperty('compression', 'snappy')
  .create();
```

See guides/DataFrameWriterV2.md for more V2 Writer examples.
Explore metadata using the Catalog API:
```typescript
// List databases
const databases = await spark.catalog.listDatabases();

// List tables in current database
const tables = await spark.catalog.listTables();

// List columns of a table
const columns = await spark.catalog.listColumns('my_table');

// Check if table exists
const exists = await spark.catalog.tableExists('my_table');

// Get current database
const currentDB = await spark.catalog.currentDatabase();
```

Core classes:

- SparkSession: Main entry point for Spark functionality
- DataFrame: Distributed collection of data organized into named columns
- Column: Expression on a DataFrame column
- Row: Represents a row of data
- DataFrameReader: Interface for loading data
- DataFrameWriter: Interface for saving data
- DataFrameWriterV2: V2 writer with advanced options
- RuntimeConfig: Runtime configuration interface
- Catalog: Metadata and catalog operations
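As a small illustration of how these pieces fit together, the sketch below checks the catalog for a table before querying it. It reuses only calls shown above; the table name `people` is just a placeholder.

```typescript
// Query a table only if the catalog reports that it exists
if (await spark.catalog.tableExists('people')) {
  const people = await spark.sql('SELECT name, age FROM people WHERE age > 30');
  await people.show();
} else {
  console.log('Table `people` not found in the current database');
}
```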
Import SQL functions from the functions module:
```typescript
import { functions } from 'spark.js';

const { col, lit, sum, avg, max, min, count, when, concat, upper } = functions;

const df = spark.read.csv('data.csv');

df.select(
  col('name'),
  upper(col('name')).as('upper_name'),
  when(col('age').gt(18), lit('adult')).otherwise(lit('minor')).as('category')
);
```

See guides/STATISTICAL_FUNCTIONS.md for statistical functions.
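The aggregate helpers imported above (sum, avg, max, min, count) can be used inside select as well. A minimal sketch, assuming these helpers return Column expressions that can be passed to select, as their Spark SQL counterparts do:

```typescript
// Whole-DataFrame aggregation using the imported aggregate functions
const summary = df.select(
  count(col('name')).as('num_people'),
  avg(col('age')).as('avg_age'),
  max(col('age')).as('max_age')
);
await summary.show();
```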
Define schemas using the type system:
```typescript
import { DataTypes, StructType, StructField } from 'spark.js';

const schema = new StructType([
  new StructField('name', DataTypes.StringType, false),
  new StructField('age', DataTypes.IntegerType, true),
  new StructField('salary', DataTypes.DoubleType, true)
]);

const df = spark.createDataFrame(data, schema);
```

Configure the connection to the Spark Connect server:
```typescript
const spark = await SparkSession.builder()
  .appName('MyApp')
  .remote('sc://host:port') // Default: sc://localhost:15002
  .getOrCreate();
```
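In practice you may want to pick the endpoint up from the environment rather than hard-coding it. A minimal sketch; the `SPARK_REMOTE` variable name is just a convention borrowed from other Spark Connect clients, and this library does not necessarily read it on its own:

```typescript
// Resolve the Spark Connect endpoint from the environment, with a local fallback
// (SPARK_REMOTE is an assumed, conventional variable name)
const remote = process.env.SPARK_REMOTE ?? 'sc://localhost:15002';

const spark = await SparkSession.builder()
  .appName('MyApp')
  .remote(remote)
  .getOrCreate();
```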
Set Spark configuration at runtime:

```typescript
// Set configuration
await spark.conf.set('spark.sql.shuffle.partitions', '200');

// Get configuration
const value = await spark.conf.get('spark.sql.shuffle.partitions');

// Get with default
const valueOrDefault = await spark.conf.get('my.config', 'default_value');
```

Logging is configured in log4js.json. Logs are written to both the console and the logs/ directory.
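For reference, a typical log4js setup with a console appender plus a file appender under logs/ looks roughly like the sketch below, here expressed via log4js.configure(); the appender names, file name, and levels in this project's actual log4js.json may differ.

```typescript
// Hypothetical sketch of the appender/category structure that log4js.json typically holds;
// the 'logs/spark.js.log' file name and the 'info' level are assumptions.
import log4js from 'log4js';

log4js.configure({
  appenders: {
    console: { type: 'console' },
    file: { type: 'file', filename: 'logs/spark.js.log' },
  },
  categories: {
    default: { appenders: ['console', 'file'], level: 'info' },
  },
});

const logger = log4js.getLogger();
logger.info('Logging to both the console and logs/');
```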
For contributors, comprehensive documentation is available in the Contributor Guide:
- Getting Started - Set up your development environment
- Code Style Guide - Coding conventions and best practices
- Build and Test - Building, testing, and running the project
- IDE Setup - Recommended IDE configurations
- Submitting Changes - How to submit pull requests
```bash
# Clone and install
git clone https://github.com/yaooqinn/spark.js.git
cd spark.js
npm install

# Start Spark Connect server for testing
docker build -t scs .github/docker
docker run --name sparkconnect -p 15002:15002 -d scs

# Run tests and linting
npm test
npm run lint

# Clean up
docker stop sparkconnect && docker rm sparkconnect
```

For detailed instructions, see the Contributor Guide.
```text
spark.js/
├── src/
│   ├── gen/                  # Generated protobuf code (DO NOT EDIT)
│   └── org/apache/spark/
│       ├── sql/              # Main API implementation
│       │   ├── SparkSession.ts   # Entry point
│       │   ├── DataFrame.ts      # DataFrame API
│       │   ├── functions.ts      # SQL functions
│       │   ├── types/            # Type system
│       │   ├── catalog/          # Catalog API
│       │   ├── grpc/             # gRPC client
│       │   └── proto/            # Protocol builders
│       └── storage/          # Storage levels
├── tests/                    # Test suites
├── example/                  # Example applications
├── docs/                     # Additional documentation
├── protobuf/                 # Protocol buffer definitions
├── .github/
│   ├── workflows/            # CI/CD workflows
│   └── docker/               # Spark Connect Docker setup
├── package.json              # Dependencies and scripts
├── tsconfig.json             # TypeScript configuration
├── jest.config.js            # Jest test configuration
├── eslint.config.mjs         # ESLint configuration
└── buf.gen.yaml              # Buf protobuf generation config
```
The example/ directory contains several runnable examples:
- Pi.ts: Monte Carlo Pi estimation (a rough sketch follows this list)
- CSVExample.ts: Reading and writing CSV files
- ParquetExample.ts: Parquet file operations
- JsonExample.ts: JSON file operations
- JoinExample.ts: DataFrame join operations
- CatalogExample.ts: Catalog API usage
- StatisticalFunctionsExample.ts: Statistical functions
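To give a flavor of the DataFrame-only style these examples use, here is a rough sketch of a Monte Carlo Pi estimate built from the API calls shown earlier in this README; the bundled Pi.ts may be structured differently.

```typescript
import { SparkSession, functions } from 'spark.js';

const { col } = functions;

async function estimatePi(samples: number): Promise<number> {
  const spark = await SparkSession.builder()
    .appName('PiSketch')
    .remote('sc://localhost:15002')
    .getOrCreate();

  // Draw random points in the unit square and flag those inside the unit circle
  const inside = await spark.range(0, samples)
    .selectExpr('rand() AS x', 'rand() AS y')
    .selectExpr('CASE WHEN x * x + y * y <= 1 THEN 1 ELSE 0 END AS inside')
    .filter(col('inside').gt(0))
    .count();

  // The circle-to-square area ratio is pi / 4
  return (4 * inside) / samples;
}

estimatePi(1_000_000)
  .then(pi => console.log(`Pi is roughly ${pi}`))
  .catch(console.error);
```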
To run an example:
```bash
# Make sure Spark Connect server is running
npx ts-node example/org/apache/spark/sql/example/Pi.ts
```

Contributions are welcome! Please read the Contributor Guide for detailed information on:
- Setting up your development environment
- Code style and conventions
- Building and testing
- Submitting pull requests
- Fork the repository and create a feature branch
- Follow the Code Style Guide
- Add tests for new functionality
- Run checks: `npm run lint` and `npm test`
- Submit a pull request with a clear description
See Submitting Changes for detailed instructions.
- For minor changes or features tied to specific classes, search for 'TODO' in the codebase
- Support Retry / Reattachable execution
- Support Checkpoint for DataFrame
- Support DataFrameNaFunctions
- Support User-Defined Functions (UDF)
  - UDF registration via `spark.udf.register()`
  - Inline UDFs via the `udf()` function
  - Java UDF registration via `spark.udf.registerJava()`
  - UDAF (User-Defined Aggregate Functions)
  - UDTF (User-Defined Table Functions)
- Support DataFrame Join
- Support UserDefinedType
  - UserDefinedType declaration
  - UserDefinedType & Proto bidirectional conversions
  - UserDefinedType & Arrow bidirectional conversions
- Maybe optimize the logging framework
This project is licensed under the Apache License 2.0.
Note: This is an experimental project. For production use, please refer to the official Apache Spark documentation and consider using official Spark clients.