-
Notifications
You must be signed in to change notification settings - Fork 7
v0.5.5 — /update command + /reset queue clear #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| import { Bot, InputFile } from 'grammy'; | ||
| import path from 'path'; | ||
| import fs from 'fs'; | ||
| import { execSync } from 'child_process'; | ||
|
|
||
| import { ASSISTANT_NAME, TRIGGER_PATTERN } from '../config.js'; | ||
| import { logger } from '../logger.js'; | ||
|
|
@@ -56,19 +57,43 @@ export class TelegramChannel implements Channel { | |
| ctx.reply(`${ASSISTANT_NAME} is online.`); | ||
| }); | ||
|
|
||
| // Command to force-kill a stalled agent and start fresh | ||
| // Command to force-kill a stalled agent, clear the queue, and start fresh | ||
| this.bot.command('reset', (ctx) => { | ||
| const chatJid = `tg:${ctx.chat.id}`; | ||
| const group = this.opts.registeredGroups()[chatJid]; | ||
| if (!group) { | ||
| ctx.reply('Not a registered chat.'); | ||
| return; | ||
| } | ||
| const killed = this.opts.onReset?.(chatJid) ?? false; | ||
| if (killed) { | ||
| ctx.reply('Reset. Agent killed — send me something to start fresh.'); | ||
| } else { | ||
| ctx.reply('Nothing running. Send me something to start.'); | ||
| this.opts.onReset?.(chatJid); | ||
| ctx.reply('Reset. Agent killed and queue cleared — send me something to start fresh.'); | ||
| }); | ||
|
|
||
| // Command to pull latest code and restart | ||
| this.bot.command('update', async (ctx) => { | ||
| const chatJid = `tg:${ctx.chat.id}`; | ||
| const group = this.opts.registeredGroups()[chatJid]; | ||
| if (!group) { | ||
| ctx.reply('Not a registered chat.'); | ||
| return; | ||
| } | ||
|
|
||
| await ctx.reply('Pulling latest code...'); | ||
| const cwd = process.cwd(); | ||
|
|
||
| try { | ||
| const pullOut = execSync('git pull', { cwd, encoding: 'utf-8' }); | ||
| await ctx.reply(`git pull: ${pullOut.trim()}`); | ||
|
|
||
| await ctx.reply('Building...'); | ||
| execSync('npm run build', { cwd, encoding: 'utf-8', timeout: 120_000 }); | ||
|
|
||
| await ctx.reply('Done. Restarting — back in a moment.'); | ||
| setTimeout(() => process.exit(0), 500); | ||
| } catch (err) { | ||
| const msg = err instanceof Error ? err.message : String(err); | ||
| await ctx.reply(`Update failed:\n${msg.slice(0, 500)}`); | ||
| logger.error({ err }, '/update command failed'); | ||
| } | ||
|
Comment on lines
+73
to
97
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This command can execute repo update/build/restart, but it’s currently available to any registered chat. That’s a high-impact control path and should be restricted to an admin channel/allowlist. 🔐 Proposed fix (restrict to main/admin group)-import { ASSISTANT_NAME, TRIGGER_PATTERN } from '../config.js';
+import { ASSISTANT_NAME, MAIN_GROUP_FOLDER, TRIGGER_PATTERN } from '../config.js';
this.bot.command('update', async (ctx) => {
const chatJid = `tg:${ctx.chat.id}`;
const group = this.opts.registeredGroups()[chatJid];
if (!group) {
ctx.reply('Not a registered chat.');
return;
}
+ if (group.folder !== MAIN_GROUP_FOLDER) {
+ await ctx.reply('Not authorized to run /update from this chat.');
+ return;
+ }
await ctx.reply('Pulling latest code...');🤖 Prompt for AI Agents |
||
| }); | ||
|
|
||
|
|
@@ -87,15 +112,11 @@ export class TelegramChannel implements Channel { | |
| const sender = ctx.from?.id.toString() || ''; | ||
| const msgId = ctx.message.message_id.toString(); | ||
|
|
||
| // Determine chat name | ||
| const chatName = | ||
| ctx.chat.type === 'private' | ||
| ? senderName | ||
| : (ctx.chat as any).title || chatJid; | ||
|
|
||
| // Translate Telegram @bot_username mentions into TRIGGER_PATTERN format. | ||
| // Telegram @mentions (e.g., @andy_ai_bot) won't match TRIGGER_PATTERN | ||
| // (e.g., ^@Andy\b), so we prepend the trigger when the bot is @mentioned. | ||
| const botUsername = ctx.me?.username?.toLowerCase(); | ||
| if (botUsername) { | ||
| const entities = ctx.message.entities || []; | ||
|
|
@@ -113,7 +134,6 @@ export class TelegramChannel implements Channel { | |
| } | ||
| } | ||
|
|
||
| // Store chat metadata for discovery | ||
| const isGroup = | ||
| ctx.chat.type === 'group' || ctx.chat.type === 'supergroup'; | ||
| this.opts.onChatMetadata( | ||
|
|
@@ -124,7 +144,6 @@ export class TelegramChannel implements Channel { | |
| isGroup, | ||
| ); | ||
|
|
||
| // Only deliver full message for registered groups | ||
| const group = this.opts.registeredGroups()[chatJid]; | ||
| if (!group) { | ||
| logger.debug( | ||
|
|
@@ -134,7 +153,6 @@ export class TelegramChannel implements Channel { | |
| return; | ||
| } | ||
|
|
||
| // Deliver message — startMessageLoop() will pick it up | ||
| this.opts.onMessage(chatJid, { | ||
| id: msgId, | ||
| chat_jid: chatJid, | ||
|
|
@@ -152,7 +170,6 @@ export class TelegramChannel implements Channel { | |
| ); | ||
| }); | ||
|
|
||
| // Handle non-text messages with placeholders so the agent knows something was sent | ||
| const storeNonText = (ctx: any, placeholder: string) => { | ||
| const chatJid = `tg:${ctx.chat.id}`; | ||
| const group = this.opts.registeredGroups()[chatJid]; | ||
|
|
@@ -191,19 +208,16 @@ export class TelegramChannel implements Channel { | |
| const chatJid = `tg:${ctx.chat.id}`; | ||
| const group = this.opts.registeredGroups()[chatJid]; | ||
|
|
||
| // Download and save photo | ||
| let placeholder = '[Photo]'; | ||
| try { | ||
| const photos = ctx.message.photo; | ||
| const largestPhoto = photos[photos.length - 1]; // Get highest resolution | ||
| const largestPhoto = photos[photos.length - 1]; | ||
| const file = await ctx.api.getFile(largestPhoto.file_id); | ||
| const url = `https://api.telegram.org/file/bot${this.botToken}/${file.file_path}`; | ||
|
|
||
| // Create media directory if needed | ||
| const mediaDir = path.join(process.cwd(), 'data', 'telegram-media'); | ||
| await fs.promises.mkdir(mediaDir, { recursive: true }); | ||
|
|
||
| // Download photo | ||
| const resp = await fetch(url); | ||
| if (resp.ok) { | ||
| const buffer = Buffer.from(await resp.arrayBuffer()); | ||
|
|
@@ -218,9 +232,7 @@ export class TelegramChannel implements Channel { | |
| 'Downloaded Telegram photo', | ||
| ); | ||
|
|
||
| // Also save to Desktop for easy access | ||
| if (chatJid === 'tg:414798121') { | ||
| // Main chat | ||
| const desktopPath = path.join( | ||
| process.env.HOME || '', | ||
| 'Desktop', | ||
|
|
@@ -280,12 +292,10 @@ export class TelegramChannel implements Channel { | |
| this.bot.on('message:location', (ctx) => storeNonText(ctx, '[Location]')); | ||
| this.bot.on('message:contact', (ctx) => storeNonText(ctx, '[Contact]')); | ||
|
|
||
| // Handle errors gracefully | ||
| this.bot.catch((err) => { | ||
| logger.error({ err: err.message }, 'Telegram bot error'); | ||
| }); | ||
|
|
||
| // Start polling — returns a Promise that resolves when started | ||
| return new Promise<void>((resolve) => { | ||
| this.bot!.start({ | ||
| onStart: (botInfo) => { | ||
|
|
@@ -316,7 +326,6 @@ export class TelegramChannel implements Channel { | |
| try { | ||
| const numericId = jid.replace(/^tg:/, ''); | ||
|
|
||
| // Send as voice note if replying to a voice message | ||
| if (voiceReply) { | ||
| logger.info({ jid }, 'Attempting voice reply via TTS'); | ||
| const audioBuffer = await textToSpeech(text); | ||
|
|
@@ -333,17 +342,14 @@ export class TelegramChannel implements Channel { | |
| } | ||
| } | ||
|
|
||
| // Convert markdown to Telegram HTML so bold, italic, code etc. render | ||
| const html = markdownToTelegramHtml(text); | ||
|
|
||
| // Telegram has a 4096 character limit per message — split if needed | ||
| const MAX_LENGTH = 4096; | ||
| if (html.length <= MAX_LENGTH) { | ||
| await this.bot.api.sendMessage(numericId, html, { | ||
| parse_mode: 'HTML', | ||
| }); | ||
| } else { | ||
| // Split on newlines, closing/reopening HTML tags across chunk boundaries | ||
| const chunks: string[] = []; | ||
| let current = ''; | ||
| const TELEGRAM_TAGS = ['pre', 'code', 'b', 'i', 'u', 's', 'a']; | ||
|
|
@@ -358,13 +364,11 @@ export class TelegramChannel implements Channel { | |
| } | ||
| if (current) chunks.push(current); | ||
|
|
||
| // Fix unclosed tags in each chunk | ||
| const fixedChunks: string[] = []; | ||
| let carryTags: string[] = []; // tags to reopen in next chunk | ||
| let carryTags: string[] = []; | ||
| for (const chunk of chunks) { | ||
| let fixed = carryTags.map((t) => `<${t}>`).join('') + chunk; | ||
|
|
||
| // Track which tags are open at the end of this chunk | ||
| const openTags: string[] = []; | ||
| for (const tag of TELEGRAM_TAGS) { | ||
| const opens = ( | ||
|
|
@@ -377,7 +381,6 @@ export class TelegramChannel implements Channel { | |
| } | ||
| } | ||
|
|
||
| // Close any unclosed tags at end of this chunk (reverse order) | ||
| carryTags = [...openTags]; | ||
| for (const tag of [...openTags].reverse()) { | ||
| fixed += `</${tag}>`; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -68,17 +68,12 @@ export class GroupQueue { | |
| const state = this.getGroup(groupJid); | ||
|
|
||
| if (state.active) { | ||
| // If an idle task container is blocking messages, kill it so the | ||
| // message can be processed immediately instead of waiting for the | ||
| // task's idle timeout (which can be 30+ minutes). | ||
| if (state.isTaskContainer && state.idleWaiting) { | ||
| logger.info( | ||
| { groupJid }, | ||
| 'Preempting idle task container for incoming message', | ||
| ); | ||
| this.closeStdin(groupJid); | ||
| // The message will be picked up when drainGroup runs after the | ||
| // task container exits (pendingMessages flag below). | ||
| } | ||
| state.pendingMessages = true; | ||
| if (state.isTaskContainer && this.onMessageQueuedFn) { | ||
|
|
@@ -110,7 +105,6 @@ export class GroupQueue { | |
|
|
||
| const state = this.getGroup(groupJid); | ||
|
|
||
| // Prevent double-queuing of the same task | ||
| if (state.pendingTasks.some((t) => t.id === taskId)) { | ||
| logger.debug({ groupJid, taskId }, 'Task already queued, skipping'); | ||
| return; | ||
|
|
@@ -137,7 +131,6 @@ export class GroupQueue { | |
| return; | ||
| } | ||
|
|
||
| // Run immediately | ||
| this.runTask(groupJid, { id: taskId, groupJid, fn }).catch((err) => | ||
| logger.error({ groupJid, taskId, err }, 'Unhandled error in runTask'), | ||
| ); | ||
|
|
@@ -155,10 +148,6 @@ export class GroupQueue { | |
| if (groupFolder) state.groupFolder = groupFolder; | ||
| } | ||
|
|
||
| /** | ||
| * Mark the container as idle-waiting (finished work, waiting for IPC input). | ||
| * If tasks are pending, preempt the idle container immediately. | ||
| */ | ||
| notifyIdle(groupJid: string): void { | ||
| const state = this.getGroup(groupJid); | ||
| state.idleWaiting = true; | ||
|
|
@@ -167,15 +156,11 @@ export class GroupQueue { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Send a follow-up message to the active container via IPC file. | ||
| * Returns true if the message was written, false if no active container. | ||
| */ | ||
| sendMessage(groupJid: string, text: string): boolean { | ||
| const state = this.getGroup(groupJid); | ||
| if (!state.active || !state.groupFolder || state.isTaskContainer) | ||
| return false; | ||
| state.idleWaiting = false; // Agent is about to receive work, no longer idle | ||
| state.idleWaiting = false; | ||
|
|
||
| const inputDir = path.join(DATA_DIR, 'ipc', state.groupFolder, 'input'); | ||
| try { | ||
|
|
@@ -191,9 +176,6 @@ export class GroupQueue { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Signal the active container to wind down by writing a close sentinel. | ||
| */ | ||
| closeStdin(groupJid: string): void { | ||
| const state = this.getGroup(groupJid); | ||
| if (!state.active || !state.groupFolder) return; | ||
|
|
@@ -220,6 +202,22 @@ export class GroupQueue { | |
| return true; | ||
| } | ||
|
|
||
| /** | ||
| * Clear all pending tasks and messages for a group. | ||
| * Call alongside killAgent() on /reset so the queue doesn't keep draining. | ||
| */ | ||
| clearQueue(groupJid: string): void { | ||
| const state = this.getGroup(groupJid); | ||
| const taskCount = state.pendingTasks.length; | ||
| state.pendingTasks = []; | ||
| state.pendingMessages = false; | ||
| // Remove from waiting list too | ||
| this.waitingGroups = this.waitingGroups.filter((jid) => jid !== groupJid); | ||
| if (taskCount > 0) { | ||
| logger.info({ groupJid, taskCount }, 'Queue cleared on reset'); | ||
| } | ||
| } | ||
|
Comment on lines
+205
to
+219
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
A previously scheduled retry can still fire after reset and restart processing for the same group. That makes 🛠️ Proposed fix interface GroupState {
active: boolean;
idleWaiting: boolean;
isTaskContainer: boolean;
pendingMessages: boolean;
pendingTasks: QueuedTask[];
process: ChildProcess | null;
containerName: string | null;
groupFolder: string | null;
retryCount: number;
+ retryTimer: NodeJS.Timeout | null;
}
// in getGroup() initializer
state = {
active: false,
idleWaiting: false,
isTaskContainer: false,
pendingMessages: false,
pendingTasks: [],
process: null,
containerName: null,
groupFolder: null,
retryCount: 0,
+ retryTimer: null,
};
clearQueue(groupJid: string): void {
const state = this.getGroup(groupJid);
const taskCount = state.pendingTasks.length;
state.pendingTasks = [];
state.pendingMessages = false;
+ state.retryCount = 0;
+ if (state.retryTimer) {
+ clearTimeout(state.retryTimer);
+ state.retryTimer = null;
+ }
this.waitingGroups = this.waitingGroups.filter((jid) => jid !== groupJid);
if (taskCount > 0) {
logger.info({ groupJid, taskCount }, 'Queue cleared on reset');
}
}
private scheduleRetry(groupJid: string, state: GroupState): void {
state.retryCount++;
if (state.retryCount > MAX_RETRIES) {
...
return;
}
const delayMs = BASE_RETRY_MS * Math.pow(2, state.retryCount - 1);
+ if (state.retryTimer) clearTimeout(state.retryTimer);
- setTimeout(() => {
+ state.retryTimer = setTimeout(() => {
+ state.retryTimer = null;
if (!this.shuttingDown) {
this.enqueueMessageCheck(groupJid);
}
}, delayMs);
}🤖 Prompt for AI Agents |
||
|
|
||
| private async runForGroup( | ||
| groupJid: string, | ||
| reason: 'messages' | 'drain', | ||
|
|
@@ -313,7 +311,6 @@ export class GroupQueue { | |
|
|
||
| const state = this.getGroup(groupJid); | ||
|
|
||
| // Tasks first (they won't be re-discovered from SQLite like messages) | ||
| if (state.pendingTasks.length > 0) { | ||
| const task = state.pendingTasks.shift()!; | ||
| this.runTask(groupJid, task).catch((err) => | ||
|
|
@@ -325,7 +322,6 @@ export class GroupQueue { | |
| return; | ||
| } | ||
|
|
||
| // Then pending messages | ||
| if (state.pendingMessages) { | ||
| this.runForGroup(groupJid, 'drain').catch((err) => | ||
| logger.error( | ||
|
|
@@ -336,7 +332,6 @@ export class GroupQueue { | |
| return; | ||
| } | ||
|
|
||
| // Nothing pending for this group; check if other groups are waiting for a slot | ||
| this.drainWaiting(); | ||
| } | ||
|
|
||
|
|
@@ -348,7 +343,6 @@ export class GroupQueue { | |
| const nextJid = this.waitingGroups.shift()!; | ||
| const state = this.getGroup(nextJid); | ||
|
|
||
| // Prioritize tasks over messages | ||
| if (state.pendingTasks.length > 0) { | ||
| const task = state.pendingTasks.shift()!; | ||
| this.runTask(nextJid, task).catch((err) => | ||
|
|
@@ -365,16 +359,12 @@ export class GroupQueue { | |
| ), | ||
| ); | ||
| } | ||
| // If neither pending, skip this group | ||
| } | ||
| } | ||
|
|
||
| async shutdown(_gracePeriodMs: number): Promise<void> { | ||
| this.shuttingDown = true; | ||
|
|
||
| // Count active containers but don't kill them — they'll finish on their own | ||
| // via idle timeout or container timeout. The --rm flag cleans them up on exit. | ||
| // This prevents WhatsApp reconnection restarts from killing working agents. | ||
| const activeContainers: string[] = []; | ||
| for (const [jid, state] of this.groups) { | ||
| if (state.process && !state.process.killed && state.containerName) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/resetresponse is misleading when nothing was running.onResetreturns kill status, but the reply always says “Agent killed”. This can misreport state.💡 Proposed fix
📝 Committable suggestion
🤖 Prompt for AI Agents