diff --git a/.gitignore b/.gitignore index d19960ea..a8c7eab4 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ skaffold.env certs .DS_Store +cover.out .claude -.config \ No newline at end of file +config \ No newline at end of file diff --git a/api/v1alpha1/worker_types.go b/api/v1alpha1/worker_types.go index a09bf3d2..c481d202 100644 --- a/api/v1alpha1/worker_types.go +++ b/api/v1alpha1/worker_types.go @@ -86,6 +86,49 @@ type TemporalWorkerDeploymentSpec struct { WorkerOptions WorkerOptions `json:"workerOptions"` } +// Condition type constants for TemporalWorkerDeployment. +const ( + // ConditionTemporalConnectionHealthy indicates whether the referenced TemporalConnection + // resource exists and is properly configured. + ConditionTemporalConnectionHealthy = "TemporalConnectionHealthy" + + // ConditionRolloutComplete indicates whether the target version has been successfully + // registered as the current version, completing the rollout. + ConditionRolloutComplete = "RolloutComplete" +) + +// Condition reason constants for TemporalWorkerDeployment. +// +// These strings appear in status.conditions[].reason and are part of the CRD's +// status API. Operators, monitoring rules, and scripts may depend on them. +// They should be treated as stable within an API version and renamed only with +// a corresponding version bump. +const ( + // ReasonTemporalConnectionNotFound is set on ConditionTemporalConnectionHealthy + // when the referenced TemporalConnection resource cannot be found. + ReasonTemporalConnectionNotFound = "TemporalConnectionNotFound" + + // ReasonAuthSecretInvalid is set on ConditionTemporalConnectionHealthy when + // the auth secret referenced by the TemporalConnection cannot be resolved. + ReasonAuthSecretInvalid = "AuthSecretInvalid" + + // ReasonTemporalClientCreationFailed is set on ConditionTemporalConnectionHealthy + // when a Temporal SDK client cannot be created for the connection. 
+ ReasonTemporalClientCreationFailed = "TemporalClientCreationFailed" + + // ReasonTemporalStateFetchFailed is set on ConditionTemporalConnectionHealthy + // when the controller cannot query the current worker deployment state from Temporal. + ReasonTemporalStateFetchFailed = "TemporalStateFetchFailed" + + // ReasonTemporalConnectionHealthy is set on ConditionTemporalConnectionHealthy + // when the connection is reachable and the auth secret is resolved. + ReasonTemporalConnectionHealthy = "TemporalConnectionHealthy" + + // ReasonRolloutComplete is set on ConditionRolloutComplete when the target + // version has been successfully registered as the current version. + ReasonRolloutComplete = "RolloutComplete" +) + // VersionStatus indicates the status of a version. // +enum type VersionStatus string @@ -155,8 +198,9 @@ type TemporalWorkerDeploymentStatus struct { // +kubebuilder:validation:Minimum=0 VersionCount int32 `json:"versionCount,omitempty"` - // TODO(jlegrone): Add additional status fields following Kubernetes API conventions - // https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#spec-and-status + // Conditions represent the latest available observations of the TemporalWorkerDeployment's current state. + // +optional + Conditions []metav1.Condition `json:"conditions,omitempty"` } // WorkflowExecutionStatus describes the current state of a workflow. 
diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 1decf431..0b21324e 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -6,6 +6,7 @@ package v1alpha1 import ( "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -90,9 +91,44 @@ func (in *DeprecatedWorkerDeploymentVersion) DeepCopy() *DeprecatedWorkerDeploym return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *GateInputSource) DeepCopyInto(out *GateInputSource) { + *out = *in + if in.ConfigMapKeyRef != nil { + in, out := &in.ConfigMapKeyRef, &out.ConfigMapKeyRef + *out = new(v1.ConfigMapKeySelector) + (*in).DeepCopyInto(*out) + } + if in.SecretKeyRef != nil { + in, out := &in.SecretKeyRef, &out.SecretKeyRef + *out = new(v1.SecretKeySelector) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GateInputSource. +func (in *GateInputSource) DeepCopy() *GateInputSource { + if in == nil { + return nil + } + out := new(GateInputSource) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GateWorkflowConfig) DeepCopyInto(out *GateWorkflowConfig) { *out = *in + if in.Input != nil { + in, out := &in.Input, &out.Input + *out = new(apiextensionsv1.JSON) + (*in).DeepCopyInto(*out) + } + if in.InputFrom != nil { + in, out := &in.InputFrom, &out.InputFrom + *out = new(GateInputSource) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GateWorkflowConfig. 
@@ -142,7 +178,7 @@ func (in *RolloutStrategy) DeepCopyInto(out *RolloutStrategy) { if in.Gate != nil { in, out := &in.Gate, &out.Gate *out = new(GateWorkflowConfig) - **out = **in + (*in).DeepCopyInto(*out) } if in.Steps != nil { in, out := &in.Steps, &out.Steps @@ -477,6 +513,13 @@ func (in *TemporalWorkerDeploymentStatus) DeepCopyInto(out *TemporalWorkerDeploy *out = make([]byte, len(*in)) copy(*out, *in) } + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]metav1.Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemporalWorkerDeploymentStatus. diff --git a/cmd/main.go b/cmd/main.go index 646b9f51..4ebe6e4d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -91,6 +91,7 @@ func main() { }))), mgr.GetClient(), ), + Recorder: mgr.GetEventRecorderFor("temporal-worker-controller"), MaxDeploymentVersionsIneligibleForDeletion: controller.GetControllerMaxDeploymentVersionsIneligibleForDeletion(), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "TemporalWorkerDeployment") diff --git a/helm/temporal-worker-controller/crds/temporal.io_temporalworkerdeployments.yaml b/helm/temporal-worker-controller/crds/temporal.io_temporalworkerdeployments.yaml index 9c9ad07c..614e5187 100644 --- a/helm/temporal-worker-controller/crds/temporal.io_temporalworkerdeployments.yaml +++ b/helm/temporal-worker-controller/crds/temporal.io_temporalworkerdeployments.yaml @@ -3950,13 +3950,13 @@ spec: required: - name type: object + temporalNamespace: + minLength: 1 + type: string unsafeCustomBuildID: maxLength: 63 pattern: ^[a-zA-Z0-9]([a-zA-Z0-9._-]*[a-zA-Z0-9])?$ type: string - temporalNamespace: - minLength: 1 - type: string required: - connectionRef - temporalNamespace @@ -3969,6 +3969,42 @@ spec: type: object status: properties: + conditions: + items: + properties: + 
lastTransitionTime: + format: date-time + type: string + message: + maxLength: 32768 + type: string + observedGeneration: + format: int64 + minimum: 0 + type: integer + reason: + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + enum: + - "True" + - "False" + - Unknown + type: string + type: + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array currentVersion: properties: buildID: diff --git a/internal/controller/clientpool/clientpool.go b/internal/controller/clientpool/clientpool.go index 2cea12f9..dd6cc0f8 100644 --- a/internal/controller/clientpool/clientpool.go +++ b/internal/controller/clientpool/clientpool.go @@ -154,7 +154,8 @@ func (cp *ClientPool) fetchClientUsingMTLSSecret(secret corev1.Secret, opts NewC } if _, err := c.CheckHealth(context.Background(), &sdkclient.CheckHealthRequest{}); err != nil { - panic(err) + c.Close() + return nil, fmt.Errorf("temporal server health check failed: %w", err) } cp.mux.Lock() diff --git a/internal/controller/execplan.go b/internal/controller/execplan.go index df3c6769..2b044f81 100644 --- a/internal/controller/execplan.go +++ b/internal/controller/execplan.go @@ -11,22 +11,26 @@ import ( "time" "github.com/go-logr/logr" + temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" "github.com/temporalio/temporal-worker-controller/internal/temporal" enumspb "go.temporal.io/api/enums/v1" sdkclient "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" appsv1 "k8s.io/api/apps/v1" autoscalingv1 "k8s.io/api/autoscaling/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" ) -func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l logr.Logger, 
temporalClient sdkclient.Client, p *plan) error { +func (r *TemporalWorkerDeploymentReconciler) executeK8sOperations(ctx context.Context, l logr.Logger, workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, p *plan) error { // Create deployment if p.CreateDeployment != nil { l.Info("creating deployment", "deployment", p.CreateDeployment) if err := r.Create(ctx, p.CreateDeployment); err != nil { l.Error(err, "unable to create deployment", "deployment", p.CreateDeployment) + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonDeploymentCreateFailed, + "Failed to create Deployment %q: %v", p.CreateDeployment.Name, err) return err } } @@ -36,9 +40,12 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l l.Info("deleting deployment", "deployment", d) if err := r.Delete(ctx, d); err != nil { l.Error(err, "unable to delete deployment", "deployment", d) + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonDeploymentDeleteFailed, + "Failed to delete Deployment %q: %v", d.Name, err) return err } } + // Scale deployments for d, replicas := range p.ScaleDeployments { l.Info("scaling deployment", "deployment", d, "replicas", replicas) @@ -52,6 +59,8 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l scale := &autoscalingv1.Scale{Spec: autoscalingv1.ScaleSpec{Replicas: int32(replicas)}} if err := r.Client.SubResource("scale").Update(ctx, dep, client.WithSubResourceBody(scale)); err != nil { l.Error(err, "unable to scale deployment", "deployment", d, "replicas", replicas) + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonDeploymentScaleFailed, + "Failed to scale Deployment %q to %d replicas: %v", d.Name, replicas, err) return fmt.Errorf("unable to scale deployment: %w", err) } } @@ -61,13 +70,16 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l l.Info("updating deployment", "deployment", d.Name, "namespace", d.Namespace) if err := r.Update(ctx, 
d); err != nil { l.Error(err, "unable to update deployment", "deployment", d) + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonDeploymentUpdateFailed, + "Failed to update Deployment %q: %v", d.Name, err) return fmt.Errorf("unable to update deployment: %w", err) } } - // Get deployment handler - deploymentHandler := temporalClient.WorkerDeploymentClient().GetHandle(p.WorkerDeploymentName) + return nil +} +func (r *TemporalWorkerDeploymentReconciler) startTestWorkflows(ctx context.Context, l logr.Logger, workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, temporalClient sdkclient.Client, p *plan) error { for _, wf := range p.startTestWorkflows { // Log workflow start details if len(wf.input) > 0 { @@ -129,51 +141,89 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l _, err = temporalClient.ExecuteWorkflow(ctx, opts, wf.workflowType) } if err != nil { + l.Error(err, "unable to start test workflow execution", "workflowType", wf.workflowType, "buildID", wf.buildID, "taskQueue", wf.taskQueue) + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonTestWorkflowStartFailed, + "Failed to start gate workflow %q (buildID %s, taskQueue %s): %v", wf.workflowType, wf.buildID, wf.taskQueue, err) return fmt.Errorf("unable to start test workflow execution: %w", err) } } + return nil +} - // Register current version or ramps - if vcfg := p.UpdateVersionConfig; vcfg != nil { - if vcfg.SetCurrent { - l.Info("registering new current version", "buildID", vcfg.BuildID) - if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{ - BuildID: vcfg.BuildID, - ConflictToken: vcfg.ConflictToken, - Identity: getControllerIdentity(), - }); err != nil { - return fmt.Errorf("unable to set current deployment version: %w", err) - } +func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Context, l logr.Logger, workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, 
deploymentHandler sdkclient.WorkerDeploymentHandle, p *plan) error { + vcfg := p.UpdateVersionConfig + if vcfg == nil { + return nil + } + + if vcfg.SetCurrent { + l.Info("registering new current version", "buildID", vcfg.BuildID) + if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{ + BuildID: vcfg.BuildID, + ConflictToken: vcfg.ConflictToken, + Identity: getControllerIdentity(), + }); err != nil { + l.Error(err, "unable to set current deployment version", "buildID", vcfg.BuildID) + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonVersionPromotionFailed, + "Failed to set buildID %q as current version: %v", vcfg.BuildID, err) + return fmt.Errorf("unable to set current deployment version: %w", err) + } + r.setCondition(workerDeploy, temporaliov1alpha1.ConditionRolloutComplete, metav1.ConditionTrue, + temporaliov1alpha1.ReasonRolloutComplete, fmt.Sprintf("Rollout complete for buildID %s", vcfg.BuildID)) + } else { + if vcfg.RampPercentage > 0 { + l.Info("applying ramp", "buildID", vcfg.BuildID, "percentage", vcfg.RampPercentage) } else { - if vcfg.RampPercentage > 0 { - l.Info("applying ramp", "buildID", vcfg.BuildID, "percentage", vcfg.RampPercentage) - } else { - l.Info("deleting ramp", "buildID", vcfg.BuildID) - } + l.Info("deleting ramp", "buildID", vcfg.BuildID) + } - if _, err := deploymentHandler.SetRampingVersion(ctx, sdkclient.WorkerDeploymentSetRampingVersionOptions{ - BuildID: vcfg.BuildID, - Percentage: float32(vcfg.RampPercentage), - ConflictToken: vcfg.ConflictToken, - Identity: getControllerIdentity(), - }); err != nil { - return fmt.Errorf("unable to set ramping deployment version: %w", err) - } + if _, err := deploymentHandler.SetRampingVersion(ctx, sdkclient.WorkerDeploymentSetRampingVersionOptions{ + BuildID: vcfg.BuildID, + Percentage: float32(vcfg.RampPercentage), + ConflictToken: vcfg.ConflictToken, + Identity: getControllerIdentity(), + }); err != nil { + l.Error(err, "unable 
to set ramping deployment version", "buildID", vcfg.BuildID, "percentage", vcfg.RampPercentage) + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonVersionPromotionFailed, + "Failed to set buildID %q as ramping version (%d%%): %v", vcfg.BuildID, vcfg.RampPercentage, err) + return fmt.Errorf("unable to set ramping deployment version: %w", err) } - if _, err := deploymentHandler.UpdateVersionMetadata(ctx, sdkclient.WorkerDeploymentUpdateVersionMetadataOptions{ - Version: worker.WorkerDeploymentVersion{ - DeploymentName: p.WorkerDeploymentName, - BuildId: vcfg.BuildID, - }, - MetadataUpdate: sdkclient.WorkerDeploymentMetadataUpdate{ - UpsertEntries: map[string]interface{}{ - controllerIdentityMetadataKey: getControllerIdentity(), - controllerVersionMetadataKey: getControllerVersion(), - }, + } + + if _, err := deploymentHandler.UpdateVersionMetadata(ctx, sdkclient.WorkerDeploymentUpdateVersionMetadataOptions{ + Version: worker.WorkerDeploymentVersion{ + DeploymentName: p.WorkerDeploymentName, + BuildId: vcfg.BuildID, + }, + MetadataUpdate: sdkclient.WorkerDeploymentMetadataUpdate{ + UpsertEntries: map[string]interface{}{ + controllerIdentityMetadataKey: getControllerIdentity(), + controllerVersionMetadataKey: getControllerVersion(), }, - }); err != nil { // would be cool to do this atomically with the update - return fmt.Errorf("unable to update metadata after setting current deployment: %w", err) - } + }, + }); err != nil { // would be cool to do this atomically with the update + l.Error(err, "unable to update version metadata", "buildID", vcfg.BuildID) + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonMetadataUpdateFailed, + "Failed to update version metadata for buildID %q: %v", vcfg.BuildID, err) + return fmt.Errorf("unable to update metadata after setting current deployment: %w", err) + } + + return nil +} + +func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l logr.Logger, workerDeploy 
*temporaliov1alpha1.TemporalWorkerDeployment, temporalClient sdkclient.Client, p *plan) error { + if err := r.executeK8sOperations(ctx, l, workerDeploy, p); err != nil { + return err + } + + deploymentHandler := temporalClient.WorkerDeploymentClient().GetHandle(p.WorkerDeploymentName) + + if err := r.startTestWorkflows(ctx, l, workerDeploy, temporalClient, p); err != nil { + return err + } + + if err := r.updateVersionConfig(ctx, l, workerDeploy, deploymentHandler, p); err != nil { + return err } for _, buildId := range p.RemoveIgnoreLastModifierBuilds { @@ -186,6 +236,7 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l RemoveEntries: []string{temporal.IgnoreLastModifierKey}, }, }); err != nil { + l.Error(err, "unable to remove ignore-last-modifier metadata", "buildID", buildId) return fmt.Errorf("unable to update metadata to remove %s deployment: %w", temporal.IgnoreLastModifierKey, err) } } diff --git a/internal/controller/reconciler_events_test.go b/internal/controller/reconciler_events_test.go new file mode 100644 index 00000000..466c702d --- /dev/null +++ b/internal/controller/reconciler_events_test.go @@ -0,0 +1,413 @@ +// Unless explicitly stated otherwise all files in this repository are licensed under the MIT License. +// +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2024 Datadog, Inc. 
+ +package controller + +import ( + "context" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" + "github.com/temporalio/temporal-worker-controller/internal/controller/clientpool" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" +) + +// newTestScheme creates a scheme with all required types registered. +func newTestScheme() *runtime.Scheme { + s := runtime.NewScheme() + _ = temporaliov1alpha1.AddToScheme(s) + _ = appsv1.AddToScheme(s) + _ = corev1.AddToScheme(s) + return s +} + +// newTestReconciler creates a TemporalWorkerDeploymentReconciler with a fake client and recorder. +func newTestReconciler(objs []client.Object) (*TemporalWorkerDeploymentReconciler, *record.FakeRecorder) { + return newTestReconcilerWithInterceptors(objs, interceptor.Funcs{}) +} + +// newTestReconcilerWithInterceptors creates a reconciler with a fake client that uses custom interceptors. +func newTestReconcilerWithInterceptors(objs []client.Object, funcs interceptor.Funcs) (*TemporalWorkerDeploymentReconciler, *record.FakeRecorder) { + scheme := newTestScheme() + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(objs...). + WithStatusSubresource(&temporaliov1alpha1.TemporalWorkerDeployment{}). 
+ WithIndex(&appsv1.Deployment{}, deployOwnerKey, func(rawObj client.Object) []string { + deploy := rawObj.(*appsv1.Deployment) + owner := metav1.GetControllerOf(deploy) + if owner == nil { + return nil + } + if owner.APIVersion != temporaliov1alpha1.GroupVersion.String() || owner.Kind != "TemporalWorkerDeployment" { + return nil + } + return []string{owner.Name} + }). + WithInterceptorFuncs(funcs). + Build() + + recorder := record.NewFakeRecorder(10) + + r := &TemporalWorkerDeploymentReconciler{ + Client: fakeClient, + Scheme: scheme, + TemporalClientPool: clientpool.New(nil, fakeClient), + Recorder: recorder, + DisableRecoverPanic: true, + MaxDeploymentVersionsIneligibleForDeletion: 75, + } + + return r, recorder +} + +// makeTWD creates a minimal TemporalWorkerDeployment for testing. +func makeTWD(name, namespace, connectionName string) *temporaliov1alpha1.TemporalWorkerDeployment { + replicas := int32(1) + progressDeadline := int32(600) + return &temporaliov1alpha1.TemporalWorkerDeployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: temporaliov1alpha1.GroupVersion.String(), + Kind: "TemporalWorkerDeployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Generation: 1, + }, + Spec: temporaliov1alpha1.TemporalWorkerDeploymentSpec{ + Replicas: &replicas, + ProgressDeadlineSeconds: &progressDeadline, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "worker", + Image: "temporal/worker:v1", + }, + }, + }, + }, + WorkerOptions: temporaliov1alpha1.WorkerOptions{ + TemporalConnectionRef: temporaliov1alpha1.TemporalConnectionReference{ + Name: connectionName, + }, + TemporalNamespace: "default", + }, + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateAllAtOnce, + }, + SunsetStrategy: temporaliov1alpha1.SunsetStrategy{ + ScaledownDelay: &metav1.Duration{}, + DeleteDelay: &metav1.Duration{}, + }, + }, + } +} + +// makeTemporalConnection creates 
a minimal TemporalConnection for testing. +func makeTemporalConnection(name, namespace, hostPort string) *temporaliov1alpha1.TemporalConnection { + return &temporaliov1alpha1.TemporalConnection{ + TypeMeta: metav1.TypeMeta{ + APIVersion: temporaliov1alpha1.GroupVersion.String(), + Kind: "TemporalConnection", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: temporaliov1alpha1.TemporalConnectionSpec{ + HostPort: hostPort, + }, + } +} + +// drainEvents reads all events from the recorder channel and returns them. +func drainEvents(recorder *record.FakeRecorder) []string { + var events []string + for { + select { + case event := <-recorder.Events: + events = append(events, event) + default: + return events + } + } +} + +// assertEventEmitted checks that at least one event with the given reason was emitted. +func assertEventEmitted(t *testing.T, events []string, reason string) { + t.Helper() + for _, event := range events { + if strings.Contains(event, reason) { + return + } + } + t.Errorf("expected event with reason %q, got events: %v", reason, events) +} + +// assertNoEventEmitted checks that no event with the given reason was emitted. 
+func assertNoEventEmitted(t *testing.T, events []string, reason string) { + t.Helper() + for _, event := range events { + if strings.Contains(event, reason) { + t.Errorf("unexpected event with reason %q found: %s", reason, event) + return + } + } +} + +func TestReconcile_TemporalConnectionNotFound_EmitsEvent(t *testing.T) { + twd := makeTWD("test-worker", "default", "nonexistent-connection") + r, recorder := newTestReconciler([]client.Object{twd}) + + _, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "test-worker", Namespace: "default"}, + }) + + require.Error(t, err) + + events := drainEvents(recorder) + assertEventEmitted(t, events, "TemporalConnectionNotFound") + + // Check that the event contains the connection name + for _, event := range events { + if strings.Contains(event, "TemporalConnectionNotFound") { + assert.Contains(t, event, "nonexistent-connection") + assert.Contains(t, event, "Warning") + } + } +} + +func TestReconcile_TemporalConnectionNotFound_SetsCondition(t *testing.T) { + twd := makeTWD("test-worker", "default", "nonexistent-connection") + r, _ := newTestReconciler([]client.Object{twd}) + + _, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "test-worker", Namespace: "default"}, + }) + + require.Error(t, err) + + // Fetch the updated TWD to check conditions + var updated temporaliov1alpha1.TemporalWorkerDeployment + require.NoError(t, r.Get(context.Background(), types.NamespacedName{Name: "test-worker", Namespace: "default"}, &updated)) + + cond := meta.FindStatusCondition(updated.Status.Conditions, temporaliov1alpha1.ConditionTemporalConnectionHealthy) + require.NotNil(t, cond, "TemporalConnectionHealthy condition should be set") + assert.Equal(t, metav1.ConditionFalse, cond.Status) + assert.Equal(t, "TemporalConnectionNotFound", cond.Reason) + assert.Contains(t, cond.Message, "nonexistent-connection") +} + +func 
TestReconcile_AuthSecretInvalid_EmitsEvent(t *testing.T) { + // Create a TemporalConnection with mTLS that references a secret, + // but the MutualTLSSecretRef has an empty name (will cause resolveAuthSecretName to fail) + tc := makeTemporalConnection("my-connection", "default", "localhost:7233") + tc.Spec.MutualTLSSecretRef = &temporaliov1alpha1.SecretReference{Name: ""} // empty name triggers error in getTLSSecretName + + twd := makeTWD("test-worker", "default", "my-connection") + r, recorder := newTestReconciler([]client.Object{twd, tc}) + + _, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "test-worker", Namespace: "default"}, + }) + + // The mTLS secret ref pointer is non-nil even though its Name is empty, + // so resolveAuthSecretName will return AuthModeTLS with secretName "". + // The UpsertClient will try to fetch the secret and fail. + // Either way, an error should occur and an event should be emitted. + require.Error(t, err) + + events := drainEvents(recorder) + // Should have either AuthSecretInvalid or TemporalClientCreationFailed + hasEvent := false + for _, event := range events { + if strings.Contains(event, "AuthSecretInvalid") || strings.Contains(event, "TemporalClientCreationFailed") { + hasEvent = true + break + } + } + assert.True(t, hasEvent, "expected AuthSecretInvalid or TemporalClientCreationFailed event, got: %v", events) +} + +func TestReconcile_TemporalClientCreationFailed_EmitsEventAndCondition(t *testing.T) { + // TemporalConnection exists but references a TLS secret that doesn't exist in k8s + tc := makeTemporalConnection("my-connection", "default", "localhost:7233") + tc.Spec.MutualTLSSecretRef = &temporaliov1alpha1.SecretReference{Name: "missing-tls-secret"} + + twd := makeTWD("test-worker", "default", "my-connection") + r, recorder := newTestReconciler([]client.Object{twd, tc}) + + _, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: 
types.NamespacedName{Name: "test-worker", Namespace: "default"}, + }) + + require.Error(t, err) + + events := drainEvents(recorder) + assertEventEmitted(t, events, "TemporalClientCreationFailed") + + // Check condition + var updated temporaliov1alpha1.TemporalWorkerDeployment + require.NoError(t, r.Get(context.Background(), types.NamespacedName{Name: "test-worker", Namespace: "default"}, &updated)) + + // TemporalConnectionHealthy should be False (client creation failed, overwriting the earlier True) + connCond := meta.FindStatusCondition(updated.Status.Conditions, temporaliov1alpha1.ConditionTemporalConnectionHealthy) + require.NotNil(t, connCond) + assert.Equal(t, metav1.ConditionFalse, connCond.Status) + assert.Equal(t, "TemporalClientCreationFailed", connCond.Reason) +} + +func TestReconcile_TWDNotFound_NoEvent(t *testing.T) { + // No TWD exists — reconciling should return nil error (not found is ignored) + r, recorder := newTestReconciler(nil) + + _, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "does-not-exist", Namespace: "default"}, + }) + + require.NoError(t, err) + + events := drainEvents(recorder) + assert.Empty(t, events, "no events should be emitted when TWD is not found") +} + +func TestSetCondition_SetsNewCondition(t *testing.T) { + twd := makeTWD("test-worker", "default", "my-connection") + r, _ := newTestReconciler(nil) + + r.setCondition(twd, temporaliov1alpha1.ConditionRolloutComplete, metav1.ConditionTrue, "TestReason", "Test message") + + require.Len(t, twd.Status.Conditions, 1) + assert.Equal(t, temporaliov1alpha1.ConditionRolloutComplete, twd.Status.Conditions[0].Type) + assert.Equal(t, metav1.ConditionTrue, twd.Status.Conditions[0].Status) + assert.Equal(t, "TestReason", twd.Status.Conditions[0].Reason) + assert.Equal(t, "Test message", twd.Status.Conditions[0].Message) + assert.Equal(t, int64(1), twd.Status.Conditions[0].ObservedGeneration) +} + +func 
TestSetCondition_UpdatesExistingCondition(t *testing.T) { + twd := makeTWD("test-worker", "default", "my-connection") + r, _ := newTestReconciler(nil) + + // Set initial condition + r.setCondition(twd, temporaliov1alpha1.ConditionRolloutComplete, metav1.ConditionTrue, "InitialReason", "Initial message") + require.Len(t, twd.Status.Conditions, 1) + + // Update the condition + r.setCondition(twd, temporaliov1alpha1.ConditionRolloutComplete, metav1.ConditionFalse, "UpdatedReason", "Updated message") + + // Should still be exactly 1 condition, not 2 + require.Len(t, twd.Status.Conditions, 1) + assert.Equal(t, metav1.ConditionFalse, twd.Status.Conditions[0].Status) + assert.Equal(t, "UpdatedReason", twd.Status.Conditions[0].Reason) + assert.Equal(t, "Updated message", twd.Status.Conditions[0].Message) +} + +func TestSetCondition_MultipleDifferentConditions(t *testing.T) { + twd := makeTWD("test-worker", "default", "my-connection") + r, _ := newTestReconciler(nil) + + r.setCondition(twd, temporaliov1alpha1.ConditionTemporalConnectionHealthy, metav1.ConditionTrue, "Healthy", "Connection is healthy") + r.setCondition(twd, temporaliov1alpha1.ConditionRolloutComplete, metav1.ConditionTrue, "Ready", "All good") + + require.Len(t, twd.Status.Conditions, 2) + + connCond := meta.FindStatusCondition(twd.Status.Conditions, temporaliov1alpha1.ConditionTemporalConnectionHealthy) + require.NotNil(t, connCond) + assert.Equal(t, metav1.ConditionTrue, connCond.Status) + + readyCond := meta.FindStatusCondition(twd.Status.Conditions, temporaliov1alpha1.ConditionRolloutComplete) + require.NotNil(t, readyCond) + assert.Equal(t, metav1.ConditionTrue, readyCond.Status) +} + +func TestReconcile_ValidationFailure_NoEventEmitted(t *testing.T) { + // Use Progressive strategy with no steps to trigger a validation failure + twd := makeTWD("test-worker", "default", "my-connection") + twd.Spec.RolloutStrategy = temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateProgressive, + 
Steps: nil, // Progressive requires steps + } + + // Also need a connection for this test — but validation happens before connection fetch + tc := makeTemporalConnection("my-connection", "default", "localhost:7233") + r, recorder := newTestReconciler([]client.Object{twd, tc}) + + result, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "test-worker", Namespace: "default"}, + }) + + // Validation failures don't return errors — they requeue after 5 minutes + require.NoError(t, err) + assert.NotZero(t, result.RequeueAfter, "should requeue on validation failure") + + // No events should be emitted for validation failures (user just needs to fix their spec) + events := drainEvents(recorder) + assertNoEventEmitted(t, events, "TemporalConnectionNotFound") + assertNoEventEmitted(t, events, "TemporalClientCreationFailed") +} + +func TestReconcile_ConnectionValid_ThenClientFails_ConditionsReflectBoth(t *testing.T) { + // Connection exists and is fetchable, but uses API key auth with a missing secret + tc := makeTemporalConnection("my-connection", "default", "localhost:7233") + tc.Spec.APIKeySecretRef = &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "missing-api-key-secret"}, + Key: "api-key", + } + + twd := makeTWD("test-worker", "default", "my-connection") + r, recorder := newTestReconciler([]client.Object{twd, tc}) + + _, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "test-worker", Namespace: "default"}, + }) + + require.Error(t, err) + + events := drainEvents(recorder) + assertEventEmitted(t, events, "TemporalClientCreationFailed") + + // Verify condition: TemporalConnectionHealthy should be False (client creation failed) + var updated temporaliov1alpha1.TemporalWorkerDeployment + require.NoError(t, r.Get(context.Background(), types.NamespacedName{Name: "test-worker", Namespace: "default"}, &updated)) + + connCond := 
meta.FindStatusCondition(updated.Status.Conditions, temporaliov1alpha1.ConditionTemporalConnectionHealthy) + require.NotNil(t, connCond, "TemporalConnectionHealthy condition should be set") + assert.Equal(t, metav1.ConditionFalse, connCond.Status, "client creation should have failed") +} + +func TestReconcile_EventMessageContainsUsefulContext(t *testing.T) { + twd := makeTWD("my-deployment", "prod", "prod-connection") + r, recorder := newTestReconciler([]client.Object{twd}) + + _, _ = r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "my-deployment", Namespace: "prod"}, + }) + + events := drainEvents(recorder) + require.NotEmpty(t, events) + + // Verify the event message contains the connection name for debugging + for _, event := range events { + if strings.Contains(event, "TemporalConnectionNotFound") { + assert.Contains(t, event, "prod-connection", + "Event message should include the missing connection name for debugging") + } + } +} diff --git a/internal/controller/util.go b/internal/controller/util.go index 22fe12c2..8c12fece 100644 --- a/internal/controller/util.go +++ b/internal/controller/util.go @@ -11,6 +11,24 @@ import ( "github.com/temporalio/temporal-worker-controller/internal/defaults" ) +// Event reason constants for TemporalWorkerDeployment. +// +// These strings appear in Kubernetes Event objects (kubectl get events) and are +// internal to the controller's implementation. They are not part of the CRD +// status API and may change between releases. Do not write alerting or automation +// that depends on these strings. 
+const ( + ReasonPlanGenerationFailed = "PlanGenerationFailed" + ReasonPlanExecutionFailed = "PlanExecutionFailed" + ReasonDeploymentCreateFailed = "DeploymentCreateFailed" + ReasonDeploymentDeleteFailed = "DeploymentDeleteFailed" + ReasonDeploymentScaleFailed = "DeploymentScaleFailed" + ReasonDeploymentUpdateFailed = "DeploymentUpdateFailed" + ReasonTestWorkflowStartFailed = "TestWorkflowStartFailed" + ReasonVersionPromotionFailed = "VersionPromotionFailed" + ReasonMetadataUpdateFailed = "MetadataUpdateFailed" +) + const ( controllerIdentityMetadataKey = "temporal.io/controller" controllerVersionMetadataKey = "temporal.io/controller-version" diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index d6471890..3fa4e7f6 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -17,9 +17,11 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -71,6 +73,7 @@ type TemporalWorkerDeploymentReconciler struct { client.Client Scheme *runtime.Scheme TemporalClientPool *clientpool.ClientPool + Recorder record.EventRecorder // Disables panic recovery if true DisableRecoverPanic bool @@ -96,6 +99,7 @@ type TemporalWorkerDeploymentReconciler struct { //+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch //+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=apps,resources=deployments/scale,verbs=update +// +kubebuilder:rbac:groups=events.k8s.io,resources=events,verbs=create;patch // Reconcile is part of the main kubernetes 
reconciliation loop which aims to // move the current state of the cluster closer to the desired state. @@ -146,16 +150,30 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req Namespace: workerDeploy.Namespace, }, &temporalConnection); err != nil { l.Error(err, "unable to fetch TemporalConnection") + r.recordWarningAndSetConditionFalse(ctx, &workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, + temporaliov1alpha1.ReasonTemporalConnectionNotFound, + fmt.Sprintf("Unable to fetch TemporalConnection %q: %v", workerDeploy.Spec.WorkerOptions.TemporalConnectionRef.Name, err), + fmt.Sprintf("TemporalConnection %q not found: %v", workerDeploy.Spec.WorkerOptions.TemporalConnectionRef.Name, err)) return ctrl.Result{}, err } // Get the Auth Mode and Secret Name authMode, secretName, err := resolveAuthSecretName(&temporalConnection) if err != nil { + // Note: as things are now, this will never happen, because getAPIKeySecretName only errors when secretRef == nil, + // but resolveAuthSecretName only calls it inside the tc.Spec.APIKeySecretRef != nil branch l.Error(err, "unable to resolve auth secret name") + r.recordWarningAndSetConditionFalse(ctx, &workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, + temporaliov1alpha1.ReasonAuthSecretInvalid, + fmt.Sprintf("Unable to resolve auth secret from TemporalConnection %q: %v", temporalConnection.Name, err), + fmt.Sprintf("Unable to resolve auth secret: %v", err)) return ctrl.Result{}, err } + // Mark TemporalConnection as valid since we fetched it and resolved auth + r.setCondition(&workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, metav1.ConditionTrue, + temporaliov1alpha1.ReasonTemporalConnectionHealthy, "TemporalConnection is healthy and auth secret is resolved") + // Get or update temporal client for connection temporalClient, ok := r.TemporalClientPool.GetSDKClient(clientpool.ClientPoolKey{ HostPort: temporalConnection.Spec.HostPort, @@ -171,6 
+189,10 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req }) if err != nil { l.Error(err, "unable to create TemporalClient") + r.recordWarningAndSetConditionFalse(ctx, &workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, + temporaliov1alpha1.ReasonTemporalClientCreationFailed, + fmt.Sprintf("Unable to create Temporal client for %s:%s: %v", temporalConnection.Spec.HostPort, workerDeploy.Spec.WorkerOptions.TemporalNamespace, err), + fmt.Sprintf("Failed to connect to Temporal: %v", err)) return ctrl.Result{}, err } temporalClient = c @@ -203,6 +225,10 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req getControllerIdentity(), ) if err != nil { + r.recordWarningAndSetConditionFalse(ctx, &workerDeploy, temporaliov1alpha1.ConditionTemporalConnectionHealthy, + temporaliov1alpha1.ReasonTemporalStateFetchFailed, + fmt.Sprintf("Unable to get Temporal worker deployment state: %v", err), + fmt.Sprintf("Failed to query Temporal worker deployment state: %v", err)) return ctrl.Result{}, fmt.Errorf("unable to get Temporal worker deployment state: %w", err) } @@ -211,19 +237,9 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req if err != nil { return ctrl.Result{}, err } + // Preserve conditions that were set during this reconciliation + status.Conditions = workerDeploy.Status.Conditions workerDeploy.Status = *status - if err := r.Status().Update(ctx, &workerDeploy); err != nil { - // Ignore "object has been modified" errors, since we'll just re-fetch - // on the next reconciliation loop. 
- if apierrors.IsConflict(err) { - return ctrl.Result{ - Requeue: true, - RequeueAfter: time.Second, - }, nil - } - l.Error(err, "unable to update TemporalWorker status") - return ctrl.Result{}, err - } // TODO(jlegrone): Set defaults via webhook rather than manually // (defaults were already set above, but have to be set again after status update) @@ -235,11 +251,30 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req // Generate a plan to get to desired spec from current status plan, err := r.generatePlan(ctx, l, &workerDeploy, temporalConnection.Spec, temporalState) if err != nil { + r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, ReasonPlanGenerationFailed, + "Unable to generate reconciliation plan: %v", err) + _ = r.Status().Update(ctx, &workerDeploy) return ctrl.Result{}, err } // Execute the plan, handling any errors - if err := r.executePlan(ctx, l, temporalClient, plan); err != nil { + if err := r.executePlan(ctx, l, &workerDeploy, temporalClient, plan); err != nil { + r.Recorder.Eventf(&workerDeploy, corev1.EventTypeWarning, ReasonPlanExecutionFailed, + "Unable to execute reconciliation plan: %v", err) + _ = r.Status().Update(ctx, &workerDeploy) + return ctrl.Result{}, err + } + + // Single status write on the success path: persists the generated status and any + // conditions set during this loop (e.g. TemporalConnectionHealthy, RolloutComplete). + if err := r.Status().Update(ctx, &workerDeploy); err != nil { + if apierrors.IsConflict(err) { + return ctrl.Result{ + Requeue: true, + RequeueAfter: time.Second, + }, nil + } + l.Error(err, "unable to update TemporalWorker status") return ctrl.Result{}, err } @@ -252,6 +287,36 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req }, nil } +// setCondition sets a condition on the TemporalWorkerDeployment status. 
+func (r *TemporalWorkerDeploymentReconciler) setCondition( + workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, + conditionType string, + status metav1.ConditionStatus, + reason, message string, +) { + meta.SetStatusCondition(&workerDeploy.Status.Conditions, metav1.Condition{ + Type: conditionType, + Status: status, + ObservedGeneration: workerDeploy.Generation, + Reason: reason, + Message: message, + }) +} + +// recordWarningAndSetConditionFalse emits a warning event, sets a condition to False, and persists the status update. +func (r *TemporalWorkerDeploymentReconciler) recordWarningAndSetConditionFalse( + ctx context.Context, + workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, + conditionType string, + reason string, + eventMessage string, + conditionMessage string, +) { + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, reason, eventMessage) + r.setCondition(workerDeploy, conditionType, metav1.ConditionFalse, reason, conditionMessage) + _ = r.Status().Update(ctx, workerDeploy) +} + // SetupWithManager sets up the controller with the Manager. func (r *TemporalWorkerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { if err := mgr.GetFieldIndexer().IndexField(context.Background(), &appsv1.Deployment{}, deployOwnerKey, func(rawObj client.Object) []string { diff --git a/internal/tests/internal/conditions_events_integration_test.go b/internal/tests/internal/conditions_events_integration_test.go new file mode 100644 index 00000000..37d13307 --- /dev/null +++ b/internal/tests/internal/conditions_events_integration_test.go @@ -0,0 +1,192 @@ +package internal + +// This file tests that status.conditions and Kubernetes Events are correctly +// populated by the controller. Only scenarios that are naturally triggered by +// the existing test machinery are covered here. 
+// +// Covered: +// - ConditionTemporalConnectionHealthy = True (any successful reconcile) +// - ConditionRolloutComplete = True (version promoted to current) +// - (no separate k8s Event is emitted for RolloutComplete; only the condition above is set) +// - ConditionTemporalConnectionHealthy = False (missing TemporalConnection) +// - Event reason TemporalConnectionNotFound (emitted alongside the condition above) +// - ReasonTemporalClientCreationFailed: TemporalConnection pointing to an unreachable port +// - ReasonTemporalStateFetchFailed: TWD pointing to a Temporal namespace that doesn't exist +// +// Not yet covered, ranked by ease of triggering in functional tests: +// 1. ReasonTestWorkflowStartFailed (Hard): Temporal does NOT reject StartWorkflow for +// unregistered workflow types — it queues the workflow and waits for a worker. There is +// no clean way to make StartWorkflow itself fail without injecting network errors or +// context cancellation. +// 2. ReasonDeploymentCreateFailed / UpdateFailed / ScaleFailed / DeleteFailed (Hard): +// need the k8s API server in envtest to reject the operation (e.g. via a webhook or quota). +// 3. ReasonVersionPromotionFailed / ReasonMetadataUpdateFailed (Hard): need SetCurrentVersion +// or UpdateVersionMetadata to fail on an otherwise healthy Temporal server. Similar challenge +// as making StartWorkflow fail. +// 4. ReasonPlanGenerationFailed / ReasonPlanExecutionFailed (Hard): meta-errors that wrap the +// above; would fire automatically if any of the above are triggered. +// 5. ReasonAuthSecretInvalid (Impossible): This is never triggered. The CRD schema enforces a DNS name pattern +// on mutualTLSSecretRef.name, so an empty name is rejected before it reaches the controller. +// Additionally, resolveAuthSecretName never returns an error with the current +// implementation (the nil guard in getTLSSecretName/getAPIKeySecretName is redundant +// because the caller already checks non-nil). The code path is effectively dead. 
+ +import ( + "context" + "testing" + "time" + + temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" + "github.com/temporalio/temporal-worker-controller/internal/testhelpers" + "go.temporal.io/server/temporaltest" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +func runConditionsAndEventsTests( + t *testing.T, + k8sClient client.Client, + mgr manager.Manager, + ts *temporaltest.TestServer, + testNamespace string, +) { + cases := []testCase{ + { + // Verifies that ConditionTemporalConnectionHealthy is set to True whenever + // the controller successfully connects to Temporal. + name: "conditions-connection-healthy", + builder: testhelpers.NewTestCase(). + WithInput( + testhelpers.NewTemporalWorkerDeploymentBuilder(). + WithManualStrategy(). + WithTargetTemplate("v1.0"), + ). + WithExpectedStatus( + testhelpers.NewStatusBuilder(). + WithTargetVersion("v1.0", temporaliov1alpha1.VersionStatusInactive, -1, true, false), + ). + WithValidatorFunction(func(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { + twd := tc.GetTWD() + waitForCondition(t, ctx, env.K8sClient, twd.Name, twd.Namespace, + temporaliov1alpha1.ConditionTemporalConnectionHealthy, + metav1.ConditionTrue, + temporaliov1alpha1.ReasonTemporalConnectionHealthy, + 10*time.Second, time.Second) + }), + }, + { + // Verifies that ConditionRolloutComplete is set to True after the controller + // promotes a version to current. Note: only a condition is set here — no + // separate k8s Event is emitted for RolloutComplete. + name: "conditions-rollout-complete", + builder: testhelpers.NewTestCase(). + WithInput( + testhelpers.NewTemporalWorkerDeploymentBuilder(). + WithAllAtOnceStrategy(). + WithTargetTemplate("v1.0"), + ). + WithExpectedStatus( + testhelpers.NewStatusBuilder(). 
+ WithTargetVersion("v1.0", temporaliov1alpha1.VersionStatusCurrent, -1, true, false). + WithCurrentVersion("v1.0", true, false), + ). + WithValidatorFunction(func(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { + twd := tc.GetTWD() + waitForCondition(t, ctx, env.K8sClient, twd.Name, twd.Namespace, + temporaliov1alpha1.ConditionRolloutComplete, + metav1.ConditionTrue, + temporaliov1alpha1.ReasonRolloutComplete, + 10*time.Second, time.Second) + }), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + testTemporalWorkerDeploymentCreation(ctx, t, k8sClient, mgr, ts, tc.builder.BuildWithValues(tc.name, testNamespace, ts.GetDefaultNamespace())) + }) + } + + // The following three tests each trigger a different ConditionTemporalConnectionHealthy=False + // reason. They all run standalone (not through testTemporalWorkerDeploymentCreation) because + // the controller fails before creating any k8s Deployments, so the normal status-validation + // and deployment-wait machinery in testTemporalWorkerDeploymentCreation would time out. + // + // conditions-temporal-state-fetch-failed also cannot use the testCase/testCaseBuilder + // structure because testTemporalWorkerDeploymentCreation hardcodes the Temporal namespace + // via BuildWithValues(name, ns, ts.GetDefaultNamespace()) with no way to inject a custom + // namespace. + // + // All three share the same skeleton, extracted into testUnhealthyConnectionCondition below. + t.Run("conditions-missing-connection", func(t *testing.T) { + // No TemporalConnection is created; the controller cannot find the one referenced by + // the TWD and immediately sets the condition to False. 
+ testUnhealthyConnectionCondition(t, k8sClient, + "conditions-missing-connection", testNamespace, ts.GetDefaultNamespace(), + nil, + temporaliov1alpha1.ReasonTemporalConnectionNotFound) + }) + t.Run("conditions-client-creation-failed", func(t *testing.T) { + // Port 1 is never bound; the SDK returns ECONNREFUSED immediately. + testUnhealthyConnectionCondition(t, k8sClient, + "conditions-client-creation-failed", testNamespace, ts.GetDefaultNamespace(), + &temporaliov1alpha1.TemporalConnectionSpec{HostPort: "localhost:1"}, + temporaliov1alpha1.ReasonTemporalClientCreationFailed) + }) + t.Run("conditions-temporal-state-fetch-failed", func(t *testing.T) { + // The real server is reachable so client creation succeeds, but "does-not-exist" is + // not a registered Temporal namespace so Describe() fails. + testUnhealthyConnectionCondition(t, k8sClient, + "conditions-temporal-state-fetch-failed", testNamespace, "does-not-exist", + &temporaliov1alpha1.TemporalConnectionSpec{HostPort: ts.GetFrontendHostPort()}, + temporaliov1alpha1.ReasonTemporalStateFetchFailed) + }) +} + +// testUnhealthyConnectionCondition is shared by the three error-path condition tests. +// It optionally creates a TemporalConnection (nil connectionSpec = missing connection), +// creates a TWD pointing to that connection with the given temporalNamespace, then asserts +// that ConditionTemporalConnectionHealthy becomes False with the expected reason and that a +// matching Warning event is emitted. 
+func testUnhealthyConnectionCondition( + t *testing.T, + k8sClient client.Client, + name, testNamespace, temporalNamespace string, + connectionSpec *temporaliov1alpha1.TemporalConnectionSpec, + expectedReason string, +) { + t.Helper() + ctx := context.Background() + + if connectionSpec != nil { + conn := &temporaliov1alpha1.TemporalConnection{ + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: testNamespace}, + Spec: *connectionSpec, + } + if err := k8sClient.Create(ctx, conn); err != nil { + t.Fatalf("failed to create TemporalConnection: %v", err) + } + } + + twd := testhelpers.NewTemporalWorkerDeploymentBuilder(). + WithManualStrategy(). + WithTargetTemplate("v1.0"). + WithName(name). + WithNamespace(testNamespace). + WithTemporalConnection(name). + WithTemporalNamespace(temporalNamespace). + Build() + + if err := k8sClient.Create(ctx, twd); err != nil { + t.Fatalf("failed to create TWD: %v", err) + } + + waitForCondition(t, ctx, k8sClient, twd.Name, twd.Namespace, + temporaliov1alpha1.ConditionTemporalConnectionHealthy, + metav1.ConditionFalse, expectedReason, + 30*time.Second, time.Second) + waitForEvent(t, ctx, k8sClient, twd.Name, twd.Namespace, + expectedReason, 30*time.Second, time.Second) +} diff --git a/internal/tests/internal/env_helpers.go b/internal/tests/internal/env_helpers.go index c8aebd02..ca261129 100644 --- a/internal/tests/internal/env_helpers.go +++ b/internal/tests/internal/env_helpers.go @@ -154,6 +154,7 @@ func setupTestEnvironment(t *testing.T) (*rest.Config, client.Client, manager.Ma Client: mgr.GetClient(), Scheme: mgr.GetScheme(), TemporalClientPool: clientPool, + Recorder: mgr.GetEventRecorderFor("temporal-worker-controller"), DisableRecoverPanic: true, MaxDeploymentVersionsIneligibleForDeletion: controller.GetControllerMaxDeploymentVersionsIneligibleForDeletion(), } diff --git a/internal/tests/internal/integration_test.go b/internal/tests/internal/integration_test.go index 953208c0..27b6b176 100644 --- 
a/internal/tests/internal/integration_test.go +++ b/internal/tests/internal/integration_test.go @@ -839,6 +839,9 @@ func TestIntegration(t *testing.T) { }) } + // Conditions and events tests + runConditionsAndEventsTests(t, k8sClient, mgr, ts, testNamespace.Name) + } // testTemporalWorkerDeploymentCreation tests the creation of a TemporalWorkerDeployment and waits for the expected status diff --git a/internal/tests/internal/validation_helpers.go b/internal/tests/internal/validation_helpers.go index 395e5474..a10b9180 100644 --- a/internal/tests/internal/validation_helpers.go +++ b/internal/tests/internal/validation_helpers.go @@ -14,7 +14,10 @@ import ( "go.temporal.io/sdk/worker" "go.temporal.io/server/temporaltest" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" ) var ( @@ -418,6 +421,62 @@ func validateDeprecatedVersion(ctx context.Context, env testhelpers.TestEnv, exp return nil } +// waitForCondition polls until the named condition on the TWD matches the expected +// status and reason, or fatals on timeout. 
+func waitForCondition( + t *testing.T, + ctx context.Context, + k8sClient client.Client, + twdName, namespace, condType string, + expectedStatus metav1.ConditionStatus, + expectedReason string, + timeout, interval time.Duration, +) { + t.Helper() + eventually(t, timeout, interval, func() error { + var twd temporaliov1alpha1.TemporalWorkerDeployment + if err := k8sClient.Get(ctx, types.NamespacedName{Name: twdName, Namespace: namespace}, &twd); err != nil { + return fmt.Errorf("failed to get TWD: %w", err) + } + for _, c := range twd.Status.Conditions { + if c.Type == condType { + if c.Status != expectedStatus { + return fmt.Errorf("condition %q: expected status %q, got %q", condType, expectedStatus, c.Status) + } + if c.Reason != expectedReason { + return fmt.Errorf("condition %q: expected reason %q, got %q", condType, expectedReason, c.Reason) + } + return nil + } + } + return fmt.Errorf("condition %q not found on TWD %s/%s", condType, namespace, twdName) + }) +} + +// waitForEvent polls until at least one Kubernetes Event for the named TWD exists +// with the given reason, or fatals on timeout. +func waitForEvent( + t *testing.T, + ctx context.Context, + k8sClient client.Client, + twdName, namespace, reason string, + timeout, interval time.Duration, +) { + t.Helper() + eventually(t, timeout, interval, func() error { + var eventList corev1.EventList + if err := k8sClient.List(ctx, &eventList, client.InNamespace(namespace)); err != nil { + return fmt.Errorf("failed to list events: %w", err) + } + for _, e := range eventList.Items { + if e.InvolvedObject.Name == twdName && e.Reason == reason { + return nil + } + } + return fmt.Errorf("no event with reason %q found for TWD %s/%s", reason, namespace, twdName) + }) +} + func eventually(t *testing.T, timeout, interval time.Duration, check func() error) { deadline := time.Now().Add(timeout) var lastErr error