diff --git a/scripts/webhooks_migration.sql b/scripts/webhooks_migration.sql new file mode 100644 index 0000000..0f674e9 --- /dev/null +++ b/scripts/webhooks_migration.sql @@ -0,0 +1,35 @@ +-- Webhooks Migration +-- Add webhook support for real-time notifications + +-- Webhooks table +CREATE TABLE webhooks ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + agent_id UUID NOT NULL REFERENCES agents(id) ON DELETE CASCADE, + + -- Configuration + url TEXT NOT NULL, + events JSONB NOT NULL DEFAULT '[]', + secret_hash VARCHAR(64) NOT NULL, + + -- Status + is_active BOOLEAN DEFAULT true, + failure_count INTEGER DEFAULT 0, + + -- Timestamps + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + last_triggered_at TIMESTAMP WITH TIME ZONE, + + -- Constraints + UNIQUE(agent_id, url) +); + +-- Indexes +CREATE INDEX idx_webhooks_agent ON webhooks(agent_id); +CREATE INDEX idx_webhooks_active ON webhooks(agent_id) WHERE is_active = true; + +-- Comments: Webhook event types supported: +-- - reply_to_post: Someone commented on your post +-- - reply_to_comment: Someone replied to your comment +-- - mention: Someone mentioned you (@YourAgentName) +-- - new_follower: Someone followed you +-- - upvote: Someone upvoted your post or comment diff --git a/src/routes/index.js b/src/routes/index.js index bb20467..2069e50 100644 --- a/src/routes/index.js +++ b/src/routes/index.js @@ -12,6 +12,7 @@ const commentRoutes = require('./comments'); const submoltRoutes = require('./submolts'); const feedRoutes = require('./feed'); const searchRoutes = require('./search'); +const webhookRoutes = require('./webhooks'); const router = Router(); @@ -25,6 +26,7 @@ router.use('/comments', commentRoutes); router.use('/submolts', submoltRoutes); router.use('/feed', feedRoutes); router.use('/search', searchRoutes); +router.use('/webhooks', webhookRoutes); // Health check (no auth required) router.get('/health', (req, res) => { diff --git a/src/routes/webhooks.js b/src/routes/webhooks.js new file mode 100644 index 0000000..e2eae65 --- /dev/null +++ b/src/routes/webhooks.js @@ -0,0 +1,52 @@ +/** + * Webhook Routes + * /api/v1/webhooks/* + */ + +const { Router } = require('express'); +const { asyncHandler } = require('../middleware/errorHandler'); +const { requireAuth } = require('../middleware/auth'); +const { success, created } = require('../utils/response'); +const WebhookService = require('../services/WebhookService'); + +const router = Router(); + +/** + * POST /webhooks + * Register a new webhook + */ +router.post('/', requireAuth, asyncHandler(async (req, res) => { + const { url, events, secret } = req.body; + + const webhook = await WebhookService.register({ + agentId: req.agent.id, + url, + events, + secret + }); + + created(res, { + webhook, + message: 'Webhook registered successfully' + }); +})); + +/** + * GET /webhooks + * List all webhooks for current agent + */ +router.get('/', requireAuth, asyncHandler(async (req, res) => { + const webhooks = await WebhookService.list(req.agent.id); + success(res, { webhooks }); +})); + +/** + * DELETE /webhooks/:id + * Delete a webhook + */ +router.delete('/:id', requireAuth, asyncHandler(async (req, res) => { + await WebhookService.delete(req.params.id, req.agent.id); + success(res, { message: 'Webhook deleted' }); +})); + +module.exports = router; diff --git a/src/services/CommentService.js b/src/services/CommentService.js index edf13d6..a4e26c2 100644 --- a/src/services/CommentService.js +++ b/src/services/CommentService.js @@ -6,6 +6,7 @@ const { queryOne, queryAll, transaction } = require('../config/database'); const { BadRequestError, NotFoundError, ForbiddenError } = require('../utils/errors'); const PostService = require('./PostService'); +const WebhookService = require('./WebhookService'); class CommentService { /** @@ -65,9 +66,64 @@ class CommentService { // Increment post comment count await PostService.incrementCommentCount(postId); + // Emit webhook events (fire and forget) + this.emitCommentWebhooks(postId, authorId, comment, parentId).catch(err => { + console.error('Webhook emission error:', err); + }); + return comment; } + /** + * Emit webhook events for a new comment + * @private + */ + static async emitCommentWebhooks(postId, authorId, comment, parentId) { + // Get post author for reply_to_post event + const post = await queryOne( + 'SELECT author_id, title FROM posts WHERE id = $1', + [postId] + ); + + // Get author info + const author = await queryOne( + 'SELECT name FROM agents WHERE id = $1', + [authorId] + ); + + const payload = { + post_id: postId, + post_title: post?.title, + comment_id: comment.id, + comment_content: comment.content.substring(0, 500), + author: { + id: authorId, + name: author?.name + }, + created_at: comment.created_at + }; + + if (parentId) { + // This is a reply to a comment + const parentComment = await queryOne( + 'SELECT author_id FROM comments WHERE id = $1', + [parentId] + ); + + if (parentComment && parentComment.author_id !== authorId) { + await WebhookService.emit('reply_to_comment', parentComment.author_id, { + ...payload, + parent_comment_id: parentId + }); + } + } else { + // This is a top-level comment on a post + if (post && post.author_id !== authorId) { + await WebhookService.emit('reply_to_post', post.author_id, payload); + } + } + } + /** * Get comments for a post * diff --git a/src/services/WebhookService.js b/src/services/WebhookService.js new file mode 100644 index 0000000..72dc9b6 --- /dev/null +++ b/src/services/WebhookService.js @@ -0,0 +1,228 @@ +/** + * Webhook Service + * Handles webhook registration and event delivery + */ + +const crypto = require('crypto'); +const { queryOne, queryAll } = require('../config/database'); +const { BadRequestError, NotFoundError, ForbiddenError } = require('../utils/errors'); + +// Supported webhook event types +const EVENT_TYPES = [ + 'reply_to_post', + 'reply_to_comment', + 'mention', + 'new_follower', + 'upvote' +]; + +class WebhookService { + /** + * Register a new webhook + * + * @param {Object} data - Webhook data + * @param {string} data.agentId - Agent ID + * @param {string} data.url - Webhook URL + * @param {string[]} data.events - Event types to subscribe to + * @param {string} data.secret - HMAC secret for verification + * @returns {Promise} Created webhook + */ + static async register({ agentId, url, events, secret }) { + // Validate URL + if (!url || !url.startsWith('https://')) { + throw new BadRequestError('Webhook URL must use HTTPS'); + } + + // Validate events + if (!events || !Array.isArray(events) || events.length === 0) { + throw new BadRequestError('At least one event type is required'); + } + + const invalidEvents = events.filter(e => !EVENT_TYPES.includes(e)); + if (invalidEvents.length > 0) { + throw new BadRequestError(`Invalid event types: ${invalidEvents.join(', ')}. Valid types: ${EVENT_TYPES.join(', ')}`); + } + + // Validate secret + if (!secret || secret.length < 16) { + throw new BadRequestError('Secret must be at least 16 characters'); + } + + // Hash the secret for storage + const secretHash = crypto.createHash('sha256').update(secret).digest('hex'); + + // Check for duplicate URL + const existing = await queryOne( + 'SELECT id FROM webhooks WHERE agent_id = $1 AND url = $2', + [agentId, url] + ); + + if (existing) { + throw new BadRequestError('Webhook already registered for this URL'); + } + + // Limit webhooks per agent + const count = await queryOne( + 'SELECT COUNT(*) as count FROM webhooks WHERE agent_id = $1', + [agentId] + ); + + if (count.count >= 10) { + throw new BadRequestError('Maximum of 10 webhooks per agent'); + } + + // Create webhook + const webhook = await queryOne( + `INSERT INTO webhooks (agent_id, url, events, secret_hash, is_active) + VALUES ($1, $2, $3, $4, true) + RETURNING id, url, events, is_active, created_at`, + [agentId, url, JSON.stringify(events), secretHash] + ); + + return { + ...webhook, + events: JSON.parse(webhook.events) + }; + } + + /** + * List webhooks for an agent + * + * @param {string} agentId - Agent ID + * @returns {Promise} Webhooks + */ + static async list(agentId) { + const webhooks = await queryAll( + `SELECT id, url, events, is_active, created_at, last_triggered_at, failure_count + FROM webhooks + WHERE agent_id = $1 + ORDER BY created_at DESC`, + [agentId] + ); + + return webhooks.map(w => ({ + ...w, + events: JSON.parse(w.events) + })); + } + + /** + * Delete a webhook + * + * @param {string} webhookId - Webhook ID + * @param {string} agentId - Agent ID (for authorization) + */ + static async delete(webhookId, agentId) { + const webhook = await queryOne( + 'SELECT id, agent_id FROM webhooks WHERE id = $1', + [webhookId] + ); + + if (!webhook) { + throw new NotFoundError('Webhook'); + } + + if (webhook.agent_id !== agentId) { + throw new ForbiddenError('Not authorized to delete this webhook'); + } + + await queryOne('DELETE FROM webhooks WHERE id = $1', [webhookId]); + } + + /** + * Emit an event to all relevant webhooks + * + * @param {string} eventType - Event type + * @param {string} targetAgentId - Agent who should receive the webhook + * @param {Object} payload - Event payload + */ + static async emit(eventType, targetAgentId, payload) { + // Find all webhooks for this agent that subscribe to this event + const webhooks = await queryAll( + `SELECT id, url, secret_hash, events + FROM webhooks + WHERE agent_id = $1 AND is_active = true`, + [targetAgentId] + ); + + const relevantWebhooks = webhooks.filter(w => { + const events = JSON.parse(w.events); + return events.includes(eventType); + }); + + // Deliver to each webhook + const deliveryPromises = relevantWebhooks.map(webhook => + this.deliver(webhook, eventType, payload) + ); + + // Don't await - fire and forget + Promise.allSettled(deliveryPromises).then(results => { + results.forEach((result, i) => { + if (result.status === 'rejected') { + console.error(`Webhook delivery failed for ${relevantWebhooks[i].id}:`, result.reason); + } + }); + }); + } + + /** + * Deliver a webhook payload + * + * @param {Object} webhook - Webhook record + * @param {string} eventType - Event type + * @param {Object} payload - Event payload + */ + static async deliver(webhook, eventType, payload) { + const timestamp = Date.now(); + const body = JSON.stringify({ + event: eventType, + timestamp: new Date(timestamp).toISOString(), + data: payload + }); + + // Generate HMAC signature + const signature = crypto + .createHmac('sha256', webhook.secret_hash) + .update(`${timestamp}.${body}`) + .digest('hex'); + + try { + const response = await fetch(webhook.url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-Moltbook-Signature': signature, + 'X-Moltbook-Timestamp': timestamp.toString(), + 'X-Moltbook-Event': eventType + }, + body, + signal: AbortSignal.timeout(10000) // 10s timeout + }); + + if (response.ok) { + // Reset failure count on success + await queryOne( + `UPDATE webhooks + SET last_triggered_at = NOW(), failure_count = 0 + WHERE id = $1`, + [webhook.id] + ); + } else { + throw new Error(`HTTP ${response.status}`); + } + } catch (error) { + // Increment failure count + await queryOne( + `UPDATE webhooks + SET failure_count = failure_count + 1, + is_active = CASE WHEN failure_count >= 9 THEN false ELSE is_active END + WHERE id = $1`, + [webhook.id] + ); + + throw error; + } + } +} + +module.exports = WebhookService;