Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,10 @@ POSTHOG_PROJECT_KEY=
# These control the server-side internal telemetry
# INTERNAL_OTEL_TRACE_EXPORTER_URL=<URL to send traces to>
# INTERNAL_OTEL_TRACE_LOGGING_ENABLED=1
# INTERNAL_OTEL_TRACE_INSTRUMENT_PRISMA_ENABLED=0,
# INTERNAL_OTEL_TRACE_INSTRUMENT_PRISMA_ENABLED=0

# Enable local observability stack (requires `pnpm run docker` to start otel-collector)
# Uncomment these to send metrics to the local Prometheus via OTEL Collector:
# INTERNAL_OTEL_METRIC_EXPORTER_ENABLED=1
# INTERNAL_OTEL_METRIC_EXPORTER_URL=http://localhost:4318/v1/metrics
# INTERNAL_OTEL_METRIC_EXPORTER_INTERVAL_MS=15000
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,5 @@ apps/**/public/build
/packages/trigger-sdk/src/package.json
/packages/python/src/package.json
.claude
.mcp.log
.mcp.log
.cursor/debug.log
32 changes: 27 additions & 5 deletions apps/webapp/app/components/runs/v3/BatchStatus.tsx
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
import { CheckCircleIcon, XCircleIcon } from "@heroicons/react/20/solid";
import {
CheckCircleIcon,
ExclamationTriangleIcon,
XCircleIcon,
} from "@heroicons/react/20/solid";
import type { BatchTaskRunStatus } from "@trigger.dev/database";
import assertNever from "assert-never";
import { Spinner } from "~/components/primitives/Spinner";
import { cn } from "~/utils/cn";

export const allBatchStatuses = ["PENDING", "COMPLETED", "ABORTED"] as const satisfies Readonly<
Array<BatchTaskRunStatus>
>;
export const allBatchStatuses = [
"PROCESSING",
"PENDING",
"COMPLETED",
"PARTIAL_FAILED",
"ABORTED",
] as const satisfies Readonly<Array<BatchTaskRunStatus>>;

const descriptions: Record<BatchTaskRunStatus, string> = {
PROCESSING: "The batch is being processed and runs are being created.",
PENDING: "The batch has child runs that have not yet completed.",
COMPLETED: "All the batch child runs have finished.",
ABORTED: "The batch was aborted because some child tasks could not be triggered.",
PARTIAL_FAILED: "Some runs failed to be created. Successfully created runs are still executing.",
ABORTED: "The batch was aborted because child tasks could not be triggered.",
};

export function descriptionForBatchStatus(status: BatchTaskRunStatus): string {
Expand Down Expand Up @@ -47,10 +57,14 @@ export function BatchStatusIcon({
className: string;
}) {
switch (status) {
case "PROCESSING":
return <Spinner className={cn(batchStatusColor(status), className)} />;
case "PENDING":
return <Spinner className={cn(batchStatusColor(status), className)} />;
case "COMPLETED":
return <CheckCircleIcon className={cn(batchStatusColor(status), className)} />;
case "PARTIAL_FAILED":
return <ExclamationTriangleIcon className={cn(batchStatusColor(status), className)} />;
case "ABORTED":
return <XCircleIcon className={cn(batchStatusColor(status), className)} />;
default: {
Expand All @@ -61,10 +75,14 @@ export function BatchStatusIcon({

export function batchStatusColor(status: BatchTaskRunStatus): string {
switch (status) {
case "PROCESSING":
return "text-blue-500";
case "PENDING":
return "text-pending";
case "COMPLETED":
return "text-success";
case "PARTIAL_FAILED":
return "text-warning";
case "ABORTED":
return "text-error";
default: {
Expand All @@ -75,10 +93,14 @@ export function batchStatusColor(status: BatchTaskRunStatus): string {

export function batchStatusTitle(status: BatchTaskRunStatus): string {
switch (status) {
case "PROCESSING":
return "Processing";
case "PENDING":
return "In progress";
case "COMPLETED":
return "Completed";
case "PARTIAL_FAILED":
return "Partial failure";
case "ABORTED":
return "Aborted";
default: {
Expand Down
30 changes: 15 additions & 15 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
import {
createReadableStreamFromReadable,
type DataFunctionArgs,
type EntryContext,
} from "@remix-run/node"; // or cloudflare/deno
import { createReadableStreamFromReadable, type EntryContext } from "@remix-run/node"; // or cloudflare/deno
import { RemixServer } from "@remix-run/react";
import { wrapHandleErrorWithSentry } from "@sentry/remix";
import { parseAcceptLanguage } from "intl-parse-accept-language";
import isbot from "isbot";
import { renderToPipeableStream } from "react-dom/server";
import { PassThrough } from "stream";
import * as Worker from "~/services/worker.server";
import { bootstrap } from "./bootstrap";
import { LocaleContextProvider } from "./components/primitives/LocaleProvider";
import {
OperatingSystemContextProvider,
OperatingSystemPlatform,
} from "./components/primitives/OperatingSystemProvider";
import { Prisma } from "./db.server";
import { env } from "./env.server";
import { eventLoopMonitor } from "./eventLoopMonitor.server";
import { logger } from "./services/logger.server";
import { resourceMonitor } from "./services/resourceMonitor.server";
import { singleton } from "./utils/singleton";
import { bootstrap } from "./bootstrap";
import { wrapHandleErrorWithSentry } from "@sentry/remix";
import { remoteBuildsEnabled } from "./v3/remoteImageBuilder.server";
import {
registerRunEngineEventBusHandlers,
setupBatchQueueCallbacks,
} from "./v3/runEngineHandlers.server";

const ABORT_DELAY = 30000;

Expand Down Expand Up @@ -228,19 +234,13 @@ process.on("uncaughtException", (error, origin) => {
});

singleton("RunEngineEventBusHandlers", registerRunEngineEventBusHandlers);
singleton("SetupBatchQueueCallbacks", setupBatchQueueCallbacks);

export { apiRateLimiter } from "./services/apiRateLimit.server";
export { engineRateLimiter } from "./services/engineRateLimit.server";
export { runWithHttpContext } from "./services/httpAsyncStorage.server";
export { socketIo } from "./v3/handleSocketIo.server";
export { wss } from "./v3/handleWebsockets.server";
export { runWithHttpContext } from "./services/httpAsyncStorage.server";
import { eventLoopMonitor } from "./eventLoopMonitor.server";
import { env } from "./env.server";
import { logger } from "./services/logger.server";
import { Prisma } from "./db.server";
import { registerRunEngineEventBusHandlers } from "./v3/runEngineHandlers.server";
import { remoteBuildsEnabled } from "./v3/remoteImageBuilder.server";
import { resourceMonitor } from "./services/resourceMonitor.server";

if (env.EVENT_LOOP_MONITOR_ENABLED === "1") {
eventLoopMonitor.enable();
Expand Down
28 changes: 28 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,7 @@ const EnvironmentSchema = z
MAXIMUM_TRACE_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(25_000),
MAXIMUM_TRACE_DETAILED_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(10_000),
TASK_PAYLOAD_OFFLOAD_THRESHOLD: z.coerce.number().int().default(524_288), // 512KB
BATCH_PAYLOAD_OFFLOAD_THRESHOLD: z.coerce.number().int().optional(), // Defaults to TASK_PAYLOAD_OFFLOAD_THRESHOLD if not set
TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728), // 3MB
BATCH_TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(1_000_000), // 1MB
TASK_RUN_METADATA_MAXIMUM_SIZE: z.coerce.number().int().default(262_144), // 256KB
Expand All @@ -537,6 +538,14 @@ const EnvironmentSchema = z
MAX_BATCH_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),

// 2-phase batch API settings
STREAMING_BATCH_MAX_ITEMS: z.coerce.number().int().default(1_000), // Max items in streaming batch
STREAMING_BATCH_ITEM_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728),
BATCH_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(10),
BATCH_RATE_LIMIT_MAX: z.coerce.number().int().default(1200),
BATCH_RATE_LIMIT_REFILL_INTERVAL: z.string().default("10s"),
BATCH_CONCURRENCY_LIMIT_DEFAULT: z.coerce.number().int().default(10),

REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"),
REALTIME_STREAM_MAX_LENGTH: z.coerce.number().int().default(1000),
REALTIME_STREAM_TTL: z.coerce
Expand Down Expand Up @@ -931,6 +940,25 @@ const EnvironmentSchema = z
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
BATCH_TRIGGER_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

// BatchQueue DRR settings (Run Engine v2)
BATCH_QUEUE_DRR_QUANTUM: z.coerce.number().int().default(5),
BATCH_QUEUE_MAX_DEFICIT: z.coerce.number().int().default(50),
BATCH_QUEUE_CONSUMER_COUNT: z.coerce.number().int().optional(),
BATCH_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().optional(),
// Global rate limit: max items processed per second across all consumers
// If not set, no global rate limiting is applied
BATCH_QUEUE_GLOBAL_RATE_LIMIT: z.coerce.number().int().positive().optional(),

// Batch rate limits and concurrency by plan type
// Rate limit: max items per minute for batch creation
BATCH_RATE_LIMIT_FREE: z.coerce.number().int().default(100), // 100 items/min for free
BATCH_RATE_LIMIT_PAID: z.coerce.number().int().default(10_000), // 10k items/min for paid
BATCH_RATE_LIMIT_ENTERPRISE: z.coerce.number().int().default(100_000), // 100k items/min for enterprise
// Processing concurrency: max concurrent batch items being processed
BATCH_CONCURRENCY_FREE: z.coerce.number().int().default(1),
BATCH_CONCURRENCY_PAID: z.coerce.number().int().default(10),
BATCH_CONCURRENCY_ENTERPRISE: z.coerce.number().int().default(50),

ADMIN_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
ADMIN_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
ADMIN_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/presenters/v3/BatchListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ WHERE
throw new Error(`Environment not found for Batch ${batch.id}`);
}

const hasFinished = batch.status !== "PENDING";
const hasFinished = batch.status !== "PENDING" && batch.status !== "PROCESSING";

return {
id: batch.id,
Expand Down
122 changes: 122 additions & 0 deletions apps/webapp/app/presenters/v3/BatchPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import { type BatchTaskRunStatus } from "@trigger.dev/database";
import { displayableEnvironment } from "~/models/runtimeEnvironment.server";
import { engine } from "~/v3/runEngine.server";
import { BasePresenter } from "./basePresenter.server";

type BatchPresenterOptions = {
environmentId: string;
batchId: string;
userId?: string;
};

export type BatchPresenterData = Awaited<ReturnType<BatchPresenter["call"]>>;

export class BatchPresenter extends BasePresenter {
public async call({ environmentId, batchId, userId }: BatchPresenterOptions) {
const batch = await this._replica.batchTaskRun.findFirst({
select: {
id: true,
friendlyId: true,
status: true,
runCount: true,
batchVersion: true,
createdAt: true,
updatedAt: true,
completedAt: true,
processingStartedAt: true,
processingCompletedAt: true,
successfulRunCount: true,
failedRunCount: true,
idempotencyKey: true,
runtimeEnvironment: {
select: {
id: true,
type: true,
slug: true,
orgMember: {
select: {
user: {
select: {
id: true,
name: true,
displayName: true,
},
},
},
},
},
},
errors: {
select: {
id: true,
index: true,
taskIdentifier: true,
error: true,
errorCode: true,
createdAt: true,
},
orderBy: {
index: "asc",
},
},
},
where: {
runtimeEnvironmentId: environmentId,
friendlyId: batchId,
},
});

if (!batch) {
throw new Error("Batch not found");
}

const hasFinished = batch.status !== "PENDING" && batch.status !== "PROCESSING";
const isV2 = batch.batchVersion === "runengine:v2";

// For v2 batches in PROCESSING state, get live progress from Redis
// This provides real-time updates without waiting for the batch to complete
let liveSuccessCount = batch.successfulRunCount ?? 0;
let liveFailureCount = batch.failedRunCount ?? 0;

if (isV2 && batch.status === "PROCESSING") {
const liveProgress = await engine.getBatchQueueProgress(batch.id);
if (liveProgress) {
liveSuccessCount = liveProgress.successCount;
liveFailureCount = liveProgress.failureCount;
}
}

return {
id: batch.id,
friendlyId: batch.friendlyId,
status: batch.status as BatchTaskRunStatus,
runCount: batch.runCount,
batchVersion: batch.batchVersion,
isV2,
createdAt: batch.createdAt.toISOString(),
updatedAt: batch.updatedAt.toISOString(),
completedAt: batch.completedAt?.toISOString(),
processingStartedAt: batch.processingStartedAt?.toISOString(),
processingCompletedAt: batch.processingCompletedAt?.toISOString(),
finishedAt: batch.completedAt
? batch.completedAt.toISOString()
: hasFinished
? batch.updatedAt.toISOString()
: undefined,
hasFinished,
successfulRunCount: liveSuccessCount,
failedRunCount: liveFailureCount,
idempotencyKey: batch.idempotencyKey,
environment: displayableEnvironment(batch.runtimeEnvironment, userId),
errors: batch.errors.map((error) => ({
id: error.id,
index: error.index,
taskIdentifier: error.taskIdentifier,
error: error.error,
errorCode: error.errorCode,
createdAt: error.createdAt.toISOString(),
})),
};
}
}

Loading
Loading