Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
//forward the following ports
"forwardPorts": [8084],

//network
"network": "host",

//mount docker directly on the host
"mounts": ["source=/var/run/docker.sock,target=/var/run/docker.sock,type=bind"],

Expand Down
19 changes: 18 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,21 @@ jobs:
- name: Run cmd/main.go tests
working-directory: .
run: |
go test -v ./...
make test

go-e2e-tests:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: "1.24"
cache: true

- name: Run cmd/main.go tests
working-directory: .
run: |
make e2e
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,8 @@ bin/
.env.local
.env.development.local
.env.test.local
.env.production.local
.env.production.local
/logs/
/kagent-tools
/*.out
*.html
26 changes: 22 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ LDFLAGS := -X github.com/kagent-dev/tools/internal/version.Version=$(VERSION) -X
## Location to install dependencies to
LOCALBIN ?= $(shell pwd)/bin

.PHONY: clean
clean:
rm -rf ./bin/kagent-tools-*

.PHONY: fmt
fmt: ## Run go fmt against code.
go fmt ./...
Expand All @@ -23,11 +27,11 @@ vet: ## Run go vet against code.

.PHONY: lint
lint: golangci-lint ## Run golangci-lint linter
$(GOLANGCI_LINT) run
$(GOLANGCI_LINT) run --build-tags=test

.PHONY: lint-fix
lint-fix: golangci-lint ## Run golangci-lint linter and perform fixes
$(GOLANGCI_LINT) run --fix
$(GOLANGCI_LINT) run --build-tags=test --fix

.PHONY: lint-config
lint-config: golangci-lint ## Verify golangci-lint linter configuration
Expand All @@ -43,8 +47,16 @@ tidy: ## Run go mod tidy to ensure dependencies are up to date.
go mod tidy

.PHONY: test
test:
go test -v -cover ./...
test: build lint ## Run all tests with build, lint, and coverage
go test -tags=test -v -cover ./pkg/... ./internal/...

.PHONY: test-only
test-only: ## Run tests only (without build/lint for faster iteration)
go test -tags=test -v -cover ./pkg/... ./internal/...

.PHONY: e2e
e2e: test docker-build
go test -tags=test -v -cover ./e2e/...

bin/kagent-tools-linux-amd64:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o bin/kagent-tools-linux-amd64 ./cmd
Expand Down Expand Up @@ -143,6 +155,12 @@ docker-build-all: DOCKER_BUILD_ARGS = --progress=plain --builder $(BUILDX_BUILDE
docker-build-all:
$(DOCKER_BUILDER) build $(DOCKER_BUILD_ARGS) $(TOOLS_IMAGE_BUILD_ARGS) -f Dockerfile ./

.PHONY: kind-update-kagent
kind-update-kagent: docker-build
kind get clusters | grep -q $(KIND_CLUSTER_NAME) || kind create cluster --name $(KIND_CLUSTER_NAME)
kind load docker-image --name $(KIND_CLUSTER_NAME) $(TOOLS_IMG)
kubectl patch --namespace kagent deployment/kagent --type='json' -p='[{"op": "replace", "path": "/spec/template/spec/containers/3/image", "value": "$(TOOLS_IMG)"}]'

## Tool Binaries
## Location to install dependencies to

Expand Down
189 changes: 147 additions & 42 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,29 @@ import (
"net/http"
"os"
"os/signal"
"runtime"
"strings"
"sync"
"syscall"
"time"

"github.com/joho/godotenv"
"github.com/kagent-dev/tools/internal/logger"
"github.com/kagent-dev/tools/internal/telemetry"
"github.com/kagent-dev/tools/internal/version"
"github.com/kagent-dev/tools/pkg/logger"
"github.com/kagent-dev/tools/pkg/utils"

"github.com/kagent-dev/tools/pkg/argo"
"github.com/kagent-dev/tools/pkg/cilium"
"github.com/kagent-dev/tools/pkg/helm"
"github.com/kagent-dev/tools/pkg/istio"
"github.com/kagent-dev/tools/pkg/k8s"
"github.com/kagent-dev/tools/pkg/prometheus"
"github.com/mark3labs/mcp-go/server"
"github.com/kagent-dev/tools/pkg/utils"
"github.com/spf13/cobra"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"

"github.com/mark3labs/mcp-go/server"
)

var (
Expand Down Expand Up @@ -69,12 +74,36 @@ func run(cmd *cobra.Command, args []string) {
logger.Init()
defer logger.Sync()

logger.Get().Info("Starting "+Name, "version", Version, "git_commit", GitCommit, "build_date", BuildDate)

// Setup context with cancellation for graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Initialize OpenTelemetry tracing
cfg := telemetry.LoadOtelCfg()

err := telemetry.SetupOTelSDK(ctx)
if err != nil {
logger.Get().Error("Failed to setup OpenTelemetry SDK", "error", err)
os.Exit(1)
}

// Start root span for server lifecycle
tracer := otel.Tracer("kagent-tools/server")
ctx, rootSpan := tracer.Start(ctx, "server.lifecycle")
defer rootSpan.End()

rootSpan.SetAttributes(
attribute.String("server.name", Name),
attribute.String("server.version", cfg.Telemetry.ServiceVersion),
attribute.String("server.git_commit", GitCommit),
attribute.String("server.build_date", BuildDate),
attribute.Bool("server.stdio_mode", stdio),
attribute.Int("server.port", port),
attribute.StringSlice("server.tools", tools),
)

logger.Get().Info("Starting "+Name, "version", Version, "git_commit", GitCommit, "build_date", BuildDate)

mcp := server.NewMCPServer(
Name,
Version,
Expand All @@ -91,7 +120,7 @@ func run(cmd *cobra.Command, args []string) {
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)

// HTTP server reference (only used when not in stdio mode)
var sseServer *server.StreamableHTTPServer
var httpServer *http.Server

// Start server based on chosen mode
wg.Add(1)
Expand All @@ -101,16 +130,49 @@ func run(cmd *cobra.Command, args []string) {
runStdioServer(ctx, mcp)
}()
} else {
sseServer = server.NewStreamableHTTPServer(mcp)
sseServer := server.NewStreamableHTTPServer(mcp)

// Create a mux to handle different routes
mux := http.NewServeMux()

// Add health endpoint
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
if err := writeResponse(w, []byte("OK")); err != nil {
logger.Get().Error("Failed to write health response", "error", err)
}
})

// Add metrics endpoint (basic implementation for e2e tests)
mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")
w.WriteHeader(http.StatusOK)

// Generate real runtime metrics instead of hardcoded values
metrics := generateRuntimeMetrics()
if err := writeResponse(w, []byte(metrics)); err != nil {
logger.Get().Error("Failed to write metrics response", "error", err)
}
})

// Handle all other routes with the MCP server wrapped in telemetry middleware
mux.Handle("/", telemetry.HTTPMiddleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sseServer.ServeHTTP(w, r)
})))

httpServer = &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: mux,
}

go func() {
defer wg.Done()
addr := fmt.Sprintf(":%d", port)
logger.Get().Info("Running KAgent Tools Server", "port", addr, "tools", strings.Join(tools, ","))
if err := sseServer.Start(addr); err != nil {
logger.Get().Info("Running KAgent Tools Server", "port", fmt.Sprintf(":%d", port), "tools", strings.Join(tools, ","))
if err := httpServer.ListenAndServe(); err != nil {
if !errors.Is(err, http.ErrServerClosed) {
logger.Get().Error(err, "Failed to start SSE server")
logger.Get().Error("Failed to start HTTP server", "error", err)
} else {
logger.Get().Info("SSE server closed gracefully.")
logger.Get().Info("HTTP server closed gracefully.")
}
}
}()
Expand All @@ -121,16 +183,23 @@ func run(cmd *cobra.Command, args []string) {
<-signalChan
logger.Get().Info("Received termination signal, shutting down server...")

// Mark root span as shutting down
rootSpan.AddEvent("server.shutdown.initiated")

// Cancel context to notify any context-aware operations
cancel()

// Gracefully shutdown HTTP server if running
if !stdio && sseServer != nil {
if !stdio && httpServer != nil {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()

if err := sseServer.Shutdown(shutdownCtx); err != nil {
logger.Get().Error(err, "Failed to shutdown server gracefully")
if err := httpServer.Shutdown(shutdownCtx); err != nil {
logger.Get().Error("Failed to shutdown server gracefully", "error", err)
rootSpan.RecordError(err)
rootSpan.SetStatus(codes.Error, "Server shutdown failed")
} else {
rootSpan.AddEvent("server.shutdown.completed")
}
}
}()
Expand All @@ -140,6 +209,53 @@ func run(cmd *cobra.Command, args []string) {
logger.Get().Info("Server shutdown complete")
}

// writeResponse sends data as the body of w and reports any error returned
// by the underlying Write call. The number of bytes written is discarded.
func writeResponse(w http.ResponseWriter, data []byte) error {
	if _, writeErr := w.Write(data); writeErr != nil {
		return writeErr
	}
	return nil
}

// processStartTime is captured once when the package is initialized so that
// process_start_time_seconds stays constant across scrapes instead of
// reporting the time of each request.
var processStartTime = time.Now()

// generateRuntimeMetrics renders a small set of Go runtime metrics in the
// Prometheus text exposition format for the /metrics endpoint.
//
// The output contains, in order: go_info (labelled with runtime.Version()),
// process_start_time_seconds (from processStartTime), go_memstats_alloc_bytes,
// go_memstats_total_alloc_bytes, go_memstats_sys_bytes (all read from
// runtime.MemStats at call time) and go_goroutines. Every metric is preceded
// by its HELP and TYPE lines and every line ends with a newline.
func generateRuntimeMetrics() string {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)

	// Build metrics in Prometheus format.
	var b strings.Builder

	// Go runtime info
	b.WriteString("# HELP go_info Information about the Go environment.\n")
	b.WriteString("# TYPE go_info gauge\n")
	fmt.Fprintf(&b, "go_info{version=\"%s\"} 1\n", runtime.Version())

	// Process start time: must be the fixed start instant, not time.Now(),
	// otherwise the reported start time moves forward on every scrape.
	b.WriteString("# HELP process_start_time_seconds Start time of the process since unix epoch in seconds.\n")
	b.WriteString("# TYPE process_start_time_seconds gauge\n")
	fmt.Fprintf(&b, "process_start_time_seconds %d\n", processStartTime.Unix())

	// Memory metrics
	b.WriteString("# HELP go_memstats_alloc_bytes Number of bytes allocated and still in use.\n")
	b.WriteString("# TYPE go_memstats_alloc_bytes gauge\n")
	fmt.Fprintf(&b, "go_memstats_alloc_bytes %d\n", m.Alloc)

	b.WriteString("# HELP go_memstats_total_alloc_bytes Total number of bytes allocated, even if freed.\n")
	b.WriteString("# TYPE go_memstats_total_alloc_bytes counter\n")
	fmt.Fprintf(&b, "go_memstats_total_alloc_bytes %d\n", m.TotalAlloc)

	b.WriteString("# HELP go_memstats_sys_bytes Number of bytes obtained from system.\n")
	b.WriteString("# TYPE go_memstats_sys_bytes gauge\n")
	fmt.Fprintf(&b, "go_memstats_sys_bytes %d\n", m.Sys)

	// Goroutine count
	b.WriteString("# HELP go_goroutines Number of goroutines that currently exist.\n")
	b.WriteString("# TYPE go_goroutines gauge\n")
	fmt.Fprintf(&b, "go_goroutines %d\n", runtime.NumGoroutine())

	return b.String()
}

func runStdioServer(ctx context.Context, mcp *server.MCPServer) {
logger.Get().Info("Running KAgent Tools Server STDIO:", "tools", strings.Join(tools, ","))
stdioServer := server.NewStdioServer(mcp)
Expand All @@ -149,39 +265,28 @@ func runStdioServer(ctx context.Context, mcp *server.MCPServer) {
}

func registerMCP(mcp *server.MCPServer, enabledToolProviders []string, kubeconfig string) {

var toolProviderMap = map[string]func(*server.MCPServer, string){
"utils": utils.RegisterDateTimeTools,
"k8s": k8s.RegisterK8sTools,
"prometheus": prometheus.RegisterPrometheusTools,
"helm": helm.RegisterHelmTools,
"istio": istio.RegisterIstioTools,
"argo": argo.RegisterArgoTools,
"cilium": cilium.RegisterCiliumTools,
// A map to hold tool providers and their registration functions
toolProviderMap := map[string]func(*server.MCPServer){
"argo": argo.RegisterTools,
"cilium": cilium.RegisterTools,
"helm": helm.RegisterTools,
"istio": istio.RegisterTools,
"k8s": func(s *server.MCPServer) { k8s.RegisterTools(s, nil, kubeconfig) },
"prometheus": prometheus.RegisterTools,
"utils": utils.RegisterTools,
}

if len(kubeconfig) > 0 {
logger.Get().Info("Using kubeconfig file", "path", kubeconfig)
}

// If no tools specified, register all tools
// If no specific tools are specified, register all available tools.
if len(enabledToolProviders) == 0 {
logger.Get().Info("No specific tools provided, registering all tools")
for toolProvider, registerFunc := range toolProviderMap {
logger.Get().Info("Registering tools", "provider", toolProvider)
registerFunc(mcp, kubeconfig)
for name := range toolProviderMap {
enabledToolProviders = append(enabledToolProviders, name)
}
return
}

// Register only the specified tools
logger.Get().Info("provider list", "tools", enabledToolProviders)
for _, toolProviderName := range enabledToolProviders {
if registerFunc, ok := toolProviderMap[strings.ToLower(toolProviderName)]; ok {
logger.Get().Info("Registering tool", "provider", toolProviderName)
registerFunc(mcp, kubeconfig)
if registerFunc, ok := toolProviderMap[toolProviderName]; ok {
registerFunc(mcp)
} else {
logger.Get().Error(nil, "Unknown tool specified", "provider", toolProviderName)
logger.Get().Error("Unknown tool specified", "provider", toolProviderName)
}
}
}
Loading
Loading