From 15d7b9a028a3904f8e182c413813e35f518ee8a9 Mon Sep 17 00:00:00 2001 From: Rahul Kekre Date: Fri, 20 Feb 2026 15:43:36 -0800 Subject: [PATCH 01/13] First attempt at adding events to worker controller --- .gitignore | 2 +- api/v1alpha1/worker_types.go | 20 +- api/v1alpha1/zz_generated.deepcopy.go | 45 +- cmd/main.go | 1 + ...temporal.io_temporalworkerdeployments.yaml | 42 +- internal/controller/clientpool/clientpool.go | 3 +- internal/controller/execplan.go | 20 +- internal/controller/reconciler_events_test.go | 427 ++++++++++++++++++ internal/controller/worker_controller.go | 62 ++- 9 files changed, 612 insertions(+), 10 deletions(-) create mode 100644 internal/controller/reconciler_events_test.go diff --git a/.gitignore b/.gitignore index d19960ea..f71ef8c0 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,4 @@ certs .DS_Store .claude -.config \ No newline at end of file +config \ No newline at end of file diff --git a/api/v1alpha1/worker_types.go b/api/v1alpha1/worker_types.go index a09bf3d2..a6209afa 100644 --- a/api/v1alpha1/worker_types.go +++ b/api/v1alpha1/worker_types.go @@ -86,6 +86,21 @@ type TemporalWorkerDeploymentSpec struct { WorkerOptions WorkerOptions `json:"workerOptions"` } +// Condition type constants for TemporalWorkerDeployment. +const ( + // ConditionTemporalConnectionValid indicates whether the referenced TemporalConnection + // resource exists and is properly configured. + ConditionTemporalConnectionValid = "TemporalConnectionValid" + + // ConditionTemporalNamespaceAccessible indicates whether the Temporal namespace + // specified in workerOptions is reachable and the controller has access. + ConditionTemporalNamespaceAccessible = "TemporalNamespaceAccessible" + + // ConditionReady indicates whether the TemporalWorkerDeployment is fully reconciled + // and operational. + ConditionReady = "Ready" +) + // VersionStatus indicates the status of a version. 
// +enum type VersionStatus string @@ -155,8 +170,9 @@ type TemporalWorkerDeploymentStatus struct { // +kubebuilder:validation:Minimum=0 VersionCount int32 `json:"versionCount,omitempty"` - // TODO(jlegrone): Add additional status fields following Kubernetes API conventions - // https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#spec-and-status + // Conditions represent the latest available observations of the TemporalWorkerDeployment's current state. + // +optional + Conditions []metav1.Condition `json:"conditions,omitempty"` } // WorkflowExecutionStatus describes the current state of a workflow. diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 1decf431..0b21324e 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -6,6 +6,7 @@ package v1alpha1 import ( "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -90,9 +91,44 @@ func (in *DeprecatedWorkerDeploymentVersion) DeepCopy() *DeprecatedWorkerDeploym return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *GateInputSource) DeepCopyInto(out *GateInputSource) { + *out = *in + if in.ConfigMapKeyRef != nil { + in, out := &in.ConfigMapKeyRef, &out.ConfigMapKeyRef + *out = new(v1.ConfigMapKeySelector) + (*in).DeepCopyInto(*out) + } + if in.SecretKeyRef != nil { + in, out := &in.SecretKeyRef, &out.SecretKeyRef + *out = new(v1.SecretKeySelector) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GateInputSource. 
+func (in *GateInputSource) DeepCopy() *GateInputSource { + if in == nil { + return nil + } + out := new(GateInputSource) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GateWorkflowConfig) DeepCopyInto(out *GateWorkflowConfig) { *out = *in + if in.Input != nil { + in, out := &in.Input, &out.Input + *out = new(apiextensionsv1.JSON) + (*in).DeepCopyInto(*out) + } + if in.InputFrom != nil { + in, out := &in.InputFrom, &out.InputFrom + *out = new(GateInputSource) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GateWorkflowConfig. @@ -142,7 +178,7 @@ func (in *RolloutStrategy) DeepCopyInto(out *RolloutStrategy) { if in.Gate != nil { in, out := &in.Gate, &out.Gate *out = new(GateWorkflowConfig) - **out = **in + (*in).DeepCopyInto(*out) } if in.Steps != nil { in, out := &in.Steps, &out.Steps @@ -477,6 +513,13 @@ func (in *TemporalWorkerDeploymentStatus) DeepCopyInto(out *TemporalWorkerDeploy *out = make([]byte, len(*in)) copy(*out, *in) } + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]metav1.Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemporalWorkerDeploymentStatus. 
diff --git a/cmd/main.go b/cmd/main.go index 646b9f51..c171d1b1 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -91,6 +91,7 @@ func main() { }))), mgr.GetClient(), ), + Recorder: mgr.GetEventRecorderFor("temporal-worker-controller"), MaxDeploymentVersionsIneligibleForDeletion: controller.GetControllerMaxDeploymentVersionsIneligibleForDeletion(), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "TemporalWorkerDeployment") diff --git a/helm/temporal-worker-controller/crds/temporal.io_temporalworkerdeployments.yaml b/helm/temporal-worker-controller/crds/temporal.io_temporalworkerdeployments.yaml index 9c9ad07c..614e5187 100644 --- a/helm/temporal-worker-controller/crds/temporal.io_temporalworkerdeployments.yaml +++ b/helm/temporal-worker-controller/crds/temporal.io_temporalworkerdeployments.yaml @@ -3950,13 +3950,13 @@ spec: required: - name type: object + temporalNamespace: + minLength: 1 + type: string unsafeCustomBuildID: maxLength: 63 pattern: ^[a-zA-Z0-9]([a-zA-Z0-9._-]*[a-zA-Z0-9])?$ type: string - temporalNamespace: - minLength: 1 - type: string required: - connectionRef - temporalNamespace @@ -3969,6 +3969,42 @@ spec: type: object status: properties: + conditions: + items: + properties: + lastTransitionTime: + format: date-time + type: string + message: + maxLength: 32768 + type: string + observedGeneration: + format: int64 + minimum: 0 + type: integer + reason: + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + enum: + - "True" + - "False" + - Unknown + type: string + type: + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array currentVersion: properties: buildID: diff --git a/internal/controller/clientpool/clientpool.go 
b/internal/controller/clientpool/clientpool.go index 816cf47f..4162a2ce 100644 --- a/internal/controller/clientpool/clientpool.go +++ b/internal/controller/clientpool/clientpool.go @@ -141,7 +141,8 @@ func (cp *ClientPool) fetchClientUsingMTLSSecret(secret corev1.Secret, opts NewC } if _, err := c.CheckHealth(context.Background(), &sdkclient.CheckHealthRequest{}); err != nil { - panic(err) + c.Close() + return nil, fmt.Errorf("temporal server health check failed: %w", err) } cp.mux.Lock() diff --git a/internal/controller/execplan.go b/internal/controller/execplan.go index df3c6769..c9573391 100644 --- a/internal/controller/execplan.go +++ b/internal/controller/execplan.go @@ -11,22 +11,26 @@ import ( "time" "github.com/go-logr/logr" + temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" "github.com/temporalio/temporal-worker-controller/internal/temporal" enumspb "go.temporal.io/api/enums/v1" sdkclient "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" appsv1 "k8s.io/api/apps/v1" autoscalingv1 "k8s.io/api/autoscaling/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" ) -func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l logr.Logger, temporalClient sdkclient.Client, p *plan) error { +func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l logr.Logger, workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, temporalClient sdkclient.Client, p *plan) error { // Create deployment if p.CreateDeployment != nil { l.Info("creating deployment", "deployment", p.CreateDeployment) if err := r.Create(ctx, p.CreateDeployment); err != nil { l.Error(err, "unable to create deployment", "deployment", p.CreateDeployment) + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "DeploymentCreateFailed", + "Failed to create Deployment %q: %v", p.CreateDeployment.Name, err) return err } } @@ -36,6 +40,8 @@ func (r 
*TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l l.Info("deleting deployment", "deployment", d) if err := r.Delete(ctx, d); err != nil { l.Error(err, "unable to delete deployment", "deployment", d) + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "DeploymentDeleteFailed", + "Failed to delete Deployment %q: %v", d.Name, err) return err } } @@ -52,6 +58,8 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l scale := &autoscalingv1.Scale{Spec: autoscalingv1.ScaleSpec{Replicas: int32(replicas)}} if err := r.Client.SubResource("scale").Update(ctx, dep, client.WithSubResourceBody(scale)); err != nil { l.Error(err, "unable to scale deployment", "deployment", d, "replicas", replicas) + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "DeploymentScaleFailed", + "Failed to scale Deployment %q to %d replicas: %v", d.Name, replicas, err) return fmt.Errorf("unable to scale deployment: %w", err) } } @@ -61,6 +69,8 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l l.Info("updating deployment", "deployment", d.Name, "namespace", d.Namespace) if err := r.Update(ctx, d); err != nil { l.Error(err, "unable to update deployment", "deployment", d) + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "DeploymentUpdateFailed", + "Failed to update Deployment %q: %v", d.Name, err) return fmt.Errorf("unable to update deployment: %w", err) } } @@ -129,6 +139,8 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l _, err = temporalClient.ExecuteWorkflow(ctx, opts, wf.workflowType) } if err != nil { + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "TestWorkflowStartFailed", + "Failed to start gate workflow %q (buildID %s): %v", wf.workflowType, wf.buildID, err) return fmt.Errorf("unable to start test workflow execution: %w", err) } } @@ -142,6 +154,8 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l ConflictToken: 
vcfg.ConflictToken, Identity: getControllerIdentity(), }); err != nil { + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "VersionRegistrationFailed", + "Failed to set buildID %q as current version: %v", vcfg.BuildID, err) return fmt.Errorf("unable to set current deployment version: %w", err) } } else { @@ -157,6 +171,8 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l ConflictToken: vcfg.ConflictToken, Identity: getControllerIdentity(), }); err != nil { + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "VersionRegistrationFailed", + "Failed to set buildID %q as ramping version (%.1f%%): %v", vcfg.BuildID, vcfg.RampPercentage, err) return fmt.Errorf("unable to set ramping deployment version: %w", err) } } @@ -172,6 +188,8 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l }, }, }); err != nil { // would be cool to do this atomically with the update + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "MetadataUpdateFailed", + "Failed to update version metadata for buildID %q: %v", vcfg.BuildID, err) return fmt.Errorf("unable to update metadata after setting current deployment: %w", err) } } diff --git a/internal/controller/reconciler_events_test.go b/internal/controller/reconciler_events_test.go new file mode 100644 index 00000000..db02a3c8 --- /dev/null +++ b/internal/controller/reconciler_events_test.go @@ -0,0 +1,427 @@ +// Unless explicitly stated otherwise all files in this repository are licensed under the MIT License. +// +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2024 Datadog, Inc. 
+ +package controller + +import ( + "context" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" + "github.com/temporalio/temporal-worker-controller/internal/controller/clientpool" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" +) + +// newTestScheme creates a scheme with all required types registered. +func newTestScheme() *runtime.Scheme { + s := runtime.NewScheme() + _ = temporaliov1alpha1.AddToScheme(s) + _ = appsv1.AddToScheme(s) + _ = corev1.AddToScheme(s) + return s +} + +// newTestReconciler creates a TemporalWorkerDeploymentReconciler with a fake client and recorder. +func newTestReconciler(objs []client.Object) (*TemporalWorkerDeploymentReconciler, *record.FakeRecorder) { + return newTestReconcilerWithInterceptors(objs, interceptor.Funcs{}) +} + +// newTestReconcilerWithInterceptors creates a reconciler with a fake client that uses custom interceptors. +func newTestReconcilerWithInterceptors(objs []client.Object, funcs interceptor.Funcs) (*TemporalWorkerDeploymentReconciler, *record.FakeRecorder) { + scheme := newTestScheme() + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(objs...). + WithStatusSubresource(&temporaliov1alpha1.TemporalWorkerDeployment{}). 
+ WithIndex(&appsv1.Deployment{}, deployOwnerKey, func(rawObj client.Object) []string { + deploy := rawObj.(*appsv1.Deployment) + owner := metav1.GetControllerOf(deploy) + if owner == nil { + return nil + } + if owner.APIVersion != temporaliov1alpha1.GroupVersion.String() || owner.Kind != "TemporalWorkerDeployment" { + return nil + } + return []string{owner.Name} + }). + WithInterceptorFuncs(funcs). + Build() + + recorder := record.NewFakeRecorder(10) + + r := &TemporalWorkerDeploymentReconciler{ + Client: fakeClient, + Scheme: scheme, + TemporalClientPool: clientpool.New(nil, fakeClient), + Recorder: recorder, + DisableRecoverPanic: true, + MaxDeploymentVersionsIneligibleForDeletion: 75, + } + + return r, recorder +} + +// makeTWD creates a minimal TemporalWorkerDeployment for testing. +func makeTWD(name, namespace, connectionName string) *temporaliov1alpha1.TemporalWorkerDeployment { + replicas := int32(1) + progressDeadline := int32(600) + return &temporaliov1alpha1.TemporalWorkerDeployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: temporaliov1alpha1.GroupVersion.String(), + Kind: "TemporalWorkerDeployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Generation: 1, + }, + Spec: temporaliov1alpha1.TemporalWorkerDeploymentSpec{ + Replicas: &replicas, + ProgressDeadlineSeconds: &progressDeadline, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "worker", + Image: "temporal/worker:v1", + }, + }, + }, + }, + WorkerOptions: temporaliov1alpha1.WorkerOptions{ + TemporalConnectionRef: temporaliov1alpha1.TemporalConnectionReference{ + Name: connectionName, + }, + TemporalNamespace: "default", + }, + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateAllAtOnce, + }, + SunsetStrategy: temporaliov1alpha1.SunsetStrategy{ + ScaledownDelay: &metav1.Duration{}, + DeleteDelay: &metav1.Duration{}, + }, + }, + } +} + +// makeTemporalConnection creates 
a minimal TemporalConnection for testing. +func makeTemporalConnection(name, namespace, hostPort string) *temporaliov1alpha1.TemporalConnection { + return &temporaliov1alpha1.TemporalConnection{ + TypeMeta: metav1.TypeMeta{ + APIVersion: temporaliov1alpha1.GroupVersion.String(), + Kind: "TemporalConnection", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: temporaliov1alpha1.TemporalConnectionSpec{ + HostPort: hostPort, + }, + } +} + +// drainEvents reads all events from the recorder channel and returns them. +func drainEvents(recorder *record.FakeRecorder) []string { + var events []string + for { + select { + case event := <-recorder.Events: + events = append(events, event) + default: + return events + } + } +} + +// assertEventEmitted checks that at least one event with the given reason was emitted. +func assertEventEmitted(t *testing.T, events []string, reason string) { + t.Helper() + for _, event := range events { + if strings.Contains(event, reason) { + return + } + } + t.Errorf("expected event with reason %q, got events: %v", reason, events) +} + +// assertNoEventEmitted checks that no event with the given reason was emitted. 
+func assertNoEventEmitted(t *testing.T, events []string, reason string) { + t.Helper() + for _, event := range events { + if strings.Contains(event, reason) { + t.Errorf("unexpected event with reason %q found: %s", reason, event) + return + } + } +} + +func TestReconcile_TemporalConnectionNotFound_EmitsEvent(t *testing.T) { + twd := makeTWD("test-worker", "default", "nonexistent-connection") + r, recorder := newTestReconciler([]client.Object{twd}) + + _, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "test-worker", Namespace: "default"}, + }) + + require.Error(t, err) + + events := drainEvents(recorder) + assertEventEmitted(t, events, "TemporalConnectionNotFound") + + // Check that the event contains the connection name + for _, event := range events { + if strings.Contains(event, "TemporalConnectionNotFound") { + assert.Contains(t, event, "nonexistent-connection") + assert.Contains(t, event, "Warning") + } + } +} + +func TestReconcile_TemporalConnectionNotFound_SetsCondition(t *testing.T) { + twd := makeTWD("test-worker", "default", "nonexistent-connection") + r, _ := newTestReconciler([]client.Object{twd}) + + _, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "test-worker", Namespace: "default"}, + }) + + require.Error(t, err) + + // Fetch the updated TWD to check conditions + var updated temporaliov1alpha1.TemporalWorkerDeployment + require.NoError(t, r.Get(context.Background(), types.NamespacedName{Name: "test-worker", Namespace: "default"}, &updated)) + + cond := meta.FindStatusCondition(updated.Status.Conditions, temporaliov1alpha1.ConditionTemporalConnectionValid) + require.NotNil(t, cond, "TemporalConnectionValid condition should be set") + assert.Equal(t, metav1.ConditionFalse, cond.Status) + assert.Equal(t, "TemporalConnectionNotFound", cond.Reason) + assert.Contains(t, cond.Message, "nonexistent-connection") +} + +func 
TestReconcile_AuthSecretInvalid_EmitsEvent(t *testing.T) { + // Create a TemporalConnection with mTLS that references a secret, + // but the MutualTLSSecretRef has an empty name (reconcile should fail either in auth resolution or in client creation) + tc := makeTemporalConnection("my-connection", "default", "localhost:7233") + tc.Spec.MutualTLSSecretRef = &temporaliov1alpha1.SecretReference{Name: ""} // empty secret name should cause a failure downstream + + twd := makeTWD("test-worker", "default", "my-connection") + r, recorder := newTestReconciler([]client.Object{twd, tc}) + + _, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "test-worker", Namespace: "default"}, + }) + + // The mTLS secret ref is set (non-nil) but its name is empty, + // so resolveAuthSecretName will return AuthModeTLS with secretName "". + // The UpsertClient will try to fetch the secret and fail. + // Either way, an error should occur and an event should be emitted. + require.Error(t, err) + + events := drainEvents(recorder) + // Should have either AuthSecretInvalid or TemporalClientCreationFailed + hasEvent := false + for _, event := range events { + if strings.Contains(event, "AuthSecretInvalid") || strings.Contains(event, "TemporalClientCreationFailed") { + hasEvent = true + break + } + } + assert.True(t, hasEvent, "expected AuthSecretInvalid or TemporalClientCreationFailed event, got: %v", events) +} + +func TestReconcile_TemporalClientCreationFailed_EmitsEventAndCondition(t *testing.T) { + // TemporalConnection exists but references a TLS secret that doesn't exist in k8s + tc := makeTemporalConnection("my-connection", "default", "localhost:7233") + tc.Spec.MutualTLSSecretRef = &temporaliov1alpha1.SecretReference{Name: "missing-tls-secret"} + + twd := makeTWD("test-worker", "default", "my-connection") + r, recorder := newTestReconciler([]client.Object{twd, tc}) + + _, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: 
types.NamespacedName{Name: "test-worker", Namespace: "default"}, + }) + + require.Error(t, err) + + events := drainEvents(recorder) + assertEventEmitted(t, events, "TemporalClientCreationFailed") + + // Check condition + var updated temporaliov1alpha1.TemporalWorkerDeployment + require.NoError(t, r.Get(context.Background(), types.NamespacedName{Name: "test-worker", Namespace: "default"}, &updated)) + + // TemporalConnectionValid should be True (connection was fetched successfully) + connCond := meta.FindStatusCondition(updated.Status.Conditions, temporaliov1alpha1.ConditionTemporalConnectionValid) + require.NotNil(t, connCond) + assert.Equal(t, metav1.ConditionTrue, connCond.Status) + + // TemporalNamespaceAccessible should be False (client creation failed) + nsCond := meta.FindStatusCondition(updated.Status.Conditions, temporaliov1alpha1.ConditionTemporalNamespaceAccessible) + require.NotNil(t, nsCond) + assert.Equal(t, metav1.ConditionFalse, nsCond.Status) + assert.Equal(t, "TemporalClientCreationFailed", nsCond.Reason) +} + +func TestReconcile_TWDNotFound_NoEvent(t *testing.T) { + // No TWD exists — reconciling should return nil error (not found is ignored) + r, recorder := newTestReconciler(nil) + + _, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "does-not-exist", Namespace: "default"}, + }) + + require.NoError(t, err) + + events := drainEvents(recorder) + assert.Empty(t, events, "no events should be emitted when TWD is not found") +} + +func TestSetCondition_SetsNewCondition(t *testing.T) { + twd := makeTWD("test-worker", "default", "my-connection") + r, _ := newTestReconciler(nil) + + r.setCondition(twd, temporaliov1alpha1.ConditionReady, metav1.ConditionTrue, "TestReason", "Test message") + + require.Len(t, twd.Status.Conditions, 1) + assert.Equal(t, temporaliov1alpha1.ConditionReady, twd.Status.Conditions[0].Type) + assert.Equal(t, metav1.ConditionTrue, twd.Status.Conditions[0].Status) + assert.Equal(t, 
"TestReason", twd.Status.Conditions[0].Reason) + assert.Equal(t, "Test message", twd.Status.Conditions[0].Message) + assert.Equal(t, int64(1), twd.Status.Conditions[0].ObservedGeneration) +} + +func TestSetCondition_UpdatesExistingCondition(t *testing.T) { + twd := makeTWD("test-worker", "default", "my-connection") + r, _ := newTestReconciler(nil) + + // Set initial condition + r.setCondition(twd, temporaliov1alpha1.ConditionReady, metav1.ConditionTrue, "InitialReason", "Initial message") + require.Len(t, twd.Status.Conditions, 1) + + // Update the condition + r.setCondition(twd, temporaliov1alpha1.ConditionReady, metav1.ConditionFalse, "UpdatedReason", "Updated message") + + // Should still be exactly 1 condition, not 2 + require.Len(t, twd.Status.Conditions, 1) + assert.Equal(t, metav1.ConditionFalse, twd.Status.Conditions[0].Status) + assert.Equal(t, "UpdatedReason", twd.Status.Conditions[0].Reason) + assert.Equal(t, "Updated message", twd.Status.Conditions[0].Message) +} + +func TestSetCondition_MultipleDifferentConditions(t *testing.T) { + twd := makeTWD("test-worker", "default", "my-connection") + r, _ := newTestReconciler(nil) + + r.setCondition(twd, temporaliov1alpha1.ConditionTemporalConnectionValid, metav1.ConditionTrue, "Valid", "Connection is valid") + r.setCondition(twd, temporaliov1alpha1.ConditionTemporalNamespaceAccessible, metav1.ConditionTrue, "Accessible", "Namespace is accessible") + r.setCondition(twd, temporaliov1alpha1.ConditionReady, metav1.ConditionTrue, "Ready", "All good") + + require.Len(t, twd.Status.Conditions, 3) + + connCond := meta.FindStatusCondition(twd.Status.Conditions, temporaliov1alpha1.ConditionTemporalConnectionValid) + require.NotNil(t, connCond) + assert.Equal(t, metav1.ConditionTrue, connCond.Status) + + nsCond := meta.FindStatusCondition(twd.Status.Conditions, temporaliov1alpha1.ConditionTemporalNamespaceAccessible) + require.NotNil(t, nsCond) + assert.Equal(t, metav1.ConditionTrue, nsCond.Status) + + readyCond := 
meta.FindStatusCondition(twd.Status.Conditions, temporaliov1alpha1.ConditionReady) + require.NotNil(t, readyCond) + assert.Equal(t, metav1.ConditionTrue, readyCond.Status) +} + +func TestReconcile_ValidationFailure_NoEventEmitted(t *testing.T) { + // Use Progressive strategy with no steps to trigger a validation failure + twd := makeTWD("test-worker", "default", "my-connection") + twd.Spec.RolloutStrategy = temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateProgressive, + Steps: nil, // Progressive requires steps + } + + // Also need a connection for this test — but validation happens before connection fetch + tc := makeTemporalConnection("my-connection", "default", "localhost:7233") + r, recorder := newTestReconciler([]client.Object{twd, tc}) + + result, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "test-worker", Namespace: "default"}, + }) + + // Validation failures don't return errors — they requeue after 5 minutes + require.NoError(t, err) + assert.True(t, result.Requeue, "should requeue on validation failure") + + // No events should be emitted for validation failures (user just needs to fix their spec) + events := drainEvents(recorder) + assertNoEventEmitted(t, events, "TemporalConnectionNotFound") + assertNoEventEmitted(t, events, "TemporalClientCreationFailed") +} + +func TestReconcile_ConnectionValid_ThenClientFails_ConditionsReflectBoth(t *testing.T) { + // Connection exists and is fetchable, but uses API key auth with a missing secret + tc := makeTemporalConnection("my-connection", "default", "localhost:7233") + tc.Spec.APIKeySecretRef = &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "missing-api-key-secret"}, + Key: "api-key", + } + + twd := makeTWD("test-worker", "default", "my-connection") + r, recorder := newTestReconciler([]client.Object{twd, tc}) + + _, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: 
types.NamespacedName{Name: "test-worker", Namespace: "default"}, + }) + + require.Error(t, err) + + events := drainEvents(recorder) + assertEventEmitted(t, events, "TemporalClientCreationFailed") + + // Verify conditions: connection valid but namespace not accessible + var updated temporaliov1alpha1.TemporalWorkerDeployment + require.NoError(t, r.Get(context.Background(), types.NamespacedName{Name: "test-worker", Namespace: "default"}, &updated)) + + connCond := meta.FindStatusCondition(updated.Status.Conditions, temporaliov1alpha1.ConditionTemporalConnectionValid) + require.NotNil(t, connCond, "TemporalConnectionValid condition should be set") + assert.Equal(t, metav1.ConditionTrue, connCond.Status, "connection was fetched successfully") + + nsCond := meta.FindStatusCondition(updated.Status.Conditions, temporaliov1alpha1.ConditionTemporalNamespaceAccessible) + require.NotNil(t, nsCond, "TemporalNamespaceAccessible condition should be set") + assert.Equal(t, metav1.ConditionFalse, nsCond.Status, "client creation should have failed") +} + +func TestReconcile_EventMessageContainsUsefulContext(t *testing.T) { + twd := makeTWD("my-deployment", "prod", "prod-connection") + r, recorder := newTestReconciler([]client.Object{twd}) + + _, _ = r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "my-deployment", Namespace: "prod"}, + }) + + events := drainEvents(recorder) + require.NotEmpty(t, events) + + // Verify the event message contains the connection name for debugging + for _, event := range events { + if strings.Contains(event, "TemporalConnectionNotFound") { + assert.Contains(t, event, "prod-connection", + "Event message should include the missing connection name for debugging") + } + } +} diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index d6471890..9641b894 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -17,9 +17,11 @@ 
import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -71,6 +73,7 @@ type TemporalWorkerDeploymentReconciler struct { client.Client Scheme *runtime.Scheme TemporalClientPool *clientpool.ClientPool + Recorder record.EventRecorder // Disables panic recovery if true DisableRecoverPanic bool @@ -96,6 +99,7 @@ type TemporalWorkerDeploymentReconciler struct { //+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch //+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=apps,resources=deployments/scale,verbs=update +//+kubebuilder:rbac:groups="",resources=events,verbs=create;patch // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. 
@@ -146,6 +150,11 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req Namespace: workerDeploy.Namespace, }, &temporalConnection); err != nil { l.Error(err, "unable to fetch TemporalConnection") + r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, "TemporalConnectionNotFound", + "Unable to fetch TemporalConnection %q: %v", workerDeploy.Spec.WorkerOptions.TemporalConnectionRef.Name, err) + r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionValid, metav1.ConditionFalse, + "TemporalConnectionNotFound", fmt.Sprintf("TemporalConnection %q not found: %v", workerDeploy.Spec.WorkerOptions.TemporalConnectionRef.Name, err)) + _ = r.Status().Update(ctx, &workerDeploy) return ctrl.Result{}, err } @@ -153,9 +162,18 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req authMode, secretName, err := resolveAuthSecretName(&temporalConnection) if err != nil { l.Error(err, "unable to resolve auth secret name") + r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, "AuthSecretInvalid", + "Unable to resolve auth secret from TemporalConnection %q: %v", temporalConnection.Name, err) + r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionValid, metav1.ConditionFalse, + "AuthSecretInvalid", fmt.Sprintf("Unable to resolve auth secret: %v", err)) + _ = r.Status().Update(ctx, &workerDeploy) return ctrl.Result{}, err } + // Mark TemporalConnection as valid since we fetched it and resolved auth + r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionValid, metav1.ConditionTrue, + "TemporalConnectionValid", "TemporalConnection is valid and auth secret is resolved") + // Get or update temporal client for connection temporalClient, ok := r.TemporalClientPool.GetSDKClient(clientpool.ClientPoolKey{ HostPort: temporalConnection.Spec.HostPort, @@ -171,6 +189,11 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req }) if err != nil { 
l.Error(err, "unable to create TemporalClient") + r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, "TemporalClientCreationFailed", + "Unable to create Temporal client for %s:%s: %v", temporalConnection.Spec.HostPort, workerDeploy.Spec.WorkerOptions.TemporalNamespace, err) + r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalNamespaceAccessible, metav1.ConditionFalse, + "TemporalClientCreationFailed", fmt.Sprintf("Failed to connect to Temporal: %v", err)) + _ = r.Status().Update(ctx, &workerDeploy) return ctrl.Result{}, err } temporalClient = c @@ -203,14 +226,25 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req getControllerIdentity(), ) if err != nil { + r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, "TemporalStateFetchFailed", + "Unable to get Temporal worker deployment state: %v", err) + r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalNamespaceAccessible, metav1.ConditionFalse, + "TemporalStateFetchFailed", fmt.Sprintf("Failed to query Temporal worker deployment state: %v", err)) + _ = r.Status().Update(ctx, &workerDeploy) return ctrl.Result{}, fmt.Errorf("unable to get Temporal worker deployment state: %w", err) } + // Mark Temporal namespace as accessible since we successfully queried state + r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalNamespaceAccessible, metav1.ConditionTrue, + "TemporalNamespaceAccessible", "Successfully connected to Temporal namespace") + // Compute a new status from k8s and temporal state status, err := r.generateStatus(ctx, l, temporalClient, req, &workerDeploy, temporalState, k8sState) if err != nil { return ctrl.Result{}, err } + // Preserve conditions that were set during this reconciliation + status.Conditions = workerDeploy.Status.Conditions workerDeploy.Status = *status if err := r.Status().Update(ctx, &workerDeploy); err != nil { // Ignore "object has been modified" errors, since we'll just re-fetch @@ -235,14 +269,24 @@ 
func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req // Generate a plan to get to desired spec from current status plan, err := r.generatePlan(ctx, l, &workerDeploy, temporalConnection.Spec, temporalState) if err != nil { + r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, "PlanGenerationFailed", + "Unable to generate reconciliation plan: %v", err) return ctrl.Result{}, err } // Execute the plan, handling any errors - if err := r.executePlan(ctx, l, temporalClient, plan); err != nil { + if err := r.executePlan(ctx, l, &workerDeploy, temporalClient, plan); err != nil { + r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, "PlanExecutionFailed", + "Unable to execute reconciliation plan: %v", err) return ctrl.Result{}, err } + // Mark as Ready on successful reconciliation + if workerDeploy.Status.TargetVersion.BuildID == workerDeploy.Status.CurrentVersion.BuildID { + r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionReady, metav1.ConditionTrue, + "RolloutSucceeded", "Target version rollout complete "+workerDeploy.Status.TargetVersion.BuildID) + } + return ctrl.Result{ Requeue: true, // TODO(jlegrone): Consider increasing this value if the only thing we need to check for is unreachable versions. @@ -252,6 +296,22 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req }, nil } +// setCondition sets a condition on the TemporalWorkerDeployment status. +func (r *TemporalWorkerDeploymentReconciler) setCondition( + workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, + conditionType string, + status metav1.ConditionStatus, + reason, message string, +) { + meta.SetStatusCondition(&workerDeploy.Status.Conditions, metav1.Condition{ + Type: conditionType, + Status: status, + ObservedGeneration: workerDeploy.Generation, + Reason: reason, + Message: message, + }) +} + // SetupWithManager sets up the controller with the Manager. 
func (r *TemporalWorkerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { if err := mgr.GetFieldIndexer().IndexField(context.Background(), &appsv1.Deployment{}, deployOwnerKey, func(rawObj client.Object) []string { From 4495d801368c46dc7413cc9f9ead2989c474a557 Mon Sep 17 00:00:00 2001 From: Rahul Kekre Date: Fri, 20 Feb 2026 16:33:09 -0800 Subject: [PATCH 02/13] Adding events to worker controller --- api/v1alpha1/worker_types.go | 12 ++--- internal/controller/reconciler_events_test.go | 52 +++++++------------ internal/controller/worker_controller.go | 18 +++---- 3 files changed, 30 insertions(+), 52 deletions(-) diff --git a/api/v1alpha1/worker_types.go b/api/v1alpha1/worker_types.go index a6209afa..5b8bf903 100644 --- a/api/v1alpha1/worker_types.go +++ b/api/v1alpha1/worker_types.go @@ -88,17 +88,13 @@ type TemporalWorkerDeploymentSpec struct { // Condition type constants for TemporalWorkerDeployment. const ( - // ConditionTemporalConnectionValid indicates whether the referenced TemporalConnection + // ConditionTemporalConnectionHealthy indicates whether the referenced TemporalConnection // resource exists and is properly configured. - ConditionTemporalConnectionValid = "TemporalConnectionValid" + ConditionTemporalConnectionHealthy = "TemporalConnectionHealthy" - // ConditionTemporalNamespaceAccessible indicates whether the Temporal namespace - // specified in workerOptions is reachable and the controller has access. - ConditionTemporalNamespaceAccessible = "TemporalNamespaceAccessible" - - // ConditionReady indicates whether the TemporalWorkerDeployment is fully reconciled + // ConditionRolloutReady indicates whether the TemporalWorkerDeployment is fully reconciled // and operational. - ConditionReady = "Ready" + ConditionRolloutReady = "RolloutReady" ) // VersionStatus indicates the status of a version. 
diff --git a/internal/controller/reconciler_events_test.go b/internal/controller/reconciler_events_test.go index db02a3c8..48d8addc 100644 --- a/internal/controller/reconciler_events_test.go +++ b/internal/controller/reconciler_events_test.go @@ -207,8 +207,8 @@ func TestReconcile_TemporalConnectionNotFound_SetsCondition(t *testing.T) { var updated temporaliov1alpha1.TemporalWorkerDeployment require.NoError(t, r.Get(context.Background(), types.NamespacedName{Name: "test-worker", Namespace: "default"}, &updated)) - cond := meta.FindStatusCondition(updated.Status.Conditions, temporaliov1alpha1.ConditionTemporalConnectionValid) - require.NotNil(t, cond, "TemporalConnectionValid condition should be set") + cond := meta.FindStatusCondition(updated.Status.Conditions, temporaliov1alpha1.ConditionTemporalConnectionHealthy) + require.NotNil(t, cond, "TemporalConnectionHealthy condition should be set") assert.Equal(t, metav1.ConditionFalse, cond.Status) assert.Equal(t, "TemporalConnectionNotFound", cond.Reason) assert.Contains(t, cond.Message, "nonexistent-connection") @@ -266,16 +266,11 @@ func TestReconcile_TemporalClientCreationFailed_EmitsEventAndCondition(t *testin var updated temporaliov1alpha1.TemporalWorkerDeployment require.NoError(t, r.Get(context.Background(), types.NamespacedName{Name: "test-worker", Namespace: "default"}, &updated)) - // TemporalConnectionValid should be True (connection was fetched successfully) - connCond := meta.FindStatusCondition(updated.Status.Conditions, temporaliov1alpha1.ConditionTemporalConnectionValid) + // TemporalConnectionHealthy should be False (client creation failed, overwriting the earlier True) + connCond := meta.FindStatusCondition(updated.Status.Conditions, temporaliov1alpha1.ConditionTemporalConnectionHealthy) require.NotNil(t, connCond) - assert.Equal(t, metav1.ConditionTrue, connCond.Status) - - // TemporalNamespaceAccessible should be False (client creation failed) - nsCond := 
meta.FindStatusCondition(updated.Status.Conditions, temporaliov1alpha1.ConditionTemporalNamespaceAccessible) - require.NotNil(t, nsCond) - assert.Equal(t, metav1.ConditionFalse, nsCond.Status) - assert.Equal(t, "TemporalClientCreationFailed", nsCond.Reason) + assert.Equal(t, metav1.ConditionFalse, connCond.Status) + assert.Equal(t, "TemporalClientCreationFailed", connCond.Reason) } func TestReconcile_TWDNotFound_NoEvent(t *testing.T) { @@ -296,10 +291,10 @@ func TestSetCondition_SetsNewCondition(t *testing.T) { twd := makeTWD("test-worker", "default", "my-connection") r, _ := newTestReconciler(nil) - r.setCondition(twd, temporaliov1alpha1.ConditionReady, metav1.ConditionTrue, "TestReason", "Test message") + r.setCondition(twd, temporaliov1alpha1.ConditionRolloutReady, metav1.ConditionTrue, "TestReason", "Test message") require.Len(t, twd.Status.Conditions, 1) - assert.Equal(t, temporaliov1alpha1.ConditionReady, twd.Status.Conditions[0].Type) + assert.Equal(t, temporaliov1alpha1.ConditionRolloutReady, twd.Status.Conditions[0].Type) assert.Equal(t, metav1.ConditionTrue, twd.Status.Conditions[0].Status) assert.Equal(t, "TestReason", twd.Status.Conditions[0].Reason) assert.Equal(t, "Test message", twd.Status.Conditions[0].Message) @@ -311,11 +306,11 @@ func TestSetCondition_UpdatesExistingCondition(t *testing.T) { r, _ := newTestReconciler(nil) // Set initial condition - r.setCondition(twd, temporaliov1alpha1.ConditionReady, metav1.ConditionTrue, "InitialReason", "Initial message") + r.setCondition(twd, temporaliov1alpha1.ConditionRolloutReady, metav1.ConditionTrue, "InitialReason", "Initial message") require.Len(t, twd.Status.Conditions, 1) // Update the condition - r.setCondition(twd, temporaliov1alpha1.ConditionReady, metav1.ConditionFalse, "UpdatedReason", "Updated message") + r.setCondition(twd, temporaliov1alpha1.ConditionRolloutReady, metav1.ConditionFalse, "UpdatedReason", "Updated message") // Should still be exactly 1 condition, not 2 require.Len(t, 
twd.Status.Conditions, 1) @@ -328,21 +323,16 @@ func TestSetCondition_MultipleDifferentConditions(t *testing.T) { twd := makeTWD("test-worker", "default", "my-connection") r, _ := newTestReconciler(nil) - r.setCondition(twd, temporaliov1alpha1.ConditionTemporalConnectionValid, metav1.ConditionTrue, "Valid", "Connection is valid") - r.setCondition(twd, temporaliov1alpha1.ConditionTemporalNamespaceAccessible, metav1.ConditionTrue, "Accessible", "Namespace is accessible") - r.setCondition(twd, temporaliov1alpha1.ConditionReady, metav1.ConditionTrue, "Ready", "All good") + r.setCondition(twd, temporaliov1alpha1.ConditionTemporalConnectionHealthy, metav1.ConditionTrue, "Healthy", "Connection is healthy") + r.setCondition(twd, temporaliov1alpha1.ConditionRolloutReady, metav1.ConditionTrue, "Ready", "All good") - require.Len(t, twd.Status.Conditions, 3) + require.Len(t, twd.Status.Conditions, 2) - connCond := meta.FindStatusCondition(twd.Status.Conditions, temporaliov1alpha1.ConditionTemporalConnectionValid) + connCond := meta.FindStatusCondition(twd.Status.Conditions, temporaliov1alpha1.ConditionTemporalConnectionHealthy) require.NotNil(t, connCond) assert.Equal(t, metav1.ConditionTrue, connCond.Status) - nsCond := meta.FindStatusCondition(twd.Status.Conditions, temporaliov1alpha1.ConditionTemporalNamespaceAccessible) - require.NotNil(t, nsCond) - assert.Equal(t, metav1.ConditionTrue, nsCond.Status) - - readyCond := meta.FindStatusCondition(twd.Status.Conditions, temporaliov1alpha1.ConditionReady) + readyCond := meta.FindStatusCondition(twd.Status.Conditions, temporaliov1alpha1.ConditionRolloutReady) require.NotNil(t, readyCond) assert.Equal(t, metav1.ConditionTrue, readyCond.Status) } @@ -393,17 +383,13 @@ func TestReconcile_ConnectionValid_ThenClientFails_ConditionsReflectBoth(t *test events := drainEvents(recorder) assertEventEmitted(t, events, "TemporalClientCreationFailed") - // Verify conditions: connection valid but namespace not accessible + // Verify condition: 
TemporalConnectionHealthy should be False (client creation failed) var updated temporaliov1alpha1.TemporalWorkerDeployment require.NoError(t, r.Get(context.Background(), types.NamespacedName{Name: "test-worker", Namespace: "default"}, &updated)) - connCond := meta.FindStatusCondition(updated.Status.Conditions, temporaliov1alpha1.ConditionTemporalConnectionValid) - require.NotNil(t, connCond, "TemporalConnectionValid condition should be set") - assert.Equal(t, metav1.ConditionTrue, connCond.Status, "connection was fetched successfully") - - nsCond := meta.FindStatusCondition(updated.Status.Conditions, temporaliov1alpha1.ConditionTemporalNamespaceAccessible) - require.NotNil(t, nsCond, "TemporalNamespaceAccessible condition should be set") - assert.Equal(t, metav1.ConditionFalse, nsCond.Status, "client creation should have failed") + connCond := meta.FindStatusCondition(updated.Status.Conditions, temporaliov1alpha1.ConditionTemporalConnectionHealthy) + require.NotNil(t, connCond, "TemporalConnectionHealthy condition should be set") + assert.Equal(t, metav1.ConditionFalse, connCond.Status, "client creation should have failed") } func TestReconcile_EventMessageContainsUsefulContext(t *testing.T) { diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index 9641b894..8a751fc7 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -152,7 +152,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req l.Error(err, "unable to fetch TemporalConnection") r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, "TemporalConnectionNotFound", "Unable to fetch TemporalConnection %q: %v", workerDeploy.Spec.WorkerOptions.TemporalConnectionRef.Name, err) - r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionValid, metav1.ConditionFalse, + r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, 
metav1.ConditionFalse, "TemporalConnectionNotFound", fmt.Sprintf("TemporalConnection %q not found: %v", workerDeploy.Spec.WorkerOptions.TemporalConnectionRef.Name, err)) _ = r.Status().Update(ctx, &workerDeploy) return ctrl.Result{}, err @@ -164,15 +164,15 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req l.Error(err, "unable to resolve auth secret name") r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, "AuthSecretInvalid", "Unable to resolve auth secret from TemporalConnection %q: %v", temporalConnection.Name, err) - r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionValid, metav1.ConditionFalse, + r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, metav1.ConditionFalse, "AuthSecretInvalid", fmt.Sprintf("Unable to resolve auth secret: %v", err)) _ = r.Status().Update(ctx, &workerDeploy) return ctrl.Result{}, err } // Mark TemporalConnection as valid since we fetched it and resolved auth - r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionValid, metav1.ConditionTrue, - "TemporalConnectionValid", "TemporalConnection is valid and auth secret is resolved") + r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, metav1.ConditionTrue, + "TemporalConnectionHealthy", "TemporalConnection is healthy and auth secret is resolved") // Get or update temporal client for connection temporalClient, ok := r.TemporalClientPool.GetSDKClient(clientpool.ClientPoolKey{ @@ -191,7 +191,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req l.Error(err, "unable to create TemporalClient") r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, "TemporalClientCreationFailed", "Unable to create Temporal client for %s:%s: %v", temporalConnection.Spec.HostPort, workerDeploy.Spec.WorkerOptions.TemporalNamespace, err) - r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalNamespaceAccessible, 
metav1.ConditionFalse, + r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, metav1.ConditionFalse, "TemporalClientCreationFailed", fmt.Sprintf("Failed to connect to Temporal: %v", err)) _ = r.Status().Update(ctx, &workerDeploy) return ctrl.Result{}, err @@ -228,16 +228,12 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req if err != nil { r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, "TemporalStateFetchFailed", "Unable to get Temporal worker deployment state: %v", err) - r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalNamespaceAccessible, metav1.ConditionFalse, + r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, metav1.ConditionFalse, "TemporalStateFetchFailed", fmt.Sprintf("Failed to query Temporal worker deployment state: %v", err)) _ = r.Status().Update(ctx, &workerDeploy) return ctrl.Result{}, fmt.Errorf("unable to get Temporal worker deployment state: %w", err) } - // Mark Temporal namespace as accessible since we successfully queried state - r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalNamespaceAccessible, metav1.ConditionTrue, - "TemporalNamespaceAccessible", "Successfully connected to Temporal namespace") - // Compute a new status from k8s and temporal state status, err := r.generateStatus(ctx, l, temporalClient, req, &workerDeploy, temporalState, k8sState) if err != nil { @@ -283,7 +279,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req // Mark as Ready on successful reconciliation if workerDeploy.Status.TargetVersion.BuildID == workerDeploy.Status.CurrentVersion.BuildID { - r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionReady, metav1.ConditionTrue, + r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionRolloutReady, metav1.ConditionTrue, "RolloutSucceeded", "Target version rollout complete "+workerDeploy.Status.TargetVersion.BuildID) } From 
6c8e4f8243a989c1ba640bbcc8197c295408efac Mon Sep 17 00:00:00 2001 From: Rahul Kekre Date: Sat, 21 Feb 2026 06:12:21 -0800 Subject: [PATCH 03/13] fix the breaking tests --- cmd/main.go | 2 +- internal/controller/reconciler_events_test.go | 8 ++++---- internal/controller/worker_controller.go | 3 ++- internal/tests/internal/env_helpers.go | 1 + 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index c171d1b1..4ebe6e4d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -91,7 +91,7 @@ func main() { }))), mgr.GetClient(), ), - Recorder: mgr.GetEventRecorderFor("temporal-worker-controller"), + Recorder: mgr.GetEventRecorderFor("temporal-worker-controller"), MaxDeploymentVersionsIneligibleForDeletion: controller.GetControllerMaxDeploymentVersionsIneligibleForDeletion(), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "TemporalWorkerDeployment") diff --git a/internal/controller/reconciler_events_test.go b/internal/controller/reconciler_events_test.go index 48d8addc..6be9b541 100644 --- a/internal/controller/reconciler_events_test.go +++ b/internal/controller/reconciler_events_test.go @@ -64,10 +64,10 @@ func newTestReconcilerWithInterceptors(objs []client.Object, funcs interceptor.F recorder := record.NewFakeRecorder(10) r := &TemporalWorkerDeploymentReconciler{ - Client: fakeClient, - Scheme: scheme, - TemporalClientPool: clientpool.New(nil, fakeClient), - Recorder: recorder, + Client: fakeClient, + Scheme: scheme, + TemporalClientPool: clientpool.New(nil, fakeClient), + Recorder: recorder, DisableRecoverPanic: true, MaxDeploymentVersionsIneligibleForDeletion: 75, } diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index 8a751fc7..74fba1d3 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -278,7 +278,8 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req } 
// Mark as Ready on successful reconciliation - if workerDeploy.Status.TargetVersion.BuildID == workerDeploy.Status.CurrentVersion.BuildID { + if workerDeploy.Status.CurrentVersion != nil && + workerDeploy.Status.TargetVersion.BuildID == workerDeploy.Status.CurrentVersion.BuildID { r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionRolloutReady, metav1.ConditionTrue, "RolloutSucceeded", "Target version rollout complete "+workerDeploy.Status.TargetVersion.BuildID) } diff --git a/internal/tests/internal/env_helpers.go b/internal/tests/internal/env_helpers.go index c8aebd02..ca261129 100644 --- a/internal/tests/internal/env_helpers.go +++ b/internal/tests/internal/env_helpers.go @@ -154,6 +154,7 @@ func setupTestEnvironment(t *testing.T) (*rest.Config, client.Client, manager.Ma Client: mgr.GetClient(), Scheme: mgr.GetScheme(), TemporalClientPool: clientPool, + Recorder: mgr.GetEventRecorderFor("temporal-worker-controller"), DisableRecoverPanic: true, MaxDeploymentVersionsIneligibleForDeletion: controller.GetControllerMaxDeploymentVersionsIneligibleForDeletion(), } From 5900793938e6de3c5d85291844860b1a5907c8d3 Mon Sep 17 00:00:00 2001 From: Rahul Kekre Date: Sat, 21 Feb 2026 12:27:39 -0800 Subject: [PATCH 04/13] Fix golangci-lint errors: RBAC comment spacing and deprecated Requeue usage Co-Authored-By: Claude Opus 4.6 --- internal/controller/reconciler_events_test.go | 2 +- internal/controller/worker_controller.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/controller/reconciler_events_test.go b/internal/controller/reconciler_events_test.go index 6be9b541..d1288c4d 100644 --- a/internal/controller/reconciler_events_test.go +++ b/internal/controller/reconciler_events_test.go @@ -355,7 +355,7 @@ func TestReconcile_ValidationFailure_NoEventEmitted(t *testing.T) { // Validation failures don't return errors — they requeue after 5 minutes require.NoError(t, err) - assert.True(t, result.Requeue, "should requeue on validation failure") 
+ assert.NotZero(t, result.RequeueAfter, "should requeue on validation failure") // No events should be emitted for validation failures (user just needs to fix their spec) events := drainEvents(recorder) diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index 74fba1d3..726684f9 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -99,7 +99,7 @@ type TemporalWorkerDeploymentReconciler struct { //+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch //+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=apps,resources=deployments/scale,verbs=update -//+kubebuilder:rbac:groups="",resources=events,verbs=create;patch +// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. 
From e65c9d127346c77e5b8e874478a6a65a2be991a4 Mon Sep 17 00:00:00 2001 From: Rahul Kekre Date: Sat, 21 Feb 2026 12:52:46 -0800 Subject: [PATCH 05/13] Refactor executePlan to reduce cognitive complexity --- internal/controller/execplan.go | 118 +++++++++++++++++++------------- 1 file changed, 72 insertions(+), 46 deletions(-) diff --git a/internal/controller/execplan.go b/internal/controller/execplan.go index c9573391..9e462c6a 100644 --- a/internal/controller/execplan.go +++ b/internal/controller/execplan.go @@ -23,7 +23,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l logr.Logger, workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, temporalClient sdkclient.Client, p *plan) error { +func (r *TemporalWorkerDeploymentReconciler) executeK8sOperations(ctx context.Context, l logr.Logger, workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, p *plan) error { // Create deployment if p.CreateDeployment != nil { l.Info("creating deployment", "deployment", p.CreateDeployment) @@ -45,6 +45,7 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l return err } } + // Scale deployments for d, replicas := range p.ScaleDeployments { l.Info("scaling deployment", "deployment", d, "replicas", replicas) @@ -75,9 +76,10 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l } } - // Get deployment handler - deploymentHandler := temporalClient.WorkerDeploymentClient().GetHandle(p.WorkerDeploymentName) + return nil +} +func (r *TemporalWorkerDeploymentReconciler) startTestWorkflows(ctx context.Context, l logr.Logger, workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, temporalClient sdkclient.Client, p *plan) error { for _, wf := range p.startTestWorkflows { // Log workflow start details if len(wf.input) > 0 { @@ -144,54 +146,78 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l return 
fmt.Errorf("unable to start test workflow execution: %w", err) } } + return nil +} - // Register current version or ramps - if vcfg := p.UpdateVersionConfig; vcfg != nil { - if vcfg.SetCurrent { - l.Info("registering new current version", "buildID", vcfg.BuildID) - if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{ - BuildID: vcfg.BuildID, - ConflictToken: vcfg.ConflictToken, - Identity: getControllerIdentity(), - }); err != nil { - r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "VersionRegistrationFailed", - "Failed to set buildID %q as current version: %v", vcfg.BuildID, err) - return fmt.Errorf("unable to set current deployment version: %w", err) - } +func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Context, l logr.Logger, workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, deploymentHandler sdkclient.WorkerDeploymentHandle, p *plan) error { + vcfg := p.UpdateVersionConfig + if vcfg == nil { + return nil + } + + if vcfg.SetCurrent { + l.Info("registering new current version", "buildID", vcfg.BuildID) + if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{ + BuildID: vcfg.BuildID, + ConflictToken: vcfg.ConflictToken, + Identity: getControllerIdentity(), + }); err != nil { + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "VersionRegistrationFailed", + "Failed to set buildID %q as current version: %v", vcfg.BuildID, err) + return fmt.Errorf("unable to set current deployment version: %w", err) + } + } else { + if vcfg.RampPercentage > 0 { + l.Info("applying ramp", "buildID", vcfg.BuildID, "percentage", vcfg.RampPercentage) } else { - if vcfg.RampPercentage > 0 { - l.Info("applying ramp", "buildID", vcfg.BuildID, "percentage", vcfg.RampPercentage) - } else { - l.Info("deleting ramp", "buildID", vcfg.BuildID) - } + l.Info("deleting ramp", "buildID", vcfg.BuildID) + } - if _, err := 
deploymentHandler.SetRampingVersion(ctx, sdkclient.WorkerDeploymentSetRampingVersionOptions{ - BuildID: vcfg.BuildID, - Percentage: float32(vcfg.RampPercentage), - ConflictToken: vcfg.ConflictToken, - Identity: getControllerIdentity(), - }); err != nil { - r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "VersionRegistrationFailed", - "Failed to set buildID %q as ramping version (%.1f%%): %v", vcfg.BuildID, vcfg.RampPercentage, err) - return fmt.Errorf("unable to set ramping deployment version: %w", err) - } + if _, err := deploymentHandler.SetRampingVersion(ctx, sdkclient.WorkerDeploymentSetRampingVersionOptions{ + BuildID: vcfg.BuildID, + Percentage: float32(vcfg.RampPercentage), + ConflictToken: vcfg.ConflictToken, + Identity: getControllerIdentity(), + }); err != nil { + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "VersionRegistrationFailed", + "Failed to set buildID %q as ramping version (%.1f%%): %v", vcfg.BuildID, vcfg.RampPercentage, err) + return fmt.Errorf("unable to set ramping deployment version: %w", err) } - if _, err := deploymentHandler.UpdateVersionMetadata(ctx, sdkclient.WorkerDeploymentUpdateVersionMetadataOptions{ - Version: worker.WorkerDeploymentVersion{ - DeploymentName: p.WorkerDeploymentName, - BuildId: vcfg.BuildID, - }, - MetadataUpdate: sdkclient.WorkerDeploymentMetadataUpdate{ - UpsertEntries: map[string]interface{}{ - controllerIdentityMetadataKey: getControllerIdentity(), - controllerVersionMetadataKey: getControllerVersion(), - }, + } + + if _, err := deploymentHandler.UpdateVersionMetadata(ctx, sdkclient.WorkerDeploymentUpdateVersionMetadataOptions{ + Version: worker.WorkerDeploymentVersion{ + DeploymentName: p.WorkerDeploymentName, + BuildId: vcfg.BuildID, + }, + MetadataUpdate: sdkclient.WorkerDeploymentMetadataUpdate{ + UpsertEntries: map[string]interface{}{ + controllerIdentityMetadataKey: getControllerIdentity(), + controllerVersionMetadataKey: getControllerVersion(), }, - }); err != nil { // would be cool 
to do this atomically with the update - r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "MetadataUpdateFailed", - "Failed to update version metadata for buildID %q: %v", vcfg.BuildID, err) - return fmt.Errorf("unable to update metadata after setting current deployment: %w", err) - } + }, + }); err != nil { // would be cool to do this atomically with the update + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "MetadataUpdateFailed", + "Failed to update version metadata for buildID %q: %v", vcfg.BuildID, err) + return fmt.Errorf("unable to update metadata after setting current deployment: %w", err) + } + + return nil +} + +func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l logr.Logger, workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, temporalClient sdkclient.Client, p *plan) error { + if err := r.executeK8sOperations(ctx, l, workerDeploy, p); err != nil { + return err + } + + deploymentHandler := temporalClient.WorkerDeploymentClient().GetHandle(p.WorkerDeploymentName) + + if err := r.startTestWorkflows(ctx, l, workerDeploy, temporalClient, p); err != nil { + return err + } + + if err := r.updateVersionConfig(ctx, l, workerDeploy, deploymentHandler, p); err != nil { + return err } for _, buildId := range p.RemoveIgnoreLastModifierBuilds { From df3448aa0bf7033532e5a27c9d70444d749fe259 Mon Sep 17 00:00:00 2001 From: Rahul Kekre Date: Thu, 26 Feb 2026 10:50:34 -0800 Subject: [PATCH 06/13] addressing the comments --- internal/controller/execplan.go | 4 +++- internal/controller/worker_controller.go | 9 +-------- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/internal/controller/execplan.go b/internal/controller/execplan.go index 9e462c6a..5051afa7 100644 --- a/internal/controller/execplan.go +++ b/internal/controller/execplan.go @@ -142,7 +142,7 @@ func (r *TemporalWorkerDeploymentReconciler) startTestWorkflows(ctx context.Cont } if err != nil { r.Recorder.Eventf(workerDeploy, 
corev1.EventTypeWarning, "TestWorkflowStartFailed", - "Failed to start gate workflow %q (buildID %s): %v", wf.workflowType, wf.buildID, err) + "Failed to start gate workflow %q (buildID %s, taskQueue %s): %v", wf.workflowType, wf.buildID, wf.taskQueue, err) return fmt.Errorf("unable to start test workflow execution: %w", err) } } @@ -166,6 +166,8 @@ func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Con "Failed to set buildID %q as current version: %v", vcfg.BuildID, err) return fmt.Errorf("unable to set current deployment version: %w", err) } + r.setCondition(workerDeploy, temporaliov1alpha1.ConditionRolloutReady, metav1.ConditionTrue, + "RolloutComplete", fmt.Sprintf("Rollout complete for buildID %s", vcfg.BuildID)) } else { if vcfg.RampPercentage > 0 { l.Info("applying ramp", "buildID", vcfg.BuildID, "percentage", vcfg.RampPercentage) diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index 726684f9..b8f121ab 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -99,7 +99,7 @@ type TemporalWorkerDeploymentReconciler struct { //+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch //+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=apps,resources=deployments/scale,verbs=update -// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch +// +kubebuilder:rbac:groups=events.k8s.io,resources=events,verbs=create;patch // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. 
@@ -277,13 +277,6 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req return ctrl.Result{}, err } - // Mark as Ready on successful reconciliation - if workerDeploy.Status.CurrentVersion != nil && - workerDeploy.Status.TargetVersion.BuildID == workerDeploy.Status.CurrentVersion.BuildID { - r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionRolloutReady, metav1.ConditionTrue, - "RolloutSucceeded", "Target version rollout complete "+workerDeploy.Status.TargetVersion.BuildID) - } - return ctrl.Result{ Requeue: true, // TODO(jlegrone): Consider increasing this value if the only thing we need to check for is unreachable versions. From 0b76e067756f45a9e1b64bf0b99d4397c200d3ba Mon Sep 17 00:00:00 2001 From: Rahul Kekre Date: Fri, 27 Feb 2026 14:52:31 -0800 Subject: [PATCH 07/13] additional refactoring and addressing comments --- .gitignore | 1 + api/v1alpha1/worker_types.go | 4 +- internal/controller/execplan.go | 5 +++ internal/controller/worker_controller.go | 52 +++++++++++++++--------- 4 files changed, 40 insertions(+), 22 deletions(-) diff --git a/.gitignore b/.gitignore index f71ef8c0..a8c7eab4 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ skaffold.env certs .DS_Store +cover.out .claude config \ No newline at end of file diff --git a/api/v1alpha1/worker_types.go b/api/v1alpha1/worker_types.go index 5b8bf903..79fe99ed 100644 --- a/api/v1alpha1/worker_types.go +++ b/api/v1alpha1/worker_types.go @@ -92,8 +92,8 @@ const ( // resource exists and is properly configured. ConditionTemporalConnectionHealthy = "TemporalConnectionHealthy" - // ConditionRolloutReady indicates whether the TemporalWorkerDeployment is fully reconciled - // and operational. + // ConditionRolloutReady indicates whether the target version has been successfully + // registered as the current version, completing the rollout. 
ConditionRolloutReady = "RolloutReady" ) diff --git a/internal/controller/execplan.go b/internal/controller/execplan.go index 5051afa7..16226568 100644 --- a/internal/controller/execplan.go +++ b/internal/controller/execplan.go @@ -141,6 +141,7 @@ func (r *TemporalWorkerDeploymentReconciler) startTestWorkflows(ctx context.Cont _, err = temporalClient.ExecuteWorkflow(ctx, opts, wf.workflowType) } if err != nil { + l.Error(err, "unable to start test workflow execution", "workflowType", wf.workflowType, "buildID", wf.buildID, "taskQueue", wf.taskQueue) r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "TestWorkflowStartFailed", "Failed to start gate workflow %q (buildID %s, taskQueue %s): %v", wf.workflowType, wf.buildID, wf.taskQueue, err) return fmt.Errorf("unable to start test workflow execution: %w", err) @@ -162,6 +163,7 @@ func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Con ConflictToken: vcfg.ConflictToken, Identity: getControllerIdentity(), }); err != nil { + l.Error(err, "unable to set current deployment version", "buildID", vcfg.BuildID) r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "VersionRegistrationFailed", "Failed to set buildID %q as current version: %v", vcfg.BuildID, err) return fmt.Errorf("unable to set current deployment version: %w", err) @@ -181,6 +183,7 @@ func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Con ConflictToken: vcfg.ConflictToken, Identity: getControllerIdentity(), }); err != nil { + l.Error(err, "unable to set ramping deployment version", "buildID", vcfg.BuildID, "percentage", vcfg.RampPercentage) r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "VersionRegistrationFailed", "Failed to set buildID %q as ramping version (%.1f%%): %v", vcfg.BuildID, vcfg.RampPercentage, err) return fmt.Errorf("unable to set ramping deployment version: %w", err) @@ -199,6 +202,7 @@ func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Con }, }, }); 
err != nil { // would be cool to do this atomically with the update + l.Error(err, "unable to update version metadata", "buildID", vcfg.BuildID) r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "MetadataUpdateFailed", "Failed to update version metadata for buildID %q: %v", vcfg.BuildID, err) return fmt.Errorf("unable to update metadata after setting current deployment: %w", err) @@ -232,6 +236,7 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l RemoveEntries: []string{temporal.IgnoreLastModifierKey}, }, }); err != nil { + l.Error(err, "unable to remove ignore-last-modifier metadata", "buildID", buildId) return fmt.Errorf("unable to update metadata to remove %s deployment: %w", temporal.IgnoreLastModifierKey, err) } } diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index b8f121ab..2aee63e7 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -150,11 +150,10 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req Namespace: workerDeploy.Namespace, }, &temporalConnection); err != nil { l.Error(err, "unable to fetch TemporalConnection") - r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, "TemporalConnectionNotFound", - "Unable to fetch TemporalConnection %q: %v", workerDeploy.Spec.WorkerOptions.TemporalConnectionRef.Name, err) - r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, metav1.ConditionFalse, - "TemporalConnectionNotFound", fmt.Sprintf("TemporalConnection %q not found: %v", workerDeploy.Spec.WorkerOptions.TemporalConnectionRef.Name, err)) - _ = r.Status().Update(ctx, &workerDeploy) + r.recordWarningAndSetCondition(ctx, &workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, + "TemporalConnectionNotFound", + fmt.Sprintf("Unable to fetch TemporalConnection %q: %v", workerDeploy.Spec.WorkerOptions.TemporalConnectionRef.Name, err), + 
fmt.Sprintf("TemporalConnection %q not found: %v", workerDeploy.Spec.WorkerOptions.TemporalConnectionRef.Name, err)) return ctrl.Result{}, err } @@ -162,11 +161,10 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req authMode, secretName, err := resolveAuthSecretName(&temporalConnection) if err != nil { l.Error(err, "unable to resolve auth secret name") - r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, "AuthSecretInvalid", - "Unable to resolve auth secret from TemporalConnection %q: %v", temporalConnection.Name, err) - r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, metav1.ConditionFalse, - "AuthSecretInvalid", fmt.Sprintf("Unable to resolve auth secret: %v", err)) - _ = r.Status().Update(ctx, &workerDeploy) + r.recordWarningAndSetCondition(ctx, &workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, + "AuthSecretInvalid", + fmt.Sprintf("Unable to resolve auth secret from TemporalConnection %q: %v", temporalConnection.Name, err), + fmt.Sprintf("Unable to resolve auth secret: %v", err)) return ctrl.Result{}, err } @@ -189,11 +187,10 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req }) if err != nil { l.Error(err, "unable to create TemporalClient") - r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, "TemporalClientCreationFailed", - "Unable to create Temporal client for %s:%s: %v", temporalConnection.Spec.HostPort, workerDeploy.Spec.WorkerOptions.TemporalNamespace, err) - r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, metav1.ConditionFalse, - "TemporalClientCreationFailed", fmt.Sprintf("Failed to connect to Temporal: %v", err)) - _ = r.Status().Update(ctx, &workerDeploy) + r.recordWarningAndSetCondition(ctx, &workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, + "TemporalClientCreationFailed", + fmt.Sprintf("Unable to create Temporal client for %s:%s: %v", 
temporalConnection.Spec.HostPort, workerDeploy.Spec.WorkerOptions.TemporalNamespace, err), + fmt.Sprintf("Failed to connect to Temporal: %v", err)) return ctrl.Result{}, err } temporalClient = c @@ -226,11 +223,10 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req getControllerIdentity(), ) if err != nil { - r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, "TemporalStateFetchFailed", - "Unable to get Temporal worker deployment state: %v", err) - r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, metav1.ConditionFalse, - "TemporalStateFetchFailed", fmt.Sprintf("Failed to query Temporal worker deployment state: %v", err)) - _ = r.Status().Update(ctx, &workerDeploy) + r.recordWarningAndSetCondition(ctx, &workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, + "TemporalStateFetchFailed", + fmt.Sprintf("Unable to get Temporal worker deployment state: %v", err), + fmt.Sprintf("Failed to query Temporal worker deployment state: %v", err)) return ctrl.Result{}, fmt.Errorf("unable to get Temporal worker deployment state: %w", err) } @@ -267,6 +263,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req if err != nil { r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, "PlanGenerationFailed", "Unable to generate reconciliation plan: %v", err) + _ = r.Status().Update(ctx, &workerDeploy) return ctrl.Result{}, err } @@ -274,6 +271,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req if err := r.executePlan(ctx, l, &workerDeploy, temporalClient, plan); err != nil { r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, "PlanExecutionFailed", "Unable to execute reconciliation plan: %v", err) + _ = r.Status().Update(ctx, &workerDeploy) return ctrl.Result{}, err } @@ -302,6 +300,20 @@ func (r *TemporalWorkerDeploymentReconciler) setCondition( }) } +// recordWarningAndSetCondition emits a warning event, sets a condition to 
False, and persists the status update. +func (r *TemporalWorkerDeploymentReconciler) recordWarningAndSetCondition( + ctx context.Context, + workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, + conditionType string, + reason string, + eventMessage string, + conditionMessage string, +) { + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, reason, eventMessage) + r.setCondition(workerDeploy, conditionType, metav1.ConditionFalse, reason, conditionMessage) + _ = r.Status().Update(ctx, workerDeploy) +} + // SetupWithManager sets up the controller with the Manager. func (r *TemporalWorkerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { if err := mgr.GetFieldIndexer().IndexField(context.Background(), &appsv1.Deployment{}, deployOwnerKey, func(rawObj client.Object) []string { From 6e92e3a56bfc95f8dab14c369184d709a1f52420 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Tue, 3 Mar 2026 15:33:57 -0800 Subject: [PATCH 08/13] standardize event/condition reason constants and split by API stability --- api/v1alpha1/worker_types.go | 36 +++++++++++++++++-- internal/controller/execplan.go | 20 +++++------ internal/controller/reconciler_events_test.go | 12 +++---- internal/controller/util.go | 18 ++++++++++ internal/controller/worker_controller.go | 26 +++++++------- 5 files changed, 81 insertions(+), 31 deletions(-) diff --git a/api/v1alpha1/worker_types.go b/api/v1alpha1/worker_types.go index 79fe99ed..c481d202 100644 --- a/api/v1alpha1/worker_types.go +++ b/api/v1alpha1/worker_types.go @@ -92,9 +92,41 @@ const ( // resource exists and is properly configured. ConditionTemporalConnectionHealthy = "TemporalConnectionHealthy" - // ConditionRolloutReady indicates whether the target version has been successfully + // ConditionRolloutComplete indicates whether the target version has been successfully // registered as the current version, completing the rollout. 
- ConditionRolloutReady = "RolloutReady" + ConditionRolloutComplete = "RolloutComplete" +) + +// Condition reason constants for TemporalWorkerDeployment. +// +// These strings appear in status.conditions[].reason and are part of the CRD's +// status API. Operators, monitoring rules, and scripts may depend on them. +// They should be treated as stable within an API version and renamed only with +// a corresponding version bump. +const ( + // ReasonTemporalConnectionNotFound is set on ConditionTemporalConnectionHealthy + // when the referenced TemporalConnection resource cannot be found. + ReasonTemporalConnectionNotFound = "TemporalConnectionNotFound" + + // ReasonAuthSecretInvalid is set on ConditionTemporalConnectionHealthy when + // the auth secret referenced by the TemporalConnection cannot be resolved. + ReasonAuthSecretInvalid = "AuthSecretInvalid" + + // ReasonTemporalClientCreationFailed is set on ConditionTemporalConnectionHealthy + // when a Temporal SDK client cannot be created for the connection. + ReasonTemporalClientCreationFailed = "TemporalClientCreationFailed" + + // ReasonTemporalStateFetchFailed is set on ConditionTemporalConnectionHealthy + // when the controller cannot query the current worker deployment state from Temporal. + ReasonTemporalStateFetchFailed = "TemporalStateFetchFailed" + + // ReasonTemporalConnectionHealthy is set on ConditionTemporalConnectionHealthy + // when the connection is reachable and the auth secret is resolved. + ReasonTemporalConnectionHealthy = "TemporalConnectionHealthy" + + // ReasonRolloutComplete is set on ConditionRolloutComplete when the target + // version has been successfully registered as the current version. + ReasonRolloutComplete = "RolloutComplete" ) // VersionStatus indicates the status of a version. 
diff --git a/internal/controller/execplan.go b/internal/controller/execplan.go index 16226568..5c510920 100644 --- a/internal/controller/execplan.go +++ b/internal/controller/execplan.go @@ -29,7 +29,7 @@ func (r *TemporalWorkerDeploymentReconciler) executeK8sOperations(ctx context.Co l.Info("creating deployment", "deployment", p.CreateDeployment) if err := r.Create(ctx, p.CreateDeployment); err != nil { l.Error(err, "unable to create deployment", "deployment", p.CreateDeployment) - r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "DeploymentCreateFailed", + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonDeploymentCreateFailed, "Failed to create Deployment %q: %v", p.CreateDeployment.Name, err) return err } @@ -40,7 +40,7 @@ func (r *TemporalWorkerDeploymentReconciler) executeK8sOperations(ctx context.Co l.Info("deleting deployment", "deployment", d) if err := r.Delete(ctx, d); err != nil { l.Error(err, "unable to delete deployment", "deployment", d) - r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "DeploymentDeleteFailed", + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonDeploymentDeleteFailed, "Failed to delete Deployment %q: %v", d.Name, err) return err } @@ -59,7 +59,7 @@ func (r *TemporalWorkerDeploymentReconciler) executeK8sOperations(ctx context.Co scale := &autoscalingv1.Scale{Spec: autoscalingv1.ScaleSpec{Replicas: int32(replicas)}} if err := r.Client.SubResource("scale").Update(ctx, dep, client.WithSubResourceBody(scale)); err != nil { l.Error(err, "unable to scale deployment", "deployment", d, "replicas", replicas) - r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "DeploymentScaleFailed", + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonDeploymentScaleFailed, "Failed to scale Deployment %q to %d replicas: %v", d.Name, replicas, err) return fmt.Errorf("unable to scale deployment: %w", err) } @@ -70,7 +70,7 @@ func (r *TemporalWorkerDeploymentReconciler) executeK8sOperations(ctx 
context.Co l.Info("updating deployment", "deployment", d.Name, "namespace", d.Namespace) if err := r.Update(ctx, d); err != nil { l.Error(err, "unable to update deployment", "deployment", d) - r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "DeploymentUpdateFailed", + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonDeploymentUpdateFailed, "Failed to update Deployment %q: %v", d.Name, err) return fmt.Errorf("unable to update deployment: %w", err) } @@ -142,7 +142,7 @@ func (r *TemporalWorkerDeploymentReconciler) startTestWorkflows(ctx context.Cont } if err != nil { l.Error(err, "unable to start test workflow execution", "workflowType", wf.workflowType, "buildID", wf.buildID, "taskQueue", wf.taskQueue) - r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "TestWorkflowStartFailed", + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonTestWorkflowStartFailed, "Failed to start gate workflow %q (buildID %s, taskQueue %s): %v", wf.workflowType, wf.buildID, wf.taskQueue, err) return fmt.Errorf("unable to start test workflow execution: %w", err) } @@ -164,12 +164,12 @@ func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Con Identity: getControllerIdentity(), }); err != nil { l.Error(err, "unable to set current deployment version", "buildID", vcfg.BuildID) - r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "VersionRegistrationFailed", + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonVersionRegistrationFailed, "Failed to set buildID %q as current version: %v", vcfg.BuildID, err) return fmt.Errorf("unable to set current deployment version: %w", err) } - r.setCondition(workerDeploy, temporaliov1alpha1.ConditionRolloutReady, metav1.ConditionTrue, - "RolloutComplete", fmt.Sprintf("Rollout complete for buildID %s", vcfg.BuildID)) + r.setCondition(workerDeploy, temporaliov1alpha1.ConditionRolloutComplete, metav1.ConditionTrue, + temporaliov1alpha1.ReasonRolloutComplete, fmt.Sprintf("Rollout 
complete for buildID %s", vcfg.BuildID)) } else { if vcfg.RampPercentage > 0 { l.Info("applying ramp", "buildID", vcfg.BuildID, "percentage", vcfg.RampPercentage) @@ -184,7 +184,7 @@ func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Con Identity: getControllerIdentity(), }); err != nil { l.Error(err, "unable to set ramping deployment version", "buildID", vcfg.BuildID, "percentage", vcfg.RampPercentage) - r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "VersionRegistrationFailed", + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonVersionRegistrationFailed, "Failed to set buildID %q as ramping version (%.1f%%): %v", vcfg.BuildID, vcfg.RampPercentage, err) return fmt.Errorf("unable to set ramping deployment version: %w", err) } @@ -203,7 +203,7 @@ func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Con }, }); err != nil { // would be cool to do this atomically with the update l.Error(err, "unable to update version metadata", "buildID", vcfg.BuildID) - r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, "MetadataUpdateFailed", + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonMetadataUpdateFailed, "Failed to update version metadata for buildID %q: %v", vcfg.BuildID, err) return fmt.Errorf("unable to update metadata after setting current deployment: %w", err) } diff --git a/internal/controller/reconciler_events_test.go b/internal/controller/reconciler_events_test.go index d1288c4d..466c702d 100644 --- a/internal/controller/reconciler_events_test.go +++ b/internal/controller/reconciler_events_test.go @@ -291,10 +291,10 @@ func TestSetCondition_SetsNewCondition(t *testing.T) { twd := makeTWD("test-worker", "default", "my-connection") r, _ := newTestReconciler(nil) - r.setCondition(twd, temporaliov1alpha1.ConditionRolloutReady, metav1.ConditionTrue, "TestReason", "Test message") + r.setCondition(twd, temporaliov1alpha1.ConditionRolloutComplete, metav1.ConditionTrue, 
"TestReason", "Test message") require.Len(t, twd.Status.Conditions, 1) - assert.Equal(t, temporaliov1alpha1.ConditionRolloutReady, twd.Status.Conditions[0].Type) + assert.Equal(t, temporaliov1alpha1.ConditionRolloutComplete, twd.Status.Conditions[0].Type) assert.Equal(t, metav1.ConditionTrue, twd.Status.Conditions[0].Status) assert.Equal(t, "TestReason", twd.Status.Conditions[0].Reason) assert.Equal(t, "Test message", twd.Status.Conditions[0].Message) @@ -306,11 +306,11 @@ func TestSetCondition_UpdatesExistingCondition(t *testing.T) { r, _ := newTestReconciler(nil) // Set initial condition - r.setCondition(twd, temporaliov1alpha1.ConditionRolloutReady, metav1.ConditionTrue, "InitialReason", "Initial message") + r.setCondition(twd, temporaliov1alpha1.ConditionRolloutComplete, metav1.ConditionTrue, "InitialReason", "Initial message") require.Len(t, twd.Status.Conditions, 1) // Update the condition - r.setCondition(twd, temporaliov1alpha1.ConditionRolloutReady, metav1.ConditionFalse, "UpdatedReason", "Updated message") + r.setCondition(twd, temporaliov1alpha1.ConditionRolloutComplete, metav1.ConditionFalse, "UpdatedReason", "Updated message") // Should still be exactly 1 condition, not 2 require.Len(t, twd.Status.Conditions, 1) @@ -324,7 +324,7 @@ func TestSetCondition_MultipleDifferentConditions(t *testing.T) { r, _ := newTestReconciler(nil) r.setCondition(twd, temporaliov1alpha1.ConditionTemporalConnectionHealthy, metav1.ConditionTrue, "Healthy", "Connection is healthy") - r.setCondition(twd, temporaliov1alpha1.ConditionRolloutReady, metav1.ConditionTrue, "Ready", "All good") + r.setCondition(twd, temporaliov1alpha1.ConditionRolloutComplete, metav1.ConditionTrue, "Ready", "All good") require.Len(t, twd.Status.Conditions, 2) @@ -332,7 +332,7 @@ func TestSetCondition_MultipleDifferentConditions(t *testing.T) { require.NotNil(t, connCond) assert.Equal(t, metav1.ConditionTrue, connCond.Status) - readyCond := meta.FindStatusCondition(twd.Status.Conditions, 
temporaliov1alpha1.ConditionRolloutReady) + readyCond := meta.FindStatusCondition(twd.Status.Conditions, temporaliov1alpha1.ConditionRolloutComplete) require.NotNil(t, readyCond) assert.Equal(t, metav1.ConditionTrue, readyCond.Status) } diff --git a/internal/controller/util.go b/internal/controller/util.go index 22fe12c2..3246253f 100644 --- a/internal/controller/util.go +++ b/internal/controller/util.go @@ -11,6 +11,24 @@ import ( "github.com/temporalio/temporal-worker-controller/internal/defaults" ) +// Event reason constants for TemporalWorkerDeployment. +// +// These strings appear in Kubernetes Event objects (kubectl get events) and are +// internal to the controller's implementation. They are not part of the CRD +// status API and may change between releases. Do not write alerting or automation +// that depends on these strings. +const ( + ReasonPlanGenerationFailed = "PlanGenerationFailed" + ReasonPlanExecutionFailed = "PlanExecutionFailed" + ReasonDeploymentCreateFailed = "DeploymentCreateFailed" + ReasonDeploymentDeleteFailed = "DeploymentDeleteFailed" + ReasonDeploymentScaleFailed = "DeploymentScaleFailed" + ReasonDeploymentUpdateFailed = "DeploymentUpdateFailed" + ReasonTestWorkflowStartFailed = "TestWorkflowStartFailed" + ReasonVersionRegistrationFailed = "VersionRegistrationFailed" + ReasonMetadataUpdateFailed = "MetadataUpdateFailed" +) + const ( controllerIdentityMetadataKey = "temporal.io/controller" controllerVersionMetadataKey = "temporal.io/controller-version" diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index 2aee63e7..cfe0bfb0 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -150,8 +150,8 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req Namespace: workerDeploy.Namespace, }, &temporalConnection); err != nil { l.Error(err, "unable to fetch TemporalConnection") - r.recordWarningAndSetCondition(ctx, 
&workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, - "TemporalConnectionNotFound", + r.recordWarningAndSetConditionFalse(ctx, &workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, + temporaliov1alpha1.ReasonTemporalConnectionNotFound, fmt.Sprintf("Unable to fetch TemporalConnection %q: %v", workerDeploy.Spec.WorkerOptions.TemporalConnectionRef.Name, err), fmt.Sprintf("TemporalConnection %q not found: %v", workerDeploy.Spec.WorkerOptions.TemporalConnectionRef.Name, err)) return ctrl.Result{}, err @@ -161,8 +161,8 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req authMode, secretName, err := resolveAuthSecretName(&temporalConnection) if err != nil { l.Error(err, "unable to resolve auth secret name") - r.recordWarningAndSetCondition(ctx, &workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, - "AuthSecretInvalid", + r.recordWarningAndSetConditionFalse(ctx, &workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, + temporaliov1alpha1.ReasonAuthSecretInvalid, fmt.Sprintf("Unable to resolve auth secret from TemporalConnection %q: %v", temporalConnection.Name, err), fmt.Sprintf("Unable to resolve auth secret: %v", err)) return ctrl.Result{}, err @@ -170,7 +170,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req // Mark TemporalConnection as valid since we fetched it and resolved auth r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, metav1.ConditionTrue, - "TemporalConnectionHealthy", "TemporalConnection is healthy and auth secret is resolved") + temporaliov1alpha1.ReasonTemporalConnectionHealthy, "TemporalConnection is healthy and auth secret is resolved") // Get or update temporal client for connection temporalClient, ok := r.TemporalClientPool.GetSDKClient(clientpool.ClientPoolKey{ @@ -187,8 +187,8 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req }) if err != nil { 
l.Error(err, "unable to create TemporalClient") - r.recordWarningAndSetCondition(ctx, &workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, - "TemporalClientCreationFailed", + r.recordWarningAndSetConditionFalse(ctx, &workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, + temporaliov1alpha1.ReasonTemporalClientCreationFailed, fmt.Sprintf("Unable to create Temporal client for %s:%s: %v", temporalConnection.Spec.HostPort, workerDeploy.Spec.WorkerOptions.TemporalNamespace, err), fmt.Sprintf("Failed to connect to Temporal: %v", err)) return ctrl.Result{}, err @@ -223,8 +223,8 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req getControllerIdentity(), ) if err != nil { - r.recordWarningAndSetCondition(ctx, &workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, - "TemporalStateFetchFailed", + r.recordWarningAndSetConditionFalse(ctx, &workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, + temporaliov1alpha1.ReasonTemporalStateFetchFailed, fmt.Sprintf("Unable to get Temporal worker deployment state: %v", err), fmt.Sprintf("Failed to query Temporal worker deployment state: %v", err)) return ctrl.Result{}, fmt.Errorf("unable to get Temporal worker deployment state: %w", err) @@ -261,7 +261,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req // Generate a plan to get to desired spec from current status plan, err := r.generatePlan(ctx, l, &workerDeploy, temporalConnection.Spec, temporalState) if err != nil { - r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, "PlanGenerationFailed", + r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, ReasonPlanGenerationFailed, "Unable to generate reconciliation plan: %v", err) _ = r.Status().Update(ctx, &workerDeploy) return ctrl.Result{}, err @@ -269,7 +269,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req // Execute the plan, handling any errors if err := 
r.executePlan(ctx, l, &workerDeploy, temporalClient, plan); err != nil { - r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, "PlanExecutionFailed", + r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, ReasonPlanExecutionFailed, "Unable to execute reconciliation plan: %v", err) _ = r.Status().Update(ctx, &workerDeploy) return ctrl.Result{}, err @@ -300,8 +300,8 @@ func (r *TemporalWorkerDeploymentReconciler) setCondition( }) } -// recordWarningAndSetCondition emits a warning event, sets a condition to False, and persists the status update. -func (r *TemporalWorkerDeploymentReconciler) recordWarningAndSetCondition( +// recordWarningAndSetConditionFalse emits a warning event, sets a condition to False, and persists the status update. +func (r *TemporalWorkerDeploymentReconciler) recordWarningAndSetConditionFalse( ctx context.Context, workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, conditionType string, From 9c1d7f581e05d11f5853f237ff5f81ba2e76dab9 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Tue, 3 Mar 2026 15:37:22 -0800 Subject: [PATCH 09/13] rename VersionRegistrationFailed event reason to VersionPromotionFailed "Registration" already has a meaning in Temporal versioning (a worker polling for the first time creates a version record). "Promotion" better describes setting a version as current or ramping, which moves it forward in the rollout lifecycle. 
Co-Authored-By: Claude Sonnet 4.6 --- internal/controller/execplan.go | 4 ++-- internal/controller/util.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/controller/execplan.go b/internal/controller/execplan.go index 5c510920..07561fd7 100644 --- a/internal/controller/execplan.go +++ b/internal/controller/execplan.go @@ -164,7 +164,7 @@ func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Con Identity: getControllerIdentity(), }); err != nil { l.Error(err, "unable to set current deployment version", "buildID", vcfg.BuildID) - r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonVersionRegistrationFailed, + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonVersionPromotionFailed, "Failed to set buildID %q as current version: %v", vcfg.BuildID, err) return fmt.Errorf("unable to set current deployment version: %w", err) } @@ -184,7 +184,7 @@ func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Con Identity: getControllerIdentity(), }); err != nil { l.Error(err, "unable to set ramping deployment version", "buildID", vcfg.BuildID, "percentage", vcfg.RampPercentage) - r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonVersionRegistrationFailed, + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonVersionPromotionFailed, "Failed to set buildID %q as ramping version (%.1f%%): %v", vcfg.BuildID, vcfg.RampPercentage, err) return fmt.Errorf("unable to set ramping deployment version: %w", err) } diff --git a/internal/controller/util.go b/internal/controller/util.go index 3246253f..ca656dd8 100644 --- a/internal/controller/util.go +++ b/internal/controller/util.go @@ -25,7 +25,7 @@ const ( ReasonDeploymentScaleFailed = "DeploymentScaleFailed" ReasonDeploymentUpdateFailed = "DeploymentUpdateFailed" ReasonTestWorkflowStartFailed = "TestWorkflowStartFailed" - ReasonVersionRegistrationFailed = "VersionRegistrationFailed" + ReasonVersionPromotionFailed = 
"VersionPromotionFailed" ReasonMetadataUpdateFailed = "MetadataUpdateFailed" ) From 11d382a9702e38bccf2ab9cee1d7795d0485684d Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Tue, 3 Mar 2026 16:27:31 -0800 Subject: [PATCH 10/13] add functional tests for events and conditions, enumerate remaining test scenarios ranked by trigger difficulty --- internal/controller/util.go | 18 +-- internal/controller/worker_controller.go | 3 + .../conditions_events_integration_test.go | 137 ++++++++++++++++++ internal/tests/internal/integration_test.go | 3 + internal/tests/internal/validation_helpers.go | 59 ++++++++ 5 files changed, 211 insertions(+), 9 deletions(-) create mode 100644 internal/tests/internal/conditions_events_integration_test.go diff --git a/internal/controller/util.go b/internal/controller/util.go index ca656dd8..8c12fece 100644 --- a/internal/controller/util.go +++ b/internal/controller/util.go @@ -18,15 +18,15 @@ import ( // status API and may change between releases. Do not write alerting or automation // that depends on these strings. 
const ( - ReasonPlanGenerationFailed = "PlanGenerationFailed" - ReasonPlanExecutionFailed = "PlanExecutionFailed" - ReasonDeploymentCreateFailed = "DeploymentCreateFailed" - ReasonDeploymentDeleteFailed = "DeploymentDeleteFailed" - ReasonDeploymentScaleFailed = "DeploymentScaleFailed" - ReasonDeploymentUpdateFailed = "DeploymentUpdateFailed" - ReasonTestWorkflowStartFailed = "TestWorkflowStartFailed" - ReasonVersionPromotionFailed = "VersionPromotionFailed" - ReasonMetadataUpdateFailed = "MetadataUpdateFailed" + ReasonPlanGenerationFailed = "PlanGenerationFailed" + ReasonPlanExecutionFailed = "PlanExecutionFailed" + ReasonDeploymentCreateFailed = "DeploymentCreateFailed" + ReasonDeploymentDeleteFailed = "DeploymentDeleteFailed" + ReasonDeploymentScaleFailed = "DeploymentScaleFailed" + ReasonDeploymentUpdateFailed = "DeploymentUpdateFailed" + ReasonTestWorkflowStartFailed = "TestWorkflowStartFailed" + ReasonVersionPromotionFailed = "VersionPromotionFailed" + ReasonMetadataUpdateFailed = "MetadataUpdateFailed" ) const ( diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index cfe0bfb0..66da8367 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -275,6 +275,9 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req return ctrl.Result{}, err } + // Persist any conditions set during plan execution (e.g. RolloutComplete). + _ = r.Status().Update(ctx, &workerDeploy) + return ctrl.Result{ Requeue: true, // TODO(jlegrone): Consider increasing this value if the only thing we need to check for is unreachable versions. 
diff --git a/internal/tests/internal/conditions_events_integration_test.go b/internal/tests/internal/conditions_events_integration_test.go new file mode 100644 index 00000000..4928a2ce --- /dev/null +++ b/internal/tests/internal/conditions_events_integration_test.go @@ -0,0 +1,137 @@ +package internal + +// This file tests that status.conditions and Kubernetes Events are correctly +// populated by the controller. Only scenarios that are naturally triggered by +// the existing test machinery are covered here. +// +// Covered: +// - ConditionTemporalConnectionHealthy = True (any successful reconcile) +// - ConditionRolloutComplete = True (version promoted to current) +// - Event reason RolloutComplete (emitted alongside the condition above) +// - ConditionTemporalConnectionHealthy = False (missing TemporalConnection) +// - Event reason TemporalConnectionNotFound (emitted alongside the condition above) +// +// Not yet covered, ranked by ease of triggering: +// 1. ReasonAuthSecretInvalid (Easy): create a TemporalConnection with conflicting/incomplete +// auth config, e.g. mTLS mode with no secret ref. +// 2. ReasonTemporalClientCreationFailed (Medium): set hostPort to an unreachable address; +// depends on whether the Temporal SDK validates the connection eagerly at UpsertClient. +// 3. ReasonTemporalStateFetchFailed (Medium): set temporalNamespace on the TWD to a namespace +// that doesn't exist on the test server; the deployment state query would fail. +// 4. ReasonTestWorkflowStartFailed (Medium): need Temporal to reject StartWorkflow; could do +// this by naming a Gate workflow that is not registered on the worker. +// 5. ReasonDeploymentCreateFailed / UpdateFailed / ScaleFailed / DeleteFailed (Hard): +// need the k8s API server in envtest to reject the operation (e.g. via a webhook or quota). +// 6. ReasonVersionPromotionFailed / ReasonMetadataUpdateFailed (Hard): need SetCurrentVersion +// or UpdateVersionMetadata to fail on an otherwise healthy Temporal server. 
+// 7. ReasonPlanGenerationFailed / ReasonPlanExecutionFailed (Hard): meta-errors that wrap the +// above; would fire automatically if any of the above are triggered. + +import ( + "context" + "testing" + "time" + + temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" + "github.com/temporalio/temporal-worker-controller/internal/testhelpers" + "go.temporal.io/server/temporaltest" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +func runConditionsAndEventsTests( + t *testing.T, + k8sClient client.Client, + mgr manager.Manager, + ts *temporaltest.TestServer, + testNamespace string, +) { + cases := []testCase{ + { + // Verifies that ConditionTemporalConnectionHealthy is set to True whenever + // the controller successfully connects to Temporal. + name: "conditions-connection-healthy", + builder: testhelpers.NewTestCase(). + WithInput( + testhelpers.NewTemporalWorkerDeploymentBuilder(). + WithManualStrategy(). + WithTargetTemplate("v1.0"), + ). + WithExpectedStatus( + testhelpers.NewStatusBuilder(). + WithTargetVersion("v1.0", temporaliov1alpha1.VersionStatusInactive, -1, true, false), + ). + WithValidatorFunction(func(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { + twd := tc.GetTWD() + waitForCondition(t, ctx, env.K8sClient, twd.Name, twd.Namespace, + temporaliov1alpha1.ConditionTemporalConnectionHealthy, + metav1.ConditionTrue, + temporaliov1alpha1.ReasonTemporalConnectionHealthy, + 10*time.Second, time.Second) + }), + }, + { + // Verifies that ConditionRolloutComplete is set to True after the controller + // promotes a version to current. Note: only a condition is set here — no + // separate k8s Event is emitted for RolloutComplete. + name: "conditions-rollout-complete", + builder: testhelpers.NewTestCase(). + WithInput( + testhelpers.NewTemporalWorkerDeploymentBuilder(). + WithAllAtOnceStrategy(). 
+ WithTargetTemplate("v1.0"), + ). + WithExpectedStatus( + testhelpers.NewStatusBuilder(). + WithTargetVersion("v1.0", temporaliov1alpha1.VersionStatusCurrent, -1, true, false). + WithCurrentVersion("v1.0", true, false), + ). + WithValidatorFunction(func(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { + twd := tc.GetTWD() + waitForCondition(t, ctx, env.K8sClient, twd.Name, twd.Namespace, + temporaliov1alpha1.ConditionRolloutComplete, + metav1.ConditionTrue, + temporaliov1alpha1.ReasonRolloutComplete, + 10*time.Second, time.Second) + }), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + testTemporalWorkerDeploymentCreation(ctx, t, k8sClient, mgr, ts, tc.builder.BuildWithValues(tc.name, testNamespace, ts.GetDefaultNamespace())) + }) + } + + // conditions-missing-connection runs standalone because it deliberately omits the + // TemporalConnection resource that testTemporalWorkerDeploymentCreation always creates. + // We don't need to create a test Temporal server for this test case because the connection + // is missing anyways. + t.Run("conditions-missing-connection", func(t *testing.T) { + ctx := context.Background() + + twd := testhelpers.NewTemporalWorkerDeploymentBuilder(). + WithManualStrategy(). + WithTargetTemplate("v1.0"). + WithName("conditions-missing-connection"). + WithNamespace(testNamespace). + WithTemporalConnection("does-not-exist"). + WithTemporalNamespace(ts.GetDefaultNamespace()). 
+ Build() + + if err := k8sClient.Create(ctx, twd); err != nil { + t.Fatalf("failed to create TWD: %v", err) + } + + waitForCondition(t, ctx, k8sClient, twd.Name, twd.Namespace, + temporaliov1alpha1.ConditionTemporalConnectionHealthy, + metav1.ConditionFalse, + temporaliov1alpha1.ReasonTemporalConnectionNotFound, + 30*time.Second, time.Second) + waitForEvent(t, ctx, k8sClient, twd.Name, twd.Namespace, + temporaliov1alpha1.ReasonTemporalConnectionNotFound, + 30*time.Second, time.Second) + }) +} diff --git a/internal/tests/internal/integration_test.go b/internal/tests/internal/integration_test.go index 953208c0..27b6b176 100644 --- a/internal/tests/internal/integration_test.go +++ b/internal/tests/internal/integration_test.go @@ -839,6 +839,9 @@ func TestIntegration(t *testing.T) { }) } + // Conditions and events tests + runConditionsAndEventsTests(t, k8sClient, mgr, ts, testNamespace.Name) + } // testTemporalWorkerDeploymentCreation tests the creation of a TemporalWorkerDeployment and waits for the expected status diff --git a/internal/tests/internal/validation_helpers.go b/internal/tests/internal/validation_helpers.go index 395e5474..a10b9180 100644 --- a/internal/tests/internal/validation_helpers.go +++ b/internal/tests/internal/validation_helpers.go @@ -14,7 +14,10 @@ import ( "go.temporal.io/sdk/worker" "go.temporal.io/server/temporaltest" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" ) var ( @@ -418,6 +421,62 @@ func validateDeprecatedVersion(ctx context.Context, env testhelpers.TestEnv, exp return nil } +// waitForCondition polls until the named condition on the TWD matches the expected +// status and reason, or fatals on timeout. 
+func waitForCondition( + t *testing.T, + ctx context.Context, + k8sClient client.Client, + twdName, namespace, condType string, + expectedStatus metav1.ConditionStatus, + expectedReason string, + timeout, interval time.Duration, +) { + t.Helper() + eventually(t, timeout, interval, func() error { + var twd temporaliov1alpha1.TemporalWorkerDeployment + if err := k8sClient.Get(ctx, types.NamespacedName{Name: twdName, Namespace: namespace}, &twd); err != nil { + return fmt.Errorf("failed to get TWD: %w", err) + } + for _, c := range twd.Status.Conditions { + if c.Type == condType { + if c.Status != expectedStatus { + return fmt.Errorf("condition %q: expected status %q, got %q", condType, expectedStatus, c.Status) + } + if c.Reason != expectedReason { + return fmt.Errorf("condition %q: expected reason %q, got %q", condType, expectedReason, c.Reason) + } + return nil + } + } + return fmt.Errorf("condition %q not found on TWD %s/%s", condType, namespace, twdName) + }) +} + +// waitForEvent polls until at least one Kubernetes Event for the named TWD exists +// with the given reason, or fatals on timeout. 
+func waitForEvent( + t *testing.T, + ctx context.Context, + k8sClient client.Client, + twdName, namespace, reason string, + timeout, interval time.Duration, +) { + t.Helper() + eventually(t, timeout, interval, func() error { + var eventList corev1.EventList + if err := k8sClient.List(ctx, &eventList, client.InNamespace(namespace)); err != nil { + return fmt.Errorf("failed to list events: %w", err) + } + for _, e := range eventList.Items { + if e.InvolvedObject.Name == twdName && e.Reason == reason { + return nil + } + } + return fmt.Errorf("no event with reason %q found for TWD %s/%s", reason, namespace, twdName) + }) +} + func eventually(t *testing.T, timeout, interval time.Duration, check func() error) { deadline := time.Now().Add(timeout) var lastErr error From 371a02fcec7df92507f0efc69d7b9eb0a7c082d8 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Tue, 3 Mar 2026 17:09:06 -0800 Subject: [PATCH 11/13] only write to status once per reconcile loop --- internal/controller/worker_controller.go | 26 +++++++++++------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index 66da8367..4f4c18f5 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -238,18 +238,6 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req // Preserve conditions that were set during this reconciliation status.Conditions = workerDeploy.Status.Conditions workerDeploy.Status = *status - if err := r.Status().Update(ctx, &workerDeploy); err != nil { - // Ignore "object has been modified" errors, since we'll just re-fetch - // on the next reconciliation loop. 
- if apierrors.IsConflict(err) { - return ctrl.Result{ - Requeue: true, - RequeueAfter: time.Second, - }, nil - } - l.Error(err, "unable to update TemporalWorker status") - return ctrl.Result{}, err - } // TODO(jlegrone): Set defaults via webhook rather than manually // (defaults were already set above, but have to be set again after status update) @@ -275,8 +263,18 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req return ctrl.Result{}, err } - // Persist any conditions set during plan execution (e.g. RolloutComplete). - _ = r.Status().Update(ctx, &workerDeploy) + // Single status write per reconcile: persists the generated status and any + // conditions set during this loop (e.g. TemporalConnectionHealthy, RolloutComplete). + if err := r.Status().Update(ctx, &workerDeploy); err != nil { + if apierrors.IsConflict(err) { + return ctrl.Result{ + Requeue: true, + RequeueAfter: time.Second, + }, nil + } + l.Error(err, "unable to update TemporalWorker status") + return ctrl.Result{}, err + } return ctrl.Result{ Requeue: true, From 65c82366fea55ec9c8155159316a6a1bdc78648b Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Tue, 3 Mar 2026 17:10:13 -0800 Subject: [PATCH 12/13] print percentage correctly when it's an integer in [0,100] --- internal/controller/execplan.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/controller/execplan.go b/internal/controller/execplan.go index 07561fd7..2b044f81 100644 --- a/internal/controller/execplan.go +++ b/internal/controller/execplan.go @@ -185,7 +185,7 @@ func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Con }); err != nil { l.Error(err, "unable to set ramping deployment version", "buildID", vcfg.BuildID, "percentage", vcfg.RampPercentage) r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonVersionPromotionFailed, - "Failed to set buildID %q as ramping version (%.1f%%): %v", vcfg.BuildID, vcfg.RampPercentage, err) + "Failed to 
set buildID %q as ramping version (%d%%): %v", vcfg.BuildID, vcfg.RampPercentage, err) return fmt.Errorf("unable to set ramping deployment version: %w", err) } } From ddef655bf962a7149ee6c446dd889c99e18dfdb6 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Tue, 3 Mar 2026 18:47:01 -0800 Subject: [PATCH 13/13] move all unhealthy connection tests into one function, explain why ReasonAuthSecretInvalid is untriggerable --- internal/controller/worker_controller.go | 2 + .../conditions_events_integration_test.go | 129 +++++++++++++----- 2 files changed, 94 insertions(+), 37 deletions(-) diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index 4f4c18f5..3fa4e7f6 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -160,6 +160,8 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req // Get the Auth Mode and Secret Name authMode, secretName, err := resolveAuthSecretName(&temporalConnection) if err != nil { + // Note: as things are now, this will never happen, because getAPIKeySecretName only errors when secretRef == nil, + // but resolveAuthSecretName only calls it inside the tc.Spec.APIKeySecretRef != nil branch l.Error(err, "unable to resolve auth secret name") r.recordWarningAndSetConditionFalse(ctx, &workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, temporaliov1alpha1.ReasonAuthSecretInvalid, diff --git a/internal/tests/internal/conditions_events_integration_test.go b/internal/tests/internal/conditions_events_integration_test.go index 4928a2ce..37d13307 100644 --- a/internal/tests/internal/conditions_events_integration_test.go +++ b/internal/tests/internal/conditions_events_integration_test.go @@ -10,22 +10,26 @@ package internal // - Event reason RolloutComplete (emitted alongside the condition above) // - ConditionTemporalConnectionHealthy = False (missing TemporalConnection) // - Event reason 
TemporalConnectionNotFound (emitted alongside the condition above) +// - ReasonTemporalClientCreationFailed: TemporalConnection pointing to an unreachable port +// - ReasonTemporalStateFetchFailed: TWD pointing to a Temporal namespace that doesn't exist // -// Not yet covered, ranked by ease of triggering: -// 1. ReasonAuthSecretInvalid (Easy): create a TemporalConnection with conflicting/incomplete -// auth config, e.g. mTLS mode with no secret ref. -// 2. ReasonTemporalClientCreationFailed (Medium): set hostPort to an unreachable address; -// depends on whether the Temporal SDK validates the connection eagerly at UpsertClient. -// 3. ReasonTemporalStateFetchFailed (Medium): set temporalNamespace on the TWD to a namespace -// that doesn't exist on the test server; the deployment state query would fail. -// 4. ReasonTestWorkflowStartFailed (Medium): need Temporal to reject StartWorkflow; could do -// this by naming a Gate workflow that is not registered on the worker. -// 5. ReasonDeploymentCreateFailed / UpdateFailed / ScaleFailed / DeleteFailed (Hard): +// Not yet covered, ranked by ease of triggering in functional tests: +// 1. ReasonTestWorkflowStartFailed (Hard): Temporal does NOT reject StartWorkflow for +// unregistered workflow types — it queues the workflow and waits for a worker. There is +// no clean way to make StartWorkflow itself fail without injecting network errors or +// context cancellation. +// 2. ReasonDeploymentCreateFailed / UpdateFailed / ScaleFailed / DeleteFailed (Hard): // need the k8s API server in envtest to reject the operation (e.g. via a webhook or quota). -// 6. ReasonVersionPromotionFailed / ReasonMetadataUpdateFailed (Hard): need SetCurrentVersion -// or UpdateVersionMetadata to fail on an otherwise healthy Temporal server. -// 7. ReasonPlanGenerationFailed / ReasonPlanExecutionFailed (Hard): meta-errors that wrap the +// 3. 
ReasonVersionPromotionFailed / ReasonMetadataUpdateFailed (Hard): need SetCurrentVersion +// or UpdateVersionMetadata to fail on an otherwise healthy Temporal server. Similar challenge +// as making StartWorkflow fail. +// 4. ReasonPlanGenerationFailed / ReasonPlanExecutionFailed (Hard): meta-errors that wrap the // above; would fire automatically if any of the above are triggered. +// 5. ReasonAuthSecretInvalid (Impossible): This is never triggered. The CRD schema enforces a DNS name pattern +// on mutualTLSSecretRef.name, so an empty name is rejected before it reaches the controller. +// Additionally, resolveAuthSecretName never returns an error with the current +// implementation (the nil guard in getTLSSecretName/getAPIKeySecretName is redundant +// because the caller already checks non-nil). The code path is effectively dead. import ( "context" @@ -105,33 +109,84 @@ func runConditionsAndEventsTests( }) } - // conditions-missing-connection runs standalone because it deliberately omits the - // TemporalConnection resource that testTemporalWorkerDeploymentCreation always creates. - // We don't need to create a test Temporal server for this test case because the connection - // is missing anyways. + // The following three tests each trigger a different ConditionTemporalConnectionHealthy=False + // reason. They all run standalone (not through testTemporalWorkerDeploymentCreation) because + // the controller fails before creating any k8s Deployments, so the normal status-validation + // and deployment-wait machinery in testTemporalWorkerDeploymentCreation would time out. + // + // conditions-temporal-state-fetch-failed also cannot use the testCase/testCaseBuilder + // structure because testTemporalWorkerDeploymentCreation hardcodes the Temporal namespace + // via BuildWithValues(name, ns, ts.GetDefaultNamespace()) with no way to inject a custom + // namespace. + // + // All three share the same skeleton, extracted into testUnhealthyConnectionCondition below. 
t.Run("conditions-missing-connection", func(t *testing.T) { - ctx := context.Background() + // No TemporalConnection is created; the controller cannot find the one referenced by + // the TWD and immediately sets the condition to False. + testUnhealthyConnectionCondition(t, k8sClient, + "conditions-missing-connection", testNamespace, ts.GetDefaultNamespace(), + nil, + temporaliov1alpha1.ReasonTemporalConnectionNotFound) + }) + t.Run("conditions-client-creation-failed", func(t *testing.T) { + // Port 1 is never bound; the SDK returns ECONNREFUSED immediately. + testUnhealthyConnectionCondition(t, k8sClient, + "conditions-client-creation-failed", testNamespace, ts.GetDefaultNamespace(), + &temporaliov1alpha1.TemporalConnectionSpec{HostPort: "localhost:1"}, + temporaliov1alpha1.ReasonTemporalClientCreationFailed) + }) + t.Run("conditions-temporal-state-fetch-failed", func(t *testing.T) { + // The real server is reachable so client creation succeeds, but "does-not-exist" is + // not a registered Temporal namespace so Describe() fails. + testUnhealthyConnectionCondition(t, k8sClient, + "conditions-temporal-state-fetch-failed", testNamespace, "does-not-exist", + &temporaliov1alpha1.TemporalConnectionSpec{HostPort: ts.GetFrontendHostPort()}, + temporaliov1alpha1.ReasonTemporalStateFetchFailed) + }) +} - twd := testhelpers.NewTemporalWorkerDeploymentBuilder(). - WithManualStrategy(). - WithTargetTemplate("v1.0"). - WithName("conditions-missing-connection"). - WithNamespace(testNamespace). - WithTemporalConnection("does-not-exist"). - WithTemporalNamespace(ts.GetDefaultNamespace()). - Build() +// testUnhealthyConnectionCondition is shared by the three error-path condition tests. 
+// It optionally creates a TemporalConnection (nil connectionSpec = missing connection), +// creates a TWD pointing to that connection with the given temporalNamespace, then asserts +// that ConditionTemporalConnectionHealthy becomes False with the expected reason and that a +// matching Warning event is emitted. +func testUnhealthyConnectionCondition( + t *testing.T, + k8sClient client.Client, + name, testNamespace, temporalNamespace string, + connectionSpec *temporaliov1alpha1.TemporalConnectionSpec, + expectedReason string, +) { + t.Helper() + ctx := context.Background() - if err := k8sClient.Create(ctx, twd); err != nil { - t.Fatalf("failed to create TWD: %v", err) + if connectionSpec != nil { + conn := &temporaliov1alpha1.TemporalConnection{ + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: testNamespace}, + Spec: *connectionSpec, } + if err := k8sClient.Create(ctx, conn); err != nil { + t.Fatalf("failed to create TemporalConnection: %v", err) + } + } - waitForCondition(t, ctx, k8sClient, twd.Name, twd.Namespace, - temporaliov1alpha1.ConditionTemporalConnectionHealthy, - metav1.ConditionFalse, - temporaliov1alpha1.ReasonTemporalConnectionNotFound, - 30*time.Second, time.Second) - waitForEvent(t, ctx, k8sClient, twd.Name, twd.Namespace, - temporaliov1alpha1.ReasonTemporalConnectionNotFound, - 30*time.Second, time.Second) - }) + twd := testhelpers.NewTemporalWorkerDeploymentBuilder(). + WithManualStrategy(). + WithTargetTemplate("v1.0"). + WithName(name). + WithNamespace(testNamespace). + WithTemporalConnection(name). + WithTemporalNamespace(temporalNamespace). 
+ Build() + + if err := k8sClient.Create(ctx, twd); err != nil { + t.Fatalf("failed to create TWD: %v", err) + } + + waitForCondition(t, ctx, k8sClient, twd.Name, twd.Namespace, + temporaliov1alpha1.ConditionTemporalConnectionHealthy, + metav1.ConditionFalse, expectedReason, + 30*time.Second, time.Second) + waitForEvent(t, ctx, k8sClient, twd.Name, twd.Namespace, + expectedReason, 30*time.Second, time.Second) }