diff --git a/gateway/platforms/tlon.py b/gateway/platforms/tlon.py index 5c9661626fc2..049e604a977d 100644 --- a/gateway/platforms/tlon.py +++ b/gateway/platforms/tlon.py @@ -18,6 +18,9 @@ TLON_ALLOWED_USERS - Comma-separated ships allowed to interact TLON_ALLOW_ALL_USERS - Set to "true" to allow all users (default: false) TLON_AUTO_DISCOVER - Set to "true" to auto-discover all group channels + TLON_BOT_ALIASES - Comma-separated names that count as mentions (default: Hermes) + TLON_OWNER_LISTEN_ENABLED - Set to "false" to require mentions from owner in groups + TLON_CHANNEL_REFRESH_INTERVAL - Seconds between group/channel discovery refreshes (default: 120) """ import asyncio @@ -43,12 +46,14 @@ from gateway.platforms.tlon_approval import ( PendingApproval, create_pending_approval, + emoji_to_approval_action, find_pending_approval, format_approval_request, format_blocked_list, format_confirmation, format_pending_list, has_duplicate_pending, + normalize_notification_id, prune_expired, ) from gateway.platforms.tlon_discovery import ( @@ -95,6 +100,11 @@ def _normalize_ship(ship: str) -> str: return ship +def _parse_csv(value: str) -> List[str]: + """Parse a comma-separated env value into non-empty stripped strings.""" + return [item.strip() for item in value.split(",") if item.strip()] + + def _parse_channel_nest(nest: str) -> Optional[Dict[str, str]]: """Parse a channel nest like 'chat/~host/channel-name'.""" parts = nest.split("/", 2) @@ -758,7 +768,18 @@ async def _process_event(self, event_data: str) -> None: except Exception as e: logger.error("[tlon] Event handler error: %s", e) elif event_json is not None: - logger.debug("[tlon] Ignoring event with unknown sub_id=%s", sub_id) + # Some %channels/%groups events arrive without a usable + # subscription id. OpenClaw broadcasts these to all handlers and + # lets each handler filter by event shape; do the same so group + # mentions do not disappear before _handle_channel_event sees them. + logger.debug("[tlon] Broadcasting event with unknown sub_id=%s", sub_id) + for handler in list(self._event_handlers.values()): + if not handler.get("event"): + continue + try: + await handler["event"](event_json) + except Exception as e: + logger.error("[tlon] Event handler error: %s", e) async def _ack(self, event_id: int) -> None: """Acknowledge events up to event_id.""" @@ -1039,8 +1060,23 @@ def __init__(self, config: PlatformConfig): for s in os.getenv("TLON_BLOCKED_SHIPS", "").split(",") if s.strip() ) + alias_values = _parse_csv(os.getenv("TLON_BOT_ALIASES", "Hermes")) + self.bot_aliases: List[str] = [] + for alias in alias_values: + if alias and alias.lower() not in {item.lower() for item in self.bot_aliases}: + self.bot_aliases.append(alias) + self.owner_listen_enabled = ( + os.getenv("TLON_OWNER_LISTEN_ENABLED", "true").lower() + not in ("false", "0", "no") + ) + self.owner_listen_disabled_channels: Set[str] = set( + _parse_csv(os.getenv("TLON_OWNER_LISTEN_DISABLED_CHANNELS", "")) + ) self.channel_rules: Dict[str, Dict[str, Any]] = self._load_channel_rules_from_env() self.pending_approvals: List[PendingApproval] = [] + self._exec_approval_prompts: Dict[str, Dict[str, str]] = {} + self._exec_approval_prompt_by_session: Dict[str, str] = {} + self._processed_approval_reactions: Set[str] = set() # SSE client self._sse: Optional[TlonSSEClient] = None @@ -1051,6 +1087,23 @@ def __init__(self, config: PlatformConfig): self._processed_ids: Set[str] = set() self._processed_dm_invites: Set[str] = set() self._max_processed = 2000 + self._dm_poll_task: Optional[asyncio.Task] = None + self._channel_refresh_task: Optional[asyncio.Task] = None + self._dm_poll_initialized: Set[str] = set() + self.dm_poll_enabled = ( + os.getenv("TLON_DM_POLL_ENABLED", "true").lower() + not in ("false", "0", "no") + ) + self.dm_poll_interval = self._env_float("TLON_DM_POLL_INTERVAL", 10.0) + self.dm_poll_limit = max(1, self._env_int("TLON_DM_POLL_LIMIT", 20)) + self.dm_poll_initial_catchup_seconds = max( + 0.0, + self._env_float("TLON_DM_POLL_INITIAL_CATCHUP_SECONDS", 1800.0), + ) + self.channel_refresh_interval = self._env_float( + "TLON_CHANNEL_REFRESH_INTERVAL", + 120.0, + ) # Bot nickname cache self._bot_nickname: Optional[str] = None @@ -1064,6 +1117,29 @@ def __init__(self, config: PlatformConfig): self._channel_to_group: Dict[str, str] = {} self._group_names: Dict[str, str] = {} self._participated_threads: Set[Tuple[str, str]] = set() + self._blob_retry_delay = self._env_float("TLON_BLOB_RETRY_DELAY", 5.0) + + @staticmethod + def _env_float(name: str, default: float) -> float: + raw = os.getenv(name, "") + if not raw: + return default + try: + return float(raw) + except ValueError: + logger.warning("[tlon] Ignoring invalid %s=%r", name, raw) + return default + + @staticmethod + def _env_int(name: str, default: int) -> int: + raw = os.getenv(name, "") + if not raw: + return default + try: + return int(raw) + except ValueError: + logger.warning("[tlon] Ignoring invalid %s=%r", name, raw) + return default def _load_channel_rules_from_env(self) -> Dict[str, Dict[str, Any]]: raw = os.getenv("TLON_CHANNEL_RULES", "") @@ -1130,6 +1206,12 @@ def _apply_settings(self, settings: TlonSettings) -> None: for item in settings.pending_approvals if isinstance(item, dict) ] + if settings.owner_listen_enabled is not None: + self.owner_listen_enabled = settings.owner_listen_enabled + if settings.owner_listen_disabled_channels is not None: + self.owner_listen_disabled_channels = { + item for item in settings.owner_listen_disabled_channels if item + } async def _put_settings_entry(self, key: str, value: Any) -> None: if not self._sse: @@ -1223,10 +1305,34 @@ async def connect(self) -> bool: on_quit=lambda: logger.info("[tlon] Settings subscription quit received"), ) + # Match OpenClaw's group read side: subscribe to groups-ui for + # live channel/group membership changes, and foreigns for group + # invite updates. These are best-effort; channels/chat firehoses + # remain the primary inbound message streams. + await self._sse.subscribe( + app="groups", + path="/groups/ui", + on_event=self._handle_groups_ui_event, + on_error=lambda e: logger.error("[tlon] Groups-ui error: %s", e), + on_quit=lambda: logger.info("[tlon] Groups-ui quit received"), + ) + logger.info("[tlon] Subscribed to groups-ui for real-time channel detection") + + await self._sse.subscribe( + app="groups", + path="/v1/foreigns", + on_event=self._handle_group_foreigns_event, + on_error=lambda e: logger.error("[tlon] Foreigns error: %s", e), + on_quit=lambda: logger.info("[tlon] Foreigns quit received"), + ) + logger.info("[tlon] Subscribed to foreigns (/v1/foreigns) for group invites") + # Connect and start streaming await self._sse.connect() self._running = True + self._start_dm_history_poller() + self._start_channel_refresh() logger.info("[tlon] Connected and listening!") return True @@ -1237,11 +1343,284 @@ async def connect(self) -> bool: async def disconnect(self) -> None: """Disconnect from the Tlon ship.""" self._running = False + if self._dm_poll_task: + self._dm_poll_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._dm_poll_task + self._dm_poll_task = None + if self._channel_refresh_task: + self._channel_refresh_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._channel_refresh_task + self._channel_refresh_task = None if self._sse: await self._sse.close() self._sse = None logger.info("[tlon] Disconnected") + def _start_channel_refresh(self) -> None: + if ( + not self.auto_discover + or self.channel_refresh_interval <= 0 + or (self._channel_refresh_task and not self._channel_refresh_task.done()) + ): + return + self._channel_refresh_task = asyncio.create_task(self._channel_refresh_loop()) + logger.info( + "[tlon] Channel discovery refresh enabled every %.1fs", + self.channel_refresh_interval, + ) + + async def _channel_refresh_loop(self) -> None: + try: + while self._running: + await asyncio.sleep(self.channel_refresh_interval) + await self._refresh_discovered_channels() + except asyncio.CancelledError: + raise + except Exception as e: + logger.error("[tlon] Channel discovery refresh stopped: %s", e, exc_info=True) + + async def _refresh_discovered_channels(self) -> None: + if not self._sse or not self.auto_discover: + return + discovered = await self._discover_channels() + for nest in sorted(discovered): + if nest not in self.monitored_channels: + self.monitored_channels.add(nest) + logger.info("[tlon] Now watching new channel: %s", nest) + + def _dm_poll_targets(self) -> Set[str]: + """Return explicit DM partners worth polling as an SSE fallback.""" + targets: Set[str] = set() + for ship in ( + [self.owner_ship] + + list(self.allowed_users) + + list(self.dm_allowlist) + + list(self.default_authorized_ships) + ): + normalized = _normalize_ship(ship) + if normalized and normalized != self.ship_name: + targets.add(normalized) + return targets + + def _start_dm_history_poller(self) -> None: + if ( + not self.dm_poll_enabled + or self.dm_poll_interval <= 0 + or not self._dm_poll_targets() + or (self._dm_poll_task and not self._dm_poll_task.done()) + ): + return + self._dm_poll_task = asyncio.create_task(self._dm_history_poll_loop()) + logger.info( + "[tlon] DM history fallback poller enabled for %d target(s) every %.1fs", + len(self._dm_poll_targets()), + self.dm_poll_interval, + ) + + async def _dm_history_poll_loop(self) -> None: + try: + while self._running: + await self._poll_dm_histories() + await asyncio.sleep(self.dm_poll_interval) + except asyncio.CancelledError: + raise + except Exception as e: + logger.error("[tlon] DM history fallback poller stopped: %s", e, exc_info=True) + + async def _poll_dm_histories(self) -> None: + if not self._sse or not self._sse._connected: + return + for partner in sorted(self._dm_poll_targets()): + try: + await self._poll_dm_history(partner) + except Exception as e: + logger.debug("[tlon] DM history poll failed for %s: %s", partner, e) + + async def _poll_dm_history(self, partner: str) -> None: + posts = await self._fetch_dm_history_posts(partner) + if partner not in self._dm_poll_initialized: + await self._initialize_dm_history(partner, posts) + self._dm_poll_initialized.add(partner) + return + + for post in posts: + if self._should_process_dm_history_post(post): + await self._process_dm_history_post(partner, post) + + async def _fetch_dm_history_posts(self, partner: str) -> List[Dict[str, Any]]: + if not self._sse: + return [] + data = await self._sse.scry( + f"/chat/v4/dm/{partner}/writs/newest/{self.dm_poll_limit}/heavy" + ) + posts = self._extract_dm_history_posts(data) + posts.sort(key=lambda post: self._dm_history_sent(post) or 0) + return posts + + @staticmethod + def _extract_dm_history_posts(data: Any) -> List[Dict[str, Any]]: + if not isinstance(data, dict): + return [] + writs = data.get("writs") + if isinstance(writs, dict): + posts: List[Dict[str, Any]] = [] + for key, value in writs.items(): + if isinstance(value, dict): + post = dict(value) + post.setdefault("_history_key", key) + posts.append(post) + return posts + if isinstance(writs, list): + return [post for post in writs if isinstance(post, dict)] + return [] + + async def _initialize_dm_history( + self, + partner: str, + posts: List[Dict[str, Any]], + ) -> None: + """Seed old history and catch up at most one recent unanswered DM.""" + now_ms = int(time.time() * 1000) + catchup_after_ms = now_ms - int(self.dm_poll_initial_catchup_seconds * 1000) + latest_own_sent = max( + ( + sent + for post in posts + if self._dm_history_author(post) == self.ship_name + and not self._is_dm_history_status_notice(post) + for sent in [self._dm_history_sent(post)] + if sent is not None + ), + default=0, + ) + + catchup: List[Dict[str, Any]] = [] + for post in posts: + msg_id = self._dm_history_id(post) + if not msg_id: + continue + author = self._dm_history_author(post) + sent = self._dm_history_sent(post) + if ( + author + and author != self.ship_name + and sent is not None + and sent >= catchup_after_ms + and sent > latest_own_sent + ): + catchup.append(post) + continue + self._mark_processed(msg_id) + + if not catchup: + return + + # Process only the newest recent unanswered DM on startup. If several + # old user commands accumulated while the gateway was down, avoid + # replaying side-effectful requests unexpectedly. + selected = max(catchup, key=lambda post: self._dm_history_sent(post) or 0) + selected_id = self._dm_history_id(selected) + for post in catchup: + msg_id = self._dm_history_id(post) + if msg_id and msg_id != selected_id: + self._mark_processed(msg_id) + + logger.info( + "[tlon] DM history fallback catching up latest unanswered DM from %s", + partner, + ) + await self._process_dm_history_post(partner, selected) + + def _should_process_dm_history_post(self, post: Dict[str, Any]) -> bool: + msg_id = self._dm_history_id(post) + if not msg_id or msg_id in self._processed_ids: + return False + author = self._dm_history_author(post) + return bool(author and author != self.ship_name) + + async def _process_dm_history_post( + self, + partner: str, + post: Dict[str, Any], + ) -> None: + event = self._dm_history_event(partner, post) + if event: + await self._handle_dm_event(event) + + def _dm_history_event( + self, + partner: str, + post: Dict[str, Any], + ) -> Optional[Dict[str, Any]]: + essay = self._dm_history_essay(post) + msg_id = self._dm_history_id(post) + if not essay or not msg_id: + return None + return { + "whom": partner, + "id": msg_id, + "response": {"add": {"essay": essay}}, + } + + @staticmethod + def _dm_history_essay(post: Dict[str, Any]) -> Optional[Dict[str, Any]]: + essay = post.get("essay") or post.get("memo") + return essay if isinstance(essay, dict) else None + + def _dm_history_author(self, post: Dict[str, Any]) -> str: + essay = self._dm_history_essay(post) + if essay: + return _extract_author_ship(essay.get("author")) + return _extract_author_ship(post.get("author")) + + def _dm_history_text(self, post: Dict[str, Any]) -> str: + essay = self._dm_history_essay(post) + if not essay: + return "" + return _extract_message_text(essay.get("content")) + + def _is_dm_history_status_notice(self, post: Dict[str, Any]) -> bool: + text = self._dm_history_text(post) + return ( + "Gateway shutting down" in text + or "Gateway online" in text + or "Gateway restarted" in text + ) + + def _dm_history_sent(self, post: Dict[str, Any]) -> Optional[int]: + essay = self._dm_history_essay(post) + raw = (essay or {}).get("sent") if essay else None + if raw is None: + raw = post.get("sent") or post.get("sentAt") + try: + return int(raw) + except (TypeError, ValueError): + return None + + def _dm_history_id(self, post: Dict[str, Any]) -> str: + seal = post.get("seal") if isinstance(post.get("seal"), dict) else {} + msg_id = ( + post.get("id") + or seal.get("id") + or post.get("writId") + or post.get("_history_key") + ) + if msg_id: + msg_id = str(msg_id) + if "/" in msg_id: + return msg_id + author = self._dm_history_author(post) + return f"{author}/{msg_id}" if author else msg_id + + essay = self._dm_history_essay(post) + sent = self._dm_history_sent(post) + author = _extract_author_ship((essay or {}).get("author")) if essay else "" + if author and sent is not None: + return f"{author}/{_da_from_unix(sent)}" + return "" + async def handle_message(self, event) -> None: """Override base adapter's handle_message to bypass the pending-message replay system which causes echo loops on Tlon. @@ -1322,7 +1701,7 @@ async def send( if chat_id.startswith("~"): # DM โ€” pass reply_to for thread replies # reply_to should be the parent writ-id (e.g. "~ship/170.141...") - await self._send_dm(chat_id, story, sent_at, reply_to=reply_to) + msg_id = await self._send_dm(chat_id, story, sent_at, reply_to=reply_to) else: # Channel post โ€” pass reply_to for thread replies # reply_to should be the parent post ID (bare or @ud formatted) @@ -1334,11 +1713,10 @@ async def send( formatted_reply = _format_ud(int(bare)) else: formatted_reply = str(reply_to) - await self._send_channel_post(chat_id, story, sent_at, reply_to=formatted_reply) + msg_id = await self._send_channel_post(chat_id, story, sent_at, reply_to=formatted_reply) if formatted_reply: self._participated_threads.add((chat_id, _normalize_post_id(formatted_reply))) - msg_id = f"{self.ship_name}/{sent_at}" logger.info("[tlon] โœ“ Message sent: %s", msg_id) return SendResult(success=True, message_id=msg_id) @@ -1352,7 +1730,7 @@ async def _send_dm( story: list, sent_at: int, reply_to: Optional[str] = None, - ) -> None: + ) -> str: """Send a DM via %chat poke.""" to_ship = _normalize_ship(to_ship) # Author uses ~ prefix (matching @tloncorp/api) @@ -1416,6 +1794,52 @@ async def _send_dm( mark=os.getenv("TLON_DM_ACTION_MARK", "chat-dm-action-1"), json_data=dm_json, ) + return writ_id + + async def send_exec_approval( + self, + chat_id: str, + command: str, + session_key: str, + description: str = "dangerous command", + metadata: Optional[Dict[str, Any]] = None, + ) -> SendResult: + """Send a Tlon reaction-based dangerous-command approval prompt. + + Tlon has no gateway-native buttons, so mirror OpenClaw's approval UX: + the owner reacts to the prompt with ๐Ÿ‘, ๐Ÿ‘Ž, or ๐Ÿ›‘. The prompt is sent + to the configured owner DM when available so command status does not + leak into group channels. + """ + cmd_preview = command[:2000] + "..." if len(command) > 2000 else command + text = ( + "โš ๏ธ Dangerous command requires approval:\n" + f"```\n{cmd_preview}\n```\n" + f"Reason: {description}\n\n" + "React to this message: ๐Ÿ‘ approve once ยท ๐Ÿ‘Ž deny ยท ๐Ÿ›‘ deny/stop\n\n" + "Or reply `/approve`, `/approve session`, `/approve always`, or `/deny`." + ) + + target_chat_id = self.owner_ship or chat_id + result = await self.send(target_chat_id, text, metadata=metadata) + if not result.success or not result.message_id: + return result + + normalized_id = normalize_notification_id(result.message_id) + old_id = self._exec_approval_prompt_by_session.get(session_key) + if old_id: + self._exec_approval_prompts.pop(old_id, None) + self._exec_approval_prompts[normalized_id] = { + "session_key": session_key, + "chat_id": target_chat_id, + } + self._exec_approval_prompt_by_session[session_key] = normalized_id + logger.info( + "[tlon] Registered reaction approval prompt %s for session %s", + normalized_id, + session_key, + ) + return result async def _send_channel_post( self, @@ -1423,7 +1847,7 @@ async def _send_channel_post( story: list, sent_at: int, reply_to: Optional[str] = None, - ) -> None: + ) -> str: """Send a post to a channel (chat, heap, diary).""" # Author field WITH ~ prefix (matching @tloncorp/api convention) author = self.ship_name if self.ship_name.startswith("~") else f"~{self.ship_name}" @@ -1482,6 +1906,7 @@ async def _send_channel_post( mark=os.getenv("TLON_CHANNEL_ACTION_MARK", "channel-action-1"), json_data=action_json, ) + return _da_from_unix(sent_at) async def send_image( self, @@ -1509,10 +1934,10 @@ async def send_image( sent_at = int(time.time() * 1000) try: if chat_id.startswith("~"): - await self._send_dm(chat_id, story, sent_at, reply_to) + msg_id = await self._send_dm(chat_id, story, sent_at, reply_to) else: - await self._send_channel_post(chat_id, story, sent_at, reply_to) - return SendResult(success=True, message_id=f"{self.ship_name}/{sent_at}") + msg_id = await self._send_channel_post(chat_id, story, sent_at, reply_to) + return SendResult(success=True, message_id=msg_id) except Exception as e: return SendResult(success=False, error=str(e)) @@ -1533,33 +1958,45 @@ async def get_chat_info(self, chat_id: str) -> Dict[str, Any]: def _is_bot_mentioned(self, text: str) -> bool: """Check if the bot is mentioned in the text.""" - text_lower = text.lower() - # Check ship name mention - if self.ship_name.lower() in text_lower: - return True - # Check nickname mention - if self._bot_nickname and self._bot_nickname.lower() in text_lower: - return True + for name in self._bot_mention_names(): + if self._mention_name_matches(text, name): + return True return False def _strip_bot_mention(self, text: str) -> str: """Remove bot mentions from text.""" - # Remove ship name - text = re.sub( - re.escape(self.ship_name), - "", - text, - flags=re.IGNORECASE, - ).strip() - # Remove nickname if set - if self._bot_nickname: - text = re.sub( - re.escape(self._bot_nickname), - "", - text, - flags=re.IGNORECASE, - ).strip() - return text + for name in self._bot_mention_names(): + text = self._remove_mention_name(text, name) + return text.strip() + + def _bot_mention_names(self) -> List[str]: + names = [self.ship_name, self._bot_nickname, *self.bot_aliases] + result: List[str] = [] + seen: Set[str] = set() + for name in names: + if not name: + continue + key = name.lower() + if key in seen: + continue + seen.add(key) + result.append(name) + return result + + def _mention_name_matches(self, text: str, name: str) -> bool: + pattern = self._mention_pattern(name) + return bool(re.search(pattern, text, flags=re.IGNORECASE)) + + def _remove_mention_name(self, text: str, name: str) -> str: + pattern = self._mention_pattern(name, allow_trailing_punctuation=True) + return re.sub(pattern, "", text, flags=re.IGNORECASE).strip() + + @staticmethod + def _mention_pattern(name: str, *, allow_trailing_punctuation: bool = False) -> str: + suffix = r"(?:[:,]\s*)?" if allow_trailing_punctuation else "" + if name.startswith("~"): + return rf"(? bool: """ @@ -1623,6 +2060,13 @@ async def _prepare_media_context( def _is_owner(self, ship: str) -> bool: return bool(self.owner_ship and _normalize_ship(ship) == self.owner_ship) + def _should_owner_listen(self, ship: str, nest: str) -> bool: + return ( + self.owner_listen_enabled + and self._is_owner(ship) + and nest not in self.owner_listen_disabled_channels + ) + def _is_blocked(self, ship: str) -> bool: return _normalize_ship(ship) in self.blocked_ships @@ -1665,16 +2109,18 @@ async def _queue_approval(self, approval: PendingApproval) -> None: logger.info("[tlon] Approval already pending for %s", approval.requesting_ship) return + try: + result = await self.send(self.owner_ship, format_approval_request(approval)) + if result.success and result.message_id: + approval.notification_message_id = normalize_notification_id(result.message_id) + logger.info("[tlon] Queued approval %s for %s", approval.id, approval.requesting_ship) + except Exception as e: + logger.debug("[tlon] Failed to notify owner about approval %s: %s", approval.id, e) self.pending_approvals.append(approval) await self._put_settings_entry( "pendingApprovals", json.dumps([item.to_dict() for item in self.pending_approvals]), ) - try: - await self.send(self.owner_ship, format_approval_request(approval)) - logger.info("[tlon] Queued approval %s for %s", approval.id, approval.requesting_ship) - except Exception as e: - logger.debug("[tlon] Failed to notify owner about approval %s: %s", approval.id, e) async def _handle_owner_command(self, sender: str, text: str) -> Optional[str]: if not self._is_owner(sender) or not text.startswith("/"): @@ -1710,10 +2156,100 @@ async def _handle_owner_command(self, sender: str, text: str) -> Optional[str]: approval = find_pending_approval(self.pending_approvals, arg) if not approval: + if command in {"/approve", "/deny"}: + return None return "No matching pending Tlon approval." return await self._execute_approval_action(approval, action) + async def _handle_dm_reaction_event(self, event: Dict[str, Any], response: Dict[str, Any]) -> bool: + """Handle OpenClaw-style approval reactions on owner DM prompts. + + Returns True when the event was a reaction event and should not fall + through to normal message handling. + """ + add_react = response.get("add-react") + del_react = response.get("del-react") + if not add_react and not del_react: + return False + if not isinstance(add_react, dict): + return True + + react_author = _extract_author_ship(add_react.get("author") or add_react.get("ship")) + react_emoji = str(add_react.get("react") or add_react.get("emoji") or "") + if not react_author or react_author == self.ship_name: + return True + + message_id = normalize_notification_id(event.get("id")) + reaction_key = f"{message_id}:{react_author}:{react_emoji}" + if reaction_key in self._processed_approval_reactions: + return True + self._processed_approval_reactions.add(reaction_key) + if len(self._processed_approval_reactions) > self._max_processed: + self._processed_approval_reactions = set( + list(self._processed_approval_reactions)[-self._max_processed:] + ) + + action = emoji_to_approval_action(react_emoji) + if not action: + return True + + if not self._is_owner(react_author): + logger.info( + "[tlon] Ignoring approval reaction %s from non-owner %s", + react_emoji, + react_author, + ) + return True + + pending = next( + ( + approval + for approval in prune_expired(self.pending_approvals) + if approval.notification_message_id == message_id + ), + None, + ) + if pending: + logger.info( + "[tlon] Reaction-based Tlon approval: %s -> %s for %s", + react_emoji, + action, + pending.id, + ) + confirmation = await self._execute_approval_action(pending, action) + await self.send(self.owner_ship or react_author, confirmation) + return True + + prompt = self._exec_approval_prompts.get(message_id) + if not prompt: + return True + + choice = "once" if action == "approve" else "deny" + session_key = prompt.get("session_key", "") + try: + from tools.approval import resolve_gateway_approval + + count = resolve_gateway_approval(session_key, choice) + except Exception as exc: + logger.error("[tlon] Failed to resolve exec approval reaction: %s", exc) + return True + + if count: + self._exec_approval_prompts.pop(message_id, None) + self._exec_approval_prompt_by_session.pop(session_key, None) + logger.info( + "[tlon] Reaction resolved %d exec approval(s) for session %s choice=%s", + count, + session_key, + choice, + ) + if choice == "once": + await self.send(self.owner_ship or react_author, "Approved command once. Continuing.") + else: + await self.send(self.owner_ship or react_author, "Denied command. Continuing.") + return True + async def _execute_approval_action(self, approval: PendingApproval, action: str) -> str: if action == "approve": if approval.type == "dm": @@ -1792,29 +2328,164 @@ async def _handle_group_invites(self, foreigns: Dict[str, Any]) -> None: inviter = _normalize_ship( str(invite.get("ship") or invite.get("inviter") or invite.get("invitedBy") or "") ) - if allowlist and inviter not in allowlist and not self._is_owner(inviter): - continue group_flag = invite.get("groupFlag") if not isinstance(group_flag, str): continue - try: - await self._sse.poke( - app="groups", - mark="group-join", - json_data={"flag": group_flag, "join-all": True}, + if self._is_owner(inviter): + await self._accept_group_invite(group_flag) + continue + if not self.auto_accept_group_invites: + await self._queue_group_invite_approval(inviter, invite) + continue + if not allowlist: + logger.info( + "[tlon] Rejected group invite from %s to %s (empty group invite allowlist)", + inviter or "(unknown)", + group_flag, ) - logger.info("[tlon] Auto-accepted group invite to %s", group_flag) - except Exception as e: - logger.debug("[tlon] Failed to accept group invite %s: %s", group_flag, e) + await self._queue_group_invite_approval(inviter, invite) + continue + if inviter not in allowlist: + logger.info( + "[tlon] Rejected group invite from %s to %s (not in group invite allowlist)", + inviter or "(unknown)", + group_flag, + ) + await self._queue_group_invite_approval(inviter, invite) + continue + await self._accept_group_invite(group_flag) + + async def _accept_group_invite(self, group_flag: str) -> None: + if not self._sse: + return + try: + await self._sse.poke( + app="groups", + mark="group-join", + json_data={"flag": group_flag, "join-all": True}, + ) + logger.info("[tlon] Auto-accepted group invite to %s", group_flag) + except Exception as e: + logger.debug("[tlon] Failed to accept group invite %s: %s", group_flag, e) + + async def _queue_group_invite_approval(self, inviter: str, invite: Dict[str, Any]) -> None: + if not self.owner_ship or not inviter: + return + group_flag = invite.get("groupFlag") + if not isinstance(group_flag, str): + return + approval = create_pending_approval( + approval_type="group", + requesting_ship=inviter, + group_flag=group_flag, + group_title=( + invite.get("groupTitle") + if isinstance(invite.get("groupTitle"), str) + else None + ), + existing_ids=[item.id for item in self.pending_approvals], + ) + await self._queue_approval(approval) + + async def _handle_group_foreigns_event(self, event: Any) -> None: + """Handle OpenClaw-compatible groups /v1/foreigns updates.""" + if not isinstance(event, dict): + return + await self._handle_group_invites(event) + + async def _handle_groups_ui_event(self, event: Any) -> None: + """ + Handle OpenClaw-compatible groups /groups/ui updates. + + OpenClaw uses this stream to notice new channels after group joins or + invite acceptance. It watches chat/heap channels from either + ``event.channels`` or ``event.join.channels``. + """ + if not isinstance(event, dict): + return + + group_flag = self._groups_ui_group_flag(event) + + channels = event.get("channels") + if isinstance(channels, dict): + for channel_nest in channels.keys(): + await self._watch_group_channel( + channel_nest, + group_flag=group_flag, + reason="new channel (invite accepted)", + persist=self.auto_accept_group_invites, + ) + + join = event.get("join") + if isinstance(join, dict): + join_group = join.get("group") + if isinstance(join_group, str): + group_flag = join_group + join_channels = join.get("channels") + if isinstance(join_channels, list): + for channel_nest in join_channels: + await self._watch_group_channel( + channel_nest, + group_flag=group_flag, + reason="joined channel", + persist=self.auto_accept_group_invites, + ) + + @staticmethod + def _groups_ui_group_flag(event: Dict[str, Any]) -> Optional[str]: + flag = event.get("flag") + if isinstance(flag, str): + return flag + group = event.get("group") + if isinstance(group, dict): + group_flag = group.get("flag") or group.get("id") + if isinstance(group_flag, str): + return group_flag + return None + + async def _watch_group_channel( + self, + channel_nest: Any, + *, + group_flag: Optional[str], + reason: str, + persist: bool = False, + ) -> bool: + if not isinstance(channel_nest, str): + return False + if not channel_nest.startswith(("chat/", "heap/")): + return False + if group_flag: + self._channel_to_group[channel_nest] = group_flag + if channel_nest in self.monitored_channels: + return False + + self.monitored_channels.add(channel_nest) + logger.info("[tlon] Auto-detected %s: %s", reason, channel_nest) + if persist: + await self._persist_group_channel(channel_nest) + return True + + async def _persist_group_channel(self, channel_nest: str) -> None: + current = list(self._settings.group_channels or []) + if channel_nest in current: + return + current.append(channel_nest) + self._settings.group_channels = current + await self._put_settings_entry("groupChannels", current) + logger.info("[tlon] Persisted %s to settings store", channel_nest) async def _discover_channels(self) -> Set[str]: """Discover channels from groups the bot is a member of.""" discovered = TlonDiscovery() - try: - init_data = await self._sse.scry("/groups-ui/v7/init.json") - discovered = parse_groups_ui_init(init_data) - except Exception as e: - logger.debug("[tlon] groups-ui discovery failed: %s", e) + for path in ("/groups-ui/v6/init.json", "/groups-ui/v7/init.json"): + try: + init_data = await self._sse.scry(path) + discovered = parse_groups_ui_init(init_data) + break + except Exception as e: + logger.debug("[tlon] groups-ui discovery failed at %s: %s", path, e) + else: try: groups = await self._sse.scry("/groups/v1/groups.json") discovered = parse_legacy_groups(groups) @@ -1865,11 +2536,12 @@ async def _handle_channel_event(self, event: Any) -> None: if not nest: return - # Auto-watch channels from firehose + # Match OpenClaw: if the channels firehose delivers a chat/heap + # event, the bot is in that channel, so watch it immediately. if nest not in self.monitored_channels: - if self.auto_discover and (nest.startswith("chat/") or nest.startswith("heap/")): + if nest.startswith(("chat/", "heap/")): self.monitored_channels.add(nest) - logger.info("[tlon] Auto-watching channel: %s", nest) + logger.info("[tlon] Auto-watching channel from firehose: %s", nest) else: return @@ -1889,7 +2561,9 @@ async def _handle_channel_event(self, event: Any) -> None: # Two event shapes: # 1) Top-level post: r-post.set.essay (type="post") - # 2) Thread reply: r-post.reply["r-reply"].set.memo + # 2) Thread reply: r-post.reply["r-reply"].set["reply-essay"] + # OpenClaw and current %channels use reply-essay; keep memo/essay + # as compatibility fallbacks for older or simplified fixtures. post_data = r_post.get("set") or {} essay = post_data.get("essay") if isinstance(post_data, dict) else None @@ -1903,7 +2577,11 @@ async def _handle_channel_event(self, event: Any) -> None: if r_reply: reply_set = r_reply.get("set") if reply_set and isinstance(reply_set, dict): - reply_memo = reply_set.get("memo") or reply_set.get("essay") + reply_memo = ( + reply_set.get("reply-essay") + or reply_set.get("memo") + or reply_set.get("essay") + ) is_thread_reply = True content = reply_memo or essay @@ -1936,9 +2614,30 @@ async def _handle_channel_event(self, event: Any) -> None: parent_id = seal.get("parent-id") or seal.get("parent") raw_text = _extract_message_text(content.get("content")) + effective_blob = content.get("blob") + + # Thread replies often arrive as memo events without blob metadata. + # Use the v5 reply-essay scry OpenClaw uses before deciding the + # event is empty. + if is_thread_reply and not effective_blob and not raw_text.strip() and msg_id and effective_id: + effective_blob = await self._fetch_reply_blob(nest, msg_id, effective_id) + + # Top-level file/image uploads can race the SSE event. If the + # message has no text/blob yet, wait briefly and retry through v4, + # which preserves essay.blob. + if ( + not is_thread_reply + and not effective_blob + and not raw_text.strip() + and effective_id + and self._blob_retry_delay > 0 + ): + await asyncio.sleep(self._blob_retry_delay) + effective_blob = await self._fetch_post_blob(nest, effective_id) + text, media_urls, media_types, message_type = await self._prepare_media_context( story_content=content.get("content"), - blob=content.get("blob"), + blob=effective_blob, text=raw_text, ) if not text.strip() and not media_urls: @@ -1948,13 +2647,39 @@ async def _handle_channel_event(self, event: Any) -> None: sender, nest, text[:80]) mentioned = self._is_bot_mentioned(text) + owner_listen = self._should_owner_listen(sender, nest) + + # If a text message already triggers the bot, do one delayed blob + # retry so captioned file/PDF uploads are visible to the agent too. + if ( + not is_thread_reply + and not effective_blob + and raw_text.strip() + and (mentioned or owner_listen) + and self._blob_retry_delay > 0 + ): + await asyncio.sleep(self._blob_retry_delay) + retry_blob = await self._fetch_post_blob(nest, effective_id) + if retry_blob: + effective_blob = retry_blob + text, media_urls, media_types, message_type = await self._prepare_media_context( + story_content=content.get("content"), + blob=effective_blob, + text=raw_text, + ) + mentioned = self._is_bot_mentioned(text) + thread_key = (nest, _normalize_post_id(parent_id)) if parent_id else None in_participated_thread = bool(thread_key and thread_key in self._participated_threads) - owner_blob_only = bool(self._is_owner(sender) and media_urls and not raw_text.strip()) + owner_blob_only = bool( + self._is_owner(sender) + and (media_urls or effective_blob) + and not raw_text.strip() + ) # In group channels, respond to mentions, participated threads, or - # owner blob-only messages. - if not (mentioned or in_participated_thread or owner_blob_only): + # owner messages when owner-listen is enabled. + if not (mentioned or in_participated_thread or owner_blob_only or owner_listen): logger.debug("[tlon] Not mentioned, ignoring") return @@ -2020,6 +2745,107 @@ async def _handle_channel_event(self, event: Any) -> None: except Exception as e: logger.error("[tlon] Channel event error: %s", e, exc_info=True) + async def _fetch_post_blob(self, nest: str, post_id: Any) -> Optional[str]: + """Fetch a top-level post blob through channels v4. + + Tlon's firehose can arrive before upload metadata has propagated, and + older/lightweight scries can strip blobs. OpenClaw uses channels v4 + around/post scries here because they preserve essay.blob. + """ + if not self._sse or not nest or not post_id: + return None + formatted_id = self._format_scry_post_id(post_id) + if not formatted_id: + return None + try: + data = await self._sse.scry( + f"/channels/v4/{nest}/posts/around/{formatted_id}/1/post" + ) + except Exception as e: + logger.debug("[tlon] Blob post scry failed for %s/%s: %s", nest, post_id, e) + return None + + posts = self._posts_from_scry_response(data) + wanted = _normalize_post_id(post_id) + for post in posts: + seal = post.get("seal") if isinstance(post, dict) else {} + essay = post.get("essay") if isinstance(post, dict) else None + if not isinstance(essay, dict): + r_post = post.get("r-post", {}) if isinstance(post, dict) else {} + set_data = r_post.get("set", {}) if isinstance(r_post, dict) else {} + essay = set_data.get("essay") if isinstance(set_data, dict) else None + seal = set_data.get("seal", seal) if isinstance(set_data, dict) else seal + seal_id = _normalize_post_id((seal or {}).get("id") if isinstance(seal, dict) else "") + if wanted and seal_id and seal_id != wanted and len(posts) != 1: + continue + blob = essay.get("blob") if isinstance(essay, dict) else None + if blob is not None: + return blob if isinstance(blob, str) else json.dumps(blob) + return None + + async def _fetch_reply_blob(self, nest: str, parent_id: Any, reply_id: Any) -> Optional[str]: + """Fetch a thread reply blob through channels v5 reply essays.""" + if not self._sse or not nest or not parent_id or not reply_id: + return None + formatted_parent = self._format_scry_post_id(parent_id) + if not formatted_parent: + return None + try: + data = await self._sse.scry( + f"/channels/v5/{nest}/posts/post/id/{formatted_parent}/replies/newest/5" + ) + except Exception as e: + logger.debug( + "[tlon] Blob reply scry failed for %s/%s/%s: %s", + nest, + parent_id, + reply_id, + e, + ) + return None + + replies = self._posts_from_scry_response(data) + wanted = _normalize_post_id(reply_id) + for reply in replies: + if not isinstance(reply, dict): + continue + seal = reply.get("seal") or {} + essay = reply.get("reply-essay") or reply.get("memo") + r_reply = reply.get("r-reply") + if isinstance(r_reply, dict): + set_data = r_reply.get("set", {}) + if isinstance(set_data, dict): + essay = essay or set_data.get("reply-essay") or set_data.get("memo") + seal = seal or set_data.get("seal", {}) + seal_id = _normalize_post_id(seal.get("id") if isinstance(seal, dict) else "") + if wanted and seal_id and seal_id != wanted and len(replies) != 1: + continue + blob = essay.get("blob") if isinstance(essay, dict) else None + if blob is not None: + return blob if isinstance(blob, str) else json.dumps(blob) + return None + + @staticmethod + def _posts_from_scry_response(data: Any) -> List[Dict[str, Any]]: + if isinstance(data, list): + return [item for item in data if isinstance(item, dict)] + if not isinstance(data, dict): + return [] + for key in ("posts", "replies", "writs"): + value = data.get(key) + if isinstance(value, list): + return [item for item in value if isinstance(item, dict)] + if isinstance(value, dict): + return [item for item in value.values() if isinstance(item, dict)] + return [item for item in data.values() if isinstance(item, dict)] + + @staticmethod + def _format_scry_post_id(post_id: Any) -> str: + bare = str(post_id or "").split("/")[-1].replace(".", "") + if bare.isdigit(): + return _format_ud(int(bare)) + return str(post_id or "") + async def _handle_dm_event(self, event: Any) -> None: """Handle a chat firehose (/v3) event.""" try: @@ -2071,6 +2897,8 @@ async def _handle_dm_event(self, event: Any) -> None: whom = event["whom"] msg_id = event.get("id") response = event["response"] + if isinstance(response, dict) and await self._handle_dm_reaction_event(event, response): + return # Extract message content essay = response.get("add", {}).get("essay") if isinstance(response.get("add"), dict) else None diff --git a/gateway/platforms/tlon_approval.py b/gateway/platforms/tlon_approval.py index 105922bb9306..fa1d80af3bde 100644 --- a/gateway/platforms/tlon_approval.py +++ b/gateway/platforms/tlon_approval.py @@ -11,6 +11,30 @@ APPROVAL_TTL_SECONDS = 48 * 60 * 60 +def emoji_to_approval_action(emoji: str) -> Optional[str]: + """Map OpenClaw-style approval reactions to approval actions.""" + if emoji == "๐Ÿ‘": + return "approve" + if emoji == "๐Ÿ‘Ž": + return "deny" + if emoji == "๐Ÿ›‘": + return "block" + return None + + +def normalize_notification_id(message_id: Any) -> str: + """Normalize Tlon writ/post ids for reaction approval comparisons. + + send() may return ``~ship/170.141...`` while firehose events may provide + either the same writ id or a bare/dotted @ud. Strip the optional ship + prefix and dots so both forms compare equal. + """ + value = str(message_id or "") + if value.startswith("~") and "/" in value: + value = value.split("/", 1)[1] + return value.replace(".", "") + + @dataclass class PendingApproval: id: str @@ -136,6 +160,8 @@ def format_approval_request(approval: PendingApproval) -> str: subject, preview, "", + "React to this message: ๐Ÿ‘ approve ยท ๐Ÿ‘Ž deny ยท ๐Ÿ›‘ block", + "", f"Pending approval id: {approval.id}", "", "Use one of:", diff --git a/gateway/platforms/tlon_settings.py b/gateway/platforms/tlon_settings.py index e723a1d95202..4dce5abdbff7 100644 --- a/gateway/platforms/tlon_settings.py +++ b/gateway/platforms/tlon_settings.py @@ -24,6 +24,8 @@ class TlonSettings: owner_ship: Optional[str] = None pending_approvals: Optional[List[Dict[str, Any]]] = None show_model_signature: Optional[bool] = None + owner_listen_enabled: Optional[bool] = None + owner_listen_disabled_channels: Optional[List[str]] = None def parse_settings_response(raw: Any) -> TlonSettings: @@ -44,6 +46,8 @@ def parse_settings_response(raw: Any) -> TlonSettings: settings.owner_ship = bucket.get("ownerShip") if isinstance(bucket.get("ownerShip"), str) else None settings.pending_approvals = _pending_approvals(bucket.get("pendingApprovals")) settings.show_model_signature = _bool_or_none(bucket.get("showModelSig")) + settings.owner_listen_enabled = _bool_or_none(bucket.get("ownerListenEnabled")) + settings.owner_listen_disabled_channels = _string_list(bucket.get("ownerListenDisabledChannels")) return settings @@ -84,6 +88,8 @@ def apply_settings_update(current: TlonSettings, key: str, value: Any) -> TlonSe "owner_ship": current.owner_ship, "pending_approvals": current.pending_approvals, "show_model_signature": current.show_model_signature, + "owner_listen_enabled": current.owner_listen_enabled, + "owner_listen_disabled_channels": current.owner_listen_disabled_channels, } if key == "groupChannels": @@ -108,6 +114,10 @@ def apply_settings_update(current: TlonSettings, key: str, value: Any) -> TlonSe data["pending_approvals"] = _pending_approvals(value) elif key == "showModelSig": data["show_model_signature"] = _bool_or_none(value) + elif key == "ownerListenEnabled": + data["owner_listen_enabled"] = _bool_or_none(value) + elif key == "ownerListenDisabledChannels": + data["owner_listen_disabled_channels"] = _string_list(value) return TlonSettings(**data) diff --git a/tests/gateway/test_tlon_adapter.py b/tests/gateway/test_tlon_adapter.py index fb0c49cbaeb9..4d15da57890c 100644 --- a/tests/gateway/test_tlon_adapter.py +++ b/tests/gateway/test_tlon_adapter.py @@ -4,12 +4,19 @@ import pytest from gateway.config import PlatformConfig +from gateway.platforms.base import MessageType, SendResult from gateway.platforms.tlon import ( TlonAdapter, + TlonSSEClient, _extract_message_text, _text_to_story, ) -from gateway.platforms.tlon_approval import create_pending_approval, format_pending_list +from gateway.platforms.tlon_approval import ( + create_pending_approval, + format_approval_request, + format_pending_list, + normalize_notification_id, +) from gateway.platforms.tlon_discovery import parse_groups_ui_init from gateway.platforms.tlon_media import ( TlonDownloadedAttachment, @@ -137,6 +144,8 @@ def test_parse_settings_response_reads_tlon_bucket(): "channelRules": '{"chat/~host/general":{"mode":"restricted","allowedShips":["~nec"]}}', "defaultAuthorizedShips": ["~bus"], "ownerShip": "~ten", + "ownerListenEnabled": False, + "ownerListenDisabledChannels": ["chat/~host/noisy"], } } } @@ -148,6 +157,43 @@ def test_parse_settings_response_reads_tlon_bucket(): assert settings.channel_rules["chat/~host/general"]["allowedShips"] == ["~nec"] assert settings.default_authorized_ships == ["~bus"] assert settings.owner_ship == "~ten" + assert settings.owner_listen_enabled is False + assert settings.owner_listen_disabled_channels == ["chat/~host/noisy"] + + +@pytest.mark.asyncio +async def test_sse_broadcasts_unknown_subscription_id_like_openclaw(): + client = TlonSSEClient("http://ship.test", "code", "~bot-palnet") + seen = [] + + async def channel_handler(event): + seen.append(("channels", event)) + + async def dm_handler(event): + seen.append(("chat", event)) + + await client.subscribe( + app="channels", + path="/v2", + on_event=channel_handler, + on_error=None, + on_quit=None, + ) + await client.subscribe( + app="chat", + path="/v3", + on_event=dm_handler, + on_error=None, + on_quit=None, + ) + + await client._process_event( + 'id: 1\n' + 'data: {"json":{"nest":"chat/~host/general","response":{"post":{}}}}\n' + ) + + assert [name for name, _event in seen] == ["channels", "chat"] + assert seen[0][1]["nest"] == "chat/~host/general" def test_approval_formatting_lists_pending_request(): @@ -163,6 +209,210 @@ def test_approval_formatting_lists_pending_request(): assert "~zod" in pending +@pytest.mark.asyncio +async def test_connect_subscribes_to_openclaw_group_and_channel_read_paths(monkeypatch): + monkeypatch.setenv("TLON_SHIP_URL", "http://ship.test") + monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") + monkeypatch.setenv("TLON_SHIP_CODE", "code") + + class FakeTlonSSE: + instances = [] + + def __init__(self, **kwargs): + self.kwargs = kwargs + self.subscriptions = [] + self._connected = False + FakeTlonSSE.instances.append(self) + + async def authenticate(self): + return None + + async def scry(self, path): + if path == "/contacts/v1/self.json": + return {} + if path == "/settings/all.json": + return {} + return {} + + async def subscribe(self, *, app, path, on_event, on_error, on_quit): + self.subscriptions.append((app, path)) + + async def connect(self): + self._connected = True + + async def close(self): + self._connected = False + + monkeypatch.setattr("gateway.platforms.tlon.TlonSSEClient", FakeTlonSSE) + + adapter = TlonAdapter(PlatformConfig()) + assert await adapter.connect() is True + + subscriptions = FakeTlonSSE.instances[0].subscriptions + assert ("channels", "/v2") in subscriptions + assert ("chat", "/v3") in subscriptions + assert ("groups", "/groups/ui") in subscriptions + assert ("groups", "/v1/foreigns") in subscriptions + + await adapter.disconnect() + + +@pytest.mark.asyncio +async def test_channel_event_auto_watches_chat_and_heap_like_openclaw(monkeypatch): + monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") + monkeypatch.setenv("TLON_ALLOW_ALL_USERS", "true") + monkeypatch.setenv("TLON_AUTO_DISCOVER", "false") + adapter = TlonAdapter(PlatformConfig()) + adapter.monitored_channels = set() + adapter.handle_message = AsyncMock() + + await adapter._handle_channel_event({ + "nest": "chat/~host/new", + "response": { + "post": { + "id": "auto-watch-post", + "r-post": { + "set": { + "seal": {"id": "auto-watch-post"}, + "essay": { + "author": "~zod", + "sent": 1_700_000_000_000, + "content": [{"inline": [{"ship": "~bot-palnet"}, " hello"]}], + }, + } + }, + } + }, + }) + + assert "chat/~host/new" in adapter.monitored_channels + adapter.handle_message.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_groups_ui_event_watches_joined_chat_and_heap_channels(monkeypatch): + monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") + adapter = TlonAdapter(PlatformConfig()) + adapter.auto_accept_group_invites = True + adapter._sse = AsyncMock() + + await adapter._handle_groups_ui_event({ + "flag": "~host/group", + "channels": { + "chat/~host/general": {}, + "heap/~host/gallery": {}, + "diary/~host/blog": {}, + }, + "join": { + "group": "~host/group", + "channels": ["chat/~host/joined", "diary/~host/notes"], + }, + }) + + assert adapter.monitored_channels == { + "chat/~host/general", + "heap/~host/gallery", + "chat/~host/joined", + } + assert adapter._channel_to_group["chat/~host/general"] == "~host/group" + assert adapter._channel_to_group["heap/~host/gallery"] == "~host/group" + assert adapter._channel_to_group["chat/~host/joined"] == "~host/group" + values = [ + call.kwargs["json_data"]["put-entry"]["value"] + for call in adapter._sse.poke.await_args_list + ] + assert values[-1] == [ + "chat/~host/general", + "heap/~host/gallery", + "chat/~host/joined", + ] + + +@pytest.mark.asyncio +async def test_foreigns_event_auto_accepts_group_invites(monkeypatch): + monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") + adapter = TlonAdapter(PlatformConfig()) + adapter.auto_accept_group_invites = True + adapter._settings.group_invite_allowlist = ["~zod"] + adapter._sse = AsyncMock() + + await adapter._handle_group_foreigns_event({ + "~host/group": { + "invites": [{"valid": True, "ship": "~zod"}], + } + }) + + adapter._sse.poke.assert_awaited_once() + call = adapter._sse.poke.await_args.kwargs + assert call["app"] == "groups" + assert call["mark"] == "group-join" + assert call["json_data"] == {"flag": "~host/group", "join-all": True} + + +@pytest.mark.asyncio +async def test_foreigns_event_empty_allowlist_is_fail_closed(monkeypatch): + monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") + adapter = TlonAdapter(PlatformConfig()) + adapter.auto_accept_group_invites = True + adapter._settings.group_invite_allowlist = [] + adapter.owner_ship = "~malmur-halmex" + adapter._sse = AsyncMock() + adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="~bot/170.141")) + adapter._put_settings_entry = AsyncMock() + + await adapter._handle_group_foreigns_event({ + "~host/group": { + "invites": [{"valid": True, "ship": "~zod"}], + } + }) + + adapter._sse.poke.assert_not_awaited() + assert len(adapter.pending_approvals) == 1 + assert adapter.pending_approvals[0].type == "group" + assert adapter.pending_approvals[0].requesting_ship == "~zod" + + +@pytest.mark.asyncio +async def test_foreigns_event_queues_approval_when_auto_accept_disabled(monkeypatch): + monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") + adapter = TlonAdapter(PlatformConfig()) + adapter.auto_accept_group_invites = False + adapter._settings.group_invite_allowlist = ["~zod"] + adapter.owner_ship = "~malmur-halmex" + adapter._sse = AsyncMock() + adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="~bot/170.141")) + adapter._put_settings_entry = AsyncMock() + + await adapter._handle_group_foreigns_event({ + "~host/group": { + "preview": {"meta": {"title": "Test Group"}}, + "invites": [{"valid": True, "ship": "~zod"}], + } + }) + + adapter._sse.poke.assert_not_awaited() + assert len(adapter.pending_approvals) == 1 + assert adapter.pending_approvals[0].group_flag == "~host/group" + assert adapter.pending_approvals[0].group_title == "Test Group" + + +@pytest.mark.asyncio +async def test_foreigns_event_accepts_owner_invite_even_without_auto_accept(monkeypatch): + monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") + adapter = TlonAdapter(PlatformConfig()) + adapter.auto_accept_group_invites = False + adapter.owner_ship = "~malmur-halmex" + adapter._sse = AsyncMock() + + await adapter._handle_group_foreigns_event({ + "~host/group": { + "invites": [{"valid": True, "ship": "~malmur-halmex"}], + } + }) + + adapter._sse.poke.assert_awaited_once() + + @pytest.mark.asyncio async def test_channel_event_routes_top_level_mentions(monkeypatch): monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") @@ -201,6 +451,213 @@ async def test_channel_event_routes_top_level_mentions(monkeypatch): assert isinstance(event.timestamp, datetime) +@pytest.mark.asyncio +async def test_channel_event_routes_bot_alias_mentions(monkeypatch): + monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") + monkeypatch.setenv("TLON_BOT_ALIASES", "Hermes") + monkeypatch.setenv("TLON_ALLOW_ALL_USERS", "true") + adapter = TlonAdapter(PlatformConfig()) + adapter.monitored_channels = {"chat/~host/test"} + adapter.handle_message = AsyncMock() + + await adapter._handle_channel_event({ + "nest": "chat/~host/test", + "response": { + "post": { + "id": "alias-post", + "r-post": { + "set": { + "seal": {"id": "alias-post"}, + "essay": { + "author": "~zod", + "sent": 1_700_000_000_000, + "content": [{"inline": ["Hermes: hello"]}], + }, + } + }, + } + }, + }) + + adapter.handle_message.assert_awaited_once() + event = adapter.handle_message.await_args.args[0] + assert event.text == "hello" + + +@pytest.mark.asyncio +async def test_channel_event_routes_owner_without_mention(monkeypatch): + monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") + monkeypatch.setenv("TLON_OWNER_SHIP", "~zod") + monkeypatch.setenv("TLON_OWNER_LISTEN_ENABLED", "true") + adapter = TlonAdapter(PlatformConfig()) + adapter.monitored_channels = {"chat/~host/test"} + adapter.handle_message = AsyncMock() + + await adapter._handle_channel_event({ + "nest": "chat/~host/test", + "response": { + "post": { + "id": "owner-listen-post", + "r-post": { + "set": { + "seal": {"id": "owner-listen-post"}, + "essay": { + "author": "~zod", + "sent": 1_700_000_000_000, + "content": [{"inline": ["hello without mention"]}], + }, + } + }, + } + }, + }) + + adapter.handle_message.assert_awaited_once() + event = adapter.handle_message.await_args.args[0] + assert event.text == "hello without mention" + + +@pytest.mark.asyncio +async def test_channel_event_retries_delayed_top_level_blob(monkeypatch): + monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") + monkeypatch.setenv("TLON_OWNER_SHIP", "~zod") + monkeypatch.setattr("gateway.platforms.tlon.asyncio.sleep", AsyncMock()) + adapter = TlonAdapter(PlatformConfig()) + adapter.monitored_channels = {"chat/~host/test"} + adapter.handle_message = AsyncMock() + adapter._fetch_post_blob = AsyncMock( + return_value='[{"type":"file","fileUri":"https://example.com/a.pdf","name":"a.pdf","mimeType":"application/pdf"}]' + ) + + async def fake_prepare(*, story_content, blob, text): + if blob: + return ( + '[file: a.pdf (application/pdf, unknown size)] https://example.com/a.pdf', + ["/tmp/a.pdf"], + ["application/pdf"], + MessageType.DOCUMENT, + ) + return text, [], [], MessageType.TEXT + + adapter._prepare_media_context = fake_prepare + + await adapter._handle_channel_event({ + "nest": "chat/~host/test", + "response": { + "post": { + "id": "170141184507864167403996323545639550976", + "r-post": { + "set": { + "seal": {"id": "170141184507864167403996323545639550976"}, + "essay": { + "author": "~zod", + "sent": 1_700_000_000_000, + "content": [], + }, + } + }, + } + }, + }) + + adapter._fetch_post_blob.assert_awaited_once() + adapter.handle_message.assert_awaited_once() + event = adapter.handle_message.await_args.args[0] + assert event.message_type == MessageType.DOCUMENT + assert event.media_urls == ["/tmp/a.pdf"] + assert "a.pdf" in event.text + + +@pytest.mark.asyncio +async def test_channel_event_fetches_thread_reply_blob(monkeypatch): + monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") + monkeypatch.setenv("TLON_OWNER_SHIP", "~zod") + adapter = TlonAdapter(PlatformConfig()) + adapter.monitored_channels = {"chat/~host/test"} + adapter.handle_message = AsyncMock() + adapter._fetch_reply_blob = AsyncMock( + return_value='[{"type":"file","fileUri":"https://example.com/thread.pdf","name":"thread.pdf","mimeType":"application/pdf"}]' + ) + + async def fake_prepare(*, story_content, blob, text): + if blob: + return ( + '[file: thread.pdf (application/pdf, unknown size)] https://example.com/thread.pdf', + ["/tmp/thread.pdf"], + ["application/pdf"], + MessageType.DOCUMENT, + ) + return text, [], [], MessageType.TEXT + + adapter._prepare_media_context = fake_prepare + + await adapter._handle_channel_event({ + "nest": "chat/~host/test", + "response": { + "post": { + "id": "parent-post", + "r-post": { + "reply": { + "id": "reply-post", + "r-reply": { + "set": { + "seal": {"parent-id": "parent-post"}, + "memo": { + "author": "~zod", + "sent": 1_700_000_000_000, + "content": [], + }, + } + }, + } + }, + } + }, + }) + + adapter._fetch_reply_blob.assert_awaited_once_with( + "chat/~host/test", + "parent-post", + "reply-post", + ) + adapter.handle_message.assert_awaited_once() + event = adapter.handle_message.await_args.args[0] + assert event.reply_to_message_id == "parent-post" + assert event.media_urls == ["/tmp/thread.pdf"] + + +@pytest.mark.asyncio +async def test_channel_event_ignores_owner_when_owner_listen_disabled_for_channel(monkeypatch): + monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") + monkeypatch.setenv("TLON_OWNER_SHIP", "~zod") + monkeypatch.setenv("TLON_OWNER_LISTEN_ENABLED", "true") + monkeypatch.setenv("TLON_OWNER_LISTEN_DISABLED_CHANNELS", "chat/~host/test") + adapter = TlonAdapter(PlatformConfig()) + adapter.monitored_channels = {"chat/~host/test"} + adapter.handle_message = AsyncMock() + + await adapter._handle_channel_event({ + "nest": "chat/~host/test", + "response": { + "post": { + "id": "owner-listen-disabled-post", + "r-post": { + "set": { + "seal": {"id": "owner-listen-disabled-post"}, + "essay": { + "author": "~zod", + "sent": 1_700_000_000_000, + "content": [{"inline": ["hello without mention"]}], + }, + } + }, + } + }, + }) + + adapter.handle_message.assert_not_awaited() + + @pytest.mark.asyncio async def test_channel_event_routes_thread_reply_to_parent(monkeypatch): monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") @@ -242,6 +699,48 @@ async def test_channel_event_routes_thread_reply_to_parent(monkeypatch): assert event.source.thread_id == "parent-post" +@pytest.mark.asyncio +async def test_channel_event_routes_openclaw_thread_reply_essay(monkeypatch): + monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") + monkeypatch.setenv("TLON_ALLOW_ALL_USERS", "true") + adapter = TlonAdapter(PlatformConfig()) + adapter.monitored_channels = {"chat/~host/test"} + adapter.handle_message = AsyncMock() + + await adapter._handle_channel_event({ + "nest": "chat/~host/test", + "response": { + "post": { + "id": "parent-post", + "r-post": { + "reply": { + "id": "reply-post", + "r-reply": { + "set": { + "seal": {"parent-id": "parent-post"}, + "reply-essay": { + "author": "~zod", + "sent": 1_700_000_000_000, + "content": [ + {"inline": ["got it ", {"ship": "~bot-palnet"}]} + ], + }, + } + }, + } + }, + } + }, + }) + + adapter.handle_message.assert_awaited_once() + event = adapter.handle_message.await_args.args[0] + assert event.text == "got it" + assert event.message_id == "reply-post" + assert event.reply_to_message_id == "parent-post" + assert event.source.thread_id == "parent-post" + + @pytest.mark.asyncio async def test_channel_event_routes_blob_only_owner_message(monkeypatch): monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") @@ -357,7 +856,7 @@ async def test_unauthorized_dm_queues_owner_approval(monkeypatch): monkeypatch.setenv("TLON_OWNER_SHIP", "~ten") adapter = TlonAdapter(PlatformConfig()) adapter.handle_message = AsyncMock() - adapter.send = AsyncMock() + adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="~bot-palnet/170.141")) await adapter._handle_dm_event({ "whom": "~zod", @@ -376,4 +875,180 @@ async def test_unauthorized_dm_queues_owner_approval(monkeypatch): adapter.handle_message.assert_not_awaited() assert len(adapter.pending_approvals) == 1 assert adapter.pending_approvals[0].requesting_ship == "~zod" + assert adapter.pending_approvals[0].notification_message_id == "170141" + adapter.send.assert_awaited_once() + + +def test_tlon_approval_request_mentions_reactions(): + approval = create_pending_approval( + approval_type="dm", + requesting_ship="~zod", + existing_ids=[], + message_preview="hello", + ) + + text = format_approval_request(approval) + + assert "React to this message: ๐Ÿ‘ approve ยท ๐Ÿ‘Ž deny ยท ๐Ÿ›‘ block" in text + + +@pytest.mark.asyncio +async def test_owner_reaction_approves_pending_tlon_dm(monkeypatch): + monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") + monkeypatch.setenv("TLON_OWNER_SHIP", "~ten") + adapter = TlonAdapter(PlatformConfig()) + adapter._put_settings_entry = AsyncMock() + adapter._dispatch_pending_message = AsyncMock() + adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="~bot-palnet/2")) + approval = create_pending_approval( + approval_type="dm", + requesting_ship="~zod", + existing_ids=[], + ) + approval.notification_message_id = normalize_notification_id("~bot-palnet/170.141") + adapter.pending_approvals = [approval] + + await adapter._handle_dm_event({ + "whom": "~ten", + "id": "~bot-palnet/170.141", + "response": { + "add-react": { + "author": "~ten", + "react": "๐Ÿ‘", + } + }, + }) + + assert "~zod" in adapter.dm_allowlist + assert adapter.pending_approvals == [] + adapter._dispatch_pending_message.assert_awaited_once_with(approval) + adapter.send.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_send_exec_approval_registers_owner_dm_reaction_prompt(monkeypatch): + monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") + monkeypatch.setenv("TLON_OWNER_SHIP", "~ten") + adapter = TlonAdapter(PlatformConfig()) + adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="~bot-palnet/170.141")) + + result = await adapter.send_exec_approval( + "chat/~host/general", + "rm -rf /tmp/example", + "tlon:chat/~host/general:~ten", + "dangerous command", + ) + + assert result.success adapter.send.assert_awaited_once() + assert adapter.send.await_args.args[0] == "~ten" + normalized = normalize_notification_id("~bot-palnet/170.141") + assert adapter._exec_approval_prompts[normalized]["session_key"] == "tlon:chat/~host/general:~ten" + + +@pytest.mark.asyncio +async def test_owner_reaction_resolves_exec_approval(monkeypatch): + monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") + monkeypatch.setenv("TLON_OWNER_SHIP", "~ten") + adapter = TlonAdapter(PlatformConfig()) + adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="~bot-palnet/2")) + normalized = normalize_notification_id("~bot-palnet/170.141") + adapter._exec_approval_prompts[normalized] = { + "session_key": "tlon:~ten:~ten", + "chat_id": "~ten", + } + adapter._exec_approval_prompt_by_session["tlon:~ten:~ten"] = normalized + calls = [] + + def fake_resolve(session_key, choice, resolve_all=False): + calls.append((session_key, choice, resolve_all)) + return 1 + + monkeypatch.setattr("tools.approval.resolve_gateway_approval", fake_resolve) + + await adapter._handle_dm_event({ + "whom": "~ten", + "id": "~bot-palnet/170.141", + "response": { + "add-react": { + "author": "~ten", + "react": "๐Ÿ›‘", + } + }, + }) + + assert calls == [("tlon:~ten:~ten", "deny", False)] + assert normalized not in adapter._exec_approval_prompts + assert "tlon:~ten:~ten" not in adapter._exec_approval_prompt_by_session + + +def _dm_history_post(author: str, sent: int, text: str, post_id: str): + return { + "seal": {"id": post_id}, + "essay": { + "author": author, + "sent": sent, + "kind": "/chat", + "blob": None, + "content": [{"inline": [text]}], + "meta": None, + }, + "type": "post", + } + + +@pytest.mark.asyncio +async def test_dm_history_initial_catchup_only_routes_newest_unanswered(monkeypatch): + monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") + monkeypatch.setenv("TLON_OWNER_SHIP", "~zod") + adapter = TlonAdapter(PlatformConfig()) + adapter.handle_message = AsyncMock() + adapter.dm_poll_initial_catchup_seconds = 600 + + now_ms = 1_700_001_000_000 + monkeypatch.setattr("gateway.platforms.tlon.time.time", lambda: now_ms / 1000) + + await adapter._initialize_dm_history( + "~zod", + [ + _dm_history_post("~bot-palnet", now_ms - 500_000, "old reply", "~bot-palnet/1"), + _dm_history_post("~zod", now_ms - 300_000, "make a group", "~zod/2"), + _dm_history_post("~zod", now_ms - 100_000, "hello", "~zod/3"), + _dm_history_post( + "~bot-palnet", + now_ms - 50_000, + "Gateway shutting down - Your current task will be interrupted.", + "~bot-palnet/status", + ), + ], + ) + + adapter.handle_message.assert_awaited_once() + event = adapter.handle_message.await_args.args[0] + assert event.text == "hello" + assert event.message_id == "~zod/3" + assert "~zod/2" in adapter._processed_ids + + +@pytest.mark.asyncio +async def test_dm_history_poll_routes_new_unprocessed_messages(monkeypatch): + monkeypatch.setenv("TLON_SHIP_NAME", "~bot-palnet") + monkeypatch.setenv("TLON_OWNER_SHIP", "~zod") + adapter = TlonAdapter(PlatformConfig()) + adapter.handle_message = AsyncMock() + adapter._sse = AsyncMock() + adapter._sse.scry.return_value = { + "writs": { + "1": _dm_history_post("~zod", 1_700_000_000_000, "one", "~zod/1"), + "2": _dm_history_post("~zod", 1_700_000_001_000, "two", "~zod/2"), + } + } + adapter._dm_poll_initialized.add("~zod") + + await adapter._poll_dm_history("~zod") + + assert adapter.handle_message.await_count == 2 + assert [call.args[0].text for call in adapter.handle_message.await_args_list] == [ + "one", + "two", + ] diff --git a/tests/tools/test_tlon_tool.py b/tests/tools/test_tlon_tool.py index 2d1de643bde5..c8ed235b9b1e 100644 --- a/tests/tools/test_tlon_tool.py +++ b/tests/tools/test_tlon_tool.py @@ -1,6 +1,7 @@ import pytest from tools.tlon_tool import ( + TlonHttpError, TlonGroups, TlonHooks, TlonMessages, @@ -62,6 +63,15 @@ async def thread(self, **kwargs): async def scry(self, app, path, **_kwargs): self.scries.append({"app": app, "path": path}) + if app == "groups" and path == "/v2/groups": + return { + "~host/group": { + "meta": {"title": "Test Group"}, + "channels": { + "chat/~host/test": {"meta": {"title": "General"}}, + }, + } + } if app == "groups" and path.startswith("/v2/ui/groups/"): return self.group if app == "channels-server" and path == "/v0/hooks": @@ -108,6 +118,7 @@ async def test_group_create_owned_creates_group_and_assigns_admin(): assert result["success"] is True assert result["owner_ship"] == "~malmur-halmex" + assert result["admin_ships"] == ["~malmur-halmex"] assert client.threads[0]["desk"] == "groups" assert client.threads[0]["input_mark"] == "group-create-thread" assert client.threads[0]["body"]["guestList"] == ["~malmur-halmex"] @@ -177,6 +188,71 @@ async def test_group_create_owned_creates_group_and_assigns_admin(): assert result["admin_assigned"] is True assert result["admin_assignment"]["promoted"] == ["~malmur-halmex"] +@pytest.mark.asyncio +async def test_group_create_with_admins_force_adds_admin_seats(): + client = FakeTlonClient() + groups = TlonGroups(client) + + result = await groups.handle( + "group_create_with_admins", + { + "title": "Hermes Group", + "ships": ["~malmur-halmex", "~nec"], + }, + ) + + assert result["success"] is True + assert result["admin_ships"] == ["~malmur-halmex", "~nec"] + assert client.threads[0]["body"]["guestList"] == ["~malmur-halmex", "~nec"] + for ship in ["~malmur-halmex", "~nec"]: + assert ship in result["admin_assignment"]["promoted"] + assert client.group["seats"][ship]["roles"] == ["admin"] + + +@pytest.mark.asyncio +async def test_group_info_resolves_tlon_url_channel_to_parent_group(): + client = FakeTlonClient() + groups = TlonGroups(client) + + result = await groups.handle( + "group_info", + { + "group_id": ( + "https://host.tlon.network/apps/groups/Messages/Channel/ChannelRoot" + "?channelId=chat%2F~host%2Ftest&groupId=~host%2Fwrong" + ), + }, + ) + + assert result["success"] is True + assert result["group_id"] == "~host/group" + assert result["channel_id"] == "chat/~host/test" + assert result["resolved_from"] == "channel_id" + + +@pytest.mark.asyncio +async def test_group_info_returns_candidates_instead_of_raising_404(): + class NotFoundClient(FakeTlonClient): + async def scry(self, app, path, **kwargs): + if app == "groups" and path.startswith("/v2/ui/groups/"): + raise TlonHttpError( + "not found", + status=404, + app=app, + path=path, + ) + return await super().scry(app, path, **kwargs) + + client = NotFoundClient() + groups = TlonGroups(client) + + result = await groups.handle("group_info", {"group_id": "~host/wrong"}) + + assert result["success"] is True + assert result["found"] is False + assert result["requested_group_id"] == "~host/wrong" + assert result["candidates"][0]["group_id"] == "~host/group" + @pytest.mark.asyncio async def test_notebook_post_uses_diary_metadata(): diff --git a/tools/tlon_tool.py b/tools/tlon_tool.py index 64803de09e9b..57e0e973ba2a 100644 --- a/tools/tlon_tool.py +++ b/tools/tlon_tool.py @@ -20,7 +20,7 @@ from collections.abc import Iterable from pathlib import Path from typing import Any, Dict, List, Optional -from urllib.parse import urlparse +from urllib.parse import parse_qs, unquote, urlparse from agent.redact import redact_sensitive_text from gateway.platforms.tlon import ( @@ -49,7 +49,10 @@ "post reactions/edits/deletes, activity, expose controls, and raw " "scry/poke/thread calls. For ordinary message sending, use send_message; " "for creating a group or channel, inviting ships, or making someone admin, " - "use this tool." + "use this tool. When asked to create a group and make a ship admin in " + "one request, use group_create_with_admins or group_create_owned with " + "ship/ships; it force-adds seats and assigns admin directly, so do not " + "ask the user to accept/join first." ), "parameters": { "type": "object", @@ -64,6 +67,7 @@ "group_info", "group_create", "group_create_owned", + "group_create_with_admins", "group_invite", "group_leave", "group_join", @@ -182,8 +186,8 @@ "description": {"type": "string", "description": "Group/channel/role description."}, "image": {"type": "string", "description": "Image URL for metadata/profile."}, "cover": {"type": "string", "description": "Cover URL for metadata/profile."}, - "ship": {"type": "string", "description": "Single ship, e.g. ~zod."}, - "ships": {"type": "array", "items": {"type": "string"}, "description": "Ship list."}, + "ship": {"type": "string", "description": "Single ship, e.g. ~zod. For group_create_with_admins/group_create_owned this ship becomes admin."}, + "ships": {"type": "array", "items": {"type": "string"}, "description": "Ship list. For group_create_with_admins/group_create_owned these ships become admins."}, "role_id": {"type": "string", "description": "Group role id."}, "privacy": {"type": "string", "enum": ["public", "private", "secret"], "description": "Group privacy."}, "post_id": {"type": "string", "description": "Post id, dotted or bare @ud. For DMs, include ~author/id when possible."}, @@ -214,6 +218,16 @@ class TlonToolError(Exception): """User-facing Tlon tool failure.""" +class TlonHttpError(TlonToolError): + """HTTP failure from a Tlon scry/poke/thread endpoint.""" + + def __init__(self, message: str, *, status: int, app: str = "", path: str = ""): + super().__init__(message) + self.status = status + self.app = app + self.path = path + + class TlonHttpClient: def __init__(self, *, ship_url: str, ship_name: str, ship_code: str): self.ship_url = ship_url.rstrip("/") @@ -281,7 +295,12 @@ async def scry(self, app: str, path: str, *, timeout: int = 60) -> Any: ) as resp: if resp.status != 200: text = await resp.text() - raise TlonToolError(f"Scry {app}{path} failed: HTTP {resp.status} - {text[:300]}") + raise TlonHttpError( + f"Scry {app}{path} failed: HTTP {resp.status} - {text[:300]}", + status=resp.status, + app=app, + path=path, + ) return await resp.json() async def poke(self, app: str, mark: str, json_data: Any, *, timeout: int = 30) -> Dict[str, Any]: @@ -433,27 +452,71 @@ async def handle(self, action: str, args: Dict[str, Any]) -> Dict[str, Any]: ) return _ok(action, groups=groups) if action == "group_info": - group_id = _required(args, "group_id") - return _ok(action, group_id=group_id, group=await self.client.scry("groups", f"/v2/ui/groups/{group_id}")) - if action in {"group_create", "group_create_owned"}: - owner = _normalize_ship(str(args.get("ship") or args.get("owner") or os.getenv("TLON_OWNER_SHIP", ""))) - members = _ships(args) - if action == "group_create_owned" and owner: - members = _unique_ships([owner, *members]) + raw_group_ref = str(args.get("group_id") or "") + channel_id = _normalize_channel_ref(str(args.get("channel_id") or "")) + if not channel_id: + channel_id = ( + _query_value_from_url(raw_group_ref, "channelId") + or _query_value_from_url(raw_group_ref, "channel_id") + ) + group_id = _normalize_group_ref(raw_group_ref) + if _is_group_channel(group_id) and not channel_id: + channel_id = group_id + group_id = "" + if not group_id and not channel_id: + raise TlonToolError("group_info requires group_id or channel_id") + if channel_id: + resolved = await self.resolve_group_id(channel_id=channel_id, group_id=group_id) + if resolved: + group_id, group = resolved + return _ok( + action, + group_id=group_id, + channel_id=channel_id, + group=group, + resolved_from="channel_id", + ) + try: + return _ok( + action, + group_id=group_id, + group=await self.client.scry("groups", f"/v2/ui/groups/{group_id}"), + ) + except TlonHttpError as exc: + if exc.status != 404: + raise + return await self._group_not_found_result( + action, + requested_group_id=group_id, + channel_id=channel_id, + ) + if action in {"group_create", "group_create_owned", "group_create_with_admins"}: + create_with_admins = action in {"group_create_owned", "group_create_with_admins"} + owner = _normalize_ship(str(args.get("owner") or args.get("ship") or os.getenv("TLON_OWNER_SHIP", ""))) + admin_ships = _ships(args) + if create_with_admins and owner: + admin_ships = _unique_ships([*admin_ships, owner]) + if action == "group_create_with_admins" and not admin_ships: + raise TlonToolError("group_create_with_admins requires ship or ships") + members = _unique_ships(admin_ships if create_with_admins else _ships(args)) created = await self.create_group( title=_required(args, "title"), description=str(args.get("description") or ""), member_ids=members, ) - if action == "group_create_owned" and owner: + if create_with_admins and admin_ships: admin_result = await self.promote_admins( created["group_id"], - [owner], + admin_ships, add_missing_seats=True, ) - created["owner_ship"] = owner + if owner: + created["owner_ship"] = owner + created["admin_ships"] = admin_ships created["admin_role"] = _ADMIN_ROLE_ID - created["admin_assigned"] = owner in admin_result["promoted"] + created["admin_assigned"] = all( + ship in admin_result["promoted"] for ship in admin_ships + ) created["admin_assignment"] = admin_result return _ok(action, **created) if action == "group_invite": @@ -681,9 +744,11 @@ async def promote_admins( if still_missing: raise TlonToolError( - "Admin assignment did not verify for " + "Direct admin assignment did not verify for " f"{', '.join(still_missing)} in {group_id}. " - "The group was updated, but the admin role is not visible in group state." + "The group was updated, but the admin role is not visible in group state. " + "Do not ask the user to accept an invite first; retry group_promote " + "or inspect group_info." ) return { @@ -712,6 +777,54 @@ async def _poke_group(self, group_id: str, group_action: Dict[str, Any]) -> None {"group": {"flag": group_id, "a-group": group_action}}, ) + async def resolve_group_id( + self, + *, + channel_id: str = "", + group_id: str = "", + ) -> Optional[tuple[str, Dict[str, Any]]]: + groups = await _groups_index(self.client) + if not groups: + return None + match = _find_group_in_groups(groups, channel_id or group_id) + if match: + return match + if group_id: + return _find_group_in_groups(groups, group_id) + return None + + async def _group_not_found_result( + self, + action: str, + *, + requested_group_id: str, + channel_id: str = "", + ) -> Dict[str, Any]: + groups = await _groups_index(self.client) + match = _find_group_in_groups(groups, channel_id or requested_group_id) + if match: + group_id, group = match + return _ok( + action, + group_id=group_id, + requested_group_id=requested_group_id, + channel_id=channel_id or None, + group=group, + resolved_from="groups_index", + ) + + return _ok( + action, + found=False, + requested_group_id=requested_group_id, + channel_id=channel_id or None, + candidates=_candidate_groups(groups, requested_group_id, channel_id), + hint=( + "Group not found. Use one of the returned candidate group_id values, " + "or pass the channel_id so the tool can resolve its parent group." + ), + ) + class TlonChannels: def __init__(self, client: TlonHttpClient): @@ -1591,6 +1704,52 @@ async def _best_effort_scry_or_default(client: TlonHttpClient, candidates: List[ return default +async def _groups_index(client: TlonHttpClient) -> Any: + return await _best_effort_scry_or_default( + client, + [ + ("groups", "/v2/groups"), + ("groups-ui", "/v7/init"), + ("groups-ui", "/v8/init"), + ], + {}, + ) + + +def _normalize_group_ref(value: str) -> str: + raw = str(value or "").strip() + if not raw: + return "" + from_url = _query_value_from_url(raw, "groupId") or _query_value_from_url(raw, "group_id") + if from_url: + return from_url + for prefix in ("group/", "groups/"): + if raw.startswith(prefix): + raw = raw[len(prefix):] + return unquote(raw).strip() + + +def _normalize_channel_ref(value: str) -> str: + raw = str(value or "").strip() + if not raw: + return "" + from_url = _query_value_from_url(raw, "channelId") or _query_value_from_url(raw, "channel_id") + if from_url: + return from_url + return unquote(raw).strip() + + +def _query_value_from_url(value: str, key: str) -> str: + parsed = urlparse(value) + if not parsed.scheme and not parsed.query: + return "" + query = parse_qs(parsed.query) + vals = query.get(key) + if vals: + return unquote(vals[0]).strip() + return "" + + def _filter_init_channels(init: Any, mode: str) -> List[Any]: channels = init.get("channels", []) if isinstance(init, dict) else [] if not isinstance(channels, list): @@ -1603,7 +1762,14 @@ def _find_channel(group: Any, channel_id: str) -> Optional[Dict[str, Any]]: channels = group.get("channels") if isinstance(group, dict) else None if isinstance(channels, list): for channel in channels: - if isinstance(channel, dict) and channel.get("id") == channel_id: + if not isinstance(channel, dict): + continue + candidate_id = ( + channel.get("id") + or channel.get("channelId") + or channel.get("nest") + ) + if candidate_id == channel_id: return channel if isinstance(channels, dict): data = channels.get(channel_id) @@ -1612,19 +1778,108 @@ def _find_channel(group: Any, channel_id: str) -> Optional[Dict[str, Any]]: return None -def _find_channel_in_groups(groups: Any, channel_id: str) -> Optional[Dict[str, Any]]: - iterable = groups.values() if isinstance(groups, dict) else groups - if not isinstance(iterable, Iterable): +def _iter_group_items(groups: Any) -> Iterable[tuple[str, Dict[str, Any]]]: + if isinstance(groups, dict) and isinstance(groups.get("groups"), dict): + groups = groups["groups"] + if isinstance(groups, dict): + for raw_group_id, group in groups.items(): + if not isinstance(group, dict): + continue + group_id = str(group.get("id") or group.get("flag") or raw_group_id) + yield group_id, group + return + if isinstance(groups, list): + for group in groups: + if not isinstance(group, dict): + continue + group_id = group.get("id") or group.get("flag") + if group_id: + yield str(group_id), group + + +def _find_group_in_groups(groups: Any, ref: str) -> Optional[tuple[str, Dict[str, Any]]]: + wanted = str(ref or "").strip() + if not wanted: return None - for group in iterable: - if not isinstance(group, dict): - continue + + if _is_group_channel(wanted): + for group_id, group in _iter_group_items(groups): + if _find_channel(group, wanted): + return group_id, group + return None + + for group_id, group in _iter_group_items(groups): + if group_id == wanted: + return group_id, group + + matches: List[tuple[str, Dict[str, Any]]] = [] + for group_id, group in _iter_group_items(groups): + slug = group_id.split("/", 1)[-1] + title = _group_title(group) + if wanted == slug or (title and wanted.lower() == title.lower()): + matches.append((group_id, group)) + if len(matches) == 1: + return matches[0] + return None + + +def _find_channel_in_groups(groups: Any, channel_id: str) -> Optional[Dict[str, Any]]: + for group_id, group in _iter_group_items(groups): channel = _find_channel(group, channel_id) if channel: - return {"group_id": group.get("id") or group.get("flag"), "group": group, "channel": channel} + return {"group_id": group_id, "group": group, "channel": channel} return None +def _candidate_groups(groups: Any, requested_group_id: str = "", channel_id: str = "") -> List[Dict[str, Any]]: + requested_host = "" + if "/" in requested_group_id: + requested_host = requested_group_id.split("/", 1)[0] + + candidates: List[Dict[str, Any]] = [] + for group_id, group in _iter_group_items(groups): + channels = _group_channel_ids(group) + if channel_id and channel_id not in channels: + if requested_host and not group_id.startswith(f"{requested_host}/"): + continue + elif requested_host and not group_id.startswith(f"{requested_host}/"): + continue + + item: Dict[str, Any] = { + "group_id": group_id, + "title": _group_title(group), + } + if channels: + item["channels"] = channels[:10] + candidates.append(item) + if len(candidates) >= 10: + break + return candidates + + +def _group_title(group: Dict[str, Any]) -> str: + meta = group.get("meta") if isinstance(group, dict) else None + if isinstance(meta, dict) and isinstance(meta.get("title"), str): + return meta["title"] + title = group.get("title") if isinstance(group, dict) else "" + return title if isinstance(title, str) else "" + + +def _group_channel_ids(group: Dict[str, Any]) -> List[str]: + channels = group.get("channels") if isinstance(group, dict) else None + out: List[str] = [] + if isinstance(channels, dict): + out.extend(str(key) for key in channels.keys()) + elif isinstance(channels, list): + for channel in channels: + if not isinstance(channel, dict): + continue + channel_id = channel.get("id") or channel.get("channelId") or channel.get("nest") + if channel_id: + out.append(str(channel_id)) + return out + + def _find_channel_section(group: Any, channel_id: str) -> str: sections = group.get("navSections") or group.get("zone") or [] if isinstance(sections, list):