diff --git a/.dockerignore b/.dockerignore
index 6f4fb3d..c1ba168 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -5,3 +5,6 @@ npm-debug.log
*.md
.next
.git
+cypress.config.*
+.cypress/*
+cypress/
diff --git a/DataSource/index.tsx b/DataSource/index.tsx
index 8949011..28b8786 100644
--- a/DataSource/index.tsx
+++ b/DataSource/index.tsx
@@ -7,6 +7,7 @@ import { ConfigDropbox } from './Dropbox/ConfigDropbox';
import { DropboxPickerProvider } from './Dropbox/DropboxPicker/dropbox-picker.context';
import { ConfigGoogleDrive } from './GoogleDrive/ConfigGoogleDrive';
import { ConfigAws } from './Aws/ConfigAws';
+import { StopConnection } from '@/components/StopConnection/StopConnection';
export const DataSource = ({ connection, token, status }: { connection: ConnectionQuery, token: string | undefined | null, status: "PROCESSING" | "FINISHED" | undefined }) => {
switch (connection.service) {
@@ -15,11 +16,13 @@ export const DataSource = ({ connection, token, status }: { connection: Connecti
{connection.isConfigSet && <ConfigGoogleDrive connection={connection} token={token} />}
+ <StopConnection connection={connection} status={status} />
</>
case "DIRECT_UPLOAD":
return <>
+ <StopConnection connection={connection} status={status} />
</>
case 'DROPBOX':
return <>
@@ -28,17 +31,20 @@ export const DataSource = ({ connection, token, status }: { connection: Connecti
+ <StopConnection connection={connection} status={status} />
</>
- case 'AWS':
+ case 'AWS':
return <>
- {connection.isConfigSet && <ConfigAws connection={connection} token={token} />}
-
-
- </>
+ {connection.isConfigSet && <ConfigAws connection={connection} token={token} />}
+
+
+ <StopConnection connection={connection} status={status} />
+ </>
default:
return <>
{connection.isConfigSet && }
+ <StopConnection connection={connection} status={status} />
</>
}
}
diff --git a/actions/connctions/set/index.ts b/actions/connctions/set/index.ts
index b82b968..2a46990 100644
--- a/actions/connctions/set/index.ts
+++ b/actions/connctions/set/index.ts
@@ -1,5 +1,8 @@
"use server"
import { authOptions } from "@/auth";
+import { databaseDrizzle } from "@/db";
+import { connections } from "@/db/schemas/connections";
+import { eq } from "drizzle-orm";
import { setConnectionToProcess } from "@/fileProcessors/connectors";
import { fromErrorToFormState, toFormState } from "@/lib/zodErrorHandle";
import { addToProcessFilesQueue } from "@/workers/queues/jobs/processFiles.job";
@@ -17,7 +20,10 @@ export async function setConnectionConfig(_: FormState, formData: FormData) {
formData.set("userId", session.user.id)
const config = await setConnectionToProcess(formData)
- await addToProcessFilesQueue(config)
+ const jobId = await addToProcessFilesQueue(config)
+ await databaseDrizzle.update(connections).set({
+ jobId: jobId
+ }).where(eq(connections.id, config.connectionId))
revalidatePath("/connections");
return toFormState("SUCCESS", "start processing");
} catch (e) {
diff --git a/actions/connctions/stop/index.ts b/actions/connctions/stop/index.ts
new file mode 100644
index 0000000..331e4c6
--- /dev/null
+++ b/actions/connctions/stop/index.ts
@@ -0,0 +1,31 @@
+"use server"
+import { authOptions } from "@/auth";
+import { databaseDrizzle } from "@/db";
+import { fromErrorToFormState, toFormState } from "@/lib/zodErrorHandle";
+import { redisConnection } from "@/workers/redis";
+import { getServerSession } from "next-auth";
+
+type FormState = {
+ message: string;
+};
+
+export async function stopProcessing(_: FormState, formData: FormData) {
+ const session = await getServerSession(authOptions);
+ try {
+ if (!session?.user?.id) throw new Error("forbidden");
+ const connectionId = formData.get("connectionId");
+ if (!connectionId) throw new Error("Missing connection id")
+
+ const conn = await databaseDrizzle.query.connections.findFirst({
+ where: (c, u) => u.and(u.eq(c.id, connectionId.toString()), u.eq(c.userId, session.user.id)),
+ columns: {
+ jobId: true,
+ }
+ })
+ if (!conn || !conn.jobId) throw new Error("Connection not found or it has no active processing job")
+ await redisConnection.set(`cancel-job:${conn.jobId}`, '1');
+ return toFormState("SUCCESS", "stop processing");
+ } catch (e) {
+ return fromErrorToFormState(e);
+ }
+}
diff --git a/actions/connctions/sync/index.ts b/actions/connctions/sync/index.ts
index 60d355d..f59f397 100644
--- a/actions/connctions/sync/index.ts
+++ b/actions/connctions/sync/index.ts
@@ -57,10 +57,6 @@ export const syncConnectionConfig = async (_: FormState, formData: FormData) =>
})
if (!user) throw new Error("no such account")
- await databaseDrizzle.update(connections).set({
- isSyncing: true,
- }).where(eq(connections.id, connectionId));
-
const { currentConn, othersConn } = user.connections.reduce((acc, c) => {
if (c.id === connectionId) {
acc.currentConn = c;
@@ -71,8 +67,7 @@ export const syncConnectionConfig = async (_: FormState, formData: FormData) =>
}, { currentConn: null as Conn | null, othersConn: [] as Conn[] });
if (!currentConn) throw new Error("no such connection")
-
- await addToProcessFilesQueue({
+ const jobId = await addToProcessFilesQueue({
connectionId: connectionId,
service: currentConn.service,
metadata: currentConn.metadata || null,
@@ -81,6 +76,10 @@ export const syncConnectionConfig = async (_: FormState, formData: FormData) =>
files: [],
links: []
})
+ await databaseDrizzle.update(connections).set({
+ isSyncing: true,
+ jobId: jobId
+ }).where(eq(connections.id, connectionId));
revalidatePath("/connections");
return toFormState("SUCCESS", "start processing");
diff --git a/app/api/connections/[id]/files/route.ts b/app/api/connections/[id]/files/route.ts
index 097ebe9..0654e33 100644
--- a/app/api/connections/[id]/files/route.ts
+++ b/app/api/connections/[id]/files/route.ts
@@ -85,6 +85,8 @@ export async function DELETE(request: NextRequest, { params }: Params) {
where: (c, ops) => ops.and(ops.eq(c.userId, userId), ops.eq(c.id, id)),
columns: {
id: true,
+ isSyncing: true,
+ jobId: true,
},
})
@@ -94,6 +96,16 @@ export async function DELETE(request: NextRequest, { params }: Params) {
message: "Connection not found or access denied",
}, { status: 403 })
}
+
+ if (connection.isSyncing || connection.jobId) {
+ return NextResponse.json(
+ {
+ code: 'processing_in_progress',
+ message: 'Cannot delete while files are being processed. Please cancel the active processing operation first.'
+ },
+ { status: 409 }
+ )
+ }
const filesToDelete = validation.data.file
? [validation.data.file]
: validation.data.files ?? [];
diff --git a/app/api/connections/[id]/route.ts b/app/api/connections/[id]/route.ts
index 7394b83..ac1954e 100644
--- a/app/api/connections/[id]/route.ts
+++ b/app/api/connections/[id]/route.ts
@@ -107,7 +107,10 @@ export async function DELETE(request: NextRequest, { params }: Params) {
const { id } = await params
const { data: conn, error: deleteError } = await tryAndCatch(databaseDrizzle.query.connections.findFirst({
where: (conn, ops) => ops.and(ops.eq(conn.id, id), ops.eq(conn.userId, userId)),
- columns: {},
+ columns: {
+ jobId: true,
+ isSyncing: true,
+ },
with: {
files: {
columns: {
@@ -136,16 +139,25 @@ export async function DELETE(request: NextRequest, { params }: Params) {
)
}
+ if (conn.isSyncing || conn.jobId) {
+ return NextResponse.json(
+ {
+ code: 'processing_in_progress',
+ message: 'Cannot delete while files are being processed. Please cancel the active processing operation first.'
+ },
+ { status: 409 }
+ )
+ }
for (const { chunksIds } of conn.files) {
- await qdrantCLient.delete(qdrant_collection_name, {
+ await tryAndCatch(qdrantCLient.delete(qdrant_collection_name, {
points: chunksIds,
wait: wait === "true",
- })
+ }))
}
- await databaseDrizzle
+ await tryAndCatch(databaseDrizzle
.delete(connections)
- .where(eq(connections.id, id))
+ .where(eq(connections.id, id)))
return NextResponse.json({
code: "ok",
@@ -246,10 +258,10 @@ export async function PUT(request: NextRequest, { params }: Params) {
let error: Error | null = null;
if (links.length > 0 || files.length > 0) {
- const {error:err} = await tryAndCatch(directProcessFiles(filesConfig))
+ const { error: err } = await tryAndCatch(directProcessFiles(filesConfig))
error = err;
} else {
- const {error:err} = await tryAndCatch(connectionProcessFiles(filesConfig))
+ const { error: err } = await tryAndCatch(connectionProcessFiles(filesConfig))
error = err
}
diff --git a/app/api/connections/[id]/stop/route.ts b/app/api/connections/[id]/stop/route.ts
new file mode 100644
index 0000000..7ae26de
--- /dev/null
+++ b/app/api/connections/[id]/stop/route.ts
@@ -0,0 +1,77 @@
+import { databaseDrizzle } from "@/db"
+import { checkAuth } from "@/lib/api_key"
+import { tryAndCatch } from "@/lib/try-catch"
+import { redisConnection } from "@/workers/redis"
+import { NextRequest, NextResponse } from "next/server"
+
+
+type Params = {
+ params: Promise<{
+ id: string
+ }>
+}
+
+export async function POST(request: NextRequest, { params }: Params) {
+ try {
+ const {
+ data: userId,
+ error: authError,
+ } = await tryAndCatch(checkAuth(request))
+ if (authError) {
+ return NextResponse.json({
+ code: authError.code,
+ message: authError.message,
+ }, { status: authError.status })
+ }
+
+ const { id } = await params
+ const { data: conn, error: queryError } = await tryAndCatch(databaseDrizzle.query.connections.findFirst({
+ where: (conn, ops) => ops.and(ops.eq(conn.userId, userId!), ops.eq(conn.id, id)),
+ columns: {
+ jobId: true,
+ }
+ }))
+ if (queryError) {
+ return NextResponse.json(
+ {
+ code: "internal_server_error",
+ message: "Failed to load connection status",
+ },
+ { status: 500 },
+ )
+ }
+ if (!conn) {
+ return NextResponse.json(
+ { code: 'not_found', message: 'Connection not found' },
+ { status: 404 }
+ )
+ }
+ if (!conn.jobId) {
+ return NextResponse.json(
+ { code: 'not_processing', message: 'No active processing job' },
+ { status: 400 }
+ )
+ }
+ const { error: redisError } = await tryAndCatch(
+ redisConnection.set(`cancel-job:${conn.jobId}`, '1')
+ )
+
+ if (redisError) {
+ return NextResponse.json(
+ { code: 'internal_error', message: 'Failed to cancel processing' },
+ { status: 500 }
+ )
+ }
+
+ return NextResponse.json(
+ { code: 'ok', message: 'Processing cancellation requested' },
+ { status: 200 }
+ )
+
+ } catch (error: any) {
+ return NextResponse.json(
+ { code: "internal_server_error", message: error.message },
+ { status: 500 },
+ );
+ }
+}
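The new stop endpoint can be exercised directly. A minimal client sketch, assuming checkAuth accepts a bearer-style API key (the exact header scheme is an assumption, not confirmed by this diff):

    // Hypothetical helper; base URL, header name, and key handling are placeholders.
    async function requestStop(connectionId: string, apiKey: string) {
      const res = await fetch(`/api/connections/${connectionId}/stop`, {
        method: "POST",
        headers: { Authorization: `Bearer ${apiKey}` },
      });
      const body = await res.json();
      // 200 -> cancellation requested; 400 -> no active job; 404 -> unknown connection
      if (!res.ok) throw new Error(`${body.code}: ${body.message}`);
      return body;
    }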
diff --git a/components/Connections/Connections.tsx b/components/Connections/Connections.tsx
index 1ce1518..239f8b0 100644
--- a/components/Connections/Connections.tsx
+++ b/components/Connections/Connections.tsx
@@ -19,9 +19,11 @@ import {
TooltipProvider,
TooltipTrigger,
} from "@/components/ui/tooltip"
+import { useRouter } from "next/navigation";
export default function Connections({ connections, tokens }: { connections: ConnectionQuery[], tokens: ConnectionToken }) {
const [isMounted, setIsMounted] = useState(false)
+ const route = useRouter()
const [connProgress, setConnProgress] = useState(null);
useEffect(() => {
@@ -41,6 +43,12 @@ export default function Connections({ connections, tokens }: { connections: Conn
}, [])
+ useEffect(() => {
+ if (connProgress?.status === 'FINISHED') {
+ route.refresh();
+ }
+ }, [connProgress]);
+
return connections.map(connection => {
const progress = connection.id === connProgress?.connectionId ? connProgress : null;
diff --git a/components/Connectors/Connectors.tsx b/components/Connectors/Connectors.tsx
index efa7912..97cbca3 100644
--- a/components/Connectors/Connectors.tsx
+++ b/components/Connectors/Connectors.tsx
@@ -3,14 +3,18 @@ import { FaGoogleDrive, FaDropbox } from "react-icons/fa";
import { ConnectionBtn } from './ConnectionBtn';
import { SiAwslambda } from 'react-icons/si';
+const isCloud = process.env.DCUP_ENV === 'CLOUD';
+
export const Connectors = async () => {
const connectors = [
- {
- id: 'google-drive',
- name: 'Google Drive',
- icon: <FaGoogleDrive />,
- description: 'Connect your Google Drive to access documents and files',
- },
+ ...(!isCloud ? [
+ {
+ id: 'google-drive',
+ name: 'Google Drive',
+ icon: <FaGoogleDrive />,
+ description: 'Connect your Google Drive to access documents and files',
+ },
+ ] : []),
{
id: "dropbox",
name: "Dropbox",
diff --git a/components/StopConnection/StopConnection.tsx b/components/StopConnection/StopConnection.tsx
new file mode 100644
index 0000000..e54f8ab
--- /dev/null
+++ b/components/StopConnection/StopConnection.tsx
@@ -0,0 +1,46 @@
+"use client"
+import { Button } from "../ui/button"
+import { Pause } from "lucide-react"
+import { useTransition } from "react"
+import { toast } from "@/hooks/use-toast"
+import { EMPTY_FORM_STATE } from "@/lib/zodErrorHandle"
+import { ConnectionQuery } from "@/app/(protected)/connections/page"
+import { stopProcessing } from "@/actions/connctions/stop"
+
+
+export const StopConnection = ({ connection, status }: {
+ connection: ConnectionQuery,
+ status: "PROCESSING" | "FINISHED" | undefined
+}) => {
+ const [isPending, startTransition] = useTransition();
+
+ const handleStopConnection = () => {
+ startTransition(async () => {
+ try {
+ const formData = new FormData();
+ formData.set("connectionId", connection.id)
+ const res = await stopProcessing(EMPTY_FORM_STATE, formData)
+ if (res.status !== 'SUCCESS') {
+ throw new Error(res.message)
+ }
+ toast({
+ title: res.message,
+ });
+
+ } catch (error: any) {
+ toast({
+ variant: "destructive",
+ title: "Uh oh! Something went wrong.",
+ description: error.message || "An unexpected error occurred.",
+ });
+ }
+ })
+ }
+
+ return (
+ <Button data-test="stop-connection" onClick={handleStopConnection}
+ disabled={isPending || status !== "PROCESSING"}>
+ <Pause />
+ </Button>
+ )
+}
diff --git a/cypress/e2e/directUpload.cy.ts b/cypress/e2e/directUpload.cy.ts
index 8e8413c..1de2ecb 100644
--- a/cypress/e2e/directUpload.cy.ts
+++ b/cypress/e2e/directUpload.cy.ts
@@ -250,7 +250,7 @@ describe("Direct Upload UI", () => {
cy.get('[data-test="folderName"]').should('contain.text', "*")
cy.get('[data-test="processedFile"]').should('contain.text', 1)
cy.get('[data-test="processedPage"]').should('contain.text', 2)
-
+
// remove the only stored pdf file
cy.get('[data-test="btn-config"]')
.click()
@@ -444,6 +444,50 @@ describe("Direct Upload UI", () => {
expect(points).eq(0)
})
})
+
+ it('should handle processing cancellation with progress preservation', () => {
+
+ cy.uploadFiles({ files: ['invo.pdf', "sample.pdf"] })
+ cy.wait(1000)
+
+ const targetState = { file: 1, page: 2 }
+ let found = false
+
+ const checkProgress = (retries = 0) => {
+ cy.get('[data-test="processedFile"]').invoke('text').then(fileText => {
+ cy.get('[data-test="processedPage"]').invoke('text').then(pageText => {
+ const currentFile = parseInt(fileText)
+ const currentPage = parseInt(pageText)
+
+ if (currentFile >= targetState.file && currentPage >= targetState.page) {
+ found = true
+ cy.get('[data-test="stop-connection"]').click()
+ return
+ }
+
+ if (!found) {
+ cy.wait(1000) // poll once per second
+ checkProgress(retries + 1)
+ }
+ })
+ })
+ }
+ checkProgress()
+
+ // UI assertions
+ cy.get('[data-test="processedFile"]').should('contain', targetState.file)
+ cy.get('[data-test="processedPage"]').should('contain', targetState.page)
+
+ cy.task("getConnection", { email: fakeUser.email })
+ .then(({ conns }: any) => {
+ const conn = (conns as FileConnectionQuery[])[0]
+ expect(conn.service).eq("DIRECT_UPLOAD")
+ expect(conn.metadata).eq("{}")
+ expect(conn.limitPages).to.be.null
+ expect(conn.limitFiles).to.be.null
+ cy.checkIndexedFiles({ conn, files: [{ name: "invo.pdf", totalPages: 2 }, { name: "sample.pdf", totalPages: 0 }] })
+ })
+ })
})
describe("Direct Upload API", () => {
@@ -607,6 +651,7 @@ describe("Direct Upload API", () => {
})
})
})
+
it('should enforce page limits during file operations and maintain constraints', () => {
// Upload 1 pdf with 3 pages, it should process only 2
cy.task('addNewUser', fakeUser).then(user => {
diff --git a/cypress/e2e/dropbox.cy.ts b/cypress/e2e/dropbox.cy.ts
index 69387cd..dc4db06 100644
--- a/cypress/e2e/dropbox.cy.ts
+++ b/cypress/e2e/dropbox.cy.ts
@@ -509,4 +509,56 @@ describe("Dropbox connection UI Testing", () => {
expect(points).eq(0)
})
})
+
+ it('should handle processing cancellation with progress preservation', () => {
+
+ cy.task('getConnections', { email: fakeUser.email })
+ .then(res => {
+ const { conns } = res as { conns: ConnectionTable[] }
+ cy.get(`[data-test="btn-config-${conns[0].identifier}"]`)
+ .click()
+ .get('input[name="folderName"]')
+ .clear()
+ .type('_TEST_/invo.pdf/sample.pdf')
+ .get(`[data-test="btn-config-connection"]`)
+ .click()
+ })
+ cy.wait(1000)
+
+ const targetState = { file: 1, page: 2 }
+ let found = false
+
+ const checkProgress = (retries = 0) => {
+ cy.get('[data-test="processedFile"]').invoke('text').then(fileText => {
+ cy.get('[data-test="processedPage"]').invoke('text').then(pageText => {
+ const currentFile = parseInt(fileText)
+ const currentPage = parseInt(pageText)
+
+ if (currentFile >= targetState.file && currentPage >= targetState.page) {
+ found = true
+ cy.get('[data-test="stop-connection"]').click()
+ return
+ }
+
+ if (!found) {
+ cy.wait(1000) // poll once per second
+ checkProgress(retries + 1)
+ }
+ })
+ })
+ }
+ checkProgress()
+
+ // UI assertions
+ cy.get('[data-test="processedFile"]').should('contain', targetState.file)
+ cy.get('[data-test="processedPage"]').should('contain', targetState.page)
+
+ cy.task("getConnection", { email: fakeUser.email })
+ .then(({ conns }: any) => {
+ const conn = (conns as FileConnectionQuery[])[0]
+ expect(conn.limitPages).to.be.null
+ expect(conn.limitFiles).to.be.null
+ cy.checkIndexedFiles({ conn, source: "DROPBOX", files: [{ name: "invo.pdf", totalPages: 2 }, { name: "sample.pdf", totalPages: 0 }] })
+ })
+ })
})
diff --git a/db/schemas/connections.ts b/db/schemas/connections.ts
index 6247850..cf54d45 100644
--- a/db/schemas/connections.ts
+++ b/db/schemas/connections.ts
@@ -34,6 +34,7 @@ export const connections = pgTable("connection", {
limitFiles: integer("limit_files"),
lastSynced: timestamp("last_synced", { withTimezone: true }),
isSyncing: boolean("is_syncing").default(false).notNull(),
+ jobId: text("job_id"),
isConfigSet: boolean("is_config_set").default(false).notNull(),
createdAt: timestamp("createdAt", { withTimezone: true }).notNull().defaultNow(),
})
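The new jobId column acts as a per-connection processing lock: set when a job is enqueued, cleared when processing finishes or is cancelled. A minimal sketch of that lifecycle, mirroring the Drizzle updates this PR makes elsewhere (helper names are illustrative):

    import { eq } from "drizzle-orm";
    import { databaseDrizzle } from "@/db";
    import { connections } from "@/db/schemas/connections";

    // On enqueue: record which BullMQ job owns this connection.
    async function markProcessing(connectionId: string, jobId: string) {
      await databaseDrizzle.update(connections)
        .set({ isSyncing: true, jobId })
        .where(eq(connections.id, connectionId));
    }

    // On completion or cancellation: release the lock.
    async function clearProcessing(connectionId: string) {
      await databaseDrizzle.update(connections)
        .set({ isSyncing: false, jobId: null })
        .where(eq(connections.id, connectionId));
    }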
diff --git a/drizzle/meta/_journal.json b/drizzle/meta/_journal.json
index 8467fc4..c5704fa 100644
--- a/drizzle/meta/_journal.json
+++ b/drizzle/meta/_journal.json
@@ -71,6 +71,20 @@
"when": 1747198120772,
"tag": "0009_regular_hydra",
"breakpoints": true
+ },
+ {
+ "idx": 10,
+ "version": "7",
+ "when": 1748097253159,
+ "tag": "0010_red_reptil",
+ "breakpoints": true
+ },
+ {
+ "idx": 11,
+ "version": "7",
+ "when": 1748097469946,
+ "tag": "0011_keen_night_nurse",
+ "breakpoints": true
}
]
}
\ No newline at end of file
diff --git a/fileProcessors/connectors/index.ts b/fileProcessors/connectors/index.ts
index 01b0ab8..23d40ec 100644
--- a/fileProcessors/connectors/index.ts
+++ b/fileProcessors/connectors/index.ts
@@ -84,7 +84,6 @@ export const setConnectionToProcess = async (formData: FormData): Promise<TQueue> => {
const connection = user?.connections.find(conn => conn.id === connectionId)
if (connection && connection.limitPages) formData.set("pageLimit", connection.limitPages.toString())
diff --git a/fileProcessors/index.ts b/fileProcessors/index.ts
index 3a3f726..d44b661 100644
--- a/fileProcessors/index.ts
+++ b/fileProcessors/index.ts
@@ -23,7 +23,7 @@ export type PageContent = {
tables: unknown[]
}
-export const directProcessFiles = async ({ files, metadata, service, connectionId, links, pageLimit, fileLimit }: TQueue) => {
+export const directProcessFiles = async ({ files, metadata, service, connectionId, links, pageLimit, fileLimit }: TQueue, checkCancel?: () => Promise<boolean>) => {
// Create promises for processing file URLs
const filePromises = files.map(async (file) => {
const arrayBuffer = Buffer.from(file.content, 'base64').buffer;
@@ -64,7 +64,7 @@ export const directProcessFiles = async ({ files, metadata, service, connectionI
if (pageLimit && pageLimit < currentPagesCount) {
await databaseDrizzle
.update(connections)
- .set({ isSyncing: false })
+ .set({ isSyncing: false, jobId: null })
.where(eq(connections.id, connectionId))
await publishProgress({
@@ -76,10 +76,10 @@ export const directProcessFiles = async ({ files, metadata, service, connectionI
})
return;
}
- return await processFiles(filesContent, service, connectionId, pageLimit, fileLimit, currentPagesCount, connection.files.length)
+ return await processFiles(filesContent, service, connectionId, pageLimit, fileLimit, currentPagesCount, connection.files.length, checkCancel)
}
-export const connectionProcessFiles = async ({ connectionId, service, pageLimit, fileLimit }: TQueue) => {
+export const connectionProcessFiles = async ({ connectionId, service, pageLimit, fileLimit }: TQueue, checkCancel?: () => Promise<boolean>) => {
const connection = await databaseDrizzle.query.connections.findFirst({
where: (c, ops) => ops.eq(c.id, connectionId),
with: {
@@ -109,7 +109,7 @@ export const connectionProcessFiles = async ({ connectionId, service, pageLimit,
if (pageLimit && pageLimit < currentPagesCount) {
await databaseDrizzle
.update(connections)
- .set({ isSyncing: false })
+ .set({ isSyncing: false, jobId: null })
.where(eq(connections.id, connectionId))
await publishProgress({
@@ -121,15 +121,18 @@ export const connectionProcessFiles = async ({ connectionId, service, pageLimit,
})
return;
}
- return processFiles(filesContent, service, connectionId, pageLimit, fileLimit, 0, 0)
+ return processFiles(filesContent, service, connectionId, pageLimit, fileLimit, 0, 0, checkCancel)
}
-const processFiles = async (filesContent: FileContent[], service: string, connectionId: string, pageLimit: number | null, fileLimit: number | null, currentPagesCount: number, currentFileCount: number) => {
+const delay = (ms: number) => new Promise<void>(res => setTimeout(res, ms))
+
+const processFiles = async (filesContent: FileContent[], service: string, connectionId: string, pageLimit: number | null, fileLimit: number | null, currentPagesCount: number, currentFileCount: number, checkCancel?: () => Promise<boolean>) => {
const completedFiles: typeof processedFiles.$inferInsert[] = []
const allPoints = [];
let processedPage = 0;
let processedAllPages = currentPagesCount;
let limits = pageLimit ? pageLimit - currentPagesCount : Infinity;
+ let shouldCancel = false;
const now = new Date()
try {
const splitter = new RecursiveCharacterTextSplitter({
@@ -138,8 +141,7 @@ const processFiles = async (filesContent: FileContent[], service: string, connec
keepSeparator: true,
separators: ["\n\n## ", "\n\n# ", "\n\n", "\n", ". ", "! ", "? ", " "],
});
-
- for (let fileIndex = 0; fileIndex < filesContent.length && limits > 0; fileIndex++) {
+ fileLoop: for (let fileIndex = 0; fileIndex < filesContent.length && limits > 0; fileIndex++) {
const file = filesContent[fileIndex]
const chunksId = [];
if (fileLimit !== null && fileLimit > 0 && fileIndex >= fileLimit) break;
@@ -151,8 +153,14 @@ const processFiles = async (filesContent: FileContent[], service: string, connec
},
};
for (let pageIndex = 0; pageIndex < file.pages.length && limits > 0; pageIndex++) {
+ if (checkCancel && await checkCancel()) {
+ shouldCancel = true;
+ break;
+ }
+ await delay(1000)
+
const page = file.pages[pageIndex]
- if (limits <= 0) break;
+ if (limits <= 0 || shouldCancel) break;
const textPoints = await processingTextPage(page.text, pageIndex, baseMetadata, splitter)
if (textPoints) {
allPoints.push(textPoints);
@@ -185,6 +193,7 @@ const processFiles = async (filesContent: FileContent[], service: string, connec
chunksIds: chunksId as string[],
})
processedPage = 0
+ if (limits <= 0 || shouldCancel) break fileLoop;
}
if (allPoints.length > 0) {
@@ -209,6 +218,7 @@ const processFiles = async (filesContent: FileContent[], service: string, connec
.set({
lastSynced: now,
isSyncing: false,
+ jobId: null,
})
.where(eq(connections.id, connectionId))
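Cancellation here is cooperative: processFiles polls checkCancel between pages and breaks out of both loops, so work stops only at page boundaries and everything already indexed is kept. A condensed sketch of the loop structure (names simplified; handlePage stands in for the real chunk/embed/upsert steps):

    async function handlePage(page: string): Promise<void> {
      // placeholder for the real per-page work
    }

    async function processAll(files: { pages: string[] }[], checkCancel?: () => Promise<boolean>) {
      fileLoop: for (const file of files) {
        for (const page of file.pages) {
          // Poll the flag between units of work; cancellation is never observed mid-page.
          if (checkCancel && (await checkCancel())) break fileLoop;
          await handlePage(page);
        }
      }
    }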
diff --git a/next.config.js b/next.config.js
index 82544e2..5d165ae 100644
--- a/next.config.js
+++ b/next.config.js
@@ -1,6 +1,11 @@
/** @type {import('next').NextConfig} */
const nextConfig = {
output: 'standalone',
+ experimental: {
+ serverActions: {
+ bodySizeLimit: '10mb'
+ }
+ }
};
module.exports = nextConfig
diff --git a/workers/queues/jobs/processFiles.job.ts b/workers/queues/jobs/processFiles.job.ts
index 5d62cc7..38dd38d 100644
--- a/workers/queues/jobs/processFiles.job.ts
+++ b/workers/queues/jobs/processFiles.job.ts
@@ -1,4 +1,4 @@
-import { Queue, Worker } from "bullmq";
+import { Queue, Worker, Job } from "bullmq";
import { redisConnection } from "../../redis";
import { defaultQueueConfig } from "../config";
import { connectionProcessFiles, directProcessFiles } from "@/fileProcessors";
@@ -14,7 +14,7 @@ export type SerializedFile = {
};
export type TQueue = {
connectionId: string;
- pageLimit: number| null,
+ pageLimit: number | null,
fileLimit: number | null,
files: SerializedFile[],
links: string[],
@@ -22,11 +22,6 @@ export type TQueue = {
metadata: string | null,
};
-
-export const addToProcessFilesQueue = (data: TQueue) => {
- return processfilesQueue.add(processFilesJobName, data)
-};
-
const processfilesQueue = new Queue(processFilesJobName, {
connection: redisConnection,
defaultJobOptions: {
@@ -35,13 +30,35 @@ const processfilesQueue = new Queue(processFilesJobName, {
}
});
-new Worker(processFilesJobName, async ({ data }) => {
- const { service }: TQueue = data
+
+new Worker(processFilesJobName, async (job: Job) => {
+ const isCancelled = async () =>
+ (await redisConnection.get(`cancel-job:${job.id}`)) === '1';
+
+ const { service }: TQueue = job.data
if (service === "DIRECT_UPLOAD") {
- await directProcessFiles(data)
+ await processWithCancellation(directProcessFiles, job, isCancelled)
} else {
- await connectionProcessFiles(data)
+ await processWithCancellation(connectionProcessFiles, job, isCancelled);
}
+
+ await redisConnection.del(`cancel-job:${job.id}`);
}, {
connection: redisConnection
});
+
+export const addToProcessFilesQueue = async (data: TQueue) => {
+ const newJob = await processfilesQueue.add(processFilesJobName, data)
+ return newJob.id
+};
+
+/**
+ * Wraps a processing function to inject cancellation checks.
+ */
+async function processWithCancellation(
+ f: (data: any, checkCancel: () => Promise<boolean>) => Promise<void>,
+ job: Job,
+ checkCancel: () => Promise<boolean>
+) {
+ await f(job.data, checkCancel);
+}
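End to end, cancellation is a Redis flag keyed by the BullMQ job id: the server action and the stop route set cancel-job:<jobId>, the worker polls it between pages through the injected checkCancel, and deletes it once the job ends. A standalone sketch of that contract with ioredis, assuming the same Redis instance as workers/redis:

    import Redis from "ioredis";

    const redis = new Redis(); // assumption: same connection options as workers/redis

    // Producer side (server action / stop route): request cancellation.
    async function requestCancel(jobId: string) {
      await redis.set(`cancel-job:${jobId}`, "1");
    }

    // Worker side: the check handed to directProcessFiles / connectionProcessFiles.
    function makeIsCancelled(jobId: string) {
      return async () => (await redis.get(`cancel-job:${jobId}`)) === "1";
    }

    // Worker side: clean up the flag after the job finishes or is cancelled.
    async function clearCancel(jobId: string) {
      await redis.del(`cancel-job:${jobId}`);
    }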