diff --git a/config.yml b/config.yml index 3f26a4f0..f224ce14 100644 --- a/config.yml +++ b/config.yml @@ -46,6 +46,7 @@ globus: uri: beegfs.als.lbl.gov uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a name: bl733-beegfs-data + # 8.3.2 ENDPOINTS spot832: @@ -72,17 +73,35 @@ globus: uuid: 75b478b2-37af-46df-bfbd-71ed692c6506 name: data832_scratch - alcf832_raw: + alcf832_synaps_raw: + root_path: /data/bl832/raw + uri: alcf.anl.gov + uuid: 728a8e30-32ef-4000-814c-f9ccbc00bf13 + name: alcf832_synaps_raw + + alcf832_synaps_recon: + root_path: /data/bl832/scratch/reconstruction/ + uri: alcf.anl.gov + uuid: 728a8e30-32ef-4000-814c-f9ccbc00bf13 + name: alcf832_synaps_recon + + alcf832_synaps_segment: + root_path: /data/bl832/scratch/segmentation/ + uri: alcf.anl.gov + uuid: 728a8e30-32ef-4000-814c-f9ccbc00bf13 + name: alcf832_synaps_segment + + alcf832_iri_raw: root_path: /data/raw uri: alcf.anl.gov uuid: 55c3adf6-31f1-4647-9a38-52591642f7e7 - name: alcf_raw + name: alcf_iri_raw - alcf832_scratch: + alcf832_iri_scratch: root_path: /data/scratch uri: alcf.anl.gov uuid: 55c3adf6-31f1-4647-9a38-52591642f7e7 - name: alcf_scratch + name: alcf_iri_scratch alcf_eagle832: root_path: /IRIBeta/als/example diff --git a/orchestration/_tests/test_bl832/__init__.py b/orchestration/_tests/test_bl832/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/orchestration/_tests/test_bl832/test_alcf.py b/orchestration/_tests/test_bl832/test_alcf.py new file mode 100644 index 00000000..b5434e0a --- /dev/null +++ b/orchestration/_tests/test_bl832/test_alcf.py @@ -0,0 +1,637 @@ +# orchestration/_tests/bl832/test_alcf.py + +import pytest +from uuid import uuid4 + +from prefect.blocks.system import Secret +from prefect.variables import Variable +from prefect.testing.utilities import prefect_test_harness + + +# ────────────────────────────────────────────────────────────────────────────── +# Session fixture +# ────────────────────────────────────────────────────────────────────────────── + +@pytest.fixture(autouse=True, scope="session") +def prefect_test_fixture(): + """Set up Prefect test harness with all required secrets and variables.""" + with prefect_test_harness(): + Secret(value=str(uuid4())).save(name="globus-client-id", overwrite=True) + Secret(value=str(uuid4())).save(name="globus-client-secret", overwrite=True) + Secret(value=str(uuid4())).save(name="globus-compute-endpoint", overwrite=True) + + Variable.set( + name="alcf-allocation-root-path", + value={"alcf-allocation-root-path": "/eagle/test"}, + overwrite=True, _sync=True + ) + Variable.set(name="pruning-config", + value={"max_wait_seconds": 600}, overwrite=True, _sync=True) + yield + + +# ────────────────────────────────────────────────────────────────────────────── +# Shared helpers +# ────────────────────────────────────────────────────────────────────────────── + +def _make_future(mocker, value): + """Return a mock Prefect future whose .result() yields the given value.""" + f = mocker.MagicMock() + f.result.return_value = value + return f + + +def _patch_config832(mocker): + """ + Patch all Config832 network dependencies and return a real Config832 instance. + + Flow tests receive a real Config832 so Prefect's parameter type validation + passes, but all underlying Globus clients and endpoints are mocked. + """ + mock_secret = mocker.MagicMock() + mock_secret.get.return_value = str(uuid4()) + mocker.patch("prefect.blocks.system.Secret.load", return_value=mock_secret) + + endpoint_mock = mocker.MagicMock() + mocker.patch( + "orchestration.flows.bl832.config.transfer.build_endpoints", + return_value={k: endpoint_mock for k in [ + "spot832", "data832", "data832_raw", "data832_scratch", + "nersc832", "nersc_alsdev", + "nersc832_alsdev_raw", "nersc832_alsdev_scratch", + "nersc832_alsdev_pscratch_raw", "nersc832_alsdev_pscratch_scratch", + "nersc832_alsdev_recon_scripts", + "alcf832_iri_raw", "alcf832_iri_scratch", + "alcf832_synaps_raw", "alcf832_synaps_recon", "alcf832_synaps_segment", + ]} + ) + mocker.patch("orchestration.flows.bl832.config.transfer.build_apps", + return_value={"als_transfer": "mock_app"}) + mocker.patch("orchestration.flows.bl832.config.transfer.init_transfer_client", + return_value=mocker.MagicMock()) + mocker.patch("orchestration.flows.bl832.config.flows.get_flows_client", + return_value=mocker.MagicMock()) + mocker.patch("orchestration.config.settings", mocker.MagicMock()) + + from orchestration.flows.bl832.config import Config832 + return Config832() + + +def _mock_executor(mocker): + """Return a context-manager-compatible mock Executor.""" + mock_exec = mocker.MagicMock() + mock_exec.__enter__ = mocker.MagicMock(return_value=mock_exec) + mock_exec.__exit__ = mocker.MagicMock(return_value=False) + mocker.patch("orchestration.flows.bl832.alcf.Executor", return_value=mock_exec) + return mock_exec + + +# ────────────────────────────────────────────────────────────────────────────── +# Controller fixture +# +# Controller tests instantiate ALCFTomographyHPCController directly, outside +# any flow context. We patch get_run_logger (used in __init__ and every method) +# and Variable.get (used in __init__ for allocation root and in methods for +# endpoint UUIDs). +# ────────────────────────────────────────────────────────────────────────────── + +@pytest.fixture +def mock_controller(mocker): + """ALCFTomographyHPCController with all external dependencies mocked.""" + mocker.patch("orchestration.flows.bl832.alcf.get_run_logger", + return_value=mocker.MagicMock()) + + def variable_get_side_effect(name, default=None, _sync=False): + if name == "alcf-allocation-root-path": + result = mocker.MagicMock() + result.get.return_value = "/eagle/test" + return result + # endpoint UUIDs and other variables: return a stable UUID string + return default if default is not None else str(uuid4()) + + mocker.patch("orchestration.flows.bl832.alcf.Variable.get", + side_effect=variable_get_side_effect) + + mocker.patch("orchestration.flows.bl832.alcf.Client") + mocker.patch("orchestration.flows.bl832.alcf.Secret") + + mock_config = mocker.MagicMock() + from orchestration.flows.bl832.alcf import ALCFTomographyHPCController + return ALCFTomographyHPCController(config=mock_config) + + +# ────────────────────────────────────────────────────────────────────────────── +# ALCFTomographyHPCController — reconstruct +# ────────────────────────────────────────────────────────────────────────────── + +def test_reconstruct_success(mocker, mock_controller): + mock_exec = _mock_executor(mocker) + mocker.patch.object(mock_controller, "_wait_for_globus_compute_future", return_value=True) + + result = mock_controller.reconstruct(file_path="folder/file.h5") + + assert result is True + mock_exec.submit.assert_called_once() + + +def test_reconstruct_failure(mocker, mock_controller): + _mock_executor(mocker) + mocker.patch.object(mock_controller, "_wait_for_globus_compute_future", return_value=False) + + result = mock_controller.reconstruct(file_path="folder/file.h5") + + assert result is False + + +# ────────────────────────────────────────────────────────────────────────────── +# ALCFTomographyHPCController — build_multi_resolution +# ────────────────────────────────────────────────────────────────────────────── + +def test_build_multi_resolution_success(mocker, mock_controller): + mock_exec = _mock_executor(mocker) + mocker.patch.object(mock_controller, "_wait_for_globus_compute_future", return_value=True) + + result = mock_controller.build_multi_resolution(file_path="folder/file.h5") + + assert result is True + mock_exec.submit.assert_called_once() + + +def test_build_multi_resolution_failure(mocker, mock_controller): + _mock_executor(mocker) + mocker.patch.object(mock_controller, "_wait_for_globus_compute_future", return_value=False) + + result = mock_controller.build_multi_resolution(file_path="folder/file.h5") + + assert result is False + + +# ────────────────────────────────────────────────────────────────────────────── +# ALCFTomographyHPCController — segmentation_sam3 +# ────────────────────────────────────────────────────────────────────────────── + +def test_segmentation_sam3_success(mocker, mock_controller): + mock_exec = _mock_executor(mocker) + mocker.patch.object(mock_controller, "_wait_for_globus_compute_future", return_value=True) + + result = mock_controller.segmentation_sam3(recon_folder_path="folder/recfile") + + assert result is True + mock_exec.submit.assert_called_once() + + +def test_segmentation_sam3_failure(mocker, mock_controller): + _mock_executor(mocker) + mocker.patch.object(mock_controller, "_wait_for_globus_compute_future", return_value=False) + + result = mock_controller.segmentation_sam3(recon_folder_path="folder/recfile") + + assert result is False + + +def test_segmentation_sam3_output_paths(mocker, mock_controller): + """ + SAM3 output dir should be under /sam3 and use the rec→seg path substitution. + Given 'folder/recfile', the code does replace('/rec', '/seg') → 'folder/segfile', + so output_dir should contain 'segfile' and '/sam3'. + """ + mock_exec = _mock_executor(mocker) + mocker.patch.object(mock_controller, "_wait_for_globus_compute_future", return_value=True) + + mock_controller.segmentation_sam3(recon_folder_path="folder/recfile") + + call_kwargs = mock_exec.submit.call_args.kwargs + assert "segfile" in call_kwargs.get("output_dir", "") + assert "/sam3" in call_kwargs.get("output_dir", "") + + +# ────────────────────────────────────────────────────────────────────────────── +# ALCFTomographyHPCController — segmentation_dino +# ────────────────────────────────────────────────────────────────────────────── + +def test_segmentation_dino_success(mocker, mock_controller): + mock_exec = _mock_executor(mocker) + mocker.patch.object(mock_controller, "_wait_for_globus_compute_future", return_value=True) + + result = mock_controller.segmentation_dino(recon_folder_path="folder/recfile") + + assert result is True + mock_exec.submit.assert_called_once() + + +def test_segmentation_dino_failure(mocker, mock_controller): + _mock_executor(mocker) + mocker.patch.object(mock_controller, "_wait_for_globus_compute_future", return_value=False) + + result = mock_controller.segmentation_dino(recon_folder_path="folder/recfile") + + assert result is False + + +def test_segmentation_dino_output_paths(mocker, mock_controller): + """DINO output dir should be under /dino.""" + mock_exec = _mock_executor(mocker) + mocker.patch.object(mock_controller, "_wait_for_globus_compute_future", return_value=True) + + mock_controller.segmentation_dino(recon_folder_path="folder/recfile") + + call_kwargs = mock_exec.submit.call_args.kwargs + assert "/dino" in call_kwargs.get("output_dir", "") + + +# ────────────────────────────────────────────────────────────────────────────── +# ALCFTomographyHPCController — combine_segmentations +# ────────────────────────────────────────────────────────────────────────────── + +def test_combine_segmentations_success(mocker, mock_controller): + mock_exec = _mock_executor(mocker) + mocker.patch.object(mock_controller, "_wait_for_globus_compute_future", return_value=True) + + result = mock_controller.combine_segmentations(recon_folder_path="folder/recfile") + + assert result is True + mock_exec.submit.assert_called_once() + + +def test_combine_segmentations_failure(mocker, mock_controller): + _mock_executor(mocker) + mocker.patch.object(mock_controller, "_wait_for_globus_compute_future", return_value=False) + + result = mock_controller.combine_segmentations(recon_folder_path="folder/recfile") + + assert result is False + + +def test_combine_segmentations_passes_sam3_and_dino_paths(mocker, mock_controller): + """Wrapper should be called with both sam3 and dino result paths.""" + mock_exec = _mock_executor(mocker) + mocker.patch.object(mock_controller, "_wait_for_globus_compute_future", return_value=True) + + mock_controller.combine_segmentations(recon_folder_path="folder/recfile") + + call_kwargs = mock_exec.submit.call_args.kwargs + assert "/sam3" in call_kwargs.get("sam3_results", "") + assert "/dino" in call_kwargs.get("dino_results", "") + assert "/combined" in call_kwargs.get("combined_output", "") + + +# ────────────────────────────────────────────────────────────────────────────── +# Prefect tasks +# ────────────────────────────────────────────────────────────────────────────── + +def test_alcf_segmentation_sam3_task_success(mocker): + from orchestration.flows.bl832.alcf import alcf_segmentation_sam3_task + + mocker.patch("orchestration.flows.bl832.alcf.get_run_logger", return_value=mocker.MagicMock()) + mock_controller = mocker.MagicMock() + mock_controller.segmentation_sam3.return_value = True + mocker.patch("orchestration.flows.bl832.alcf.get_controller", return_value=mock_controller) + + result = alcf_segmentation_sam3_task.fn( + recon_folder_path="folder/recfile", config=mocker.MagicMock() + ) + + mock_controller.segmentation_sam3.assert_called_once_with(recon_folder_path="folder/recfile") + assert result is True + + +def test_alcf_segmentation_sam3_task_failure(mocker): + from orchestration.flows.bl832.alcf import alcf_segmentation_sam3_task + + mocker.patch("orchestration.flows.bl832.alcf.get_run_logger", return_value=mocker.MagicMock()) + mock_controller = mocker.MagicMock() + mock_controller.segmentation_sam3.return_value = False + mocker.patch("orchestration.flows.bl832.alcf.get_controller", return_value=mock_controller) + + result = alcf_segmentation_sam3_task.fn( + recon_folder_path="folder/recfile", config=mocker.MagicMock() + ) + + assert result is False + + +def test_alcf_segmentation_dino_task_success(mocker): + from orchestration.flows.bl832.alcf import alcf_segmentation_dino_task + + mocker.patch("orchestration.flows.bl832.alcf.get_run_logger", return_value=mocker.MagicMock()) + mock_controller = mocker.MagicMock() + mock_controller.segmentation_dino.return_value = True + mocker.patch("orchestration.flows.bl832.alcf.get_controller", return_value=mock_controller) + + result = alcf_segmentation_dino_task.fn( + recon_folder_path="folder/recfile", config=mocker.MagicMock() + ) + + mock_controller.segmentation_dino.assert_called_once_with(recon_folder_path="folder/recfile") + assert result is True + + +def test_alcf_segmentation_dino_task_failure(mocker): + from orchestration.flows.bl832.alcf import alcf_segmentation_dino_task + + mocker.patch("orchestration.flows.bl832.alcf.get_run_logger", return_value=mocker.MagicMock()) + mock_controller = mocker.MagicMock() + mock_controller.segmentation_dino.return_value = False + mocker.patch("orchestration.flows.bl832.alcf.get_controller", return_value=mock_controller) + + result = alcf_segmentation_dino_task.fn( + recon_folder_path="folder/recfile", config=mocker.MagicMock() + ) + + assert result is False + + +def test_alcf_combine_segmentations_task_success(mocker): + from orchestration.flows.bl832.alcf import alcf_combine_segmentations_task + + mocker.patch("orchestration.flows.bl832.alcf.get_run_logger", return_value=mocker.MagicMock()) + mock_controller = mocker.MagicMock() + mock_controller.combine_segmentations.return_value = True + mocker.patch("orchestration.flows.bl832.alcf.get_controller", return_value=mock_controller) + + result = alcf_combine_segmentations_task.fn( + recon_folder_path="folder/recfile", config=mocker.MagicMock() + ) + + mock_controller.combine_segmentations.assert_called_once_with(recon_folder_path="folder/recfile") + assert result is True + + +def test_alcf_combine_segmentations_task_failure(mocker): + from orchestration.flows.bl832.alcf import alcf_combine_segmentations_task + + mocker.patch("orchestration.flows.bl832.alcf.get_run_logger", return_value=mocker.MagicMock()) + mock_controller = mocker.MagicMock() + mock_controller.combine_segmentations.return_value = False + mocker.patch("orchestration.flows.bl832.alcf.get_controller", return_value=mock_controller) + + result = alcf_combine_segmentations_task.fn( + recon_folder_path="folder/recfile", config=mocker.MagicMock() + ) + + assert result is False + + +# ────────────────────────────────────────────────────────────────────────────── +# alcf_recon_flow +# +# Flow tests pass a real Config832 instance (not a MagicMock) so Prefect's +# parameter type validation passes. Config832's network dependencies are all +# mocked via _patch_config832. Controller methods are patched on the class so +# the real __init__ runs (in the flow context) while no HPC calls are made. +# ────────────────────────────────────────────────────────────────────────────── + +def test_alcf_recon_flow_success(mocker): + from orchestration.flows.bl832.alcf import alcf_recon_flow, ALCFTomographyHPCController + + mock_config = _patch_config832(mocker) + + mock_transfer = mocker.MagicMock() + mock_transfer.copy.return_value = True + mocker.patch("orchestration.flows.bl832.alcf.get_transfer_controller", return_value=mock_transfer) + + mock_prune = mocker.MagicMock() + mocker.patch("orchestration.flows.bl832.alcf.get_prune_controller", return_value=mock_prune) + + mocker.patch.object(ALCFTomographyHPCController, "reconstruct", return_value=True) + mocker.patch.object(ALCFTomographyHPCController, "build_multi_resolution", return_value=True) + + result = alcf_recon_flow(file_path="folder/file.h5", config=mock_config) + + assert result is True + assert mock_transfer.copy.call_count == 3 # raw→ALCF, tiff→data832, zarr→data832 + assert mock_prune.prune.call_count == 5 # raw, tiff(ALCF), zarr(ALCF), tiff(data832), zarr(data832) + + +def test_alcf_recon_flow_transfer_failure(mocker): + from orchestration.flows.bl832.alcf import alcf_recon_flow, ALCFTomographyHPCController + + mock_config = _patch_config832(mocker) + + mock_transfer = mocker.MagicMock() + mock_transfer.copy.return_value = False + mocker.patch("orchestration.flows.bl832.alcf.get_transfer_controller", return_value=mock_transfer) + mocker.patch("orchestration.flows.bl832.alcf.get_prune_controller", return_value=mocker.MagicMock()) + mocker.patch.object(ALCFTomographyHPCController, "reconstruct", return_value=True) + mocker.patch.object(ALCFTomographyHPCController, "build_multi_resolution", return_value=True) + + with pytest.raises(ValueError, match="Transfer to ALCF Failed"): + alcf_recon_flow(file_path="folder/file.h5", config=mock_config) + + +def test_alcf_recon_flow_recon_failure(mocker): + from orchestration.flows.bl832.alcf import alcf_recon_flow, ALCFTomographyHPCController + + mock_config = _patch_config832(mocker) + + mock_transfer = mocker.MagicMock() + mock_transfer.copy.return_value = True + mocker.patch("orchestration.flows.bl832.alcf.get_transfer_controller", return_value=mock_transfer) + mocker.patch("orchestration.flows.bl832.alcf.get_prune_controller", return_value=mocker.MagicMock()) + mocker.patch.object(ALCFTomographyHPCController, "reconstruct", return_value=False) + mocker.patch.object(ALCFTomographyHPCController, "build_multi_resolution", return_value=True) + + with pytest.raises(ValueError, match="Reconstruction at ALCF Failed"): + alcf_recon_flow(file_path="folder/file.h5", config=mock_config) + + +def test_alcf_recon_flow_multires_failure(mocker): + from orchestration.flows.bl832.alcf import alcf_recon_flow, ALCFTomographyHPCController + + mock_config = _patch_config832(mocker) + + mock_transfer = mocker.MagicMock() + mock_transfer.copy.return_value = True + mocker.patch("orchestration.flows.bl832.alcf.get_transfer_controller", return_value=mock_transfer) + mocker.patch("orchestration.flows.bl832.alcf.get_prune_controller", return_value=mocker.MagicMock()) + mocker.patch.object(ALCFTomographyHPCController, "reconstruct", return_value=True) + mocker.patch.object(ALCFTomographyHPCController, "build_multi_resolution", return_value=False) + + with pytest.raises(ValueError, match="Tiff to Zarr at ALCF Failed"): + alcf_recon_flow(file_path="folder/file.h5", config=mock_config) + + +# ────────────────────────────────────────────────────────────────────────────── +# alcf_forge_recon_segment_flow +# ────────────────────────────────────────────────────────────────────────────── + +def test_alcf_forge_recon_segment_flow_success(mocker): + from orchestration.flows.bl832.alcf import alcf_forge_recon_segment_flow, ALCFTomographyHPCController + + mock_config = _patch_config832(mocker) + + mock_transfer = mocker.MagicMock() + mock_transfer.copy.return_value = True + mocker.patch("orchestration.flows.bl832.alcf.get_transfer_controller", return_value=mock_transfer) + mocker.patch("orchestration.flows.bl832.alcf.get_prune_controller", return_value=mocker.MagicMock()) + mocker.patch.object(ALCFTomographyHPCController, "reconstruct", return_value=True) + + mock_seg_task = mocker.patch( + "orchestration.flows.bl832.alcf.alcf_segmentation_sam3_task", return_value=True + ) + + result = alcf_forge_recon_segment_flow(file_path="folder/file.h5", config=mock_config) + + assert result is True + mock_seg_task.assert_called_once() + assert mock_transfer.copy.call_count == 3 # raw→ALCF, tiff→data832, seg→data832 + + +def test_alcf_forge_recon_segment_flow_transfer_failure(mocker): + from orchestration.flows.bl832.alcf import alcf_forge_recon_segment_flow, ALCFTomographyHPCController + + mock_config = _patch_config832(mocker) + + mock_transfer = mocker.MagicMock() + mock_transfer.copy.return_value = False + mocker.patch("orchestration.flows.bl832.alcf.get_transfer_controller", return_value=mock_transfer) + mocker.patch("orchestration.flows.bl832.alcf.get_prune_controller", return_value=mocker.MagicMock()) + mocker.patch.object(ALCFTomographyHPCController, "reconstruct", return_value=True) + + with pytest.raises(ValueError, match="Transfer to ALCF Failed"): + alcf_forge_recon_segment_flow(file_path="folder/file.h5", config=mock_config) + + +def test_alcf_forge_recon_segment_flow_recon_failure(mocker): + from orchestration.flows.bl832.alcf import alcf_forge_recon_segment_flow, ALCFTomographyHPCController + + mock_config = _patch_config832(mocker) + + mock_transfer = mocker.MagicMock() + mock_transfer.copy.return_value = True + mocker.patch("orchestration.flows.bl832.alcf.get_transfer_controller", return_value=mock_transfer) + mocker.patch("orchestration.flows.bl832.alcf.get_prune_controller", return_value=mocker.MagicMock()) + mocker.patch.object(ALCFTomographyHPCController, "reconstruct", return_value=False) + + with pytest.raises(ValueError, match="Reconstruction at ALCF Failed"): + alcf_forge_recon_segment_flow(file_path="folder/file.h5", config=mock_config) + + +def test_alcf_forge_recon_segment_flow_seg_failure(mocker): + """Flow should return False (not raise) when segmentation fails.""" + from orchestration.flows.bl832.alcf import alcf_forge_recon_segment_flow, ALCFTomographyHPCController + + mock_config = _patch_config832(mocker) + + mock_transfer = mocker.MagicMock() + mock_transfer.copy.return_value = True + mocker.patch("orchestration.flows.bl832.alcf.get_transfer_controller", return_value=mock_transfer) + mocker.patch("orchestration.flows.bl832.alcf.get_prune_controller", return_value=mocker.MagicMock()) + mocker.patch.object(ALCFTomographyHPCController, "reconstruct", return_value=True) + mocker.patch("orchestration.flows.bl832.alcf.alcf_segmentation_sam3_task", return_value=False) + + result = alcf_forge_recon_segment_flow(file_path="folder/file.h5", config=mock_config) + + assert result is False + + +# ────────────────────────────────────────────────────────────────────────────── +# alcf_forge_recon_multisegment_flow +# ────────────────────────────────────────────────────────────────────────────── + +def test_alcf_forge_recon_multisegment_flow_both_succeed(mocker): + from orchestration.flows.bl832.alcf import alcf_forge_recon_multisegment_flow, ALCFTomographyHPCController + + mock_config = _patch_config832(mocker) + + mock_transfer = mocker.MagicMock() + mock_transfer.copy.return_value = True + mocker.patch("orchestration.flows.bl832.alcf.get_transfer_controller", return_value=mock_transfer) + mocker.patch("orchestration.flows.bl832.alcf.get_prune_controller", return_value=mocker.MagicMock()) + mocker.patch.object(ALCFTomographyHPCController, "reconstruct", return_value=True) + mock_combine = mocker.patch.object( + ALCFTomographyHPCController, "combine_segmentations", return_value=True + ) + + mock_sam3_task = mocker.patch("orchestration.flows.bl832.alcf.alcf_segmentation_sam3_task") + mock_dino_task = mocker.patch("orchestration.flows.bl832.alcf.alcf_segmentation_dino_task") + mock_sam3_task.submit.return_value = _make_future(mocker, True) + mock_dino_task.submit.return_value = _make_future(mocker, True) + + result = alcf_forge_recon_multisegment_flow(file_path="folder/file.h5", config=mock_config) + + assert result is True + mock_sam3_task.submit.assert_called_once() + mock_dino_task.submit.assert_called_once() + mock_combine.assert_called_once() + + +def test_alcf_forge_recon_multisegment_flow_only_sam3_succeeds(mocker): + """Combine is skipped when DINO fails; flow still returns True.""" + from orchestration.flows.bl832.alcf import alcf_forge_recon_multisegment_flow, ALCFTomographyHPCController + + mock_config = _patch_config832(mocker) + + mock_transfer = mocker.MagicMock() + mock_transfer.copy.return_value = True + mocker.patch("orchestration.flows.bl832.alcf.get_transfer_controller", return_value=mock_transfer) + mocker.patch("orchestration.flows.bl832.alcf.get_prune_controller", return_value=mocker.MagicMock()) + mocker.patch.object(ALCFTomographyHPCController, "reconstruct", return_value=True) + mock_combine = mocker.patch.object( + ALCFTomographyHPCController, "combine_segmentations", return_value=True + ) + + mock_sam3_task = mocker.patch("orchestration.flows.bl832.alcf.alcf_segmentation_sam3_task") + mock_dino_task = mocker.patch("orchestration.flows.bl832.alcf.alcf_segmentation_dino_task") + mock_sam3_task.submit.return_value = _make_future(mocker, True) + mock_dino_task.submit.return_value = _make_future(mocker, False) + + result = alcf_forge_recon_multisegment_flow(file_path="folder/file.h5", config=mock_config) + + assert result is True + mock_combine.assert_not_called() + + +def test_alcf_forge_recon_multisegment_flow_both_seg_fail(mocker): + from orchestration.flows.bl832.alcf import alcf_forge_recon_multisegment_flow, ALCFTomographyHPCController + + mock_config = _patch_config832(mocker) + + mock_transfer = mocker.MagicMock() + mock_transfer.copy.return_value = True + mocker.patch("orchestration.flows.bl832.alcf.get_transfer_controller", return_value=mock_transfer) + mocker.patch("orchestration.flows.bl832.alcf.get_prune_controller", return_value=mocker.MagicMock()) + mocker.patch.object(ALCFTomographyHPCController, "reconstruct", return_value=True) + mock_combine = mocker.patch.object( + ALCFTomographyHPCController, "combine_segmentations", return_value=True + ) + + mock_sam3_task = mocker.patch("orchestration.flows.bl832.alcf.alcf_segmentation_sam3_task") + mock_dino_task = mocker.patch("orchestration.flows.bl832.alcf.alcf_segmentation_dino_task") + mock_sam3_task.submit.return_value = _make_future(mocker, False) + mock_dino_task.submit.return_value = _make_future(mocker, False) + + result = alcf_forge_recon_multisegment_flow(file_path="folder/file.h5", config=mock_config) + + assert result is False + mock_combine.assert_not_called() + + +def test_alcf_forge_recon_multisegment_flow_recon_failure(mocker): + from orchestration.flows.bl832.alcf import alcf_forge_recon_multisegment_flow, ALCFTomographyHPCController + + mock_config = _patch_config832(mocker) + + mock_transfer = mocker.MagicMock() + mock_transfer.copy.return_value = True + mocker.patch("orchestration.flows.bl832.alcf.get_transfer_controller", return_value=mock_transfer) + mocker.patch("orchestration.flows.bl832.alcf.get_prune_controller", return_value=mocker.MagicMock()) + mocker.patch.object(ALCFTomographyHPCController, "reconstruct", return_value=False) + + with pytest.raises(ValueError, match="Reconstruction at ALCF Failed"): + alcf_forge_recon_multisegment_flow(file_path="folder/file.h5", config=mock_config) + + +def test_alcf_forge_recon_multisegment_flow_transfer_failure(mocker): + from orchestration.flows.bl832.alcf import alcf_forge_recon_multisegment_flow, ALCFTomographyHPCController + + mock_config = _patch_config832(mocker) + + mock_transfer = mocker.MagicMock() + mock_transfer.copy.return_value = False + mocker.patch("orchestration.flows.bl832.alcf.get_transfer_controller", return_value=mock_transfer) + mocker.patch("orchestration.flows.bl832.alcf.get_prune_controller", return_value=mocker.MagicMock()) + mocker.patch.object(ALCFTomographyHPCController, "reconstruct", return_value=True) + + with pytest.raises(ValueError, match="Transfer to ALCF Failed"): + alcf_forge_recon_multisegment_flow(file_path="folder/file.h5", config=mock_config) diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py index 4e424bad..6459815e 100644 --- a/orchestration/_tests/test_globus_flow.py +++ b/orchestration/_tests/test_globus_flow.py @@ -147,8 +147,8 @@ def __init__(self) -> None: MockSecret.for_endpoint("nersc832_alsdev_raw")), "nersc832_alsdev_scratch": MockEndpoint("mock_nersc832_alsdev_scratch_path", MockSecret.for_endpoint("nersc832_alsdev_scratch")), - "alcf832_raw": MockEndpoint("mock_alcf832_raw_path", MockSecret.for_endpoint("alcf832_raw")), - "alcf832_scratch": MockEndpoint("mock_alcf832_scratch_path", MockSecret.for_endpoint("alcf832_scratch")), + "alcf832_iri_raw": MockEndpoint("mock_alcf832_raw_path", MockSecret.for_endpoint("alcf832_iri_raw")), + "alcf832_iri_scratch": MockEndpoint("mock_alcf832_scratch_path", MockSecret.for_endpoint("alcf832_iri_scratch")), } # Mock apps @@ -163,8 +163,8 @@ def __init__(self) -> None: self.spot832 = self.endpoints["spot832"] self.data832 = self.endpoints["data832"] self.nersc832 = self.endpoints["nersc832"] - self.alcf832_raw = self.endpoints["alcf832_raw"] - self.alcf832_scratch = self.endpoints["alcf832_scratch"] + self.alcf832_iri_raw = self.endpoints["alcf832_iri_raw"] + self.alcf832_iri_scratch = self.endpoints["alcf832_iri_scratch"] self.data832_raw = self.endpoints["data832_raw"] self.data832_scratch = self.endpoints["data832_scratch"] self.nersc832_alsdev_scratch = self.endpoints["nersc832_alsdev_scratch"] @@ -247,8 +247,11 @@ def test_alcf_recon_flow(mocker: MockFixture): "nersc832_alsdev_pscratch_raw": mocker.MagicMock(), "nersc832_alsdev_pscratch_scratch": mocker.MagicMock(), "nersc832_alsdev_recon_scripts": mocker.MagicMock(), - "alcf832_raw": mocker.MagicMock(), - "alcf832_scratch": mocker.MagicMock(), + "alcf832_iri_raw": mocker.MagicMock(), + "alcf832_iri_scratch": mocker.MagicMock(), + "alcf832_synaps_raw": mocker.MagicMock(), + "alcf832_synaps_recon": mocker.MagicMock(), + "alcf832_synaps_segment": mocker.MagicMock(), } ) mocker.patch( @@ -298,10 +301,12 @@ def test_alcf_recon_flow(mocker: MockFixture): return_value=mock_transfer_controller ) - # 7) Patch schedule_pruning => skip real scheduling - mock_schedule_pruning = mocker.patch( - "orchestration.flows.bl832.alcf.schedule_pruning", - return_value=True + # 7) Patch get_prune_controller(...) => skip real scheduling + mock_prune_controller = mocker.MagicMock() + mock_prune_controller.prune.return_value = True + mocker.patch( + "orchestration.flows.bl832.alcf.get_prune_controller", + return_value=mock_prune_controller ) file_path = "/global/raw/transfer_tests/test.h5" @@ -316,13 +321,13 @@ def test_alcf_recon_flow(mocker: MockFixture): assert mock_transfer_controller.copy.call_count == 3, "Should do 3 transfers in success path" mock_hpc_reconstruct.assert_called_once() mock_hpc_multires.assert_called_once() - mock_schedule_pruning.assert_called_once() + assert mock_prune_controller.prune.call_count == 5, "Should schedule 5 prune operations in success path" # Reset for next scenario mock_transfer_controller.copy.reset_mock() mock_hpc_reconstruct.reset_mock() mock_hpc_multires.reset_mock() - mock_schedule_pruning.reset_mock() + mock_prune_controller.prune.reset_mock() # # ---------- CASE 2: HPC reconstruction fails ---------- @@ -339,13 +344,13 @@ def test_alcf_recon_flow(mocker: MockFixture): assert mock_transfer_controller.copy.call_count == 1, ( "Should only do the first data832->alcf copy before HPC fails" ) - mock_schedule_pruning.assert_not_called() + mock_prune_controller.prune.assert_not_called() # Reset mock_transfer_controller.copy.reset_mock() mock_hpc_reconstruct.reset_mock() mock_hpc_multires.reset_mock() - mock_schedule_pruning.reset_mock() + mock_prune_controller.prune.reset_mock() # ---------- CASE 3: Tiff->Zarr fails ---------- mock_transfer_controller.copy.return_value = True @@ -360,13 +365,13 @@ def test_alcf_recon_flow(mocker: MockFixture): # HPC is done, so there's 2 successful transfer (data832->alcf). # We have not transferred tiff or zarr => total 2 copies assert mock_transfer_controller.copy.call_count == 2 - mock_schedule_pruning.assert_not_called() + mock_prune_controller.prune.assert_not_called() # Reset mock_transfer_controller.copy.reset_mock() mock_hpc_reconstruct.reset_mock() mock_hpc_multires.reset_mock() - mock_schedule_pruning.reset_mock() + mock_prune_controller.prune.reset_mock() # ---------- CASE 4: data832->ALCF fails immediately ---------- mock_transfer_controller.copy.return_value = False @@ -380,4 +385,4 @@ def test_alcf_recon_flow(mocker: MockFixture): mock_hpc_multires.assert_not_called() # The only call is the failing copy mock_transfer_controller.copy.assert_called_once() - mock_schedule_pruning.assert_not_called() + mock_prune_controller.prune.assert_not_called() diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index bdf96ac2..989fd7db 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -1,5 +1,4 @@ from concurrent.futures import Future -import datetime from pathlib import Path import time from typing import Optional @@ -12,8 +11,8 @@ from orchestration.flows.bl832.config import Config832 from orchestration.flows.bl832.job_controller import get_controller, HPC, TomographyHPCController +from orchestration.prune_controller import get_prune_controller, PruneMethod from orchestration.transfer_controller import get_transfer_controller, CopyMethod -from orchestration.prefect import schedule_prefect_flow class ALCFTomographyHPCController(TomographyHPCController): @@ -22,20 +21,24 @@ class ALCFTomographyHPCController(TomographyHPCController): There is a @staticmethod wrapper for each compute task submitted via Globus Compute. Also, there is a shared wait_for_globus_compute_future method that waits for the task to complete. - Args: - TomographyHPCController (ABC): Abstract class for tomography HPC controllers. + :param TomographyHPCController: Abstract class for tomography HPC controllers. """ def __init__( self, config: Config832 ) -> None: + """ + Initialize the ALCF Tomography HPC Controller. + + :param config: Configuration object for the controller. + """ super().__init__(config) # Load allocation root from the Prefect JSON block # The block must be registered with the name "alcf-allocation-root-path" logger = get_run_logger() allocation_data = Variable.get("alcf-allocation-root-path", _sync=True) - self.allocation_root = allocation_data.get("alcf-allocation-root-path") + self.allocation_root = allocation_data.get("alcf-allocation-root-path") # eagle/SYNAPS-I/ if not self.allocation_root: raise ValueError("Allocation root not found in JSON block 'alcf-allocation-root-path'") logger.info(f"Allocation root loaded: {self.allocation_root}") @@ -47,27 +50,30 @@ def reconstruct( """ Run tomography reconstruction at ALCF through Globus Compute. - Args: - file_path (str): Path to the file to be processed. - - Returns: - bool: True if the task completed successfully, False otherwise. + :param file_path : Path to the file to be processed. + :return: True if the task completed successfully, False otherwise. """ logger = get_run_logger() file_name = Path(file_path).stem + ".h5" folder_name = Path(file_path).parent.name - iri_als_bl832_rundir = f"{self.allocation_root}/data/raw" - iri_als_bl832_recon_script = f"{self.allocation_root}/scripts/globus_reconstruction.py" + rundir = f"{self.allocation_root}/data/bl832/raw" + recon_script = f"{self.allocation_root}/reconstruction/scripts/globus_reconstruction_multinode.py" gcc = Client(code_serialization_strategy=CombinedCode()) - with Executor(endpoint_id=Secret.load("globus-compute-endpoint").get(), client=gcc) as fxe: + endpoint_id = Variable.get( + "alcf-globus-compute-recon-uuid", + default="4953017e-6127-4587-9ee3-b71db7623122", + _sync=True + ) + + with Executor(endpoint_id=endpoint_id, client=gcc) as fxe: logger.info(f"Running Tomopy reconstruction on {file_name} at ALCF") future = fxe.submit( - self._reconstruct_wrapper, - iri_als_bl832_rundir, - iri_als_bl832_recon_script, + self._reconstruct_wrapper_multinode, + rundir, + recon_script, file_name, folder_name ) @@ -76,22 +82,19 @@ def reconstruct( @staticmethod def _reconstruct_wrapper( - rundir: str = "/eagle/IRIProd/ALS/data/raw", - script_path: str = "/eagle/IRIProd/ALS/scripts/globus_reconstruction.py", + rundir: str = "/eagle/SYNAPS-I/data/bl832/raw", + script_path: str = "/eagle/SYNAPS-I/reconstruction/scripts/globus_reconstruction.py", h5_file_name: str = None, folder_path: str = None ) -> str: """ Python function that wraps around the application call for Tomopy reconstruction on ALCF - Args: - rundir (str): the directory on the eagle file system (ALCF) where the input data are located - script_path (str): the path to the script that will run the reconstruction - h5_file_name (str): the name of the h5 file to be reconstructed - folder_path (str): the path to the folder containing the h5 file - - Returns: - str: confirmation message + :param rundir: the directory on the eagle file system (ALCF) where the input data are located + :param script_path: the path to the script that will run the reconstruction + :param h5_file_name: the name of the h5 file to be reconstructed + :param folder_path: the path to the folder containing the h5 file + :return: confirmation message """ import os import subprocess @@ -115,6 +118,126 @@ def _reconstruct_wrapper( f"{recon_res}" ) + @staticmethod + def _reconstruct_wrapper_multinode( + rundir: str, + script_path: str, + h5_file_name: str, + folder_path: str, + node_list: list[str] = None, # Pass explicitly + num_nodes: int = 8, + ) -> str: + """ + Wrapper function to run Tomopy reconstruction using mpiexec on ALCF across multiple nodes. + + :param rundir: the directory on the eagle file system (ALCF) where the input data are located + :param script_path: the path to the script that will run the reconstruction + :param h5_file_name: the name of the h5 file to be reconstructed + :param folder_path: the path to the folder containing the h5 file + :param node_list: list of nodes to use for reconstruction (if None, will attempt to read from PBS_NODEFILE) + :param num_nodes: number of nodes to use for reconstruction (used if node_list is None) + :return: confirmation message + """ + import os + import subprocess + import time + import h5py + import tempfile + + rec_start = time.time() + os.chdir(rundir) + + # If node_list not provided, try PBS_NODEFILE + if node_list is None: + pbs_nodefile = os.environ.get("PBS_NODEFILE") + if pbs_nodefile and os.path.exists(pbs_nodefile): + with open(pbs_nodefile, 'r') as f: + all_lines = [line.strip() for line in f if line.strip()] + node_list = list(dict.fromkeys(all_lines)) + else: + # Fallback: get nodes from PBS_NODENUM or assume localhost + node_list = ["localhost"] + + num_nodes = len(node_list) + print("=== RECON DEBUG ===") + print(f"Using {num_nodes} nodes: {node_list}") + + # Read number of slices + h5_path = f"{rundir}/{folder_path}/{h5_file_name}" + with h5py.File(h5_path, 'r') as f: + num_slices = f['/exchange/data'].shape[1] + + print(f"Total slices: {num_slices}") + slices_per_node = num_slices // num_nodes + + venv_path = "/eagle/SYNAPS-I/reconstruction/env/tomopy" + + # Critical: Set environment variables BEFORE the conda activation + env_setup = ( + "export TMPDIR=/tmp && " + "export NUMEXPR_MAX_THREADS=64 && " + "export NUMEXPR_NUM_THREADS=64 && " + "export OMP_NUM_THREADS=64 && " + "export MKL_NUM_THREADS=64 && " + "module use /soft/modulefiles && " + "module load conda && " + "source $(conda info --base)/etc/profile.d/conda.sh && " + f"conda activate {venv_path} && " + f"cd {rundir} && " + ) + + procs = [] + temp_hostfiles = [] + + for i, node in enumerate(node_list): + sino_start = i * slices_per_node + sino_end = num_slices if i == num_nodes - 1 else (i + 1) * slices_per_node + + cmd = f"python {script_path} {h5_file_name} {folder_path} {sino_start} {sino_end}" + + with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.hosts') as f: + f.write(node + '\n') + temp_hostfile = f.name + temp_hostfiles.append(temp_hostfile) + + # Use --cpu-bind to ensure proper CPU affinity + full_cmd = [ + "mpiexec", + "-n", "1", + "-ppn", "1", + "--cpu-bind", "depth", + "-d", "64", # depth=64 cores per rank + "-hostfile", temp_hostfile, + "bash", "-c", env_setup + cmd + ] + + print(f"Launching on {node}: slices {sino_start}-{sino_end}") + proc = subprocess.Popen(full_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + procs.append((proc, node, sino_start, sino_end)) + + # Wait and collect results + failed = [] + for proc, node, sino_start, sino_end in procs: + stdout, stderr = proc.communicate() + if proc.returncode != 0: + print(f"FAILED on {node} (slices {sino_start}-{sino_end})") + print(f"STDERR: {stderr.decode()[-2000:]}") + failed.append(node) + else: + print(f"SUCCESS on {node} (slices {sino_start}-{sino_end})") + + # Cleanup + for hf in temp_hostfiles: + try: + os.remove(hf) + except OSError: + pass + + if failed: + raise RuntimeError(f"Reconstruction failed on nodes: {failed}") + + return f"Reconstructed {h5_file_name} across {num_nodes} nodes in {time.time() - rec_start:.1f}s" + def build_multi_resolution( self, file_path: str = "", @@ -122,11 +245,8 @@ def build_multi_resolution( """ Tiff to Zarr code that is executed using Globus Compute - Args: - file_path (str): Path to the file to be processed. - - Returns: - bool: True if the task completed successfully, False otherwise. + :param file_path: Path to the file to be processed. + :return: True if the task completed successfully, False otherwise. """ logger = get_run_logger() @@ -163,13 +283,11 @@ def _build_multi_resolution_wrapper( """ Python function that wraps around the application call for Tiff to Zarr on ALCF - Args: - rundir (str): the directory on the eagle file system (ALCF) where the input data are located - script_path (str): the path to the script that will convert the tiff files to zarr - recon_path (str): the path to the reconstructed data - raw_path (str): the path to the raw data - Returns: - str: confirmation message + :param rundir: the directory on the eagle file system (ALCF) where the input data are located + :param script_path: the path to the script that will convert the tiff files to zarr + :param recon_path: the path to the reconstructed data + :param raw_path: the path to the raw data + :return: confirmation message """ import os import subprocess @@ -185,24 +303,504 @@ def _build_multi_resolution_wrapper( f"Converted tiff files to zarr;\n {zarr_res}" ) + def segmentation_sam3( + self, + recon_folder_path: str = "", + ) -> bool: + """ + Run tomography segmentation at ALCF through Globus Compute. + + :param recon_folder_path: Path to the reconstructed data folder to be processed. + :return: True if the task completed successfully, False otherwise. + """ + logger = get_run_logger() + + # Operate on reconstructed data + # Input: folder_name/rec20211222_125057_petiole4/ + # Output should go to: folder_name/seg20211222_125057_petiole4/ + + rundir = f"{self.allocation_root}/data/bl832/scratch/reconstruction/{recon_folder_path}" + output_folder = recon_folder_path.replace('/rec', '/seg') + seg_base = f"{self.allocation_root}/data/bl832/scratch/segmentation/{output_folder}" + output_dir = f"{seg_base}/sam3" # SAM3 writes class folders directly here + gcc = Client(code_serialization_strategy=CombinedCode()) + + endpoint_id = Variable.get( + "alcf-globus-compute-seg-uuid", + default="168c595b-9493-42db-9c6a-aad960913de2", + _sync=True + ) + + segmentation_module = "src.inference_v6" + workdir = f"{self.allocation_root}/segmentation/scripts/inference_latest/forge_feb_seg_model_demo" + + with Executor(endpoint_id=endpoint_id, client=gcc) as fxe: + logger.info(f"Running segmentation on {recon_folder_path} at ALCF") + future = fxe.submit( + self._segmentation_sam3_wrapper, + input_dir=rundir, + output_dir=output_dir, + script_module=segmentation_module, + workdir=workdir + ) + result = self._wait_for_globus_compute_future(future, "segmentation", check_interval=10) + + return result + + def segmentation_dino( + self, + recon_folder_path: str = "", + ) -> bool: + """ + Run tomography segmentation at ALCF through Globus Compute. + + :param recon_folder_path: Path to the reconstructed data folder to be processed. + :return: True if the task completed successfully, False otherwise. + """ + logger = get_run_logger() + + # Operate on reconstructed data + # Input: folder_name/rec20211222_125057_petiole4/ + # Output should go to: folder_name/seg20211222_125057_petiole4/ + + rundir = f"{self.allocation_root}/data/bl832/scratch/reconstruction/{recon_folder_path}" + output_folder = recon_folder_path.replace('/rec', '/seg') + seg_base = f"{self.allocation_root}/data/bl832/scratch/segmentation/{output_folder}" + output_dir = f"{seg_base}/dino" # DINO writes class folders directly here + + gcc = Client(code_serialization_strategy=CombinedCode()) + + endpoint_id = Variable.get( + "alcf-globus-compute-seg-dino-uuid", + default="07b24393-f649-4f6b-8860-1bfb211d17f4", + _sync=True + ) + + segmentation_module = "src.inference_dino_v1" + workdir = f"{self.allocation_root}/segmentation/scripts/inference_latest/forge_feb_seg_model_demo" + + with Executor(endpoint_id=endpoint_id, client=gcc) as fxe: + logger.info(f"Running segmentation on {recon_folder_path} at ALCF") + future = fxe.submit( + self._segmentation_dino_wrapper, + input_dir=rundir, + output_dir=output_dir, + script_module=segmentation_module, + workdir=workdir + ) + result = self._wait_for_globus_compute_future(future, "segmentation_dino", check_interval=10) + + return result + + @staticmethod + def _segmentation_sam3_wrapper( + input_dir: str = "/eagle/SYNAPS-I/data/bl832/scratch/reconstruction/", + output_dir: str = "/eagle/SYNAPS-I/data/bl832/scratch/segmentation/", + script_module: str = "src.inference_v6", + workdir: str = "/eagle/SYNAPS-I/segmentation/scripts/inference_latest/forge_feb_seg_model_demo", + nproc_per_node: int = 4, + patch_size: int = 1000, + overlap_ratio: float = 0.5, + batch_size: int = 8, + confidence: float = 0.5, + prompts: list[str] = ['Phloem Fibers', 'Hydrated Xylem vessels', 'Air-based Pith cells', 'Dehydrated Xylem vessels'], + bpe_path: str = "/eagle/SYNAPS-I/segmentation/sam3_finetune/sam3/bpe_simple_vocab_16e6.txt.gz", + finetuned_checkpoint: str = "/eagle/SYNAPS-I/segmentation/sam3_finetune/sam3/checkpoint_v6.pt", + original_checkpoint: str = "/eagle/SYNAPS-I/segmentation/sam3_finetune/sam3/sam3.pt", + use_finetuned: bool = True, + skip_existing: bool = False, + ) -> str: + """ + Wrapper function to run segmentation using torch.distributed.run on ALCF. + + :param input_dir: Directory containing input data for segmentation. + :param output_dir: Directory to save segmentation outputs. + :param script_module: Python module containing the segmentation code to run. + :param workdir: Working directory for the segmentation script. + :param nproc_per_node: Number of processes per node for distributed training. + :param patch_size: Patch size for segmentation. + :param overlap_ratio: Overlap ratio for patch-based segmentation. + :param batch_size: Batch size for segmentation. + :param confidence: Confidence threshold for segmentation. + :param prompts: List of class prompts for segmentation. + :param bpe_path: Path to the BPE vocab file for SAM. + :param finetuned_checkpoint: Path to the finetuned SAM checkpoint. + :param original_checkpoint: Path to the original SAM checkpoint. + :param use_finetuned: Whether to use the finetuned checkpoint or not. + :param skip_existing: Whether to skip segmentation for patches that already have outputs. + :return: Confirmation message upon completion. + """ + import os + import subprocess + import time + + seg_start = time.time() + os.chdir(workdir) + + # Get PBS info + pbs_nodefile = os.environ.get("PBS_NODEFILE") + pbs_jobid = os.environ.get("PBS_JOBID", "12345") + + print("=== PBS DEBUG ===") + print(f"PBS_NODEFILE: {pbs_nodefile}") + print(f"PBS_JOBID: {pbs_jobid}") + + if pbs_nodefile and os.path.exists(pbs_nodefile): + with open(pbs_nodefile, 'r') as f: + all_lines = [line.strip() for line in f if line.strip()] + unique_nodes = list(dict.fromkeys(all_lines)) + actual_nnodes = len(unique_nodes) + master_addr = unique_nodes[0] + print(f"PBS_NODEFILE contents: {all_lines}") + print(f"Unique nodes ({actual_nnodes}): {unique_nodes}") + print(f"Master: {master_addr}") + else: + actual_nnodes = 1 + master_addr = "localhost" + print("No PBS_NODEFILE, single node mode") + + venv_path = "/eagle/SYNAPS-I/segmentation/env" + + # Build command as a list (no shell escaping needed) + cmd_list = [ + f"{venv_path}/bin/python", "-m", "torch.distributed.run", + f"--nnodes={actual_nnodes}", + f"--nproc_per_node={nproc_per_node}", + f"--rdzv_id={pbs_jobid}", + "--rdzv_backend=c10d", + f"--rdzv_endpoint={master_addr}:29500", + "-m", script_module, + "--input-dir", input_dir, + "--output-dir", output_dir, + "--patch-size", str(patch_size), + "--overlap-ratio", str(overlap_ratio), + "--batch-size", str(batch_size), + "--confidence", str(confidence), + "--bpe-path", bpe_path, + "--prompts", + ] + + # Add prompts directly - no quotes needed with list-based subprocess + cmd_list.extend(prompts) + + if use_finetuned: + cmd_list.extend([ + "--finetuned-checkpoint", finetuned_checkpoint, + "--original-checkpoint", original_checkpoint, + ]) + else: + cmd_list.extend(["--original-checkpoint", original_checkpoint]) + + if skip_existing: + cmd_list.append("--skip-existing") + + # Environment variables + env = os.environ.copy() + env.update({ + "PATH": f"{venv_path}/bin:{env.get('PATH', '')}", + "HF_HUB_CACHE": "/eagle/SYNAPS-I/segmentation/.cache/huggingface", + "HF_HOME": "/eagle/SYNAPS-I/segmentation/.cache/huggingface", + "CUDA_DEVICE_ORDER": "PCI_BUS_ID", + "NCCL_NET_GDR_LEVEL": "PHB", + "NCCL_CROSS_NIC": "1", + "NCCL_COLLNET_ENABLE": "1", + "NCCL_NET": "AWS Libfabric", + "FI_CXI_DISABLE_HOST_REGISTER": "1", + "FI_MR_CACHE_MONITOR": "userfaultfd", + "FI_CXI_DEFAULT_CQ_SIZE": "131072", + }) + + # Prepend to LD_LIBRARY_PATH + ld_path = env.get("LD_LIBRARY_PATH", "") + env["LD_LIBRARY_PATH"] = f"/soft/libraries/aws-ofi-nccl/v1.9.1-aws/lib:/soft/libraries/hwloc/lib/:{ld_path}" + + if actual_nnodes > 1: + # Use mpiexec to launch on all nodes + command = [ + "mpiexec", + "-n", str(actual_nnodes), + "-ppn", "1", + "-hostfile", pbs_nodefile, + "--cpu-bind", "depth", + "-d", "16", + ] + cmd_list + else: + command = cmd_list + + print(f"Running: {' '.join(command)}") + + result = subprocess.run(command, env=env, cwd=workdir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + print(f"STDOUT: {result.stdout[-3000:] if result.stdout else 'None'}") + print(f"STDERR: {result.stderr[-3000:] if result.stderr else 'None'}") + + if result.returncode != 0: + raise RuntimeError( + f"Segmentation failed: {result.returncode}\n" + f"STDERR: {result.stderr[-2000:] if result.stderr else 'None'}" + ) + + return f"Completed in {time.time() - seg_start:.1f}s" + + @staticmethod + def _segmentation_dino_wrapper( + input_dir: str = "/eagle/SYNAPS-I/data/bl832/scratch/reconstruction/", + output_dir: str = "/eagle/SYNAPS-I/data/bl832/scratch/segmentation/", + finetuned_checkpoint: str = "/eagle/SYNAPS-I/segmentation/dino/best.ckpt", + save_overlay: bool = True, + batch_size: int = 4, + num_workers: int = 4, + nproc_per_node: int = 4, + workdir: str = "/eagle/SYNAPS-I/segmentation/scripts/inference_latest/forge_feb_seg_model_demo", + script_module: str = "src.inference_dino_v1", + ) -> str: + """ + Wrapper function to run segmentation using the DINO model on ALCF. + + :param input_dir: Directory containing input data for segmentation. + :param output_dir: Directory to save segmentation outputs. + :param finetuned_checkpoint: Path to the finetuned DINO model checkpoint. + :param save_overlay: Whether to save overlay visualizations of the segmentation. + :param batch_size: Batch size for segmentation. + :param num_workers: Number of worker processes for data loading. + :param nproc_per_node: Number of processes per node for distributed training. + :param workdir: Working directory for the segmentation script. + :return: Confirmation message upon completion. + """ + import os + import subprocess + import time + + seg_start = time.time() + os.chdir(workdir) + + # Get PBS info + pbs_nodefile = os.environ.get("PBS_NODEFILE") + pbs_jobid = os.environ.get("PBS_JOBID", "12345") + + print("=== PBS DEBUG ===") + print(f"PBS_NODEFILE: {pbs_nodefile}") + print(f"PBS_JOBID: {pbs_jobid}") + + if pbs_nodefile and os.path.exists(pbs_nodefile): + with open(pbs_nodefile, 'r') as f: + all_lines = [line.strip() for line in f if line.strip()] + unique_nodes = list(dict.fromkeys(all_lines)) + actual_nnodes = len(unique_nodes) + master_addr = unique_nodes[0] + print(f"PBS_NODEFILE contents: {all_lines}") + print(f"Unique nodes ({actual_nnodes}): {unique_nodes}") + print(f"Master: {master_addr}") + else: + actual_nnodes = 1 + master_addr = "localhost" + print("No PBS_NODEFILE, single node mode") + + venv_path = "/eagle/SYNAPS-I/segmentation/env_dino_cellpose" + + # Build command as a list + cmd_list = [ + f"{venv_path}/bin/python", "-m", "torch.distributed.run", + f"--nnodes={actual_nnodes}", + f"--nproc_per_node={nproc_per_node}", + f"--rdzv_id={pbs_jobid}", + "--rdzv_backend=c10d", + f"--rdzv_endpoint={master_addr}:29500", + "-m", script_module, + "--input-dir", input_dir, + "--output-dir", output_dir, + "--batch-size", str(batch_size), + "--finetuned-checkpoint", finetuned_checkpoint, + "--save-overlay", + ] + + # Environment variables + env = os.environ.copy() + env.update({ + "PATH": f"{venv_path}/bin:{env.get('PATH', '')}", + "HF_HUB_CACHE": "/eagle/SYNAPS-I/segmentation/.cache/huggingface", + "HF_HOME": "/eagle/SYNAPS-I/segmentation/.cache/huggingface", + "CUDA_DEVICE_ORDER": "PCI_BUS_ID", + "NCCL_NET_GDR_LEVEL": "PHB", + "NCCL_CROSS_NIC": "1", + "NCCL_COLLNET_ENABLE": "1", + "NCCL_NET": "AWS Libfabric", + "FI_CXI_DISABLE_HOST_REGISTER": "1", + "FI_MR_CACHE_MONITOR": "userfaultfd", + "FI_CXI_DEFAULT_CQ_SIZE": "131072", + }) + + # Prepend to LD_LIBRARY_PATH + ld_path = env.get("LD_LIBRARY_PATH", "") + env["LD_LIBRARY_PATH"] = f"/soft/libraries/aws-ofi-nccl/v1.9.1-aws/lib:/soft/libraries/hwloc/lib/:{ld_path}" + + if actual_nnodes > 1: + # Use mpiexec to launch on all nodes + command = [ + "mpiexec", + "-n", str(actual_nnodes), + "-ppn", "1", + "-hostfile", pbs_nodefile, + "--cpu-bind", "depth", + "-d", "16", + ] + cmd_list + else: + command = cmd_list + + print(f"Running: {' '.join(command)}") + + result = subprocess.run(command, env=env, cwd=workdir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + print(f"STDOUT: {result.stdout[-3000:] if result.stdout else 'None'}") + print(f"STDERR: {result.stderr[-3000:] if result.stderr else 'None'}") + + if result.returncode != 0: + raise RuntimeError( + f"Segmentation failed: {result.returncode}\n" + f"STDERR: {result.stderr[-2000:] if result.stderr else 'None'}" + ) + + return f"DINO Segmentation completed in {time.time() - seg_start:.1f}s" + + def combine_segmentations( + self, + recon_folder_path: str = "", + ) -> bool: + """ + Run CPU-based combination of Cellpose+DINO and SAM3+DINO segmentation results at ALCF + through Globus Compute. + + :param recon_folder_path: Path to the reconstructed data folder (e.g. 'folder/rec20250101_scan/') + :return: True if the task completed successfully, False otherwise. + """ + logger = get_run_logger() + + output_folder = recon_folder_path.replace("/rec", "/seg") + seg_base = f"{self.allocation_root}/data/bl832/scratch/segmentation/{output_folder}" + + input_dir = f"{self.allocation_root}/data/bl832/scratch/reconstruction/{recon_folder_path}" + sam3_results = f"{seg_base}/sam3" + dino_results = f"{seg_base}/dino" + combined_output = f"{seg_base}/combined" + + workdir = f"{self.allocation_root}/segmentation/scripts/inference_latest/forge_feb_seg_model_demo" + + gcc = Client(code_serialization_strategy=CombinedCode()) + + endpoint_id = Variable.get( + "alcf-globus-compute-seg-combine-uuid", + default="4aae6420-3724-4df7-8884-81ff6c4c4381", + _sync=True + ) + + with Executor(endpoint_id=endpoint_id, client=gcc) as fxe: + logger.info(f"Running segmentation combination on {recon_folder_path} at ALCF") + future = fxe.submit( + self._combine_segmentations_wrapper, + input_dir=input_dir, + dino_results=dino_results, + sam3_results=sam3_results, + combined_output=combined_output, + workdir=workdir, + dilate_px=5 + ) + result = self._wait_for_globus_compute_future(future, "combine_segmentations", check_interval=10) + + return result + + @staticmethod + def _combine_segmentations_wrapper( + input_dir: str = "/eagle/SYNAPS-I/data/bl832/scratch/reconstruction/", + dino_results: str = "/eagle/SYNAPS-I/data/bl832/scratch/segmentation/dino", + sam3_results: str = "/eagle/SYNAPS-I/data/bl832/scratch/segmentation/sam3", + combined_output: str = "/eagle/SYNAPS-I/data/bl832/scratch/segmentation/combined", + workdir: str = "/eagle/SYNAPS-I/segmentation/scripts/inference_latest/forge_feb_seg_model_demo", + dilate_px: int = 5, + ) -> str: + """ + Wrapper function to combine segmentation results from SAM+DINO. + + :param input_dir: Directory containing input data for segmentation. + :param dino_results: Directory containing DINO segmentation results. + :param sam3_results: Directory containing SAM3 segmentation results. + :param combined_output: Directory to save combined segmentation outputs. + :param workdir: Working directory for the combination script. + :param dilate_px: Number of pixels to dilate the SAM masks for better coverage in the combination step. + :return: Confirmation message upon completion. + """ + import os + import subprocess + import time + + combine_start = time.time() + os.chdir(workdir) + + venv_path = "/eagle/SYNAPS-I/segmentation/env_dino_cellpose" + + env = os.environ.copy() + env.update({ + "PATH": f"{venv_path}/bin:{env.get('PATH', '')}", + "HF_HUB_CACHE": "/eagle/SYNAPS-I/segmentation/.cache/huggingface", + "HF_HOME": "/eagle/SYNAPS-I/segmentation/.cache/huggingface", + }) + + tasks = [ + { + "name": "sam_dino", + "module": "src.combine_sam_dino_v3", + "args": [ + "--input-dir", input_dir, + "--instance-masks-dir", sam3_results, + "--semantic-masks-dir", f"{dino_results}/semantic_masks", + "--output-dir", f"{combined_output}/sam_dino", + "--dilate-px", str(dilate_px), + "--save-extracted", + "--dino-trust", "Cortex", "Phloem_Fibers", "Phloem", + "Air-based_Pith_cells", "Water-based_Pith_cells", + ], + }, + ] + + failed = [] + for combine_task in tasks: + cmd = [f"{venv_path}/bin/python", "-m", combine_task["module"]] + combine_task["args"] + print(f"Running {combine_task['name']}: {' '.join(cmd)}") + + result = subprocess.run( + cmd, + env=env, + cwd=workdir, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + print(f"STDOUT [{combine_task['name']}]: {result.stdout[-2000:] if result.stdout else 'None'}") + print(f"STDERR [{combine_task['name']}]: {result.stderr[-2000:] if result.stderr else 'None'}") + + if result.returncode != 0: + print(f"FAILED [{combine_task['name']}]: return code {result.returncode}") + failed.append(combine_task["name"]) + else: + print(f"SUCCESS [{combine_task['name']}]") + + if failed: + raise RuntimeError(f"Segmentation combination failed for: {failed}") + + return f"Segmentation combination completed in {time.time() - combine_start:.1f}s" + @staticmethod def _wait_for_globus_compute_future( future: Future, task_name: str, check_interval: int = 20, - walltime: int = 1200 # seconds = 20 minutes + walltime: int = 3600 # seconds = 60 minutes ) -> bool: """ Wait for a Globus Compute task to complete, assuming that if future.done() is False, the task is running. - Args: - future: The future object returned from the Globus Compute Executor submit method. - task_name: A descriptive name for the task being executed (used for logging). - check_interval: The interval (in seconds) between status checks. - walltime: The maximum time (in seconds) to wait for the task to complete. - - Returns: - bool: True if the task completed successfully within walltime, False otherwise. + :param future: The future object returned from the Globus Compute Executor submit method. + :param task_name: A descriptive name for the task being executed (used for logging). + :param check_interval: The interval (in seconds) between status checks. + :param walltime: The maximum time (in seconds) to wait for the task to complete. + :return: True if the task completed successfully within walltime, False otherwise. """ logger = get_run_logger() @@ -257,125 +855,185 @@ def _wait_for_globus_compute_future( return success -@task(name="schedule_prune_task") -def schedule_prune_task( - path: str, - location: str, - schedule_days: datetime.timedelta, - source_endpoint=None, - check_endpoint=None +@flow(name="alcf_recon_flow", flow_run_name="alcf_recon-{file_path}") +def alcf_recon_flow( + file_path: str, + config: Optional[Config832] = None, ) -> bool: """ - Schedules a Prefect flow to prune files from a specified location. - - Args: - path (str): The file path to the folder containing the files. - location (str): The server location (e.g., 'alcf832_raw') where the files will be pruned. - schedule_days (int): The number of days after which the file should be deleted. - source_endpoint (str): The source endpoint for the files. - check_endpoint (str): The endpoint to check for the existence of the files. + Process and transfer a file from bl832 to ALCF and run reconstruction and segmentation. - Returns: - bool: True if the task was scheduled successfully, False otherwise. + :param file_path: The path to the file to be processed. + :param config: Configuration object for the flow. + :return: True if the flow completed successfully, False otherwise. """ logger = get_run_logger() - try: - flow_name = f"delete {location}: {Path(path).name}" - schedule_prefect_flow( - deployment_name=f"prune_{location}/prune_{location}", - flow_run_name=flow_name, - parameters={ - "relative_path": path, - "source_endpoint": source_endpoint, - "check_endpoint": check_endpoint - }, - duration_from_now=schedule_days - ) - return True - except Exception as e: - logger.error(f"Failed to schedule prune task: {e}") - return False + if config is None: + config = Config832() + # set up file paths + path = Path(file_path) + folder_name = path.parent.name + file_name = path.stem + h5_file_name = file_name + '.h5' + scratch_path_tiff = folder_name + '/rec' + file_name + '/' + scratch_path_zarr = folder_name + '/rec' + file_name + '.zarr/' + # initialize transfer_controller with globus + logger.info("Initializing Globus Transfer Controller.") + transfer_controller = get_transfer_controller( + transfer_type=CopyMethod.GLOBUS, + config=config + ) -@task(name="schedule_pruning") -def schedule_pruning( - alcf_raw_path: str = None, - alcf_scratch_path_tiff: str = None, - alcf_scratch_path_zarr: str = None, - nersc_scratch_path_tiff: str = None, - nersc_scratch_path_zarr: str = None, - data832_raw_path: str = None, - data832_scratch_path_tiff: str = None, - data832_scratch_path_zarr: str = None, - one_minute: bool = False, - config: Config832 = None -) -> bool: - """ - This function schedules the deletion of files from specified locations on ALCF, NERSC, and data832. - - Args: - alcf_raw_path (str, optional): The raw path of the h5 file on ALCF. - alcf_scratch_path_tiff (str, optional): The scratch path for TIFF files on ALCF. - alcf_scratch_path_zarr (str, optional): The scratch path for Zarr files on ALCF. - nersc_scratch_path_tiff (str, optional): The scratch path for TIFF files on NERSC. - nersc_scratch_path_zarr (str, optional): The scratch path for Zarr files on NERSC. - data832_scratch_path (str, optional): The scratch path on data832. - one_minute (bool, optional): Defaults to False. Whether to schedule the deletion after one minute. - config (Config832, optional): Configuration object for the flow. - - Returns: - bool: True if the tasks were scheduled successfully, False otherwise. - """ - logger = get_run_logger() + alcf_reconstruction_success = False + alcf_multi_res_success = False + data832_tiff_transfer_success = False + data832_zarr_transfer_success = False - pruning_config = Variable.get("pruning-config", _sync=True) + # STEP 1: Transfer data from data832 to ALCF + logger.info("Copying raw data to ALCF.") + data832_raw_path = f"{folder_name}/{h5_file_name}" + alcf_transfer_success = transfer_controller.copy( + file_path=data832_raw_path, + source=config.data832_raw, + destination=config.alcf832_synaps_raw + ) + logger.info(f"Transfer status: {alcf_transfer_success}") - if one_minute: - alcf_delay = datetime.timedelta(minutes=1) - nersc_delay = datetime.timedelta(minutes=1) - data832_delay = datetime.timedelta(minutes=1) + if not alcf_transfer_success: + logger.error("Transfer failed due to configuration or authorization issues.") + raise ValueError("Transfer to ALCF Failed") else: - alcf_delay = datetime.timedelta(days=pruning_config["delete_alcf832_files_after_days"]) - nersc_delay = datetime.timedelta(days=pruning_config["delete_nersc832_files_after_days"]) - data832_delay = datetime.timedelta(days=pruning_config["delete_data832_files_after_days"]) - - # (path, location, days, source_endpoint, check_endpoint) - delete_schedules = [ - (alcf_raw_path, "alcf832_raw", alcf_delay, config.alcf832_raw, config.data832_raw), - (alcf_scratch_path_tiff, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch), - (alcf_scratch_path_zarr, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch), - (nersc_scratch_path_tiff, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None), - (nersc_scratch_path_zarr, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None), - (data832_raw_path, "data832_raw", data832_delay, config.data832_raw, None), - (data832_scratch_path_tiff, "data832_scratch", data832_delay, config.data832_scratch, None), - (data832_scratch_path_zarr, "data832_scratch", data832_delay, config.data832_scratch, None) - ] - - for path, location, days, source_endpoint, check_endpoint in delete_schedules: - if path: - schedule_prune_task(path, location, days, source_endpoint, check_endpoint) - logger.info(f"Scheduled delete from {location} at {days} days") + logger.info("Transfer to ALCF Successful.") + + # STEP 2: Run Tomopy Reconstruction on Globus Compute + logger.info(f"Starting ALCF reconstruction flow for {file_path=}") + + # Initialize the Tomography Controller and run the reconstruction + logger.info("Initializing ALCF Tomography HPC Controller.") + tomography_controller = get_controller( + hpc_type=HPC.ALCF, + config=config + ) + logger.info(f"Starting ALCF reconstruction task for {file_path=}") + alcf_reconstruction_success = tomography_controller.reconstruct( + file_path=file_path, + ) + if not alcf_reconstruction_success: + logger.error("Reconstruction Failed.") + raise ValueError("Reconstruction at ALCF Failed") else: - logger.info(f"Path not provided for {location}, skipping scheduling of deletion task.") + logger.info("Reconstruction Successful.") - return True + # STEP 3: Send reconstructed data (tiff) to data832 + logger.info(f"Transferring {file_name} from {config.alcf832_synaps_recon} " + f"at ALCF to {config.data832_scratch} at data832") + data832_tiff_transfer_success = transfer_controller.copy( + file_path=scratch_path_tiff, + source=config.alcf832_synaps_recon, + destination=config.data832_scratch + ) + logger.info(f"Transfer reconstructed TIFF data to data832 success: {data832_tiff_transfer_success}") + # STEP 4: Run the Tiff to Zarr Globus Flow + logger.info(f"Starting ALCF tiff to zarr flow for {file_path=}") + alcf_multi_res_success = tomography_controller.build_multi_resolution( + file_path=file_path, + ) + if not alcf_multi_res_success: + logger.error("Tiff to Zarr Failed.") + raise ValueError("Tiff to Zarr at ALCF Failed") + else: + logger.info("Tiff to Zarr Successful.") + # STEP 5: Send reconstructed data (zarr) to data832 + logger.info(f"Transferring {file_name} from {config.alcf832_synaps_recon} " + f"at ALCF to {config.data832_scratch} at data832") + data832_zarr_transfer_success = transfer_controller.copy( + file_path=scratch_path_zarr, + source=config.alcf832_synaps_recon, + destination=config.data832_scratch + ) -@flow(name="alcf_recon_flow", flow_run_name="alcf_recon-{file_path}") -def alcf_recon_flow( + # Place holder in case we want to transfer to NERSC for long term storage + # nersc_transfer_success = False + + # STEP 6: Schedule Pruning of files + logger.info("Scheduling file pruning tasks.") + prune_controller = get_prune_controller( + prune_type=PruneMethod.GLOBUS, + config=config + ) + + # Prune from ALCF raw + if alcf_transfer_success: + logger.info("Scheduling pruning of ALCF raw data.") + prune_controller.prune( + file_path=data832_raw_path, + source_endpoint=config.alcf832_synaps_raw, + check_endpoint=None, + days_from_now=2.0 + ) + + # Prune TIFFs from ALCF scratch/reconstruction + if alcf_reconstruction_success: + logger.info("Scheduling pruning of ALCF scratch reconstruction data.") + prune_controller.prune( + file_path=scratch_path_tiff, + source_endpoint=config.alcf832_synaps_recon, + check_endpoint=config.data832_scratch, + days_from_now=2.0 + ) + + # Prune ZARR from ALCF scratch/reconstruction + if alcf_multi_res_success: + logger.info("Scheduling pruning of ALCF scratch zarr reconstruction data.") + prune_controller.prune( + file_path=scratch_path_zarr, + source_endpoint=config.alcf832_synaps_recon, + check_endpoint=config.data832_scratch, + days_from_now=2.0 + ) + + # Prune reconstructed TIFFs from data832 scratch + if data832_tiff_transfer_success: + logger.info("Scheduling pruning of data832 scratch reconstruction TIFF data.") + prune_controller.prune( + file_path=scratch_path_tiff, + source_endpoint=config.data832_scratch, + check_endpoint=None, + days_from_now=30.0 + ) + + # Prune reconstructed ZARR from data832 scratch + if data832_zarr_transfer_success: + logger.info("Scheduling pruning of data832 scratch reconstruction ZARR data.") + prune_controller.prune( + file_path=scratch_path_zarr, + source_endpoint=config.data832_scratch, + check_endpoint=None, + days_from_now=30.0 + ) + + # TODO: ingest to scicat + + if alcf_reconstruction_success and alcf_multi_res_success: + return True + else: + return False + + +@flow(name="alcf_forge_recon_segment_flow", flow_run_name="alcf_recon_seg-{file_path}") +def alcf_forge_recon_segment_flow( file_path: str, config: Optional[Config832] = None, ) -> bool: """ - Process and transfer a file from a source to the ALCF. - - Args: - file_path (str): The path to the file to be processed. - config (Config832): Configuration object for the flow. + Process and transfer a file from bl832 to ALCF and run reconstruction and segmentation. - Returns: - bool: True if the flow completed successfully, False otherwise. + :param file_path: The path to the file to be processed. + :param config: Configuration object for the flow. + :return: True if the flow completed successfully, False otherwise. """ logger = get_run_logger() @@ -387,21 +1045,27 @@ def alcf_recon_flow( file_name = path.stem h5_file_name = file_name + '.h5' scratch_path_tiff = folder_name + '/rec' + file_name + '/' - scratch_path_zarr = folder_name + '/rec' + file_name + '.zarr/' + scratch_path_segment = folder_name + '/seg' + file_name + '/' # initialize transfer_controller with globus + logger.info("Initializing Globus Transfer Controller.") transfer_controller = get_transfer_controller( transfer_type=CopyMethod.GLOBUS, config=config ) + alcf_reconstruction_success = False + alcf_segmentation_success = False + data832_tiff_transfer_success = False + segment_transfer_success = False + # STEP 1: Transfer data from data832 to ALCF - logger.info("Copying data to ALCF.") + logger.info("Copying raw data to ALCF.") data832_raw_path = f"{folder_name}/{h5_file_name}" alcf_transfer_success = transfer_controller.copy( file_path=data832_raw_path, source=config.data832_raw, - destination=config.alcf832_raw + destination=config.alcf832_synaps_raw ) logger.info(f"Transfer status: {alcf_transfer_success}") @@ -411,14 +1075,16 @@ def alcf_recon_flow( else: logger.info("Transfer to ALCF Successful.") - # STEP 2A: Run the Tomopy Reconstruction Globus Flow + # STEP 2: Run the Tomopy Reconstruction Globus Flow logger.info(f"Starting ALCF reconstruction flow for {file_path=}") # Initialize the Tomography Controller and run the reconstruction + logger.info("Initializing ALCF Tomography HPC Controller.") tomography_controller = get_controller( hpc_type=HPC.ALCF, config=config ) + logger.info(f"Starting ALCF reconstruction task for {file_path=}") alcf_reconstruction_success = tomography_controller.reconstruct( file_path=file_path, ) @@ -428,64 +1094,414 @@ def alcf_recon_flow( else: logger.info("Reconstruction Successful.") - # Transfer A: Send reconstructed data (tiff) to data832 - logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " + # STEP 3: Send reconstructed data (tiff) to data832 + logger.info(f"Transferring {file_name} from {config.alcf832_synaps_recon} " f"at ALCF to {config.data832_scratch} at data832") data832_tiff_transfer_success = transfer_controller.copy( file_path=scratch_path_tiff, - source=config.alcf832_scratch, + source=config.alcf832_synaps_recon, destination=config.data832_scratch ) + logger.info(f"Transfer reconstructed TIFF data to data832 success: {data832_tiff_transfer_success}") - # STEP 2B: Run the Tiff to Zarr Globus Flow - logger.info(f"Starting ALCF tiff to zarr flow for {file_path=}") - alcf_multi_res_success = tomography_controller.build_multi_resolution( - file_path=file_path, + # STEP 4: Run the Segmentation Task at ALCF + logger.info(f"Starting ALCF segmentation task for {scratch_path_tiff=}") + alcf_segmentation_success = alcf_segmentation_sam3_task( + recon_folder_path=scratch_path_tiff, + config=config ) - if not alcf_multi_res_success: - logger.error("Tiff to Zarr Failed.") - raise ValueError("Tiff to Zarr at ALCF Failed") + if not alcf_segmentation_success: + logger.warning("Segmentation at ALCF Failed") else: - logger.info("Tiff to Zarr Successful.") - # Transfer B: Send reconstructed data (zarr) to data832 - logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " + logger.info("Segmentation at ALCF Successful") + + # STEP 5: Send segmented data to data832 + logger.info(f"Transferring {file_name} from {config.alcf832_synaps_segment} " f"at ALCF to {config.data832_scratch} at data832") - data832_zarr_transfer_success = transfer_controller.copy( - file_path=scratch_path_zarr, - source=config.alcf832_scratch, + segment_transfer_success = transfer_controller.copy( + file_path=scratch_path_segment, + source=config.alcf832_synaps_segment, destination=config.data832_scratch ) + logger.info(f"Transfer segmented data to data832 success: {segment_transfer_success}") - # Place holder in case we want to transfer to NERSC for long term storage - nersc_transfer_success = False - - data832_tiff_transfer_success, data832_zarr_transfer_success, nersc_transfer_success - schedule_pruning( - alcf_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, - alcf_scratch_path_tiff=f"{scratch_path_tiff}" if alcf_reconstruction_success else None, - alcf_scratch_path_zarr=f"{scratch_path_zarr}" if alcf_multi_res_success else None, - nersc_scratch_path_tiff=f"{scratch_path_tiff}" if nersc_transfer_success else None, - nersc_scratch_path_zarr=f"{scratch_path_zarr}" if nersc_transfer_success else None, - data832_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, - data832_scratch_path_tiff=f"{scratch_path_tiff}" if data832_tiff_transfer_success else None, - data832_scratch_path_zarr=f"{scratch_path_zarr}" if data832_zarr_transfer_success else None, - one_minute=False, # Set to False for production durations + # STEP 6: Schedule Pruning of files + logger.info("Scheduling file pruning tasks.") + prune_controller = get_prune_controller( + prune_type=PruneMethod.GLOBUS, config=config ) + # Prune from ALCF raw + if alcf_transfer_success: + logger.info("Scheduling pruning of ALCF raw data.") + prune_controller.prune( + file_path=data832_raw_path, + source_endpoint=config.alcf832_synaps_raw, + check_endpoint=None, + days_from_now=2.0 + ) + + # Prune TIFFs from ALCF scratch/reconstruction + if alcf_reconstruction_success: + logger.info("Scheduling pruning of ALCF scratch reconstruction data.") + prune_controller.prune( + file_path=scratch_path_tiff, + source_endpoint=config.alcf832_synaps_recon, + check_endpoint=config.data832_scratch, + days_from_now=2.0 + ) + + # Prune TIFFs from ALCF scratch/segmentation + if alcf_segmentation_success: + logger.info("Scheduling pruning of ALCF scratch segmentation data.") + prune_controller.prune( + file_path=scratch_path_segment, + source_endpoint=config.alcf832_synaps_segment, + check_endpoint=config.data832_scratch, + days_from_now=2.0 + ) + + # Prune reconstructed TIFFs from data832 scratch + if data832_tiff_transfer_success: + logger.info("Scheduling pruning of data832 scratch reconstruction TIFF data.") + prune_controller.prune( + file_path=scratch_path_tiff, + source_endpoint=config.data832_scratch, + check_endpoint=None, + days_from_now=30.0 + ) + + # Prune segmented data from data832 scratch + if alcf_segmentation_success and segment_transfer_success: + logger.info("Scheduling pruning of data832 scratch segmentation data.") + prune_controller.prune( + file_path=scratch_path_segment, + source_endpoint=config.data832_scratch, + check_endpoint=None, + days_from_now=30.0 + ) + # TODO: ingest to scicat - if alcf_reconstruction_success and alcf_multi_res_success: + if alcf_reconstruction_success and alcf_segmentation_success: return True else: return False -if __name__ == "__main__": - folder_name = 'dabramov' - file_name = '20230606_151124_jong-seto_fungal-mycelia_roll-AQ_fungi1_fast' - flow_success = alcf_recon_flow( - file_path=f"/{folder_name}/{file_name}.h5", +@flow(name="alcf_forge_recon_multisegment_flow", + flow_run_name="alcf_recon_multiseg-{file_path}") +def alcf_forge_recon_multisegment_flow( + file_path: str, + config: Optional[Config832] = None, +) -> bool: + """ + Transfer raw data to ALCF, run multinode reconstruction synchronously, + then run SAM3 and DINO segmentation concurrently. + + :param file_path: Path to the raw .h5 file (relative), e.g. 'folder/20250101_scan.h5' + :param config: Optional Config832 instance. + :return: True if reconstruction and all segmentation tasks succeeded, False otherwise. + """ + logger = get_run_logger() + + if config is None: + config = Config832() + + path = Path(file_path) + folder_name = path.parent.name + file_name = path.stem + h5_file_name = file_name + ".h5" + scratch_path_tiff = folder_name + "/rec" + file_name + "/" + scratch_path_segment = folder_name + "/seg" + file_name + "/" + + # ── STEP 1: Transfer raw data to ALCF ──────────────────────────────────── + logger.info("Initializing Globus Transfer Controller.") + transfer_controller = get_transfer_controller( + transfer_type=CopyMethod.GLOBUS, + config=config + ) + + data832_raw_path = f"{folder_name}/{h5_file_name}" + logger.info(f"Transferring raw data to ALCF: {data832_raw_path}") + alcf_transfer_success = transfer_controller.copy( + file_path=data832_raw_path, + source=config.data832_raw, + destination=config.alcf832_synaps_raw + ) + + if not alcf_transfer_success: + logger.error("Transfer to ALCF failed. Aborting flow.") + raise ValueError("Transfer to ALCF Failed") + + logger.info("Transfer to ALCF successful.") + + # ── STEP 2: Multinode reconstruction (sync) ─────────────────────────────── + logger.info("Initializing ALCF Tomography HPC Controller.") + tomography_controller = get_controller( + hpc_type=HPC.ALCF, + config=config + ) + + logger.info(f"Starting multinode reconstruction for {file_path=}") + alcf_reconstruction_success = tomography_controller.reconstruct(file_path=file_path) + + if not alcf_reconstruction_success: + logger.error("Reconstruction failed. Aborting segmentation steps.") + raise ValueError("Reconstruction at ALCF Failed") + + logger.info("Reconstruction successful.") + + # ── STEP 3: Transfer reconstructed TIFFs back to data832 ───────────────── + logger.info(f"Transferring TIFFs from ALCF to data832: {scratch_path_tiff}") + data832_tiff_transfer_success = transfer_controller.copy( + file_path=scratch_path_tiff, + source=config.alcf832_synaps_recon, + destination=config.data832_scratch + ) + logger.info(f"TIFF transfer to data832: {data832_tiff_transfer_success}") + + # ── STEP 4: SAM3 / DINO concurrently ────────────────────────── + logger.info("Submitting SAM3 and DINO segmentation tasks concurrently.") + + sam3_future = alcf_segmentation_sam3_task.submit( + recon_folder_path=scratch_path_tiff, config=config + ) + dino_future = alcf_segmentation_dino_task.submit( + recon_folder_path=scratch_path_tiff, config=config + ) + + sam3_success = sam3_future.result() + dino_success = dino_future.result() + logger.info(f"Segmentation results — SAM3: {sam3_success}, DINO: {dino_success}") + + any_seg_success = any([sam3_success, dino_success]) + + # ── STEP 5: Combine segmentation results (sync, CPU) ───────────────────── + combine_success = False + + if dino_success and sam3_success: + logger.info("Running segmentation combination (SAM3+DINO).") + combine_success = tomography_controller.combine_segmentations( + recon_folder_path=scratch_path_tiff + ) + logger.info(f"Combination result: {combine_success}") + else: + logger.warning("Skipping combination: requires DINO plus SAM3.") + + # ── STEP 6: Transfer segmentation outputs to data832 ───────────────────── + segment_transfer_success = False + if any_seg_success: + logger.info(f"Transferring segmentation outputs from ALCF to data832: {scratch_path_segment}") + segment_transfer_success = transfer_controller.copy( + file_path=scratch_path_segment, + source=config.alcf832_synaps_segment, + destination=config.data832_scratch + ) + logger.info(f"Segmentation transfer to data832: {segment_transfer_success}") + + # ── STEP 7: Pruning ─────────────────────────────────────────────────────── + logger.info("Scheduling file pruning tasks.") + prune_controller = get_prune_controller( + prune_type=PruneMethod.GLOBUS, + config=config + ) + + prune_controller.prune( + file_path=data832_raw_path, + source_endpoint=config.alcf832_synaps_raw, + check_endpoint=None, + days_from_now=2.0 + ) + + prune_controller.prune( + file_path=scratch_path_tiff, + source_endpoint=config.alcf832_synaps_recon, + check_endpoint=config.data832_scratch, + days_from_now=2.0 + ) + + if any_seg_success: + prune_controller.prune( + file_path=scratch_path_segment, + source_endpoint=config.alcf832_synaps_segment, + check_endpoint=config.data832_scratch, + days_from_now=2.0 + ) + + if data832_tiff_transfer_success: + prune_controller.prune( + file_path=scratch_path_tiff, + source_endpoint=config.data832_scratch, + check_endpoint=None, + days_from_now=30.0 + ) + + if segment_transfer_success: + prune_controller.prune( + file_path=scratch_path_segment, + source_endpoint=config.data832_scratch, + check_endpoint=None, + days_from_now=30.0 + ) + + # TODO: ingest to scicat + + return alcf_reconstruction_success and any_seg_success + + +@task(name="alcf_segmentation_sam3_task") +def alcf_segmentation_sam3_task( + recon_folder_path: str, + config: Optional[Config832] = None, +) -> bool: + """ + Run segmentation task at ALCF. + + :param recon_folder_path: Path to the reconstructed data folder to be processed. + :param config: Configuration object for the flow. + :return: True if the task completed successfully, False otherwise. + """ + logger = get_run_logger() + if config is None: + logger.info("No config provided, using default Config832.") + config = Config832() + + # Initialize the Tomography Controller and run the segmentation + logger.info("Initializing ALCF Tomography HPC Controller.") + tomography_controller = get_controller( + hpc_type=HPC.ALCF, + config=config + ) + logger.info(f"Starting ALCF segmentation task for {recon_folder_path=}") + alcf_segmentation_success = tomography_controller.segmentation_sam3( + recon_folder_path=recon_folder_path, + ) + if not alcf_segmentation_success: + logger.error("Segmentation Failed.") + else: + logger.info("Segmentation Successful.") + return alcf_segmentation_success + + +@task(name="alcf_segmentation_dino_task") +def alcf_segmentation_dino_task( + recon_folder_path: str, + config: Optional[Config832] = None, +) -> bool: + """ + Run DINO segmentation task at ALCF. + + :param recon_folder_path: Path to the reconstructed data folder to be processed. + :param config: Configuration object for the flow. + :return: True if the task completed successfully, False otherwise. + """ + logger = get_run_logger() + if config is None: + config = Config832() + tomography_controller = get_controller(hpc_type=HPC.ALCF, config=config) + logger.info(f"Starting DINO segmentation task for {recon_folder_path=}") + success = tomography_controller.segmentation_dino(recon_folder_path=recon_folder_path) + logger.info(f"DINO segmentation {'successful' if success else 'failed'}.") + return success + + +@task(name="alcf_combine_segmentations_task") +def alcf_combine_segmentations_task( + recon_folder_path: str, + config: Optional[Config832] = None, +) -> bool: + """ + Run segmentation combination task at ALCF. + + :param recon_folder_path: Path to the reconstructed data folder to be processed. + :param config: Configuration object for the flow. + :return: True if the task completed successfully, False otherwise. + """ + logger = get_run_logger() + if config is None: + config = Config832() + tomography_controller = get_controller(hpc_type=HPC.ALCF, config=config) + logger.info(f"Starting combine segmentation task for {recon_folder_path=}") + success = tomography_controller.combine_segmentations(recon_folder_path=recon_folder_path) + logger.info(f"Combine segmentation {'successful' if success else 'failed'}.") + return success + + +@flow(name="alcf_segmentation_integration_test", flow_run_name="alcf_segmentation_integration_test") +def alcf_segmentation_integration_test() -> bool: + """ + Integration test for the ALCF segmentation task. + + :return: True if the segmentation task completed successfully, False otherwise. + """ + logger = get_run_logger() + logger.info("Starting ALCF segmentation integration test.") + recon_folder_path = 'DD-00842_hexemer/test_16' # 'rec20211222_125057_petiole4' # 'test' # + flow_success = alcf_segmentation_sam3_task( + recon_folder_path=recon_folder_path, + config=Config832() + ) + logger.info(f"Flow success: {flow_success}") + return flow_success + + +@flow(name="alcf_segmentation_dino_integration_test", flow_run_name="alcf_segmentation_dino_integration_test") +def alcf_segmentation_dino_integration_test() -> bool: + """ + Integration test for the ALCF DINO segmentation task. + + :return: True if the segmentation task completed successfully, False otherwise. + """ + logger = get_run_logger() + logger.info("Starting ALCF segmentation DINO integration test.") + recon_folder_path = 'DD-00842_hexemer/test_16' # rec20260212_133951_petiole30' # 'test' # + flow_success = alcf_segmentation_dino_task( + recon_folder_path=recon_folder_path, + config=Config832() + ) + logger.info(f"Flow success: {flow_success}") + return flow_success + + +@flow(name="alcf_combine_segmentations_integration_test", flow_run_name="alcf_combine_segmentations_integration_test") +def alcf_combine_segmentations_integration_test() -> bool: + """ + Integration test for the ALCF combined segmentation task. + + :return: True if the segmentation task completed successfully, False otherwise. + """ + logger = get_run_logger() + logger.info("Starting ALCF segmentation combine integration test.") + recon_folder_path = 'DD-00842_hexemer/test_16' # rec20260212_133951_petiole30' # 'test' # + flow_success = alcf_combine_segmentations_task( + recon_folder_path=recon_folder_path, config=Config832() ) - print(flow_success) + logger.info(f"Flow success: {flow_success}") + return flow_success + + +@flow(name="alcf_reconstruction_integration_test", flow_run_name="alcf_reconstruction_integration_test") +def alcf_reconstruction_integration_test() -> bool: + """ + Integration test for the ALCF reconstruction task. + + :return: True if the reconstruction task completed successfully, False otherwise. + """ + logger = get_run_logger() + logger.info("Starting ALCF reconstruction integration test.") + raw_file_path = '_ra-00823_bard/20251218_111600_silkraw.h5' # 'test' # + + tomography_controller = get_controller( + hpc_type=HPC.ALCF, + config=Config832() + ) + + flow_success = tomography_controller.reconstruct( + file_path=f"{raw_file_path}", + ) + + logger.info(f"Flow success: {flow_success}") + return flow_success diff --git a/orchestration/flows/bl832/config.py b/orchestration/flows/bl832/config.py index 788eef4a..d523952d 100644 --- a/orchestration/flows/bl832/config.py +++ b/orchestration/flows/bl832/config.py @@ -24,7 +24,10 @@ def _beam_specific_config(self) -> None: self.nersc832_alsdev_pscratch_raw = self.endpoints["nersc832_alsdev_pscratch_raw"] self.nersc832_alsdev_pscratch_scratch = self.endpoints["nersc832_alsdev_pscratch_scratch"] self.nersc832_alsdev_recon_scripts = self.endpoints["nersc832_alsdev_recon_scripts"] - self.alcf832_raw = self.endpoints["alcf832_raw"] - self.alcf832_scratch = self.endpoints["alcf832_scratch"] + self.alcf832_synaps_raw = self.endpoints["alcf832_synaps_raw"] + self.alcf832_synaps_recon = self.endpoints["alcf832_synaps_recon"] + self.alcf832_synaps_segment = self.endpoints["alcf832_synaps_segment"] + self.alcf832_iri_raw = self.endpoints["alcf832_iri_raw"] + self.alcf832_iri_scratch = self.endpoints["alcf832_iri_scratch"] self.scicat = self.config["scicat"] self.ghcr_images832 = self.config["ghcr_images832"] diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py index cf1d0c64..1a0e91ae 100644 --- a/orchestration/flows/bl832/dispatcher.py +++ b/orchestration/flows/bl832/dispatcher.py @@ -17,6 +17,12 @@ class FlowParameterMapper: "alcf_recon_flow/alcf_recon_flow": [ "file_path", "config"], + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": [ + "file_path", + "config"], + "alcf_forge_recon_multisegment_flow/alcf_forge_recon_multisegment_flow": [ + "file_path", + "config"], # From move.py "new_832_file_flow/new_file_832": [ "file_path", @@ -55,22 +61,32 @@ class DecisionFlowInputModel(BaseModel): @task(name="setup_decision_settings") -def setup_decision_settings(alcf_recon: bool, nersc_recon: bool, new_file_832: bool) -> dict: +def setup_decision_settings(alcf_recon: bool, + alcf_forge_recon_segment: bool, + alcf_forge_recon_multisegment: bool, + nersc_recon: bool, + new_file_832: bool) -> dict: """ This task is used to define the settings for the decision making process of the BL832 beamline. :param alcf_recon: Boolean indicating whether to run the ALCF reconstruction flow. + :param alcf_forge_recon_segment: Boolean indicating whether to run the ALCF forge reconstruction segmentation flow. :param nersc_recon: Boolean indicating whether to run the NERSC reconstruction flow. - :param nersc_move: Boolean indicating whether to move files to NERSC. + :param new_file_832: Boolean indicating whether to run the new file 832 flow. :return: A dictionary containing the settings for each flow. """ logger = get_run_logger() try: logger.info(f"Setting up decision settings: alcf_recon={alcf_recon}, " - f"nersc_recon={nersc_recon}, new_file_832={new_file_832}") + f"alcf_forge_recon_segment={alcf_forge_recon_segment}, " + f"alcf_forge_recon_multisegment={alcf_forge_recon_multisegment}, " + f"nersc_recon={nersc_recon}, " + f"new_file_832={new_file_832}") # Define which flows to run based on the input settings settings = { "alcf_recon_flow/alcf_recon_flow": alcf_recon, + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": alcf_forge_recon_segment, + "alcf_forge_recon_multisegment_flow/alcf_forge_recon_multisegment_flow": alcf_forge_recon_multisegment, "nersc_recon_flow/nersc_recon_flow": nersc_recon, "new_832_file_flow/new_file_832": new_file_832 } @@ -145,6 +161,21 @@ async def dispatcher( alcf_params = FlowParameterMapper.get_flow_parameters("alcf_recon_flow/alcf_recon_flow", available_params) tasks.append(run_recon_flow_async("alcf_recon_flow/alcf_recon_flow", alcf_params)) + if decision_settings.get("alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow"): + alcf_forge_params = FlowParameterMapper.get_flow_parameters( + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow", + available_params + ) + tasks.append(run_recon_flow_async("alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow", alcf_forge_params)) + + if decision_settings.get("alcf_forge_recon_multisegment_flow/alcf_forge_recon_multisegment_flow"): + alcf_forge_params = FlowParameterMapper.get_flow_parameters( + "alcf_forge_recon_multisegment_flow/alcf_forge_recon_multisegment_flow", + available_params + ) + tasks.append(run_recon_flow_async("alcf_forge_recon_multisegment_flow/alcf_forge_recon_multisegment_flow", + alcf_forge_params)) + if decision_settings.get("nersc_recon_flow/nersc_recon_flow"): nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon_flow/nersc_recon_flow", available_params) tasks.append(run_recon_flow_async("nersc_recon_flow/nersc_recon_flow", nersc_params)) @@ -169,7 +200,11 @@ async def dispatcher( """ try: # Setup decision settings based on input parameters - setup_decision_settings(alcf_recon=True, nersc_recon=True, new_file_832=True) + setup_decision_settings(alcf_recon=True, + alcf_forge_recon_segment=False, + alcf_forge_recon_multisegment=False, + nersc_recon=True, + new_file_832=True) # Run the main decision flow with the specified parameters # asyncio.run(dispatcher( # config={}, # PYTEST, ALCF, NERSC diff --git a/orchestration/flows/bl832/prefect.yaml b/orchestration/flows/bl832/prefect.yaml index a1d4613b..53b03708 100644 --- a/orchestration/flows/bl832/prefect.yaml +++ b/orchestration/flows/bl832/prefect.yaml @@ -55,6 +55,18 @@ deployments: name: alcf_recon_flow_pool work_queue_name: alcf_recon_flow_queue +- name: alcf_forge_recon_segment_flow + entrypoint: orchestration/flows/bl832/alcf.py:alcf_forge_recon_segment_flow + work_pool: + name: alcf_recon_flow_pool + work_queue_name: alcf_forge_recon_segment_flow_queue + +- name: alcf_forge_recon_multisegment_flow + entrypoint: orchestration/flows/bl832/alcf.py:alcf_forge_recon_multisegment_flow + work_pool: + name: alcf_recon_flow_pool + work_queue_name: alcf_forge_recon_segment_flow_queue + # Pruning flows - name: prune_globus_endpoint entrypoint: orchestration/prune_controller.py:prune_globus_endpoint diff --git a/scripts/create_latex_for_segmentation_timing.py b/scripts/create_latex_for_segmentation_timing.py new file mode 100644 index 00000000..23f4f286 --- /dev/null +++ b/scripts/create_latex_for_segmentation_timing.py @@ -0,0 +1,831 @@ +""" +Scrape flow-prd timing data for the LaTeX table. + +Targets: + - nersc_forge_recon_multisegment_flow + - alcf_forge_recon_multisegment_flow + - new_file_832 + +Timing sources per label: + + Source = "log" → actual HPC wall-clock time from job output re-logged by controller + Source = "task" → Prefect @task run duration (includes overhead: queue wait, staging, etc.) + + Data Transfers (NERSC) task transfer_data_to_nersc @task in new_832_file_flow + log timestamp delta between 'Copying raw data to NERSC' and + 'Transfer to NERSC' log lines in nersc_forge_recon_multisegment_flow + Data Transfers (ALCF) log timestamp delta between + 'Transferring raw data to ALCF: ...' and + 'Transfer to ALCF successful.' in alcf_forge_recon_multisegment_flow + Data Transfers (NERSC) log timestamp delta between NERSC transfer start/end lines + in nersc_forge_recon_multisegment_flow (confirm with --dump-transfer-logs) + Reconstruction (NERSC) log " RECONSTRUCTION: Ns <-- actual recon time" + SAM3 (NERSC) log " Total time: Xm Ys (Zs)" from _fetch_seg_timing_from_output + task nersc_segmentation_task (fallback if log not matched) + DINOv3 (NERSC) task nersc_segmentation_dino_task (no HPC timing logged) + Combine (NERSC) task nersc_combine_segmentations_task (no HPC timing logged) + Reconstruction (ALCF) log "Total duration of the reconstruction task: N.NN seconds." + SAM3 (ALCF) log "Total duration of the segmentation task: N.NN seconds." + DINOv3 (ALCF) log "Total duration of the segmentation_dino task: N.NN seconds." + Combine (ALCF) log "Total duration of the combine_segmentations task: N.NN seconds." + +Usage: + python scrape_flow_prd_timing.py --after 2026-02-24 --before 2026-02-26 --latex + python scrape_flow_prd_timing.py --check +""" + +import argparse +import os +import re +import statistics +from collections import defaultdict +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Optional + +import httpx +from dotenv import load_dotenv + +load_dotenv() + +SERVER_URL = "https://flow-prd.als.lbl.gov" + +RECON_DEPLOYMENTS = { + "nersc_forge_recon_multisegment_flow", + "alcf_forge_recon_multisegment_flow", +} +TRANSFER_DEPLOYMENTS: set = set() # unused; kept for compat + +# Dispatcher runs process_new_832_file_task inline (not via run_deployment), +# so NERSC transfer log brackets appear in dispatcher flow run logs. +DISPATCHER_DEPLOYMENTS = { + "run_832_dispatcher", +} + +TABLE_ROWS = [ + "Data Transfers (NERSC)", + "Data Transfers (ALCF)", + "Reconstruction (NERSC)", + "SAM3 (NERSC)", + "DINOv3 (NERSC)", + "Combine (NERSC)", + "Reconstruction (ALCF)", + "SAM3 (ALCF)", + "DINOv3 (ALCF)", + "Combine (ALCF)", +] + +TABLE_METADATA = { + # facility, resources, task_type + "Data Transfers (NERSC)": ("NERSC", "ESNet/Globus", "Transfer"), + "Data Transfers (ALCF)": ("ALCF", "ESNet/Globus", "Transfer"), + "Reconstruction (NERSC)": ("NERSC", "16 CPU Nodes", "Compute"), + "SAM3 (NERSC)": ("NERSC", "42 GPU Nodes", "Compute"), + "DINOv3 (NERSC)": ("NERSC", "8 GPU Nodes", "Compute"), + "Combine (NERSC)": ("NERSC", "8 CPU Nodes", "Compute"), + "Reconstruction (ALCF)": ("ALCF", "8 CPU Nodes", "Compute"), + "SAM3 (ALCF)": ("ALCF", "4 GPU Nodes", "Compute"), + "DINOv3 (ALCF)": ("ALCF", "4 GPU Nodes", "Compute"), + "Combine (ALCF)": ("ALCF", "2 CPU Nodes", "Compute"), +} + +# Grouped layout for the LaTeX table. +# Each entry: (display_name, task_type, [row_label, ...]) +# Groups with 2 rows get \multirow{2}{*}{name}; single-row groups get plain text. +TABLE_GROUPS = [ + ("Data Transfers", "Transfer", [ + "Data Transfers (NERSC)", + "Data Transfers (ALCF)", + ]), + ("Reconstruction", "Compute", [ + "Reconstruction (NERSC)", + "Reconstruction (ALCF)", + ]), + ("SAM3", "Compute", [ + "SAM3 (NERSC)", + "SAM3 (ALCF)", + ]), + ("DINOv3", "Compute", [ + "DINOv3 (NERSC)", + "DINOv3 (ALCF)", + ]), + ("Combine", "Compute", [ + "Combine (NERSC)", + "Combine (ALCF)", + ]), +] + +# --------------------------------------------------------------------------- +# Log patterns — yield seconds via named group "secs" +# +# NERSC (nersc_hpc_controller.py): +# Reconstruction: +# logger.info(f" RECONSTRUCTION: {timing.get('reconstruction','N/A')}s <-- actual recon time") +# SAM3: +# _fetch_seg_timing_from_output() re-logs SLURM stdout verbatim: +# logger.info(f" {line}") where line = "Total time: 5m 23s (323s)" +# Raw seconds are in the parentheses. +# DINO / Combine: no HPC timing logged → fall back to Prefect @task duration. +# +# ALCF (alcf_hpc_controller.py): +# All four tasks use _wait_for_globus_compute_future(future, task_name, ...): +# logger.info(f"Total duration of the {task_name} task: {elapsed_time:.2f} seconds.") +# task_name: "reconstruction" | "segmentation" | "segmentation_dino" | "combine_segmentations" +# --------------------------------------------------------------------------- + +LOG_PATTERNS: dict[str, re.Pattern] = { + "Reconstruction (NERSC)": re.compile( + r"RECONSTRUCTION:\s+(?P[\d.]+)s\s+<-- actual recon time" + ), + "SAM3 (NERSC)": re.compile( + r"Total time:\s+\d+m\s+\d+s\s+\((?P\d+)s\)" + ), + "Reconstruction (ALCF)": re.compile( + r"Total duration of the reconstruction task:\s+(?P[\d.]+)\s+seconds\." + ), + "SAM3 (ALCF)": re.compile( + r"Total duration of the segmentation task:\s+(?P[\d.]+)\s+seconds\." + ), + "DINOv3 (ALCF)": re.compile( + r"Total duration of the segmentation_dino task:\s+(?P[\d.]+)\s+seconds\." + ), + "Combine (ALCF)": re.compile( + r"Total duration of the combine_segmentations task:\s+(?P[\d.]+)\s+seconds\." + ), +} + +TRANSFER_BRACKET_PATTERNS: dict[str, tuple[re.Pattern, re.Pattern]] = { + # alcf_forge_recon_multisegment_flow (confirmed from log dump): + # start: logger.info(f"Transferring raw data to ALCF: {data832_raw_path}") + # end: logger.info("Transfer to ALCF successful.") + "Data Transfers (ALCF)": ( + re.compile(r"Transferring raw data to ALCF", re.IGNORECASE), + re.compile(r"Transfer to ALCF successful", re.IGNORECASE), + ), + # run_832_dispatcher → process_new_832_file_task → transfer_data_to_nersc (move.py): + # start: logger.info(f"Transferring {file_path} from data832 to nersc") + # end: logger.info(f"File successfully transferred from data832 to NERSC ...") + "Data Transfers (NERSC)": ( + re.compile(r"Transferring .* from data832 to nersc", re.IGNORECASE), + re.compile(r"File successfully transferred from data832 to NERSC", re.IGNORECASE), + ), +} + +# For these labels, prefer log-extracted HPC time over Prefect @task wall time. +LOG_PREFERRED = set(LOG_PATTERNS.keys()) + +# --------------------------------------------------------------------------- +# Prefect @task classification +# transfer_spot_to_data → spot storage → ALCF (alcf_forge_recon_multisegment_flow STEP 1) +# transfer_data_to_nersc → data832 → NERSC (nersc_forge_recon_multisegment_flow) +# --------------------------------------------------------------------------- +TASK_LABEL_MAP = [ + ("nersc_segmentation_dino_task", "DINOv3 (NERSC)"), + ("nersc_segmentation_task", "SAM3 (NERSC)"), + ("nersc_combine_segmentations", "Combine (NERSC)"), + ("alcf_segmentation_dino_task", "DINOv3 (ALCF)"), + ("alcf_segmentation_task", "SAM3 (ALCF)"), + ("transfer_data_to_nersc", "Data Transfers (NERSC)"), +] + +RED = "\033[91m" +GREEN = "\033[92m" +YELLOW = "\033[93m" +DIM = "\033[2m" +BOLD = "\033[1m" +RESET = "\033[0m" + + +# ── Data types ───────────────────────────────────────────────────────────────── + +@dataclass +class TimingStats: + label: str + durations: list[float] = field(default_factory=list) + source: str = "task" # "task" | "log" | "mixed" + + @property + def count(self): return len(self.durations) + + @property + def mean_minutes(self): return statistics.mean(self.durations) / 60 if self.durations else None + + @property + def median_minutes(self): return statistics.median(self.durations) / 60 if self.durations else None + + @property + def stdev_minutes(self): return statistics.stdev(self.durations) / 60 if len(self.durations) > 1 else None + + @property + def min_minutes(self): return min(self.durations) / 60 if self.durations else None + + @property + def max_minutes(self): return max(self.durations) / 60 if self.durations else None + + +# ── HTTP client ──────────────────────────────────────────────────────────────── + +def get_client() -> httpx.Client: + token = os.environ.get("PREFECT_API_KEY") + if not token: + raise SystemExit("PREFECT_API_KEY not set.") + return httpx.Client( + headers={"Authorization": f"Bearer {token}"}, + timeout=30, + follow_redirects=True, + ) + + +def check_connectivity(client: httpx.Client) -> None: + resp = client.get(f"{SERVER_URL}/api/health") + print(f" {GREEN}✓ health: {resp.status_code}{RESET}") + resp = client.get(f"{SERVER_URL}/api/me") + if resp.status_code == 200: + print(f" {GREEN}✓ auth OK (user: {resp.json().get('name', 'unknown')}){RESET}") + elif resp.status_code == 404: + resp2 = client.post(f"{SERVER_URL}/api/deployments/filter", json={"limit": 1}) + if resp2.status_code != 200: + raise SystemExit(f"Auth failed: {resp2.status_code}") + print(f" {GREEN}✓ auth OK{RESET}") + else: + raise SystemExit(f"Auth failed: {resp.status_code} {resp.text[:200]}") + + +# ── Paginated API fetchers ───────────────────────────────────────────────────── + +def get_deployments(client: httpx.Client, target_names: set) -> list: + """ + Fetch deployments whose name matches any entry in target_names. + + Prefect deployment names can take several forms: + - exact flow name: "new_832_file_flow" + - flow/deployment: "new_832_file_flow/default" + - arbitrary deployment name set by the operator + + We match if the deployment name exactly equals a target OR contains a target + as a substring (handles "flow_name/deployment_name" format). + All deployment names are printed when any target is not found so the user can + identify the correct name to put in TRANSFER_DEPLOYMENTS / RECON_DEPLOYMENTS. + """ + resp = client.post(f"{SERVER_URL}/api/deployments/filter", json={"limit": 200}) + resp.raise_for_status() + all_deps = resp.json() + + def _matches(dep_name: str) -> bool: + return any( + t == dep_name or t in dep_name or dep_name in t + for t in target_names + ) + + matched = [d for d in all_deps if _matches(d["name"])] + missing = target_names - {t for d in matched for t in target_names if t in d["name"] or t == d["name"]} + + if missing: + print(f" {YELLOW}⚠ deployments not found for: {missing}{RESET}") + print(f" {DIM} All deployment names on this server:{RESET}") + for d in sorted(all_deps, key=lambda x: x["name"]): + print(f" {DIM} {d['name']!r}{RESET}") + return matched + + +def get_flow_runs( + client: httpx.Client, + deployment_ids: list, + after: datetime, + before: datetime, + states: list[str] | None = None, +) -> list: + """ + Fetch COMPLETED flow runs for the given deployments and date range. + State filter is applied at the API level so failed/cancelled runs are + never fetched, avoiding wasted task_run and log API calls. + Pass states=[] to retrieve all states (e.g. for diagnostics). + """ + if states is None: + states = ["COMPLETED"] + + all_runs, offset = [], 0 + while True: + flow_filter: dict = { + "start_time": { + "after_": after.isoformat(), + "before_": before.isoformat(), + }, + "deployment_id": {"any_": deployment_ids}, + } + if states: + flow_filter["state"] = {"type": {"any_": states}} + + resp = client.post( + f"{SERVER_URL}/api/flow_runs/filter", + json={"flow_runs": flow_filter, "limit": 200, "offset": offset}, + ) + resp.raise_for_status() + batch = resp.json() + if not batch: + break + all_runs.extend(batch) + print(f" {DIM}fetched {len(all_runs)} flow runs...{RESET}", end="\r", flush=True) + if len(batch) < 200: + break + offset += 200 + print(f" {DIM}fetched {len(all_runs)} flow runs{RESET} ") + return all_runs + + +def get_task_runs(client: httpx.Client, flow_run_id: str) -> list: + all_trs, offset = [], 0 + while True: + resp = client.post( + f"{SERVER_URL}/api/task_runs/filter", + json={ + "flow_runs": {"id": {"any_": [flow_run_id]}}, + "limit": 200, + "offset": offset, + }, + ) + resp.raise_for_status() + batch = resp.json() + if not batch: + break + all_trs.extend(batch) + if len(batch) < 200: + break + offset += 200 + return all_trs + + +def get_logs_for_flow_run(client: httpx.Client, flow_run_id: str) -> list[dict]: + """ + Fetch all log entries for a flow run via POST /api/logs/filter. + Returns a list of {"message": str, "timestamp": str} dicts. + Captures output from both @task functions AND inline flow code. + Returns [] if the endpoint is unavailable. + """ + all_entries, offset = [], 0 + while True: + resp = client.post( + f"{SERVER_URL}/api/logs/filter", + json={ + "logs": {"flow_run_id": {"any_": [flow_run_id]}}, + "limit": 200, + "offset": offset, + }, + ) + if resp.status_code != 200: + return all_entries + batch = resp.json() + if not batch: + break + all_entries.extend( + {"message": e.get("message", ""), "timestamp": e.get("timestamp", "")} + for e in batch + ) + if len(batch) < 200: + break + offset += 200 + return all_entries + + +# ── Helpers ──────────────────────────────────────────────────────────────────── + +def dur_sec(obj: dict) -> Optional[float]: + trt = obj.get("total_run_time") + if trt and trt > 0: + return float(trt) + start, end = obj.get("start_time"), obj.get("end_time") + if not start or not end: + return None + try: + return max(0.0, ( + datetime.fromisoformat(end) - datetime.fromisoformat(start) + ).total_seconds()) + except (ValueError, TypeError): + return None + + +def classify_task(name: str) -> Optional[str]: + n = name.lower() + for key, label in TASK_LABEL_MAP: + if key in n: + return label + return None + + +def strip_hash(name: str) -> str: + """Remove Prefect's 3-char hex suffix (e.g. 'my_task-a3f' -> 'my_task').""" + parts = name.rsplit("-", 1) + if len(parts) == 2 and len(parts[1]) == 3 and all( + c in "0123456789abcdef" for c in parts[1] + ): + return parts[0] + return name + + +def extract_log_timings(entries: list[dict]) -> dict[str, float]: + """ + Scan all log entries for known HPC timing patterns. + Returns {label: seconds}. Takes the FIRST match per label. + """ + found: dict[str, float] = {} + for entry in entries: + msg = entry["message"] + for label, pattern in LOG_PATTERNS.items(): + if label not in found: + m = pattern.search(msg) + if m: + found[label] = float(m.group("secs")) + return found + + +def _parse_ts(ts: str) -> Optional[datetime]: + """Parse an ISO timestamp string; return None on failure.""" + if not ts: + return None + try: + return datetime.fromisoformat(ts.replace("Z", "+00:00")) + except (ValueError, TypeError): + return None + + +def extract_transfer_timings( + entries: list[dict], + debug: bool = False, +) -> dict[str, float]: + """ + Compute Globus transfer duration by bracketing start/end log messages + using their timestamps. Works on recon flow logs where transfer_controller.copy() + is called inline (no @task decorator, so no Prefect task run timing available). + + Entries must have "message" and "timestamp" keys (ISO string from Prefect logs API). + Returns {label: seconds} for any transfer brackets successfully matched. + + Set debug=True to print every entry considered for each label (for pattern tuning). + """ + found: dict[str, float] = {} + + # Check whether any entry has a usable timestamp at all + ts_available = any(_parse_ts(e.get("timestamp", "")) is not None for e in entries) + + for label, (start_pat, end_pat) in TRANSFER_BRACKET_PATTERNS.items(): + t_start: Optional[datetime] = None + for entry in entries: + msg = entry["message"] + ts = entry.get("timestamp", "") + if debug: + print(f" [{label}] ts={ts[:19]} msg={msg[:80]}") + if t_start is None and start_pat.search(msg): + t_start = _parse_ts(ts) + if debug: + print(f" ^^ START matched (ts_parsed={t_start})") + if not ts_available: + # No timestamps in this log stream — can't bracket + break + elif t_start is not None and end_pat.search(msg): + t_end = _parse_ts(ts) + if debug: + print(f" ^^ END matched (ts_parsed={t_end})") + if t_end is not None and t_start is not None: + delta = (t_end - t_start).total_seconds() + if delta > 0: + found[label] = delta + break + + if not ts_available and entries: + # Surface this once so the user knows why transfer timing is missing + found["_no_timestamps"] = 0.0 # sentinel, filtered out by caller + + return found + + +# ── Main scraping ────────────────────────────────────────────────────────────── + +def scrape_timing(after: datetime, before: datetime) -> dict[str, TimingStats]: + stats: dict[str, TimingStats] = {label: TimingStats(label) for label in TABLE_ROWS} + log_hits: defaultdict[str, int] = defaultdict(int) + log_misses: defaultdict[str, int] = defaultdict(int) + logs_api_available: Optional[bool] = None + + with get_client() as client: + print(f"\n{BOLD}Connecting to {SERVER_URL}{RESET}") + check_connectivity(client) + + print(f"\n{BOLD}Fetching target deployments...{RESET}") + all_deps = get_deployments(client, RECON_DEPLOYMENTS | DISPATCHER_DEPLOYMENTS) + dep_name_map = {d["id"]: d["name"] for d in all_deps} + recon_dep_ids = [d["id"] for d in all_deps if d["name"] in RECON_DEPLOYMENTS] + + # ── Recon / segmentation flows ───────────────────────────────────────── + if recon_dep_ids: + print(f"\nFetching recon/seg flow runs {after.date()} → {before.date()}...") + flow_runs = get_flow_runs(client, recon_dep_ids, after=after, before=before) + print(f" {GREEN}{len(flow_runs)} COMPLETED runs{RESET}") + + for i, fr in enumerate(flow_runs): + frid = fr["id"] + dep_name = dep_name_map.get(fr.get("deployment_id", ""), "unknown") + is_nersc = "nersc" in dep_name + is_alcf = "alcf" in dep_name + start = (fr.get("start_time") or "")[:16] + total = fr.get("total_run_time", 0) or 0 + print( + f" [{i+1}/{len(flow_runs)}] {DIM}{dep_name} | {start} | " + f"{total/60:.1f}min{RESET} ", + end="\r", flush=True, + ) + + # ── Step 1: flow logs → actual HPC wall-clock times ──────────── + entries = get_logs_for_flow_run(client, frid) + if logs_api_available is None: + logs_api_available = bool(entries) + + log_timings = extract_log_timings(entries) if entries else {} + transfer_timings = extract_transfer_timings(entries) if entries else {} + + for label, secs in log_timings.items(): + stats[label].durations.append(secs) + stats[label].source = "log" + log_hits[label] += 1 + + no_ts = transfer_timings.pop("_no_timestamps", None) + if no_ts is not None and logs_api_available: + print(f" {YELLOW}⚠ Log entries have no timestamps — transfer bracket timing unavailable{RESET}") + for label, secs in transfer_timings.items(): + stats[label].durations.append(secs) + stats[label].source = "log" + + for label in LOG_PATTERNS: + if label not in log_timings: + if is_nersc and "NERSC" in label: + log_misses[label] += 1 + elif is_alcf and "ALCF" in label: + log_misses[label] += 1 + + # Track ALCF transfer bracket hits/misses (NERSC is @task-based) + for label in TRANSFER_BRACKET_PATTERNS: + if label not in transfer_timings: + if is_alcf and "ALCF" in label: + log_misses[label] += 1 + else: + log_hits[label] += 1 + + # ── Step 2: Prefect @task durations ─────────────────────────── + for tr in get_task_runs(client, frid): + if tr.get("state_type") != "COMPLETED": + continue + tr_name = tr.get("name") or tr.get("task_key") or "" + label = classify_task(tr_name) + if not label: + continue + if label in LOG_PREFERRED and label in log_timings: + continue # already have cleaner HPC-level timing + d = dur_sec(tr) + if d and d > 0: + stats[label].durations.append(d) + if stats[label].source == "log": + stats[label].source = "mixed" + + print(f"\n Done.{' ' * 60}") + + avail_str = ( + "logs API available" if logs_api_available + else "logs API unavailable — all timing from Prefect task runs" + if logs_api_available is False + else "no flows processed" + ) + print(f"\n {BOLD}Log-based HPC timing extraction ({avail_str}):{RESET}") + for label in list(LOG_PATTERNS) + list(TRANSFER_BRACKET_PATTERNS): # NERSC transfer via @task + hits = log_hits.get(label, 0) + misses = log_misses.get(label, 0) + total_relevant = hits + misses + if total_relevant == 0: + print(f" {DIM} {label:<34} (no relevant flow runs){RESET}") + elif hits > 0: + pct = 100 * hits // total_relevant + print(f" {GREEN}✓{RESET} {label:<34} {hits}/{total_relevant} runs ({pct}%)") + else: + print(f" {YELLOW}✗{RESET} {label:<34} pattern not matched in {misses} runs") + + # ── Dispatcher flows (run_832_dispatcher) ────────────────────────────── + # process_new_832_file_task is called directly inside the dispatcher flow + # (not via run_deployment), so its logs appear in the dispatcher flow run. + # NERSC transfer timing extracted via log brackets: + # start: f"Transferring {file_path} from data832 to nersc" + # end: f"File successfully transferred from data832 to NERSC ..." + dispatch_dep_ids = [d["id"] for d in all_deps if d["name"] in DISPATCHER_DEPLOYMENTS] + if dispatch_dep_ids: + print(f"\nFetching dispatcher flow runs {after.date()} → {before.date()}...") + flow_runs = get_flow_runs(client, dispatch_dep_ids, after=after, before=before) + print(f" {GREEN}{len(flow_runs)} COMPLETED runs{RESET}") + + nersc_transfer_hits = 0 + for i, fr in enumerate(flow_runs): + frid = fr["id"] + print(f" [{i+1}/{len(flow_runs)}] {DIM}run_832_dispatcher{RESET} ", + end="\r", flush=True) + + entries = get_logs_for_flow_run(client, frid) + transfer_timings = extract_transfer_timings(entries) if entries else {} + transfer_timings.pop("_no_timestamps", None) + for label, secs in transfer_timings.items(): + if secs > 0: + stats[label].durations.append(secs) + stats[label].source = "log" + if "NERSC" in label: + nersc_transfer_hits += 1 + + n_disp = len(flow_runs) + hit_str = f"{GREEN}✓{RESET}" if nersc_transfer_hits == n_disp else f"{YELLOW}✗{RESET}" + print(f"\n {hit_str} Data Transfers (NERSC) log bracket {nersc_transfer_hits}/{n_disp} runs") + print(f"\n Done.{' ' * 60}") + + return stats + + +# ── Output ───────────────────────────────────────────────────────────────────── + +SOURCE_TAG = { + "log": f"{DIM}[HPC log]{RESET}", + "task": f"{DIM}[Prefect task]{RESET}", + "mixed": f"{YELLOW}[mixed log+task]{RESET}", +} + + +def print_summary(stats: dict[str, TimingStats]) -> None: + print(f"\n{'=' * 80}") + print(f"{BOLD}TIMING SUMMARY{RESET}") + print("=" * 80) + for label in TABLE_ROWS: + s = stats[label] + if s.count == 0: + print(f" {DIM}{label:<34} no data{RESET}") + else: + stdev_str = f" σ={s.stdev_minutes:.1f}" if s.stdev_minutes else "" + src_tag = SOURCE_TAG.get(s.source, "") + print( + f" {BOLD}{label:<34}{RESET}" + f" mean={s.mean_minutes:.1f}min" + f" median={s.median_minutes:.1f}min" + f" [{s.min_minutes:.1f}–{s.max_minutes:.1f}]" + f"{stdev_str} n={s.count} {src_tag}" + ) + + +def print_latex_table(stats: dict[str, TimingStats]) -> None: + """ + Emit a grouped LaTeX table: tasks as \\multirow blocks, facility as sub-rows. + Requires \\usepackage{multirow} in the preamble. + """ + + def time_cell(label: str) -> str: + s = stats[label] + if s.count == 0: + return "--" + t = f"{s.mean_minutes:.1f}" + if s.stdev_minutes is not None: + t += f" $\\pm$ {s.stdev_minutes:.1f}" + t += f" (n={s.count})" + return t + + lines = [ + "", + r"% ── LaTeX Table ──────────────────────────────────────────────────────", + r"\begin{table}[h]", + r"\centering", + r"\begin{tabular}{lllll}", + r"\hline", + r"\textbf{Task} & \textbf{Facility} & \textbf{Resources} & \textbf{Type} & \textbf{Time (min)} \\", + r"\hline", + ] + + for group_name, task_type, row_labels in TABLE_GROUPS: + n = len(row_labels) + for idx, label in enumerate(row_labels): + facility, resources, _ = TABLE_METADATA[label] + tc = time_cell(label) + + if n == 1: + task_col = group_name + elif idx == 0: + # First row: emit \multirow on its own line, continuation indented + lines.append(f"\\multirow{{{n}}}{{*}}{{{group_name}}}") + task_col = " " + else: + task_col = " " + + lines.append( + f" {task_col}" + f" & {facility:<6}" + f" & {resources:<13}" + f" & {task_type:<8}" + f" & {tc} \\\\" + ) + + lines.append(r"\hline") + + lines += [ + r"\end{tabular}", + ( + r"\caption{Summary of computational tasks, resources, and wall-clock execution times" + r" (mean $\pm$ std). Each GPU node has 4 NVIDIA A100 GPUs." + r" CPU nodes on ALCF Polaris have a single 64 core AMD EPYC ``Milan'' processor," + r" and CPU nodes on NERSC Perlmutter have two 64 core AMD EPYC 7763 processors.}" + ), + r"\label{tab:TimingResults}", + r"\end{table}", + ] + + print("\n".join(lines)) + + +def _dump_transfer_logs(after: datetime, before: datetime) -> None: + """ + Print log messages from the first available recon flow run. + Use this to identify the exact phrasing of transfer start/end lines + so TRANSFER_BRACKET_PATTERNS can be tuned if needed. + """ + with get_client() as client: + all_deps = get_deployments(client, RECON_DEPLOYMENTS) + dep_ids = [d["id"] for d in all_deps] + if not dep_ids: + print("No recon deployments found.") + return + runs = get_flow_runs(client, dep_ids, after=after, before=before) + if not runs: + print("No completed runs found in date range.") + return + fr = runs[0] + dep_name = fr.get("deployment_id", "?") + print(f"\nDumping logs for flow run {fr['id']} ({dep_name[:60]}):") + entries = get_logs_for_flow_run(client, fr["id"]) + if not entries: + print(" No log entries returned (API may not support /api/logs/filter).") + return + transfer_keywords = re.compile( + r"(transfer|copy|globus|ALCF|NERSC|raw|scratch)", re.IGNORECASE + ) + hits = [e for e in entries if transfer_keywords.search(e["message"])] + + # Also run bracket extractor in debug mode + print("\n Running bracket extractor in debug mode:") + extract_transfer_timings(entries[:200], debug=True) + print(f" {len(entries)} total entries, {len(hits)} match transfer keywords:\n") + for e in hits[:60]: + ts = e["timestamp"][:19] + msg = e["message"][:120] + print(f" {ts} {msg}") + + +def parse_date(s: str) -> datetime: + return datetime.strptime(s, "%Y-%m-%d").replace(tzinfo=timezone.utc) + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Scrape flow-prd forge recon/segment/transfer timing" + ) + parser.add_argument("--after", "-a", default="2026-02-24", + help="Start date inclusive (YYYY-MM-DD)") + parser.add_argument("--before", "-b", default="2026-02-26", + help="End date exclusive (YYYY-MM-DD)") + parser.add_argument("--latex", "-l", action="store_true", + help="Print populated LaTeX table") + parser.add_argument("--check", "-c", action="store_true", + help="Connectivity check only") + parser.add_argument("--dump-transfer-logs", action="store_true", + help="Print first 40 log messages from one recon flow run " + "(use to diagnose transfer bracket pattern mismatches)") + args = parser.parse_args() + + key = os.environ.get("PREFECT_API_KEY") + if key: + print(f"{DIM}PREFECT_API_KEY: {len(key)} chars, starts '{key[:6]}...'{RESET}") + else: + raise SystemExit(f"{RED}PREFECT_API_KEY not set in environment or .env{RESET}") + + if args.check: + with get_client() as client: + print(f"\n{BOLD}Checking {SERVER_URL}{RESET}") + check_connectivity(client) + return + + if args.dump_transfer_logs: + _dump_transfer_logs( + after=parse_date(args.after), + before=parse_date(args.before), + ) + return + + stats = scrape_timing( + after=parse_date(args.after), + before=parse_date(args.before), + ) + print_summary(stats) + if args.latex: + print_latex_table(stats) + else: + print(f"\n{DIM}Tip: add --latex to print the populated LaTeX table.{RESET}") + + +if __name__ == "__main__": + main() diff --git a/scripts/polaris/globus_compute_recon_config.yaml b/scripts/polaris/globus_compute_recon_config.yaml new file mode 100644 index 00000000..66ffd331 --- /dev/null +++ b/scripts/polaris/globus_compute_recon_config.yaml @@ -0,0 +1,39 @@ +engine: + type: GlobusComputeEngine # This engine uses the HighThroughputExecutor + max_retries_on_system_failure: 2 + max_workers: 1 # Sets one worker per node + prefetch_capacity: 0 # Increase if you have many more tasks than workers + + address: + type: address_by_interface + ifname: bond0 + + strategy: simple + job_status_kwargs: + max_idletime: 300 + strategy_period: 60 + + provider: + type: PBSProProvider + + launcher: + type: MpiExecLauncher + # Ensures 1 manger per node, work on all 64 cores + bind_cmd: --cpu-bind + overrides: --depth=64 --ppn 1 + + account: SYNAPS-I + queue: debug + cpus_per_node: 64 + + # e.g., "#PBS -l filesystems=home:grand:eagle\n#PBS -k doe" + scheduler_options: "#PBS -l filesystems=home:eagle" + + # Node setup: activate necessary conda environment and such + worker_init: "module use /soft/modulefiles; module load conda; conda activate /eagle/SYNAPS-I/reconstruction/env/tomopy; export PATH=$PATH:/eagle/SYNAPSE-I/; cd $HOME/.globus_compute/globus_compute_reconstruction" + + walltime: 00:60:00 # Jobs will end after 60 minutes + nodes_per_block: 2 # All jobs will have 1 node + init_blocks: 0 + min_blocks: 0 + max_blocks: 2 # No more than 1 job will be scheduled at a time diff --git a/scripts/polaris/globus_compute_recon_config_multinode.yaml b/scripts/polaris/globus_compute_recon_config_multinode.yaml new file mode 100644 index 00000000..8ae3d728 --- /dev/null +++ b/scripts/polaris/globus_compute_recon_config_multinode.yaml @@ -0,0 +1,39 @@ +engine: + type: GlobusComputeEngine # This engine uses the HighThroughputExecutor + max_retries_on_system_failure: 0 + max_workers: 1 # Sets one worker per node + prefetch_capacity: 0 # Increase if you have many more tasks than workers + + address: + type: address_by_interface + ifname: bond0 + + strategy: simple + job_status_kwargs: + max_idletime: 300 + strategy_period: 60 + + provider: + type: PBSProProvider + + launcher: + type: MpiExecLauncher + # Ensures 1 manger per node, work on all 64 cores + bind_cmd: --cpu-bind + overrides: --depth=64 --ppn 1 + + account: SYNAPS-I + queue: demand + cpus_per_node: 64 + + # e.g., "#PBS -l filesystems=home:grand:eagle\n#PBS -k doe" + scheduler_options: "#PBS -l filesystems=home:eagle" + + # Node setup: activate necessary conda environment and such + worker_init: "module use /soft/modulefiles; module load conda; conda activate /eagle/SYNAPS-I/reconstruction/env/tomopy; export PATH=$PATH:/eagle/SYNAPSE-I/; cd $HOME/.globus_compute/globus_compute_reconstruction" + + walltime: 59:00 # Jobs will end after 60 minutes + nodes_per_block: 4 # All jobs will have 1 node + init_blocks: 0 + min_blocks: 0 + max_blocks: 2 # No more than 1 job will be scheduled at a time diff --git a/scripts/polaris/globus_compute_segment_cellpose_config_multi_node.yaml b/scripts/polaris/globus_compute_segment_cellpose_config_multi_node.yaml new file mode 100644 index 00000000..4eada3f2 --- /dev/null +++ b/scripts/polaris/globus_compute_segment_cellpose_config_multi_node.yaml @@ -0,0 +1,48 @@ +engine: + type: GlobusComputeEngine + max_retries_on_system_failure: 0 + max_workers_per_node: 1 + prefetch_capacity: 0 + + address: + type: address_by_interface + ifname: bond0 + + strategy: simple + job_status_kwargs: + max_idletime: 300 + strategy_period: 60 + + provider: + type: PBSProProvider + + launcher: + type: SimpleLauncher + + account: SYNAPS-I + queue: demand + cpus_per_node: 64 # Full node for multi-node jobs + + # Request 4 nodes with 4 GPUs each + scheduler_options: "#PBS -l filesystems=home:eagle -l select=4:ngpus=4" + + worker_init: | + export TMPDIR=/tmp + module use /soft/modulefiles + module load conda + source $(conda info --base)/etc/profile.d/conda.sh + conda activate /eagle/SYNAPS-I/segmentation/env_dino_cellpose + # export HF_HUB_CACHE=/eagle/SYNAPS-I/segmentation/.cache/huggingface + export HF_HOME=$HF_HUB_CACHE + export CUDA_DEVICE_ORDER=PCI_BUS_ID + # Enable IB for multi-node communication + export NCCL_IB_DISABLE=0 + export NCCL_P2P_DISABLE=0 + export OMP_NUM_THREADS=8 + cd /eagle/SYNAPS-I/segmentation/scripts/inference_v5/forge_feb_seg_model_demo + + walltime: 59:00 + nodes_per_block: 4 # Changed from 1 to 2 + init_blocks: 0 + min_blocks: 0 + max_blocks: 1 diff --git a/scripts/polaris/globus_compute_segment_combine_config_multi_node.yaml b/scripts/polaris/globus_compute_segment_combine_config_multi_node.yaml new file mode 100644 index 00000000..779b71c7 --- /dev/null +++ b/scripts/polaris/globus_compute_segment_combine_config_multi_node.yaml @@ -0,0 +1,46 @@ +engine: + type: GlobusComputeEngine + max_retries_on_system_failure: 0 + max_workers: 1 + prefetch_capacity: 0 + + address: + type: address_by_interface + ifname: bond0 + + strategy: simple + job_status_kwargs: + max_idletime: 300 + strategy_period: 60 + + provider: + type: PBSProProvider + + launcher: + type: MpiExecLauncher + bind_cmd: --cpu-bind + overrides: --depth=64 --ppn 1 + + account: SYNAPS-I + queue: demand + cpus_per_node: 64 + + # No ngpus requested — CPU-only combine step + scheduler_options: "#PBS -l filesystems=home:eagle -l select=2" + + worker_init: | + export TMPDIR=/tmp + module use /soft/modulefiles + module load conda + source $(conda info --base)/etc/profile.d/conda.sh + conda activate /eagle/SYNAPS-I/segmentation/env_dino_cellpose + # export HF_HUB_CACHE=/eagle/SYNAPS-I/segmentation/.cache/huggingface + export HF_HOME=$HF_HUB_CACHE + export OMP_NUM_THREADS=64 + cd /eagle/SYNAPS-I/segmentation/scripts/inference_v5/forge_feb_seg_model_demo + + walltime: 59:00 + nodes_per_block: 2 + init_blocks: 0 + min_blocks: 0 + max_blocks: 1 diff --git a/scripts/polaris/globus_compute_segment_config_multi_node.yaml b/scripts/polaris/globus_compute_segment_config_multi_node.yaml new file mode 100644 index 00000000..35de5bd9 --- /dev/null +++ b/scripts/polaris/globus_compute_segment_config_multi_node.yaml @@ -0,0 +1,48 @@ +engine: + type: GlobusComputeEngine + max_retries_on_system_failure: 0 + max_workers_per_node: 1 + prefetch_capacity: 0 + + address: + type: address_by_interface + ifname: bond0 + + strategy: simple + job_status_kwargs: + max_idletime: 300 + strategy_period: 60 + + provider: + type: PBSProProvider + + launcher: + type: SimpleLauncher + + account: SYNAPS-I + queue: demand + cpus_per_node: 64 # Full node for multi-node jobs + + # Request 4 nodes with 4 GPUs each + scheduler_options: "#PBS -l filesystems=home:eagle -l select=4:ngpus=4" + + worker_init: | + export TMPDIR=/tmp + module use /soft/modulefiles + module load conda + conda activate base + source /eagle/SYNAPS-I/segmentation/env/bin/activate + export HF_HUB_CACHE=/eagle/SYNAPS-I/segmentation/.cache/huggingface + export HF_HOME=$HF_HUB_CACHE + export CUDA_DEVICE_ORDER=PCI_BUS_ID + # Enable IB for multi-node communication + export NCCL_IB_DISABLE=0 + export NCCL_P2P_DISABLE=0 + export OMP_NUM_THREADS=8 + cd /eagle/SYNAPS-I/segmentation/scripts/forge_feb_seg_model_demo + + walltime: 59:00 + nodes_per_block: 4 # Changed from 1 to 2 + init_blocks: 0 + min_blocks: 0 + max_blocks: 1 diff --git a/scripts/polaris/globus_compute_segment_config_single_node.yaml b/scripts/polaris/globus_compute_segment_config_single_node.yaml new file mode 100644 index 00000000..89dd9979 --- /dev/null +++ b/scripts/polaris/globus_compute_segment_config_single_node.yaml @@ -0,0 +1,50 @@ +engine: + type: GlobusComputeEngine # This engine uses the HighThroughputExecutor + max_retries_on_system_failure: 2 + # max_workers: 1 # Sets one worker per node + max_workers_per_node: 1 + prefetch_capacity: 0 # Increase if you have many more tasks than workers + + address: + type: address_by_interface + ifname: bond0 + + strategy: simple + job_status_kwargs: + max_idletime: 300 + strategy_period: 60 + + provider: + type: PBSProProvider + + launcher: + type: SimpleLauncher + # type: MpiExecLauncher + # Ensures 1 manger per node, work on all 64 cores + # bind_cmd: --cpu-bind + # overrides: --depth=64 --ppn 1 + + account: SYNAPS-I + queue: demand # debug (1-2 nodes), debug-scaling (1-10 nodes), or some other queue, probably want demand (1-56 nodes) for real-time things, prod (496 nodes) + # minimum node 1, max 56 nodes. Max time 59 minutes + cpus_per_node: 4 # may want to change to 4 (only 4 GPUs per node) + + # e.g., "#PBS -l filesystems=home:grand:eagle\n#PBS -k doe" + scheduler_options: "#PBS -l filesystems=home:eagle -l select=1:ngpus=4" + # Node setup: activate necessary conda environment and such + # worker_init: "module use /soft/modulefiles; module load conda; conda activate /eagle/SYNAPS-I/segmentation/env/; export PATH=$PATH:/eagle/SYNAPS-I/; cd $HOME/.globus_compute/globus_compute_segmentation" + worker_init: | + export TMPDIR=/tmp + module use /soft/modulefiles + module load conda + conda activate base + source /eagle/SYNAPS-I/segmentation/env/bin/activate + export HF_HUB_CACHE=/eagle/SYNAPS-I/segmentation/.cache/huggingface + export HF_HOME=$HF_HUB_CACHE + cd /eagle/SYNAPS-I/segmentation/scripts/forge_feb_seg_model_demo + + walltime: 59:00 # Jobs will end after 59 minutes + nodes_per_block: 1 # All jobs will have 1 node + init_blocks: 0 + min_blocks: 0 + max_blocks: 2 # No more than 1 job will be scheduled at a time diff --git a/scripts/polaris/globus_compute_segment_dino_config_multi_node.yaml b/scripts/polaris/globus_compute_segment_dino_config_multi_node.yaml new file mode 100644 index 00000000..4eada3f2 --- /dev/null +++ b/scripts/polaris/globus_compute_segment_dino_config_multi_node.yaml @@ -0,0 +1,48 @@ +engine: + type: GlobusComputeEngine + max_retries_on_system_failure: 0 + max_workers_per_node: 1 + prefetch_capacity: 0 + + address: + type: address_by_interface + ifname: bond0 + + strategy: simple + job_status_kwargs: + max_idletime: 300 + strategy_period: 60 + + provider: + type: PBSProProvider + + launcher: + type: SimpleLauncher + + account: SYNAPS-I + queue: demand + cpus_per_node: 64 # Full node for multi-node jobs + + # Request 4 nodes with 4 GPUs each + scheduler_options: "#PBS -l filesystems=home:eagle -l select=4:ngpus=4" + + worker_init: | + export TMPDIR=/tmp + module use /soft/modulefiles + module load conda + source $(conda info --base)/etc/profile.d/conda.sh + conda activate /eagle/SYNAPS-I/segmentation/env_dino_cellpose + # export HF_HUB_CACHE=/eagle/SYNAPS-I/segmentation/.cache/huggingface + export HF_HOME=$HF_HUB_CACHE + export CUDA_DEVICE_ORDER=PCI_BUS_ID + # Enable IB for multi-node communication + export NCCL_IB_DISABLE=0 + export NCCL_P2P_DISABLE=0 + export OMP_NUM_THREADS=8 + cd /eagle/SYNAPS-I/segmentation/scripts/inference_v5/forge_feb_seg_model_demo + + walltime: 59:00 + nodes_per_block: 4 # Changed from 1 to 2 + init_blocks: 0 + min_blocks: 0 + max_blocks: 1