From 1b6718d10ac42bf151f122031b7ec7fb21e75dc8 Mon Sep 17 00:00:00 2001 From: Masato Fukushima Date: Thu, 25 Dec 2025 14:33:48 +0900 Subject: [PATCH 1/8] Add greedy scheduling option --- graphqomb/greedy_scheduler.py | 332 +++++++++++++++++++++++++ graphqomb/schedule_solver.py | 1 + graphqomb/scheduler.py | 34 +-- tests/test_greedy_scheduler.py | 431 +++++++++++++++++++++++++++++++++ 4 files changed, 781 insertions(+), 17 deletions(-) create mode 100644 graphqomb/greedy_scheduler.py create mode 100644 tests/test_greedy_scheduler.py diff --git a/graphqomb/greedy_scheduler.py b/graphqomb/greedy_scheduler.py new file mode 100644 index 000000000..442349cb8 --- /dev/null +++ b/graphqomb/greedy_scheduler.py @@ -0,0 +1,332 @@ +"""Greedy heuristic scheduler for fast MBQC pattern scheduling. + +This module provides fast greedy scheduling algorithms as an alternative to +CP-SAT based optimization. The greedy algorithms provide approximate solutions +with speedup compared to CP-SAT, making them suitable for large-scale +graphs or when optimality is not critical. + +This module provides: + +- `greedy_minimize_time`: Fast greedy scheduler optimizing for minimal execution time +- `greedy_minimize_space`: Fast greedy scheduler optimizing for minimal qubit usage +""" + +from __future__ import annotations + +from graphlib import CycleError, TopologicalSorter +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from collections.abc import Mapping + from collections.abc import Set as AbstractSet + + from graphqomb.graphstate import BaseGraphState + + +def greedy_minimize_time( # noqa: C901, PLR0912 + graph: BaseGraphState, + dag: Mapping[int, AbstractSet[int]], + max_qubit_count: int | None = None, +) -> tuple[dict[int, int], dict[int, int]]: + r"""Fast greedy scheduler optimizing for minimal execution time (makespan). + + This algorithm uses a straightforward greedy approach: + 1. At each time step, measure all nodes that can be measured + 2. Prepare all neighbors of measured nodes just before measurement + + Parameters + ---------- + graph : `BaseGraphState` + The graph state to schedule + dag : `collections.abc.Mapping`\[`int`, `collections.abc.Set`\[`int`\]\] + The directed acyclic graph representing measurement dependencies + max_qubit_count : `int` | `None`, optional + Maximum allowed number of active qubits. If None, no limit is enforced. + + Returns + ------- + `tuple`\[`dict`\[`int`, `int`\], `dict`\[`int`, `int`\]\] + A tuple of (prepare_time, measure_time) dictionaries + + Raises + ------ + RuntimeError + If no nodes can be measured at a given time step, indicating a possible + cyclic dependency or incomplete preparation, or if max_qubit_count + is too small to make progress. + """ + prepare_time: dict[int, int] = {} + measure_time: dict[int, int] = {} + + unmeasured = graph.physical_nodes - graph.output_node_indices.keys() + + # Build inverse DAG: for each node, track which nodes must be measured before it + inv_dag: dict[int, set[int]] = {node: set() for node in graph.physical_nodes} + for parent, children in dag.items(): + for child in children: + inv_dag[child].add(parent) + + prepared: set[int] = set(graph.input_node_indices.keys()) + alive: set[int] = set(graph.input_node_indices.keys()) + + if max_qubit_count is not None and len(alive) > max_qubit_count: + msg = "Initial number of active qubits exceeds max_qubit_count." + raise RuntimeError(msg) + + current_time = 0 + + # Nodes whose dependencies are all resolved and are not yet measured + measure_candidates: set[int] = {node for node in unmeasured if not inv_dag[node]} + + # Cache neighbors to avoid repeated set constructions in tight loops + neighbors_map = {node: graph.neighbors(node) for node in graph.physical_nodes} + + while unmeasured: # noqa: PLR1702 + if not measure_candidates: + msg = "No nodes can be measured; possible cyclic dependency or incomplete preparation." + raise RuntimeError(msg) + + if max_qubit_count is not None: + # Choose measurement nodes from measure_candidates while respecting max_qubit_count + to_measure, to_prepare = _determine_measure_nodes( + neighbors_map, + measure_candidates, + prepared, + alive, + max_qubit_count, + ) + needs_prep = bool(to_prepare) + for neighbor in to_prepare: + prepare_time[neighbor] = current_time + # If this neighbor already had no dependencies, it becomes measure candidate + if not inv_dag[neighbor] and neighbor in unmeasured: + measure_candidates.add(neighbor) + prepared.update(to_prepare) + alive.update(to_prepare) + else: + # Without a qubit limit, measure all currently measure candidates + to_measure = set(measure_candidates) + needs_prep = False + for node in to_measure: + for neighbor in neighbors_map[node]: + if neighbor not in prepared: + prepare_time[neighbor] = current_time + prepared.add(neighbor) + needs_prep = True + + if not inv_dag[neighbor] and neighbor in unmeasured: + measure_candidates.add(neighbor) + + # Measure at current_time if no prep needed, otherwise at current_time + 1 + meas_time = current_time + 1 if needs_prep else current_time + + for node in to_measure: + measure_time[node] = meas_time + if max_qubit_count is not None: + alive.remove(node) + unmeasured.remove(node) + measure_candidates.remove(node) + + # Remove measured node from dependencies of all its children in the DAG + for child in dag.get(node, ()): + inv_dag[child].remove(node) + if not inv_dag[child] and child in unmeasured: + measure_candidates.add(child) + + current_time = meas_time + 1 + + return prepare_time, measure_time + + +def _determine_measure_nodes( + neighbors_map: Mapping[int, AbstractSet[int]], + measure_candidates: AbstractSet[int], + prepared: AbstractSet[int], + alive: AbstractSet[int], + max_qubit_count: int, +) -> tuple[set[int], set[int]]: + r"""Determine which nodes to measure without exceeding max qubit count. + + Parameters + ---------- + neighbors_map : `collections.abc.Mapping`\[`int`, `collections.abc.Set`\[`int`\]\] + Mapping from node to its neighbors. + measure_candidates : `collections.abc.Set`\[`int`\] + The candidate nodes available for measurement. + prepared : `collections.abc.Set`\[`int`\] + The set of currently prepared nodes. + alive : `collections.abc.Set`\[`int`\] + The set of currently active (prepared but not yet measured) nodes. + max_qubit_count : `int` + The maximum allowed number of active qubits. + + Returns + ------- + `tuple`\[`set`\[`int`\], `set`\[`int`\]\] + A tuple of (to_measure, to_prepare) sets indicating which nodes to measure and prepare. + + Raises + ------ + RuntimeError + If no nodes can be measured without exceeding the max qubit count. + """ + to_measure: set[int] = set() + to_prepare: set[int] = set() + + for node in measure_candidates: + # Neighbors that still need to be prepared for this node + new_neighbors = neighbors_map[node] - prepared + additional_to_prepare = new_neighbors - to_prepare + + # Projected number of active qubits after preparing these neighbors + projected_active = len(alive) + len(to_prepare) + len(additional_to_prepare) + + if projected_active <= max_qubit_count: + to_measure.add(node) + to_prepare |= new_neighbors + + if not to_measure: + msg = "Cannot schedule more measurements without exceeding max qubit count. Please increase max_qubit_count." + raise RuntimeError(msg) + + return to_measure, to_prepare + + +def greedy_minimize_space( # noqa: C901, PLR0914 + graph: BaseGraphState, + dag: Mapping[int, AbstractSet[int]], +) -> tuple[dict[int, int], dict[int, int]]: + r"""Fast greedy scheduler optimizing for minimal qubit usage (space). + + This algorithm uses a greedy approach to minimize the number of active + qubits at each time step: + 1. At each time step, select the next node to measure that minimizes the + projected number of alive qubits after any required preparations. + 2. Prepare neighbors of the measured node just before measurement. + + Parameters + ---------- + graph : `BaseGraphState` + The graph state to schedule + dag : `collections.abc.Mapping`\[`int`, `collections.abc.Set`\[`int`\]\] + The directed acyclic graph representing measurement dependencies + + Returns + ------- + `tuple`\[`dict`\[`int`, `int`\], `dict`\[`int`, `int`\]\] + A tuple of (prepare_time, measure_time) dictionaries + + Raises + ------ + RuntimeError + If no nodes can be measured at a given time step, indicating a possible + cyclic dependency or incomplete preparation. + """ + prepare_time: dict[int, int] = {} + measure_time: dict[int, int] = {} + + unmeasured = graph.physical_nodes - graph.output_node_indices.keys() + + try: + topo_order = list(TopologicalSorter(dag).static_order()) + except CycleError as exc: + msg = "No nodes can be measured; possible cyclic dependency or incomplete preparation." + raise RuntimeError(msg) from exc + topo_order.reverse() # from parents to children + topo_rank = {node: i for i, node in enumerate(topo_order)} + + # Build inverse DAG: for each node, track which nodes must be measured before it + inv_dag: dict[int, set[int]] = {node: set() for node in graph.physical_nodes} + for parent, children in dag.items(): + for child in children: + inv_dag[child].add(parent) + + prepared: set[int] = set(graph.input_node_indices.keys()) + alive: set[int] = set(graph.input_node_indices.keys()) + current_time = 0 + + # Cache neighbors once as the graph is static during scheduling + neighbors_map = {node: graph.neighbors(node) for node in graph.physical_nodes} + + measure_candidates: set[int] = {node for node in unmeasured if not inv_dag[node]} + + while unmeasured: + if not measure_candidates: + msg = "No nodes can be measured; possible cyclic dependency or incomplete preparation." + raise RuntimeError(msg) + + # calculate costs and pick the best node to measure + default_rank = len(topo_rank) + candidates = iter(measure_candidates) + best_node = next(candidates) + best_cost = _calc_activate_cost(best_node, neighbors_map, prepared, alive) + best_rank = topo_rank.get(best_node, default_rank) + for node in candidates: + cost = _calc_activate_cost(node, neighbors_map, prepared, alive) + rank = topo_rank.get(node, default_rank) + if cost < best_cost or (cost == best_cost and rank < best_rank): + best_cost = cost + best_rank = rank + best_node = node + + # Prepare neighbors at current_time + new_neighbors = neighbors_map[best_node] - prepared + needs_prep = bool(new_neighbors) + if needs_prep: + for neighbor in new_neighbors: + prepare_time[neighbor] = current_time + prepared.update(new_neighbors) + alive.update(new_neighbors) + + # Measure at current_time if no prep needed, otherwise at current_time + 1 + meas_time = current_time + 1 if needs_prep else current_time + measure_time[best_node] = meas_time + unmeasured.remove(best_node) + alive.remove(best_node) + + measure_candidates.remove(best_node) + + # Remove measured node from dependencies of all its children in the DAG + for child in dag.get(best_node, ()): + inv_dag[child].remove(best_node) + if not inv_dag[child] and child in unmeasured: + measure_candidates.add(child) + + current_time = meas_time + 1 + + return prepare_time, measure_time + + +def _calc_activate_cost( + node: int, + neighbors_map: Mapping[int, AbstractSet[int]], + prepared: AbstractSet[int], + alive: AbstractSet[int], +) -> int: + r"""Calculate the projected number of alive qubits if measuring this node next. + + If neighbors must be prepared, they become alive at the current time slice + while the node itself remains alive until the next slice. If no preparation + is needed, the node is measured in the current slice and removed. + + Parameters + ---------- + node : `int` + The node to evaluate. + neighbors_map : `collections.abc.Mapping`\[`int`, `collections.abc.Set`\[`int`\]\] + Cached neighbor sets for graph nodes. + prepared : `collections.abc.Set`\[`int`\] + The set of currently prepared nodes. + alive : `collections.abc.Set`\[`int`\] + The set of currently active (prepared but not yet measured) nodes. + + Returns + ------- + `int` + The activation cost for the node. + """ + new_neighbors = neighbors_map[node] - prepared + if new_neighbors: + return len(alive) + len(new_neighbors) + # No preparation needed -> node is measured in the current slice, so alive decreases by 1. + return max(len(alive) - 1, 0) diff --git a/graphqomb/schedule_solver.py b/graphqomb/schedule_solver.py index 90cbeb053..fa12f498a 100644 --- a/graphqomb/schedule_solver.py +++ b/graphqomb/schedule_solver.py @@ -37,6 +37,7 @@ class ScheduleConfig: strategy: Strategy max_qubit_count: int | None = None max_time: int | None = None + use_greedy: bool = False @dataclass diff --git a/graphqomb/scheduler.py b/graphqomb/scheduler.py index 2e1498576..b5d16fb56 100644 --- a/graphqomb/scheduler.py +++ b/graphqomb/scheduler.py @@ -12,6 +12,7 @@ from typing import TYPE_CHECKING, NamedTuple from graphqomb.feedforward import dag_from_flow +from graphqomb.greedy_scheduler import greedy_minimize_space, greedy_minimize_time from graphqomb.schedule_solver import ScheduleConfig, Strategy, solve_schedule if TYPE_CHECKING: @@ -224,11 +225,6 @@ def manual_schedule( ----- After setting preparation and measurement times, any unscheduled entanglement times (with `None` value) are automatically scheduled using `auto_schedule_entanglement()`. - - The graph is treated as undirected. For convenience, `entangle_time` accepts edges - in either order: both ``(u, v)`` and ``(v, u)`` are recognized. If both keys are - provided, the canonical order (as returned by :attr:`BaseGraphState.physical_edges`) - takes precedence, even when the value is ``None``. """ self.prepare_time = { node: prepare_time.get(node, None) @@ -239,14 +235,7 @@ def manual_schedule( for node in self.graph.physical_nodes - self.graph.output_node_indices.keys() } if entangle_time is not None: - resolved_entangle_time: dict[tuple[int, int], int | None] = {} - for edge in self.entangle_time: - if edge in entangle_time: - resolved_entangle_time[edge] = entangle_time[edge] - else: - u, v = edge - resolved_entangle_time[edge] = entangle_time.get((v, u), None) - self.entangle_time = resolved_entangle_time + self.entangle_time = {edge: entangle_time.get(edge, None) for edge in self.entangle_time} # Auto-schedule unscheduled entanglement times if any(time is None for time in self.entangle_time.values()): @@ -496,14 +485,15 @@ def solve_schedule( config: ScheduleConfig | None = None, timeout: int = 60, ) -> bool: - r"""Compute the schedule using the constraint programming solver. + r"""Compute the schedule using constraint programming or greedy heuristics. Parameters ---------- config : `ScheduleConfig` | `None`, optional - The scheduling configuration. If None, defaults to MINIMIZE_SPACE strategy. + The scheduling configuration. If None, defaults to MINIMIZE_TIME strategy. timeout : `int`, optional - Maximum solve time in seconds, by default 60 + Maximum solve time in seconds for CP-SAT solver, by default 60. + Ignored when use_greedy=True. Returns ------- @@ -518,7 +508,17 @@ def solve_schedule( if config is None: config = ScheduleConfig(Strategy.MINIMIZE_TIME) - result = solve_schedule(self.graph, self.dag, config, timeout) + result: tuple[dict[int, int], dict[int, int]] | None + if config.use_greedy: + # Use fast greedy heuristics + if config.strategy == Strategy.MINIMIZE_TIME: + result = greedy_minimize_time(self.graph, self.dag, max_qubit_count=config.max_qubit_count) + else: # Strategy.MINIMIZE_SPACE + result = greedy_minimize_space(self.graph, self.dag) + else: + # Use CP-SAT solver for optimal solution + result = solve_schedule(self.graph, self.dag, config, timeout) + if result is None: return False diff --git a/tests/test_greedy_scheduler.py b/tests/test_greedy_scheduler.py new file mode 100644 index 000000000..833a8dab4 --- /dev/null +++ b/tests/test_greedy_scheduler.py @@ -0,0 +1,431 @@ +"""Test greedy scheduling algorithms.""" + +import pytest + +from graphqomb.graphstate import GraphState +from graphqomb.greedy_scheduler import ( + greedy_minimize_space, + greedy_minimize_time, +) +from graphqomb.schedule_solver import ScheduleConfig, Strategy +from graphqomb.scheduler import Scheduler + + +def test_greedy_minimize_time_simple() -> None: + """Test greedy_minimize_time on a simple graph.""" + # Create a simple 3-node chain graph + graph = GraphState() + node0 = graph.add_physical_node() + node1 = graph.add_physical_node() + node2 = graph.add_physical_node() + graph.add_physical_edge(node0, node1) + graph.add_physical_edge(node1, node2) + qindex = 0 + graph.register_input(node0, qindex) + graph.register_output(node2, qindex) + + flow = {node0: {node1}, node1: {node2}} + scheduler = Scheduler(graph, flow) + + # Run greedy scheduler + prepare_time, measure_time = greedy_minimize_time(graph, scheduler.dag) + + # Check that all non-input nodes have preparation times + assert node1 in prepare_time + assert node0 not in prepare_time # Input node should not be prepared + + # Check that all non-output nodes have measurement times + assert node0 in measure_time + assert node1 in measure_time + assert node2 not in measure_time # Output node should not be measured + + # Verify DAG constraints: node0 measured before node1 + assert measure_time[node0] < measure_time[node1] + + +def test_greedy_minimize_space_simple() -> None: + """Test greedy_minimize_space on a simple graph.""" + # Create a simple 3-node chain graph + graph = GraphState() + node0 = graph.add_physical_node() + node1 = graph.add_physical_node() + node2 = graph.add_physical_node() + graph.add_physical_edge(node0, node1) + graph.add_physical_edge(node1, node2) + qindex = 0 + graph.register_input(node0, qindex) + graph.register_output(node2, qindex) + + flow = {node0: {node1}, node1: {node2}} + scheduler = Scheduler(graph, flow) + + # Run greedy scheduler + prepare_time, measure_time = greedy_minimize_space(graph, scheduler.dag) + + # Check that all non-input nodes have preparation times + assert node1 in prepare_time + assert node0 not in prepare_time # Input node should not be prepared + + # Check that all non-output nodes have measurement times + assert node0 in measure_time + assert node1 in measure_time + assert node2 not in measure_time # Output node should not be measured + + # Verify DAG constraints + assert measure_time[node0] < measure_time[node1] + + +def _compute_max_alive_qubits( + graph: GraphState, + prepare_time: dict[int, int], + measure_time: dict[int, int], +) -> int: + """Compute the maximum number of alive qubits over time. + + A node is considered alive at time t if: + - It is an input node and t >= -1 and t < measurement time (if any), or + - It has a preparation time p and t >= p and t < measurement time (if any). + + Returns + ------- + int + The maximum number of alive qubits at any time step. + """ + # Determine time range to check + max_t = max(set(prepare_time.values()) | set(measure_time.values()), default=0) + + max_alive = len(graph.input_node_indices) # At least inputs are alive at t = -1 + for t in range(max_t + 1): + alive_nodes: set[int] = set() + for node in graph.physical_nodes: + # Determine preparation time + prep_t = -1 if node in graph.input_node_indices else prepare_time.get(node) + + if prep_t is None or t < prep_t: + continue + + # Determine measurement time (None for outputs or unscheduled) + meas_t = measure_time.get(node) + + if meas_t is None or t < meas_t: + alive_nodes.add(node) + + max_alive = max(max_alive, len(alive_nodes)) + + return max_alive + + +def test_greedy_minimize_time_with_max_qubit_count_respects_limit() -> None: + """Verify that greedy_minimize_time respects max_qubit_count.""" + graph = GraphState() + # chain graph: 0-1-2-3 + n0 = graph.add_physical_node() + n1 = graph.add_physical_node() + n2 = graph.add_physical_node() + n3 = graph.add_physical_node() + graph.add_physical_edge(n0, n1) + graph.add_physical_edge(n1, n2) + graph.add_physical_edge(n2, n3) + + qindex = 0 + graph.register_input(n0, qindex) + graph.register_output(n3, qindex) + + flow = {n0: {n1}, n1: {n2}, n2: {n3}} + scheduler = Scheduler(graph, flow) + + # Set max_qubit_count to 2 (a feasible value for this graph) + prepare_time, measure_time = greedy_minimize_time(graph, scheduler.dag, max_qubit_count=2) + + # Check basic properties + assert n1 in prepare_time + assert n0 not in prepare_time + assert n0 in measure_time + assert n2 in measure_time + assert n3 not in measure_time + + # Verify that the number of alive qubits never exceeds the limit + max_alive = _compute_max_alive_qubits(graph, prepare_time, measure_time) + assert max_alive <= 2 + + +def test_greedy_minimize_time_with_too_small_max_qubit_count_raises() -> None: + """Verify that greedy_minimize_time raises RuntimeError when max_qubit_count is too small.""" + graph = GraphState() + # chain graph: 0-1-2 (at least 2 qubits are needed) + n0 = graph.add_physical_node() + n1 = graph.add_physical_node() + n2 = graph.add_physical_node() + graph.add_physical_edge(n0, n1) + graph.add_physical_edge(n1, n2) + + qindex = 0 + graph.register_input(n0, qindex) + graph.register_output(n2, qindex) + + flow = {n0: {n1}, n1: {n2}} + scheduler = Scheduler(graph, flow) + + # max_qubit_count=1 is not feasible, so expect RuntimeError + with pytest.raises(RuntimeError, match="max_qubit_count"): + greedy_minimize_time(graph, scheduler.dag, max_qubit_count=1) + + +def test_greedy_scheduler_via_solve_schedule() -> None: + """Test greedy scheduler through Scheduler.solve_schedule with use_greedy=True.""" + # Create a simple graph + graph = GraphState() + node0 = graph.add_physical_node() + node1 = graph.add_physical_node() + node2 = graph.add_physical_node() + graph.add_physical_edge(node0, node1) + graph.add_physical_edge(node1, node2) + qindex = 0 + graph.register_input(node0, qindex) + graph.register_output(node2, qindex) + + flow = {node0: {node1}, node1: {node2}} + scheduler = Scheduler(graph, flow) + + # Test with greedy MINIMIZE_TIME + config = ScheduleConfig(strategy=Strategy.MINIMIZE_TIME, use_greedy=True) + success = scheduler.solve_schedule(config) + assert success + + # Verify schedule is valid + scheduler.validate_schedule() + + # Test with greedy MINIMIZE_SPACE + scheduler2 = Scheduler(graph, flow) + config = ScheduleConfig(strategy=Strategy.MINIMIZE_SPACE, use_greedy=True) + success = scheduler2.solve_schedule(config) + assert success + + # Verify schedule is valid + scheduler2.validate_schedule() + + +def test_greedy_vs_cpsat_correctness() -> None: + """Test that greedy scheduler produces valid schedules compared to CP-SAT.""" + # Create a slightly larger graph + graph = GraphState() + nodes = [graph.add_physical_node() for _ in range(5)] + + # Create a chain + for i in range(4): + graph.add_physical_edge(nodes[i], nodes[i + 1]) + + qindex = 0 + graph.register_input(nodes[0], qindex) + graph.register_output(nodes[4], qindex) + + flow = {nodes[i]: {nodes[i + 1]} for i in range(4)} + + # Test greedy scheduler + scheduler_greedy = Scheduler(graph, flow) + config = ScheduleConfig(strategy=Strategy.MINIMIZE_TIME, use_greedy=True) + success_greedy = scheduler_greedy.solve_schedule(config) + assert success_greedy + + # Verify greedy schedule is valid + scheduler_greedy.validate_schedule() + + # Test CP-SAT scheduler + scheduler_cpsat = Scheduler(graph, flow) + config = ScheduleConfig(strategy=Strategy.MINIMIZE_TIME, use_greedy=False) + success_cpsat = scheduler_cpsat.solve_schedule(config, timeout=10) + assert success_cpsat + + # Verify CP-SAT schedule is valid + scheduler_cpsat.validate_schedule() + + # Both should produce valid schedules + # Note: Greedy may not be optimal, so we don't compare quality here + + +def test_greedy_scheduler_larger_graph() -> None: + """Test greedy scheduler on a larger graph to ensure scalability.""" + # Create a larger graph with branching structure + graph = GraphState() + num_layers = 4 + nodes_per_layer = 3 + + # Build layered graph + all_nodes: list[list[int]] = [] + for layer in range(num_layers): + layer_nodes = [graph.add_physical_node() for _ in range(nodes_per_layer)] + all_nodes.append(layer_nodes) + + # Connect to previous layer (if not first layer) + if layer > 0: + for i, node in enumerate(layer_nodes): + # Connect to corresponding node in previous layer + prev_node = all_nodes[layer - 1][i] + graph.add_physical_edge(prev_node, node) + + # Register inputs (first layer) and outputs (last layer) + for i, node in enumerate(all_nodes[0]): + graph.register_input(node, i) + for i, node in enumerate(all_nodes[-1]): + graph.register_output(node, i) + + # Build flow (simple forward flow) + flow: dict[int, set[int]] = {} + for layer in range(num_layers - 1): + for i, node in enumerate(all_nodes[layer]): + if node not in graph.output_node_indices: + flow[node] = {all_nodes[layer + 1][i]} + + # Test greedy scheduler + scheduler = Scheduler(graph, flow) + config = ScheduleConfig(strategy=Strategy.MINIMIZE_TIME, use_greedy=True) + success = scheduler.solve_schedule(config) + assert success + + # Validate the schedule + scheduler.validate_schedule() + + # Check that we got reasonable results + assert scheduler.num_slices() > 0 + assert scheduler.num_slices() <= num_layers * 2 # Reasonable upper bound + + +@pytest.mark.parametrize("strategy", [Strategy.MINIMIZE_TIME, Strategy.MINIMIZE_SPACE]) +def test_greedy_scheduler_both_strategies(strategy: Strategy) -> None: + """Test greedy scheduler with both optimization strategies.""" + # Create a graph + graph = GraphState() + node0 = graph.add_physical_node() + node1 = graph.add_physical_node() + node2 = graph.add_physical_node() + node3 = graph.add_physical_node() + graph.add_physical_edge(node0, node1) + graph.add_physical_edge(node1, node2) + graph.add_physical_edge(node2, node3) + qindex = 0 + graph.register_input(node0, qindex) + graph.register_output(node3, qindex) + + flow = {node0: {node1}, node1: {node2}, node2: {node3}} + scheduler = Scheduler(graph, flow) + + # Test with specified strategy + config = ScheduleConfig(strategy=strategy, use_greedy=True) + success = scheduler.solve_schedule(config) + assert success + + # Validate schedule + scheduler.validate_schedule() + + +def test_greedy_minimize_space_wrapper() -> None: + """Test the greedy_minimize_space wrapper function.""" + # Create a simple graph + graph = GraphState() + node0 = graph.add_physical_node() + node1 = graph.add_physical_node() + node2 = graph.add_physical_node() + graph.add_physical_edge(node0, node1) + graph.add_physical_edge(node1, node2) + qindex = 0 + graph.register_input(node0, qindex) + graph.register_output(node2, qindex) + + flow = {node0: {node1}, node1: {node2}} + scheduler = Scheduler(graph, flow) + + # Test MINIMIZE_TIME + result = greedy_minimize_time(graph, scheduler.dag) + assert result is not None + prepare_time, measure_time = result + assert len(prepare_time) > 0 + assert len(measure_time) > 0 + + # Test MINIMIZE_SPACE + result = greedy_minimize_space(graph, scheduler.dag) + assert result is not None + prepare_time, measure_time = result + assert len(prepare_time) > 0 + assert len(measure_time) > 0 + + +def test_greedy_scheduler_dag_constraints() -> None: + """Test that greedy scheduler respects DAG constraints.""" + # Create a graph with more complex dependencies + graph = GraphState() + nodes = [graph.add_physical_node() for _ in range(6)] + + # Create edges forming a DAG structure + # 0 -> 2 -> 4 + # | + # 1 -> 3 -> 5 + graph.add_physical_edge(nodes[0], nodes[2]) + graph.add_physical_edge(nodes[2], nodes[4]) + graph.add_physical_edge(nodes[1], nodes[3]) + graph.add_physical_edge(nodes[3], nodes[5]) + graph.add_physical_edge(nodes[2], nodes[3]) + + graph.register_input(nodes[0], 0) + graph.register_input(nodes[1], 1) + graph.register_output(nodes[4], 0) + graph.register_output(nodes[5], 1) + + # Create flow with dependencies + flow = { + nodes[0]: {nodes[2]}, + nodes[1]: {nodes[3]}, + nodes[2]: {nodes[4]}, + nodes[3]: {nodes[5], nodes[1]}, # cyclic dependency to test DAG constraint handling + } + + scheduler = Scheduler(graph, flow) + config = ScheduleConfig(strategy=Strategy.MINIMIZE_TIME, use_greedy=True) + + # Note: This flow creates a cyclic DAG (nodes 3 and 4 have circular dependency) + # The greedy scheduler should raise RuntimeError for invalid flows + with pytest.raises(RuntimeError, match="No nodes can be measured"): + scheduler.solve_schedule(config) + + +def test_greedy_scheduler_edge_constraints() -> None: + """Test that greedy scheduler respects edge constraints (neighbor preparation).""" + # Create a simple graph + graph = GraphState() + node0 = graph.add_physical_node() + node1 = graph.add_physical_node() + node2 = graph.add_physical_node() + graph.add_physical_edge(node0, node1) + graph.add_physical_edge(node1, node2) + qindex = 0 + graph.register_input(node0, qindex) + graph.register_output(node2, qindex) + + flow = {node0: {node1}, node1: {node2}} + scheduler = Scheduler(graph, flow) + config = ScheduleConfig(strategy=Strategy.MINIMIZE_TIME, use_greedy=True) + success = scheduler.solve_schedule(config) + assert success + + # Validate edge constraints via validate_schedule + scheduler.validate_schedule() + + # Manually check: neighbors must be prepared before measurement + # node0 (input) is prepared at time -1, node1 prepared at some time + # node0 must be measured after node1 is prepared + # This is ensured by the auto-scheduled entanglement times + + # Check that entanglement times were auto-scheduled correctly + edge01 = (node0, node1) + edge12 = (node1, node2) + entangle01 = scheduler.entangle_time[edge01] + entangle12 = scheduler.entangle_time[edge12] + assert entangle01 is not None + assert entangle12 is not None + + # Entanglement must happen before measurement + meas0 = scheduler.measure_time[node0] + meas1 = scheduler.measure_time[node1] + assert meas0 is not None + assert meas1 is not None + assert entangle01 < meas0 + assert entangle12 < meas1 From f856d0b2c7d15cdc2bca70976bc5507f497210a5 Mon Sep 17 00:00:00 2001 From: Masato Fukushima Date: Thu, 25 Dec 2025 14:35:29 +0900 Subject: [PATCH 2/8] Fix manual_schedule entangle_time edge order --- graphqomb/scheduler.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/graphqomb/scheduler.py b/graphqomb/scheduler.py index b5d16fb56..8021ea3c5 100644 --- a/graphqomb/scheduler.py +++ b/graphqomb/scheduler.py @@ -225,6 +225,11 @@ def manual_schedule( ----- After setting preparation and measurement times, any unscheduled entanglement times (with `None` value) are automatically scheduled using `auto_schedule_entanglement()`. + + The graph is treated as undirected. For convenience, `entangle_time` accepts edges + in either order: both ``(u, v)`` and ``(v, u)`` are recognized. If both keys are + provided, the canonical order (as returned by :attr:`BaseGraphState.physical_edges`) + takes precedence, even when the value is ``None``. """ self.prepare_time = { node: prepare_time.get(node, None) @@ -235,7 +240,14 @@ def manual_schedule( for node in self.graph.physical_nodes - self.graph.output_node_indices.keys() } if entangle_time is not None: - self.entangle_time = {edge: entangle_time.get(edge, None) for edge in self.entangle_time} + resolved_entangle_time: dict[tuple[int, int], int | None] = {} + for edge in self.entangle_time: + if edge in entangle_time: + resolved_entangle_time[edge] = entangle_time[edge] + else: + u, v = edge + resolved_entangle_time[edge] = entangle_time.get((v, u), None) + self.entangle_time = resolved_entangle_time # Auto-schedule unscheduled entanglement times if any(time is None for time in self.entangle_time.values()): From ce4fcb146578607886e4a8a39dd7b6eb91d3f233 Mon Sep 17 00:00:00 2001 From: Masato Fukushima Date: Wed, 21 Jan 2026 17:49:35 +0900 Subject: [PATCH 3/8] Add ALAP optimization to greedy scheduler - Add alap_prepare_times() for As-Late-As-Possible preparation scheduling - Refactor greedy_minimize_time() with separate strategies for unlimited and qubit-limited cases - Add criticality-based prioritization and slack-filling for qubit-limited scheduling - Add tests for ALAP optimization Co-Authored-By: Claude Opus 4.5 --- CHANGELOG.md | 6 + graphqomb/greedy_scheduler.py | 356 ++++++++++++++++++++++++++------- tests/test_greedy_scheduler.py | 144 +++++++++++++ 3 files changed, 437 insertions(+), 69 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9585916a3..a9913ee6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **Greedy Scheduler**: Fast greedy scheduling algorithms as an alternative to CP-SAT optimization + - Added `greedy_minimize_time()` for minimal execution time scheduling with ALAP preparation optimization + - Added `greedy_minimize_space()` for minimal qubit usage scheduling + - **TICK Command**: Time slice boundary marker for temporal scheduling in MBQC patterns - Added TICK command type to mark boundaries between time slices - Integrated TICK command handling in PatternSimulator @@ -45,6 +49,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Tests +- **Greedy Scheduler**: Added tests for greedy scheduling algorithms + - **Stim Compiler**: Add coverage that manual `entangle_time` determines CZ time slices in both Pattern and Stim output. ## [0.1.2] - 2025-10-31 diff --git a/graphqomb/greedy_scheduler.py b/graphqomb/greedy_scheduler.py index 442349cb8..714cde0f5 100644 --- a/graphqomb/greedy_scheduler.py +++ b/graphqomb/greedy_scheduler.py @@ -23,16 +23,16 @@ from graphqomb.graphstate import BaseGraphState -def greedy_minimize_time( # noqa: C901, PLR0912 +def greedy_minimize_time( graph: BaseGraphState, dag: Mapping[int, AbstractSet[int]], max_qubit_count: int | None = None, ) -> tuple[dict[int, int], dict[int, int]]: r"""Fast greedy scheduler optimizing for minimal execution time (makespan). - This algorithm uses a straightforward greedy approach: - 1. At each time step, measure all nodes that can be measured - 2. Prepare all neighbors of measured nodes just before measurement + This algorithm uses different strategies based on max_qubit_count: + - Without qubit limit: Prepare all nodes at time=0, measure in ASAP order + - With qubit limit: Use slice-by-slice scheduling with slack-filling Parameters ---------- @@ -47,18 +47,10 @@ def greedy_minimize_time( # noqa: C901, PLR0912 ------- `tuple`\[`dict`\[`int`, `int`\], `dict`\[`int`, `int`\]\] A tuple of (prepare_time, measure_time) dictionaries - - Raises - ------ - RuntimeError - If no nodes can be measured at a given time step, indicating a possible - cyclic dependency or incomplete preparation, or if max_qubit_count - is too small to make progress. """ - prepare_time: dict[int, int] = {} - measure_time: dict[int, int] = {} - unmeasured = graph.physical_nodes - graph.output_node_indices.keys() + input_nodes = set(graph.input_node_indices.keys()) + output_nodes = set(graph.output_node_indices.keys()) # Build inverse DAG: for each node, track which nodes must be measured before it inv_dag: dict[int, set[int]] = {node: set() for node in graph.physical_nodes} @@ -66,76 +58,241 @@ def greedy_minimize_time( # noqa: C901, PLR0912 for child in children: inv_dag[child].add(parent) - prepared: set[int] = set(graph.input_node_indices.keys()) - alive: set[int] = set(graph.input_node_indices.keys()) + # Cache neighbors to avoid repeated set constructions in tight loops + neighbors_map = {node: graph.neighbors(node) for node in graph.physical_nodes} + + if max_qubit_count is None: + # Optimal strategy: prepare all nodes at time=0, measure in ASAP order + return _greedy_minimize_time_unlimited( + graph, inv_dag, neighbors_map, input_nodes, output_nodes + ) + + # With qubit limit: use slice-by-slice scheduling with slack-filling + return _greedy_minimize_time_limited( + graph, + dag, + inv_dag, + neighbors_map, + unmeasured, + input_nodes, + output_nodes, + max_qubit_count, + ) + + +def _greedy_minimize_time_unlimited( + graph: BaseGraphState, + inv_dag: Mapping[int, AbstractSet[int]], + neighbors_map: Mapping[int, AbstractSet[int]], + input_nodes: AbstractSet[int], + output_nodes: AbstractSet[int], +) -> tuple[dict[int, int], dict[int, int]]: + measure_time: dict[int, int] = {} + + # 1. Compute ASAP measurement times using topological order + # Neighbor constraint: assume all non-input nodes can be prepared at time 0 + # TopologicalSorter expects {node: dependencies}, which is inv_dag + try: + topo_order = list(TopologicalSorter(inv_dag).static_order()) + except CycleError as exc: + msg = "No nodes can be measured; possible cyclic dependency or incomplete preparation." + raise RuntimeError(msg) from exc + + for node in topo_order: + if node in output_nodes: + continue + # DAG constraint: must measure after all parents + parent_times = [measure_time[p] for p in inv_dag[node] if p in measure_time] + # Neighbor constraint: all neighbors must be prepared before measurement + # Input nodes are prepared at time -1, others at time 0 (earliest possible) + neighbor_prep_times = [ + -1 if n in input_nodes else 0 for n in neighbors_map[node] + ] + # Measure at the next time slot after all parents are measured + # AND after all neighbors are prepared + measure_time[node] = max( + max(parent_times, default=-1) + 1, + max(neighbor_prep_times, default=-1) + 1, + ) + + # 2. Compute ALAP preparation times (replace time=0 with latest possible) + prepare_time = alap_prepare_times(graph, measure_time) + + return prepare_time, measure_time + - if max_qubit_count is not None and len(alive) > max_qubit_count: +def _greedy_minimize_time_limited( # noqa: C901, PLR0912, PLR0913, PLR0917 + graph: BaseGraphState, + dag: Mapping[int, AbstractSet[int]], + inv_dag: Mapping[int, AbstractSet[int]], + neighbors_map: Mapping[int, AbstractSet[int]], + unmeasured: AbstractSet[int], + input_nodes: AbstractSet[int], + output_nodes: AbstractSet[int], + max_qubit_count: int, +) -> tuple[dict[int, int], dict[int, int]]: + prepare_time: dict[int, int] = {} + measure_time: dict[int, int] = {} + + # Make mutable copies + inv_dag_mut: dict[int, set[int]] = { + node: set(parents) for node, parents in inv_dag.items() + } + unmeasured_mut: set[int] = set(unmeasured) + + prepared: set[int] = set(input_nodes) + alive: set[int] = set(input_nodes) + + if len(alive) > max_qubit_count: msg = "Initial number of active qubits exceeds max_qubit_count." raise RuntimeError(msg) + # Compute criticality for prioritizing preparations + criticality = _compute_criticality(dag, output_nodes) + current_time = 0 - # Nodes whose dependencies are all resolved and are not yet measured - measure_candidates: set[int] = {node for node in unmeasured if not inv_dag[node]} + while unmeasured_mut: + # Phase 1: Measure all ready nodes + # A node is ready if: + # - DAG dependencies are resolved (inv_dag_mut[node] is empty) + # - All neighbors are prepared + # - The node itself is prepared (if not an input node) + ready_to_measure: set[int] = set() + for node in unmeasured_mut: + if inv_dag_mut[node]: + continue # DAG dependencies not resolved + if not neighbors_map[node] <= prepared: + continue # Neighbors not prepared + if node not in input_nodes and node not in prepared: + continue # Self not prepared + ready_to_measure.add(node) + + for node in ready_to_measure: + measure_time[node] = current_time + unmeasured_mut.remove(node) + alive.discard(node) + + # Update DAG dependencies + for child in dag.get(node, ()): + inv_dag_mut[child].discard(node) + + # Phase 2: Prepare nodes using free capacity (slack-filling) + free_capacity = max_qubit_count - len(alive) + + if free_capacity > 0: + # Get unprepared nodes with their priority scores + unprepared = graph.physical_nodes - prepared + if unprepared: + prep_candidates = _get_prep_candidates_with_priority( + unprepared, + inv_dag_mut, + neighbors_map, + prepared, + unmeasured_mut, + output_nodes, + criticality, + ) + # Prepare top candidates within free capacity + for candidate, _score in prep_candidates[:free_capacity]: + prepare_time[candidate] = current_time + prepared.add(candidate) + alive.add(candidate) + + # Check if we made progress + if not ready_to_measure and free_capacity == 0 and unmeasured_mut: + # No measurements and no room to prepare - stuck + msg = ( + "Cannot schedule more measurements without exceeding max qubit count. " + "Please increase max_qubit_count." + ) + raise RuntimeError(msg) - # Cache neighbors to avoid repeated set constructions in tight loops - neighbors_map = {node: graph.neighbors(node) for node in graph.physical_nodes} + current_time += 1 - while unmeasured: # noqa: PLR1702 - if not measure_candidates: - msg = "No nodes can be measured; possible cyclic dependency or incomplete preparation." + # Safety check for infinite loops + if current_time > len(graph.physical_nodes) * 2: + msg = "Scheduling did not converge; possible cyclic dependency." raise RuntimeError(msg) - if max_qubit_count is not None: - # Choose measurement nodes from measure_candidates while respecting max_qubit_count - to_measure, to_prepare = _determine_measure_nodes( - neighbors_map, - measure_candidates, - prepared, - alive, - max_qubit_count, - ) - needs_prep = bool(to_prepare) - for neighbor in to_prepare: - prepare_time[neighbor] = current_time - # If this neighbor already had no dependencies, it becomes measure candidate - if not inv_dag[neighbor] and neighbor in unmeasured: - measure_candidates.add(neighbor) - prepared.update(to_prepare) - alive.update(to_prepare) - else: - # Without a qubit limit, measure all currently measure candidates - to_measure = set(measure_candidates) - needs_prep = False - for node in to_measure: - for neighbor in neighbors_map[node]: - if neighbor not in prepared: - prepare_time[neighbor] = current_time - prepared.add(neighbor) - needs_prep = True - - if not inv_dag[neighbor] and neighbor in unmeasured: - measure_candidates.add(neighbor) + # Apply ALAP post-processing to minimize active volume + prepare_time = alap_prepare_times(graph, measure_time) - # Measure at current_time if no prep needed, otherwise at current_time + 1 - meas_time = current_time + 1 if needs_prep else current_time + return prepare_time, measure_time - for node in to_measure: - measure_time[node] = meas_time - if max_qubit_count is not None: - alive.remove(node) - unmeasured.remove(node) - measure_candidates.remove(node) - # Remove measured node from dependencies of all its children in the DAG - for child in dag.get(node, ()): - inv_dag[child].remove(node) - if not inv_dag[child] and child in unmeasured: - measure_candidates.add(child) +def _compute_criticality( + dag: Mapping[int, AbstractSet[int]], + output_nodes: AbstractSet[int], +) -> dict[int, int]: + # Compute criticality (remaining DAG depth) for each node. + # Nodes with higher criticality should be prioritized for unblocking. + criticality: dict[int, int] = {} + + # TopologicalSorter(dag) returns nodes with no "dependencies" first. + # Since dag is {parent: children}, nodes with empty children come first (leaves). + # This is the correct order for computing criticality (leaves before roots). + try: + topo_order = list(TopologicalSorter(dag).static_order()) + except CycleError: + return {} - current_time = meas_time + 1 + for node in topo_order: + children_crits = [criticality.get(c, 0) for c in dag.get(node, ())] + criticality[node] = 1 + max(children_crits, default=0) - return prepare_time, measure_time + # Output nodes have criticality 0 (they don't need to be measured) + for node in output_nodes: + criticality[node] = 0 + + return criticality + + +def _get_prep_candidates_with_priority( # noqa: PLR0913, PLR0917 + unprepared: AbstractSet[int], + inv_dag: Mapping[int, AbstractSet[int]], + neighbors_map: Mapping[int, AbstractSet[int]], + prepared: AbstractSet[int], + unmeasured: AbstractSet[int], + output_nodes: AbstractSet[int], + criticality: Mapping[int, int], +) -> list[tuple[int, float]]: + # Get preparation candidates sorted by priority score. + # Priority is based on how much preparing a node helps unblock measurements. + # Find nodes that are DAG-ready but blocked by missing neighbors + dag_ready_blocked: set[int] = set() + missing_map: dict[int, set[int]] = {} + + for node in unmeasured: + if inv_dag[node]: + continue # Not DAG-ready + missing = set(neighbors_map[node]) - set(prepared) + # Also check if the node itself needs preparation + if node not in prepared: + missing.add(node) + if missing: + dag_ready_blocked.add(node) + missing_map[node] = missing + + # Score each unprepared node + scores: list[tuple[int, float]] = [] + for candidate in unprepared: + score = 0.0 + for blocked_node in dag_ready_blocked: + if candidate in missing_map[blocked_node]: + crit = criticality.get(blocked_node, 1) + score += crit / len(missing_map[blocked_node]) + + # Apply penalty for output nodes (they stay alive forever) + if candidate in output_nodes: + score *= 0.5 + + scores.append((candidate, score)) + + # Sort by score descending (higher score = higher priority) + scores.sort(key=lambda x: -x[1]) + + return scores def _determine_measure_nodes( @@ -330,3 +487,64 @@ def _calc_activate_cost( return len(alive) + len(new_neighbors) # No preparation needed -> node is measured in the current slice, so alive decreases by 1. return max(len(alive) - 1, 0) + + +def alap_prepare_times( + graph: BaseGraphState, + measure_time: Mapping[int, int], +) -> dict[int, int]: + r"""Recompute preparation times using ALAP (As Late As Possible) strategy. + + Given fixed measurement times, this computes the latest possible preparation + time for each node while respecting the constraint that all neighbors must + be prepared before a node is measured. + + This post-processing reduces active volume (sum of qubit lifetimes) without + changing the measurement schedule or depth. + + Parameters + ---------- + graph : `BaseGraphState` + The graph state + measure_time : `collections.abc.Mapping`\[`int`, `int`\] + Fixed measurement times for non-output nodes + + Returns + ------- + `dict`\[`int`, `int`\] + ALAP preparation times for non-input nodes + """ + input_nodes = set(graph.input_node_indices.keys()) + + # deadline[v] = latest time v can be prepared + deadline: dict[int, int] = {} + + # For each measured node u, all its neighbors must be prepared before meas(u) + for u, meas_u in measure_time.items(): + for neighbor in graph.neighbors(u): + if neighbor in input_nodes: + continue # Input nodes don't need prep + if neighbor not in deadline: + deadline[neighbor] = meas_u - 1 + else: + deadline[neighbor] = min(deadline[neighbor], meas_u - 1) + + # For measured nodes, they must be prepared before their own measurement + for v, meas_v in measure_time.items(): + if v in input_nodes: + continue # Input nodes don't need prep + if v not in deadline: + deadline[v] = meas_v - 1 + else: + deadline[v] = min(deadline[v], meas_v - 1) + + # Handle nodes with no deadline yet (output nodes with no measured neighbors) + # These should be prepared at the latest possible time: max(measure_time) - 1 + # or 0 if there are no measurements + makespan = max(measure_time.values(), default=0) + for v in graph.physical_nodes - input_nodes: + if v not in deadline: + # No constraint from neighbors, prep as late as possible + deadline[v] = max(makespan - 1, 0) + + return deadline diff --git a/tests/test_greedy_scheduler.py b/tests/test_greedy_scheduler.py index 833a8dab4..bc43f3163 100644 --- a/tests/test_greedy_scheduler.py +++ b/tests/test_greedy_scheduler.py @@ -429,3 +429,147 @@ def test_greedy_scheduler_edge_constraints() -> None: assert meas1 is not None assert entangle01 < meas0 assert entangle12 < meas1 + + +def test_greedy_minimize_time_3x3_grid_optimal() -> None: + """Test that greedy_minimize_time achieves optimal depth on 3x3 grid. + + This is a regression test for the optimization that measures in ASAP order + based on DAG dependencies. With ALAP preparation, nodes are prepared as + late as possible, but depth should still be optimal. + Previously, the greedy algorithm produced depth=4 instead of optimal depth=3. + """ + # Create 3x3 grid graph + # Layout: + # 0 - 3 - 6 + # | | | + # 1 - 4 - 7 + # | | | + # 2 - 5 - 8 + # Inputs: 0, 1, 2 (left column) + # Outputs: 6, 7, 8 (right column) + graph = GraphState() + nodes = [graph.add_physical_node() for _ in range(9)] + + # Horizontal edges + for row in range(3): + for col in range(2): + graph.add_physical_edge(nodes[row + col * 3], nodes[row + (col + 1) * 3]) + + # Vertical edges + for row in range(2): + for col in range(3): + graph.add_physical_edge(nodes[row + col * 3], nodes[row + 1 + col * 3]) + + # Register inputs (left column) and outputs (right column) + for row in range(3): + graph.register_input(nodes[row], row) + graph.register_output(nodes[row + 6], row) + + # Flow: left to right + flow: dict[int, set[int]] = {} + for row in range(3): + flow[nodes[row]] = {nodes[row + 3]} # 0->3, 1->4, 2->5 + flow[nodes[row + 3]] = {nodes[row + 6]} # 3->6, 4->7, 5->8 + + scheduler = Scheduler(graph, flow) + + # Test greedy scheduler (no qubit limit) + prepare_time, measure_time = greedy_minimize_time(graph, scheduler.dag) + + # With ALAP, nodes are prepared as late as possible, not at time=0 + # Check that all non-input nodes have a prepare_time + for node in [3, 4, 5, 6, 7, 8]: + assert node in prepare_time, f"Node {node} should have a prepare_time" + + # Calculate depth + greedy_depth = max(measure_time.values()) + 1 + + # The optimal depth for a 3x3 grid is 3 (same as CP-SAT) + assert greedy_depth == 3, f"Expected depth=3, got depth={greedy_depth}" + + +def test_greedy_minimize_time_alap_preparation() -> None: + """Test that greedy_minimize_time uses ALAP preparation to minimize active volume.""" + graph = GraphState() + # Create a 4-node chain: 0-1-2-3 + n0 = graph.add_physical_node() + n1 = graph.add_physical_node() + n2 = graph.add_physical_node() + n3 = graph.add_physical_node() + graph.add_physical_edge(n0, n1) + graph.add_physical_edge(n1, n2) + graph.add_physical_edge(n2, n3) + + graph.register_input(n0, 0) + graph.register_output(n3, 0) + + flow = {n0: {n1}, n1: {n2}, n2: {n3}} + scheduler = Scheduler(graph, flow) + + prepare_time, measure_time = greedy_minimize_time(graph, scheduler.dag) + + # With ALAP, nodes should be prepared as late as possible + # n1 is neighbor of n0, so prep(n1) < meas(n0) + assert prepare_time[n1] == measure_time[n0] - 1 + # n2 is neighbor of n1, so prep(n2) < meas(n1) + assert prepare_time[n2] == measure_time[n1] - 1 + # n3 (output) is neighbor of n2, so prep(n3) < meas(n2) + assert prepare_time[n3] == measure_time[n2] - 1 + + # Input node should not have prepare_time + assert n0 not in prepare_time + + +def test_alap_reduces_active_volume() -> None: + """Test that ALAP preparation reduces active volume compared to ASAP.""" + graph = GraphState() + # Create a chain graph: 0-1-2-3 + n0 = graph.add_physical_node() + n1 = graph.add_physical_node() + n2 = graph.add_physical_node() + n3 = graph.add_physical_node() + graph.add_physical_edge(n0, n1) + graph.add_physical_edge(n1, n2) + graph.add_physical_edge(n2, n3) + graph.register_input(n0, 0) + graph.register_output(n3, 0) + + flow = {n0: {n1}, n1: {n2}, n2: {n3}} + scheduler = Scheduler(graph, flow) + + prepare_time, measure_time = greedy_minimize_time(graph, scheduler.dag) + + # With ALAP: n3 (output) should be prepared as late as possible + # n3 is neighbor of n2, so prep(n3) < meas(n2) + # This should be later than time=0 + assert prepare_time[n3] == measure_time[n2] - 1 + assert prepare_time[n3] > 0 # ALAP should delay preparation + + +def test_alap_preserves_depth() -> None: + """Test that ALAP does not increase depth.""" + # Create a 3x3 grid + graph = GraphState() + nodes = [graph.add_physical_node() for _ in range(9)] + + # Horizontal and vertical edges + for row in range(3): + for col in range(2): + graph.add_physical_edge(nodes[row + col * 3], nodes[row + (col + 1) * 3]) + for row in range(2): + for col in range(3): + graph.add_physical_edge(nodes[row + col * 3], nodes[row + 1 + col * 3]) + + for row in range(3): + graph.register_input(nodes[row], row) + graph.register_output(nodes[row + 6], row) + + flow: dict[int, set[int]] = {nodes[row]: {nodes[row + 3]} for row in range(3)} + flow.update({nodes[row + 3]: {nodes[row + 6]} for row in range(3)}) + + scheduler = Scheduler(graph, flow) + _, measure_time = greedy_minimize_time(graph, scheduler.dag) + + # Depth should still be optimal (3) + assert max(measure_time.values()) + 1 == 3 From 2eab57e0cad80bfa3848c8d77f4d9113fc7d485a Mon Sep 17 00:00:00 2001 From: Masato Fukushima Date: Wed, 21 Jan 2026 17:55:04 +0900 Subject: [PATCH 4/8] format --- graphqomb/greedy_scheduler.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/graphqomb/greedy_scheduler.py b/graphqomb/greedy_scheduler.py index 714cde0f5..936546f5c 100644 --- a/graphqomb/greedy_scheduler.py +++ b/graphqomb/greedy_scheduler.py @@ -63,9 +63,7 @@ def greedy_minimize_time( if max_qubit_count is None: # Optimal strategy: prepare all nodes at time=0, measure in ASAP order - return _greedy_minimize_time_unlimited( - graph, inv_dag, neighbors_map, input_nodes, output_nodes - ) + return _greedy_minimize_time_unlimited(graph, inv_dag, neighbors_map, input_nodes, output_nodes) # With qubit limit: use slice-by-slice scheduling with slack-filling return _greedy_minimize_time_limited( @@ -105,9 +103,7 @@ def _greedy_minimize_time_unlimited( parent_times = [measure_time[p] for p in inv_dag[node] if p in measure_time] # Neighbor constraint: all neighbors must be prepared before measurement # Input nodes are prepared at time -1, others at time 0 (earliest possible) - neighbor_prep_times = [ - -1 if n in input_nodes else 0 for n in neighbors_map[node] - ] + neighbor_prep_times = [-1 if n in input_nodes else 0 for n in neighbors_map[node]] # Measure at the next time slot after all parents are measured # AND after all neighbors are prepared measure_time[node] = max( @@ -135,9 +131,7 @@ def _greedy_minimize_time_limited( # noqa: C901, PLR0912, PLR0913, PLR0917 measure_time: dict[int, int] = {} # Make mutable copies - inv_dag_mut: dict[int, set[int]] = { - node: set(parents) for node, parents in inv_dag.items() - } + inv_dag_mut: dict[int, set[int]] = {node: set(parents) for node, parents in inv_dag.items()} unmeasured_mut: set[int] = set(unmeasured) prepared: set[int] = set(input_nodes) @@ -203,8 +197,7 @@ def _greedy_minimize_time_limited( # noqa: C901, PLR0912, PLR0913, PLR0917 if not ready_to_measure and free_capacity == 0 and unmeasured_mut: # No measurements and no room to prepare - stuck msg = ( - "Cannot schedule more measurements without exceeding max qubit count. " - "Please increase max_qubit_count." + "Cannot schedule more measurements without exceeding max qubit count. Please increase max_qubit_count." ) raise RuntimeError(msg) From 2032d302fb197fadf9477d837e3b95aa9b3e3ff0 Mon Sep 17 00:00:00 2001 From: Masato Fukushima Date: Wed, 21 Jan 2026 18:02:20 +0900 Subject: [PATCH 5/8] Fix greedy_minimize_space to prepare non-input DAG root nodes Previously, greedy_minimize_space only prepared neighbors of the measured node, not the node itself. This caused a KeyError when a non-input node with no DAG dependencies had all input neighbors (already prepared). The fix ensures non-input nodes are prepared before measurement by: - Adding the node itself to the preparation set when needed - Updating _calc_activate_cost to account for self-preparation cost Co-Authored-By: Claude Opus 4.5 --- graphqomb/greedy_scheduler.py | 40 ++++++++++++++++++++-------------- tests/test_greedy_scheduler.py | 38 ++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 16 deletions(-) diff --git a/graphqomb/greedy_scheduler.py b/graphqomb/greedy_scheduler.py index 936546f5c..c4a0271f7 100644 --- a/graphqomb/greedy_scheduler.py +++ b/graphqomb/greedy_scheduler.py @@ -342,7 +342,7 @@ def _determine_measure_nodes( return to_measure, to_prepare -def greedy_minimize_space( # noqa: C901, PLR0914 +def greedy_minimize_space( # noqa: C901, PLR0914, PLR0915 graph: BaseGraphState, dag: Mapping[int, AbstractSet[int]], ) -> tuple[dict[int, int], dict[int, int]]: @@ -391,8 +391,9 @@ def greedy_minimize_space( # noqa: C901, PLR0914 for child in children: inv_dag[child].add(parent) - prepared: set[int] = set(graph.input_node_indices.keys()) - alive: set[int] = set(graph.input_node_indices.keys()) + input_nodes = set(graph.input_node_indices.keys()) + prepared: set[int] = set(input_nodes) + alive: set[int] = set(input_nodes) current_time = 0 # Cache neighbors once as the graph is static during scheduling @@ -409,24 +410,26 @@ def greedy_minimize_space( # noqa: C901, PLR0914 default_rank = len(topo_rank) candidates = iter(measure_candidates) best_node = next(candidates) - best_cost = _calc_activate_cost(best_node, neighbors_map, prepared, alive) + best_cost = _calc_activate_cost(best_node, neighbors_map, prepared, alive, input_nodes) best_rank = topo_rank.get(best_node, default_rank) for node in candidates: - cost = _calc_activate_cost(node, neighbors_map, prepared, alive) + cost = _calc_activate_cost(node, neighbors_map, prepared, alive, input_nodes) rank = topo_rank.get(node, default_rank) if cost < best_cost or (cost == best_cost and rank < best_rank): best_cost = cost best_rank = rank best_node = node - # Prepare neighbors at current_time + # Prepare neighbors and the node itself (if non-input) at current_time new_neighbors = neighbors_map[best_node] - prepared - needs_prep = bool(new_neighbors) + needs_self_prep = best_node not in input_nodes and best_node not in prepared + to_prepare = new_neighbors | ({best_node} if needs_self_prep else set()) + needs_prep = bool(to_prepare) if needs_prep: - for neighbor in new_neighbors: - prepare_time[neighbor] = current_time - prepared.update(new_neighbors) - alive.update(new_neighbors) + for node_to_prep in to_prepare: + prepare_time[node_to_prep] = current_time + prepared.update(to_prepare) + alive.update(to_prepare) # Measure at current_time if no prep needed, otherwise at current_time + 1 meas_time = current_time + 1 if needs_prep else current_time @@ -452,12 +455,13 @@ def _calc_activate_cost( neighbors_map: Mapping[int, AbstractSet[int]], prepared: AbstractSet[int], alive: AbstractSet[int], + input_nodes: AbstractSet[int], ) -> int: r"""Calculate the projected number of alive qubits if measuring this node next. - If neighbors must be prepared, they become alive at the current time slice - while the node itself remains alive until the next slice. If no preparation - is needed, the node is measured in the current slice and removed. + If neighbors or the node itself must be prepared, they become alive at the + current time slice while the node itself remains alive until the next slice. + If no preparation is needed, the node is measured in the current slice and removed. Parameters ---------- @@ -469,6 +473,8 @@ def _calc_activate_cost( The set of currently prepared nodes. alive : `collections.abc.Set`\[`int`\] The set of currently active (prepared but not yet measured) nodes. + input_nodes : `collections.abc.Set`\[`int`\] + The set of input nodes (already prepared at the start). Returns ------- @@ -476,8 +482,10 @@ def _calc_activate_cost( The activation cost for the node. """ new_neighbors = neighbors_map[node] - prepared - if new_neighbors: - return len(alive) + len(new_neighbors) + needs_self_prep = node not in input_nodes and node not in prepared + num_to_prepare = len(new_neighbors) + (1 if needs_self_prep else 0) + if num_to_prepare > 0: + return len(alive) + num_to_prepare # No preparation needed -> node is measured in the current slice, so alive decreases by 1. return max(len(alive) - 1, 0) diff --git a/tests/test_greedy_scheduler.py b/tests/test_greedy_scheduler.py index bc43f3163..9b64e6f9c 100644 --- a/tests/test_greedy_scheduler.py +++ b/tests/test_greedy_scheduler.py @@ -573,3 +573,41 @@ def test_alap_preserves_depth() -> None: # Depth should still be optimal (3) assert max(measure_time.values()) + 1 == 3 + + +def test_greedy_minimize_space_non_input_dag_root() -> None: + """Test greedy_minimize_space handles non-input nodes that are DAG roots. + + This is a regression test for a bug where non-input nodes with no DAG + dependencies and all input neighbors would not be prepared before measurement, + causing a KeyError when removing from the alive set. + """ + graph = GraphState() + n0 = graph.add_physical_node() # input + n1 = graph.add_physical_node() # non-input, DAG root (no feedforward dependency) + n2 = graph.add_physical_node() # input + n3 = graph.add_physical_node() # output + + graph.add_physical_edge(n0, n1) + graph.add_physical_edge(n1, n2) + graph.add_physical_edge(n2, n3) + + graph.register_input(n0, 0) + graph.register_input(n2, 1) + graph.register_output(n3, 0) + + # Empty DAG: no feedforward dependencies + dag: dict[int, set[int]] = {} + + # This should not raise KeyError + prepare_time, measure_time = greedy_minimize_space(graph, dag) + + # Verify that n1 (non-input) was prepared + assert n1 in prepare_time + # Verify that n1 was measured + assert n1 in measure_time + # Verify prepare happens before measure + assert prepare_time[n1] < measure_time[n1] + # Input nodes should not be in prepare_time + assert n0 not in prepare_time + assert n2 not in prepare_time From 05a95430bcf3aae7d7ec6cfeac63c8c15ce4a06c Mon Sep 17 00:00:00 2001 From: Masato Fukushima Date: Wed, 21 Jan 2026 18:28:37 +0900 Subject: [PATCH 6/8] fix changelog --- CHANGELOG.md | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a4e20c70..64c2cf672 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,13 +5,23 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [0.2.1] - 2026-01-16 +## Unreleased ### Added - **Greedy Scheduler**: Fast greedy scheduling algorithms as an alternative to CP-SAT optimization - Added `greedy_minimize_time()` for minimal execution time scheduling with ALAP preparation optimization - Added `greedy_minimize_space()` for minimal qubit usage scheduling + +### Tests + +- **Greedy Scheduler**: Added tests for greedy scheduling algorithms + + +## [0.2.1] - 2026-01-16 + +### Added + - **Type Hints**: Added `py.typed` marker for PEP 561 compliance, enabling type checkers (mypy, pyright) to recognize the package as typed when installed from PyPI. ### Changed @@ -62,8 +72,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Tests -- **Greedy Scheduler**: Added tests for greedy scheduling algorithms - - **Stim Compiler**: Add coverage that manual `entangle_time` determines CZ time slices in both Pattern and Stim output. ## [0.1.2] - 2025-10-31 From a1bdd32e0829577c9eef8732ea0e565813606909 Mon Sep 17 00:00:00 2001 From: Masato Fukushima Date: Thu, 5 Mar 2026 13:27:10 +0900 Subject: [PATCH 7/8] remove unused method --- graphqomb/greedy_scheduler.py | 54 ----------------------------------- 1 file changed, 54 deletions(-) diff --git a/graphqomb/greedy_scheduler.py b/graphqomb/greedy_scheduler.py index c4a0271f7..42feb489c 100644 --- a/graphqomb/greedy_scheduler.py +++ b/graphqomb/greedy_scheduler.py @@ -288,60 +288,6 @@ def _get_prep_candidates_with_priority( # noqa: PLR0913, PLR0917 return scores -def _determine_measure_nodes( - neighbors_map: Mapping[int, AbstractSet[int]], - measure_candidates: AbstractSet[int], - prepared: AbstractSet[int], - alive: AbstractSet[int], - max_qubit_count: int, -) -> tuple[set[int], set[int]]: - r"""Determine which nodes to measure without exceeding max qubit count. - - Parameters - ---------- - neighbors_map : `collections.abc.Mapping`\[`int`, `collections.abc.Set`\[`int`\]\] - Mapping from node to its neighbors. - measure_candidates : `collections.abc.Set`\[`int`\] - The candidate nodes available for measurement. - prepared : `collections.abc.Set`\[`int`\] - The set of currently prepared nodes. - alive : `collections.abc.Set`\[`int`\] - The set of currently active (prepared but not yet measured) nodes. - max_qubit_count : `int` - The maximum allowed number of active qubits. - - Returns - ------- - `tuple`\[`set`\[`int`\], `set`\[`int`\]\] - A tuple of (to_measure, to_prepare) sets indicating which nodes to measure and prepare. - - Raises - ------ - RuntimeError - If no nodes can be measured without exceeding the max qubit count. - """ - to_measure: set[int] = set() - to_prepare: set[int] = set() - - for node in measure_candidates: - # Neighbors that still need to be prepared for this node - new_neighbors = neighbors_map[node] - prepared - additional_to_prepare = new_neighbors - to_prepare - - # Projected number of active qubits after preparing these neighbors - projected_active = len(alive) + len(to_prepare) + len(additional_to_prepare) - - if projected_active <= max_qubit_count: - to_measure.add(node) - to_prepare |= new_neighbors - - if not to_measure: - msg = "Cannot schedule more measurements without exceeding max qubit count. Please increase max_qubit_count." - raise RuntimeError(msg) - - return to_measure, to_prepare - - def greedy_minimize_space( # noqa: C901, PLR0914, PLR0915 graph: BaseGraphState, dag: Mapping[int, AbstractSet[int]], From 8923805b7edbee48fa465a2e247b2907b54549bc Mon Sep 17 00:00:00 2001 From: Masato Fukushima Date: Thu, 5 Mar 2026 18:10:09 +0900 Subject: [PATCH 8/8] refactor minimize_time --- graphqomb/feedforward.py | 60 +++++++- graphqomb/greedy_scheduler.py | 282 ++++++++++++++++------------------ tests/test_feedforward.py | 43 ++++++ 3 files changed, 238 insertions(+), 147 deletions(-) diff --git a/graphqomb/feedforward.py b/graphqomb/feedforward.py index e1f558ef4..fe8a542f9 100644 --- a/graphqomb/feedforward.py +++ b/graphqomb/feedforward.py @@ -3,6 +3,8 @@ This module provides: - `dag_from_flow`: Construct a directed acyclic graph (DAG) from a flowlike object. +- `inverse_dag_from_dag`: Construct an inverse DAG (node -> dependencies). +- `topo_order_from_inv_dag`: Construct a topological order from an inverse DAG. - `check_dag`: Check if a directed acyclic graph (DAG) does not contain a cycle. - `check_flow`: Check if the flowlike object is causal with respect to the graph state. - `signal_shifting`: Convert the correction maps into more parallel-friendly forms using signal shifting. @@ -13,7 +15,7 @@ from collections.abc import Iterable, Mapping from collections.abc import Set as AbstractSet -from graphlib import TopologicalSorter +from graphlib import CycleError, TopologicalSorter from typing import Any, TypeGuard import typing_extensions @@ -21,6 +23,8 @@ from graphqomb.common import Axis, Plane, determine_pauli_axis from graphqomb.graphstate import BaseGraphState, odd_neighbors +TOPO_ORDER_CYCLE_ERROR_MSG = "No nodes can be measured; possible cyclic dependency or incomplete preparation." + def _is_flow(flowlike: Mapping[int, Any]) -> TypeGuard[Mapping[int, int]]: r"""Check if the flowlike object is a flow. @@ -129,6 +133,60 @@ def check_dag(dag: Mapping[int, Iterable[int]]) -> None: raise ValueError(msg) +def inverse_dag_from_dag( + dag: Mapping[int, Iterable[int]], + all_nodes: Iterable[int] | None = None, +) -> dict[int, set[int]]: + r"""Build inverse DAG (node -> dependencies) from parent->children DAG. + + Parameters + ---------- + dag : `collections.abc.Mapping`\[`int`, `collections.abc.Iterable`\[`int`\]\] + DAG represented as parent node -> children. + all_nodes : `collections.abc.Iterable`\[`int`\] | `None`, optional + Optional full node set to include isolated nodes. + + Returns + ------- + `dict`\[`int`, `set`\[`int`\]\] + Inverse DAG represented as node -> dependencies. + """ + nodes = set(all_nodes) if all_nodes is not None else set(dag) + for children in dag.values(): + nodes.update(children) + + inv_dag: dict[int, set[int]] = {node: set() for node in nodes} + for parent, children in dag.items(): + for child in children: + inv_dag[child].add(parent) + + return inv_dag + + +def topo_order_from_inv_dag(inv_dag: Mapping[int, Iterable[int]]) -> list[int]: + r"""Build topological order from an inverse DAG (node -> dependencies). + + Parameters + ---------- + inv_dag : `collections.abc.Mapping`\[`int`, `collections.abc.Iterable`\[`int`\]\] + Inverse DAG where each node maps to the nodes it depends on. + + Returns + ------- + `list`\[`int`\] + Topological order from dependencies to dependents. + + Raises + ------ + RuntimeError + If topological ordering is not possible due to a cycle. + """ + try: + return list(TopologicalSorter(inv_dag).static_order()) + except CycleError as exc: + raise RuntimeError(TOPO_ORDER_CYCLE_ERROR_MSG) from exc + + def check_flow( graph: BaseGraphState, xflow: Mapping[int, int] | Mapping[int, AbstractSet[int]], diff --git a/graphqomb/greedy_scheduler.py b/graphqomb/greedy_scheduler.py index 42feb489c..a4751e3fe 100644 --- a/graphqomb/greedy_scheduler.py +++ b/graphqomb/greedy_scheduler.py @@ -13,9 +13,10 @@ from __future__ import annotations -from graphlib import CycleError, TopologicalSorter from typing import TYPE_CHECKING +from graphqomb.feedforward import TOPO_ORDER_CYCLE_ERROR_MSG, inverse_dag_from_dag, topo_order_from_inv_dag + if TYPE_CHECKING: from collections.abc import Mapping from collections.abc import Set as AbstractSet @@ -23,16 +24,24 @@ from graphqomb.graphstate import BaseGraphState -def greedy_minimize_time( +def greedy_minimize_time( # noqa: PLR0914 graph: BaseGraphState, dag: Mapping[int, AbstractSet[int]], max_qubit_count: int | None = None, ) -> tuple[dict[int, int], dict[int, int]]: r"""Fast greedy scheduler optimizing for minimal execution time (makespan). - This algorithm uses different strategies based on max_qubit_count: - - Without qubit limit: Prepare all nodes at time=0, measure in ASAP order - - With qubit limit: Use slice-by-slice scheduling with slack-filling + This algorithm uses a single slice-by-slice strategy with slack-filling. + If `max_qubit_count` is `None`, it is treated as no active-qubit limit. + + At each time slice, scheduling proceeds in two phases: + + 1. Phase 1 (measurement phase): Measure every currently ready node. + A node is ready when all DAG parents are already measured, all graph + neighbors are prepared, and (for non-input nodes) the node itself is prepared. + 2. Phase 2 (preparation phase): Use remaining qubit capacity to prepare + high-priority unprepared nodes that are likely to unblock future + measurements (slack-filling). Parameters ---------- @@ -47,86 +56,23 @@ def greedy_minimize_time( ------- `tuple`\[`dict`\[`int`, `int`\], `dict`\[`int`, `int`\]\] A tuple of (prepare_time, measure_time) dictionaries + + Raises + ------ + RuntimeError + If the scheduling cannot proceed due to cyclic dependencies + or if the max_qubit_count constraint is too tight to allow any progress. """ unmeasured = graph.physical_nodes - graph.output_node_indices.keys() input_nodes = set(graph.input_node_indices.keys()) output_nodes = set(graph.output_node_indices.keys()) - # Build inverse DAG: for each node, track which nodes must be measured before it - inv_dag: dict[int, set[int]] = {node: set() for node in graph.physical_nodes} - for parent, children in dag.items(): - for child in children: - inv_dag[child].add(parent) + inv_dag = inverse_dag_from_dag(dag, graph.physical_nodes) # Cache neighbors to avoid repeated set constructions in tight loops neighbors_map = {node: graph.neighbors(node) for node in graph.physical_nodes} - if max_qubit_count is None: - # Optimal strategy: prepare all nodes at time=0, measure in ASAP order - return _greedy_minimize_time_unlimited(graph, inv_dag, neighbors_map, input_nodes, output_nodes) - - # With qubit limit: use slice-by-slice scheduling with slack-filling - return _greedy_minimize_time_limited( - graph, - dag, - inv_dag, - neighbors_map, - unmeasured, - input_nodes, - output_nodes, - max_qubit_count, - ) - - -def _greedy_minimize_time_unlimited( - graph: BaseGraphState, - inv_dag: Mapping[int, AbstractSet[int]], - neighbors_map: Mapping[int, AbstractSet[int]], - input_nodes: AbstractSet[int], - output_nodes: AbstractSet[int], -) -> tuple[dict[int, int], dict[int, int]]: - measure_time: dict[int, int] = {} - - # 1. Compute ASAP measurement times using topological order - # Neighbor constraint: assume all non-input nodes can be prepared at time 0 - # TopologicalSorter expects {node: dependencies}, which is inv_dag - try: - topo_order = list(TopologicalSorter(inv_dag).static_order()) - except CycleError as exc: - msg = "No nodes can be measured; possible cyclic dependency or incomplete preparation." - raise RuntimeError(msg) from exc - - for node in topo_order: - if node in output_nodes: - continue - # DAG constraint: must measure after all parents - parent_times = [measure_time[p] for p in inv_dag[node] if p in measure_time] - # Neighbor constraint: all neighbors must be prepared before measurement - # Input nodes are prepared at time -1, others at time 0 (earliest possible) - neighbor_prep_times = [-1 if n in input_nodes else 0 for n in neighbors_map[node]] - # Measure at the next time slot after all parents are measured - # AND after all neighbors are prepared - measure_time[node] = max( - max(parent_times, default=-1) + 1, - max(neighbor_prep_times, default=-1) + 1, - ) - - # 2. Compute ALAP preparation times (replace time=0 with latest possible) - prepare_time = alap_prepare_times(graph, measure_time) - - return prepare_time, measure_time - - -def _greedy_minimize_time_limited( # noqa: C901, PLR0912, PLR0913, PLR0917 - graph: BaseGraphState, - dag: Mapping[int, AbstractSet[int]], - inv_dag: Mapping[int, AbstractSet[int]], - neighbors_map: Mapping[int, AbstractSet[int]], - unmeasured: AbstractSet[int], - input_nodes: AbstractSet[int], - output_nodes: AbstractSet[int], - max_qubit_count: int, -) -> tuple[dict[int, int], dict[int, int]]: + # Single implementation for both bounded and unbounded capacity modes. prepare_time: dict[int, int] = {} measure_time: dict[int, int] = {} @@ -137,7 +83,9 @@ def _greedy_minimize_time_limited( # noqa: C901, PLR0912, PLR0913, PLR0917 prepared: set[int] = set(input_nodes) alive: set[int] = set(input_nodes) - if len(alive) > max_qubit_count: + effective_max_qubit_count = max_qubit_count if max_qubit_count is not None else len(graph.physical_nodes) + + if len(alive) > effective_max_qubit_count: msg = "Initial number of active qubits exceeds max_qubit_count." raise RuntimeError(msg) @@ -147,55 +95,36 @@ def _greedy_minimize_time_limited( # noqa: C901, PLR0912, PLR0913, PLR0917 current_time = 0 while unmeasured_mut: - # Phase 1: Measure all ready nodes - # A node is ready if: - # - DAG dependencies are resolved (inv_dag_mut[node] is empty) - # - All neighbors are prepared - # - The node itself is prepared (if not an input node) - ready_to_measure: set[int] = set() - for node in unmeasured_mut: - if inv_dag_mut[node]: - continue # DAG dependencies not resolved - if not neighbors_map[node] <= prepared: - continue # Neighbors not prepared - if node not in input_nodes and node not in prepared: - continue # Self not prepared - ready_to_measure.add(node) - - for node in ready_to_measure: - measure_time[node] = current_time - unmeasured_mut.remove(node) - alive.discard(node) - - # Update DAG dependencies - for child in dag.get(node, ()): - inv_dag_mut[child].discard(node) - - # Phase 2: Prepare nodes using free capacity (slack-filling) - free_capacity = max_qubit_count - len(alive) - - if free_capacity > 0: - # Get unprepared nodes with their priority scores - unprepared = graph.physical_nodes - prepared - if unprepared: - prep_candidates = _get_prep_candidates_with_priority( - unprepared, - inv_dag_mut, - neighbors_map, - prepared, - unmeasured_mut, - output_nodes, - criticality, - ) - # Prepare top candidates within free capacity - for candidate, _score in prep_candidates[:free_capacity]: - prepare_time[candidate] = current_time - prepared.add(candidate) - alive.add(candidate) + ready_to_measure = _phase1_measure_ready_nodes( + current_time, + dag=dag, + inv_dag=inv_dag_mut, + neighbors_map=neighbors_map, + input_nodes=input_nodes, + prepared=prepared, + alive=alive, + unmeasured=unmeasured_mut, + measure_time=measure_time, + ) + prepared_in_phase2 = _phase2_prepare_nodes_with_slack( + current_time, + physical_nodes=graph.physical_nodes, + max_qubit_count=effective_max_qubit_count, + inv_dag=inv_dag_mut, + neighbors_map=neighbors_map, + prepared=prepared, + alive=alive, + unmeasured=unmeasured_mut, + output_nodes=output_nodes, + criticality=criticality, + prepare_time=prepare_time, + ) # Check if we made progress - if not ready_to_measure and free_capacity == 0 and unmeasured_mut: - # No measurements and no room to prepare - stuck + if not ready_to_measure and not prepared_in_phase2 and unmeasured_mut: + if max_qubit_count is None: + raise RuntimeError(TOPO_ORDER_CYCLE_ERROR_MSG) + # No measurements and no room to prepare under qubit-capacity constraint. msg = ( "Cannot schedule more measurements without exceeding max qubit count. Please increase max_qubit_count." ) @@ -214,6 +143,82 @@ def _greedy_minimize_time_limited( # noqa: C901, PLR0912, PLR0913, PLR0917 return prepare_time, measure_time +def _phase1_measure_ready_nodes( # noqa: PLR0913 + current_time: int, + *, + dag: Mapping[int, AbstractSet[int]], + inv_dag: dict[int, set[int]], + neighbors_map: Mapping[int, AbstractSet[int]], + input_nodes: AbstractSet[int], + prepared: set[int], + alive: set[int], + unmeasured: set[int], + measure_time: dict[int, int], +) -> set[int]: + # Phase 1: measure all currently ready nodes. + ready_to_measure: set[int] = set() + for node in unmeasured: + if inv_dag[node]: + continue + if not neighbors_map[node] <= prepared: + continue + if node not in input_nodes and node not in prepared: + continue + ready_to_measure.add(node) + + for node in ready_to_measure: + measure_time[node] = current_time + unmeasured.remove(node) + alive.discard(node) + for child in dag.get(node, ()): + inv_dag[child].discard(node) + + return ready_to_measure + + +def _phase2_prepare_nodes_with_slack( # noqa: PLR0913 + current_time: int, + *, + physical_nodes: AbstractSet[int], + max_qubit_count: int, + inv_dag: Mapping[int, AbstractSet[int]], + neighbors_map: Mapping[int, AbstractSet[int]], + prepared: set[int], + alive: set[int], + unmeasured: AbstractSet[int], + output_nodes: AbstractSet[int], + criticality: Mapping[int, int], + prepare_time: dict[int, int], +) -> bool: + # Phase 2: fill free qubit capacity with high-priority preparation candidates. + free_capacity = max_qubit_count - len(alive) + if free_capacity <= 0: + return False + + unprepared = physical_nodes - prepared + if not unprepared: + return False + + prep_candidates = _get_prep_candidates_with_priority( + unprepared, + inv_dag, + neighbors_map, + prepared, + unmeasured, + output_nodes, + criticality, + ) + + prepared_in_phase2 = False + for candidate, _score in prep_candidates[:free_capacity]: + prepare_time[candidate] = current_time + prepared.add(candidate) + alive.add(candidate) + prepared_in_phase2 = True + + return prepared_in_phase2 + + def _compute_criticality( dag: Mapping[int, AbstractSet[int]], output_nodes: AbstractSet[int], @@ -222,13 +227,9 @@ def _compute_criticality( # Nodes with higher criticality should be prioritized for unblocking. criticality: dict[int, int] = {} - # TopologicalSorter(dag) returns nodes with no "dependencies" first. - # Since dag is {parent: children}, nodes with empty children come first (leaves). - # This is the correct order for computing criticality (leaves before roots). - try: - topo_order = list(TopologicalSorter(dag).static_order()) - except CycleError: - return {} + # For criticality we need children first, so reverse dependency-first topo order. + topo_order = topo_order_from_inv_dag(inverse_dag_from_dag(dag)) + topo_order.reverse() for node in topo_order: children_crits = [criticality.get(c, 0) for c in dag.get(node, ())] @@ -288,7 +289,7 @@ def _get_prep_candidates_with_priority( # noqa: PLR0913, PLR0917 return scores -def greedy_minimize_space( # noqa: C901, PLR0914, PLR0915 +def greedy_minimize_space( # noqa: PLR0914 graph: BaseGraphState, dag: Mapping[int, AbstractSet[int]], ) -> tuple[dict[int, int], dict[int, int]]: @@ -323,20 +324,10 @@ def greedy_minimize_space( # noqa: C901, PLR0914, PLR0915 unmeasured = graph.physical_nodes - graph.output_node_indices.keys() - try: - topo_order = list(TopologicalSorter(dag).static_order()) - except CycleError as exc: - msg = "No nodes can be measured; possible cyclic dependency or incomplete preparation." - raise RuntimeError(msg) from exc - topo_order.reverse() # from parents to children + inv_dag = inverse_dag_from_dag(dag, graph.physical_nodes) + topo_order = topo_order_from_inv_dag(inv_dag) # from parents to children topo_rank = {node: i for i, node in enumerate(topo_order)} - # Build inverse DAG: for each node, track which nodes must be measured before it - inv_dag: dict[int, set[int]] = {node: set() for node in graph.physical_nodes} - for parent, children in dag.items(): - for child in children: - inv_dag[child].add(parent) - input_nodes = set(graph.input_node_indices.keys()) prepared: set[int] = set(input_nodes) alive: set[int] = set(input_nodes) @@ -349,8 +340,7 @@ def greedy_minimize_space( # noqa: C901, PLR0914, PLR0915 while unmeasured: if not measure_candidates: - msg = "No nodes can be measured; possible cyclic dependency or incomplete preparation." - raise RuntimeError(msg) + raise RuntimeError(TOPO_ORDER_CYCLE_ERROR_MSG) # calculate costs and pick the best node to measure default_rank = len(topo_rank) diff --git a/tests/test_feedforward.py b/tests/test_feedforward.py index b49beb644..6907e99a8 100644 --- a/tests/test_feedforward.py +++ b/tests/test_feedforward.py @@ -6,14 +6,17 @@ from graphqomb.circuit import MBQCCircuit, circuit2graph from graphqomb.common import Axis, AxisMeasBasis, Plane, PlannerMeasBasis, Sign from graphqomb.feedforward import ( + TOPO_ORDER_CYCLE_ERROR_MSG, _is_flow, _is_gflow, check_dag, check_flow, dag_from_flow, + inverse_dag_from_dag, pauli_simplification, propagate_correction_map, signal_shifting, + topo_order_from_inv_dag, ) from graphqomb.graphstate import GraphState from graphqomb.qompiler import qompile @@ -96,6 +99,46 @@ def test_check_flow_true_for_acyclic() -> None: check_flow(graphstate, flow) +def test_topo_order_from_inv_dag_basic() -> None: + inv_dag = { + 0: set(), + 1: {0}, + 2: {1}, + } + assert topo_order_from_inv_dag(inv_dag) == [0, 1, 2] + + +def test_inverse_dag_from_dag_basic() -> None: + dag = { + 0: {1, 2}, + 1: {2}, + 2: set(), + } + assert inverse_dag_from_dag(dag) == { + 0: set(), + 1: {0}, + 2: {0, 1}, + } + + +def test_inverse_dag_from_dag_with_all_nodes() -> None: + dag = {0: {1}} + assert inverse_dag_from_dag(dag, all_nodes={0, 1, 2}) == { + 0: set(), + 1: {0}, + 2: set(), + } + + +def test_topo_order_from_inv_dag_cycle_raises() -> None: + inv_dag = { + 0: {1}, + 1: {0}, + } + with pytest.raises(RuntimeError, match=TOPO_ORDER_CYCLE_ERROR_MSG): + topo_order_from_inv_dag(inv_dag) + + # Tests for propagate_correction_map