Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api-backend/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ proto-generate:
--ts_proto_out=$(PROTO_OUT_DIR) \
-I $(PROTO_INCLUDE_DIR) \
$(PROTO_INCLUDE_DIR)/*.proto \
--ts_proto_opt=returnObservable=true,forceLong=long,useOptionals=true,esModuleInterop=true,outputEncodeMethods=false,outputJsonMethods=false,outputClientImpl=false,useProtoFieldName=true,snakeToCamel=false
--ts_proto_opt=returnObservable=true,forceLong=long,esModuleInterop=true,outputEncodeMethods=false,outputJsonMethods=false,outputClientImpl=false,useProtoFieldName=true,snakeToCamel=false
1 change: 1 addition & 0 deletions api-backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"sequelize": "^6.37.5",
"sequelize-typescript": "^2.1.6",
"ts-proto": "^2.6.1",
"ts-protoc-gen": "^0.15.0",
"uuid": "^11.1.0"
},
"devDependencies": {
Expand Down
7 changes: 3 additions & 4 deletions api-backend/src/aws/aws.module.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { BullModule } from '@nestjs/bullmq';
import { forwardRef, Module } from '@nestjs/common';
import { AwsService } from './aws.service';
import { ConfigModule } from '@nestjs/config';
import { ECSClientService } from './ecs.service';
import { BotModule } from 'src/bot/bot.module';
import { BullModule } from '@nestjs/bullmq';
import { ECS_TASK_QUEUE } from 'src/constants/bull-queue';

import { AwsService } from './aws.service';
import { ECSClientService } from './ecs.service';
@Module({
imports: [
ConfigModule,
Expand Down
6 changes: 5 additions & 1 deletion api-backend/src/aws/ecs.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable, Logger } from '@nestjs/common';
import { Inject, Injectable, Logger, forwardRef } from '@nestjs/common';
import {
ContainerOverride,
DescribeTasksCommand,
Expand Down Expand Up @@ -30,6 +30,7 @@ import { TASK_STOPPED_ERROR_CODES } from 'src/constants/ecs';
import { InjectQueue } from '@nestjs/bullmq';
import { Bot, ExecutionStatusLogEnum } from 'src/database/models/bot.model';
import { InjectModel } from '@nestjs/sequelize';
import { BotService } from 'src/bot/bot.service';
interface TaskInfo {
taskId?: string;
lastStatus?: string;
Expand All @@ -53,6 +54,8 @@ export class ECSClientService {
@InjectQueue(ECS_TASK_QUEUE)
private readonly ecsTaskQueue: Queue,
private readonly configService: ConfigService,
@Inject(forwardRef(() => BotService))
private readonly botService: BotService,
) {
this.ecsClient = new ECSClient({
credentials: {
Expand Down Expand Up @@ -222,6 +225,7 @@ export class ECSClientService {
await bot.update({
status: ExecutionStatusLogEnum.COMPLETED,
});
await this.botService.triggerTranscriptGeneration(bot.id);
}
} catch (error) {
await bot.update({
Expand Down
96 changes: 80 additions & 16 deletions api-backend/src/bot/bot.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import {
BadRequestException,
forwardRef,
Inject,
Injectable,
InternalServerErrorException,
NotFoundException,
Expand All @@ -26,12 +28,15 @@ import {
TranscriptionLogResponse,
} from 'src/interfaces/proto-generated/transcript_management';
import { v4 as uuidv4 } from 'uuid';
import { TASK_STOPPED_ERROR_CODES } from 'src/constants/ecs';

@Injectable()
export class BotService {
constructor(
@InjectModel(Bot)
private botModel: typeof Bot,
private readonly awsService: AwsService,
@Inject(forwardRef(() => ECSClientService))
private readonly ecsService: ECSClientService,
private readonly configService: ConfigService,
private readonly workerService: WorkerService,
Expand Down Expand Up @@ -65,6 +70,19 @@ export class BotService {
return this.initiateBot(bot.id);
}

private mapPlatformToType(platform: BotPlatform): BotPlatformMappingType {
switch (platform) {
case BotPlatform.GOOGLE_MEET:
return BotPlatformMappingType.GOOGLE;
case BotPlatform.TEAMS:
return BotPlatformMappingType.MS_TEAMS;
case BotPlatform.ZOOM:
return BotPlatformMappingType.ZOOM;
default:
throw new Error(`Unknown platform: ${platform}`);
}
}

async initiateBot(bot: Bot | string): Promise<Bot> {
let botInstance: Bot;
if (typeof bot === 'string') {
Expand All @@ -80,7 +98,7 @@ export class BotService {
const metaData = {
id: botInstance.id,
user_id: botInstance.apiKey.userId,
bot_type: BotPlatformMappingType[botInstance.platform],
bot_type: String(this.mapPlatformToType(botInstance.platform)),
...(botInstance.title && { meeting_title: botInstance.title }),
};

Expand All @@ -97,21 +115,6 @@ export class BotService {
'Failed to generate presigned url.',
);

try {
await lastValueFrom(
this.workerService.createFileLog({
id: botInstance.id,
raw_file_key: tarObjectName,
...(botInstance.title && { meeting_title: botInstance.title }),
...(botInstance.apiKey.userId && {
created_by_user_id: botInstance.apiKey.userId,
}),
}),
);
} catch (e) {
console.log(e);
}

let taskId;
switch (botInstance.platform) {
case BotPlatform.GOOGLE_MEET:
Expand Down Expand Up @@ -149,6 +152,7 @@ export class BotService {
await botInstance.update({
taskId,
status: ExecutionStatusLogEnum.STARTED,
tarFileKey: tarObjectName,
});
await botInstance.reload();
return botInstance;
Expand Down Expand Up @@ -387,4 +391,64 @@ export class BotService {
hasMore: transcript.count > pagination.page_size * pagination.page,
};
}

async findBotsWithNonErrorFailures(): Promise<Bot[]> {
const bots = await this.botModel.findAll({
where: {
status: { [Op.not]: ExecutionStatusLogEnum.FAILED },
taskId: { [Op.not]: null },
// Only get bots from the last 24 hours
createdAt: {
[Op.gte]: moment().subtract(24, 'hours').toDate(),
},
},
include: [{ model: ApiKey, attributes: ['userId'] }],
});

// Filter bots to only those that failed for non-error reasons
const results = [];
for (const bot of bots) {
try {
const taskInfo = await this.ecsService.healthCheckTask(bot.taskId);
if (
taskInfo?.lastStatus === 'STOPPED' &&
!TASK_STOPPED_ERROR_CODES.includes(taskInfo?.stopCode)
) {
results.push(bot);
}
} catch (error) {
console.error(`Error checking task status for bot ${bot.id}:`, error);
}
}

return results;
}

async triggerTranscriptGeneration(botId: string): Promise<void> {
const bot = await this.botModel.findByPk(botId, {
include: [{ model: ApiKey, attributes: ['userId'] }],
});
if (!bot) throw new NotFoundException('Bot not found');

if (!bot.tarFileKey)
throw new BadRequestException('Bot has no tar file key');

try {
await lastValueFrom(
this.workerService.createFileLog({
id: bot.id,
raw_file_key: bot.tarFileKey,
...(bot.title && { meeting_title: bot.title }),
...(bot.apiKey.userId && {
created_by_user_id: bot.apiKey.userId,
}),
}),
);
} catch (error) {
console.error(
`Error triggering transcript generation for bot ${bot.id}:`,
error,
);
}
}
}
3 changes: 2 additions & 1 deletion api-backend/src/cron-job/cron-job.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ import { CronJobService } from './cron-job.service';
import { ScheduleModule } from '@nestjs/schedule';
import { AwsModule } from 'src/aws/aws.module';
import { BotModule } from 'src/bot/bot.module';
import { GrpcModule } from 'src/grpc/grpc.module';

@Module({
imports: [ScheduleModule.forRoot(), AwsModule, BotModule],
imports: [ScheduleModule.forRoot(), AwsModule, BotModule, GrpcModule],
providers: [CronJobService],
exports: [CronJobService],
})
Expand Down
25 changes: 23 additions & 2 deletions api-backend/src/cron-job/cron-job.service.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import { Injectable } from '@nestjs/common';
import { CronExpression } from '@nestjs/schedule';
import { Cron } from '@nestjs/schedule';
import { Cron, CronExpression } from '@nestjs/schedule';
import { lastValueFrom } from 'rxjs';
import { ECSClientService } from 'src/aws/ecs.service';
import { BotService } from 'src/bot/bot.service';
import { WorkerService } from 'src/grpc/worker.service';

@Injectable()
export class CronJobService {
constructor(
private readonly ecsService: ECSClientService,
private readonly botService: BotService,
private readonly workerService: WorkerService,
) {}

@Cron(CronExpression.EVERY_5_MINUTES)
Expand All @@ -19,4 +22,22 @@ export class CronJobService {
async runEvery10Minutes(): Promise<void> {
await this.botService.initiateScheduledBot();
}

@Cron(CronExpression.EVERY_10_MINUTES)
async checkBotStatusAndProcess(): Promise<void> {
try {
console.log('Checking bot status and processing');
// First check if the worker service is healthy
const healthCheck = await lastValueFrom(this.workerService.healthCheck());

if (healthCheck?.ServingStatus !== '200') {
console.log('Worker service is not healthy, skipping bot status check');
return;
}
// Get bot status through ECS service
await this.ecsService.syncTaskStatus();
} catch (error) {
console.error('Error in bot status check cron job:', error);
}
}
}
6 changes: 6 additions & 0 deletions api-backend/src/database/models/bot.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ export class Bot extends Model {
})
status: ExecutionStatusLogEnum;

@Column({
type: DataType.STRING,
allowNull: true,
})
tarFileKey: string;

@Column({
type: DataType.JSONB,
allowNull: true,
Expand Down
8 changes: 4 additions & 4 deletions api-backend/src/grpc/worker.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Inject, Injectable } from '@nestjs/common';
import { ClientGrpc } from '@nestjs/microservices';
import { MICROSERVICES } from 'src/constants/grpc';
import {
FileManagementController,
DocumentFileManagementController,
HealthCheckController,
FileManagementListController,
TranscriptionLogListController,
Expand All @@ -29,8 +29,8 @@ export class WorkerService {

onModuleInit() {
this.fileManagementService =
this.client.getService<FileManagementController>(
'FileManagementController',
this.client.getService<DocumentFileManagementController>(
'DocumentFileManagementController',
);
this.healthCheckService = this.client.getService<HealthCheckController>(
'HealthCheckController',
Expand Down Expand Up @@ -60,7 +60,7 @@ export class WorkerService {
.add(this.deadlineInSeconds, 'seconds')
.toDate();

return this.fileManagementService.Create(payload, { deadline });
return this.fileManagementService.DocumentCreate(payload, { deadline });
}

listFileLogs(payload: any): Observable<FileManagementListResponse> {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 9 additions & 8 deletions api-backend/src/proto/transcript_management.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ package worker_backend.transcript_management;

import "google/protobuf/empty.proto";

service FileManagementController {
rpc Create(FileManagementRequest) returns (FileManagementResponse) {}
service DocumentFileManagementController {
rpc DocumentCreate(FileManagementRequest) returns (FileManagementResponse) {}
}

service FileManagementListController {
Expand All @@ -32,7 +32,7 @@ message FileManagementRequest {
optional string id = 1;
string raw_file_key = 2;
optional string meeting_title = 3;
optional string created_by_user_id = 8;
optional string created_by_user_id = 4;
}

message FileManagementResponse {
Expand All @@ -42,10 +42,10 @@ message FileManagementResponse {
optional string meeting_title = 4;
optional string meeting_meeting_start_time = 5;
optional string meeting_meeting_end_time = 6;
optional string execution_id = 11;
optional int32 bot_used = 12;
optional int32 status = 13;
optional string created_by_user_id = 14;
optional string execution_id = 7;
optional int32 bot_used = 8;
optional int32 status = 9;
optional string created_by_user_id = 10;
}

message HealthCheckhealthCheckResponse {
Expand All @@ -66,4 +66,5 @@ message TranscriptionLogResponse {
optional string transcription_start_time_milliseconds = 3;
optional string transcription_end_time_milliseconds = 4;
optional string transcription_Data = 5;
}
}

Loading