diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 214b235..30cec2f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,7 +26,7 @@ jobs: fail-fast: false matrix: os: ['ubuntu-latest', 'windows-2022', 'macos-latest'] - python: ['3.8', '3.9', '3.10', '3.11'] + python: ['3.9', '3.10', '3.11', '3.12'] name: "Python ${{ matrix.python }} / ${{ matrix.os }}" runs-on: ${{ matrix.os }} diff --git a/CHANGELOG.md b/CHANGELOG.md index f1d811d..f795315 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Updated to support Qiskit 1.0. +- Modified the APIs for Qiskit simulation and execution on IBMQ hardware. ### Fixed diff --git a/docs/source/tutorial.rst b/docs/source/tutorial.rst index e7bec8c..7a47076 100644 --- a/docs/source/tutorial.rst +++ b/docs/source/tutorial.rst @@ -29,7 +29,7 @@ First, let us import relevant modules and define function we will use: .. code-block:: python from graphix import Circuit - from graphix_ibmq.runner import IBMQBackend + from graphix_ibmq.backend import IBMQBackend import qiskit.quantum_info as qi from qiskit.visualization import plot_histogram import numpy as np diff --git a/examples/gallery/aer_sim.py b/examples/gallery/aer_sim.py index 66dc993..2e01b70 100644 --- a/examples/gallery/aer_sim.py +++ b/examples/gallery/aer_sim.py @@ -12,10 +12,9 @@ import matplotlib.pyplot as plt import networkx as nx import random -from graphix import Circuit -from graphix_ibmq.runner import IBMQBackend -from qiskit.tools.visualization import plot_histogram -from qiskit_aer.noise import NoiseModel, depolarizing_error +from graphix.transpiler import Circuit +from graphix_ibmq.backend import IBMQBackend +from qiskit.visualization import plot_histogram def cp(circuit, theta, control, target): @@ -84,23 +83,27 @@ def swap(circuit, a, b): pattern.minimize_space() # convert to qiskit circuit -backend = IBMQBackend(pattern) -print(type(backend.circ)) +backend = IBMQBackend.from_simulator() +compiled = backend.compile(pattern) #%% # We can now simulate the circuit with Aer. # run and get counts -result = backend.simulate() +job = backend.submit_job(compiled, shots=1024) +result = job.retrieve_result() #%% # We can also simulate the circuit with noise model # create an empty noise model +from qiskit_aer.noise import NoiseModel, depolarizing_error + +# add depolarizing error to all single qubit gates noise_model = NoiseModel() -# add depolarizing error to all single qubit u1, u2, u3 gates error = depolarizing_error(0.01, 1) -noise_model.add_all_qubit_quantum_error(error, ["u1", "u2", "u3"]) +noise_model.add_all_qubit_quantum_error(error, ["id", "rz", "sx", "x", "u1"]) +backend = IBMQBackend.from_simulator(noise_model=noise_model) # print noise model info print(noise_model) @@ -109,7 +112,8 @@ def swap(circuit, a, b): # Now we can run the simulation with noise model # run and get counts -result_noise = backend.simulate(noise_model=noise_model) +job = backend.submit_job(compiled, shots=1024) +result_noise = job.retrieve_result() #%% @@ -137,31 +141,3 @@ def swap(circuit, a, b): legend = ax.legend(fontsize=18) legend = ax.legend(loc='upper left') # %% - - -#%% -# Example demonstrating how to run a pattern on an IBM Quantum device. All explanations are provided as comments. - -# First, load the IBMQ account using an API token. -""" -from qiskit_ibm_runtime import QiskitRuntimeService -service = QiskitRuntimeService(channel="ibm_quantum", token="your_ibm_token", instance="ibm-q/open/main") -""" - -# Then, select the quantum system on which to run the circuit. -# If no system is specified, the least busy system will be automatically selected. -""" -backend.get_system(service, "ibm_kyoto") -""" - -# Finally, transpile the quantum circuit for the chosen system and execute it. -""" -backend.transpile() -result = backend.run(shots=128) -""" - -# To retrieve the result at a later time, use the code below. -""" -result = backend.retrieve_result("your_job_id") -""" -# %% diff --git a/examples/gallery/qiskit_to_graphix.py b/examples/gallery/qiskit_to_graphix.py index d79ac7d..69d643f 100644 --- a/examples/gallery/qiskit_to_graphix.py +++ b/examples/gallery/qiskit_to_graphix.py @@ -9,7 +9,7 @@ # %% -from qiskit import QuantumCircuit, transpile +from qiskit import transpile from qiskit.circuit.random.utils import random_circuit qc = random_circuit(5, 2, seed=42) diff --git a/examples/ibm_device.py b/examples/ibm_device.py index 2f3bf01..32f6330 100644 --- a/examples/ibm_device.py +++ b/examples/ibm_device.py @@ -12,10 +12,9 @@ import matplotlib.pyplot as plt import networkx as nx import random -from graphix import Circuit -from graphix_ibmq.runner import IBMQBackend -from qiskit_ibm_provider import IBMProvider -from qiskit.tools.visualization import plot_histogram +from graphix.transpiler import Circuit +from graphix_ibmq.backend import IBMQBackend +from qiskit.visualization import plot_histogram from qiskit.providers.fake_provider import FakeLagos @@ -68,7 +67,7 @@ def swap(circuit, a, b): swap(circuit, 0, 2) # transpile and plot the graph -pattern = circuit.transpile() +pattern = circuit.transpile().pattern nodes, edges = pattern.get_graph() g = nx.Graph() g.add_nodes_from(nodes) @@ -84,45 +83,39 @@ def swap(circuit, a, b): pattern.minimize_space() # convert to qiskit circuit -backend = IBMQBackend(pattern) -backend.to_qiskit() -print(type(backend.circ)) +backend = IBMQBackend.from_simulator() +compiled = backend.compile(pattern) #%% # load the account with API token -IBMProvider.save_account(token='MY API TOKEN') +from qiskit_ibm_runtime import QiskitRuntimeService +QiskitRuntimeService.save_account(channel="ibm_quantum", token="API TOKEN", overwrite=True) # get the device backend -instance_name = 'ibm-q/open/main' -backend_name = "ibm_lagos" -backend.get_backend(instance=instance_name,resource=backend_name) - -#%% -# Get provider and the backend. - -instance_name = "ibm-q/open/main" -backend_name = "ibm_lagos" - -backend.get_backend(instance=instance_name, resource=backend_name) +backend = IBMQBackend.from_hardware() #%% # We can now execute the circuit on the device backend. - -result = backend.run() +compiled = backend.compile(pattern) +job = backend.submit_job(compiled, shots=1024) #%% -# Retrieve the job if needed +# Retrieve the job result -# result = backend.retrieve_result("Job ID") +if job.is_done: + result = job.retrieve_result() #%% -# We can simulate the circuit with noise model based on the device we used +# We can simulate the circuit with device-based noise model. # get the noise model of the device backend -backend_noisemodel = FakeLagos() +from qiskit_ibm_runtime.fake_provider import FakeManilaV2 +backend = IBMQBackend.from_simulator(from_backend=FakeManilaV2()) # execute noisy simulation and get counts -result_noise = backend.simulate(noise_model=backend_noisemodel) +compiled = backend.compile(pattern) +job = backend.submit_job(compiled, shots=1024) +result_noise = job.retrieve_result() #%% # Now let us compare the results with theoretical output diff --git a/graphix_ibmq/backend.py b/graphix_ibmq/backend.py new file mode 100644 index 0000000..63aeaa9 --- /dev/null +++ b/graphix_ibmq/backend.py @@ -0,0 +1,145 @@ +from __future__ import annotations +from typing import TYPE_CHECKING +import logging + +from qiskit_aer import AerSimulator +from qiskit_aer.noise import NoiseModel +from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager +from qiskit_ibm_runtime import SamplerV2 as Sampler, QiskitRuntimeService + +from graphix_ibmq.compile_options import IBMQCompileOptions +from graphix_ibmq.compiler import IBMQPatternCompiler, IBMQCompiledCircuit +from graphix_ibmq.job import IBMQJob + +if TYPE_CHECKING: + from graphix.pattern import Pattern + from qiskit.providers.backend import BackendV2 + +logger = logging.getLogger(__name__) + + +class IBMQBackend: + """ + Manages compilation and execution on IBMQ simulators or hardware. + + This class configures the execution target and provides methods to compile + a graphix Pattern and submit it as a job. Instances should be created using + the `from_simulator` or `from_hardware` classmethods. + """ + + def __init__(self, backend: BackendV2 | None = None, options: IBMQCompileOptions | None = None) -> None: + if backend is None or options is None: + raise TypeError( + "IBMQBackend cannot be instantiated directly. " + "Please use the classmethods `IBMQBackend.from_simulator()` " + "or `IBMQBackend.from_hardware()`." + ) + self._backend: BackendV2 = backend + self._options: IBMQCompileOptions = options + + @classmethod + def from_simulator( + cls, + noise_model: NoiseModel | None = None, + from_backend: BackendV2 | None = None, + options: IBMQCompileOptions | None = None, + ) -> IBMQBackend: + """Creates an instance with a local Aer simulator as the backend. + + Parameters + ---------- + noise_model : NoiseModel, optional + A custom noise model for the simulation. + from_backend : BackendV2, optional + A hardware backend to base the noise model on. + Ignored if `noise_model` is provided. + options : IBMQCompileOptions, optional + Compilation and execution options. + """ + if noise_model is None and from_backend is not None: + noise_model = NoiseModel.from_backend(from_backend) + + aer_backend = AerSimulator(noise_model=noise_model) + compile_options = options if options is not None else IBMQCompileOptions() + + logger.info("Backend set to local AerSimulator.") + return cls(backend=aer_backend, options=compile_options) + + @classmethod + def from_hardware( + cls, + name: str | None = None, + min_qubits: int = 1, + options: IBMQCompileOptions | None = None, + ) -> IBMQBackend: + """Creates an instance with a real IBM Quantum hardware device as the backend. + + Parameters + ---------- + name : str, optional + The specific name of the device (e.g., 'ibm_brisbane'). If None, + the least busy backend with at least `min_qubits` will be selected. + min_qubits : int + The minimum number of qubits required. + options : IBMQCompileOptions, optional + Compilation and execution options. + """ + service = QiskitRuntimeService() + if name: + hw_backend = service.backend(name) + else: + hw_backend = service.least_busy(min_num_qubits=min_qubits, operational=True) + + compile_options = options if options is not None else IBMQCompileOptions() + + logger.info("Selected hardware backend: %s", hw_backend.name) + return cls(backend=hw_backend, options=compile_options) + + @staticmethod + def compile(pattern: Pattern, save_statevector: bool = False) -> IBMQCompiledCircuit: + """Compiles a graphix pattern into a Qiskit QuantumCircuit. + + This method is provided as a staticmethod because it does not depend + on the backend's state. + + Parameters + ---------- + pattern : Pattern + The graphix pattern to compile. + save_statevector : bool + If True, saves the statevector before the final measurement. + + Returns + ------- + IBMQCompiledCircuit + An object containing the compiled circuit and related metadata. + """ + compiler = IBMQPatternCompiler(pattern) + return compiler.compile(save_statevector=save_statevector) + + def submit_job(self, compiled_circuit: IBMQCompiledCircuit, shots: int = 1024) -> IBMQJob: + """ + Submits the compiled circuit to the configured backend for execution. + + Parameters + ---------- + compiled_circuit : IBMQCompiledCircuit + The compiled circuit object from the `compile` method. + shots : int, optional + The number of execution shots. Defaults to 1024. + + Returns + ------- + IBMQJob + A job object to monitor execution and retrieve results. + """ + pass_manager = generate_preset_pass_manager( + backend=self._backend, + optimization_level=self._options.optimization_level, + ) + transpiled_circuit = pass_manager.run(compiled_circuit.circuit) + + sampler = Sampler(mode=self._backend) + job = sampler.run([transpiled_circuit], shots=shots) + + return IBMQJob(job, compiled_circuit) diff --git a/graphix_ibmq/compile_options.py b/graphix_ibmq/compile_options.py new file mode 100644 index 0000000..1fc0972 --- /dev/null +++ b/graphix_ibmq/compile_options.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass +class IBMQCompileOptions: + """Compilation options specific to IBMQ backends. + + Attributes + ---------- + optimization_level : int + Optimization level for Qiskit transpiler (0 to 3). + save_statevector : bool + Whether to save the statevector before measurement (for debugging/testing). + layout_method : str + Qubit layout method used by the transpiler (for future use). + """ + + optimization_level: int = 3 + save_statevector: bool = False + layout_method: str = "trivial" + + def __repr__(self) -> str: + """Return a string representation of the compilation options.""" + return ( + f"IBMQCompileOptions(optimization_level={self.optimization_level}, " + f"save_statevector={self.save_statevector}, " + f"layout_method='{self.layout_method}')" + ) diff --git a/graphix_ibmq/compiler.py b/graphix_ibmq/compiler.py new file mode 100644 index 0000000..8751d4e --- /dev/null +++ b/graphix_ibmq/compiler.py @@ -0,0 +1,198 @@ +from __future__ import annotations +from dataclasses import dataclass + +import numpy as np +from qiskit import ClassicalRegister, QuantumRegister, QuantumCircuit + +from graphix.command import Command, N, M, E, X, Z, C +from graphix.fundamentals import Plane +from qiskit.circuit.classical import expr + +from typing import TYPE_CHECKING, Callable, Mapping, Sequence, Iterable + +if TYPE_CHECKING: + from graphix.pattern import Pattern + +CommandHandler = Callable[[Command], None] + + +class IBMQPatternCompiler: + def __init__(self, pattern: Pattern) -> None: + """ + Initializes the compiler with a given pattern. + + Parameters + ---------- + pattern : Pattern + The measurement-based quantum computation pattern. + """ + self._pattern = pattern + + num_qubits = self._pattern.max_space() + num_nodes = self._pattern.n_node + + qr = QuantumRegister(num_qubits) + self._classical_register = ClassicalRegister(num_nodes, name="meas") + self._circuit = QuantumCircuit(qr, self._classical_register) + + self._available_qubits = list(range(num_qubits)) + self._qubit_map: dict[int, int] = {} + self._creg_map: dict[int, int] = {} + self._next_creg_idx: int = 0 + + for node_idx in self._pattern.input_nodes: + circ_idx = self._allocate_qubit(node_idx) + self._circuit.h(circ_idx) + + def compile(self, save_statevector: bool = False) -> IBMQCompiledCircuit: + """ + Converts the MBQC pattern into a Qiskit QuantumCircuit. + + Parameters + ---------- + save_statevector : bool + If True, saves the statevector before output measurement. + + Returns + ------- + IBMQCompiledCircuit + A data class containing the compiled circuit and associated metadata. + """ + self._process_commands() + output_qubits = self._finalize_circuit(save_statevector) + + return IBMQCompiledCircuit( + circuit=self._circuit, + pattern=self._pattern, + register_dict=self._creg_map, + circ_output=output_qubits, + ) + + def _process_commands(self) -> None: + """Iterates through and processes all commands in the pattern.""" + for cmd in self._pattern: # Iterable[Command] + if isinstance(cmd, N): + self._apply_n(cmd) + elif isinstance(cmd, E): + self._apply_e(cmd) + elif isinstance(cmd, M): + self._apply_m(cmd) + elif isinstance(cmd, X): + self._apply_x(cmd) + elif isinstance(cmd, Z): + self._apply_z(cmd) + elif isinstance(cmd, C): + self._apply_c(cmd) + + def _allocate_qubit(self, node_idx: int) -> int: + """Allocates a qubit from the pool, resets it, and maps it to a node.""" + if not self._available_qubits: + raise RuntimeError("No available qubits to allocate.") + circ_idx = self._available_qubits.pop(0) + self._circuit.reset(circ_idx) + self._qubit_map[node_idx] = circ_idx + return circ_idx + + def _release_qubit(self, circ_idx: int) -> None: + """Releases a qubit, making it available for reuse.""" + self._available_qubits.append(circ_idx) + + def _apply_n(self, cmd: N) -> None: + """Handles the N command: create a new qubit in the |+> state.""" + circ_idx = self._allocate_qubit(cmd.node) + self._circuit.h(circ_idx) + + def _apply_e(self, cmd: E) -> None: + """Handles the E command: apply a CZ gate between two qubits.""" + qubit1 = self._qubit_map[cmd.nodes[0]] + qubit2 = self._qubit_map[cmd.nodes[1]] + self._circuit.cz(qubit1, qubit2) + + def _apply_m(self, cmd: M) -> None: + """Handles the M command: perform a measurement.""" + if cmd.plane != Plane.XY: + raise NotImplementedError("Non-XY plane measurements are not supported.") + + circ_idx = self._qubit_map[cmd.node] + + self._apply_classical_feedforward("X", circ_idx, cmd.s_domain) + self._apply_classical_feedforward("Z", circ_idx, cmd.t_domain) + + if cmd.angle != 0: + self._circuit.p(-cmd.angle * np.pi, circ_idx) + + self._circuit.h(circ_idx) + self._circuit.measure(circ_idx, self._next_creg_idx) + + self._creg_map[cmd.node] = self._next_creg_idx + self._next_creg_idx += 1 + self._release_qubit(circ_idx) + + def _apply_x(self, cmd: X) -> None: + """Handles the X command: apply a Pauli X correction.""" + circ_idx = self._qubit_map[cmd.node] + self._apply_classical_feedforward("X", circ_idx, cmd.domain) + + def _apply_z(self, cmd: Z) -> None: + """Handles the Z command: apply a Pauli Z correction.""" + circ_idx = self._qubit_map[cmd.node] + self._apply_classical_feedforward("Z", circ_idx, cmd.domain) + + def _apply_c(self, cmd: C) -> None: + """Handles the C command: apply a custom Qiskit circuit method.""" + circ_idx = self._qubit_map[cmd.node] + for method_name in cmd.clifford.qasm3: + getattr(self._circuit, method_name)(circ_idx) + + def _apply_classical_feedforward(self, op: str, target_qubit: int, domain: Iterable[int]) -> None: + """Applies classically-controlled X or Z gates based on measurement outcomes.""" + gate_map = {"X": self._circuit.x, "Z": self._circuit.z} + if op not in gate_map: + return + + apply_gate = gate_map[op] + + for node_idx in domain: + if node_idx in self._creg_map: + creg_idx = self._creg_map[node_idx] + clbit = self._classical_register[creg_idx] + cond = expr.equal(clbit, True) + with self._circuit.if_test(cond): + apply_gate(target_qubit) + elif self._pattern.results.get(node_idx) == 1: + apply_gate(target_qubit) + + def _finalize_circuit(self, save_statevector: bool) -> list[int]: + """Handles output measurements and optional statevector saving.""" + output_qubits = [self._qubit_map[node] for node in self._pattern.output_nodes] + + if save_statevector: + self._circuit.save_statevector() + + for node in self._pattern.output_nodes: + circ_idx = self._qubit_map[node] + self._circuit.measure(circ_idx, self._next_creg_idx) + self._creg_map[node] = self._next_creg_idx + self._next_creg_idx += 1 + + return output_qubits if save_statevector else [] + + +@dataclass +class IBMQCompiledCircuit: + """A compiled circuit with its associated pattern and register mapping. + + Attributes + ---------- + circuit : QuantumCircuit + The Qiskit quantum circuit generated from the pattern. + register_dict : Mapping[int, int] + Mapping from pattern node indices to classical register indices. + circ_output : Sequence[int] + List of output qubit indices in the compiled circuit. + """ + + circuit: QuantumCircuit + pattern: Pattern + register_dict: Mapping[int, int] + circ_output: Sequence[int] diff --git a/graphix_ibmq/converter.py b/graphix_ibmq/converter.py index 83c8422..243927a 100644 --- a/graphix_ibmq/converter.py +++ b/graphix_ibmq/converter.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from graphix import Circuit from qiskit import QuantumCircuit, transpile diff --git a/graphix_ibmq/job.py b/graphix_ibmq/job.py new file mode 100644 index 0000000..fca438b --- /dev/null +++ b/graphix_ibmq/job.py @@ -0,0 +1,81 @@ +from __future__ import annotations +from dataclasses import dataclass +from typing import TYPE_CHECKING + +from qiskit.providers.jobstatus import JobStatus +from graphix_ibmq.result_utils import format_result + +if TYPE_CHECKING: + from qiskit_ibm_runtime.fake_provider.local_runtime_job import LocalRuntimeJob + from qiskit_ibm_runtime.runtime_job_v2 import RuntimeJobV2 + from graphix_ibmq.compiler import IBMQCompiledCircuit + + +@dataclass +class IBMQJob: + """ + A handler for jobs submitted to IBMQ devices and simulators. + + This class wraps a Qiskit job object and the corresponding compiled circuit, + providing methods to check the job's status and retrieve formatted results. + """ + + job: LocalRuntimeJob | RuntimeJobV2 + compiled_circuit: IBMQCompiledCircuit + + @property + def id(self) -> str: + """ + Returns the unique identifier of the job. + + Returns + ------- + str + The job ID. + """ + job_id: str = self.job.job_id() + return job_id + + @property + def is_done(self) -> bool: + """ + Checks if the job has completed execution. + + Returns + ------- + bool + True if the job is done, False otherwise. + """ + is_done: bool = self.job.status() is JobStatus.DONE + return is_done + + def retrieve_result(self, raw_result: bool = False) -> dict[str, int] | None: + """ + Retrieves the result from a completed job. + + If the job is not yet complete, this method returns None. + + Parameters + ---------- + raw_result : bool, optional + If True, returns the raw measurement counts dictionary. + If False (default), returns results formatted by the graphix pattern. + + Returns + ------- + dict or None + A dictionary containing the formatted results or raw counts. + Returns None if the job has not yet finished. + """ + if not self.is_done: + return None + + # Result from SamplerV2 contains a list of pub_results. + # We assume a single circuit was run, so we take the first element [0]. + result = self.job.result() + counts: dict[str, int] = result[0].data.meas.get_counts() + + if raw_result: + return counts + + return format_result(counts, self.compiled_circuit.pattern, self.compiled_circuit.register_dict) diff --git a/graphix_ibmq/result_utils.py b/graphix_ibmq/result_utils.py new file mode 100644 index 0000000..597e1f8 --- /dev/null +++ b/graphix_ibmq/result_utils.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Mapping + +if TYPE_CHECKING: + from graphix.pattern import Pattern + + +def format_result(result: Mapping[str, int], pattern: Pattern, register_dict: Mapping[int, int]) -> dict[str, int]: + """Format raw measurement results into output-only bitstrings. + + Parameters + ---------- + result : dict of str to int + Raw result counts as returned by Qiskit (full bitstrings). + pattern : Pattern + Graphix pattern object containing output node information. + register_dict : dict of int to int + Mapping from pattern node index to classical register index. + + Returns + ------- + formatted : dict of str to int + Dictionary of bitstrings only for output nodes and their counts. + """ + n_node = pattern.n_node + output_keys = [register_dict[node] for node in pattern.output_nodes] + + formatted: dict[str, int] = {} + for bitstring, count in result.items(): + masked = "".join(bitstring[n_node - 1 - idx] for idx in output_keys) + formatted[masked] = formatted.get(masked, 0) + count + + return formatted diff --git a/graphix_ibmq/runner.py b/graphix_ibmq/runner.py deleted file mode 100644 index f29ae0d..0000000 --- a/graphix_ibmq/runner.py +++ /dev/null @@ -1,374 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from graphix.pattern import Pattern - -import numpy as np -from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister, transpile -from qiskit_aer import AerSimulator -from qiskit_aer.noise import NoiseModel -from qiskit_ibm_runtime import QiskitRuntimeService, IBMBackend, SamplerV2 - -from graphix_ibmq.clifford import CLIFFORD_CONJ, CLIFFORD_TO_QISKIT - - -class IBMQBackend: - """Interface for MBQC pattern execution on IBM quantum devices. - - Attributes - ---------- - pattern: graphix.Pattern object - MBQC pattern to be run on the device - circ: qiskit.circuit.quantumcircuit.QuantumCircuit object - qiskit circuit corresponding to the pattern. - service: qiskit_ibm_runtime.qiskit_runtime_service.QiskitRuntimeService object - the runtime service object. - system: qiskit_ibm_runtime.ibm_backend.IBMBackend object - the system to be used for the execution. - """ - - def __init__(self, pattern: Pattern): - """ - - Parameters - ---------- - pattern: graphix.Pattern object - MBQC pattern to be run on the IBMQ device or Aer simulator. - """ - self.pattern = pattern - self.to_qiskit() - - def get_system(self, service: QiskitRuntimeService, system: str = None): - """Get the system to be used for the execution. - - Parameters - ---------- - service: qiskit_ibm_runtime.qiskit_runtime_service.QiskitRuntimeService object - the runtime service object. - system: str, optional - the system name to be used. If None, the least busy system is used. - """ - if not isinstance(service, QiskitRuntimeService): - raise ValueError("Invalid service object.") - self.service = service - if system is not None: - if system not in [system_cand.name for system_cand in self.service.backends()]: - raise ValueError(f"{system} is not available.") - self.system = self.service.backend(system) - else: - self.system = self.service.least_busy(min_num_qubits=self.pattern.max_space(), operational=True) - - print(f"Using system {self.system.name}") - - def to_qiskit(self, save_statevector: bool = False): - """convert the MBQC pattern to the qiskit cuicuit and add to attributes. - - Parameters - ---------- - save_statevector : bool, optional - whether to save the statevector before the measurements of output qubits. - """ - n = self.pattern.max_space() - N_node = self.pattern.Nnode - - qr = QuantumRegister(n) - cr = ClassicalRegister(N_node) - circ = QuantumCircuit(qr, cr) - - empty_qubit = [i for i in range(n)] # list of free qubit indices - qubit_dict = {} # dictionary to record the correspondance of pattern nodes and circuit qubits - register_dict = {} # dictionary to record the correspondance of pattern nodes and classical registers - reg_idx = 0 # index of classical register - - def signal_process(op, signal): - if op == "X": - for s in signal: - if s in register_dict.keys(): - s_idx = register_dict[s] - with circ.if_test((cr[s_idx], 1)): - circ.x(circ_idx) - else: - if self.pattern.results[s] == 1: - circ.x(circ_idx) - if op == "Z": - for s in signal: - if s in register_dict.keys(): - s_idx = register_dict[s] - with circ.if_test((cr[s_idx], 1)): - circ.z(circ_idx) - else: - if self.pattern.results[s] == 1: - circ.z(circ_idx) - - for i in self.pattern.input_nodes: - circ_idx = empty_qubit[0] - empty_qubit.pop(0) - circ.reset(circ_idx) - circ.h(circ_idx) - qubit_dict[i] = circ_idx - - for cmd in self.pattern: - - if cmd[0] == "N": - circ_idx = empty_qubit[0] - empty_qubit.pop(0) - circ.reset(circ_idx) - circ.h(circ_idx) - qubit_dict[cmd[1]] = circ_idx - - if cmd[0] == "E": - circ.cz(qubit_dict[cmd[1][0]], qubit_dict[cmd[1][1]]) - - if cmd[0] == "M": - circ_idx = qubit_dict[cmd[1]] - plane = cmd[2] - alpha = cmd[3] * np.pi - s_list = cmd[4] - t_list = cmd[5] - - if len(cmd) == 6: - if plane == "XY": - # act p and h to implement non-Z-basis measurement - if alpha != 0: - signal_process("X", s_list) - circ.p(-alpha, circ_idx) # align |+_alpha> (or |+_-alpha>) with |+> - - signal_process("Z", t_list) - - circ.h(circ_idx) # align |+> with |0> - circ.measure(circ_idx, reg_idx) # measure and store the result - register_dict[cmd[1]] = reg_idx - reg_idx += 1 - empty_qubit.append(circ_idx) # liberate the circuit qubit - - elif len(cmd) == 7: - cid = cmd[6] - for op in CLIFFORD_TO_QISKIT[CLIFFORD_CONJ[cid]]: - exec(f"circ.{op}({circ_idx})") - - if plane == "XY": - # act p and h to implement non-Z-basis measurement - if alpha != 0: - signal_process("X", s_list) - circ.p(-alpha, circ_idx) # align |+_alpha> (or |+_-alpha>) with |+> - - signal_process("Z", t_list) - - circ.h(circ_idx) # align |+> with |0> - circ.measure(circ_idx, reg_idx) # measure and store the result - register_dict[cmd[1]] = reg_idx - reg_idx += 1 - circ.measure(circ_idx, reg_idx) # measure and store the result - empty_qubit.append(circ_idx) # liberate the circuit qubit - - if cmd[0] == "X": - circ_idx = qubit_dict[cmd[1]] - s_list = cmd[2] - signal_process("X", s_list) - - if cmd[0] == "Z": - circ_idx = qubit_dict[cmd[1]] - s_list = cmd[2] - signal_process("Z", s_list) - - if cmd[0] == "C": - circ_idx = qubit_dict[cmd[1]] - cid = cmd[2] - for op in CLIFFORD_TO_QISKIT[cid]: - exec(f"circ.{op}({circ_idx})") - - if save_statevector: - circ.save_statevector() - output_qubit = [] - for node in self.pattern.output_nodes: - circ_idx = qubit_dict[node] - circ.measure(circ_idx, reg_idx) - register_dict[node] = reg_idx - reg_idx += 1 - output_qubit.append(circ_idx) - - self.circ = circ - self.circ_output = output_qubit - - else: - for node in self.pattern.output_nodes: - circ_idx = qubit_dict[node] - circ.measure(circ_idx, reg_idx) - register_dict[node] = reg_idx - reg_idx += 1 - - self.register_dict = register_dict - self.circ = circ - - def set_input(self, psi: list[list[complex]]): - """set the input state of the circuit. - The input states are set to the circuit qubits corresponding to the first n nodes prepared in the pattern. - - Parameters - ---------- - psi : list[list[complex]] - list of the input states for each input. - Each input state is a list of complex of length 2, representing the coefficient of |0> and |1>. - """ - n = len(self.pattern.input_nodes) - if n != len(psi): - raise ValueError("Invalid input state.") - input_order = {i: self.pattern.input_nodes[i] for i in range(n)} - - idx = 0 - for k, ope in enumerate(self.circ.data): - if ope[0].name == "reset": - if idx in input_order.keys(): - qubit_idx = ope[1][0]._index - i = input_order[idx] - self.circ.initialize(psi[i], qubit_idx) - self.circ.data[k + 1] = self.circ.data.pop(-1) - idx += 1 - if idx >= max(input_order.keys()) + 1: - break - - def transpile(self, system: IBMBackend = None, optimization_level: int = 1): - """transpile the circuit for the designated resource. - - Parameters - ---------- - system: qiskit_ibm_runtime.ibm_backend.IBMBackend object, optional - system to be used for transpilation. - optimization_level : int, optional - the optimization level of the transpilation. - """ - if system is None: - if not hasattr(self, "system"): - raise ValueError("No system is set.") - system = self.system - self.circ = transpile(self.circ, backend=system, optimization_level=optimization_level) - - def simulate(self, shots: int = 1024, noise_model: NoiseModel = None, format_result: bool = True): - """simulate the circuit with Aer. - - Parameters - ---------- - shots : int, optional - the number of shots. - noise_model : :class:`qiskit_aer.backends.aer_simulator.AerSimulator` object, optional - noise model to be used in the simulation. - format_result : bool, optional - whether to format the result so that only the result corresponding to the output qubit is taken out. - - Returns - ---------- - result : dict - the measurement result. - """ - if noise_model is not None: - if type(noise_model) is NoiseModel: - simulator = AerSimulator(noise_model=noise_model) - else: - try: - simulator = AerSimulator.from_backend(noise_model) - except: - raise ValueError("Invalid noise model.") - else: - simulator = AerSimulator() - circ_sim = transpile(self.circ, simulator) - job = simulator.run(circ_sim, shots=shots) - result = job.result() - if format_result: - result = self.format_result(result) - - return result - - def run(self, shots: int = 1024, format_result: bool = True, optimization_level: int = 1): - """Run the MBQC pattern on IBMQ devices - - Parameters - ---------- - shots : int, optional - the number of shots. - format_result : bool, optional - whether to format the result so that only the result corresponding to the output qubit is taken out. - optimization_level : int, optional - the optimization level of the transpilation. - - Returns - ------- - result : dict - the measurement result. - """ - self.transpile(optimization_level=optimization_level) - if not hasattr(self, "system"): - raise ValueError("No system is set.") - sampler = SamplerV2(backend=self.system) - job = sampler.run([self.circ], shots=shots) # Pass the circuit and shot count to the run method - print(f"Your job's id: {job.job_id()}") - result = job.result() - result = next( - ( - getattr(result[0].data, attr_name) - for attr_name in dir(result[0].data) - if attr_name.startswith("c") and attr_name[1:].isdigit() - ), - None, - ) - if format_result: - result = self.format_result(result) - - return result - - def format_result(self, result: dict[str, int]): - """Format the result so that only the result corresponding to the output qubit is taken out. - - Returns - ------- - masked_results : dict - Dictionary of formatted results. - """ - masked_results = {} - N_node = self.pattern.Nnode - - # Iterate over original measurement results - for key, value in result.get_counts().items(): - masked_key = "" - for idx in self.pattern.output_nodes: - reg_idx = self.register_dict[idx] - masked_key += key[N_node - reg_idx - 1] - if masked_key in masked_results: - masked_results[masked_key] += value - else: - masked_results[masked_key] = value - - return masked_results - - def retrieve_result(self, job_id: str, format_result: bool = True): - """Retrieve the measurement results. - - Parameters - ---------- - job_id : str - the id of the job. - format_result : bool, optional - whether to format the result so that only the result corresponding to the output qubit is taken out. - - Returns - ------- - result : dict - the measurement result. - """ - if not hasattr(self, "service"): - raise ValueError("No service is set.") - job = self.service.job(job_id) - result = job.result() - result = next( - ( - getattr(result[0].data, attr_name) - for attr_name in dir(result[0].data) - if attr_name.startswith("c") and attr_name[1:].isdigit() - ), - None, - ) - if format_result: - result = self.format_result(result) - - return result diff --git a/requirements.txt b/requirements.txt index 485ac3f..ad46ef2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ -numpy>=1.22,<1.26 -qiskit>=1.0 +numpy +qiskit>=1.0,<2 qiskit_ibm_runtime qiskit-aer +graphix diff --git a/setup.py b/setup.py index a19f1b4..8344a5a 100644 --- a/setup.py +++ b/setup.py @@ -27,15 +27,15 @@ "Development Status :: 4 - Beta", "Environment :: Console", "Intended Audience :: Science/Research", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", "License :: OSI Approved :: Apache Software License", "Operating System :: OS Independent", "Topic :: Scientific/Engineering :: Physics", ], - "python_requires": ">=3.8,<3.12", + "python_requires": ">=3.8,<3.13", "install_requires": requirements, "extras_require": {"test": ["graphix"]}, } diff --git a/tests/random_circuit.py b/tests/random_circuit.py deleted file mode 100644 index e3c27c2..0000000 --- a/tests/random_circuit.py +++ /dev/null @@ -1,42 +0,0 @@ -import numpy as np -from graphix.transpiler import Circuit - - -def first_rotation(circuit, nqubits): - for qubit in range(nqubits): - circuit.rx(qubit, np.random.rand()) - - -def mid_rotation(circuit, nqubits): - for qubit in range(nqubits): - circuit.rx(qubit, np.random.rand()) - circuit.rz(qubit, np.random.rand()) - - -def last_rotation(circuit, nqubits): - for qubit in range(nqubits): - circuit.rz(qubit, np.random.rand()) - - -def entangler(circuit, pairs): - for a, b in pairs: - circuit.cnot(a, b) - - -def entangler_rzz(circuit, pairs): - for a, b in pairs: - circuit.rzz(a, b, np.random.rand()) - - -def generate_gate(nqubits, depth, pairs, use_rzz=False): - circuit = Circuit(nqubits) - first_rotation(circuit, nqubits) - entangler(circuit, pairs) - for k in range(depth - 1): - mid_rotation(circuit, nqubits) - if use_rzz: - entangler_rzz(circuit, pairs) - else: - entangler(circuit, pairs) - last_rotation(circuit, nqubits) - return circuit diff --git a/tests/test_backend.py b/tests/test_backend.py new file mode 100644 index 0000000..b13cef7 --- /dev/null +++ b/tests/test_backend.py @@ -0,0 +1,125 @@ +import pytest +from unittest.mock import MagicMock, patch + +from graphix_ibmq.backend import IBMQBackend +from graphix_ibmq.compile_options import IBMQCompileOptions +from graphix_ibmq.compiler import IBMQCompiledCircuit +from qiskit import QuantumCircuit +from qiskit_aer import AerSimulator + +# A dummy pattern object for tests +DUMMY_PATTERN = object() + + +def test_compile_is_static_and_works(monkeypatch): + """ + Verify that `compile` is a static method and works without an instance. + """ + # Arrange + mock_compiler_instance = MagicMock() + mock_compiled_circuit = IBMQCompiledCircuit( + circuit=QuantumCircuit(1), + pattern=DUMMY_PATTERN, + register_dict={}, + circ_output=[], + ) + mock_compiler_instance.compile.return_value = mock_compiled_circuit + + # Mock the compiler class to return our mock instance + mock_compiler_class = MagicMock(return_value=mock_compiler_instance) + monkeypatch.setattr("graphix_ibmq.backend.IBMQPatternCompiler", mock_compiler_class) + + # Act + compiled_circuit = IBMQBackend.compile(DUMMY_PATTERN) + + # Assert + assert compiled_circuit is mock_compiled_circuit + mock_compiler_class.assert_called_once_with(DUMMY_PATTERN) + mock_compiler_instance.compile.assert_called_once() + + +def test_from_simulator_creates_instance(): + """ + Verify that the `from_simulator` factory method creates a valid instance. + """ + # Act + backend = IBMQBackend.from_simulator() + + # Assert + assert isinstance(backend, IBMQBackend) + assert isinstance(backend._backend, AerSimulator) + assert backend._backend.options.noise_model is None + + +def test_from_hardware_creates_instance(monkeypatch): + """ + Verify that the `from_hardware` factory method creates a valid instance. + """ + # Arrange + mock_service_instance = MagicMock() + mock_backend_obj = MagicMock() + mock_backend_obj.name = "mock_hardware_backend" + mock_service_instance.backend.return_value = mock_backend_obj + monkeypatch.setattr("graphix_ibmq.backend.QiskitRuntimeService", lambda: mock_service_instance) + + # Act + backend = IBMQBackend.from_hardware(name="mock_hardware_backend") + + # Assert + assert isinstance(backend, IBMQBackend) + assert backend._backend is mock_backend_obj + mock_service_instance.backend.assert_called_once_with("mock_hardware_backend") + + +def test_direct_instantiation_raises_error(): + """ + Verify that calling IBMQBackend() directly raises a TypeError. + """ + # Act & Assert + with pytest.raises(TypeError, match="cannot be instantiated directly"): + IBMQBackend() + + +def test_submit_job_calls_sampler(monkeypatch): + """ + Verify that `submit_job` correctly uses the configured backend and calls the sampler. + """ + # Arrange + # 1. Create a valid backend instance using a factory + mock_qiskit_backend = MagicMock() + # Use patch to temporarily replace the classmethod's implementation + with patch.object( + IBMQBackend, + "from_hardware", + return_value=IBMQBackend(mock_qiskit_backend, IBMQCompileOptions()), + ): + backend = IBMQBackend.from_hardware() + + # 2. Mock the transpiler and sampler + mock_pass_manager = MagicMock() + mock_pass_manager.run.return_value = QuantumCircuit(1) # Return a dummy transpiled circuit + monkeypatch.setattr( + "graphix_ibmq.backend.generate_preset_pass_manager", + MagicMock(return_value=mock_pass_manager), + ) + + mock_sampler_instance = MagicMock() + mock_job = MagicMock() + mock_sampler_instance.run.return_value = mock_job + monkeypatch.setattr("graphix_ibmq.backend.Sampler", MagicMock(return_value=mock_sampler_instance)) + + # 3. Create a dummy compiled circuit to submit + dummy_compiled_circuit = IBMQCompiledCircuit( + circuit=QuantumCircuit(1), + pattern=DUMMY_PATTERN, + register_dict={}, + circ_output=[], + ) + + # Act + job_result = backend.submit_job(dummy_compiled_circuit) + + # Assert + mock_pass_manager.run.assert_called_once_with(dummy_compiled_circuit.circuit) + mock_sampler_instance.run.assert_called_once() + assert job_result.job is mock_job diff --git a/tests/test_compile_options.py b/tests/test_compile_options.py new file mode 100644 index 0000000..5e530b9 --- /dev/null +++ b/tests/test_compile_options.py @@ -0,0 +1,14 @@ +from graphix_ibmq.compile_options import IBMQCompileOptions + + +def test_default_options(): + opts = IBMQCompileOptions() + assert opts.optimization_level == 3 + assert opts.save_statevector is False + assert opts.layout_method == "trivial" + + +def test_repr(): + opts = IBMQCompileOptions(optimization_level=2, save_statevector=True, layout_method="dense") + expected = "IBMQCompileOptions(optimization_level=2, save_statevector=True, layout_method='dense')" + assert repr(opts) == expected diff --git a/tests/test_compiler.py b/tests/test_compiler.py new file mode 100644 index 0000000..8ea6245 --- /dev/null +++ b/tests/test_compiler.py @@ -0,0 +1,47 @@ +import pytest +import numpy as np +from qiskit import transpile +from qiskit_aer import AerSimulator +from graphix_ibmq.compiler import IBMQPatternCompiler +import graphix.random_objects as rc + + +def reduce_statevector_to_outputs(statevector: np.ndarray, output_qubits: list) -> np.ndarray: + """Reduce a full statevector to the subspace corresponding to output_qubits.""" + n = round(np.log2(len(statevector))) + reduced = np.zeros(2 ** len(output_qubits), dtype=complex) + for i, amp in enumerate(statevector): + bin_str = format(i, f"0{n}b") + reduced_idx = "".join([bin_str[n - idx - 1] for idx in output_qubits]) + reduced[int(reduced_idx, 2)] += amp + return reduced + + +@pytest.mark.parametrize( + "nqubits, depth", + [ + (3, 3), + (4, 4), + (5, 5), + ], +) +def test_ibmq_compiler_statevector_equivalence(nqubits, depth): + """Test that IBMQPatternCompiler circuit reproduces the same statevector as MBQC simulation.""" + sim = AerSimulator() + + for _ in range(5): # repeat with different random circuits + circuit = rc.rand_circuit(nqubits, depth) + pattern = circuit.transpile().pattern + mbqc_state = pattern.simulate_pattern() + + compiler = IBMQPatternCompiler(pattern) + compiled = compiler.compile(save_statevector=True) + qc = compiled.circuit + + transpiled = transpile(qc, sim) + result = sim.run(transpiled).result() + qiskit_state = np.array(result.get_statevector(qc)) + qiskit_reduced = reduce_statevector_to_outputs(qiskit_state, compiled.circ_output) + + fidelity = np.abs(np.dot(qiskit_reduced.conjugate(), mbqc_state.flatten())) + assert np.isclose(fidelity, 1.0, atol=1e-6), f"Fidelity mismatch: {fidelity}" diff --git a/tests/test_converter.py b/tests/test_converter.py index 7077cf2..b8c1ae7 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -1,7 +1,6 @@ import unittest -import numpy as np -from qiskit import QuantumCircuit, transpile +from qiskit import transpile from qiskit.circuit.random.utils import random_circuit from qiskit.quantum_info import Statevector diff --git a/tests/test_ibmq_interface.py b/tests/test_ibmq_interface.py deleted file mode 100644 index 7a2f26b..0000000 --- a/tests/test_ibmq_interface.py +++ /dev/null @@ -1,58 +0,0 @@ -import unittest - -import numpy as np - -import random_circuit as rc -from graphix_ibmq.runner import IBMQBackend - - -def modify_statevector(statevector, output_qubit): - N = round(np.log2(len(statevector))) - new_statevector = np.zeros(2 ** len(output_qubit), dtype=complex) - for i in range(len(statevector)): - i_str = format(i, f"0{N}b") - new_idx = "" - for idx in output_qubit: - new_idx += i_str[N - idx - 1] - new_statevector[int(new_idx, 2)] += statevector[i] - return new_statevector - - -class TestIBMQInterface(unittest.TestCase): - def test_to_qiskit(self): - nqubits = 5 - depth = 5 - pairs = [(i, np.mod(i + 1, nqubits)) for i in range(nqubits)] - circuit = rc.generate_gate(nqubits, depth, pairs) - pattern = circuit.transpile().pattern - state = pattern.simulate_pattern() - - ibmq_backend = IBMQBackend(pattern) - ibmq_backend.to_qiskit(save_statevector=True) - sim_result = ibmq_backend.simulate(format_result=False) - state_qiskit = sim_result.get_statevector(ibmq_backend.circ) - state_qiskit_mod = modify_statevector(np.array(state_qiskit), ibmq_backend.circ_output) - - np.testing.assert_almost_equal(np.abs(np.dot(state_qiskit_mod.conjugate(), state.flatten())), 1) - - def test_to_qiskit_after_pauli_preprocess(self): - nqubits = 5 - depth = 5 - pairs = [(i, np.mod(i + 1, nqubits)) for i in range(nqubits)] - circuit = rc.generate_gate(nqubits, depth, pairs) - pattern = circuit.transpile().pattern - pattern.perform_pauli_measurements() - pattern.minimize_space() - state = pattern.simulate_pattern() - - ibmq_backend = IBMQBackend(pattern) - ibmq_backend.to_qiskit(save_statevector=True) - sim_result = ibmq_backend.simulate(format_result=False) - state_qiskit = sim_result.get_statevector(ibmq_backend.circ) - state_qiskit_mod = modify_statevector(np.array(state_qiskit), ibmq_backend.circ_output) - - np.testing.assert_almost_equal(np.abs(np.dot(state_qiskit_mod.conjugate(), state.flatten())), 1) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_job.py b/tests/test_job.py new file mode 100644 index 0000000..1e2c1c9 --- /dev/null +++ b/tests/test_job.py @@ -0,0 +1,110 @@ +import pytest + +import graphix_ibmq.job as job_module + +from graphix_ibmq.job import IBMQJob +from qiskit.providers.jobstatus import JobStatus + + +class FakeMeas: + def __init__(self, counts: dict[str, int]): + self._counts = counts + + def get_counts(self) -> dict[str, int]: + return dict(self._counts) + + +class FakeData: + def __init__(self, counts: dict[str, int]): + self.meas = FakeMeas(counts) + + +class FakePubResult: + def __init__(self, counts: dict[str, int]): + self.data = FakeData(counts) + + +class FakeJobDone: + def __init__(self, job_id: str, counts: dict[str, int]): + self._job_id = job_id + self._pub_results = [FakePubResult(counts)] + + def job_id(self) -> str: + return self._job_id + + def status(self): + return JobStatus.DONE + + def result(self): + return self._pub_results + + +class FakeJobRunning(FakeJobDone): + def status(self): + return JobStatus.RUNNING + + +class FakeCompiledCircuit: + def __init__(self): + self.pattern = object() + self.register_dict = {"out": [0, 1, 2]} + + +@pytest.fixture +def counts(): + return {"000": 10, "011": 5, "101": 7} + + +def test_id_property_returns_job_id(counts): + fake = FakeJobDone("ABC123", counts) + job = IBMQJob(job=fake, compiled_circuit=FakeCompiledCircuit()) + assert job.id == "ABC123" + + +def test_is_done_true_false(counts): + job_done = IBMQJob(job=FakeJobDone("ID", counts), compiled_circuit=FakeCompiledCircuit()) + job_run = IBMQJob(job=FakeJobRunning("ID", counts), compiled_circuit=FakeCompiledCircuit()) + assert job_done.is_done is True + assert job_run.is_done is False + + +def test_retrieve_result_returns_none_if_not_done(counts): + job = IBMQJob(job=FakeJobRunning("ID", counts), compiled_circuit=FakeCompiledCircuit()) + assert job.retrieve_result() is None + assert job.retrieve_result(raw_result=True) is None + + +def test_retrieve_result_raw_counts_path_hits_get_counts(counts, monkeypatch): + called = {"format_called": False} + + def fake_format_result(_counts, _pattern, _reg): + called["format_called"] = True + return {"should_not_be_returned": 1} + + monkeypatch.setattr(job_module, "format_result", fake_format_result) + + job = IBMQJob(job=FakeJobDone("ID", counts), compiled_circuit=FakeCompiledCircuit()) + out = job.retrieve_result(raw_result=True) + assert out == counts + assert called["format_called"] is False + + +def test_retrieve_result_formatted_path_calls_format_result(counts, monkeypatch): + received = {} + + def fake_format_result(_counts, _pattern, _reg): + received["counts"] = _counts + received["pattern"] = _pattern + received["register_dict"] = _reg + return {"formatted": sum(_counts.values())} + + monkeypatch.setattr(job_module, "format_result", fake_format_result) + + fake_compiled = FakeCompiledCircuit() + job = IBMQJob(job=FakeJobDone("ID", counts), compiled_circuit=fake_compiled) + out = job.retrieve_result(raw_result=False) + + assert out == {"formatted": sum(counts.values())} + assert received["counts"] == counts + assert received["pattern"] is fake_compiled.pattern + assert received["register_dict"] is fake_compiled.register_dict diff --git a/tests/test_job_integration.py b/tests/test_job_integration.py new file mode 100644 index 0000000..4e703e2 --- /dev/null +++ b/tests/test_job_integration.py @@ -0,0 +1,49 @@ +# tests/test_job_integration.py +import time +import pytest + +pytest.importorskip("qiskit_aer") +pytest.importorskip("qiskit_ibm_runtime") +pytest.importorskip("graphix") +pytest.importorskip("graphix_ibmq") + +from graphix.transpiler import Circuit +from graphix_ibmq.backend import IBMQBackend +from graphix_ibmq.job import IBMQJob + + +@pytest.mark.integration +def test_retrieve_result_real_qiskit_aer_chain(tmp_path): + # 1) 1qubit H + circ = Circuit(1) + circ.h(0) + + pattern = circ.transpile().pattern + pattern.minimize_space() + + backend = IBMQBackend.from_simulator() + compiled = backend.compile(pattern) + + shots = 256 + ibmq_job = backend.submit_job(compiled, shots=shots) + + job = IBMQJob(job=ibmq_job.job, compiled_circuit=compiled) + + deadline = time.time() + 30 + while not job.is_done and time.time() < deadline: + time.sleep(0.05) + assert job.is_done, "Job did not reach DONE within timeout" + + counts_raw = job.retrieve_result(raw_result=True) + assert isinstance(counts_raw, dict) + + out_bits = len(compiled.register_dict.get("out", [])) + if out_bits == 0 and len(counts_raw) > 0: + out_bits = len(next(iter(counts_raw.keys()))) + assert out_bits >= 1 + assert all(isinstance(k, str) and len(k) == out_bits for k in counts_raw.keys()) + assert sum(counts_raw.values()) == shots + + counts_formatted = job.retrieve_result(raw_result=False) + assert isinstance(counts_formatted, dict) + assert len(counts_formatted) >= 1 diff --git a/tests/test_result_utils.py b/tests/test_result_utils.py new file mode 100644 index 0000000..172c22f --- /dev/null +++ b/tests/test_result_utils.py @@ -0,0 +1,15 @@ +from graphix_ibmq.result_utils import format_result + + +class DummyPattern: + def __init__(self, n_node, output_nodes): + self.n_node = n_node + self.output_nodes = output_nodes + + +def test_format_result_basic(): + raw = {"01": 10, "10": 5} + patt = DummyPattern(n_node=2, output_nodes=[0, 1]) + register_dict = {0: 0, 1: 1} + formatted = format_result(raw, patt, register_dict) + assert formatted == {"10": 10, "01": 5} diff --git a/tox.ini b/tox.ini index 423fc80..adc0d41 100644 --- a/tox.ini +++ b/tox.ini @@ -1,12 +1,12 @@ [tox] -envlist = py38, py39, py310, py311, lint +envlist = py39, py310, py311, py312, lint [gh-actions] python = - 3.8: lint, py38 - 3.9: py39 + 3.9: lint, py39 3.10: py310 3.11: py311 + 3.12: py312 [testenv] description = Run the unit tests @@ -19,7 +19,7 @@ commands = extras = test [testenv:lint] -basepython = python3.8 +basepython = python3.9 deps = black==22.8.0 commands =