Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions src/Gateway/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,51 @@ router.get(
}
);

router.get(
"/admin/jobs/stats",
authenticateToken,
requireAdmin,
async (_req: Request, res: Response) => {
try {
const stats = await jobQueueService.getQueueStats();
res.json({
success: true,
timestamp: new Date().toISOString(),
stats,
});
} catch (error) {
logger.error("Failed to fetch job queue stats", { error });
res.status(500).json({
success: false,
message: "Failed to fetch job queue stats",
});
}
},
);

router.get(
"/admin/jobs/dead-letter",
authenticateToken,
requireAdmin,
async (req: Request, res: Response) => {
try {
const limit = Math.min(Number(req.query.limit) || 25, 100);
const jobs = await jobQueueService.getDeadLetterJobs(limit);
res.json({
success: true,
count: jobs.length,
jobs,
});
} catch (error) {
logger.error("Failed to fetch dead-letter jobs", { error });
res.status(500).json({
success: false,
message: "Failed to fetch dead-letter jobs",
});
}
},
);

// --- REAL-TIME UPDATES (Socket.io) ---

/**
Expand Down
94 changes: 36 additions & 58 deletions src/Gateway/webhook.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ import { Request } from "express";
import crypto from "crypto";
import AppDataSource from "../config/Datasource";
import { User } from "../Auth/user.entity";
import logger from "../config/logger";
import { jobQueueService } from "../jobs/jobQueue.service";
import { webhookIdempotencyService } from "./webhookIdempotency.service";

/**
* Stellar Horizon Webhook Payload Types
Expand Down Expand Up @@ -41,7 +44,6 @@ export interface WebhookResult {
export class StellarWebhookService {
private readonly WEBHOOK_SECRET: string;
private readonly userRepository = AppDataSource.getRepository(User);
private readonly processedWebhooks = new Map<string, number>();

constructor() {
this.WEBHOOK_SECRET = process.env.STELLAR_WEBHOOK_SECRET || "";
Expand Down Expand Up @@ -155,31 +157,6 @@ export class StellarWebhookService {
return true;
}

/**
* Check for idempotency - prevent duplicate webhook processing
*/
private isDuplicateWebhook(webhookId: string): boolean {
const now = Date.now();
const lastProcessed = this.processedWebhooks.get(webhookId);

// If already processed within the last 5 minutes, treat as duplicate
if (lastProcessed && now - lastProcessed < 5 * 60 * 1000) {
return true;
}

// Mark as processed
this.processedWebhooks.set(webhookId, now);

// Clean up old entries (older than 10 minutes)
for (const [id, timestamp] of this.processedWebhooks.entries()) {
if (now - timestamp > 10 * 60 * 1000) {
this.processedWebhooks.delete(id);
}
}

return false;
}

/**
* Find user by Stellar address
*/
Expand Down Expand Up @@ -209,33 +186,6 @@ export class StellarWebhookService {
return await this.userRepository.save(user);
}

/**
* Trigger auto-deployment for a funded user
* This is a placeholder - implement your actual deployment logic here
*/
private async triggerAutoDeployment(user: User): Promise<boolean> {
try {
console.log(`Triggering auto-deployment for user: ${user.id}`);

// TODO: Implement your actual deployment logic here
// This could involve:
// - Calling a deployment service
// - Triggering a smart contract deployment
// - Sending a notification to another service
// - Queuing a deployment job

// For now, we'll just mark the user as deployed
user.isDeployed = true;
await this.userRepository.save(user);

console.log(`Auto-deployment completed for user: ${user.id}`);
return true;
} catch (error) {
console.error("Error triggering auto-deployment:", error);
return false;
}
}

/**
* Process the funding webhook
*/
Expand Down Expand Up @@ -282,7 +232,16 @@ export class StellarWebhookService {
const payload: StellarWebhookPayload = rawBody;

// Check for idempotency
if (this.isDuplicateWebhook(payload.id)) {
const isNewWebhook = await webhookIdempotencyService.checkAndMark(
payload.id,
"stellar_funding",
{
transactionHash: payload.data.transaction_hash,
account: payload.data.account,
},
);

if (!isNewWebhook) {
return {
success: true,
message: "Webhook already processed (idempotent)",
Expand Down Expand Up @@ -315,17 +274,36 @@ export class StellarWebhookService {
payload.data.transaction_hash
);

// Trigger auto-deployment
const deploymentTriggered = await this.triggerAutoDeployment(updatedUser);
await jobQueueService.enqueue({
queue: "side-effects",
jobType: "funding.auto_deploy",
userId: updatedUser.id,
correlationId: payload.data.transaction_hash,
maxAttempts: 5,
payload: {
userId: updatedUser.id,
transactionHash: payload.data.transaction_hash,
amount: payload.data.amount,
stellarAccount: payload.data.account,
},
});

logger.info("Queued funding auto-deployment", {
userId: updatedUser.id,
webhookId: payload.id,
transactionHash: payload.data.transaction_hash,
});

return {
success: true,
message: "Funding webhook processed successfully",
userId: updatedUser.id,
deploymentTriggered,
deploymentTriggered: true,
};
} catch (error) {
console.error("Error processing funding webhook:", error);
logger.error("Error processing funding webhook", {
error: error instanceof Error ? error.message : String(error),
});
return {
success: false,
message:
Expand Down
6 changes: 5 additions & 1 deletion src/Gateway/webhookIdempotency.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ export class WebhookIdempotencyService {
return true;
} catch (error) {
// Unique constraint violation means duplicate
if (error instanceof Error && error.message.includes("duplicate")) {
const pgError = error as { code?: string; message?: string };
if (
pgError.code === "23505" ||
(error instanceof Error && error.message.includes("duplicate"))
) {
return false;
}
throw error;
Expand Down
12 changes: 12 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,23 @@ import { durableOperationService } from "./Reliability/DurableOperationService";
class Server {
private server: http.Server;
private port: number;
private readonly jobWorker: JobWorker;

constructor() {
this.port = config.port || 3000;
this.server = http.createServer(app);
// Initialize Socket.io manager
initializeSocketManager(this.server);
this.jobWorker = new JobWorker(jobQueueService, {
workerId: `api-${process.pid}`,
queues: ["transactions", "side-effects"],
concurrency: 3,
leaseMs: 30000,
pollIntervalMs: 2500,
});
for (const handler of buildDefaultJobHandlers()) {
this.jobWorker.registerHandler(handler);
}
}

public async start(): Promise<void> {
Expand All @@ -31,6 +42,7 @@ class Server {
logger.info("Shutting down gracefully...");
horizonOperationStreamerService.stop();
priceSpikeAlertService.stop();
await this.jobWorker.stop();
await AppDataSource.destroy();
this.server.close(() => {
logger.info("Server closed");
Expand Down
3 changes: 3 additions & 0 deletions src/jobs/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from "./job.entity";
export * from "./jobQueue.service";
export * from "./jobWorker";
79 changes: 79 additions & 0 deletions src/jobs/job.entity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import {
Column,
CreateDateColumn,
Entity,
Index,
PrimaryGeneratedColumn,
UpdateDateColumn,
} from "typeorm";

export type JobStatus =
| "pending"
| "leased"
| "completed"
| "dead_letter"
| "cancelled";

@Entity({ name: "job_queue" })
@Index(["queue", "status", "availableAt"])
@Index(["jobType", "status", "availableAt"])
@Index(["status", "leaseExpiresAt"])
@Index(["userId", "status"])
export class QueueJob {
@PrimaryGeneratedColumn("uuid")
id!: string;

@Column({ type: "varchar", length: 100 })
queue!: string;

@Column({ type: "varchar", length: 150 })
jobType!: string;

@Column({ type: "varchar", length: 32, default: "pending" })
status!: JobStatus;

@Column({ type: "uuid", nullable: true })
userId?: string | null;

@Column({ type: "varchar", length: 255, nullable: true })
correlationId?: string | null;

@Column({ type: "jsonb" })
payload!: Record<string, unknown>;

@Column({ type: "jsonb", nullable: true })
result?: Record<string, unknown> | null;

@Column({ type: "jsonb", nullable: true })
metadata?: Record<string, unknown> | null;

@Column({ type: "timestamp", default: () => "now()" })
availableAt!: Date;

@Column({ type: "timestamp", nullable: true })
leaseExpiresAt?: Date | null;

@Column({ type: "varchar", length: 120, nullable: true })
leasedBy?: string | null;

@Column({ type: "int", default: 0 })
attempts!: number;

@Column({ type: "int", default: 5 })
maxAttempts!: number;

@Column({ type: "text", nullable: true })
lastError?: string | null;

@Column({ type: "timestamp", nullable: true })
completedAt?: Date | null;

@Column({ type: "timestamp", nullable: true })
deadLetteredAt?: Date | null;

@CreateDateColumn()
createdAt!: Date;

@UpdateDateColumn()
updatedAt!: Date;
}
Loading