From 3a81c558533d45bd073493ba7c0e84db8c3c7944 Mon Sep 17 00:00:00 2001 From: Allison Durham Date: Wed, 21 May 2025 13:33:02 -0700 Subject: [PATCH 1/2] initial channelToken implementation --- acp/api/v1alpha1/task_types.go | 9 +- acp/api/v1alpha1/zz_generated.deepcopy.go | 5 + .../crd/bases/acp.humanlayer.dev_tasks.yaml | 23 +- acp/config/samples/acp_v1alpha1_task.yaml | 6 +- .../controller/task/send_response_url_test.go | 146 ----------- .../controller/task/task_controller.go | 238 +++++++----------- .../task_humanlayerapi_integration_test.go | 198 +++++++++++++++ .../task/task_responseurl_integration_test.go | 176 ------------- acp/internal/server/server.go | 60 +++-- 9 files changed, 363 insertions(+), 498 deletions(-) delete mode 100644 acp/internal/controller/task/send_response_url_test.go create mode 100644 acp/internal/controller/task/task_humanlayerapi_integration_test.go delete mode 100644 acp/internal/controller/task/task_responseurl_integration_test.go diff --git a/acp/api/v1alpha1/task_types.go b/acp/api/v1alpha1/task_types.go index 9a4814ff..53b21c25 100644 --- a/acp/api/v1alpha1/task_types.go +++ b/acp/api/v1alpha1/task_types.go @@ -38,10 +38,13 @@ type TaskSpec struct { // +optional ContextWindow []Message `json:"contextWindow,omitempty"` - // ResponseURL specifies a pre-generated URL that will be used for human contact responses. - // This allows the system to direct responses to a specific endpoint. + // BaseURL specifies the base URL for the human contact channel. // +optional - ResponseURL string `json:"responseURL,omitempty"` + BaseURL string `json:"baseURL,omitempty"` + + // ChannelTokenFrom references a secret containing the token for the human contact channel. + // +optional + ChannelTokenFrom *SecretKeyRef `json:"channelTokenFrom,omitempty"` } // Message represents a single message in the conversation diff --git a/acp/api/v1alpha1/zz_generated.deepcopy.go b/acp/api/v1alpha1/zz_generated.deepcopy.go index 30c11802..3d954a39 100644 --- a/acp/api/v1alpha1/zz_generated.deepcopy.go +++ b/acp/api/v1alpha1/zz_generated.deepcopy.go @@ -978,6 +978,11 @@ func (in *TaskSpec) DeepCopyInto(out *TaskSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.ChannelTokenFrom != nil { + in, out := &in.ChannelTokenFrom, &out.ChannelTokenFrom + *out = new(SecretKeyRef) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskSpec. diff --git a/acp/config/crd/bases/acp.humanlayer.dev_tasks.yaml b/acp/config/crd/bases/acp.humanlayer.dev_tasks.yaml index d72f4433..ca366ba7 100644 --- a/acp/config/crd/bases/acp.humanlayer.dev_tasks.yaml +++ b/acp/config/crd/bases/acp.humanlayer.dev_tasks.yaml @@ -85,6 +85,24 @@ spec: required: - name type: object + baseURL: + description: BaseURL specifies the base URL for the human contact + channel. + type: string + channelTokenFrom: + description: ChannelTokenFrom references a secret containing the token + for the human contact channel. + properties: + key: + description: Key is the key in the secret + type: string + name: + description: Name is the name of the secret + type: string + required: + - key + - name + type: object contextWindow: description: |- ContextWindow provides the initial conversation context when creating a Task. @@ -153,11 +171,6 @@ spec: - role type: object type: array - responseURL: - description: |- - ResponseURL specifies a pre-generated URL that will be used for human contact responses. - This allows the system to direct responses to a specific endpoint. - type: string userMessage: description: |- UserMessage is the message to send to the agent. diff --git a/acp/config/samples/acp_v1alpha1_task.yaml b/acp/config/samples/acp_v1alpha1_task.yaml index 9a251404..1e5e0380 100644 --- a/acp/config/samples/acp_v1alpha1_task.yaml +++ b/acp/config/samples/acp_v1alpha1_task.yaml @@ -1,4 +1,4 @@ -apiVersion: acp.humanlayer.dev/v1alpha1 +apiVersion: acp.humanlayer.dev/v1alpha1 kind: Task metadata: name: fetch-example @@ -6,3 +6,7 @@ spec: agentRef: name: web-fetch-agent userMessage: "Please fetch the content from example.com and summarize what's on the site." + baseURL: "https://api.humanlayer.dev" + channelTokenFrom: + name: channel-token-secret + key: token diff --git a/acp/internal/controller/task/send_response_url_test.go b/acp/internal/controller/task/send_response_url_test.go deleted file mode 100644 index 4a2cfa74..00000000 --- a/acp/internal/controller/task/send_response_url_test.go +++ /dev/null @@ -1,146 +0,0 @@ -package task - -import ( - "context" - "encoding/json" - "net/http" - "net/http/httptest" - "sync" - - acp "github.com/humanlayer/agentcontrolplane/acp/api/v1alpha1" - humanlayerapi "github.com/humanlayer/agentcontrolplane/acp/internal/humanlayerapi" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/record" - "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/log/zap" -) - -// initTestReconciler creates a minimal TaskReconciler for testing -func initTestReconciler() (*TaskReconciler, context.Context) { - // Initialize logger - logger := zap.New(zap.UseDevMode(true)) - ctx := context.Background() - ctx = log.IntoContext(ctx, logger) - - // Create a reconciler - scheme := runtime.NewScheme() - err := acp.AddToScheme(scheme) - Expect(err).NotTo(HaveOccurred(), "Failed to add API schema") - - return &TaskReconciler{ - Scheme: scheme, - recorder: record.NewFakeRecorder(10), - }, ctx -} - -var _ = Describe("ResponseURL Functionality", func() { - Context("when sending results to responseURL", func() { - It("successfully sends the result and verifies content", func() { - // Create a channel to synchronize between test and handler - requestReceived := make(chan struct{}) - - // Track the received request for verification - var receivedRequest humanlayerapi.HumanContactInput - var receivedMutex sync.Mutex - - // Create a test server - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Verify method and content type - Expect(r.Method).To(Equal("POST")) - Expect(r.Header.Get("Content-Type")).To(Equal("application/json")) - - // Decode the request body - decoder := json.NewDecoder(r.Body) - var req humanlayerapi.HumanContactInput - err := decoder.Decode(&req) - Expect(err).NotTo(HaveOccurred()) - - // Store the request for later verification - receivedMutex.Lock() - receivedRequest = req - receivedMutex.Unlock() - - // Send a success response - w.WriteHeader(http.StatusOK) - _, _ = w.Write([]byte(`{"status":"success"}`)) - - // Notify that request was received - close(requestReceived) - })) - defer server.Close() - - // Create a reconciler - reconciler, ctx := initTestReconciler() - - // Test sending result - testMsg := "This is the final task result" - testTask := &acp.Task{ - Spec: acp.TaskSpec{ - ResponseURL: server.URL, - AgentRef: acp.LocalObjectReference{Name: "test-agent"}, - }, - } - err := reconciler.sendFinalResultToResponseURL(ctx, testTask, testMsg) - Expect(err).NotTo(HaveOccurred()) - - // Wait for the request to be processed with a timeout - Eventually(requestReceived).Should(BeClosed(), "Timed out waiting for request to be received") - - // Verify the request content - receivedMutex.Lock() - defer receivedMutex.Unlock() - - // Verify run_id and call_id are set - Expect(receivedRequest.GetRunId()).NotTo(BeEmpty()) - Expect(receivedRequest.GetCallId()).NotTo(BeEmpty()) - - // Verify the message content - Expect(receivedRequest.Spec.Msg).To(Equal(testMsg)) - }) - - It("handles error responses appropriately", func() { - // Create a test server that returns an error - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusInternalServerError) - _, _ = w.Write([]byte(`{"error":"something went wrong"}`)) - })) - defer server.Close() - - // Create a reconciler - reconciler, ctx := initTestReconciler() - - // Test sending result - testTask := &acp.Task{ - Spec: acp.TaskSpec{ - ResponseURL: server.URL, - AgentRef: acp.LocalObjectReference{Name: "test-agent"}, - }, - } - err := reconciler.sendFinalResultToResponseURL(ctx, testTask, "test message") - - // Should return an error due to non-200 response - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("HTTP error from responseURL (status 500)")) - }) - - It("handles connection errors appropriately", func() { - // Create a reconciler - reconciler, ctx := initTestReconciler() - - // Use an invalid URL to cause a connection error - testTask := &acp.Task{ - Spec: acp.TaskSpec{ - ResponseURL: "http://localhost:1", - AgentRef: acp.LocalObjectReference{Name: "test-agent"}, - }, - } - err := reconciler.sendFinalResultToResponseURL(ctx, testTask, "test message") - - // Should return an error due to connection failure - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("failed to send HTTP request")) - }) - }) -}) diff --git a/acp/internal/controller/task/task_controller.go b/acp/internal/controller/task/task_controller.go index 68870540..d4b50934 100644 --- a/acp/internal/controller/task/task_controller.go +++ b/acp/internal/controller/task/task_controller.go @@ -1,15 +1,11 @@ package task import ( - "bytes" "context" "crypto/rand" - "encoding/json" "errors" "fmt" - "io" "math/big" - "net/http" "time" "github.com/google/uuid" @@ -26,7 +22,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/humanlayer/agentcontrolplane/acp/internal/adapters" - "github.com/humanlayer/agentcontrolplane/acp/internal/humanlayerapi" + "github.com/humanlayer/agentcontrolplane/acp/internal/humanlayer" "github.com/humanlayer/agentcontrolplane/acp/internal/llmclient" "github.com/humanlayer/agentcontrolplane/acp/internal/mcpmanager" "github.com/humanlayer/agentcontrolplane/acp/internal/validation" @@ -473,9 +469,9 @@ func (r *TaskReconciler) processLLMResponse(ctx context.Context, output *acp.Mes statusUpdate.Status.Error = "" r.recorder.Event(task, corev1.EventTypeNormal, "LLMFinalAnswer", "LLM response received successfully") - // If task has a responseURL, send the final result to that URL - if task.Spec.ResponseURL != "" { - r.notifyResponseURLAsync(task, output.Content) + // If task has BaseURL and ChannelTokenFrom, send the final result via HumanLayer API + if task.Spec.BaseURL != "" && task.Spec.ChannelTokenFrom != nil { + r.notifyHumanLayerAPIAsync(task, output.Content) } // End the task trace with OK status since we have a final answer. @@ -789,47 +785,6 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. return ctrl.Result{}, nil } -// notifyResponseURLAsync sends the final task result to the response URL asynchronously -func (r *TaskReconciler) notifyResponseURLAsync(task *acp.Task, result string) { - go func() { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - logger := log.FromContext(ctx) - taskCopy := task.DeepCopy() - - err := r.sendFinalResultToResponseURL(ctx, task, result) - if err != nil { - logger.Error(err, "Failed to send final result to responseURL", - "responseURL", task.Spec.ResponseURL, - "task", fmt.Sprintf("%s/%s", task.Namespace, task.Name)) - - r.recorder.Event(taskCopy, corev1.EventTypeWarning, "ResponseURLError", - fmt.Sprintf("Failed to send result to response URL: %v", err)) - } else { - logger.Info("Successfully sent final result to responseURL", - "responseURL", task.Spec.ResponseURL) - - r.recorder.Event(taskCopy, corev1.EventTypeNormal, "ResponseURLSent", - "Successfully sent result to response URL") - } - }() -} - -// assertAvailablePRNG ensures that a cryptographically secure PRNG is available -func assertAvailablePRNG() { - buf := make([]byte, 1) - _, err := io.ReadFull(rand.Reader, buf) - if err != nil { - panic(fmt.Sprintf("crypto/rand is unavailable: Read() failed with %#v", err)) - } -} - -// init ensures that a cryptographically secure PRNG is available when the package is loaded -func init() { - assertAvailablePRNG() -} - // generateRandomString returns a securely generated random string func generateRandomString(n int) (string, error) { const letters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz-" @@ -844,130 +799,109 @@ func generateRandomString(n int) (string, error) { return string(ret), nil } -// createHumanContactRequest builds the request payload for sending to a response URL -func createHumanContactRequest(agentName string, result string) ([]byte, error) { - // Use agent name as runId - runID := agentName - // Generate a secure random string for callId - callID, err := generateRandomString(7) - if err != nil { - return nil, fmt.Errorf("failed to generate secure random string: %w", err) +// Defined for testing purposes +var newHumanLayerClientFactory = humanlayer.NewHumanLayerClientFactory + +// sendFinalResultViaHumanLayerAPI sends the final task result using the HumanLayer API client +func (r *TaskReconciler) sendFinalResultViaHumanLayerAPI(ctx context.Context, task *acp.Task, result string) error { + logger := log.FromContext(ctx) + + if task.Spec.BaseURL == "" || task.Spec.ChannelTokenFrom == nil { + logger.Info("Skipping result notification, BaseURL or ChannelTokenFrom not set") + return nil } - spec := humanlayerapi.NewHumanContactSpecInput(result) - input := humanlayerapi.NewHumanContactInput(runID, callID, *spec) - return json.Marshal(input) -} -// isRetryableStatusCode determines if an HTTP status code should trigger a retry -func isRetryableStatusCode(statusCode int) bool { - return statusCode >= 500 || statusCode == 429 -} + // Get the channel token from the secret + var secret corev1.Secret + if err := r.Get(ctx, client.ObjectKey{ + Namespace: task.Namespace, + Name: task.Spec.ChannelTokenFrom.Name, + }, &secret); err != nil { + return fmt.Errorf("failed to get channel token secret: %w", err) + } -// sendFinalResultToResponseURL sends the final task result to the specified URL -// It includes retry logic for transient errors and better error categorization -func (r *TaskReconciler) sendFinalResultToResponseURL(ctx context.Context, task *acp.Task, result string) error { - logger := log.FromContext(ctx) - logger.Info("Sending final result to responseURL", "responseURL", task.Spec.ResponseURL) + channelToken := string(secret.Data[task.Spec.ChannelTokenFrom.Key]) + if channelToken == "" { + return fmt.Errorf("channel token is empty in secret %s/%s key %s", + task.Namespace, task.Spec.ChannelTokenFrom.Name, task.Spec.ChannelTokenFrom.Key) + } - // Create the request body - jsonData, err := createHumanContactRequest(task.Spec.AgentRef.Name, result) + // Create HumanLayer client factory with the BaseURL + clientFactory, err := newHumanLayerClientFactory(task.Spec.BaseURL) if err != nil { - return fmt.Errorf("failed to marshal request body: %w", err) + return fmt.Errorf("failed to create HumanLayer client factory: %w", err) } - // Define retry parameters - maxRetries := 3 - initialDelay := 1 * time.Second + // Create HumanLayer client + client := clientFactory.NewHumanLayerClient() + client.SetAPIKey(channelToken) // Use token from secret + client.SetRunID(task.Spec.AgentRef.Name) // Use agent name as runID - // Retry the operation with exponential backoff - return retryWithBackoff(ctx, maxRetries, initialDelay, task.Spec.ResponseURL, func() (bool, error) { - // Create the HTTP request - req, err := http.NewRequestWithContext(ctx, "POST", task.Spec.ResponseURL, bytes.NewBuffer(jsonData)) - if err != nil { - return false, fmt.Errorf("failed to create HTTP request: %w", err) // Non-retryable - } + // Generate a random callID + callID, err := generateRandomString(7) + if err != nil { + return fmt.Errorf("failed to generate callID: %w", err) + } + client.SetCallID(callID) - // Set headers - req.Header.Set("Content-Type", "application/json") - req.Header.Set("User-Agent", "ACP-Task-Controller") + // Retry up to 3 times + maxRetries := 3 + for attempt := 0; attempt < maxRetries; attempt++ { + // Send the request to HumanLayer API + humanContact, statusCode, err := client.RequestHumanContact(ctx, result) - // Send the request - client := &http.Client{ - Timeout: 5 * time.Second, + // Check for success + if err == nil && statusCode >= 200 && statusCode < 300 { + logger.Info("Successfully sent final result via HumanLayer API", + "baseURL", task.Spec.BaseURL, + "statusCode", statusCode, + "humanContactID", humanContact.GetCallId()) + return nil } - resp, err := client.Do(req) + + // Log the error if err != nil { - return true, fmt.Errorf("failed to send HTTP request: %w", err) // Retryable + logger.Error(err, "Failed to send human contact request", + "attempt", attempt+1, + "baseURL", task.Spec.BaseURL) + } else { + logger.Error(fmt.Errorf("HTTP error %d", statusCode), + "Failed to send human contact request", + "attempt", attempt+1, + "baseURL", task.Spec.BaseURL) } - // Ensure we close the response body - defer func() { - if resp != nil && resp.Body != nil { - if err := resp.Body.Close(); err != nil { - logger.Error(err, "Failed to close response body") - } - } - }() - - // Check response status - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - body, readErr := io.ReadAll(resp.Body) - bodyStr := "" - if readErr != nil { - bodyStr = fmt.Sprintf("[error reading response body: %v]", readErr) - } else { - bodyStr = string(body) - } - - // Return whether this error is retryable - retryable := isRetryableStatusCode(resp.StatusCode) - return retryable, fmt.Errorf("HTTP error from responseURL (status %d): %s", resp.StatusCode, bodyStr) + // Exponential backoff + if attempt < maxRetries-1 { + time.Sleep(time.Second * time.Duration(1< 0 { - logger.Info("Retrying request to responseURL", - "responseURL", responseURL, - "attempt", attempt+1, - "maxRetries", maxRetries) - - // Wait before retrying, with exponential backoff - select { - case <-time.After(delay): - delay *= 2 // Exponential backoff - case <-ctx.Done(): - return fmt.Errorf("context cancelled during retry: %w", ctx.Err()) - } - } +// notifyHumanLayerAPIAsync sends the notification asynchronously +func (r *TaskReconciler) notifyHumanLayerAPIAsync(task *acp.Task, result string) { + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() - shouldRetry, err := operation() - if err == nil { - return nil // Success - } + logger := log.FromContext(ctx) + taskCopy := task.DeepCopy() - lastErr = err - if !shouldRetry { - return err // Non-retryable error + err := r.sendFinalResultViaHumanLayerAPI(ctx, taskCopy, result) + if err != nil { + logger.Error(err, "Failed to send final result via HumanLayer API", + "baseURL", task.Spec.BaseURL) + r.recorder.Event(taskCopy, corev1.EventTypeWarning, "HumanLayerAPIError", + fmt.Sprintf("Failed to send result via HumanLayer API: %v", err)) + } else { + logger.Info("Successfully sent final result via HumanLayer API", + "baseURL", task.Spec.BaseURL) + r.recorder.Event(taskCopy, corev1.EventTypeNormal, "HumanLayerAPISent", + "Successfully sent result via HumanLayer API") } - } - - return fmt.Errorf("failed after %d attempts: %w", maxRetries, lastErr) + }() } func (r *TaskReconciler) SetupWithManager(mgr ctrl.Manager) error { diff --git a/acp/internal/controller/task/task_humanlayerapi_integration_test.go b/acp/internal/controller/task/task_humanlayerapi_integration_test.go new file mode 100644 index 00000000..d318bbb8 --- /dev/null +++ b/acp/internal/controller/task/task_humanlayerapi_integration_test.go @@ -0,0 +1,198 @@ +package task + +import ( + "context" + + acp "github.com/humanlayer/agentcontrolplane/acp/api/v1alpha1" + "github.com/humanlayer/agentcontrolplane/acp/internal/humanlayer" + "github.com/humanlayer/agentcontrolplane/acp/internal/humanlayerapi" + "github.com/humanlayer/agentcontrolplane/acp/internal/llmclient" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "go.opentelemetry.io/otel/trace/noop" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +type MockLLMClient struct { + SendRequestResponse *acp.Message + SendRequestError error +} + +func (m *MockLLMClient) SendRequest(ctx context.Context, messages []acp.Message, tools []llmclient.Tool) (*acp.Message, error) { + return m.SendRequestResponse, m.SendRequestError +} + +// Mock HumanLayer Client +type MockHumanLayerClientFactory struct { + client *MockHumanLayerClient +} + +type MockHumanLayerClient struct { + apiKey string + runID string + callID string + baseURL string + requests []string + responses []*humanlayerapi.HumanContactOutput + statusCodes []int + errors []error + callCount int +} + +func (f *MockHumanLayerClientFactory) NewHumanLayerClient() humanlayer.HumanLayerClientWrapper { + return f.client +} + +func (c *MockHumanLayerClient) SetSlackConfig(slackConfig *acp.SlackChannelConfig) {} +func (c *MockHumanLayerClient) SetEmailConfig(emailConfig *acp.EmailChannelConfig) {} +func (c *MockHumanLayerClient) SetFunctionCallSpec(functionName string, args map[string]interface{}) { +} + +func (c *MockHumanLayerClient) SetCallID(callID string) { + c.callID = callID +} + +func (c *MockHumanLayerClient) SetRunID(runID string) { + c.runID = runID +} + +func (c *MockHumanLayerClient) SetAPIKey(apiKey string) { + c.apiKey = apiKey +} + +func (c *MockHumanLayerClient) RequestApproval(ctx context.Context) (*humanlayerapi.FunctionCallOutput, int, error) { + return nil, 200, nil +} + +func (c *MockHumanLayerClient) RequestHumanContact(ctx context.Context, userMsg string) (*humanlayerapi.HumanContactOutput, int, error) { + c.requests = append(c.requests, userMsg) + + if c.callCount < len(c.responses) { + response := c.responses[c.callCount] + statusCode := c.statusCodes[c.callCount] + err := c.errors[c.callCount] + c.callCount++ + return response, statusCode, err + } + + // Default response + output := humanlayerapi.NewHumanContactOutput("test-run", c.callID, *humanlayerapi.NewHumanContactSpecOutput("Test result")) + return output, 200, nil +} + +func (c *MockHumanLayerClient) GetFunctionCallStatus(ctx context.Context) (*humanlayerapi.FunctionCallOutput, int, error) { + return nil, 200, nil +} + +func (c *MockHumanLayerClient) GetHumanContactStatus(ctx context.Context) (*humanlayerapi.HumanContactOutput, int, error) { + return nil, 200, nil +} + +func reconcilerWithMockLLM(newLLMClient func(ctx context.Context, llm acp.LLM, apiKey string) (llmclient.LLMClient, error)) (*TaskReconciler, *record.FakeRecorder) { + recorder := record.NewFakeRecorder(10) + tracer := noop.NewTracerProvider().Tracer("test") + return &TaskReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + recorder: recorder, + newLLMClient: newLLMClient, + Tracer: tracer, + }, recorder +} + +var _ = Describe("Task Controller with HumanLayer API", func() { + Context("using ChannelTokenFrom with secret reference", func() { + var ( + mockLLMClient *MockLLMClient + mockHumanLayerClient *MockHumanLayerClient + mockHumanLayerFactory *MockHumanLayerClientFactory + originalFactoryFunction func(string) (humanlayer.HumanLayerClientFactory, error) + ) + + BeforeEach(func() { + _, _, _, teardown := setupSuiteObjects(ctx) + DeferCleanup(teardown) + + mockLLMClient = &MockLLMClient{ + SendRequestResponse: &acp.Message{Content: "Test result"}, + } + + // Create a very simple mock HumanLayer client that just stores the API key + mockHumanLayerClient = &MockHumanLayerClient{ + responses: []*humanlayerapi.HumanContactOutput{humanlayerapi.NewHumanContactOutput("test-run", "test-call", *humanlayerapi.NewHumanContactSpecOutput("Test result"))}, + statusCodes: []int{200}, + errors: []error{nil}, + } + mockHumanLayerFactory = &MockHumanLayerClientFactory{ + client: mockHumanLayerClient, + } + + // Save original factory function and replace with mock + originalFactoryFunction = newHumanLayerClientFactory + newHumanLayerClientFactory = func(baseURL string) (humanlayer.HumanLayerClientFactory, error) { + mockHumanLayerClient.baseURL = baseURL + return mockHumanLayerFactory, nil + } + + DeferCleanup(func() { + // Restore original factory function + newHumanLayerClientFactory = originalFactoryFunction + }) + }) + + It("retrieves channel token from secret and uses it as API key", func() { + // Create a secret containing the token + secret := &corev1.Secret{ + ObjectMeta: v1.ObjectMeta{Name: "test-channel-token", Namespace: "default"}, + Data: map[string][]byte{ + "token": []byte("hl_testtoken"), + }, + } + Expect(k8sClient.Create(ctx, secret)).To(Succeed()) + DeferCleanup(func() { Expect(k8sClient.Delete(ctx, secret)).To(Succeed()) }) + + task := &acp.Task{ + ObjectMeta: v1.ObjectMeta{Name: "test-task", Namespace: "default"}, + Spec: acp.TaskSpec{ + AgentRef: acp.LocalObjectReference{Name: testAgent.Name}, + UserMessage: "Test message", + BaseURL: "https://api.example.com", + ChannelTokenFrom: &acp.SecretKeyRef{ + Name: "test-channel-token", + Key: "token", + }, + }, + } + Expect(k8sClient.Create(ctx, task)).To(Succeed()) + DeferCleanup(func() { Expect(k8sClient.Delete(ctx, task)).To(Succeed()) }) + + mockLLMClientFn := func(ctx context.Context, llm acp.LLM, apiKey string) (llmclient.LLMClient, error) { + return mockLLMClient, nil + } + reconciler, _ := reconcilerWithMockLLM(mockLLMClientFn) + + for i := 0; i < 3; i++ { + result, err := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: types.NamespacedName{Name: task.Name, Namespace: "default"}, + }) + Expect(err).NotTo(HaveOccurred()) + if i < 2 { + Expect(result.Requeue).To(BeTrue()) + } + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: task.Name, Namespace: "default"}, task)).To(Succeed()) + } + + Expect(task.Status.Phase).To(Equal(acp.TaskPhaseFinalAnswer)) + Expect(task.Status.Output).To(Equal("Test result")) + + // Verify that the token from the secret was correctly used as the API key + Expect(mockHumanLayerClient.baseURL).To(Equal("https://api.example.com")) + Expect(mockHumanLayerClient.apiKey).To(Equal("hl_testtoken")) + Expect(mockHumanLayerClient.runID).To(Equal(testAgent.Name)) + }) + }) +}) diff --git a/acp/internal/controller/task/task_responseurl_integration_test.go b/acp/internal/controller/task/task_responseurl_integration_test.go deleted file mode 100644 index 1f2fc0a0..00000000 --- a/acp/internal/controller/task/task_responseurl_integration_test.go +++ /dev/null @@ -1,176 +0,0 @@ -package task - -import ( - "context" - "encoding/json" - "net/http" - "net/http/httptest" - "sync" - "time" - - acp "github.com/humanlayer/agentcontrolplane/acp/api/v1alpha1" - "github.com/humanlayer/agentcontrolplane/acp/internal/humanlayerapi" - "github.com/humanlayer/agentcontrolplane/acp/internal/llmclient" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "go.opentelemetry.io/otel/trace/noop" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/record" - "sigs.k8s.io/controller-runtime/pkg/reconcile" -) - -// MockLLMClient for testing -type MockLLMClient struct { - SendRequestResponse *acp.Message - SendRequestError error -} - -func (m *MockLLMClient) SendRequest(ctx context.Context, messages []acp.Message, tools []llmclient.Tool) (*acp.Message, error) { - return m.SendRequestResponse, m.SendRequestError -} - -// Creates a TaskReconciler with a custom LLM client factory -func reconcilerWithMockLLM(newLLMClient func(ctx context.Context, llm acp.LLM, apiKey string) (llmclient.LLMClient, error)) (*TaskReconciler, *record.FakeRecorder) { - recorder := record.NewFakeRecorder(10) - tracer := noop.NewTracerProvider().Tracer("test") - - r := &TaskReconciler{ - Client: k8sClient, - Scheme: k8sClient.Scheme(), - recorder: recorder, - newLLMClient: newLLMClient, - Tracer: tracer, - } - return r, recorder -} - -var _ = Describe("Task Controller with ResponseURL", func() { - Context("when Task has ResponseURL", func() { - var ( - server *httptest.Server - requestReceived chan struct{} - receivedRequest humanlayerapi.HumanContactInput - receivedMutex sync.Mutex - mockLLMClient *MockLLMClient - ) - - BeforeEach(func() { - // Set up the secret, LLM, and agent - _, _, _, teardown := setupSuiteObjects(ctx) - DeferCleanup(teardown) - - // Set up the mock LLM client to return a final answer - mockLLMClient = &MockLLMClient{ - SendRequestResponse: &acp.Message{ - Content: "This is the final answer", - }, - } - - // Set up the test server to receive the HTTP request - requestReceived = make(chan struct{}) - server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Decode the request body - decoder := json.NewDecoder(r.Body) - var req humanlayerapi.HumanContactInput - Expect(decoder.Decode(&req)).To(Succeed()) - - // Store the request for later verification - receivedMutex.Lock() - receivedRequest = req - receivedMutex.Unlock() - - // Send a success response - w.WriteHeader(http.StatusOK) - _, _ = w.Write([]byte(`{"status":"success"}`)) - - // Notify that request was received - close(requestReceived) - })) - DeferCleanup(server.Close) - }) - - It("sends the final result to the ResponseURL", func() { - By("creating a task with ResponseURL") - // Create a task with ResponseURL - customTask := &acp.Task{ - ObjectMeta: v1.ObjectMeta{ - Name: "task-with-responseurl", - Namespace: "default", - }, - Spec: acp.TaskSpec{ - AgentRef: acp.LocalObjectReference{ - Name: testAgent.Name, - }, - UserMessage: "What is the capital of France?", - ResponseURL: server.URL, - }, - } - Expect(k8sClient.Create(ctx, customTask)).To(Succeed()) - task := customTask - DeferCleanup(func() { - Expect(k8sClient.Delete(ctx, task)).To(Succeed()) - }) - - // Create a mock LLM client factory - mockLLMClientFn := func(ctx context.Context, llm acp.LLM, apiKey string) (llmclient.LLMClient, error) { - return mockLLMClient, nil - } - - // Get reconciler with mock LLM client - By("creating reconciler with mock LLM client") - reconciler, _ := reconcilerWithMockLLM(mockLLMClientFn) - - By("reconciling the task to initialize it") - // First reconcile (should initialize the task) - result, err := reconciler.Reconcile(ctx, reconcile.Request{ - NamespacedName: types.NamespacedName{Name: task.Name, Namespace: "default"}, - }) - Expect(err).NotTo(HaveOccurred()) - Expect(result.Requeue).To(BeTrue()) - - // Get the updated task - Expect(k8sClient.Get(ctx, types.NamespacedName{Name: task.Name, Namespace: "default"}, task)).To(Succeed()) - Expect(task.Status.Phase).To(Equal(acp.TaskPhaseInitializing)) - - By("reconciling the task to prepare for LLM") - // Second reconcile (should validate agent and prepare for LLM) - result, err = reconciler.Reconcile(ctx, reconcile.Request{ - NamespacedName: types.NamespacedName{Name: task.Name, Namespace: "default"}, - }) - Expect(err).NotTo(HaveOccurred()) - Expect(result.Requeue).To(BeTrue()) - - // Get the updated task - Expect(k8sClient.Get(ctx, types.NamespacedName{Name: task.Name, Namespace: "default"}, task)).To(Succeed()) - Expect(task.Status.Phase).To(Equal(acp.TaskPhaseReadyForLLM)) - - By("reconciling the task to get final answer") - // Third reconcile (should send to LLM and get final answer) - result, err = reconciler.Reconcile(ctx, reconcile.Request{ - NamespacedName: types.NamespacedName{Name: task.Name, Namespace: "default"}, - }) - Expect(err).NotTo(HaveOccurred()) - - // Get the updated task - Expect(k8sClient.Get(ctx, types.NamespacedName{Name: task.Name, Namespace: "default"}, task)).To(Succeed()) - Expect(task.Status.Phase).To(Equal(acp.TaskPhaseFinalAnswer)) - Expect(task.Status.Output).To(Equal("This is the final answer")) - - By("waiting for HTTP request to be received") - // Wait for the HTTP request to be made - select { - case <-requestReceived: - // Request was received, continue with assertions - case <-time.After(5 * time.Second): - Fail("Timed out waiting for ResponseURL request") - } - - By("verifying request content") - // Verify the request content - receivedMutex.Lock() - defer receivedMutex.Unlock() - Expect(receivedRequest.Spec.Msg).To(Equal("This is the final answer")) - }) - }) -}) diff --git a/acp/internal/server/server.go b/acp/internal/server/server.go index 00d28314..cc069961 100644 --- a/acp/internal/server/server.go +++ b/acp/internal/server/server.go @@ -34,14 +34,20 @@ const ( transportTypeHTTP = "http" ) +// ChannelTokenRef defines a reference to a secret containing the channel token +type ChannelTokenRef struct { + Name string `json:"name"` // Name of the secret + Key string `json:"key"` // Key in the secret data +} + // CreateTaskRequest defines the structure of the request body for creating a task type CreateTaskRequest struct { - Namespace string `json:"namespace,omitempty"` // Optional, defaults to "default" - AgentName string `json:"agentName"` // Required - UserMessage string `json:"userMessage,omitempty"` // Optional if contextWindow is provided - ContextWindow []acp.Message `json:"contextWindow,omitempty"` // Optional if userMessage is provided - ResponseURL string `json:"responseURL,omitempty"` // Optional, URL for receiving task results - ResponseUrl string `json:"responseUrl,omitempty"` // Alternative casing for responseURL (deprecated) + Namespace string `json:"namespace,omitempty"` // Optional, defaults to "default" + AgentName string `json:"agentName"` // Required + UserMessage string `json:"userMessage,omitempty"` // Optional if contextWindow is provided + ContextWindow []acp.Message `json:"contextWindow,omitempty"` // Optional if userMessage is provided + BaseURL string `json:"baseURL,omitempty"` // Optional, base URL for the contact channel + ChannelTokenFrom *ChannelTokenRef `json:"channelTokenFrom,omitempty"` // Optional, reference to secret containing the token } // CreateAgentRequest defines the structure of the request body for creating an agent @@ -592,6 +598,17 @@ func (s *APIServer) getStatus(c *gin.Context) { }) } +// sanitizeTask removes sensitive information from a Task before returning it via API +func sanitizeTask(task acp.Task) acp.Task { + // Create a copy to avoid modifying the original + sanitized := task.DeepCopy() + + // Remove sensitive fields + sanitized.Spec.ChannelTokenFrom = nil + + return *sanitized +} + func (s *APIServer) listTasks(c *gin.Context) { ctx := c.Request.Context() logger := log.FromContext(ctx) @@ -616,7 +633,13 @@ func (s *APIServer) listTasks(c *gin.Context) { return } - c.JSON(http.StatusOK, taskList.Items) + // Sanitize sensitive information before returning + sanitizedTasks := make([]acp.Task, len(taskList.Items)) + for i, task := range taskList.Items { + sanitizedTasks[i] = sanitizeTask(task) + } + + c.JSON(http.StatusOK, sanitizedTasks) } func (s *APIServer) getTask(c *gin.Context) { @@ -643,7 +666,9 @@ func (s *APIServer) getTask(c *gin.Context) { return } - c.JSON(http.StatusOK, task) + // Sanitize the task before returning + sanitizedTask := sanitizeTask(task) + c.JSON(http.StatusOK, sanitizedTask) } func (s *APIServer) resourceExists(ctx context.Context, obj client.Object, namespace, name string) (bool, error) { @@ -1195,10 +1220,14 @@ func (s *APIServer) createTask(c *gin.Context) { return } - // Handle both responseURL and responseUrl fields (with responseURL taking precedence) - responseURL := req.ResponseURL - if responseURL == "" && req.ResponseUrl != "" { - responseURL = req.ResponseUrl + // Extract the baseURL and channelTokenFrom fields + baseURL := req.BaseURL + var channelTokenFrom *acp.SecretKeyRef + if req.ChannelTokenFrom != nil { + channelTokenFrom = &acp.SecretKeyRef{ + Name: req.ChannelTokenFrom.Name, + Key: req.ChannelTokenFrom.Key, + } } // Check if agent exists @@ -1229,9 +1258,10 @@ func (s *APIServer) createTask(c *gin.Context) { AgentRef: acp.LocalObjectReference{ Name: req.AgentName, }, - UserMessage: req.UserMessage, - ContextWindow: req.ContextWindow, - ResponseURL: responseURL, + UserMessage: req.UserMessage, + ContextWindow: req.ContextWindow, + BaseURL: baseURL, + ChannelTokenFrom: channelTokenFrom, }, } From 5248628bebccb2c81dd30e33930e9a5880297a21 Mon Sep 17 00:00:00 2001 From: Allison Durham Date: Wed, 21 May 2025 15:21:05 -0700 Subject: [PATCH 2/2] handle channelToken better --- acp/config/localdev/kustomization.yaml | 2 +- acp/internal/server/server.go | 46 ++++++++++++++++++++------ 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/acp/config/localdev/kustomization.yaml b/acp/config/localdev/kustomization.yaml index 6ac4f8cd..9dec8b1a 100644 --- a/acp/config/localdev/kustomization.yaml +++ b/acp/config/localdev/kustomization.yaml @@ -26,4 +26,4 @@ patches: images: - name: controller newName: controller - newTag: "202505141822" + newTag: "202505211432" diff --git a/acp/internal/server/server.go b/acp/internal/server/server.go index cc069961..c72a921b 100644 --- a/acp/internal/server/server.go +++ b/acp/internal/server/server.go @@ -42,12 +42,12 @@ type ChannelTokenRef struct { // CreateTaskRequest defines the structure of the request body for creating a task type CreateTaskRequest struct { - Namespace string `json:"namespace,omitempty"` // Optional, defaults to "default" - AgentName string `json:"agentName"` // Required - UserMessage string `json:"userMessage,omitempty"` // Optional if contextWindow is provided - ContextWindow []acp.Message `json:"contextWindow,omitempty"` // Optional if userMessage is provided - BaseURL string `json:"baseURL,omitempty"` // Optional, base URL for the contact channel - ChannelTokenFrom *ChannelTokenRef `json:"channelTokenFrom,omitempty"` // Optional, reference to secret containing the token + Namespace string `json:"namespace,omitempty"` // Optional, defaults to "default" + AgentName string `json:"agentName"` // Required + UserMessage string `json:"userMessage,omitempty"` // Optional if contextWindow is provided + ContextWindow []acp.Message `json:"contextWindow,omitempty"` // Optional if userMessage is provided + BaseURL string `json:"baseURL,omitempty"` // Optional, base URL for the contact channel + ChannelToken string `json:"channelToken,omitempty"` // Optional, token for the contact channel API } // CreateAgentRequest defines the structure of the request body for creating an agent @@ -1220,13 +1220,37 @@ func (s *APIServer) createTask(c *gin.Context) { return } - // Extract the baseURL and channelTokenFrom fields + // Extract the baseURL and channelToken fields baseURL := req.BaseURL + channelToken := req.ChannelToken + + // Create a secret for the channel token if provided var channelTokenFrom *acp.SecretKeyRef - if req.ChannelTokenFrom != nil { + if channelToken != "" { + // Generate a secret name based on the task + secretName := fmt.Sprintf("channel-token-%s", uuid.New().String()[:8]) + + // Create the secret + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: secretName, + Namespace: namespace, + }, + Data: map[string][]byte{ + "token": []byte(channelToken), + }, + } + + if err := s.client.Create(ctx, secret); err != nil { + logger.Error(err, "Failed to create channel token secret") + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create channel token secret: " + err.Error()}) + return + } + + // Reference the secret channelTokenFrom = &acp.SecretKeyRef{ - Name: req.ChannelTokenFrom.Name, - Key: req.ChannelTokenFrom.Key, + Name: secretName, + Key: "token", } } @@ -1273,5 +1297,5 @@ func (s *APIServer) createTask(c *gin.Context) { } // Return the created task - c.JSON(http.StatusCreated, task) + c.JSON(http.StatusCreated, sanitizeTask(*task)) }