diff --git a/poseidon/__init__.py b/poseidon/__init__.py index 9e85526..17e29d1 100644 --- a/poseidon/__init__.py +++ b/poseidon/__init__.py @@ -30,6 +30,8 @@ from poseidon.util.timing_variance import timing_variance from poseidon.util.von_neumann import validate_rho +from poseidon.errors import errors + from poseidon import process data = [ @@ -74,4 +76,4 @@ util = ["entropy_sn", "validate_rho", "timing_variance", "ip_to_int"] -__all__ = data + log + processpiece + prototype + simulations + tests + util + process.__all__ +__all__ = data + errors.__all__ + log + processpiece + prototype + simulations + tests + util + process.__all__ diff --git a/poseidon/data/custom_datasets.py b/poseidon/data/custom_datasets.py index 115f668..56ea224 100644 --- a/poseidon/data/custom_datasets.py +++ b/poseidon/data/custom_datasets.py @@ -401,7 +401,7 @@ def generate_netflow_dataset(n_samples=50000, random_state=42): if __name__ == "__main__": - sel_n_samples = 25000 + sel_n_samples = 10000 output_path = os.path.join( DATASETS_CUSTOM_PATH, f"{sel_n_samples}s-NF-custom-dataset-{int(time.time())}.csv", diff --git a/poseidon/errors/__init__.py b/poseidon/errors/__init__.py new file mode 100644 index 0000000..6c91cbc --- /dev/null +++ b/poseidon/errors/__init__.py @@ -0,0 +1,3 @@ +from poseidon.errors import errors + +__all__ = errors.__all__ diff --git a/poseidon/errors/errors.py b/poseidon/errors/errors.py new file mode 100644 index 0000000..442d13c --- /dev/null +++ b/poseidon/errors/errors.py @@ -0,0 +1,9 @@ +class BaseErrors(Exception): + pass + + +class DatasetAnalysisError(BaseErrors): + pass + + +__all__ = ["BaseErrors", "DatasetAnalysisError"] diff --git a/poseidon/process.py b/poseidon/process.py index 79cdadc..19ba53c 100644 --- a/poseidon/process.py +++ b/poseidon/process.py @@ -1,8 +1,6 @@ import os import numpy as np -import dask.dataframe as dd from dotenv import load_dotenv -from poseidon.data.dataset_type import DatasetType from poseidon.processpiece.oversampling import Oversampling from poseidon.data.poseidon_dtypes import dtypes from poseidon.processpiece.engineering_split import DatasetSplit @@ -12,7 +10,7 @@ apply_timing_variance, apply_quantum_noise_simulation, ) -from poseidon.processpiece.load_dask_dataframe import load_large_dataset +from poseidon.processpiece.load_dask_dataframe import load_large_dataset, switch_to_dask from poseidon.log.poseidon_log import PoseidonLogger from tqdm import tqdm @@ -20,26 +18,28 @@ logger = PoseidonLogger().get_logger() load_dotenv(verbose=True) DATASETS_RESAMPLED_PATH = os.getenv("DATASETS_RESAMPLED_PATH") +DATASETS_CUSTOM_PATH = os.getenv("DATASETS_CUSTOM_PATH") def process(): - req_smote = False - + req_smote = True save_train = True save_val = False save_test = False + # use_dataset = f"{DATASETS_CUSTOM_PATH}/25000s-NF-custom-dataset-1762267172.csv" + use_dataset = f"{DATASETS_CUSTOM_PATH}/10000s-NF-custom-dataset-1762341664.csv" + logger.info("데이터셋 처리 시작") if req_smote: - resampled_df, ovs = process_oversampling( - dataset=DatasetType.NF_UNSW_NB15_V3, req_smote=True - ) + resampled_df = process_oversampling(dataset_path=use_dataset) else: resampled_df = load_large_dataset( - file_path=f"{DATASETS_RESAMPLED_PATH}/NF-UNSW-NB15-v3-smote.csv", + file_path=use_dataset, blocksize="256MB", dtypes=dtypes, npartitions=20, ) + ovs = Oversampling(dataset_path=use_dataset) logger.info(" - 데이터셋 처리 완료") logger.info("데이터셋 훈련, 검증, 테스트 분할 시작") @@ -59,7 +59,12 @@ def process(): X_train, X_val, X_test = DatasetScaling().scale( splited_X_train, splited_X_val, splited_X_test ) - logger.info(" - 데이터셋 스케일링 완료") + logger.info( + " - 데이터셋 스케일링 완료 (데이터 타입 열거: 훈련: %s, 검증: %s, 테스트: %s)", + type(X_train), + type(X_val), + type(X_test), + ) logger.info("섀넌 엔트로피 계산 시작") X_train, X_val, X_test = cal_shannon_entropy(X_train, X_val, X_test) @@ -76,20 +81,21 @@ def process(): # 파일 저장 if save_train: logger.info("훈련 세트 저장 시작") - ovs.save_local(X_train, use_pandas_output=False) + X_train.to_csv(os.path.join(DATASETS_RESAMPLED_PATH, f"{use_dataset}-X-train.csv"), index=False) + # ovs.save_local(resampled_df=X_train, use_pandas_output=False) logger.info(" - 훈련 세트 저장 완료") if save_val: logger.info("검증 세트 저장 시작") - ovs.save_local(X_val, use_pandas_output=False) + ovs.save_local(resampled_df=X_val, use_pandas_output=False) logger.info(" - 검증 세트 저장 완료") if save_test: logger.info("테스트 세트 저장 시작") - ovs.save_local(X_test, use_pandas_output=False) + ovs.save_local(resampled_df=X_test, use_pandas_output=False) logger.info(" - 테스트 세트 저장 완료") logger.info("모든 작업 완료") -def process_oversampling(dataset, req_smote, test_set_filename=None): +def process_oversampling(dataset_path: str): # 각 작업 단계를 tqdm으로 진행률 표시 (총 8단계) with tqdm( total=8, @@ -100,14 +106,7 @@ def process_oversampling(dataset, req_smote, test_set_filename=None): dynamic_ncols=True, ) as pbar: # 1. Oversampling 객체 생성 - pbar.set_description("데이터셋 로드") - ovs = Oversampling( - dataset=dataset, - is_smote_req=req_smote, - test_set_filename=test_set_filename, - ) - tqdm.write("데이터셋 로드 완료") - pbar.update(1) + ovs = Oversampling(dataset_path=dataset_path) # 2. 데이터셋 청크 로드 pbar.set_description("데이터셋 청크 로드") @@ -154,14 +153,17 @@ def process_oversampling(dataset, req_smote, test_set_filename=None): pbar.update(1) # 8. 반환 - return df, ovs + return df -def cal_shannon_entropy(to_df_X_train, to_df_X_val, to_df_X_test): +def cal_shannon_entropy( + to_df_X_train: np.ndarray, to_df_X_val: np.ndarray, to_df_X_test: np.ndarray +): def apply_entropy_dask(row): return apply_entropy(row) - to_df_X_train = dd.from_pandas(to_df_X_train, npartitions=20) + # Dask 데이터프레임인지 확인하고, 아니면 변환 + to_df_X_train = switch_to_dask(to_df_X_train) to_df_X_train["packet_entropy"] = to_df_X_train.apply( apply_entropy_dask, axis=1, @@ -172,7 +174,7 @@ def apply_entropy_dask(row): lambda x: float(x.item()) if hasattr(x, "item") else float(x) ) - to_df_X_val = dd.from_pandas(to_df_X_val, npartitions=20) + to_df_X_val = switch_to_dask(to_df_X_val) to_df_X_val["packet_entropy"] = to_df_X_val.apply( apply_entropy_dask, axis=1, @@ -183,7 +185,7 @@ def apply_entropy_dask(row): lambda x: float(x.item()) if hasattr(x, "item") else float(x) ) - to_df_X_test = dd.from_pandas(to_df_X_test, npartitions=20) + to_df_X_test = switch_to_dask(to_df_X_test) to_df_X_test["packet_entropy"] = to_df_X_test.apply( apply_entropy_dask, axis=1, @@ -197,11 +199,13 @@ def apply_entropy_dask(row): return to_df_X_train, to_df_X_val, to_df_X_test -def cal_timing_variance(to_df_X_train, to_df_X_val, to_df_X_test): +def cal_timing_variance( + to_df_X_train: np.ndarray, to_df_X_val: np.ndarray, to_df_X_test: np.ndarray +): def apply_timing_variance_dask(row): return apply_timing_variance(row) - to_df_X_train = dd.from_pandas(to_df_X_train, npartitions=20) + to_df_X_train = switch_to_dask(to_df_X_train) to_df_X_train["timing_variance"] = to_df_X_train.apply( apply_timing_variance_dask, axis=1, @@ -212,7 +216,7 @@ def apply_timing_variance_dask(row): lambda x: float(x.item()) if hasattr(x, "item") else float(x) ) - to_df_X_val = dd.from_pandas(to_df_X_val, npartitions=20) + to_df_X_val = switch_to_dask(to_df_X_val) to_df_X_val["timing_variance"] = to_df_X_val.apply( apply_timing_variance_dask, axis=1, @@ -223,7 +227,7 @@ def apply_timing_variance_dask(row): lambda x: float(x.item()) if hasattr(x, "item") else float(x) ) - to_df_X_test = dd.from_pandas(to_df_X_test, npartitions=20) + to_df_X_test = switch_to_dask(to_df_X_test) to_df_X_test["timing_variance"] = to_df_X_test.apply( apply_timing_variance_dask, axis=1, @@ -237,7 +241,9 @@ def apply_timing_variance_dask(row): return to_df_X_train, to_df_X_val, to_df_X_test -def cal_quantum_noise_simulation(to_df_X_train, to_df_X_val, to_df_X_test): +def cal_quantum_noise_simulation( + to_df_X_train: np.ndarray, to_df_X_val: np.ndarray, to_df_X_test: np.ndarray +): """ 양자 노이즈 시뮬레이션 연산을 계산하여 각 데이터프레임에 'quantum_noise_simulation' 피처를 추가합니다. @@ -263,7 +269,7 @@ def cal_quantum_noise_simulation(to_df_X_train, to_df_X_val, to_df_X_test): def apply_quantum_noise_simulation_dask(row): return apply_quantum_noise_simulation(row) - to_df_X_train = dd.from_pandas(to_df_X_train, npartitions=20) + to_df_X_train = switch_to_dask(to_df_X_train) to_df_X_train["quantum_noise_simulation"] = to_df_X_train.apply( apply_quantum_noise_simulation_dask, axis=1, @@ -274,7 +280,7 @@ def apply_quantum_noise_simulation_dask(row): "quantum_noise_simulation" ].apply(lambda x: float(x.item()) if hasattr(x, "item") else float(x)) - to_df_X_val = dd.from_pandas(to_df_X_val, npartitions=20) + to_df_X_val = switch_to_dask(to_df_X_val) to_df_X_val["quantum_noise_simulation"] = to_df_X_val.apply( apply_quantum_noise_simulation_dask, axis=1, @@ -285,7 +291,7 @@ def apply_quantum_noise_simulation_dask(row): "quantum_noise_simulation" ].apply(lambda x: float(x.item()) if hasattr(x, "item") else float(x)) - to_df_X_test = dd.from_pandas(to_df_X_test, npartitions=20) + to_df_X_test = switch_to_dask(to_df_X_test) to_df_X_test["quantum_noise_simulation"] = to_df_X_test.apply( apply_quantum_noise_simulation_dask, axis=1, diff --git a/poseidon/processpiece/engineering_split.py b/poseidon/processpiece/engineering_split.py index 3d41b41..4dab940 100644 --- a/poseidon/processpiece/engineering_split.py +++ b/poseidon/processpiece/engineering_split.py @@ -5,6 +5,7 @@ import dask.dataframe as dd from sklearn.model_selection import train_test_split +from tqdm import tqdm class DatasetSplit: @@ -45,41 +46,66 @@ def split(self, label_col="Label", random_state=42, npartitions=2): splited_y_test : dd.DataFrame """ # Dask DataFrame을 pandas DataFrame으로 변환 - resampled_df = self.resampled_dataframe.compute() + with tqdm(total=self.resampled_dataframe.npartitions, desc="Pandas 변환 중", ncols=100) as pbar: + resampled_df = self.resampled_dataframe.compute() + pbar.update(self.resampled_dataframe.npartitions) # X와 y 분리 - splited_X = resampled_df.drop(label_col, axis=1) - splited_y = resampled_df[label_col] + with tqdm(total=1, desc="X, y 분리 중", ncols=100) as pbar: + splited_X = resampled_df.drop(label_col, axis=1) + splited_y = resampled_df[label_col] + pbar.update(1) # 첫 번째 분할: 훈련(60%) vs. 임시(40%) - splited_X_train_pd, splited_X_temp_pd, splited_y_train_pd, splited_y_temp_pd = ( - train_test_split( - splited_X, - splited_y, - test_size=0.4, - stratify=splited_y, - random_state=random_state, + with tqdm(total=1, desc="첫 번째 분할 중 (훈련/임시)", ncols=100) as pbar: + splited_X_train_pd, splited_X_temp_pd, splited_y_train_pd, splited_y_temp_pd = ( + train_test_split( + splited_X, + splited_y, + test_size=0.4, + stratify=splited_y, + random_state=random_state, + ) ) - ) + pbar.update(1) # 두 번째 분할: 임시를 검증(20%) vs. 테스트(20%) - splited_X_val_pd, splited_X_test_pd, splited_y_val_pd, splited_y_test_pd = ( - train_test_split( - splited_X_temp_pd, - splited_y_temp_pd, - test_size=0.5, - stratify=splited_y_temp_pd, - random_state=random_state, + with tqdm(total=1, desc="두 번째 분할 중 (검증/테스트)", ncols=100) as pbar: + splited_X_val_pd, splited_X_test_pd, splited_y_val_pd, splited_y_test_pd = ( + train_test_split( + splited_X_temp_pd, + splited_y_temp_pd, + test_size=0.5, + stratify=splited_y_temp_pd, + random_state=random_state, + ) ) - ) + pbar.update(1) # Dask 변환 - splited_X_train = dd.from_pandas(splited_X_train_pd, npartitions=npartitions) - splited_X_val = dd.from_pandas(splited_X_val_pd, npartitions=npartitions) - splited_X_test = dd.from_pandas(splited_X_test_pd, npartitions=npartitions) - splited_y_train = dd.from_pandas(splited_y_train_pd, npartitions=npartitions) - splited_y_val = dd.from_pandas(splited_y_val_pd, npartitions=npartitions) - splited_y_test = dd.from_pandas(splited_y_test_pd, npartitions=npartitions) + datasets_to_convert = [ + (splited_X_train_pd, "X_train"), + (splited_X_val_pd, "X_val"), + (splited_X_test_pd, "X_test"), + (splited_y_train_pd, "y_train"), + (splited_y_val_pd, "y_val"), + (splited_y_test_pd, "y_test"), + ] + with tqdm(total=len(datasets_to_convert), desc="Dask 변환 중", ncols=100) as pbar: + for df_pd, name in datasets_to_convert: + if name == "X_train": + splited_X_train = dd.from_pandas(df_pd, npartitions=npartitions) + elif name == "X_val": + splited_X_val = dd.from_pandas(df_pd, npartitions=npartitions) + elif name == "X_test": + splited_X_test = dd.from_pandas(df_pd, npartitions=npartitions) + elif name == "y_train": + splited_y_train = dd.from_pandas(df_pd, npartitions=npartitions) + elif name == "y_val": + splited_y_val = dd.from_pandas(df_pd, npartitions=npartitions) + elif name == "y_test": + splited_y_test = dd.from_pandas(df_pd, npartitions=npartitions) + pbar.update(1) return ( splited_X_train, diff --git a/poseidon/processpiece/feature_calculate.py b/poseidon/processpiece/feature_calculate.py index bbf41a6..176b4f7 100644 --- a/poseidon/processpiece/feature_calculate.py +++ b/poseidon/processpiece/feature_calculate.py @@ -1,3 +1,6 @@ +import contextlib +import io + import dask.array as da import numpy as np import qutip as qt @@ -189,11 +192,15 @@ def get_feature_value(feat_name, default=0.0): p = max(p, 0.001) # 3. Bit-Flip 시뮬레이션 (생성된 초기 상태 사용) - bit_flip = BitFlipSimulation(initial_state=rho0, p_values=[p]).simulate() + # QuTiP의 출력을 억제하기 위해 stdout을 임시로 리다이렉트 + with contextlib.redirect_stdout(io.StringIO()): + bit_flip = BitFlipSimulation(initial_state=rho0, p_values=[p]).simulate() entropy_bit = bit_flip["entropies"][0] # 4. Phase-Flip 시뮬레이션 (생성된 초기 상태 사용) - phase_flip = PhaseFlipSimulation(initial_state=rho0, p_values=[p]).simulate() + # QuTiP의 출력을 억제하기 위해 stdout을 임시로 리다이렉트 + with contextlib.redirect_stdout(io.StringIO()): + phase_flip = PhaseFlipSimulation(initial_state=rho0, p_values=[p]).simulate() entropy_phase = phase_flip["entropies"][0] # 5. 평균 엔트로피 계산 diff --git a/poseidon/processpiece/load_dask_dataframe.py b/poseidon/processpiece/load_dask_dataframe.py index 4dceefc..806b662 100644 --- a/poseidon/processpiece/load_dask_dataframe.py +++ b/poseidon/processpiece/load_dask_dataframe.py @@ -4,10 +4,13 @@ import os +import numpy as np +import pandas as pd import dask.dataframe as dd from dask.distributed import Client from poseidon.log.poseidon_log import PoseidonLogger +from poseidon.errors.errors import DatasetAnalysisError def load_large_dataset( @@ -54,7 +57,7 @@ def load_large_dataset( file_path = os.path.join(dir_path, filename) else: file_path = filename - + # Dask 클라이언트 생성: 로컬 클러스터로 병렬 처리 (안전한 리소스 관리) with Client( n_workers=npartitions, threads_per_worker=1 @@ -79,8 +82,39 @@ def load_large_dataset( logging.info("샘플 데이터가 로드되었습니다(첫 5행):\n%s", sample) # 기본 통계 계산으로 데이터 무결성 확인 (대용량 시 생략 가능) - stats = df.describe().compute() - logging.info(" - 데이터셋 통계는 다음과 같습니다:\n%s", stats) + # 타입 변환 오류 방지: describe() 호출 시 타입 충돌 처리 + try: + # describe()는 자동으로 수치형 컬럼만 선택하므로 타입 충돌 시 예외 처리 + stats = df.describe().compute() + logging.info(" - 데이터셋 통계는 다음과 같습니다:\n%s", stats) + except (ValueError, TypeError) as e: + # 타입 변환 오류 발생 시: 실제 데이터 타입을 사용하여 재시도 + logging.warning( + " - 데이터셋 통계 계산 중 타입 변환 오류가 발생했습니다: %s. 실제 데이터 타입을 사용하여 재시도합니다.", + str(e), + ) + try: + # 실제 데이터 타입을 확인하고 수치형 컬럼만 선택 + # Dask DataFrame의 dtypes 속성 사용 + numeric_cols = [ + col + for col, dtype in df.dtypes.items() + if np.issubdtype(dtype, np.number) + or str(dtype) + in ["int64", "float64", "int32", "float32", "Int64", "Float64"] + ] + + if numeric_cols: + numeric_df = df[numeric_cols] + stats = numeric_df.describe().compute() + logging.info(" - 데이터셋 통계는 다음과 같습니다:\n%s", stats) + else: + logging.info( + " - 수치형 컬럼이 없어 통계를 계산할 수 없습니다." + ) + except DatasetAnalysisError as e2: + # 재시도도 실패하면 경고만 출력하고 계속 진행 + logging.warning(" - 데이터셋 통계 계산을 건너뜁니다: %s", str(e2)) num_partitions = df.npartitions logging.info( @@ -100,4 +134,66 @@ def load_large_dataset( raise -__all__ = ["load_large_dataset"] +def switch_to_pandas(target, exclude_features: list = None): + if exclude_features is None: + exclude_features = [ + "IPV4_SRC_ADDR", + "IPV4_DST_ADDR", + "L4_SRC_PORT", + "L4_DST_PORT", + ] + columns_list = [name for name in target.columns if name not in exclude_features] + if isinstance(target, dd.DataFrame): + return target.compute() + elif isinstance(target, np.ndarray): + return pd.DataFrame(target, columns=columns_list) + elif isinstance(target, pd.DataFrame): + return target[columns_list] + else: + raise ValueError(f"{type(target)}은(는) 지원되지 않는 데이터 타입입니다.") + + +def switch_to_dask(target, exclude_features: list = None, npartitions: int = 20): + """ + 다양한 데이터 타입을 Dask DataFrame으로 변환하는 함수입니다. + + Args: + target: 변환할 데이터 (ndarray, pd.DataFrame, 또는 dd.DataFrame) + exclude_features: 제외할 컬럼 리스트 (기본값: IP 주소 및 포트 관련 컬럼) + npartitions: Dask DataFrame의 파티션 수 (기본값: 20) + + Returns: + dd.DataFrame: 변환된 Dask DataFrame + """ + if exclude_features is None: + exclude_features = [ + "IPV4_SRC_ADDR", + "IPV4_DST_ADDR", + "L4_SRC_PORT", + "L4_DST_PORT", + ] + + if isinstance(target, dd.DataFrame): + # 이미 Dask DataFrame인 경우 + columns_list = [name for name in target.columns if name not in exclude_features] + result = target[columns_list] + # 파티션 수 조정 (필요한 경우) + if result.npartitions != npartitions: + result = result.repartition(npartitions=npartitions) + return result + elif isinstance(target, pd.DataFrame): + # Pandas DataFrame을 Dask DataFrame으로 변환 + columns_list = [name for name in target.columns if name not in exclude_features] + filtered_df = target[columns_list] + return dd.from_pandas(filtered_df, npartitions=npartitions) + elif isinstance(target, np.ndarray): + # NumPy 배열을 Dask DataFrame으로 변환 + # 컬럼 이름이 없으므로 숫자로 생성하거나, exclude_features를 고려하지 않음 + # 실제 사용 시 컬럼 이름을 제공하는 것이 좋지만, 여기서는 기본 처리 + df = pd.DataFrame(target) + return dd.from_pandas(df, npartitions=npartitions) + else: + raise ValueError(f"{type(target)}은(는) 지원되지 않는 데이터 타입입니다. ndarray, pd.DataFrame, 또는 dd.DataFrame만 지원됩니다.") + + +__all__ = ["load_large_dataset", "switch_to_pandas"] diff --git a/poseidon/processpiece/oversampling.py b/poseidon/processpiece/oversampling.py index 156d5a7..13aa348 100644 --- a/poseidon/processpiece/oversampling.py +++ b/poseidon/processpiece/oversampling.py @@ -10,9 +10,9 @@ from dotenv import load_dotenv from poseidon.data.dataset import clip_partition, shuffle_and_split -from poseidon.data.dataset_type import DatasetType from poseidon.data.smote_knn import smote from poseidon.processpiece.load_dask_dataframe import load_large_dataset +from poseidon.log.poseidon_log import PoseidonLogger load_dotenv(verbose=True) DATASETS_ORIGIN_PATH = os.getenv("DATASETS_ORIGIN_PATH") @@ -24,6 +24,9 @@ ) +logger = PoseidonLogger().get_logger() + + class Oversampling: """ 원본 데이터셋에서 특정 피처의 불균형을 해결하기 위해 SMOTE 오버샘플링을 적용하는 클래스입니다. @@ -34,24 +37,9 @@ class Oversampling: 경우에 따라 tqdm 기능이 추가될 수 있습니다. """ - def __init__( - self, dataset: DatasetType = None, is_smote_req=True, test_set_filename=None - ): - self.dataset = dataset - self.test_set_filename = test_set_filename - # 테스트 셋이 아닌 경우 - if dataset != DatasetType.CUSTOM: - # 오버샘플링이 필요한 경우는 ORIGIN 에서 가져옴 (파일 이름에 변경 없음) - if is_smote_req: - self.dataset_path = os.path.join(DATASETS_ORIGIN_PATH, f"{dataset}.csv") - else: # 오버샘플링을 생략하는 경우는 RESAMPLED 에서 가져옴 (파일 이름에 "-smote" 추가되어 있음) - self.dataset_path = os.path.join( - DATASETS_RESAMPLED_PATH, f"{dataset}-smote.csv" - ) - else: # 데이터셋을 호출한 경우는 CUSTOM 에서 가져옴 (사용자가 파일 이름 지정) - self.dataset_path = os.path.join( - DATASETS_CUSTOM_PATH, f"{test_set_filename}.csv" - ) + def __init__(self, dataset_path: str): + self.dataset_path = dataset_path + self.dataset_name = os.path.basename(dataset_path) # 확장자명 포함 def load_chunks( self, file_format="csv", dtypes=None, blocksize="256MB", npartitions=20 @@ -157,84 +145,48 @@ def get_resampled_df( def save_local( self, resampled_df, - save_filename=None, - path=None, + output_path: str = DATASETS_RESAMPLED_PATH, use_smote_suffix=True, use_pandas_output=True, index=False, ): - try: - from tqdm import tqdm - - write_fn = tqdm.write - except ImportError: - write_fn = print - - # 파일명 결정: save_filename이 None이면 test_set_filename 또는 dataset 사용 - if save_filename is None: - if self.dataset == DatasetType.CUSTOM and self.test_set_filename: - # 확장자 제거 (.csv 등) - save_filename = os.path.splitext(self.test_set_filename)[0] - elif self.dataset: - save_filename = str(self.dataset) - else: - save_filename = "resampled_dataset" - - # 파일명에서 경로 구분자 제거 (파일명만 추출) - save_filename = os.path.basename(save_filename) - # 확장자 제거 (이미 확장자가 있을 수 있음) - save_filename = os.path.splitext(save_filename)[0] - - # 최종 파일명 생성 - suffix = "-smote" if use_smote_suffix else "" - filename = f"{save_filename}{suffix}.csv" - - # 저장 경로 결정 - if path is None: - # DATASETS_RESAMPLED_PATH 디렉토리가 없으면 생성 - os.makedirs(DATASETS_RESAMPLED_PATH, exist_ok=True) - save_path = os.path.join(DATASETS_RESAMPLED_PATH, filename) + if use_smote_suffix: + # 확장자명은 이미 self.dataset_name에 포함되어 있기 때문에, 뒤에서부터 "."을 읽고 확장자 앞 부분에 "-smote" 추가 + save_filename = ( + self.dataset_name.rsplit(".", 1)[0] + + "-smote." + + self.dataset_name.rsplit(".", 1)[1] + ) else: - # 지정된 경로가 디렉토리인지 파일인지 확인 - if os.path.isdir(path) or ( - not os.path.exists(path) and not path.endswith(".csv") - ): - # 디렉토리인 경우 파일명 추가 - os.makedirs(path, exist_ok=True) - save_path = os.path.join(path, filename) - else: - # 파일 경로인 경우 그대로 사용 - save_path = path + save_filename = self.dataset_name if use_pandas_output: # Dask DataFrame을 단일 CSV 파일로 저장하기 위해 compute() 사용 - write_fn( - f"> Pandas 데이터프레임을 로컬에 저장 중... (지정된 최종 경로: {save_path})" + logger.info( + "Pandas 데이터프레임 저장 중 ... (지정 경로: %s)", + os.path.join(output_path, save_filename), ) - resampled_df.compute().to_csv(save_path, index=index) - write_fn(" - Pandas 데이터프레임 로컬에 저장 완료") + if isinstance(resampled_df, pd.DataFrame): + resampled_df.to_csv( + os.path.join(output_path, save_filename), index=index + ) + elif isinstance(resampled_df, dd.DataFrame): + resampled_df.compute().to_csv( + os.path.join(output_path, save_filename), index=index + ) + else: + raise ValueError( + f"'{type(resampled_df)}'은(는) 지원되지 않는 데이터 타입입니다." + ) + logger.info(" - Pandas 데이터프레임 저장 완료") else: # Dask DataFrame을 파티션별로 저장 - # save_path를 디렉토리로 사용하고, 각 파티션 파일명을 name_function으로 지정 - save_dir = ( - os.path.dirname(save_path) - if os.path.dirname(save_path) - else DATASETS_RESAMPLED_PATH - ) - os.makedirs(save_dir, exist_ok=True) - - # 파일명에서 확장자 제거 (예: "dataset-smote.csv" -> "dataset-smote") - base_filename = os.path.splitext(os.path.basename(save_path))[0] - - # 각 파티션 파일명을 생성하는 함수 - def name_function(i): - return os.path.join(save_dir, f"{base_filename}-{i}.part") - - write_fn( - f"> Dask 데이터프레임을 로컬에 저장 중... (디렉토리: {save_dir}, 파일명 형식: {base_filename}-{{번호}}.part)" + logger.info( + "Dask 데이터프레임 저장 중 ... (지정 경로: %s)", + os.path.join(output_path, save_filename), ) - resampled_df.to_csv(save_dir, name_function=name_function, index=index) - write_fn(" - Dask 데이터프레임 로컬에 저장 완료") + resampled_df.to_csv(os.path.join(output_path, save_filename), index=index) + logger.info(" - Dask 데이터프레임 저장 완료") __all__ = ["Oversampling"] diff --git a/poseidon/simulations/noise_modeling.py b/poseidon/simulations/noise_modeling.py index 2c243ed..b9446d4 100644 --- a/poseidon/simulations/noise_modeling.py +++ b/poseidon/simulations/noise_modeling.py @@ -189,7 +189,6 @@ def simulate(self, verbose: int = 0): pur_value = rho_noisy.purity() self.result.purities.append(pur_value) # 폰 노이만 엔트로피 계산 - print(rho_noisy) rho_noisy_valid, trace_val = validate_rho(rho_noisy) ent_value = entropy_vn_jitted(rho_noisy_valid, trace_val) self.result.entropies.append(ent_value)