Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions helm/spark-history-web-proxy/CHANGELOG.md

This file was deleted.

10 changes: 10 additions & 0 deletions helm/spark-web-proxy/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Changelog

## 0.1.0 (2025-10-20)


### Features


* add helm chart ([d92a52d](https://github.com/OKDP/spark-web-proxy/commit/d92a52d068f9499201bf3ae9055fa9b19a6ad2ba))
* add support for jupyter ([4dfb78f](https://github.com/OKDP/spark-web-proxy/commit/4dfb78fff1506ad276d28b2ab0f092efffd06f02))
9 changes: 5 additions & 4 deletions internal/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
package constants

const (
SparkHistoryBase = "/history"
SparkHistoryAppsEndpoint = "/api/v1/applications"
HealthzURI = "/healthz"
ReadinessURI = "/readiness"
SparkHistoryBase = "/history"
SparkAppsEndpoint = "/api/v1/applications"
HealthzURI = "/healthz"
ReadinessURI = "/readiness"
True = "true"
)
96 changes: 96 additions & 0 deletions internal/controllers/sparkapps_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2026 okdp.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package controllers

import (
"fmt"
"net/http"

"github.com/gin-gonic/gin"

"github.com/okdp/spark-web-proxy/internal/config"
sparkclient "github.com/okdp/spark-web-proxy/internal/discovery/resolvers/rest"
log "github.com/okdp/spark-web-proxy/internal/logging"
"github.com/okdp/spark-web-proxy/internal/model"
"github.com/okdp/spark-web-proxy/internal/utils"
)

// SparkAppsController holds the Spark History base URL used by its HTTP handlers.
type SparkAppsController struct {
// sparkHistoryBaseURL is set from config.GetSparkHistoryBaseURL() by
// NewSparkAppsController and handed to sparkclient.NewSparkRestClient.
sparkHistoryBaseURL string
}

// NewSparkAppsController builds a SparkAppsController whose Spark History base
// URL is read from config.GetSparkHistoryBaseURL().
func NewSparkAppsController(config *config.ApplicationConfig) *SparkAppsController {
	controller := new(SparkAppsController)
	controller.sparkHistoryBaseURL = config.GetSparkHistoryBaseURL()
	return controller
}

// HandleIncompleteApplications returns a unified list of Spark applications that are
// currently running or not yet fully available in Spark History.
//
// This handler addresses the gap where Spark applications may be visible in Kubernetes
// (as running pods) but are not yet listed in Spark History because event logs have not
// been persisted (e.g. delayed S3 uploads).
//
// The handler:
//  1. Fetches applications from Spark History Server
//  2. Reads the currently known running Spark applications (model.GetRunningSparkApps)
//  3. Queries each running application's Spark UI for live application metadata
//  4. Merges history and live applications into a single list, de-duplicated by app ID
//
// If an application exists in both Spark History and the live runtime, the Spark History
// representation is preferred.
//
// Error handling:
//   - If the Spark History client cannot be created, or listing the history
//     applications fails, the handler answers 400 with a JSON "error" message.
//   - A running application whose client cannot be created, or whose info cannot
//     be fetched, is logged as a warning and left out of the result.
//
// The response format is compatible with the Spark History Server API and can be consumed
// directly by the Spark UI.
func (r SparkAppsController) HandleIncompleteApplications(c *gin.Context) {
	sparkHistoryClient, err := sparkclient.NewSparkRestClient(c.Request, r.sparkHistoryBaseURL)
	if err != nil {
		log.Error("Unable to create new spark history client: %+v", err)
		c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Unable to create new spark history client from upstream URL: %s", r.sparkHistoryBaseURL)})
		return
	}

	historyApps, err := sparkHistoryClient.GetApplications()
	if err != nil {
		log.Error("Failed to list spark applications in spark history from upstream URL %s: %+v", r.sparkHistoryBaseURL, err)
		c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Failed to list spark applications from upstream URL: %s", r.sparkHistoryBaseURL)})
		return
	}

	// A client that reports success without a payload must not crash the
	// handler: treat a nil result as "no history applications".
	var history []model.SparkApp
	if historyApps != nil {
		history = *historyApps
	}

	runningApps := model.GetRunningSparkApps()
	// Allocated non-nil so an empty result can still be encoded as a JSON array.
	uncompletedApps := make([]model.SparkApp, 0, len(runningApps))
	for _, running := range runningApps {
		sparkClient, err := sparkclient.NewSparkRestClient(c.Request, running.BaseURL)
		if err != nil {
			log.Warn("Unable to create new spark app client: %+v", err)
			continue
		}

		app, err := sparkClient.GetApplicationInfo(running.AppID)
		if err != nil {
			log.Warn("Unable to fetch application info for %s: %v", running.AppID, err)
			continue
		}
		// Same defensive rule as above: skip an application for which the
		// client returned neither an error nor any data.
		if app == nil {
			continue
		}
		uncompletedApps = append(uncompletedApps, *app)
	}

	merged := utils.MergeByKey(history, uncompletedApps, func(a model.SparkApp) string { return a.ID })

	c.JSON(http.StatusOK, merged)
}
28 changes: 19 additions & 9 deletions internal/controllers/sparkhistory_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ import (
"github.com/okdp/spark-web-proxy/internal/spark"
)

type SparkkHistoryController struct {
type SparkHistoryController struct {
sparkHistoryBaseURL string
sparkHistoryBase string
sparkUIProxyBase string
}

func NewSparkkHistoryController(config *config.ApplicationConfig) *SparkkHistoryController {
controller := &SparkkHistoryController{
func NewSparkHistoryController(config *config.ApplicationConfig) *SparkHistoryController {
controller := &SparkHistoryController{
sparkHistoryBaseURL: config.GetSparkHistoryBaseURL(),
sparkHistoryBase: constants.SparkHistoryBase,
sparkUIProxyBase: strings.TrimSpace(config.Spark.UI.ProxyBase),
Expand All @@ -49,7 +49,7 @@ func NewSparkkHistoryController(config *config.ApplicationConfig) *SparkkHistory
return controller
}

func (r SparkkHistoryController) HandleHistoryApp(c *gin.Context) {
func (r SparkHistoryController) HandleHistoryApp(c *gin.Context) {

appID := c.Param("appID")
jobPath := c.Param("path")
Expand Down Expand Up @@ -83,19 +83,29 @@ func (r SparkkHistoryController) HandleHistoryApp(c *gin.Context) {
spark.ServeSparkHistory(c, upstreamURL, appID)
}

func (r SparkkHistoryController) HandleDefault(c *gin.Context) {
func (r SparkHistoryController) HandleDefault(c *gin.Context) {
r.serveSparkHistory(c, spark.ServeSparkHistory)
}

// HandleIncompleteApps delegates to serveSparkHistory, passing
// spark.ServeSparkHistoryIncompleteApps as the function that serves the
// request once the upstream Spark History URL has been resolved.
func (r SparkHistoryController) HandleIncompleteApps(c *gin.Context) {
r.serveSparkHistory(c, spark.ServeSparkHistoryIncompleteApps)
}

func (r SparkHistoryController) serveSparkHistory(c *gin.Context, serve func(*gin.Context, *url.URL, string)) {
path := c.Request.URL.Path

upstreamURL, err := url.Parse(fmt.Sprintf("%s%s", r.sparkHistoryBaseURL, path))
upstreamURL, err := url.Parse(r.sparkHistoryBaseURL + path)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Invalid upstream URL: %s", upstreamURL)})
c.JSON(http.StatusBadRequest, gin.H{
"error": fmt.Sprintf("Invalid upstream URL: %s", r.sparkHistoryBaseURL+path),
})
return
}

spark.ServeSparkHistory(c, upstreamURL, "")
serve(c, upstreamURL, "")
}

func (r SparkkHistoryController) redirectToSparkUI(c *gin.Context, appID string) {
func (r SparkHistoryController) redirectToSparkUI(c *gin.Context, appID string) {
c.Request.URL.Path = fmt.Sprintf("%s/%s/jobs/", r.sparkUIProxyBase, appID)
log.Debug("The application '%s' is running, redirect to spark ui '%s'", appID, c.Request.URL.String())
c.Redirect(http.StatusFound, c.Request.URL.String())
Expand Down
2 changes: 1 addition & 1 deletion internal/controllers/sparkui_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewSparkUIController(config *config.ApplicationConfig) *SparkUIController {
}
}

func (r SparkUIController) HandleLiveApp(c *gin.Context) {
func (r SparkUIController) HandleRunningApp(c *gin.Context) {
appID := c.Param("appID")
sparkAppPath := strings.TrimPrefix(c.Param("path"), "/")

Expand Down
38 changes: 23 additions & 15 deletions internal/discovery/resolvers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,38 +28,39 @@ import (
"github.com/okdp/spark-web-proxy/internal/utils"
)

func ResolveSparkAppFromPod(pod *corev1.Pod) (*model.CachedSparkApp, error) {
func ResolveSparkAppFromPod(pod *corev1.Pod) (*model.SparkAppInstance, error) {
sparkUIURL := fmt.Sprintf("http://%s:%d", pod.Status.PodIP, utils.GetSparkUIPort(pod))
sparkApp := &model.CachedSparkApp{
BaseURL: sparkUIURL,
PodName: pod.Name,
AppID: utils.GetSparkAppID(pod),
Namespace: pod.Namespace,
Status: string(pod.Status.Phase),
sparkApp := &model.SparkAppInstance{
BaseURL: sparkUIURL,
PodName: pod.Name,
AppID: utils.GetSparkAppID(pod),
Namespace: pod.Namespace,
Status: string(pod.Status.Phase),
StartTimeEpoch: podStartTimeEpoch(pod),
}

model.AddOrUpdateSparkApp(sparkApp)

return sparkApp, nil
}

func ResolveSparkAppFromHistory(request *http.Request, sparkHistoryBaseURL string, appID string) (*model.CachedSparkApp, error) {
sparkClient, err := sparkclient.NewSparkHistoryAppsClient(request, sparkHistoryBaseURL)
func ResolveSparkAppFromHistory(request *http.Request, sparkHistoryBaseURL string, appID string) (*model.SparkAppInstance, error) {
sparkClient, err := sparkclient.NewSparkRestClient(request, sparkHistoryBaseURL)
if err != nil {
log.Error("Unable to create new spark history client:", err)
log.Error("Unable to create new spark history client: %+v", err)
return nil, err
}
appInfo, err := sparkClient.GetApplicationInfo(appID)
if err != nil {
log.Error("Unable to get spark application '%s' status from spark history, %w", appID, err)
return &model.CachedSparkApp{
log.Error("Unable to get spark application '%s' status from spark history, %+v", appID, err)
return &model.SparkAppInstance{
Status: string(model.AppUnknown),
}, err
}
sparkAppEnv, err := sparkClient.GetEnvironment(appID)
if err != nil {
log.Error("Get the application '%s' environment properties from spark history: %w", appID, err)
return &model.CachedSparkApp{
log.Error("Failed to get the application '%s' environment properties from spark history: %+v", appID, err)
return &model.SparkAppInstance{
Status: string(model.AppUnknown),
}, err
}
Expand All @@ -70,7 +71,7 @@ func ResolveSparkAppFromHistory(request *http.Request, sparkHistoryBaseURL strin
sparkAppNamespace, _ := sparkAppEnv.GetProperty("spark.kubernetes.namespace")
sparkUIBaseURL := fmt.Sprintf("http://%s:%s", sparkDriverHost, sparkDriverPort)

sparkApp := &model.CachedSparkApp{
sparkApp := &model.SparkAppInstance{
BaseURL: sparkUIBaseURL,
PodName: sparkAppName,
AppID: sparkAppID,
Expand All @@ -85,3 +86,10 @@ func ResolveSparkAppFromHistory(request *http.Request, sparkHistoryBaseURL strin
}
return sparkApp, err
}

// podStartTimeEpoch returns the pod's start time as Unix time in milliseconds,
// or -1 when the pod status carries no start time.
func podStartTimeEpoch(pod *corev1.Pod) int64 {
	startedAt := pod.Status.StartTime
	if startedAt == nil {
		return -1
	}
	return startedAt.UnixMilli()
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (i SparkAppInformer) WatchNamespaceSparkApps(clientset *kubernetes.Clientse
})

if err != nil {
log.Error("Failed to add spark app event handler: %v", err)
log.Error("Failed to add spark app event handler: %+v", err)
return
}

Expand Down
19 changes: 14 additions & 5 deletions internal/discovery/resolvers/rest/client/rest_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,23 @@ import (
log "github.com/okdp/spark-web-proxy/internal/logging"
)

type SparkHistoryClient struct {
type SparkClient struct {
Client *http.Client
Request *http.Request
}

func NewSparkHistoryClient(request *http.Request, sparkHistoryBaseURL string) (*SparkHistoryClient, error) {
func NewSparkClient(request *http.Request, sparkHistoryBaseURL string) (*SparkClient, error) {
jar, _ := cookiejar.New(nil)
apiURL := fmt.Sprintf("%s%s", sparkHistoryBaseURL, constants.SparkHistoryAppsEndpoint)
apiURL := fmt.Sprintf("%s%s", sparkHistoryBaseURL, constants.SparkAppsEndpoint)
req, err := http.NewRequest(request.Method, apiURL, nil)
if err != nil {
log.Error("failed to create request: %w", err)
log.Error("failed to create request: %+v", err)
return nil, err
}

// Forward query parameters
req.URL.RawQuery = request.URL.RawQuery

// Copy headers from the original request
for key, values := range request.Header {
for _, value := range values {
Expand All @@ -48,7 +52,12 @@ func NewSparkHistoryClient(request *http.Request, sparkHistoryBaseURL string) (*
for _, cookie := range request.Cookies() {
req.AddCookie(cookie)
}
return &SparkHistoryClient{

// Disable compression for API calls
req.Header.Del("Accept-Encoding")
req.Header.Set("Accept-Encoding", "identity")

return &SparkClient{
Client: &http.Client{Jar: jar},
Request: req,
}, nil
Expand Down
Loading