Should I save the data into a file or can we send child data over worker? #19
-
Hi @arthurvanl, great question!

Short answer: for large data (>1-2 MB), save to disk/S3 and pass the file path in the job result. For small data (<1 MB), passing it directly through `getParentResult` is fine.

Why? Job results are stored in memory (LRU cache, max 5,000 entries) and optionally persisted to SQLite. Storing 20MB+ blobs per job would quickly exhaust memory and slow down SQLite operations.

Recommended pattern for large data:

```ts
import { writeFile, readFile } from 'fs/promises';
import { randomUUID } from 'crypto';
const worker = new Worker('fetch-data', async (job) => {
const data = await fetchFromFTP(job.data.endpoint);
if (JSON.stringify(data).length > 1_000_000) {
// Large data -> write to disk, return path
const path = `/tmp/jobs/${randomUUID()}.json`;
await writeFile(path, JSON.stringify(data));
return { type: 'file', path };
}
// Small data -> return directly
return { type: 'inline', data };
}, { embedded: true });
// Child worker reads parent result
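// (assumes `flow` below is a FlowProducer instance created elsewhere)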
const childWorker = new Worker('process-data', async (job) => {
const parentResult = flow.getParentResult(job.data.__flowParentId);
if (parentResult.type === 'file') {
const data = JSON.parse(await readFile(parentResult.path, 'utf-8'));
// process data...
} else {
// use parentResult.data directly
}
}, { embedded: true });
```

For production, consider using S3/MinIO instead of local disk — bunqueue already has S3 backup support, and it scales better across instances. Also note that with v2.5.8, …
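As an illustration, here's a minimal sketch of the S3 variant using the AWS SDK v3 client (the bucket name, endpoint, and both helper functions are assumptions; the same client works against MinIO):

```ts
import { S3Client, PutObjectCommand, GetObjectCommand } from '@aws-sdk/client-s3';
import { randomUUID } from 'crypto';

// Hypothetical bucket/endpoint; credentials come from the environment
const s3 = new S3Client({ endpoint: 'http://localhost:9000', region: 'us-east-1', forcePathStyle: true });
const BUCKET = 'job-results';

// Parent worker: upload the large payload, return only the object key
async function storeLargeResult(data: unknown) {
  const key = `jobs/${randomUUID()}.json`;
  await s3.send(new PutObjectCommand({ Bucket: BUCKET, Key: key, Body: JSON.stringify(data) }));
  return { type: 's3', key } as const;
}

// Child worker: resolve the key back into the data
async function loadLargeResult(key: string) {
  const res = await s3.send(new GetObjectCommand({ Bucket: BUCKET, Key: key }));
  return JSON.parse(await res.Body!.transformToString());
}
```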
-
A small addition to this @egeominotti: a job like that will be run multiple times in parallel, depending on how many customers there are. Will this be a problem when using only one worker? And if not, what would be the best practice to make this work fast?
-
No, a single worker is not a problem — it depends on how you configure it. A single `Worker` can process many jobs concurrently:

```ts
const worker = new Worker('customer-jobs', async (job) => {
const data = await fetchCustomerData(job.data.customerId);
// process...
}, { concurrency: 10 }); // Processes 10 jobs in parallel
```

With `concurrency: 10`, one Worker instance processes up to 10 jobs at the same time.

### When to scale up

…
### Recommendation for your use case

Since you're fetching data per customer (I/O-bound), start with a single Worker with higher concurrency:

```ts
const worker = new Worker('fetch-data', processor, {
concurrency: 20, // 20 parallel customer fetches
batchSize: 20, // Pull 20 jobs at a time
});
```

If you need more throughput, you can spin up additional Worker instances on the same queue — they'll automatically share the workload. Switch to …
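For instance, a small sketch of that horizontal scaling within one process (`processor` is the same handler as above):

```ts
// Two Worker instances on the same queue; jobs are distributed between them automatically
const workerA = new Worker('fetch-data', processor, { concurrency: 20 });
const workerB = new Worker('fetch-data', processor, { concurrency: 20 });
```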
-
Great follow-up! Let me clarify a few things:

### 1. SandboxedWorker DOES have events

```ts
import { SandboxedWorker } from 'bunqueue/client';
const worker = new SandboxedWorker('process-files', {
processor: './processors/file-processor.ts',
concurrency: 4,
maxMemory: 512, // 512MB per worker thread (default is 256)
timeout: 300000, // 5 min timeout for large files
});
worker.on('active', (job) => {
console.log(`Job ${job.id} started at ${Date.now()}`);
});
worker.on('completed', (job, result) => {
console.log(`Job ${job.id} completed`, result);
});
worker.on('failed', (job, error) => {
console.error(`Job ${job.id} failed: ${error.message}`);
});
worker.on('progress', (job, progress) => {
console.log(`Job ${job.id}: ${progress}%`);
});
await worker.start();
```

### 2. Do you actually need SandboxedWorker for 200-300MB files?

It depends on what you're doing with the files.
If you're just reading from FTP and writing to disk/S3, a regular Worker is better (see the streaming sketch below).
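As an illustration, a minimal sketch of that regular-Worker approach: stream the download straight to disk so the full 200-300MB payload never sits in memory (`fetchFtpStream` is a hypothetical helper returning a Node `Readable`):

```ts
import { createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';

const fileWorker = new Worker('fetch-files', async (job) => {
  // Hypothetical helper: opens a Readable stream for the remote FTP file
  const source = await fetchFtpStream(job.data.remotePath);
  const dest = `/tmp/files/${job.id}.bin`; // assumes /tmp/files exists
  await pipeline(source, createWriteStream(dest)); // stream to disk, never buffer it all
  return { type: 'file', path: dest };
}, { embedded: true });
```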
### 3. upsertJobScheduler + chains: this won't work directly

```ts
import { Queue, Worker, FlowProducer, SandboxedWorker } from 'bunqueue/client';
const queue = new Queue('triggers', { embedded: true });
const flow = new FlowProducer({ embedded: true });
// 1. Schedule a "trigger" job every 60 seconds
await queue.upsertJobScheduler('customer-pipeline', {
every: 60000,
}, {
name: 'trigger',
data: { customerId: 'abc123' },
});
// 2. Worker for the trigger job creates the actual chain
const triggerWorker = new Worker('triggers', async (job) => {
await flow.addChain([
{
name: 'fetch-data',
queueName: 'pipeline',
data: { customerId: job.data.customerId },
},
{
name: 'process-data',
queueName: 'pipeline',
data: { customerId: job.data.customerId },
},
{
name: 'save-results',
queueName: 'pipeline',
data: { customerId: job.data.customerId },
},
]);
return { chainCreated: true };
}, { embedded: true });
// 3. SandboxedWorker (or regular Worker) processes the chain steps
const pipelineWorker = new SandboxedWorker('pipeline', {
processor: './processors/pipeline.ts',
concurrency: 4,
maxMemory: 512,
timeout: 300000,
});
pipelineWorker.on('completed', (job, result) => {
console.log(`Step ${job.name} completed for customer ${job.data.customerId}`);
});
await pipelineWorker.start();
```

### 4. Preventing overlapping chains

There's no built-in mechanism to block new chains while a previous one is running; the scheduler will keep firing regardless. You can handle this in the trigger worker:

```ts
// Track running chains (in-memory, or use a dedicated queue/flag)
const runningChains = new Map<string, boolean>();
const triggerWorker = new Worker('triggers', async (job) => {
const { customerId } = job.data;
// Skip if chain is already running for this customer
if (runningChains.get(customerId)) {
console.log(`Chain already running for ${customerId}, skipping`);
return { skipped: true };
}
runningChains.set(customerId, true);
const result = await flow.addChain([
{ name: 'fetch', queueName: 'pipeline', data: { customerId } },
{ name: 'process', queueName: 'pipeline', data: { customerId } },
{ name: 'save', queueName: 'pipeline', data: { customerId } },
]);
return { chainCreated: true, jobIds: result.jobIds };
}, { embedded: true });
// Clear the flag when the LAST step of the chain completes
pipelineWorker.on('completed', (job) => {
if (job.name === 'save') {
runningChains.delete(job.data.customerId);
}
});
```

This way the scheduler keeps firing, but the trigger worker skips if a chain is still in progress for that customer.
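One caveat in that sketch, assuming the same event API shown above: if a chain step fails, the in-memory flag never clears and that customer stays blocked. Clearing it on `failed` as well covers that path:

```ts
// Also clear the flag when a step fails, so the customer isn't blocked forever
pipelineWorker.on('failed', (job) => {
  runningChains.delete(job.data.customerId);
});
```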
-
It seems …
-
I have another big thing which might be a problem when using a …