From 1e01c9bad651f40f06130a2d6de69b390e88685a Mon Sep 17 00:00:00 2001 From: Ayush Petwal Date: Thu, 16 Apr 2026 19:58:14 +0530 Subject: [PATCH 1/3] feat(metrics): add controller-level Prometheus metrics and ServiceMonitor Signed-off-by: Ayush Petwal --- .../templates/manager/service-monitor.yaml | 51 ++++ charts/kubeflow-trainer/values.yaml | 18 ++ cmd/trainer-controller-manager/main.go | 13 ++ go.mod | 3 +- manifests/base/manager/kustomization.yaml | 1 + manifests/base/manager/service_monitor.yaml | 38 +++ pkg/controller/trainjob_controller.go | 39 ++++ pkg/metrics/metrics.go | 220 ++++++++++++++++++ pkg/metrics/metrics_test.go | 202 ++++++++++++++++ pkg/runtime/core/core.go | 9 + pkg/runtime/framework/core/framework.go | 30 ++- pkg/version/version.go | 31 +++ pkg/webhooks/trainjob_webhook.go | 25 +- 13 files changed, 669 insertions(+), 11 deletions(-) create mode 100644 charts/kubeflow-trainer/templates/manager/service-monitor.yaml create mode 100644 manifests/base/manager/service_monitor.yaml create mode 100644 pkg/metrics/metrics.go create mode 100644 pkg/metrics/metrics_test.go create mode 100644 pkg/version/version.go diff --git a/charts/kubeflow-trainer/templates/manager/service-monitor.yaml b/charts/kubeflow-trainer/templates/manager/service-monitor.yaml new file mode 100644 index 0000000000..e8d09bb1a1 --- /dev/null +++ b/charts/kubeflow-trainer/templates/manager/service-monitor.yaml @@ -0,0 +1,51 @@ +{{- /* +Copyright 2024 The Kubeflow authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ -}} + +{{- if .Values.manager.metrics.serviceMonitor.enabled }} +{{- if not (.Capabilities.APIVersions.Has "monitoring.coreos.com/v1/ServiceMonitor") -}} +{{- fail "ServiceMonitor requires the monitoring.coreos.com/v1 CRD (Prometheus Operator). Install Prometheus Operator first or set manager.metrics.serviceMonitor.enabled=false." -}} +{{- end }} +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: {{ include "trainer.manager.service.name" . }} + namespace: {{ .Release.Namespace }} + labels: + {{- include "trainer.manager.labels" . | nindent 4 }} + {{- with .Values.manager.metrics.serviceMonitor.additionalLabels }} + {{- toYaml . | nindent 4 }} + {{- end }} +spec: + selector: + matchLabels: + {{- include "trainer.manager.selectorLabels" . | nindent 6 }} + namespaceSelector: + matchNames: + - {{ .Release.Namespace }} + endpoints: + - port: monitoring-port + path: /metrics + scheme: https + interval: {{ .Values.manager.metrics.serviceMonitor.interval | default "30s" }} + scrapeTimeout: {{ .Values.manager.metrics.serviceMonitor.scrapeTimeout | default "10s" }} + bearerTokenFile: /var/run/secrets/kubernetes.io/serviceaccount/token + tlsConfig: + {{- if .Values.manager.metrics.serviceMonitor.tlsConfig }} + {{- toYaml .Values.manager.metrics.serviceMonitor.tlsConfig | nindent 8 }} + {{- else }} + insecureSkipVerify: true + {{- end }} +{{- end }} diff --git a/charts/kubeflow-trainer/values.yaml b/charts/kubeflow-trainer/values.yaml index 6d32e47214..99a259ee8c 100644 --- a/charts/kubeflow-trainer/values.yaml +++ b/charts/kubeflow-trainer/values.yaml @@ -108,6 +108,24 @@ manager: seccompProfile: type: RuntimeDefault + # -- Prometheus metrics configuration. + metrics: + serviceMonitor: + # -- Whether to create a Prometheus Operator ServiceMonitor for /metrics. + # Requires the monitoring.coreos.com/v1 CRDs (Prometheus Operator) installed. 
+ enabled: false + # -- Scrape interval. + interval: 30s + # -- Scrape timeout. + scrapeTimeout: 10s + # -- Extra labels added to the ServiceMonitor metadata (e.g. release: prometheus) + # so that the Prometheus instance selects this monitor. + additionalLabels: {} + # -- TLS config for the metrics endpoint. + # Defaults to insecureSkipVerify: true so the controller-manager's self-signed CA + # does not block scrapes. Override with caFile/serverName for production. + tlsConfig: {} + # -- Controller manager configuration. # This configuration is used to generate the ConfigMap for the controller manager. config: diff --git a/cmd/trainer-controller-manager/main.go b/cmd/trainer-controller-manager/main.go index 3871dc84d2..13b58df638 100644 --- a/cmd/trainer-controller-manager/main.go +++ b/cmd/trainer-controller-manager/main.go @@ -21,6 +21,7 @@ import ( "flag" "net/http" "os" + goruntime "runtime" zaplog "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -41,10 +42,12 @@ import ( "github.com/kubeflow/trainer/v2/pkg/config" "github.com/kubeflow/trainer/v2/pkg/controller" "github.com/kubeflow/trainer/v2/pkg/features" + "github.com/kubeflow/trainer/v2/pkg/metrics" "github.com/kubeflow/trainer/v2/pkg/runtime" runtimecore "github.com/kubeflow/trainer/v2/pkg/runtime/core" "github.com/kubeflow/trainer/v2/pkg/statusserver" "github.com/kubeflow/trainer/v2/pkg/util/cert" + "github.com/kubeflow/trainer/v2/pkg/version" "github.com/kubeflow/trainer/v2/pkg/webhooks" ) @@ -125,6 +128,16 @@ func main() { os.Exit(1) } + metrics.Register() + metrics.BuildInfo.WithLabelValues( + version.GitVersion, + version.GitCommit, + version.BuildDate, + goruntime.Version(), + goruntime.Compiler, + goruntime.GOOS+"/"+goruntime.GOARCH, + ).Set(1) + certsReady := make(chan struct{}) if config.IsCertManagementEnabled(&cfg) { setupLog.Info("Setting up certificate management") diff --git a/go.mod b/go.mod index 82f2dbc221..9919c77a17 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( 
github.com/onsi/ginkgo/v2 v2.28.1 github.com/onsi/gomega v1.39.1 github.com/open-policy-agent/cert-controller v0.16.0 + github.com/prometheus/client_golang v1.23.2 go.uber.org/zap v1.27.1 golang.org/x/crypto v0.48.0 k8s.io/api v0.35.2 @@ -54,6 +55,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect github.com/mailru/easyjson v0.9.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -62,7 +64,6 @@ require ( github.com/pelletier/go-toml v1.9.5 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/client_golang v1.23.2 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/procfs v0.16.1 // indirect diff --git a/manifests/base/manager/kustomization.yaml b/manifests/base/manager/kustomization.yaml index 9ddc848c0d..05a152d8f2 100644 --- a/manifests/base/manager/kustomization.yaml +++ b/manifests/base/manager/kustomization.yaml @@ -1,5 +1,6 @@ resources: - manager.yaml + - service_monitor.yaml # Disable hash suffix for predictable ConfigMap names generatorOptions: diff --git a/manifests/base/manager/service_monitor.yaml b/manifests/base/manager/service_monitor.yaml new file mode 100644 index 0000000000..39c2b30752 --- /dev/null +++ b/manifests/base/manager/service_monitor.yaml @@ -0,0 +1,38 @@ +# Copyright 2024 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ServiceMonitor for the Kubeflow Trainer controller-manager metrics endpoint. +# Requires the Prometheus Operator (monitoring.coreos.com/v1) to be installed. +# If Prometheus Operator is not available, remove this resource from kustomization.yaml. +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: kubeflow-trainer-controller-manager + namespace: kubeflow +spec: + selector: + matchLabels: + app.kubernetes.io/component: manager + namespaceSelector: + matchNames: + - kubeflow + endpoints: + - port: monitoring-port + path: /metrics + scheme: https + interval: 30s + scrapeTimeout: 10s + bearerTokenFile: /var/run/secrets/kubernetes.io/serviceaccount/token + tlsConfig: + insecureSkipVerify: true diff --git a/pkg/controller/trainjob_controller.go b/pkg/controller/trainjob_controller.go index 4c5e3529a5..3ffddc050b 100644 --- a/pkg/controller/trainjob_controller.go +++ b/pkg/controller/trainjob_controller.go @@ -45,6 +45,7 @@ import ( trainer "github.com/kubeflow/trainer/v2/pkg/apis/trainer/v1alpha1" "github.com/kubeflow/trainer/v2/pkg/constants" + "github.com/kubeflow/trainer/v2/pkg/metrics" jobruntimes "github.com/kubeflow/trainer/v2/pkg/runtime" "github.com/kubeflow/trainer/v2/pkg/util/trainjob" ) @@ -98,6 +99,10 @@ func NewTrainJobReconciler(client client.Client, recorder events.EventRecorder, // +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=create;get;list;update func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + start := time.Now() + reconcileResult := 
"success" + defer func() { metrics.ObserveReconcile("trainjob_controller", reconcileResult, time.Since(start)) }() + var trainJob trainer.TrainJob if err := r.client.Get(ctx, req.NamespacedName, &trainJob); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) @@ -136,10 +141,42 @@ func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c setSuspendedCondition(&trainJob) + // Detect Suspended: False → True transitions and record the metric. + prevSuspendedCond := meta.FindStatusCondition(prevTrainJob.Status.Conditions, trainer.TrainJobSuspended) + currSuspendedCond := meta.FindStatusCondition(trainJob.Status.Conditions, trainer.TrainJobSuspended) + if currSuspendedCond != nil && currSuspendedCond.Status == metav1.ConditionTrue && + (prevSuspendedCond == nil || prevSuspendedCond.Status != metav1.ConditionTrue) { + metrics.RecordTrainJobSuspended(trainJob.Namespace, metrics.RuntimeKind(&trainJob)) + } + if statusErr := setTrainJobStatus(ctx, runtime, &trainJob); statusErr != nil { err = errors.Join(err, statusErr) } + // Detect terminal state transitions and record lifecycle metrics. + // Using prevTrainJob (snapshot taken before any status mutations) to prevent + // double-counting on subsequent reconcile cycles after the status is persisted. 
+ runtimeKind := metrics.RuntimeKind(&trainJob) + prevCompleteCond := meta.FindStatusCondition(prevTrainJob.Status.Conditions, trainer.TrainJobComplete) + currCompleteCond := meta.FindStatusCondition(trainJob.Status.Conditions, trainer.TrainJobComplete) + if currCompleteCond != nil && currCompleteCond.Status == metav1.ConditionTrue && + (prevCompleteCond == nil || prevCompleteCond.Status != metav1.ConditionTrue) { + dur := currCompleteCond.LastTransitionTime.Sub(trainJob.CreationTimestamp.Time) + metrics.RecordTrainJobCompleted(trainJob.Namespace, runtimeKind, dur) + } + + prevFailedCond := meta.FindStatusCondition(prevTrainJob.Status.Conditions, trainer.TrainJobFailed) + currFailedCond := meta.FindStatusCondition(trainJob.Status.Conditions, trainer.TrainJobFailed) + if currFailedCond != nil && currFailedCond.Status == metav1.ConditionTrue && + (prevFailedCond == nil || prevFailedCond.Status != metav1.ConditionTrue) { + dur := currFailedCond.LastTransitionTime.Sub(trainJob.CreationTimestamp.Time) + metrics.RecordTrainJobFailed(trainJob.Namespace, runtimeKind, currFailedCond.Reason, dur) + } + + if err != nil { + reconcileResult = "error" + } + if deadlineResult, deadlineErr := r.reconcileDeadline(ctx, &trainJob); deadlineErr != nil || deadlineResult.RequeueAfter > 0 { if !equality.Semantic.DeepEqual(&trainJob.Status, &prevTrainJob.Status) { return deadlineResult, errors.Join(err, r.client.Status().Patch(ctx, &trainJob, client.MergeFrom(prevTrainJob))) @@ -208,12 +245,14 @@ func (r *TrainJobReconciler) reconcileDeadline(ctx context.Context, trainJob *tr func (r *TrainJobReconciler) Create(e event.TypedCreateEvent[*trainer.TrainJob]) bool { r.log.WithValues("trainJob", klog.KObj(e.Object)).Info("TrainJob create event") + metrics.RecordTrainJobCreated(e.Object.Namespace, metrics.RuntimeKind(e.Object)) defer r.notifyWatchers(nil, e.Object) return true } func (r *TrainJobReconciler) Delete(e event.TypedDeleteEvent[*trainer.TrainJob]) bool { r.log.WithValues("trainJob", 
klog.KObj(e.Object)).Info("TrainJob delete event") + metrics.RecordTrainJobDeleted(e.Object.Namespace, metrics.RuntimeKind(e.Object)) defer r.notifyWatchers(e.Object, nil) return true } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go new file mode 100644 index 0000000000..d0a68086a7 --- /dev/null +++ b/pkg/metrics/metrics.go @@ -0,0 +1,220 @@ +/* +Copyright 2024 The Kubeflow Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "k8s.io/utils/ptr" + ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" + + trainer "github.com/kubeflow/trainer/v2/pkg/apis/trainer/v1alpha1" +) + +const ( + metricsNamespace = "kubeflow" + metricsSubsystem = "trainer" +) + +// durationBuckets matches Kueue's bucket policy: covers tiny CI jobs through long-running fine-tunes. +var durationBuckets = []float64{1, 5, 10, 30, 60, 120, 300, 600, 1800, 3600, 10800, 21600} + +var ( + // BuildInfo surfaces build metadata as a Gauge always set to 1. + BuildInfo = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "build_info", + Help: "Build metadata for the Kubeflow Trainer controller manager (always 1).", + }, []string{"git_version", "git_commit", "build_date", "go_version", "compiler", "platform"}) + + // TrainJobsCreatedTotal is a counter for the total number of TrainJobs created. 
+ TrainJobsCreatedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "trainjobs_created_total", + Help: "Total number of TrainJobs created.", + }, []string{"namespace", "runtime"}) + + // TrainJobsCompletedTotal is a counter for TrainJobs that completed successfully. + TrainJobsCompletedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "trainjobs_completed_total", + Help: "Total number of TrainJobs that completed successfully.", + }, []string{"namespace", "runtime"}) + + // TrainJobsFailedTotal is a counter for TrainJobs that failed. + // The reason label maps to the TrainJob Failed condition reason. + TrainJobsFailedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "trainjobs_failed_total", + Help: "Total number of TrainJobs that failed.", + }, []string{"namespace", "runtime", "reason"}) + + // TrainJobsSuspendedTotal is a counter for TrainJob suspension events. + TrainJobsSuspendedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "trainjobs_suspended_total", + Help: "Total number of TrainJob suspension events.", + }, []string{"namespace", "runtime"}) + + // TrainJobsDeletedTotal is a counter for TrainJobs that have been deleted. + TrainJobsDeletedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "trainjobs_deleted_total", + Help: "Total number of TrainJobs deleted.", + }, []string{"namespace", "runtime"}) + + // TrainJobsActive is a gauge tracking TrainJobs present in the cluster. + // It resets on controller restart; incremented on Create events, decremented on Delete events. 
+ TrainJobsActive = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "trainjobs_active", + Help: "Number of TrainJobs currently present in the cluster (resets on controller restart).", + }, []string{"namespace", "runtime"}) + + // TrainJobDurationSeconds is a histogram of TrainJob end-to-end duration from creation to terminal condition. + TrainJobDurationSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "trainjob_duration_seconds", + Help: "End-to-end duration of TrainJobs from creation to terminal condition.", + Buckets: durationBuckets, + }, []string{"namespace", "runtime", "result"}) + + // ReconcileDurationSeconds is a histogram of reconcile iteration latencies. + ReconcileDurationSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "reconcile_duration_seconds", + Help: "Latency of reconcile iterations per controller.", + Buckets: prometheus.DefBuckets, + }, []string{"controller", "result"}) + + // PluginExecutionDurationSeconds is a histogram of plugin execution latencies. + PluginExecutionDurationSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "plugin_execution_duration_seconds", + Help: "Latency of plugin execution per plugin and phase.", + Buckets: prometheus.DefBuckets, + }, []string{"plugin", "phase"}) + + // PluginExecutionErrorsTotal is a counter for plugin execution errors. 
+ PluginExecutionErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "plugin_execution_errors_total", + Help: "Total number of errors encountered during plugin execution.", + }, []string{"plugin", "phase"}) + + // RuntimesRegistered is a gauge for the number of registered training runtimes by kind. + RuntimesRegistered = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "runtimes_registered", + Help: "Number of registered training runtimes by kind.", + }, []string{"kind"}) + + // WebhookValidationTotal is a counter for TrainJob webhook validation calls. + WebhookValidationTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "webhook_validation_total", + Help: "Total number of webhook validation calls by resource, operation, and result.", + }, []string{"resource", "operation", "result"}) +) + +var registerOnce sync.Once + +// Register registers all Kubeflow Trainer metrics with the controller-runtime registry. +// It is idempotent and safe to call multiple times; only the first call takes effect. +func Register() { + registerOnce.Do(func() { + ctrlmetrics.Registry.MustRegister( + BuildInfo, + TrainJobsCreatedTotal, + TrainJobsCompletedTotal, + TrainJobsFailedTotal, + TrainJobsSuspendedTotal, + TrainJobsDeletedTotal, + TrainJobsActive, + TrainJobDurationSeconds, + ReconcileDurationSeconds, + PluginExecutionDurationSeconds, + PluginExecutionErrorsTotal, + RuntimesRegistered, + WebhookValidationTotal, + ) + }) +} + +// RuntimeKind extracts the runtime kind label from a TrainJob's RuntimeRef. +// Returns "Unknown" if the Kind field is nil. +func RuntimeKind(trainJob *trainer.TrainJob) string { + return ptr.Deref(trainJob.Spec.RuntimeRef.Kind, "Unknown") +} + +// RecordTrainJobCreated increments the created counter and active gauge. 
+func RecordTrainJobCreated(namespace, runtimeKind string) { + TrainJobsCreatedTotal.WithLabelValues(namespace, runtimeKind).Inc() + TrainJobsActive.WithLabelValues(namespace, runtimeKind).Inc() +} + +// RecordTrainJobDeleted increments the deleted counter and decrements the active gauge. +func RecordTrainJobDeleted(namespace, runtimeKind string) { + TrainJobsDeletedTotal.WithLabelValues(namespace, runtimeKind).Inc() + TrainJobsActive.WithLabelValues(namespace, runtimeKind).Dec() +} + +// RecordTrainJobCompleted increments the completed counter and observes the duration histogram. +func RecordTrainJobCompleted(namespace, runtimeKind string, dur time.Duration) { + TrainJobsCompletedTotal.WithLabelValues(namespace, runtimeKind).Inc() + TrainJobDurationSeconds.WithLabelValues(namespace, runtimeKind, "Complete").Observe(dur.Seconds()) +} + +// RecordTrainJobFailed increments the failed counter and observes the duration histogram. +func RecordTrainJobFailed(namespace, runtimeKind, reason string, dur time.Duration) { + TrainJobsFailedTotal.WithLabelValues(namespace, runtimeKind, reason).Inc() + TrainJobDurationSeconds.WithLabelValues(namespace, runtimeKind, "Failed").Observe(dur.Seconds()) +} + +// RecordTrainJobSuspended increments the suspended counter. +func RecordTrainJobSuspended(namespace, runtimeKind string) { + TrainJobsSuspendedTotal.WithLabelValues(namespace, runtimeKind).Inc() +} + +// ObserveReconcile records the duration of a reconcile iteration. +func ObserveReconcile(controller, result string, dur time.Duration) { + ReconcileDurationSeconds.WithLabelValues(controller, result).Observe(dur.Seconds()) +} + +// ObservePlugin records the latency of a plugin execution phase and increments the +// error counter if err is non-nil. 
+func ObservePlugin(pluginName, phase string, dur time.Duration, err error) { + PluginExecutionDurationSeconds.WithLabelValues(pluginName, phase).Observe(dur.Seconds()) + if err != nil { + PluginExecutionErrorsTotal.WithLabelValues(pluginName, phase).Inc() + } +} diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go new file mode 100644 index 0000000000..8b0b17508d --- /dev/null +++ b/pkg/metrics/metrics_test.go @@ -0,0 +1,202 @@ +/* +Copyright 2024 The Kubeflow Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "errors" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "k8s.io/utils/ptr" + + trainer "github.com/kubeflow/trainer/v2/pkg/apis/trainer/v1alpha1" +) + +// newIsolatedRegistry returns a fresh prometheus.Registry with all Trainer metrics registered. +// Each test gets its own registry, but the collectors are package-level variables, so their recorded values are still shared across tests. 
+func newIsolatedRegistry(t *testing.T) *prometheus.Registry { + t.Helper() + reg := prometheus.NewRegistry() + collectors := []prometheus.Collector{ + BuildInfo, + TrainJobsCreatedTotal, + TrainJobsCompletedTotal, + TrainJobsFailedTotal, + TrainJobsSuspendedTotal, + TrainJobsDeletedTotal, + TrainJobsActive, + TrainJobDurationSeconds, + ReconcileDurationSeconds, + PluginExecutionDurationSeconds, + PluginExecutionErrorsTotal, + RuntimesRegistered, + WebhookValidationTotal, + } + for _, c := range collectors { + if err := reg.Register(c); err != nil { + // AlreadyRegisteredError is expected in tests that share the global vars. + if _, ok := err.(prometheus.AlreadyRegisteredError); !ok { + t.Fatalf("failed to register collector: %v", err) + } + } + } + return reg +} + +func TestRuntimeKind(t *testing.T) { + tests := []struct { + name string + trainJob *trainer.TrainJob + want string + }{ + { + name: "kind set to ClusterTrainingRuntime", + trainJob: &trainer.TrainJob{ + Spec: trainer.TrainJobSpec{ + RuntimeRef: trainer.RuntimeRef{Kind: ptr.To("ClusterTrainingRuntime")}, + }, + }, + want: "ClusterTrainingRuntime", + }, + { + name: "kind set to TrainingRuntime", + trainJob: &trainer.TrainJob{ + Spec: trainer.TrainJobSpec{ + RuntimeRef: trainer.RuntimeRef{Kind: ptr.To("TrainingRuntime")}, + }, + }, + want: "TrainingRuntime", + }, + { + name: "kind is nil", + trainJob: &trainer.TrainJob{ + Spec: trainer.TrainJobSpec{ + RuntimeRef: trainer.RuntimeRef{}, + }, + }, + want: "Unknown", + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := RuntimeKind(tc.trainJob) + if got != tc.want { + t.Errorf("RuntimeKind() = %q, want %q", got, tc.want) + } + }) + } +} + +func TestRecordTrainJobCreated(t *testing.T) { + newIsolatedRegistry(t) + + RecordTrainJobCreated("test-ns", "ClusterTrainingRuntime") + + if got := testutil.ToFloat64(TrainJobsCreatedTotal.WithLabelValues("test-ns", "ClusterTrainingRuntime")); got != 1 { + t.Errorf("TrainJobsCreatedTotal = 
%v, want 1", got) + } + if got := testutil.ToFloat64(TrainJobsActive.WithLabelValues("test-ns", "ClusterTrainingRuntime")); got != 1 { + t.Errorf("TrainJobsActive = %v, want 1", got) + } +} + +func TestRecordTrainJobDeleted(t *testing.T) { + newIsolatedRegistry(t) + + // Use a unique namespace so the active gauge is isolated from other tests. + // One Create followed by one Delete should net to zero. + RecordTrainJobCreated("test-ns-deleted", "ClusterTrainingRuntime") + RecordTrainJobDeleted("test-ns-deleted", "ClusterTrainingRuntime") + + if got := testutil.ToFloat64(TrainJobsDeletedTotal.WithLabelValues("test-ns-deleted", "ClusterTrainingRuntime")); got != 1 { + t.Errorf("TrainJobsDeletedTotal = %v, want 1", got) + } + if got := testutil.ToFloat64(TrainJobsActive.WithLabelValues("test-ns-deleted", "ClusterTrainingRuntime")); got != 0 { + t.Errorf("TrainJobsActive = %v, want 0", got) + } +} + +func TestRecordTrainJobCompleted(t *testing.T) { + newIsolatedRegistry(t) + + RecordTrainJobCompleted("test-ns", "ClusterTrainingRuntime", 30*time.Second) + + if got := testutil.ToFloat64(TrainJobsCompletedTotal.WithLabelValues("test-ns", "ClusterTrainingRuntime")); got != 1 { + t.Errorf("TrainJobsCompletedTotal = %v, want 1", got) + } + // Verify histogram received at least one observation. + if count := testutil.CollectAndCount(TrainJobDurationSeconds, "kubeflow_trainer_trainjob_duration_seconds"); count == 0 { + t.Errorf("TrainJobDurationSeconds has no series, want at least one observation") + } +} + +func TestRecordTrainJobFailed(t *testing.T) { + newIsolatedRegistry(t) + + RecordTrainJobFailed("test-ns", "ClusterTrainingRuntime", "DeadlineExceeded", 60*time.Second) + + if got := testutil.ToFloat64(TrainJobsFailedTotal.WithLabelValues("test-ns", "ClusterTrainingRuntime", "DeadlineExceeded")); got != 1 { + t.Errorf("TrainJobsFailedTotal = %v, want 1", got) + } + // Verify histogram received at least one observation. 
+ if count := testutil.CollectAndCount(TrainJobDurationSeconds, "kubeflow_trainer_trainjob_duration_seconds"); count == 0 { + t.Errorf("TrainJobDurationSeconds has no series, want at least one observation") + } +} + +func TestRecordTrainJobSuspended(t *testing.T) { + newIsolatedRegistry(t) + + RecordTrainJobSuspended("test-ns", "TrainingRuntime") + + if got := testutil.ToFloat64(TrainJobsSuspendedTotal.WithLabelValues("test-ns", "TrainingRuntime")); got != 1 { + t.Errorf("TrainJobsSuspendedTotal = %v, want 1", got) + } +} + +func TestObserveReconcile(t *testing.T) { + newIsolatedRegistry(t) + + ObserveReconcile("trainjob_controller", "success", 5*time.Millisecond) + ObserveReconcile("trainjob_controller", "error", 2*time.Millisecond) + + // Verify the histogram exposes at least one series (this does not check each result label separately). + if count := testutil.CollectAndCount(ReconcileDurationSeconds, "kubeflow_trainer_reconcile_duration_seconds"); count == 0 { + t.Errorf("ReconcileDurationSeconds has no series, want at least one observation") + } +} + +func TestObservePlugin(t *testing.T) { + newIsolatedRegistry(t) + + ObservePlugin("jobset", "build", 1*time.Millisecond, nil) + ObservePlugin("torch", "enforce_ml_policy", 2*time.Millisecond, errors.New("plugin error")) + + // Verify the histogram received observations. 
+ if count := testutil.CollectAndCount(PluginExecutionDurationSeconds, "kubeflow_trainer_plugin_execution_duration_seconds"); count == 0 { + t.Errorf("PluginExecutionDurationSeconds has no series, want at least one observation") + } + if got := testutil.ToFloat64(PluginExecutionErrorsTotal.WithLabelValues("torch", "enforce_ml_policy")); got != 1 { + t.Errorf("PluginExecutionErrorsTotal[torch/enforce_ml_policy] = %v, want 1", got) + } + // No error for jobset/build + if got := testutil.ToFloat64(PluginExecutionErrorsTotal.WithLabelValues("jobset", "build")); got != 0 { + t.Errorf("PluginExecutionErrorsTotal[jobset/build] = %v, want 0", got) + } +} diff --git a/pkg/runtime/core/core.go b/pkg/runtime/core/core.go index dcea1df6de..d27ad166ea 100644 --- a/pkg/runtime/core/core.go +++ b/pkg/runtime/core/core.go @@ -23,6 +23,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" configapi "github.com/kubeflow/trainer/v2/pkg/apis/config/v1alpha1" + "github.com/kubeflow/trainer/v2/pkg/metrics" "github.com/kubeflow/trainer/v2/pkg/runtime" ) @@ -55,6 +56,14 @@ func New(ctx context.Context, client client.Client, indexer client.FieldIndexer, } } runtimes = newRuntimes + // Record one gauge series per registered runtime key; map keys are unique, so each value is 1 (NOTE(review): confirm the key is the intended "kind" label). 
+ kindCounts := make(map[string]float64) + for key := range newRuntimes { + kindCounts[key]++ + } + for kind, count := range kindCounts { + metrics.RuntimesRegistered.WithLabelValues(kind).Set(count) + } return newRuntimes, nil } diff --git a/pkg/runtime/framework/core/framework.go b/pkg/runtime/framework/core/framework.go index cd7394a8f2..1b824ea595 100644 --- a/pkg/runtime/framework/core/framework.go +++ b/pkg/runtime/framework/core/framework.go @@ -19,6 +19,7 @@ package core import ( "context" "errors" + "time" apiruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/validation/field" @@ -27,6 +28,7 @@ import ( configapi "github.com/kubeflow/trainer/v2/pkg/apis/config/v1alpha1" trainer "github.com/kubeflow/trainer/v2/pkg/apis/trainer/v1alpha1" + "github.com/kubeflow/trainer/v2/pkg/metrics" "github.com/kubeflow/trainer/v2/pkg/runtime" "github.com/kubeflow/trainer/v2/pkg/runtime/framework" fwkplugins "github.com/kubeflow/trainer/v2/pkg/runtime/framework/plugins" @@ -93,7 +95,10 @@ func New(ctx context.Context, c client.Client, r fwkplugins.Registry, indexer cl func (f *Framework) RunEnforceMLPolicyPlugins(info *runtime.Info, trainJob *trainer.TrainJob) error { for _, plugin := range f.enforceMLPlugins { - if err := plugin.EnforceMLPolicy(info, trainJob); err != nil { + start := time.Now() + err := plugin.EnforceMLPolicy(info, trainJob) + metrics.ObservePlugin(plugin.Name(), "enforce_ml_policy", time.Since(start), err) + if err != nil { return err } } @@ -102,7 +107,10 @@ func (f *Framework) RunEnforceMLPolicyPlugins(info *runtime.Info, trainJob *trai func (f *Framework) RunEnforcePodGroupPolicyPlugins(info *runtime.Info, trainJob *trainer.TrainJob) error { for _, plugin := range f.enforcePodGroupPolicyPlugins { - if err := plugin.EnforcePodGroupPolicy(info, trainJob); err != nil { + start := time.Now() + err := plugin.EnforcePodGroupPolicy(info, trainJob) + metrics.ObservePlugin(plugin.Name(), "enforce_pod_group_policy", time.Since(start), err) 
+ if err != nil { return err } } @@ -113,7 +121,13 @@ func (f *Framework) RunCustomValidationPlugins(ctx context.Context, info *runtim var aggregatedWarnings admission.Warnings var aggregatedErrors field.ErrorList for _, plugin := range f.customValidationPlugins { + start := time.Now() warnings, errs := plugin.Validate(ctx, info, oldObj, newObj) + var pluginErr error + if len(errs) > 0 { + pluginErr = errs.ToAggregate() + } + metrics.ObservePlugin(plugin.Name(), "validate", time.Since(start), pluginErr) if len(warnings) != 0 { aggregatedWarnings = append(aggregatedWarnings, warnings...) } @@ -126,7 +140,10 @@ func (f *Framework) RunCustomValidationPlugins(ctx context.Context, info *runtim func (f *Framework) RunPodNetworkPlugins(info *runtime.Info, trainJob *trainer.TrainJob) error { for _, plugin := range f.podNetworkPlugins { - if err := plugin.IdentifyPodNetwork(info, trainJob); err != nil { + start := time.Now() + err := plugin.IdentifyPodNetwork(info, trainJob) + metrics.ObservePlugin(plugin.Name(), "pod_network", time.Since(start), err) + if err != nil { return err } } @@ -136,7 +153,9 @@ func (f *Framework) RunPodNetworkPlugins(info *runtime.Info, trainJob *trainer.T func (f *Framework) RunComponentBuilderPlugins(ctx context.Context, info *runtime.Info, trainJob *trainer.TrainJob) ([]apiruntime.ApplyConfiguration, error) { var objs []apiruntime.ApplyConfiguration for _, plugin := range f.componentBuilderPlugins { + start := time.Now() components, err := plugin.Build(ctx, info, trainJob) + metrics.ObservePlugin(plugin.Name(), "build", time.Since(start), err) if err != nil { return nil, err } @@ -147,7 +166,10 @@ func (f *Framework) RunComponentBuilderPlugins(ctx context.Context, info *runtim func (f *Framework) RunTrainJobStatusPlugin(ctx context.Context, trainJob *trainer.TrainJob) (*trainer.TrainJobStatus, error) { if f.trainJobStatusPlugin != nil { - return f.trainJobStatusPlugin.Status(ctx, trainJob) + start := time.Now() + status, err := 
f.trainJobStatusPlugin.Status(ctx, trainJob) + metrics.ObservePlugin(f.trainJobStatusPlugin.Name(), "status", time.Since(start), err) + return status, err } return nil, nil } diff --git a/pkg/version/version.go b/pkg/version/version.go new file mode 100644 index 0000000000..70590418a4 --- /dev/null +++ b/pkg/version/version.go @@ -0,0 +1,31 @@ +/* +Copyright 2024 The Kubeflow Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package version holds version information for the Kubeflow Trainer controller manager. +// Variables are set at build time via -ldflags: +// +// -ldflags "-X github.com/kubeflow/trainer/v2/pkg/version.GitVersion=v1.0.0 +// -X github.com/kubeflow/trainer/v2/pkg/version.GitCommit=abc123 +// -X github.com/kubeflow/trainer/v2/pkg/version.BuildDate=2024-01-01T00:00:00Z" +package version + +// These variables are populated at build time via -ldflags. +// They default to "unknown" so the build_info metric is always present. 
+var ( + GitVersion = "unknown" + GitCommit = "unknown" + BuildDate = "unknown" +) diff --git a/pkg/webhooks/trainjob_webhook.go b/pkg/webhooks/trainjob_webhook.go index 454d08b4d9..d82d17fd1c 100644 --- a/pkg/webhooks/trainjob_webhook.go +++ b/pkg/webhooks/trainjob_webhook.go @@ -30,6 +30,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook/admission" trainer "github.com/kubeflow/trainer/v2/pkg/apis/trainer/v1alpha1" + "github.com/kubeflow/trainer/v2/pkg/metrics" "github.com/kubeflow/trainer/v2/pkg/runtime" ) @@ -112,12 +113,18 @@ func (w *TrainJobValidator) ValidateCreate(ctx context.Context, obj *trainer.Tra log.V(5).Info("Validating create", "TrainJob", klog.KObj(obj)) runtimeRefGK := runtime.RuntimeRefToRuntimeRegistryKey(obj.Spec.RuntimeRef) - runtime, ok := w.runtimes[runtimeRefGK] + rt, ok := w.runtimes[runtimeRefGK] if !ok { + metrics.WebhookValidationTotal.WithLabelValues("TrainJob", "create", "error").Inc() return nil, fmt.Errorf("unsupported runtime: %s", runtimeRefGK) } - warnings, errors := runtime.ValidateObjects(ctx, nil, obj) - return warnings, errors.ToAggregate() + warnings, errs := rt.ValidateObjects(ctx, nil, obj) + result := "success" + if errs != nil { + result = "error" + } + metrics.WebhookValidationTotal.WithLabelValues("TrainJob", "create", result).Inc() + return warnings, errs.ToAggregate() } func (w *TrainJobValidator) ValidateUpdate(ctx context.Context, oldObj, newObj *trainer.TrainJob) (admission.Warnings, error) { @@ -125,12 +132,18 @@ func (w *TrainJobValidator) ValidateUpdate(ctx context.Context, oldObj, newObj * log.V(5).Info("Validating update", "TrainJob", klog.KObj(newObj)) runtimeRefGK := runtime.RuntimeRefToRuntimeRegistryKey(newObj.Spec.RuntimeRef) - runtime, ok := w.runtimes[runtimeRefGK] + rt, ok := w.runtimes[runtimeRefGK] if !ok { + metrics.WebhookValidationTotal.WithLabelValues("TrainJob", "update", "error").Inc() return nil, fmt.Errorf("unsupported runtime: %s", runtimeRefGK) } - warnings, errors := 
runtime.ValidateObjects(ctx, oldObj, newObj) - return warnings, errors.ToAggregate() + warnings, errs := rt.ValidateObjects(ctx, oldObj, newObj) + result := "success" + if errs != nil { + result = "error" + } + metrics.WebhookValidationTotal.WithLabelValues("TrainJob", "update", result).Inc() + return warnings, errs.ToAggregate() } func (w *TrainJobValidator) ValidateDelete(ctx context.Context, obj *trainer.TrainJob) (admission.Warnings, error) { From 071246620b025f98c3eeab9b3cc8e155f2dd3358 Mon Sep 17 00:00:00 2001 From: Ayush Petwal Date: Thu, 16 Apr 2026 23:24:21 +0530 Subject: [PATCH 2/3] fix(metrics): address review comments on controller metrics PR Signed-off-by: Ayush Petwal --- manifests/base/manager/kustomization.yaml | 1 - manifests/base/manager/manager.yaml | 4 ++ .../overlays/monitoring/kustomization.yaml | 4 ++ .../monitoring}/service_monitor.yaml | 2 +- pkg/controller/trainjob_controller.go | 35 ++++++++++---- pkg/metrics/metrics.go | 6 +-- pkg/metrics/metrics_test.go | 46 ------------------- pkg/runtime/core/core.go | 10 ++-- 8 files changed, 42 insertions(+), 66 deletions(-) create mode 100644 manifests/overlays/monitoring/kustomization.yaml rename manifests/{base/manager => overlays/monitoring}/service_monitor.yaml (93%) diff --git a/manifests/base/manager/kustomization.yaml b/manifests/base/manager/kustomization.yaml index 05a152d8f2..9ddc848c0d 100644 --- a/manifests/base/manager/kustomization.yaml +++ b/manifests/base/manager/kustomization.yaml @@ -1,6 +1,5 @@ resources: - manager.yaml - - service_monitor.yaml # Disable hash suffix for predictable ConfigMap names generatorOptions: diff --git a/manifests/base/manager/manager.yaml b/manifests/base/manager/manager.yaml index 209ee07c09..431b1bee5d 100644 --- a/manifests/base/manager/manager.yaml +++ b/manifests/base/manager/manager.yaml @@ -79,6 +79,10 @@ apiVersion: v1 kind: Service metadata: name: kubeflow-trainer-controller-manager + labels: + app.kubernetes.io/name: trainer + 
app.kubernetes.io/component: manager + app.kubernetes.io/part-of: kubeflow spec: ports: - name: monitoring-port diff --git a/manifests/overlays/monitoring/kustomization.yaml b/manifests/overlays/monitoring/kustomization.yaml new file mode 100644 index 0000000000..b33f78105c --- /dev/null +++ b/manifests/overlays/monitoring/kustomization.yaml @@ -0,0 +1,4 @@ +# Monitoring overlay — apply this only if Prometheus Operator (monitoring.coreos.com/v1) is installed. +resources: + - ../../base/manager + - service_monitor.yaml diff --git a/manifests/base/manager/service_monitor.yaml b/manifests/overlays/monitoring/service_monitor.yaml similarity index 93% rename from manifests/base/manager/service_monitor.yaml rename to manifests/overlays/monitoring/service_monitor.yaml index 39c2b30752..a1c91b4564 100644 --- a/manifests/base/manager/service_monitor.yaml +++ b/manifests/overlays/monitoring/service_monitor.yaml @@ -14,7 +14,7 @@ # # ServiceMonitor for the Kubeflow Trainer controller-manager metrics endpoint. # Requires the Prometheus Operator (monitoring.coreos.com/v1) to be installed. -# If Prometheus Operator is not available, remove this resource from kustomization.yaml. +# Apply this overlay only if Prometheus Operator is available in the cluster. apiVersion: monitoring.coreos.com/v1 kind: ServiceMonitor metadata: diff --git a/pkg/controller/trainjob_controller.go b/pkg/controller/trainjob_controller.go index 3ffddc050b..f99cfc4d1a 100644 --- a/pkg/controller/trainjob_controller.go +++ b/pkg/controller/trainjob_controller.go @@ -141,36 +141,41 @@ func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c setSuspendedCondition(&trainJob) - // Detect Suspended: False → True transitions and record the metric. + // Collect pending lifecycle metric callbacks; fired only after a successful status patch + // to prevent double-counting when the patch fails and the next reconcile re-detects the transition. 
+ var pendingMetrics []func() + + // Detect Suspended: False → True transitions. prevSuspendedCond := meta.FindStatusCondition(prevTrainJob.Status.Conditions, trainer.TrainJobSuspended) currSuspendedCond := meta.FindStatusCondition(trainJob.Status.Conditions, trainer.TrainJobSuspended) if currSuspendedCond != nil && currSuspendedCond.Status == metav1.ConditionTrue && (prevSuspendedCond == nil || prevSuspendedCond.Status != metav1.ConditionTrue) { - metrics.RecordTrainJobSuspended(trainJob.Namespace, metrics.RuntimeKind(&trainJob)) + ns, rk := trainJob.Namespace, metrics.RuntimeKind(&trainJob) + pendingMetrics = append(pendingMetrics, func() { metrics.RecordTrainJobSuspended(ns, rk) }) } if statusErr := setTrainJobStatus(ctx, runtime, &trainJob); statusErr != nil { err = errors.Join(err, statusErr) } - // Detect terminal state transitions and record lifecycle metrics. - // Using prevTrainJob (snapshot taken before any status mutations) to prevent - // double-counting on subsequent reconcile cycles after the status is persisted. + // Detect terminal state transitions. 
runtimeKind := metrics.RuntimeKind(&trainJob) prevCompleteCond := meta.FindStatusCondition(prevTrainJob.Status.Conditions, trainer.TrainJobComplete) currCompleteCond := meta.FindStatusCondition(trainJob.Status.Conditions, trainer.TrainJobComplete) if currCompleteCond != nil && currCompleteCond.Status == metav1.ConditionTrue && (prevCompleteCond == nil || prevCompleteCond.Status != metav1.ConditionTrue) { + ns, rk := trainJob.Namespace, runtimeKind dur := currCompleteCond.LastTransitionTime.Sub(trainJob.CreationTimestamp.Time) - metrics.RecordTrainJobCompleted(trainJob.Namespace, runtimeKind, dur) + pendingMetrics = append(pendingMetrics, func() { metrics.RecordTrainJobCompleted(ns, rk, dur) }) } prevFailedCond := meta.FindStatusCondition(prevTrainJob.Status.Conditions, trainer.TrainJobFailed) currFailedCond := meta.FindStatusCondition(trainJob.Status.Conditions, trainer.TrainJobFailed) if currFailedCond != nil && currFailedCond.Status == metav1.ConditionTrue && (prevFailedCond == nil || prevFailedCond.Status != metav1.ConditionTrue) { + ns, rk, reason := trainJob.Namespace, runtimeKind, currFailedCond.Reason dur := currFailedCond.LastTransitionTime.Sub(trainJob.CreationTimestamp.Time) - metrics.RecordTrainJobFailed(trainJob.Namespace, runtimeKind, currFailedCond.Reason, dur) + pendingMetrics = append(pendingMetrics, func() { metrics.RecordTrainJobFailed(ns, rk, reason, dur) }) } if err != nil { @@ -179,7 +184,13 @@ func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c if deadlineResult, deadlineErr := r.reconcileDeadline(ctx, &trainJob); deadlineErr != nil || deadlineResult.RequeueAfter > 0 { if !equality.Semantic.DeepEqual(&trainJob.Status, &prevTrainJob.Status) { - return deadlineResult, errors.Join(err, r.client.Status().Patch(ctx, &trainJob, client.MergeFrom(prevTrainJob))) + patchErr := r.client.Status().Patch(ctx, &trainJob, client.MergeFrom(prevTrainJob)) + if patchErr == nil { + for _, fn := range pendingMetrics { + fn() + } + } + 
return deadlineResult, errors.Join(err, patchErr) } return deadlineResult, errors.Join(err, deadlineErr) } @@ -187,7 +198,13 @@ func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c if !equality.Semantic.DeepEqual(&trainJob.Status, prevTrainJob.Status) { // TODO(astefanutti): Consider using SSA once controller-runtime client has SSA support // for sub-resources. See: https://github.com/kubernetes-sigs/controller-runtime/issues/3183 - return ctrl.Result{}, errors.Join(err, r.client.Status().Patch(ctx, &trainJob, client.MergeFrom(prevTrainJob))) + patchErr := r.client.Status().Patch(ctx, &trainJob, client.MergeFrom(prevTrainJob)) + if patchErr == nil { + for _, fn := range pendingMetrics { + fn() + } + } + return ctrl.Result{}, errors.Join(err, patchErr) } return ctrl.Result{}, err } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index d0a68086a7..c6e2a617ff 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -129,13 +129,13 @@ var ( Help: "Total number of errors encountered during plugin execution.", }, []string{"plugin", "phase"}) - // RuntimesRegistered is a gauge for the number of registered training runtimes by kind. + // RuntimesRegistered is a gauge for the number of registered training runtimes by group and kind. RuntimesRegistered = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, Name: "runtimes_registered", - Help: "Number of registered training runtimes by kind.", - }, []string{"kind"}) + Help: "Number of registered training runtimes by group and kind.", + }, []string{"group", "kind"}) // WebhookValidationTotal is a counter for TrainJob webhook validation calls. 
WebhookValidationTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go index 8b0b17508d..984eb42404 100644 --- a/pkg/metrics/metrics_test.go +++ b/pkg/metrics/metrics_test.go @@ -21,44 +21,12 @@ import ( "testing" "time" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "k8s.io/utils/ptr" trainer "github.com/kubeflow/trainer/v2/pkg/apis/trainer/v1alpha1" ) -// newIsolatedRegistry returns a fresh prometheus.Registry with all Trainer metrics registered. -// Each test should use its own instance to avoid cross-test interference. -func newIsolatedRegistry(t *testing.T) *prometheus.Registry { - t.Helper() - reg := prometheus.NewRegistry() - collectors := []prometheus.Collector{ - BuildInfo, - TrainJobsCreatedTotal, - TrainJobsCompletedTotal, - TrainJobsFailedTotal, - TrainJobsSuspendedTotal, - TrainJobsDeletedTotal, - TrainJobsActive, - TrainJobDurationSeconds, - ReconcileDurationSeconds, - PluginExecutionDurationSeconds, - PluginExecutionErrorsTotal, - RuntimesRegistered, - WebhookValidationTotal, - } - for _, c := range collectors { - if err := reg.Register(c); err != nil { - // AlreadyRegisteredError is expected in tests that share the global vars. 
- if _, ok := err.(prometheus.AlreadyRegisteredError); !ok { - t.Fatalf("failed to register collector: %v", err) - } - } - } - return reg -} - func TestRuntimeKind(t *testing.T) { tests := []struct { name string @@ -104,8 +72,6 @@ func TestRuntimeKind(t *testing.T) { } func TestRecordTrainJobCreated(t *testing.T) { - newIsolatedRegistry(t) - RecordTrainJobCreated("test-ns", "ClusterTrainingRuntime") if got := testutil.ToFloat64(TrainJobsCreatedTotal.WithLabelValues("test-ns", "ClusterTrainingRuntime")); got != 1 { @@ -117,8 +83,6 @@ func TestRecordTrainJobCreated(t *testing.T) { } func TestRecordTrainJobDeleted(t *testing.T) { - newIsolatedRegistry(t) - // Use a unique namespace so the active gauge is isolated from other tests. // One Create followed by one Delete should net to zero. RecordTrainJobCreated("test-ns-deleted", "ClusterTrainingRuntime") @@ -133,8 +97,6 @@ func TestRecordTrainJobDeleted(t *testing.T) { } func TestRecordTrainJobCompleted(t *testing.T) { - newIsolatedRegistry(t) - RecordTrainJobCompleted("test-ns", "ClusterTrainingRuntime", 30*time.Second) if got := testutil.ToFloat64(TrainJobsCompletedTotal.WithLabelValues("test-ns", "ClusterTrainingRuntime")); got != 1 { @@ -147,8 +109,6 @@ func TestRecordTrainJobCompleted(t *testing.T) { } func TestRecordTrainJobFailed(t *testing.T) { - newIsolatedRegistry(t) - RecordTrainJobFailed("test-ns", "ClusterTrainingRuntime", "DeadlineExceeded", 60*time.Second) if got := testutil.ToFloat64(TrainJobsFailedTotal.WithLabelValues("test-ns", "ClusterTrainingRuntime", "DeadlineExceeded")); got != 1 { @@ -161,8 +121,6 @@ func TestRecordTrainJobFailed(t *testing.T) { } func TestRecordTrainJobSuspended(t *testing.T) { - newIsolatedRegistry(t) - RecordTrainJobSuspended("test-ns", "TrainingRuntime") if got := testutil.ToFloat64(TrainJobsSuspendedTotal.WithLabelValues("test-ns", "TrainingRuntime")); got != 1 { @@ -171,8 +129,6 @@ func TestRecordTrainJobSuspended(t *testing.T) { } func TestObserveReconcile(t *testing.T) { 
- newIsolatedRegistry(t) - ObserveReconcile("trainjob_controller", "success", 5*time.Millisecond) ObserveReconcile("trainjob_controller", "error", 2*time.Millisecond) @@ -183,8 +139,6 @@ func TestObserveReconcile(t *testing.T) { } func TestObservePlugin(t *testing.T) { - newIsolatedRegistry(t) - ObservePlugin("jobset", "build", 1*time.Millisecond, nil) ObservePlugin("torch", "enforce_ml_policy", 2*time.Millisecond, errors.New("plugin error")) diff --git a/pkg/runtime/core/core.go b/pkg/runtime/core/core.go index d27ad166ea..0d0fd3a002 100644 --- a/pkg/runtime/core/core.go +++ b/pkg/runtime/core/core.go @@ -20,6 +20,7 @@ import ( "context" "fmt" + "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/client" configapi "github.com/kubeflow/trainer/v2/pkg/apis/config/v1alpha1" @@ -56,13 +57,10 @@ func New(ctx context.Context, client client.Client, indexer client.FieldIndexer, } } runtimes = newRuntimes - // Record the number of registered runtimes by kind for observability. - kindCounts := make(map[string]float64) + // Record each registered runtime by group and kind for observability. 
for key := range newRuntimes { - kindCounts[key]++ - } - for kind, count := range kindCounts { - metrics.RuntimesRegistered.WithLabelValues(kind).Set(count) + gk := schema.ParseGroupKind(key) + metrics.RuntimesRegistered.WithLabelValues(gk.Group, gk.Kind).Set(1) } return newRuntimes, nil } From 9295812f6b14e0e07f26f3eef1e7f1ed1bd67b82 Mon Sep 17 00:00:00 2001 From: Ayush Petwal Date: Thu, 16 Apr 2026 23:41:05 +0530 Subject: [PATCH 3/3] fix(manifests): drop hard-coded namespaceSelector from ServiceMonitor Signed-off-by: Ayush Petwal --- manifests/overlays/monitoring/service_monitor.yaml | 3 --- 1 file changed, 3 deletions(-) diff --git a/manifests/overlays/monitoring/service_monitor.yaml b/manifests/overlays/monitoring/service_monitor.yaml index a1c91b4564..8f43460d9d 100644 --- a/manifests/overlays/monitoring/service_monitor.yaml +++ b/manifests/overlays/monitoring/service_monitor.yaml @@ -24,9 +24,6 @@ spec: selector: matchLabels: app.kubernetes.io/component: manager - namespaceSelector: - matchNames: - - kubeflow endpoints: - port: monitoring-port path: /metrics