From 1c80ed10d8c13cac7716a4229ffeaddf7a2d2acc Mon Sep 17 00:00:00 2001 From: Dave Protasowski Date: Mon, 30 Jun 2025 20:06:14 -0400 Subject: [PATCH 1/8] add workqueue metrics interface --- observability/metrics/k8s/instruments.go | 73 +++++++ observability/metrics/k8s/instruments_test.go | 195 ++++++++++++++++++ observability/metrics/k8s/workqueue.go | 167 +++++++++++++++ observability/metrics/k8s/workqueue_test.go | 82 ++++++++ observability/metrics/metricstest/assert.go | 103 +++++++++ 5 files changed, 620 insertions(+) create mode 100644 observability/metrics/k8s/instruments.go create mode 100644 observability/metrics/k8s/instruments_test.go create mode 100644 observability/metrics/k8s/workqueue.go create mode 100644 observability/metrics/k8s/workqueue_test.go diff --git a/observability/metrics/k8s/instruments.go b/observability/metrics/k8s/instruments.go new file mode 100644 index 0000000000..430c3f0ffe --- /dev/null +++ b/observability/metrics/k8s/instruments.go @@ -0,0 +1,73 @@ +/* +Copyright 2025 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package k8s
+
+import (
+	"context"
+
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+
+	"k8s.io/client-go/util/workqueue"
+)
+
+// gauge implements workqueue.GaugeMetric on top of an OpenTelemetry
+// Int64UpDownCounter, tagging every change with a fixed attribute set.
+type gauge struct {
+	m     metric.Int64UpDownCounter
+	attrs attribute.Set
+}
+
+// Inc adds one to the underlying up-down counter.
+func (g *gauge) Inc() { g.add(1) }
+
+// Dec subtracts one from the underlying up-down counter.
+func (g *gauge) Dec() { g.add(-1) }
+
+// add applies delta with a background context, because Inc and Dec receive
+// no context from their caller.
+func (g *gauge) add(delta int64) {
+	withAttrs := metric.WithAttributeSet(g.attrs)
+	g.m.Add(context.Background(), delta, withAttrs)
+}
+
+// counter implements workqueue.CounterMetric on top of an OpenTelemetry
+// Int64Counter with a fixed attribute set.
+type counter struct {
+	m     metric.Int64Counter
+	attrs attribute.Set
+}
+
+// Inc adds one to the underlying counter.
+func (c *counter) Inc() {
+	withAttrs := metric.WithAttributeSet(c.attrs)
+	c.m.Add(context.Background(), 1, withAttrs)
+}
+
+// histogram implements workqueue.HistogramMetric on top of an OpenTelemetry
+// Float64Histogram with a fixed attribute set.
+type histogram struct {
+	m     metric.Float64Histogram
+	attrs attribute.Set
+}
+
+// Observe records val as a single histogram measurement.
+func (h *histogram) Observe(val float64) {
+	withAttrs := metric.WithAttributeSet(h.attrs)
+	h.m.Record(context.Background(), val, withAttrs)
+}
+
+// settableGauge implements workqueue.SettableGaugeMetric on top of an
+// OpenTelemetry Float64Gauge with a fixed attribute set.
+type settableGauge struct {
+	m     metric.Float64Gauge
+	attrs attribute.Set
+}
+
+// Set records val as the gauge's current value.
+func (s *settableGauge) Set(val float64) {
+	withAttrs := metric.WithAttributeSet(s.attrs)
+	s.m.Record(context.Background(), val, withAttrs)
+}
+
+// Compile-time checks that each adapter satisfies its workqueue interface.
+var (
+	_ workqueue.GaugeMetric         = (*gauge)(nil)
+	_ workqueue.CounterMetric       = (*counter)(nil)
+	_ workqueue.HistogramMetric     = (*histogram)(nil)
+	_ workqueue.SettableGaugeMetric = (*settableGauge)(nil)
+)
diff --git a/observability/metrics/k8s/instruments_test.go b/observability/metrics/k8s/instruments_test.go
new file mode 100644
index 0000000000..90975dc5a5
--- /dev/null
+++ b/observability/metrics/k8s/instruments_test.go
@@ -0,0 +1,195 @@
+/*
+Copyright 2025 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package k8s
+
+import (
+	"testing"
+
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
+
+	"knative.dev/pkg/observability/metrics/metricstest"
+)
+
+// TestGauge checks that Inc and Dec move the cumulative sum reported for the
+// wrapped Int64UpDownCounter to 1 and back to 0.
+func TestGauge(t *testing.T) {
+	reader := sdkmetric.NewManualReader()
+	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
+	inst := must(mp.Meter("meter").Int64UpDownCounter("instrument"))
+	set := attribute.NewSet(attribute.String("key", "val"))
+	g := gauge{inst, set}
+
+	// want builds the assertion for a single data point holding v.
+	want := func(v int64) metricstest.AssertFunc {
+		return metricstest.MetricsEqual("meter", metricdata.Metrics{
+			Name: "instrument",
+			Data: metricdata.Sum[int64]{
+				Temporality: metricdata.CumulativeTemporality,
+				DataPoints: []metricdata.DataPoint[int64]{{
+					Attributes: set,
+					Value:      v,
+				}},
+			},
+		})
+	}
+
+	g.Inc()
+	metricstest.AssertMetrics(t, reader, want(1))
+
+	g.Dec()
+	metricstest.AssertMetrics(t, reader, want(0))
+}
+
+// TestCounter checks that each Inc raises the monotonic cumulative sum by one.
+func TestCounter(t *testing.T) {
+	reader := sdkmetric.NewManualReader()
+	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
+	inst := must(mp.Meter("meter").Int64Counter("instrument"))
+	set := attribute.NewSet(attribute.String("key", "val"))
+	c := counter{inst, set}
+
+	// want builds the assertion for a single data point holding v.
+	want := func(v int64) metricstest.AssertFunc {
+		return metricstest.MetricsEqual("meter", metricdata.Metrics{
+			Name: "instrument",
+			Data: metricdata.Sum[int64]{
+				Temporality: metricdata.CumulativeTemporality,
+				IsMonotonic: true,
+				DataPoints: []metricdata.DataPoint[int64]{{
+					Attributes: set,
+					Value:      v,
+				}},
+			},
+		})
+	}
+
+	c.Inc()
+	metricstest.AssertMetrics(t, reader, want(1))
+
+	c.Inc()
+	metricstest.AssertMetrics(t, reader, want(2))
+}
+
+// TestHistogram checks bucket counts, count, extrema and sum after observing
+// 1 and then 2 against the explicit boundaries 0.01, 1 and 2.
+func TestHistogram(t *testing.T) {
+	reader := sdkmetric.NewManualReader()
+	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
+	inst := must(mp.Meter("meter").Float64Histogram("instrument",
+		metric.WithExplicitBucketBoundaries(0.01, 1, 2),
+	))
+	set := attribute.NewSet(attribute.String("key", "val"))
+	h := histogram{inst, set}
+
+	// want builds the assertion for a single histogram point; the minimum
+	// stays at 1 because that is the first value observed.
+	want := func(buckets []uint64, count uint64, hi, sum float64) metricstest.AssertFunc {
+		return metricstest.MetricsEqual("meter", metricdata.Metrics{
+			Name: "instrument",
+			Data: metricdata.Histogram[float64]{
+				Temporality: metricdata.CumulativeTemporality,
+				DataPoints: []metricdata.HistogramDataPoint[float64]{{
+					Attributes:   set,
+					Bounds:       []float64{0.01, 1, 2},
+					BucketCounts: buckets,
+					Count:        count,
+					Max:          metricdata.NewExtrema[float64](hi),
+					Min:          metricdata.NewExtrema[float64](1),
+					Sum:          sum,
+				}},
+			},
+		})
+	}
+
+	h.Observe(1)
+	metricstest.AssertMetrics(t, reader, want([]uint64{0, 1, 0, 0}, 1, 1, 1))
+
+	h.Observe(2)
+	metricstest.AssertMetrics(t, reader, want([]uint64{0, 1, 1, 0}, 2, 2, 3))
+}
+
+// TestSettableGauge checks that the gauge reports the most recently set value.
+func TestSettableGauge(t *testing.T) {
+	reader := sdkmetric.NewManualReader()
+	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
+	inst := must(mp.Meter("meter").Float64Gauge("instrument"))
+	set := attribute.NewSet(attribute.String("key", "val"))
+	g := settableGauge{inst, set}
+
+	// want builds the assertion for a single data point holding v.
+	want := func(v float64) metricstest.AssertFunc {
+		return metricstest.MetricsEqual("meter", metricdata.Metrics{
+			Name: "instrument",
+			Data: metricdata.Gauge[float64]{
+				DataPoints: []metricdata.DataPoint[float64]{{
+					Attributes: set,
+					Value:      v,
+				}},
+			},
+		})
+	}
+
+	g.Set(1)
+	metricstest.AssertMetrics(t, reader, want(1))
+
+	g.Set(2)
+	metricstest.AssertMetrics(t, reader, want(2))
+}
+
+// must returns val, panicking when err is non-nil. Test-only helper.
+func must[T any](val T, err error) T {
+	if err == nil {
+		return val
+	}
+	panic(err)
+}
diff --git a/observability/metrics/k8s/workqueue.go b/observability/metrics/k8s/workqueue.go
new file mode 100644
index 0000000000..2f2d0d1b61
--- /dev/null
+++ b/observability/metrics/k8s/workqueue.go
@@ -0,0 +1,167 @@
+/*
+Copyright 2025 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package k8s
+
+import (
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+
+	"k8s.io/client-go/util/workqueue"
+
+	"knative.dev/pkg/observability/attributekey"
+)
+
+// nameKey is the attribute key under which the queue name is recorded.
+var nameKey = attributekey.String("name")
+
+// WorkqueueMetricsProvider implements workqueue.MetricsProvider using
+// OpenTelemetry instruments. One instrument exists per metric kind; the
+// per-queue metrics handed out by the New*Metric methods share it and differ
+// only in their "name" attribute.
+type WorkqueueMetricsProvider struct {
+	depth          metric.Int64UpDownCounter
+	adds           metric.Int64Counter
+	latency        metric.Float64Histogram
+	workDuration   metric.Float64Histogram
+	unfinishedWork metric.Float64Gauge
+	longestRunning metric.Float64Gauge
+	retries        metric.Int64Counter
+}
+
+// options holds the settings collected from Option values.
+type options struct {
+	meterProvider metric.MeterProvider
+}
+
+// Option customizes the providers constructed by this package.
+type Option func(*options)
+
+// WithMeterProvider selects the MeterProvider used to create instruments.
+// A nil provider is ignored, leaving the default in place.
+func WithMeterProvider(p metric.MeterProvider) Option {
+	return func(o *options) {
+		if p != nil {
+			o.meterProvider = p
+		}
+	}
+}
+
+// NewWorkqueueMetricsProvider creates the workqueue instruments on the
+// configured MeterProvider (the global one unless WithMeterProvider is
+// given). It returns the first instrument-creation error encountered.
+func NewWorkqueueMetricsProvider(opts ...Option) (*WorkqueueMetricsProvider, error) {
+	options := options{
+		meterProvider: otel.GetMeterProvider(),
+	}
+	var err error
+
+	for _, opt := range opts {
+		opt(&options)
+	}
+
+	meter := options.meterProvider.Meter("knative.dev/pkg/observability/metrics/k8s")
+
+	w := &WorkqueueMetricsProvider{}
+
+	// Both duration histograms share the same bucket boundaries, in seconds.
+	durationBuckets := metric.WithExplicitBucketBoundaries(
+		0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10,
+	)
+
+	w.depth, err = meter.Int64UpDownCounter(
+		"kn.workqueue.depth",
+		metric.WithDescription("Number of current items in the queue"),
+		metric.WithUnit("{item}"),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	w.adds, err = meter.Int64Counter(
+		"kn.workqueue.adds",
+		metric.WithDescription("Number of items added to the queue"),
+		metric.WithUnit("{item}"),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	w.latency, err = meter.Float64Histogram(
+		"kn.workqueue.queue.duration",
+		metric.WithDescription("How long an item stays in workqueue"),
+		metric.WithUnit("s"),
+		durationBuckets,
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	w.workDuration, err = meter.Float64Histogram(
+		"kn.workqueue.process.duration",
+		// This text is the description; it was previously passed to WithUnit,
+		// which left the instrument without any description.
+		metric.WithDescription("How long in seconds processing an item from workqueue takes"),
+		metric.WithUnit("s"),
+		durationBuckets,
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	w.unfinishedWork, err = meter.Float64Gauge(
+		"kn.workqueue.unfinished_work",
+		metric.WithUnit("s"),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	w.longestRunning, err = meter.Float64Gauge(
+		"kn.workqueue.longest_running_processor",
+		metric.WithUnit("s"),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	w.retries, err = meter.Int64Counter(
+		"kn.workqueue.retries",
+		metric.WithDescription("Number of items re-added to the queue"),
+		metric.WithUnit("{item}"),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	return w, nil
+}
+
+// NewDepthMetric returns the depth gauge for the queue called name.
+func (p *WorkqueueMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric {
+	return &gauge{m: p.depth, attrs: attrs(name)}
+}
+
+// NewAddsMetric returns the adds counter for the queue called name.
+func (p *WorkqueueMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric {
+	return &counter{m: p.adds, attrs: attrs(name)}
+}
+
+// NewLatencyMetric returns the queue-duration histogram for the queue called name.
+func (p *WorkqueueMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric {
+	return &histogram{m: p.latency, attrs: attrs(name)}
+}
+
+// NewWorkDurationMetric returns the process-duration histogram for the queue called name.
+func (p *WorkqueueMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric {
+	return &histogram{m: p.workDuration, attrs: attrs(name)}
+}
+
+// NewUnfinishedWorkSecondsMetric returns the unfinished-work gauge for the queue called name.
+func (p *WorkqueueMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
+	return &settableGauge{m: p.unfinishedWork, attrs: attrs(name)}
+}
+
+// NewLongestRunningProcessorSecondsMetric returns the longest-running-processor
+// gauge for the queue called name.
+func (p *WorkqueueMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric {
+	return &settableGauge{m: p.longestRunning, attrs: attrs(name)}
+}
+
+// NewRetriesMetric returns the retries counter for the queue called name.
+func (p *WorkqueueMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
+	return &counter{m: p.retries, attrs: attrs(name)}
+}
+
+var _ workqueue.MetricsProvider = (*WorkqueueMetricsProvider)(nil)
+
+// attrs builds the attribute set that tags a metric with its queue name.
+func attrs(name string) attribute.Set {
+	return
attribute.NewSet(nameKey.With(name))
+}
diff --git a/observability/metrics/k8s/workqueue_test.go b/observability/metrics/k8s/workqueue_test.go
new file mode 100644
index 0000000000..390876fc95
--- /dev/null
+++ b/observability/metrics/k8s/workqueue_test.go
@@ -0,0 +1,82 @@
+/*
+Copyright 2025 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package k8s
+
+import (
+	"testing"
+
+	"go.opentelemetry.io/otel"
+	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
+)
+
+// TestNewWorkqueueMetricsProvider checks construction against an explicitly
+// supplied MeterProvider.
+func TestNewWorkqueueMetricsProvider(t *testing.T) {
+	reader := sdkmetric.NewManualReader()
+	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
+
+	got, err := NewWorkqueueMetricsProvider(WithMeterProvider(mp))
+	switch {
+	case err != nil:
+		t.Fatal("unexpected error", err)
+	case got == nil:
+		t.Fatal("provider returned was nil")
+	}
+}
+
+// TestNewWorkqueueMetricsProviderGlobal checks construction when the
+// MeterProvider comes from the OpenTelemetry global.
+func TestNewWorkqueueMetricsProviderGlobal(t *testing.T) {
+	reader := sdkmetric.NewManualReader()
+	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
+	otel.SetMeterProvider(mp)
+
+	got, err := NewWorkqueueMetricsProvider()
+	switch {
+	case err != nil:
+		t.Fatal("unexpected error", err)
+	case got == nil:
+		t.Fatal("provider returned was nil")
+	}
+}
+
+// TestWorkqueueMetricsProviderHelpers checks that every New*Metric accessor
+// hands back a non-nil metric.
+func TestWorkqueueMetricsProviderHelpers(t *testing.T) {
+	r := sdkmetric.NewManualReader()
+	provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(r))
+	p, err := NewWorkqueueMetricsProvider(WithMeterProvider(provider))
+	if err != nil {
+		t.Fatal("unexpected error", err)
+	}
+
+	if m :=
p.NewAddsMetric("queue"); m == nil {
+		t.Error("NewAddsMetric() returned nil")
+	}
+	if m := p.NewDepthMetric("queue"); m == nil {
+		t.Error("NewDepthMetric() returned nil")
+	}
+	if m := p.NewLatencyMetric("queue"); m == nil {
+		t.Error("NewLatencyMetric() returned nil")
+	}
+	if m := p.NewWorkDurationMetric("queue"); m == nil {
+		t.Error("NewWorkDurationMetric() returned nil")
+	}
+	if m := p.NewLongestRunningProcessorSecondsMetric("queue"); m == nil {
+		t.Error("NewLongestRunningProcessorSecondsMetric() returned nil")
+	}
+	if m := p.NewUnfinishedWorkSecondsMetric("queue"); m == nil {
+		t.Error("NewUnfinishedWorkSecondsMetric() returned nil")
+	}
+	if m := p.NewRetriesMetric("queue"); m == nil {
+		t.Error("NewRetriesMetric() returned nil")
+	}
+}
diff --git a/observability/metrics/metricstest/assert.go b/observability/metrics/metricstest/assert.go
index a4ea314536..992af5c6ff 100644
--- a/observability/metrics/metricstest/assert.go
+++ b/observability/metrics/metricstest/assert.go
@@ -134,3 +134,106 @@ func MetricsPresent(scopeName string, names ...string) AssertFunc {
 		}
 	}
 }
+
+// MetricsEqual returns an AssertFunc that compares the metrics collected for
+// scopeName against expected, matching them by name.
+//
+// The assertion fails when the scope is absent, when the number of metrics
+// differs, when an unexpected metric name is present, or when a matched
+// metric differs in description, unit or data. Timestamps are ignored.
+func MetricsEqual(scopeName string, expected ...metricdata.Metrics) AssertFunc {
+	return func(t testingT, rm *metricdata.ResourceMetrics) {
+		t.Helper()
+
+		// metricdatatest reports through its own TestingT interface. Check the
+		// conversion so an incompatible t fails the test instead of panicking.
+		mt, ok := t.(metricdatatest.TestingT)
+		if !ok {
+			t.Errorf("%T does not implement metricdatatest.TestingT", t)
+			return
+		}
+		opts := metricdatatest.IgnoreTimestamp()
+
+		expectedSet := make(map[string]metricdata.Metrics, len(expected))
+		for _, exp := range expected {
+			expectedSet[exp.Name] = exp
+		}
+
+		for _, sm := range rm.ScopeMetrics {
+			if sm.Scope.Name != scopeName {
+				continue
+			}
+
+			if len(sm.Metrics) != len(expected) {
+				t.Errorf("expected %d metrics in scope %q, got %d", len(expected), scopeName, len(sm.Metrics))
+				return
+			}
+
+			for _, actual := range sm.Metrics {
+				want, exists := expectedSet[actual.Name]
+				if !exists {
+					t.Errorf("unexpected metric %q in scope %q", actual.Name, scopeName)
+					continue
+				}
+
+				// Compare metric properties. The name needs no check: want was
+				// looked up by actual.Name.
+				if actual.Description != want.Description {
+					t.Errorf("metric description mismatch for %q: expected %q, got %q", actual.Name, want.Description, actual.Description)
+				}
+
+				if actual.Unit != want.Unit {
+					t.Errorf("metric unit mismatch for %q: expected %q, got %q", actual.Name, want.Unit, actual.Unit)
+				}
+
+				// AssertAggregationsEqual reports both type mismatches and
+				// value differences for every aggregation kind.
+				metricdatatest.AssertAggregationsEqual(mt, want.Data, actual.Data, opts)
+			}
+			return
+		}
+
+		// If we get here, the scope wasn't found
+		t.Errorf("scope %q not found in metrics", scopeName)
+	}
+}
From a5a44aefa5a6e38a62abfca3b380a6ca6ca3e7d5 Mon Sep 17 00:00:00 2001
From: Dave Protasowski
Date: Mon, 30 Jun 2025 22:03:35 -0400
Subject: [PATCH 2/8] incorporate k8s client-go tool metrics - request latency & result

---
 injection/sharedmain/main.go | 24 +
 observability/metrics/k8s/tools.go | 144 ++
 observability/metrics/k8s/tools_test.go | 150 ++
 observability/metrics/k8s/workqueue.go | 2 +-
 .../otel/semconv/v1.34.0/httpconv/metric.go | 1418 +++++++++++++++++
 vendor/modules.txt | 1 +
 6 files changed, 1738 insertions(+), 1 deletion(-)
 create mode 100644 observability/metrics/k8s/tools.go
 create mode 100644 observability/metrics/k8s/tools_test.go
 create mode 100644 vendor/go.opentelemetry.io/otel/semconv/v1.34.0/httpconv/metric.go

diff --git a/injection/sharedmain/main.go b/injection/sharedmain/main.go
index a79d2d760d..2db674f9ef 100644
---
a/injection/sharedmain/main.go +++ b/injection/sharedmain/main.go @@ -45,6 +45,8 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/rest" + k8stoolmetrics "k8s.io/client-go/tools/metrics" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" kubeclient "knative.dev/pkg/client/injection/kube/client" @@ -58,6 +60,7 @@ import ( "knative.dev/pkg/observability" o11yconfigmap "knative.dev/pkg/observability/configmap" "knative.dev/pkg/observability/metrics" + k8smetrics "knative.dev/pkg/observability/metrics/k8s" "knative.dev/pkg/observability/resource" "knative.dev/pkg/observability/tracing" "knative.dev/pkg/reconciler" @@ -407,6 +410,27 @@ func SetupObservabilityOrDie( otel.SetMeterProvider(meterProvider) + workQueueMetrics, err := k8smetrics.NewWorkqueueMetricsProvider( + k8smetrics.WithMeterProvider(meterProvider), + ) + if err != nil { + logger.Fatalw("Failed to setup k8s workqueue metrics", zap.Error(err)) + } + + workqueue.SetProvider(workQueueMetrics) + + clientMetrics, err := k8smetrics.NewClientMetricProvider( + k8smetrics.WithMeterProvider(meterProvider), + ) + if err != nil { + logger.Fatalw("Failed to setup k8s client go metrics", zap.Error(err)) + } + + k8stoolmetrics.Register(k8stoolmetrics.RegisterOpts{ + RequestLatency: clientMetrics.RequestLatencyMetric(), + RequestResult: clientMetrics.RequestResultMetric(), + }) + err = runtime.Start( runtime.WithMinimumReadMemStatsInterval(cfg.Runtime.ExportInterval), ) diff --git a/observability/metrics/k8s/tools.go b/observability/metrics/k8s/tools.go new file mode 100644 index 0000000000..83e8ab20ec --- /dev/null +++ b/observability/metrics/k8s/tools.go @@ -0,0 +1,144 @@ +/* +Copyright 2025 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package k8s
+
+import (
+	"context"
+	"net/url"
+	"strconv"
+	"strings"
+	"time"
+
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+	semconv "go.opentelemetry.io/otel/semconv/v1.34.0"
+	"go.opentelemetry.io/otel/semconv/v1.34.0/httpconv"
+	"k8s.io/client-go/tools/metrics"
+)
+
+const (
+	// scopeName is the instrumentation scope used for every meter in this package.
+	scopeName = "knative.dev/pkg/observability/metrics/k8s"
+
+	resultMetricName        = "kn.k8s.client.http.response.status_code"
+	resultMetricDescription = "Count of response codes partitioned by method and host"
+)
+
+var (
+	// latencyBounds are the request-duration histogram bucket boundaries, in seconds.
+	latencyBounds = []float64{0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10}
+)
+
+// ClientProvider owns the OpenTelemetry instruments behind the client-go
+// request latency and request result metrics.
+type ClientProvider struct {
+	latencyMetric httpconv.ClientRequestDuration
+	resultMetric  metric.Int64Counter
+}
+
+// NewClientMetricProvider creates the client metric instruments on the
+// configured MeterProvider (the global one unless WithMeterProvider is
+// given). It returns a nil provider and the error if either instrument
+// cannot be created.
+func NewClientMetricProvider(opts ...Option) (*ClientProvider, error) {
+	options := options{
+		meterProvider: otel.GetMeterProvider(),
+	}
+
+	cp := &ClientProvider{}
+
+	for _, opt := range opts {
+		opt(&options)
+	}
+
+	meter := options.meterProvider.Meter(scopeName)
+
+	var err error
+	cp.latencyMetric, err = httpconv.NewClientRequestDuration(
+		meter,
+		metric.WithExplicitBucketBoundaries(latencyBounds...),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	cp.resultMetric, err = meter.Int64Counter(
+		resultMetricName,
+		metric.WithDescription(resultMetricDescription),
+		metric.WithUnit("{item}"),
+	)
+	// This error used to be dropped, handing callers a provider whose counter
+	// had failed to initialize.
+	if err != nil {
+		return nil, err
+	}
+
+	return cp, nil
+}
+
+// RequestLatencyMetric returns the metrics.LatencyMetric backed by this provider.
+func (cp *ClientProvider) RequestLatencyMetric() metrics.LatencyMetric {
+	return &latency{cp}
+}
+
+// RequestResultMetric returns the metrics.ResultMetric backed by this provider.
+func (cp *ClientProvider) RequestResultMetric() metrics.ResultMetric {
+	return &result{cp}
+}
+
+// latency records request durations on the provider's histogram.
+type latency struct {
+	cp *ClientProvider
+}
+
+// Observe records latency, in seconds, for a request made with verb to u.
+// The server port is the explicit port in u when it has one, otherwise 443
+// for https and 80 for any other scheme.
+func (l *latency) Observe(ctx context.Context, verb string, u url.URL, latency time.Duration) {
+	serverPort := 80
+	if u.Scheme == "https" {
+		serverPort = 443
+	}
+
+	// Atoi fails on the empty string, so a URL without a port keeps the
+	// scheme default. The check was previously inverted (err != nil), which
+	// meant an explicit port was never used.
+	if port, err := strconv.Atoi(u.Port()); err == nil && port > 0 {
+		serverPort = port
+	}
+
+	l.cp.latencyMetric.Record(ctx, latency.Seconds(),
+		httpconv.RequestMethodAttr(strings.ToUpper(verb)),
+		u.Hostname(),
+		serverPort,
+		l.cp.latencyMetric.AttrURLTemplate(u.Path),
+		l.cp.latencyMetric.AttrURLScheme(u.Scheme),
+	)
+}
+
+// result counts request outcomes on the provider's counter.
+type result struct {
+	cp *ClientProvider
+}
+
+// Increment adds one to the result counter for the given response code,
+// method and host. A code that is not a decimal integer is recorded as
+// status code 0, so a failure is never counted as a success.
+func (r *result) Increment(ctx context.Context, code string, method string, host string) {
+	// The check was previously inverted (err != nil), so every valid
+	// non-200 code was reported as 200.
+	codeInt := 0
+	if c, err := strconv.Atoi(code); err == nil {
+		codeInt = c
+	}
+
+	r.cp.resultMetric.Add(ctx, 1,
+		metric.WithAttributeSet(attribute.NewSet(
+			semconv.ServerAddressKey.String(host),
+			semconv.HTTPRequestMethodKey.String(strings.ToUpper(method)),
+			semconv.HTTPResponseStatusCodeKey.Int(codeInt),
+		)),
+	)
+}
+
+// Compile-time checks that the adapters satisfy the client-go interfaces.
+var (
+	_ metrics.LatencyMetric = (*latency)(nil)
+	_ metrics.ResultMetric  = (*result)(nil)
+)
diff --git a/observability/metrics/k8s/tools_test.go b/observability/metrics/k8s/tools_test.go
new file mode 100644
index 0000000000..f5b0cd63ac
--- /dev/null
+++ b/observability/metrics/k8s/tools_test.go
@@ -0,0 +1,150 @@
+/*
+Copyright 2025 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package k8s
+
+import (
+	"context"
+	"net/url"
+	"testing"
+	"time"
+
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
+	semconv "go.opentelemetry.io/otel/semconv/v1.34.0"
+	"knative.dev/pkg/observability/metrics/metricstest"
+)
+
+// TestNewClientMetricsProvider checks construction against an explicitly
+// supplied MeterProvider.
+func TestNewClientMetricsProvider(t *testing.T) {
+	reader := sdkmetric.NewManualReader()
+	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
+
+	got, err := NewClientMetricProvider(WithMeterProvider(mp))
+	switch {
+	case err != nil:
+		t.Fatal("unexpected error", err)
+	case got == nil:
+		t.Fatal("provider returned was nil")
+	}
+}
+
+// TestNewClientMetricsProviderGlobal checks construction when the
+// MeterProvider comes from the OpenTelemetry global.
+func TestNewClientMetricsProviderGlobal(t *testing.T) {
+	reader := sdkmetric.NewManualReader()
+	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
+	otel.SetMeterProvider(mp)
+
+	got, err := NewClientMetricProvider()
+	switch {
+	case err != nil:
+		t.Fatal("unexpected error", err)
+	case got == nil:
+		t.Fatal("provider returned was nil")
+	}
+}
+
+// TestLatencyMetric observes a one-second GET to an https URL without an
+// explicit port and checks the recorded histogram point and its attributes.
+func TestLatencyMetric(t *testing.T) {
+	reader := sdkmetric.NewManualReader()
+	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
+	cp, err := NewClientMetricProvider(WithMeterProvider(mp))
+	if err != nil {
+		t.Fatal("unexpected error", err)
+	}
+
+	lat := cp.RequestLatencyMetric()
+	if lat == nil {
+		t.Fatal("unexpected nil latency metric")
+	}
+
+	target, err := url.Parse("https://example.com/path")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	lat.Observe(context.Background(), "GET", *target, time.Second)
+
+	// One bucket per boundary plus the overflow bucket; the single
+	// observation of 1s is expected in the bucket whose upper bound is 1.
+	wantBuckets := make([]uint64, 15)
+	wantBuckets[9] = 1
+
+	wantAttrs := attribute.NewSet(
+		semconv.ServerAddressKey.String("example.com"),
+		semconv.ServerPortKey.Int(443),
+		semconv.HTTPRequestMethodKey.String("GET"),
+		semconv.URLSchemeKey.String("https"),
+		semconv.URLTemplateKey.String("/path"),
+	)
+
+	metricstest.AssertMetrics(t, reader,
+		metricstest.MetricsEqual(
+			scopeName,
+			metricdata.Metrics{
+				Name:        "http.client.request.duration",
+				Unit:        "s",
+				Description: "Duration of HTTP client requests.",
+				Data: metricdata.Histogram[float64]{
+					Temporality: metricdata.CumulativeTemporality,
+					DataPoints: []metricdata.HistogramDataPoint[float64]{{
+						Bounds:       latencyBounds,
+						Count:        1,
+						Sum:          1,
+						BucketCounts: wantBuckets,
+						Min:          metricdata.NewExtrema[float64](1),
+						Max:          metricdata.NewExtrema[float64](1),
+						Attributes:   wantAttrs,
+					}},
+				},
+			},
+		),
+	)
+}
+
+// TestResultMetric increments the result counter once for a 200 GET and
+// checks the recorded sum and its attributes.
+func TestResultMetric(t *testing.T) {
+	reader := sdkmetric.NewManualReader()
+	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
+	cp, err := NewClientMetricProvider(WithMeterProvider(mp))
+	if err != nil {
+		t.Fatal("unexpected error", err)
+	}
+
+	res := cp.RequestResultMetric()
+	if res == nil {
+		t.Fatal("unexpected nil result metric")
+	}
+
+	res.Increment(context.Background(), "200", "GET", "example.com")
+
+	wantAttrs := attribute.NewSet(
+		semconv.ServerAddressKey.String("example.com"),
+		semconv.HTTPRequestMethodKey.String("GET"),
+		semconv.HTTPResponseStatusCodeKey.Int(200),
+	)
+
+	metricstest.AssertMetrics(t, reader,
+		metricstest.MetricsEqual(
+			scopeName,
+			metricdata.Metrics{
+				Name:        resultMetricName,
+				Unit:        "{item}",
+				Description: resultMetricDescription,
+				Data: metricdata.Sum[int64]{
+					IsMonotonic: true,
+					Temporality: metricdata.CumulativeTemporality,
+					DataPoints: []metricdata.DataPoint[int64]{{
+						Attributes: wantAttrs,
+						Value:      1,
+					}},
+				},
+			},
+		),
+	)
+}
diff --git a/observability/metrics/k8s/workqueue.go b/observability/metrics/k8s/workqueue.go
index 2f2d0d1b61..0f64112fa6 100644
--- a/observability/metrics/k8s/workqueue.go
+++
b/observability/metrics/k8s/workqueue.go @@ -62,7 +62,7 @@ func NewWorkqueueMetricsProvider(opts ...Option) (*WorkqueueMetricsProvider, err opt(&options) } - meter := options.meterProvider.Meter("knative.dev/pkg/observability/metrics/k8s") + meter := options.meterProvider.Meter(scopeName) w := &WorkqueueMetricsProvider{} diff --git a/vendor/go.opentelemetry.io/otel/semconv/v1.34.0/httpconv/metric.go b/vendor/go.opentelemetry.io/otel/semconv/v1.34.0/httpconv/metric.go new file mode 100644 index 0000000000..79843adbb5 --- /dev/null +++ b/vendor/go.opentelemetry.io/otel/semconv/v1.34.0/httpconv/metric.go @@ -0,0 +1,1418 @@ +// Code generated from semantic convention specification. DO NOT EDIT. + +// Package httpconv provides types and functionality for OpenTelemetry semantic +// conventions in the "http" namespace. +package httpconv + +import ( + "context" + "sync" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" +) + +var ( + addOptPool = &sync.Pool{New: func() any { return &[]metric.AddOption{} }} + recOptPool = &sync.Pool{New: func() any { return &[]metric.RecordOption{} }} +) + +// ErrorTypeAttr is an attribute conforming to the error.type semantic +// conventions. It represents the describes a class of error the operation ended +// with. +type ErrorTypeAttr string + +var ( + // ErrorTypeOther is a fallback error value to be used when the instrumentation + // doesn't define a custom value. + ErrorTypeOther ErrorTypeAttr = "_OTHER" +) + +// ConnectionStateAttr is an attribute conforming to the http.connection.state +// semantic conventions. It represents the state of the HTTP connection in the +// HTTP connection pool. +type ConnectionStateAttr string + +var ( + // ConnectionStateActive is the active state. + ConnectionStateActive ConnectionStateAttr = "active" + // ConnectionStateIdle is the idle state. 
+ ConnectionStateIdle ConnectionStateAttr = "idle" +) + +// RequestMethodAttr is an attribute conforming to the http.request.method +// semantic conventions. It represents the HTTP request method. +type RequestMethodAttr string + +var ( + // RequestMethodConnect is the CONNECT method. + RequestMethodConnect RequestMethodAttr = "CONNECT" + // RequestMethodDelete is the DELETE method. + RequestMethodDelete RequestMethodAttr = "DELETE" + // RequestMethodGet is the GET method. + RequestMethodGet RequestMethodAttr = "GET" + // RequestMethodHead is the HEAD method. + RequestMethodHead RequestMethodAttr = "HEAD" + // RequestMethodOptions is the OPTIONS method. + RequestMethodOptions RequestMethodAttr = "OPTIONS" + // RequestMethodPatch is the PATCH method. + RequestMethodPatch RequestMethodAttr = "PATCH" + // RequestMethodPost is the POST method. + RequestMethodPost RequestMethodAttr = "POST" + // RequestMethodPut is the PUT method. + RequestMethodPut RequestMethodAttr = "PUT" + // RequestMethodTrace is the TRACE method. + RequestMethodTrace RequestMethodAttr = "TRACE" + // RequestMethodOther is the any HTTP method that the instrumentation has no + // prior knowledge of. + RequestMethodOther RequestMethodAttr = "_OTHER" +) + +// UserAgentSyntheticTypeAttr is an attribute conforming to the +// user_agent.synthetic.type semantic conventions. It represents the specifies +// the category of synthetic traffic, such as tests or bots. +type UserAgentSyntheticTypeAttr string + +var ( + // UserAgentSyntheticTypeBot is the bot source. + UserAgentSyntheticTypeBot UserAgentSyntheticTypeAttr = "bot" + // UserAgentSyntheticTypeTest is the synthetic test source. + UserAgentSyntheticTypeTest UserAgentSyntheticTypeAttr = "test" +) + +// ClientActiveRequests is an instrument used to record metric values conforming +// to the "http.client.active_requests" semantic conventions. It represents the +// number of active HTTP requests. 
+type ClientActiveRequests struct { + metric.Int64UpDownCounter +} + +// NewClientActiveRequests returns a new ClientActiveRequests instrument. +func NewClientActiveRequests( + m metric.Meter, + opt ...metric.Int64UpDownCounterOption, +) (ClientActiveRequests, error) { + // Check if the meter is nil. + if m == nil { + return ClientActiveRequests{noop.Int64UpDownCounter{}}, nil + } + + i, err := m.Int64UpDownCounter( + "http.client.active_requests", + append([]metric.Int64UpDownCounterOption{ + metric.WithDescription("Number of active HTTP requests."), + metric.WithUnit("{request}"), + }, opt...)..., + ) + if err != nil { + return ClientActiveRequests{noop.Int64UpDownCounter{}}, err + } + return ClientActiveRequests{i}, nil +} + +// Inst returns the underlying metric instrument. +func (m ClientActiveRequests) Inst() metric.Int64UpDownCounter { + return m.Int64UpDownCounter +} + +// Name returns the semantic convention name of the instrument. +func (ClientActiveRequests) Name() string { + return "http.client.active_requests" +} + +// Unit returns the semantic convention unit of the instrument +func (ClientActiveRequests) Unit() string { + return "{request}" +} + +// Description returns the semantic convention description of the instrument +func (ClientActiveRequests) Description() string { + return "Number of active HTTP requests." +} + +// Add adds incr to the existing count. +// +// The serverAddress is the server domain name if available without reverse DNS +// lookup; otherwise, IP address or Unix domain socket name. +// +// The serverPort is the port identifier of the ["URI origin"] HTTP request is +// sent to. +// +// All additional attrs passed are included in the recorded value. 
+// +// ["URI origin"]: https://www.rfc-editor.org/rfc/rfc9110.html#name-uri-origin +func (m ClientActiveRequests) Add( + ctx context.Context, + incr int64, + serverAddress string, + serverPort int, + attrs ...attribute.KeyValue, +) { + o := addOptPool.Get().(*[]metric.AddOption) + defer func() { + *o = (*o)[:0] + addOptPool.Put(o) + }() + + *o = append( + *o, + metric.WithAttributes( + append( + attrs, + attribute.String("server.address", serverAddress), + attribute.Int("server.port", serverPort), + )..., + ), + ) + + m.Int64UpDownCounter.Add(ctx, incr, *o...) +} + +// AttrURLTemplate returns an optional attribute for the "url.template" semantic +// convention. It represents the low-cardinality template of an +// [absolute path reference]. +// +// [absolute path reference]: https://www.rfc-editor.org/rfc/rfc3986#section-4.2 +func (ClientActiveRequests) AttrURLTemplate(val string) attribute.KeyValue { + return attribute.String("url.template", val) +} + +// AttrRequestMethod returns an optional attribute for the "http.request.method" +// semantic convention. It represents the HTTP request method. +func (ClientActiveRequests) AttrRequestMethod(val RequestMethodAttr) attribute.KeyValue { + return attribute.String("http.request.method", string(val)) +} + +// AttrURLScheme returns an optional attribute for the "url.scheme" semantic +// convention. It represents the [URI scheme] component identifying the used +// protocol. +// +// [URI scheme]: https://www.rfc-editor.org/rfc/rfc3986#section-3.1 +func (ClientActiveRequests) AttrURLScheme(val string) attribute.KeyValue { + return attribute.String("url.scheme", val) +} + +// ClientConnectionDuration is an instrument used to record metric values +// conforming to the "http.client.connection.duration" semantic conventions. It +// represents the duration of the successfully established outbound HTTP +// connections. 
+type ClientConnectionDuration struct { + metric.Float64Histogram +} + +// NewClientConnectionDuration returns a new ClientConnectionDuration instrument. +func NewClientConnectionDuration( + m metric.Meter, + opt ...metric.Float64HistogramOption, +) (ClientConnectionDuration, error) { + // Check if the meter is nil. + if m == nil { + return ClientConnectionDuration{noop.Float64Histogram{}}, nil + } + + i, err := m.Float64Histogram( + "http.client.connection.duration", + append([]metric.Float64HistogramOption{ + metric.WithDescription("The duration of the successfully established outbound HTTP connections."), + metric.WithUnit("s"), + }, opt...)..., + ) + if err != nil { + return ClientConnectionDuration{noop.Float64Histogram{}}, err + } + return ClientConnectionDuration{i}, nil +} + +// Inst returns the underlying metric instrument. +func (m ClientConnectionDuration) Inst() metric.Float64Histogram { + return m.Float64Histogram +} + +// Name returns the semantic convention name of the instrument. +func (ClientConnectionDuration) Name() string { + return "http.client.connection.duration" +} + +// Unit returns the semantic convention unit of the instrument +func (ClientConnectionDuration) Unit() string { + return "s" +} + +// Description returns the semantic convention description of the instrument +func (ClientConnectionDuration) Description() string { + return "The duration of the successfully established outbound HTTP connections." +} + +// Record records val to the current distribution. +// +// The serverAddress is the server domain name if available without reverse DNS +// lookup; otherwise, IP address or Unix domain socket name. +// +// The serverPort is the port identifier of the ["URI origin"] HTTP request is +// sent to. +// +// All additional attrs passed are included in the recorded value. 
+// +// ["URI origin"]: https://www.rfc-editor.org/rfc/rfc9110.html#name-uri-origin +func (m ClientConnectionDuration) Record( + ctx context.Context, + val float64, + serverAddress string, + serverPort int, + attrs ...attribute.KeyValue, +) { + o := recOptPool.Get().(*[]metric.RecordOption) + defer func() { + *o = (*o)[:0] + recOptPool.Put(o) + }() + + *o = append( + *o, + metric.WithAttributes( + append( + attrs, + attribute.String("server.address", serverAddress), + attribute.Int("server.port", serverPort), + )..., + ), + ) + + m.Float64Histogram.Record(ctx, val, *o...) +} + +// AttrNetworkPeerAddress returns an optional attribute for the +// "network.peer.address" semantic convention. It represents the peer address of +// the network connection - IP address or Unix domain socket name. +func (ClientConnectionDuration) AttrNetworkPeerAddress(val string) attribute.KeyValue { + return attribute.String("network.peer.address", val) +} + +// AttrNetworkProtocolVersion returns an optional attribute for the +// "network.protocol.version" semantic convention. It represents the actual +// version of the protocol used for network communication. +func (ClientConnectionDuration) AttrNetworkProtocolVersion(val string) attribute.KeyValue { + return attribute.String("network.protocol.version", val) +} + +// AttrURLScheme returns an optional attribute for the "url.scheme" semantic +// convention. It represents the [URI scheme] component identifying the used +// protocol. +// +// [URI scheme]: https://www.rfc-editor.org/rfc/rfc3986#section-3.1 +func (ClientConnectionDuration) AttrURLScheme(val string) attribute.KeyValue { + return attribute.String("url.scheme", val) +} + +// ClientOpenConnections is an instrument used to record metric values conforming +// to the "http.client.open_connections" semantic conventions. It represents the +// number of outbound HTTP connections that are currently active or idle on the +// client. 
+type ClientOpenConnections struct { + metric.Int64UpDownCounter +} + +// NewClientOpenConnections returns a new ClientOpenConnections instrument. +func NewClientOpenConnections( + m metric.Meter, + opt ...metric.Int64UpDownCounterOption, +) (ClientOpenConnections, error) { + // Check if the meter is nil. + if m == nil { + return ClientOpenConnections{noop.Int64UpDownCounter{}}, nil + } + + i, err := m.Int64UpDownCounter( + "http.client.open_connections", + append([]metric.Int64UpDownCounterOption{ + metric.WithDescription("Number of outbound HTTP connections that are currently active or idle on the client."), + metric.WithUnit("{connection}"), + }, opt...)..., + ) + if err != nil { + return ClientOpenConnections{noop.Int64UpDownCounter{}}, err + } + return ClientOpenConnections{i}, nil +} + +// Inst returns the underlying metric instrument. +func (m ClientOpenConnections) Inst() metric.Int64UpDownCounter { + return m.Int64UpDownCounter +} + +// Name returns the semantic convention name of the instrument. +func (ClientOpenConnections) Name() string { + return "http.client.open_connections" +} + +// Unit returns the semantic convention unit of the instrument +func (ClientOpenConnections) Unit() string { + return "{connection}" +} + +// Description returns the semantic convention description of the instrument +func (ClientOpenConnections) Description() string { + return "Number of outbound HTTP connections that are currently active or idle on the client." +} + +// Add adds incr to the existing count. +// +// The connectionState is the state of the HTTP connection in the HTTP connection +// pool. +// +// The serverAddress is the server domain name if available without reverse DNS +// lookup; otherwise, IP address or Unix domain socket name. +// +// The serverPort is the port identifier of the ["URI origin"] HTTP request is +// sent to. +// +// All additional attrs passed are included in the recorded value. 
+// +// ["URI origin"]: https://www.rfc-editor.org/rfc/rfc9110.html#name-uri-origin +func (m ClientOpenConnections) Add( + ctx context.Context, + incr int64, + connectionState ConnectionStateAttr, + serverAddress string, + serverPort int, + attrs ...attribute.KeyValue, +) { + o := addOptPool.Get().(*[]metric.AddOption) + defer func() { + *o = (*o)[:0] + addOptPool.Put(o) + }() + + *o = append( + *o, + metric.WithAttributes( + append( + attrs, + attribute.String("http.connection.state", string(connectionState)), + attribute.String("server.address", serverAddress), + attribute.Int("server.port", serverPort), + )..., + ), + ) + + m.Int64UpDownCounter.Add(ctx, incr, *o...) +} + +// AttrNetworkPeerAddress returns an optional attribute for the +// "network.peer.address" semantic convention. It represents the peer address of +// the network connection - IP address or Unix domain socket name. +func (ClientOpenConnections) AttrNetworkPeerAddress(val string) attribute.KeyValue { + return attribute.String("network.peer.address", val) +} + +// AttrNetworkProtocolVersion returns an optional attribute for the +// "network.protocol.version" semantic convention. It represents the actual +// version of the protocol used for network communication. +func (ClientOpenConnections) AttrNetworkProtocolVersion(val string) attribute.KeyValue { + return attribute.String("network.protocol.version", val) +} + +// AttrURLScheme returns an optional attribute for the "url.scheme" semantic +// convention. It represents the [URI scheme] component identifying the used +// protocol. +// +// [URI scheme]: https://www.rfc-editor.org/rfc/rfc3986#section-3.1 +func (ClientOpenConnections) AttrURLScheme(val string) attribute.KeyValue { + return attribute.String("url.scheme", val) +} + +// ClientRequestBodySize is an instrument used to record metric values conforming +// to the "http.client.request.body.size" semantic conventions. It represents the +// size of HTTP client request bodies. 
+type ClientRequestBodySize struct { + metric.Int64Histogram +} + +// NewClientRequestBodySize returns a new ClientRequestBodySize instrument. +func NewClientRequestBodySize( + m metric.Meter, + opt ...metric.Int64HistogramOption, +) (ClientRequestBodySize, error) { + // Check if the meter is nil. + if m == nil { + return ClientRequestBodySize{noop.Int64Histogram{}}, nil + } + + i, err := m.Int64Histogram( + "http.client.request.body.size", + append([]metric.Int64HistogramOption{ + metric.WithDescription("Size of HTTP client request bodies."), + metric.WithUnit("By"), + }, opt...)..., + ) + if err != nil { + return ClientRequestBodySize{noop.Int64Histogram{}}, err + } + return ClientRequestBodySize{i}, nil +} + +// Inst returns the underlying metric instrument. +func (m ClientRequestBodySize) Inst() metric.Int64Histogram { + return m.Int64Histogram +} + +// Name returns the semantic convention name of the instrument. +func (ClientRequestBodySize) Name() string { + return "http.client.request.body.size" +} + +// Unit returns the semantic convention unit of the instrument +func (ClientRequestBodySize) Unit() string { + return "By" +} + +// Description returns the semantic convention description of the instrument +func (ClientRequestBodySize) Description() string { + return "Size of HTTP client request bodies." +} + +// Record records val to the current distribution. +// +// The requestMethod is the HTTP request method. +// +// The serverAddress is the host identifier of the ["URI origin"] HTTP request is +// sent to. +// +// The serverPort is the port identifier of the ["URI origin"] HTTP request is +// sent to. +// +// All additional attrs passed are included in the recorded value. +// +// ["URI origin"]: https://www.rfc-editor.org/rfc/rfc9110.html#name-uri-origin +// ["URI origin"]: https://www.rfc-editor.org/rfc/rfc9110.html#name-uri-origin +// +// The size of the request payload body in bytes. 
This is the number of bytes +// transferred excluding headers and is often, but not always, present as the +// [Content-Length] header. For requests using transport encoding, this should be +// the compressed size. +// +// [Content-Length]: https://www.rfc-editor.org/rfc/rfc9110.html#field.content-length +func (m ClientRequestBodySize) Record( + ctx context.Context, + val int64, + requestMethod RequestMethodAttr, + serverAddress string, + serverPort int, + attrs ...attribute.KeyValue, +) { + o := recOptPool.Get().(*[]metric.RecordOption) + defer func() { + *o = (*o)[:0] + recOptPool.Put(o) + }() + + *o = append( + *o, + metric.WithAttributes( + append( + attrs, + attribute.String("http.request.method", string(requestMethod)), + attribute.String("server.address", serverAddress), + attribute.Int("server.port", serverPort), + )..., + ), + ) + + m.Int64Histogram.Record(ctx, val, *o...) +} + +// AttrErrorType returns an optional attribute for the "error.type" semantic +// convention. It represents the describes a class of error the operation ended +// with. +func (ClientRequestBodySize) AttrErrorType(val ErrorTypeAttr) attribute.KeyValue { + return attribute.String("error.type", string(val)) +} + +// AttrResponseStatusCode returns an optional attribute for the +// "http.response.status_code" semantic convention. It represents the +// [HTTP response status code]. +// +// [HTTP response status code]: https://tools.ietf.org/html/rfc7231#section-6 +func (ClientRequestBodySize) AttrResponseStatusCode(val int) attribute.KeyValue { + return attribute.Int("http.response.status_code", val) +} + +// AttrNetworkProtocolName returns an optional attribute for the +// "network.protocol.name" semantic convention. It represents the +// [OSI application layer] or non-OSI equivalent. 
+// +// [OSI application layer]: https://wikipedia.org/wiki/Application_layer +func (ClientRequestBodySize) AttrNetworkProtocolName(val string) attribute.KeyValue { + return attribute.String("network.protocol.name", val) +} + +// AttrURLTemplate returns an optional attribute for the "url.template" semantic +// convention. It represents the low-cardinality template of an +// [absolute path reference]. +// +// [absolute path reference]: https://www.rfc-editor.org/rfc/rfc3986#section-4.2 +func (ClientRequestBodySize) AttrURLTemplate(val string) attribute.KeyValue { + return attribute.String("url.template", val) +} + +// AttrNetworkProtocolVersion returns an optional attribute for the +// "network.protocol.version" semantic convention. It represents the actual +// version of the protocol used for network communication. +func (ClientRequestBodySize) AttrNetworkProtocolVersion(val string) attribute.KeyValue { + return attribute.String("network.protocol.version", val) +} + +// AttrURLScheme returns an optional attribute for the "url.scheme" semantic +// convention. It represents the [URI scheme] component identifying the used +// protocol. +// +// [URI scheme]: https://www.rfc-editor.org/rfc/rfc3986#section-3.1 +func (ClientRequestBodySize) AttrURLScheme(val string) attribute.KeyValue { + return attribute.String("url.scheme", val) +} + +// ClientRequestDuration is an instrument used to record metric values conforming +// to the "http.client.request.duration" semantic conventions. It represents the +// duration of HTTP client requests. +type ClientRequestDuration struct { + metric.Float64Histogram +} + +// NewClientRequestDuration returns a new ClientRequestDuration instrument. +func NewClientRequestDuration( + m metric.Meter, + opt ...metric.Float64HistogramOption, +) (ClientRequestDuration, error) { + // Check if the meter is nil. 
+ if m == nil { + return ClientRequestDuration{noop.Float64Histogram{}}, nil + } + + i, err := m.Float64Histogram( + "http.client.request.duration", + append([]metric.Float64HistogramOption{ + metric.WithDescription("Duration of HTTP client requests."), + metric.WithUnit("s"), + }, opt...)..., + ) + if err != nil { + return ClientRequestDuration{noop.Float64Histogram{}}, err + } + return ClientRequestDuration{i}, nil +} + +// Inst returns the underlying metric instrument. +func (m ClientRequestDuration) Inst() metric.Float64Histogram { + return m.Float64Histogram +} + +// Name returns the semantic convention name of the instrument. +func (ClientRequestDuration) Name() string { + return "http.client.request.duration" +} + +// Unit returns the semantic convention unit of the instrument +func (ClientRequestDuration) Unit() string { + return "s" +} + +// Description returns the semantic convention description of the instrument +func (ClientRequestDuration) Description() string { + return "Duration of HTTP client requests." +} + +// Record records val to the current distribution. +// +// The requestMethod is the HTTP request method. +// +// The serverAddress is the host identifier of the ["URI origin"] HTTP request is +// sent to. +// +// The serverPort is the port identifier of the ["URI origin"] HTTP request is +// sent to. +// +// All additional attrs passed are included in the recorded value. 
+// +// ["URI origin"]: https://www.rfc-editor.org/rfc/rfc9110.html#name-uri-origin +// ["URI origin"]: https://www.rfc-editor.org/rfc/rfc9110.html#name-uri-origin +func (m ClientRequestDuration) Record( + ctx context.Context, + val float64, + requestMethod RequestMethodAttr, + serverAddress string, + serverPort int, + attrs ...attribute.KeyValue, +) { + o := recOptPool.Get().(*[]metric.RecordOption) + defer func() { + *o = (*o)[:0] + recOptPool.Put(o) + }() + + *o = append( + *o, + metric.WithAttributes( + append( + attrs, + attribute.String("http.request.method", string(requestMethod)), + attribute.String("server.address", serverAddress), + attribute.Int("server.port", serverPort), + )..., + ), + ) + + m.Float64Histogram.Record(ctx, val, *o...) +} + +// AttrErrorType returns an optional attribute for the "error.type" semantic +// convention. It represents the describes a class of error the operation ended +// with. +func (ClientRequestDuration) AttrErrorType(val ErrorTypeAttr) attribute.KeyValue { + return attribute.String("error.type", string(val)) +} + +// AttrResponseStatusCode returns an optional attribute for the +// "http.response.status_code" semantic convention. It represents the +// [HTTP response status code]. +// +// [HTTP response status code]: https://tools.ietf.org/html/rfc7231#section-6 +func (ClientRequestDuration) AttrResponseStatusCode(val int) attribute.KeyValue { + return attribute.Int("http.response.status_code", val) +} + +// AttrNetworkProtocolName returns an optional attribute for the +// "network.protocol.name" semantic convention. It represents the +// [OSI application layer] or non-OSI equivalent. 
+// +// [OSI application layer]: https://wikipedia.org/wiki/Application_layer +func (ClientRequestDuration) AttrNetworkProtocolName(val string) attribute.KeyValue { + return attribute.String("network.protocol.name", val) +} + +// AttrNetworkProtocolVersion returns an optional attribute for the +// "network.protocol.version" semantic convention. It represents the actual +// version of the protocol used for network communication. +func (ClientRequestDuration) AttrNetworkProtocolVersion(val string) attribute.KeyValue { + return attribute.String("network.protocol.version", val) +} + +// AttrURLScheme returns an optional attribute for the "url.scheme" semantic +// convention. It represents the [URI scheme] component identifying the used +// protocol. +// +// [URI scheme]: https://www.rfc-editor.org/rfc/rfc3986#section-3.1 +func (ClientRequestDuration) AttrURLScheme(val string) attribute.KeyValue { + return attribute.String("url.scheme", val) +} + +// AttrURLTemplate returns an optional attribute for the "url.template" semantic +// convention. It represents the low-cardinality template of an +// [absolute path reference]. +// +// [absolute path reference]: https://www.rfc-editor.org/rfc/rfc3986#section-4.2 +func (ClientRequestDuration) AttrURLTemplate(val string) attribute.KeyValue { + return attribute.String("url.template", val) +} + +// ClientResponseBodySize is an instrument used to record metric values +// conforming to the "http.client.response.body.size" semantic conventions. It +// represents the size of HTTP client response bodies. +type ClientResponseBodySize struct { + metric.Int64Histogram +} + +// NewClientResponseBodySize returns a new ClientResponseBodySize instrument. +func NewClientResponseBodySize( + m metric.Meter, + opt ...metric.Int64HistogramOption, +) (ClientResponseBodySize, error) { + // Check if the meter is nil. 
+ if m == nil { + return ClientResponseBodySize{noop.Int64Histogram{}}, nil + } + + i, err := m.Int64Histogram( + "http.client.response.body.size", + append([]metric.Int64HistogramOption{ + metric.WithDescription("Size of HTTP client response bodies."), + metric.WithUnit("By"), + }, opt...)..., + ) + if err != nil { + return ClientResponseBodySize{noop.Int64Histogram{}}, err + } + return ClientResponseBodySize{i}, nil +} + +// Inst returns the underlying metric instrument. +func (m ClientResponseBodySize) Inst() metric.Int64Histogram { + return m.Int64Histogram +} + +// Name returns the semantic convention name of the instrument. +func (ClientResponseBodySize) Name() string { + return "http.client.response.body.size" +} + +// Unit returns the semantic convention unit of the instrument +func (ClientResponseBodySize) Unit() string { + return "By" +} + +// Description returns the semantic convention description of the instrument +func (ClientResponseBodySize) Description() string { + return "Size of HTTP client response bodies." +} + +// Record records val to the current distribution. +// +// The requestMethod is the HTTP request method. +// +// The serverAddress is the host identifier of the ["URI origin"] HTTP request is +// sent to. +// +// The serverPort is the port identifier of the ["URI origin"] HTTP request is +// sent to. +// +// All additional attrs passed are included in the recorded value. +// +// ["URI origin"]: https://www.rfc-editor.org/rfc/rfc9110.html#name-uri-origin +// ["URI origin"]: https://www.rfc-editor.org/rfc/rfc9110.html#name-uri-origin +// +// The size of the response payload body in bytes. This is the number of bytes +// transferred excluding headers and is often, but not always, present as the +// [Content-Length] header. For requests using transport encoding, this should be +// the compressed size. 
+// +// [Content-Length]: https://www.rfc-editor.org/rfc/rfc9110.html#field.content-length +func (m ClientResponseBodySize) Record( + ctx context.Context, + val int64, + requestMethod RequestMethodAttr, + serverAddress string, + serverPort int, + attrs ...attribute.KeyValue, +) { + o := recOptPool.Get().(*[]metric.RecordOption) + defer func() { + *o = (*o)[:0] + recOptPool.Put(o) + }() + + *o = append( + *o, + metric.WithAttributes( + append( + attrs, + attribute.String("http.request.method", string(requestMethod)), + attribute.String("server.address", serverAddress), + attribute.Int("server.port", serverPort), + )..., + ), + ) + + m.Int64Histogram.Record(ctx, val, *o...) +} + +// AttrErrorType returns an optional attribute for the "error.type" semantic +// convention. It represents the describes a class of error the operation ended +// with. +func (ClientResponseBodySize) AttrErrorType(val ErrorTypeAttr) attribute.KeyValue { + return attribute.String("error.type", string(val)) +} + +// AttrResponseStatusCode returns an optional attribute for the +// "http.response.status_code" semantic convention. It represents the +// [HTTP response status code]. +// +// [HTTP response status code]: https://tools.ietf.org/html/rfc7231#section-6 +func (ClientResponseBodySize) AttrResponseStatusCode(val int) attribute.KeyValue { + return attribute.Int("http.response.status_code", val) +} + +// AttrNetworkProtocolName returns an optional attribute for the +// "network.protocol.name" semantic convention. It represents the +// [OSI application layer] or non-OSI equivalent. +// +// [OSI application layer]: https://wikipedia.org/wiki/Application_layer +func (ClientResponseBodySize) AttrNetworkProtocolName(val string) attribute.KeyValue { + return attribute.String("network.protocol.name", val) +} + +// AttrURLTemplate returns an optional attribute for the "url.template" semantic +// convention. It represents the low-cardinality template of an +// [absolute path reference]. 
+// +// [absolute path reference]: https://www.rfc-editor.org/rfc/rfc3986#section-4.2 +func (ClientResponseBodySize) AttrURLTemplate(val string) attribute.KeyValue { + return attribute.String("url.template", val) +} + +// AttrNetworkProtocolVersion returns an optional attribute for the +// "network.protocol.version" semantic convention. It represents the actual +// version of the protocol used for network communication. +func (ClientResponseBodySize) AttrNetworkProtocolVersion(val string) attribute.KeyValue { + return attribute.String("network.protocol.version", val) +} + +// AttrURLScheme returns an optional attribute for the "url.scheme" semantic +// convention. It represents the [URI scheme] component identifying the used +// protocol. +// +// [URI scheme]: https://www.rfc-editor.org/rfc/rfc3986#section-3.1 +func (ClientResponseBodySize) AttrURLScheme(val string) attribute.KeyValue { + return attribute.String("url.scheme", val) +} + +// ServerActiveRequests is an instrument used to record metric values conforming +// to the "http.server.active_requests" semantic conventions. It represents the +// number of active HTTP server requests. +type ServerActiveRequests struct { + metric.Int64UpDownCounter +} + +// NewServerActiveRequests returns a new ServerActiveRequests instrument. +func NewServerActiveRequests( + m metric.Meter, + opt ...metric.Int64UpDownCounterOption, +) (ServerActiveRequests, error) { + // Check if the meter is nil. + if m == nil { + return ServerActiveRequests{noop.Int64UpDownCounter{}}, nil + } + + i, err := m.Int64UpDownCounter( + "http.server.active_requests", + append([]metric.Int64UpDownCounterOption{ + metric.WithDescription("Number of active HTTP server requests."), + metric.WithUnit("{request}"), + }, opt...)..., + ) + if err != nil { + return ServerActiveRequests{noop.Int64UpDownCounter{}}, err + } + return ServerActiveRequests{i}, nil +} + +// Inst returns the underlying metric instrument. 
+func (m ServerActiveRequests) Inst() metric.Int64UpDownCounter { + return m.Int64UpDownCounter +} + +// Name returns the semantic convention name of the instrument. +func (ServerActiveRequests) Name() string { + return "http.server.active_requests" +} + +// Unit returns the semantic convention unit of the instrument +func (ServerActiveRequests) Unit() string { + return "{request}" +} + +// Description returns the semantic convention description of the instrument +func (ServerActiveRequests) Description() string { + return "Number of active HTTP server requests." +} + +// Add adds incr to the existing count. +// +// The requestMethod is the HTTP request method. +// +// The urlScheme is the the [URI scheme] component identifying the used protocol. +// +// All additional attrs passed are included in the recorded value. +// +// [URI scheme]: https://www.rfc-editor.org/rfc/rfc3986#section-3.1 +func (m ServerActiveRequests) Add( + ctx context.Context, + incr int64, + requestMethod RequestMethodAttr, + urlScheme string, + attrs ...attribute.KeyValue, +) { + o := addOptPool.Get().(*[]metric.AddOption) + defer func() { + *o = (*o)[:0] + addOptPool.Put(o) + }() + + *o = append( + *o, + metric.WithAttributes( + append( + attrs, + attribute.String("http.request.method", string(requestMethod)), + attribute.String("url.scheme", urlScheme), + )..., + ), + ) + + m.Int64UpDownCounter.Add(ctx, incr, *o...) +} + +// AttrServerAddress returns an optional attribute for the "server.address" +// semantic convention. It represents the name of the local HTTP server that +// received the request. +func (ServerActiveRequests) AttrServerAddress(val string) attribute.KeyValue { + return attribute.String("server.address", val) +} + +// AttrServerPort returns an optional attribute for the "server.port" semantic +// convention. It represents the port of the local HTTP server that received the +// request. 
+func (ServerActiveRequests) AttrServerPort(val int) attribute.KeyValue { + return attribute.Int("server.port", val) +} + +// ServerRequestBodySize is an instrument used to record metric values conforming +// to the "http.server.request.body.size" semantic conventions. It represents the +// size of HTTP server request bodies. +type ServerRequestBodySize struct { + metric.Int64Histogram +} + +// NewServerRequestBodySize returns a new ServerRequestBodySize instrument. +func NewServerRequestBodySize( + m metric.Meter, + opt ...metric.Int64HistogramOption, +) (ServerRequestBodySize, error) { + // Check if the meter is nil. + if m == nil { + return ServerRequestBodySize{noop.Int64Histogram{}}, nil + } + + i, err := m.Int64Histogram( + "http.server.request.body.size", + append([]metric.Int64HistogramOption{ + metric.WithDescription("Size of HTTP server request bodies."), + metric.WithUnit("By"), + }, opt...)..., + ) + if err != nil { + return ServerRequestBodySize{noop.Int64Histogram{}}, err + } + return ServerRequestBodySize{i}, nil +} + +// Inst returns the underlying metric instrument. +func (m ServerRequestBodySize) Inst() metric.Int64Histogram { + return m.Int64Histogram +} + +// Name returns the semantic convention name of the instrument. +func (ServerRequestBodySize) Name() string { + return "http.server.request.body.size" +} + +// Unit returns the semantic convention unit of the instrument +func (ServerRequestBodySize) Unit() string { + return "By" +} + +// Description returns the semantic convention description of the instrument +func (ServerRequestBodySize) Description() string { + return "Size of HTTP server request bodies." +} + +// Record records val to the current distribution. +// +// The requestMethod is the HTTP request method. +// +// The urlScheme is the the [URI scheme] component identifying the used protocol. +// +// All additional attrs passed are included in the recorded value. 
+// +// [URI scheme]: https://www.rfc-editor.org/rfc/rfc3986#section-3.1 +// +// The size of the request payload body in bytes. This is the number of bytes +// transferred excluding headers and is often, but not always, present as the +// [Content-Length] header. For requests using transport encoding, this should be +// the compressed size. +// +// [Content-Length]: https://www.rfc-editor.org/rfc/rfc9110.html#field.content-length +func (m ServerRequestBodySize) Record( + ctx context.Context, + val int64, + requestMethod RequestMethodAttr, + urlScheme string, + attrs ...attribute.KeyValue, +) { + o := recOptPool.Get().(*[]metric.RecordOption) + defer func() { + *o = (*o)[:0] + recOptPool.Put(o) + }() + + *o = append( + *o, + metric.WithAttributes( + append( + attrs, + attribute.String("http.request.method", string(requestMethod)), + attribute.String("url.scheme", urlScheme), + )..., + ), + ) + + m.Int64Histogram.Record(ctx, val, *o...) +} + +// AttrErrorType returns an optional attribute for the "error.type" semantic +// convention. It represents the describes a class of error the operation ended +// with. +func (ServerRequestBodySize) AttrErrorType(val ErrorTypeAttr) attribute.KeyValue { + return attribute.String("error.type", string(val)) +} + +// AttrResponseStatusCode returns an optional attribute for the +// "http.response.status_code" semantic convention. It represents the +// [HTTP response status code]. +// +// [HTTP response status code]: https://tools.ietf.org/html/rfc7231#section-6 +func (ServerRequestBodySize) AttrResponseStatusCode(val int) attribute.KeyValue { + return attribute.Int("http.response.status_code", val) +} + +// AttrRoute returns an optional attribute for the "http.route" semantic +// convention. It represents the matched route, that is, the path template in the +// format used by the respective server framework. 
+func (ServerRequestBodySize) AttrRoute(val string) attribute.KeyValue { + return attribute.String("http.route", val) +} + +// AttrNetworkProtocolName returns an optional attribute for the +// "network.protocol.name" semantic convention. It represents the +// [OSI application layer] or non-OSI equivalent. +// +// [OSI application layer]: https://wikipedia.org/wiki/Application_layer +func (ServerRequestBodySize) AttrNetworkProtocolName(val string) attribute.KeyValue { + return attribute.String("network.protocol.name", val) +} + +// AttrNetworkProtocolVersion returns an optional attribute for the +// "network.protocol.version" semantic convention. It represents the actual +// version of the protocol used for network communication. +func (ServerRequestBodySize) AttrNetworkProtocolVersion(val string) attribute.KeyValue { + return attribute.String("network.protocol.version", val) +} + +// AttrServerAddress returns an optional attribute for the "server.address" +// semantic convention. It represents the name of the local HTTP server that +// received the request. +func (ServerRequestBodySize) AttrServerAddress(val string) attribute.KeyValue { + return attribute.String("server.address", val) +} + +// AttrServerPort returns an optional attribute for the "server.port" semantic +// convention. It represents the port of the local HTTP server that received the +// request. +func (ServerRequestBodySize) AttrServerPort(val int) attribute.KeyValue { + return attribute.Int("server.port", val) +} + +// AttrUserAgentSyntheticType returns an optional attribute for the +// "user_agent.synthetic.type" semantic convention. It represents the specifies +// the category of synthetic traffic, such as tests or bots. 
+func (ServerRequestBodySize) AttrUserAgentSyntheticType(val UserAgentSyntheticTypeAttr) attribute.KeyValue { + return attribute.String("user_agent.synthetic.type", string(val)) +} + +// ServerRequestDuration is an instrument used to record metric values conforming +// to the "http.server.request.duration" semantic conventions. It represents the +// duration of HTTP server requests. +type ServerRequestDuration struct { + metric.Float64Histogram +} + +// NewServerRequestDuration returns a new ServerRequestDuration instrument. +func NewServerRequestDuration( + m metric.Meter, + opt ...metric.Float64HistogramOption, +) (ServerRequestDuration, error) { + // Check if the meter is nil. + if m == nil { + return ServerRequestDuration{noop.Float64Histogram{}}, nil + } + + i, err := m.Float64Histogram( + "http.server.request.duration", + append([]metric.Float64HistogramOption{ + metric.WithDescription("Duration of HTTP server requests."), + metric.WithUnit("s"), + }, opt...)..., + ) + if err != nil { + return ServerRequestDuration{noop.Float64Histogram{}}, err + } + return ServerRequestDuration{i}, nil +} + +// Inst returns the underlying metric instrument. +func (m ServerRequestDuration) Inst() metric.Float64Histogram { + return m.Float64Histogram +} + +// Name returns the semantic convention name of the instrument. +func (ServerRequestDuration) Name() string { + return "http.server.request.duration" +} + +// Unit returns the semantic convention unit of the instrument +func (ServerRequestDuration) Unit() string { + return "s" +} + +// Description returns the semantic convention description of the instrument +func (ServerRequestDuration) Description() string { + return "Duration of HTTP server requests." +} + +// Record records val to the current distribution. +// +// The requestMethod is the HTTP request method. +// +// The urlScheme is the the [URI scheme] component identifying the used protocol. +// +// All additional attrs passed are included in the recorded value. 
+// +// [URI scheme]: https://www.rfc-editor.org/rfc/rfc3986#section-3.1 +func (m ServerRequestDuration) Record( + ctx context.Context, + val float64, + requestMethod RequestMethodAttr, + urlScheme string, + attrs ...attribute.KeyValue, +) { + o := recOptPool.Get().(*[]metric.RecordOption) + defer func() { + *o = (*o)[:0] + recOptPool.Put(o) + }() + + *o = append( + *o, + metric.WithAttributes( + append( + attrs, + attribute.String("http.request.method", string(requestMethod)), + attribute.String("url.scheme", urlScheme), + )..., + ), + ) + + m.Float64Histogram.Record(ctx, val, *o...) +} + +// AttrErrorType returns an optional attribute for the "error.type" semantic +// convention. It represents the describes a class of error the operation ended +// with. +func (ServerRequestDuration) AttrErrorType(val ErrorTypeAttr) attribute.KeyValue { + return attribute.String("error.type", string(val)) +} + +// AttrResponseStatusCode returns an optional attribute for the +// "http.response.status_code" semantic convention. It represents the +// [HTTP response status code]. +// +// [HTTP response status code]: https://tools.ietf.org/html/rfc7231#section-6 +func (ServerRequestDuration) AttrResponseStatusCode(val int) attribute.KeyValue { + return attribute.Int("http.response.status_code", val) +} + +// AttrRoute returns an optional attribute for the "http.route" semantic +// convention. It represents the matched route, that is, the path template in the +// format used by the respective server framework. +func (ServerRequestDuration) AttrRoute(val string) attribute.KeyValue { + return attribute.String("http.route", val) +} + +// AttrNetworkProtocolName returns an optional attribute for the +// "network.protocol.name" semantic convention. It represents the +// [OSI application layer] or non-OSI equivalent. 
+// +// [OSI application layer]: https://wikipedia.org/wiki/Application_layer +func (ServerRequestDuration) AttrNetworkProtocolName(val string) attribute.KeyValue { + return attribute.String("network.protocol.name", val) +} + +// AttrNetworkProtocolVersion returns an optional attribute for the +// "network.protocol.version" semantic convention. It represents the actual +// version of the protocol used for network communication. +func (ServerRequestDuration) AttrNetworkProtocolVersion(val string) attribute.KeyValue { + return attribute.String("network.protocol.version", val) +} + +// AttrServerAddress returns an optional attribute for the "server.address" +// semantic convention. It represents the name of the local HTTP server that +// received the request. +func (ServerRequestDuration) AttrServerAddress(val string) attribute.KeyValue { + return attribute.String("server.address", val) +} + +// AttrServerPort returns an optional attribute for the "server.port" semantic +// convention. It represents the port of the local HTTP server that received the +// request. +func (ServerRequestDuration) AttrServerPort(val int) attribute.KeyValue { + return attribute.Int("server.port", val) +} + +// AttrUserAgentSyntheticType returns an optional attribute for the +// "user_agent.synthetic.type" semantic convention. It represents the specifies +// the category of synthetic traffic, such as tests or bots. +func (ServerRequestDuration) AttrUserAgentSyntheticType(val UserAgentSyntheticTypeAttr) attribute.KeyValue { + return attribute.String("user_agent.synthetic.type", string(val)) +} + +// ServerResponseBodySize is an instrument used to record metric values +// conforming to the "http.server.response.body.size" semantic conventions. It +// represents the size of HTTP server response bodies. +type ServerResponseBodySize struct { + metric.Int64Histogram +} + +// NewServerResponseBodySize returns a new ServerResponseBodySize instrument. 
+func NewServerResponseBodySize( + m metric.Meter, + opt ...metric.Int64HistogramOption, +) (ServerResponseBodySize, error) { + // Check if the meter is nil. + if m == nil { + return ServerResponseBodySize{noop.Int64Histogram{}}, nil + } + + i, err := m.Int64Histogram( + "http.server.response.body.size", + append([]metric.Int64HistogramOption{ + metric.WithDescription("Size of HTTP server response bodies."), + metric.WithUnit("By"), + }, opt...)..., + ) + if err != nil { + return ServerResponseBodySize{noop.Int64Histogram{}}, err + } + return ServerResponseBodySize{i}, nil +} + +// Inst returns the underlying metric instrument. +func (m ServerResponseBodySize) Inst() metric.Int64Histogram { + return m.Int64Histogram +} + +// Name returns the semantic convention name of the instrument. +func (ServerResponseBodySize) Name() string { + return "http.server.response.body.size" +} + +// Unit returns the semantic convention unit of the instrument +func (ServerResponseBodySize) Unit() string { + return "By" +} + +// Description returns the semantic convention description of the instrument +func (ServerResponseBodySize) Description() string { + return "Size of HTTP server response bodies." +} + +// Record records val to the current distribution. +// +// The requestMethod is the HTTP request method. +// +// The urlScheme is the the [URI scheme] component identifying the used protocol. +// +// All additional attrs passed are included in the recorded value. +// +// [URI scheme]: https://www.rfc-editor.org/rfc/rfc3986#section-3.1 +// +// The size of the response payload body in bytes. This is the number of bytes +// transferred excluding headers and is often, but not always, present as the +// [Content-Length] header. For requests using transport encoding, this should be +// the compressed size. 
+// +// [Content-Length]: https://www.rfc-editor.org/rfc/rfc9110.html#field.content-length +func (m ServerResponseBodySize) Record( + ctx context.Context, + val int64, + requestMethod RequestMethodAttr, + urlScheme string, + attrs ...attribute.KeyValue, +) { + o := recOptPool.Get().(*[]metric.RecordOption) + defer func() { + *o = (*o)[:0] + recOptPool.Put(o) + }() + + *o = append( + *o, + metric.WithAttributes( + append( + attrs, + attribute.String("http.request.method", string(requestMethod)), + attribute.String("url.scheme", urlScheme), + )..., + ), + ) + + m.Int64Histogram.Record(ctx, val, *o...) +} + +// AttrErrorType returns an optional attribute for the "error.type" semantic +// convention. It represents the describes a class of error the operation ended +// with. +func (ServerResponseBodySize) AttrErrorType(val ErrorTypeAttr) attribute.KeyValue { + return attribute.String("error.type", string(val)) +} + +// AttrResponseStatusCode returns an optional attribute for the +// "http.response.status_code" semantic convention. It represents the +// [HTTP response status code]. +// +// [HTTP response status code]: https://tools.ietf.org/html/rfc7231#section-6 +func (ServerResponseBodySize) AttrResponseStatusCode(val int) attribute.KeyValue { + return attribute.Int("http.response.status_code", val) +} + +// AttrRoute returns an optional attribute for the "http.route" semantic +// convention. It represents the matched route, that is, the path template in the +// format used by the respective server framework. +func (ServerResponseBodySize) AttrRoute(val string) attribute.KeyValue { + return attribute.String("http.route", val) +} + +// AttrNetworkProtocolName returns an optional attribute for the +// "network.protocol.name" semantic convention. It represents the +// [OSI application layer] or non-OSI equivalent. 
+// +// [OSI application layer]: https://wikipedia.org/wiki/Application_layer +func (ServerResponseBodySize) AttrNetworkProtocolName(val string) attribute.KeyValue { + return attribute.String("network.protocol.name", val) +} + +// AttrNetworkProtocolVersion returns an optional attribute for the +// "network.protocol.version" semantic convention. It represents the actual +// version of the protocol used for network communication. +func (ServerResponseBodySize) AttrNetworkProtocolVersion(val string) attribute.KeyValue { + return attribute.String("network.protocol.version", val) +} + +// AttrServerAddress returns an optional attribute for the "server.address" +// semantic convention. It represents the name of the local HTTP server that +// received the request. +func (ServerResponseBodySize) AttrServerAddress(val string) attribute.KeyValue { + return attribute.String("server.address", val) +} + +// AttrServerPort returns an optional attribute for the "server.port" semantic +// convention. It represents the port of the local HTTP server that received the +// request. +func (ServerResponseBodySize) AttrServerPort(val int) attribute.KeyValue { + return attribute.Int("server.port", val) +} + +// AttrUserAgentSyntheticType returns an optional attribute for the +// "user_agent.synthetic.type" semantic convention. It represents the specifies +// the category of synthetic traffic, such as tests or bots. 
+func (ServerResponseBodySize) AttrUserAgentSyntheticType(val UserAgentSyntheticTypeAttr) attribute.KeyValue { + return attribute.String("user_agent.synthetic.type", string(val)) +} \ No newline at end of file diff --git a/vendor/modules.txt b/vendor/modules.txt index 10322dabc6..2225bf811d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -246,6 +246,7 @@ go.opentelemetry.io/otel/semconv/v1.20.0 go.opentelemetry.io/otel/semconv/v1.26.0 go.opentelemetry.io/otel/semconv/v1.34.0 go.opentelemetry.io/otel/semconv/v1.34.0/goconv +go.opentelemetry.io/otel/semconv/v1.34.0/httpconv # go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.37.0 ## explicit; go 1.23.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc From 7803a9bbff209466be1f0edca6fcba7014ebc17e Mon Sep 17 00:00:00 2001 From: Dave Protasowski Date: Tue, 1 Jul 2025 21:51:13 -0400 Subject: [PATCH 3/8] drop custom stats reporter --- controller/controller.go | 26 +-- controller/controller_test.go | 35 --- controller/stats_reporter.go | 204 ------------------ controller/stats_reporter_test.go | 112 ---------- controller/testing/fake_stats_reporter.go | 67 ------ .../testing/fake_stats_reporter_test.go | 49 ----- 6 files changed, 5 insertions(+), 488 deletions(-) delete mode 100644 controller/stats_reporter.go delete mode 100644 controller/stats_reporter_test.go delete mode 100644 controller/testing/fake_stats_reporter.go delete mode 100644 controller/testing/fake_stats_reporter_test.go diff --git a/controller/controller.go b/controller/controller.go index bb5ab347a1..c253402751 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -214,9 +214,6 @@ type Impl struct { // the expense of slightly greater verbosity. logger *zap.SugaredLogger - // StatsReporter is used to send common controller metrics. 
- statsReporter StatsReporter - // Tracker allows reconcilers to associate a reference with particular key, // such that when the reference changes the key is queued for reconciliation. Tracker tracker.Interface @@ -227,7 +224,6 @@ type Impl struct { type ControllerOptions struct { WorkQueueName string Logger *zap.SugaredLogger - Reporter StatsReporter RateLimiter workqueue.TypedRateLimiter[any] Concurrency int } @@ -238,19 +234,15 @@ func NewContext(ctx context.Context, r Reconciler, options ControllerOptions) *I if options.RateLimiter == nil { options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[any]() } - if options.Reporter == nil { - options.Reporter = MustNewStatsReporter(options.WorkQueueName, options.Logger) - } if options.Concurrency == 0 { options.Concurrency = DefaultThreadsPerController } i := &Impl{ - Name: options.WorkQueueName, - Reconciler: r, - workQueue: newTwoLaneWorkQueue(options.WorkQueueName, options.RateLimiter), - logger: options.Logger, - statsReporter: options.Reporter, - Concurrency: options.Concurrency, + Name: options.WorkQueueName, + Reconciler: r, + workQueue: newTwoLaneWorkQueue(options.WorkQueueName, options.RateLimiter), + logger: options.Logger, + Concurrency: options.Concurrency, } if t := GetTracker(ctx); t != nil { @@ -511,17 +503,9 @@ func (c *Impl) processNextWorkItem() bool { c.logger.Debugf("Processing from queue %s (depth: %d)", safeKey(key), c.workQueue.Len()) startTime := time.Now() - // Send the metrics for the current queue depth - c.statsReporter.ReportQueueDepth(int64(c.workQueue.Len())) var err error defer func() { - status := trueString - if err != nil { - status = falseString - } - c.statsReporter.ReportReconcile(time.Since(startTime), status, key) - // We call Done here so the workqueue knows we have finished // processing this item. We also must remember to call Forget if // reconcile succeeds. 
If a transient error occurs, we do not call diff --git a/controller/controller_test.go b/controller/controller_test.go index e4b7f744f6..eb89f006d7 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -42,7 +42,6 @@ import ( "knative.dev/pkg/reconciler" "knative.dev/pkg/system" - . "knative.dev/pkg/controller/testing" . "knative.dev/pkg/logging/testing" _ "knative.dev/pkg/system/testing" . "knative.dev/pkg/testing" @@ -755,7 +754,6 @@ func TestEnqueueAfter(t *testing.T) { impl := NewContext(context.TODO(), &nopReconciler{}, ControllerOptions{ Logger: TestLogger(t), WorkQueueName: "Testing", - Reporter: &FakeStatsReporter{}, }) t.Cleanup(func() { @@ -822,7 +820,6 @@ func TestEnqueueKeyAfter(t *testing.T) { impl := NewContext(context.TODO(), &nopReconciler{}, ControllerOptions{ Logger: TestLogger(t), WorkQueueName: "Testing", - Reporter: &FakeStatsReporter{}, }) t.Cleanup(func() { impl.WorkQueue().ShutDown() @@ -884,7 +881,6 @@ func TestStartAndShutdown(t *testing.T) { impl := NewContext(context.TODO(), &nopReconciler{}, ControllerOptions{ Logger: TestLogger(t), WorkQueueName: "Testing", - Reporter: &FakeStatsReporter{}, }) ctx, cancel := context.WithCancel(context.Background()) @@ -960,7 +956,6 @@ func TestStartAndShutdownWithLeaderAwareNoElection(t *testing.T) { impl := NewContext(context.TODO(), r, ControllerOptions{ Logger: TestLogger(t), WorkQueueName: "Testing", - Reporter: &FakeStatsReporter{}, }) ctx, cancel := context.WithCancel(context.Background()) @@ -1042,7 +1037,6 @@ func TestStartAndShutdownWithLeaderAwareWithLostElection(t *testing.T) { impl := NewContext(context.TODO(), &nopReconciler{}, ControllerOptions{ Logger: TestLogger(t), WorkQueueName: "Testing", - Reporter: &FakeStatsReporter{}, }) ctx, cancel := context.WithCancel(context.Background()) @@ -1082,11 +1076,9 @@ func TestStartAndShutdownWithLeaderAwareWithLostElection(t *testing.T) { func TestStartAndShutdownWithWork(t *testing.T) { r := &CountingReconciler{} - 
reporter := &FakeStatsReporter{} impl := NewContext(context.TODO(), r, ControllerOptions{ Logger: TestLogger(t), WorkQueueName: "Testing", - Reporter: reporter, }) ctx, cancel := context.WithCancel(context.Background()) @@ -1123,8 +1115,6 @@ func TestStartAndShutdownWithWork(t *testing.T) { if got, want := impl.WorkQueue().NumRequeues(types.NamespacedName{Namespace: "foo", Name: "bar"}), 0; got != want { t.Errorf("requeues = %v, wanted %v", got, want) } - - checkStats(t, reporter, 1, 0, 1, trueString) } type fakeError struct{} @@ -1190,7 +1180,6 @@ func TestStartAndShutdownWithErroringWork(t *testing.T) { impl := NewContext(context.TODO(), &errorReconciler{}, ControllerOptions{ Logger: TestLogger(t), WorkQueueName: "Testing", - Reporter: &FakeStatsReporter{}, }) impl.EnqueueKey(item) @@ -1245,11 +1234,9 @@ func (er *permanentErrorReconciler) Reconcile(context.Context, string) error { func TestStartAndShutdownWithPermanentErroringWork(t *testing.T) { r := &permanentErrorReconciler{} - reporter := &FakeStatsReporter{} impl := NewContext(context.TODO(), r, ControllerOptions{ Logger: TestLogger(t), WorkQueueName: "Testing", - Reporter: reporter, }) ctx, cancel := context.WithCancel(context.Background()) @@ -1284,8 +1271,6 @@ func TestStartAndShutdownWithPermanentErroringWork(t *testing.T) { if got, want := impl.WorkQueue().NumRequeues(types.NamespacedName{Namespace: "foo", Name: "bar"}), 0; got != want { t.Errorf("Requeue count = %v, wanted %v", got, want) } - - checkStats(t, reporter, 1, 0, 1, falseString) } type requeueAfterReconciler struct { @@ -1313,11 +1298,9 @@ func TestStartAndShutdownWithRequeuingWork(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { r := &requeueAfterReconciler{duration: test.duration} - reporter := &FakeStatsReporter{} impl := NewContext(context.TODO(), r, ControllerOptions{ Logger: TestLogger(t), WorkQueueName: "Testing", - Reporter: reporter, }) ctx, cancel := context.WithCancel(context.Background()) @@ 
-1416,7 +1399,6 @@ func TestImplGlobalResync(t *testing.T) { impl := NewContext(context.TODO(), r, ControllerOptions{ Logger: TestLogger(t), WorkQueueName: "Testing", - Reporter: &FakeStatsReporter{}, }) ctx, cancel := context.WithCancel(context.Background()) @@ -1455,23 +1437,6 @@ func TestImplGlobalResync(t *testing.T) { } } -func checkStats(t *testing.T, r *FakeStatsReporter, reportCount, lastQueueDepth, reconcileCount int, lastReconcileSuccess string) { - qd := r.GetQueueDepths() - if got, want := len(qd), reportCount; got != want { - t.Errorf("Queue depth reports = %v, wanted %v", got, want) - } - if got, want := qd[len(qd)-1], int64(lastQueueDepth); got != want { - t.Errorf("Queue depth report = %v, wanted %v", got, want) - } - rd := r.GetReconcileData() - if got, want := len(rd), reconcileCount; got != want { - t.Errorf("Reconcile reports = %v, wanted %v", got, want) - } - if got, want := rd[len(rd)-1].Success, lastReconcileSuccess; got != want { - t.Errorf("Reconcile success = %v, wanted %v", got, want) - } -} - type fixedInformer struct { m sync.Mutex sunk bool diff --git a/controller/stats_reporter.go b/controller/stats_reporter.go deleted file mode 100644 index 99386f91b2..0000000000 --- a/controller/stats_reporter.go +++ /dev/null @@ -1,204 +0,0 @@ -/* -Copyright 2018 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package controller - -import ( - "context" - "errors" - "time" - - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" - "go.uber.org/zap" - "k8s.io/apimachinery/pkg/types" - kubemetrics "k8s.io/client-go/tools/metrics" - "k8s.io/client-go/util/workqueue" - "knative.dev/pkg/metrics" - "knative.dev/pkg/metrics/metricskey" -) - -var ( - workQueueDepthStat = stats.Int64("work_queue_depth", "Depth of the work queue", stats.UnitDimensionless) - reconcileCountStat = stats.Int64("reconcile_count", "Number of reconcile operations", stats.UnitDimensionless) - reconcileLatencyStat = stats.Int64("reconcile_latency", "Latency of reconcile operations", stats.UnitMilliseconds) - - // reconcileDistribution defines the bucket boundaries for the histogram of reconcile latency metric. - // Bucket boundaries are 10ms, 100ms, 1s, 10s, 30s and 60s. - reconcileDistribution = view.Distribution(10, 100, 1000, 10000, 30000, 60000) - - // Create the tag keys that will be used to add tags to our measurements. - // Tag keys must conform to the restrictions described in - // go.opencensus.io/tag/validate.go. Currently those restrictions are: - // - length between 1 and 255 inclusive - // - characters are printable US-ASCII - reconcilerTagKey = tag.MustNewKey("reconciler") - successTagKey = tag.MustNewKey("success") - - // NamespaceTagKey marks metrics with a namespace. - NamespaceTagKey = tag.MustNewKey(metricskey.LabelNamespaceName) -) - -func init() { - // Register to receive metrics from kubernetes workqueues. 
- wp := &metrics.WorkqueueProvider{ - Adds: stats.Int64( - "workqueue_adds_total", - "Total number of adds handled by workqueue", - stats.UnitDimensionless, - ), - Depth: stats.Int64( - "workqueue_depth", - "Current depth of workqueue", - stats.UnitDimensionless, - ), - Latency: stats.Float64( - "workqueue_queue_latency_seconds", - "How long in seconds an item stays in workqueue before being requested.", - stats.UnitSeconds, - ), - Retries: stats.Int64( - "workqueue_retries_total", - "Total number of retries handled by workqueue", - stats.UnitDimensionless, - ), - WorkDuration: stats.Float64( - "workqueue_work_duration_seconds", - "How long in seconds processing an item from workqueue takes.", - stats.UnitSeconds, - ), - UnfinishedWorkSeconds: stats.Float64( - "workqueue_unfinished_work_seconds", - "How long in seconds the outstanding workqueue items have been in flight (total).", - stats.UnitSeconds, - ), - LongestRunningProcessorSeconds: stats.Float64( - "workqueue_longest_running_processor_seconds", - "How long in seconds the longest outstanding workqueue item has been in flight.", - stats.UnitSeconds, - ), - } - workqueue.SetProvider(wp) - - cp := &metrics.ClientProvider{ - Latency: stats.Float64( - "client_latency", - "How long Kubernetes API requests take", - stats.UnitSeconds, - ), - Result: stats.Int64( - "client_results", - "Total number of API requests (broken down by status code)", - stats.UnitDimensionless, - ), - } - opts := kubemetrics.RegisterOpts{ - RequestLatency: cp.NewLatencyMetric(), - RequestResult: cp.NewResultMetric(), - } - kubemetrics.Register(opts) - - views := []*view.View{{ - Description: "Depth of the work queue", - Measure: workQueueDepthStat, - Aggregation: view.LastValue(), - TagKeys: []tag.Key{reconcilerTagKey}, - }, { - Description: "Number of reconcile operations", - Measure: reconcileCountStat, - Aggregation: view.Count(), - TagKeys: []tag.Key{reconcilerTagKey, successTagKey, NamespaceTagKey}, - }, { - Description: "Latency of 
reconcile operations", - Measure: reconcileLatencyStat, - Aggregation: reconcileDistribution, - TagKeys: []tag.Key{reconcilerTagKey, successTagKey, NamespaceTagKey}, - }} - views = append(views, wp.DefaultViews()...) - views = append(views, cp.DefaultViews()...) - - // Create views to see our measurements. This can return an error if - // a previously-registered view has the same name with a different value. - // View name defaults to the measure name if unspecified. - if err := view.Register(views...); err != nil { - panic(err) - } -} - -// StatsReporter defines the interface for sending metrics -type StatsReporter interface { - // ReportQueueDepth reports the queue depth metric - ReportQueueDepth(v int64) error - - // ReportReconcile reports the count and latency metrics for a reconcile operation - ReportReconcile(duration time.Duration, success string, key types.NamespacedName) error -} - -// Reporter holds cached metric objects to report metrics -type reporter struct { - reconciler string - globalCtx context.Context -} - -// NewStatsReporter creates a reporter that collects and reports metrics -func NewStatsReporter(reconciler string) (StatsReporter, error) { - // Reconciler tag is static. Create a context containing that and cache it. - ctx, err := tag.New( - context.Background(), - tag.Insert(reconcilerTagKey, reconciler)) - if err != nil { - return nil, err - } - - return &reporter{reconciler: reconciler, globalCtx: ctx}, nil -} - -// MustNewStatsReporter creates a new instance of StatsReporter. -// Logs fatally if creation fails. 
-func MustNewStatsReporter(reconciler string, logger *zap.SugaredLogger) StatsReporter { - stats, err := NewStatsReporter(reconciler) - if err != nil { - logger.Fatalw("Failed to initialize the stats reporter", zap.Error(err)) - } - return stats -} - -// ReportQueueDepth reports the queue depth metric -func (r *reporter) ReportQueueDepth(v int64) error { - if r.globalCtx == nil { - return errors.New("reporter is not initialized correctly") - } - metrics.Record(r.globalCtx, workQueueDepthStat.M(v)) - return nil -} - -// ReportReconcile reports the count and latency metrics for a reconcile operation -func (r *reporter) ReportReconcile(duration time.Duration, success string, key types.NamespacedName) error { - ctx, err := tag.New( - context.Background(), - tag.Insert(reconcilerTagKey, r.reconciler), - tag.Insert(successTagKey, success), - tag.Insert(NamespaceTagKey, key.Namespace), - ) - if err != nil { - return err - } - - metrics.RecordBatch(ctx, reconcileCountStat.M(1), - reconcileLatencyStat.M(duration.Milliseconds())) - return nil -} diff --git a/controller/stats_reporter_test.go b/controller/stats_reporter_test.go deleted file mode 100644 index a9e99eb31d..0000000000 --- a/controller/stats_reporter_test.go +++ /dev/null @@ -1,112 +0,0 @@ -/* -Copyright 2018 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package controller - -import ( - "math" - "strings" - "testing" - "time" - - "go.opencensus.io/stats/view" - "k8s.io/apimachinery/pkg/types" - "knative.dev/pkg/metrics/metricstest" - _ "knative.dev/pkg/metrics/testing" -) - -func TestNewStatsReporterErrors(t *testing.T) { - // These are invalid as defined by the current OpenCensus library. - invalidTagValues := []string{ - "naïve", // Includes non-ASCII character. - strings.Repeat("a", 256), // Longer than 255 characters. - } - - for _, v := range invalidTagValues { - _, err := NewStatsReporter(v) - if err == nil { - t.Errorf("Expected err to not be nil for value %q, got nil", v) - } - } -} - -func TestReportQueueDepth(t *testing.T) { - r1 := &reporter{} - if err := r1.ReportQueueDepth(10); err == nil { - t.Error("Reporter.Report() expected an error for Report call before init. Got success.") - } - - r, _ := NewStatsReporter("testreconciler") - wantTags := map[string]string{ - "reconciler": "testreconciler", - } - - // Send statistics only once and observe the results - expectSuccess(t, func() error { return r.ReportQueueDepth(10) }) - metricstest.CheckLastValueData(t, "work_queue_depth", wantTags, 10) - - // Queue depth stats is a gauge - record multiple entries - last one should stick - expectSuccess(t, func() error { return r.ReportQueueDepth(1) }) - expectSuccess(t, func() error { return r.ReportQueueDepth(2) }) - expectSuccess(t, func() error { return r.ReportQueueDepth(3) }) - metricstest.CheckLastValueData(t, "work_queue_depth", wantTags, 3) -} - -func TestReportReconcile(t *testing.T) { - r, _ := NewStatsReporter("testreconciler") - rName := "test_resource" - rNamespace := "default" - wantTags := map[string]string{ - "reconciler": "testreconciler", - "success": "true", - "namespace_name": rNamespace, - } - - initialReconcileCount := int64(0) - if d, err := view.RetrieveData("reconcile_count"); err == nil && len(d) == 1 { - initialReconcileCount = d[0].Data.(*view.CountData).Value - } - initialMin, 
initialMax := math.SmallestNonzeroFloat64, math.SmallestNonzeroFloat64 - initialDistributionCount := int64(0) - if d, err := view.RetrieveData("reconcile_latency"); err == nil && len(d) == 1 { - dd := d[0].Data.(*view.DistributionData) - initialMin, initialMax = dd.Min, dd.Max - initialDistributionCount = dd.Count - } - key := types.NamespacedName{ - Name: rName, - Namespace: rNamespace, - } - - slow, fast := initialMax+5, initialMin-5 - - expectSuccess(t, func() error { return r.ReportReconcile(time.Duration(fast)*time.Millisecond, "true", key) }) - metricstest.CheckCountData(t, "reconcile_count", wantTags, initialReconcileCount+1) - metricstest.CheckDistributionData(t, "reconcile_latency", wantTags, initialDistributionCount+1, - fast, initialMax) - - expectSuccess(t, func() error { return r.ReportReconcile(time.Duration(slow)*time.Millisecond, "true", key) }) - metricstest.CheckCountData(t, "reconcile_count", wantTags, initialReconcileCount+2) - metricstest.CheckDistributionData(t, "reconcile_latency", wantTags, initialDistributionCount+2, - fast, slow) -} - -func expectSuccess(t *testing.T, f func() error) { - t.Helper() - if err := f(); err != nil { - t.Error("Reporter.Report() expected success but got error", err) - } -} diff --git a/controller/testing/fake_stats_reporter.go b/controller/testing/fake_stats_reporter.go deleted file mode 100644 index 68a23999ed..0000000000 --- a/controller/testing/fake_stats_reporter.go +++ /dev/null @@ -1,67 +0,0 @@ -/* -Copyright 2017 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-See the License for the specific language governing permissions and -limitations under the License. -*/ - -package testing - -import ( - "sync" - "time" - - "k8s.io/apimachinery/pkg/types" -) - -// FakeStatsReporter is a fake implementation of StatsReporter -type FakeStatsReporter struct { - queueDepths []int64 - reconcileData []FakeReconcileStatData - Lock sync.Mutex -} - -// FakeReconcileStatData is used to record the calls to ReportReconcile -type FakeReconcileStatData struct { - Duration time.Duration - Success string -} - -// ReportQueueDepth records the call and returns success. -func (r *FakeStatsReporter) ReportQueueDepth(v int64) error { - r.Lock.Lock() - defer r.Lock.Unlock() - r.queueDepths = append(r.queueDepths, v) - return nil -} - -// ReportReconcile records the call and returns success. -func (r *FakeStatsReporter) ReportReconcile(duration time.Duration, success string, _ types.NamespacedName) error { - r.Lock.Lock() - defer r.Lock.Unlock() - r.reconcileData = append(r.reconcileData, FakeReconcileStatData{duration, success}) - return nil -} - -// GetQueueDepths returns the recorded queue depth values -func (r *FakeStatsReporter) GetQueueDepths() []int64 { - r.Lock.Lock() - defer r.Lock.Unlock() - return r.queueDepths -} - -// GetReconcileData returns the recorded reconcile data -func (r *FakeStatsReporter) GetReconcileData() []FakeReconcileStatData { - r.Lock.Lock() - defer r.Lock.Unlock() - return r.reconcileData -} diff --git a/controller/testing/fake_stats_reporter_test.go b/controller/testing/fake_stats_reporter_test.go deleted file mode 100644 index 6630ac1cac..0000000000 --- a/controller/testing/fake_stats_reporter_test.go +++ /dev/null @@ -1,49 +0,0 @@ -/* -Copyright 2018 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package testing - -import ( - "reflect" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - "k8s.io/apimachinery/pkg/types" - - "knative.dev/pkg/controller" -) - -var _ controller.StatsReporter = (*FakeStatsReporter)(nil) - -func TestReportQueueDepth(t *testing.T) { - r := &FakeStatsReporter{} - r.ReportQueueDepth(10) - if diff := cmp.Diff(r.GetQueueDepths(), []int64{10}); diff != "" { - t.Error("queue depth len:", diff) - } -} - -func TestReportReconcile(t *testing.T) { - r := &FakeStatsReporter{} - r.ReportReconcile(time.Duration(123), "False", types.NamespacedName{ - Name: "test_name", - Namespace: "default", - }) - if got, want := r.GetReconcileData(), []FakeReconcileStatData{{time.Duration(123), "False"}}; !reflect.DeepEqual(want, got) { - t.Errorf("reconcile data len: want: %v, got: %v", want, got) - } -} From bd7f615c2829e909b567a8c9f6ec45542fa2bdbf Mon Sep 17 00:00:00 2001 From: Dave Protasowski Date: Wed, 2 Jul 2025 10:55:45 -0400 Subject: [PATCH 4/8] address linter --- controller/controller.go | 3 --- observability/metrics/k8s/tools.go | 7 ++++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/controller/controller.go b/controller/controller.go index c253402751..9ccad81489 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -46,9 +46,6 @@ import ( ) const ( - falseString = "false" - trueString = "true" - // DefaultResyncPeriod is the default duration that is used when no // resync period is associated with a controllers initialization context. 
DefaultResyncPeriod = 10 * time.Hour diff --git a/observability/metrics/k8s/tools.go b/observability/metrics/k8s/tools.go index 83e8ab20ec..a897f416e6 100644 --- a/observability/metrics/k8s/tools.go +++ b/observability/metrics/k8s/tools.go @@ -38,9 +38,7 @@ const ( resultMetricDescription = "Count of response codes partitioned by method and host" ) -var ( - latencyBounds = []float64{0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10} -) +var latencyBounds = []float64{0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10} type ClientProvider struct { latencyMetric httpconv.ClientRequestDuration @@ -74,6 +72,9 @@ func NewClientMetricProvider(opts ...Option) (*ClientProvider, error) { metric.WithDescription(resultMetricDescription), metric.WithUnit("{item}"), ) + if err != nil { + return nil, err + } return cp, nil } From 6b3b93d76c7e4efe6f73031701f4a07986eb8699 Mon Sep 17 00:00:00 2001 From: Dave Protasowski Date: Wed, 2 Jul 2025 11:03:02 -0400 Subject: [PATCH 5/8] fix boilerplate --- observability/metrics/k8s/tools.go | 2 +- observability/metrics/k8s/tools_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/observability/metrics/k8s/tools.go b/observability/metrics/k8s/tools.go index a897f416e6..521c864155 100644 --- a/observability/metrics/k8s/tools.go +++ b/observability/metrics/k8s/tools.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/observability/metrics/k8s/tools_test.go b/observability/metrics/k8s/tools_test.go index f5b0cd63ac..475529c584 100644 --- a/observability/metrics/k8s/tools_test.go +++ b/observability/metrics/k8s/tools_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, From 404a7b57b47f65bd6ae4640e16aa31c93b46ddf3 Mon Sep 17 00:00:00 2001 From: Dave Protasowski Date: Wed, 2 Jul 2025 11:40:08 -0400 Subject: [PATCH 6/8] add some bounds checks --- observability/metrics/k8s/tools.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/observability/metrics/k8s/tools.go b/observability/metrics/k8s/tools.go index 521c864155..315534bbef 100644 --- a/observability/metrics/k8s/tools.go +++ b/observability/metrics/k8s/tools.go @@ -101,7 +101,7 @@ func (l *latency) Observe(ctx context.Context, verb string, u url.URL, latency t if portStr := u.Port(); portStr != "" { port, err := strconv.ParseInt(portStr, 10, 64) - if err != nil && port > 0 { + if err != nil && port > 0 && port < 65535 { serverPort = int(port) } } @@ -126,7 +126,7 @@ func (r *result) Increment(ctx context.Context, code string, method string, host if code == "200" { // happy path - noop - } else if c, err := strconv.ParseInt(code, 10, 64); err != nil { + } else if c, err := strconv.ParseInt(code, 10, 64); err != nil && c >= 100 && c < 600 { codeInt = int(c) } From 
6ab449a258a3f7dbdd8effda618628ddb681b063 Mon Sep 17 00:00:00 2001 From: Dave Protasowski Date: Thu, 3 Jul 2025 08:44:33 -0400 Subject: [PATCH 7/8] Simplify int parsing and attribute handling - use strconv.Atoi - don't set status code attribute if we can't parse it --- observability/metrics/k8s/tools.go | 34 ++++++++++++++++++------------ 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/observability/metrics/k8s/tools.go b/observability/metrics/k8s/tools.go index 315534bbef..a8d13da15a 100644 --- a/observability/metrics/k8s/tools.go +++ b/observability/metrics/k8s/tools.go @@ -100,9 +100,8 @@ func (l *latency) Observe(ctx context.Context, verb string, u url.URL, latency t } if portStr := u.Port(); portStr != "" { - port, err := strconv.ParseInt(portStr, 10, 64) - if err != nil && port > 0 && port < 65535 { - serverPort = int(port) + if port, err := strconv.Atoi(portStr); err == nil { + serverPort = port } } @@ -122,21 +121,28 @@ type result struct { } func (r *result) Increment(ctx context.Context, code string, method string, host string) { - codeInt := 200 + var attrs attribute.Set if code == "200" { - // happy path - noop - } else if c, err := strconv.ParseInt(code, 10, 64); err != nil && c >= 100 && c < 600 { - codeInt = int(c) - } - - r.cp.resultMetric.Add(ctx, 1, - metric.WithAttributeSet(attribute.NewSet( + attrs = attribute.NewSet( semconv.ServerAddressKey.String(host), semconv.HTTPRequestMethodKey.String(strings.ToUpper(method)), - semconv.HTTPResponseStatusCodeKey.Int(codeInt), - )), - ) + semconv.HTTPResponseStatusCodeKey.Int(200), + ) + } else if c, err := strconv.Atoi(code); err == nil { + attrs = attribute.NewSet( + semconv.ServerAddressKey.String(host), + semconv.HTTPRequestMethodKey.String(strings.ToUpper(method)), + semconv.HTTPResponseStatusCodeKey.Int(c), + ) + } else { + attrs = attribute.NewSet( + semconv.ServerAddressKey.String(host), + semconv.HTTPRequestMethodKey.String(strings.ToUpper(method)), + ) + } + + 
r.cp.resultMetric.Add(ctx, 1, metric.WithAttributeSet(attrs)) } var ( From 6b157c4be71cf930c442293ce6e4364c535aa362 Mon Sep 17 00:00:00 2001 From: Dave Protasowski Date: Fri, 4 Jul 2025 10:47:19 -0400 Subject: [PATCH 8/8] add comment --- observability/metrics/k8s/tools.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/observability/metrics/k8s/tools.go b/observability/metrics/k8s/tools.go index a8d13da15a..679154fb83 100644 --- a/observability/metrics/k8s/tools.go +++ b/observability/metrics/k8s/tools.go @@ -123,6 +123,8 @@ type result struct { func (r *result) Increment(ctx context.Context, code string, method string, host string) { var attrs attribute.Set + // Assume the happy path ("200") is by far the most frequent, so + // handle it up front and skip the strconv.Atoi parsing entirely. if code == "200" { attrs = attribute.NewSet( semconv.ServerAddressKey.String(host),