From 90cbd4f9a5261a36749fd7c89af18881c098b8d6 Mon Sep 17 00:00:00 2001 From: andy <102524336+b1rdmania@users.noreply.github.com> Date: Wed, 18 Mar 2026 23:54:57 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20v0.5.5=20=E2=80=94=20/update=20command?= =?UTF-8?q?=20and=20/reset=20queue=20clear?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - /update: git pull + npm run build + launchd restart via process.exit(0) - /reset: now also clears pendingTasks and pendingMessages so the queue doesn't keep draining after a kill - GroupQueue.clearQueue() method added --- CHANGELOG.md | 9 ++++++ package.json | 2 +- src/channels/telegram.ts | 63 +++++++++++++++++++++------------------- src/group-queue.ts | 44 +++++++++++----------------- src/index.ts | 62 ++++----------------------------------- 5 files changed, 66 insertions(+), 114 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 47cb745..dd3cf3e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,14 @@ # Changelog +## v0.5.5 (2026-03-18) — Remote control hotfix + +### Fixes +- `/reset` now clears the pending task and message queue, not just kills the current agent. Previously, resetting would immediately drain the next queued item, making it impossible to stop a runaway queue remotely. +- `/reset` reply updated to confirm both kill and queue clear. + +### New +- `/update` command: pulls latest code from git, rebuilds, and restarts via launchd. Allows remote updates over Telegram without SSH access. This is a bootstrapped command — requires one manual `git pull && npm run build` to get it onto a running instance, after which all future updates can be done via Telegram. + ## v0.5.0 (2026-03-04) — Public Beta First public beta release. GhostClaw is feature-complete and ready for testing. diff --git a/package.json b/package.json index 2065a1d..4d2fe00 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ghostclaw", - "version": "0.5.4", + "version": "0.5.5", "description": "Personal AI assistant. Bare metal, Telegram-first, no containers.", "type": "module", "main": "dist/index.js", diff --git a/src/channels/telegram.ts b/src/channels/telegram.ts index eb814dd..f6dc2a3 100644 --- a/src/channels/telegram.ts +++ b/src/channels/telegram.ts @@ -1,6 +1,7 @@ import { Bot, InputFile } from 'grammy'; import path from 'path'; import fs from 'fs'; +import { execSync } from 'child_process'; import { ASSISTANT_NAME, TRIGGER_PATTERN } from '../config.js'; import { logger } from '../logger.js'; @@ -56,7 +57,7 @@ export class TelegramChannel implements Channel { ctx.reply(`${ASSISTANT_NAME} is online.`); }); - // Command to force-kill a stalled agent and start fresh + // Command to force-kill a stalled agent, clear the queue, and start fresh this.bot.command('reset', (ctx) => { const chatJid = `tg:${ctx.chat.id}`; const group = this.opts.registeredGroups()[chatJid]; @@ -64,11 +65,35 @@ export class TelegramChannel implements Channel { ctx.reply('Not a registered chat.'); return; } - const killed = this.opts.onReset?.(chatJid) ?? false; - if (killed) { - ctx.reply('Reset. Agent killed — send me something to start fresh.'); - } else { - ctx.reply('Nothing running. Send me something to start.'); + this.opts.onReset?.(chatJid); + ctx.reply('Reset. Agent killed and queue cleared — send me something to start fresh.'); + }); + + // Command to pull latest code and restart + this.bot.command('update', async (ctx) => { + const chatJid = `tg:${ctx.chat.id}`; + const group = this.opts.registeredGroups()[chatJid]; + if (!group) { + ctx.reply('Not a registered chat.'); + return; + } + + await ctx.reply('Pulling latest code...'); + const cwd = process.cwd(); + + try { + const pullOut = execSync('git pull', { cwd, encoding: 'utf-8' }); + await ctx.reply(`git pull: ${pullOut.trim()}`); + + await ctx.reply('Building...'); + execSync('npm run build', { cwd, encoding: 'utf-8', timeout: 120_000 }); + + await ctx.reply('Done. Restarting — back in a moment.'); + setTimeout(() => process.exit(0), 500); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + await ctx.reply(`Update failed:\n${msg.slice(0, 500)}`); + logger.error({ err }, '/update command failed'); } }); @@ -87,15 +112,11 @@ export class TelegramChannel implements Channel { const sender = ctx.from?.id.toString() || ''; const msgId = ctx.message.message_id.toString(); - // Determine chat name const chatName = ctx.chat.type === 'private' ? senderName : (ctx.chat as any).title || chatJid; - // Translate Telegram @bot_username mentions into TRIGGER_PATTERN format. - // Telegram @mentions (e.g., @andy_ai_bot) won't match TRIGGER_PATTERN - // (e.g., ^@Andy\b), so we prepend the trigger when the bot is @mentioned. const botUsername = ctx.me?.username?.toLowerCase(); if (botUsername) { const entities = ctx.message.entities || []; @@ -113,7 +134,6 @@ export class TelegramChannel implements Channel { } } - // Store chat metadata for discovery const isGroup = ctx.chat.type === 'group' || ctx.chat.type === 'supergroup'; this.opts.onChatMetadata( @@ -124,7 +144,6 @@ export class TelegramChannel implements Channel { isGroup, ); - // Only deliver full message for registered groups const group = this.opts.registeredGroups()[chatJid]; if (!group) { logger.debug( @@ -134,7 +153,6 @@ export class TelegramChannel implements Channel { return; } - // Deliver message — startMessageLoop() will pick it up this.opts.onMessage(chatJid, { id: msgId, chat_jid: chatJid, @@ -152,7 +170,6 @@ export class TelegramChannel implements Channel { ); }); - // Handle non-text messages with placeholders so the agent knows something was sent const storeNonText = (ctx: any, placeholder: string) => { const chatJid = `tg:${ctx.chat.id}`; const group = this.opts.registeredGroups()[chatJid]; @@ -191,19 +208,16 @@ export class TelegramChannel implements Channel { const chatJid = `tg:${ctx.chat.id}`; const group = this.opts.registeredGroups()[chatJid]; - // Download and save photo let placeholder = '[Photo]'; try { const photos = ctx.message.photo; - const largestPhoto = photos[photos.length - 1]; // Get highest resolution + const largestPhoto = photos[photos.length - 1]; const file = await ctx.api.getFile(largestPhoto.file_id); const url = `https://api.telegram.org/file/bot${this.botToken}/${file.file_path}`; - // Create media directory if needed const mediaDir = path.join(process.cwd(), 'data', 'telegram-media'); await fs.promises.mkdir(mediaDir, { recursive: true }); - // Download photo const resp = await fetch(url); if (resp.ok) { const buffer = Buffer.from(await resp.arrayBuffer()); @@ -218,9 +232,7 @@ export class TelegramChannel implements Channel { 'Downloaded Telegram photo', ); - // Also save to Desktop for easy access if (chatJid === 'tg:414798121') { - // Main chat const desktopPath = path.join( process.env.HOME || '', 'Desktop', @@ -280,12 +292,10 @@ export class TelegramChannel implements Channel { this.bot.on('message:location', (ctx) => storeNonText(ctx, '[Location]')); this.bot.on('message:contact', (ctx) => storeNonText(ctx, '[Contact]')); - // Handle errors gracefully this.bot.catch((err) => { logger.error({ err: err.message }, 'Telegram bot error'); }); - // Start polling — returns a Promise that resolves when started return new Promise((resolve) => { this.bot!.start({ onStart: (botInfo) => { @@ -316,7 +326,6 @@ export class TelegramChannel implements Channel { try { const numericId = jid.replace(/^tg:/, ''); - // Send as voice note if replying to a voice message if (voiceReply) { logger.info({ jid }, 'Attempting voice reply via TTS'); const audioBuffer = await textToSpeech(text); @@ -333,17 +342,14 @@ export class TelegramChannel implements Channel { } } - // Convert markdown to Telegram HTML so bold, italic, code etc. render const html = markdownToTelegramHtml(text); - // Telegram has a 4096 character limit per message — split if needed const MAX_LENGTH = 4096; if (html.length <= MAX_LENGTH) { await this.bot.api.sendMessage(numericId, html, { parse_mode: 'HTML', }); } else { - // Split on newlines, closing/reopening HTML tags across chunk boundaries const chunks: string[] = []; let current = ''; const TELEGRAM_TAGS = ['pre', 'code', 'b', 'i', 'u', 's', 'a']; @@ -358,13 +364,11 @@ export class TelegramChannel implements Channel { } if (current) chunks.push(current); - // Fix unclosed tags in each chunk const fixedChunks: string[] = []; - let carryTags: string[] = []; // tags to reopen in next chunk + let carryTags: string[] = []; for (const chunk of chunks) { let fixed = carryTags.map((t) => `<${t}>`).join('') + chunk; - // Track which tags are open at the end of this chunk const openTags: string[] = []; for (const tag of TELEGRAM_TAGS) { const opens = ( @@ -377,7 +381,6 @@ export class TelegramChannel implements Channel { } } - // Close any unclosed tags at end of this chunk (reverse order) carryTags = [...openTags]; for (const tag of [...openTags].reverse()) { fixed += ``; diff --git a/src/group-queue.ts b/src/group-queue.ts index 4b08431..949304d 100644 --- a/src/group-queue.ts +++ b/src/group-queue.ts @@ -68,17 +68,12 @@ export class GroupQueue { const state = this.getGroup(groupJid); if (state.active) { - // If an idle task container is blocking messages, kill it so the - // message can be processed immediately instead of waiting for the - // task's idle timeout (which can be 30+ minutes). if (state.isTaskContainer && state.idleWaiting) { logger.info( { groupJid }, 'Preempting idle task container for incoming message', ); this.closeStdin(groupJid); - // The message will be picked up when drainGroup runs after the - // task container exits (pendingMessages flag below). } state.pendingMessages = true; if (state.isTaskContainer && this.onMessageQueuedFn) { @@ -110,7 +105,6 @@ export class GroupQueue { const state = this.getGroup(groupJid); - // Prevent double-queuing of the same task if (state.pendingTasks.some((t) => t.id === taskId)) { logger.debug({ groupJid, taskId }, 'Task already queued, skipping'); return; @@ -137,7 +131,6 @@ export class GroupQueue { return; } - // Run immediately this.runTask(groupJid, { id: taskId, groupJid, fn }).catch((err) => logger.error({ groupJid, taskId, err }, 'Unhandled error in runTask'), ); @@ -155,10 +148,6 @@ export class GroupQueue { if (groupFolder) state.groupFolder = groupFolder; } - /** - * Mark the container as idle-waiting (finished work, waiting for IPC input). - * If tasks are pending, preempt the idle container immediately. - */ notifyIdle(groupJid: string): void { const state = this.getGroup(groupJid); state.idleWaiting = true; @@ -167,15 +156,11 @@ export class GroupQueue { } } - /** - * Send a follow-up message to the active container via IPC file. - * Returns true if the message was written, false if no active container. - */ sendMessage(groupJid: string, text: string): boolean { const state = this.getGroup(groupJid); if (!state.active || !state.groupFolder || state.isTaskContainer) return false; - state.idleWaiting = false; // Agent is about to receive work, no longer idle + state.idleWaiting = false; const inputDir = path.join(DATA_DIR, 'ipc', state.groupFolder, 'input'); try { @@ -191,9 +176,6 @@ export class GroupQueue { } } - /** - * Signal the active container to wind down by writing a close sentinel. - */ closeStdin(groupJid: string): void { const state = this.getGroup(groupJid); if (!state.active || !state.groupFolder) return; @@ -220,6 +202,22 @@ export class GroupQueue { return true; } + /** + * Clear all pending tasks and messages for a group. + * Call alongside killAgent() on /reset so the queue doesn't keep draining. + */ + clearQueue(groupJid: string): void { + const state = this.getGroup(groupJid); + const taskCount = state.pendingTasks.length; + state.pendingTasks = []; + state.pendingMessages = false; + // Remove from waiting list too + this.waitingGroups = this.waitingGroups.filter((jid) => jid !== groupJid); + if (taskCount > 0) { + logger.info({ groupJid, taskCount }, 'Queue cleared on reset'); + } + } + private async runForGroup( groupJid: string, reason: 'messages' | 'drain', @@ -313,7 +311,6 @@ export class GroupQueue { const state = this.getGroup(groupJid); - // Tasks first (they won't be re-discovered from SQLite like messages) if (state.pendingTasks.length > 0) { const task = state.pendingTasks.shift()!; this.runTask(groupJid, task).catch((err) => @@ -325,7 +322,6 @@ export class GroupQueue { return; } - // Then pending messages if (state.pendingMessages) { this.runForGroup(groupJid, 'drain').catch((err) => logger.error( @@ -336,7 +332,6 @@ export class GroupQueue { return; } - // Nothing pending for this group; check if other groups are waiting for a slot this.drainWaiting(); } @@ -348,7 +343,6 @@ export class GroupQueue { const nextJid = this.waitingGroups.shift()!; const state = this.getGroup(nextJid); - // Prioritize tasks over messages if (state.pendingTasks.length > 0) { const task = state.pendingTasks.shift()!; this.runTask(nextJid, task).catch((err) => @@ -365,16 +359,12 @@ export class GroupQueue { ), ); } - // If neither pending, skip this group } } async shutdown(_gracePeriodMs: number): Promise { this.shuttingDown = true; - // Count active containers but don't kill them — they'll finish on their own - // via idle timeout or container timeout. The --rm flag cleans them up on exit. - // This prevents WhatsApp reconnection restarts from killing working agents. const activeContainers: string[] = []; for (const [jid, state] of this.groups) { if (state.process && !state.process.killed && state.containerName) { diff --git a/src/index.ts b/src/index.ts index b261eb5..bb00c62 100644 --- a/src/index.ts +++ b/src/index.ts @@ -96,7 +96,6 @@ function registerGroup(jid: string, group: RegisteredGroup): void { registeredGroups[jid] = group; setRegisteredGroup(jid, group); - // Create group folder fs.mkdirSync(path.join(groupDir, 'logs'), { recursive: true }); logger.info( @@ -105,10 +104,6 @@ function registerGroup(jid: string, group: RegisteredGroup): void { ); } -/** - * Get available groups list for the agent. - * Returns groups ordered by most recent activity. - */ export function getAvailableGroups(): import('./container-runner.js').AvailableGroup[] { const chats = getAllChats(); const registeredJids = new Set(Object.keys(registeredGroups)); @@ -130,10 +125,6 @@ export function _setRegisteredGroups( registeredGroups = groups; } -/** - * Process all pending messages for a group. - * Called by the GroupQueue when it's this group's turn. - */ async function processGroupMessages(chatJid: string): Promise { const group = registeredGroups[chatJid]; if (!group) return true; @@ -155,7 +146,6 @@ async function processGroupMessages(chatJid: string): Promise { if (missedMessages.length === 0) return true; - // For non-main groups, check if trigger is required and present if (!isMainGroup && group.requiresTrigger !== false) { const hasTrigger = missedMessages.some((m) => TRIGGER_PATTERN.test(m.content.trim()), @@ -165,13 +155,10 @@ async function processGroupMessages(chatJid: string): Promise { const prompt = formatMessages(missedMessages); - // Check if any message in the batch was a voice note (tagged [Voice: ...]) const hasVoiceMessage = missedMessages.some((m) => m.content.startsWith('[Voice:'), ); - // Advance cursor so the piping path in startMessageLoop won't re-fetch - // these messages. Save the old cursor so we can roll back on error. const previousCursor = lastAgentTimestamp[chatJid] || ''; lastAgentTimestamp[chatJid] = missedMessages[missedMessages.length - 1].timestamp; @@ -182,7 +169,6 @@ async function processGroupMessages(chatJid: string): Promise { 'Processing messages', ); - // Track idle timer for closing stdin when agent is idle let idleTimer: ReturnType | null = null; const resetIdleTimer = () => { @@ -196,10 +182,6 @@ async function processGroupMessages(chatJid: string): Promise { }, IDLE_TIMEOUT); }; - // Stall-aware typing indicator: - // - Refreshes every 4s (Telegram expires typing after 5s) - // - Stops after 15s of no agent output (stall detection) - // - Resumes when output flows again const TYPING_STALL_MS = 15_000; let typingActive = true; let lastOutputAt = Date.now(); @@ -208,7 +190,6 @@ async function processGroupMessages(chatJid: string): Promise { if (typingActive && Date.now() - lastOutputAt < TYPING_STALL_MS) { channel.setTyping?.(chatJid, true)?.catch(() => {}); } else if (typingActive) { - // Stalled — stop typing until output resumes typingActive = false; channel.setTyping?.(chatJid, false)?.catch(() => {}); } @@ -219,28 +200,24 @@ async function processGroupMessages(chatJid: string): Promise { let outputSentToUser = false; const output = await runAgent(group, prompt, chatJid, async (result) => { - // Any streaming output resets the stall timer and resumes typing lastOutputAt = Date.now(); if (!typingActive) { typingActive = true; channel.setTyping?.(chatJid, true)?.catch(() => {}); } - // Streaming output callback — called for each agent result if (result.result) { const raw = typeof result.result === 'string' ? result.result : JSON.stringify(result.result); - // Strip ... blocks — agent uses these for internal reasoning const text = raw.replace(/[\s\S]*?<\/internal>/g, '').trim(); logger.info({ group: group.name }, `Agent output: ${raw.slice(0, 200)}`); if (text) { clearInterval(typingInterval); - await channel.sendMessage(chatJid, text, false); // Disabled voice replies + await channel.sendMessage(chatJid, text, false); outputSentToUser = true; } - // Only reset idle timer on actual results, not session-update markers (result: null) resetIdleTimer(); } @@ -258,8 +235,6 @@ async function processGroupMessages(chatJid: string): Promise { if (idleTimer) clearTimeout(idleTimer); if (output === 'error' || hadError) { - // If we already sent output to the user, don't roll back the cursor — - // the user got their response and re-processing would send duplicates. if (outputSentToUser) { logger.warn( { group: group.name }, @@ -267,7 +242,6 @@ async function processGroupMessages(chatJid: string): Promise { ); return true; } - // Roll back cursor so retries can re-process these messages lastAgentTimestamp[chatJid] = previousCursor; saveState(); logger.warn( @@ -289,7 +263,6 @@ async function runAgent( const isMain = group.folder === MAIN_GROUP_FOLDER; const sessionId = sessions[group.folder]; - // Update tasks snapshot for container to read (filtered by group) const tasks = getAllTasks(); writeTasksSnapshot( group.folder, @@ -305,7 +278,6 @@ async function runAgent( })), ); - // Update available groups snapshot (main group only can see all groups) const availableGroups = getAvailableGroups(); writeGroupsSnapshot( group.folder, @@ -314,7 +286,6 @@ async function runAgent( new Set(Object.keys(registeredGroups)), ); - // Wrap onOutput to track session ID from streamed results const wrappedOnOutput = onOutput ? async (output: ContainerOutput) => { if (output.newSessionId) { @@ -382,11 +353,9 @@ async function startMessageLoop(): Promise { if (messages.length > 0) { logger.info({ count: messages.length }, 'New messages'); - // Advance the "seen" cursor for all messages immediately lastTimestamp = newTimestamp; saveState(); - // Deduplicate by group const messagesByGroup = new Map(); for (const msg of messages) { const existing = messagesByGroup.get(msg.chat_jid); @@ -412,9 +381,6 @@ async function startMessageLoop(): Promise { const isMainGroup = group.folder === MAIN_GROUP_FOLDER; const needsTrigger = !isMainGroup && group.requiresTrigger !== false; - // For non-main groups, only act on trigger messages. - // Non-trigger messages accumulate in DB and get pulled as - // context when a trigger eventually arrives. if (needsTrigger) { const hasTrigger = groupMessages.some((m) => TRIGGER_PATTERN.test(m.content.trim()), @@ -422,8 +388,6 @@ async function startMessageLoop(): Promise { if (!hasTrigger) continue; } - // Pull all messages since lastAgentTimestamp so non-trigger - // context that accumulated between triggers is included. const allPending = getMessagesSince( chatJid, lastAgentTimestamp[chatJid] || '', @@ -441,14 +405,12 @@ async function startMessageLoop(): Promise { lastAgentTimestamp[chatJid] = messagesToSend[messagesToSend.length - 1].timestamp; saveState(); - // Show typing indicator while the container processes the piped message channel .setTyping?.(chatJid, true) ?.catch((err) => logger.warn({ chatJid, err }, 'Failed to set typing indicator'), ); } else { - // No active container — enqueue for a new one queue.enqueueMessageCheck(chatJid); } } @@ -460,10 +422,6 @@ async function startMessageLoop(): Promise { } } -/** - * Startup recovery: check for unprocessed messages in registered groups. - * Handles crash between advancing lastTimestamp and processing messages. - */ function recoverPendingMessages(): void { for (const [chatJid, group] of Object.entries(registeredGroups)) { const sinceTimestamp = lastAgentTimestamp[chatJid] || ''; @@ -482,15 +440,13 @@ function acquirePidLock(): void { const pidFile = path.join(DATA_DIR, 'ghostclaw.pid'); fs.mkdirSync(DATA_DIR, { recursive: true }); - // Kill any existing instance try { const oldPid = parseInt(fs.readFileSync(pidFile, 'utf-8').trim(), 10); if (oldPid && oldPid !== process.pid) { try { - process.kill(oldPid, 0); // Check if alive + process.kill(oldPid, 0); logger.warn({ oldPid }, 'Killing existing GhostClaw process'); process.kill(oldPid, 'SIGTERM'); - // Brief wait for clean shutdown const start = Date.now(); while (Date.now() - start < 3000) { try { @@ -500,7 +456,6 @@ function acquirePidLock(): void { } Atomics.wait(new Int32Array(new SharedArrayBuffer(4)), 0, 0, 100); } - // Force kill if still alive try { process.kill(oldPid, 'SIGKILL'); } catch { @@ -529,7 +484,6 @@ function releasePidLock(): void { async function main(): Promise { acquirePidLock(); - // Clear stale errors from previous runs so heartbeat starts fresh const errorsLog = path.join(process.cwd(), 'logs', 'errors.log'); try { fs.writeFileSync(errorsLog, ''); @@ -541,7 +495,6 @@ async function main(): Promise { logger.info('Database initialized'); loadState(); - // Graceful shutdown handlers const shutdown = async (signal: string) => { logger.info({ signal }, 'Shutdown signal received'); releasePidLock(); @@ -552,7 +505,6 @@ async function main(): Promise { process.on('SIGTERM', () => shutdown('SIGTERM')); process.on('SIGINT', () => shutdown('SIGINT')); - // Channel callbacks (shared by all channels) const channelOpts = { onMessage: (_chatJid: string, msg: NewMessage) => { storeMessage(msg); @@ -575,10 +527,12 @@ async function main(): Promise { isGroup?: boolean, ) => storeChatMetadata(chatJid, timestamp, name, channel, isGroup), registeredGroups: () => registeredGroups, - onReset: (chatJid: string) => queue.killAgent(chatJid), + onReset: (chatJid: string) => { + queue.clearQueue(chatJid); + return queue.killAgent(chatJid); + }, }; - // Create and connect channels if (TELEGRAM_BOT_TOKEN) { const telegram = new TelegramChannel(TELEGRAM_BOT_TOKEN, channelOpts); channels.push(telegram); @@ -591,7 +545,6 @@ async function main(): Promise { await whatsapp.connect(); } - // Initialize error alerts for main group const mainGroupJid = Object.keys(registeredGroups).find( (jid) => registeredGroups[jid].folder === MAIN_GROUP_FOLDER, ); @@ -604,11 +557,9 @@ async function main(): Promise { initErrorAlerts(sendMessageToAdmin, mainGroupJid); } - // Start dashboard setDashboardChannels(channels); startDashboard(); - // Start subsystems (independently of connection handler) startSchedulerLoop({ registeredGroups: () => registeredGroups, getSessions: () => sessions, @@ -664,7 +615,6 @@ async function main(): Promise { }); } -// Guard: only run when executed directly, not when imported by tests const isDirectRun = process.argv[1] && new URL(import.meta.url).pathname ===