diff --git a/controllers/dashboards/dashboard_controller.go b/controllers/dashboards/dashboard_controller.go index 3d3ea14..fb06e52 100644 --- a/controllers/dashboards/dashboard_controller.go +++ b/controllers/dashboards/dashboard_controller.go @@ -23,6 +23,8 @@ import ( persesv1 "github.com/perses/perses/pkg/model/api/v1" persesv1Common "github.com/perses/perses/pkg/model/api/v1/common" + v1 "github.com/perses/perses/pkg/client/api/v1" + logger "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -86,14 +88,29 @@ func (r *PersesDashboardReconciler) reconcileDashboardInAllInstances(ctx context } func (r *PersesDashboardReconciler) syncPersesDashboard(ctx context.Context, perses persesv1alpha2.Perses, dashboard *persesv1alpha2.PersesDashboard) (*ctrl.Result, common.ConditionStatusReason, error) { - persesClient, err := r.ClientFactory.CreateClient(ctx, r.APIReader, perses) - + clients, err := r.ClientFactory.CreateClientsForAllPods(ctx, r.APIReader, perses) if err != nil { - dlog.WithError(err).Error("Failed to create perses rest client") + dlog.WithError(err).Error("Failed to create perses rest clients") return subreconciler.RequeueWithErrorAndReason(err, common.ReasonConnectionFailed) } - _, err = persesClient.Project().Get(dashboard.Namespace) + var errs []error + for _, persesClient := range clients { + if err := r.syncDashboardToClient(ctx, persesClient, dashboard); err != nil { + errs = append(errs, err) + } + } + + if len(errs) > 0 { + return subreconciler.RequeueWithErrorAndReason(errors.Join(errs...), common.ReasonBackendError) + } + + res, err := subreconciler.ContinueReconciling() + return res, "", err +} + +func (r *PersesDashboardReconciler) syncDashboardToClient(ctx context.Context, persesClient v1.ClientInterface, dashboard *persesv1alpha2.PersesDashboard) error { + _, err := persesClient.Project().Get(dashboard.Namespace) if err != nil { if errors.Is(err, perseshttp.RequestNotFoundError) { @@ -111,13 +128,13 @@ func (r *PersesDashboardReconciler) syncPersesDashboard(ctx context.Context, per if err != nil { dlog.WithError(err).Errorf("Failed to create perses project: %s", dashboard.Namespace) - return subreconciler.RequeueWithErrorAndReason(err, common.ReasonBackendError) + return err } dlog.Infof("Project created: %s", dashboard.Namespace) } else { dlog.WithError(err).Errorf("project error: %s", dashboard.Namespace) - return subreconciler.RequeueWithErrorAndReason(err, common.ReasonBackendError) + return err } } @@ -137,33 +154,24 @@ func (r *PersesDashboardReconciler) syncPersesDashboard(ctx context.Context, per if err != nil { if errors.Is(err, perseshttp.RequestNotFoundError) { _, err = persesClient.Dashboard(dashboard.Namespace).Create(persesDashboard) - if err != nil { dlog.WithError(err).Errorf("Failed to create dashboard: %s", dashboard.Name) - return subreconciler.RequeueWithErrorAndReason(err, common.ReasonBackendError) + return err } - dlog.Infof("Dashboard created: %s", dashboard.Name) - - res, err := subreconciler.ContinueReconciling() - return res, "", err - } - - return subreconciler.RequeueWithErrorAndReason(err, common.ReasonBackendError) - } else { - _, err = persesClient.Dashboard(dashboard.Namespace).Update(persesDashboard) - - if err != nil { - dlog.WithError(err).Errorf("Failed to update dashboard: %s", dashboard.Name) - - return subreconciler.RequeueWithErrorAndReason(err, common.ReasonBackendError) + return nil } + return err + } - dlog.Infof("Dashboard updated: %s", dashboard.Name) + _, err = persesClient.Dashboard(dashboard.Namespace).Update(persesDashboard) + if err != nil { + dlog.WithError(err).Errorf("Failed to update dashboard: %s", dashboard.Name) + return err } - res, err := subreconciler.ContinueReconciling() - return res, "", err + dlog.Infof("Dashboard updated: %s", dashboard.Name) + return nil } func (r *PersesDashboardReconciler) deleteDashboardInAllInstances(ctx context.Context, _ ctrl.Request, dashbboardNamespace string, dashboardName string) (*ctrl.Result, error) { @@ -190,35 +198,46 @@ func (r *PersesDashboardReconciler) deleteDashboardInAllInstances(ctx context.Co } func (r *PersesDashboardReconciler) deleteDashboard(ctx context.Context, perses persesv1alpha2.Perses, dashboardNamespace string, dashboardName string) (*ctrl.Result, error) { - persesClient, err := r.ClientFactory.CreateClient(ctx, r.APIReader, perses) - + clients, err := r.ClientFactory.CreateClientsForAllPods(ctx, r.APIReader, perses) if err != nil { - dlog.WithError(err).Error("Failed to create perses rest client") + dlog.WithError(err).Error("Failed to create perses rest clients") return subreconciler.RequeueWithError(err) } - _, err = persesClient.Project().Get(dashboardNamespace) + var errs []error + for _, persesClient := range clients { + if err := r.deleteDashboardFromClient(persesClient, dashboardNamespace, dashboardName); err != nil { + errs = append(errs, err) + } + } + + if len(errs) > 0 { + return subreconciler.RequeueWithError(errors.Join(errs...)) + } + + return subreconciler.ContinueReconciling() +} +func (r *PersesDashboardReconciler) deleteDashboardFromClient(persesClient v1.ClientInterface, dashboardNamespace string, dashboardName string) error { + _, err := persesClient.Project().Get(dashboardNamespace) if err != nil { + if errors.Is(err, perseshttp.RequestNotFoundError) { + return nil + } dlog.WithError(err).Errorf("project error: %s", dashboardNamespace) - - return subreconciler.RequeueWithError(err) + return err } err = persesClient.Dashboard(dashboardNamespace).Delete(dashboardName) - - // Ignore NotFound — the resource may have already been deleted from Perses directly. - // Any other error means the delete failed and should be retried. if err != nil { if errors.Is(err, perseshttp.RequestNotFoundError) { dlog.Infof("Dashboard not found: %s", dashboardName) - return subreconciler.ContinueReconciling() + return nil } dlog.WithError(err).Errorf("Failed to delete dashboard: %s", dashboardName) - return subreconciler.RequeueWithError(err) + return err } dlog.Infof("Dashboard deleted: %s", dashboardName) - - return subreconciler.ContinueReconciling() + return nil } diff --git a/controllers/dashboards/persesdashboard_controller.go b/controllers/dashboards/persesdashboard_controller.go index be1b92c..54f4fde 100644 --- a/controllers/dashboards/persesdashboard_controller.go +++ b/controllers/dashboards/persesdashboard_controller.go @@ -68,6 +68,7 @@ var log = logger.WithField("module", "perses_dashboards_controller") // +kubebuilder:rbac:groups=perses.dev,resources=persesdashboards,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=perses.dev,resources=persesdashboards/status,verbs=get;update;patch // +kubebuilder:rbac:groups=perses.dev,resources=persesdashboards/finalizers,verbs=update +// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch func (r *PersesDashboardReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { start := time.Now() objKey := req.String() diff --git a/controllers/datasources/datasource_controller.go b/controllers/datasources/datasource_controller.go index fe24d2c..889066f 100644 --- a/controllers/datasources/datasource_controller.go +++ b/controllers/datasources/datasource_controller.go @@ -89,14 +89,29 @@ func (r *PersesDatasourceReconciler) reconcileDatasourcesInAllInstances(ctx cont } func (r *PersesDatasourceReconciler) syncPersesDatasource(ctx context.Context, perses persesv1alpha2.Perses, datasource *persesv1alpha2.PersesDatasource) (*ctrl.Result, persescommon.ConditionStatusReason, error) { - persesClient, err := r.ClientFactory.CreateClient(ctx, r.APIReader, perses) - + clients, err := r.ClientFactory.CreateClientsForAllPods(ctx, r.APIReader, perses) if err != nil { - dlog.WithError(err).Error("Failed to create perses rest client") + dlog.WithError(err).Error("Failed to create perses rest clients") return subreconciler.RequeueWithErrorAndReason(err, persescommon.ReasonConnectionFailed) } - _, err = persesClient.Project().Get(datasource.Namespace) + var errs []error + for _, persesClient := range clients { + if err := r.syncDatasourceToClient(ctx, persesClient, datasource); err != nil { + errs = append(errs, err) + } + } + + if len(errs) > 0 { + return subreconciler.RequeueWithErrorAndReason(errors.Join(errs...), persescommon.ReasonBackendError) + } + + res, err := subreconciler.ContinueReconciling() + return res, "", err +} + +func (r *PersesDatasourceReconciler) syncDatasourceToClient(ctx context.Context, persesClient v1.ClientInterface, datasource *persesv1alpha2.PersesDatasource) error { + _, err := persesClient.Project().Get(datasource.Namespace) if err != nil { if errors.Is(err, perseshttp.RequestNotFoundError) { @@ -114,22 +129,21 @@ func (r *PersesDatasourceReconciler) syncPersesDatasource(ctx context.Context, p if err != nil { dlog.WithError(err).Errorf("Failed to create perses project: %s", datasource.Namespace) - return subreconciler.RequeueWithErrorAndReason(err, persescommon.ReasonBackendError) + return err } dlog.Infof("Project created: %s", datasource.Namespace) } else { dlog.WithError(err).Errorf("project error: %s", datasource.Namespace) - return subreconciler.RequeueWithErrorAndReason(err, persescommon.ReasonBackendError) + return err } } - // create a secret holding the secret configuration so the datasource can reference it if persescommon.HasSecretConfig(datasource.Spec.Client) { - _, reason, err := r.syncPersesSecret(ctx, persesClient, datasource) + _, _, err := r.syncPersesSecret(ctx, persesClient, datasource) if err != nil { dlog.WithError(err).Errorf("Failed to create datasource secret: %s", datasource.Name) - return subreconciler.RequeueWithErrorAndReason(err, reason) + return err } } @@ -149,32 +163,24 @@ func (r *PersesDatasourceReconciler) syncPersesDatasource(ctx context.Context, p if err != nil { if errors.Is(err, perseshttp.RequestNotFoundError) { _, err = persesClient.Datasource(datasource.Namespace).Create(datasourceWithName) - if err != nil { dlog.WithError(err).Errorf("Failed to create datasource: %s", datasource.Name) - return subreconciler.RequeueWithErrorAndReason(err, persescommon.ReasonBackendError) + return err } - dlog.Infof("Datasource created: %s", datasource.Name) - - res, err := subreconciler.ContinueReconciling() - return res, "", err - } - - return subreconciler.RequeueWithErrorAndReason(err, persescommon.ReasonBackendError) - } else { - _, err = persesClient.Datasource(datasource.Namespace).Update(datasourceWithName) - - if err != nil { - dlog.WithError(err).Errorf("Failed to update datasource: %s", datasource.Name) - return subreconciler.RequeueWithErrorAndReason(err, persescommon.ReasonBackendError) + return nil } + return err + } - dlog.Infof("Datasource updated: %s", datasource.Name) + _, err = persesClient.Datasource(datasource.Namespace).Update(datasourceWithName) + if err != nil { + dlog.WithError(err).Errorf("Failed to update datasource: %s", datasource.Name) + return err } - res, err := subreconciler.ContinueReconciling() - return res, "", err + dlog.Infof("Datasource updated: %s", datasource.Name) + return nil } // creates/updates a Perses Secret with configuration, @@ -386,51 +392,60 @@ func (r *PersesDatasourceReconciler) deleteDatasourceInAllInstances(ctx context. } func (r *PersesDatasourceReconciler) deleteDatasource(ctx context.Context, perses persesv1alpha2.Perses, datasourceNamespace string, datasourceName string) (*ctrl.Result, error) { - persesClient, err := r.ClientFactory.CreateClient(ctx, r.APIReader, perses) - + clients, err := r.ClientFactory.CreateClientsForAllPods(ctx, r.APIReader, perses) if err != nil { - dlog.WithError(err).Error("Failed to create perses rest client") + dlog.WithError(err).Error("Failed to create perses rest clients") return subreconciler.RequeueWithError(err) } - _, err = persesClient.Project().Get(datasourceNamespace) + var errs []error + for _, persesClient := range clients { + if err := r.deleteDatasourceFromClient(persesClient, datasourceNamespace, datasourceName); err != nil { + errs = append(errs, err) + } + } + + if len(errs) > 0 { + return subreconciler.RequeueWithError(errors.Join(errs...)) + } + return subreconciler.ContinueReconciling() +} + +func (r *PersesDatasourceReconciler) deleteDatasourceFromClient(persesClient v1.ClientInterface, datasourceNamespace string, datasourceName string) error { + _, err := persesClient.Project().Get(datasourceNamespace) if err != nil { + if errors.Is(err, perseshttp.RequestNotFoundError) { + return nil + } dlog.WithError(err).Errorf("project error: %s", datasourceNamespace) - - return subreconciler.RequeueWithError(err) + return err } - // Ignore NotFound — the resource may have already been deleted from Perses directly. - // Any other error means the delete failed and should be retried. - // Secret delete is attempted regardless of whether the datasource was found or not. err = persesClient.Datasource(datasourceNamespace).Delete(datasourceName) - if err != nil { if errors.Is(err, perseshttp.RequestNotFoundError) { dlog.Infof("Datasource not found: %s", datasourceName) } else { dlog.WithError(err).Errorf("Failed to delete datasource: %s", datasourceName) - return subreconciler.RequeueWithError(err) + return err } } else { dlog.Infof("Datasource deleted: %s", datasourceName) } secretName := datasourceName + persescommon.SecretNameSuffix - err = persesClient.Secret(datasourceNamespace).Delete(secretName) - if err != nil { if errors.Is(err, perseshttp.RequestNotFoundError) { dlog.Infof("Secret not found: %s", secretName) } else { dlog.WithError(err).Errorf("Failed to delete secret: %s", secretName) - return subreconciler.RequeueWithError(err) + return err } } else { dlog.Infof("Secret deleted: %s", secretName) } - return subreconciler.ContinueReconciling() + return nil } diff --git a/controllers/datasources/persesdatasource_controller.go b/controllers/datasources/persesdatasource_controller.go index 9894200..33af556 100644 --- a/controllers/datasources/persesdatasource_controller.go +++ b/controllers/datasources/persesdatasource_controller.go @@ -68,6 +68,7 @@ var log = logger.WithField("module", "perses_datasource_controller") // +kubebuilder:rbac:groups=perses.dev,resources=persesdatasources/status,verbs=get;update;patch // +kubebuilder:rbac:groups=perses.dev,resources=persesdatasources/finalizers,verbs=update // +kubebuilder:rbac:groups="",resources=configmaps;secrets,verbs=watch;get +// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch func (r *PersesDatasourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { start := time.Now() objKey := req.String() diff --git a/controllers/globaldatasources/globaldatasource_controller.go b/controllers/globaldatasources/globaldatasource_controller.go index 88f1a13..be40b68 100644 --- a/controllers/globaldatasources/globaldatasource_controller.go +++ b/controllers/globaldatasources/globaldatasource_controller.go @@ -87,24 +87,37 @@ func (r *PersesGlobalDatasourceReconciler) reconcileGlobalDatasourcesInAllInstan } func (r *PersesGlobalDatasourceReconciler) syncPersesGlobalDatasource(ctx context.Context, perses persesv1alpha2.Perses, globaldatasource *persesv1alpha2.PersesGlobalDatasource) (*ctrl.Result, persescommon.ConditionStatusReason, error) { - persesClient, err := r.ClientFactory.CreateClient(ctx, r.APIReader, perses) - + clients, err := r.ClientFactory.CreateClientsForAllPods(ctx, r.APIReader, perses) if err != nil { - gdlog.WithError(err).Error("Failed to create perses rest client") + gdlog.WithError(err).Error("Failed to create perses rest clients") return subreconciler.RequeueWithErrorAndReason(err, persescommon.ReasonConnectionFailed) + } + + var errs []error + for _, persesClient := range clients { + if err := r.syncGlobalDatasourceToClient(ctx, persesClient, globaldatasource); err != nil { + errs = append(errs, err) + } + } + if len(errs) > 0 { + return subreconciler.RequeueWithErrorAndReason(errors.Join(errs...), persescommon.ReasonBackendError) } - // create a secret holding the secret configuration so the globaldatasource can reference it + res, err := subreconciler.ContinueReconciling() + return res, "", err +} + +func (r *PersesGlobalDatasourceReconciler) syncGlobalDatasourceToClient(ctx context.Context, persesClient v1.ClientInterface, globaldatasource *persesv1alpha2.PersesGlobalDatasource) error { if persescommon.HasSecretConfig(globaldatasource.Spec.Client) { - _, reason, err := r.syncPersesGlobalSecret(ctx, persesClient, globaldatasource) + _, _, err := r.syncPersesGlobalSecret(ctx, persesClient, globaldatasource) if err != nil { gdlog.WithError(err).Errorf("Failed to create globaldatasource secret: %s", globaldatasource.Name) - return subreconciler.RequeueWithErrorAndReason(err, reason) + return err } } - _, err = persesClient.GlobalDatasource().Get(globaldatasource.Name) + _, err := persesClient.GlobalDatasource().Get(globaldatasource.Name) globalDatasourceWithName := &persesv1.GlobalDatasource{ Kind: persesv1.KindGlobalDatasource, @@ -118,34 +131,24 @@ func (r *PersesGlobalDatasourceReconciler) syncPersesGlobalDatasource(ctx contex if err != nil { if errors.Is(err, perseshttp.RequestNotFoundError) { _, err = persesClient.GlobalDatasource().Create(globalDatasourceWithName) - if err != nil { gdlog.WithError(err).Errorf("Failed to create globaldatasource: %s", globaldatasource.Name) - return subreconciler.RequeueWithErrorAndReason(err, persescommon.ReasonBackendError) - + return err } - gdlog.Infof("GlobalDatasource created: %s", globaldatasource.Name) - - res, err := subreconciler.ContinueReconciling() - return res, "", err - } - - res, err := subreconciler.RequeueWithError(err) - return res, persescommon.ReasonBackendError, err - } else { - _, err = persesClient.GlobalDatasource().Update(globalDatasourceWithName) - - if err != nil { - gdlog.WithError(err).Errorf("Failed to update globaldatasource: %s", globaldatasource.Name) - return subreconciler.RequeueWithErrorAndReason(err, persescommon.ReasonBackendError) + return nil } + return err + } - gdlog.Infof("GlobalDatasource updated: %s", globaldatasource.Name) + _, err = persesClient.GlobalDatasource().Update(globalDatasourceWithName) + if err != nil { + gdlog.WithError(err).Errorf("Failed to update globaldatasource: %s", globaldatasource.Name) + return err } - res, err := subreconciler.ContinueReconciling() - return res, "", err + gdlog.Infof("GlobalDatasource updated: %s", globaldatasource.Name) + return nil } // creates/updates a Perses Global Secret with configuration, @@ -370,43 +373,51 @@ func (r *PersesGlobalDatasourceReconciler) deleteGlobalDatasourceInAllInstances( } func (r *PersesGlobalDatasourceReconciler) deleteGlobalDatasource(ctx context.Context, perses persesv1alpha2.Perses, datasourceName string) (*ctrl.Result, error) { - persesClient, err := r.ClientFactory.CreateClient(ctx, r.APIReader, perses) - + clients, err := r.ClientFactory.CreateClientsForAllPods(ctx, r.APIReader, perses) if err != nil { - gdlog.WithError(err).Error("Failed to create perses rest client") + gdlog.WithError(err).Error("Failed to create perses rest clients") return subreconciler.RequeueWithError(err) } - // Ignore NotFound — the resource may have already been deleted from Perses directly. - // Any other error means the delete failed and should be retried. - // Secret delete is attempted regardless of whether the datasource was found or not. - err = persesClient.GlobalDatasource().Delete(datasourceName) + var errs []error + for _, persesClient := range clients { + if err := r.deleteGlobalDatasourceFromClient(persesClient, datasourceName); err != nil { + errs = append(errs, err) + } + } + + if len(errs) > 0 { + return subreconciler.RequeueWithError(errors.Join(errs...)) + } + + return subreconciler.ContinueReconciling() +} +func (r *PersesGlobalDatasourceReconciler) deleteGlobalDatasourceFromClient(persesClient v1.ClientInterface, datasourceName string) error { + err := persesClient.GlobalDatasource().Delete(datasourceName) if err != nil { if errors.Is(err, perseshttp.RequestNotFoundError) { gdlog.Infof("GlobalDatasource not found: %s", datasourceName) } else { gdlog.WithError(err).Errorf("Failed to delete global datasource: %s", datasourceName) - return subreconciler.RequeueWithError(err) + return err } } else { gdlog.Infof("GlobalDatasource deleted: %s", datasourceName) } secretName := datasourceName + persescommon.SecretNameSuffix - err = persesClient.GlobalSecret().Delete(secretName) - if err != nil { if errors.Is(err, perseshttp.RequestNotFoundError) { gdlog.Infof("GlobalSecret not found: %s", secretName) } else { gdlog.WithError(err).Errorf("Failed to delete global secret: %s", secretName) - return subreconciler.RequeueWithError(err) + return err } } else { gdlog.Infof("GlobalSecret deleted: %s", secretName) } - return subreconciler.ContinueReconciling() + return nil } diff --git a/controllers/globaldatasources/persesglobaldatasource_controller.go b/controllers/globaldatasources/persesglobaldatasource_controller.go index 4c8d59d..e01020c 100644 --- a/controllers/globaldatasources/persesglobaldatasource_controller.go +++ b/controllers/globaldatasources/persesglobaldatasource_controller.go @@ -68,6 +68,7 @@ var log = logger.WithField("module", "perses_globaldatasource_controller") // +kubebuilder:rbac:groups=perses.dev,resources=persesglobaldatasources/status,verbs=get;update;patch // +kubebuilder:rbac:groups=perses.dev,resources=persesglobaldatasources/finalizers,verbs=update // +kubebuilder:rbac:groups="",resources=configmaps;secrets,verbs=watch;get +// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch func (r *PersesGlobalDatasourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { start := time.Now() objKey := req.String() diff --git a/internal/perses/common/perses_client_factory.go b/internal/perses/common/perses_client_factory.go index 09e28ce..6c71996 100644 --- a/internal/perses/common/perses_client_factory.go +++ b/internal/perses/common/perses_client_factory.go @@ -19,6 +19,7 @@ import ( "fmt" "os" + corev1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" v1 "github.com/perses/perses/pkg/client/api/v1" @@ -33,6 +34,7 @@ const tokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token" type PersesClientFactory interface { CreateClient(ctx context.Context, client client.Reader, perses persesv1alpha2.Perses) (v1.ClientInterface, error) + CreateClientsForAllPods(ctx context.Context, k8sClient client.Reader, perses persesv1alpha2.Perses) ([]v1.ClientInterface, error) } type PersesClientFactoryWithConfig struct{} @@ -41,42 +43,30 @@ func NewWithConfig() PersesClientFactory { return &PersesClientFactoryWithConfig{} } -func (f *PersesClientFactoryWithConfig) CreateClient(ctx context.Context, client client.Reader, perses persesv1alpha2.Perses) (v1.ClientInterface, error) { - var urlStr string - - var httpProtocol = "http" - if isTLSEnabled(&perses) { +func (f *PersesClientFactoryWithConfig) getProtocolAndPort(perses *persesv1alpha2.Perses) (string, int32) { + httpProtocol := "http" + if isTLSEnabled(perses) { httpProtocol = "https" } - - serverURLFlag := flag.Lookup(PersesServerURLFlag) - if serverURLFlag != nil && serverURLFlag.Value.String() != "" { - urlStr = serverURLFlag.Value.String() - } else { - containerPort := DefaultContainerPort - if perses.Spec.ContainerPort != nil { - containerPort = *perses.Spec.ContainerPort - } - urlStr = fmt.Sprintf("%s://%s.%s.svc.cluster.local:%d%s", httpProtocol, perses.Name, perses.Namespace, containerPort, perses.Spec.Config.APIPrefix) - } - parsedURL, err := common.ParseURL(urlStr) - if err != nil { - return nil, err + containerPort := DefaultContainerPort + if perses.Spec.ContainerPort != nil { + containerPort = *perses.Spec.ContainerPort } + return httpProtocol, containerPort +} - config := clientConfig.RestConfigClient{ - URL: parsedURL, - } +func (f *PersesClientFactoryWithConfig) buildBaseConfig(ctx context.Context, k8sClient client.Reader, perses *persesv1alpha2.Perses) (clientConfig.RestConfigClient, error) { + config := clientConfig.RestConfigClient{} - if isKubernetesAuthEnabled(&perses) { + if isKubernetesAuthEnabled(perses) { tokenBytes, err := os.ReadFile(tokenPath) if err != nil { - return nil, fmt.Errorf("failed to read service account token from %s: %w", tokenPath, err) + return config, fmt.Errorf("failed to read service account token from %s: %w", tokenPath, err) } saToken := string(tokenBytes) if saToken == "" { - return nil, fmt.Errorf("service account token is empty, ensure the Perses operator has the correct permissions") + return config, fmt.Errorf("service account token is empty, ensure the Perses operator has the correct permissions") } config.Headers = map[string]string{ @@ -84,7 +74,7 @@ func (f *PersesClientFactoryWithConfig) CreateClient(ctx context.Context, client } } - if isClientTLSEnabled(&perses) { + if isClientTLSEnabled(perses) { tls := perses.Spec.Client.TLS insecureSkipVerify := false @@ -99,12 +89,10 @@ func (f *PersesClientFactoryWithConfig) CreateClient(ctx context.Context, client if tls.CaCert != nil { switch tls.CaCert.Type { case persesv1alpha2.SecretSourceTypeSecret, persesv1alpha2.SecretSourceTypeConfigMap: - caData, _, err := GetTLSCertData(ctx, client, perses.Namespace, perses.Name, tls.CaCert) - + caData, _, err := GetTLSCertData(ctx, k8sClient, perses.Namespace, perses.Name, tls.CaCert) if err != nil { - return nil, err + return config, err } - tlsConfig.CA = caData case persesv1alpha2.SecretSourceTypeFile: tlsConfig.CAFile = tls.CaCert.CertPath @@ -114,12 +102,10 @@ func (f *PersesClientFactoryWithConfig) CreateClient(ctx context.Context, client if tls.UserCert != nil { switch tls.UserCert.Type { case persesv1alpha2.SecretSourceTypeSecret, persesv1alpha2.SecretSourceTypeConfigMap: - cert, key, err := GetTLSCertData(ctx, client, perses.Namespace, perses.Name, tls.UserCert) - + cert, key, err := GetTLSCertData(ctx, k8sClient, perses.Namespace, perses.Name, tls.UserCert) if err != nil { - return nil, err + return config, err } - tlsConfig.Cert = cert tlsConfig.Key = key case persesv1alpha2.SecretSourceTypeFile: @@ -130,14 +116,103 @@ func (f *PersesClientFactoryWithConfig) CreateClient(ctx context.Context, client config.TLSConfig = tlsConfig } + return config, nil +} + +func (f *PersesClientFactoryWithConfig) createClientForURL(urlStr string, baseConfig clientConfig.RestConfigClient) (v1.ClientInterface, error) { + parsedURL, err := common.ParseURL(urlStr) + if err != nil { + return nil, err + } + + config := baseConfig + config.URL = parsedURL + restClient, err := clientConfig.NewRESTClient(config) if err != nil { return nil, err } - persesClient := v1.NewWithClient(restClient) + return v1.NewWithClient(restClient), nil +} + +func (f *PersesClientFactoryWithConfig) getServiceURL(perses *persesv1alpha2.Perses) string { + httpProtocol, containerPort := f.getProtocolAndPort(perses) + + serverURLFlag := flag.Lookup(PersesServerURLFlag) + if serverURLFlag != nil && serverURLFlag.Value.String() != "" { + return serverURLFlag.Value.String() + } - return persesClient, nil + return fmt.Sprintf("%s://%s.%s.svc.cluster.local:%d%s", httpProtocol, perses.Name, perses.Namespace, containerPort, perses.Spec.Config.APIPrefix) +} + +func (f *PersesClientFactoryWithConfig) CreateClient(ctx context.Context, k8sClient client.Reader, perses persesv1alpha2.Perses) (v1.ClientInterface, error) { + baseConfig, err := f.buildBaseConfig(ctx, k8sClient, &perses) + if err != nil { + return nil, err + } + + urlStr := f.getServiceURL(&perses) + return f.createClientForURL(urlStr, baseConfig) +} + +func (f *PersesClientFactoryWithConfig) CreateClientsForAllPods(ctx context.Context, k8sClient client.Reader, perses persesv1alpha2.Perses) ([]v1.ClientInterface, error) { + if perses.Spec.Config.Database.SQL != nil { + c, err := f.CreateClient(ctx, k8sClient, perses) + if err != nil { + return nil, err + } + return []v1.ClientInterface{c}, nil + } + + baseConfig, err := f.buildBaseConfig(ctx, k8sClient, &perses) + if err != nil { + return nil, err + } + + podList := &corev1.PodList{} + err = k8sClient.List(ctx, podList, + client.InNamespace(perses.Namespace), + client.MatchingLabels(LabelsForPerses(perses.Name, &perses)), + ) + if err != nil { + return nil, fmt.Errorf("failed to list pods for Perses instance %s/%s: %w", perses.Namespace, perses.Name, err) + } + + httpProtocol, containerPort := f.getProtocolAndPort(&perses) + + var clients []v1.ClientInterface + for i := range podList.Items { + pod := &podList.Items[i] + if !isPodReady(pod) { + continue + } + urlStr := fmt.Sprintf("%s://%s:%d%s", httpProtocol, pod.Status.PodIP, containerPort, perses.Spec.Config.APIPrefix) + c, err := f.createClientForURL(urlStr, baseConfig) + if err != nil { + return nil, err + } + clients = append(clients, c) + } + + if len(clients) == 0 { + return nil, fmt.Errorf("no ready pods found for Perses instance %s/%s", perses.Namespace, perses.Name) + } + + return clients, nil +} + +func isPodReady(pod *corev1.Pod) bool { + if pod.Status.Phase != corev1.PodRunning || pod.Status.PodIP == "" { + return false + } + for _, cond := range pod.Status.Conditions { + if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue { + return true + } + } + return false } type PersesClientFactoryWithClient struct { @@ -151,3 +226,7 @@ func NewWithClient(client v1.ClientInterface) PersesClientFactory { func (f *PersesClientFactoryWithClient) CreateClient(_ context.Context, _ client.Reader, _ persesv1alpha2.Perses) (v1.ClientInterface, error) { return f.client, nil } + +func (f *PersesClientFactoryWithClient) CreateClientsForAllPods(_ context.Context, _ client.Reader, _ persesv1alpha2.Perses) ([]v1.ClientInterface, error) { + return []v1.ClientInterface{f.client}, nil +}