Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions frontend/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import { SvgIcon } from "@mui/material";
import SettingsIcon from "@mui/icons-material/Settings";
import { ParameterDisplayGroupsContext } from "./contexts/ParameterDisplayGroupsContext";
import { reducer, JobsContext } from "./contexts/JobsContext";
import { reducer as jobRunsReducer, JobRunsContext } from "./contexts/JobRunsContext";
import { ParameterStoreProvider } from "./contexts/ParameterStoreContext";
import { useJobsSync } from "./hooks/useJobsSync";
import { useJobRunsSync } from "./hooks/useJobRunsSync";
import { deviceInfoReducer, DeviceInfoContext } from "./contexts/DeviceInfoContext";
import { useDevicesSync } from "./hooks/useDevicesSync";
import { DeviceStateContext, deviceStateReducer } from "./contexts/DeviceStateContext";
Expand Down Expand Up @@ -74,6 +76,7 @@ export const BRANDING = {

export default function App() {
const [scheduledJobs, schedulerDispatch] = useReducer(reducer, {});
const [jobRuns, jobRunsDispatch] = useReducer(jobRunsReducer, {});
const [deviceInfo, deviceInfoDispatch] = useReducer(deviceInfoReducer, {});
const [deviceStates, deviceStateDispatch] = useReducer(deviceStateReducer, null);
const parameterStore = useParameterStore();
Expand All @@ -82,6 +85,7 @@ export default function App() {
const experiments = useExperiments();

useJobsSync(schedulerDispatch);
useJobRunsSync(jobRunsDispatch);
useDevicesSync(deviceStateDispatch, deviceInfoDispatch);

return (
Expand All @@ -91,16 +95,18 @@ export default function App() {
<DeviceStateContext.Provider value={deviceStates}>
<DeviceInfoContext.Provider value={deviceInfo}>
<JobsContext.Provider value={scheduledJobs}>
<ParameterDisplayGroupsContext.Provider
value={{
parameterDisplayGroups,
parameterNamespaceToDisplayGroups,
}}
>
<ExperimentsContext.Provider value={experiments}>
<Outlet />
</ExperimentsContext.Provider>
</ParameterDisplayGroupsContext.Provider>
<JobRunsContext.Provider value={jobRuns}>
<ParameterDisplayGroupsContext.Provider
value={{
parameterDisplayGroups,
parameterNamespaceToDisplayGroups,
}}
>
<ExperimentsContext.Provider value={experiments}>
<Outlet />
</ExperimentsContext.Provider>
</ParameterDisplayGroupsContext.Provider>
</JobRunsContext.Provider>
</JobsContext.Provider>
</DeviceInfoContext.Provider>
</DeviceStateContext.Provider>
Expand Down
1 change: 1 addition & 0 deletions frontend/src/components/JobStatusIndicator.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const statusColorMap: Record<JobRunStatus, string> = {
[JobRunStatus.DONE]: "grey",
[JobRunStatus.PROCESSING]: "green",
[JobRunStatus.FAILED]: "red",
[JobRunStatus.PAUSED]: "orange",
};

const capitalize = (s: string | undefined) =>
Expand Down
28 changes: 27 additions & 1 deletion frontend/src/components/JobView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import { useJobInfo } from "../hooks/useJobInfo";
import { runMethod } from "../socket";
import { ExperimentMetadata } from "../types/ExperimentMetadata";
import { SerializedObject } from "../types/SerializedObject";
import { JobStatus } from "../types/enums";
import { JobRunStatus, JobStatus } from "../types/enums";
import { deserialize } from "../utils/deserializer";
import { updateJobParams } from "../utils/updateJobParams";
import { cancelJob } from "../utils/cancelJob";
import { pauseJob } from "../utils/pauseJob";
import { resumeJob } from "../utils/resumeJob";
import HistogramPlot from "./jobView/HistogramPlot";
import FitPanel from "./jobView/FitPanel";

Expand Down Expand Up @@ -226,11 +228,35 @@ export const JobView = ({
{jobInfo?.status !== JobStatus.PROCESSED && (
<>
<div style={{ flexGrow: 1 }} />
<Button
variant="contained"
color="primary"
disabled={jobRunInfo?.status !== JobRunStatus.PAUSED}
size="small"
onClick={() => {
if (jobId) resumeJob(Number(jobId));
}}
>
Resume
</Button>
<Button
variant="contained"
color="primary"
disabled={jobRunInfo?.status !== JobRunStatus.PROCESSING}
size="small"
sx={{ ml: 1 }}
onClick={() => {
if (jobId) pauseJob(Number(jobId));
}}
>
Pause
</Button>
<Button
variant="contained"
color="error"
disabled={jobInfo === null}
size="small"
sx={{ ml: 1 }}
onClick={() => {
if (jobId) cancelJob(Number(jobId));
}}
Expand Down
37 changes: 37 additions & 0 deletions frontend/src/contexts/JobRunsContext.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { createContext } from "react";
import { JobRun } from "../types/JobRun";

/**
 * Payload describing a change to an existing job run, as carried by the
 * `UPDATE_JOB_RUN` action.
 */
export interface JobRunUpdate {
/** ID of the job run that changed. */
run_id: number;
/** ID of the job the run belongs to. */
job_id: number;
/** Changed property values, keyed by property name. */
updated_properties: Record<string, string>;
}

/** Map from a numeric key (the job ID, per the type's name) to a `JobRun`. */
export type JobRunsByJobId = Record<number, JobRun>;

/**
 * Actions operating on a `JobRunsByJobId` map, discriminated by `type`:
 *
 * - `SET_JOB_RUNS` carries a whole map of job runs.
 * - `ADD_JOB_RUN` carries a single job run.
 * - `UPDATE_JOB_RUN` carries changed properties of a job run.
 */
export type Action =
| { type: "SET_JOB_RUNS"; payload: JobRunsByJobId }
| { type: "ADD_JOB_RUN"; payload: JobRun }
| { type: "UPDATE_JOB_RUN"; payload: JobRunUpdate };

/**
 * Reducer for the map of job runs keyed by job ID.
 *
 * - `SET_JOB_RUNS` merges the payload map into the current state; entries for
 *   jobs that are absent from the payload are kept.
 * - `ADD_JOB_RUN` stores the run under its `job_id`, replacing any previous
 *   entry for that job.
 * - `UPDATE_JOB_RUN` shallow-merges `updated_properties` into the run stored
 *   under `job_id`. When no run is stored for that job, the very same state
 *   object is returned.
 *
 * Any other action returns the state object unchanged.
 *
 * @param state - Current map of job runs.
 * @param action - Action to apply.
 * @returns The next state; a new object whenever something was written.
 */
export const reducer = (state: JobRunsByJobId, action: Action): JobRunsByJobId => {
  if (action.type === "SET_JOB_RUNS") {
    return { ...state, ...action.payload };
  }

  if (action.type === "ADD_JOB_RUN") {
    const added = action.payload;
    return { ...state, [added.job_id]: added };
  }

  if (action.type === "UPDATE_JOB_RUN") {
    const { job_id: jobId, updated_properties: changes } = action.payload;
    const existing = state[jobId];
    // Updates for jobs without a stored run are ignored.
    if (!existing) return state;

    return { ...state, [jobId]: { ...existing, ...changes } };
  }

  return state;
};

/** React context holding a `JobRunsByJobId` map; its default value is an empty map. */
export const JobRunsContext = createContext<JobRunsByJobId>({});
41 changes: 41 additions & 0 deletions frontend/src/hooks/useJobRunsSync.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { Dispatch, useEffect } from "react";
import { runMethod, socket } from "../socket";
import { deserialize } from "../utils/deserializer";
import { JobRun } from "../types/JobRun";
import { Action, JobRunUpdate } from "../contexts/JobRunsContext";
import { SerializedObject } from "../types/SerializedObject";

/** Shape of an event payload that carries a newly created job run. */
interface NewJobRunEvent {
/** The job run delivered with the event. */
job_run: JobRun;
}

/**
 * Keeps the job-run state in step with the backend scheduler.
 *
 * When the effect runs, the hook calls `scheduler.get_job_runs` and dispatches
 * the deserialized acknowledgement as `SET_JOB_RUNS`. While mounted it forwards
 * `job_run.new` events as `ADD_JOB_RUN` and `job_run.update` events as
 * `UPDATE_JOB_RUN`. Both socket listeners are removed by the effect cleanup.
 * The effect only re-runs when `dispatch` changes.
 *
 * @param dispatch - Dispatch function that receives the job-run actions.
 */
export function useJobRunsSync(dispatch: Dispatch<Action>) {
  useEffect(() => {
    // Request the initial snapshot before subscribing to incremental events.
    runMethod("scheduler.get_job_runs", [], {}, (ack) => {
      dispatch({
        type: "SET_JOB_RUNS",
        payload: deserialize(ack as SerializedObject),
      });
    });

    function onJobRunCreated(event: NewJobRunEvent) {
      dispatch({ type: "ADD_JOB_RUN", payload: event.job_run });
    }

    function onJobRunChanged(update: JobRunUpdate) {
      dispatch({ type: "UPDATE_JOB_RUN", payload: update });
    }

    socket.on("job_run.new", onJobRunCreated);
    socket.on("job_run.update", onJobRunChanged);

    return function unsubscribe() {
      socket.off("job_run.new", onJobRunCreated);
      socket.off("job_run.update", onJobRunChanged);
    };
  }, [dispatch]);
}
5 changes: 5 additions & 0 deletions frontend/src/pages/data.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import React, { useContext, useMemo } from "react";
import { List, ListItemButton, ListItemText, ListSubheader } from "@mui/material";
import { JobsContext } from "../contexts/JobsContext";
import { JobRunsContext } from "../contexts/JobRunsContext";
import { JobStatusIndicator } from "../components/JobStatusIndicator";
import { JobView } from "../components/JobView";
import { useSearchParams } from "react-router";
import { Job } from "../types/Job";
Expand All @@ -10,6 +12,7 @@ import { getExperimentNameFromExperimentId } from "../utils/experimentUtils";

export function DataPage() {
const jobs = useContext(JobsContext);
const jobRuns = useContext(JobRunsContext);
const [searchParams, setSearchParams] = useSearchParams();
const selectedJobId = searchParams.get("jobId");

Expand Down Expand Up @@ -77,6 +80,7 @@ export function DataPage() {
second: "2-digit",
}).format(new Date(job.created));

const run = jobRuns[job.id];
return (
<ListItemButton
key={job.id}
Expand All @@ -86,6 +90,7 @@ export function DataPage() {
openJobWindow(job.id, job.experiment_source.experiment_id)
}
>
<JobStatusIndicator status={run?.status} log={run?.log} />
<ListItemText
primary={`${getExperimentNameFromExperimentId(job.experiment_source.experiment_id)} (${
job.scan_parameters.length === 0
Expand Down
1 change: 1 addition & 0 deletions frontend/src/types/enums.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ export enum JobRunStatus {
FAILED = "failed",
CANCELLED = "cancelled",
DONE = "done",
PAUSED = "paused",
}
5 changes: 5 additions & 0 deletions frontend/src/utils/pauseJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { runMethod } from "../socket";

/**
 * Invokes the backend method `scheduler.pause_job` for the given job.
 *
 * @param jobId - ID of the job; sent as the `job_id` keyword argument.
 * @param callback - Optional handler forwarded unchanged to `runMethod`.
 */
export const pauseJob = (jobId: number, callback?: (ack: unknown) => void) => {
  const kwargs = { job_id: jobId };
  runMethod("scheduler.pause_job", [], kwargs, callback);
};
5 changes: 5 additions & 0 deletions frontend/src/utils/resumeJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { runMethod } from "../socket";

/**
 * Invokes the backend method `scheduler.resume_job` for the given job.
 *
 * @param jobId - ID of the job; sent as the `job_id` keyword argument.
 * @param callback - Optional handler forwarded unchanged to `runMethod`.
 */
export const resumeJob = (jobId: number, callback?: (ack: unknown) => void) => {
  const kwargs = { job_id: jobId };
  runMethod("scheduler.resume_job", [], kwargs, callback);
};
54 changes: 53 additions & 1 deletion src/icon/server/api/scheduler_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,57 @@ def cancel_job(self, *, job_id: int) -> None:
if job.status in (JobStatus.PROCESSING, JobStatus.SUBMITTED):
JobRepository.update_job_status(job=job, status=JobStatus.PROCESSED)
job_run = JobRunRepository.get_run_by_job_id(job_id=job_id)
if job_run.status in (JobRunStatus.PENDING, JobRunStatus.PROCESSING):
if job_run.status in (
JobRunStatus.PENDING,
JobRunStatus.PROCESSING,
JobRunStatus.PAUSED,
):
JobRunRepository.update_run_by_id(
run_id=job_run.id,
status=JobRunStatus.CANCELLED,
log="Cancelled through user interaction.",
)

def pause_job(self, *, job_id: int) -> None:
    """Mark the run of a job as paused.

    Looks up the run for ``job_id`` and, if it is currently ``PROCESSING``,
    sets its status to ``PAUSED`` with a log message noting the user
    interaction. Runs in any other state are left untouched.

    By design, the pre-processing worker holding the job finishes its in-flight
    work and then blocks in a polling loop, keeping the remaining scan state in
    memory. Tasks already queued for the hardware worker are diverted back to
    the pre-processing worker via the existing ``outdated_tasks`` rewind
    mechanism.

    Args:
        job_id: ID of the job to pause.
    """
    run = JobRunRepository.get_run_by_job_id(job_id=job_id)
    if run.status != JobRunStatus.PROCESSING:
        # Only a run that is actively processing can be paused.
        return

    JobRunRepository.update_run_by_id(
        run_id=run.id,
        status=JobRunStatus.PAUSED,
        log="Paused through user interaction.",
    )

def resume_job(self, *, job_id: int) -> None:
    """Put a paused run of a job back into processing.

    Looks up the run for ``job_id`` and, if it is currently ``PAUSED``, sets
    its status back to ``PROCESSING``. Runs in any other state are left
    untouched.

    By design, the pre-processing worker observes the status change,
    regenerates any tasks the hardware worker diverted while paused (picking up
    fresh parameter values in the process), and continues producing data points
    from where it left off.

    Args:
        job_id: ID of the job to resume.
    """
    run = JobRunRepository.get_run_by_job_id(job_id=job_id)
    if run.status != JobRunStatus.PAUSED:
        # Nothing to resume unless the run is paused.
        return

    JobRunRepository.update_run_by_id(
        run_id=run.id,
        status=JobRunStatus.PROCESSING,
    )

def get_scheduled_jobs(
self,
*,
Expand Down Expand Up @@ -239,6 +283,14 @@ def get_job_run_by_id(self, *, job_id: int) -> JobRun:
"""
return JobRunRepository.get_run_by_job_id(job_id=job_id)

def get_job_runs(self) -> dict[int, JobRun]:
    """Collect the latest job runs into a mapping keyed by job ID.

    Returns:
        Mapping from job ID to the job run returned for that job by
        `JobRunRepository.get_latest_runs`. If several runs share a job ID,
        the last one in the returned sequence is kept.
    """
    latest_by_job: dict[int, JobRun] = {}
    for job_run in JobRunRepository.get_latest_runs():
        latest_by_job[job_run.job_id] = job_run
    return latest_by_job

async def _cast_scan_values_to_param_type(
self,
values: list[str | int | bool | float] | None = None,
Expand Down
3 changes: 3 additions & 0 deletions src/icon/server/data_access/models/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ class JobRunStatus(enum.Enum):
"""Run was cancelled before completion."""
DONE = "done"
"""Run completed successfully."""
PAUSED = "paused"
"""Run has been paused by the user; its pre-processing worker is holding its
remaining state in memory until the run is resumed or cancelled."""


class DeviceStatus(enum.Enum):
Expand Down
26 changes: 26 additions & 0 deletions src/icon/server/data_access/repositories/job_run_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def update_run_by_id(
.returning(JobRun)
)
run = session.execute(stmt).scalar_one()
job_id = run.job_id
session.commit()

logger.debug("Updated run %s", run)
Expand All @@ -102,6 +103,7 @@ def update_run_by_id(
"event": "job_run.update",
"data": {
"run_id": run_id,
"job_id": job_id,
"updated_properties": {
"status": status.value,
"log": log,
Expand Down Expand Up @@ -142,6 +144,30 @@ def get_runs_by_status(

return session.execute(stmt).scalars().all()

@staticmethod
def get_latest_runs() -> Sequence[JobRun]:
    """Fetch, for every job, the run(s) with the greatest `scheduled_time`.

    A grouped subquery computes the maximum `scheduled_time` per `job_id`;
    `JobRun` rows are then joined against it on both the job ID and that
    maximum.

    Returns:
        The matching `JobRun` rows. Normally one per `job_id`; runs of the
        same job that tie on the maximal `scheduled_time` are all returned.
    """
    newest_per_job = (
        select(
            JobRun.job_id,
            sqlalchemy.func.max(JobRun.scheduled_time).label("max_time"),
        )
        .group_by(JobRun.job_id)
        .subquery()
    )
    same_job = JobRun.job_id == newest_per_job.c.job_id
    is_newest = JobRun.scheduled_time == newest_per_job.c.max_time
    query = select(JobRun).join(newest_per_job, sqlalchemy.and_(same_job, is_newest))

    with sqlalchemy.orm.Session(engine) as session:
        return session.execute(query).scalars().all()

@staticmethod
def get_run_by_job_id(*, job_id: int, load_job: bool = False) -> JobRun:
"""Return the run associated with a given job ID.
Expand Down
Loading
Loading