From c50aeb845e40879b4b770a1a5ee9fa4e071c63c4 Mon Sep 17 00:00:00 2001 From: iizitounene Date: Fri, 9 Jan 2026 11:55:49 +0100 Subject: [PATCH 1/2] refactor: refactoring --- helm/spark-history-web-proxy/CHANGELOG.md | 15 --------------- helm/spark-web-proxy/CHANGELOG.md | 10 ++++++++++ 2 files changed, 10 insertions(+), 15 deletions(-) delete mode 100644 helm/spark-history-web-proxy/CHANGELOG.md create mode 100644 helm/spark-web-proxy/CHANGELOG.md diff --git a/helm/spark-history-web-proxy/CHANGELOG.md b/helm/spark-history-web-proxy/CHANGELOG.md deleted file mode 100644 index 1193636..0000000 --- a/helm/spark-history-web-proxy/CHANGELOG.md +++ /dev/null @@ -1,15 +0,0 @@ -# Changelog - -## 0.1.0 (2025-10-17) - - -### Features - -* add helm chart ([d92a52d](https://github.com/OKDP/spark-web-proxy/commit/d92a52d068f9499201bf3ae9055fa9b19a6ad2ba)) -* add support for jupyter ([4dfb78f](https://github.com/OKDP/spark-web-proxy/commit/4dfb78fff1506ad276d28b2ab0f092efffd06f02)) - - -### Documentation - -* add helm docs ([a849e09](https://github.com/OKDP/spark-web-proxy/commit/a849e097f4de0dc1649b4b039827805a550c9d53)) -* update readme ([010af7d](https://github.com/OKDP/spark-web-proxy/commit/010af7d1fd5f7009f790fa792f76d39b89a317cc)) diff --git a/helm/spark-web-proxy/CHANGELOG.md b/helm/spark-web-proxy/CHANGELOG.md new file mode 100644 index 0000000..0d9f656 --- /dev/null +++ b/helm/spark-web-proxy/CHANGELOG.md @@ -0,0 +1,10 @@ +# Changelog + +## 0.1.0 (2025-10-20) + + +### Features + + +* add helm chart ([d92a52d](https://github.com/OKDP/spark-web-proxy/commit/d92a52d068f9499201bf3ae9055fa9b19a6ad2ba)) +* add support for jupyter ([4dfb78f](https://github.com/OKDP/spark-web-proxy/commit/4dfb78fff1506ad276d28b2ab0f092efffd06f02)) From e975cccd367c88f923a18ddc5347706a7f2ff854 Mon Sep 17 00:00:00 2001 From: iizitounene Date: Fri, 9 Jan 2026 12:13:18 +0100 Subject: [PATCH 2/2] feat(history): show running Spark applications without waiting event logs to be available #11 --- 
internal/constants/constants.go | 9 +- internal/controllers/sparkapps_controller.go | 96 ++++++++ .../controllers/sparkhistory_controller.go | 28 ++- internal/controllers/sparkui_controller.go | 2 +- internal/discovery/resolvers.go | 38 +-- .../k8s/informers/sparkapp_informer.go | 2 +- .../resolvers/rest/client/rest_client.go | 19 +- .../discovery/resolvers/rest/sparkapp_rest.go | 80 +++--- internal/model/history_sparkapp.go | 59 ----- internal/model/sparkapp.go | 59 +++++ ...ached_sparkapp.go => sparkapp_instance.go} | 48 ++-- ...kapp_test.go => sparkapp_instance_test.go} | 2 +- ...tory_sparkapp_test.go => sparkapp_test.go} | 22 +- internal/server/server.go | 49 +++- internal/spark/default_handler.go | 6 +- internal/spark/incomplete_apps_handler.go | 232 ++++++++++++++++++ internal/spark/proxy/handler.go | 41 ++++ internal/utils/sparkutils.go | 54 ++++ 18 files changed, 656 insertions(+), 190 deletions(-) create mode 100644 internal/controllers/sparkapps_controller.go delete mode 100644 internal/model/history_sparkapp.go create mode 100644 internal/model/sparkapp.go rename internal/model/{cached_sparkapp.go => sparkapp_instance.go} (76%) rename internal/model/{cached_sparkapp_test.go => sparkapp_instance_test.go} (97%) rename internal/model/{history_sparkapp_test.go => sparkapp_test.go} (83%) create mode 100644 internal/spark/incomplete_apps_handler.go diff --git a/internal/constants/constants.go b/internal/constants/constants.go index 0410898..0539b54 100644 --- a/internal/constants/constants.go +++ b/internal/constants/constants.go @@ -17,8 +17,9 @@ package constants const ( - SparkHistoryBase = "/history" - SparkHistoryAppsEndpoint = "/api/v1/applications" - HealthzURI = "/healthz" - ReadinessURI = "/readiness" + SparkHistoryBase = "/history" + SparkAppsEndpoint = "/api/v1/applications" + HealthzURI = "/healthz" + ReadinessURI = "/readiness" + True = "true" ) diff --git a/internal/controllers/sparkapps_controller.go 
b/internal/controllers/sparkapps_controller.go new file mode 100644 index 0000000..a56ba96 --- /dev/null +++ b/internal/controllers/sparkapps_controller.go @@ -0,0 +1,96 @@ +/* + * Copyright 2026 okdp.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package controllers + +import ( + "fmt" + "net/http" + + "github.com/gin-gonic/gin" + + "github.com/okdp/spark-web-proxy/internal/config" + sparkclient "github.com/okdp/spark-web-proxy/internal/discovery/resolvers/rest" + log "github.com/okdp/spark-web-proxy/internal/logging" + "github.com/okdp/spark-web-proxy/internal/model" + "github.com/okdp/spark-web-proxy/internal/utils" +) + +type SparkAppsController struct { + sparkHistoryBaseURL string +} + +func NewSparkAppsController(config *config.ApplicationConfig) *SparkAppsController { + return &SparkAppsController{ + sparkHistoryBaseURL: config.GetSparkHistoryBaseURL(), + } +} + +// HandleIncompleteApplications returns a unified list of Spark applications that are +// currently running or not yet fully available in Spark History. +// +// This handler addresses the gap where Spark applications may be visible in Kubernetes +// (as running pods) but are not yet listed in Spark History because event logs have not +// been persisted (e.g. delayed S3 uploads). +// +// The handler: +// 1. Fetches applications from Spark History Server +// 2. Discovers running Spark applications from Kubernetes +// 3. 
Queries each running application's Spark UI for live application metadata +// 4. Merges history and live applications into a single list, de-duplicated by app ID +// +// If an application exists in both Spark History and the live runtime, the Spark History +// representation is preferred. +// +// The response format is compatible with the Spark History Server API and can be consumed +// directly by the Spark UI. +func (r SparkAppsController) HandleIncompleteApplications(c *gin.Context) { + + sparkHistoryClient, err := sparkclient.NewSparkRestClient(c.Request, r.sparkHistoryBaseURL) + if err != nil { + log.Error("Unable to create new spark history client: %+v", err) + c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Unable to create new spark history client from upstream URL: %s", r.sparkHistoryBaseURL)}) + return + } + + historyApps, err := sparkHistoryClient.GetApplications() + if err != nil { + log.Error("Failed to list spark applications in spark history from upstream URL %s: %+v", r.sparkHistoryBaseURL, err) + c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Failed to list spark applications from upstream URL: %s", r.sparkHistoryBaseURL)}) + return + } + + runningApps := model.GetRunningSparkApps() + uncompletedApps := make([]model.SparkApp, 0, len(runningApps)) + for _, running := range runningApps { + sparkClient, err := sparkclient.NewSparkRestClient(c.Request, running.BaseURL) + if err != nil { + log.Warn("Unable to create new spark app client: %+v", err) + continue + } + + app, err := sparkClient.GetApplicationInfo(running.AppID) + if err != nil { + log.Warn("Unable to fetch application info for %s: %v", running.AppID, err) + continue + } + uncompletedApps = append(uncompletedApps, *app) + } + + running := utils.MergeByKey(*historyApps, uncompletedApps, func(a model.SparkApp) string { return a.ID }) + + c.JSON(http.StatusOK, running) +} diff --git a/internal/controllers/sparkhistory_controller.go 
b/internal/controllers/sparkhistory_controller.go index 1b3c79e..2d4fecb 100644 --- a/internal/controllers/sparkhistory_controller.go +++ b/internal/controllers/sparkhistory_controller.go @@ -32,14 +32,14 @@ import ( "github.com/okdp/spark-web-proxy/internal/spark" ) -type SparkkHistoryController struct { +type SparkHistoryController struct { sparkHistoryBaseURL string sparkHistoryBase string sparkUIProxyBase string } -func NewSparkkHistoryController(config *config.ApplicationConfig) *SparkkHistoryController { - controller := &SparkkHistoryController{ +func NewSparkHistoryController(config *config.ApplicationConfig) *SparkHistoryController { + controller := &SparkHistoryController{ sparkHistoryBaseURL: config.GetSparkHistoryBaseURL(), sparkHistoryBase: constants.SparkHistoryBase, sparkUIProxyBase: strings.TrimSpace(config.Spark.UI.ProxyBase), @@ -49,7 +49,7 @@ func NewSparkkHistoryController(config *config.ApplicationConfig) *SparkkHistory return controller } -func (r SparkkHistoryController) HandleHistoryApp(c *gin.Context) { +func (r SparkHistoryController) HandleHistoryApp(c *gin.Context) { appID := c.Param("appID") jobPath := c.Param("path") @@ -83,19 +83,29 @@ func (r SparkkHistoryController) HandleHistoryApp(c *gin.Context) { spark.ServeSparkHistory(c, upstreamURL, appID) } -func (r SparkkHistoryController) HandleDefault(c *gin.Context) { +func (r SparkHistoryController) HandleDefault(c *gin.Context) { + r.serveSparkHistory(c, spark.ServeSparkHistory) +} + +func (r SparkHistoryController) HandleIncompleteApps(c *gin.Context) { + r.serveSparkHistory(c, spark.ServeSparkHistoryIncompleteApps) +} + +func (r SparkHistoryController) serveSparkHistory(c *gin.Context, serve func(*gin.Context, *url.URL, string)) { path := c.Request.URL.Path - upstreamURL, err := url.Parse(fmt.Sprintf("%s%s", r.sparkHistoryBaseURL, path)) + upstreamURL, err := url.Parse(r.sparkHistoryBaseURL + path) if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Invalid 
upstream URL: %s", upstreamURL)}) + c.JSON(http.StatusBadRequest, gin.H{ + "error": fmt.Sprintf("Invalid upstream URL: %s", r.sparkHistoryBaseURL+path), + }) return } - spark.ServeSparkHistory(c, upstreamURL, "") + serve(c, upstreamURL, "") } -func (r SparkkHistoryController) redirectToSparkUI(c *gin.Context, appID string) { +func (r SparkHistoryController) redirectToSparkUI(c *gin.Context, appID string) { c.Request.URL.Path = fmt.Sprintf("%s/%s/jobs/", r.sparkUIProxyBase, appID) log.Debug("The application '%s' is running, redirect to spark ui '%s'", appID, c.Request.URL.String()) c.Redirect(http.StatusFound, c.Request.URL.String()) diff --git a/internal/controllers/sparkui_controller.go b/internal/controllers/sparkui_controller.go index 8accf4d..4a00365 100644 --- a/internal/controllers/sparkui_controller.go +++ b/internal/controllers/sparkui_controller.go @@ -46,7 +46,7 @@ func NewSparkUIController(config *config.ApplicationConfig) *SparkUIController { } } -func (r SparkUIController) HandleLiveApp(c *gin.Context) { +func (r SparkUIController) HandleRunningApp(c *gin.Context) { appID := c.Param("appID") sparkAppPath := strings.TrimPrefix(c.Param("path"), "/") diff --git a/internal/discovery/resolvers.go b/internal/discovery/resolvers.go index 117865b..b6b3069 100644 --- a/internal/discovery/resolvers.go +++ b/internal/discovery/resolvers.go @@ -28,14 +28,15 @@ import ( "github.com/okdp/spark-web-proxy/internal/utils" ) -func ResolveSparkAppFromPod(pod *corev1.Pod) (*model.CachedSparkApp, error) { +func ResolveSparkAppFromPod(pod *corev1.Pod) (*model.SparkAppInstance, error) { sparkUIURL := fmt.Sprintf("http://%s:%d", pod.Status.PodIP, utils.GetSparkUIPort(pod)) - sparkApp := &model.CachedSparkApp{ - BaseURL: sparkUIURL, - PodName: pod.Name, - AppID: utils.GetSparkAppID(pod), - Namespace: pod.Namespace, - Status: string(pod.Status.Phase), + sparkApp := &model.SparkAppInstance{ + BaseURL: sparkUIURL, + PodName: pod.Name, + AppID: utils.GetSparkAppID(pod), + 
Namespace: pod.Namespace, + Status: string(pod.Status.Phase), + StartTimeEpoch: podStartTimeEpoch(pod), } model.AddOrUpdateSparkApp(sparkApp) @@ -43,23 +44,23 @@ func ResolveSparkAppFromPod(pod *corev1.Pod) (*model.CachedSparkApp, error) { return sparkApp, nil } -func ResolveSparkAppFromHistory(request *http.Request, sparkHistoryBaseURL string, appID string) (*model.CachedSparkApp, error) { - sparkClient, err := sparkclient.NewSparkHistoryAppsClient(request, sparkHistoryBaseURL) +func ResolveSparkAppFromHistory(request *http.Request, sparkHistoryBaseURL string, appID string) (*model.SparkAppInstance, error) { + sparkClient, err := sparkclient.NewSparkRestClient(request, sparkHistoryBaseURL) if err != nil { - log.Error("Unable to create new spark history client:", err) + log.Error("Unable to create new spark history client: %+v", err) return nil, err } appInfo, err := sparkClient.GetApplicationInfo(appID) if err != nil { - log.Error("Unable to get spark application '%s' status from spark history, %w", appID, err) - return &model.CachedSparkApp{ + log.Error("Unable to get spark application '%s' status from spark history, %+v", appID, err) + return &model.SparkAppInstance{ Status: string(model.AppUnknown), }, err } sparkAppEnv, err := sparkClient.GetEnvironment(appID) if err != nil { - log.Error("Get the application '%s' environment properties from spark history: %w", appID, err) - return &model.CachedSparkApp{ + log.Error("Failed to get the application '%s' environment properties from spark history: %+v", appID, err) + return &model.SparkAppInstance{ Status: string(model.AppUnknown), }, err } @@ -70,7 +71,7 @@ func ResolveSparkAppFromHistory(request *http.Request, sparkHistoryBaseURL strin sparkAppNamespace, _ := sparkAppEnv.GetProperty("spark.kubernetes.namespace") sparkUIBaseURL := fmt.Sprintf("http://%s:%s", sparkDriverHost, sparkDriverPort) - sparkApp := &model.CachedSparkApp{ + sparkApp := &model.SparkAppInstance{ BaseURL: sparkUIBaseURL, PodName: sparkAppName, 
AppID: sparkAppID, @@ -85,3 +86,10 @@ func ResolveSparkAppFromHistory(request *http.Request, sparkHistoryBaseURL strin } return sparkApp, err } + +func podStartTimeEpoch(pod *corev1.Pod) int64 { + if pod.Status.StartTime != nil { + return pod.Status.StartTime.UnixMilli() + } + return -1 +} diff --git a/internal/discovery/resolvers/k8s/informers/sparkapp_informer.go b/internal/discovery/resolvers/k8s/informers/sparkapp_informer.go index cc7dc4c..f1312c3 100644 --- a/internal/discovery/resolvers/k8s/informers/sparkapp_informer.go +++ b/internal/discovery/resolvers/k8s/informers/sparkapp_informer.go @@ -84,7 +84,7 @@ func (i SparkAppInformer) WatchNamespaceSparkApps(clientset *kubernetes.Clientse }) if err != nil { - log.Error("Failed to add spark app event handler: %v", err) + log.Error("Failed to add spark app event handler: %+v", err) return } diff --git a/internal/discovery/resolvers/rest/client/rest_client.go b/internal/discovery/resolvers/rest/client/rest_client.go index 7826c77..e7aa7f1 100644 --- a/internal/discovery/resolvers/rest/client/rest_client.go +++ b/internal/discovery/resolvers/rest/client/rest_client.go @@ -25,19 +25,23 @@ import ( log "github.com/okdp/spark-web-proxy/internal/logging" ) -type SparkHistoryClient struct { +type SparkClient struct { Client *http.Client Request *http.Request } -func NewSparkHistoryClient(request *http.Request, sparkHistoryBaseURL string) (*SparkHistoryClient, error) { +func NewSparkClient(request *http.Request, sparkHistoryBaseURL string) (*SparkClient, error) { jar, _ := cookiejar.New(nil) - apiURL := fmt.Sprintf("%s%s", sparkHistoryBaseURL, constants.SparkHistoryAppsEndpoint) + apiURL := fmt.Sprintf("%s%s", sparkHistoryBaseURL, constants.SparkAppsEndpoint) req, err := http.NewRequest(request.Method, apiURL, nil) if err != nil { - log.Error("failed to create request: %w", err) + log.Error("failed to create request: %+v", err) return nil, err } + + // Forward query parameters + req.URL.RawQuery = request.URL.RawQuery + 
// Copy headers from the original request for key, values := range request.Header { for _, value := range values { @@ -48,7 +52,12 @@ func NewSparkHistoryClient(request *http.Request, sparkHistoryBaseURL string) (* for _, cookie := range request.Cookies() { req.AddCookie(cookie) } - return &SparkHistoryClient{ + + // Disable compression for API calls + req.Header.Del("Accept-Encoding") + req.Header.Set("Accept-Encoding", "identity") + + return &SparkClient{ Client: &http.Client{Jar: jar}, Request: req, }, nil diff --git a/internal/discovery/resolvers/rest/sparkapp_rest.go b/internal/discovery/resolvers/rest/sparkapp_rest.go index 0a04f4d..ae36b6f 100644 --- a/internal/discovery/resolvers/rest/sparkapp_rest.go +++ b/internal/discovery/resolvers/rest/sparkapp_rest.go @@ -17,13 +17,10 @@ package sparkclient import ( - "bytes" - "compress/gzip" "encoding/json" "fmt" - "io" "net/http" - "strconv" + "strings" "github.com/okdp/spark-web-proxy/internal/constants" restclient "github.com/okdp/spark-web-proxy/internal/discovery/resolvers/rest/client" @@ -31,82 +28,73 @@ import ( "github.com/okdp/spark-web-proxy/internal/model" ) -type SparkHistoryAppsClient struct { - *restclient.SparkHistoryClient +type SparkRestClient struct { + *restclient.SparkClient } -func NewSparkHistoryAppsClient(request *http.Request, sparkHistoryBaseURL string) (*SparkHistoryAppsClient, error) { - client, err := restclient.NewSparkHistoryClient(request, sparkHistoryBaseURL) - return &SparkHistoryAppsClient{ +func NewSparkRestClient(request *http.Request, sparkHistoryBaseURL string) (*SparkRestClient, error) { + client, err := restclient.NewSparkClient(request, sparkHistoryBaseURL) + return &SparkRestClient{ client, }, err } -func (c *SparkHistoryAppsClient) GetApplicationInfo(appID string) (*model.HistorySparkApp, error) { - c.Request.URL.Path = fmt.Sprintf("%s/%s", constants.SparkHistoryAppsEndpoint, appID) +func (c *SparkRestClient) GetApplications() (*[]model.SparkApp, error) { + + log.Debug("Get 
the list of spark history applications from URL: %s", c.Request.URL.String()) + + resp, err := c.Client.Do(c.Request) + if err != nil { + return nil, err + } + + return doResponse[[]model.SparkApp](resp, "") +} + +func (c *SparkRestClient) GetApplicationInfo(appID string) (*model.SparkApp, error) { + c.Request.URL.Path = fmt.Sprintf("%s/%s", constants.SparkAppsEndpoint, appID) log.Debug("Get the application status '%s' from URL: %s", appID, c.Request.URL.String()) resp, err := c.Client.Do(c.Request) if err != nil { - log.Error("Failed to get status for application '%s' from URL %s: %w", appID, c.Request.URL.Path, err) return nil, err } - return doResponse[model.HistorySparkApp](resp, appID) + return doResponse[model.SparkApp](resp, appID) } -func (c *SparkHistoryAppsClient) GetEnvironment(appID string) (*model.HistorySparkAppEnvironment, error) { - c.Request.URL.Path = fmt.Sprintf("%s/%s/%s", constants.SparkHistoryAppsEndpoint, appID, "environment") +func (c *SparkRestClient) GetEnvironment(appID string) (*model.SparkAppEnvironment, error) { + c.Request.URL.Path = fmt.Sprintf("%s/%s/%s", constants.SparkAppsEndpoint, appID, "environment") log.Debug("Get the application '%s' environment properties from URL: %s", appID, c.Request.URL.String()) resp, err := c.Client.Do(c.Request) if err != nil { - log.Error("Failed to get environment properties for application '%s' from URL %s: %w", appID, c.Request.URL.String(), err) return nil, err } - return doResponse[model.HistorySparkAppEnvironment](resp, appID) + return doResponse[model.SparkAppEnvironment](resp, appID) } func doResponse[T any](response *http.Response, appID string) (*T, error) { var object T - gzReader, err := gzip.NewReader(response.Body) - if err != nil { - return nil, err - } - defer gzReader.Close() + ct := strings.ToLower(response.Header.Get("Content-Type")) - // Read all decompressed content - body, err := io.ReadAll(gzReader) - if err != nil { - log.Error("Failed to read body response for application 
'%s': %w", appID, err) - return nil, err - } + log.Debug("Upstream response: status=%d content-encoding=%q content-type=%q content-length=%q", + response.StatusCode, response.Header.Get("Content-Encoding"), response.Header.Get("Content-Type"), response.Header.Get("Content-Length"), + ) - err = json.Unmarshal([]byte(string(body)), &object) - if err != nil { - log.Error("Failed to parse body response for application '%s': %w", appID, err) + // Not JSON content-type: log snippet and fail fast + if !strings.Contains(ct, "application/json") && !strings.Contains(ct, "text/json") { + return nil, fmt.Errorf("spark UI is initializing") } - // Re-compress the modified body - var buf bytes.Buffer - gzWriter := gzip.NewWriter(&buf) - _, err = gzWriter.Write(body) - if err != nil { - log.Error("Failed to write body response for application '%s': %w", appID, err) + if err := json.NewDecoder(response.Body).Decode(&object); err != nil { + log.Error("Failed to decode JSON for '%s': %v", appID, err) return nil, err } - err = gzWriter.Close() - if err != nil { - log.Error("Failed to close body response for application '%s': %w", appID, err) - return nil, err - } - response.Body = io.NopCloser(&buf) - response.ContentLength = int64(buf.Len()) - response.Header.Set("Content-Length", strconv.Itoa(buf.Len())) - response.Header.Set("Content-Encoding", "gzip") return &object, nil + } diff --git a/internal/model/history_sparkapp.go b/internal/model/history_sparkapp.go deleted file mode 100644 index f2c52ef..0000000 --- a/internal/model/history_sparkapp.go +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright 2025 okdp.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package model - -// HistorySparkApp represents the structure of a Spark application info JSON response. -type HistorySparkApp struct { - ID string `json:"id"` - Name string `json:"name"` - Attempts []HistorySparkAppAttempt `json:"attempts"` -} - -// HistorySparkAppAttempt represents each attempt of the Spark application. -type HistorySparkAppAttempt struct { - StartTime string `json:"startTime"` - EndTime string `json:"endTime"` - LastUpdated string `json:"lastUpdated"` - Duration int64 `json:"duration"` - SparkUser string `json:"sparkUser"` - Completed bool `json:"completed"` - AppSparkVersion string `json:"appSparkVersion"` - StartTimeEpoch int64 `json:"startTimeEpoch"` - EndTimeEpoch int64 `json:"endTimeEpoch"` - LastUpdatedEpoch int64 `json:"lastUpdatedEpoch"` -} - -// HistorySparkAppEnvironment represents the JSON structure for Spark history environment response (/applications/[app-id]/environment). -type HistorySparkAppEnvironment struct { - SparkProperties [][]string `json:"sparkProperties"` -} - -// IsRunning checks if the Spark application is still running. -// It returns true if at least one attempt meets any of the following conditions: -// 1. Completed is false -// 2. Duration is 0 -// 3. 
EndTimeEpoch is -1 -func (app HistorySparkApp) IsRunning() bool { - for _, attempt := range app.Attempts { - if !attempt.Completed || - attempt.Duration == 0 || - attempt.EndTimeEpoch == -1 { - return true - } - } - return false -} diff --git a/internal/model/sparkapp.go b/internal/model/sparkapp.go new file mode 100644 index 0000000..505d679 --- /dev/null +++ b/internal/model/sparkapp.go @@ -0,0 +1,59 @@ +/* + * Copyright 2025 okdp.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package model + +// SparkApp represents the structure of a Spark application info JSON response. +type SparkApp struct { + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Attempts []SparkAppAttempt `json:"attempts,omitempty"` +} + +// SparkAppAttempt represents each attempt of the Spark application. 
+type SparkAppAttempt struct { + StartTime string `json:"startTime,omitempty"` + EndTime string `json:"endTime,omitempty"` + LastUpdated string `json:"lastUpdated,omitempty"` + Duration int64 `json:"duration,omitempty"` + SparkUser string `json:"sparkUser,omitempty"` + Completed bool `json:"completed,omitempty"` + AppSparkVersion string `json:"appSparkVersion,omitempty"` + StartTimeEpoch int64 `json:"startTimeEpoch,omitempty"` + EndTimeEpoch int64 `json:"endTimeEpoch,omitempty"` + LastUpdatedEpoch int64 `json:"lastUpdatedEpoch,omitempty"` +} + +// SparkAppEnvironment represents the JSON structure for Spark history environment response (/applications/[app-id]/environment). +type SparkAppEnvironment struct { + SparkProperties [][]string `json:"sparkProperties"` +} + +// IsRunning checks if the Spark application is still running. +// It returns true if at least one attempt meets any of the following conditions: +// 1. Completed is false +// 2. Duration is 0 +// 3. EndTimeEpoch is -1 +func (app SparkApp) IsRunning() bool { + for _, attempt := range app.Attempts { + if !attempt.Completed || + attempt.Duration == 0 || + attempt.EndTimeEpoch == -1 { + return true + } + } + return false +} diff --git a/internal/model/cached_sparkapp.go b/internal/model/sparkapp_instance.go similarity index 76% rename from internal/model/cached_sparkapp.go rename to internal/model/sparkapp_instance.go index 787dc89..70af3b3 100644 --- a/internal/model/cached_sparkapp.go +++ b/internal/model/sparkapp_instance.go @@ -20,12 +20,13 @@ import ( "sync" ) -type CachedSparkApp struct { - BaseURL string - PodName string - AppID string - Namespace string - Status string +type SparkAppInstance struct { + BaseURL string + PodName string + AppID string + Namespace string + Status string + StartTimeEpoch int64 } // SparkAppsStore holds a concurrent map of Spark applications, keyed by appId. 
@@ -35,16 +36,16 @@ var ( }{} ) -func (app CachedSparkApp) IsRunning() bool { +func (app SparkAppInstance) IsRunning() bool { return app.Status == string(AppRunning) } -func (app CachedSparkApp) IsCompleted() bool { +func (app SparkAppInstance) IsCompleted() bool { return !app.IsRunning() } // AddOrUpdateSparkApp adds a new SparkApp to the map or updates an existing one -func AddOrUpdateSparkApp(app *CachedSparkApp) { +func AddOrUpdateSparkApp(app *SparkAppInstance) { SparkAppsStore.Instances.Store(app.AppID, app) } @@ -54,7 +55,7 @@ func MakeSparkAppCompleted(appID string) { if found { app.Status = string(AppUnknown) } else { - app = &CachedSparkApp{ + app = &SparkAppInstance{ AppID: appID, Status: string(AppUnknown), } @@ -86,12 +87,12 @@ func DeleteSparkApp(appID string) { // if found { // fmt.Println("Deleted SparkApp:", deletedApp) // } -func DeleteSparkAppByName(podName string) (*CachedSparkApp, bool) { - var deletedApp *CachedSparkApp +func DeleteSparkAppByName(podName string) (*SparkAppInstance, bool) { + var deletedApp *SparkAppInstance var found bool SparkAppsStore.Instances.Range(func(key, value interface{}) bool { - if app, ok := value.(*CachedSparkApp); ok && app.PodName == podName { + if app, ok := value.(*SparkAppInstance); ok && app.PodName == podName { deletedApp = app found = true SparkAppsStore.Instances.Delete(key) @@ -104,20 +105,21 @@ func DeleteSparkAppByName(podName string) (*CachedSparkApp, bool) { } // Get retrieves a SparkApp from the map by appID -func GetSparkApp(appID string) (*CachedSparkApp, bool) { +func GetSparkApp(appID string) (*SparkAppInstance, bool) { value, exists := SparkAppsStore.Instances.Load(appID) if exists { - return value.(*CachedSparkApp), exists + return value.(*SparkAppInstance), exists } - return &CachedSparkApp{}, false + return &SparkAppInstance{}, false } -// ListSparkApps retrieves all SparkApps from the map -func ListSparkApps() []*CachedSparkApp { - var apps []*CachedSparkApp +// GetRunningSparkApps 
retrieves all SparkApps from the map +func GetRunningSparkApps() []*SparkAppInstance { + apps := make([]*SparkAppInstance, 0) SparkAppsStore.Instances.Range(func(_, value interface{}) bool { - if app, ok := value.(CachedSparkApp); ok { - apps = append(apps, &app) + app := value.(*SparkAppInstance) + if app != nil && app.IsRunning() { + apps = append(apps, app) } return true }) @@ -143,8 +145,8 @@ func ListSparkApps() []*CachedSparkApp { // } // value, found := response.GetProperty("spark.app.id") // fmt.Println(value, found) // Output: "spark-xyz123 true" -func (r HistorySparkAppEnvironment) GetProperty(propertyName string) (string, bool) { - for _, property := range r.SparkProperties { +func (app SparkAppEnvironment) GetProperty(propertyName string) (string, bool) { + for _, property := range app.SparkProperties { if property[0] == propertyName { return property[1], true } diff --git a/internal/model/cached_sparkapp_test.go b/internal/model/sparkapp_instance_test.go similarity index 97% rename from internal/model/cached_sparkapp_test.go rename to internal/model/sparkapp_instance_test.go index c4c33b4..b52e424 100644 --- a/internal/model/cached_sparkapp_test.go +++ b/internal/model/sparkapp_instance_test.go @@ -24,7 +24,7 @@ import ( func TestGetProperty(t *testing.T) { // Define a sample SparkHistoryEnvironmentResponse - response := HistorySparkAppEnvironment{ + response := SparkAppEnvironment{ SparkProperties: [][]string{ {"spark.acls.enable", "true"}, {"spark.app.id", "spark-9adcb03756d042de8f2f5c7deb8715b3"}, diff --git a/internal/model/history_sparkapp_test.go b/internal/model/sparkapp_test.go similarity index 83% rename from internal/model/history_sparkapp_test.go rename to internal/model/sparkapp_test.go index 5d53888..03468fb 100644 --- a/internal/model/history_sparkapp_test.go +++ b/internal/model/sparkapp_test.go @@ -24,15 +24,15 @@ import ( func TestIsRunning(t *testing.T) { tests := []struct { name string - app HistorySparkApp + app SparkApp expected 
bool }{ { name: "Running application (Completed false)", - app: HistorySparkApp{ + app: SparkApp{ ID: "spark-123", Name: "TestApp", - Attempts: []HistorySparkAppAttempt{ + Attempts: []SparkAppAttempt{ {Completed: false}, }, }, @@ -40,10 +40,10 @@ func TestIsRunning(t *testing.T) { }, { name: "Running application (Duration 0)", - app: HistorySparkApp{ + app: SparkApp{ ID: "spark-456", Name: "TestApp", - Attempts: []HistorySparkAppAttempt{ + Attempts: []SparkAppAttempt{ {Completed: true, Duration: 0}, }, }, @@ -51,10 +51,10 @@ func TestIsRunning(t *testing.T) { }, { name: "Running application (EndTimeEpoch -1)", - app: HistorySparkApp{ + app: SparkApp{ ID: "spark-789", Name: "TestApp", - Attempts: []HistorySparkAppAttempt{ + Attempts: []SparkAppAttempt{ {Completed: true, Duration: 100, EndTimeEpoch: -1}, }, }, @@ -62,10 +62,10 @@ func TestIsRunning(t *testing.T) { }, { name: "Completed application", - app: HistorySparkApp{ + app: SparkApp{ ID: "spark-999", Name: "TestApp", - Attempts: []HistorySparkAppAttempt{ + Attempts: []SparkAppAttempt{ {Completed: true, Duration: 100, EndTimeEpoch: 1742487647315}, }, }, @@ -73,10 +73,10 @@ func TestIsRunning(t *testing.T) { }, { name: "No attempts", - app: HistorySparkApp{ + app: SparkApp{ ID: "spark-000", Name: "TestApp", - Attempts: []HistorySparkAppAttempt{}, + Attempts: []SparkAppAttempt{}, }, expected: false, }, diff --git a/internal/server/server.go b/internal/server/server.go index d056c60..f341392 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -57,19 +57,46 @@ func NewSparkUIProxyServer(config *config.ApplicationConfig) *http.Server { r.Use(security.HTTPSecurity(config.Security)...) 
// Spark UI - sparkui := controllers.NewSparkUIController(config) - sparkhistory := controllers.NewSparkkHistoryController(config) + sparkUI := controllers.NewSparkUIController(config) + sparkHistory := controllers.NewSparkHistoryController(config) + sparkApps := controllers.NewSparkAppsController(config) + // Spark UI Handler - r.Any(fmt.Sprintf("%s/:appID/*path", config.Spark.UI.ProxyBase), sparkui.HandleLiveApp) + r.Any(fmt.Sprintf("%s/:appID/*path", config.Spark.UI.ProxyBase), sparkUI.HandleRunningApp) + // Spark history Handlers - r.Any("/history/:appID/*path", sparkhistory.HandleHistoryApp) - r.Any("/static/*path", sparkhistory.HandleDefault) - r.Any("/api/v1/applications", sparkhistory.HandleDefault) - r.Any("/api/v1/applications/*path", sparkhistory.HandleDefault) - r.Any("/history/", sparkhistory.HandleDefault) - r.Any("/home/", sparkhistory.HandleDefault) - r.Any("/jobs/", sparkhistory.HandleDefault) - r.Any("/", sparkhistory.HandleDefault) + r.Any("/history/:appID/*path", sparkHistory.HandleHistoryApp) + r.Any("/static/*path", sparkHistory.HandleDefault) + r.Any("/api/v1/applications", func(c *gin.Context) { + if c.Query("status") == "running" { + sparkApps.HandleIncompleteApplications(c) + return + } + sparkHistory.HandleDefault(c) + }) + r.Any("/api/v1/applications/*path", sparkHistory.HandleDefault) + r.Any("/history/", sparkHistory.HandleDefault) + r.Any("/home/", func(c *gin.Context) { + if c.Query("showIncomplete") == constants.True { + sparkHistory.HandleIncompleteApps(c) + return + } + sparkHistory.HandleDefault(c) + }) + r.Any("/jobs/", func(c *gin.Context) { + if c.Query("showIncomplete") == constants.True { + sparkHistory.HandleIncompleteApps(c) + return + } + sparkHistory.HandleDefault(c) + }) + r.Any("/", func(c *gin.Context) { + if c.Query("showIncomplete") == constants.True { + sparkHistory.HandleIncompleteApps(c) + return + } + sparkHistory.HandleDefault(c) + }) r.GET(constants.HealthzURI, controllers.Healthz) 
r.GET(constants.ReadinessURI, controllers.Readiness) diff --git a/internal/spark/default_handler.go b/internal/spark/default_handler.go index 205cbb7..2a7f775 100644 --- a/internal/spark/default_handler.go +++ b/internal/spark/default_handler.go @@ -51,14 +51,11 @@ func (c DefaultSparkHandler) ModifyRequest(upstreamURL *url.URL) func(*http.Requ upstreamURL.RawQuery = req.URL.RawQuery upstreamURL.RawFragment = req.URL.RawFragment req.URL = upstreamURL - // log.Debug("Request Method: %s, URL: %s, Host: %s, Headers: %v", req.Method, req.URL.String(), req.Host, req.Header) } } func (c DefaultSparkHandler) ModifyResponse() func(*http.Response) error { return func(resp *http.Response) error { - resp.TransferEncoding = []string{"identity"} - // log.Debug("Response Status: %d, Headers: %v", resp.StatusCode, resp.Header) if resp.StatusCode == http.StatusFound { location := resp.Header.Get("Location") if location == "" { @@ -67,7 +64,7 @@ func (c DefaultSparkHandler) ModifyResponse() func(*http.Response) error { } parsedURL, err := url.Parse(location) if err != nil { - log.Error("Error parsing Location URL: %v", err) + log.Error("Error parsing Location URL: %+v", err) return nil } @@ -78,6 +75,7 @@ func (c DefaultSparkHandler) ModifyResponse() func(*http.Response) error { resp.Header.Set("Location", newLocation) log.Debug("Rewritten Location Header: %s", newLocation) + return nil } return nil diff --git a/internal/spark/incomplete_apps_handler.go b/internal/spark/incomplete_apps_handler.go new file mode 100644 index 0000000..dc9073e --- /dev/null +++ b/internal/spark/incomplete_apps_handler.go @@ -0,0 +1,232 @@ +/* + * Copyright 2025 okdp.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark + +import ( + "bytes" + "compress/gzip" + "io" + "math" + "net/http" + "net/url" + "regexp" + "strconv" + "strings" + + "github.com/gin-gonic/gin" + log "github.com/okdp/spark-web-proxy/internal/logging" + "github.com/okdp/spark-web-proxy/internal/spark/proxy" +) + +var sparkVersionRe = regexp.MustCompile(`(?is)]*class=["'][^"']*\bversion\b[^"']*["'][^>]*>\s*([0-9]+)`) + +type IncompleteAppsHandler struct { +} + +func NewIncompleteAppsHandler(upstreamURL *url.URL, appID string) *proxy.SparkReverseProxy { + return proxy.NewSparkReverseProxy(IncompleteAppsHandler{}, upstreamURL, appID) +} + +func ServeSparkHistoryIncompleteApps(c *gin.Context, upstreamURL *url.URL, appID string) { + NewIncompleteAppsHandler(upstreamURL, appID). 
+ ServeHTTP(c.Writer, c.Request) +} + +func (c IncompleteAppsHandler) ModifyRequest(upstreamURL *url.URL) func(*http.Request) { + return func(req *http.Request) { + req.URL.Scheme = upstreamURL.Scheme + req.URL.Host = upstreamURL.Host + req.Host = upstreamURL.Host + upstreamURL.RawQuery = req.URL.RawQuery + upstreamURL.RawFragment = req.URL.RawFragment + req.URL = upstreamURL + } +} + +func (c IncompleteAppsHandler) ModifyResponse() func(*http.Response) error { + return func(resp *http.Response) error { + resp.TransferEncoding = []string{"identity"} + // spark.history.ui.maxApplications = math.MaxInt32 + // https://spark.apache.org/docs/latest/monitoring.html#spark-history-server-configuration-options + return handleIncompleteApplicationsPage(resp, math.MaxInt32) + } +} + +func handleIncompleteApplicationsPage(resp *http.Response, limit int) error { + + log.Debug("Handle incomplete applications pages") + + ct := strings.ToLower(resp.Header.Get("Content-Type")) + if !strings.Contains(ct, "text/html") { + return nil + } + + plain, raw, isGzip, err := readBodyMaybeGunzip(resp) + if err != nil { + log.Warn("Failed to read HTML response body: %v", err) + return nil + } + + // Intercept "no incomplete applications" page + if !bytes.Contains(plain, []byte("No incomplete applications found!")) { + restoreBody(resp, raw, isGzip) + return nil + } + + modified := replaceNoIncompleteBlock(plain, limit) + + if err := writeBody(resp, modified, isGzip); err != nil { + log.Warn("Failed to write modified HTML response body: %v", err) + restoreBody(resp, raw, isGzip) + return nil + } + + log.Debug("Add Spark historypage scripts into 'incomplete applications' page") + return nil +} + +func replaceNoIncompleteBlock(html []byte, limit int) []byte { + major, ok := sparkMajorFromHTML(html) + + var repl []byte + if ok && major >= 4 { + log.Debug("Spark version parsed successfully (major=%d); using Spark 4+ ES module call", major) + repl = []byte( + `` + "\n" + + `` + "\n" + + `` + "\n" + 
+ `
` + "\n", + ) + } else { + log.Debug("Spark version parsed successfully (major=%d); using Spark 3+ classic js call", major) + repl = []byte( + `` + "\n" + + `
` + "\n" + + `` + "\n" + + `` + "\n", + ) + } + + needle := []byte("

No incomplete applications found!

") + if bytes.Contains(html, needle) { + return bytes.Replace(html, needle, repl, 1) + } + + // fallback: replace plain text + return bytes.Replace(html, []byte("No incomplete applications found!"), repl, 1) +} + +func readBodyMaybeGunzip(resp *http.Response) (plain []byte, raw []byte, isGzip bool, err error) { + isGzip = strings.Contains(strings.ToLower(resp.Header.Get("Content-Encoding")), "gzip") + + raw, err = io.ReadAll(resp.Body) + _ = resp.Body.Close() + if err != nil { + return nil, nil, isGzip, err + } + + if !isGzip { + return raw, raw, false, nil + } + + gr, err := gzip.NewReader(bytes.NewReader(raw)) + if err != nil { + return nil, nil, true, err + } + defer gr.Close() + + plain, err = io.ReadAll(gr) + if err != nil { + return nil, nil, true, err + } + return plain, raw, true, nil +} + +func restoreBody(resp *http.Response, raw []byte, isGzip bool) { + resp.Body = io.NopCloser(bytes.NewReader(raw)) + resp.ContentLength = int64(len(raw)) + resp.Header.Set("Content-Length", strconv.Itoa(len(raw))) + if isGzip { + resp.Header.Set("Content-Encoding", "gzip") + } else { + resp.Header.Del("Content-Encoding") + } +} + +func writeBody(resp *http.Response, plain []byte, gzipIt bool) error { + if !gzipIt { + resp.Body = io.NopCloser(bytes.NewReader(plain)) + resp.ContentLength = int64(len(plain)) + resp.Header.Set("Content-Length", strconv.Itoa(len(plain))) + resp.Header.Del("Content-Encoding") + return nil + } + + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + if _, err := gw.Write(plain); err != nil { + _ = gw.Close() + return err + } + if err := gw.Close(); err != nil { + return err + } + + b := buf.Bytes() + resp.Body = io.NopCloser(bytes.NewReader(b)) + resp.ContentLength = int64(len(b)) + resp.Header.Set("Content-Length", strconv.Itoa(len(b))) + resp.Header.Set("Content-Encoding", "gzip") + return nil +} + +/* +sparkMajorFromHTML extracts the Apache Spark *major version* from +the Spark UI HTML page. 
+ +It looks for a <span> element whose class attribute contains the token +"version", for example: + + <span class="version">3.3.1</span> + <span class="version">4.0.0</span> + <span class="version">4.1.1</span> + +Only the *major* version (the number before the first dot) is returned. + +Returns: + + (major, true) if a version span is found and parsed successfully + (0, false) if the version cannot be found or parsed + +This function intentionally uses a lightweight regex instead of a full +HTML parser for performance and robustness in a reverse-proxy context, +where Spark’s HTML structure is stable and well-known. +*/ +func sparkMajorFromHTML(b []byte) (int, bool) { + m := sparkVersionRe.FindSubmatch(b) + if len(m) < 2 { + return 0, false + } + major, err := strconv.Atoi(string(m[1])) + if err != nil { + return 0, false + } + return major, true +} diff --git a/internal/spark/proxy/handler.go b/internal/spark/proxy/handler.go index f3c694d..ca8698e 100644 --- a/internal/spark/proxy/handler.go +++ b/internal/spark/proxy/handler.go @@ -17,7 +17,10 @@ package proxy import ( + "context" + "errors" "fmt" + "net" "net/http" "net/url" "strings" @@ -50,6 +53,10 @@ type ReverseProxyHandler interface { // the error and sends the appropriate response back to the client. 
func DefaultErrorHandler(appID string) func(http.ResponseWriter, *http.Request, error) { return func(rw http.ResponseWriter, req *http.Request, err error) { + if isCancelErr(err) { + log.Debug("Request canceled for app '%s' url=%s: %v", appID, req.URL.String(), err) + return + } log.Error("An error was occured when accessing the application '%s' at URL: %s, \ndetails: %+v", appID, req.URL.String(), err) http.Error(rw, fmt.Sprintf("An error was occured when accessing the application '%s' at URL: %s, %s", appID, req.URL.String(), err.Error()), http.StatusBadGateway) } @@ -57,6 +64,10 @@ func DefaultErrorHandler(appID string) func(http.ResponseWriter, *http.Request, func SparkUIErrorHandler(fromURL *url.URL, appID string) func(http.ResponseWriter, *http.Request, error) { return func(rw http.ResponseWriter, req *http.Request, err error) { + if isCancelErr(err) { + log.Debug("Request canceled for app '%s' url=%s: %v", appID, req.URL.String(), err) + return + } if strings.Contains(fromURL.Path, "/kill") && utils.IsBrowserRequest(req) { previousPage := utils.CleanKillURLPath(fromURL.Path) log.Info("A spark job or stage kill was received '%s' for application '%s', redirecting to previous page: %s", fromURL.Path, appID, previousPage) @@ -71,3 +82,33 @@ func SparkUIErrorHandler(fromURL *url.URL, appID string) func(http.ResponseWrite rw.WriteHeader(http.StatusFound) } } + +// isCancelErr reports whether the given error is caused by a request +// cancellation or timeout rather than a real upstream failure. +// +// This typically happens when: +// - the client (browser) closes the connection +// - the user navigates away or refreshes the page +// - the request context is canceled by Gin / net/http +// - a request times out while waiting for a response +// +// These errors are expected in reverse proxies and should usually be +// logged at debug or warn level, not as hard errors. 
+func isCancelErr(err error) bool { + if err == nil { + return false + } + + // Context canceled by client disconnect or request abort + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return true + } + + // Network timeout (common when client disconnects mid-stream) + var ne net.Error + if errors.As(err, &ne) && ne.Timeout() { + return true + } + + return false +} diff --git a/internal/utils/sparkutils.go b/internal/utils/sparkutils.go index aa58539..1357c62 100644 --- a/internal/utils/sparkutils.go +++ b/internal/utils/sparkutils.go @@ -18,6 +18,7 @@ package utils import ( "regexp" + "time" ) // Function to clean the Spark URL job or stage kill path @@ -29,3 +30,56 @@ func CleanKillURLPath(path string) string { } return path } + +// FormatSparkTime converts an epoch timestamp in milliseconds to the +// Spark History Server time format. +// +// The returned format matches Spark's UI and REST API expectations: +// +// YYYY-MM-DDTHH:mm:ss.SSSGMT +// +// Example: +// +// epochMillis: 1767710303938 +// output: "2026-01-06T14:38:23.938GMT" +// +// The time is always formatted in UTC (GMT). +func FormatSparkTime(epochMillis int64) string { + return time.UnixMilli(epochMillis). + UTC(). + Format("2006-01-02T15:04:05.000GMT") +} + +// MergeByKey merges two slices of the same type into one. +// If an element exists in both slices (same key), the element from `preferred` wins. 
+// Memory notes: +// - Allocates exactly one output slice with capacity len(preferred)+len(other) +// - Allocates one map sized to len(other) (only items inserted from `other`) +func MergeByKey[T any](preferred []T, other []T, keyFn func(T) string) []T { + + merged := make([]T, 0, len(preferred)+len(other)) + index := make(map[string]int, len(other)) + + for _, v := range other { + k := keyFn(v) + if k == "" { + continue + } + index[k] = len(merged) + merged = append(merged, v) + } + + for _, v := range preferred { + k := keyFn(v) + if k == "" { + continue + } + if i, ok := index[k]; ok { + merged[i] = v + continue + } + merged = append(merged, v) + } + + return merged +}