-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi.go
More file actions
157 lines (136 loc) · 4.42 KB
/
api.go
File metadata and controls
157 lines (136 loc) · 4.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// SPDX-License-Identifier: EUPL-1.2
// Remote communication primitive for the Core framework.
// API manages named streams to remote endpoints. The transport protocol
// (HTTP, WebSocket, SSE, MCP, TCP) is handled by protocol handlers
// registered by consumer packages.
//
// Drive is the phone book (WHERE to connect).
// API is the phone (HOW to connect).
//
// Usage:
//
// // Configure endpoint
// c.Drive().New(core.NewOptions(
// core.Option{Key: "name", Value: "charon"},
// core.Option{Key: "transport", Value: "http://10.69.69.165:9101/mcp"},
// ))
//
// // Open stream
// s := c.API().Stream("charon")
// if s.OK { stream := s.Value.(Stream) }
//
// // Remote Action dispatch
// r := c.API().Call("charon", "agentic.status", opts)
package core
import "context"
// Stream is a bidirectional connection to a remote endpoint.
// Consumer packages implement this once per transport protocol and hand
// instances to API through a StreamFactory.
//
//	type httpStream struct { ... }
//	func (s *httpStream) Send(data []byte) error { ... }
//	func (s *httpStream) Receive() ([]byte, error) { ... }
//	func (s *httpStream) Close() error { ... }
type Stream interface {
// Send transmits data to the remote endpoint.
Send(data []byte) error
// Receive returns data read from the remote endpoint.
Receive() ([]byte, error)
// Close ends the connection. API.Call closes the stream after a single
// Send/Receive exchange.
Close() error
}
// StreamFactory creates a Stream from a DriveHandle's transport config.
// Consumer packages register one per URL scheme with API.RegisterProtocol.
// API.Stream invokes the factory with the handle it looked up in Drive and
// passes a non-nil error straight back to its caller.
type StreamFactory func(handle *DriveHandle) (Stream, error)
// API manages remote streams and protocol handlers.
type API struct {
// core gives Stream access to Drive for endpoint lookup.
core *Core
// protocols maps a transport scheme (e.g. "http", "mcp") to the
// StreamFactory registered for it.
protocols *Registry[StreamFactory]
}
// API returns the remote communication primitive held by this Core.
// It is the entry point for registering protocols, opening streams and
// making remote calls.
//
//	c.API().Stream("charon")
func (c *Core) API() *API {
return c.api
}
// RegisterProtocol registers a stream factory for a URL scheme.
// Consumer packages call this during OnStartup.
//
// The scheme is stored exactly as given. Stream looks factories up by the
// text before the first ':' of an endpoint's transport string, so the two
// must match exactly.
//
//	c.API().RegisterProtocol("http", httpStreamFactory)
//	c.API().RegisterProtocol("https", httpStreamFactory)
//	c.API().RegisterProtocol("mcp", mcpStreamFactory)
func (a *API) RegisterProtocol(scheme string, factory StreamFactory) {
a.protocols.Set(scheme, factory)
}
// Stream resolves a named endpoint to a live connection.
//
// The endpoint is fetched from Drive, its scheme is taken from the transport
// string (see extractScheme), and the StreamFactory registered for that
// scheme builds the Stream. On success Result.Value holds the Stream and OK
// is true. On failure OK is false and Value holds the error: an unknown
// endpoint, an unregistered scheme, or whatever the factory returned.
//
//	r := c.API().Stream("charon")
//	if r.OK { stream := r.Value.(Stream) }
func (a *API) Stream(name string) Result {
	found := a.core.Drive().Get(name)
	if !found.OK {
		return Result{
			Value: E("api.Stream", Concat("endpoint not found in Drive: ", name), nil),
			OK:    false,
		}
	}
	endpoint := found.Value.(*DriveHandle)

	// The scheme selects which registered factory builds the connection.
	proto := extractScheme(endpoint.Transport)
	registered := a.protocols.Get(proto)
	if !registered.OK {
		return Result{
			Value: E("api.Stream", Concat("no protocol handler for scheme: ", proto), nil),
			OK:    false,
		}
	}

	open := registered.Value.(StreamFactory)
	conn, err := open(endpoint)
	if err != nil {
		return Result{Value: err, OK: false}
	}
	return Result{Value: conn, OK: true}
}
// Call invokes a named Action on a remote endpoint.
// This is the remote equivalent of c.Action("name").Run(ctx, opts).
//
// It opens a stream to endpoint, sends one request of the form
// {"action":<name>,"options":<opts>}, reads one response and closes the
// stream. On success Result.Value is the response as a string. On failure OK
// is false and Value holds the error from Stream, Send or Receive.
//
//	r := c.API().Call("charon", "agentic.status", opts)
func (a *API) Call(endpoint string, action string, opts Options) Result {
	r := a.Stream(endpoint)
	if !r.OK {
		return r
	}
	// A factory that returns (nil, nil) leaves Value without a Stream; report
	// that as an error instead of panicking on the type assertion.
	stream, ok := r.Value.(Stream)
	if !ok {
		return Result{E("api.Call", Concat("no stream returned for endpoint: ", endpoint), nil), false}
	}
	// Best-effort close: the outcome of the call is decided by Send/Receive.
	defer stream.Close()

	// Encode the request as a JSON object. The action name is marshalled
	// rather than spliced in raw, so quotes, backslashes or control characters
	// in it cannot corrupt or inject into the payload.
	// NOTE(review): relies on JSONMarshalString accepting a plain string and
	// returning its quoted JSON form — confirm against its definition.
	payload := Concat(`{"action":`, JSONMarshalString(action), `,"options":`, JSONMarshalString(opts), `}`)
	if err := stream.Send([]byte(payload)); err != nil {
		return Result{err, false}
	}

	response, err := stream.Receive()
	if err != nil {
		return Result{err, false}
	}
	return Result{string(response), true}
}
// Protocols returns all registered protocol scheme names, as reported by the
// protocol registry's Names method.
// NOTE(review): ordering is whatever Registry.Names yields — confirm before
// relying on it.
func (a *API) Protocols() []string {
return a.protocols.Names()
}
// extractScheme returns the part of a transport string that precedes the
// first ':'. A string without ':' is returned unchanged.
//
//	"http://host:port/path" → "http"
//	"mcp://host:port"       → "mcp"
//	"stdio"                 → "stdio"
func extractScheme(transport string) string {
	end := len(transport)
	// ':' is a single ASCII byte, so a byte scan finds the same offset a rune
	// scan would.
	for i := 0; i < len(transport); i++ {
		if transport[i] == ':' {
			end = i
			break
		}
	}
	return transport[:end]
}
// RemoteAction resolves "host:action.name" syntax for transparent remote dispatch.
//
// The name is split at its first ':'. The text before it is the endpoint and
// the text after it is the action, which is forwarded to API.Call. A name
// without ':' is run as a local action. ctx is only used on the local path,
// because API.Call takes no context.
//
//	c.Action("charon:agentic.status") // → c.API().Call("charon", "agentic.status", opts)
func (c *Core) RemoteAction(name string, ctx context.Context, opts Options) Result {
	// Locate the first ':'; it is ASCII, so a byte scan is sufficient.
	sep := -1
	for i := 0; i < len(name); i++ {
		if name[i] == ':' {
			sep = i
			break
		}
	}

	// No ":" — local action
	if sep < 0 {
		return c.Action(name).Run(ctx, opts)
	}
	return c.API().Call(name[:sep], name[sep+1:], opts)
}