diff --git a/.gitignore b/.gitignore index 6d5ed9f..226bf09 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ commander .DS_Store vendor/ +.claude/ diff --git a/go.mod b/go.mod index 729a388..aec0290 100644 --- a/go.mod +++ b/go.mod @@ -3,26 +3,26 @@ module commander go 1.25.4 require ( + github.com/coreos/go-oidc/v3 v3.18.0 github.com/gorilla/websocket v1.5.3 - github.com/mlund01/squadron-wire v0.0.40 + github.com/mlund01/squadron-wire v0.0.41 + github.com/onsi/ginkgo/v2 v2.28.1 + github.com/onsi/gomega v1.39.1 + golang.org/x/crypto v0.50.0 + golang.org/x/oauth2 v0.36.0 ) require ( github.com/Masterminds/semver/v3 v3.4.0 // indirect - github.com/coreos/go-oidc/v3 v3.18.0 // indirect github.com/go-jose/go-jose/v4 v4.1.4 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/google/go-cmp v0.7.0 // indirect github.com/google/pprof v0.0.0-20260115054156-294ebfa9ad83 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/onsi/ginkgo/v2 v2.28.1 // indirect - github.com/onsi/gomega v1.39.1 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect - golang.org/x/crypto v0.50.0 // indirect golang.org/x/mod v0.34.0 // indirect golang.org/x/net v0.52.0 // indirect - golang.org/x/oauth2 v0.36.0 // indirect golang.org/x/sync v0.20.0 // indirect golang.org/x/sys v0.43.0 // indirect golang.org/x/text v0.36.0 // indirect diff --git a/go.sum b/go.sum index 7535674..7068bd4 100644 --- a/go.sum +++ b/go.sum @@ -2,12 +2,22 @@ github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1 github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= github.com/coreos/go-oidc/v3 v3.18.0 h1:V9orjXynvu5wiC9SemFTWnG4F45v403aIcjWo0d41+A= github.com/coreos/go-oidc/v3 v3.18.0/go.mod h1:DYCf24+ncYi+XkIH97GY1+dqoRlbaSI26KVTCI9SrY4= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gkampitakis/ciinfo v0.3.2 h1:JcuOPk8ZU7nZQjdUhctuhQofk7BGHuIy0c9Ez8BNhXs= +github.com/gkampitakis/ciinfo v0.3.2/go.mod h1:1NIwaOcFChN4fa/B0hEBdAb6npDlFL8Bwx4dfRLRqAo= +github.com/gkampitakis/go-diff v1.3.2 h1:Qyn0J9XJSDTgnsgHRdz9Zp24RaJeKMUHg2+PDZZdC4M= +github.com/gkampitakis/go-diff v1.3.2/go.mod h1:LLgOrpqleQe26cte8s36HTWcTmMEur6OPYerdAAS9tk= +github.com/gkampitakis/go-snaps v0.5.15 h1:amyJrvM1D33cPHwVrjo9jQxX8g/7E2wYdZ+01KS3zGE= +github.com/gkampitakis/go-snaps v0.5.15/go.mod h1:HNpx/9GoKisdhw9AFOBT1N7DBs9DiHo/hGheFGBZ+mc= github.com/go-jose/go-jose/v4 v4.1.4 h1:moDMcTHmvE6Groj34emNPLs/qtYXRVcd6S7NHbHz3kA= github.com/go-jose/go-jose/v4 v4.1.4/go.mod h1:x4oUasVrzR7071A4TnHLGSPpNOm2a21K9Kf04k1rs08= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw= +github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/pprof v0.0.0-20260115054156-294ebfa9ad83 h1:z2ogiKUYzX5Is6zr/vP9vJGqPwcdqsWjOt+V8J7+bTc= @@ -16,40 +26,58 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/mlund01/squadron-wire v0.0.40 h1:umcnOIzRgNP4m7AThrLIAP6s4j1sC798FII4Tx6Ijec= -github.com/mlund01/squadron-wire v0.0.40/go.mod h1:BmgUAhEkibCiJ2Cre+qfLs/KjeqTmm4BkfcMu6M+jLU= +github.com/joshdk/go-junit v1.0.0 h1:S86cUKIdwBHWwA6xCmFlf3RTLfVXYQfvanM5Uh+K6GE= +github.com/joshdk/go-junit v1.0.0/go.mod h1:TiiV0PqkaNfFXjEiyjWM3XXrhVyCa1K4Zfga6W52ung= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/maruel/natural v1.1.1 h1:Hja7XhhmvEFhcByqDoHz9QZbkWey+COd9xWfCfn1ioo= +github.com/maruel/natural v1.1.1/go.mod h1:v+Rfd79xlw1AgVBjbO0BEQmptqb5HvL/k9GRHB7ZKEg= +github.com/mfridman/tparse v0.18.0 h1:wh6dzOKaIwkUGyKgOntDW4liXSo37qg5AXbIhkMV3vE= +github.com/mfridman/tparse v0.18.0/go.mod h1:gEvqZTuCgEhPbYk/2lS3Kcxg1GmTxxU7kTC8DvP0i/A= +github.com/mlund01/squadron-wire v0.0.41 h1:Jf4ElHuvtIE1PpbnH7HtwMZwkU8HA/ygEkWdwQFXDn8= +github.com/mlund01/squadron-wire v0.0.41/go.mod h1:BmgUAhEkibCiJ2Cre+qfLs/KjeqTmm4BkfcMu6M+jLU= github.com/onsi/ginkgo/v2 v2.28.1 h1:S4hj+HbZp40fNKuLUQOYLDgZLwNUVn19N3Atb98NCyI= github.com/onsi/ginkgo/v2 v2.28.1/go.mod h1:CLtbVInNckU3/+gC8LzkGUb9oF+e8W8TdUsxPwvdOgE= github.com/onsi/gomega v1.39.1 h1:1IJLAad4zjPn2PsnhH70V4DKRFlrCzGBNrNaru+Vf28= github.com/onsi/gomega v1.39.1/go.mod h1:hL6yVALoTOxeWudERyfppUcZXjMwIMLnuSfruD2lcfg= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= +github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= -golang.org/x/mod v0.32.0 h1:9F4d3PHLljb6x//jOyokMv3eX+YDeepZSEo3mFJy93c= -golang.org/x/mod v0.32.0/go.mod h1:SgipZ/3h2Ci89DlEtEXWUk/HteuRin+HHhN+WbNhguU= golang.org/x/mod v0.34.0 h1:xIHgNUUnW6sYkcM5Jleh05DvLOtwc6RitGHbDk4akRI= golang.org/x/mod v0.34.0/go.mod h1:ykgH52iCZe79kzLLMhyCUzhMci+nQj+0XkbXpNYtVjY= -golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= -golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw= golang.org/x/oauth2 v0.36.0 h1:peZ/1z27fi9hUOFCAZaHyrpWG5lwe0RJEEEeH0ThlIs= golang.org/x/oauth2 v0.36.0/go.mod h1:YDBUJMTkDnJS+A4BP4eZBjCqtokkg1hODuPjwiGPO7Q= -golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= -golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= -golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= -golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= -golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= -golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= -golang.org/x/tools v0.41.0 h1:a9b8iMweWG+S0OBnlU36rzLp20z1Rp10w+IY2czHTQc= -golang.org/x/tools v0.41.0/go.mod h1:XSY6eDqxVNiYgezAVqqCeihT4j1U2CCsqvH3WhQpnlg= golang.org/x/tools v0.43.0 h1:12BdW9CeB3Z+J/I/wj34VMl8X+fEXBxVR90JeMX5E7s= golang.org/x/tools v0.43.0/go.mod h1:uHkMso649BX2cZK6+RpuIPXS3ho2hZo4FVwfoy1vIk0= +google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A= +google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/api/humaninputs.go b/internal/api/humaninputs.go new file mode 100644 index 0000000..d2977dc --- /dev/null +++ b/internal/api/humaninputs.go @@ -0,0 +1,214 @@ +package api + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "strconv" + "time" + + "github.com/mlund01/squadron-wire/protocol" + + "commander/internal/auth" + "commander/internal/hub" +) + +// Human-in-the-loop (ask_human) endpoints. Commander is a pure proxy: +// squadron owns the records, the mission event stream carries live +// updates. These handlers translate REST requests into wire RPCs. + +func handleListHumanInputs(h *hub.Hub) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + instanceID := r.PathValue("id") + conn := h.GetConnection(instanceID) + if conn == nil { + writeJSON(w, http.StatusServiceUnavailable, map[string]string{"error": "instance disconnected"}) + return + } + + q := r.URL.Query() + payload := protocol.GetHumanInputsPayload{ + State: q.Get("state"), + MissionID: q.Get("missionId"), + OldestFirst: q.Get("order") != "newest", + } + if v := q.Get("limit"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + payload.Limit = n + } + } + if v := q.Get("offset"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n >= 0 { + payload.Offset = n + } + } + + reqEnv, err := protocol.NewRequest(protocol.TypeGetHumanInputs, &payload) + if err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) + return + } + resp, err := h.SendRequest(instanceID, reqEnv, proxyTimeout) + if err != nil { + writeJSON(w, http.StatusServiceUnavailable, map[string]string{"error": err.Error()}) + return + } + if resp.Type == protocol.TypeError { + var errPayload protocol.ErrorPayload + _ = protocol.DecodePayload(resp, &errPayload) + writeJSON(w, http.StatusBadGateway, map[string]string{"error": errPayload.Message}) + return + } + var result protocol.GetHumanInputsResultPayload + if err := protocol.DecodePayload(resp, &result); err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "humanInputs": result.HumanInputs, + "total": result.Total, + }) + } +} + +func handleResolveHumanInput(h *hub.Hub) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + instanceID := r.PathValue("id") + toolCallID := r.PathValue("callId") + + conn := h.GetConnection(instanceID) + if conn == nil { + writeJSON(w, http.StatusServiceUnavailable, map[string]string{"error": "instance disconnected"}) + return + } + + var body struct { + Response string `json:"response"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid request body"}) + return + } + if body.Response == "" { + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "response is required"}) + return + } + + reqEnv, err := protocol.NewRequest(protocol.TypeResolveHumanInput, &protocol.ResolveHumanInputPayload{ + ToolCallID: toolCallID, + Response: body.Response, + ResponderUserID: responderIDFromRequest(r), + }) + if err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) + return + } + resp, err := h.SendRequest(instanceID, reqEnv, proxyTimeout) + if err != nil { + writeJSON(w, http.StatusServiceUnavailable, map[string]string{"error": err.Error()}) + return + } + if resp.Type == protocol.TypeError { + var errPayload protocol.ErrorPayload + _ = protocol.DecodePayload(resp, &errPayload) + writeJSON(w, http.StatusBadGateway, map[string]string{"error": errPayload.Message}) + return + } + var result protocol.ResolveHumanInputResultPayload + if err := protocol.DecodePayload(resp, &result); err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) + return + } + if !result.Accepted { + status := http.StatusBadGateway + if result.Reason == "not found" { + status = http.StatusNotFound + } + writeJSON(w, status, map[string]string{"error": result.Reason}) + return + } + writeJSON(w, http.StatusOK, map[string]any{"humanInput": result.HumanInput}) + } +} + +// handleStreamHumanInputs is an SSE endpoint that pushes every +// human_input_requested / human_input_resolved mission event for the +// given squadron to the browser as it happens. Browsers get instant +// alerts without polling, and the stream survives background-tab +// throttling the way EventSource connections do. +// +// Commander already subscribes globally to squadron events on register, +// so by the time the browser opens this stream, the hub is seeing every +// relevant event in real time; we just fan them out to the SSE client. +func handleStreamHumanInputs(h *hub.Hub) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + instanceID := r.PathValue("id") + conn := h.GetConnection(instanceID) + if conn == nil { + writeJSON(w, http.StatusServiceUnavailable, map[string]string{"error": "instance disconnected"}) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") // disable nginx buffering + w.Header().Set("Access-Control-Allow-Origin", "*") + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming not supported", http.StatusInternalServerError) + return + } + + // Send an opening comment so proxies don't buffer the first + // real event, and the client sees the connection is live. + fmt.Fprintf(w, ": open %d\n\n", time.Now().Unix()) + flusher.Flush() + + ch, cleanup := conn.SubscribeHumanInputEvents() + defer cleanup() + + // Keepalive pulses prevent intermediaries (and some browsers) + // from closing an idle stream after ~30s of silence. + pulse := time.NewTicker(20 * time.Second) + defer pulse.Stop() + + ctx := r.Context() + for { + select { + case <-ctx.Done(): + return + case <-pulse.C: + fmt.Fprintf(w, ": ping %d\n\n", time.Now().Unix()) + flusher.Flush() + case ev, ok := <-ch: + if !ok { + return + } + data, err := json.Marshal(ev) + if err != nil { + log.Printf("human-input SSE marshal: %v", err) + continue + } + fmt.Fprintf(w, "event: %s\ndata: %s\n\n", ev.EventType, data) + flusher.Flush() + } + } + } +} + +// responderIDFromRequest returns a stable identifier for the user who +// submitted the response. Falls back gracefully when auth is disabled so +// local dev still works. +func responderIDFromRequest(r *http.Request) string { + sess := auth.SessionFromContext(r.Context()) + if sess == nil { + return "" + } + if sess.Sub != "" { + return sess.Sub + } + return sess.Email +} diff --git a/internal/api/routes.go b/internal/api/routes.go index 9afe01c..68f362e 100644 --- a/internal/api/routes.go +++ b/internal/api/routes.go @@ -104,6 +104,12 @@ func RegisterRoutes(mux *http.ServeMux, h *hub.Hub, ka *keepalive.KeepAlive) { mux.HandleFunc("GET /api/instances/{id}/agents/{name}/chats", handleChatHistory(h)) mux.HandleFunc("GET /api/instances/{id}/chats/{sessionId}/messages", handleChatMessages(h)) mux.HandleFunc("DELETE /api/instances/{id}/chats/{sessionId}", handleArchiveChat(h)) + + // Human-in-the-loop (ask_human) endpoints — commander proxies to + // the squadron that owns the records. + mux.HandleFunc("GET /api/instances/{id}/human-inputs", handleListHumanInputs(h)) + mux.HandleFunc("GET /api/instances/{id}/human-inputs/stream", handleStreamHumanInputs(h)) + mux.HandleFunc("POST /api/instances/{id}/human-inputs/{callId}/resolve", handleResolveHumanInput(h)) } func handleListInstances(h *hub.Hub) http.HandlerFunc { diff --git a/internal/hub/connection.go b/internal/hub/connection.go index 0049670..0618896 100644 --- a/internal/hub/connection.go +++ b/internal/hub/connection.go @@ -37,6 +37,13 @@ type Connection struct { chatMu sync.Mutex chatSubs map[string][]chan *protocol.ChatEventPayload // sessionID → subscriber channels chatBuffer map[string][]*protocol.ChatEventPayload // buffered events before first subscriber + + // Instance-wide human-input event fan-out. Every mission event of + // type human_input_requested or human_input_resolved is pushed to + // all listeners on this slice. The SSE endpoint exposes this so the + // browser gets an instant notification — no polling, no throttling. + humanInputMu sync.Mutex + humanInputSubs []chan *protocol.MissionEventPayload } // NewConnection creates a new Connection wrapping a WebSocket. @@ -194,6 +201,15 @@ func (c *Connection) fanOutMissionEvent(env *protocol.Envelope) { return } + // Feed the instance-wide human-input stream whenever one of those + // events comes through. Unlike the per-mission path this one never + // buffers — if no one's listening the event is dropped; if someone + // is, everyone gets a copy immediately. + if payload.EventType == protocol.EventHumanInputRequested || + payload.EventType == protocol.EventHumanInputResolved { + c.fanOutHumanInputEvent(&payload) + } + c.eventMu.Lock() subs := c.eventSubs[payload.MissionID] if len(subs) == 0 { @@ -213,6 +229,45 @@ func (c *Connection) fanOutMissionEvent(env *protocol.Envelope) { } } +// SubscribeHumanInputEvents registers a channel that receives every +// human_input_requested / human_input_resolved mission event for this +// instance. Used by the SSE endpoint to push alerts to browsers with +// no polling involvement. Call the returned cancel func to unsubscribe. +func (c *Connection) SubscribeHumanInputEvents() (chan *protocol.MissionEventPayload, func()) { + ch := make(chan *protocol.MissionEventPayload, 32) + c.humanInputMu.Lock() + c.humanInputSubs = append(c.humanInputSubs, ch) + c.humanInputMu.Unlock() + + cleanup := func() { + c.humanInputMu.Lock() + defer c.humanInputMu.Unlock() + for i, sub := range c.humanInputSubs { + if sub == ch { + c.humanInputSubs = append(c.humanInputSubs[:i], c.humanInputSubs[i+1:]...) + break + } + } + close(ch) + } + return ch, cleanup +} + +func (c *Connection) fanOutHumanInputEvent(payload *protocol.MissionEventPayload) { + // Hold the lock for the whole fan-out so a concurrent unsubscribe + // can't close a channel mid-send. Channels are buffered + the send + // uses a default branch, so iteration stays bounded. + c.humanInputMu.Lock() + defer c.humanInputMu.Unlock() + for _, ch := range c.humanInputSubs { + select { + case ch <- payload: + default: + // Slow subscriber — drop rather than block the hub. + } + } +} + func (c *Connection) fanOutMissionComplete(env *protocol.Envelope) { var payload protocol.MissionCompletePayload if err := protocol.DecodePayload(env, &payload); err != nil { diff --git a/web/public/sonar-ping.mp3 b/web/public/sonar-ping.mp3 new file mode 100644 index 0000000..22bdec3 Binary files /dev/null and b/web/public/sonar-ping.mp3 differ diff --git a/web/src/App.tsx b/web/src/App.tsx index 571531c..4647f9a 100644 --- a/web/src/App.tsx +++ b/web/src/App.tsx @@ -16,6 +16,7 @@ import { FileViewerPage } from './pages/FileViewerPage' import { VariablesPage } from './pages/VariablesPage' import { SkillsPage } from './pages/SkillsPage' import { SkillDetail } from './pages/SkillDetail' +import { InboxPage } from './pages/InboxPage' function App() { return ( @@ -39,6 +40,7 @@ function App() { } /> } /> } /> + } /> diff --git a/web/src/api/client.ts b/web/src/api/client.ts index ae0c494..cee0ef9 100644 --- a/web/src/api/client.ts +++ b/web/src/api/client.ts @@ -248,3 +248,43 @@ export async function getCostSummary(instanceId: string, from?: string, to?: str const qs = params.toString(); return fetchJSON(`/instances/${instanceId}/costs${qs ? '?' + qs : ''}`); } + +// Human-in-the-loop (ask_human) API — commander proxies to the squadron +// that owns the records. All endpoints are instance-scoped. + +export interface ListHumanInputsOptions { + state?: 'open' | 'resolved'; + missionId?: string; + order?: 'oldest' | 'newest'; + limit?: number; + offset?: number; +} + +export async function listHumanInputs( + instanceId: string, + opts: ListHumanInputsOptions = {}, +): Promise { + const params = new URLSearchParams(); + if (opts.state) params.set('state', opts.state); + if (opts.missionId) params.set('missionId', opts.missionId); + if (opts.order) params.set('order', opts.order); + if (opts.limit) params.set('limit', String(opts.limit)); + if (opts.offset) params.set('offset', String(opts.offset)); + const qs = params.toString(); + return fetchJSON(`/instances/${instanceId}/human-inputs${qs ? '?' + qs : ''}`); +} + +export async function resolveHumanInput( + instanceId: string, + toolCallId: string, + response: string, +): Promise { + return fetchJSON( + `/instances/${instanceId}/human-inputs/${toolCallId}/resolve`, + { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ response }), + }, + ); +} diff --git a/web/src/api/types.ts b/web/src/api/types.ts index fe26efe..9fd728c 100644 --- a/web/src/api/types.ts +++ b/web/src/api/types.ts @@ -419,3 +419,37 @@ export interface WriteBrowseFileResponse { export interface ListSharedFoldersResponse { folders: SharedFolderInfo[]; } + +// Human-in-the-loop (ask_human) types + +export interface HumanInputRequestDTO { + id: string; + missionId?: string; + missionName?: string; + taskId?: string; + taskName?: string; + toolCallId: string; + question: string; + shortSummary?: string; + additionalContext?: string; + choices?: string[]; + // multiSelect=true means the human picks 1+ choices instead of one. + // The submitted response is then a JSON-encoded array of strings + // (e.g. `["A","C"]`); the agent / API contract is single string in + // both cases. + multiSelect?: boolean; + state: 'open' | 'resolved'; + requestedAt: string; + resolvedAt?: string; + response?: string; + responderUserId?: string; +} + +export interface ListHumanInputsResponse { + humanInputs: HumanInputRequestDTO[]; + total: number; +} + +export interface ResolveHumanInputResponse { + humanInput: HumanInputRequestDTO; +} diff --git a/web/src/components/AppLayout.tsx b/web/src/components/AppLayout.tsx index c3caa01..eadf7df 100644 --- a/web/src/components/AppLayout.tsx +++ b/web/src/components/AppLayout.tsx @@ -1,9 +1,15 @@ -import { Outlet } from 'react-router-dom'; +import { Outlet, useParams } from 'react-router-dom'; import { SidebarProvider, SidebarInset } from '@/components/ui/sidebar'; import { TooltipProvider } from '@/components/ui/tooltip'; import { AppSidebar } from './AppSidebar'; +import { useHumanInputAlerts } from '@/hooks/use-human-input-alerts'; export function AppLayout() { + const { id } = useParams<{ id: string }>(); + // Toast + chime on every new ask_human request that arrives while + // the operator is anywhere except the Inbox. + useHumanInputAlerts(id); + return ( diff --git a/web/src/components/AppSidebar.tsx b/web/src/components/AppSidebar.tsx index 493b06e..ef513d9 100644 --- a/web/src/components/AppSidebar.tsx +++ b/web/src/components/AppSidebar.tsx @@ -28,8 +28,10 @@ import { KeyRound, FileCode, FolderOpen, + Inbox, type LucideIcon, } from 'lucide-react'; +import { useOpenHumanInputCount } from '@/hooks/use-human-inputs'; import { Tooltip, TooltipTrigger, TooltipContent } from '@/components/ui/tooltip'; import { ThemeToggle } from '@/components/ThemeToggle'; import { cn } from '@/lib/utils'; @@ -50,6 +52,7 @@ function StatusDot({ tone, live = false, size = 6 }: { tone: 'running' | 'comple const staticNavItems: { label: string; path: string; icon: LucideIcon }[] = [ { label: 'Missions', path: 'missions', icon: Rocket }, + { label: 'Inbox', path: 'inbox', icon: Inbox }, { label: 'Agents', path: 'agents', icon: Bot }, { label: 'Skills', path: 'skills', icon: Sparkles }, { label: 'Tools', path: 'tools', icon: Puzzle }, @@ -122,6 +125,8 @@ export function AppSidebar() { ? [...staticNavItems, { label: 'Folders', path: 'files', icon: FolderOpen }] : staticNavItems; + const inboxCount = useOpenHumanInputCount(id); + const activePath = location.pathname.split('/').at(-1) ?? ''; const activeSection = location.pathname.includes('/missions/') && location.pathname.includes('/run') ? 'missions' @@ -198,8 +203,14 @@ export function AppSidebar() {