diff --git a/README.md b/README.md index b6ffcb91..d6b58260 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,13 @@ -# 블로그 콘텐츠 자동화 플랫폼 +# 동적 워크플로우 관리 플랫폼 -### AI 기반 워크플로우 오케스트레이터를 활용한 RAG 기반 블로그 콘텐츠 자동화 시스템 +### AI 기반 자동화 트렌드에 맞추어 다양한 비즈니스 프로세스를 자동화할 수 있는 확장 가능한 워크플로우 관리 플랫폼 --- ### 목차 1. [서비스 개요](#1-서비스-개요) -2. [시스템 개요](#2-시스템-개요) +2. [주요 기능](#2-주요-기능) 3. [데이터 베이스 설계(ERD)](#3-데이터베이스-설계-erd) 4. [시스템 아키텍처](#4-시스템-아키텍처) 5. [유스케이스 다이어그램](#5-유스케이스-다이어그램) @@ -23,17 +23,19 @@ ## 1. 서비스 개요 최근 커머스 업계의 AI 기반 콘텐츠 자동화 트렌드에도 불구하고, 여전히 트렌드 분석, 상품 조사, 콘텐츠 생성 및 발행 등 각 단계에서 시간 소모적인 수작업이 필요합니다. -본 프로젝트는 이러한 비효율을 해결하기 위해, **RAG(검색 증강 생성)** 기술을 기반으로 블로그 콘텐츠 생성의 전 과정을 자동화하는 워크플로우 플랫폼을 구축하는 것을 목표로 합니다. ---- +본 프로젝트는 이러한 비효율을 해결하기 위해, 특정 도메인에 국한되지 않고 다양한 비즈니스 프로세스를 자동화할 수 있는 확장 가능한 워크플로우 플랫폼을 구축하는 것을 목표로 합니다. 초기 MVP 모델로 RAG(검색 증강 생성) 기반 블로그 콘텐츠 자동 생성 및 발행하는 마케팅 워크플로우를 구현했습니다. -## 2. 시스템 개요 +--- -본 시스템은 `Spring Boot` 기반의 오케스트레이터와 `Python(FastAPI)` 기반의 AI 워커로 구성된 **이중 레이어 아키텍처**를 채택하여, 각 서비스의 역할을 명확히 분리했습니다. +## **2. 주요 기능** -* **워크플로우 자동화**: 네이버 데이터 랩의 실시간 트렌드 키워드를 자동 수집하고, 싸다구몰(1688)에서 유사도 기반 검색을 통해 관련 상품을 매칭합니다. -* **AI 콘텐츠 생성**: 수집된 상품 정보와 이미지를 OCR 및 번역 기술로 분석한 후, RAG 기술을 통해 SEO에 최적화된 블로그 콘텐츠를 자동 생성합니다. -* **관리 및 모니터링**: 관리자 전용 대시보드에서 워크플로우 실행, 스케줄 제어, 실행 이력 및 결과를 실시간 모니터링합니다. Grafana로 서버 리소스와 API 상태를 시각적으로 확인할 수 있습니다. + * **워크플로우 자동화**: `Workflow → Job → Task`의 계층적 구조에 따라 복잡한 프로세스를 순차적으로 자동 실행합니다. + * **대표 워크플로우**: **블로그 콘텐츠 자동 생성 및 발행** + * 네이버 데이터 랩 트렌드 분석 → 싸다구몰 상품 정보 수집 → RAG 기반 AI 콘텐츠 생성 → 블로그 자동 발행 + * **동적 스케줄링**: API를 통해 재배포 없이 스케줄을 실시간으로 생성, 수정, 삭제할 수 있습니다. + * **데이터 파이프라이닝**: 이전 Task의 실행 결과(Output)를 다음 Task의 입력(Input)으로 동적으로 전달합니다. + * **관리 및 모니터링**: 관리자 대시보드에서 워크플로우 실행 이력 및 결과를 실시간으로 모니터링하고, Grafana를 통해 서버 리소스를 시각적으로 확인합니다. --- @@ -83,7 +85,7 @@ ## 4. 시스템 아키텍처 -역할과 책임을 명확히 분리하기 위해 `Spring Boot`가 **Orchestrator**, `FastAPI`가 **Worker** 역할을 수행하는 이중 레이어 아키텍처를 채택했습니다. +역할과 책임을 명확히 분리하기 위해 `Spring Boot`가 **Orchestrator**, `FastAPI`가 **AI Worker** 역할을 수행하는 이중 레이어 아키텍처를 채택했습니다. * **Spring Boot (Orchestrator)**: `Workflow → Job → Task` 구조를 기반으로 전체 비즈니스 흐름을 제어합니다. 스케줄링(`Quartz`), 상태 관리, 데이터 영속성, 인증/인가 등 핵심 로직 담당. * **FastAPI (Worker)**: 키워드 추출, 상품 검색, 웹 크롤링, AI 연동(RAG), OCR 등 Python 생태계 특화 작업을 담당. @@ -169,6 +171,7 @@ GitHub Actions 기반으로 빌드 → 테스트 → Docker 빌드 및 푸시 * **WorkflowExecutionService**: 워크플로우 전체 실행 흐름 제어 * **TaskExecutionService**: Task 실행 및 재시도 정책 관리 * **TaskBodyBuilder (전략 패턴)**: 각 Task별 동적 Request Body 생성 +* **WorkflowContextService (퍼사드 패턴)**: 각 TaskBodyBuilder가 이전 Task의 결과를 조회할 때 필요한 복잡한 DB 접근 로직(TaskIoDataMapper 사용 등)을 캡슐화하고, 단순화된 인터페이스를 제공하는 퍼사드(Facade) 역할 * **FastApiAdapter**: FastAPI 서버 통신 캡슐화 * **QuartzSchedulerInitializer**: DB 스케줄 정보 Quartz 엔진 동기화 * **ExecutionMdcManager**: 비동기 환경에서도 traceId 기반 분산 추적 로깅 @@ -180,7 +183,7 @@ GitHub Actions 기반으로 빌드 → 테스트 → Docker 빌드 및 푸시 Monorepo 형태로, `apps` 하위에 서비스별 디렉토리가 존재합니다. ```bash -Final-4team-icebang/ +FlowWeaver/backend/ ├── apps/ │ ├── user-service/ # Java/Spring Boot 서비스 │ └── pre-processing-service/# Python/FastAPI 서비스 diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/RequestContextDto.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/RequestContextDto.java index 66ef57aa..47c8c5e2 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/RequestContextDto.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/RequestContextDto.java @@ -19,4 +19,14 @@ public class RequestContextDto { public static RequestContextDto forScheduler(String traceId) { return new RequestContextDto(traceId, "scheduler", "quartz-scheduler"); } + + /** + * 시스템 복구 실행용 컨텍스트를 생성하는 정적 팩토리 메서드입니다. + * + * @param traceId 기존 실행에서 사용하던 추적 ID + * @return 복구용 RequestContext 객체 + */ + public static RequestContextDto forRecovery(String traceId) { + return new RequestContextDto(traceId, "system-recovery", "workflow-system-recovery"); + } } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobRunMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobRunMapper.java index a3546069..6249f259 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobRunMapper.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobRunMapper.java @@ -1,6 +1,7 @@ package site.icebang.domain.workflow.mapper; import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; import site.icebang.domain.workflow.model.JobRun; @@ -9,4 +10,6 @@ public interface JobRunMapper { void insert(JobRun jobRun); void update(JobRun jobRun); + + JobRun findSuccessfulJobByTraceId(@Param("traceId") String traceId, @Param("jobId") Long jobId); } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/WorkflowRunMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/WorkflowRunMapper.java index 64bbcbc6..c644c974 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/WorkflowRunMapper.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/WorkflowRunMapper.java @@ -1,6 +1,9 @@ package site.icebang.domain.workflow.mapper; +import java.util.List; + import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; import site.icebang.domain.workflow.model.WorkflowRun; @@ -9,4 +12,6 @@ public interface WorkflowRunMapper { void insert(WorkflowRun workflowRun); void update(WorkflowRun workflowRun); + + List findByStatus(@Param("status") String status); } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java index b18b87fe..1e4c2ab9 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java @@ -85,6 +85,18 @@ public void executeWorkflow(Long workflowId, RequestContextDto context) { for (JobDto jobDto : jobDtos) { Job job = new Job(jobDto); mdcManager.setJobContext(job.getId()); + + // 이미 성공적으로 수행된 Job인지 확인합니다. + JobRun existingSuccessfulJob = + jobRunMapper.findSuccessfulJobByTraceId(context.getTraceId(), job.getId()); + if (existingSuccessfulJob != null) { + workflowLogger.info( + "---------- Job 스킵 (이미 성공함): JobId={}, PreviousJobRunId={} ----------", + job.getId(), + existingSuccessfulJob.getId()); + continue; // 이미 성공했으므로 실행하지 않고 다음 Job으로 넘어갑니다. + } + JobRun jobRun = JobRun.start(workflowRun.getId(), job.getId()); jobRunMapper.insert(jobRun); workflowLogger.info( diff --git a/apps/user-service/src/main/java/site/icebang/global/config/WorkflowRecoveryInitializer.java b/apps/user-service/src/main/java/site/icebang/global/config/WorkflowRecoveryInitializer.java new file mode 100644 index 00000000..33cd20bd --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/global/config/WorkflowRecoveryInitializer.java @@ -0,0 +1,79 @@ +package site.icebang.global.config; + +import java.util.List; + +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import site.icebang.domain.workflow.dto.RequestContextDto; +import site.icebang.domain.workflow.mapper.WorkflowRunMapper; +import site.icebang.domain.workflow.model.WorkflowRun; +import site.icebang.domain.workflow.service.WorkflowExecutionService; + +/** + * 애플리케이션 시작 시, 비정상 종료된 워크플로우를 감지하고 '실제로 복구(재실행)'하는 클래스입니다. + * + *

서버가 시작될 때 DB에 'RUNNING' 상태로 남아있는 워크플로우는 시스템 장애(전원 차단, OOM 등)로 인해 중단된 작업입니다. 이 클래스는 해당 작업들을 + * 찾아내어 이전 기록을 정리하고, 작업을 자동으로 재시작합니다. + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class WorkflowRecoveryInitializer implements ApplicationRunner { + + private final WorkflowRunMapper workflowRunMapper; + private final WorkflowExecutionService workflowExecutionService; + + @Override + public void run(ApplicationArguments args) { + try { + List runningWorkflows = workflowRunMapper.findByStatus("RUNNING"); + + if (runningWorkflows.isEmpty()) { + return; + } + + log.warn("비정상 종료된 워크플로우 {}건 발견. 자동 복구(재실행) 프로세스를 시작합니다.", runningWorkflows.size()); + + int recoveredCount = 0; + for (WorkflowRun run : runningWorkflows) { + if (recoverAndRestart(run)) { + recoveredCount++; + } + } + log.info("총 {}건의 워크플로우가 성공적으로 복구(재실행 요청)되었습니다.", recoveredCount); + + } catch (Exception e) { + log.error("워크플로우 복구 프로세스 전체 실패", e); + } + } + + private boolean recoverAndRestart(WorkflowRun run) { + try { + // 1. 기존 실행 이력 마감 (Cleaning) + // 중단된 시점의 상태를 FAILED로 확정지어 데이터 정합성을 맞춥니다. + run.finish("FAILED"); + workflowRunMapper.update(run); + log.info("[복구 1단계] 기존 이력 정리 완료: RunID={}, WorkflowID={}", run.getId(), run.getWorkflowId()); + + // 2. 워크플로우 재실행 (Restarting) + // 기존 Trace ID를 승계하여 로그 추적성을 유지하며 서비스를 다시 시작합니다. + RequestContextDto recoveryContext = RequestContextDto.forRecovery(run.getTraceId()); + + workflowExecutionService.executeWorkflow(run.getWorkflowId(), recoveryContext); + log.info( + "[복구 2단계] 워크플로우 재실행 요청 완료: WorkflowID={}, TraceID={}", + run.getWorkflowId(), + run.getTraceId()); + + return true; + } catch (Exception e) { + log.error("워크플로우 개별 복구 실패: RunID={}", run.getId(), e); + return false; + } + } +} diff --git a/apps/user-service/src/main/resources/mybatis/mapper/JobRunMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/JobRunMapper.xml index 2cc51d78..927cbafe 100644 --- a/apps/user-service/src/main/resources/mybatis/mapper/JobRunMapper.xml +++ b/apps/user-service/src/main/resources/mybatis/mapper/JobRunMapper.xml @@ -25,4 +25,15 @@ WHERE id = #{id} + + \ No newline at end of file diff --git a/apps/user-service/src/main/resources/mybatis/mapper/WorkflowRunMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/WorkflowRunMapper.xml index 8011fc6c..74883182 100644 --- a/apps/user-service/src/main/resources/mybatis/mapper/WorkflowRunMapper.xml +++ b/apps/user-service/src/main/resources/mybatis/mapper/WorkflowRunMapper.xml @@ -25,4 +25,8 @@ WHERE id = #{id} + + \ No newline at end of file