diff --git a/examples/distributed_speed_test.py b/examples/distributed_speed_test.py new file mode 100644 index 00000000..f759b336 --- /dev/null +++ b/examples/distributed_speed_test.py @@ -0,0 +1,46 @@ +from cdtools.tools.distributed import run_speed_test + +# Define the number of GPUs to use for the test. We always need to include +# a single GPU in the test. +# +# Here, we will run trials with 1 and 2 GPUs. +world_sizes = [1, 2] + +# We will run 3 trials per GPU to collect statistics on loss-versus-epoch/time +# data as well as runtime speedup. +runs = 3 + +# We will perform a speed test on a reconstruction script modified to run +# a speed test (see fancy_ptycho_speed_test.py) +script_path = 'fancy_ptycho_speed_test.py' + +# When we run the modified script with the speed test, a pickle dump file +# will be generated after each trial. The file contains data about loss-vs-time +# measured for the trial with one or several GPUs used. +output_dir = 'example_loss_data' + +# Define the file name prefix. The file will have the following name: +# `_nGPUs__TRIAL_.pkl` +file_prefix = 'speed_test' + +# We can plot several curves showing what the loss-versus/epoch curves look +# like for each GPU count. The plot will also show the relative runtime +# speed-up relative to the single-GPU runtime. +show_plot = True + +# We can also delete the pickle dump files after each trial run has been +# completed and stored by `run_speed_test` +delete_output_file = True + +# Run the test. This speed test will return several lists containing the +# means and standard deviations of the final recorded losses and runtime +# speed ups calculated over several trial runs. Each entry index maps to +# the GPU count specified by `world_sizes`. 
+final_loss_mean, final_loss_std, speed_up_mean, speed_up_std = \ + run_speed_test(world_sizes=world_sizes, + runs=runs, + script_path=script_path, + output_dir=output_dir, + file_prefix=file_prefix, + show_plot=show_plot, + delete_output_files=delete_output_file) diff --git a/examples/fancy_ptycho_speed_test.py b/examples/fancy_ptycho_speed_test.py new file mode 100644 index 00000000..94c870dd --- /dev/null +++ b/examples/fancy_ptycho_speed_test.py @@ -0,0 +1,53 @@ +import cdtools + + +# To modify fancy_ptycho.py for a multi-GPU speed test, we need to enclose the +# entire reconstruction script in a function. The function then needs to be +# decorated with cdtools.tools.distributed.report_speed_test. The decorator +# allows data to be saved and read by the multi-GPU speed test function +# which we will use to run this script. +@cdtools.tools.distributed.report_speed_test +def main(): + filename = 'example_data/lab_ptycho_data.cxi' + dataset = cdtools.datasets.Ptycho2DDataset.from_cxi(filename) + + model = cdtools.models.FancyPtycho.from_dataset( + dataset, + n_modes=3, + oversampling=2, + probe_support_radius=120, + propagation_distance=5e-3, + units='mm', + obj_view_crop=-50 + ) + + device = 'cuda' + model.to(device=device) + dataset.get_as(device=device) + + # Remove or comment out existing plotting statements + for loss in model.Adam_optimize(50, dataset, lr=0.02, batch_size=40): + # Optional: ensure that only a single GPU prints a report by + # adding an if statement. Without this, the print statement will + # be called by all participating GPUs, resulting in multiple copies + # of the printed model report. 
+ if model.rank == 0: + print(model.report()) + + for loss in model.Adam_optimize(25, dataset, lr=0.005, batch_size=40): + if model.rank == 0: + print(model.report()) + + for loss in model.Adam_optimize(25, dataset, lr=0.001, batch_size=40): + if model.rank == 0: + print(model.report()) + + model.tidy_probes() + + # We need to return the model so the data can be saved by the decorator. + return model + + +# We also need to include this if-name-main block at the end +if __name__ == '__main__': + main() diff --git a/pyproject.toml b/pyproject.toml index b7fd46a9..6ac3baa3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,6 @@ [tool.ruff] # Decrease the maximum line length to 79 characters. -line-length = 79 \ No newline at end of file +line-length = 79 + +[tool.pytest.ini_options] +testpaths = 'tests' \ No newline at end of file diff --git a/setup.py b/setup.py index ae710d77..663d75a3 100644 --- a/setup.py +++ b/setup.py @@ -47,5 +47,10 @@ "Programming Language :: Python :: 3", "Operating System :: OS Independent", ], + entry_points={ + 'console_scripts': { + 'cdt-torchrun = cdtools.tools.distributed.distributed:run_single_to_multi_gpu' + } + } ) diff --git a/src/cdtools/datasets/base.py b/src/cdtools/datasets/base.py index 3f8ec8c3..62ad3e43 100644 --- a/src/cdtools/datasets/base.py +++ b/src/cdtools/datasets/base.py @@ -19,6 +19,7 @@ import pathlib from cdtools.tools import data as cdtdata from torch.utils import data as torchdata +import os __all__ = ['CDataset'] @@ -92,6 +93,18 @@ def __init__( self.get_as(device='cpu') + # These attributes indicate to the CDataset methods whether or not + # multi-GPU calculations are being performed. These flags are mostly + # used to prevent the production of duplicate plots when CDataset.inspect + # is called. 
+ rank = os.environ.get('RANK') + world_size = os.environ.get('WORLD_SIZE') + # Rank of the subprocess running the GPU (default rank 0) + self.rank = int(rank) if rank is not None else 0 + # Total number of GPUs being used. + self.world_size = int(world_size) if world_size is not None else 1 + self.multi_gpu_used = int(self.world_size) > 1 + def to(self, *args, **kwargs): """Sends the relevant data to the given device and dtype diff --git a/src/cdtools/datasets/ptycho_2d_dataset.py b/src/cdtools/datasets/ptycho_2d_dataset.py index 3825d6de..9d719275 100644 --- a/src/cdtools/datasets/ptycho_2d_dataset.py +++ b/src/cdtools/datasets/ptycho_2d_dataset.py @@ -198,6 +198,8 @@ def to_cxi(self, cxi_file): cxi_file : str, pathlib.Path, or h5py.File The .cxi file to write to """ + if self.multi_gpu_used and self.rank != 0: + return # If a bare string is passed if isinstance(cxi_file, str) or isinstance(cxi_file, pathlib.Path): @@ -230,7 +232,9 @@ def inspect( can display a base-10 log plot of the detector readout at each position. 
""" - + # FOR MULTI-GPU: Only run this method if it's called by the rank 0 GPU + if self.multi_gpu_used and self.rank != 0: + return def get_images(idx): inputs, output = self[idx] diff --git a/src/cdtools/models/base.py b/src/cdtools/models/base.py index df347b63..730ff7c5 100644 --- a/src/cdtools/models/base.py +++ b/src/cdtools/models/base.py @@ -29,15 +29,11 @@ """ import torch as t -from torch.utils import data as torchdata from matplotlib import pyplot as plt from matplotlib.widgets import Slider from matplotlib import ticker import numpy as np -import threading -import queue import time -from scipy import io from contextlib import contextmanager from cdtools.tools.data import nested_dict_to_h5, h5_to_nested_dict, nested_dict_to_numpy, nested_dict_to_torch from cdtools.reconstructors import AdamReconstructor, LBFGSReconstructor, SGDReconstructor @@ -65,6 +61,25 @@ def __init__(self): self.training_history = '' self.epoch = 0 + # These attributes indicate to the CDIModel methods whether or not + # multi-GPU calculations are being performed. These flags help + # trigger multi-GPU-specific function calls (i.e., all_reduce) and + # prevent redundant plots/reports/saves during multi-GPU use. + rank = os.environ.get('RANK') + world_size = os.environ.get('WORLD_SIZE') + + # Rank of the subprocess running the GPU (defauly rank 0) + self.rank = int(rank) if rank is not None else 0 + # Total number of GPUs being used. + self.world_size = int(world_size) if world_size is not None else 1 + self.multi_gpu_used = int(self.world_size) > 1 + + # Keep track of the time each loss history point was taken relative to + # the initialization of this model. 
+ self.INITIAL_TIME = time.time() + self.loss_times = [] + + def from_dataset(self, dataset): raise NotImplementedError() @@ -197,7 +212,9 @@ def save_to_h5(self, filename, *args): *args Accepts any additional args that model.save_results needs, for this model """ - return nested_dict_to_h5(filename, self.save_results(*args)) + # FOR MULTI-GPU: Only run this method if it's called by the rank 0 GPU + if not (self.multi_gpu_used and self.rank != 0): + return nested_dict_to_h5(filename, self.save_results(*args)) @contextmanager @@ -219,12 +236,17 @@ def save_on_exit(self, filename, *args, exception_filename=None): """ try: yield - self.save_to_h5(filename, *args) - except: - if exception_filename is None: - exception_filename = filename - self.save_to_h5(exception_filename, *args) - raise + + # Only let the Rank 0 GPU handle saving in multi-GPU + if not (self.multi_gpu_used and self.rank != 0): + self.save_to_h5(filename, *args) + + except Exception as e: + if not (self.multi_gpu_used and self.rank != 0): + if exception_filename is None: + exception_filename = filename + self.save_to_h5(exception_filename, *args) + raise e @contextmanager def save_on_exception(self, filename, *args): @@ -242,13 +264,15 @@ def save_on_exception(self, filename, *args): *args Accepts any additional args that model.save_results needs, for this model """ + # FOR MULTI-GPU: Only run this method if it's called by the rank 0 GPU try: yield - except: - self.save_to_h5(filename, *args) - print('Intermediate results saved under name:') - print(filename, flush=True) - raise + except Exception as e: + if not (self.multi_gpu_used and self.rank != 0): + self.save_to_h5(filename, *args) + print('Intermediate results saved under name:') + print(filename, flush=True) + raise e def use_checkpoints(self, job_id, checkpoint_file_stem): @@ -270,6 +294,10 @@ def skip_computation(self): return False def save_checkpoint(self, *args, checkpoint_file=None): + # FOR MULTI-GPU: Only run this method if it's called 
by the rank 0 GPU + if self.multi_gpu_used and self.rank != 0: + return + checkpoint = self.save_results(*args) if (hasattr(self, 'current_optimizer') and self.current_optimizer is not None): @@ -578,6 +606,10 @@ def inspect(self, dataset=None, update=True): Whether to update existing plots or plot new ones """ + # FOR MULTI-GPU: Only run this method if it's called by the rank 0 GPU + if self.multi_gpu_used and self.rank != 0: + return + # We find or create all the figures first_update = False if update and hasattr(self, 'figs') and self.figs: @@ -660,7 +692,10 @@ def save_figures(self, prefix='', extension='.pdf'): extention : strategy Default is .eps, the file extension to save with. """ - + # FOR MULTI-GPU: Only run this method if it's called by the rank 0 GPU + if self.multi_gpu_used and self.rank != 0: + return + if hasattr(self, 'figs') and self.figs: figs = self.figs else: @@ -688,6 +723,10 @@ def compare(self, dataset, logarithmic=False): Whether to plot the diffraction on a logarithmic scale """ + # FOR MULTI-GPU: Only run this method if it's called by the rank 0 GPU + if self.multi_gpu_used and self.rank != 0: + return + fig, axes = plt.subplots(1,3,figsize=(12,5.3)) fig.tight_layout(rect=[0.02, 0.09, 0.98, 0.96]) axslider = plt.axes([0.15,0.06,0.75,0.03]) diff --git a/src/cdtools/reconstructors/base.py b/src/cdtools/reconstructors/base.py index 16668149..fec7b257 100644 --- a/src/cdtools/reconstructors/base.py +++ b/src/cdtools/reconstructors/base.py @@ -8,15 +8,19 @@ The subclasses of Reconstructor are required to implement their own data loaders and optimizer adjusters """ + from __future__ import annotations from typing import TYPE_CHECKING import torch as t +import torch.distributed as dist from torch.utils import data as td +from torch.utils.data.distributed import DistributedSampler import threading import queue import time from typing import List, Union +import cdtools.tools.distributed as cdtdist if TYPE_CHECKING: from cdtools.models import 
CDIModel @@ -94,14 +98,31 @@ def setup_dataloader(self, Optional, enable/disable shuffling of the dataset. This option is intended for diagnostic purposes and should be left as True. """ - if batch_size is not None: - self.data_loader = td.DataLoader(self.dataset, - batch_size=batch_size, - shuffle=shuffle) + if self.model.multi_gpu_used: + self.sampler = \ + DistributedSampler(self.dataset, + num_replicas=self.model.world_size, + rank=self.model.rank, + shuffle=shuffle, + drop_last=False) + + # Creating extra threads in children processes may cause problems. + # Leave num_workers at 0. + self.data_loader = \ + td.DataLoader(self.dataset, + batch_size=batch_size//self.model.world_size, + num_workers=0, + drop_last=False, + pin_memory=False, + sampler=self.sampler) else: - self.data_loader = td.Dataloader(self.dataset) + if batch_size is not None: + self.data_loader = td.DataLoader(self.dataset, + batch_size=batch_size, + shuffle=shuffle) + else: + self.data_loader = td.DataLoader(self.dataset) - def adjust_optimizer(self, **kwargs): """ Change hyperparameters for the utilized optimizer. @@ -150,6 +171,12 @@ def run_epoch(self, 'Reconstructor.run_epoch(), or use Reconstructor.optimize(), ' 'which does it automatically.' ) + + # If we're using DistributedSampler (i.e., multi-GPU usage), we need + # to tell it which epoch we're on. Otherwise data shuffling will not + # work properly + if self.model.multi_gpu_used: + self.data_loader.sampler.set_epoch(self.model.epoch) # Initialize some tracking variables @@ -201,8 +228,14 @@ def closure(): # And accumulate the gradients loss.backward() + # For multi-GPU, average and sync the gradients + losses + # across all participating GPUs. Also sum the losses. 
+ if self.model.multi_gpu_used: + cdtdist.sync_and_avg_gradients(self.model) + dist.all_reduce(loss, op=dist.ReduceOp.SUM) + + # Normalize the accumulating total loss - total_loss += loss.detach() + total_loss += loss.detach() / self.model.world_size + # If we have a regularizer, we can calculate it separately, # and the gradients will add to the minibatch gradient @@ -212,6 +245,11 @@ loss = self.model.regularizer(regularization_factor) loss.backward() + # For multi-GPU optimization, average and sync the + # gradients + losses across all participating GPUs. + if self.model.multi_gpu_used: + cdtdist.sync_and_avg_gradients(self.model) + return total_loss # This takes the step for this minibatch @@ -223,7 +261,18 @@ if self.scheduler is not None: self.scheduler.step(loss) + # Broadcast the learning rate based on Rank 0 for multi-GPU + if self.model.multi_gpu_used: + for param_group in self.optimizer.param_groups: + # Make sure we broadcast over whatever device type + # we're using. Only tested over cuda. 
+ lr_tensor = t.tensor(param_group['lr'], + device=self.model.obj.device) + dist.broadcast(lr_tensor, src=0) + param_group['lr'] = lr_tensor.item() + self.model.loss_history.append(loss) + self.model.loss_times.append(time.time() - self.model.INITIAL_TIME) self.model.epoch = len(self.model.loss_history) self.model.latest_iteration_time = time.time() - t0 self.model.training_history += self.model.report() + '\n' diff --git a/src/cdtools/tools/distributed/__init__.py b/src/cdtools/tools/distributed/__init__.py new file mode 100644 index 00000000..f2380d39 --- /dev/null +++ b/src/cdtools/tools/distributed/__init__.py @@ -0,0 +1,2 @@ +from cdtools.tools.distributed.distributed import * +from cdtools.tools.distributed.distributed import __all__, __doc__ diff --git a/src/cdtools/tools/distributed/distributed.py b/src/cdtools/tools/distributed/distributed.py new file mode 100644 index 00000000..d956343b --- /dev/null +++ b/src/cdtools/tools/distributed/distributed.py @@ -0,0 +1,664 @@ +"""Contains functions to make reconstruction scripts compatible +with multi-GPU distributive approaches in PyTorch. + +Multi-GPU computing here is based on distributed data parallelism, where +each GPU is given identical copies of a model and performs optimization +using different parts of the dataset. After the parameter gradients +are calculated (`loss.backwards()`) on each GPU, the gradients need to be +synchronized and averaged across all participating GPUs. + +The functions in this module assist with gradient synchronization, +setting up conditions necessary to perform distributive computing, and +executing multi-GPU jobs. 
+""" +import torch as t +import torch.distributed as dist +import datetime +import os +import subprocess +import argparse +import runpy +from ast import literal_eval +from matplotlib import pyplot as plt +import pickle +import random +from typing import Callable, Tuple, List +from pathlib import Path +from cdtools.models import CDIModel + +DISTRIBUTED_PATH = os.path.dirname(os.path.abspath(__file__)) +MIN_INT64 = t.iinfo(t.int64).min +MAX_INT64 = t.iinfo(t.int64).max + +__all__ = ['sync_and_avg_gradients', + 'run_single_to_multi_gpu', + 'run_single_gpu_script', + 'report_speed_test', + 'run_speed_test'] + + +def sync_and_avg_gradients(model: CDIModel): + """ + Synchronizes the average of the model parameter gradients across all + participating GPUs using all_reduce. + + Parameters: + model: CDIModel + Model for CDI/ptychography reconstruction + """ + for param in model.parameters(): + if param.requires_grad: + dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM) + param.grad.data /= model.world_size + + +def run_single_gpu_script(script_path: str, + backend: str = 'nccl', + timeout: int = 30, + nccl_p2p_disable: bool = True, + seed: int = None): + """ + Wraps single-GPU reconstruction scripts to be ran as a multi-GPU job via + torchrun calls. 
+ + `run_single_gpu_script` is intended to be called within a script + (e.g., cdtools.tools.distributed.single_to_multi_gpu) with the following + form: + + ``` + # multi_gpu_job.py + import cdtools.tools.distributed as dist + if __name__ == '__main__': + dist.run_single_to_multi_gpu(script_path='YOUR_RECONSTRUCTION_SCRIPT.py', + backend='nccl', + timeout=30, + nccl_p2p_disable=True) + ``` + + `torchrun` should then be used to run this script as a single-node, + multi-gpu job through the command line interface using, for instance: + + ``` + torchrun + --standalone + --nnodes=1 + --nproc_per_node=$nGPUs + multi_gpu_job.py + ``` + + If you want to use specific GPU IDs for reconstructions, you need to set up + the environment variable `CDTOOLS_GPU_IDS` rather than + `CUDA_VISIBLE_DEVICES`. If you wanted to use GPU IDs `1, 3, 4` for example, + write: + + ``` + CDTOOLS_GPU_IDS=1,3,4 torchrun + --standalone + --nnodes=1 + --nproc_per_node=$nGPUs + multi_gpu_job.py + ``` + + `torchrun` will spawn a number of subprocesses equal to the number of GPUs + specified (--nproc_per_node). Each subprocess will run the specified + script (e.g., `multi_gpu_job` in the above example) which make a call to + `run_single_gpu_script`. + + `run_single_gpu_script` will first set up process group (lets the + different subprocesses and their respective GPUs communicate with each + other) and environment variables necessary for multi-GPU jobs. Afterwards, + each subprocess runs the single-GPU reconstruction script (e.g., + `YOUR_RECONSTRUCTION_SCRIPT.py` in the above example). Methods within + the `cdtools.Reconstructor` class/subclasses handle gradient + synchronization after backpropagation (loss.backward()) as well as + distributive data shuffling/loading. + + + NOTE: + 1) This method is indended to be called within a subprocess spawned + by `torchrun`. 
+ 2) For each subprocess `torchrun` creates, the environment variable + `CUDA_VISIBLE_DEVICES` will be (re)defined based on the GPU rank + or the GPU ID list if `CDTOOLS_GPU_IDS` is defined. The + environment variable `NCCL_P2P_DISABLE` will also be (re)defined + based on `nccl_p2p_disable`. + 3) This method has only been tested using the `nccl` backend on a + single node, with `nccl_p2p_disable` set to `True`. + + Parameters: + script_name: str + The file path of the single-GPU script (either full or relative). + If you're using a relative path, make sure the string doesn't start + with a backslash. + backend: str + Multi-gpu communication backend to use. Default is the 'nccl' + backend, which is the only supported backend for CDTools. + See https://pytorch.org/docs/stable/distributed.html for + additional info about PyTorch-supported backends. + timeout: int + Timeout for operations executed against the process group in + seconds. Default is 30 seconds. After timeout has been reached, + all subprocesses will be aborted and the process calling this + method will crash. + nccl_p2p_disable: bool + Disable NCCL peer-2-peer communication. If you find that all your + GPUs are at 100% usage but the program isn't doing anything, try + enabling this variable. + seed: int + Seed for generating random numbers. + + Environment variables created/redefined: + `NCCL_P2P_DISABLE`: Enables or disables NCCL peer-to-peer communication + defined by `nccl_p2p_disable`. + `CUDA_VISIBLE_DEVICES`: The GPU IDs visible to each subprocess. For + each subprocess, this variable is set to the GPU ID the subprocess + has been assigned. + """ + + # Check if the file path actually exists before starting the process group + if not os.path.exists(script_path): + raise FileNotFoundError('Cannot open file: ' + + f'{os.path.join(os.getcwd(), script_path)}') + + # Enable/disable NCCL peer-to-peer communication. The boolean needs to be + # converted into a string for the environment variable. 
+ os.environ['NCCL_P2P_DISABLE'] = str(int(nccl_p2p_disable)) + + """Force each subprocess to see only the GPU ID we assign it + Why do this? If this constraint is not imposed, then calling all_reduce + will cause all subprocess Ranks to occupy memory on both their own + respective GPUs (normal) as well as Rank 0's GPU (not intended behavior). + The root cause is not entirely clear but there are two ways to avoid + this behavior empirically: + 1) Force each subprocesses' CUDA_VISIBLE_DEVICE to be their assigned + GPU ids. + 2) Within the reconstruction script, change `device='cuda'` to + `device=f'cuda{model.rank}'` + + Option 1 is chosen here to use single-GPU reconstruction scripts AS-IS + for multi-GPU jobs. + """ + # The GPU rank and world_size is visible as an environment variable + # through torchrun calls. + rank = int(os.environ.get('RANK')) + world_size = int(os.environ.get('WORLD_SIZE')) + + # If the CDTOOLS_GPU_IDS environment variable is defined, then assign based + # on the GPU IDS provided in that list. Otherwise, use the rank for the + # GPU ID. + gpu_ids = os.environ.get('CDTOOLS_GPU_IDS') + + if gpu_ids is None: + gpu_id = rank + else: + gpu_id = literal_eval(gpu_ids)[rank] + + os.environ['CUDA_VISIBLE_DEVICES'] = str(gpu_id) + + if rank == 0: + print('[INFO]: Starting up multi-GPU reconstructions with ' + + f'{world_size} GPUs.') + + # Start up the process group (lets the different subprocesses talk with + # each other) + dist.init_process_group(backend=backend, + timeout=datetime.timedelta(seconds=timeout)) + + # Run the script + try: + # Force all subprocesses to either use the pre-specified or Rank 0's + # RNG seed + if seed is None: + seed_local = t.tensor(random.randint(MIN_INT64, MAX_INT64), + device='cuda', + dtype=t.int64) + dist.broadcast(seed_local, 0) + seed = seed_local.item() + + t.manual_seed(seed) + + runpy.run_path(script_path, run_name='__main__') + + if rank == 0: + print('[INFO]: Reconstructions complete.' 
+ + ' Terminating process group.') + + finally: + dist.destroy_process_group() + if rank == 0: + print('[INFO]: Process group terminated. Multi-GPU job complete.') + + +def run_single_to_multi_gpu(): + """ + Runs a single-GPU reconstruction script as a single-node multi-GPU job via + torchrun. + + This convenience function can be executed as a python console script as + `cdt-torchrun` and serves as a wrapper over a `torchrun` call to + `cdtools.tools.distributed.single_to_multi_gpu`. + + In the simplest case, a reconstruction script can be run as a multi-GPU job + (with `nGPU` number of GPUs) using the following `cdt-torchrun` call in + the command line interface: + ``` + cdt-torchrun + --ngpus= + YOUR_RECONSTRUCTION_SCRIPT.py + ``` + + which is equivalent to the following `torchrun` call + ``` + torchrun + --standalone + --nnodes=1 + --nproc_per_node=$nGPUs + -m cdtools.tools.distributed.single_to_multi_gpu + --backend=nccl + --timeout=30 + --nccl_p2p_disable=1 + YOUR_RECONSTRUCTION_SCRIPT.py + ``` + + Within a single node, `cdt-torchrun` will launch a given number of + subprocesses equivalent to the number of GPUs specified. This number must + be less than or equal to the actual number of GPUs available on your node. + + If you want to use specific GPU IDs for reconstructions, you need to set up + the environment variable `CDTOOLS_GPU_IDS` rather than + `CUDA_VISIBLE_DEVICES`. If you wanted to use GPU IDs `1, 3, 4` for example, + write: + + ``` + CDTOOLS_GPU_IDS=1,3,4 cdt-torchrun + --ngpus=3 + YOUR_RECONSTRUCTION_SCRIPT.py + ``` + + If additional `torchrun` arguments need to be passed, consider making + a direct `torchrun` call rather than use `cdt-torchrun`. + + NOTE: + 1) This method has only been tested using the `nccl` backend on a + single node, with `nccl_p2p_disable` set to `True`. + + Arguments: + script_path: str + Path of the single-GPU script (either full or partial path). + --ngpus: int + Number of GPUs to use. 
+ --nnodes: int + Optional, number of nodes. Default 1; more than 1 nodes has not + been tested. + --backend: str + Optional, communication backend for distributed computing (either + `nccl` or `gloo`). + Default is `nccl` + --timeout: int + Optional, time in seconds before the distributed process is killed. + Default is 30 seconds. + --nccl_p2p_disable: int + Optional, disable (1) or enable (0) NCCL peer-to-peer + communication. Default is 1. + + """ + # Define the arguments we need to pass to dist.script_wrapper + parser = argparse.ArgumentParser() + + parser.add_argument('--ngpus', + type=int, + help='Number of GPUs to use.') + parser.add_argument('--nnodes', + type=str, + default=1, + help='Number of participating nodes.') + parser.add_argument('--backend', + type=str, + default='nccl', + choices=['nccl', 'gloo'], + help='Communication backend (nccl or gloo)') + parser.add_argument('--timeout', + type=int, + default=30, + help='Time before process is killed in seconds') + parser.add_argument('--nccl_p2p_disable', + type=int, + default=1, + choices=[0, 1], + help='Disable (1) or enable (0) NCCL peer-to-peer' + + 'communication') + parser.add_argument('script_path', + type=str, + help='Single GPU script file name (with or without ' + + '.py extension)') + + # Get the arguments + args = parser.parse_args() + + # Perform the torchrun call of the wrapped function + subprocess.run(['torchrun', + '--standalone', + f'--nnodes={args.nnodes}', + f'--nproc_per_node={args.ngpus}', + '-m', + 'cdtools.tools.distributed.single_to_multi_gpu', + f'--backend={args.backend}', + f'--timeout={args.timeout}', + f'--nccl_p2p_disable={args.nccl_p2p_disable}', + f'{args.script_path}']) + + +def report_speed_test(func: Callable): + """ + Decorator function which saves the loss-versus-time/epoch history of a + reconstruction script as a pickle dump file in a specified directory. 
+ + The entire reconstruction script (excluding import statements) must + be wrapped by a function which returns the model. The script must also + have an if-name-main block to call the wrapped script. + + This decorator is intended to only be used by reconstruction scripts + that are called by `run_speed_test` to conduct multi-GPU performance + studies (loss-versus-time/epoch and runtime speed-ups) using `N` GPUs + and `M` trials per GPU count. `run_speed_test` sets several environment + variables specifying the name and directory of the result files to be saved. + + Parameters: + func: Callable + The entire reconstruction script wrapped in a function. Within the + script, the function must be called with an if-name-main statement. + Additionally, the function must return the reconstructed model. + + Expected environment variables: + `CDTOOLS_TRIAL_NUMBER`: The test trial number + `CDTOOLS_SPEED_TEST_RESULTS_DIR`: Directory to save the pickle dump + file. + `CDTOOLS_SPEED_TEST_PREFIX`: Prefix of the pickle dump file name. + + Outputs in the pickle dump file: + study_dict: dict + Results of the `N` GPU `M`-th trial run. 
Contains the following + key-value pairs: + `study_dict['loss history']`: List[np.float32] + Loss values as a function of epoch + `study_dict['time history']`: List[float] + Time recorded at each epoch in seconds + `study_dict['nGPUs']`: int + Number of GPUs used + `study_dict['trial']`: int + Trial number + """ + def wrapper(): + # Figure out how to name the save file and where to save it to + # These environment variables are provided by run_speed_test + trial_number = int(os.environ.get('CDTOOLS_TRIAL_NUMBER')) + output_dir = os.environ.get('CDTOOLS_SPEED_TEST_RESULTS_DIR') + file_prefix = os.environ.get('CDTOOLS_SPEED_TEST_PREFIX') + + # Run the script + model = func() + + # Save the model and loss history, but only using the rank 0 process + if model.rank == 0: + # Set up the file name: + file_name = f'{file_prefix}_nGPUs_{model.world_size}_' +\ + f'TRIAL_{trial_number}.pkl' + # Grab the loss and time history + loss_history = model.loss_history + time_history = model.loss_times + + # Store quantities in a dictionary + study_dict = {'loss history': loss_history, + 'time history': time_history, + 'nGPUs': model.world_size, + 'trial': trial_number} + # Save the quantities + with open(os.path.join(output_dir, file_name), 'wb') as save_file: + pickle.dump(study_dict, save_file) + + print(f'[INFO]: Saved results to: {file_name}') + return wrapper + + +def run_speed_test(world_sizes: List[int], + runs: int, + script_path: str, + output_dir: str, + file_prefix: str = 'speed_test', + show_plot: bool = True, + delete_output_files: bool = False, + nnodes: int = 1, + backend: str = 'nccl', + timeout: int = 30, + nccl_p2p_disable: bool = True, + seed: int = None + ) -> Tuple[List[float], + List[float], + List[float], + List[float]]: + """ + Executes a reconstruction script using `world_sizes` GPUs and `runs` + trials per GPU count using `torchrun` and + `cdtools.tools.distributed.single_to_multi_gpu`. 
+ + `run_speed_test` requires the tested reconstruction script to be wrapped + in a function, which returns the reconstructed model, along with a + if-name-main block which calls the function. The function needs to be + decorated with `report_speed_test`. + + The speed test (specifically, `report_speed_test`) will generate pickle + dump files named `_nGPUs__TRIAL_.pkl` + at the directory `output_dir` (see documentation for `report_speed_test` + for the file content). If `output_dir` does not exist, one will be created + in the current directory. + + After each trial, the contents of the dump file are read and stored by + `run_speed_test` to calculate the mean and standard deviation of the + loss-versus-epoch/time and runtime speedup data over the `runs` trials + executed. If `delete_output_files` is enabled, then the pickle dump files + will be deleted after they have been read. + + `run_speed_test` executes the following in a subprocess to run + single/multi-GPU jobs + ``` + torchrun + --standalone + --nnodes=$NNODES + --nproc_per_node=$WORLD_SIZE + -m + cdtools.tools.distributed.single_to_multi_gpu + --backend=$BACKEND + --timeout=$TIMEOUT + --nccl_p2p_disable=$NCCL_P2P_DISABLE + YOUR_RECONSTRUCTION_SCRIPT.py + ``` + and provides the following environment variables to the child environment + that are necessary for the pickle dump files to be generated by + `report_speed_test`: + `CDTOOLS_TRIAL_NUMBER`: The test trial number + `CDTOOLS_SPEED_TEST_RESULTS_DIR`: Directory to save the pickle dump + file. + `CDTOOLS_SPEED_TEST_PREFIX`: Prefix of the pickle dump file name. + + Parameters: + world_sizes: List[int] + Number of GPUs to use. User can specify several GPU counts in a + list. But the first entry must be 1 (single-GPU). + runs: int + How many repeat reconstructions to perform + script_path: str + Path of the single-gpu reconstruction script. + output_dir: str + Directory of the loss-vs-time/epoch data generated for the speed + test. 
+ file_prefix: str + Prefix of the speed test result file names + show_plot: bool + Show loss-versus-epoch/time and speed-up-versus-GPU count curves + delete_output_files: bool + Removes the results files produced by `report_speed_test` from + the output_dir after each trial run. + nnodes: int + Number of nodes to use. This module has only been tested with 1 + node. + backend: str + Communication backend for distributed computing. NVIDIA Collective + Communications Library ('nccl') is the default and only tested + option. See https://docs.pytorch.org/docs/stable/distributed.html + for other backends supported by pytorch (but have not been tested + in this package). + timeout: int + Timeout for operations to be executed in seconds. All processes + will be aborted after the timeout has been exceeded. + nccl_p2p_disable: bool + Sets the `NCCL_P2P_DISABLE` environment variable to enable/disable + nccl peer-to-peer communication. If you find that all your GPUs + are at 100% usage but the program isn't doing anything, try + enabling this variable. + seed: int + Seed for generating random numbers. Default is None (seed is + randomly generated). + + Returns: + final_loss_mean_list: List[float] + Mean final loss value over `runs` iterations for each `world_size` + value specified. + final_loss_std_list: List[float] + Standard deviation of the final loss value over `runs` iterations + for each `world_size`. + speed_up_mean_list: List[float] + Mean runtime speed-up over `runs` iterations for each `world_size` + value specified. Speed-up is defined as the + `runtime_nGPUs / runtime_1_GPU`. + speed_up_std_list: List[float] + Standard deviation of the runtime speed-up over `runs` iterations + for each `world_size`. 
+ """ + + # Make sure the directory exists; or else create it + Path(output_dir).mkdir(parents=False, exist_ok=True) + + # Set stuff up for plots + if show_plot: + fig, (ax1, ax2, ax3) = plt.subplots(1, 3) + + # Store the value of the single GPU time + time_1gpu = 0 + std_1gpu = 0 + + # Store values of the different speed-up factors and final losses + # as a function of GPU count + speed_up_mean_list = [] + speed_up_std_list = [] + final_loss_mean_list = [] + final_loss_std_list = [] + + for world_size in world_sizes: + # Make a list to store the values + time_list = [] + loss_hist_list = [] + + for i in range(runs): + print('[INFO]: Resetting the model...') + print(f'[INFO]: Starting run {i+1}/{runs} on {world_size} GPU(s)') + + # The scripts running speed tests need to read the trial number + # they are on using. We send this information using environment + # variables sent to the child processes spawned by subprocess.run + child_env = os.environ.copy() + child_env['CDTOOLS_TRIAL_NUMBER'] = str(i) + child_env['CDTOOLS_SPEED_TEST_RESULTS_DIR'] = output_dir + child_env['CDTOOLS_SPEED_TEST_PREFIX'] = file_prefix + + # Set up the terminal commands for a single-node, multi-GPU job + cmd = ['torchrun', + '--standalone', + f'--nnodes={nnodes}', + f'--nproc_per_node={world_size}', + '-m', + 'cdtools.tools.distributed.single_to_multi_gpu', + f'--backend={backend}', + f'--timeout={timeout}', + f'--nccl_p2p_disable={int(nccl_p2p_disable)}'] + + if seed is not None: + cmd.append(f'--seed={seed}') + + cmd.append(f'{script_path}') + + # Run the single/multi-GPU job + try: + subprocess.run(cmd, check=True, env=child_env) + + except subprocess.CalledProcessError as e: + raise e + + # Load the loss results + print('[INFO]: Reconstruction complete. 
Loading loss results...') + + save_path = os.path.join(output_dir, + f'{file_prefix}_nGPUs_{world_size}_' + + f'TRIAL_{i}.pkl') + + with open(save_path, 'rb') as f: + results = pickle.load(f) + time_list.append(results['time history']) + loss_hist_list.append(results['loss history']) + + print('[INFO]: Loss results loaded.') + + if delete_output_files: + print(f'[INFO]: Removing {save_path}') + os.remove(save_path) + + # Calculate the statistics + time_mean = t.tensor(time_list).mean(dim=0)/60 + time_std = t.tensor(time_list).std(dim=0)/60 + loss_mean = t.tensor(loss_hist_list).mean(dim=0) + loss_std = t.tensor(loss_hist_list).std(dim=0) + + # If a single GPU is used, store the time + if world_size == 1: + time_1gpu = time_mean[-1] + std_1gpu = time_std[-1] + + # Calculate the speed-up relative to using a single GPU + speed_up_mean = time_1gpu / time_mean[-1] + speed_up_std = speed_up_mean * \ + t.sqrt((std_1gpu/time_1gpu)**2 + (time_std[-1]/time_mean[-1])**2) + + # Store the final losses and speed-ups + final_loss_mean_list.append(loss_mean[-1].item()) + final_loss_std_list.append(loss_std[-1].item()) + speed_up_mean_list.append(speed_up_mean.item()) + speed_up_std_list.append(speed_up_std.item()) + + # Add another plot + if show_plot: + ax1.errorbar(time_mean, loss_mean, yerr=loss_std, xerr=time_std, + label=f'{world_size} GPUs') + ax2.errorbar(t.arange(0, loss_mean.shape[0]), loss_mean, + yerr=loss_std, label=f'{world_size} GPUs') + ax3.errorbar(world_size, speed_up_mean, yerr=speed_up_std, fmt='o') + + # Plot + if show_plot: + fig.suptitle(f'Multi-GPU performance test | {runs} runs performed') + ax1.set_yscale('log') + ax1.set_xscale('linear') + ax2.set_yscale('log') + ax2.set_xscale('linear') + ax3.set_yscale('linear') + ax3.set_xscale('linear') + ax1.legend() + ax2.legend() + ax1.set_xlabel('Time (min)') + ax1.set_ylabel('Loss') + ax2.set_xlabel('Epochs') + ax3.set_xlabel('Number of GPUs') + ax3.set_ylabel('Speed-up relative to single GPU') + + 
print('[INFO]: Multi-GPU speed test completed.') + + return (final_loss_mean_list, final_loss_std_list, + speed_up_mean_list, speed_up_std_list) diff --git a/src/cdtools/tools/distributed/single_to_multi_gpu.py b/src/cdtools/tools/distributed/single_to_multi_gpu.py new file mode 100644 index 00000000..9b8dbd99 --- /dev/null +++ b/src/cdtools/tools/distributed/single_to_multi_gpu.py @@ -0,0 +1,65 @@ +""" +A wrapper script to run single-GPU reconstruction scripts as a multi-GPU job +when called by torchrun. + +This script is intended to be called by torchrun. It is set up so that the +group process handling (init and destroy), definition of several environmental +variables, and actual execution of the single-GPU script are handled by a +single call to dist.run_single_gpu_script. + +For example, if we have the reconstruction script `reconstruct.py` and want to +use 4 GPUs, we can write the following: + +``` +torchrun + --nnodes=1 + --nproc_per_node=4 + single-to-multi-gpu.py + --script_path=reconstruct.py +``` +""" +import cdtools.tools.distributed as dist +import argparse + + +def get_args(): + # Define the arguments we need to pass to dist.script_wrapper + parser = argparse.ArgumentParser() + parser.add_argument('--backend', + type=str, + default='nccl', + choices=['nccl', 'gloo'], + help='Communication backend (nccl or gloo)') + parser.add_argument('--timeout', + type=int, + default=30, + help='Time before process is killed in seconds') + parser.add_argument('--nccl_p2p_disable', + type=int, + default=1, + choices=[0, 1], + help='Disable (1) or enable (0) NCCL peer-to-peer communication') # noqa + parser.add_argument('--seed', + type=int, + default=None, + help='Sets the RNG seed for all devices') + parser.add_argument('script_path', + type=str, + help='Single GPU script file name') + + return parser.parse_args() + + +def main(): + # Get args + args = get_args() + # Pass arguments to dist.script_wrapper + dist.run_single_gpu_script(script_path=args.script_path, + 
backend=args.backend, + timeout=args.timeout, + nccl_p2p_disable=bool(args.nccl_p2p_disable), + seed=args.seed) + + +if __name__ == '__main__': + main() diff --git a/tests/conftest.py b/tests/conftest.py index f0faea57..d45e4f55 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -32,20 +32,32 @@ def pytest_addoption(parser): default=False, help="run slow tests, primarily full reconstruction tests." ) + parser.addoption( + "--runmultigpu", + action="store_true", + default=False, + help="Runs tests using 2 NVIDIA CUDA GPUs." + ) def pytest_configure(config): config.addinivalue_line("markers", "slow: mark test as slow to run") + config.addinivalue_line("markers", "multigpu: run the multigpu test using 2 NVIDIA GPUs") def pytest_collection_modifyitems(config, items): - if config.getoption("--runslow"): - # --runslow given in cli: do not skip slow tests - return + # Skip the slow and/or multigpu tests if --runslow and/or --multigpu + # is given in cli. + skip_slow = pytest.mark.skip(reason="need --runslow option to run") + skip_multigpu = pytest.mark.skip(reason='need --runmultigpu option to run') + for item in items: - if "slow" in item.keywords: + if "slow" in item.keywords and not config.getoption("--runslow"): item.add_marker(skip_slow) + + if "multigpu" in item.keywords and not config.getoption("--runmultigpu"): + item.add_marker(skip_multigpu) @pytest.fixture @@ -415,3 +427,15 @@ def example_nested_dicts(pytestconfig): } return [test_dict_1, test_dict_2, test_dict_3] + + +@pytest.fixture(scope='module') +def multigpu_script_1(pytestconfig): + return str(pytestconfig.rootpath) + \ + '/tests/multi_gpu/multi_gpu_script_quality.py' + + +@pytest.fixture(scope='module') +def multigpu_script_2(pytestconfig): + return str(pytestconfig.rootpath) + \ + '/tests/multi_gpu/multi_gpu_script_plot_and_save.py' diff --git a/tests/multi_gpu/multi_gpu_script_plot_and_save.py b/tests/multi_gpu/multi_gpu_script_plot_and_save.py new file mode 100644 index 00000000..456fae29 --- 
/dev/null +++ b/tests/multi_gpu/multi_gpu_script_plot_and_save.py @@ -0,0 +1,77 @@ +import cdtools +import os +from matplotlib import pyplot as plt + +filename = os.environ.get('CDTOOLS_TESTING_DATA_PATH') +savedir = os.environ.get('CDTOOLS_TESTING_TMP_PATH') +SHOW_PLOT = bool(int(os.environ.get('CDTOOLS_TESTING_SHOW_PLOT'))) +dataset = cdtools.datasets.Ptycho2DDataset.from_cxi(filename) + +model = cdtools.models.FancyPtycho.from_dataset( + dataset, + n_modes=3, + oversampling=2, + probe_support_radius=120, + propagation_distance=5e-3, + units='mm', + obj_view_crop=-50, +) + +device = 'cuda' +model.to(device=device) +dataset.get_as(device=device) + +# Test Ptycho2DDataset.inspect +if SHOW_PLOT: + dataset.inspect() + +# Test Ptycho2DDataset.to_cxi +filename_to_cxi = os.path.join(savedir, + f'RANK_{model.rank}_test_to_cxi.h5') +dataset.to_cxi(filename_to_cxi) + +# Test CDIModel.save_to_h5 +filename_save_to_h5 = os.path.join(savedir, + f'RANK_{model.rank}_test_save_to.h5') +model.save_to_h5(filename_save_to_h5, dataset) + +# Test CDIModel.save_on_exit(), CDIModel.inspect() +filename_save_on_exit = os.path.join(savedir, + f'RANK_{model.rank}_test_save_on_exit.h5') + +with model.save_on_exit(filename_save_on_exit, dataset): + for loss in model.Adam_optimize(5, dataset, lr=0.02, batch_size=40): + if model.rank == 0: + print(model.report()) + if SHOW_PLOT: + model.inspect(dataset) + + +if SHOW_PLOT: + # Test CDIModel.compare(dataset) + model.compare(dataset) + + # Test CDIModel.save_figures() + filename_save_figures = os.path.join(savedir, + f'RANK_{model.rank}_test_plot_') + model.save_figures(prefix=filename_save_figures, + extension='.png') + + plt.close('all') + +# Test CDIModel.save_checkpoint +filename_save_checkpoint = \ + os.path.join(savedir, f'RANK_{model.rank}_test_save_checkpoint.pt') +model.save_checkpoint(dataset, checkpoint_file=filename_save_checkpoint) + +# Test CDIModel.save_on_exception() +filename_save_on_except = \ + os.path.join(savedir, 
f'RANK_{model.rank}_test_save_on_except.h5') + +with model.save_on_exception(filename_save_on_except, dataset): + for loss in model.Adam_optimize(10, dataset, lr=0.02, batch_size=40): + if model.rank == 0 and model.epoch <= 10: + print(model.report()) + elif model.epoch > 10: + raise Exception('This is a deliberate exception raised to ' + + 'test save on exception') diff --git a/tests/multi_gpu/multi_gpu_script_quality.py b/tests/multi_gpu/multi_gpu_script_quality.py new file mode 100644 index 00000000..a66d0ee0 --- /dev/null +++ b/tests/multi_gpu/multi_gpu_script_quality.py @@ -0,0 +1,40 @@ +import cdtools +import os + + +@cdtools.tools.distributed.report_speed_test +def main(): + filename = os.environ.get('CDTOOLS_TESTING_DATA_PATH') + dataset = cdtools.datasets.Ptycho2DDataset.from_cxi(filename) + + # FancyPtycho is the workhorse model + model = cdtools.models.FancyPtycho.from_dataset( + dataset, + n_modes=3, + oversampling=2, + probe_support_radius=120, + propagation_distance=5e-3, + units='mm', + obj_view_crop=-50, + ) + + device = 'cuda' + model.to(device=device) + dataset.get_as(device=device) + + for loss in model.Adam_optimize(50, dataset, lr=0.02, batch_size=40): + if model.rank == 0 and model.epoch % 10: + print(model.report()) + for loss in model.Adam_optimize(25, dataset, lr=0.005, batch_size=40): + if model.rank == 0 and model.epoch % 10: + print(model.report()) + for loss in model.Adam_optimize(25, dataset, lr=0.001, batch_size=40): + if model.rank == 0 and model.epoch % 10: + print(model.report()) + + model.tidy_probes() + return model + + +if __name__ == '__main__': + main() diff --git a/tests/multi_gpu/test_multi_gpu.py b/tests/multi_gpu/test_multi_gpu.py new file mode 100644 index 00000000..f26db2cc --- /dev/null +++ b/tests/multi_gpu/test_multi_gpu.py @@ -0,0 +1,169 @@ +import cdtools.tools.distributed as dist +import pytest +import os +import subprocess + +""" +This file contains several tests that are relevant to running multi-GPU +operations 
in CDTools. +""" + + +@pytest.mark.multigpu +def test_plotting_and_saving(lab_ptycho_cxi, + multigpu_script_2, + tmp_path, + show_plot): + """ + Run a multi-GPU test on a script that executes several plotting and + file-saving methods from CDIModel and ensure they run without failure. + + Also, make sure that only 1 GPU is generating the plots. + + If this test fails, one of three things happened: + 1) Either something failed while multigpu_script_2 was called + 2) Somehow, something aside from Rank 0 saved results + 3) multigpu_script_2 was not able to save all the data files + we asked it to save. + """ + # Pass the cxi directory to the reconstruction script + # Define a temporary directory + + # Run the test script, which generates several files that either have + # the prefix + cmd = ['torchrun', + '--standalone', + '--nnodes=1', + '--nproc_per_node=2', + '-m', + 'cdtools.tools.distributed.single_to_multi_gpu', + '--backend=nccl', + '--timeout=30', + '--nccl_p2p_disable=1', + multigpu_script_2] + + child_env = os.environ.copy() + child_env['CDTOOLS_TESTING_DATA_PATH'] = lab_ptycho_cxi + child_env['CDTOOLS_TESTING_TMP_PATH'] = str(tmp_path) + child_env['CDTOOLS_TESTING_SHOW_PLOT'] = str(int(show_plot)) + + try: + subprocess.run(cmd, check=True, env=child_env) + except subprocess.CalledProcessError: + # The called script is designed to throw an exception. + # TODO: Figure out how to distinguish between the engineered error + # in the script versus any other error. 
+ pass + + # Check if all the generated file names only have the prefix 'RANK_0' + filelist = [f for f in os.listdir(tmp_path) + if os.path.isfile(os.path.join(tmp_path, f))] + + assert all([file.startswith('RANK_0') for file in filelist]) + print('All files have the RANK_0 prefix.') + + # Check if plots have been saved + if show_plot: + print('Plots generated: ' + + f"{sum([file.startswith('RANK_0_test_plot') for file in filelist])}") # noqa + assert any([file.startswith('RANK_0_test_plot') for file in filelist]) + else: + print('--plot not enabled. Checks on plotting and figure saving' + + ' will not be conducted.') + + # Check if we have all five data files saved + file_output_suffix = ('test_save_checkpoint.pt', + 'test_save_on_exit.h5', + 'test_save_on_except.h5', + 'test_save_to.h5', + 'test_to_cxi.h5') + + print(f'{sum([file.endswith(file_output_suffix) for file in filelist])}' + + ' out of 5 data files have been generated.') + assert sum([file.endswith(file_output_suffix) for file in filelist]) \ + == len(file_output_suffix) + + +@pytest.mark.multigpu +def test_reconstruction_quality(lab_ptycho_cxi, + multigpu_script_1, + tmp_path, + show_plot): + """ + Run a multi-GPU test based on fancy_ptycho_speed_test.py and make + sure the final reconstructed loss using 2 GPUs is similar to 1 GPU. + + This test requires us to have 2 NVIDIA GPUs and makes use of the + multi-GPU speed test. + + If this test fails, it indicates that the reconstruction quality is + getting noticeably worse with increased GPU counts. This may be a symptom + of a synchronization/broadcasting issue between the different GPUs. 
+ """ + # Pass the cxi directory to the reconstruction script + os.environ['CDTOOLS_TESTING_DATA_PATH'] = lab_ptycho_cxi + + # Set up and run a distributed speed test + world_sizes = [1, 2] + runs = 5 + file_prefix = 'speed_test' + + # Define a temporary directory + temp_dir = str(tmp_path) + + results = dist.run_speed_test(world_sizes=world_sizes, + runs=runs, + script_path=multigpu_script_1, + output_dir=temp_dir, + file_prefix=file_prefix, + show_plot=show_plot, + delete_output_files=True) + + # Ensure that both single and 2 GPU results produce losses lower than + # a threshold value of 0.0013. This is the same threshold used in + # test_fancy_ptycho.py + loss_mean = results[0] + assert loss_mean[0] < 0.0013 + assert loss_mean[1] < 0.0013 + + # Check if the two losses are similar to each other by seeing if their + # mean +- standard deviation intervals overlap with each other + loss_std = results[1] + single_gpu_loss_min = loss_mean[0] - loss_std[0] + single_gpu_loss_max = loss_mean[0] + loss_std[0] + multi_gpu_loss_min = loss_mean[1] - loss_std[1] + multi_gpu_loss_max = loss_mean[1] + loss_std[1] + has_overlap_loss = \ + min(single_gpu_loss_max, multi_gpu_loss_max)\ + > max(single_gpu_loss_min, multi_gpu_loss_min) + + print(f'Single GPU final loss: {loss_mean[0]} +- {loss_std[0]}') + print(f'Two GPU final loss: {loss_mean[1]} +- {loss_std[1]}') + print('Overlap between mean +- std of the single/multi GPU losses: ' + + f'{has_overlap_loss}') + + assert has_overlap_loss + + # Also make sure that we actually get some kind of speed up with + # multiple GPUs... 
+ speed_mean = results[2] + speed_std = results[3] + + single_gpu_speed_min = speed_mean[0] - speed_std[0] + single_gpu_speed_max = speed_mean[0] + speed_std[0] + multi_gpu_speed_min = speed_mean[1] - speed_std[1] + multi_gpu_speed_max = speed_mean[1] + speed_std[1] + has_overlap_speed = \ + min(single_gpu_speed_max, multi_gpu_speed_max)\ + > max(single_gpu_speed_min, multi_gpu_speed_min) + + print(f'Single GPU runtime: {speed_mean[0]} +- {speed_std[0]}') + print(f'Two GPU runtime: {speed_mean[1]} +- {speed_std[1]}') + print('Overlap between the mean +- std of the single/multi GPU runtimes: ' + + f'{has_overlap_speed}') + + assert speed_mean[0] < speed_mean[1] + assert not has_overlap_speed + + # Clear the environment variable we created here + os.environ.pop('CDTOOLS_TESTING_DATA_PATH')