Skip to content

Latest commit

 

History

History
233 lines (158 loc) · 6.93 KB

File metadata and controls

233 lines (158 loc) · 6.93 KB

fetchWeatherDataConcurrently()

← 메인으로 돌아가기

개요

fetchWeatherDataConcurrently()는 여러 지역의 날씨 데이터를 병렬로 수집하는 핵심 함수입니다.

전체 데이터 흐름

함수 시그니처

코드의 자세한 내역을 참조하기 위해서는 해당 파일의 함수를 참조하십시오. FetchWeatherDataConcurrently

func FetchWeatherDataConcurrently(
targetGrid []grid.RepresentativeGrid,
date types.ForecastDateRange,
) (success []*types.Weather, failed []grid.RepresentativeGrid) {

파라미터

파라미터 상세정보
파라미터 타입 설명
targetGrid []grid.RepresentativeGrid 날씨 데이터를 수집할 지역 목록입니다. 각 지역은 grid.RepresentativeGrid 구조체로 표현되며, 지역의 좌표(NX, NY), 이름 등의 정보를 포함합니다.
date types.ForecastDateRange 예보 날짜 정보입니다. types.ForecastDateRange 구조체는 API 호출에 사용될 날짜(호출일, 시작일, 종료일) 및 데이터의 TTL(Time To Live) 정보를 포함합니다. TTL은 데이터의 유효 기간을 나타냅니다.

반환값

반환값 타입 설명
success []*types.Weather 성공적으로 수집된 날씨 데이터 배열입니다. types.Weather 구조체는 날씨 정보(기온, 하늘 상태, 강수량 등)를 포함합니다.
failed []grid.RepresentativeGrid API 호출에 실패한 지역 목록입니다.

동작 방식

1. 동시성 제어

semaphore := createSemaphore(len(grid)) // 최대 30개 동시 실행
  • Semaphore 패턴으로 동시 실행 수 제한
  • API 서버 과부하 방지
  • 최대 30개 goroutine 동시 실행

2. 병렬 데이터 수집

코드 상세
for _, info := range targetGrid {
wg.Add(1)

nx := info.Nx
ny := info.Ny

go func () {
defer wg.Done()
semaphore <- struct{}{}
defer func () { <-semaphore }()

fcstItem := weather_API.VillageFcstInfo(date.CallDate, "2300", nx, ny)
if fcstItem == nil {
failedChan <- info
return // 조기종료
}

weatherData := createWeatherData(date.TTL, fcstItem, info)
// success chan 저장
successChan <- weatherData

log.Info("데이터 불러오기 성공", "지역", info.Name, "시작일", date.TrueDate, "종료일", date.EndDate)
}()
}

병렬 데이터 수집

간략적 흐름

  1. API 호출 주소 생성
  2. 생성된 주소로 실행될 Fetch 코드 생성 semaphore:30 까지 생성
  3. Fetch 30개 동시 실행
  4. 결과에 따른 Channel 분리
  5. 모든 격자가 완료될때까지 반복

모든 단계가 완료 후, Channel는 결과 수집에서 자세하게 다룹니다.

3. 결과 수집

데이터가 변환되는 과정을 보기 위해선 해당 문서를 참조하십시오. API Data To DB Data

코드 상세
// 데이터 수집 종료 대기
go func () {
wg.Wait()
close(failedChan)
close(successChan)
}()

done := make(chan struct{})

// 성공 데이터 수신 goroutine
go func () {
for d := range successChan {
success = append(success, d...)
}
done <- struct{}{}
}()

// 실패 데이터 순차 수신
for e := range failedChan {
failed = append(failed, e)
}

// 총 데이터 수신 종료 대기
<-done

간략적 흐름

  1. 각 Channel 작업 완료 대기
  2. 동시성을 활용한 successChan 수신 및 success에 결과 주입
  3. 실패할 확률이 적은것을 고려한 순차적 failedChan 수신 및 failed에 결과 주입
  4. 위의 모든 사항 수신 종료 대기 및 반환

에러 처리

  • API 호출이 실패하는 경우, 해당 재호출을 위해 호출을 위해 사용된 값을 failed 값을 통해 전달됩니다.
  • main.go에서는 이 정보를 사용하여 재시도를 수행합니다.
  • 필요에 따라, exponential backoff와 같은 재시도 전략을 FetchWeatherDataConcurrently 함수 내부에 구현할 예정입니다.

성능

순차 처리 vs 병렬 처리

- **순차 처리**: 19개 지역, 95개 데이터 약 35초
- **동시 처리**: 19개 지역, 95개 데이터 약 2초 (15배 향상)

→ 약 15배 성능 향상 

실제 측정 결과

  • 이전 (순차): 00:00:00 - 00:00:34 (34초)
  • 이후 (병렬): 00:00:00 - 00:00:02 (2초)
  • 개선율: 약 15배 향상

재시도 메커니즘

재시도 코드
// 1차 시도
var data, failGrids = concurrently.FetchWeatherDataConcurrently(grid.RepresentativeGrids, rDate)

// 1차 실패 데이터 재호출
if len(failGrids) != 0 {
log.Warn("API 호출 실패", "건수", len(failGrids))
log.Info("재호출을 시도합니다...")

newData, reFailGrids := concurrently.FetchWeatherDataConcurrently(failGrids, rDate)

data = append(data, newData...)

if reFailGrids != nil {
log.Error("재호출 후에도 API 호출 실패", "실패건수", len(reFailGrids), "메시지", "서버 상태를 확인해주십시오")
}
}

재시도

동시성 설정 조정

createSemaphore 함수

func createSemaphore(maxConcurrent int) chan struct{} {
routineLimit := min(maxConcurrent, 30) // 최대 30개 제한
return make(chan struct{}, routineLimit)
}

관련 함수

주의사항

⚠️ API Rate Limit 고려

  • 기상청 API는 공공 서비스로 제한이 있을 수 있음
  • 간단한 테스트 결과 기본값 30은 안전한 수준
  • 과도한 동시 요청은 API 차단 위험

⚠️ 메모리 사용

  • 대량 데이터 수집 시 메모리 사용량 증가
  • 필요시 배치 처리 고려

⚠️ 네트워크 안정성

  • 느린 네트워크 환경에서는 타임아웃 설정 필요
  • 재시도 로직으로 안정성 확보