Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion backend/controllers/verificationController.js
Original file line number Diff line number Diff line change
Expand Up @@ -628,20 +628,23 @@ const bulkIssueCredentials = async (req, res) => {
}

const adminId = req.user.id;
const dryRun = req.query.dryRun === 'true' || req.body?.dryRun === true;
const job = await BulkVerificationJob.create({
status: 'pending',
mode: dryRun ? 'dry-run' : 'bulk',
totalRows: normalizedRecords.length,
actorId: adminId,
});

// Background processing (service re-validates again)
bulkVerificationService.processJob(job._id, normalizedRecords, adminId).catch((err) => {
bulkVerificationService.processJob(job._id, normalizedRecords, adminId, { dryRun }).catch((err) => {
console.error(`Error processing bulk verification job ${job._id}:`, err);
});

res.status(202).json(apiResponse.successResponse({
jobId: job._id,
status: job.status,
mode: job.mode,
totalRows: job.totalRows,
}, 'Bulk verification job initiated successfully'));
return;
Expand Down
11 changes: 10 additions & 1 deletion backend/models/BulkVerificationJob.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,22 @@ const bulkVerificationJobSchema = new mongoose.Schema({
type: Number,
default: 0,
},
mode: {
type: String,
enum: ['bulk', 'dry-run'],
default: 'bulk',
},
results: [{
rowNumber: { type: Number, required: true },
userId: { type: String, trim: true },
walletAddress: { type: String, lowercase: true, trim: true },
action: { type: String, trim: true },
idempotencyKey: { type: String, trim: true },
status: { type: String, enum: ['success', 'failure', 'skipped'], required: true },
status: {
type: String,
enum: ['success', 'failure', 'skipped', 'dry-run-success', 'dry-run-failure', 'dry-run-skipped'],
required: true,
},
error: { type: String },
details: { type: mongoose.Schema.Types.Mixed, default: {} },
}],
Expand Down
176 changes: 82 additions & 94 deletions backend/services/bulkVerificationService.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,17 @@ const parseCSV = (csvText) => {

if (char === '"') {
if (inQuotes && nextChar === '"') {
// Escaped double quote ("") -> actual double quote character
currentField += '"';
i++; // Skip next quote
i++;
} else {
// Toggle quote state
inQuotes = !inQuotes;
}
} else if (char === ',' && !inQuotes) {
currentLine.push(currentField.trim());
currentField = '';
} else if ((char === '\n' || char === '\r') && !inQuotes) {
if (char === '\r' && nextChar === '\n') {
i++; // Skip \n
i++;
}
currentLine.push(currentField.trim());
if (currentLine.length > 0 && (currentLine.length > 1 || currentLine[0] !== '')) {
Expand All @@ -57,7 +55,9 @@ const parseCSV = (csvText) => {

if (currentField || currentLine.length > 0) {
currentLine.push(currentField.trim());
lines.push(currentLine);
if (currentLine.length > 0 && (currentLine.length > 1 || currentLine[0] !== '')) {
lines.push(currentLine);
}
}

if (lines.length === 0) return [];
Expand All @@ -67,10 +67,7 @@ const parseCSV = (csvText) => {

for (let i = 1; i < lines.length; i++) {
const line = lines[i];
// Skip entirely empty lines
if (line.length === 0 || (line.length === 1 && line[0] === '')) {
continue;
}
if (line.length === 0 || (line.length === 1 && line[0] === '')) continue;

const record = {};
headers.forEach((header, index) => {
Expand All @@ -82,20 +79,27 @@ const parseCSV = (csvText) => {
return records;
};

const statusFor = ({ dryRun, outcomeType }) => {
if (!dryRun) return outcomeType;
if (outcomeType === 'success') return 'dry-run-success';
if (outcomeType === 'skipped') return 'dry-run-skipped';
if (outcomeType === 'failure') return 'dry-run-failure';
return outcomeType;
};

/**
* Background processor for bulk verification jobs
* @param {string} jobId - Database BulkVerificationJob ObjectId
* @param {Array<Object>} records - List of parsed records from CSV
* @param {string} adminId - Admin/actor user ID
*/
const processJob = async (jobId, records, adminId) => {
const processJob = async (jobId, records, adminId, { dryRun } = {}) => {
const job = await BulkVerificationJob.findById(jobId);
if (!job) return;

job.status = 'processing';
await job.save();

// Audit bulk job initiated
try {
await appendAuditEvent({
action: 'bulk_job_initiated',
Expand All @@ -114,7 +118,8 @@ const processJob = async (jobId, records, adminId) => {

for (let i = 0; i < records.length; i++) {
const record = records[i];
const rowNumber = i + 2; // CSV is 1-indexed, header is row 1
const rowNumber = i + 2;

const inputUserId = record.userid || '';
const email = record.email || '';
const walletAddress = record.walletaddress || '';
Expand All @@ -127,42 +132,29 @@ const processJob = async (jobId, records, adminId) => {
const action = CHALLENGE_ACTIONS[actionStr] || CHALLENGE_ACTIONS.ISSUE_CREDENTIAL;

try {
// 1. Resolve User
// Resolve user
let user = null;
if (userId) {
if (/^[a-fA-F0-9]{24}$/.test(userId)) {
user = await User.findById(userId);
}
} else if (email) {
if (userId && /^[a-fA-F0-9]{24}$/.test(userId)) {
user = await User.findById(userId);
} else if (!userId && email) {
user = await User.findOne({ email });
if (user) {
userId = user._id.toString();
}
}

if (!user) {
throw new Error('User not found');
}


if (!user) {
throw new Error('User not found');
if (user) userId = user._id.toString();
}

if (!user) throw new Error('User not found');
userId = user._id.toString();

// 2. Validate walletAddress
// Validate wallet
if (!walletAddress || !/^0x[a-fA-F0-9]{40}$/.test(walletAddress)) {
throw new Error('Invalid wallet address');
}

// 3. Derive Idempotency Key
// Derive idempotency key
const idempotencyKey = crypto
.createHash('sha256')
.update(`${userId}:${walletAddress.toLowerCase()}:${action}`)
.digest('hex');

// 4. Reserve Idempotency Key
const fingerprint = createFingerprint({ action, actorId: adminId, userId, walletAddress });
const reservation = await reserveIdempotencyKey({
action,
Expand All @@ -171,89 +163,91 @@ const processJob = async (jobId, records, adminId) => {
fingerprint,
});

// Idempotency record already completed
if (!reservation.reserved) {
if (reservation.record && reservation.record.state === 'completed') {
results.push({
rowNumber,
userId,


const status = statusFor({ dryRun, outcomeType: 'skipped' });
results.push({
rowNumber,
userId,
walletAddress,
action,
idempotencyKey,
status: 'skipped',
status,
details: { message: 'Request already processed (idempotency)' },
});

// Audit minimal per-row summary (success)
try {
await appendAuditEvent({
action: 'bulk_row_processed',
actorId: adminId,
targetUserId: userId,
walletAddress,
status: 'success',
metadata: {
rowNumber,
bulkStatus: 'skipped',
bulkAction: action,
},
req: {},
});
} catch (_) {
// best-effort
}
successCount++;
continue;
} else {
throw new Error('Request already in progress or duplicate idempotency key');
}

// Reserved but not completed
const status = statusFor({ dryRun, outcomeType: 'failure' });
results.push({
rowNumber,
userId,
walletAddress,
action,
idempotencyKey,
status,
details: { message: 'Request already in progress or duplicate idempotency key' },
});
failureCount += dryRun ? 1 : 1;
continue;
}

// 5. Check if user is already verified (for ISSUE_CREDENTIAL)
// Already verified (issue credential only)
if (action === CHALLENGE_ACTIONS.ISSUE_CREDENTIAL && user.verification?.isVerified) {
const status = statusFor({ dryRun, outcomeType: 'skipped' });
results.push({
rowNumber,
userId,

walletAddress,
action,
idempotencyKey,
status: 'skipped',
status,
details: { message: 'User is already verified' },
});

try {
await appendAuditEvent({
action: 'bulk_row_processed',
if (!dryRun) {
await storeCompletedIdempotencyRecord({
action,
actorId: adminId,
targetUserId: userId,
walletAddress,
status: 'success',
metadata: {
rowNumber,
bulkStatus: 'skipped',
bulkAction: action,
},
req: {},
key: idempotencyKey,
fingerprint,
response: { success: true, message: 'User already verified' },
});
} catch (_) {
// best-effort
}

successCount++;
await storeCompletedIdempotencyRecord({
continue;
}

if (dryRun) {
const predictedDetails = {
predicted: true,
message: 'Dry-run prediction: would execute side effects if dryRun=false',
willExecute: {
action,
signaturePresent: Boolean(signature),
nonceRequired: Boolean(signature),
idempotencyReserved: true,
},
};

results.push({
rowNumber,
userId,
walletAddress,
action,
actorId: adminId,
key: idempotencyKey,
fingerprint,
response: { success: true, message: 'User already verified' },
idempotencyKey,
status: statusFor({ dryRun, outcomeType: 'success' }),
details: predictedDetails,
});
successCount++;
continue;
}

// 6. Process Action
// Execute real side effects
let outcome = null;

if (signature) {
if (!nonce || !expiresAtVal) {
throw new Error('Nonce and expiresAt are required when signature is provided');
Expand Down Expand Up @@ -291,7 +285,6 @@ const processJob = async (jobId, records, adminId) => {
});
}

// 7. Store Completed Idempotency Record
await storeCompletedIdempotencyRecord({
action,
actorId: adminId,
Expand All @@ -310,19 +303,19 @@ const processJob = async (jobId, records, adminId) => {
details: outcome,
});
successCount++;

} catch (error) {
const status = statusFor({ dryRun, outcomeType: 'failure' });
results.push({
rowNumber,
userId: userId || undefined,
walletAddress: walletAddress || undefined,
action,
status: 'failure',
idempotencyKey: undefined,
status,
error: error.message || error.toString(),
});
failureCount++;

// Audit minimal per-row summary (failure)
try {
await appendAuditEvent({
action: 'bulk_row_processed',
Expand All @@ -343,8 +336,6 @@ const processJob = async (jobId, records, adminId) => {
}
}


// Update progress after each row
job.processedRows = i + 1;
await job.save();
}
Expand All @@ -355,7 +346,6 @@ const processJob = async (jobId, records, adminId) => {
job.results = results;
await job.save();

// Audit bulk job completed
try {
await appendAuditEvent({
action: 'bulk_job_completed',
Expand All @@ -369,10 +359,8 @@ const processJob = async (jobId, records, adminId) => {
}
};


module.exports = {
parseCSV,
processJob,
};


Loading