From 22898bcf94fbe3c07bc211f1b31e07923b4fa64f Mon Sep 17 00:00:00 2001 From: Allison Durham Date: Mon, 12 May 2025 18:04:18 -0700 Subject: [PATCH 1/8] lots of junk --- acp/api/v1alpha1/task_types.go | 5 + .../crd/bases/acp.humanlayer.dev_tasks.yaml | 5 + acp/config/localdev/kustomization.yaml | 2 +- acp/go.mod | 14 +- acp/go.sum | 30 +-- .../controller/task/send_response_url_test.go | 132 +++++++++++++ .../controller/task/task_controller.go | 74 ++++++++ .../task/task_responseurl_integration_test.go | 176 ++++++++++++++++++ 8 files changed, 400 insertions(+), 38 deletions(-) create mode 100644 acp/internal/controller/task/send_response_url_test.go create mode 100644 acp/internal/controller/task/task_responseurl_integration_test.go diff --git a/acp/api/v1alpha1/task_types.go b/acp/api/v1alpha1/task_types.go index 92653d5..ad6f80c 100644 --- a/acp/api/v1alpha1/task_types.go +++ b/acp/api/v1alpha1/task_types.go @@ -37,6 +37,11 @@ type TaskSpec struct { // for the ongoing conversation. // +optional ContextWindow []Message `json:"contextWindow,omitempty"` + + // ResponseUrl specifies a pre-generated URL that will be used for human contact responses. + // This allows the system to direct responses to a specific endpoint. + // +optional + ResponseUrl string `json:"responseUrl,omitempty"` } // Message represents a single message in the conversation diff --git a/acp/config/crd/bases/acp.humanlayer.dev_tasks.yaml b/acp/config/crd/bases/acp.humanlayer.dev_tasks.yaml index 9c36b61..f3693f9 100644 --- a/acp/config/crd/bases/acp.humanlayer.dev_tasks.yaml +++ b/acp/config/crd/bases/acp.humanlayer.dev_tasks.yaml @@ -153,6 +153,11 @@ spec: - role type: object type: array + responseUrl: + description: |- + ResponseUrl specifies a pre-generated URL that will be used for human contact responses. + This allows the system to direct responses to a specific endpoint. + type: string userMessage: description: |- UserMessage is the message to send to the agent. diff --git a/acp/config/localdev/kustomization.yaml b/acp/config/localdev/kustomization.yaml index a2d9560..551b350 100644 --- a/acp/config/localdev/kustomization.yaml +++ b/acp/config/localdev/kustomization.yaml @@ -26,4 +26,4 @@ patches: images: - name: controller newName: controller - newTag: "202504181049" + newTag: "202505121745" diff --git a/acp/go.mod b/acp/go.mod index 533bcd6..b0c03ff 100644 --- a/acp/go.mod +++ b/acp/go.mod @@ -3,10 +3,11 @@ module github.com/humanlayer/agentcontrolplane/acp go 1.24.0 require ( + github.com/gin-gonic/gin v1.10.0 github.com/mark3labs/mcp-go v0.15.0 github.com/onsi/ginkgo/v2 v2.23.2 github.com/onsi/gomega v1.36.2 - github.com/openai/openai-go v0.1.0-alpha.59 + github.com/stretchr/testify v1.10.0 github.com/tmc/langchaingo v0.1.13 go.opentelemetry.io/otel v1.34.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.34.0 @@ -33,12 +34,10 @@ require ( github.com/bytedance/sonic v1.13.2 // indirect github.com/bytedance/sonic/loader v0.2.4 // indirect github.com/cloudwego/base64x v0.1.5 // indirect - github.com/cloudwego/iasm v0.2.0 // indirect github.com/dlclark/regexp2 v1.10.0 // indirect github.com/gabriel-vasile/mimetype v1.4.8 // indirect github.com/gage-technologies/mistral-go v1.1.0 // indirect github.com/gin-contrib/sse v1.1.0 // indirect - github.com/gin-gonic/gin v1.10.0 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.26.0 // indirect @@ -53,10 +52,7 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/pelletier/go-toml/v2 v2.2.4 // indirect github.com/pkoukk/tiktoken-go v0.1.6 // indirect - github.com/tidwall/gjson v1.14.4 // indirect - github.com/tidwall/match v1.1.1 // indirect - github.com/tidwall/pretty v1.2.1 // indirect - github.com/tidwall/sjson v1.2.5 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect github.com/yosida95/uritemplate/v3 v3.0.2 // indirect @@ -106,7 +102,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/pkg/errors v0.9.1 // indirect + github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.19.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect @@ -119,7 +115,7 @@ require ( go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 // indirect - go.opentelemetry.io/otel/metric v1.34.0 + go.opentelemetry.io/otel/metric v1.34.0 // indirect go.opentelemetry.io/otel/trace v1.34.0 go.opentelemetry.io/proto/otlp v1.5.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/acp/go.sum b/acp/go.sum index 0c2ecdc..7c5efd5 100644 --- a/acp/go.sum +++ b/acp/go.sum @@ -41,7 +41,6 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4= github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= -github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= @@ -91,6 +90,8 @@ github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE= github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ= +github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= @@ -190,8 +191,6 @@ github.com/onsi/ginkgo/v2 v2.23.2 h1:LYLd7Wz401p0N7xR8y7WL6D2QZwKpbirDg0EVIvzvMM github.com/onsi/ginkgo/v2 v2.23.2/go.mod h1:zXTP6xIp3U8aVuXN8ENK9IXRaTjFnpVB9mGmaSRvxnM= github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8= github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY= -github.com/openai/openai-go v0.1.0-alpha.59 h1:T3IYwKSCezfIlL9Oi+CGvU03fq0RoH33775S78Ti48Y= -github.com/openai/openai-go v0.1.0-alpha.59/go.mod h1:3SdE6BffOX9HPEQv8IL/fi3LYZ5TUpRYaqGQZbyk11A= github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -229,16 +228,6 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= -github.com/tidwall/gjson v1.14.4 h1:uo0p8EbA09J7RQaflQ1aBRffTR7xedD2bcIVSYxLnkM= -github.com/tidwall/gjson v1.14.4/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= -github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= -github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= -github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= -github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= -github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= -github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= -github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/tmc/langchaingo v0.1.13 h1:rcpMWBIi2y3B90XxfE4Ao8dhCQPVDMaNPnN5cGB1CaA= github.com/tmc/langchaingo v0.1.13/go.mod h1:vpQ5NOIhpzxDfTZK9B6tf2GM/MoaHewPWM5KXXGh7hg= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= @@ -290,8 +279,6 @@ golang.org/x/arch v0.16.0/go.mod h1:JmwW7aLIoRUKgaTzhkiEFxvcEiQGyOg9BMonBJUS7EE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= -golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -311,8 +298,6 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= -golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY= golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -323,8 +308,6 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= -golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610= golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -332,18 +315,12 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= -golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -golang.org/x/term v0.29.0 h1:L6pJp37ocefwRRtYPKSWOWzOtWSxVajvz2ldH/xi3iU= -golang.org/x/term v0.29.0/go.mod h1:6bl4lRlvVuDgSf3179VpIxBF0o10JUpXWOnI7nErv7s= golang.org/x/term v0.31.0 h1:erwDkOK1Msy6offm1mOgvspSkslFnIGsFnxOKoufg3o= golang.org/x/term v0.31.0/go.mod h1:R4BeIy7D95HzImkxGkTW1UQTtP54tio2RyHz7PwK0aw= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= -golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= @@ -393,8 +370,6 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= -google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU= -google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -428,7 +403,6 @@ k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f/go.mod h1:R/HEjbvWI0qdfb8 k8s.io/utils v0.0.0-20241210054802-24370beab758 h1:sdbE21q2nlQtFh65saZY+rRM6x6aJJI8IUa1AmH/qa0= k8s.io/utils v0.0.0-20241210054802-24370beab758/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= -rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.0 h1:CPT0ExVicCzcpeN4baWEV2ko2Z/AsiZgEdwgcfwLgMo= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.0/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw= sigs.k8s.io/controller-runtime v0.20.0 h1:jjkMo29xEXH+02Md9qaVXfEIaMESSpy3TBWPrsfQkQs= diff --git a/acp/internal/controller/task/send_response_url_test.go b/acp/internal/controller/task/send_response_url_test.go new file mode 100644 index 0000000..993ae76 --- /dev/null +++ b/acp/internal/controller/task/send_response_url_test.go @@ -0,0 +1,132 @@ +package task + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + acp "github.com/humanlayer/agentcontrolplane/acp/api/v1alpha1" + humanlayerapi "github.com/humanlayer/agentcontrolplane/acp/internal/humanlayerapi" + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +// initTestReconciler creates a minimal TaskReconciler for testing +func initTestReconciler(t *testing.T) (*TaskReconciler, context.Context) { + // Initialize logger + logger := zap.New(zap.UseDevMode(true)) + ctx := context.Background() + ctx = log.IntoContext(ctx, logger) + + // Create a reconciler + scheme := runtime.NewScheme() + err := acp.AddToScheme(scheme) + assert.NoError(t, err, "Failed to add API schema") + + return &TaskReconciler{ + Scheme: scheme, + recorder: record.NewFakeRecorder(10), + }, ctx +} + +func TestSendFinalResultToResponseUrl(t *testing.T) { + // Create a channel to synchronize between test and handler + requestReceived := make(chan struct{}) + + // Track the received request for verification + var receivedRequest humanlayerapi.HumanContactInput + var receivedMutex sync.Mutex + + // Create a test server + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Verify method and content type + assert.Equal(t, "POST", r.Method) + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + + // Decode the request body + decoder := json.NewDecoder(r.Body) + var req humanlayerapi.HumanContactInput + err := decoder.Decode(&req) + assert.NoError(t, err) + + // Store the request for later verification + receivedMutex.Lock() + receivedRequest = req + receivedMutex.Unlock() + + // Send a success response + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"status":"success"}`)) + + // Notify that request was received + close(requestReceived) + })) + defer server.Close() + + // Create a reconciler + reconciler, ctx := initTestReconciler(t) + + // Test sending result + testMsg := "This is the final task result" + err := reconciler.sendFinalResultToResponseUrl(ctx, server.URL, testMsg) + assert.NoError(t, err) + + // Wait for the request to be processed with a timeout + select { + case <-requestReceived: + // Request was received, continue with assertions + case <-time.After(2 * time.Second): + t.Fatal("Timed out waiting for request to be received") + } + + // Verify the request content + receivedMutex.Lock() + defer receivedMutex.Unlock() + + // Verify run_id and call_id are set + assert.NotEmpty(t, receivedRequest.GetRunId()) + assert.NotEmpty(t, receivedRequest.GetCallId()) + + // Verify the message content + assert.Equal(t, testMsg, receivedRequest.Spec.Msg) +} + +// Test handling of error responses +func TestSendFinalResultToResponseUrl_ErrorResponse(t *testing.T) { + // Create a test server that returns an error + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(`{"error":"something went wrong"}`)) + })) + defer server.Close() + + // Create a reconciler + reconciler, ctx := initTestReconciler(t) + + // Test sending result + err := reconciler.sendFinalResultToResponseUrl(ctx, server.URL, "test message") + + // Should return an error due to non-200 response + assert.Error(t, err) + assert.Contains(t, err.Error(), "received non-success status code: 500") +} + +// Test handling of connection errors +func TestSendFinalResultToResponseUrl_ConnectionError(t *testing.T) { + // Create a reconciler + reconciler, ctx := initTestReconciler(t) + + // Use an invalid URL to cause a connection error + err := reconciler.sendFinalResultToResponseUrl(ctx, "http://localhost:1", "test message") + + // Should return an error due to connection failure + assert.Error(t, err) + assert.Contains(t, err.Error(), "failed to send HTTP request") +} \ No newline at end of file diff --git a/acp/internal/controller/task/task_controller.go b/acp/internal/controller/task/task_controller.go index a836916..4eb602e 100644 --- a/acp/internal/controller/task/task_controller.go +++ b/acp/internal/controller/task/task_controller.go @@ -1,9 +1,13 @@ package task import ( + "bytes" "context" + "encoding/json" "errors" "fmt" + "io" + "net/http" "time" "github.com/google/uuid" @@ -20,6 +24,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/humanlayer/agentcontrolplane/acp/internal/adapters" + "github.com/humanlayer/agentcontrolplane/acp/internal/humanlayerapi" "github.com/humanlayer/agentcontrolplane/acp/internal/llmclient" "github.com/humanlayer/agentcontrolplane/acp/internal/mcpmanager" "github.com/humanlayer/agentcontrolplane/acp/internal/validation" @@ -466,6 +471,25 @@ func (r *TaskReconciler) processLLMResponse(ctx context.Context, output *acp.Mes statusUpdate.Status.Error = "" r.recorder.Event(task, corev1.EventTypeNormal, "LLMFinalAnswer", "LLM response received successfully") + // If task has a responseUrl, send the final result to that URL + if task.Spec.ResponseUrl != "" { + go func() { + // Use a background context since we don't want this to block task completion + sendCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + err := r.sendFinalResultToResponseUrl(sendCtx, task.Spec.ResponseUrl, output.Content) + if err != nil { + // Just log error, don't fail the task + logger.Error(err, "Failed to send final result to responseUrl", + "responseUrl", task.Spec.ResponseUrl) + } else { + logger.Info("Successfully sent final result to responseUrl", + "responseUrl", task.Spec.ResponseUrl) + } + }() + } + // End the task trace with OK status since we have a final answer. // The context passed here should ideally be the one from Reconcile after attachRootSpan. // r.endTaskTrace(ctx, task, codes.Ok, "Task completed successfully with final answer") @@ -778,6 +802,56 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. } // SetupWithManager sets up the controller with the Manager. +func (r *TaskReconciler) sendFinalResultToResponseUrl(ctx context.Context, responseUrl string, result string) error { + logger := log.FromContext(ctx) + logger.Info("Sending final result to responseUrl", "responseUrl", responseUrl) + + // Create the request body using the existing HumanLayerAPI types + runID := uuid.New().String() + callID := uuid.New().String() + + // Create the spec with the final result as the message + spec := humanlayerapi.NewHumanContactSpecInput(result) + + // Create the human contact input + humanContactInput := humanlayerapi.NewHumanContactInput(runID, callID, *spec) + + // Marshal the request body to JSON + jsonData, err := json.Marshal(humanContactInput) + if err != nil { + return fmt.Errorf("failed to marshal request body: %w", err) + } + + // Create the HTTP request + req, err := http.NewRequestWithContext(ctx, "POST", responseUrl, bytes.NewBuffer(jsonData)) + if err != nil { + return fmt.Errorf("failed to create HTTP request: %w", err) + } + + // Set content type header + req.Header.Set("Content-Type", "application/json") + + // Send the request + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to send HTTP request: %w", err) + } + defer resp.Body.Close() + + // Check response status + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("received non-success status code: %d, body: %s", resp.StatusCode, string(body)) + } + + logger.Info("Successfully sent final result to responseUrl", + "statusCode", resp.StatusCode, + "responseUrl", responseUrl) + return nil +} + + func (r *TaskReconciler) SetupWithManager(mgr ctrl.Manager) error { r.recorder = mgr.GetEventRecorderFor("task-controller") if r.newLLMClient == nil { diff --git a/acp/internal/controller/task/task_responseurl_integration_test.go b/acp/internal/controller/task/task_responseurl_integration_test.go new file mode 100644 index 0000000..a831134 --- /dev/null +++ b/acp/internal/controller/task/task_responseurl_integration_test.go @@ -0,0 +1,176 @@ +package task + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "sync" + "time" + + acp "github.com/humanlayer/agentcontrolplane/acp/api/v1alpha1" + "github.com/humanlayer/agentcontrolplane/acp/internal/humanlayerapi" + "github.com/humanlayer/agentcontrolplane/acp/internal/llmclient" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "go.opentelemetry.io/otel/trace/noop" +) + +// MockLLMClient for testing +type MockLLMClient struct { + SendRequestResponse *acp.Message + SendRequestError error +} + +func (m *MockLLMClient) SendRequest(ctx context.Context, messages []acp.Message, tools []llmclient.Tool) (*acp.Message, error) { + return m.SendRequestResponse, m.SendRequestError +} + +// Creates a TaskReconciler with a custom LLM client factory +func reconcilerWithMockLLM(newLLMClient func(ctx context.Context, llm acp.LLM, apiKey string) (llmclient.LLMClient, error)) (*TaskReconciler, *record.FakeRecorder) { + recorder := record.NewFakeRecorder(10) + tracer := noop.NewTracerProvider().Tracer("test") + + r := &TaskReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + recorder: recorder, + newLLMClient: newLLMClient, + Tracer: tracer, + } + return r, recorder +} + +var _ = Describe("Task Controller with ResponseUrl", func() { + Context("when Task has responseUrl", func() { + var ( + server *httptest.Server + requestReceived chan struct{} + receivedRequest humanlayerapi.HumanContactInput + receivedMutex sync.Mutex + mockLLMClient *MockLLMClient + ) + + BeforeEach(func() { + // Set up the secret, LLM, and agent + _, _, _, teardown := setupSuiteObjects(ctx) + DeferCleanup(teardown) + + // Set up the mock LLM client to return a final answer + mockLLMClient = &MockLLMClient{ + SendRequestResponse: &acp.Message{ + Content: "This is the final answer", + }, + } + + // Set up the test server to receive the HTTP request + requestReceived = make(chan struct{}) + server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Decode the request body + decoder := json.NewDecoder(r.Body) + var req humanlayerapi.HumanContactInput + Expect(decoder.Decode(&req)).To(Succeed()) + + // Store the request for later verification + receivedMutex.Lock() + receivedRequest = req + receivedMutex.Unlock() + + // Send a success response + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"status":"success"}`)) + + // Notify that request was received + close(requestReceived) + })) + DeferCleanup(server.Close) + }) + + It("sends the final result to the responseUrl", func() { + By("creating a task with responseUrl") + // Create a task with responseUrl + customTask := &acp.Task{ + ObjectMeta: v1.ObjectMeta{ + Name: "task-with-responseurl", + Namespace: "default", + }, + Spec: acp.TaskSpec{ + AgentRef: acp.LocalObjectReference{ + Name: testAgent.Name, + }, + UserMessage: "What is the capital of France?", + ResponseUrl: server.URL, + }, + } + Expect(k8sClient.Create(ctx, customTask)).To(Succeed()) + task := customTask + DeferCleanup(func() { + Expect(k8sClient.Delete(ctx, task)).To(Succeed()) + }) + + // Create a mock LLM client factory + mockLLMClientFn := func(ctx context.Context, llm acp.LLM, apiKey string) (llmclient.LLMClient, error) { + return mockLLMClient, nil + } + + // Get reconciler with mock LLM client + By("creating reconciler with mock LLM client") + reconciler, _ := reconcilerWithMockLLM(mockLLMClientFn) + + By("reconciling the task to initialize it") + // First reconcile (should initialize the task) + result, err := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: types.NamespacedName{Name: task.Name, Namespace: "default"}, + }) + Expect(err).NotTo(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) + + // Get the updated task + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: task.Name, Namespace: "default"}, task)).To(Succeed()) + Expect(task.Status.Phase).To(Equal(acp.TaskPhaseInitializing)) + + By("reconciling the task to prepare for LLM") + // Second reconcile (should validate agent and prepare for LLM) + result, err = reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: types.NamespacedName{Name: task.Name, Namespace: "default"}, + }) + Expect(err).NotTo(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) + + // Get the updated task + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: task.Name, Namespace: "default"}, task)).To(Succeed()) + Expect(task.Status.Phase).To(Equal(acp.TaskPhaseReadyForLLM)) + + By("reconciling the task to get final answer") + // Third reconcile (should send to LLM and get final answer) + result, err = reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: types.NamespacedName{Name: task.Name, Namespace: "default"}, + }) + Expect(err).NotTo(HaveOccurred()) + + // Get the updated task + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: task.Name, Namespace: "default"}, task)).To(Succeed()) + Expect(task.Status.Phase).To(Equal(acp.TaskPhaseFinalAnswer)) + Expect(task.Status.Output).To(Equal("This is the final answer")) + + By("waiting for HTTP request to be received") + // Wait for the HTTP request to be made + select { + case <-requestReceived: + // Request was received, continue with assertions + case <-time.After(5 * time.Second): + Fail("Timed out waiting for responseUrl request") + } + + By("verifying request content") + // Verify the request content + receivedMutex.Lock() + defer receivedMutex.Unlock() + Expect(receivedRequest.Spec.Msg).To(Equal("This is the final answer")) + }) + }) +}) \ No newline at end of file From aa3c4c5f611969539d37ac85808ffa55f1bc651f Mon Sep 17 00:00:00 2001 From: Allison Durham Date: Mon, 12 May 2025 18:12:52 -0700 Subject: [PATCH 2/8] okay lesgo things are passing --- .../controller/task/send_response_url_test.go | 14 +++++++------- .../controller/task/task_controller.go | 7 +++++-- .../task/task_responseurl_integration_test.go | 18 +++++++++--------- 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/acp/internal/controller/task/send_response_url_test.go b/acp/internal/controller/task/send_response_url_test.go index 993ae76..ce3b8d8 100644 --- a/acp/internal/controller/task/send_response_url_test.go +++ b/acp/internal/controller/task/send_response_url_test.go @@ -29,7 +29,7 @@ func initTestReconciler(t *testing.T) (*TaskReconciler, context.Context) { scheme := runtime.NewScheme() err := acp.AddToScheme(scheme) assert.NoError(t, err, "Failed to add API schema") - + return &TaskReconciler{ Scheme: scheme, recorder: record.NewFakeRecorder(10), @@ -39,7 +39,7 @@ func initTestReconciler(t *testing.T) (*TaskReconciler, context.Context) { func TestSendFinalResultToResponseUrl(t *testing.T) { // Create a channel to synchronize between test and handler requestReceived := make(chan struct{}) - + // Track the received request for verification var receivedRequest humanlayerapi.HumanContactInput var receivedMutex sync.Mutex @@ -63,7 +63,7 @@ func TestSendFinalResultToResponseUrl(t *testing.T) { // Send a success response w.WriteHeader(http.StatusOK) - w.Write([]byte(`{"status":"success"}`)) + _, _ = w.Write([]byte(`{"status":"success"}`)) // Notify that request was received close(requestReceived) @@ -103,7 +103,7 @@ func TestSendFinalResultToResponseUrl_ErrorResponse(t *testing.T) { // Create a test server that returns an error server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(`{"error":"something went wrong"}`)) + _, _ = w.Write([]byte(`{"error":"something went wrong"}`)) })) defer server.Close() @@ -112,7 +112,7 @@ func TestSendFinalResultToResponseUrl_ErrorResponse(t *testing.T) { // Test sending result err := reconciler.sendFinalResultToResponseUrl(ctx, server.URL, "test message") - + // Should return an error due to non-200 response assert.Error(t, err) assert.Contains(t, err.Error(), "received non-success status code: 500") @@ -125,8 +125,8 @@ func TestSendFinalResultToResponseUrl_ConnectionError(t *testing.T) { // Use an invalid URL to cause a connection error err := reconciler.sendFinalResultToResponseUrl(ctx, "http://localhost:1", "test message") - + // Should return an error due to connection failure assert.Error(t, err) assert.Contains(t, err.Error(), "failed to send HTTP request") -} \ No newline at end of file +} diff --git a/acp/internal/controller/task/task_controller.go b/acp/internal/controller/task/task_controller.go index 4eb602e..f331fbf 100644 --- a/acp/internal/controller/task/task_controller.go +++ b/acp/internal/controller/task/task_controller.go @@ -837,7 +837,11 @@ func (r *TaskReconciler) sendFinalResultToResponseUrl(ctx context.Context, respo if err != nil { return fmt.Errorf("failed to send HTTP request: %w", err) } - defer resp.Body.Close() + defer func() { + if err := resp.Body.Close(); err != nil { + logger.Error(err, "Failed to close response body") + } + }() // Check response status if resp.StatusCode < 200 || resp.StatusCode >= 300 { @@ -851,7 +855,6 @@ func (r *TaskReconciler) sendFinalResultToResponseUrl(ctx context.Context, respo return nil } - func (r *TaskReconciler) SetupWithManager(mgr ctrl.Manager) error { r.recorder = mgr.GetEventRecorderFor("task-controller") if r.newLLMClient == nil { diff --git a/acp/internal/controller/task/task_responseurl_integration_test.go b/acp/internal/controller/task/task_responseurl_integration_test.go index a831134..0c05e72 100644 --- a/acp/internal/controller/task/task_responseurl_integration_test.go +++ b/acp/internal/controller/task/task_responseurl_integration_test.go @@ -13,11 +13,11 @@ import ( "github.com/humanlayer/agentcontrolplane/acp/internal/llmclient" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "go.opentelemetry.io/otel/trace/noop" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "go.opentelemetry.io/otel/trace/noop" ) // MockLLMClient for testing @@ -34,7 +34,7 @@ func (m *MockLLMClient) SendRequest(ctx context.Context, messages []acp.Message, func reconcilerWithMockLLM(newLLMClient func(ctx context.Context, llm acp.LLM, apiKey string) (llmclient.LLMClient, error)) (*TaskReconciler, *record.FakeRecorder) { recorder := record.NewFakeRecorder(10) tracer := noop.NewTracerProvider().Tracer("test") - + r := &TaskReconciler{ Client: k8sClient, Scheme: k8sClient.Scheme(), @@ -48,11 +48,11 @@ func reconcilerWithMockLLM(newLLMClient func(ctx context.Context, llm acp.LLM, a var _ = Describe("Task Controller with ResponseUrl", func() { Context("when Task has responseUrl", func() { var ( - server *httptest.Server - requestReceived chan struct{} - receivedRequest humanlayerapi.HumanContactInput - receivedMutex sync.Mutex - mockLLMClient *MockLLMClient + server *httptest.Server + requestReceived chan struct{} + receivedRequest humanlayerapi.HumanContactInput + receivedMutex sync.Mutex + mockLLMClient *MockLLMClient ) BeforeEach(func() { @@ -82,7 +82,7 @@ var _ = Describe("Task Controller with ResponseUrl", func() { // Send a success response w.WriteHeader(http.StatusOK) - w.Write([]byte(`{"status":"success"}`)) + _, _ = w.Write([]byte(`{"status":"success"}`)) // Notify that request was received close(requestReceived) @@ -173,4 +173,4 @@ var _ = Describe("Task Controller with ResponseUrl", func() { Expect(receivedRequest.Spec.Msg).To(Equal("This is the final answer")) }) }) -}) \ No newline at end of file +}) From 318734596a99a2370199ad5091300ec45ae9261b Mon Sep 17 00:00:00 2001 From: Allison Durham Date: Mon, 12 May 2025 18:30:36 -0700 Subject: [PATCH 3/8] go back to main packages --- acp/go.mod | 14 +- .../controller/task/send_response_url_test.go | 194 +++++++++--------- 2 files changed, 104 insertions(+), 104 deletions(-) diff --git a/acp/go.mod b/acp/go.mod index b0c03ff..533bcd6 100644 --- a/acp/go.mod +++ b/acp/go.mod @@ -3,11 +3,10 @@ module github.com/humanlayer/agentcontrolplane/acp go 1.24.0 require ( - github.com/gin-gonic/gin v1.10.0 github.com/mark3labs/mcp-go v0.15.0 github.com/onsi/ginkgo/v2 v2.23.2 github.com/onsi/gomega v1.36.2 - github.com/stretchr/testify v1.10.0 + github.com/openai/openai-go v0.1.0-alpha.59 github.com/tmc/langchaingo v0.1.13 go.opentelemetry.io/otel v1.34.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.34.0 @@ -34,10 +33,12 @@ require ( github.com/bytedance/sonic v1.13.2 // indirect github.com/bytedance/sonic/loader v0.2.4 // indirect github.com/cloudwego/base64x v0.1.5 // indirect + github.com/cloudwego/iasm v0.2.0 // indirect github.com/dlclark/regexp2 v1.10.0 // indirect github.com/gabriel-vasile/mimetype v1.4.8 // indirect github.com/gage-technologies/mistral-go v1.1.0 // indirect github.com/gin-contrib/sse v1.1.0 // indirect + github.com/gin-gonic/gin v1.10.0 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.26.0 // indirect @@ -52,7 +53,10 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/pelletier/go-toml/v2 v2.2.4 // indirect github.com/pkoukk/tiktoken-go v0.1.6 // indirect - github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/tidwall/gjson v1.14.4 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.1 // indirect + github.com/tidwall/sjson v1.2.5 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect github.com/yosida95/uritemplate/v3 v3.0.2 // indirect @@ -102,7 +106,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/pkg/errors v0.9.1 + github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.19.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect @@ -115,7 +119,7 @@ require ( go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 // indirect - go.opentelemetry.io/otel/metric v1.34.0 // indirect + go.opentelemetry.io/otel/metric v1.34.0 go.opentelemetry.io/otel/trace v1.34.0 go.opentelemetry.io/proto/otlp v1.5.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/acp/internal/controller/task/send_response_url_test.go b/acp/internal/controller/task/send_response_url_test.go index ce3b8d8..b7e263d 100644 --- a/acp/internal/controller/task/send_response_url_test.go +++ b/acp/internal/controller/task/send_response_url_test.go @@ -6,12 +6,11 @@ import ( "net/http" "net/http/httptest" "sync" - "testing" - "time" acp "github.com/humanlayer/agentcontrolplane/acp/api/v1alpha1" humanlayerapi "github.com/humanlayer/agentcontrolplane/acp/internal/humanlayerapi" - "github.com/stretchr/testify/assert" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/log" @@ -19,7 +18,7 @@ import ( ) // initTestReconciler creates a minimal TaskReconciler for testing -func initTestReconciler(t *testing.T) (*TaskReconciler, context.Context) { +func initTestReconciler() (*TaskReconciler, context.Context) { // Initialize logger logger := zap.New(zap.UseDevMode(true)) ctx := context.Background() @@ -28,7 +27,7 @@ func initTestReconciler(t *testing.T) (*TaskReconciler, context.Context) { // Create a reconciler scheme := runtime.NewScheme() err := acp.AddToScheme(scheme) - assert.NoError(t, err, "Failed to add API schema") + Expect(err).NotTo(HaveOccurred(), "Failed to add API schema") return &TaskReconciler{ Scheme: scheme, @@ -36,97 +35,94 @@ func initTestReconciler(t *testing.T) (*TaskReconciler, context.Context) { }, ctx } -func TestSendFinalResultToResponseUrl(t *testing.T) { - // Create a channel to synchronize between test and handler - requestReceived := make(chan struct{}) - - // Track the received request for verification - var receivedRequest humanlayerapi.HumanContactInput - var receivedMutex sync.Mutex - - // Create a test server - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Verify method and content type - assert.Equal(t, "POST", r.Method) - assert.Equal(t, "application/json", r.Header.Get("Content-Type")) - - // Decode the request body - decoder := json.NewDecoder(r.Body) - var req humanlayerapi.HumanContactInput - err := decoder.Decode(&req) - assert.NoError(t, err) - - // Store the request for later verification - receivedMutex.Lock() - receivedRequest = req - receivedMutex.Unlock() - - // Send a success response - w.WriteHeader(http.StatusOK) - _, _ = w.Write([]byte(`{"status":"success"}`)) - - // Notify that request was received - close(requestReceived) - })) - defer server.Close() - - // Create a reconciler - reconciler, ctx := initTestReconciler(t) - - // Test sending result - testMsg := "This is the final task result" - err := reconciler.sendFinalResultToResponseUrl(ctx, server.URL, testMsg) - assert.NoError(t, err) - - // Wait for the request to be processed with a timeout - select { - case <-requestReceived: - // Request was received, continue with assertions - case <-time.After(2 * time.Second): - t.Fatal("Timed out waiting for request to be received") - } - - // Verify the request content - receivedMutex.Lock() - defer receivedMutex.Unlock() - - // Verify run_id and call_id are set - assert.NotEmpty(t, receivedRequest.GetRunId()) - assert.NotEmpty(t, receivedRequest.GetCallId()) - - // Verify the message content - assert.Equal(t, testMsg, receivedRequest.Spec.Msg) -} - -// Test handling of error responses -func TestSendFinalResultToResponseUrl_ErrorResponse(t *testing.T) { - // Create a test server that returns an error - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusInternalServerError) - _, _ = w.Write([]byte(`{"error":"something went wrong"}`)) - })) - defer server.Close() - - // Create a reconciler - reconciler, ctx := initTestReconciler(t) - - // Test sending result - err := reconciler.sendFinalResultToResponseUrl(ctx, server.URL, "test message") - - // Should return an error due to non-200 response - assert.Error(t, err) - assert.Contains(t, err.Error(), "received non-success status code: 500") -} - -// Test handling of connection errors -func TestSendFinalResultToResponseUrl_ConnectionError(t *testing.T) { - // Create a reconciler - reconciler, ctx := initTestReconciler(t) - - // Use an invalid URL to cause a connection error - err := reconciler.sendFinalResultToResponseUrl(ctx, "http://localhost:1", "test message") - - // Should return an error due to connection failure - assert.Error(t, err) - assert.Contains(t, err.Error(), "failed to send HTTP request") -} +var _ = Describe("ResponseUrl Functionality", func() { + Context("when sending results to responseUrl", func() { + It("successfully sends the result and verifies content", func() { + // Create a channel to synchronize between test and handler + requestReceived := make(chan struct{}) + + // Track the received request for verification + var receivedRequest humanlayerapi.HumanContactInput + var receivedMutex sync.Mutex + + // Create a test server + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Verify method and content type + Expect(r.Method).To(Equal("POST")) + Expect(r.Header.Get("Content-Type")).To(Equal("application/json")) + + // Decode the request body + decoder := json.NewDecoder(r.Body) + var req humanlayerapi.HumanContactInput + err := decoder.Decode(&req) + Expect(err).NotTo(HaveOccurred()) + + // Store the request for later verification + receivedMutex.Lock() + receivedRequest = req + receivedMutex.Unlock() + + // Send a success response + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"status":"success"}`)) + + // Notify that request was received + close(requestReceived) + })) + defer server.Close() + + // Create a reconciler + reconciler, ctx := initTestReconciler() + + // Test sending result + testMsg := "This is the final task result" + err := reconciler.sendFinalResultToResponseUrl(ctx, server.URL, testMsg) + Expect(err).NotTo(HaveOccurred()) + + // Wait for the request to be processed with a timeout + Eventually(requestReceived).Should(BeClosed(), "Timed out waiting for request to be received") + + // Verify the request content + receivedMutex.Lock() + defer receivedMutex.Unlock() + + // Verify run_id and call_id are set + Expect(receivedRequest.GetRunId()).NotTo(BeEmpty()) + Expect(receivedRequest.GetCallId()).NotTo(BeEmpty()) + + // Verify the message content + Expect(receivedRequest.Spec.Msg).To(Equal(testMsg)) + }) + + It("handles error responses appropriately", func() { + // Create a test server that returns an error + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(`{"error":"something went wrong"}`)) + })) + defer server.Close() + + // Create a reconciler + reconciler, ctx := initTestReconciler() + + // Test sending result + err := reconciler.sendFinalResultToResponseUrl(ctx, server.URL, "test message") + + // Should return an error due to non-200 response + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("received non-success status code: 500")) + }) + + It("handles connection errors appropriately", func() { + // Create a reconciler + reconciler, ctx := initTestReconciler() + + // Use an invalid URL to cause a connection error + err := reconciler.sendFinalResultToResponseUrl(ctx, "http://localhost:1", "test message") + + // Should return an error due to connection failure + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to send HTTP request")) + }) + }) +}) From 4c8150e5c04d9a0567bc8a09d7943c0c25d6d16f Mon Sep 17 00:00:00 2001 From: Allison Durham Date: Mon, 12 May 2025 18:55:17 -0700 Subject: [PATCH 4/8] kubernetes events, easier debugging, and URL naming --- acp/api/v1alpha1/task_types.go | 4 +- .../crd/bases/acp.humanlayer.dev_tasks.yaml | 4 +- .../controller/task/send_response_url_test.go | 12 +- .../controller/task/task_controller.go | 148 +++++++++++++----- .../task/task_responseurl_integration_test.go | 14 +- 5 files changed, 130 insertions(+), 52 deletions(-) diff --git a/acp/api/v1alpha1/task_types.go b/acp/api/v1alpha1/task_types.go index ad6f80c..9a4814f 100644 --- a/acp/api/v1alpha1/task_types.go +++ b/acp/api/v1alpha1/task_types.go @@ -38,10 +38,10 @@ type TaskSpec struct { // +optional ContextWindow []Message `json:"contextWindow,omitempty"` - // ResponseUrl specifies a pre-generated URL that will be used for human contact responses. + // ResponseURL specifies a pre-generated URL that will be used for human contact responses. // This allows the system to direct responses to a specific endpoint. // +optional - ResponseUrl string `json:"responseUrl,omitempty"` + ResponseURL string `json:"responseURL,omitempty"` } // Message represents a single message in the conversation diff --git a/acp/config/crd/bases/acp.humanlayer.dev_tasks.yaml b/acp/config/crd/bases/acp.humanlayer.dev_tasks.yaml index f3693f9..d72f443 100644 --- a/acp/config/crd/bases/acp.humanlayer.dev_tasks.yaml +++ b/acp/config/crd/bases/acp.humanlayer.dev_tasks.yaml @@ -153,9 +153,9 @@ spec: - role type: object type: array - responseUrl: + responseURL: description: |- - ResponseUrl specifies a pre-generated URL that will be used for human contact responses. + ResponseURL specifies a pre-generated URL that will be used for human contact responses. This allows the system to direct responses to a specific endpoint. type: string userMessage: diff --git a/acp/internal/controller/task/send_response_url_test.go b/acp/internal/controller/task/send_response_url_test.go index b7e263d..c40d4f0 100644 --- a/acp/internal/controller/task/send_response_url_test.go +++ b/acp/internal/controller/task/send_response_url_test.go @@ -35,8 +35,8 @@ func initTestReconciler() (*TaskReconciler, context.Context) { }, ctx } -var _ = Describe("ResponseUrl Functionality", func() { - Context("when sending results to responseUrl", func() { +var _ = Describe("ResponseURL Functionality", func() { + Context("when sending results to responseURL", func() { It("successfully sends the result and verifies content", func() { // Create a channel to synchronize between test and handler requestReceived := make(chan struct{}) @@ -76,7 +76,7 @@ var _ = Describe("ResponseUrl Functionality", func() { // Test sending result testMsg := "This is the final task result" - err := reconciler.sendFinalResultToResponseUrl(ctx, server.URL, testMsg) + err := reconciler.sendFinalResultToResponseURL(ctx, server.URL, testMsg) Expect(err).NotTo(HaveOccurred()) // Wait for the request to be processed with a timeout @@ -106,11 +106,11 @@ var _ = Describe("ResponseUrl Functionality", func() { reconciler, ctx := initTestReconciler() // Test sending result - err := reconciler.sendFinalResultToResponseUrl(ctx, server.URL, "test message") + err := reconciler.sendFinalResultToResponseURL(ctx, server.URL, "test message") // Should return an error due to non-200 response Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("received non-success status code: 500")) + Expect(err.Error()).To(ContainSubstring("server error from responseURL (status 500)")) }) It("handles connection errors appropriately", func() { @@ -118,7 +118,7 @@ var _ = Describe("ResponseUrl Functionality", func() { reconciler, ctx := initTestReconciler() // Use an invalid URL to cause a connection error - err := reconciler.sendFinalResultToResponseUrl(ctx, "http://localhost:1", "test message") + err := reconciler.sendFinalResultToResponseURL(ctx, "http://localhost:1", "test message") // Should return an error due to connection failure Expect(err).To(HaveOccurred()) diff --git a/acp/internal/controller/task/task_controller.go b/acp/internal/controller/task/task_controller.go index f331fbf..487c2e3 100644 --- a/acp/internal/controller/task/task_controller.go +++ b/acp/internal/controller/task/task_controller.go @@ -471,21 +471,38 @@ func (r *TaskReconciler) processLLMResponse(ctx context.Context, output *acp.Mes statusUpdate.Status.Error = "" r.recorder.Event(task, corev1.EventTypeNormal, "LLMFinalAnswer", "LLM response received successfully") - // If task has a responseUrl, send the final result to that URL - if task.Spec.ResponseUrl != "" { + // If task has a responseURL, send the final result to that URL + if task.Spec.ResponseURL != "" { + // Create a separate goroutine to handle the HTTP request asynchronously go func() { // Use a background context since we don't want this to block task completion sendCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - err := r.sendFinalResultToResponseUrl(sendCtx, task.Spec.ResponseUrl, output.Content) + // Use a copy of the task for recording events from the goroutine + taskCopy := task.DeepCopy() + + err := r.sendFinalResultToResponseURL(sendCtx, task.Spec.ResponseURL, output.Content) if err != nil { - // Just log error, don't fail the task - logger.Error(err, "Failed to send final result to responseUrl", - "responseUrl", task.Spec.ResponseUrl) + // Log detailed error information + logger.Error(err, "Failed to send final result to responseURL", + "responseURL", task.Spec.ResponseURL, + "task", fmt.Sprintf("%s/%s", task.Namespace, task.Name)) + + // Record a warning event on the task + r.recorder.Event(taskCopy, corev1.EventTypeWarning, "ResponseURLError", + fmt.Sprintf("Failed to send result to response URL: %v", err)) + + // We could update the task status with the error, but we don't want to + // mark the task as failed just because the responseURL notification failed. + // The task itself was successful, the notification was the only failure. } else { - logger.Info("Successfully sent final result to responseUrl", - "responseUrl", task.Spec.ResponseUrl) + logger.Info("Successfully sent final result to responseURL", + "responseURL", task.Spec.ResponseURL) + + // Record a normal event for the successful notification + r.recorder.Event(taskCopy, corev1.EventTypeNormal, "ResponseURLSent", + "Successfully sent result to response URL") } }() } @@ -802,9 +819,11 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. } // SetupWithManager sets up the controller with the Manager. -func (r *TaskReconciler) sendFinalResultToResponseUrl(ctx context.Context, responseUrl string, result string) error { +// sendFinalResultToResponseURL sends the final task result to the specified URL +// It includes retry logic for transient errors and better error categorization +func (r *TaskReconciler) sendFinalResultToResponseURL(ctx context.Context, responseURL string, result string) error { logger := log.FromContext(ctx) - logger.Info("Sending final result to responseUrl", "responseUrl", responseUrl) + logger.Info("Sending final result to responseURL", "responseURL", responseURL) // Create the request body using the existing HumanLayerAPI types runID := uuid.New().String() @@ -822,37 +841,96 @@ func (r *TaskReconciler) sendFinalResultToResponseUrl(ctx context.Context, respo return fmt.Errorf("failed to marshal request body: %w", err) } - // Create the HTTP request - req, err := http.NewRequestWithContext(ctx, "POST", responseUrl, bytes.NewBuffer(jsonData)) - if err != nil { - return fmt.Errorf("failed to create HTTP request: %w", err) - } + // Define retry parameters + maxRetries := 3 + retryDelay := 1 * time.Second + var lastErr error + + // Attempt to send the request with retries for transient errors + for attempt := 0; attempt < maxRetries; attempt++ { + if attempt > 0 { + logger.Info("Retrying request to responseURL", + "responseURL", responseURL, + "attempt", attempt+1, + "maxRetries", maxRetries) + + // Wait before retrying, with exponential backoff + select { + case <-time.After(retryDelay): + retryDelay *= 2 // Exponential backoff + case <-ctx.Done(): + return fmt.Errorf("context cancelled during retry delay: %w", ctx.Err()) + } + } - // Set content type header - req.Header.Set("Content-Type", "application/json") + // Create the HTTP request + req, err := http.NewRequestWithContext(ctx, "POST", responseURL, bytes.NewBuffer(jsonData)) + if err != nil { + lastErr = fmt.Errorf("failed to create HTTP request: %w", err) + // Don't retry for request creation errors - they're not likely to be transient + return lastErr + } - // Send the request - client := &http.Client{} - resp, err := client.Do(req) - if err != nil { - return fmt.Errorf("failed to send HTTP request: %w", err) - } - defer func() { - if err := resp.Body.Close(); err != nil { - logger.Error(err, "Failed to close response body") + // Set content type header + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", "ACP-Task-Controller") + + // Send the request with timeout + client := &http.Client{ + Timeout: 5 * time.Second, + } + resp, err := client.Do(req) + if err != nil { + lastErr = fmt.Errorf("failed to send HTTP request: %w", err) + // Network errors are often transient, continue to retry + continue + } + + // Ensure we close the response body + defer func() { + if resp != nil && resp.Body != nil { + if err := resp.Body.Close(); err != nil { + logger.Error(err, "Failed to close response body") + } + } + }() + + // Check response status + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, readErr := io.ReadAll(resp.Body) + bodyStr := "" + if readErr != nil { + bodyStr = fmt.Sprintf("[error reading response body: %v]", readErr) + } else { + bodyStr = string(body) + } + + // Categorize the error based on status code + switch { + case resp.StatusCode >= 500: + // Server errors (5xx) may be transient, retry + lastErr = fmt.Errorf("server error from responseURL (status %d): %s", resp.StatusCode, bodyStr) + continue + case resp.StatusCode == 429: + // Rate limiting (429) - retry with backoff + lastErr = fmt.Errorf("rate limited by responseURL (status 429): %s", bodyStr) + continue + default: + // Client errors (4xx) other than 429 are likely permanent, don't retry + return fmt.Errorf("client error from responseURL (status %d): %s", resp.StatusCode, bodyStr) + } } - }() - // Check response status - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("received non-success status code: %d, body: %s", resp.StatusCode, string(body)) + // Success case + logger.Info("Successfully sent final result to responseURL", + "statusCode", resp.StatusCode, + "responseURL", responseURL, + "attempts", attempt+1) + return nil } - logger.Info("Successfully sent final result to responseUrl", - "statusCode", resp.StatusCode, - "responseUrl", responseUrl) - return nil + // If we got here, we exhausted all retries + return fmt.Errorf("failed to send result after %d attempts: %w", maxRetries, lastErr) } func (r *TaskReconciler) SetupWithManager(mgr ctrl.Manager) error { diff --git a/acp/internal/controller/task/task_responseurl_integration_test.go b/acp/internal/controller/task/task_responseurl_integration_test.go index 0c05e72..1f2fc0a 100644 --- a/acp/internal/controller/task/task_responseurl_integration_test.go +++ b/acp/internal/controller/task/task_responseurl_integration_test.go @@ -45,8 +45,8 @@ func reconcilerWithMockLLM(newLLMClient func(ctx context.Context, llm acp.LLM, a return r, recorder } -var _ = Describe("Task Controller with ResponseUrl", func() { - Context("when Task has responseUrl", func() { +var _ = Describe("Task Controller with ResponseURL", func() { + Context("when Task has ResponseURL", func() { var ( server *httptest.Server requestReceived chan struct{} @@ -90,9 +90,9 @@ var _ = Describe("Task Controller with ResponseUrl", func() { DeferCleanup(server.Close) }) - It("sends the final result to the responseUrl", func() { - By("creating a task with responseUrl") - // Create a task with responseUrl + It("sends the final result to the ResponseURL", func() { + By("creating a task with ResponseURL") + // Create a task with ResponseURL customTask := &acp.Task{ ObjectMeta: v1.ObjectMeta{ Name: "task-with-responseurl", @@ -103,7 +103,7 @@ var _ = Describe("Task Controller with ResponseUrl", func() { Name: testAgent.Name, }, UserMessage: "What is the capital of France?", - ResponseUrl: server.URL, + ResponseURL: server.URL, }, } Expect(k8sClient.Create(ctx, customTask)).To(Succeed()) @@ -163,7 +163,7 @@ var _ = Describe("Task Controller with ResponseUrl", func() { case <-requestReceived: // Request was received, continue with assertions case <-time.After(5 * time.Second): - Fail("Timed out waiting for responseUrl request") + Fail("Timed out waiting for ResponseURL request") } By("verifying request content") From f94e74864f252e575848ce6f099cbcbc1e157d32 Mon Sep 17 00:00:00 2001 From: Allison Durham Date: Mon, 12 May 2025 19:06:18 -0700 Subject: [PATCH 5/8] happy linter happy life --- .../controller/task/task_controller.go | 183 +++++++++--------- 1 file changed, 93 insertions(+), 90 deletions(-) diff --git a/acp/internal/controller/task/task_controller.go b/acp/internal/controller/task/task_controller.go index 487c2e3..c6e0d3b 100644 --- a/acp/internal/controller/task/task_controller.go +++ b/acp/internal/controller/task/task_controller.go @@ -473,38 +473,7 @@ func (r *TaskReconciler) processLLMResponse(ctx context.Context, output *acp.Mes // If task has a responseURL, send the final result to that URL if task.Spec.ResponseURL != "" { - // Create a separate goroutine to handle the HTTP request asynchronously - go func() { - // Use a background context since we don't want this to block task completion - sendCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - // Use a copy of the task for recording events from the goroutine - taskCopy := task.DeepCopy() - - err := r.sendFinalResultToResponseURL(sendCtx, task.Spec.ResponseURL, output.Content) - if err != nil { - // Log detailed error information - logger.Error(err, "Failed to send final result to responseURL", - "responseURL", task.Spec.ResponseURL, - "task", fmt.Sprintf("%s/%s", task.Namespace, task.Name)) - - // Record a warning event on the task - r.recorder.Event(taskCopy, corev1.EventTypeWarning, "ResponseURLError", - fmt.Sprintf("Failed to send result to response URL: %v", err)) - - // We could update the task status with the error, but we don't want to - // mark the task as failed just because the responseURL notification failed. - // The task itself was successful, the notification was the only failure. - } else { - logger.Info("Successfully sent final result to responseURL", - "responseURL", task.Spec.ResponseURL) - - // Record a normal event for the successful notification - r.recorder.Event(taskCopy, corev1.EventTypeNormal, "ResponseURLSent", - "Successfully sent result to response URL") - } - }() + r.notifyResponseURLAsync(task, output.Content) } // End the task trace with OK status since we have a final answer. @@ -819,71 +788,82 @@ func (r *TaskReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. } // SetupWithManager sets up the controller with the Manager. -// sendFinalResultToResponseURL sends the final task result to the specified URL -// It includes retry logic for transient errors and better error categorization -func (r *TaskReconciler) sendFinalResultToResponseURL(ctx context.Context, responseURL string, result string) error { - logger := log.FromContext(ctx) - logger.Info("Sending final result to responseURL", "responseURL", responseURL) +// notifyResponseURLAsync sends the final task result to the response URL asynchronously +func (r *TaskReconciler) notifyResponseURLAsync(task *acp.Task, result string) { + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + logger := log.FromContext(ctx) + taskCopy := task.DeepCopy() + + err := r.sendFinalResultToResponseURL(ctx, task.Spec.ResponseURL, result) + if err != nil { + logger.Error(err, "Failed to send final result to responseURL", + "responseURL", task.Spec.ResponseURL, + "task", fmt.Sprintf("%s/%s", task.Namespace, task.Name)) + + r.recorder.Event(taskCopy, corev1.EventTypeWarning, "ResponseURLError", + fmt.Sprintf("Failed to send result to response URL: %v", err)) + } else { + logger.Info("Successfully sent final result to responseURL", + "responseURL", task.Spec.ResponseURL) + + r.recorder.Event(taskCopy, corev1.EventTypeNormal, "ResponseURLSent", + "Successfully sent result to response URL") + } + }() +} - // Create the request body using the existing HumanLayerAPI types +// createHumanContactRequest builds the request payload for sending to a response URL +func createHumanContactRequest(result string) ([]byte, error) { runID := uuid.New().String() callID := uuid.New().String() - - // Create the spec with the final result as the message spec := humanlayerapi.NewHumanContactSpecInput(result) + input := humanlayerapi.NewHumanContactInput(runID, callID, *spec) + return json.Marshal(input) +} - // Create the human contact input - humanContactInput := humanlayerapi.NewHumanContactInput(runID, callID, *spec) +// isRetryableStatusCode determines if an HTTP status code should trigger a retry +func isRetryableStatusCode(statusCode int) bool { + return statusCode >= 500 || statusCode == 429 +} + +// sendFinalResultToResponseURL sends the final task result to the specified URL +// It includes retry logic for transient errors and better error categorization +func (r *TaskReconciler) sendFinalResultToResponseURL(ctx context.Context, responseURL string, result string) error { + logger := log.FromContext(ctx) + logger.Info("Sending final result to responseURL", "responseURL", responseURL) - // Marshal the request body to JSON - jsonData, err := json.Marshal(humanContactInput) + // Create the request body + jsonData, err := createHumanContactRequest(result) if err != nil { return fmt.Errorf("failed to marshal request body: %w", err) } // Define retry parameters maxRetries := 3 - retryDelay := 1 * time.Second - var lastErr error - - // Attempt to send the request with retries for transient errors - for attempt := 0; attempt < maxRetries; attempt++ { - if attempt > 0 { - logger.Info("Retrying request to responseURL", - "responseURL", responseURL, - "attempt", attempt+1, - "maxRetries", maxRetries) - - // Wait before retrying, with exponential backoff - select { - case <-time.After(retryDelay): - retryDelay *= 2 // Exponential backoff - case <-ctx.Done(): - return fmt.Errorf("context cancelled during retry delay: %w", ctx.Err()) - } - } + initialDelay := 1 * time.Second + // Retry the operation with exponential backoff + return retryWithBackoff(ctx, maxRetries, initialDelay, responseURL, func() (bool, error) { // Create the HTTP request req, err := http.NewRequestWithContext(ctx, "POST", responseURL, bytes.NewBuffer(jsonData)) if err != nil { - lastErr = fmt.Errorf("failed to create HTTP request: %w", err) - // Don't retry for request creation errors - they're not likely to be transient - return lastErr + return false, fmt.Errorf("failed to create HTTP request: %w", err) // Non-retryable } - // Set content type header + // Set headers req.Header.Set("Content-Type", "application/json") req.Header.Set("User-Agent", "ACP-Task-Controller") - // Send the request with timeout + // Send the request client := &http.Client{ Timeout: 5 * time.Second, } resp, err := client.Do(req) if err != nil { - lastErr = fmt.Errorf("failed to send HTTP request: %w", err) - // Network errors are often transient, continue to retry - continue + return true, fmt.Errorf("failed to send HTTP request: %w", err) // Retryable } // Ensure we close the response body @@ -905,32 +885,55 @@ func (r *TaskReconciler) sendFinalResultToResponseURL(ctx context.Context, respo bodyStr = string(body) } - // Categorize the error based on status code - switch { - case resp.StatusCode >= 500: - // Server errors (5xx) may be transient, retry - lastErr = fmt.Errorf("server error from responseURL (status %d): %s", resp.StatusCode, bodyStr) - continue - case resp.StatusCode == 429: - // Rate limiting (429) - retry with backoff - lastErr = fmt.Errorf("rate limited by responseURL (status 429): %s", bodyStr) - continue - default: - // Client errors (4xx) other than 429 are likely permanent, don't retry - return fmt.Errorf("client error from responseURL (status %d): %s", resp.StatusCode, bodyStr) - } + // Return whether this error is retryable + retryable := isRetryableStatusCode(resp.StatusCode) + return retryable, fmt.Errorf("HTTP error from responseURL (status %d): %s", resp.StatusCode, bodyStr) } // Success case logger.Info("Successfully sent final result to responseURL", "statusCode", resp.StatusCode, - "responseURL", responseURL, - "attempts", attempt+1) - return nil + "responseURL", responseURL) + return false, nil + }) +} + +// retryWithBackoff executes an operation with exponential backoff +func retryWithBackoff(ctx context.Context, maxRetries int, initialDelay time.Duration, + responseURL string, operation func() (bool, error)) error { + + logger := log.FromContext(ctx) + var lastErr error + delay := initialDelay + + for attempt := 0; attempt < maxRetries; attempt++ { + if attempt > 0 { + logger.Info("Retrying request to responseURL", + "responseURL", responseURL, + "attempt", attempt+1, + "maxRetries", maxRetries) + + // Wait before retrying, with exponential backoff + select { + case <-time.After(delay): + delay *= 2 // Exponential backoff + case <-ctx.Done(): + return fmt.Errorf("context cancelled during retry: %w", ctx.Err()) + } + } + + shouldRetry, err := operation() + if err == nil { + return nil // Success + } + + lastErr = err + if !shouldRetry { + return err // Non-retryable error + } } - // If we got here, we exhausted all retries - return fmt.Errorf("failed to send result after %d attempts: %w", maxRetries, lastErr) + return fmt.Errorf("failed after %d attempts: %w", maxRetries, lastErr) } func (r *TaskReconciler) SetupWithManager(mgr ctrl.Manager) error { From 85faf3f708a2f16f1471dcfdd5f983ad32d947ea Mon Sep 17 00:00:00 2001 From: Allison Durham Date: Mon, 12 May 2025 19:07:13 -0700 Subject: [PATCH 6/8] lmao tests are happy now --- acp/internal/controller/task/send_response_url_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/acp/internal/controller/task/send_response_url_test.go b/acp/internal/controller/task/send_response_url_test.go index c40d4f0..b7acfae 100644 --- a/acp/internal/controller/task/send_response_url_test.go +++ b/acp/internal/controller/task/send_response_url_test.go @@ -110,7 +110,7 @@ var _ = Describe("ResponseURL Functionality", func() { // Should return an error due to non-200 response Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("server error from responseURL (status 500)")) + Expect(err.Error()).To(ContainSubstring("HTTP error from responseURL (status 500)")) }) It("handles connection errors appropriately", func() { From 579ccb5a864c9d2ba8b74b52500040242d770f4d Mon Sep 17 00:00:00 2001 From: Allison Durham Date: Mon, 12 May 2025 19:41:00 -0700 Subject: [PATCH 7/8] Hello we have a nice script to run the README --- extract_commands.sh | 775 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 775 insertions(+) create mode 100755 extract_commands.sh diff --git a/extract_commands.sh b/extract_commands.sh new file mode 100755 index 0000000..c911605 --- /dev/null +++ b/extract_commands.sh @@ -0,0 +1,775 @@ +#!/bin/bash + +# Script that extracts and runs the setup commands from README.md +# This script parses the README.md file and extracts all bash commands within code blocks + +README_PATH="./README.md" +OUTPUT_FILE="./acp_commands.sh" + +echo "#!/bin/bash" > $OUTPUT_FILE +echo "" >> $OUTPUT_FILE +echo "# Commands extracted from $README_PATH" >> $OUTPUT_FILE +echo "# Generated on $(date)" >> $OUTPUT_FILE +echo "" >> $OUTPUT_FILE +echo "# Set -e to exit on error" >> $OUTPUT_FILE +echo "set -e" >> $OUTPUT_FILE +echo "" >> $OUTPUT_FILE +echo "# Add a function to check if we should continue after each step" >> $OUTPUT_FILE +echo "continue_prompt() {" >> $OUTPUT_FILE +echo " read -p \"Press Enter to continue to the next command, or Ctrl+C to exit...\" dummy" >> $OUTPUT_FILE +echo " echo \"\"" >> $OUTPUT_FILE +echo "}" >> $OUTPUT_FILE +echo "" >> $OUTPUT_FILE + +# Extract all bash command blocks that have actual commands +in_code_block=false +code_block_type="" +current_block="" +multiline_command=false +multiline_content="" + +while IFS= read -r line; do + # Check for code block start + if [[ "$line" =~ ^'```'(.*)$ ]]; then + block_type="${BASH_REMATCH[1]}" + if [[ "$block_type" == "bash" ]]; then + in_code_block=true + code_block_type="bash" + current_block="" + multiline_command=false + multiline_content="" + fi + continue + fi + + # Check for code block end + if [[ "$line" == '```' && "$in_code_block" == true ]]; then + in_code_block=false + + # Process the entire block if it's a valid command block + if [[ -n "$current_block" ]]; then + # Filter out blocks that aren't actual commands + if [[ ! "$current_block" =~ ^[[:space:]]*[A-Za-z0-9_-]+[[:space:]]+[A-Za-z0-9_-]+[[:space:]]+[A-Za-z0-9_-]+ ]] && + [[ ! "$current_block" =~ ^(NAME|NAMESPACE|STATUS|TYPE|REASON|AGE|FROM|MESSAGE|----|Output:) ]] && + [[ "$current_block" =~ (kind|kubectl|echo|export) ]]; then + + # Process multiline echo commands differently + in_multiline_echo=false + yaml_content="" + resource_kind="" + resource_name="" + + # Split block into lines for processing + while IFS= read -r cmd; do + # Skip lines that look like outputs + if [[ "$cmd" =~ ^(NAME|NAMESPACE|STATUS|TYPE|REASON|AGE|FROM|MESSAGE|----) ]] || + [[ "$cmd" =~ ^[[:space:]]*[0-9]+[[:space:]] ]] || + [[ "$cmd" =~ ^\{.*\}$ ]] || + [[ "$cmd" =~ ^[[:space:]]*\} ]] || + [[ "$cmd" =~ ^[[:space:]]*\> ]]; then + continue + fi + + # Skip blank lines + if [[ -z "$cmd" ]]; then + continue + fi + + # Skip lines that start with $ (shell prompt) + if [[ "$cmd" =~ ^\$ ]]; then + cmd="${cmd#$ }" + fi + + # Skip diagram notation + if [[ "$cmd" =~ ^graph|^flowchart|^subgraph ]]; then + continue + fi + + # Check for start of a multiline echo command (YAML creation) + if [[ "$cmd" =~ ^echo[[:space:]]*\'apiVersion: ]]; then + in_multiline_echo=true + yaml_content="$cmd" + continue + fi + + # Process lines that are part of a multiline echo + if [[ "$in_multiline_echo" == true ]]; then + yaml_content="$yaml_content"$'\n'"$cmd" + + # Extract resource kind and name for better output + if [[ "$cmd" =~ ^[[:space:]]*kind:[[:space:]]*([A-Za-z]+) ]]; then + resource_kind="${BASH_REMATCH[1]}" + fi + if [[ "$cmd" =~ ^[[:space:]]*[[:space:]]*name:[[:space:]]*([A-Za-z0-9_-]+) ]]; then + resource_name="${BASH_REMATCH[1]}" + fi + + # Check if we've reached the end of the multiline echo + if [[ "$cmd" =~ \'.*\|.*kubectl.*apply ]]; then + in_multiline_echo=false + + # Process the full echo command now that we have all of it + if [[ -n "$resource_kind" && -n "$resource_name" ]]; then + echo "echo \"Running: Creating $resource_kind $resource_name resource...\"" >> $OUTPUT_FILE + + # Add appropriate wait logic based on resource type + if [[ "$resource_kind" == "LLM" ]]; then + echo "$yaml_content" >> $OUTPUT_FILE + echo "echo \"Waiting for LLM $resource_name to initialize...\"" >> $OUTPUT_FILE + echo "for i in {1..10}; do" >> $OUTPUT_FILE + echo " if kubectl get llm $resource_name -o jsonpath='{.status.ready}' 2>/dev/null | grep -q 'true'; then" >> $OUTPUT_FILE + echo " echo \"LLM $resource_name is ready!\"" >> $OUTPUT_FILE + echo " break" >> $OUTPUT_FILE + echo " fi" >> $OUTPUT_FILE + echo " sleep 2" >> $OUTPUT_FILE + echo " echo -n \".\"" >> $OUTPUT_FILE + echo "done" >> $OUTPUT_FILE + echo "echo \"\"" >> $OUTPUT_FILE + elif [[ "$resource_kind" == "Agent" ]]; then + echo "$yaml_content" >> $OUTPUT_FILE + echo "echo \"Waiting for Agent $resource_name to initialize...\"" >> $OUTPUT_FILE + echo "for i in {1..10}; do" >> $OUTPUT_FILE + echo " if kubectl get agent $resource_name -o jsonpath='{.status.ready}' 2>/dev/null | grep -q 'true'; then" >> $OUTPUT_FILE + echo " echo \"Agent $resource_name is ready!\"" >> $OUTPUT_FILE + echo " break" >> $OUTPUT_FILE + echo " fi" >> $OUTPUT_FILE + echo " sleep 2" >> $OUTPUT_FILE + echo " echo -n \".\"" >> $OUTPUT_FILE + echo "done" >> $OUTPUT_FILE + echo "echo \"\"" >> $OUTPUT_FILE + elif [[ "$resource_kind" == "MCPServer" ]]; then + echo "$yaml_content" >> $OUTPUT_FILE + echo "echo \"Waiting for MCPServer $resource_name to initialize...\"" >> $OUTPUT_FILE + echo "for i in {1..10}; do" >> $OUTPUT_FILE + echo " if kubectl get mcpserver $resource_name -o jsonpath='{.status.ready}' 2>/dev/null | grep -q 'true'; then" >> $OUTPUT_FILE + echo " echo \"MCPServer $resource_name is ready!\"" >> $OUTPUT_FILE + echo " break" >> $OUTPUT_FILE + echo " fi" >> $OUTPUT_FILE + echo " sleep 2" >> $OUTPUT_FILE + echo " echo -n \".\"" >> $OUTPUT_FILE + echo "done" >> $OUTPUT_FILE + echo "echo \"\"" >> $OUTPUT_FILE + elif [[ "$resource_kind" == "Task" ]]; then + echo "$yaml_content" >> $OUTPUT_FILE + echo "echo \"Waiting for Task $resource_name to complete...\"" >> $OUTPUT_FILE + echo "for i in {1..15}; do" >> $OUTPUT_FILE + echo " status=\$(kubectl get task $resource_name -o jsonpath='{.status.phase}' 2>/dev/null || echo \"Pending\")" >> $OUTPUT_FILE + echo " if [[ \"\$status\" == \"FinalAnswer\" ]]; then" >> $OUTPUT_FILE + echo " echo \"Task $resource_name completed successfully!\"" >> $OUTPUT_FILE + echo " echo \"Result:\"" >> $OUTPUT_FILE + echo " kubectl get task $resource_name -o jsonpath='{.status.output}'" >> $OUTPUT_FILE + echo " echo \"\"" >> $OUTPUT_FILE + echo " break" >> $OUTPUT_FILE + echo " fi" >> $OUTPUT_FILE + echo " sleep 2" >> $OUTPUT_FILE + echo " echo -n \".\"" >> $OUTPUT_FILE + echo "done" >> $OUTPUT_FILE + echo "echo \"\"" >> $OUTPUT_FILE + else + echo "$yaml_content" >> $OUTPUT_FILE + fi + else + # If we couldn't determine the resource type/name, just apply it + echo "echo \"Running: Applying YAML resource\"" >> $OUTPUT_FILE + echo "$yaml_content" >> $OUTPUT_FILE + fi + echo "continue_prompt" >> $OUTPUT_FILE + echo "" >> $OUTPUT_FILE + fi + continue + fi + + # For normal commands, just add them to the output + echo "echo \"Running: $cmd\"" >> $OUTPUT_FILE + echo "$cmd" >> $OUTPUT_FILE + echo "continue_prompt" >> $OUTPUT_FILE + echo "" >> $OUTPUT_FILE + done <<< "$current_block" + fi + fi + + current_block="" + code_block_type="" + continue + fi + + # Collect code block content + if [[ "$in_code_block" == true && "$code_block_type" == "bash" ]]; then + current_block+="$line"$'\n' + fi +done < "$README_PATH" + +# Process code blocks inside "echo" multi-line strings +# These are YAML blocks that are piped to kubectl +extract_echo_blocks() { + local line="$1" + if [[ "$line" =~ ^echo[[:space:]]*\'(apiVersion:.*)\'[[:space:]]*\|[[:space:]]*kubectl[[:space:]]apply[[:space:]]-f[[:space:]]-$ ]]; then + # Found an echo with YAML content piped to kubectl apply + local yaml_content="${BASH_REMATCH[1]}" + + # Skip if this is from a
or other non-primary example + # Check for empty or incomplete YAML content + if [[ "$yaml_content" =~ "spec:" && ! "$yaml_content" =~ "name:" ]]; then + return 0 + fi + + # Try to extract resource kind and name from the YAML content + local resource_kind="" + local resource_name="" + + while IFS= read -r yaml_line; do + if [[ "$yaml_line" =~ ^kind:[[:space:]]*([A-Za-z]+) ]]; then + resource_kind="${BASH_REMATCH[1]}" + fi + if [[ "$yaml_line" =~ ^[[:space:]]*name:[[:space:]]*([A-Za-z0-9_-]+) ]]; then + resource_name="${BASH_REMATCH[1]}" + fi + done <<< "$yaml_content" + + # Add check if we found both kind and name + if [[ -n "$resource_kind" && -n "$resource_name" ]]; then + echo "echo \"Running: Create $resource_kind $resource_name if it doesn't exist\"" >> $OUTPUT_FILE + echo "# Add a small delay to allow resources to propagate" >> $OUTPUT_FILE + echo "sleep 3" >> $OUTPUT_FILE + echo "if ! kubectl get $resource_kind $resource_name &>/dev/null; then" >> $OUTPUT_FILE + echo " echo \"Creating $resource_kind $resource_name...\"" >> $OUTPUT_FILE + echo " echo '$yaml_content' | kubectl apply -f -" >> $OUTPUT_FILE + + # Add wait logic based on resource kind + if [[ "$resource_kind" == "LLM" ]]; then + echo " echo \"Waiting for $resource_kind $resource_name to become ready (up to 20 seconds)...\"" >> $OUTPUT_FILE + echo " for i in {1..10}; do" >> $OUTPUT_FILE + echo " if kubectl get llm $resource_name -o jsonpath='{.status.ready}' 2>/dev/null | grep -q 'true'; then" >> $OUTPUT_FILE + echo " echo \"$resource_kind $resource_name is ready!\"" >> $OUTPUT_FILE + echo " break" >> $OUTPUT_FILE + echo " fi" >> $OUTPUT_FILE + echo " sleep 2" >> $OUTPUT_FILE + echo " echo -n \".\"" >> $OUTPUT_FILE + echo " done" >> $OUTPUT_FILE + echo " echo \"\"" >> $OUTPUT_FILE + elif [[ "$resource_kind" == "Agent" ]]; then + echo " echo \"Waiting for $resource_kind $resource_name to become ready (up to 20 seconds)...\"" >> $OUTPUT_FILE + echo " for i in {1..10}; do" >> $OUTPUT_FILE + echo " if kubectl get agent $resource_name -o jsonpath='{.status.ready}' 2>/dev/null | grep -q 'true'; then" >> $OUTPUT_FILE + echo " echo \"$resource_kind $resource_name is ready!\"" >> $OUTPUT_FILE + echo " break" >> $OUTPUT_FILE + echo " fi" >> $OUTPUT_FILE + echo " sleep 2" >> $OUTPUT_FILE + echo " echo -n \".\"" >> $OUTPUT_FILE + echo " done" >> $OUTPUT_FILE + echo " echo \"\"" >> $OUTPUT_FILE + elif [[ "$resource_kind" == "MCPServer" ]]; then + echo " echo \"Waiting for $resource_kind $resource_name to become ready (up to 30 seconds)...\"" >> $OUTPUT_FILE + echo " for i in {1..15}; do" >> $OUTPUT_FILE + echo " if kubectl get mcpserver $resource_name -o jsonpath='{.status.ready}' 2>/dev/null | grep -q 'true'; then" >> $OUTPUT_FILE + echo " echo \"$resource_kind $resource_name is ready!\"" >> $OUTPUT_FILE + echo " break" >> $OUTPUT_FILE + echo " fi" >> $OUTPUT_FILE + echo " sleep 2" >> $OUTPUT_FILE + echo " echo -n \".\"" >> $OUTPUT_FILE + echo " done" >> $OUTPUT_FILE + echo " echo \"\"" >> $OUTPUT_FILE + elif [[ "$resource_kind" == "Task" ]]; then + echo " echo \"Waiting for $resource_kind $resource_name to complete (up to 60 seconds)...\"" >> $OUTPUT_FILE + echo " for i in {1..30}; do" >> $OUTPUT_FILE + echo " status=\$(kubectl get task $resource_name -o jsonpath='{.status.phase}' 2>/dev/null || echo \"Pending\")" >> $OUTPUT_FILE + echo " if [[ \"\$status\" == \"FinalAnswer\" ]]; then" >> $OUTPUT_FILE + echo " echo \"$resource_kind $resource_name completed successfully!\"" >> $OUTPUT_FILE + echo " kubectl get task $resource_name -o jsonpath='{.status.output}'" >> $OUTPUT_FILE + echo " echo \"\"" >> $OUTPUT_FILE + echo " break" >> $OUTPUT_FILE + echo " fi" >> $OUTPUT_FILE + echo " sleep 2" >> $OUTPUT_FILE + echo " echo -n \".\"" >> $OUTPUT_FILE + echo " done" >> $OUTPUT_FILE + echo " echo \"\"" >> $OUTPUT_FILE + fi + + echo "else" >> $OUTPUT_FILE + echo " echo \"$resource_kind $resource_name already exists, updating it...\"" >> $OUTPUT_FILE + echo " echo '$yaml_content' | kubectl apply -f -" >> $OUTPUT_FILE + echo "fi" >> $OUTPUT_FILE + else + # If we couldn't determine the resource type/name, just apply it + echo "echo \"Running: kubectl apply for YAML resource\"" >> $OUTPUT_FILE + echo "echo '$yaml_content' | kubectl apply -f -" >> $OUTPUT_FILE + fi + echo "continue_prompt" >> $OUTPUT_FILE + echo "" >> $OUTPUT_FILE + return 0 + fi + return 1 +} + +# Extract kubectl commands +extract_kubectl_commands() { + local line="$1" + # Skip specific version URLs that might cause conflicts + if [[ "$line" =~ kubectl[[:space:]]apply[[:space:]]-f.*v0\. ]]; then + # Skip versioned URLs - we'll use the latest + return 0 + fi + + # Skip kubectl describe commands which are just for viewing + if [[ "$line" =~ ^kubectl[[:space:]]describe[[:space:]] ]]; then + return 0 + fi + + # Special handling for the main operator deployment + if [[ "$line" =~ kubectl[[:space:]]apply[[:space:]]-f.*latest\.yaml ]]; then + echo "echo \"Running: Deploying ACP controller\"" >> $OUTPUT_FILE + echo "$line" >> $OUTPUT_FILE + echo "echo \"Waiting for controller deployment to initialize (30 seconds)...\"" >> $OUTPUT_FILE + echo "sleep 30" >> $OUTPUT_FILE + echo "kubectl wait --for=condition=available --timeout=60s deployment/acp-controller-manager || echo \"Controller may still be starting, continuing anyway...\"" >> $OUTPUT_FILE + echo "continue_prompt" >> $OUTPUT_FILE + echo "" >> $OUTPUT_FILE + return 0 + fi + + # Special handling for multi-line secret creation commands + if [[ "$line" =~ ^kubectl[[:space:]]create[[:space:]]secret[[:space:]]generic[[:space:]]([a-z0-9_-]+)[[:space:]]*\\$ ]]; then + local secret_name="${BASH_REMATCH[1]}" + # This is a multi-line secret creation command - we need special handling + echo "echo \"Running: Check if secret $secret_name exists, create if it doesn't\"" >> $OUTPUT_FILE + echo "if ! kubectl get secret $secret_name &>/dev/null; then" >> $OUTPUT_FILE + echo " echo \"Creating secret $secret_name...\"" >> $OUTPUT_FILE + + # Handle different secret types based on name + if [[ "$secret_name" == "openai" ]]; then + echo " if [[ -z \"\$OPENAI_API_KEY\" ]]; then" >> $OUTPUT_FILE + echo " echo \"Error: OPENAI_API_KEY environment variable is not set. Please set it and try again.\"" >> $OUTPUT_FILE + echo " read -p \"Do you want to set it now? (y/n): \" SET_KEY" >> $OUTPUT_FILE + echo " if [[ \"\$SET_KEY\" == \"y\" ]]; then" >> $OUTPUT_FILE + echo " read -p \"Enter your OpenAI API key: \" OPENAI_API_KEY" >> $OUTPUT_FILE + echo " export OPENAI_API_KEY" >> $OUTPUT_FILE + echo " else" >> $OUTPUT_FILE + echo " exit 1" >> $OUTPUT_FILE + echo " fi" >> $OUTPUT_FILE + echo " fi" >> $OUTPUT_FILE + echo " echo \"Creating OpenAI secret with your API key...\"" >> $OUTPUT_FILE + echo " kubectl create secret generic $secret_name --from-literal=OPENAI_API_KEY=\$OPENAI_API_KEY --namespace=default" >> $OUTPUT_FILE + elif [[ "$secret_name" == "anthropic" ]]; then + echo " if [[ -z \"\$ANTHROPIC_API_KEY\" ]]; then" >> $OUTPUT_FILE + echo " echo \"Error: ANTHROPIC_API_KEY environment variable is not set. Please set it and try again.\"" >> $OUTPUT_FILE + echo " read -p \"Do you want to set it now? (y/n): \" SET_KEY" >> $OUTPUT_FILE + echo " if [[ \"\$SET_KEY\" == \"y\" ]]; then" >> $OUTPUT_FILE + echo " read -p \"Enter your Anthropic API key: \" ANTHROPIC_API_KEY" >> $OUTPUT_FILE + echo " export ANTHROPIC_API_KEY" >> $OUTPUT_FILE + echo " else" >> $OUTPUT_FILE + echo " echo \"Skipping Anthropic setup\"" >> $OUTPUT_FILE + echo " return 0" >> $OUTPUT_FILE + echo " fi" >> $OUTPUT_FILE + echo " fi" >> $OUTPUT_FILE + echo " kubectl create secret generic $secret_name --from-literal=ANTHROPIC_API_KEY=\$ANTHROPIC_API_KEY --namespace=default" >> $OUTPUT_FILE + elif [[ "$secret_name" == "humanlayer" ]]; then + echo " if [[ -z \"\$HUMANLAYER_API_KEY\" ]]; then" >> $OUTPUT_FILE + echo " echo \"Error: HUMANLAYER_API_KEY environment variable is not set. Please set it and try again.\"" >> $OUTPUT_FILE + echo " read -p \"Do you want to set it now? (y/n): \" SET_KEY" >> $OUTPUT_FILE + echo " if [[ \"\$SET_KEY\" == \"y\" ]]; then" >> $OUTPUT_FILE + echo " read -p \"Enter your HumanLayer API key: \" HUMANLAYER_API_KEY" >> $OUTPUT_FILE + echo " export HUMANLAYER_API_KEY" >> $OUTPUT_FILE + echo " else" >> $OUTPUT_FILE + echo " echo \"Skipping HumanLayer setup\"" >> $OUTPUT_FILE + echo " return 0" >> $OUTPUT_FILE + echo " fi" >> $OUTPUT_FILE + echo " fi" >> $OUTPUT_FILE + echo " kubectl create secret generic $secret_name --from-literal=HUMANLAYER_API_KEY=\$HUMANLAYER_API_KEY --namespace=default" >> $OUTPUT_FILE + else + # Generic secret handling + echo " # Generic secret creation" >> $OUTPUT_FILE + echo " $line" >> $OUTPUT_FILE + fi + + echo " echo \"Secret $secret_name created successfully\"" >> $OUTPUT_FILE + echo " kubectl get secret $secret_name" >> $OUTPUT_FILE + echo "else" >> $OUTPUT_FILE + echo " echo \"Secret $secret_name already exists, skipping creation\"" >> $OUTPUT_FILE + echo "fi" >> $OUTPUT_FILE + echo "continue_prompt" >> $OUTPUT_FILE + echo "" >> $OUTPUT_FILE + return 0 + elif [[ "$line" =~ ^kubectl[[:space:]]create[[:space:]]secret[[:space:]]generic[[:space:]]([a-z0-9_-]+)[[:space:]] ]]; then + # Single line secret creation + local secret_name="${BASH_REMATCH[1]}" + echo "echo \"Running: Check if secret $secret_name exists, create if it doesn't\"" >> $OUTPUT_FILE + echo "if ! kubectl get secret $secret_name &>/dev/null; then" >> $OUTPUT_FILE + echo " echo \"Creating secret $secret_name...\"" >> $OUTPUT_FILE + echo " $line" >> $OUTPUT_FILE + echo "else" >> $OUTPUT_FILE + echo " echo \"Secret $secret_name already exists, skipping creation\"" >> $OUTPUT_FILE + echo "fi" >> $OUTPUT_FILE + echo "continue_prompt" >> $OUTPUT_FILE + echo "" >> $OUTPUT_FILE + return 0 + fi + + # Handle kubectl get commands + if [[ "$line" =~ ^kubectl[[:space:]]get[[:space:]]([a-z]+)[[:space:]]?([a-z0-9_-]*) ]]; then + local resource_type="${BASH_REMATCH[1]}" + local resource_name="${BASH_REMATCH[2]}" + + echo "echo \"Running: $line\"" >> $OUTPUT_FILE + echo "# Add a small delay to allow resources to propagate" >> $OUTPUT_FILE + echo "sleep 2" >> $OUTPUT_FILE + echo "$line" >> $OUTPUT_FILE + echo "continue_prompt" >> $OUTPUT_FILE + echo "" >> $OUTPUT_FILE + return 0 + elif [[ "$line" =~ ^kubectl[[:space:]]apply[[:space:]]-f.*$ ]]; then + # Just echo and run kubectl apply commands + echo "echo \"Running: $line\"" >> $OUTPUT_FILE + echo "$line" >> $OUTPUT_FILE + echo "continue_prompt" >> $OUTPUT_FILE + echo "" >> $OUTPUT_FILE + return 0 + elif [[ "$line" =~ ^kubectl[[:space:]]([a-z]+)[[:space:]]([a-z0-9-]+).*$ ]]; then + echo "echo \"Running: $line\"" >> $OUTPUT_FILE + echo "$line" >> $OUTPUT_FILE + echo "continue_prompt" >> $OUTPUT_FILE + echo "" >> $OUTPUT_FILE + return 0 + fi + return 1 +} + +# Extract export commands +extract_export_commands() { + local line="$1" + if [[ "$line" =~ ^export[[:space:]]([A-Z_]+)=.*$ ]]; then + echo "echo \"Running: $line\"" >> $OUTPUT_FILE + echo "$line" >> $OUTPUT_FILE + echo "continue_prompt" >> $OUTPUT_FILE + echo "" >> $OUTPUT_FILE + return 0 + fi + return 1 +} + +# Extract kind commands +extract_kind_commands() { + local line="$1" + if [[ "$line" =~ ^kind[[:space:]]create[[:space:]]cluster.*$ ]]; then + # Add safety check for creating a kind cluster + echo "echo \"Running: Check if kind cluster exists, create if it doesn't\"" >> $OUTPUT_FILE + echo "if ! kind get clusters 2>/dev/null | grep -q \"^kind$\"; then" >> $OUTPUT_FILE + echo " echo \"Creating new kind cluster...\"" >> $OUTPUT_FILE + echo " $line" >> $OUTPUT_FILE + echo "else" >> $OUTPUT_FILE + echo " echo \"Kind cluster already exists, using existing cluster\"" >> $OUTPUT_FILE + echo "fi" >> $OUTPUT_FILE + echo "continue_prompt" >> $OUTPUT_FILE + echo "" >> $OUTPUT_FILE + return 0 + elif [[ "$line" =~ ^kind[[:space:]]([a-z]+)[[:space:]].*$ ]]; then + echo "echo \"Running: $line\"" >> $OUTPUT_FILE + echo "$line" >> $OUTPUT_FILE + echo "continue_prompt" >> $OUTPUT_FILE + echo "" >> $OUTPUT_FILE + return 0 + fi + return 1 +} + +# Second pass to catch specific command patterns +while IFS= read -r line; do + # Skip comment lines + if [[ "$line" =~ ^#.*$ ]]; then + continue + fi + + extract_echo_blocks "$line" || extract_kubectl_commands "$line" || extract_export_commands "$line" || extract_kind_commands "$line" +done < "$README_PATH" + +# First add the setup banner at the beginning of the script +TMP_FILE=$(mktemp) +cat > $TMP_FILE << 'EOF' +#!/bin/bash + +# Commands extracted from ./README.md +# Generated on TIMESTAMP + +# Set -e to exit on error +set -e + +# Add a function to check if we should continue after each step +continue_prompt() { + read -p "Press Enter to continue to the next command, or Ctrl+C to exit..." dummy + echo "" +} + +# Banner information +cat << 'BANNER' +==================================================== + ACP (Agent Control Plane) Setup Script + Generated from README.md on TIMESTAMP + + This script will guide you through setting up ACP + Press Ctrl+C at any time to exit +==================================================== + +Before continuing, please make sure: + - You have kubectl installed + - You have kind installed + - Docker is running + - You have your OpenAI API key ready (or set as OPENAI_API_KEY) +BANNER + +# Check for required tools +if ! command -v kubectl &> /dev/null; then + echo "Error: kubectl is not installed. Please install it and try again." + exit 1 +fi + +if ! command -v kind &> /dev/null; then + echo "Error: kind is not installed. Please install it and try again." + exit 1 +fi + +# Check if Docker is running +if ! docker info &>/dev/null; then + echo "Error: Docker is not running. Please start Docker and try again." + exit 1 +fi + +# Check for OPENAI_API_KEY +if [[ -z "$OPENAI_API_KEY" ]]; then + echo "Warning: OPENAI_API_KEY environment variable is not set." + read -p "Do you want to set it now? (y/n): " SET_KEY + if [[ "$SET_KEY" == "y" ]]; then + read -p "Enter your OpenAI API key: " OPENAI_API_KEY + export OPENAI_API_KEY + else + echo "Cannot proceed without an OpenAI API key." + exit 1 + fi +else + echo "✅ OPENAI_API_KEY environment variable is set." +fi + +read -p "Press Enter to begin setup or Ctrl+C to exit..." dummy +echo "" +EOF + +# Replace timestamp +sed "s/TIMESTAMP/$(date)/" $TMP_FILE > $OUTPUT_FILE +rm $TMP_FILE + +# At the end of the process, add the commands in the right order +echo -e "\n# Checking if essential resources were created" >> $OUTPUT_FILE +echo "echo \"Checking if essential ACP resources were created...\"" >> $OUTPUT_FILE +echo "" >> $OUTPUT_FILE + +# LLM creation +echo "# Create the LLM resource" >> $OUTPUT_FILE +echo "echo \"Setting up LLM resource for GPT-4o...\"" >> $OUTPUT_FILE +echo "if ! kubectl get llm gpt-4o &>/dev/null; then" >> $OUTPUT_FILE +echo " echo 'apiVersion: acp.humanlayer.dev/v1alpha1 " >> $OUTPUT_FILE +echo "kind: LLM" >> $OUTPUT_FILE +echo "metadata:" >> $OUTPUT_FILE +echo " name: gpt-4o" >> $OUTPUT_FILE +echo "spec:" >> $OUTPUT_FILE +echo " provider: openai" >> $OUTPUT_FILE +echo " parameters:" >> $OUTPUT_FILE +echo " model: gpt-4o" >> $OUTPUT_FILE +echo " apiKeyFrom:" >> $OUTPUT_FILE +echo " secretKeyRef:" >> $OUTPUT_FILE +echo " name: openai" >> $OUTPUT_FILE +echo " key: OPENAI_API_KEY" >> $OUTPUT_FILE +echo "' | kubectl apply -f -" >> $OUTPUT_FILE +echo " echo \"Waiting for LLM to initialize...\"" >> $OUTPUT_FILE +echo " for i in {1..10}; do" >> $OUTPUT_FILE +echo " if kubectl get llm gpt-4o -o jsonpath='{.status.ready}' 2>/dev/null | grep -q 'true'; then" >> $OUTPUT_FILE +echo " echo \"LLM gpt-4o is ready!\"" >> $OUTPUT_FILE +echo " break" >> $OUTPUT_FILE +echo " fi" >> $OUTPUT_FILE +echo " sleep 2" >> $OUTPUT_FILE +echo " echo -n \".\"" >> $OUTPUT_FILE +echo " done" >> $OUTPUT_FILE +echo " echo \"\"" >> $OUTPUT_FILE +echo "else" >> $OUTPUT_FILE +echo " echo \"LLM gpt-4o already exists\"" >> $OUTPUT_FILE +echo "fi" >> $OUTPUT_FILE +echo "continue_prompt" >> $OUTPUT_FILE +echo "" >> $OUTPUT_FILE + +# Agent creation +echo "# Create the Agent resource" >> $OUTPUT_FILE +echo "echo \"Creating Agent resource...\"" >> $OUTPUT_FILE +echo "if ! kubectl get agent my-assistant &>/dev/null; then" >> $OUTPUT_FILE +echo " echo 'apiVersion: acp.humanlayer.dev/v1alpha1 " >> $OUTPUT_FILE +echo "kind: Agent" >> $OUTPUT_FILE +echo "metadata:" >> $OUTPUT_FILE +echo " name: my-assistant" >> $OUTPUT_FILE +echo "spec:" >> $OUTPUT_FILE +echo " llmRef:" >> $OUTPUT_FILE +echo " name: gpt-4o" >> $OUTPUT_FILE +echo " system: |" >> $OUTPUT_FILE +echo " You are a helpful assistant. Your job is to help the user with their tasks." >> $OUTPUT_FILE +echo "' | kubectl apply -f -" >> $OUTPUT_FILE +echo " echo \"Waiting for Agent to initialize...\"" >> $OUTPUT_FILE +echo " for i in {1..10}; do" >> $OUTPUT_FILE +echo " if kubectl get agent my-assistant -o jsonpath='{.status.ready}' 2>/dev/null | grep -q 'true'; then" >> $OUTPUT_FILE +echo " echo \"Agent my-assistant is ready!\"" >> $OUTPUT_FILE +echo " break" >> $OUTPUT_FILE +echo " fi" >> $OUTPUT_FILE +echo " sleep 2" >> $OUTPUT_FILE +echo " echo -n \".\"" >> $OUTPUT_FILE +echo " done" >> $OUTPUT_FILE +echo " echo \"\"" >> $OUTPUT_FILE +echo "else" >> $OUTPUT_FILE +echo " echo \"Agent my-assistant already exists\"" >> $OUTPUT_FILE +echo "fi" >> $OUTPUT_FILE +echo "continue_prompt" >> $OUTPUT_FILE +echo "" >> $OUTPUT_FILE + +# First task creation - hello-world +echo "# Create a task to interact with the agent" >> $OUTPUT_FILE +echo "echo \"Creating a task to interact with your agent...\"" >> $OUTPUT_FILE +echo "if ! kubectl get task hello-world-1 &>/dev/null; then" >> $OUTPUT_FILE +echo " echo 'apiVersion: acp.humanlayer.dev/v1alpha1 " >> $OUTPUT_FILE +echo "kind: Task" >> $OUTPUT_FILE +echo "metadata:" >> $OUTPUT_FILE +echo " name: hello-world-1" >> $OUTPUT_FILE +echo "spec:" >> $OUTPUT_FILE +echo " agentRef:" >> $OUTPUT_FILE +echo " name: my-assistant" >> $OUTPUT_FILE +echo " userMessage: \"What is the capital of the moon?\"" >> $OUTPUT_FILE +echo "' | kubectl apply -f -" >> $OUTPUT_FILE +echo " echo \"Waiting for Task to complete...\"" >> $OUTPUT_FILE +echo " for i in {1..15}; do" >> $OUTPUT_FILE +echo " status=\$(kubectl get task hello-world-1 -o jsonpath='{.status.phase}' 2>/dev/null || echo \"Pending\")" >> $OUTPUT_FILE +echo " if [[ \"\$status\" == \"FinalAnswer\" ]]; then" >> $OUTPUT_FILE +echo " echo \"Task hello-world-1 completed successfully!\"" >> $OUTPUT_FILE +echo " echo \"Result:\"" >> $OUTPUT_FILE +echo " kubectl get task hello-world-1 -o jsonpath='{.status.output}'" >> $OUTPUT_FILE +echo " echo \"\"" >> $OUTPUT_FILE +echo " break" >> $OUTPUT_FILE +echo " fi" >> $OUTPUT_FILE +echo " sleep 2" >> $OUTPUT_FILE +echo " echo -n \".\"" >> $OUTPUT_FILE +echo " done" >> $OUTPUT_FILE +echo " echo \"\"" >> $OUTPUT_FILE +echo "else" >> $OUTPUT_FILE +echo " echo \"Task hello-world-1 already exists\"" >> $OUTPUT_FILE +echo "fi" >> $OUTPUT_FILE +echo "continue_prompt" >> $OUTPUT_FILE +echo "" >> $OUTPUT_FILE + +# MCP server setup +echo "# Add MCP server setup" >> $OUTPUT_FILE +echo "echo \"Setting up MCP server for fetch tool...\"" >> $OUTPUT_FILE +echo "if ! kubectl get mcpserver fetch &>/dev/null; then" >> $OUTPUT_FILE +echo " echo 'apiVersion: acp.humanlayer.dev/v1alpha1 " >> $OUTPUT_FILE +echo "kind: MCPServer" >> $OUTPUT_FILE +echo "metadata:" >> $OUTPUT_FILE +echo " name: fetch" >> $OUTPUT_FILE +echo "spec:" >> $OUTPUT_FILE +echo " transport: \"stdio\"" >> $OUTPUT_FILE +echo " command: \"uvx\"" >> $OUTPUT_FILE +echo " args: [\"mcp-server-fetch\"]" >> $OUTPUT_FILE +echo "' | kubectl apply -f -" >> $OUTPUT_FILE +echo " echo \"Waiting for MCPServer fetch to initialize...\"" >> $OUTPUT_FILE +echo " for i in {1..10}; do" >> $OUTPUT_FILE +echo " if kubectl get mcpserver fetch -o jsonpath='{.status.ready}' 2>/dev/null | grep -q 'true'; then" >> $OUTPUT_FILE +echo " echo \"MCPServer fetch is ready!\"" >> $OUTPUT_FILE +echo " break" >> $OUTPUT_FILE +echo " fi" >> $OUTPUT_FILE +echo " sleep 2" >> $OUTPUT_FILE +echo " echo -n \".\"" >> $OUTPUT_FILE +echo " done" >> $OUTPUT_FILE +echo " echo \"\"" >> $OUTPUT_FILE +echo "else" >> $OUTPUT_FILE +echo " echo \"MCPServer fetch already exists\"" >> $OUTPUT_FILE +echo "fi" >> $OUTPUT_FILE +echo "continue_prompt" >> $OUTPUT_FILE +echo "" >> $OUTPUT_FILE + +# Update agent to use the fetch tool +echo "# Update agent to use fetch tool" >> $OUTPUT_FILE +echo "echo \"Updating agent to use fetch tool...\"" >> $OUTPUT_FILE +echo "echo 'apiVersion: acp.humanlayer.dev/v1alpha1 " >> $OUTPUT_FILE +echo "kind: Agent" >> $OUTPUT_FILE +echo "metadata:" >> $OUTPUT_FILE +echo " name: my-assistant" >> $OUTPUT_FILE +echo "spec:" >> $OUTPUT_FILE +echo " llmRef:" >> $OUTPUT_FILE +echo " name: gpt-4o" >> $OUTPUT_FILE +echo " system: |" >> $OUTPUT_FILE +echo " You are a helpful assistant. Your job is to help the user with their tasks." >> $OUTPUT_FILE +echo " mcpServers:" >> $OUTPUT_FILE +echo " - name: fetch" >> $OUTPUT_FILE +echo "' | kubectl apply -f -" >> $OUTPUT_FILE +echo "echo \"Waiting for updated agent to initialize...\"" >> $OUTPUT_FILE +echo "sleep 5" >> $OUTPUT_FILE +echo "kubectl get agent my-assistant -o wide" >> $OUTPUT_FILE +echo "continue_prompt" >> $OUTPUT_FILE +echo "" >> $OUTPUT_FILE + +# Create a task that uses the fetch tool +echo "# Create a task that uses the fetch tool" >> $OUTPUT_FILE +echo "echo \"Creating a task that uses the fetch tool...\"" >> $OUTPUT_FILE +echo "if ! kubectl get task fetch-task &>/dev/null; then" >> $OUTPUT_FILE +echo " echo 'apiVersion: acp.humanlayer.dev/v1alpha1 " >> $OUTPUT_FILE +echo "kind: Task" >> $OUTPUT_FILE +echo "metadata:" >> $OUTPUT_FILE +echo " name: fetch-task" >> $OUTPUT_FILE +echo "spec:" >> $OUTPUT_FILE +echo " agentRef:" >> $OUTPUT_FILE +echo " name: my-assistant" >> $OUTPUT_FILE +echo " userMessage: \"what is the data at https://lotrapi.co/api/v1/characters/1?\"" >> $OUTPUT_FILE +echo "' | kubectl apply -f -" >> $OUTPUT_FILE +echo " echo \"Waiting for fetch-task to complete...\"" >> $OUTPUT_FILE +echo " for i in {1..30}; do" >> $OUTPUT_FILE +echo " status=\$(kubectl get task fetch-task -o jsonpath='{.status.phase}' 2>/dev/null || echo \"Pending\")" >> $OUTPUT_FILE +echo " if [[ \"\$status\" == \"FinalAnswer\" ]]; then" >> $OUTPUT_FILE +echo " echo \"Task fetch-task completed successfully!\"" >> $OUTPUT_FILE +echo " echo \"Result:\"" >> $OUTPUT_FILE +echo " kubectl get task fetch-task -o jsonpath='{.status.output}'" >> $OUTPUT_FILE +echo " echo \"\"" >> $OUTPUT_FILE +echo " break" >> $OUTPUT_FILE +echo " fi" >> $OUTPUT_FILE +echo " sleep 2" >> $OUTPUT_FILE +echo " echo -n \".\"" >> $OUTPUT_FILE +echo " done" >> $OUTPUT_FILE +echo " echo \"\"" >> $OUTPUT_FILE +echo "else" >> $OUTPUT_FILE +echo " echo \"Task fetch-task already exists\"" >> $OUTPUT_FILE +echo "fi" >> $OUTPUT_FILE +echo "continue_prompt" >> $OUTPUT_FILE +echo "" >> $OUTPUT_FILE + + +# Add a final message +echo "# Add completion message" >> $OUTPUT_FILE +echo "cat << 'EOF'" >> $OUTPUT_FILE +echo "====================================================" >> $OUTPUT_FILE +echo " ACP Setup Complete!" >> $OUTPUT_FILE +echo " " >> $OUTPUT_FILE +echo " You can now interact with ACP using kubectl:" >> $OUTPUT_FILE +echo " - kubectl get llm" >> $OUTPUT_FILE +echo " - kubectl get agent" >> $OUTPUT_FILE +echo " - kubectl get task" >> $OUTPUT_FILE +echo " - kubectl get mcpserver" >> $OUTPUT_FILE +echo " " >> $OUTPUT_FILE +echo " When you're done, you can clean up with:" >> $OUTPUT_FILE +echo " - kubectl delete toolcall --all" >> $OUTPUT_FILE +echo " - kubectl delete task --all" >> $OUTPUT_FILE +echo " - kubectl delete agent --all" >> $OUTPUT_FILE +echo " - kubectl delete mcpserver --all" >> $OUTPUT_FILE +echo " - kubectl delete contactchannel --all" >> $OUTPUT_FILE +echo " - kubectl delete llm --all" >> $OUTPUT_FILE +echo " - kubectl delete secret openai anthropic humanlayer" >> $OUTPUT_FILE +echo " - kind delete cluster" >> $OUTPUT_FILE +echo "====================================================" >> $OUTPUT_FILE +echo "EOF" >> $OUTPUT_FILE + +# Make the script executable +chmod +x $OUTPUT_FILE + +echo "Commands have been extracted to $OUTPUT_FILE" +echo "Review the file contents before running:" +echo "--------------------------------------" +cat $OUTPUT_FILE +echo "--------------------------------------" +echo "To run the commands, execute: $OUTPUT_FILE" \ No newline at end of file From b8994c7294047a57093b270df046b533f866db4e Mon Sep 17 00:00:00 2001 From: Allison Durham Date: Mon, 12 May 2025 19:41:24 -0700 Subject: [PATCH 8/8] a git ignore as well --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index cac4a27..20d3bef 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,6 @@ .DS_Store **/.DS_Store +# Generated files +acp_commands.sh +