Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 5 additions & 24 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ import (
)

const (
falseString = "false"
trueString = "true"

// DefaultResyncPeriod is the default duration that is used when no
// resync period is associated with a controllers initialization context.
DefaultResyncPeriod = 10 * time.Hour
Expand Down Expand Up @@ -214,9 +211,6 @@ type Impl struct {
// the expense of slightly greater verbosity.
logger *zap.SugaredLogger

// StatsReporter is used to send common controller metrics.
statsReporter StatsReporter

// Tracker allows reconcilers to associate a reference with particular key,
// such that when the reference changes the key is queued for reconciliation.
Tracker tracker.Interface
Expand All @@ -227,7 +221,6 @@ type Impl struct {
type ControllerOptions struct {
WorkQueueName string
Logger *zap.SugaredLogger
Reporter StatsReporter
RateLimiter workqueue.TypedRateLimiter[any]
Concurrency int
}
Expand All @@ -238,19 +231,15 @@ func NewContext(ctx context.Context, r Reconciler, options ControllerOptions) *I
if options.RateLimiter == nil {
options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[any]()
}
if options.Reporter == nil {
options.Reporter = MustNewStatsReporter(options.WorkQueueName, options.Logger)
}
if options.Concurrency == 0 {
options.Concurrency = DefaultThreadsPerController
}
i := &Impl{
Name: options.WorkQueueName,
Reconciler: r,
workQueue: newTwoLaneWorkQueue(options.WorkQueueName, options.RateLimiter),
logger: options.Logger,
statsReporter: options.Reporter,
Concurrency: options.Concurrency,
Name: options.WorkQueueName,
Reconciler: r,
workQueue: newTwoLaneWorkQueue(options.WorkQueueName, options.RateLimiter),
logger: options.Logger,
Concurrency: options.Concurrency,
}

if t := GetTracker(ctx); t != nil {
Expand Down Expand Up @@ -511,17 +500,9 @@ func (c *Impl) processNextWorkItem() bool {
c.logger.Debugf("Processing from queue %s (depth: %d)", safeKey(key), c.workQueue.Len())

startTime := time.Now()
// Send the metrics for the current queue depth
c.statsReporter.ReportQueueDepth(int64(c.workQueue.Len()))

var err error
defer func() {
status := trueString
if err != nil {
status = falseString
}
c.statsReporter.ReportReconcile(time.Since(startTime), status, key)

// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if
// reconcile succeeds. If a transient error occurs, we do not call
Expand Down
35 changes: 0 additions & 35 deletions controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"knative.dev/pkg/reconciler"
"knative.dev/pkg/system"

. "knative.dev/pkg/controller/testing"
. "knative.dev/pkg/logging/testing"
_ "knative.dev/pkg/system/testing"
. "knative.dev/pkg/testing"
Expand Down Expand Up @@ -755,7 +754,6 @@ func TestEnqueueAfter(t *testing.T) {
impl := NewContext(context.TODO(), &nopReconciler{}, ControllerOptions{
Logger: TestLogger(t),
WorkQueueName: "Testing",
Reporter: &FakeStatsReporter{},
})

t.Cleanup(func() {
Expand Down Expand Up @@ -822,7 +820,6 @@ func TestEnqueueKeyAfter(t *testing.T) {
impl := NewContext(context.TODO(), &nopReconciler{}, ControllerOptions{
Logger: TestLogger(t),
WorkQueueName: "Testing",
Reporter: &FakeStatsReporter{},
})
t.Cleanup(func() {
impl.WorkQueue().ShutDown()
Expand Down Expand Up @@ -884,7 +881,6 @@ func TestStartAndShutdown(t *testing.T) {
impl := NewContext(context.TODO(), &nopReconciler{}, ControllerOptions{
Logger: TestLogger(t),
WorkQueueName: "Testing",
Reporter: &FakeStatsReporter{},
})

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -960,7 +956,6 @@ func TestStartAndShutdownWithLeaderAwareNoElection(t *testing.T) {
impl := NewContext(context.TODO(), r, ControllerOptions{
Logger: TestLogger(t),
WorkQueueName: "Testing",
Reporter: &FakeStatsReporter{},
})

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -1042,7 +1037,6 @@ func TestStartAndShutdownWithLeaderAwareWithLostElection(t *testing.T) {
impl := NewContext(context.TODO(), &nopReconciler{}, ControllerOptions{
Logger: TestLogger(t),
WorkQueueName: "Testing",
Reporter: &FakeStatsReporter{},
})

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -1082,11 +1076,9 @@ func TestStartAndShutdownWithLeaderAwareWithLostElection(t *testing.T) {

func TestStartAndShutdownWithWork(t *testing.T) {
r := &CountingReconciler{}
reporter := &FakeStatsReporter{}
impl := NewContext(context.TODO(), r, ControllerOptions{
Logger: TestLogger(t),
WorkQueueName: "Testing",
Reporter: reporter,
})

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -1123,8 +1115,6 @@ func TestStartAndShutdownWithWork(t *testing.T) {
if got, want := impl.WorkQueue().NumRequeues(types.NamespacedName{Namespace: "foo", Name: "bar"}), 0; got != want {
t.Errorf("requeues = %v, wanted %v", got, want)
}

checkStats(t, reporter, 1, 0, 1, trueString)
}

type fakeError struct{}
Expand Down Expand Up @@ -1190,7 +1180,6 @@ func TestStartAndShutdownWithErroringWork(t *testing.T) {
impl := NewContext(context.TODO(), &errorReconciler{}, ControllerOptions{
Logger: TestLogger(t),
WorkQueueName: "Testing",
Reporter: &FakeStatsReporter{},
})
impl.EnqueueKey(item)

Expand Down Expand Up @@ -1245,11 +1234,9 @@ func (er *permanentErrorReconciler) Reconcile(context.Context, string) error {

func TestStartAndShutdownWithPermanentErroringWork(t *testing.T) {
r := &permanentErrorReconciler{}
reporter := &FakeStatsReporter{}
impl := NewContext(context.TODO(), r, ControllerOptions{
Logger: TestLogger(t),
WorkQueueName: "Testing",
Reporter: reporter,
})

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -1284,8 +1271,6 @@ func TestStartAndShutdownWithPermanentErroringWork(t *testing.T) {
if got, want := impl.WorkQueue().NumRequeues(types.NamespacedName{Namespace: "foo", Name: "bar"}), 0; got != want {
t.Errorf("Requeue count = %v, wanted %v", got, want)
}

checkStats(t, reporter, 1, 0, 1, falseString)
}

type requeueAfterReconciler struct {
Expand Down Expand Up @@ -1313,11 +1298,9 @@ func TestStartAndShutdownWithRequeuingWork(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
r := &requeueAfterReconciler{duration: test.duration}
reporter := &FakeStatsReporter{}
impl := NewContext(context.TODO(), r, ControllerOptions{
Logger: TestLogger(t),
WorkQueueName: "Testing",
Reporter: reporter,
})

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -1416,7 +1399,6 @@ func TestImplGlobalResync(t *testing.T) {
impl := NewContext(context.TODO(), r, ControllerOptions{
Logger: TestLogger(t),
WorkQueueName: "Testing",
Reporter: &FakeStatsReporter{},
})

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -1455,23 +1437,6 @@ func TestImplGlobalResync(t *testing.T) {
}
}

func checkStats(t *testing.T, r *FakeStatsReporter, reportCount, lastQueueDepth, reconcileCount int, lastReconcileSuccess string) {
qd := r.GetQueueDepths()
if got, want := len(qd), reportCount; got != want {
t.Errorf("Queue depth reports = %v, wanted %v", got, want)
}
if got, want := qd[len(qd)-1], int64(lastQueueDepth); got != want {
t.Errorf("Queue depth report = %v, wanted %v", got, want)
}
rd := r.GetReconcileData()
if got, want := len(rd), reconcileCount; got != want {
t.Errorf("Reconcile reports = %v, wanted %v", got, want)
}
if got, want := rd[len(rd)-1].Success, lastReconcileSuccess; got != want {
t.Errorf("Reconcile success = %v, wanted %v", got, want)
}
}

type fixedInformer struct {
m sync.Mutex
sunk bool
Expand Down
Loading
Loading