Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions src/modules/authentication/services/authentication.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ class AuthenticationService {

await this.getCurrentData();

await this.config.setConfigValue("auth_failure_reported", false);

if (config.worker_id) process.env.WORKER_ID = config.worker_id;

return this.getConfig();
Expand Down Expand Up @@ -301,12 +303,15 @@ class AuthenticationService {
});
} catch (error) {
if (error instanceof WorkerError) throw error;

const authFailureReported = !!this.getConfig().auth_failure_reported;
if (!authFailureReported)
await this.config.setConfigValue("auth_failure_reported", true);

throw new WorkerError({
key: "@authentication_service_register/REGISTRATION_FAILED",
message: `Registration failed. ${getErrorMessage(error)}`,
debug: {
error,
},
...(authFailureReported ? {} : { debug: { error } }),
});
}
}
Expand All @@ -324,12 +329,15 @@ class AuthenticationService {
});
} catch (error) {
if (error instanceof WorkerError) throw error;

const authFailureReported = !!this.getConfig().auth_failure_reported;
if (!authFailureReported)
await this.config.setConfigValue("auth_failure_reported", true);

throw new WorkerError({
key: "@authentication_service_get_refresh_token/GET_REFRESH_TOKEN_FAILED",
message: `Get refresh token failed. ${getErrorMessage(error)}`,
debug: {
error,
},
...(authFailureReported ? {} : { debug: { error } }),
});
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/modules/configuration/constants/defaultConfiguration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ export const defaultConfiguration = {
access_token_expires_at: null as string | null | undefined,

name: null as string | null | undefined,

auth_failure_reported: false as boolean | null | undefined,
},
[CONFIGURATION.JOB]: {},

Expand Down
225 changes: 153 additions & 72 deletions src/modules/processing/services/processing.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,38 @@ class ProcessingService {
}
}

private async getAllLogsFromContainer({
container,
}: {
container: Docker.Container;
}): Promise<string> {
if (!container) return "";

try {
const logs = await container.logs({
stdout: true,
stderr: true,
});

return logs.toString("utf-8");
} catch (error) {
logger.error(
`❌ Fail to get full logs from container ${container.id}. ${getErrorMessage(error)}`,
);
return "";
}
}

private async getMatchedFilesByGlobPatterns({
containerDir,
globPatterns,
}: {
containerDir: string;
globPatterns: string[];
}): Promise<string[]> {
return glob(globPatterns.map(pattern => path.join(containerDir, pattern)));
}

private async updateProcessingFilePermissions(processing: IProcessing) {
const environment = getSystemEnvironment();

Expand Down Expand Up @@ -694,6 +726,19 @@ class ProcessingService {

await this.updateProcessingFilePermissions(processing);

const outputLogs = await this.getAllLogsFromContainer({ container });
if (outputLogs.trim().length > 0) {
const outputLogsFilename = `_autodroid_worker_processing_${processing.data.id}_output.log`;
const outputLogsPath = path.join(
processing.system_output_dir,
outputLogsFilename,
);

await fsPromises.writeFile(outputLogsPath, outputLogs, {
encoding: "utf-8",
});
}

const files = await fsPromises.readdir(processing.system_output_dir);
if (files.length === 0) {
throw new WorkerError({
Expand Down Expand Up @@ -725,16 +770,13 @@ class ProcessingService {

private async zipDirectory({
containerDir,
globPatterns,
files,
zipDestinationFilePath,
}: {
containerDir: string;
globPatterns: string[];
files: string[];
zipDestinationFilePath: string;
}): Promise<IProcessingFileData> {
const files = await glob(
globPatterns.map(pattern => path.join(containerDir, pattern)),
);
return new Promise<IProcessingFileData>((resolve, reject) => {
const output = fsSync.createWriteStream(zipDestinationFilePath);
const archive = archiver("zip", { zlib: { level: 9 } });
Expand Down Expand Up @@ -775,8 +817,12 @@ class ProcessingService {
kind: OutputFileKind;
fileData: IProcessingFileData;
}): Promise<string> {
if (processing.data.result_file?.upload_url)
return processing.data.result_file.upload_url;
const uploadUrlByKind = {
result_file: processing.data.result_file?.upload_url,
metrics_file: processing.data.metrics_file?.upload_url,
} satisfies Partial<Record<OutputFileKind, string | null | undefined>>;

if (uploadUrlByKind[kind]) return uploadUrlByKind[kind]!;

try {
const { data } = await retry("@processing/GENERATE_UPLOAD", () =>
Expand Down Expand Up @@ -805,89 +851,124 @@ class ProcessingService {
try {
const { processor } = processing.data;

const resultFilePath = path.join(
processing.system_working_dir,
`${processing.data.id}_result_file.zip`,
);

const resultFile = await this.zipDirectory({
const resultFiles = await this.getMatchedFilesByGlobPatterns({
containerDir: processing.system_output_dir,
globPatterns: processor.configuration.output_result_file_glob_patterns,
zipDestinationFilePath: resultFilePath,
});

logger.info(
`📦 Zip result file created for processing id ${processing.data.id}.`,
);

const metricsFilePath = path.join(
processing.system_working_dir,
`${processing.data.id}_metrics_file.zip`,
);

const metricsFile = await this.zipDirectory({
const metricsFiles = await this.getMatchedFilesByGlobPatterns({
containerDir: processing.system_output_dir,
globPatterns: processor.configuration.output_metrics_file_glob_patterns,
zipDestinationFilePath: metricsFilePath,
});

logger.info(
`📦 Zip metrics file created for processing id ${processing.data.id}.`,
);
if (resultFiles.length === 0 && metricsFiles.length === 0)
throw new WorkerError({
key: "@processing_service_zip_and_upload/NO_MATCHING_FILES",
message: `No files match result or metrics patterns for processing id ${processing.data.id}.`,
debug: {
processing_id: processing.data.id,
result_patterns:
processor.configuration.output_result_file_glob_patterns,
metrics_patterns:
processor.configuration.output_metrics_file_glob_patterns,
},
});

const resultFileUploadUrl = await this.getUploadUrl({
processing,
kind: "result_file",
fileData: resultFile,
});
if (resultFiles.length > 0) {
const resultFilePath = path.join(
processing.system_working_dir,
`${processing.data.id}_result_file.zip`,
);

logger.info(
`📦 Uploading result zip file for processing id ${processing.data.id}...`,
);
const resultFile = await this.zipDirectory({
containerDir: processing.system_output_dir,
files: resultFiles,
zipDestinationFilePath: resultFilePath,
});

const resultStream = fsSync.createReadStream(resultFilePath);
await axios.put(resultFileUploadUrl, resultStream, {
headers: {
"Content-Type": "application/zip",
},
});
logger.info(
`📦 Zip result file created for processing id ${processing.data.id}.`,
);

await retry("@processing/REPORT_RESULT_FILE_UPLOADED", () =>
this.context.api.client.post(
`/worker/processing/${processing.data.id}/result_file/uploaded`,
),
);
const resultFileUploadUrl = await this.getUploadUrl({
processing,
kind: "result_file",
fileData: resultFile,
});

logger.info(
`📤 Uploaded result zip file for processing id ${processing.data.id}!`,
);
logger.info(
`📦 Uploading result zip file for processing id ${processing.data.id}...`,
);

const metricsFileUploadUrl = await this.getUploadUrl({
processing,
kind: "metrics_file",
fileData: metricsFile,
});
const resultStream = fsSync.createReadStream(resultFilePath);
await axios.put(resultFileUploadUrl, resultStream, {
headers: {
"Content-Type": "application/zip",
},
});

logger.info(
`📦 Uploading metrics zip file for processing id ${processing.data.id}...`,
);
await retry("@processing/REPORT_RESULT_FILE_UPLOADED", () =>
this.context.api.client.post(
`/worker/processing/${processing.data.id}/result_file/uploaded`,
),
);

const metricsStream = fsSync.createReadStream(metricsFilePath);
await axios.put(metricsFileUploadUrl, metricsStream, {
headers: {
"Content-Type": "application/zip",
},
});
logger.info(
`📤 Uploaded result zip file for processing id ${processing.data.id}!`,
);
} else {
logger.warn(
`⚠️ No result files matched for processing id ${processing.data.id}. Skipping result upload.`,
);
}

await retry("@processing/REPORT_METRICS_FILE_UPLOADED", () =>
this.context.api.client.post(
`/worker/processing/${processing.data.id}/metrics_file/uploaded`,
),
);
if (metricsFiles.length > 0) {
const metricsFilePath = path.join(
processing.system_working_dir,
`${processing.data.id}_metrics_file.zip`,
);

logger.info(
`📤 Uploaded metrics zip file for processing id ${processing.data.id}!`,
);
const metricsFile = await this.zipDirectory({
containerDir: processing.system_output_dir,
files: metricsFiles,
zipDestinationFilePath: metricsFilePath,
});

logger.info(
`📦 Zip metrics file created for processing id ${processing.data.id}.`,
);

const metricsFileUploadUrl = await this.getUploadUrl({
processing,
kind: "metrics_file",
fileData: metricsFile,
});

logger.info(
`📦 Uploading metrics zip file for processing id ${processing.data.id}...`,
);

const metricsStream = fsSync.createReadStream(metricsFilePath);
await axios.put(metricsFileUploadUrl, metricsStream, {
headers: {
"Content-Type": "application/zip",
},
});

await retry("@processing/REPORT_METRICS_FILE_UPLOADED", () =>
this.context.api.client.post(
`/worker/processing/${processing.data.id}/metrics_file/uploaded`,
),
);

logger.info(
`📤 Uploaded metrics zip file for processing id ${processing.data.id}!`,
);
} else {
logger.warn(
`⚠️ No metrics files matched for processing id ${processing.data.id}. Skipping metrics upload.`,
);
}

logger.info(
`✅ Zip file uploaded successfully for processing id ${processing.data.id}.`,
Expand Down