diff --git a/.changeset/cli-full-parity.md b/.changeset/cli-full-parity.md new file mode 100644 index 0000000..f25fcd5 --- /dev/null +++ b/.changeset/cli-full-parity.md @@ -0,0 +1,15 @@ +--- +"@resciencelab/agent-world-network": minor +--- + +feat(cli): add join/leave/ping/send commands for full plugin parity + +The standalone AWN CLI (`awn`) now exposes all capabilities available in the OpenClaw plugin: + +- `awn join ` — join a world by ID, slug, or direct address; resolves via Gateway and sends a signed `world.join` P2P message +- `awn leave ` — send `world.leave` and remove from joined list +- `awn joined` — list currently joined worlds +- `awn ping ` — check reachability of a known agent +- `awn send ` — send a signed `chat` P2P message to an agent + +Adds `sign_http_request()` and `build_signed_p2p_message()` helpers to `crypto.rs` (wire-compatible with the TypeScript plugin). The daemon gains a `joined_worlds` state map and five new IPC routes. diff --git a/.changeset/fix-gateway-peer-announce-compat.md b/.changeset/fix-gateway-peer-announce-compat.md new file mode 100644 index 0000000..11e98f9 --- /dev/null +++ b/.changeset/fix-gateway-peer-announce-compat.md @@ -0,0 +1,9 @@ +--- +"@resciencelab/agent-world-sdk": patch +--- + +fix(gateway): add /peer/announce backward-compat route and auto-redeploy on SDK version bump + +- Add `POST /peer/announce` backward-compat route for SDK < 1.4 world containers (returns legacy `{peers:[]}` shape) +- Raise default `STALE_TTL_MS` from 90 s to 15 min to prevent old SDK worlds (10 min announce interval, no heartbeat) from being pruned between announces +- Add `packages/agent-world-sdk/package.json` to `deploy-gateway.yml` path triggers so any SDK minor version bump automatically redeploys the gateway (fixes 403 signature mismatch caused by `PROTOCOL_VERSION` changing without gateway redeploy) diff --git a/.github/workflows/deploy-gateway.yml b/.github/workflows/deploy-gateway.yml index ac0cd4b..90bf095 100644 --- a/.github/workflows/deploy-gateway.yml +++ b/.github/workflows/deploy-gateway.yml @@ -6,6 +6,7 @@ on: paths: - "gateway/**" - "packages/agent-world-sdk/src/**" + - "packages/agent-world-sdk/package.json" workflow_dispatch: concurrency: diff --git a/gateway/server.mjs b/gateway/server.mjs index ac744c9..db62399 100644 --- a/gateway/server.mjs +++ b/gateway/server.mjs @@ -62,7 +62,7 @@ const DEFAULT_HTTP_PORT = parseInt(process.env.HTTP_PORT ?? "8100") const DEFAULT_PUBLIC_ADDR = process.env.PUBLIC_ADDR ?? null const DEFAULT_PUBLIC_URL = process.env.PUBLIC_URL ?? null const DEFAULT_DATA_DIR = process.env.DATA_DIR ?? "/data" -const DEFAULT_STALE_TTL_MS = parseInt(process.env.STALE_TTL_MS ?? String(90 * 1000)) +const DEFAULT_STALE_TTL_MS = parseInt(process.env.STALE_TTL_MS ?? String(15 * 60 * 1000)) const WEBHOOK_URL = process.env.WEBHOOK_URL ?? null const MAX_AGENTS = 500 const REGISTRY_VERSION = 1 @@ -811,6 +811,66 @@ export async function createGatewayApp(opts = {}) { return { ok: true, agents: getAgentsForExchange(20) }; }); + // Backward-compat: SDK versions < 1.4 post to /peer/announce instead of /agents. + // Accepts the same body, registers the same way, but returns the old {peers:[]} shape. + peer.post("/peer/announce", { + schema: { + summary: "Legacy peer announce (SDK < 1.4, maps to POST /agents)", + operationId: "postPeerAnnounce", + tags: ["gateway"], + body: { $ref: "AnnounceRequest#" }, + response: { + 200: { + type: "object", + properties: { peers: { type: "array", items: { $ref: "AgentRecord#" } } }, + }, + 400: { $ref: "Error#" }, + 403: { $ref: "Error#" }, + }, + }, + }, async (req, reply) => { + const ann = req.body; + if (!ann?.publicKey || !ann?.from) return reply.code(400).send({ error: "Invalid announce" }); + + const awSig = req.headers["x-agentworld-signature"]; + if (awSig) { + const authority = req.headers["host"] ?? "localhost"; + const result = verifyHttpRequestHeaders(req.headers, req.method, req.url, authority, req.rawBody, ann.publicKey); + if (!result.ok) return reply.code(403).send({ error: result.error }); + } else { + const { signature, ...signable } = ann; + const domainOk = verifyWithDomainSeparator(DOMAIN_SEPARATORS.ANNOUNCE, ann.publicKey, signable, signature); + if (!domainOk && !verifySignature(ann.publicKey, signable, signature)) { + return reply.code(403).send({ error: "Invalid signature" }); + } + } + + if (agentIdFromPublicKey(ann.publicKey) !== ann.from) { + return reply.code(400).send({ error: "agentId mismatch" }); + } + + const worldCap = Array.isArray(ann.capabilities) + ? ann.capabilities.find((cap) => typeof cap === "string" && cap.startsWith("world:")) + : undefined; + if (worldCap) { + const protocolWorldId = agentIdFromPublicKey(ann.publicKey); + upsertWorld(protocolWorldId, ann.publicKey, { + slug: typeof ann.slug === "string" && ann.slug.length > 0 + ? ann.slug + : worldCap.slice("world:".length) || ann.alias || protocolWorldId, + endpoints: ann.endpoints, + lastSeen: ann.timestamp, + persist: true, + }); + } else { + upsertAgent(ann.from, ann.publicKey, { + alias: ann.alias, endpoints: ann.endpoints, capabilities: ann.capabilities, persist: true, + }); + } + // Return legacy shape: {peers:[...]} instead of {ok, agents:[...]} + return { peers: getAgentsForExchange(20) }; + }); + peer.post("/agents/:agentId/heartbeat", { schema: { summary: "Lightweight liveness heartbeat", diff --git a/packages/awn-cli/Cargo.lock b/packages/awn-cli/Cargo.lock index e1d557f..b60d949 100644 --- a/packages/awn-cli/Cargo.lock +++ b/packages/awn-cli/Cargo.lock @@ -105,7 +105,7 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "awn" -version = "1.4.0" +version = "1.5.1" dependencies = [ "assert_cmd", "axum", diff --git a/packages/awn-cli/skills/SKILL.md b/packages/awn-cli/skills/SKILL.md index c7abad8..01e5e77 100644 --- a/packages/awn-cli/skills/SKILL.md +++ b/packages/awn-cli/skills/SKILL.md @@ -63,6 +63,21 @@ awn worlds # list available worlds from Gateway awn world # get detailed info about a specific world ``` +### World membership + +``` +awn join # join a world +awn joined # list currently joined worlds +awn leave # leave a world +``` + +### P2P communication + +``` +awn ping # check agent reachability and latency +awn send "message" # send a signed P2P message +``` + ### JSON output (for agents) All commands support `--json` for structured, machine-readable output: @@ -72,6 +87,8 @@ awn --json status awn --json worlds awn --json world awn --json agents --capability world: +awn --json joined +awn --json ping ``` ## Command Groups @@ -92,15 +109,31 @@ awn --json agents --capability world: | `worlds` | List available worlds from Gateway + local cache | | `world ` | Get detailed info about a specific world including manifest and available actions | +### world membership + +| Command | Description | +|---------|-------------| +| `join ` | Join a world; resolves via Gateway or connects directly | +| `joined` | List currently joined worlds | +| `leave ` | Send `world.leave` and remove world from joined list | + +### messaging + +| Command | Description | +|---------|-------------| +| `ping ` | Check if an agent is reachable; reports latency | +| `send ` | Send a signed `chat` P2P message to an agent | + ## For AI Agents When using this CLI programmatically: 1. **Always use `--json` flag** for parseable output 2. **Start daemon first**: `awn daemon start` -3. **Workflow**: `awn worlds` → `awn world ` (view actions) → `awn join ` → `awn action ` +3. **Workflow**: `awn worlds` → `awn world ` (view manifest/actions) → `awn join ` → `awn agents` → `awn send "msg"` 4. **Check return codes** — 0 for success, non-zero for errors 5. **Parse stderr** for error messages on failure +6. **Join before messaging** — agent endpoints are only discovered on world join ### Discovering world capabilities diff --git a/packages/awn-cli/src/crypto.rs b/packages/awn-cli/src/crypto.rs index fb33769..7630fbb 100644 --- a/packages/awn-cli/src/crypto.rs +++ b/packages/awn-cli/src/crypto.rs @@ -1,5 +1,6 @@ use base64::engine::general_purpose::STANDARD as B64; use base64::Engine; + use ed25519_dalek::{Signer, SigningKey, Verifier, VerifyingKey}; use serde_json::Value; use sha2::{Digest, Sha256}; @@ -144,6 +145,161 @@ pub fn compute_content_digest(body: &str) -> String { format!("sha-256=:{}:", B64.encode(hash)) } +/// The six AgentWorld HTTP request headers returned by `sign_http_request`. +pub struct HttpRequestHeaders { + pub version: String, + pub from_agent: String, + pub key_id: String, + pub timestamp: String, + pub content_digest: String, + pub signature: String, +} + +/// Build and sign the AgentWorld request headers for an outbound HTTP call. +/// Wire-compatible with the TS `signHttpRequest()`. +pub fn sign_http_request( + identity: &crate::identity::Identity, + method: &str, + authority: &str, + path: &str, + body: &str, +) -> HttpRequestHeaders { + let ts = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, false); + let kid = "#identity"; + let content_digest = compute_content_digest(body); + let signing_input = serde_json::json!({ + "v": PROTOCOL_VERSION, + "from": identity.agent_id, + "kid": kid, + "ts": ts, + "method": method.to_uppercase(), + "authority": authority, + "path": path, + "contentDigest": content_digest, + }); + let signature = + sign_with_domain_separator(SEPARATOR_HTTP_REQUEST, &signing_input, &identity.signing_key); + HttpRequestHeaders { + version: PROTOCOL_VERSION.to_string(), + from_agent: identity.agent_id.clone(), + key_id: kid.to_string(), + timestamp: ts, + content_digest, + signature, + } +} + +/// Build a signed P2P message payload ready for POST to `/peer/message`. +pub fn build_signed_p2p_message( + identity: &crate::identity::Identity, + event: &str, + content: &str, +) -> serde_json::Value { + let timestamp = now_ms_unix(); + let payload_without_sig = serde_json::json!({ + "from": identity.agent_id, + "publicKey": identity.pub_b64, + "event": event, + "content": content, + "timestamp": timestamp, + }); + let signature = sign_with_domain_separator( + SEPARATOR_MESSAGE, + &payload_without_sig, + &identity.signing_key, + ); + let mut msg = payload_without_sig; + msg["signature"] = serde_json::Value::String(signature); + msg +} + +fn now_ms_unix() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as u64 +} + +const MAX_CLOCK_SKEW_MS: u64 = 5 * 60 * 1000; + +/// The AgentWorld HTTP response headers produced by `sign_http_response`. +pub struct HttpResponseHeaders { + pub version: String, + pub from_agent: String, + pub key_id: String, + pub timestamp: String, + pub content_digest: String, + pub signature: String, +} + +/// Sign an outbound HTTP response. Wire-compatible with TS `signHttpResponse()`. +pub fn sign_http_response( + identity: &crate::identity::Identity, + status: u16, + body: &str, +) -> HttpResponseHeaders { + let ts = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, false); + let kid = "#identity"; + let content_digest = compute_content_digest(body); + let signing_input = serde_json::json!({ + "v": PROTOCOL_VERSION, + "from": identity.agent_id, + "kid": kid, + "ts": ts, + "status": status, + "contentDigest": content_digest, + }); + let signature = sign_with_domain_separator( + SEPARATOR_HTTP_RESPONSE, + &signing_input, + &identity.signing_key, + ); + HttpResponseHeaders { + version: PROTOCOL_VERSION.to_string(), + from_agent: identity.agent_id.clone(), + key_id: kid.to_string(), + timestamp: ts, + content_digest, + signature, + } +} + +/// Verify an inbound signed HTTP response. Returns false on any failure. +/// Wire-compatible with TS `verifyHttpResponseHeaders()`. +pub fn verify_http_response( + version: &str, + from: &str, + key_id: &str, + timestamp: &str, + content_digest_header: &str, + signature: &str, + status: u16, + body: &str, + public_key_b64: &str, +) -> bool { + if let Ok(ts) = chrono::DateTime::parse_from_rfc3339(timestamp) { + let skew = now_ms_unix().abs_diff(ts.timestamp_millis() as u64); + if skew > MAX_CLOCK_SKEW_MS { + return false; + } + } else { + return false; + } + if compute_content_digest(body) != content_digest_header { + return false; + } + let signing_input = serde_json::json!({ + "v": version, + "from": from, + "kid": key_id, + "ts": timestamp, + "status": status, + "contentDigest": content_digest_header, + }); + verify_with_domain_separator(SEPARATOR_HTTP_RESPONSE, public_key_b64, &signing_input, signature) + .unwrap_or(false) +} + #[derive(Debug, thiserror::Error)] pub enum CryptoError { #[error("invalid base64 encoding")] diff --git a/packages/awn-cli/src/daemon.rs b/packages/awn-cli/src/daemon.rs index 9950f0d..a235aea 100644 --- a/packages/awn-cli/src/daemon.rs +++ b/packages/awn-cli/src/daemon.rs @@ -3,25 +3,50 @@ use axum::http::StatusCode; use axum::routing::{get, post}; use axum::{Json, Router}; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use tokio::sync::oneshot; +use crate::agent_db::{AgentDb, AgentRecord, Endpoint}; +use crate::crypto::{ + build_signed_p2p_message, sign_http_request, sign_http_response, verify_http_response, +}; use crate::identity::{self, Identity}; -use crate::agent_db::{Endpoint, AgentDb, AgentRecord}; const DEFAULT_IPC_PORT: u16 = 8199; const PORT_FILE: &str = "daemon.port"; const PID_FILE: &str = "daemon.pid"; +#[derive(Clone, Serialize, Deserialize)] +pub struct JoinedWorld { + #[serde(rename = "worldId")] + pub world_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub slug: Option, + pub name: String, + pub address: String, + pub port: u16, + #[serde(rename = "publicKey")] + pub public_key: String, + #[serde(rename = "agentId")] + pub agent_id: String, + #[serde(rename = "joinedAt")] + pub joined_at: u64, +} + #[derive(Clone)] pub struct DaemonState { pub identity: Identity, pub agent_db: Arc>, + pub joined_worlds: Arc>>, + pub received_messages: Arc>>, pub data_dir: PathBuf, pub gateway_url: String, pub listen_port: u16, + /// Address to advertise in world.join endpoint payload (e.g. a public IP or VPN address). + pub advertise_address: Option, } #[derive(Serialize, Deserialize)] @@ -113,9 +138,30 @@ pub struct OkResponse { pub message: Option, } +#[derive(Serialize, Deserialize)] +pub struct JoinedWorldsResponse { + pub worlds: Vec, +} + +#[derive(Serialize, Deserialize)] +pub struct PingResponse { + pub ok: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub latency_ms: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +#[derive(Deserialize)] +pub struct SendMessageBody { + pub agent_id: String, + pub message: String, +} + pub struct DaemonHandle { shutdown_tx: oneshot::Sender<()>, pub addr: SocketAddr, + pub peer_addr: SocketAddr, } impl DaemonHandle { @@ -124,33 +170,53 @@ impl DaemonHandle { } } +const MEMBER_REFRESH_SECS: u64 = 30; + pub async fn start_daemon( data_dir: PathBuf, gateway_url: String, listen_port: u16, ipc_port: u16, + advertise_address: Option, ) -> Result { let identity = identity::load_or_create_identity(&data_dir, "identity") .map_err(|e| DaemonError::Identity(e.to_string()))?; let agent_db = AgentDb::open(&data_dir); + // Bind peer listener first so we know the actual port before building state. + let peer_bind = SocketAddr::from(([0, 0, 0, 0], listen_port)); + let peer_listener = tokio::net::TcpListener::bind(peer_bind) + .await + .map_err(|e| DaemonError::Bind(format!("peer port {listen_port}: {e}")))?; + let bound_peer_addr = peer_listener.local_addr().unwrap(); + let state = DaemonState { identity, agent_db: Arc::new(Mutex::new(agent_db)), + joined_worlds: Arc::new(Mutex::new(HashMap::new())), + received_messages: Arc::new(Mutex::new(std::collections::VecDeque::with_capacity(100))), data_dir, gateway_url, - listen_port, + listen_port: bound_peer_addr.port(), // actual bound port (may differ when 0 requested) + advertise_address, }; let (shutdown_tx, shutdown_rx) = oneshot::channel(); let (ipc_shutdown_tx, ipc_shutdown_rx) = oneshot::channel::<()>(); - let app = Router::new() + // ── IPC server (loopback) ─────────────────────────────────────────────── + let ipc_app = Router::new() .route("/ipc/status", get(handle_status)) .route("/ipc/agents", get(handle_agents)) .route("/ipc/worlds", get(handle_worlds)) .route("/ipc/world/{world_id}", get(handle_world_info)) .route("/ipc/ping", get(handle_ping)) + .route("/ipc/joined", get(handle_joined_worlds)) + .route("/ipc/join/{world_id}", post(handle_join_world)) + .route("/ipc/leave/{world_id}", post(handle_leave_world)) + .route("/ipc/peer/ping/{agent_id}", get(handle_ping_agent)) + .route("/ipc/send", post(handle_send_message)) + .route("/ipc/messages", get(handle_messages)) .route( "/ipc/shutdown", post({ @@ -169,16 +235,34 @@ pub async fn start_daemon( } }), ) - .with_state(state); + .with_state(state.clone()); - let addr = SocketAddr::from(([127, 0, 0, 1], ipc_port)); - let listener = tokio::net::TcpListener::bind(addr) + let ipc_addr = SocketAddr::from(([127, 0, 0, 1], ipc_port)); + let ipc_listener = tokio::net::TcpListener::bind(ipc_addr) .await .map_err(|e| DaemonError::Bind(e.to_string()))?; - let bound_addr = listener.local_addr().unwrap(); + let bound_ipc_addr = ipc_listener.local_addr().unwrap(); + + // ── Peer server (all interfaces) ──────────────────────────────────────── + let peer_app = Router::new() + .route("/peer/ping", get(handle_peer_ping)) + .route("/peer/message", post(handle_peer_message)) + .with_state(state.clone()); + // ── Background: member refresh every 30 s ─────────────────────────────── + let refresh_state = state.clone(); tokio::spawn(async move { - axum::serve(listener, app) + let mut ticker = + tokio::time::interval(std::time::Duration::from_secs(MEMBER_REFRESH_SECS)); + ticker.tick().await; // skip first immediate tick + loop { + ticker.tick().await; + do_refresh_world_members(&refresh_state).await; + } + }); + + tokio::spawn(async move { + axum::serve(ipc_listener, ipc_app) .with_graceful_shutdown(async { tokio::select! { _ = shutdown_rx => {} @@ -189,9 +273,14 @@ pub async fn start_daemon( .ok(); }); + tokio::spawn(async move { + axum::serve(peer_listener, peer_app).await.ok(); + }); + Ok(DaemonHandle { shutdown_tx, - addr: bound_addr, + addr: bound_ipc_addr, + peer_addr: bound_peer_addr, }) } @@ -386,6 +475,602 @@ async fn handle_ping() -> Json { }) } +// ── Peer server handlers ──────────────────────────────────────────────────── + +async fn handle_peer_ping(State(state): State) -> axum::response::Response { + let body = serde_json::json!({ + "ok": true, + "agentId": state.identity.agent_id, + "publicKey": state.identity.pub_b64, + }) + .to_string(); + let h = sign_http_response(&state.identity, 200, &body); + axum::response::Response::builder() + .status(200) + .header("Content-Type", "application/json") + .header("X-AgentWorld-Version", h.version) + .header("X-AgentWorld-From", h.from_agent) + .header("X-AgentWorld-KeyId", h.key_id) + .header("X-AgentWorld-Timestamp", h.timestamp) + .header("Content-Digest", h.content_digest) + .header("X-AgentWorld-Signature", h.signature) + .body(axum::body::Body::from(body)) + .unwrap() +} + +async fn handle_peer_message( + State(state): State, + body: String, +) -> axum::response::Response { + if let Ok(msg) = serde_json::from_str::(&body) { + if msg.get("from").is_some() && msg.get("event").is_some() && msg.get("signature").is_some() { + let mut msgs = state.received_messages.lock().unwrap(); + if msgs.len() >= 100 { + msgs.pop_front(); + } + msgs.push_back(msg); + } + } + let resp_body = serde_json::json!({"ok": true}).to_string(); + let h = sign_http_response(&state.identity, 200, &resp_body); + axum::response::Response::builder() + .status(200) + .header("Content-Type", "application/json") + .header("X-AgentWorld-Version", h.version) + .header("X-AgentWorld-From", h.from_agent) + .header("X-AgentWorld-KeyId", h.key_id) + .header("X-AgentWorld-Timestamp", h.timestamp) + .header("Content-Digest", h.content_digest) + .header("X-AgentWorld-Signature", h.signature) + .body(axum::body::Body::from(resp_body)) + .unwrap() +} + +// ── IPC: received messages ────────────────────────────────────────────────── + +async fn handle_messages(State(state): State) -> Json { + let msgs: Vec<_> = state.received_messages.lock().unwrap().iter().cloned().collect(); + Json(serde_json::json!({ "messages": msgs })) +} + +// ── Background: member refresh ────────────────────────────────────────────── + +async fn do_refresh_world_members(state: &DaemonState) { + let worlds: Vec = + state.joined_worlds.lock().unwrap().values().cloned().collect(); + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(10)) + .build() + .unwrap_or_default(); + + for world in worlds { + let is_ipv6 = world.address.contains(':') && !world.address.contains('.'); + let host = if is_ipv6 { + format!("[{}]:{}", world.address, world.port) + } else { + format!("{}:{}", world.address, world.port) + }; + let url = format!("http://{}/world/members", host); + let hdrs = sign_http_request(&state.identity, "GET", &host, "/world/members", ""); + + let result = client + .get(&url) + .header("X-AgentWorld-Version", &hdrs.version) + .header("X-AgentWorld-From", &hdrs.from_agent) + .header("X-AgentWorld-KeyId", &hdrs.key_id) + .header("X-AgentWorld-Timestamp", &hdrs.timestamp) + .header("Content-Digest", &hdrs.content_digest) + .header("X-AgentWorld-Signature", &hdrs.signature) + .send() + .await; + + match result { + Ok(r) if r.status().as_u16() == 403 || r.status().as_u16() == 404 => { + state.joined_worlds.lock().unwrap().remove(&world.world_id); + } + Ok(r) if r.status().is_success() => { + if let Ok(data) = r.json::().await { + if let Some(members) = data.get("members").and_then(|m| m.as_array()) { + let mut db = state.agent_db.lock().unwrap(); + for member in members { + let Some(aid) = member.get("agentId").and_then(|v| v.as_str()) else { + continue; + }; + if aid == state.identity.agent_id { + continue; + } + let alias = member.get("alias").and_then(|v| v.as_str()); + let endpoints: Vec = member + .get("endpoints") + .and_then(|v| serde_json::from_value(v.clone()).ok()) + .unwrap_or_default(); + db.upsert(aid, "", alias, Some(endpoints), None, Some("gossip"), None); + } + } + } + } + _ => {} + } + } +} + +// ── IPC: joined worlds ────────────────────────────────────────────────────── + +async fn handle_joined_worlds(State(state): State) -> Json { + let worlds = state + .joined_worlds + .lock() + .unwrap() + .values() + .cloned() + .collect(); + Json(JoinedWorldsResponse { worlds }) +} + +async fn handle_join_world( + State(state): State, + Path(world_id): Path, +) -> Result, StatusCode> { + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(10)) + .build() + .unwrap(); + + // Resolve world: try direct worldId first, then search by slug + let (address, port, public_key, resolved_world_id, slug) = + resolve_world(&client, &state.gateway_url, &world_id) + .await + .map_err(|_| StatusCode::NOT_FOUND)?; + + // Build join P2P message — include our peer endpoint so the world server + // can reach us for broadcasts and member refreshes. + let advertise_addr = state + .advertise_address + .as_deref() + .unwrap_or("127.0.0.1") + .to_string(); + let content = serde_json::json!({ + "endpoints": [{ + "transport": "tcp", + "address": advertise_addr, + "port": state.listen_port, + "priority": 1, + "ttl": 3600 + }], + "alias": "" + }) + .to_string(); + let msg = build_signed_p2p_message(&state.identity, "world.join", &content); + + // Send to world server + let is_ipv6 = address.contains(':') && !address.contains('.'); + let host = if is_ipv6 { + format!("[{}]:{}", address, port) + } else { + format!("{}:{}", address, port) + }; + let url = format!("http://{}/peer/message", host); + let body = msg.to_string(); + let headers = sign_http_request(&state.identity, "POST", &host, "/peer/message", &body); + + let resp = client + .post(&url) + .header("Content-Type", "application/json") + .header("X-AgentWorld-Version", &headers.version) + .header("X-AgentWorld-From", &headers.from_agent) + .header("X-AgentWorld-KeyId", &headers.key_id) + .header("X-AgentWorld-Timestamp", &headers.timestamp) + .header("Content-Digest", &headers.content_digest) + .header("X-AgentWorld-Signature", &headers.signature) + .body(body) + .send() + .await + .map_err(|_| StatusCode::BAD_GATEWAY)?; + + if !resp.status().is_success() { + return Err(StatusCode::BAD_GATEWAY); + } + + let resp_data: serde_json::Value = resp.json().await.unwrap_or(serde_json::json!({})); + + // Extract manifest name for display + let name = resp_data + .get("manifest") + .and_then(|m| m.get("name")) + .and_then(|n| n.as_str()) + .unwrap_or(slug.as_deref().unwrap_or(&resolved_world_id)) + .to_string(); + + // Store joined world + let joined = JoinedWorld { + world_id: resolved_world_id.clone(), + slug: slug.clone(), + name: name.clone(), + address: address.clone(), + port, + public_key: public_key.clone(), + agent_id: public_key_to_agent_id(&public_key), + joined_at: now_ms(), + }; + state + .joined_worlds + .lock() + .unwrap() + .insert(resolved_world_id.clone(), joined); + + // Store co-members in agent_db + if let Some(members) = resp_data.get("members").and_then(|m| m.as_array()) { + let mut db = state.agent_db.lock().unwrap(); + for member in members { + if let Some(agent_id) = member.get("agentId").and_then(|v| v.as_str()) { + if agent_id == state.identity.agent_id { + continue; + } + let alias = member + .get("alias") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + let endpoints: Vec = member + .get("endpoints") + .and_then(|v| serde_json::from_value(v.clone()).ok()) + .unwrap_or_default(); + db.upsert( + agent_id, + "", + alias.as_deref(), + Some(endpoints), + None, + Some("gossip"), + None, + ); + } + } + } + + let member_count = resp_data + .get("members") + .and_then(|m| m.as_array()) + .map(|a| a.len()) + .unwrap_or(0); + + Ok(Json(serde_json::json!({ + "ok": true, + "worldId": resolved_world_id, + "slug": slug, + "name": name, + "members": member_count, + "manifest": resp_data.get("manifest"), + }))) +} + +async fn handle_leave_world( + State(state): State, + Path(world_id): Path, +) -> Result, StatusCode> { + let joined = state + .joined_worlds + .lock() + .unwrap() + .get(&world_id) + .or_else(|| { + // Also try by slug — not directly possible with .get(), handled below + None + }) + .cloned(); + + // Also search by slug + let joined = if joined.is_none() { + state + .joined_worlds + .lock() + .unwrap() + .values() + .find(|w| w.slug.as_deref() == Some(&world_id)) + .cloned() + } else { + joined + }; + + let info = joined.ok_or(StatusCode::NOT_FOUND)?; + + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(5)) + .build() + .unwrap(); + + let msg = build_signed_p2p_message(&state.identity, "world.leave", ""); + let is_ipv6 = info.address.contains(':') && !info.address.contains('.'); + let host = if is_ipv6 { + format!("[{}]:{}", info.address, info.port) + } else { + format!("{}:{}", info.address, info.port) + }; + let url = format!("http://{}/peer/message", host); + let body = msg.to_string(); + let headers = sign_http_request(&state.identity, "POST", &host, "/peer/message", &body); + + // Best-effort — don't fail if server is unreachable + let _ = client + .post(&url) + .header("Content-Type", "application/json") + .header("X-AgentWorld-Version", &headers.version) + .header("X-AgentWorld-From", &headers.from_agent) + .header("X-AgentWorld-KeyId", &headers.key_id) + .header("X-AgentWorld-Timestamp", &headers.timestamp) + .header("Content-Digest", &headers.content_digest) + .header("X-AgentWorld-Signature", &headers.signature) + .body(body) + .send() + .await; + + state + .joined_worlds + .lock() + .unwrap() + .remove(&info.world_id); + + Ok(Json(OkResponse { + ok: true, + message: Some(format!("Left world {}", info.world_id)), + })) +} + +async fn handle_ping_agent( + State(state): State, + Path(agent_id): Path, +) -> Json { + let endpoints = { + let db = state.agent_db.lock().unwrap(); + db.get(&agent_id) + .map(|r| r.endpoints.clone()) + .unwrap_or_default() + }; + + if endpoints.is_empty() { + return Json(PingResponse { + ok: false, + latency_ms: None, + error: Some("No known endpoints for agent".to_string()), + }); + } + + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(5)) + .build() + .unwrap(); + + let mut sorted = endpoints; + sorted.sort_by_key(|e| e.priority); + + for ep in &sorted { + if ep.transport != "tcp" && ep.transport != "http" { + continue; + } + let is_ipv6 = ep.address.contains(':') && !ep.address.contains('.'); + let host = if is_ipv6 { + format!("[{}]:{}", ep.address, ep.port) + } else { + format!("{}:{}", ep.address, ep.port) + }; + let url = format!("http://{}/peer/ping", host); + let start = std::time::Instant::now(); + let Ok(resp) = client.get(&url).send().await else { continue }; + if !resp.status().is_success() { + continue; + } + let latency = start.elapsed().as_millis() as u64; + + // Verify signed response when we have the agent's public key. + let pub_key = { + let db = state.agent_db.lock().unwrap(); + db.get(&agent_id).map(|r| r.public_key.clone()).unwrap_or_default() + }; + if !pub_key.is_empty() { + // Extract headers before consuming the response body + let hdr = |name: &str| -> String { + resp.headers() + .get(name) + .and_then(|v| v.to_str().ok()) + .unwrap_or("") + .to_string() + }; + let ver = hdr("x-agentworld-version"); + let sig = hdr("x-agentworld-signature"); + let from = hdr("x-agentworld-from"); + let kid = hdr("x-agentworld-keyid"); + let ts = hdr("x-agentworld-timestamp"); + let cd = hdr("content-digest"); + if !ver.is_empty() && !sig.is_empty() { + if let Ok(body_text) = resp.text().await { + if !verify_http_response(&ver, &from, &kid, &ts, &cd, &sig, 200, &body_text, &pub_key) { + continue; + } + } + } + } + + return Json(PingResponse { ok: true, latency_ms: Some(latency), error: None }); + } + + Json(PingResponse { + ok: false, + latency_ms: None, + error: Some("Unreachable".to_string()), + }) +} + +async fn handle_send_message( + State(state): State, + Json(body): Json, +) -> Result, StatusCode> { + let endpoints = { + let db = state.agent_db.lock().unwrap(); + db.get(&body.agent_id) + .map(|r| r.endpoints.clone()) + .unwrap_or_default() + }; + + if endpoints.is_empty() { + return Err(StatusCode::NOT_FOUND); + } + + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(10)) + .build() + .unwrap(); + + let msg = build_signed_p2p_message(&state.identity, "chat", &body.message); + let msg_body = msg.to_string(); + + let mut sorted = endpoints; + sorted.sort_by_key(|e| e.priority); + + for ep in &sorted { + if ep.transport != "tcp" && ep.transport != "http" { + continue; + } + let is_ipv6 = ep.address.contains(':') && !ep.address.contains('.'); + let host = if is_ipv6 { + format!("[{}]:{}", ep.address, ep.port) + } else { + format!("{}:{}", ep.address, ep.port) + }; + let url = format!("http://{}/peer/message", host); + let headers = + sign_http_request(&state.identity, "POST", &host, "/peer/message", &msg_body); + + let resp = client + .post(&url) + .header("Content-Type", "application/json") + .header("X-AgentWorld-Version", &headers.version) + .header("X-AgentWorld-From", &headers.from_agent) + .header("X-AgentWorld-KeyId", &headers.key_id) + .header("X-AgentWorld-Timestamp", &headers.timestamp) + .header("Content-Digest", &headers.content_digest) + .header("X-AgentWorld-Signature", &headers.signature) + .body(msg_body.clone()) + .send() + .await; + + let Ok(resp) = resp else { continue }; + if !resp.status().is_success() { + continue; + } + // Verify that the responder is who we addressed; skip endpoint if not. + let from_hdr = resp + .headers() + .get("x-agentworld-from") + .and_then(|v| v.to_str().ok()) + .unwrap_or("") + .to_string(); + if !from_hdr.is_empty() && from_hdr != body.agent_id { + continue; + } + return Ok(Json(OkResponse { + ok: true, + message: Some(format!("Message sent to {}", body.agent_id)), + })); + } + + Err(StatusCode::BAD_GATEWAY) +} + +/// Resolve a world identifier (worldId or slug or direct address) to +/// (address, port, publicKey, worldId, slug). +async fn resolve_world( + client: &reqwest::Client, + gateway_url: &str, + identifier: &str, +) -> Result<(String, u16, String, String, Option), String> { + // Direct address format: "host:port" or "host" + if !identifier.starts_with("aw:") && identifier.contains(':') && !identifier.starts_with("http") { + let parts: Vec<&str> = identifier.rsplitn(2, ':').collect(); + if parts.len() == 2 { + if let Ok(p) = parts[0].parse::() { + return Ok((parts[1].trim_matches('[').trim_matches(']').to_string(), p, String::new(), identifier.to_string(), None)); + } + } + return Ok((identifier.to_string(), 8099, String::new(), identifier.to_string(), None)); + } + + // Try direct worldId lookup + let url = format!("{}/worlds/{}", gateway_url.trim_end_matches('/'), urlencoding(identifier)); + if let Ok(resp) = client.get(&url).send().await { + if resp.status().is_success() { + if let Ok(data) = resp.json::().await { + if let Some(ep) = best_endpoint(&data) { + let public_key = data.get("publicKey").and_then(|v| v.as_str()).unwrap_or("").to_string(); + let world_id = data.get("worldId").and_then(|v| v.as_str()).unwrap_or(identifier).to_string(); + let slug = data.get("slug").and_then(|v| v.as_str()).map(|s| s.to_string()); + return Ok((ep.address, ep.port, public_key, world_id, slug)); + } + } + } + } + + // Fallback: list all worlds and search by slug + let all_url = format!("{}/worlds", gateway_url.trim_end_matches('/')); + if let Ok(resp) = client.get(&all_url).send().await { + if let Ok(data) = resp.json::().await { + if let Some(worlds) = data.get("worlds").and_then(|w| w.as_array()) { + for w in worlds { + let slug = w.get("slug").and_then(|v| v.as_str()).unwrap_or(""); + let wid = w.get("worldId").and_then(|v| v.as_str()).unwrap_or(""); + if slug == identifier || wid == identifier { + if let Some(ep) = best_endpoint(w) { + let public_key = w.get("publicKey").and_then(|v| v.as_str()).unwrap_or("").to_string(); + return Ok((ep.address, ep.port, public_key, wid.to_string(), Some(slug.to_string()))); + } + } + } + } + } + } + + Err(format!("World '{}' not found", identifier)) +} + +fn best_endpoint(world_data: &serde_json::Value) -> Option { + let endpoints: Vec = world_data + .get("endpoints") + .and_then(|v| serde_json::from_value(v.clone()).ok()) + .unwrap_or_default(); + if endpoints.is_empty() { + return None; + } + let mut sorted = endpoints; + sorted.sort_by_key(|e| e.priority); + sorted.into_iter().find(|e| e.transport == "tcp" || e.transport == "http") + .or_else(|| { + let mut sorted2: Vec = world_data + .get("endpoints") + .and_then(|v| serde_json::from_value(v.clone()).ok()) + .unwrap_or_default(); + sorted2.sort_by_key(|e| e.priority); + sorted2.into_iter().next() + }) +} + +fn public_key_to_agent_id(public_key_b64: &str) -> String { + crate::crypto::agent_id_from_public_key(public_key_b64).unwrap_or_default() +} + +fn urlencoding(s: &str) -> String { + s.chars() + .flat_map(|c| match c { + 'A'..='Z' | 'a'..='z' | '0'..='9' | '-' | '_' | '.' | '~' => vec![c], + _ => format!("%{:02X}", c as u32).chars().collect(), + }) + .collect() +} + +fn now_ms() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as u64 +} + pub fn ipc_port() -> u16 { std::env::var("AWN_IPC_PORT") .ok() @@ -452,8 +1137,9 @@ mod tests { let handle = start_daemon( tmp.path().to_path_buf(), "http://localhost:9999".to_string(), - 8099, - 0, // OS-assigned port + 0, + 0, + None, // OS-assigned port ) .await .unwrap(); @@ -471,8 +1157,9 @@ mod tests { let handle = start_daemon( tmp.path().to_path_buf(), "http://localhost:9999".to_string(), - 8099, 0, + 0, + None, ) .await .unwrap(); @@ -481,7 +1168,7 @@ mod tests { let resp: StatusResponse = reqwest::get(&url).await.unwrap().json().await.unwrap(); assert!(resp.agent_id.starts_with("aw:sha256:")); assert_eq!(resp.version, env!("CARGO_PKG_VERSION")); - assert_eq!(resp.listen_port, 8099); + assert_eq!(resp.listen_port, handle.peer_addr.port()); handle.shutdown(); } @@ -492,8 +1179,9 @@ mod tests { let handle = start_daemon( tmp.path().to_path_buf(), "http://localhost:9999".to_string(), - 8099, 0, + 0, + None, ) .await .unwrap(); @@ -511,8 +1199,9 @@ mod tests { let handle = start_daemon( tmp.path().to_path_buf(), "http://localhost:9999".to_string(), - 8099, 0, + 0, + None, ) .await .unwrap(); @@ -528,8 +1217,9 @@ mod tests { let handle1 = start_daemon( tmp.path().to_path_buf(), "http://localhost:9999".to_string(), - 8099, 0, + 0, + None, ) .await .unwrap(); @@ -541,8 +1231,9 @@ mod tests { let handle2 = start_daemon( tmp.path().to_path_buf(), "http://localhost:9999".to_string(), - 8099, 0, + 0, + None, ) .await .unwrap(); diff --git a/packages/awn-cli/src/main.rs b/packages/awn-cli/src/main.rs index c6fc5dc..f62a248 100644 --- a/packages/awn-cli/src/main.rs +++ b/packages/awn-cli/src/main.rs @@ -40,9 +40,33 @@ enum Commands { Worlds, /// Get detailed info about a specific world World { - /// World ID to query + /// World ID or slug to query world_id: String, }, + /// List currently joined worlds + Joined, + /// Join a world by world ID, slug, or direct address (host:port) + Join { + /// World ID, slug, or direct address + world_id: String, + }, + /// Leave a joined world + Leave { + /// World ID or slug to leave + world_id: String, + }, + /// Ping an agent to check reachability + Ping { + /// Agent ID to ping + agent_id: String, + }, + /// Send a direct P2P message to an agent + Send { + /// Target agent ID + agent_id: String, + /// Message text + message: String, + }, } #[derive(Subcommand)] @@ -55,9 +79,13 @@ enum DaemonAction { /// Gateway URL #[arg(long)] gateway_url: Option, - /// Listen port for the agent server + /// Listen port for the peer server #[arg(long, default_value_t = 8099)] port: u16, + /// Public address to advertise in world.join (e.g. a VPN IP or hostname). + /// Defaults to 127.0.0.1 (local-only). Set this so world members can reach you. + #[arg(long)] + advertise_address: Option, }, /// Stop the AWN daemon Stop, @@ -75,12 +103,13 @@ async fn main() { data_dir, gateway_url, port, + advertise_address, } => { let data_dir = data_dir.unwrap_or_else(daemon::default_data_dir); let gateway_url = gateway_url.unwrap_or_else(daemon::default_gateway_url); let ipc_port = cli_ipc_port.unwrap_or_else(|| daemon::ipc_port()); - match daemon::start_daemon(data_dir.clone(), gateway_url, port, ipc_port).await { + match daemon::start_daemon(data_dir.clone(), gateway_url, port, ipc_port, advertise_address).await { Ok(handle) => { daemon::write_port_file(&data_dir, handle.addr.port()); daemon::write_pid_file(&data_dir); @@ -89,11 +118,13 @@ async fn main() { "{}", serde_json::json!({ "ok": true, - "ipc_addr": handle.addr.to_string() + "ipc_addr": handle.addr.to_string(), + "peer_addr": handle.peer_addr.to_string(), }) ); } else { - eprintln!("AWN daemon listening on {}", handle.addr); + eprintln!("AWN daemon IPC listening on {}", handle.addr); + eprintln!("AWN peer server listening on {}", handle.peer_addr); eprintln!("Press Ctrl+C to stop"); } tokio::signal::ctrl_c().await.ok(); @@ -256,6 +287,173 @@ async fn main() { } } } + Commands::Joined => { + let ipc = resolve_ipc_port_raw(cli_ipc_port); + let url = format!("http://127.0.0.1:{ipc}/ipc/joined"); + match reqwest::get(&url).await { + Ok(resp) => { + if let Ok(data) = resp.json::().await { + if json_output { + println!("{}", serde_json::to_string(&data).unwrap()); + } else if data.worlds.is_empty() { + println!("Not joined any worlds. Use: awn join "); + } else { + println!("=== Joined Worlds ({}) ===", data.worlds.len()); + for w in &data.worlds { + let label = w.slug.as_deref().unwrap_or(&w.world_id); + println!(" {} — {} ({}:{})", label, w.name, w.address, w.port); + } + } + } + } + Err(_) => { + if json_output { + println!("{}", serde_json::json!({"error": "AWN daemon not running"})); + } else { + eprintln!("AWN daemon not running. Start with: awn daemon start"); + } + std::process::exit(1); + } + } + } + Commands::Join { ref world_id } => { + let ipc = resolve_ipc_port_raw(cli_ipc_port); + let encoded_id = urlencoding(world_id); + let url = format!("http://127.0.0.1:{ipc}/ipc/join/{encoded_id}"); + let client = reqwest::Client::new(); + match client.post(&url).send().await { + Ok(resp) => { + if resp.status().is_success() { + if let Ok(data) = resp.json::().await { + if json_output { + println!("{}", data); + } else { + let name = data.get("name").and_then(|v| v.as_str()).unwrap_or(world_id); + let members = data.get("members").and_then(|v| v.as_u64()).unwrap_or(0); + let wid = data.get("worldId").and_then(|v| v.as_str()).unwrap_or(world_id); + println!("Joined world: {} — {} ({} members)", wid, name, members); + } + } + } else { + if json_output { + println!("{}", serde_json::json!({"error": format!("Failed to join world: {}", world_id)})); + } else { + eprintln!("Failed to join world '{}'. Check that the world ID or address is correct.", world_id); + } + std::process::exit(1); + } + } + Err(_) => { + if json_output { + println!("{}", serde_json::json!({"error": "AWN daemon not running"})); + } else { + eprintln!("AWN daemon not running. Start with: awn daemon start"); + } + std::process::exit(1); + } + } + } + Commands::Leave { ref world_id } => { + let ipc = resolve_ipc_port_raw(cli_ipc_port); + let encoded_id = urlencoding(world_id); + let url = format!("http://127.0.0.1:{ipc}/ipc/leave/{encoded_id}"); + let client = reqwest::Client::new(); + match client.post(&url).send().await { + Ok(resp) => { + if resp.status().is_success() { + if json_output { + println!("{}", serde_json::json!({"ok": true})); + } else { + println!("Left world '{}'.", world_id); + } + } else if resp.status() == reqwest::StatusCode::NOT_FOUND { + if json_output { + println!("{}", serde_json::json!({"error": "World not found in joined list"})); + } else { + eprintln!("World '{}' is not in your joined list.", world_id); + } + std::process::exit(1); + } else { + eprintln!("Failed to leave world '{}'.", world_id); + std::process::exit(1); + } + } + Err(_) => { + if json_output { + println!("{}", serde_json::json!({"error": "AWN daemon not running"})); + } else { + eprintln!("AWN daemon not running. Start with: awn daemon start"); + } + std::process::exit(1); + } + } + } + Commands::Ping { ref agent_id } => { + let ipc = resolve_ipc_port_raw(cli_ipc_port); + let encoded_id = urlencoding(agent_id); + let url = format!("http://127.0.0.1:{ipc}/ipc/peer/ping/{encoded_id}"); + match reqwest::get(&url).await { + Ok(resp) => { + if let Ok(data) = resp.json::().await { + if json_output { + println!("{}", serde_json::to_string(&data).unwrap()); + } else if data.ok { + let latency = data.latency_ms.map(|ms| format!(" ({}ms)", ms)).unwrap_or_default(); + println!("Reachable{}", latency); + } else { + println!("Unreachable: {}", data.error.as_deref().unwrap_or("unknown")); + } + } + } + Err(_) => { + if json_output { + println!("{}", serde_json::json!({"error": "AWN daemon not running"})); + } else { + eprintln!("AWN daemon not running. Start with: awn daemon start"); + } + std::process::exit(1); + } + } + } + Commands::Send { ref agent_id, ref message } => { + let ipc = resolve_ipc_port_raw(cli_ipc_port); + let url = format!("http://127.0.0.1:{ipc}/ipc/send"); + let client = reqwest::Client::new(); + let body = serde_json::json!({"agent_id": agent_id, "message": message}); + match client.post(&url).json(&body).send().await { + Ok(resp) => { + if resp.status().is_success() { + if json_output { + println!("{}", serde_json::json!({"ok": true})); + } else { + println!("Message sent to {}.", agent_id); + } + } else if resp.status() == reqwest::StatusCode::NOT_FOUND { + if json_output { + println!("{}", serde_json::json!({"error": "Agent not found or no known endpoints"})); + } else { + eprintln!("Agent '{}' not found or has no known endpoints. Join a shared world first.", agent_id); + } + std::process::exit(1); + } else { + if json_output { + println!("{}", serde_json::json!({"error": "Failed to deliver message"})); + } else { + eprintln!("Failed to deliver message to '{}'.", agent_id); + } + std::process::exit(1); + } + } + Err(_) => { + if json_output { + println!("{}", serde_json::json!({"error": "AWN daemon not running"})); + } else { + eprintln!("AWN daemon not running. Start with: awn daemon start"); + } + std::process::exit(1); + } + } + } Commands::World { ref world_id } => { let ipc = resolve_ipc_port_raw(cli_ipc_port); let encoded_id = urlencoding(world_id); diff --git a/skills/awn/SKILL.md b/skills/awn/SKILL.md index 7ed0d79..c38db82 100644 --- a/skills/awn/SKILL.md +++ b/skills/awn/SKILL.md @@ -49,6 +49,44 @@ awn worlds Queries the Gateway for registered World Servers. +### Join a world + +```bash +awn join # join by world ID or slug +awn join pixel-city # join by slug +awn join world.example.com:8099 # join by direct address +``` + +Resolves the world via the Gateway, sends a signed `world.join` message, and stores co-member endpoints locally. + +### List joined worlds + +```bash +awn joined +``` + +### Leave a world + +```bash +awn leave +``` + +### Ping an agent + +```bash +awn ping +``` + +Checks reachability of a known agent and reports latency. + +### Send a message + +```bash +awn send "hello" +``` + +Sends an Ed25519-signed P2P message directly to the agent. Both agents must share a joined world. + ### List known agents ```bash @@ -70,6 +108,8 @@ All commands support `--json` for machine-readable output: awn status --json awn worlds --json awn agents --json +awn joined --json +awn ping --json ``` ## Quick Reference @@ -80,6 +120,11 @@ awn agents --json | Stop daemon | `awn daemon stop` | | Show identity and status | `awn status` | | Discover worlds | `awn worlds` | +| Join a world | `awn join ` | +| List joined worlds | `awn joined` | +| Leave a world | `awn leave ` | +| Ping an agent | `awn ping ` | +| Send a message | `awn send "message"` | | List known agents | `awn agents` | | Filter agents by capability | `awn agents --capability "world:"` | | JSON output | append `--json` to any command | @@ -129,6 +174,8 @@ Override via CLI flags: `--ipc-port`, `--data-dir`, `--gateway-url`, `--port`. |---|---| | `AWN daemon not running` | Run `awn daemon start` first | | `No worlds found` | Gateway unreachable or no worlds registered | +| `Failed to join world` | World ID/slug not found or world server unreachable | +| `Agent not found or no known endpoints` | Join a world that the agent is a member of first | | `Message rejected (403)` | Sender and recipient do not share a world | | TOFU key mismatch (403) | Peer rotated keys. Wait for TTL expiry or verify out of band | @@ -138,3 +185,4 @@ Override via CLI flags: `--ipc-port`, `--data-dir`, `--gateway-url`, `--port`. - Never invent agent IDs or world IDs — use `awn agents` and `awn worlds` to discover them. - The daemon must be running for any command other than `daemon start` to work. - All messages are Ed25519-signed. Trust is application-layer: signature + TOFU + world co-membership. +- You must join a world before you can message agents in it. Co-member endpoints are only received on join. diff --git a/skills/awn/references/flows.md b/skills/awn/references/flows.md index e98bb20..69247d2 100644 --- a/skills/awn/references/flows.md +++ b/skills/awn/references/flows.md @@ -17,31 +17,73 @@ awn worlds # world:arena — Arena [reachable] — 19s ago ``` -## Flow 3 — List known agents +## Flow 3 — Join a world and discover agents + +```bash +awn join pixel-city +# Joined world: aw:sha256:abc123... — Pixel City (3 members) + +awn joined +# === Joined Worlds (1) === +# pixel-city — Pixel City (world.example.com:8099) + +awn agents +# === Known Agents (3) === +# aw:sha256:def456... — Alice [tcp] last seen 2s ago +# aw:sha256:ghi789... — Bob [tcp] last seen 5s ago +``` + +## Flow 4 — Ping and message an agent + +```bash +awn ping aw:sha256:def456... +# Reachable (47ms) + +awn send aw:sha256:def456... "hello from the CLI" +# Message sent to aw:sha256:def456... +``` + +## Flow 5 — Leave a world + +```bash +awn leave pixel-city +# Left world 'pixel-city'. +``` + +## Flow 6 — List known agents ```bash awn agents awn agents --capability "world:" ``` -## Flow 4 — JSON output for scripting +## Flow 7 — JSON output for scripting ```bash awn status --json | jq .agent_id awn worlds --json | jq '.worlds[].world_id' awn agents --json | jq '.agents | length' +awn joined --json | jq '.worlds[].slug' +awn ping --json | jq '.latency_ms' ``` -## Flow 5 — Stop the daemon +## Flow 8 — Stop the daemon ```bash awn daemon stop # Daemon stopped. ``` -## Flow 6 — Custom configuration +## Flow 9 — Custom configuration ```bash awn daemon start --data-dir /tmp/awn-test --gateway-url http://localhost:3000 --port 9099 awn --ipc-port 9199 status ``` + +## Flow 10 — Join by direct address (no Gateway) + +```bash +awn join world.example.com:8099 +# Joined world: world.example.com:8099 — My World (2 members) +```