From 05f58594bc435bad9dfdc2a5d13669d2feaad875 Mon Sep 17 00:00:00 2001 From: Antti Luomi Date: Mon, 20 Apr 2026 11:51:25 +0000 Subject: [PATCH 01/13] Add backend pause/resume API for experiment jobs --- src/icon/server/api/scheduler_controller.py | 48 ++++++++++++++++++++- src/icon/server/data_access/models/enums.py | 3 ++ src/icon/server/pre_processing/worker.py | 18 +++----- src/icon/server/scheduler/scheduler.py | 6 ++- 4 files changed, 61 insertions(+), 14 deletions(-) diff --git a/src/icon/server/api/scheduler_controller.py b/src/icon/server/api/scheduler_controller.py index b794edc2..2b5bfb0d 100644 --- a/src/icon/server/api/scheduler_controller.py +++ b/src/icon/server/api/scheduler_controller.py @@ -185,13 +185,59 @@ def cancel_job(self, *, job_id: int) -> None: if job.status in (JobStatus.PROCESSING, JobStatus.SUBMITTED): JobRepository.update_job_status(job=job, status=JobStatus.PROCESSED) job_run = JobRunRepository.get_run_by_job_id(job_id=job_id) - if job_run.status in (JobRunStatus.PENDING, JobRunStatus.PROCESSING): + if job_run.status in ( + JobRunStatus.PENDING, + JobRunStatus.PROCESSING, + JobRunStatus.PAUSED, + ): JobRunRepository.update_run_by_id( run_id=job_run.id, status=JobRunStatus.CANCELLED, log="Cancelled through user interaction.", ) + def pause_job(self, *, job_id: int) -> None: + """Pause a running job. + + The pre-processing worker holding the job will finish any in-flight work and + then block in a polling loop, keeping the remaining scan state in memory. + Tasks already queued for the hardware worker are diverted back to the + pre-processing worker via the existing ``outdated_tasks`` rewind mechanism. + + No-op if the job run is not in ``PROCESSING`` state. + + Args: + job_id: ID of the job to pause. + """ + + job_run = JobRunRepository.get_run_by_job_id(job_id=job_id) + if job_run.status == JobRunStatus.PROCESSING: + JobRunRepository.update_run_by_id( + run_id=job_run.id, + status=JobRunStatus.PAUSED, + log="Paused through user interaction.", + ) + + def resume_job(self, *, job_id: int) -> None: + """Resume a paused job. + + The pre-processing worker observes the status change, regenerates any tasks + the hardware worker diverted while paused (picking up fresh parameter values + in the process), and continues producing data points from where it left off. + + No-op if the job run is not in ``PAUSED`` state. + + Args: + job_id: ID of the job to resume. + """ + + job_run = JobRunRepository.get_run_by_job_id(job_id=job_id) + if job_run.status == JobRunStatus.PAUSED: + JobRunRepository.update_run_by_id( + run_id=job_run.id, + status=JobRunStatus.PROCESSING, + ) + def get_scheduled_jobs( self, *, diff --git a/src/icon/server/data_access/models/enums.py b/src/icon/server/data_access/models/enums.py index 5e6a896a..02398877 100644 --- a/src/icon/server/data_access/models/enums.py +++ b/src/icon/server/data_access/models/enums.py @@ -32,6 +32,9 @@ class JobRunStatus(enum.Enum): """Run was cancelled before completion.""" DONE = "done" """Run completed successfully.""" + PAUSED = "paused" + """Run has been paused by the user; its pre-processing worker is holding its + remaining state in memory until the run is resumed or cancelled.""" class DeviceStatus(enum.Enum): diff --git a/src/icon/server/pre_processing/worker.py b/src/icon/server/pre_processing/worker.py index a4cccc39..3b2baa4b 100644 --- a/src/icon/server/pre_processing/worker.py +++ b/src/icon/server/pre_processing/worker.py @@ -214,12 +214,9 @@ def run(self) -> None: pre_processing_task.job_run.id, ) - if ( - JobRunRepository.get_run_by_job_id( - job_id=pre_processing_task.job.id - ).status - == JobRunStatus.PROCESSING - ): + if JobRunRepository.get_run_by_job_id( + job_id=pre_processing_task.job.id + ).status in (JobRunStatus.PROCESSING, JobRunStatus.PAUSED): JobRunRepository.update_run_by_id( run_id=pre_processing_task.job_run.id, status=JobRunStatus.DONE, @@ -231,12 +228,9 @@ def run(self) -> None: e, ) - if ( - JobRunRepository.get_run_by_job_id( - job_id=pre_processing_task.job.id - ).status - == JobRunStatus.PROCESSING - ): + if JobRunRepository.get_run_by_job_id( + job_id=pre_processing_task.job.id + ).status in (JobRunStatus.PROCESSING, JobRunStatus.PAUSED): JobRunRepository.update_run_by_id( run_id=pre_processing_task.job_run.id, status=JobRunStatus.FAILED, diff --git a/src/icon/server/scheduler/scheduler.py b/src/icon/server/scheduler/scheduler.py index e1b717ce..0fce5101 100644 --- a/src/icon/server/scheduler/scheduler.py +++ b/src/icon/server/scheduler/scheduler.py @@ -19,7 +19,11 @@ def initialise_job_tables() -> None: # update job_runs table job_runs = JobRunRepository.get_runs_by_status( - status=[JobRunStatus.PENDING, JobRunStatus.PROCESSING] + status=[ + JobRunStatus.PENDING, + JobRunStatus.PROCESSING, + JobRunStatus.PAUSED, + ] ) for job_run in job_runs: JobRunRepository.update_run_by_id( From 6522d2cd1e7b0988d2ca01c752d2362ce861b996 Mon Sep 17 00:00:00 2001 From: Antti Luomi Date: Mon, 20 Apr 2026 14:30:03 +0000 Subject: [PATCH 02/13] Pause pre-processing and divert hardware tasks for paused jobs --- src/icon/server/hardware_processing/worker.py | 8 ++++++- src/icon/server/pre_processing/worker.py | 24 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/src/icon/server/hardware_processing/worker.py b/src/icon/server/hardware_processing/worker.py index 7509b36d..ee8e46d4 100644 --- a/src/icon/server/hardware_processing/worker.py +++ b/src/icon/server/hardware_processing/worker.py @@ -166,7 +166,13 @@ def run(self) -> None: run_id=task.pre_processing_task.job_run.id, ) ) - if task.created < parameter_update_timestamp: + job_run_status = JobRunRepository.get_run_by_job_id( + job_id=task.pre_processing_task.job.id, + ).status + if ( + task.created < parameter_update_timestamp + or job_run_status == JobRunStatus.PAUSED + ): task.outdated_tasks.put(task) continue try: diff --git a/src/icon/server/pre_processing/worker.py b/src/icon/server/pre_processing/worker.py index 3b2baa4b..deeec7af 100644 --- a/src/icon/server/pre_processing/worker.py +++ b/src/icon/server/pre_processing/worker.py @@ -383,6 +383,28 @@ def _handle_parameter_updates( mode=ParamUpdateMode.ONLY_NEW_PARAMETERS, ) + def _wait_while_paused( + self, + pre_processing_task: PreProcessingTask, + namespace: ExperimentIdentifier, + ) -> None: + """Block until the job run's status is no longer ``PAUSED``. + + Parameter-update events are still drained while paused, so calibrations or + parameter edits the user makes during the pause take effect on resume. + The loop also exits if the status transitions to a non-``PAUSED`` value + (e.g. ``CANCELLED`` or back to ``PROCESSING``). + """ + + while ( + JobRunRepository.get_run_by_job_id( + job_id=pre_processing_task.job.id + ).status + == JobRunStatus.PAUSED + ): + self._handle_parameter_updates(pre_processing_task, namespace=namespace) + time.sleep(0.2) + def _submit_task_to_hw_worker( self, *, @@ -411,6 +433,7 @@ def _handle_regular_scan( scan_parameter_value_combinations ): self._handle_parameter_updates(pre_processing_task, namespace) + self._wait_while_paused(pre_processing_task, namespace=namespace) # TODO: this should probably be done with multiple workers to # speed up the preparation of JSONs @@ -501,6 +524,7 @@ def _handle_realtime_scan( ): return self._handle_parameter_updates(pre_processing_task, namespace=namespace) + self._wait_while_paused(pre_processing_task, namespace=namespace) frozen_data_point = freeze_dict(data_point) hardware_task = hardware_tasks.get(frozen_data_point) if ( From 6036deb7bd27d74a52568e0993bbe3f5cb1e83a0 Mon Sep 17 00:00:00 2001 From: Antti Luomi Date: Tue, 21 Apr 2026 10:15:41 +0000 Subject: [PATCH 03/13] Add pause/resume buttons and PAUSED status to job view --- .../src/components/JobStatusIndicator.tsx | 1 + frontend/src/components/JobView.tsx | 28 ++++++++++++++++++- frontend/src/types/enums.ts | 1 + frontend/src/utils/pauseJob.ts | 5 ++++ frontend/src/utils/resumeJob.ts | 5 ++++ 5 files changed, 39 insertions(+), 1 deletion(-) create mode 100644 frontend/src/utils/pauseJob.ts create mode 100644 frontend/src/utils/resumeJob.ts diff --git a/frontend/src/components/JobStatusIndicator.tsx b/frontend/src/components/JobStatusIndicator.tsx index 76e5c7ec..196a473e 100644 --- a/frontend/src/components/JobStatusIndicator.tsx +++ b/frontend/src/components/JobStatusIndicator.tsx @@ -7,6 +7,7 @@ const statusColorMap: Record = { [JobRunStatus.DONE]: "grey", [JobRunStatus.PROCESSING]: "green", [JobRunStatus.FAILED]: "red", + [JobRunStatus.PAUSED]: "orange", }; const capitalize = (s: string | undefined) => diff --git a/frontend/src/components/JobView.tsx b/frontend/src/components/JobView.tsx index 12b3f57c..d7b664a1 100644 --- a/frontend/src/components/JobView.tsx +++ b/frontend/src/components/JobView.tsx @@ -22,10 +22,12 @@ import { useJobInfo } from "../hooks/useJobInfo"; import { runMethod } from "../socket"; import { ExperimentMetadata } from "../types/ExperimentMetadata"; import { SerializedObject } from "../types/SerializedObject"; -import { JobStatus } from "../types/enums"; +import { JobRunStatus, JobStatus } from "../types/enums"; import { deserialize } from "../utils/deserializer"; import { updateJobParams } from "../utils/updateJobParams"; import { cancelJob } from "../utils/cancelJob"; +import { pauseJob } from "../utils/pauseJob"; +import { resumeJob } from "../utils/resumeJob"; import HistogramPlot from "./jobView/HistogramPlot"; function getPlotTitle(scheduledTime?: string, experimentName?: string): string { @@ -217,11 +219,35 @@ export const JobView = ({ {jobInfo?.status !== JobStatus.PROCESSED && ( <>
+ +