Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion poseidon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from poseidon.util.timing_variance import timing_variance
from poseidon.util.von_neumann import validate_rho

from poseidon.errors import errors

from poseidon import process

data = [
Expand Down Expand Up @@ -74,4 +76,4 @@

util = ["entropy_sn", "validate_rho", "timing_variance", "ip_to_int"]

__all__ = data + log + processpiece + prototype + simulations + tests + util + process.__all__
__all__ = data + errors.__all__ + log + processpiece + prototype + simulations + tests + util + process.__all__
2 changes: 1 addition & 1 deletion poseidon/data/custom_datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ def generate_netflow_dataset(n_samples=50000, random_state=42):


if __name__ == "__main__":
sel_n_samples = 25000
sel_n_samples = 10000
output_path = os.path.join(
DATASETS_CUSTOM_PATH,
f"{sel_n_samples}s-NF-custom-dataset-{int(time.time())}.csv",
Expand Down
3 changes: 3 additions & 0 deletions poseidon/errors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Package-level re-export of the error hierarchy.
#
# Bug fix: the previous version set `__all__ = errors.__all__` without ever
# binding the listed names (BaseErrors, DatasetAnalysisError) in this
# namespace, so `from poseidon.errors import *` raised AttributeError and
# `from poseidon.errors import BaseErrors` failed.  Importing the names
# explicitly makes the advertised API actually resolvable.
from poseidon.errors import errors
from poseidon.errors.errors import BaseErrors, DatasetAnalysisError  # noqa: F401

__all__ = errors.__all__
9 changes: 9 additions & 0 deletions poseidon/errors/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class BaseErrors(Exception):
    """Root of the poseidon exception hierarchy.

    Catching this type catches every error the package raises deliberately.
    """


class DatasetAnalysisError(BaseErrors):
    """Raised when analysis of a dataset fails."""


# Public API of this module; consumed by package-level ``__all__`` aggregation.
__all__ = ["BaseErrors", "DatasetAnalysisError"]
76 changes: 41 additions & 35 deletions poseidon/process.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import os
import numpy as np
import dask.dataframe as dd
from dotenv import load_dotenv
from poseidon.data.dataset_type import DatasetType
from poseidon.processpiece.oversampling import Oversampling
from poseidon.data.poseidon_dtypes import dtypes
from poseidon.processpiece.engineering_split import DatasetSplit
Expand All @@ -12,34 +10,36 @@
apply_timing_variance,
apply_quantum_noise_simulation,
)
from poseidon.processpiece.load_dask_dataframe import load_large_dataset
from poseidon.processpiece.load_dask_dataframe import load_large_dataset, switch_to_dask
from poseidon.log.poseidon_log import PoseidonLogger

from tqdm import tqdm

logger = PoseidonLogger().get_logger()
load_dotenv(verbose=True)
DATASETS_RESAMPLED_PATH = os.getenv("DATASETS_RESAMPLED_PATH")
DATASETS_CUSTOM_PATH = os.getenv("DATASETS_CUSTOM_PATH")


def process():
req_smote = False

req_smote = True
save_train = True
save_val = False
save_test = False
# use_dataset = f"{DATASETS_CUSTOM_PATH}/25000s-NF-custom-dataset-1762267172.csv"
use_dataset = f"{DATASETS_CUSTOM_PATH}/10000s-NF-custom-dataset-1762341664.csv"

logger.info("데이터셋 처리 시작")
if req_smote:
resampled_df, ovs = process_oversampling(
dataset=DatasetType.NF_UNSW_NB15_V3, req_smote=True
)
resampled_df = process_oversampling(dataset_path=use_dataset)
else:
resampled_df = load_large_dataset(
file_path=f"{DATASETS_RESAMPLED_PATH}/NF-UNSW-NB15-v3-smote.csv",
file_path=use_dataset,
blocksize="256MB",
dtypes=dtypes,
npartitions=20,
)
ovs = Oversampling(dataset_path=use_dataset)
logger.info(" - 데이터셋 처리 완료")

logger.info("데이터셋 훈련, 검증, 테스트 분할 시작")
Expand All @@ -59,7 +59,12 @@ def process():
X_train, X_val, X_test = DatasetScaling().scale(
splited_X_train, splited_X_val, splited_X_test
)
logger.info(" - 데이터셋 스케일링 완료")
logger.info(
" - 데이터셋 스케일링 완료 (데이터 타입 열거: 훈련: %s, 검증: %s, 테스트: %s)",
type(X_train),
type(X_val),
type(X_test),
)

logger.info("섀넌 엔트로피 계산 시작")
X_train, X_val, X_test = cal_shannon_entropy(X_train, X_val, X_test)
Expand All @@ -76,20 +81,21 @@ def process():
# 파일 저장
if save_train:
logger.info("훈련 세트 저장 시작")
ovs.save_local(X_train, use_pandas_output=False)
X_train.to_csv(os.path.join(DATASETS_RESAMPLED_PATH, f"{use_dataset}-X-train.csv"), index=False)
# ovs.save_local(resampled_df=X_train, use_pandas_output=False)
logger.info(" - 훈련 세트 저장 완료")
if save_val:
logger.info("검증 세트 저장 시작")
ovs.save_local(X_val, use_pandas_output=False)
ovs.save_local(resampled_df=X_val, use_pandas_output=False)
logger.info(" - 검증 세트 저장 완료")
if save_test:
logger.info("테스트 세트 저장 시작")
ovs.save_local(X_test, use_pandas_output=False)
ovs.save_local(resampled_df=X_test, use_pandas_output=False)
logger.info(" - 테스트 세트 저장 완료")
logger.info("모든 작업 완료")


def process_oversampling(dataset, req_smote, test_set_filename=None):
def process_oversampling(dataset_path: str):
# 각 작업 단계를 tqdm으로 진행률 표시 (총 8단계)
with tqdm(
total=8,
Expand All @@ -100,14 +106,7 @@ def process_oversampling(dataset, req_smote, test_set_filename=None):
dynamic_ncols=True,
) as pbar:
# 1. Oversampling 객체 생성
pbar.set_description("데이터셋 로드")
ovs = Oversampling(
dataset=dataset,
is_smote_req=req_smote,
test_set_filename=test_set_filename,
)
tqdm.write("데이터셋 로드 완료")
pbar.update(1)
ovs = Oversampling(dataset_path=dataset_path)

# 2. 데이터셋 청크 로드
pbar.set_description("데이터셋 청크 로드")
Expand Down Expand Up @@ -154,14 +153,17 @@ def process_oversampling(dataset, req_smote, test_set_filename=None):
pbar.update(1)

# 8. 반환
return df, ovs
return df


def cal_shannon_entropy(to_df_X_train, to_df_X_val, to_df_X_test):
def cal_shannon_entropy(
to_df_X_train: np.ndarray, to_df_X_val: np.ndarray, to_df_X_test: np.ndarray
):
def apply_entropy_dask(row):
return apply_entropy(row)

to_df_X_train = dd.from_pandas(to_df_X_train, npartitions=20)
# Dask 데이터프레임인지 확인하고, 아니면 변환
to_df_X_train = switch_to_dask(to_df_X_train)
to_df_X_train["packet_entropy"] = to_df_X_train.apply(
apply_entropy_dask,
axis=1,
Expand All @@ -172,7 +174,7 @@ def apply_entropy_dask(row):
lambda x: float(x.item()) if hasattr(x, "item") else float(x)
)

to_df_X_val = dd.from_pandas(to_df_X_val, npartitions=20)
to_df_X_val = switch_to_dask(to_df_X_val)
to_df_X_val["packet_entropy"] = to_df_X_val.apply(
apply_entropy_dask,
axis=1,
Expand All @@ -183,7 +185,7 @@ def apply_entropy_dask(row):
lambda x: float(x.item()) if hasattr(x, "item") else float(x)
)

to_df_X_test = dd.from_pandas(to_df_X_test, npartitions=20)
to_df_X_test = switch_to_dask(to_df_X_test)
to_df_X_test["packet_entropy"] = to_df_X_test.apply(
apply_entropy_dask,
axis=1,
Expand All @@ -197,11 +199,13 @@ def apply_entropy_dask(row):
return to_df_X_train, to_df_X_val, to_df_X_test


def cal_timing_variance(to_df_X_train, to_df_X_val, to_df_X_test):
def cal_timing_variance(
to_df_X_train: np.ndarray, to_df_X_val: np.ndarray, to_df_X_test: np.ndarray
):
def apply_timing_variance_dask(row):
return apply_timing_variance(row)

to_df_X_train = dd.from_pandas(to_df_X_train, npartitions=20)
to_df_X_train = switch_to_dask(to_df_X_train)
to_df_X_train["timing_variance"] = to_df_X_train.apply(
apply_timing_variance_dask,
axis=1,
Expand All @@ -212,7 +216,7 @@ def apply_timing_variance_dask(row):
lambda x: float(x.item()) if hasattr(x, "item") else float(x)
)

to_df_X_val = dd.from_pandas(to_df_X_val, npartitions=20)
to_df_X_val = switch_to_dask(to_df_X_val)
to_df_X_val["timing_variance"] = to_df_X_val.apply(
apply_timing_variance_dask,
axis=1,
Expand All @@ -223,7 +227,7 @@ def apply_timing_variance_dask(row):
lambda x: float(x.item()) if hasattr(x, "item") else float(x)
)

to_df_X_test = dd.from_pandas(to_df_X_test, npartitions=20)
to_df_X_test = switch_to_dask(to_df_X_test)
to_df_X_test["timing_variance"] = to_df_X_test.apply(
apply_timing_variance_dask,
axis=1,
Expand All @@ -237,7 +241,9 @@ def apply_timing_variance_dask(row):
return to_df_X_train, to_df_X_val, to_df_X_test


def cal_quantum_noise_simulation(to_df_X_train, to_df_X_val, to_df_X_test):
def cal_quantum_noise_simulation(
to_df_X_train: np.ndarray, to_df_X_val: np.ndarray, to_df_X_test: np.ndarray
):
"""
양자 노이즈 시뮬레이션 연산을 계산하여 각 데이터프레임에 'quantum_noise_simulation' 피처를 추가합니다.

Expand All @@ -263,7 +269,7 @@ def cal_quantum_noise_simulation(to_df_X_train, to_df_X_val, to_df_X_test):
def apply_quantum_noise_simulation_dask(row):
return apply_quantum_noise_simulation(row)

to_df_X_train = dd.from_pandas(to_df_X_train, npartitions=20)
to_df_X_train = switch_to_dask(to_df_X_train)
to_df_X_train["quantum_noise_simulation"] = to_df_X_train.apply(
apply_quantum_noise_simulation_dask,
axis=1,
Expand All @@ -274,7 +280,7 @@ def apply_quantum_noise_simulation_dask(row):
"quantum_noise_simulation"
].apply(lambda x: float(x.item()) if hasattr(x, "item") else float(x))

to_df_X_val = dd.from_pandas(to_df_X_val, npartitions=20)
to_df_X_val = switch_to_dask(to_df_X_val)
to_df_X_val["quantum_noise_simulation"] = to_df_X_val.apply(
apply_quantum_noise_simulation_dask,
axis=1,
Expand All @@ -285,7 +291,7 @@ def apply_quantum_noise_simulation_dask(row):
"quantum_noise_simulation"
].apply(lambda x: float(x.item()) if hasattr(x, "item") else float(x))

to_df_X_test = dd.from_pandas(to_df_X_test, npartitions=20)
to_df_X_test = switch_to_dask(to_df_X_test)
to_df_X_test["quantum_noise_simulation"] = to_df_X_test.apply(
apply_quantum_noise_simulation_dask,
axis=1,
Expand Down
76 changes: 51 additions & 25 deletions poseidon/processpiece/engineering_split.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import dask.dataframe as dd
from sklearn.model_selection import train_test_split
from tqdm import tqdm


class DatasetSplit:
Expand Down Expand Up @@ -45,41 +46,66 @@ def split(self, label_col="Label", random_state=42, npartitions=2):
splited_y_test : dd.DataFrame
"""
# Dask DataFrame을 pandas DataFrame으로 변환
resampled_df = self.resampled_dataframe.compute()
with tqdm(total=self.resampled_dataframe.npartitions, desc="Pandas 변환 중", ncols=100) as pbar:
resampled_df = self.resampled_dataframe.compute()
pbar.update(self.resampled_dataframe.npartitions)

# X와 y 분리
splited_X = resampled_df.drop(label_col, axis=1)
splited_y = resampled_df[label_col]
with tqdm(total=1, desc="X, y 분리 중", ncols=100) as pbar:
splited_X = resampled_df.drop(label_col, axis=1)
splited_y = resampled_df[label_col]
pbar.update(1)

# 첫 번째 분할: 훈련(60%) vs. 임시(40%)
splited_X_train_pd, splited_X_temp_pd, splited_y_train_pd, splited_y_temp_pd = (
train_test_split(
splited_X,
splited_y,
test_size=0.4,
stratify=splited_y,
random_state=random_state,
with tqdm(total=1, desc="첫 번째 분할 중 (훈련/임시)", ncols=100) as pbar:
splited_X_train_pd, splited_X_temp_pd, splited_y_train_pd, splited_y_temp_pd = (
train_test_split(
splited_X,
splited_y,
test_size=0.4,
stratify=splited_y,
random_state=random_state,
)
)
)
pbar.update(1)

# 두 번째 분할: 임시를 검증(20%) vs. 테스트(20%)
splited_X_val_pd, splited_X_test_pd, splited_y_val_pd, splited_y_test_pd = (
train_test_split(
splited_X_temp_pd,
splited_y_temp_pd,
test_size=0.5,
stratify=splited_y_temp_pd,
random_state=random_state,
with tqdm(total=1, desc="두 번째 분할 중 (검증/테스트)", ncols=100) as pbar:
splited_X_val_pd, splited_X_test_pd, splited_y_val_pd, splited_y_test_pd = (
train_test_split(
splited_X_temp_pd,
splited_y_temp_pd,
test_size=0.5,
stratify=splited_y_temp_pd,
random_state=random_state,
)
)
)
pbar.update(1)

# Dask 변환
splited_X_train = dd.from_pandas(splited_X_train_pd, npartitions=npartitions)
splited_X_val = dd.from_pandas(splited_X_val_pd, npartitions=npartitions)
splited_X_test = dd.from_pandas(splited_X_test_pd, npartitions=npartitions)
splited_y_train = dd.from_pandas(splited_y_train_pd, npartitions=npartitions)
splited_y_val = dd.from_pandas(splited_y_val_pd, npartitions=npartitions)
splited_y_test = dd.from_pandas(splited_y_test_pd, npartitions=npartitions)
datasets_to_convert = [
(splited_X_train_pd, "X_train"),
(splited_X_val_pd, "X_val"),
(splited_X_test_pd, "X_test"),
(splited_y_train_pd, "y_train"),
(splited_y_val_pd, "y_val"),
(splited_y_test_pd, "y_test"),
]
with tqdm(total=len(datasets_to_convert), desc="Dask 변환 중", ncols=100) as pbar:
for df_pd, name in datasets_to_convert:
if name == "X_train":
splited_X_train = dd.from_pandas(df_pd, npartitions=npartitions)
elif name == "X_val":
splited_X_val = dd.from_pandas(df_pd, npartitions=npartitions)
elif name == "X_test":
splited_X_test = dd.from_pandas(df_pd, npartitions=npartitions)
elif name == "y_train":
splited_y_train = dd.from_pandas(df_pd, npartitions=npartitions)
elif name == "y_val":
splited_y_val = dd.from_pandas(df_pd, npartitions=npartitions)
elif name == "y_test":
splited_y_test = dd.from_pandas(df_pd, npartitions=npartitions)
pbar.update(1)

return (
splited_X_train,
Expand Down
11 changes: 9 additions & 2 deletions poseidon/processpiece/feature_calculate.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import contextlib
import io

import dask.array as da
import numpy as np
import qutip as qt
Expand Down Expand Up @@ -189,11 +192,15 @@ def get_feature_value(feat_name, default=0.0):
p = max(p, 0.001)

# 3. Bit-Flip 시뮬레이션 (생성된 초기 상태 사용)
bit_flip = BitFlipSimulation(initial_state=rho0, p_values=[p]).simulate()
# QuTiP의 출력을 억제하기 위해 stdout을 임시로 리다이렉트
with contextlib.redirect_stdout(io.StringIO()):
bit_flip = BitFlipSimulation(initial_state=rho0, p_values=[p]).simulate()
entropy_bit = bit_flip["entropies"][0]

# 4. Phase-Flip 시뮬레이션 (생성된 초기 상태 사용)
phase_flip = PhaseFlipSimulation(initial_state=rho0, p_values=[p]).simulate()
# QuTiP의 출력을 억제하기 위해 stdout을 임시로 리다이렉트
with contextlib.redirect_stdout(io.StringIO()):
phase_flip = PhaseFlipSimulation(initial_state=rho0, p_values=[p]).simulate()
entropy_phase = phase_flip["entropies"][0]

# 5. 평균 엔트로피 계산
Expand Down
Loading