Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion internal/executor/categorizer/classifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package categorizer
import (
"fmt"
"regexp"
"time"

v1 "k8s.io/api/core/v1"

"github.com/armadaproject/armada/internal/common/errormatch"
"github.com/armadaproject/armada/internal/executor/metrics"
)

// maxCategoryNameLen is the maximum length for category and subcategory strings.
Expand Down Expand Up @@ -157,7 +159,10 @@ func (c *Classifier) Classify(pod *v1.Pod) ClassifyResult {

for _, cat := range c.categories {
for _, r := range cat.rules {
if ruleMatches(r, containers, podReason) {
start := time.Now()
matched := ruleMatches(r, containers, podReason)
metrics.RecordRuleEvaluationDuration(cat.name, r.subcategory, time.Since(start))
if matched {
return ClassifyResult{Category: cat.name, Subcategory: r.subcategory}
}
}
Expand Down
65 changes: 65 additions & 0 deletions internal/executor/categorizer/classifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"strings"
"testing"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -408,6 +410,69 @@ func TestNewClassifier_ValidationErrors(t *testing.T) {
}
}

func TestClassify_RecordsRuleEvaluationDuration(t *testing.T) {
classifier, err := NewClassifier(ErrorCategoriesConfig{Categories: []CategoryConfig{
{Name: "infra", Rules: []CategoryRule{
{OnConditions: []string{errormatch.ConditionOOMKilled}, Subcategory: "oom"},
}},
{Name: "user_error", Rules: []CategoryRule{
{OnExitCodes: &errormatch.ExitCodeMatcher{Operator: errormatch.ExitCodeOperatorIn, Values: []int32{42}}, Subcategory: "bad_exit"},
{OnExitCodes: &errormatch.ExitCodeMatcher{Operator: errormatch.ExitCodeOperatorIn, Values: []int32{74}}, Subcategory: "other_exit"},
}},
}})
require.NoError(t, err)

before := ruleHistogramCounts(t)
pod := podWithTerminatedContainer(42, "Error", "")
result := classifier.Classify(pod)
require.Equal(t, "user_error", result.Category)
require.Equal(t, "bad_exit", result.Subcategory)
after := ruleHistogramCounts(t)

// First rule (infra/oom) does not match -> observed.
assert.Equal(t, uint64(1), after[labelKey{"infra", "oom"}]-before[labelKey{"infra", "oom"}],
"non-matching rule before the match should be observed")
// Second rule (user_error/bad_exit) matches -> observed.
assert.Equal(t, uint64(1), after[labelKey{"user_error", "bad_exit"}]-before[labelKey{"user_error", "bad_exit"}],
"matching rule should be observed")
// Third rule (user_error/other_exit) is after the match -> not reached, not observed.
assert.Equal(t, uint64(0), after[labelKey{"user_error", "other_exit"}]-before[labelKey{"user_error", "other_exit"}],
"rule after the match should not be observed")
}

// labelKey identifies one histogram series by its category and subcategory
// label values. It is comparable, so it can be used as a map key.
type labelKey struct {
	// category holds the value of the failure_category label.
	category string
	// subcategory holds the value of the failure_subcategory label.
	subcategory string
}

// ruleHistogramCounts collects metrics from prometheus.DefaultGatherer and,
// for the rule-evaluation duration histogram only, returns the sample count
// of each series keyed by its (category, subcategory) label values. The test
// fails immediately if gathering returns an error.
func ruleHistogramCounts(t *testing.T) map[labelKey]uint64 {
	t.Helper()

	const histogramName = "armada_executor_job_failure_rule_evaluation_duration_seconds"

	gathered, err := prometheus.DefaultGatherer.Gather()
	require.NoError(t, err)

	sampleCounts := make(map[labelKey]uint64)
	for _, family := range gathered {
		if family.GetName() == histogramName {
			for _, metric := range family.GetMetric() {
				sampleCounts[labelKeyFromMetric(metric)] = metric.GetHistogram().GetSampleCount()
			}
		}
	}
	return sampleCounts
}

// labelKeyFromMetric builds a labelKey from the failure_category and
// failure_subcategory label pairs of m. Any other label is ignored, and a
// label that is absent leaves the corresponding field empty.
func labelKeyFromMetric(m *dto.Metric) labelKey {
	key := labelKey{}
	for _, pair := range m.GetLabel() {
		name, value := pair.GetName(), pair.GetValue()
		if name == "failure_category" {
			key.category = value
		} else if name == "failure_subcategory" {
			key.subcategory = value
		}
	}
	return key
}

func podWithTerminatedContainer(exitCode int32, reason, message string) *v1.Pod {
return &v1.Pod{
Status: v1.PodStatus{
Expand Down
27 changes: 27 additions & 0 deletions internal/executor/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package metrics

import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
Expand All @@ -20,6 +22,21 @@ var jobFailureCategoryTotal = promauto.NewCounterVec(
[]string{failureCategoryLabel, failureSubcategoryLabel},
)

// jobFailureRuleEvaluationDurationSeconds is a histogram of how long a single
// classification-rule evaluation took, in seconds. Series are partitioned by
// the failureCategoryLabel and failureSubcategoryLabel labels. It is created
// through promauto when the package is initialised.
var jobFailureRuleEvaluationDurationSeconds = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: ArmadaExecutorMetricsPrefix + "job_failure_rule_evaluation_duration_seconds",
Help: "Duration of evaluating a single classification rule against a pod, " +
"labeled by the rule's category and subcategory. Observed for every " +
"rule evaluation regardless of whether it matched.",
// Bucket upper bounds are in seconds and range from 10µs to 250ms.
Buckets: []float64{
0.00001, 0.0001, 0.0005,
0.001, 0.005, 0.01, 0.05,
0.1, 0.25,
},
},
[]string{failureCategoryLabel, failureSubcategoryLabel},
)

// RecordJobFailure increments the per-category failure counter. Should be
// called from paths that commit to emitting a JobFailedEvent — retryable
// pod issues that emit a ReturnLease instead must not call this.
Expand All @@ -34,3 +51,13 @@ func RecordJobFailure(category, subcategory string) {
}
jobFailureCategoryTotal.WithLabelValues(category, subcategory).Inc()
}

// RecordRuleEvaluationDuration adds one observation, in seconds, to the
// rule-evaluation duration histogram series identified by category and
// subcategory. Callers are expected to invoke it for every evaluated rule,
// whether or not the rule matched.
//
// When category is empty nothing is recorded, so that no series with an empty
// failure_category label is produced. An empty subcategory is accepted.
func RecordRuleEvaluationDuration(category, subcategory string, duration time.Duration) {
	if len(category) == 0 {
		return
	}
	seconds := duration.Seconds()
	series := jobFailureRuleEvaluationDurationSeconds.WithLabelValues(category, subcategory)
	series.Observe(seconds)
}
49 changes: 49 additions & 0 deletions internal/executor/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package metrics

import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestRecordJobFailure(t *testing.T) {
Expand Down Expand Up @@ -38,3 +42,48 @@ func TestRecordJobFailure(t *testing.T) {
})
}
}

// TestRecordRuleEvaluationDuration verifies how many samples
// RecordRuleEvaluationDuration adds to the histogram series for a given
// (category, subcategory) pair: one for a non-empty category, with or without
// a subcategory, and none for an empty category.
func TestRecordRuleEvaluationDuration(t *testing.T) {
	type testCase struct {
		category      string
		subcategory   string
		duration      time.Duration
		expectedDelta uint64
	}

	// Label values are unique to this test so that the series are not shared
	// with other tests using the same package-level histogram.
	cases := map[string]testCase{
		"records observation for category and subcategory": {
			category:      "rule-eval-test-cat",
			subcategory:   "rule-eval-test-sub",
			duration:      100 * time.Microsecond,
			expectedDelta: 1,
		},
		"accepts empty subcategory": {
			category:      "rule-eval-test-no-sub",
			subcategory:   "",
			duration:      50 * time.Microsecond,
			expectedDelta: 1,
		},
		"empty category is a no-op": {
			category:      "",
			subcategory:   "rule-eval-test-empty-cat",
			duration:      75 * time.Microsecond,
			expectedDelta: 0,
		},
	}

	for caseName, c := range cases {
		t.Run(caseName, func(t *testing.T) {
			// Fetch the series directly so its sample count can be read
			// before and after the call under test.
			observer := jobFailureRuleEvaluationDurationSeconds.WithLabelValues(c.category, c.subcategory)
			series := observer.(prometheus.Histogram)

			countBefore := histogramSampleCount(t, series)
			RecordRuleEvaluationDuration(c.category, c.subcategory, c.duration)
			countAfter := histogramSampleCount(t, series)

			assert.Equal(t, c.expectedDelta, countAfter-countBefore)
		})
	}
}

// histogramSampleCount writes h into a dto.Metric and returns the sample count
// of the resulting histogram. The test fails immediately if the write returns
// an error or the written metric carries no histogram.
func histogramSampleCount(t *testing.T, h prometheus.Histogram) uint64 {
	t.Helper()

	var snapshot dto.Metric
	err := h.Write(&snapshot)
	require.NoError(t, err)
	require.NotNil(t, snapshot.Histogram)

	return snapshot.Histogram.GetSampleCount()
}
Loading