diff --git a/charts/kubeflow-trainer/templates/manager/service-monitor.yaml b/charts/kubeflow-trainer/templates/manager/service-monitor.yaml new file mode 100644 index 0000000000..e8d09bb1a1 --- /dev/null +++ b/charts/kubeflow-trainer/templates/manager/service-monitor.yaml @@ -0,0 +1,51 @@ +{{- /* +Copyright 2024 The Kubeflow authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ -}} + +{{- if .Values.manager.metrics.serviceMonitor.enabled }} +{{- if not (.Capabilities.APIVersions.Has "monitoring.coreos.com/v1/ServiceMonitor") -}} +{{- fail "ServiceMonitor requires the monitoring.coreos.com/v1 CRD (Prometheus Operator). Install Prometheus Operator first or set manager.metrics.serviceMonitor.enabled=false." -}} +{{- end }} +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: {{ include "trainer.manager.service.name" . }} + namespace: {{ .Release.Namespace }} + labels: + {{- include "trainer.manager.labels" . | nindent 4 }} + {{- with .Values.manager.metrics.serviceMonitor.additionalLabels }} + {{- toYaml . | nindent 4 }} + {{- end }} +spec: + selector: + matchLabels: + {{- include "trainer.manager.selectorLabels" . 
| nindent 6 }} + namespaceSelector: + matchNames: + - {{ .Release.Namespace }} + endpoints: + - port: monitoring-port + path: /metrics + scheme: https + interval: {{ .Values.manager.metrics.serviceMonitor.interval | default "30s" }} + scrapeTimeout: {{ .Values.manager.metrics.serviceMonitor.scrapeTimeout | default "10s" }} + bearerTokenFile: /var/run/secrets/kubernetes.io/serviceaccount/token + tlsConfig: + {{- if .Values.manager.metrics.serviceMonitor.tlsConfig }} + {{- toYaml .Values.manager.metrics.serviceMonitor.tlsConfig | nindent 8 }} + {{- else }} + insecureSkipVerify: true + {{- end }} +{{- end }} diff --git a/charts/kubeflow-trainer/values.yaml b/charts/kubeflow-trainer/values.yaml index 6d32e47214..99a259ee8c 100644 --- a/charts/kubeflow-trainer/values.yaml +++ b/charts/kubeflow-trainer/values.yaml @@ -108,6 +108,24 @@ manager: seccompProfile: type: RuntimeDefault + # -- Prometheus metrics configuration. + metrics: + serviceMonitor: + # -- Whether to create a Prometheus Operator ServiceMonitor for /metrics. + # Requires the monitoring.coreos.com/v1 CRDs (Prometheus Operator) installed. + enabled: false + # -- Scrape interval. + interval: 30s + # -- Scrape timeout. + scrapeTimeout: 10s + # -- Extra labels added to the ServiceMonitor metadata (e.g. release: prometheus) + # so that the Prometheus instance selects this monitor. + additionalLabels: {} + # -- TLS config for the metrics endpoint. + # Defaults to insecureSkipVerify: true so the controller-manager's self-signed CA + # does not block scrapes. Override with caFile/serverName for production. + tlsConfig: {} + # -- Controller manager configuration. # This configuration is used to generate the ConfigMap for the controller manager. 
config: diff --git a/cmd/trainer-controller-manager/main.go b/cmd/trainer-controller-manager/main.go index 3871dc84d2..13b58df638 100644 --- a/cmd/trainer-controller-manager/main.go +++ b/cmd/trainer-controller-manager/main.go @@ -21,6 +21,7 @@ import ( "flag" "net/http" "os" + goruntime "runtime" zaplog "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -41,10 +42,12 @@ import ( "github.com/kubeflow/trainer/v2/pkg/config" "github.com/kubeflow/trainer/v2/pkg/controller" "github.com/kubeflow/trainer/v2/pkg/features" + "github.com/kubeflow/trainer/v2/pkg/metrics" "github.com/kubeflow/trainer/v2/pkg/runtime" runtimecore "github.com/kubeflow/trainer/v2/pkg/runtime/core" "github.com/kubeflow/trainer/v2/pkg/statusserver" "github.com/kubeflow/trainer/v2/pkg/util/cert" + "github.com/kubeflow/trainer/v2/pkg/version" "github.com/kubeflow/trainer/v2/pkg/webhooks" ) @@ -125,6 +128,16 @@ func main() { os.Exit(1) } + metrics.Register() + metrics.BuildInfo.WithLabelValues( + version.GitVersion, + version.GitCommit, + version.BuildDate, + goruntime.Version(), + goruntime.Compiler, + goruntime.GOOS+"/"+goruntime.GOARCH, + ).Set(1) + certsReady := make(chan struct{}) if config.IsCertManagementEnabled(&cfg) { setupLog.Info("Setting up certificate management") diff --git a/go.mod b/go.mod index 82f2dbc221..9919c77a17 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/onsi/ginkgo/v2 v2.28.1 github.com/onsi/gomega v1.39.1 github.com/open-policy-agent/cert-controller v0.16.0 + github.com/prometheus/client_golang v1.23.2 go.uber.org/zap v1.27.1 golang.org/x/crypto v0.48.0 k8s.io/api v0.35.2 @@ -54,6 +55,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect github.com/mailru/easyjson v0.9.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd 
// indirect @@ -62,7 +64,6 @@ require ( github.com/pelletier/go-toml v1.9.5 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/client_golang v1.23.2 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/procfs v0.16.1 // indirect diff --git a/manifests/base/manager/manager.yaml b/manifests/base/manager/manager.yaml index 209ee07c09..431b1bee5d 100644 --- a/manifests/base/manager/manager.yaml +++ b/manifests/base/manager/manager.yaml @@ -79,6 +79,10 @@ apiVersion: v1 kind: Service metadata: name: kubeflow-trainer-controller-manager + labels: + app.kubernetes.io/name: trainer + app.kubernetes.io/component: manager + app.kubernetes.io/part-of: kubeflow spec: ports: - name: monitoring-port diff --git a/manifests/overlays/monitoring/kustomization.yaml b/manifests/overlays/monitoring/kustomization.yaml new file mode 100644 index 0000000000..b33f78105c --- /dev/null +++ b/manifests/overlays/monitoring/kustomization.yaml @@ -0,0 +1,4 @@ +# Monitoring overlay — apply this only if Prometheus Operator (monitoring.coreos.com/v1) is installed. +resources: + - ../../base/manager + - service_monitor.yaml diff --git a/manifests/overlays/monitoring/service_monitor.yaml b/manifests/overlays/monitoring/service_monitor.yaml new file mode 100644 index 0000000000..8f43460d9d --- /dev/null +++ b/manifests/overlays/monitoring/service_monitor.yaml @@ -0,0 +1,35 @@ +# Copyright 2024 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ServiceMonitor for the Kubeflow Trainer controller-manager metrics endpoint.
+# Requires the Prometheus Operator (monitoring.coreos.com/v1) to be installed.
+# Apply this overlay only if Prometheus Operator is available in the cluster.
+apiVersion: monitoring.coreos.com/v1
+kind: ServiceMonitor
+metadata:
+  name: kubeflow-trainer-controller-manager
+  namespace: kubeflow
+spec:
+  selector:
+    matchLabels:
+      app.kubernetes.io/component: manager
+  endpoints:
+    - port: monitoring-port
+      path: /metrics
+      scheme: https
+      interval: 30s
+      scrapeTimeout: 10s
+      bearerTokenFile: /var/run/secrets/kubernetes.io/serviceaccount/token
+      tlsConfig:
+        insecureSkipVerify: true
diff --git a/pkg/controller/trainjob_controller.go b/pkg/controller/trainjob_controller.go
index 4c5e3529a5..f99cfc4d1a 100644
--- a/pkg/controller/trainjob_controller.go
+++ b/pkg/controller/trainjob_controller.go
@@ -45,6 +45,7 @@ import (
 
 	trainer "github.com/kubeflow/trainer/v2/pkg/apis/trainer/v1alpha1"
 	"github.com/kubeflow/trainer/v2/pkg/constants"
+	"github.com/kubeflow/trainer/v2/pkg/metrics"
 	jobruntimes "github.com/kubeflow/trainer/v2/pkg/runtime"
 	"github.com/kubeflow/trainer/v2/pkg/util/trainjob"
 )
@@ -98,6 +99,19 @@ func NewTrainJobReconciler(client client.Client, recorder events.EventRecorder,
 // +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=create;get;list;update
 
+// Reconcile handles one TrainJob request and records its latency in the
+// reconcile-duration histogram, labelled by whether an error was returned.
-func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, retErr error) {
+	start := time.Now()
+	reconcileResult := "success"
+	// retErr is a named result so the deferred observer below can label the
+	// sample "error" for every failing return path, not just the explicit ones.
+	defer func() {
+		if retErr != nil {
+			reconcileResult = "error"
+		}
+		metrics.ObserveReconcile("trainjob_controller", reconcileResult, time.Since(start))
+	}()
+
 	var trainJob trainer.TrainJob
 	if err := r.client.Get(ctx, req.NamespacedName, &trainJob); err != nil {
 		return ctrl.Result{}, client.IgnoreNotFound(err)
@@ -136,13 +150,56 @@ func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 
 	setSuspendedCondition(&trainJob)
 
+	// Collect pending lifecycle metric callbacks; fired only after a successful status patch
+	// to prevent double-counting when the patch fails and the next reconcile re-detects the transition.
+	var pendingMetrics []func()
+
+	// Detect Suspended: False → True transitions.
+	prevSuspendedCond := meta.FindStatusCondition(prevTrainJob.Status.Conditions, trainer.TrainJobSuspended)
+	currSuspendedCond := meta.FindStatusCondition(trainJob.Status.Conditions, trainer.TrainJobSuspended)
+	if currSuspendedCond != nil && currSuspendedCond.Status == metav1.ConditionTrue &&
+		(prevSuspendedCond == nil || prevSuspendedCond.Status != metav1.ConditionTrue) {
+		ns, rk := trainJob.Namespace, metrics.RuntimeKind(&trainJob)
+		pendingMetrics = append(pendingMetrics, func() { metrics.RecordTrainJobSuspended(ns, rk) })
+	}
+
 	if statusErr := setTrainJobStatus(ctx, runtime, &trainJob); statusErr != nil {
 		err = errors.Join(err, statusErr)
 	}
 
+	// Detect terminal state transitions.
+	runtimeKind := metrics.RuntimeKind(&trainJob)
+	prevCompleteCond := meta.FindStatusCondition(prevTrainJob.Status.Conditions, trainer.TrainJobComplete)
+	currCompleteCond := meta.FindStatusCondition(trainJob.Status.Conditions, trainer.TrainJobComplete)
+	if currCompleteCond != nil && currCompleteCond.Status == metav1.ConditionTrue &&
+		(prevCompleteCond == nil || prevCompleteCond.Status != metav1.ConditionTrue) {
+		ns, rk := trainJob.Namespace, runtimeKind
+		dur := currCompleteCond.LastTransitionTime.Sub(trainJob.CreationTimestamp.Time)
+		pendingMetrics = append(pendingMetrics, func() { metrics.RecordTrainJobCompleted(ns, rk, dur) })
+	}
+
+	prevFailedCond := meta.FindStatusCondition(prevTrainJob.Status.Conditions, trainer.TrainJobFailed)
+	currFailedCond := meta.FindStatusCondition(trainJob.Status.Conditions, trainer.TrainJobFailed)
+	if currFailedCond != nil && currFailedCond.Status == metav1.ConditionTrue &&
+		(prevFailedCond == nil || prevFailedCond.Status != metav1.ConditionTrue) {
+		ns, rk, reason := trainJob.Namespace, runtimeKind, currFailedCond.Reason
+		dur := currFailedCond.LastTransitionTime.Sub(trainJob.CreationTimestamp.Time)
+		pendingMetrics = append(pendingMetrics, func() { metrics.RecordTrainJobFailed(ns, rk, reason, dur) })
+	}
+
+	if err != nil {
+		reconcileResult = "error"
+	}
+
 	if deadlineResult, deadlineErr := r.reconcileDeadline(ctx, &trainJob); deadlineErr != nil || deadlineResult.RequeueAfter > 0 {
 		if !equality.Semantic.DeepEqual(&trainJob.Status, &prevTrainJob.Status) {
-			return deadlineResult, errors.Join(err, r.client.Status().Patch(ctx, &trainJob, client.MergeFrom(prevTrainJob)))
+			patchErr := r.client.Status().Patch(ctx, &trainJob, client.MergeFrom(prevTrainJob))
+			if patchErr == nil {
+				for _, fn := range pendingMetrics {
+					fn()
+				}
+			}
+			return deadlineResult, errors.Join(err, patchErr)
 		}
 		return deadlineResult, errors.Join(err, deadlineErr)
 	}
@@ -150,7 +198,15 @@ func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
-	if !equality.Semantic.DeepEqual(&trainJob.Status, prevTrainJob.Status) {
+	// Compare pointer with pointer, as the deadline branch above does; a pointer/value
+	// pair has mismatched types and is never reported equal, so the patch was always sent.
+	if !equality.Semantic.DeepEqual(&trainJob.Status, &prevTrainJob.Status) {
 		// TODO(astefanutti): Consider using SSA once controller-runtime client has SSA support
 		// for sub-resources. See: https://github.com/kubernetes-sigs/controller-runtime/issues/3183
-		return ctrl.Result{}, errors.Join(err, r.client.Status().Patch(ctx, &trainJob, client.MergeFrom(prevTrainJob)))
+		patchErr := r.client.Status().Patch(ctx, &trainJob, client.MergeFrom(prevTrainJob))
+		if patchErr == nil {
+			for _, fn := range pendingMetrics {
+				fn()
+			}
+		}
+		return ctrl.Result{}, errors.Join(err, patchErr)
 	}
 	return ctrl.Result{}, err
 }
@@ -208,12 +264,14 @@ func (r *TrainJobReconciler) reconcileDeadline(ctx context.Context, trainJob *tr
 
 func (r *TrainJobReconciler) Create(e event.TypedCreateEvent[*trainer.TrainJob]) bool {
 	r.log.WithValues("trainJob", klog.KObj(e.Object)).Info("TrainJob create event")
+	metrics.RecordTrainJobCreated(e.Object.Namespace, metrics.RuntimeKind(e.Object))
 	defer r.notifyWatchers(nil, e.Object)
 	return true
 }
 
 func (r *TrainJobReconciler) Delete(e event.TypedDeleteEvent[*trainer.TrainJob]) bool {
 	r.log.WithValues("trainJob", klog.KObj(e.Object)).Info("TrainJob delete event")
+	metrics.RecordTrainJobDeleted(e.Object.Namespace, metrics.RuntimeKind(e.Object))
 	defer r.notifyWatchers(e.Object, nil)
 	return true
 }
diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
new file mode 100644
index 0000000000..c6e2a617ff
--- /dev/null
+++ b/pkg/metrics/metrics.go
@@ -0,0 +1,220 @@
+/*
+Copyright 2024 The Kubeflow Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "k8s.io/utils/ptr" + ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" + + trainer "github.com/kubeflow/trainer/v2/pkg/apis/trainer/v1alpha1" +) + +const ( + metricsNamespace = "kubeflow" + metricsSubsystem = "trainer" +) + +// durationBuckets matches Kueue's bucket policy: covers tiny CI jobs through long-running fine-tunes. +var durationBuckets = []float64{1, 5, 10, 30, 60, 120, 300, 600, 1800, 3600, 10800, 21600} + +var ( + // BuildInfo surfaces build metadata as a Gauge always set to 1. + BuildInfo = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "build_info", + Help: "Build metadata for the Kubeflow Trainer controller manager (always 1).", + }, []string{"git_version", "git_commit", "build_date", "go_version", "compiler", "platform"}) + + // TrainJobsCreatedTotal is a counter for the total number of TrainJobs created. + TrainJobsCreatedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "trainjobs_created_total", + Help: "Total number of TrainJobs created.", + }, []string{"namespace", "runtime"}) + + // TrainJobsCompletedTotal is a counter for TrainJobs that completed successfully. + TrainJobsCompletedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "trainjobs_completed_total", + Help: "Total number of TrainJobs that completed successfully.", + }, []string{"namespace", "runtime"}) + + // TrainJobsFailedTotal is a counter for TrainJobs that failed. + // The reason label maps to the TrainJob Failed condition reason. 
+ TrainJobsFailedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "trainjobs_failed_total", + Help: "Total number of TrainJobs that failed.", + }, []string{"namespace", "runtime", "reason"}) + + // TrainJobsSuspendedTotal is a counter for TrainJob suspension events. + TrainJobsSuspendedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "trainjobs_suspended_total", + Help: "Total number of TrainJob suspension events.", + }, []string{"namespace", "runtime"}) + + // TrainJobsDeletedTotal is a counter for TrainJobs that have been deleted. + TrainJobsDeletedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "trainjobs_deleted_total", + Help: "Total number of TrainJobs deleted.", + }, []string{"namespace", "runtime"}) + + // TrainJobsActive is a gauge tracking TrainJobs present in the cluster. + // It resets on controller restart; incremented on Create events, decremented on Delete events. + TrainJobsActive = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "trainjobs_active", + Help: "Number of TrainJobs currently present in the cluster (resets on controller restart).", + }, []string{"namespace", "runtime"}) + + // TrainJobDurationSeconds is a histogram of TrainJob end-to-end duration from creation to terminal condition. + TrainJobDurationSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "trainjob_duration_seconds", + Help: "End-to-end duration of TrainJobs from creation to terminal condition.", + Buckets: durationBuckets, + }, []string{"namespace", "runtime", "result"}) + + // ReconcileDurationSeconds is a histogram of reconcile iteration latencies. 
+ ReconcileDurationSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "reconcile_duration_seconds", + Help: "Latency of reconcile iterations per controller.", + Buckets: prometheus.DefBuckets, + }, []string{"controller", "result"}) + + // PluginExecutionDurationSeconds is a histogram of plugin execution latencies. + PluginExecutionDurationSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "plugin_execution_duration_seconds", + Help: "Latency of plugin execution per plugin and phase.", + Buckets: prometheus.DefBuckets, + }, []string{"plugin", "phase"}) + + // PluginExecutionErrorsTotal is a counter for plugin execution errors. + PluginExecutionErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "plugin_execution_errors_total", + Help: "Total number of errors encountered during plugin execution.", + }, []string{"plugin", "phase"}) + + // RuntimesRegistered is a gauge for the number of registered training runtimes by group and kind. + RuntimesRegistered = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "runtimes_registered", + Help: "Number of registered training runtimes by group and kind.", + }, []string{"group", "kind"}) + + // WebhookValidationTotal is a counter for TrainJob webhook validation calls. + WebhookValidationTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "webhook_validation_total", + Help: "Total number of webhook validation calls by resource, operation, and result.", + }, []string{"resource", "operation", "result"}) +) + +var registerOnce sync.Once + +// Register registers all Kubeflow Trainer metrics with the controller-runtime registry. 
+// It is idempotent and safe to call multiple times; only the first call takes effect. +func Register() { + registerOnce.Do(func() { + ctrlmetrics.Registry.MustRegister( + BuildInfo, + TrainJobsCreatedTotal, + TrainJobsCompletedTotal, + TrainJobsFailedTotal, + TrainJobsSuspendedTotal, + TrainJobsDeletedTotal, + TrainJobsActive, + TrainJobDurationSeconds, + ReconcileDurationSeconds, + PluginExecutionDurationSeconds, + PluginExecutionErrorsTotal, + RuntimesRegistered, + WebhookValidationTotal, + ) + }) +} + +// RuntimeKind extracts the runtime kind label from a TrainJob's RuntimeRef. +// Returns "Unknown" if the Kind field is nil. +func RuntimeKind(trainJob *trainer.TrainJob) string { + return ptr.Deref(trainJob.Spec.RuntimeRef.Kind, "Unknown") +} + +// RecordTrainJobCreated increments the created counter and active gauge. +func RecordTrainJobCreated(namespace, runtimeKind string) { + TrainJobsCreatedTotal.WithLabelValues(namespace, runtimeKind).Inc() + TrainJobsActive.WithLabelValues(namespace, runtimeKind).Inc() +} + +// RecordTrainJobDeleted increments the deleted counter and decrements the active gauge. +func RecordTrainJobDeleted(namespace, runtimeKind string) { + TrainJobsDeletedTotal.WithLabelValues(namespace, runtimeKind).Inc() + TrainJobsActive.WithLabelValues(namespace, runtimeKind).Dec() +} + +// RecordTrainJobCompleted increments the completed counter and observes the duration histogram. +func RecordTrainJobCompleted(namespace, runtimeKind string, dur time.Duration) { + TrainJobsCompletedTotal.WithLabelValues(namespace, runtimeKind).Inc() + TrainJobDurationSeconds.WithLabelValues(namespace, runtimeKind, "Complete").Observe(dur.Seconds()) +} + +// RecordTrainJobFailed increments the failed counter and observes the duration histogram. 
+func RecordTrainJobFailed(namespace, runtimeKind, reason string, dur time.Duration) { + TrainJobsFailedTotal.WithLabelValues(namespace, runtimeKind, reason).Inc() + TrainJobDurationSeconds.WithLabelValues(namespace, runtimeKind, "Failed").Observe(dur.Seconds()) +} + +// RecordTrainJobSuspended increments the suspended counter. +func RecordTrainJobSuspended(namespace, runtimeKind string) { + TrainJobsSuspendedTotal.WithLabelValues(namespace, runtimeKind).Inc() +} + +// ObserveReconcile records the duration of a reconcile iteration. +func ObserveReconcile(controller, result string, dur time.Duration) { + ReconcileDurationSeconds.WithLabelValues(controller, result).Observe(dur.Seconds()) +} + +// ObservePlugin records the latency of a plugin execution phase and increments the +// error counter if err is non-nil. +func ObservePlugin(pluginName, phase string, dur time.Duration, err error) { + PluginExecutionDurationSeconds.WithLabelValues(pluginName, phase).Observe(dur.Seconds()) + if err != nil { + PluginExecutionErrorsTotal.WithLabelValues(pluginName, phase).Inc() + } +} diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go new file mode 100644 index 0000000000..984eb42404 --- /dev/null +++ b/pkg/metrics/metrics_test.go @@ -0,0 +1,156 @@ +/* +Copyright 2024 The Kubeflow Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package metrics + +import ( + "errors" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus/testutil" + "k8s.io/utils/ptr" + + trainer "github.com/kubeflow/trainer/v2/pkg/apis/trainer/v1alpha1" +) + +func TestRuntimeKind(t *testing.T) { + tests := []struct { + name string + trainJob *trainer.TrainJob + want string + }{ + { + name: "kind set to ClusterTrainingRuntime", + trainJob: &trainer.TrainJob{ + Spec: trainer.TrainJobSpec{ + RuntimeRef: trainer.RuntimeRef{Kind: ptr.To("ClusterTrainingRuntime")}, + }, + }, + want: "ClusterTrainingRuntime", + }, + { + name: "kind set to TrainingRuntime", + trainJob: &trainer.TrainJob{ + Spec: trainer.TrainJobSpec{ + RuntimeRef: trainer.RuntimeRef{Kind: ptr.To("TrainingRuntime")}, + }, + }, + want: "TrainingRuntime", + }, + { + name: "kind is nil", + trainJob: &trainer.TrainJob{ + Spec: trainer.TrainJobSpec{ + RuntimeRef: trainer.RuntimeRef{}, + }, + }, + want: "Unknown", + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := RuntimeKind(tc.trainJob) + if got != tc.want { + t.Errorf("RuntimeKind() = %q, want %q", got, tc.want) + } + }) + } +} + +func TestRecordTrainJobCreated(t *testing.T) { + RecordTrainJobCreated("test-ns", "ClusterTrainingRuntime") + + if got := testutil.ToFloat64(TrainJobsCreatedTotal.WithLabelValues("test-ns", "ClusterTrainingRuntime")); got != 1 { + t.Errorf("TrainJobsCreatedTotal = %v, want 1", got) + } + if got := testutil.ToFloat64(TrainJobsActive.WithLabelValues("test-ns", "ClusterTrainingRuntime")); got != 1 { + t.Errorf("TrainJobsActive = %v, want 1", got) + } +} + +func TestRecordTrainJobDeleted(t *testing.T) { + // Use a unique namespace so the active gauge is isolated from other tests. + // One Create followed by one Delete should net to zero. 
+ RecordTrainJobCreated("test-ns-deleted", "ClusterTrainingRuntime") + RecordTrainJobDeleted("test-ns-deleted", "ClusterTrainingRuntime") + + if got := testutil.ToFloat64(TrainJobsDeletedTotal.WithLabelValues("test-ns-deleted", "ClusterTrainingRuntime")); got != 1 { + t.Errorf("TrainJobsDeletedTotal = %v, want 1", got) + } + if got := testutil.ToFloat64(TrainJobsActive.WithLabelValues("test-ns-deleted", "ClusterTrainingRuntime")); got != 0 { + t.Errorf("TrainJobsActive = %v, want 0", got) + } +} + +func TestRecordTrainJobCompleted(t *testing.T) { + RecordTrainJobCompleted("test-ns", "ClusterTrainingRuntime", 30*time.Second) + + if got := testutil.ToFloat64(TrainJobsCompletedTotal.WithLabelValues("test-ns", "ClusterTrainingRuntime")); got != 1 { + t.Errorf("TrainJobsCompletedTotal = %v, want 1", got) + } + // Verify histogram received at least one observation. + if count := testutil.CollectAndCount(TrainJobDurationSeconds, "kubeflow_trainer_trainjob_duration_seconds"); count == 0 { + t.Errorf("TrainJobDurationSeconds has no series, want at least one observation") + } +} + +func TestRecordTrainJobFailed(t *testing.T) { + RecordTrainJobFailed("test-ns", "ClusterTrainingRuntime", "DeadlineExceeded", 60*time.Second) + + if got := testutil.ToFloat64(TrainJobsFailedTotal.WithLabelValues("test-ns", "ClusterTrainingRuntime", "DeadlineExceeded")); got != 1 { + t.Errorf("TrainJobsFailedTotal = %v, want 1", got) + } + // Verify histogram received at least one observation. 
+ if count := testutil.CollectAndCount(TrainJobDurationSeconds, "kubeflow_trainer_trainjob_duration_seconds"); count == 0 { + t.Errorf("TrainJobDurationSeconds has no series, want at least one observation") + } +} + +func TestRecordTrainJobSuspended(t *testing.T) { + RecordTrainJobSuspended("test-ns", "TrainingRuntime") + + if got := testutil.ToFloat64(TrainJobsSuspendedTotal.WithLabelValues("test-ns", "TrainingRuntime")); got != 1 { + t.Errorf("TrainJobsSuspendedTotal = %v, want 1", got) + } +} + +func TestObserveReconcile(t *testing.T) { + ObserveReconcile("trainjob_controller", "success", 5*time.Millisecond) + ObserveReconcile("trainjob_controller", "error", 2*time.Millisecond) + + // Verify histogram received observations for both results. + if count := testutil.CollectAndCount(ReconcileDurationSeconds, "kubeflow_trainer_reconcile_duration_seconds"); count == 0 { + t.Errorf("ReconcileDurationSeconds has no series, want at least one observation") + } +} + +func TestObservePlugin(t *testing.T) { + ObservePlugin("jobset", "build", 1*time.Millisecond, nil) + ObservePlugin("torch", "enforce_ml_policy", 2*time.Millisecond, errors.New("plugin error")) + + // Verify the histogram received observations. 
+ if count := testutil.CollectAndCount(PluginExecutionDurationSeconds, "kubeflow_trainer_plugin_execution_duration_seconds"); count == 0 { + t.Errorf("PluginExecutionDurationSeconds has no series, want at least one observation") + } + if got := testutil.ToFloat64(PluginExecutionErrorsTotal.WithLabelValues("torch", "enforce_ml_policy")); got != 1 { + t.Errorf("PluginExecutionErrorsTotal[torch/enforce_ml_policy] = %v, want 1", got) + } + // No error for jobset/build + if got := testutil.ToFloat64(PluginExecutionErrorsTotal.WithLabelValues("jobset", "build")); got != 0 { + t.Errorf("PluginExecutionErrorsTotal[jobset/build] = %v, want 0", got) + } +} diff --git a/pkg/runtime/core/core.go b/pkg/runtime/core/core.go index dcea1df6de..0d0fd3a002 100644 --- a/pkg/runtime/core/core.go +++ b/pkg/runtime/core/core.go @@ -20,9 +20,11 @@ import ( "context" "fmt" + "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/client" configapi "github.com/kubeflow/trainer/v2/pkg/apis/config/v1alpha1" + "github.com/kubeflow/trainer/v2/pkg/metrics" "github.com/kubeflow/trainer/v2/pkg/runtime" ) @@ -55,6 +57,11 @@ func New(ctx context.Context, client client.Client, indexer client.FieldIndexer, } } runtimes = newRuntimes + // Record each registered runtime by group and kind for observability. 
+	for key := range newRuntimes {
+		gk := schema.ParseGroupKind(key) // split each newRuntimes key into the group and kind label values
+		metrics.RuntimesRegistered.WithLabelValues(gk.Group, gk.Kind).Set(1)
+	}
 	return newRuntimes, nil
 }
diff --git a/pkg/runtime/framework/core/framework.go b/pkg/runtime/framework/core/framework.go
index cd7394a8f2..1b824ea595 100644
--- a/pkg/runtime/framework/core/framework.go
+++ b/pkg/runtime/framework/core/framework.go
@@ -19,6 +19,7 @@ package core
 import (
 	"context"
 	"errors"
+	"time"
 
 	apiruntime "k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/validation/field"
@@ -27,6 +28,7 @@ import (
 
 	configapi "github.com/kubeflow/trainer/v2/pkg/apis/config/v1alpha1"
 	trainer "github.com/kubeflow/trainer/v2/pkg/apis/trainer/v1alpha1"
+	"github.com/kubeflow/trainer/v2/pkg/metrics"
 	"github.com/kubeflow/trainer/v2/pkg/runtime"
 	"github.com/kubeflow/trainer/v2/pkg/runtime/framework"
 	fwkplugins "github.com/kubeflow/trainer/v2/pkg/runtime/framework/plugins"
@@ -93,7 +95,10 @@ func New(ctx context.Context, c client.Client, r fwkplugins.Registry, indexer cl
 
 func (f *Framework) RunEnforceMLPolicyPlugins(info *runtime.Info, trainJob *trainer.TrainJob) error {
 	for _, plugin := range f.enforceMLPlugins {
-		if err := plugin.EnforceMLPolicy(info, trainJob); err != nil {
+		start := time.Now()
+		err := plugin.EnforceMLPolicy(info, trainJob)
+		metrics.ObservePlugin(plugin.Name(), "enforce_ml_policy", time.Since(start), err) // observed before the error check so failing calls are recorded too
+		if err != nil {
 			return err
 		}
 	}
@@ -102,7 +107,10 @@ func (f *Framework) RunEnforceMLPolicyPlugins(info *runtime.Info, trainJob *trai
 
 func (f *Framework) RunEnforcePodGroupPolicyPlugins(info *runtime.Info, trainJob *trainer.TrainJob) error {
 	for _, plugin := range f.enforcePodGroupPolicyPlugins {
-		if err := plugin.EnforcePodGroupPolicy(info, trainJob); err != nil {
+		start := time.Now()
+		err := plugin.EnforcePodGroupPolicy(info, trainJob)
+		metrics.ObservePlugin(plugin.Name(), "enforce_pod_group_policy", time.Since(start), err) // observed before the error check so failing calls are recorded too
+		if err != nil {
 			return err
 		}
 	}
@@ -113,7 +121,13 @@ func (f *Framework) RunCustomValidationPlugins(ctx context.Context, info *runtim
 	var aggregatedWarnings admission.Warnings
 	var aggregatedErrors field.ErrorList
 	for _, plugin := range f.customValidationPlugins {
+		start := time.Now()
 		warnings, errs := plugin.Validate(ctx, info, oldObj, newObj)
+		var pluginErr error // stays nil when the plugin reported no field errors
+		if len(errs) > 0 {
+			pluginErr = errs.ToAggregate()
+		}
+		metrics.ObservePlugin(plugin.Name(), "validate", time.Since(start), pluginErr)
 		if len(warnings) != 0 {
 			aggregatedWarnings = append(aggregatedWarnings, warnings...)
 		}
@@ -126,7 +140,10 @@ func (f *Framework) RunCustomValidationPlugins(ctx context.Context, info *runtim
 
 func (f *Framework) RunPodNetworkPlugins(info *runtime.Info, trainJob *trainer.TrainJob) error {
 	for _, plugin := range f.podNetworkPlugins {
-		if err := plugin.IdentifyPodNetwork(info, trainJob); err != nil {
+		start := time.Now()
+		err := plugin.IdentifyPodNetwork(info, trainJob)
+		metrics.ObservePlugin(plugin.Name(), "pod_network", time.Since(start), err) // observed before the error check so failing calls are recorded too
+		if err != nil {
 			return err
 		}
 	}
@@ -136,7 +153,9 @@ func (f *Framework) RunPodNetworkPlugins(info *runtime.Info, trainJob *trainer.T
 func (f *Framework) RunComponentBuilderPlugins(ctx context.Context, info *runtime.Info, trainJob *trainer.TrainJob) ([]apiruntime.ApplyConfiguration, error) {
 	var objs []apiruntime.ApplyConfiguration
 	for _, plugin := range f.componentBuilderPlugins {
+		start := time.Now()
 		components, err := plugin.Build(ctx, info, trainJob)
+		metrics.ObservePlugin(plugin.Name(), "build", time.Since(start), err) // observed before the error check so failing builds are recorded too
 		if err != nil {
 			return nil, err
 		}
@@ -147,7 +166,10 @@ func (f *Framework) RunComponentBuilderPlugins(ctx context.Context, info *runtim
 
 func (f *Framework) RunTrainJobStatusPlugin(ctx context.Context, trainJob *trainer.TrainJob) (*trainer.TrainJobStatus, error) {
 	if f.trainJobStatusPlugin != nil {
-		return f.trainJobStatusPlugin.Status(ctx, trainJob)
+		start := time.Now()
+		status, err := f.trainJobStatusPlugin.Status(ctx, trainJob)
+		metrics.ObservePlugin(f.trainJobStatusPlugin.Name(), "status", time.Since(start), err)
+		return status, err // the plugin's status and error are passed through unchanged
 	}
 	return nil, nil
 }
diff --git a/pkg/version/version.go b/pkg/version/version.go
new file mode 100644
index 0000000000..70590418a4
--- /dev/null
+++ b/pkg/version/version.go
@@ -0,0 +1,31 @@
+/*
+Copyright 2024 The Kubeflow Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package version holds version information for the Kubeflow Trainer controller manager.
+// Variables are set at build time via -ldflags:
+//
+//	-ldflags "-X github.com/kubeflow/trainer/v2/pkg/version.GitVersion=v1.0.0
+//	          -X github.com/kubeflow/trainer/v2/pkg/version.GitCommit=abc123
+//	          -X github.com/kubeflow/trainer/v2/pkg/version.BuildDate=2024-01-01T00:00:00Z"
+package version
+
+// These variables are populated at build time via -ldflags.
+// They default to "unknown" so the build_info metric is always present.
+var (
+	GitVersion = "unknown"
+	GitCommit  = "unknown"
+	BuildDate  = "unknown"
+)
diff --git a/pkg/webhooks/trainjob_webhook.go b/pkg/webhooks/trainjob_webhook.go
index 454d08b4d9..d82d17fd1c 100644
--- a/pkg/webhooks/trainjob_webhook.go
+++ b/pkg/webhooks/trainjob_webhook.go
@@ -30,6 +30,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
 
 	trainer "github.com/kubeflow/trainer/v2/pkg/apis/trainer/v1alpha1"
+	"github.com/kubeflow/trainer/v2/pkg/metrics"
 	"github.com/kubeflow/trainer/v2/pkg/runtime"
 )
 
@@ -112,12 +113,18 @@ func (w *TrainJobValidator) ValidateCreate(ctx context.Context, obj *trainer.Tra
 	log.V(5).Info("Validating create", "TrainJob", klog.KObj(obj))
 
 	runtimeRefGK := runtime.RuntimeRefToRuntimeRegistryKey(obj.Spec.RuntimeRef)
-	runtime, ok := w.runtimes[runtimeRefGK]
+	rt, ok := w.runtimes[runtimeRefGK]
 	if !ok {
+		metrics.WebhookValidationTotal.WithLabelValues("TrainJob", "create", "error").Inc() // an unknown runtime is rejected, so it counts as an error
 		return nil, fmt.Errorf("unsupported runtime: %s", runtimeRefGK)
 	}
-	warnings, errors := runtime.ValidateObjects(ctx, nil, obj)
-	return warnings, errors.ToAggregate()
+	warnings, errs := rt.ValidateObjects(ctx, nil, obj)
+	result := "success"
+	if len(errs) > 0 { // ToAggregate returns nil for an empty list, so only a non-empty list is an error
+		result = "error"
+	}
+	metrics.WebhookValidationTotal.WithLabelValues("TrainJob", "create", result).Inc()
+	return warnings, errs.ToAggregate()
 }
 
 func (w *TrainJobValidator) ValidateUpdate(ctx context.Context, oldObj, newObj *trainer.TrainJob) (admission.Warnings, error) {
@@ -125,12 +132,18 @@ func (w *TrainJobValidator) ValidateUpdate(ctx context.Context, oldObj, newObj *
 	log.V(5).Info("Validating update", "TrainJob", klog.KObj(newObj))
 
 	runtimeRefGK := runtime.RuntimeRefToRuntimeRegistryKey(newObj.Spec.RuntimeRef)
-	runtime, ok := w.runtimes[runtimeRefGK]
+	rt, ok := w.runtimes[runtimeRefGK]
 	if !ok {
+		metrics.WebhookValidationTotal.WithLabelValues("TrainJob", "update", "error").Inc() // an unknown runtime is rejected, so it counts as an error
 		return nil, fmt.Errorf("unsupported runtime: %s", runtimeRefGK)
 	}
-	warnings, errors := runtime.ValidateObjects(ctx, oldObj, newObj)
-	return warnings, errors.ToAggregate()
+	warnings, errs := rt.ValidateObjects(ctx, oldObj, newObj)
+	result := "success"
+	if len(errs) > 0 { // ToAggregate returns nil for an empty list, so only a non-empty list is an error
+		result = "error"
+	}
+	metrics.WebhookValidationTotal.WithLabelValues("TrainJob", "update", result).Inc()
+	return warnings, errs.ToAggregate()
 }
 
 func (w *TrainJobValidator) ValidateDelete(ctx context.Context, obj *trainer.TrainJob) (admission.Warnings, error) {