Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,25 @@
# Changelog

## [Unreleased]

### Added

- **`Metrics` interface** — `RecordCall(server, tool string, dur time.Duration, err error)` and
`RecordToolList(server string, count int)`. Register an implementation via `WithMetrics(m Metrics)`.
Passing `nil` is a no-op. Panics inside any method are recovered by the library. Both
`WithMetrics` and `AfterCallHook` may be registered simultaneously.
- **`RecordCall`** is invoked after every `CallTool` invocation (success, error, and pre-RPC
failures such as `ErrToolNotFound`). `dur` measures the wall-clock time of the upstream MCP
call only; for pre-RPC failures it is near-zero and accurately reflects that no RPC was made.
- **`RecordToolList`** is invoked after every successful tool-list fetch — both on initial connect
and after a live `notifications/tools/list_changed` refresh.
- **Auto-detected `clientInfo.version`**: the MCP handshake now advertises the consuming module's
real version read via `runtime/debug.ReadBuildInfo()`. Falls back to `"dev"` when build info is
unavailable or the version is the Go sentinel `"(devel)"` (produced by `go run` and untagged
builds). `WithClientIdentity` continues to override both name and version.

All changes are purely additive; no existing API is modified.

## [v0.2.0] — 2026-05-11

### Added
Expand Down
11 changes: 10 additions & 1 deletion caller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func (mx *Multiplexer) CallTool(ctx context.Context, server, toolName string, ar
return nil, fmt.Errorf("%w: %q", ErrServerDown, server)
}

start := time.Now()

var toolMeta ToolInfo
found := false
for _, ti := range entryTools {
Expand All @@ -68,7 +70,9 @@ func (mx *Multiplexer) CallTool(ctx context.Context, server, toolName string, ar
}
}
if !found {
return nil, fmt.Errorf("%w: %s on server %s", ErrToolNotFound, toolName, server)
err := fmt.Errorf("%w: %s on server %s", ErrToolNotFound, toolName, server)
safeRecordCall(mx.opts.metrics, server, toolName, time.Since(start), err)
return nil, err
}

params := &mcp.CallToolParams{Name: toolName}
Expand Down Expand Up @@ -105,6 +109,8 @@ func (mx *Multiplexer) CallTool(ctx context.Context, server, toolName string, ar
defer cancel()

rawResult, callErr := entrySess.CallTool(callCtx, params)
dur := time.Since(start)

if callErr != nil {
var wrapped error
if callCtx.Err() != nil && errors.Is(callCtx.Err(), context.DeadlineExceeded) {
Expand All @@ -115,6 +121,7 @@ func (mx *Multiplexer) CallTool(ctx context.Context, server, toolName string, ar
mx.opts.logger.Error("mcpx: call failed",
F("server", server), F("tool", toolName), F("error", wrapped.Error()))
mx.runAfterCall(ctx, server, toolMeta, finalArgs, nil, wrapped)
safeRecordCall(mx.opts.metrics, server, toolName, dur, wrapped)
return nil, wrapped
}

Expand All @@ -124,12 +131,14 @@ func (mx *Multiplexer) CallTool(ctx context.Context, server, toolName string, ar
newText, err := hook(ctx, server, toolMeta, result.Text)
if err != nil {
mx.runAfterCall(ctx, server, toolMeta, finalArgs, result, err)
safeRecordCall(mx.opts.metrics, server, toolName, dur, err)
return nil, err
}
result.Text = newText
}

mx.runAfterCall(ctx, server, toolMeta, finalArgs, result, nil)
safeRecordCall(mx.opts.metrics, server, toolName, dur, nil)
return result, nil
}

Expand Down
7 changes: 7 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@
// Per-server call timeouts are supported via ServerConfig.CallTimeout; a zero
// value inherits the global default set via WithCallTimeout (30 s by default).
//
// Typed observability is available via the Metrics interface (RecordCall,
// RecordToolList). Register an implementation via WithMetrics; the default
// is a no-op. Panics inside Metrics methods are recovered by the library.
// The MCP handshake automatically advertises the consuming module's real
// version (read from build info); it falls back to "dev" when build info is
// unavailable. Override both name and version with WithClientIdentity.
//
// The library is logger-agnostic via the Logger interface (4 methods).
// Adapters for go.uber.org/zap and log/slog are provided as separate
// packages under log/zaplog and log/sloglog so the core stays
Expand Down
38 changes: 38 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package mcpx

import "time"

// Metrics is an optional observability sink for the multiplexer. Implement
// this interface to receive call-level and tool-list events and forward them
// to any backend (Prometheus, OpenTelemetry, statsd, …). Register via
// [WithMetrics].
//
// Implementations must be safe for concurrent use. Panics inside any method
// are recovered by the library and do not propagate to callers.
type Metrics interface {
// RecordCall is invoked after every [Multiplexer.CallTool] invocation.
// dur is the wall-clock time of the upstream MCP call only (argument
// validation and hook overhead are excluded).
// err is nil on success and matches the error returned to the caller.
RecordCall(server, tool string, dur time.Duration, err error)

// RecordToolList is invoked after every successful tool-list fetch —
// both on initial connect and after a live notifications/tools/list_changed
// refresh. count is the number of tools the server currently exposes.
RecordToolList(server string, count int)
}

type nopMetrics struct{}

func (nopMetrics) RecordCall(_ string, _ string, _ time.Duration, _ error) {}
func (nopMetrics) RecordToolList(_ string, _ int) {}

func safeRecordCall(m Metrics, server, tool string, dur time.Duration, err error) {
defer func() { recover() }() //nolint:errcheck
m.RecordCall(server, tool, dur, err)
}

func safeRecordToolList(m Metrics, server string, count int) {
defer func() { recover() }() //nolint:errcheck
m.RecordToolList(server, count)
}
216 changes: 216 additions & 0 deletions metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package mcpx_test

import (
"context"
"encoding/json"
"net/http/httptest"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"

mcpx "github.com/inhuman/mcp-multiplexer"
"github.com/inhuman/mcp-multiplexer/internal/testutil/mcptest"
)

// captureMetrics is a test Metrics implementation that records all calls.
type captureMetrics struct {
mu sync.Mutex
calls []recordedCall
lists []recordedList
onCall func(server, tool string, dur time.Duration, err error)
onList func(server string, count int)
panicky bool // if true, all methods panic
}

type recordedCall struct {
server, tool string
dur time.Duration
err error
}
type recordedList struct {
server string
count int
}

func (m *captureMetrics) RecordCall(server, tool string, dur time.Duration, err error) {
if m.panicky {
panic("intentional panic in RecordCall")
}
m.mu.Lock()
m.calls = append(m.calls, recordedCall{server, tool, dur, err})
m.mu.Unlock()
if m.onCall != nil {
m.onCall(server, tool, dur, err)
}
}

func (m *captureMetrics) RecordToolList(server string, count int) {
if m.panicky {
panic("intentional panic in RecordToolList")
}
m.mu.Lock()
m.lists = append(m.lists, recordedList{server, count})
m.mu.Unlock()
if m.onList != nil {
m.onList(server, count)
}
}

func (m *captureMetrics) Calls() []recordedCall {
m.mu.Lock()
defer m.mu.Unlock()
return append([]recordedCall(nil), m.calls...)
}

func (m *captureMetrics) Lists() []recordedList {
m.mu.Lock()
defer m.mu.Unlock()
return append([]recordedList(nil), m.lists...)
}

// newMetricsMux creates a multiplexer backed by srv with the given metrics.
func newMetricsMux(t *testing.T, srv *mcptest.Server, m mcpx.Metrics) (*mcpx.Multiplexer, func()) {
t.Helper()
ts := httptest.NewServer(srv.HTTPHandler())
mx, err := mcpx.New(t.Context(), mcpx.MultiplexerConfig{
Servers: []mcpx.ServerConfig{{Name: "s", Transport: mcpx.TransportHTTP, URL: ts.URL}},
}, mcpx.WithHTTPRetryMax(0), mcpx.WithMetrics(m))
require.NoError(t, err)
return mx, func() {
mx.Close()
ts.Close()
srv.Close()
}
}

func TestMetrics_RecordCall_Success(t *testing.T) {
srv := mcptest.NewServer(mcptest.WithTool(mcptest.ToolSpec{
Name: "echo",
Handler: func(_ context.Context, _ map[string]any) (string, error) { return "ok", nil },
}))
m := &captureMetrics{}
mx, cleanup := newMetricsMux(t, srv, m)
defer cleanup()

_, err := mx.CallTool(t.Context(), "s", "echo", nil)
require.NoError(t, err)

calls := m.Calls()
require.Len(t, calls, 1)
require.Equal(t, "s", calls[0].server)
require.Equal(t, "echo", calls[0].tool)
require.Positive(t, calls[0].dur)
require.NoError(t, calls[0].err)
}

func TestMetrics_RecordCall_Error(t *testing.T) {
srv := mcptest.NewServer(mcptest.WithTool(mcptest.ToolSpec{Name: "echo"}))
m := &captureMetrics{}
mx, cleanup := newMetricsMux(t, srv, m)
defer cleanup()

// tool exists but we call a non-existent one to trigger ErrToolNotFound
_, err := mx.CallTool(t.Context(), "s", "missing", nil)
require.Error(t, err)

calls := m.Calls()
require.Len(t, calls, 1)
require.Equal(t, "s", calls[0].server)
require.Equal(t, "missing", calls[0].tool)
// ErrToolNotFound is returned before any RPC so dur should be 0 (call
// never reached the transport). But the metric must still be recorded.
require.Error(t, calls[0].err)
}

func TestMetrics_RecordToolList(t *testing.T) {
srv := mcptest.NewServer(
mcptest.WithTool(mcptest.ToolSpec{Name: "a"}),
mcptest.WithTool(mcptest.ToolSpec{Name: "b"}),
mcptest.WithTool(mcptest.ToolSpec{Name: "c"}),
)
m := &captureMetrics{}
_, cleanup := newMetricsMux(t, srv, m)
defer cleanup()

lists := m.Lists()
require.Len(t, lists, 1)
require.Equal(t, "s", lists[0].server)
require.Equal(t, 3, lists[0].count)
}

func TestMetrics_PanicRecovered(t *testing.T) {
srv := mcptest.NewServer(mcptest.WithTool(mcptest.ToolSpec{
Name: "echo",
Handler: func(_ context.Context, _ map[string]any) (string, error) { return "ok", nil },
}))
m := &captureMetrics{panicky: true}

ts := httptest.NewServer(srv.HTTPHandler())
defer ts.Close()
defer srv.Close()

mx, err := mcpx.New(t.Context(), mcpx.MultiplexerConfig{
Servers: []mcpx.ServerConfig{{Name: "s", Transport: mcpx.TransportHTTP, URL: ts.URL}},
}, mcpx.WithHTTPRetryMax(0), mcpx.WithMetrics(m))
require.NoError(t, err) // RecordToolList panics during connect but must not propagate
defer mx.Close()

// RecordCall panics but call must still succeed
result, err := mx.CallTool(t.Context(), "s", "echo", nil)
require.NoError(t, err)
require.Equal(t, "ok", result.Text)
}

func TestMetrics_NilNoOp(t *testing.T) {
srv := mcptest.NewServer(mcptest.WithTool(mcptest.ToolSpec{
Name: "echo",
Handler: func(_ context.Context, _ map[string]any) (string, error) { return "ok", nil },
}))

ts := httptest.NewServer(srv.HTTPHandler())
defer ts.Close()
defer srv.Close()

// WithMetrics(nil) must not panic and must behave like no option was passed.
mx, err := mcpx.New(t.Context(), mcpx.MultiplexerConfig{
Servers: []mcpx.ServerConfig{{Name: "s", Transport: mcpx.TransportHTTP, URL: ts.URL}},
}, mcpx.WithHTTPRetryMax(0), mcpx.WithMetrics(nil))
require.NoError(t, err)
defer mx.Close()

result, err := mx.CallTool(t.Context(), "s", "echo", nil)
require.NoError(t, err)
require.Equal(t, "ok", result.Text)
}

func TestMetrics_WithAfterCallCoexists(t *testing.T) {
srv := mcptest.NewServer(mcptest.WithTool(mcptest.ToolSpec{
Name: "echo",
Handler: func(_ context.Context, _ map[string]any) (string, error) { return "ok", nil },
}))
m := &captureMetrics{}

var afterCallFired atomic.Bool
hook := mcpx.AfterCallHook(func(_ context.Context, _ string, _ mcpx.ToolInfo, _ json.RawMessage, _ *mcpx.CallResult, _ error) {
afterCallFired.Store(true)
})

ts := httptest.NewServer(srv.HTTPHandler())
defer ts.Close()
defer srv.Close()

mx, err := mcpx.New(t.Context(), mcpx.MultiplexerConfig{
Servers: []mcpx.ServerConfig{{Name: "s", Transport: mcpx.TransportHTTP, URL: ts.URL}},
}, mcpx.WithHTTPRetryMax(0), mcpx.WithMetrics(m), mcpx.WithAfterCall(hook))
require.NoError(t, err)
defer mx.Close()

_, err = mx.CallTool(t.Context(), "s", "echo", nil)
require.NoError(t, err)

require.True(t, afterCallFired.Load(), "AfterCallHook must fire alongside Metrics")
require.Len(t, m.Calls(), 1, "Metrics.RecordCall must fire alongside AfterCallHook")
}
1 change: 1 addition & 0 deletions multiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,5 +352,6 @@ func (mx *Multiplexer) fetchTools(ctx context.Context, serverName string, sessio
}
infos = append(infos, info)
}
safeRecordToolList(mx.opts.metrics, serverName, len(infos))
return infos, nil
}
Loading
Loading