Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# 블로그 콘텐츠 자동화 플랫폼
# 동적 워크플로우 관리 플랫폼

### AI 기반 워크플로우 오케스트레이터를 활용한 RAG 기반 블로그 콘텐츠 자동화 시스템
### AI 기반 자동화 트렌드에 맞추어 다양한 비즈니스 프로세스를 자동화할 수 있는 확장 가능한 워크플로우 관리 플랫폼

---

### 목차

1. [서비스 개요](#1-서비스-개요)
2. [시스템 개요](#2-시스템-개요)
2. [주요 기능](#2-주요-기능)
3. [데이터 베이스 설계(ERD)](#3-데이터베이스-설계-erd)
4. [시스템 아키텍처](#4-시스템-아키텍처)
5. [유스케이스 다이어그램](#5-유스케이스-다이어그램)
Expand All @@ -23,17 +23,19 @@
## 1. 서비스 개요

최근 커머스 업계의 AI 기반 콘텐츠 자동화 트렌드에도 불구하고, 여전히 트렌드 분석, 상품 조사, 콘텐츠 생성 및 발행 등 각 단계에서 시간 소모적인 수작업이 필요합니다.
본 프로젝트는 이러한 비효율을 해결하기 위해, **RAG(검색 증강 생성)** 기술을 기반으로 블로그 콘텐츠 생성의 전 과정을 자동화하는 워크플로우 플랫폼을 구축하는 것을 목표로 합니다.

---
본 프로젝트는 이러한 비효율을 해결하기 위해, 특정 도메인에 국한되지 않고 다양한 비즈니스 프로세스를 자동화할 수 있는 확장 가능한 워크플로우 플랫폼을 구축하는 것을 목표로 합니다. 초기 MVP 모델로 RAG(검색 증강 생성) 기반 블로그 콘텐츠 자동 생성 및 발행하는 마케팅 워크플로우를 구현했습니다.

## 2. 시스템 개요
---

본 시스템은 `Spring Boot` 기반의 오케스트레이터와 `Python(FastAPI)` 기반의 AI 워커로 구성된 **이중 레이어 아키텍처**를 채택하여, 각 서비스의 역할을 명확히 분리했습니다.
## **2. 주요 기능**

* **워크플로우 자동화**: 네이버 데이터 랩의 실시간 트렌드 키워드를 자동 수집하고, 싸다구몰(1688)에서 유사도 기반 검색을 통해 관련 상품을 매칭합니다.
* **AI 콘텐츠 생성**: 수집된 상품 정보와 이미지를 OCR 및 번역 기술로 분석한 후, RAG 기술을 통해 SEO에 최적화된 블로그 콘텐츠를 자동 생성합니다.
* **관리 및 모니터링**: 관리자 전용 대시보드에서 워크플로우 실행, 스케줄 제어, 실행 이력 및 결과를 실시간 모니터링합니다. Grafana로 서버 리소스와 API 상태를 시각적으로 확인할 수 있습니다.
* **워크플로우 자동화**: `Workflow → Job → Task`의 계층적 구조에 따라 복잡한 프로세스를 순차적으로 자동 실행합니다.
* **대표 워크플로우**: **블로그 콘텐츠 자동 생성 및 발행**
* 네이버 데이터 랩 트렌드 분석 → 싸다구몰 상품 정보 수집 → RAG 기반 AI 콘텐츠 생성 → 블로그 자동 발행
* **동적 스케줄링**: API를 통해 재배포 없이 스케줄을 실시간으로 생성, 수정, 삭제할 수 있습니다.
* **데이터 파이프라이닝**: 이전 Task의 실행 결과(Output)를 다음 Task의 입력(Input)으로 동적으로 전달합니다.
* **관리 및 모니터링**: 관리자 대시보드에서 워크플로우 실행 이력 및 결과를 실시간으로 모니터링하고, Grafana를 통해 서버 리소스를 시각적으로 확인합니다.

---

Expand Down Expand Up @@ -83,7 +85,7 @@

## 4. 시스템 아키텍처

역할과 책임을 명확히 분리하기 위해 `Spring Boot`가 **Orchestrator**, `FastAPI`가 **Worker** 역할을 수행하는 이중 레이어 아키텍처를 채택했습니다.
역할과 책임을 명확히 분리하기 위해 `Spring Boot`가 **Orchestrator**, `FastAPI`가 **AI Worker** 역할을 수행하는 이중 레이어 아키텍처를 채택했습니다.

* **Spring Boot (Orchestrator)**: `Workflow → Job → Task` 구조를 기반으로 전체 비즈니스 흐름을 제어합니다. 스케줄링(`Quartz`), 상태 관리, 데이터 영속성, 인증/인가 등 핵심 로직 담당.
* **FastAPI (Worker)**: 키워드 추출, 상품 검색, 웹 크롤링, AI 연동(RAG), OCR 등 Python 생태계 특화 작업을 담당.
Expand Down Expand Up @@ -169,6 +171,7 @@ GitHub Actions 기반으로 빌드 → 테스트 → Docker 빌드 및 푸시
* **WorkflowExecutionService**: 워크플로우 전체 실행 흐름 제어
* **TaskExecutionService**: Task 실행 및 재시도 정책 관리
* **TaskBodyBuilder (전략 패턴)**: 각 Task별 동적 Request Body 생성
* **WorkflowContextService (퍼사드 패턴)**: 각 TaskBodyBuilder가 이전 Task의 결과를 조회할 때 필요한 복잡한 DB 접근 로직(TaskIoDataMapper 사용 등)을 캡슐화하고, 단순화된 인터페이스를 제공하는 퍼사드(Facade) 역할
* **FastApiAdapter**: FastAPI 서버 통신 캡슐화
* **QuartzSchedulerInitializer**: DB 스케줄 정보 Quartz 엔진 동기화
* **ExecutionMdcManager**: 비동기 환경에서도 traceId 기반 분산 추적 로깅
Expand All @@ -180,7 +183,7 @@ GitHub Actions 기반으로 빌드 → 테스트 → Docker 빌드 및 푸시
Monorepo 형태로, `apps` 하위에 서비스별 디렉토리가 존재합니다.

```bash
Final-4team-icebang/
FlowWeaver/backend/
├── apps/
│ ├── user-service/ # Java/Spring Boot 서비스
│ └── pre-processing-service/# Python/FastAPI 서비스
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,14 @@ public class RequestContextDto {
public static RequestContextDto forScheduler(String traceId) {
return new RequestContextDto(traceId, "scheduler", "quartz-scheduler");
}

/**
* 시스템 복구 실행용 컨텍스트를 생성하는 정적 팩토리 메서드입니다.
*
* @param traceId 기존 실행에서 사용하던 추적 ID
* @return 복구용 RequestContext 객체
*/
public static RequestContextDto forRecovery(String traceId) {
return new RequestContextDto(traceId, "system-recovery", "workflow-system-recovery");
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package site.icebang.domain.workflow.mapper;

import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import site.icebang.domain.workflow.model.JobRun;

Expand All @@ -9,4 +10,6 @@ public interface JobRunMapper {
void insert(JobRun jobRun);

void update(JobRun jobRun);

JobRun findSuccessfulJobByTraceId(@Param("traceId") String traceId, @Param("jobId") Long jobId);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package site.icebang.domain.workflow.mapper;

import java.util.List;

import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import site.icebang.domain.workflow.model.WorkflowRun;

Expand All @@ -9,4 +12,6 @@ public interface WorkflowRunMapper {
void insert(WorkflowRun workflowRun);

void update(WorkflowRun workflowRun);

List<WorkflowRun> findByStatus(@Param("status") String status);
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,18 @@ public void executeWorkflow(Long workflowId, RequestContextDto context) {
for (JobDto jobDto : jobDtos) {
Job job = new Job(jobDto);
mdcManager.setJobContext(job.getId());

// 이미 성공적으로 수행된 Job인지 확인합니다.
JobRun existingSuccessfulJob =
jobRunMapper.findSuccessfulJobByTraceId(context.getTraceId(), job.getId());
if (existingSuccessfulJob != null) {
workflowLogger.info(
"---------- Job 스킵 (이미 성공함): JobId={}, PreviousJobRunId={} ----------",
job.getId(),
existingSuccessfulJob.getId());
continue; // 이미 성공했으므로 실행하지 않고 다음 Job으로 넘어갑니다.
}

JobRun jobRun = JobRun.start(workflowRun.getId(), job.getId());
jobRunMapper.insert(jobRun);
workflowLogger.info(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package site.icebang.global.config;

import java.util.List;

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import site.icebang.domain.workflow.dto.RequestContextDto;
import site.icebang.domain.workflow.mapper.WorkflowRunMapper;
import site.icebang.domain.workflow.model.WorkflowRun;
import site.icebang.domain.workflow.service.WorkflowExecutionService;

/**
* 애플리케이션 시작 시, 비정상 종료된 워크플로우를 감지하고 '실제로 복구(재실행)'하는 클래스입니다.
*
* <p>서버가 시작될 때 DB에 'RUNNING' 상태로 남아있는 워크플로우는 시스템 장애(전원 차단, OOM 등)로 인해 중단된 작업입니다. 이 클래스는 해당 작업들을
* 찾아내어 이전 기록을 정리하고, 작업을 <b>자동으로 재시작</b>합니다.
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class WorkflowRecoveryInitializer implements ApplicationRunner {

private final WorkflowRunMapper workflowRunMapper;
private final WorkflowExecutionService workflowExecutionService;

@Override
public void run(ApplicationArguments args) {
try {
List<WorkflowRun> runningWorkflows = workflowRunMapper.findByStatus("RUNNING");

if (runningWorkflows.isEmpty()) {
return;
}

log.warn("비정상 종료된 워크플로우 {}건 발견. 자동 복구(재실행) 프로세스를 시작합니다.", runningWorkflows.size());

int recoveredCount = 0;
for (WorkflowRun run : runningWorkflows) {
if (recoverAndRestart(run)) {
recoveredCount++;
}
}
log.info("총 {}건의 워크플로우가 성공적으로 복구(재실행 요청)되었습니다.", recoveredCount);

} catch (Exception e) {
log.error("워크플로우 복구 프로세스 전체 실패", e);
}
}

private boolean recoverAndRestart(WorkflowRun run) {
try {
// 1. 기존 실행 이력 마감 (Cleaning)
// 중단된 시점의 상태를 FAILED로 확정지어 데이터 정합성을 맞춥니다.
run.finish("FAILED");
workflowRunMapper.update(run);
log.info("[복구 1단계] 기존 이력 정리 완료: RunID={}, WorkflowID={}", run.getId(), run.getWorkflowId());

// 2. 워크플로우 재실행 (Restarting)
// 기존 Trace ID를 승계하여 로그 추적성을 유지하며 서비스를 다시 시작합니다.
RequestContextDto recoveryContext = RequestContextDto.forRecovery(run.getTraceId());

workflowExecutionService.executeWorkflow(run.getWorkflowId(), recoveryContext);
log.info(
"[복구 2단계] 워크플로우 재실행 요청 완료: WorkflowID={}, TraceID={}",
run.getWorkflowId(),
run.getTraceId());

return true;
} catch (Exception e) {
log.error("워크플로우 개별 복구 실패: RunID={}", run.getId(), e);
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,15 @@
WHERE id = #{id}
</update>

<select id="findSuccessfulJobByTraceId" resultMap="JobRunResultMap">
SELECT j.*
FROM job_run j
JOIN workflow_run w ON j.workflow_run_id = w.id
WHERE w.trace_id = #{traceId}
AND j.job_id = #{jobId}
AND j.status = 'SUCCESS'
ORDER BY j.id DESC
LIMIT 1
</select>

</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,8 @@
WHERE id = #{id}
</update>

<select id="findByStatus" resultMap="WorkflowRunResultMap">
SELECT * FROM workflow_run WHERE status = #{status}
</select>

</mapper>
Loading