From 4cddce50945ccde5a646baa99c55d291e508a11f Mon Sep 17 00:00:00 2001 From: Ganesh Kumar Date: Mon, 4 May 2026 15:46:06 +0530 Subject: [PATCH 1/2] Implement Conditional Webhook Registry Updates LiveReview Pre-Commit Check: ran (iter:4, coverage:64%) --- internal/api/connectors.go | 53 ++++++++++++++++- internal/api/server.go | 2 +- internal/jobqueue/jobqueue.go | 105 +++++++++++++++++++++++++--------- 3 files changed, 130 insertions(+), 30 deletions(-) diff --git a/internal/api/connectors.go b/internal/api/connectors.go index 47c300f8..cb19664f 100644 --- a/internal/api/connectors.go +++ b/internal/api/connectors.go @@ -446,7 +446,56 @@ func (s *Server) GetConnector(c echo.Context) error { // DeleteConnector handles deletion of a git provider connection func (s *Server) DeleteConnector(c echo.Context) error { - id := c.Param("id") + idStr := c.Param("id") + id, err := strconv.Atoi(idStr) + if err != nil { + return c.JSON(http.StatusBadRequest, ErrorResponse{ + Error: "Invalid connector ID", + }) + } + + // Validate connector ownership + if _, err := s.validateConnectorOwnership(c, id); err != nil { + return err + } + + // Fetch connector details for webhook removal + var provider, providerURL, patToken string + query := ` + SELECT provider, provider_url, pat_token + FROM integration_tokens + WHERE id = $1 + ` + err = s.db.QueryRow(query, id).Scan(&provider, &providerURL, &patToken) + if err != nil { + if err == sql.ErrNoRows { + return c.JSON(http.StatusNotFound, ErrorResponse{ + Error: "Connector not found", + }) + } + log.Printf("Failed to fetch connector details for deletion: %v", err) + return c.JSON(http.StatusInternalServerError, ErrorResponse{ + Error: "Database error: " + err.Error(), + }) + } + + // Fetch projects to queue webhook removal + repositoryData, err := s.fetchAndCacheRepositoryData(id, false, false) + if err != nil { + log.Printf("Failed to fetch repository data for connector %d during deletion: %v", id, err) + // We log the error but proceed with connector deletion so the user isn't permanently blocked + } else if repositoryData.Error != "" { + log.Printf("Repository access error during deletion of connector %d: %s", id, repositoryData.Error) + // Again, proceed to delete the connector + } else { + // Queue webhook removal jobs for each project + ctx := c.Request().Context() + for _, projectPath := range repositoryData.Projects { + if err := s.jobQueue.QueueWebhookRemovalJob(ctx, id, projectPath, provider, providerURL, patToken, true); err != nil { + log.Printf("Failed to queue webhook removal job for %s: %v", projectPath, err) + } + } + } // Execute the delete query result, err := s.db.Exec(` @@ -455,7 +504,7 @@ func (s *Server) DeleteConnector(c echo.Context) error { `, id) if err != nil { - log.Printf("Failed to delete connector with ID %s: %v", id, err) + log.Printf("Failed to delete connector with ID %d: %v", id, err) return c.JSON(http.StatusInternalServerError, ErrorResponse{ Error: "Database error: " + err.Error(), }) diff --git a/internal/api/server.go b/internal/api/server.go index f43ef901..c8f4ab3e 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -1630,7 +1630,7 @@ func (s *Server) DisableManualTriggerForAllProjects(c echo.Context) error { var queueErrors []string for _, projectPath := range repositoryData.Projects { - err := s.jobQueue.QueueWebhookRemovalJob(ctx, connectorId, projectPath, provider, providerURL, patToken) + err := s.jobQueue.QueueWebhookRemovalJob(ctx, connectorId, projectPath, provider, providerURL, patToken, false) if err != nil { queueErrors = append(queueErrors, fmt.Sprintf("Failed to queue removal job for %s: %v", projectPath, err)) } else { diff --git a/internal/jobqueue/jobqueue.go b/internal/jobqueue/jobqueue.go index ef8575d7..7e67a346 100644 --- a/internal/jobqueue/jobqueue.go +++ b/internal/jobqueue/jobqueue.go @@ -33,6 +33,7 @@ import ( "time" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" "github.com/livereview/internal/providers/gitea" networkjobqueue "github.com/livereview/network/jobqueue" @@ -153,11 +154,12 @@ type WebhookInstallWorker struct { // WebhookRemovalJobArgs represents the arguments for a webhook removal job type WebhookRemovalJobArgs struct { - ConnectorID int `json:"connector_id"` - ProjectPath string `json:"project_path"` - Provider string `json:"provider"` - BaseURL string `json:"base_url"` - PAT string `json:"pat"` + ConnectorID int `json:"connector_id"` + ProjectPath string `json:"project_path"` + Provider string `json:"provider"` + BaseURL string `json:"base_url"` + PAT string `json:"pat"` + SkipRegistryUpdate bool `json:"skip_registry_update"` } // Kind returns the job kind for River @@ -1413,11 +1415,13 @@ func (w *WebhookRemovalWorker) handleGitLabWebhookRemoval(ctx context.Context, a log.Printf("Successfully removed webhooks for project %s", args.ProjectPath) - // Update the webhook registry to mark as unconnected - err = w.updateWebhookRegistryForRemoval(ctx, args, projectID) - if err != nil { - log.Printf("Failed to update webhook registry for project %s: %v", args.ProjectPath, err) - return fmt.Errorf("failed to update webhook registry: %w", err) + // Update the webhook registry to mark as unconnected, unless skipped + if !args.SkipRegistryUpdate { + err = w.updateWebhookRegistryForRemoval(ctx, args, projectID) + if err != nil { + log.Printf("Failed to update webhook registry for project %s: %v", args.ProjectPath, err) + return fmt.Errorf("failed to update webhook registry: %w", err) + } } return nil @@ -1443,11 +1447,13 @@ func (w *WebhookRemovalWorker) handleGitHubWebhookRemoval(ctx context.Context, a log.Printf("Successfully removed webhooks for GitHub repository %s/%s", owner, repo) - // Update the webhook registry to mark as unconnected - err = w.updateWebhookRegistryForGitHubRemoval(ctx, args) - if err != nil { - log.Printf("Failed to update webhook registry for GitHub repository %s/%s: %v", owner, repo, err) - return fmt.Errorf("failed to update webhook registry: %w", err) + // Update the webhook registry to mark as unconnected, unless skipped + if !args.SkipRegistryUpdate { + err = w.updateWebhookRegistryForGitHubRemoval(ctx, args) + if err != nil { + log.Printf("Failed to update webhook registry for GitHub repository %s/%s: %v", owner, repo, err) + return fmt.Errorf("failed to update webhook registry: %w", err) + } } return nil @@ -1653,6 +1659,10 @@ func (w *WebhookRemovalWorker) updateWebhookRegistryForRemoval(ctx context.Conte IntegrationTokenID: args.ConnectorID, }) if err != nil { + if isForeignKeyError(err) { + log.Printf("Ignoring foreign key error for project %s (connector likely deleted)", args.ProjectPath) + return nil + } return fmt.Errorf("failed to insert GitLab removal registry: %w", err) } @@ -1671,6 +1681,10 @@ func (w *WebhookRemovalWorker) updateWebhookRegistryForRemoval(ctx context.Conte UpdatedAt: now, }) if err != nil { + if isForeignKeyError(err) { + log.Printf("Ignoring foreign key error for project %s (connector likely deleted)", args.ProjectPath) + return nil + } return fmt.Errorf("failed to update GitLab removal registry: %w", err) } @@ -1820,6 +1834,10 @@ func (w *WebhookRemovalWorker) updateWebhookRegistryForGitHubRemoval(ctx context IntegrationTokenID: args.ConnectorID, }) if err != nil { + if isForeignKeyError(err) { + log.Printf("Ignoring foreign key error for GitHub repository %s (connector likely deleted)", args.ProjectPath) + return nil + } return fmt.Errorf("failed to insert GitHub removal registry: %w", err) } @@ -1838,6 +1856,10 @@ func (w *WebhookRemovalWorker) updateWebhookRegistryForGitHubRemoval(ctx context UpdatedAt: now, }) if err != nil { + if isForeignKeyError(err) { + log.Printf("Ignoring foreign key error for GitHub repository %s (connector likely deleted)", args.ProjectPath) + return nil + } return fmt.Errorf("failed to update GitHub removal registry: %w", err) } @@ -1873,10 +1895,12 @@ func (w *WebhookRemovalWorker) handleBitbucketWebhookRemoval(ctx context.Context // Don't return error here - we still want to update the registry } - // Update the webhook_registry to mark as removed - err = w.updateWebhookRegistryForBitbucketRemoval(ctx, args) - if err != nil { - return fmt.Errorf("failed to update webhook registry: %w", err) + // Update the webhook_registry to mark as removed, unless skipped + if !args.SkipRegistryUpdate { + err = w.updateWebhookRegistryForBitbucketRemoval(ctx, args) + if err != nil { + return fmt.Errorf("failed to update webhook registry: %w", err) + } } log.Printf("Bitbucket webhook removal completed for repository: %s/%s", workspace, repo) @@ -2048,6 +2072,10 @@ func (w *WebhookRemovalWorker) updateWebhookRegistryForBitbucketRemoval(ctx cont IntegrationTokenID: args.ConnectorID, }) if err != nil { + if isForeignKeyError(err) { + log.Printf("Ignoring foreign key error for Bitbucket repository %s (connector likely deleted)", args.ProjectPath) + return nil + } return fmt.Errorf("failed to insert Bitbucket removal registry: %w", err) } @@ -2066,6 +2094,10 @@ func (w *WebhookRemovalWorker) updateWebhookRegistryForBitbucketRemoval(ctx cont UpdatedAt: now, }) if err != nil { + if isForeignKeyError(err) { + log.Printf("Ignoring foreign key error for Bitbucket repository %s (connector likely deleted)", args.ProjectPath) + return nil + } return fmt.Errorf("failed to update Bitbucket removal registry: %w", err) } @@ -2095,8 +2127,10 @@ func (w *WebhookRemovalWorker) handleGiteaWebhookRemoval(ctx context.Context, ar // continue to registry update even on API failure } - if err := w.updateWebhookRegistryForGiteaRemoval(ctx, args); err != nil { - return fmt.Errorf("failed to update webhook registry: %w", err) + if !args.SkipRegistryUpdate { + if err := w.updateWebhookRegistryForGiteaRemoval(ctx, args); err != nil { + return fmt.Errorf("failed to update webhook registry: %w", err) + } } log.Printf("Gitea webhook removal completed for repository: %s/%s", owner, repo) @@ -2225,6 +2259,10 @@ func (w *WebhookRemovalWorker) updateWebhookRegistryForGiteaRemoval(ctx context. IntegrationTokenID: args.ConnectorID, }) if err != nil { + if isForeignKeyError(err) { + log.Printf("Ignoring foreign key error for Gitea repository %s (connector likely deleted)", args.ProjectPath) + return nil + } return fmt.Errorf("failed to insert Gitea removal registry: %w", err) } @@ -2245,6 +2283,10 @@ func (w *WebhookRemovalWorker) updateWebhookRegistryForGiteaRemoval(ctx context. UpdatedAt: now, }) if err != nil { + if isForeignKeyError(err) { + log.Printf("Ignoring foreign key error for Gitea repository %s (connector likely deleted)", args.ProjectPath) + return nil + } return fmt.Errorf("failed to update Gitea removal registry: %w", err) } @@ -2252,6 +2294,14 @@ func (w *WebhookRemovalWorker) updateWebhookRegistryForGiteaRemoval(ctx context. return nil } +func isForeignKeyError(err error) bool { + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) { + return pgErr.Code == "23503" + } + return false +} + // JobQueue manages the River job queue type JobQueue struct { client *river.Client[pgx.Tx] @@ -2329,13 +2379,14 @@ func (jq *JobQueue) QueueWebhookInstallJob(ctx context.Context, connectorID int, } // QueueWebhookRemovalJob queues a webhook removal job -func (jq *JobQueue) QueueWebhookRemovalJob(ctx context.Context, connectorID int, projectPath, provider, baseURL, pat string) error { +func (jq *JobQueue) QueueWebhookRemovalJob(ctx context.Context, connectorID int, projectPath, provider, baseURL, pat string, skipRegistryUpdate bool) error { args := WebhookRemovalJobArgs{ - ConnectorID: connectorID, - ProjectPath: projectPath, - Provider: provider, - BaseURL: baseURL, - PAT: pat, + ConnectorID: connectorID, + ProjectPath: projectPath, + Provider: provider, + BaseURL: baseURL, + PAT: pat, + SkipRegistryUpdate: skipRegistryUpdate, } _, err := jq.client.Insert(ctx, args, nil) From c8725f351b0131a3333a461ab5817a761439386a Mon Sep 17 00:00:00 2001 From: Ganesh Kumar Date: Mon, 4 May 2026 18:54:30 +0530 Subject: [PATCH 2/2] Propagate Foreign Key Errors in Webhook Removal LiveReview Pre-Commit Check: ran (iter:1, coverage:0%) --- internal/jobqueue/jobqueue.go | 41 ----------------------------------- 1 file changed, 41 deletions(-) diff --git a/internal/jobqueue/jobqueue.go b/internal/jobqueue/jobqueue.go index 7e67a346..0bda4f4d 100644 --- a/internal/jobqueue/jobqueue.go +++ b/internal/jobqueue/jobqueue.go @@ -33,7 +33,6 @@ import ( "time" "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" "github.com/livereview/internal/providers/gitea" networkjobqueue "github.com/livereview/network/jobqueue" @@ -1659,10 +1658,6 @@ func (w *WebhookRemovalWorker) updateWebhookRegistryForRemoval(ctx context.Conte IntegrationTokenID: args.ConnectorID, }) if err != nil { - if isForeignKeyError(err) { - log.Printf("Ignoring foreign key error for project %s (connector likely deleted)", args.ProjectPath) - return nil - } return fmt.Errorf("failed to insert GitLab removal registry: %w", err) } @@ -1681,10 +1676,6 @@ func (w *WebhookRemovalWorker) updateWebhookRegistryForRemoval(ctx context.Conte UpdatedAt: now, }) if err != nil { - if isForeignKeyError(err) { - log.Printf("Ignoring foreign key error for project %s (connector likely deleted)", args.ProjectPath) - return nil - } return fmt.Errorf("failed to update GitLab removal registry: %w", err) } @@ -1834,10 +1825,6 @@ func (w *WebhookRemovalWorker) updateWebhookRegistryForGitHubRemoval(ctx context IntegrationTokenID: args.ConnectorID, }) if err != nil { - if isForeignKeyError(err) { - log.Printf("Ignoring foreign key error for GitHub repository %s (connector likely deleted)", args.ProjectPath) - return nil - } return fmt.Errorf("failed to insert GitHub removal registry: %w", err) } @@ -1856,10 +1843,6 @@ func (w *WebhookRemovalWorker) updateWebhookRegistryForGitHubRemoval(ctx context UpdatedAt: now, }) if err != nil { - if isForeignKeyError(err) { - log.Printf("Ignoring foreign key error for GitHub repository %s (connector likely deleted)", args.ProjectPath) - return nil - } return fmt.Errorf("failed to update GitHub removal registry: %w", err) } @@ -2072,10 +2055,6 @@ func (w *WebhookRemovalWorker) updateWebhookRegistryForBitbucketRemoval(ctx cont IntegrationTokenID: args.ConnectorID, }) if err != nil { - if isForeignKeyError(err) { - log.Printf("Ignoring foreign key error for Bitbucket repository %s (connector likely deleted)", args.ProjectPath) - return nil - } return fmt.Errorf("failed to insert Bitbucket removal registry: %w", err) } @@ -2094,10 +2073,6 @@ func (w *WebhookRemovalWorker) updateWebhookRegistryForBitbucketRemoval(ctx cont UpdatedAt: now, }) if err != nil { - if isForeignKeyError(err) { - log.Printf("Ignoring foreign key error for Bitbucket repository %s (connector likely deleted)", args.ProjectPath) - return nil - } return fmt.Errorf("failed to update Bitbucket removal registry: %w", err) } @@ -2259,10 +2234,6 @@ func (w *WebhookRemovalWorker) updateWebhookRegistryForGiteaRemoval(ctx context. IntegrationTokenID: args.ConnectorID, }) if err != nil { - if isForeignKeyError(err) { - log.Printf("Ignoring foreign key error for Gitea repository %s (connector likely deleted)", args.ProjectPath) - return nil - } return fmt.Errorf("failed to insert Gitea removal registry: %w", err) } @@ -2283,10 +2254,6 @@ func (w *WebhookRemovalWorker) updateWebhookRegistryForGiteaRemoval(ctx context. UpdatedAt: now, }) if err != nil { - if isForeignKeyError(err) { - log.Printf("Ignoring foreign key error for Gitea repository %s (connector likely deleted)", args.ProjectPath) - return nil - } return fmt.Errorf("failed to update Gitea removal registry: %w", err) } @@ -2294,14 +2261,6 @@ func (w *WebhookRemovalWorker) updateWebhookRegistryForGiteaRemoval(ctx context. return nil } -func isForeignKeyError(err error) bool { - var pgErr *pgconn.PgError - if errors.As(err, &pgErr) { - return pgErr.Code == "23503" - } - return false -} - // JobQueue manages the River job queue type JobQueue struct { client *river.Client[pgx.Tx]