Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 148 additions & 4 deletions dpnegf/negf/lead_property.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from joblib import Parallel, delayed
import h5py
import glob
import psutil


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -452,10 +453,145 @@ def fermi_dirac(self, x) -> torch.Tensor:
@property
def gamma(self):
return self.sigmaLR2Gamma(self.se)



def compute_all_self_energy(eta, lead_L, lead_R, kpoints_grid, energy_grid,
def _estimate_worker_memory(lead_L, lead_R, kpoint=None, temp_allocation_factor=3.0):
"""
Estimate memory (in bytes) needed per joblib worker for self-energy calculation.

The estimation separates two components:
1. Base overhead: Fixed memory for Python process and imported libraries
(Python interpreter, NumPy, SciPy, PyTorch, DeePTB, etc.)
2. Computation memory: Dynamic memory for matrices and intermediate calculations,
scaled by a factor to account for temporary allocations during surface Green's
function iteration, matrix products, and LAPACK/BLAS workspace.

Parameters
----------
lead_L, lead_R : LeadProperty
Lead objects containing Hamiltonian data.
kpoint : array-like, optional
A sample k-point to use for fetching Hamiltonian matrices. If None, uses [0, 0, 0].
temp_allocation_factor : float
Multiplier for computation memory to account for intermediate arrays created
during surface Green's function iteration and matrix operations. Default 3.0.

Returns
-------
int
Estimated memory in bytes per worker.
"""
# Base overhead for Python process + libraries (interpreter, numpy, scipy, torch, etc.)
base_overhead = 300 * 1024 * 1024 # 300 MB

matrix_bytes = 0

if kpoint is None:
kpoint = [0.0, 0.0, 0.0] # use Gamma point if not provided

# Estimate from lead Hamiltonian matrices using get_hs_lead method
# Each complex128 element = 16 bytes
for lead in [lead_L, lead_R]:
try:
# get_hs_lead returns: (hL, hLL, hDL, sL, sLL, sDL)
hL, hLL, hDL, sL, sLL, sDL = lead.hamiltonian.get_hs_lead(
kpoint=kpoint, tab=lead.tab, v=lead.voltage
)
# Sum up memory for all matrices
for tensor in [hL, hLL, hDL, sL, sLL, sDL]:
if tensor is not None:
if hasattr(tensor, 'numel'): # PyTorch tensor
matrix_bytes += tensor.numel() * 16 # complex128
elif hasattr(tensor, 'nbytes'): # NumPy array
matrix_bytes += tensor.nbytes
except Exception as e:
log.warning(f"Could not estimate matrix memory from {lead.tab}: {e}"
" Using fallback matrix estimate: 100 MB this lead.")
# Fallback: assume 100 MB per lead
matrix_bytes += 100 * 1024 * 1024

# Total estimate: base overhead + scaled computation memory
computation_memory = matrix_bytes * temp_allocation_factor
total_estimate = base_overhead + int(computation_memory)

return total_estimate


def _get_safe_n_jobs(lead_L, lead_R, requested_n_jobs=-1, max_memory_fraction=0.7, min_workers=1, kpoint=None):
"""
Calculate safe number of parallel workers based on available system memory.

Parameters
----------
lead_L, lead_R : LeadProperty
Lead objects for memory estimation.
requested_n_jobs : int
User-requested n_jobs. -1 means auto-detect.
max_memory_fraction : float
Maximum fraction of available memory to use. Default 0.7.
min_workers : int
Minimum number of workers to use. Default 1.
kpoint : array-like, optional
A sample k-point for fetching Hamiltonian matrices to estimate memory.

Returns
-------
int
Safe number of parallel workers.
"""
cpu_count = os.cpu_count()
if cpu_count is None or cpu_count < 1:
cpu_count = 1
log.warning("os.cpu_count() returned None or invalid value. Defaulting to 1 CPU core.")

available_memory = psutil.virtual_memory().available
memory_per_worker = _estimate_worker_memory(lead_L, lead_R, kpoint=kpoint)

# Calculate max workers that fit in available memory
if memory_per_worker <= 0:
log.warning(f"Memory estimation returned non-positive value. Using min_workers={min_workers}.")
return min_workers

# Calculate max workers that fit in available memory
max_workers_by_memory = int((available_memory * max_memory_fraction) / memory_per_worker)
max_workers_by_memory = int((available_memory * max_memory_fraction) / memory_per_worker)
max_workers_by_memory = max(max_workers_by_memory, min_workers)

# Cap by CPU count
max_workers = min(max_workers_by_memory, cpu_count)

safe_n_worker = 0
# check requested_n_jobs is a number
if not isinstance(requested_n_jobs, int):
log.warning(f"Requested n_jobs={requested_n_jobs} is not an integer. \n"
f"Using min_workers={min_workers}.")
safe_n_worker = min_workers

if requested_n_jobs == -1:
safe_n_worker = max_workers
elif requested_n_jobs == 0:
log.warning(f"Requested n_jobs=0 is invalid. Using min_workers={min_workers}.")
safe_n_worker = min_workers
elif requested_n_jobs > 0:
if requested_n_jobs > max_workers:
log.warning(f"Requested n_jobs={requested_n_jobs} may exceed available memory. "
f"Limiting to {max_workers} workers "
f"(available: {available_memory / 1e9:.1f} GB, "
f"est. per worker: {memory_per_worker / 1e9:.1f} GB)")
safe_n_worker = max_workers
else:
safe_n_worker = requested_n_jobs
else:
# Negative values other than -1: joblib interprets as (cpu_count + 1 + n_jobs)
effective_n_jobs = max(cpu_count + 1 + requested_n_jobs, min_workers)
safe_n_worker = min(effective_n_jobs, max_workers)

log.info(f"Estimated safe n_jobs={safe_n_worker} based on available memory.")
return safe_n_worker
Comment on lines +520 to +590
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix duplicated max_workers_by_memory assignment and make non-integer requested_n_jobs handling consistent

  • Lines 556–557 compute max_workers_by_memory twice with the same expression; one of these assignments can be removed with no change in behavior.
  • The non-integer requested_n_jobs path (Lines 565–568) sets safe_n_worker = min_workers but then continues into the later conditionals, which can overwrite that value and may return a non-int worker count. That contradicts the docstring return type and can make downstream usage harder to reason about.
  • It would be clearer to either:
    • return min_workers immediately after logging for non‑integer requested_n_jobs, or
    • coerce once via requested_n_jobs = int(requested_n_jobs) and document that behavior.
  • To enforce the contract, you could also cast once at the end, e.g. return int(safe_n_worker).
🤖 Prompt for AI Agents
In dpnegf/negf/lead_property.py around lines 520–590, remove the duplicated
assignment to max_workers_by_memory (lines 556–557) so it is computed only once;
for non-integer requested_n_jobs, after logging immediately return min_workers
(do not continue into later branches) to avoid overwriting the value or
returning a non-int; and finally ensure the function returns an int by casting
safe_n_worker to int (e.g., return int(safe_n_worker)) before returning.




def compute_all_self_energy(eta, lead_L, lead_R, kpoints_grid, energy_grid,
self_energy_save_path=None, n_jobs=-1, batch_size=200):
"""
Computes and saves self-energy matrices for all combinations of k-points and energy values
Expand Down Expand Up @@ -494,17 +630,25 @@ def compute_all_self_energy(eta, lead_L, lead_R, kpoints_grid, energy_grid,
"Self energy files will be saved in lead_L's results_path.")
self_energy_save_path = os.path.join(lead_L.results_path, "self_energy")

# Calculate safe number of workers based on available memory
# Use first k-point for memory estimation
sample_kpoint = kpoints_grid[0] if len(kpoints_grid) > 0 else None
safe_n_jobs = _get_safe_n_jobs(lead_L, lead_R, requested_n_jobs=n_jobs, kpoint=sample_kpoint)
if n_jobs == -1:
log.info(f"Auto-detected safe n_jobs={safe_n_jobs} based on available memory")
elif safe_n_jobs < n_jobs:
log.info(f"Adjusted n_jobs from {n_jobs} to {safe_n_jobs} due to memory constraints")

total_tasks = [(k, e) for k in kpoints_grid for e in energy_grid]
if len(total_tasks) <= batch_size:
Parallel(n_jobs=n_jobs, backend="loky")(
Parallel(n_jobs=safe_n_jobs, backend="loky")(
delayed(self_energy_worker)(k, e, eta, lead_L, lead_R, self_energy_save_path)
for k, e in total_tasks
)
else:
for i in range(0, len(total_tasks), batch_size):
batch = total_tasks[i:i+batch_size]
Parallel(n_jobs=n_jobs, backend="loky")(
Parallel(n_jobs=safe_n_jobs, backend="loky")(
delayed(self_energy_worker)(k, e, eta, lead_L, lead_R, self_energy_save_path)
for k, e in batch
)
Expand Down
Loading