Skip to content

Parallelization of Worker Code #1

Description

@Vinceyou1
  1. npm i p-limit (used for concurrency scheduling + rate limits)
  2. get rid of all references to handleNotification, haven't pushed the frontend prisma stuff yet
  3. Comment out all dummy code, replace with the actual AI calls
import { createClient } from "redis";
import http from "http";
import pLimit from "p-limit";

import prismaDB from "./lib/prisma";
import { splitContentIntoChunks } from "./lib/utils";
import { generateObject, generateText } from "ai";
// import { openai } from "./lib/ai/available-models";
import { z } from "zod";
import { sleep } from "bun";
import { NotificationStatus, type Deal } from "@prisma/client";
// Constants
const REDIS_QUEUE_NAME = "dealListings";
const REDIS_PUBLISH_CHANNEL = "problem_done";
const WORKER_DELAY_MS = 2000;
const DEFAULT_PORT = 8080;
const REDIS_RETRY_DELAY = 5000;
const MAX_RETRIES = 3;

// Rate limiting constants
const MAX_CONCURRENT_JOBS = 10; // Maximum number of concurrent submissions being processed
const MAX_CONCURRENT_AI_CALLS = 10; // Maximum concurrent OpenAI API calls across all jobs

// Create rate limiters
const jobLimiter = pLimit(MAX_CONCURRENT_JOBS);
const aiCallLimiter = pLimit(MAX_CONCURRENT_AI_CALLS);

// Optional: Add monitoring for rate limiter status
setInterval(() => {
  console.log(
    `Rate limiter status - Jobs: ${jobLimiter.activeCount}/${jobLimiter.pendingCount} active/pending, AI calls: ${aiCallLimiter.activeCount}/${aiCallLimiter.pendingCount} active/pending`
  );
}, 10000); // Log every 10 seconds

interface Submission extends Deal {
  userId: string;
  screenerId: string;
  screenerContent: string;
  screenerName: string;
}

interface AIScreeningResult {
  title: string;
  score: number;
  sentiment: "POSITIVE" | "NEGATIVE" | "NEUTRAL";
  explanation: string;
}

/**
 * Generate final AI screening result
 */
async function generateFinalSummary(
  combinedSummary: string
): Promise<AIScreeningResult | null> {
  // Wrap the AI call with rate limiting
  return aiCallLimiter(async () => {
    try {
      console.log("Generating final AI screening result...");
      // const result = await generateObject({
      //   model: openai("gpt-4o-mini"),
      //   prompt: `Combine the following summaries into a single summary: ${combinedSummary}`,
      //   schema: z.object({
      //     title: z.string(),
      //     score: z.number(),
      //     sentiment: z.enum(["POSITIVE", "NEGATIVE", "NEUTRAL"]),
      //     explanation: z.string(),
      //   }),
      // });
      const result = {
        object: {
          title: "AI Screening Result 123",
          score: Math.floor(Math.random() * 100),
          sentiment: ["POSITIVE", "NEGATIVE", "NEUTRAL"].at(
            Math.floor(Math.random() * 3)
          ),
          explanation: "This is a sample screening result, auto-generated",
        } as AIScreeningResult,
      };
      console.log("here 1")
      await sleep(5000 + Math.random() * 1000).then(() =>
        console.log(
          "Final AI screening result generated successfully:",
          result.object
        )
      );
      console.log("here 2");
      return result.object;
    } catch (error) {
      console.error("Error generating final summary:", error);
      return null;
    }
  });
}

/**
 * Save AI screening result to database
 */
async function saveAIScreeningResult(
  submissionId: string,
  result: AIScreeningResult,
  combinedSummary: string
): Promise<boolean> {
  try {
    console.log(
      `Saving AI screening result to database for submission: ${submissionId}`
    );
    await prismaDB.aiScreening.create({
      data: {
        dealId: submissionId,
        title: result.title,
        explanation: result.explanation,
        score: result.score,
        sentiment: result.sentiment,
        content: combinedSummary,
      },
    });
    console.log("AI screening result saved successfully to database");
    return true;
  } catch (error) {
    console.error("Error saving AI screening result:", error);
    return false;
  }
}

/**
 * Process content chunks and generate summaries
 */
async function processContentChunks(
  chunks: string[],
  dealInfo: Submission
): Promise<string[]> {
  console.log(
    `Processing ${chunks.length} content chunks for deal: ${dealInfo.id}`
  );

  const dealContext = {
    title: dealInfo.title,
    brokerage: dealInfo.brokerage,
    dealCaption: dealInfo.dealCaption,
    dealType: dealInfo.dealType,
    ebitda: dealInfo.ebitda,
    ebitdaMargin: dealInfo.ebitdaMargin,
    companyLocation: dealInfo.companyLocation,
    revenue: dealInfo.revenue,
    caption: dealInfo.dealCaption,
    industry: dealInfo.industry,
  };

  // Process all chunks in parallel with rate limiting
  const chunkPromises = chunks.map((chunk, i) => {
    if (!chunk) {
      console.warn(`Chunk ${i + 1} is undefined, skipping...`);
      return Promise.resolve(`[Empty chunk]`);
    }

    // Wrap each AI call with rate limiting
    return aiCallLimiter(async () => {
      try {
        console.log(
          `Processing chunk ${i + 1}/${chunks.length} (${
            chunk.length
          } characters)`
        );

        const prompt = `Based on this deal context: ${JSON.stringify(
          dealContext
        )}, evaluate the following text: ${chunk}`;
        // const summary = await generateText({
        //   system:
        //     "You are an expert AI Assistant that specializes in deal sourcing, evaluation and private equity in general",
        //   model: openai("gpt-4o-mini"),
        //   prompt,
        // });
        const summary = {
          text: "testing 123",
        };

        console.log(`Chunk ${i + 1} summary generated by AI:`, summary.text);
        return summary.text;
      } catch (error) {
        console.error(`Error processing chunk ${i + 1}:`, error);
        return `[Error processing chunk]`;
      }
    });
  });

  // Wait for all chunks to be processed
  const summaries = await Promise.all(chunkPromises);
  console.log(`All ${chunks.length} chunks processed for deal: ${dealInfo.id}`);

  return summaries;
}

// Passing in Submission creates new notification, passing in string (id) edits existing one
async function handleNotification(
  submission: Submission | string,
  status: NotificationStatus
): Promise<string> {
  const create =
    typeof submission === "object" &&
    submission !== null &&
    "userId" in submission;

  try {
    if (create) {
      // Create new notification and return the notification ID (not dealId)
      const notification = await prismaDB.notification.create({
        data: {
          dealId: submission.id,
          userId: submission.userId,
          dealTitle: submission.title || "",
          status: status,
        },
      });
      return notification.id; // Return the notification ID
    } else {
      // Update existing notification using the notification ID
      await prismaDB.notification.update({
        where: {
          id: submission, // submission is the notification ID string
        },
        data: {
          status,
        },
      });
      return submission; // Return the same notification ID
    }
  } catch (error) {
    console.error("Error handling notification:", error);
    return "";
  }
}

/**
 * Process a single submission
 */
async function processSubmission(submission: Submission): Promise<boolean> {
  try {
    console.log(`=== Starting to process submission: ${submission.id} ===`);

    // Split content into chunks
    console.log("Splitting content into chunks...");
    const chunks = await splitContentIntoChunks(submission.screenerContent);
    console.log(`Content split into ${chunks.length} chunks`);

    if (chunks.length === 0) {
      console.warn("No content chunks generated - submission will be skipped");
      return false;
    }

    // Process chunks
    console.log("Processing content chunks...");
    const summaries = await processContentChunks(chunks, submission);
    const combinedSummary = summaries.join("\n\n=== Next Section ===\n\n");
    console.log(
      `Combined summary length: ${combinedSummary.length} characters`
    );

    // const combinedSummary = "testing 123";

    // Generate final result
    console.log("Generating final AI screening result...");
    const finalResult = await generateFinalSummary(combinedSummary);
    if (!finalResult) {
      console.error(
        "Failed to generate final summary - submission processing failed"
      );
      return false;
    }

    // Save to database
    console.log("Saving result to database...");
    const saveSuccess = await saveAIScreeningResult(
      submission.id,
      finalResult,
      combinedSummary
    );

    if (!saveSuccess) {
      console.error("Database save failed - submission processing incomplete");
      return false;
    } else {
      console.log(
        "Database save successful, publishing completion notification..."
      );
      console.log(`=== Submission ${submission.id} processed successfully ===`);
      return true;
    }
  } catch (error) {
    console.error(`=== Error processing submission ${submission.id}:`, error);
    return false;
  }
}

// Create a separate Redis client for publishing to avoid connection sharing issues
let publishClient: any = null;

async function getPublishClient() {
  if (!publishClient) {
    publishClient = createClient({ url: process.env.REDIS_URL });
    publishClient.on("error", (e: any) => console.error("Redis publish client error", e));
    await publishClient.connect();
    console.log("Redis publish client connected");
  }
  return publishClient;
}

async function start() {
  const redis = createClient({ url: process.env.REDIS_URL });
  redis.on("error", (e) => console.error("Redis error", e));
  await redis.connect();
  console.log("Worker connected to Redis");

  // Initialize the publish client
  await getPublishClient();

  // Test Redis publish functionality at startup
  try {
    console.log("Testing Redis publish functionality...");
    const pubClient = await getPublishClient();
    const testResult = await pubClient.publish("test_channel", "test_message");
    console.log(`Redis publish test successful. Subscribers: ${testResult}`);
  } catch (error) {
    console.error("Redis publish test failed:", error);
  }

  // Start a simple HTTP server for Cloud Run health checks
  const port = Number(process.env.PORT) || 8080;
  const server = http.createServer((req, res) => {
    if (!req.url) return;
    if (req.url === "/health" || req.url === "/") {
      res.writeHead(200, { "Content-Type": "text/plain" });
      res.end("OK");
    } else {
      res.writeHead(404, { "Content-Type": "text/plain" });
      res.end("Not Found");
    }
  });

  server.listen(port, () =>
    console.log(`Worker HTTP server listening on :${port}`)
  );

  // Helper function to process submission with error handling and completion notification
  async function handleSubmission(submission: Submission, notification_id: string) : Promise<void> {
    try {
      console.log(
        `Starting concurrent processing for submission: ${submission.id}`
      );

      // TODO: retry logic for if this fails
      const result = await processSubmission(submission);
      
      await handleNotification(notification_id, result ? "COMPLETED" : "FAILED");
      
      const notificationPayload = {
        userId: submission.userId,
        productId: submission.id,
        status: "done",
        productName: submission.title || submission.id,
      };
      
      console.log("Publishing notification:", JSON.stringify(notificationPayload));
      
      // Fire-and-forget Redis publish using dedicated client
      getPublishClient().then(pubClient => {
        return pubClient.publish(
          REDIS_PUBLISH_CHANNEL,
          JSON.stringify(notificationPayload)
        );
      }).then((subscribers) => {
        console.log("Redis publish successful, subscribers:", subscribers);
      }).catch((error) => {
        console.error("Redis publish failed:", error);
      });
      
      console.log("Notification dispatched (fire-and-forget)");

      console.log(
        `Concurrent processing completed for submission: ${submission.id}`
      );
    } catch (error) {
      console.error(
        `Error in concurrent processing for submission ${submission.id}:`,
        error
      );
      
      // Fire-and-forget error notification
      const errorPayload = {
        userId: submission.userId,
        productId: submission.id,
        status: "error",
        productName: submission.title || submission.id,
      };
      
      getPublishClient().then(pubClient => {
        return pubClient.publish(
          REDIS_PUBLISH_CHANNEL,
          JSON.stringify(errorPayload)
        );
      }).then(() => {
        console.log("Error notification published successfully");
      }).catch((publishError) => {
        console.error("Failed to publish error notification:", publishError);
      });
    }
  }

  while (true) {
    try {
      console.log("Waiting for items with BRPOP on:", REDIS_QUEUE_NAME);
      const res = await redis.brPop(REDIS_QUEUE_NAME, 0);
      if (!res) continue;
      const item = res.element;
      let submission: Submission;
      try {
        submission = JSON.parse(item);
      } catch {
        console.error("Invalid JSON item, skipping");
        continue;
      }

      console.log("Received and parsed submission", submission);

      const notification_id = await handleNotification(submission, "PENDING");

      // Start processing with job-level rate limiting (fire-and-forget)
      // Don't await - let it run in the background
      jobLimiter(() => handleSubmission(submission, notification_id)).catch((error) => {
        console.error(
          `Unhandled error in submission processing for ${submission.id}:`,
          error
        );
      });

      // Immediately continue to next job without waiting
      console.log(
        `Dispatched submission ${submission.id} for concurrent processing (max ${MAX_CONCURRENT_JOBS} concurrent jobs)`
      );
    } catch (e) {
      console.error("Worker loop error", e);
      await new Promise((r) => setTimeout(r, 1000));
    }
  }
}

start().catch((e) => {
  console.error("Worker failed to start", e);
  process.exit(1);
});

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Fields

    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions