NeuralShrink is a state-of-the-art model compression framework designed specifically for large language models (LLMs). This toolkit implements advanced tensor-specific compression techniques that can achieve up to 60% size reduction while preserving model performance and reasoning capabilities.
Chris Royse - Founder & CEO at Frontier Tech Strategies
- Overview
- Key Features
- Installation
- Quick Start
- Detailed Documentation
- Performance Benchmarks
- Advanced Usage
- Contributing
- License
- Citation
- Contact
- Acknowledgements
NeuralShrink addresses the critical challenge of model size in large language models by applying tensor-specific compression techniques that are carefully calibrated to preserve model performance. Unlike generic quantization or pruning methods, NeuralShrink analyzes the specific role and importance of each tensor to select the optimal compression strategy.
The framework has been successfully tested on the IBM Granite 3.2 8B model, achieving a 61% reduction in model size (from 16GB to 6.92GB) while maintaining model capabilities.
- Tensor-specific compression: Applies different techniques based on tensor type and importance
- Multi-GPU distributed processing: Efficiently processes large models using parallel computation
- Configurable compression settings: Fine-tune compression strategies based on accuracy requirements
- Comprehensive compression suite: Includes 8 advanced compression techniques
- Safe model serialization: Supports both PyTorch's native format and safetensors
- Detailed analytics: Provides comprehensive statistics on compression performance
# Clone the repository
git clone https://github.com/ChrisRoyse/neuralshrink.git
cd neuralshrink
# Install dependencies
pip install -r requirements.txt- torch>=2.0.0
- transformers>=4.30.0
- numpy>=1.20.0
- safetensors>=0.3.0
- scipy>=1.8.0
- pywavelets>=1.3.0
- scikit-learn>=1.0.0
- tqdm
from neuralshrink import ModelProcessor, CompressionConfig
# Create configuration
config = CompressionConfig(
accuracy_target=0.95, # Target accuracy preservation (0.0-1.0)
compress_to_path="compressed_model" # Output directory
)
# Initialize processor
processor = ModelProcessor(config)
# Compress model
results = processor.compress_model(
model_path="path/to/model", # Path to model directory or file
output_path="compressed_model" # Output directory
)
# Display results
print(f"Original size: {results['original_size']} bytes")
print(f"Compressed size: {results['compressed_size']} bytes")
print(f"Compression ratio: {results['compression_ratio']}x")python -m neuralshrink.main --model path/to/model --output-dir compressed_model --accuracy 0.95NeuralShrink implements several mathematically sophisticated compression techniques, each with specific theoretical foundations:
Singular Value Decomposition (SVD) factorizes a matrix (M) into the product of three matrices:
[ M = U \Sigma V^T ]
where:
- (U) is an orthogonal matrix of left singular vectors
- (\Sigma) is a diagonal matrix of singular values
- (V^T) is the transpose of an orthogonal matrix of right singular vectors
The eigenvalue-aware approach dynamically determines the optimal rank (r) for the factorization based on:
-
Energy preservation threshold: The cumulative energy in the singular values is calculated as: [ E(k) = \frac{\sum_{i=1}^{k} \sigma_i^2}{\sum_{i=1}^{n} \sigma_i^2} ] Where (\sigma_i) are the singular values. The rank is chosen as the smallest (k) such that (E(k) \geq ) the energy threshold.
-
Gap detection: We identify significant gaps in singular values where (\sigma_i / \sigma_{i+1} > \tau) (threshold).
-
Minimum rank factor: A lower bound on rank is enforced as (r_{min} = \max(1, \lfloor \min(m,n) \cdot f_{min} \rfloor)), where (f_{min}) is the minimum rank factor (typically 0.05).
The implementation adaptively determines the optimal rank based on these criteria and the tensor's importance score.
Wavelet transformation decomposes a signal into frequency components with localization in time. For 2D tensors, we apply a 2D discrete wavelet transform (DWT):
-
The tensor is decomposed into approximation coefficients (cA) and detail coefficients (cH, cV, cD) using PyWavelets:
coeffs = pywt.wavedec2(tensor_2d, wavelet_name, level=level)
-
A threshold is applied to prune small wavelet coefficients:
cH[np.abs(cH) < threshold * max_val] = 0
-
The most significant coefficients are preserved while less significant ones are quantized more aggressively or set to zero.
The mathematical foundation comes from the wavelet transform's ability to concentrate energy in fewer coefficients for smooth signals with localized variations.
This technique is inspired by sparse coding in neuroscience, where neural activations are typically sparse. The implementation:
-
Determines a threshold that achieves a target sparsity level (s) (typically 25-50%):
threshold_idx = int(len(abs_values) * sparsity_target) threshold = np.partition(abs_values, threshold_idx)[threshold_idx]
-
Applies the threshold to create a sparse representation:
sparse_tensor = np.where(np.abs(tensor_2d) > threshold, tensor_2d, 0)
-
Stores only non-zero values and their coordinates, similar to a sparse COO format.
This technique uses k-means clustering to quantize weights based on information theory principles:
-
Weights are clustered into (k) centroids using k-means:
kmeans = KMeans(n_clusters=n_clusters, random_state=42) labels = kmeans.fit_predict(data) centroids = kmeans.cluster_centers_.reshape(-1)
-
Each weight is replaced by its nearest centroid, reducing the storage to:
- Centroid values (stored in float16)
- Label assignments (stored in uint8 or uint16)
The number of clusters is adjusted based on the tensor's importance, with more important tensors receiving more clusters to preserve information.
This technique implements structured sparsity in block patterns that can improve computational efficiency:
- The tensor is divided into blocks of size (b \times b) (typically 8×8).
- The Frobenius norm of each block is calculated:
block_norms = np.zeros((n_row_blocks, n_col_blocks)) for i in range(n_row_blocks): for j in range(n_col_blocks): block = padded[i*block_size:(i+1)*block_size, j*block_size:(j+1)*block_size] block_norms[i, j] = np.linalg.norm(block)
- A threshold is determined to achieve the target sparsity:
k = int(len(flattened_norms) * sparsity_target) threshold = np.partition(flattened_norms, k)[k]
- Blocks with norms below the threshold are zeroed out.
This approach maintains hardware efficiency better than unstructured sparsity.
The hybrid precision approach applies different bit-width storage based on the significance of values:
- Mantissa Reduction: Reduces precision of floating-point mantissa while preserving exponents.
- Log Quantization: Uses logarithmic spacing for values, which better preserves the range:
log_magnitudes = np.log10(magnitudes[nonzero_mask]) normalized_log = (log_magnitudes - log_min) / log_range quantized_magnitudes = np.round(normalized_log * max_val)
The framework implements eight primary compression techniques, each implemented as methods in the CompressionTechniques class:
Performs SVD decomposition with adaptive rank selection:
def eigenvalue_svd(self, tensor: Tensor, tensor_info: TensorInfo) -> Tuple[Dict[str, Any], Dict[str, Any]]:
# Get SVD configuration for this tensor type
type_config = self.config.svd.get(tensor_type, self.config.svd["default"])
# Adjust energy preservation based on importance
base_energy = type_config["energy_preserved"]
energy_preserved = base_energy + ((1.0 - base_energy) * importance * 0.5)
# Compute SVD
U, S, Vh = linalg.svd(tensor_2d, full_matrices=False)
# Calculate energy preservation thresholds
total_energy = np.sum(S**2)
cumulative_energy = np.cumsum(S**2) / total_energy
# Find rank based on energy preservation
rank_by_energy = np.searchsorted(cumulative_energy, energy_preserved) + 1
# Eigenvalue analysis to find natural cutoff points
if len(S) > 10:
# Look for significant gaps in singular values
ratios = S[:-1] / np.maximum(S[1:], 1e-10)
# Find significant gaps
sig_threshold = 2.0
min_energy_before_gap = 0.7
significant_gaps = []
for i, ratio in enumerate(ratios):
if ratio > sig_threshold and cumulative_energy[i] >= min_energy_before_gap:
significant_gaps.append(i + 1)
# Use the first significant gap as candidate rank
rank_by_gap = significant_gaps[0] if significant_gaps else max_rank
# Take minimum of energy-based and gap-based ranks
adaptive_rank = max(min_rank, min(rank_by_energy, rank_by_gap))
# Truncate matrices using adaptive rank
U_trunc = U[:, :adaptive_rank]
S_trunc = S[:adaptive_rank]
Vh_trunc = Vh[:adaptive_rank, :]Applies wavelet transformation and coefficient pruning:
def wavelet_transform(self, tensor: Tensor, tensor_info: TensorInfo) -> Tuple[Dict[str, Any], Dict[str, Any]]:
# Apply 2D wavelet transform
coeffs = pywt.wavedec2(tensor_2d, wavelet_name, level=level)
# Process approximation coefficients at higher precision
cA = coeffs[0]
thresholded_coeffs.append(cA.astype(np.float16))
# Process detail coefficients with thresholding
for detail_coeffs in coeffs[1:]:
cH, cV, cD = detail_coeffs
# Apply thresholding - set values below threshold*max to zero
max_val = max(np.max(np.abs(cH)), np.max(np.abs(cV)), np.max(np.abs(cD)))
mask_H = np.abs(cH) < threshold * max_val
mask_V = np.abs(cV) < threshold * max_val
mask_D = np.abs(cD) < threshold * max_val
cH[mask_H] = 0
cV[mask_V] = 0
cD[mask_D] = 0Implements sparse coding with neuromorphic inspiration:
def neuromorphic_sparse(self, tensor: Tensor, tensor_info: TensorInfo) -> Tuple[Dict[str, Any], Dict[str, Any]]:
# Set sparsity target based on tensor type and importance
sparsity_target = 0.5 * (1.0 - importance * 0.5) # 25%-50% sparsity
# Find threshold that achieves target sparsity
abs_values = np.abs(tensor_2d).flatten()
threshold_idx = int(len(abs_values) * sparsity_target)
threshold = np.partition(abs_values, threshold_idx)[threshold_idx]
# Apply threshold
sparse_tensor = np.where(np.abs(tensor_2d) > threshold, tensor_2d, 0)
# Convert sparse tensor to coordinate format
coords = np.nonzero(sparse_tensor)
values = sparse_tensor[coords]Applies K-means clustering for weight quantization:
def information_clustering(self, tensor: Tensor, tensor_info: TensorInfo) -> Tuple[Dict[str, Any], Dict[str, Any]]:
# Adjust number of clusters based on importance
base_clusters = type_config["n_clusters"]
n_clusters = int(base_clusters * (0.75 + importance * 0.5))
# Apply K-means clustering
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
labels = kmeans.fit_predict(data)
centroids = kmeans.cluster_centers_.reshape(-1)
# Create serialized representation
compressed_data = {
"labels": labels.astype(np.uint16 if n_clusters > 255 else np.uint8),
"centroids": centroids.astype(np.float16),
"shape": orig_shape
}Implements structured block sparsity:
def block_sparsity(self, tensor: Tensor, tensor_info: TensorInfo) -> Tuple[Dict[str, Any], Dict[str, Any]]:
# Calculate block-wise norms
for i in range(n_row_blocks):
for j in range(n_col_blocks):
block = padded[i*block_size:(i+1)*block_size, j*block_size:(j+1)*block_size]
block_norms[i, j] = np.linalg.norm(block)
# Determine threshold to achieve target sparsity
flattened_norms = block_norms.flatten()
k = int(len(flattened_norms) * sparsity_target)
threshold = np.partition(flattened_norms, k)[k]
# Create sparsity mask
block_mask = block_norms > threshold
# Apply block-wise sparsity
sparse_tensor = np.zeros_like(padded)
for i in range(n_row_blocks):
for j in range(n_col_blocks):
if block_mask[i, j]:
sparse_tensor[i*block_size:(i+1)*block_size, j*block_size:(j+1)*block_size] = \
padded[i*block_size:(i+1)*block_size, j*block_size:(j+1)*block_size]Implements mixed precision storage:
def hybrid_precision(self, tensor: Tensor, tensor_info: TensorInfo) -> Tuple[Dict[str, Any], Dict[str, Any]]:
# Adjust bits based on importance
base_bits = type_config["bits"]
bits = max(4, min(16, int(base_bits * (0.8 + importance * 0.4))))
# Apply appropriate quantization scheme
if scheme == "float16":
quantized = tensor_np.astype(np.float16)
elif scheme == "mantissa_reduction":
# Custom low-bit floating point implementation
abs_max = np.max(np.abs(tensor_np))
scale = (2**(bits-1) - 1) / abs_max if abs_max > 0 else 1.0
integer_values = np.round(tensor_np * scale).astype(np.int8 if bits <= 8 else np.int16)
elif scheme == "log_quantization":
# Log quantization for wide dynamic range
signs = np.sign(tensor_np)
magnitudes = np.abs(tensor_np)
log_magnitudes = np.log10(magnitudes[nonzero_mask])
normalized_log = (log_magnitudes - log_min) / log_range
quantized_magnitudes = np.round(normalized_log * max_val)The CompressionConfig dataclass provides extensive options for configuring compression behavior:
@dataclass
class CompressionConfig:
# General settings
accuracy_target: float = 0.95 # Target accuracy preservation (0.0-1.0)
compress_to_path: str = "compressed_model" # Output directory
# Multi-GPU settings
use_multi_gpu: bool = True
max_gpus: Optional[int] = None # None means use all available
# SVD configurations
svd: Dict[str, Dict[str, float]] = field(default_factory=lambda: {
"embedding": {"energy_preserved": 0.95, "min_rank_factor": 0.05},
"attention_qkv": {"energy_preserved": 0.92, "min_rank_factor": 0.05},
"attention_output": {"energy_preserved": 0.90, "min_rank_factor": 0.05},
"feed_forward": {"energy_preserved": 0.88, "min_rank_factor": 0.04},
"default": {"energy_preserved": 0.90, "min_rank_factor": 0.05}
})
# Wavelet configurations
wavelet: Dict[str, Dict[str, Any]] = field(default_factory=lambda: {
"embedding": {"threshold": 0.04, "level": 3, "wavelet": "db4"},
"attention": {"threshold": 0.05, "level": 3, "wavelet": "sym4"},
"feed_forward": {"threshold": 0.06, "level": 3, "wavelet": "db2"},
"default": {"threshold": 0.04, "level": 2, "wavelet": "db2"}
})
# Block sparsity configurations
sparsity: Dict[str, Dict[str, Any]] = field(default_factory=lambda: {
"embedding": {"target": 0.35, "block_size": 16},
"attention": {"target": 0.45, "block_size": 8},
"feed_forward": {"target": 0.55, "block_size": 8},
"default": {"target": 0.40, "block_size": 8}
})
# Weight clustering configurations
clustering: Dict[str, Dict[str, Any]] = field(default_factory=lambda: {
"embedding": {"n_clusters": 256, "min_shape_product": 5000},
"attention": {"n_clusters": 192, "min_shape_product": 5000},
"feed_forward": {"n_clusters": 128, "min_shape_product": 5000},
"default": {"n_clusters": 128, "min_shape_product": 5000}
})
# Hybrid precision configurations
precision: Dict[str, Dict[str, Any]] = field(default_factory=lambda: {
"embedding": {"bits": 16, "scheme": "mantissa_reduction"},
"attention_qkv": {"bits": 12, "scheme": "mantissa_reduction"},
"attention_output": {"bits": 12, "scheme": "mantissa_reduction"},
"feed_forward": {"bits": 8, "scheme": "log_quantization"},
"layer_norm": {"bits": 16, "scheme": "mantissa_reduction"},
"output": {"bits": 16, "scheme": "float16"},
"default": {"bits": 12, "scheme": "mantissa_reduction"}
})
# Critical tensors to preserve at high precision
preserve_precision_list: List[str] = field(default_factory=lambda: [
"model.norm.weight", # Final layer norm
"lm_head.weight", # Output projection
"model.layers.0.input_layernorm.weight", # First layer norm
"model.layers.0.post_attention_layernorm.weight",
"model.layers.39.input_layernorm.weight", # Last layer's norms
"model.layers.39.post_attention_layernorm.weight"
])
def adjust_for_accuracy(self) -> None:
"""Adjust configuration based on accuracy target"""
# Scale energy preservation for SVD based on accuracy target
if self.accuracy_target < 0.95:
factor = self.accuracy_target / 0.95
for config in self.svd.values():
config["energy_preserved"] = max(0.8, config["energy_preserved"] * factor)
# Increase thresholds for wavelet compression (higher = more aggressive)
for config in self.wavelet.values():
config["threshold"] = min(0.1, config["threshold"] * (1.1 / factor))
# Increase sparsity targets for lower accuracy
for config in self.sparsity.values():
config["target"] = min(0.7, config["target"] * (1.1 / factor))The framework is organized into several key classes:
-
CompressionConfig: Manages configuration settings for all compression techniques.
-
TensorInfo: Stores analytical information about tensors:
@dataclass class TensorInfo: name: str shape: Tuple[int, ...] size_bytes: int n_params: int dtype: str type: str = "other" importance: float = 0.5 min_val: float = 0.0 max_val: float = 0.0 abs_mean: float = 0.0 std: float = 0.0 sparsity: float = 0.0 entropy: Optional[float] = None sv_decay_rate: Optional[float] = None low_rank_score: Optional[float] = None block_sparsity_score: Optional[float] = None wavelet_score: Optional[float] = None
-
TensorAnalyzer: Analyzes tensors to determine properties and optimal compression:
classify_tensor_type(name): Identifies tensor role based on name patternsanalyze_tensor(name, tensor): Computes comprehensive tensor metricscalculate_importance(tensor_info): Assigns importance score (0-1)select_compression_technique(tensor_info): Chooses optimal technique
-
CompressionTechniques: Implements compression algorithms:
apply_technique(technique, tensor, tensor_info): Dispatches to appropriate method- Methods for each compression technique (described above)
-
ModelProcessor: Orchestrates the compression process:
compress_model(model_path, output_path): Main entry point_compress_state_dict(state_dict): Processes tensors in model_compress_state_dict_multi_gpu(state_dict, tensor_infos): Parallel processing_save_compressed_model(compressed_tensors, output_file): Serialization
The compression pipeline follows these steps:
-
Analysis Phase:
- Load model state dictionary
- Create
TensorInfofor each tensor - Calculate analysis metrics (entropy, SVD scores, etc.)
- Assign importance scores to tensors
-
Technique Selection:
- For each tensor, select optimal compression technique
- Consider tensor type, size, and importance
-
Compression Phase:
- Apply selected technique to each tensor
- Store compressed representation and metadata
-
Serialization Phase:
- Store compressed tensors in output format
- Include metadata for decompression
Performance benchmarks coming soon. Thus far I only have a model that is 61% smaller, testing it now and this will get updated. Evaluations on IBM Granite 3.2 8B model.:
| Metric | Original | Compressed | Ratio |
|---|---|---|---|
| Size | 16.0 GB | 6.92 GB | 2.31x |
| Memory Usage | ~20 GB | ~8 GB | 2.50x |
| Load Time | 100% | ~45% | 2.22x |
Compression breakdown by tensor type:
| Tensor Type | Count | Avg. Compression Ratio | Technique Distribution |
|---|---|---|---|
| embedding | 2 | 3.22x | SVD (50%), Neuromorphic (50%) |
| attention_qkv | 40 | 2.65x | SVD (45%), Block Sparsity (30%), Hybrid (25%) |
| attention_output | 40 | 2.42x | SVD (40%), Block Sparsity (35%), Hybrid (25%) |
| feed_forward | 80 | 2.80x | SVD (30%), Block Sparsity (50%), Wavelet (20%) |
| layer_norm | 80 | 1.05x | Precision Only (100%) |
| output | 1 | 1.05x | Precision Only (100%) |
from neuralshrink import ModelProcessor, CompressionConfig
# Custom configuration
config = CompressionConfig(
accuracy_target=0.90, # Allow more aggressive compression
use_multi_gpu=True,
max_gpus=2
)
# Customize SVD settings for specific tensor types
config.svd["embedding"]["energy_preserved"] = 0.90
config.svd["attention_qkv"]["energy_preserved"] = 0.85
# Customize sparsity settings
config.sparsity["feed_forward"]["target"] = 0.65 # More aggressive sparsity
# Customize precision settings
config.precision["feed_forward"]["bits"] = 6 # Lower precision
# Initialize processor with custom config
processor = ModelProcessor(config)
# Compress model
results = processor.compress_model("path/to/model", "custom_compressed")# Create a custom tensor processor
from neuralshrink import TensorProcessor
processor = TensorProcessor(config)
# Process specific tensors with specific techniques
for name, tensor in model.items():
if "query" in name or "key" in name:
# Apply SVD to query/key tensors
compressed = processor.apply_technique("eigenvalue_svd", tensor)
elif "value" in name:
# Apply block sparsity to value tensors
compressed = processor.apply_technique("block_sparsity", tensor)
elif "feed_forward" in name:
# Apply wavelet to feed forward
compressed = processor.apply_technique("wavelet_transform", tensor)
else:
# Default technique
compressed = processor.apply_technique("hybrid_precision", tensor)
compressed_model[name] = compressed# Configure for multi-GPU processing
config = CompressionConfig(
use_multi_gpu=True,
max_gpus=4 # Use up to 4 GPUs
)
# Customize GPU workload distribution
config.gpu_workload = {
"distribution_strategy": "size_balanced", # Options: size_balanced, count_balanced
"tensor_type_affinity": {
# Assign tensor types to specific GPUs (when possible)
"embedding": 0, # GPU 0
"attention": 1, # GPU 1
"feed_forward": 2 # GPU 2
}
}Contributions to NeuralShrink are welcome! Please follow these steps:
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add some amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
# Clone the repository
git clone https://github.com/ChrisRoyse/neuralshrink.git
cd neuralshrink
# Create a virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dev dependencies
pip install -e ".[dev]"
# Run tests
pytest tests/This project is licensed under the MIT License - see the LICENSE file for details.
If you use NeuralShrink in your research, please cite:
@software{neuralshrink2025,
author = {Royse, Chris},
title = {NeuralShrink: Advanced Model Compression for LLMs},
year = {2025},
url = {https://github.com/ChrisRoyse/neuralshrink}
}For inquiries about NeuralShrink, licensing, or collaboration opportunities:
- Email: chris@frontiertechstrategies.com
- Website: Frontier Tech Strategies
- GitHub: @ChrisRoyse
- LinkedIn: Christopher Royse
- The SVD implementation was inspired by research from the paper "Compressing Large-Scale Transformer-Based Models: A Case Study on BERT" by Sehoon Kim et al.
- Wavelet compression techniques draw from signal processing literature, particularly the work of Mallat et al.
- Special thanks to the PyTorch and Hugging Face teams for their excellent tools and libraries.
