Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/ax/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func init() {

rootCmd.AddCommand(forkCmd)
rootCmd.AddCommand(traceCmd)
rootCmd.AddCommand(monitorCmd)
}

func connect(server string) (*grpc.ClientConn, error) {
Expand Down
296 changes: 296 additions & 0 deletions cmd/ax/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,296 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"database/sql"
_ "embed"
"encoding/json"
"fmt"
"log/slog"
"net"
"net/http"
"os"
"path/filepath"
"time"

"github.com/spf13/cobra"
_ "modernc.org/sqlite"
)

//go:embed web/index.html
var dashboardHTML string

var (
monitorAddr string
monitorConfigFile string
)

var monitorCmd = &cobra.Command{
Use: "monitor",
Short: "Start the AX Monitor Dashboard",
Long: `Start a local HTTP server to monitor AX conversations and executions.`,
RunE: runMonitor,
}

func init() {
monitorCmd.Flags().StringVar(&monitorAddr, "addr", "localhost:8080", "Server address to listen on")
monitorCmd.Flags().StringVar(&monitorConfigFile, "config", "ax.yaml", "Path to YAML configuration file")
}

type ConversationResponse struct {
ID string `json:"id"`
Agent string `json:"agent"`
Status string `json:"status"`
LastSeq int32 `json:"last_seq"`
Duration string `json:"duration"`
}

func runMonitor(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil)))

cfg, err := newConfig(cmd, monitorConfigFile)
if err != nil {
return err
}

dbPath := cfg.EventLog.SQLiteConfig.Filename
slog.InfoContext(ctx, "Opening event log database", slog.String("path", dbPath))

if err := os.MkdirAll(filepath.Dir(dbPath), 0755); err != nil {
return fmt.Errorf("failed to create database directory: %w", err)
}

db, err := sql.Open("sqlite", dbPath)
if err != nil {
return fmt.Errorf("failed to open sqlite database: %w", err)
}
defer db.Close()

// Verify database connection
if err := db.PingContext(ctx); err != nil {
return fmt.Errorf("failed to ping database: %w", err)
}

// Create tables if they don't exist (to avoid crashes on fresh setup)
if _, err := db.Exec(`
CREATE TABLE IF NOT EXISTS conversation_log (
conversation_id TEXT NOT NULL,
seq INTEGER NOT NULL,
payload TEXT NOT NULL,
PRIMARY KEY (conversation_id, seq)
)`); err != nil {
return fmt.Errorf("failed to initialize conversation_log table: %w", err)
}

if _, err := db.Exec(`
CREATE TABLE IF NOT EXISTS execution_log (
exec_id TEXT NOT NULL,
payload TEXT NOT NULL,
timestamp DATETIME NOT NULL
)`); err != nil {
return fmt.Errorf("failed to initialize execution_log table: %w", err)
}

if _, err := db.Exec(`CREATE INDEX IF NOT EXISTS idx_execution_log_exec_id ON execution_log(exec_id)`); err != nil {
return fmt.Errorf("failed to create index on execution_log: %w", err)
}

// Setup API handlers
mux := http.NewServeMux()
mux.HandleFunc("/api/conversations", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

convs, err := fetchConversations(r.Context(), db)
if err != nil {
slog.ErrorContext(r.Context(), "Failed to fetch conversations", slog.Any("error", err))
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(convs); err != nil {
slog.ErrorContext(r.Context(), "Failed to encode conversations response", slog.Any("error", err))
}
})

mux.HandleFunc("/api/trace", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

convID := r.URL.Query().Get("conversation")
if convID == "" {
http.Error(w, "Missing conversation ID", http.StatusBadRequest)
return
}

data, err := loadTraceData(r.Context(), cfg, convID)
if err != nil {
slog.ErrorContext(r.Context(), "Failed to load trace data", slog.String("conversation_id", convID), slog.Any("error", err))
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(data); err != nil {
slog.ErrorContext(r.Context(), "Failed to encode trace data response", slog.Any("error", err))
}
})

mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
fmt.Fprint(w, dashboardHTML)
})

listener, err := net.Listen("tcp", monitorAddr)
if err != nil {
return fmt.Errorf("failed to bind server to %s: %w", monitorAddr, err)
}
defer listener.Close()

addr := listener.Addr().String()
host, port, err := net.SplitHostPort(addr)
if err == nil && (host == "::" || host == "0.0.0.0" || host == "" || host == "[::]") {
addr = fmt.Sprintf("localhost:%s", port)
}
url := fmt.Sprintf("http://%s", addr)

slog.InfoContext(ctx, "AX Monitor Dashboard started", slog.String("url", url))

go openBrowser(url)

server := &http.Server{
Handler: mux,
}

return server.Serve(listener)
}

func fetchConversations(ctx context.Context, db *sql.DB) ([]ConversationResponse, error) {
query := `
SELECT
c.conversation_id,
c.last_seq,
c.state,
e.agent_id,
e.start_time,
e.end_time
FROM (
SELECT conversation_id, seq AS last_seq,
json_extract(payload, '$.exec_id') AS exec_id,
json_extract(payload, '$.state') AS state
FROM conversation_log
WHERE (conversation_id, seq) IN (
SELECT conversation_id, MAX(seq)
FROM conversation_log
GROUP BY conversation_id
)
) c
LEFT JOIN (
SELECT
exec_id,
json_extract(payload, '$.agent_id') AS agent_id,
MIN(timestamp) AS start_time,
MAX(timestamp) AS end_time
FROM execution_log
GROUP BY exec_id
) e ON c.exec_id = e.exec_id;
`
rows, err := db.QueryContext(ctx, query)
if err != nil {
return nil, err
}
defer rows.Close()

convs := []ConversationResponse{}
for rows.Next() {
var id string
var lastSeq int32
var state string
var agentID sql.NullString
var startTimeStr, endTimeStr sql.NullString

err := rows.Scan(&id, &lastSeq, &state, &agentID, &startTimeStr, &endTimeStr)
if err != nil {
return nil, err
}

agent := "unknown"
if agentID.Valid && agentID.String != "" {
agent = agentID.String
// Strip special prefix if it starts with "__"
if len(agent) > 2 && agent[:2] == "__" {
agent = agent[2:]
}
}

durationStr := "N/A"
if startTimeStr.Valid && endTimeStr.Valid {
startTime, err1 := parseSQLiteTime(startTimeStr.String)
endTime, err2 := parseSQLiteTime(endTimeStr.String)
if err1 == nil && err2 == nil {
duration := endTime.Sub(startTime)
durationStr = fmt.Sprintf("%.1fs", duration.Seconds())
} else {
slog.WarnContext(ctx, "Failed to parse sqlite timestamps", slog.String("start", startTimeStr.String), slog.String("end", endTimeStr.String), slog.Any("err1", err1), slog.Any("err2", err2))
}
}

status := state
if len(status) > 6 && status[:6] == "STATE_" {
status = status[6:]
}
if status == "PENDING" {
status = "RUNNING"
}

convs = append(convs, ConversationResponse{
ID: id,
Agent: agent,
Status: status,
LastSeq: lastSeq,
Duration: durationStr,
})
}

return convs, nil
}

func parseSQLiteTime(s string) (time.Time, error) {
layouts := []string{
time.RFC3339Nano,
time.RFC3339,
"2006-01-02 15:04:05.999999999-07:00",
"2006-01-02 15:04:05.999999999",
"2006-01-02 15:04:05.999999999 -0700 MST",
"2006-01-02 15:04:05",
}
var err error
var t time.Time
for _, layout := range layouts {
t, err = time.Parse(layout, s)
if err == nil {
return t, nil
}
}
return time.Time{}, err
}
Loading