Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/data/loader.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# src/data/loader.py

"""Data loading module for MovieLens 1M dataset."""

import pandas as pd
from typing import Dict
from pathlib import Path
Expand All @@ -22,10 +24,8 @@
def load_dataset(folder: Path = FILE_PATH) -> Dict[str, pd.DataFrame]:
"""
Load all .dat files and assign headers based on filename.

Args:
folder (Path): The folder containing the .dat files.

Returns:
Dict[str, pd.DataFrame]: A dictionary mapping dataset names to DataFrames.
"""
Expand Down
36 changes: 16 additions & 20 deletions src/data/preprocessor.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
# src/data/preprocessor.py

"""
Preprocessing module for MovieLens 1M dataset.

Responsibilities:
- Filter low activity movies
- Train/test split
"""
"""Preprocessing module for MovieLens 1M dataset."""

import pandas as pd
from sklearn.model_selection import train_test_split
Expand All @@ -26,6 +20,7 @@ def _get_missing_values(dataframe: pd.DataFrame) -> tuple:
"""
missing_values = dataframe.isnull().sum().sum()
missing_percentage = (missing_values / len(dataframe)) * 100

logger.info(f"Missing values: {missing_values}, Missing percentage: {missing_percentage:.2f}%")
return missing_values, missing_percentage

Expand All @@ -41,6 +36,7 @@ def _remove_duplicates(dataframe: pd.DataFrame) -> pd.DataFrame:
dataframe with duplicates removed
"""
duplicates = dataframe.duplicated().sum()

logger.info(f"Duplicated rows: {duplicates}")
return dataframe.drop_duplicates()

Expand All @@ -49,32 +45,31 @@ def _remove_duplicates(dataframe: pd.DataFrame) -> pd.DataFrame:
def filter_movies(ratings: pd.DataFrame, min_ratings: int = 10) -> pd.DataFrame:
"""
Filter out movies with fewer than min_ratings ratings.

Args:
ratings: ratings dataframe
min_ratings: minimum number of ratings required
Returns:
filtered ratings dataframe
"""
logger.info(f"Movies before filtering: {ratings['movie_id'].nunique()}")

# Filter logic: count ratings per movie and keep only those with enough ratings
movie_counts = ratings.groupby("movie_id")["rating"].count()
valid_movies = movie_counts[movie_counts >= min_ratings].index

# Log the number of movies removed due to low ratings
filtered = ratings[ratings["movie_id"].isin(valid_movies)]
movies_removed = ratings["movie_id"].nunique() - filtered["movie_id"].nunique()
logger.info(f"Movies after filtering: {filtered['movie_id'].nunique()}")

logger.info(f"Filtering movies with less than {min_ratings} ratings...")
logger.info(f"Before filtering: {ratings['movie_id'].nunique()}")
logger.info(f"After filtering: {filtered['movie_id'].nunique()}")
movies_removed = ratings["movie_id"].nunique() - filtered["movie_id"].nunique()
logger.info(f"Movies removed: {movies_removed}")

return filtered


# Train/test split for ratings
def train_test_split_ratings(
ratings: pd.DataFrame,
test_size: float = 0.2,
random_state: int = 42
) -> tuple:
def train_test_split_ratings(ratings: pd.DataFrame, test_size: float = 0.2, random_state: int = 42) -> tuple:
"""
Split ratings into train and test sets.

Expand All @@ -89,7 +84,7 @@ def train_test_split_ratings(
ratings,
test_size=test_size,
random_state=random_state,
stratify=ratings["user_id"]
stratify=ratings["user_id"] # stratify by user_id to ensure all users are represented in both sets
)

logger.info(f"Train size: {train.shape}")
Expand All @@ -111,18 +106,19 @@ def preprocess_pipeline(ratings: pd.DataFrame,movies: pd.DataFrame, users: pd.Da
tuple: train, test, movies, users
"""
# check missing values for all
logger.info("Checking for missing values...")
logger.info("Checking Missing Valuses...")
_get_missing_values(ratings)
_get_missing_values(movies)
_get_missing_values(users)

# remove duplicates from all
logger.info("Duplicate rows check and removal...")
logger.info("Duplicate Check and Removal...")
ratings = _remove_duplicates(ratings)
movies = _remove_duplicates(movies)
users = _remove_duplicates(users)

# filter low activity movies from ratings
logger.info("Filtering low activity movies...")
ratings = filter_movies(ratings)

# train test split on ratings
Expand Down
97 changes: 85 additions & 12 deletions src/features/build_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,26 @@
- build_interaction_features: Builds interaction features from the ratings DataFrame.
"""

from pathlib import Path

import pandas as pd
import numpy as np
from pathlib import Path
from scipy.sparse import csr_matrix

from src.utils import get_logger

# Initialize logger
logger = get_logger(__name__)


def build_user_item_matrix(train: pd.DataFrame) -> tuple:
"""
Build sparse User-Item Matrix from training data.

Args:
train: training dataframe

train: training dataframe (ratings train data)
Returns:
tuple: sparse matrix, user_map, item_map
"""
from scipy.sparse import csr_matrix

# create mappings
user_ids = train["user_id"].unique()
Expand All @@ -48,13 +48,55 @@ def build_user_item_matrix(train: pd.DataFrame) -> tuple:
shape=(len(user_map), len(item_map))
)

# calculate sparsity
sparsity = 1 - user_item_matrix.nnz / (user_item_matrix.shape[0] * user_item_matrix.shape[1])

# log matrix info
logger.info(f"Matrix shape: {user_item_matrix.shape}")
logger.info(f"Sparsity: {1 - user_item_matrix.nnz / (user_item_matrix.shape[0] * user_item_matrix.shape[1]):.4f}")
logger.info(f"Sparsity: {sparsity:.4f}")

return user_item_matrix, user_map, item_map


# Note: This function is used by the feature pipeline and can also be reused for future feature engineering steps.
# Marix normalization function
def normalize_matrix(user_item_matrix: csr_matrix) -> tuple:
"""
Normalize user ratings by subtracting user mean.

Args:
user_item_matrix: sparse user-item matrix
Returns:
tuple: normalized matrix, user means
"""
# lil_matrix for efficient row operations, then convert back to csr_matrix
from scipy.sparse import lil_matrix

# initialize user means array
user_means = np.zeros(user_item_matrix.shape[0])

# calculate user means
for i in range(user_item_matrix.shape[0]):
user_ratings = user_item_matrix[i].toarray().flatten()
rated = user_ratings[user_ratings > 0]
if len(rated) > 0:
user_means[i] = rated.mean()

# convert to lil_matrix for efficient row operations
normalized = lil_matrix(user_item_matrix.shape, dtype=float)

for i in range(user_item_matrix.shape[0]):
row = user_item_matrix[i].toarray().flatten()
nonzero_idx = np.where(row > 0)[0]
if len(nonzero_idx) > 0 and user_means[i] > 0:
normalized[i, nonzero_idx] = row[nonzero_idx] - user_means[i]

# convert back to csr_matrix for efficient computations later
normalized = normalized.tocsr()

return normalized, user_means


# Functions to save features to disk
def save_features(user_item_matrix, user_map: dict, item_map: dict) -> None:
"""
Save sparse matrix and mappings to disk.
Expand All @@ -63,8 +105,7 @@ def save_features(user_item_matrix, user_map: dict, item_map: dict) -> None:
user_item_matrix: sparse user-item matrix
user_map: mapping of user_id to row index
item_map: mapping of movie_id to col index
"""

"""
from scipy.sparse import save_npz

matrix_path = Path(__file__).parent.parent.parent / "data" / "processed"
Expand All @@ -87,21 +128,53 @@ def save_features(user_item_matrix, user_map: dict, item_map: dict) -> None:
logger.info(f"Saved matrix to {matrix_path}")
logger.info(f"Saved mappings to {mappings_path}")


# Function to save normalized matrix and user means
def save_normalized_matrix(normalized_matrix: csr_matrix, user_means: np.ndarray) -> None:
"""
Save normalized matrix to disk.

Args:
normalized_matrix: normalized sparse user-item matrix
user_means: array of user means
"""
from scipy.sparse import save_npz

matrix_path = Path(__file__).parent.parent.parent / "data" / "processed"
matrix_path.mkdir(parents=True, exist_ok=True)

save_npz(matrix_path / "normalized_matrix.npz", normalized_matrix)
np.save(matrix_path / "user_means.npy", user_means)

logger.info(f"Saved normalized matrix to {matrix_path}")
logger.info(f"Saved user means to {matrix_path}")


# Main pipeline function to run all feature building steps
def build_features_pipeline(train: pd.DataFrame) -> tuple:
"""
Run all feature building steps in order.

Args:
train: training dataframe
Returns:
tuple: user_item_matrix, user_map, item_map
tuple: user_item_matrix, user_map, item_map, user_means, normalized_matrix
"""
logger.info("=== START: FEATURE BUILD PIPELINE ===")

# build matrix
logger.info("Building user-item matrix...")
user_item_matrix, user_map, item_map = build_user_item_matrix(train)

# normalize matrix
logger.info("Normalizing matrix...")
normalized_matrix, user_means = normalize_matrix(user_item_matrix)

# save to disk
logger.info("Saving features to disk...")
save_features(user_item_matrix, user_map, item_map)
save_normalized_matrix(normalized_matrix, user_means)

logger.info("Feature pipeline completed successfully.")
logger.info("=== END: FEATURE BUILD PIPELINE ===")

return user_item_matrix, user_map, item_map
return user_item_matrix, user_map, item_map, user_means, normalized_matrix
Loading