From 81823d928c91d93a2aecd246891e2151d6ccd87c Mon Sep 17 00:00:00 2001 From: Nick Schuch Date: Tue, 17 Mar 2026 08:32:23 +1000 Subject: [PATCH] feat: Allow connectors to be updated --- internal/opensearchml/create_connector.go | 72 +++++++++++++++++------ main.go | 2 +- 2 files changed, 55 insertions(+), 19 deletions(-) diff --git a/internal/opensearchml/create_connector.go b/internal/opensearchml/create_connector.go index ef7a28c..78510a5 100644 --- a/internal/opensearchml/create_connector.go +++ b/internal/opensearchml/create_connector.go @@ -57,35 +57,41 @@ type CreateConnectorRequestAction struct { PostProcessFunction string `json:"post_process_function"` } -type CreateConnectorResponse struct { +type CreateOrUpdateConnectorResponse struct { ConnectorID string `json:"connector_id"` } -func (c *Client) CreateConnector(ctx context.Context, req CreateConnectorRequest) (CreateConnectorResponse, error) { +func (c *Client) CreateOrUpdateConnector(ctx context.Context, req CreateConnectorRequest) (CreateOrUpdateConnectorResponse, error) { if req.Name == "" { - return CreateConnectorResponse{}, fmt.Errorf("connector name is required") + return CreateOrUpdateConnectorResponse{}, fmt.Errorf("connector name is required") } + bodyBytes, err := json.Marshal(req) + if err != nil { + return CreateOrUpdateConnectorResponse{}, fmt.Errorf("marshal connector payload: %w", err) + } + + // If a connector with this name already exists, update it via PUT. if id, ok, err := c.FindConnectorIDByName(ctx, req.Name); err != nil { - return CreateConnectorResponse{}, fmt.Errorf("find connector by name: %w", err) + return CreateOrUpdateConnectorResponse{}, fmt.Errorf("find connector by name: %w", err) } else if ok { - return CreateConnectorResponse{ConnectorID: id}, nil - } + if err := c.updateConnector(ctx, id, bodyBytes); err != nil { + return CreateOrUpdateConnectorResponse{}, fmt.Errorf("update connector: %w", err) + } - bodyBytes, err := json.Marshal(req) - if err != nil { - return CreateConnectorResponse{}, fmt.Errorf("marshal create connector payload: %w", err) + return CreateOrUpdateConnectorResponse{ConnectorID: id}, nil } + // Connector does not exist yet, create it via POST. httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, "/_plugins/_ml/connectors/_create", bytes.NewReader(bodyBytes)) if err != nil { - return CreateConnectorResponse{}, fmt.Errorf("new request: %w", err) + return CreateOrUpdateConnectorResponse{}, fmt.Errorf("new request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") httpResp, err := c.opensearch.Client.Perform(httpReq) if err != nil { - return CreateConnectorResponse{}, fmt.Errorf("perform create request: %w", err) + return CreateOrUpdateConnectorResponse{}, fmt.Errorf("perform create request: %w", err) } defer func() { if err := httpResp.Body.Close(); err != nil { @@ -95,28 +101,58 @@ func (c *Client) CreateConnector(ctx context.Context, req CreateConnectorRequest respBytes, _ := io.ReadAll(httpResp.Body) - // If another caller created it between our check and create, re-check and return. + // If another caller created it between our check and create, update the existing connector. if httpResp.StatusCode == http.StatusConflict { if id, ok, err := c.FindConnectorIDByName(ctx, req.Name); err != nil { - return CreateConnectorResponse{}, fmt.Errorf("create conflict; re-find connector: %w", err) + return CreateOrUpdateConnectorResponse{}, fmt.Errorf("create conflict; re-find connector: %w", err) } else if ok { - return CreateConnectorResponse{ConnectorID: id}, nil + if err := c.updateConnector(ctx, id, bodyBytes); err != nil { + return CreateOrUpdateConnectorResponse{}, fmt.Errorf("create conflict; update connector: %w", err) + } + + return CreateOrUpdateConnectorResponse{ConnectorID: id}, nil } } if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { - return CreateConnectorResponse{}, fmt.Errorf("create connector failed: status=%d body=%s", httpResp.StatusCode, string(respBytes)) + return CreateOrUpdateConnectorResponse{}, fmt.Errorf("create connector failed: status=%d body=%s", httpResp.StatusCode, string(respBytes)) } - var out CreateConnectorResponse + var out CreateOrUpdateConnectorResponse if err := json.Unmarshal(respBytes, &out); err != nil { - return CreateConnectorResponse{}, fmt.Errorf("unmarshal create response: %w (body=%s)", err, string(respBytes)) + return CreateOrUpdateConnectorResponse{}, fmt.Errorf("unmarshal create response: %w (body=%s)", err, string(respBytes)) } if out.ConnectorID == "" { - return CreateConnectorResponse{}, fmt.Errorf("create response missing connector_id (body=%s)", string(respBytes)) + return CreateOrUpdateConnectorResponse{}, fmt.Errorf("create response missing connector_id (body=%s)", string(respBytes)) } return out, nil } + +// updateConnector sends a PUT request to update an existing connector. +func (c *Client) updateConnector(ctx context.Context, connectorID string, bodyBytes []byte) error { + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPut, fmt.Sprintf("/_plugins/_ml/connectors/%s", connectorID), bytes.NewReader(bodyBytes)) + if err != nil { + return fmt.Errorf("new request: %w", err) + } + httpReq.Header.Set("Content-Type", "application/json") + + httpResp, err := c.opensearch.Client.Perform(httpReq) + if err != nil { + return fmt.Errorf("perform update request: %w", err) + } + defer func() { + if err := httpResp.Body.Close(); err != nil { + fmt.Printf("error closing response body: %v\n", err) + } + }() + + if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { + respBytes, _ := io.ReadAll(httpResp.Body) + return fmt.Errorf("update connector failed: status=%d body=%s", httpResp.StatusCode, string(respBytes)) + } + + return nil +} diff --git a/main.go b/main.go index ec0f582..3edc48f 100644 --- a/main.go +++ b/main.go @@ -117,7 +117,7 @@ func handler(ctx context.Context) error { logger.SetAttr("model_group_id", groupResp.ModelGroupID) - connectorResp, err := client.CreateConnector(ctx, connector) + connectorResp, err := client.CreateOrUpdateConnector(ctx, connector) if err != nil { return logger.WrapError(err) }