Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions dpnegf/negf/lead_property.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ def _estimate_worker_memory(lead_L, lead_R, kpoint=None, temp_allocation_factor=
return total_estimate


def _get_safe_n_jobs(lead_L, lead_R, requested_n_jobs=-1, max_memory_fraction=0.7, min_workers=1, kpoint=None):
def _get_safe_n_jobs(lead_L, lead_R, requested_n_jobs=-1, max_memory_fraction=0.7, min_workers=1, kpoint=None, n_cpus=None):
"""
Calculate safe number of parallel workers based on available system memory.

Expand All @@ -533,6 +533,9 @@ def _get_safe_n_jobs(lead_L, lead_R, requested_n_jobs=-1, max_memory_fraction=0.
Minimum number of workers to use. Default 1.
kpoint : array-like, optional
A sample k-point for fetching Hamiltonian matrices to estimate memory.
n_cpus : int or None, optional
Optional upper bound on CPU cores to consider. If provided, caps the
available CPU count before memory-based limiting.

Returns
-------
Expand All @@ -543,6 +546,11 @@ def _get_safe_n_jobs(lead_L, lead_R, requested_n_jobs=-1, max_memory_fraction=0.
if cpu_count is None or cpu_count < 1:
cpu_count = 1
log.warning("os.cpu_count() returned None or invalid value. Defaulting to 1 CPU core.")
if n_cpus is not None:
if isinstance(n_cpus, int) and n_cpus > 0:
cpu_count = min(cpu_count, n_cpus)
else:
log.warning(f"Requested n_cpus={n_cpus} is not a positive integer. Ignoring.")

available_memory = psutil.virtual_memory().available
memory_per_worker = _estimate_worker_memory(lead_L, lead_R, kpoint=kpoint)
Expand Down Expand Up @@ -592,7 +600,7 @@ def _get_safe_n_jobs(lead_L, lead_R, requested_n_jobs=-1, max_memory_fraction=0.


def compute_all_self_energy(eta, lead_L, lead_R, kpoints_grid, energy_grid,
self_energy_save_path=None, n_jobs=-1, batch_size=200):
self_energy_save_path=None, n_jobs=-1, batch_size=200, n_cpus=None):
"""
Computes and saves self-energy matrices for all combinations of k-points and energy values
for left and right leads.
Expand All @@ -615,9 +623,15 @@ def compute_all_self_energy(eta, lead_L, lead_R, kpoints_grid, energy_grid,
self_energy_save_path : str or None, optional
Directory to save self-energy files. If None, uses lead_L's results_path.
n_jobs : int, optional
Number of parallel jobs to use. Default is -1 (use all available CPUs).
Requested number of parallel jobs. Default is -1 (auto-detect). The
final worker count may be capped by ``n_cpus`` and reduced further by
memory-based limiting.
batch_size : int, optional
Number of (k, e) tasks per parallel batch. Default is 200.
n_cpus : int or None, optional
Optional upper bound on CPU cores to consider when auto-detecting or
capping ``n_jobs``. If None, uses all available CPUs. Memory-based
limiting may still reduce the final worker count.

Returns
-------
Expand All @@ -633,7 +647,7 @@ def compute_all_self_energy(eta, lead_L, lead_R, kpoints_grid, energy_grid,
# Calculate safe number of workers based on available memory
# Use first k-point for memory estimation
sample_kpoint = kpoints_grid[0] if len(kpoints_grid) > 0 else None
safe_n_jobs = _get_safe_n_jobs(lead_L, lead_R, requested_n_jobs=n_jobs, kpoint=sample_kpoint)
safe_n_jobs = _get_safe_n_jobs(lead_L, lead_R, requested_n_jobs=n_jobs, kpoint=sample_kpoint, n_cpus=n_cpus)
if n_jobs == -1:
log.info(f"Auto-detected safe n_jobs={safe_n_jobs} based on available memory")
elif safe_n_jobs < n_jobs:
Expand Down
14 changes: 14 additions & 0 deletions dpnegf/tests/test_auto_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,20 @@ def test_capped_by_cpu_count(self, mock_os, mock_psutil):
# Should be capped at CPU count
assert result <= 4

@patch('dpnegf.negf.lead_property.psutil')
@patch('dpnegf.negf.lead_property.os')
def test_respects_n_cpus_cap(self, mock_os, mock_psutil):
"""Test that n_cpus caps available CPU count."""
mock_os.cpu_count.return_value = 8
mock_psutil.virtual_memory.return_value = Mock(available=128 * 1024**3)

lead_L = MockLead("lead_L", matrix_size=10)
lead_R = MockLead("lead_R", matrix_size=10)

result = _get_safe_n_jobs(lead_L, lead_R, requested_n_jobs=-1, n_cpus=2)

assert result == 2


# =============================================================================
# Integration tests
Expand Down