diff --git a/api-backend/Makefile b/api-backend/Makefile index 4fcdaec..a5fc4ff 100644 --- a/api-backend/Makefile +++ b/api-backend/Makefile @@ -8,4 +8,4 @@ proto-generate: --ts_proto_out=$(PROTO_OUT_DIR) \ -I $(PROTO_INCLUDE_DIR) \ $(PROTO_INCLUDE_DIR)/*.proto \ - --ts_proto_opt=returnObservable=true,forceLong=long,useOptionals=true,esModuleInterop=true,outputEncodeMethods=false,outputJsonMethods=false,outputClientImpl=false,useProtoFieldName=true,snakeToCamel=false \ No newline at end of file + --ts_proto_opt=returnObservable=true,forceLong=long,esModuleInterop=true,outputEncodeMethods=false,outputJsonMethods=false,outputClientImpl=false,useProtoFieldName=true,snakeToCamel=false \ No newline at end of file diff --git a/api-backend/package.json b/api-backend/package.json index 1342867..6bd0c80 100644 --- a/api-backend/package.json +++ b/api-backend/package.json @@ -43,6 +43,7 @@ "sequelize": "^6.37.5", "sequelize-typescript": "^2.1.6", "ts-proto": "^2.6.1", + "ts-protoc-gen": "^0.15.0", "uuid": "^11.1.0" }, "devDependencies": { diff --git a/api-backend/src/aws/aws.module.ts b/api-backend/src/aws/aws.module.ts index 0be8a6d..b1393e0 100644 --- a/api-backend/src/aws/aws.module.ts +++ b/api-backend/src/aws/aws.module.ts @@ -1,11 +1,10 @@ +import { BullModule } from '@nestjs/bullmq'; import { forwardRef, Module } from '@nestjs/common'; -import { AwsService } from './aws.service'; import { ConfigModule } from '@nestjs/config'; -import { ECSClientService } from './ecs.service'; import { BotModule } from 'src/bot/bot.module'; -import { BullModule } from '@nestjs/bullmq'; import { ECS_TASK_QUEUE } from 'src/constants/bull-queue'; - +import { AwsService } from './aws.service'; +import { ECSClientService } from './ecs.service'; @Module({ imports: [ ConfigModule, diff --git a/api-backend/src/aws/ecs.service.ts b/api-backend/src/aws/ecs.service.ts index ebf2cba..444ab4b 100644 --- a/api-backend/src/aws/ecs.service.ts +++ b/api-backend/src/aws/ecs.service.ts @@ -1,4 +1,4 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Inject, Injectable, Logger, forwardRef } from '@nestjs/common'; import { ContainerOverride, DescribeTasksCommand, @@ -30,6 +30,7 @@ import { TASK_STOPPED_ERROR_CODES } from 'src/constants/ecs'; import { InjectQueue } from '@nestjs/bullmq'; import { Bot, ExecutionStatusLogEnum } from 'src/database/models/bot.model'; import { InjectModel } from '@nestjs/sequelize'; +import { BotService } from 'src/bot/bot.service'; interface TaskInfo { taskId?: string; lastStatus?: string; @@ -53,6 +54,8 @@ export class ECSClientService { @InjectQueue(ECS_TASK_QUEUE) private readonly ecsTaskQueue: Queue, private readonly configService: ConfigService, + @Inject(forwardRef(() => BotService)) + private readonly botService: BotService, ) { this.ecsClient = new ECSClient({ credentials: { @@ -222,6 +225,7 @@ export class ECSClientService { await bot.update({ status: ExecutionStatusLogEnum.COMPLETED, }); + await this.botService.triggerTranscriptGeneration(bot.id); } } catch (error) { await bot.update({ diff --git a/api-backend/src/bot/bot.service.ts b/api-backend/src/bot/bot.service.ts index 4283b21..098a278 100644 --- a/api-backend/src/bot/bot.service.ts +++ b/api-backend/src/bot/bot.service.ts @@ -1,5 +1,7 @@ import { BadRequestException, + forwardRef, + Inject, Injectable, InternalServerErrorException, NotFoundException, @@ -26,12 +28,15 @@ import { TranscriptionLogResponse, } from 'src/interfaces/proto-generated/transcript_management'; import { v4 as uuidv4 } from 'uuid'; +import { TASK_STOPPED_ERROR_CODES } from 'src/constants/ecs'; + @Injectable() export class BotService { constructor( @InjectModel(Bot) private botModel: typeof Bot, private readonly awsService: AwsService, + @Inject(forwardRef(() => ECSClientService)) private readonly ecsService: ECSClientService, private readonly configService: ConfigService, private readonly workerService: WorkerService, @@ -65,6 +70,19 @@ export class BotService { return this.initiateBot(bot.id); } + private mapPlatformToType(platform: BotPlatform): BotPlatformMappingType { + switch (platform) { + case BotPlatform.GOOGLE_MEET: + return BotPlatformMappingType.GOOGLE; + case BotPlatform.TEAMS: + return BotPlatformMappingType.MS_TEAMS; + case BotPlatform.ZOOM: + return BotPlatformMappingType.ZOOM; + default: + throw new Error(`Unknown platform: ${platform}`); + } + } + async initiateBot(bot: Bot | string): Promise { let botInstance: Bot; if (typeof bot === 'string') { @@ -80,7 +98,7 @@ export class BotService { const metaData = { id: botInstance.id, user_id: botInstance.apiKey.userId, - bot_type: BotPlatformMappingType[botInstance.platform], + bot_type: String(this.mapPlatformToType(botInstance.platform)), ...(botInstance.title && { meeting_title: botInstance.title }), }; @@ -97,21 +115,6 @@ export class BotService { 'Failed to generate presigned url.', ); - try { - await lastValueFrom( - this.workerService.createFileLog({ - id: botInstance.id, - raw_file_key: tarObjectName, - ...(botInstance.title && { meeting_title: botInstance.title }), - ...(botInstance.apiKey.userId && { - created_by_user_id: botInstance.apiKey.userId, - }), - }), - ); - } catch (e) { - console.log(e); - } - let taskId; switch (botInstance.platform) { case BotPlatform.GOOGLE_MEET: @@ -149,6 +152,7 @@ export class BotService { await botInstance.update({ taskId, status: ExecutionStatusLogEnum.STARTED, + tarFileKey: tarObjectName, }); await botInstance.reload(); return botInstance; @@ -387,4 +391,64 @@ export class BotService { hasMore: transcript.count > pagination.page_size * pagination.page, }; } + + async findBotsWithNonErrorFailures(): Promise { + const bots = await this.botModel.findAll({ + where: { + status: { [Op.not]: ExecutionStatusLogEnum.FAILED }, + taskId: { [Op.not]: null }, + // Only get bots from the last 24 hours + createdAt: { + [Op.gte]: moment().subtract(24, 'hours').toDate(), + }, + }, + include: [{ model: ApiKey, attributes: ['userId'] }], + }); + + // Filter bots to only those that failed for non-error reasons + const results = []; + for (const bot of bots) { + try { + const taskInfo = await this.ecsService.healthCheckTask(bot.taskId); + if ( + taskInfo?.lastStatus === 'STOPPED' && + !TASK_STOPPED_ERROR_CODES.includes(taskInfo?.stopCode) + ) { + results.push(bot); + } + } catch (error) { + console.error(`Error checking task status for bot ${bot.id}:`, error); + } + } + + return results; + } + + async triggerTranscriptGeneration(botId: string): Promise { + const bot = await this.botModel.findByPk(botId, { + include: [{ model: ApiKey, attributes: ['userId'] }], + }); + if (!bot) throw new NotFoundException('Bot not found'); + + if (!bot.tarFileKey) + throw new BadRequestException('Bot has no tar file key'); + + try { + await lastValueFrom( + this.workerService.createFileLog({ + id: bot.id, + raw_file_key: bot.tarFileKey, + ...(bot.title && { meeting_title: bot.title }), + ...(bot.apiKey.userId && { + created_by_user_id: bot.apiKey.userId, + }), + }), + ); + } catch (error) { + console.error( + `Error triggering transcript generation for bot ${bot.id}:`, + error, + ); + } + } } diff --git a/api-backend/src/cron-job/cron-job.module.ts b/api-backend/src/cron-job/cron-job.module.ts index 419ceea..bca2600 100644 --- a/api-backend/src/cron-job/cron-job.module.ts +++ b/api-backend/src/cron-job/cron-job.module.ts @@ -3,9 +3,10 @@ import { CronJobService } from './cron-job.service'; import { ScheduleModule } from '@nestjs/schedule'; import { AwsModule } from 'src/aws/aws.module'; import { BotModule } from 'src/bot/bot.module'; +import { GrpcModule } from 'src/grpc/grpc.module'; @Module({ - imports: [ScheduleModule.forRoot(), AwsModule, BotModule], + imports: [ScheduleModule.forRoot(), AwsModule, BotModule, GrpcModule], providers: [CronJobService], exports: [CronJobService], }) diff --git a/api-backend/src/cron-job/cron-job.service.ts b/api-backend/src/cron-job/cron-job.service.ts index b30775a..6260039 100644 --- a/api-backend/src/cron-job/cron-job.service.ts +++ b/api-backend/src/cron-job/cron-job.service.ts @@ -1,13 +1,16 @@ import { Injectable } from '@nestjs/common'; -import { CronExpression } from '@nestjs/schedule'; -import { Cron } from '@nestjs/schedule'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { lastValueFrom } from 'rxjs'; import { ECSClientService } from 'src/aws/ecs.service'; import { BotService } from 'src/bot/bot.service'; +import { WorkerService } from 'src/grpc/worker.service'; + @Injectable() export class CronJobService { constructor( private readonly ecsService: ECSClientService, private readonly botService: BotService, + private readonly workerService: WorkerService, ) {} @Cron(CronExpression.EVERY_5_MINUTES) @@ -19,4 +22,22 @@ export class CronJobService { async runEvery10Minutes(): Promise { await this.botService.initiateScheduledBot(); } + + @Cron(CronExpression.EVERY_10_MINUTES) + async checkBotStatusAndProcess(): Promise { + try { + console.log('Checking bot status and processing'); + // First check if the worker service is healthy + const healthCheck = await lastValueFrom(this.workerService.healthCheck()); + + if (healthCheck?.ServingStatus !== '200') { + console.log('Worker service is not healthy, skipping bot status check'); + return; + } + // Get bot status through ECS service + await this.ecsService.syncTaskStatus(); + } catch (error) { + console.error('Error in bot status check cron job:', error); + } + } } diff --git a/api-backend/src/database/models/bot.model.ts b/api-backend/src/database/models/bot.model.ts index c116a8f..d0507a1 100644 --- a/api-backend/src/database/models/bot.model.ts +++ b/api-backend/src/database/models/bot.model.ts @@ -118,6 +118,12 @@ export class Bot extends Model { }) status: ExecutionStatusLogEnum; + @Column({ + type: DataType.STRING, + allowNull: true, + }) + tarFileKey: string; + @Column({ type: DataType.JSONB, allowNull: true, diff --git a/api-backend/src/grpc/worker.service.ts b/api-backend/src/grpc/worker.service.ts index fc43e0e..c128adf 100644 --- a/api-backend/src/grpc/worker.service.ts +++ b/api-backend/src/grpc/worker.service.ts @@ -2,7 +2,7 @@ import { Inject, Injectable } from '@nestjs/common'; import { ClientGrpc } from '@nestjs/microservices'; import { MICROSERVICES } from 'src/constants/grpc'; import { - FileManagementController, + DocumentFileManagementController, HealthCheckController, FileManagementListController, TranscriptionLogListController, @@ -29,8 +29,8 @@ export class WorkerService { onModuleInit() { this.fileManagementService = - this.client.getService( - 'FileManagementController', + this.client.getService( + 'DocumentFileManagementController', ); this.healthCheckService = this.client.getService( 'HealthCheckController', @@ -60,7 +60,7 @@ export class WorkerService { .add(this.deadlineInSeconds, 'seconds') .toDate(); - return this.fileManagementService.Create(payload, { deadline }); + return this.fileManagementService.DocumentCreate(payload, { deadline }); } listFileLogs(payload: any): Observable { diff --git a/api-backend/src/interfaces/proto-generated/google/protobuf/empty.ts b/api-backend/src/interfaces/proto-generated/google/protobuf/empty.ts index 0edf5e2..17eab2d 100644 --- a/api-backend/src/interfaces/proto-generated/google/protobuf/empty.ts +++ b/api-backend/src/interfaces/proto-generated/google/protobuf/empty.ts @@ -1,7 +1,7 @@ // Code generated by protoc-gen-ts_proto. DO NOT EDIT. // versions: // protoc-gen-ts_proto v2.6.1 -// protoc v5.28.3 +// protoc v5.29.3 // source: google/protobuf/empty.proto /* eslint-disable */ diff --git a/api-backend/src/interfaces/proto-generated/transcript_management.ts b/api-backend/src/interfaces/proto-generated/transcript_management.ts index a0a0f77..026cd40 100644 --- a/api-backend/src/interfaces/proto-generated/transcript_management.ts +++ b/api-backend/src/interfaces/proto-generated/transcript_management.ts @@ -1,7 +1,7 @@ // Code generated by protoc-gen-ts_proto. DO NOT EDIT. // versions: // protoc-gen-ts_proto v2.6.1 -// protoc v5.28.3 +// protoc v5.29.3 // source: transcript_management.proto /* eslint-disable */ @@ -58,8 +58,8 @@ export interface TranscriptionLogResponse { transcription_Data?: string | undefined; } -export interface FileManagementController { - Create(request: FileManagementRequest): Observable; +export interface DocumentFileManagementController { + DocumentCreate(request: FileManagementRequest): Observable; } export interface FileManagementListController { diff --git a/api-backend/src/proto/transcript_management.proto b/api-backend/src/proto/transcript_management.proto index 35a97ba..a745465 100644 --- a/api-backend/src/proto/transcript_management.proto +++ b/api-backend/src/proto/transcript_management.proto @@ -4,8 +4,8 @@ package worker_backend.transcript_management; import "google/protobuf/empty.proto"; -service FileManagementController { - rpc Create(FileManagementRequest) returns (FileManagementResponse) {} +service DocumentFileManagementController { + rpc DocumentCreate(FileManagementRequest) returns (FileManagementResponse) {} } service FileManagementListController { @@ -32,7 +32,7 @@ message FileManagementRequest { optional string id = 1; string raw_file_key = 2; optional string meeting_title = 3; - optional string created_by_user_id = 8; + optional string created_by_user_id = 4; } message FileManagementResponse { @@ -42,10 +42,10 @@ message FileManagementResponse { optional string meeting_title = 4; optional string meeting_meeting_start_time = 5; optional string meeting_meeting_end_time = 6; - optional string execution_id = 11; - optional int32 bot_used = 12; - optional int32 status = 13; - optional string created_by_user_id = 14; + optional string execution_id = 7; + optional int32 bot_used = 8; + optional int32 status = 9; + optional string created_by_user_id = 10; } message HealthCheckhealthCheckResponse { @@ -66,4 +66,5 @@ message TranscriptionLogResponse { optional string transcription_start_time_milliseconds = 3; optional string transcription_end_time_milliseconds = 4; optional string transcription_Data = 5; -} \ No newline at end of file +} + diff --git a/api-backend/yarn.lock b/api-backend/yarn.lock index 61b5b4e..49a4a33 100644 --- a/api-backend/yarn.lock +++ b/api-backend/yarn.lock @@ -4115,6 +4115,11 @@ globby@^11.1.0: merge2 "^1.4.1" slash "^3.0.0" +google-protobuf@^3.15.5: + version "3.21.4" + resolved "https://registry.yarnpkg.com/google-protobuf/-/google-protobuf-3.21.4.tgz#2f933e8b6e5e9f8edde66b7be0024b68f77da6c9" + integrity sha512-MnG7N936zcKTco4Jd2PX2U96Kf9PxygAPKBug+74LHzmHXmceN16MmRcdgZv+DGef/S9YvQAfRsNCn4cjf9yyQ== + gopd@^1.0.1, gopd@^1.2.0: version "1.2.0" resolved "https://registry.yarnpkg.com/gopd/-/gopd-1.2.0.tgz#89f56b8217bdbc8802bd299df6d7f1081d7e51a1" @@ -6389,6 +6394,13 @@ ts-proto@^2.6.1: ts-poet "^6.7.0" ts-proto-descriptors "2.0.0" +ts-protoc-gen@^0.15.0: + version "0.15.0" + resolved "https://registry.yarnpkg.com/ts-protoc-gen/-/ts-protoc-gen-0.15.0.tgz#2fec5930b46def7dcc9fa73c060d770b7b076b7b" + integrity sha512-TycnzEyrdVDlATJ3bWFTtra3SCiEP0W0vySXReAuEygXCUr1j2uaVyL0DhzjwuUdQoW5oXPwk6oZWeA0955V+g== + dependencies: + google-protobuf "^3.15.5" + tsconfig-paths-webpack-plugin@4.2.0: version "4.2.0" resolved "https://registry.yarnpkg.com/tsconfig-paths-webpack-plugin/-/tsconfig-paths-webpack-plugin-4.2.0.tgz#f7459a8ed1dd4cf66ad787aefc3d37fff3cf07fc" diff --git a/worker-backend/api/transcript_management/worker.py b/worker-backend/api/transcript_management/worker.py index a614a1c..38cd4fa 100644 --- a/worker-backend/api/transcript_management/worker.py +++ b/worker-backend/api/transcript_management/worker.py @@ -28,6 +28,7 @@ def _transcript_generator_worker(file_key: str): transcriber = aai.Transcriber() try: + logging.info(f"file key: {file_key}: {s3_client.bucket_name}") user_id, bot_type, execution_id, meeting_title = get_meta_data(file_key, s3_client) file_ref, created = FileLog.objects.update_or_create( raw_file_key=file_key,