diff --git a/CHANGELOG.md b/CHANGELOG.md index 53cfcd3b2..727372ff0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - #277: Methods for pretty-printing `Pattern`: `to_ascii`, `to_unicode`, `to_latex`. +- #312: The separation between `TensorNetworkBackend` and backends + that operate on a full-state representation, such as + `StatevecBackend` and `DensityMatrixBackend`, is now clearer with + the introduction of the abstract classes `DenseStateBackend` and + `DenseState`, which derive from `Backend` and `BackendState`, + respectively. `StatevecBackend` and `DensityMatrixBackend` inherit + from `DenseStateBackend`, while `Statevec` and `DensityMatrix` + inherit from `DenseState`. Note that the class hierarchy of + `BackendState` mirrors that of `Backend`. + - #322: Added a new `optimization` module containing: * a functional version of `standardize` that returns a standardized @@ -44,6 +54,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 semantics is preserved between circuits, ZX graphs, open graphs and patterns. +- #302, #308, #312: `Pattern`, `Circuit`, `PatternSimulator`, and + backends are now type-checked. + ### Changed - #277: The method `Pattern.print_pattern` is now deprecated. @@ -61,6 +74,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 Note: the method `perform_pauli_measurements` still places C commands before X and Z commands. +- #312: Backend's `State` has been renamed to `BackendState` to avoid + a name conflict with the `State` class defined in `graphix.states`, + which represents the state of a single qubit. + +- #312: `Backend[StateT_co]` and `DenseStateBackend[DenseStateT_co]` + are now parameterized by covariant type variables, allowing + subclasses to narrow the type of the state field to match their + specific state representation. Covariance is sound in this context + because the classes are frozen, and it ensures that + `Backend[BackendState]` is a supertype of all backend classes. + ## [0.3.1] - 2025-04-21 ### Added diff --git a/graphix/channels.py b/graphix/channels.py index b3cdb2fff..8bb0ebc1b 100644 --- a/graphix/channels.py +++ b/graphix/channels.py @@ -13,7 +13,7 @@ from graphix.ops import Ops if TYPE_CHECKING: - from collections.abc import Iterable + from collections.abc import Iterable, Iterator _T = TypeVar("_T", bound=np.generic) @@ -147,6 +147,10 @@ def __len__(self) -> int: """Return the number of Kraus operators.""" return len(self.__data) + def __iter__(self) -> Iterator[KrausData]: + """Iterate over Kraus operators.""" + return iter(self.__data) + @property def nqubit(self) -> int: """Return the number of qubits.""" diff --git a/graphix/graphsim.py b/graphix/graphsim.py index e7c1e2398..8976feeac 100644 --- a/graphix/graphsim.py +++ b/graphix/graphsim.py @@ -9,6 +9,7 @@ from graphix import utils from graphix.clifford import Clifford +from graphix.measurements import outcome from graphix.ops import Ops from graphix.sim.statevec import Statevec @@ -16,6 +17,8 @@ import functools from collections.abc import Iterable, Mapping + from graphix.measurements import Outcome + if TYPE_CHECKING: Graph = nx.Graph[int] @@ -373,7 +376,7 @@ def equivalent_fill_node(self, node: int) -> int: return 2 return 0 - def measure_x(self, node: int, choice: int = 0) -> int: + def measure_x(self, node: int, choice: Outcome = 0) -> Outcome: """Perform measurement in X basis. According to original paper, we realise X measurement by @@ -406,7 +409,7 @@ def measure_x(self, node: int, choice: int = 0) -> int: self.h(node) return self.measure_z(node, choice=choice) - def measure_y(self, node: int, choice: int = 0) -> int: + def measure_y(self, node: int, choice: Outcome = 0) -> Outcome: """Perform measurement in Y basis. According to original paper, we realise Y measurement by @@ -431,7 +434,7 @@ def measure_y(self, node: int, choice: int = 0) -> int: self.h(node) return self.measure_z(node, choice=choice) - def measure_z(self, node: int, choice: int = 0) -> int: + def measure_z(self, node: int, choice: Outcome = 0) -> Outcome: """Perform measurement in Z basis. To realize the simple Z measurement on undecorated graph state, @@ -455,7 +458,7 @@ def measure_z(self, node: int, choice: int = 0) -> int: if choice: for i in self.neighbors(node): self.flip_sign(i) - result = choice if not isolated else int(self.nodes[node]["sign"]) + result = choice if not isolated else outcome(self.nodes[node]["sign"]) self.remove_node(node) return result diff --git a/graphix/measurements.py b/graphix/measurements.py index 25c10d1a3..e9a7b9dff 100644 --- a/graphix/measurements.py +++ b/graphix/measurements.py @@ -4,7 +4,9 @@ import dataclasses import math -from typing import NamedTuple, SupportsInt +from typing import Literal, NamedTuple, SupportsInt + +from typing_extensions import TypeAlias # TypeAlias introduced in Python 3.10 from graphix import utils from graphix.fundamentals import Axis, Plane, Sign @@ -12,6 +14,13 @@ # Ruff suggests to move this import to a type-checking block, but dataclass requires it here from graphix.parameter import ExpressionOrFloat # noqa: TC001 +Outcome: TypeAlias = Literal[0, 1] + + +def outcome(b: bool) -> Outcome: + """Return 1 if True, 0 if False.""" + return 1 if b else 0 + @dataclasses.dataclass class Domains: diff --git a/graphix/noise_models/noise_model.py b/graphix/noise_models/noise_model.py index c6569b093..da66d4707 100644 --- a/graphix/noise_models/noise_model.py +++ b/graphix/noise_models/noise_model.py @@ -13,6 +13,7 @@ if TYPE_CHECKING: from graphix.channels import KrausChannel + from graphix.measurements import Outcome from graphix.simulator import PatternSimulator @@ -66,17 +67,17 @@ def measure(self) -> KrausChannel: ... @abc.abstractmethod - def confuse_result(self, result: bool) -> bool: + def confuse_result(self, result: Outcome) -> Outcome: """Return a possibly flipped measurement outcome. Parameters ---------- - result : bool + result : Outcome Ideal measurement result. Returns ------- - bool + Outcome Possibly corrupted result. """ diff --git a/graphix/noise_models/noiseless_noise_model.py b/graphix/noise_models/noiseless_noise_model.py index e0b9047ca..df4555347 100644 --- a/graphix/noise_models/noiseless_noise_model.py +++ b/graphix/noise_models/noiseless_noise_model.py @@ -7,12 +7,17 @@ from __future__ import annotations +from typing import TYPE_CHECKING + import numpy as np import typing_extensions from graphix.channels import KrausChannel, KrausData from graphix.noise_models.noise_model import NoiseModel +if TYPE_CHECKING: + from graphix.measurements import Outcome + class NoiselessNoiseModel(NoiseModel): """Noise model that performs no operation.""" @@ -51,7 +56,7 @@ def measure(self) -> KrausChannel: return KrausChannel([KrausData(1.0, np.eye(2))]) @typing_extensions.override - def confuse_result(self, result: bool) -> bool: + def confuse_result(self, result: Outcome) -> Outcome: """Return the unmodified measurement result. Parameters diff --git a/graphix/parameter.py b/graphix/parameter.py index 7731a855b..947381d55 100644 --- a/graphix/parameter.py +++ b/graphix/parameter.py @@ -334,6 +334,26 @@ def __hash__(self) -> int: T = TypeVar("T") +def check_expression_or_complex(value: object) -> ExpressionOrComplex: + """Check that the given object is of type ExpressionOrComplex and return it.""" + if isinstance(value, Expression): + return value + if isinstance(value, SupportsComplex): + return complex(value) + msg = f"ExpressionOrComplex expected, but {type(value)} found." + raise TypeError(msg) + + +def check_expression_or_float(value: object) -> ExpressionOrFloat: + """Check that the given object is of type ExpressionOrFloat and return it.""" + if isinstance(value, Expression): + return value + if isinstance(value, SupportsFloat): + return float(value) + msg = f"ExpressionOrFloat expected, but {type(value)} found." + raise TypeError(msg) + + @overload def subs(value: ExpressionOrFloat, variable: Parameter, substitute: ExpressionOrSupportsFloat) -> ExpressionOrFloat: ... diff --git a/graphix/pattern.py b/graphix/pattern.py index fe285305f..370c50f05 100644 --- a/graphix/pattern.py +++ b/graphix/pattern.py @@ -12,7 +12,7 @@ from copy import deepcopy from dataclasses import dataclass from pathlib import Path -from typing import TYPE_CHECKING, SupportsFloat +from typing import TYPE_CHECKING, SupportsFloat, TypeVar import networkx as nx from typing_extensions import assert_never, override @@ -23,7 +23,7 @@ from graphix.fundamentals import Axis, Plane, Sign from graphix.gflow import find_flow, find_gflow, get_layers from graphix.graphsim import GraphState -from graphix.measurements import PauliMeasurement +from graphix.measurements import Outcome, PauliMeasurement from graphix.pretty_print import OutputFormat, pattern_to_str from graphix.simulator import PatternSimulator from graphix.states import BasicStates @@ -35,9 +35,10 @@ from typing import Any, Literal from graphix.parameter import ExpressionOrFloat, ExpressionOrSupportsFloat, Parameter - from graphix.sim.base_backend import Backend - from graphix.sim.base_backend import State as BackendState - from graphix.sim.density_matrix import Data + from graphix.sim import Backend, BackendState, Data + + +_StateT_co = TypeVar("_StateT_co", bound="BackendState", covariant=True) @dataclass(frozen=True) @@ -81,7 +82,7 @@ class Pattern: total number of nodes in the resource state """ - results: dict[int, int] + results: dict[int, Outcome] __seq: list[Command] def __init__( @@ -250,7 +251,7 @@ def compose( mapped_inputs = [mapping_complete[n] for n in other.input_nodes] mapped_outputs = [mapping_complete[n] for n in other.output_nodes] - mapped_results = {mapping_complete[n]: m for n, m in other.results.items()} + mapped_results: dict[int, Outcome] = {mapping_complete[n]: m for n, m in other.results.items()} merged = mapping_values_set.intersection(self.__output_nodes) @@ -289,7 +290,7 @@ def update_command(cmd: Command) -> Command: seq = self.__seq + [update_command(c) for c in other] - results = {**self.results, **mapped_results} + results: dict[int, Outcome] = {**self.results, **mapped_results} p = Pattern(input_nodes=inputs, output_nodes=outputs, cmds=seq) p.results = results @@ -1354,7 +1355,7 @@ def space_list(self) -> list[int]: return n_list def simulate_pattern( - self, backend: Backend | str = "statevector", input_state: Data = BasicStates.PLUS, **kwargs: Any + self, backend: Backend[_StateT_co] | str = "statevector", input_state: Data = BasicStates.PLUS, **kwargs: Any ) -> BackendState: """Simulate the execution of the pattern by using :class:`graphix.simulator.PatternSimulator`. @@ -1644,7 +1645,7 @@ def measure_pauli(pattern: Pattern, leave_input: bool, copy: bool = False) -> Pa nodes, edges = pattern.get_graph() vop_init = pattern.get_vops(conj=False) graph_state = GraphState(nodes=nodes, edges=edges, vops=vop_init) - results: dict[int, int] = {} + results: dict[int, Outcome] = {} to_measure, non_pauli_meas = pauli_nodes(pattern, leave_input) if not leave_input and len(list(set(pattern.input_nodes) & {i[0].node for i in to_measure})) > 0: new_inputs = [] @@ -1684,7 +1685,7 @@ def measure_pauli(pattern: Pattern, leave_input: bool, copy: bool = False) -> Pa if basis.sign == Sign.PLUS: results[pattern_cmd.node] = measure(pattern_cmd.node, choice=0) else: - results[pattern_cmd.node] = 1 - measure(pattern_cmd.node, choice=1) + results[pattern_cmd.node] = 0 if measure(pattern_cmd.node, choice=1) else 1 # measure (remove) isolated nodes. if they aren't Pauli measurements, # measuring one of the results with probability of 1 should not occur as was possible above for Pauli measurements, diff --git a/graphix/sim/__init__.py b/graphix/sim/__init__.py index f58ae73cf..e54f62cab 100644 --- a/graphix/sim/__init__.py +++ b/graphix/sim/__init__.py @@ -1 +1,10 @@ """Simulation backends.""" + +from __future__ import annotations + +from graphix.sim.base_backend import Backend, BackendState +from graphix.sim.data import Data +from graphix.sim.density_matrix import DensityMatrix +from graphix.sim.statevec import Statevec + +__all__ = ["Backend", "BackendState", "Data", "DensityMatrix", "Statevec"] diff --git a/graphix/sim/base_backend.py b/graphix/sim/base_backend.py index 56a54954c..85de43aa4 100644 --- a/graphix/sim/base_backend.py +++ b/graphix/sim/base_backend.py @@ -2,30 +2,224 @@ from __future__ import annotations +import dataclasses +import sys from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Literal +from dataclasses import dataclass +from typing import TYPE_CHECKING, Generic, SupportsFloat, TypeVar import numpy as np +import numpy.typing as npt + +# TypeAlias introduced in Python 3.10 +# override introduced in Python 3.12 +from typing_extensions import TypeAlias, override from graphix.clifford import Clifford from graphix.command import CommandKind +from graphix.measurements import outcome from graphix.ops import Ops +from graphix.parameter import check_expression_or_complex from graphix.rng import ensure_rng from graphix.states import BasicStates if TYPE_CHECKING: - from collections.abc import Iterable, Iterator + from collections.abc import Collection, Iterable, Iterator, Sequence - import numpy.typing as npt from numpy.random import Generator from graphix import command + from graphix.channels import KrausChannel from graphix.fundamentals import Plane - from graphix.measurements import Measurement - from graphix.parameter import ExpressionOrFloat + from graphix.measurements import Measurement, Outcome + from graphix.parameter import ExpressionOrComplex, ExpressionOrFloat + from graphix.sim.data import Data from graphix.simulator import MeasureMethod +if sys.version_info >= (3, 10): + Matrix: TypeAlias = npt.NDArray[np.object_ | np.complex128] +else: + from typing import Union + + Matrix: TypeAlias = npt.NDArray[Union[np.object_, np.complex128]] + + +def tensordot(op: Matrix, psi: Matrix, axes: tuple[int | Sequence[int], int | Sequence[int]]) -> Matrix: + """Tensor dot product that preserves the type of `psi`. + + This wrapper around `np.tensordot` ensures static type checking + for both numeric (`complex128`) and symbolic (`object`) arrays. + Even though the runtime behavior is the same, NumPy's static types don't + support `Matrix` directly. + + If `psi` and `op` are numeric, the result is numeric. + If `psi` or `op` are symbolic, the other is converted to symbolic if needed and + the result is symbolic. + + Parameters + ---------- + op : Matrix + Operator tensor, either symbolic or numeric. + psi : Matrix + State tensor, either symbolic or numeric. + axes : tuple[int | Sequence[int], int | Sequence[int]] + Axes along which to contract `op` and `psi`. + + Returns + ------- + Matrix + The result of the tensor contraction with the same type as `psi`. + """ + if psi.dtype == np.complex128 and op.dtype == np.complex128: + psi_c = psi.astype(np.complex128, copy=False) + op_c = op.astype(np.complex128, copy=False) + return np.tensordot(op_c, psi_c, axes).astype(np.complex128) + psi_o = psi.astype(np.object_, copy=False) + op_o = op.astype(np.object_, copy=False) + return np.tensordot(op_o, psi_o, axes) + + +def kron(a: Matrix, b: Matrix) -> Matrix: + """Kronecker product with type-safe handling of symbolic and numeric matrices. + + The two matrices should have the same type. + + Parameters + ---------- + a : Matrix + Left operand (symbolic or numeric). + b : Matrix + Right operand (symbolic or numeric). + + Returns + ------- + Matrix + Kronecker product of `a` and `b`. + + Raises + ------ + TypeError + If `a` and `b` don't have the same type. + """ + if a.dtype == np.complex128 and b.dtype == np.complex128: + a_c = a.astype(np.complex128, copy=False) + b_c = b.astype(np.complex128, copy=False) + return np.kron(a_c, b_c).astype(np.complex128) + + if a.dtype == np.object_ and b.dtype == np.object_: + a_o = a.astype(np.object_, copy=False) + b_o = b.astype(np.object_, copy=False) + return np.kron(a_o, b_o) + + raise TypeError("Operands should have the same type.") + + +def outer(a: Matrix, b: Matrix) -> Matrix: + """Outer product with type-safe handling of symbolic and numeric vectors. + + The two matrices should have the same type. + + Parameters + ---------- + a : Matrix + Left operand (symbolic or numeric). + b : Matrix + Right operand (symbolic or numeric). + + Returns + ------- + Matrix + Outer product of `a` and `b`. + + Raises + ------ + TypeError + If `a` and `b` don't have the same type. + """ + if a.dtype == np.complex128 and b.dtype == np.complex128: + a_c = a.astype(np.complex128, copy=False) + b_c = b.astype(np.complex128, copy=False) + return np.outer(a_c, b_c).astype(np.complex128) + + if a.dtype == np.object_ and b.dtype == np.object_: + a_o = a.astype(np.object_, copy=False) + b_o = b.astype(np.object_, copy=False) + return np.outer(a_o, b_o) + + raise TypeError("Operands should have the same type.") + + +def vdot(a: Matrix, b: Matrix) -> ExpressionOrComplex: + """Conjugate dot product ⟨a|b⟩ with type-safe handling of symbolic and numeric vectors. + + The two matrices should have the same type. + + Parameters + ---------- + a : Matrix + Left operand (symbolic or numeric). + b : Matrix + Right operand (symbolic or numeric). + + Returns + ------- + ExpressionOrFloat + Dot product. + + Raises + ------ + TypeError + If `a` and `b` don't have the same type. + """ + if a.dtype == np.complex128 and b.dtype == np.complex128: + a_c = a.astype(np.complex128, copy=False) + b_c = b.astype(np.complex128, copy=False) + return complex(np.vdot(a_c, b_c)) + + if a.dtype == np.object_ and b.dtype == np.object_: + a_o = a.astype(np.object_, copy=False) + b_o = b.astype(np.object_, copy=False) + return check_expression_or_complex(np.vdot(a_o, b_o)) + + raise TypeError("Operands should have the same type.") + + +def matmul(a: Matrix, b: Matrix) -> Matrix: + """Matrix product a @ b with type-safe handling of symbolic and numeric vectors. + + The two matrices should have the same type. + + Parameters + ---------- + a : Matrix + Left operand (symbolic or numeric). + b : Matrix + Right operand (symbolic or numeric). + + Returns + ------- + Matrix + Matrix product. + + Raises + ------ + TypeError + If `a` and `b` don't have the same type. + """ + if a.dtype == np.complex128 and b.dtype == np.complex128: + a_c = a.astype(np.complex128, copy=False) + b_c = b.astype(np.complex128, copy=False) + return a_c @ b_c + + if a.dtype == np.object_ and b.dtype == np.object_: + a_o = a.astype(np.object_, copy=False) + b_o = b.astype(np.object_, copy=False) + return a_o @ b_o # type: ignore[no-any-return] + + raise TypeError("Operands should have the same type.") + + class NodeIndex: """A class for managing the mapping between node numbers and qubit indices in the internal state of the backend. @@ -38,6 +232,9 @@ class NodeIndex: in the backend's internal quantum state. """ + __dict: dict[int, int] + __list: list[int] + def __init__(self) -> None: """Initialize an empty mapping between nodes and qubit indices.""" self.__dict = {} @@ -127,15 +324,166 @@ def swap(self, i: int, j: int) -> None: self.__dict[node_j] = i -class State(ABC): - """Base class for backend state.""" +class NoiseNotSupportedError(Exception): + """Exception raised when `apply_channel` is called on a backend that does not support noise.""" + + def __str__(self) -> str: + """Return the error message.""" + return "This backend does not support noise." + + +class BackendState(ABC): + """ + Abstract base class for representing the quantum state of a backend. + + `BackendState` defines the interface for quantum state representations used by + various backend implementations. It provides a common foundation for different + simulation strategies, such as dense linear algebra or tensor network contraction. + + Concrete subclasses must implement the storage and manipulation logic appropriate + for a specific backend and representation strategy. + + Notes + ----- + This class is abstract and cannot be instantiated directly. + + Examples of concrete subclasses include: + - :class:`Statevec` (for pure states represented as state vectors) + - :class:`DensityMatrix` (for mixed states represented as density matrices) + - :class:`MBQCTensorNet` (for compressed representations using tensor networks) + + See Also + -------- + :class:`DenseState`, :class:`MBQCTensorNet`, :class:`Statevec`, :class:`DensityMatrix` + """ + + def apply_channel(self, channel: KrausChannel, qargs: Sequence[int]) -> None: # noqa: ARG002,PLR6301 + """Apply channel to the state.""" + raise NoiseNotSupportedError @abstractmethod - def flatten(self) -> npt.NDArray[np.complex128]: + def flatten(self) -> Matrix: """Return flattened state.""" -def _op_mat_from_result(vec: tuple[float, float, float], result: bool, symbolic: bool = False) -> npt.NDArray: +class DenseState(BackendState): + """ + Abstract base class for quantum states with full dense representations. + + `DenseState` defines the shared interface and behavior for state representations + that explicitly store the entire quantum state in memory as a dense array. + This includes both state vectors (for pure states) and density matrices (for + mixed states). + + This class serves as a common parent for :class:`Statevec` and :class:`DensityMatrix`, which + implement the concrete representations of dense quantum states. It is used in + simulation backends that operate using standard linear algebra on the full + state, such as :class:`StatevecBackend` and :class:`DensityMatrixBackend`. + + Notes + ----- + This class is abstract and cannot be instantiated directly. + + Not all :class:`BackendState` subclasses are dense. For example, :class:`MBQCTensorNet` is a + `BackendState` that represents the quantum state using a tensor network, rather than + a single dense array. + + See Also + -------- + :class:`Statevec`, :class:`DensityMatrix` + """ + + # Note that `@property` must appear before `@abstractmethod` for pyright + @property + @abstractmethod + def nqubit(self) -> int: + """Return the number of qubits.""" + + @abstractmethod + def add_nodes(self, nqubit: int, data: Data) -> None: + """ + Add nodes (qubits) to the state and initialize them in a specified state. + + Parameters + ---------- + nqubit : int + The number of qubits to add to the state. + + data : Data, optional + The state in which to initialize the newly added nodes. The supported forms + of state specification depend on the backend implementation. + + See :meth:`Backend.add_nodes` for further details. + """ + + @abstractmethod + def entangle(self, edge: tuple[int, int]) -> None: + """Connect graph nodes. + + Parameters + ---------- + edge : tuple of int + (control, target) qubit indices + """ + + @abstractmethod + def evolve(self, op: Matrix, qargs: Sequence[int]) -> None: + """Apply a multi-qubit operation. + + Parameters + ---------- + op : numpy.ndarray + 2^n*2^n matrix + qargs : list of int + target qubits' indices + """ + + @abstractmethod + def evolve_single(self, op: Matrix, i: int) -> None: + """Apply a single-qubit operation. + + Parameters + ---------- + op : numpy.ndarray + 2*2 matrix + i : int + qubit index + """ + + @abstractmethod + def expectation_single(self, op: Matrix, loc: int) -> complex: + """Return the expectation value of single-qubit operator. + + Parameters + ---------- + op : numpy.ndarray + 2*2 operator + loc : int + target qubit index + + Returns + ------- + complex : expectation value. + """ + + @abstractmethod + def remove_qubit(self, qarg: int) -> None: + """Remove a separable qubit from the system.""" + + @abstractmethod + def swap(self, qubits: tuple[int, int]) -> None: + """Swap qubits. + + Parameters + ---------- + qubits : tuple of int + (control, target) qubit indices + """ + + +def _op_mat_from_result( + vec: tuple[ExpressionOrFloat, ExpressionOrFloat, ExpressionOrFloat], result: Outcome, symbolic: bool = False +) -> Matrix: r"""Return the operator :math:`\tfrac{1}{2}(I + (-1)^r \vec{v}\cdot\vec{\sigma})`. Parameters @@ -152,25 +500,43 @@ def _op_mat_from_result(vec: tuple[float, float, float], result: bool, symbolic: numpy.ndarray 2x2 operator acting on the measured qubit. """ - dtype = "O" if symbolic else np.complex128 - op_mat = np.eye(2, dtype=dtype) / 2 sign = (-1) ** result - for i in range(3): - op_mat += sign * vec[i] * Clifford(i + 1).matrix / 2 - return op_mat + if symbolic: + op_mat_symbolic: npt.NDArray[np.object_] = np.eye(2, dtype=np.object_) / 2 + for i, t in enumerate(vec): + op_mat_symbolic += sign * t * Clifford(i + 1).matrix / 2 + return op_mat_symbolic + op_mat_complex: npt.NDArray[np.complex128] = np.eye(2, dtype=np.complex128) / 2 + x, y, z = vec + # mypy requires each of x, y, and z to be tested explicitly for it to infer + # that they are instances of `SupportsFloat`. + # In particular, using a loop or comprehension like + # `not all(isinstance(v, SupportsFloat) for v in (x, y, z))` is not supported. + if not isinstance(x, SupportsFloat) or not isinstance(y, SupportsFloat) or not isinstance(z, SupportsFloat): + raise TypeError("Vector of float expected with symbolic = False") + float_vec = [x, y, z] + for i, t in enumerate(float_vec): + op_mat_complex += sign * t * Clifford(i + 1).matrix / 2 + return op_mat_complex def perform_measure( - qubit: int, plane: Plane, angle: ExpressionOrFloat, state, rng, pr_calc: bool = True, symbolic: bool = False -) -> Literal[0, 1]: + qubit: int, + plane: Plane, + angle: ExpressionOrFloat, + state: DenseState, + rng: Generator, + pr_calc: bool = True, + symbolic: bool = False, +) -> Outcome: """Perform measurement of a qubit.""" vec = plane.polar(angle) if pr_calc: - op_mat = _op_mat_from_result(vec, False, symbolic=symbolic) + op_mat = _op_mat_from_result(vec, 0, symbolic=symbolic) prob_0 = state.expectation_single(op_mat, qubit) - result = rng.random() > prob_0 + result = outcome(rng.random() > abs(prob_0)) if result: - op_mat = _op_mat_from_result(vec, True, symbolic=symbolic) + op_mat = _op_mat_from_result(vec, 1, symbolic=symbolic) else: # choose the measurement result randomly result = rng.choice([0, 1]) @@ -179,80 +545,238 @@ def perform_measure( return result -class Backend: - """Base class for backends.""" +_StateT_co = TypeVar("_StateT_co", bound="BackendState", covariant=True) + + +@dataclass(frozen=True) +class Backend(Generic[_StateT_co]): + """ + Abstract base class for all quantum backends. + + A backend is responsible for managing a quantum system, including the set of active + qubits (nodes), their initialization, evolution, and measurement. It defines the + interface through which high-level quantum programs interact with the underlying + simulation or hardware model. + + Concrete subclasses implement specific state representations and simulation strategies, + such as dense state vectors, density matrices, or tensor networks. - def __init__( - self, - state: State, - node_index: NodeIndex | None = None, - pr_calc: bool = True, - rng: Generator | None = None, - symbolic: bool = False, - ): - """Construct a backend. + Responsibilities of a backend typically include: + - Managing a dynamic set of qubits (nodes) and their state + - Applying quantum gates or operations + - Performing measurements and returning classical outcomes + - Tracking and exposing the underlying quantum state + + Examples of concrete subclasses include: + - `StatevecBackend` (pure states via state vectors) + - `DensityMatrixBackend` (mixed states via density matrices) + - `TensorNetworkBackend` (compressed states via tensor networks) + + Parameters + ---------- + state : BackendState + internal state of the backend: instance of :class:`Statevec`, :class:`DensityMatrix`, or :class:`MBQCTensorNet`. + + Notes + ----- + This class is abstract and should not be instantiated directly. + + The class hierarchy of states mirrors the class hierarchy of backends: + - `DenseStateBackend` and `TensorNetworkBackend` are subclasses of `Backend`, + and `DenseState` and `MBQCTensorNet` are subclasses of `BackendState`. + - `StatevecBackend` and `DensityMatrixBackend` are subclasses of `DenseStateBackend`, + and `Statevec` and `DensityMatrix` are subclasses of `DenseState`. + + The type variable `_StateT_co` specifies the type of the ``state`` field, so that subclasses + provide a precise type for this field: + - `StatevecBackend` is a subtype of ``Backend[Statevec]``. + - `DensityMatrixBackend` is a subtype of ``Backend[DensityMatrix]``. + - `TensorNetworkBackend` is a subtype of ``Backend[MBQCTensorNet]``. + + The type variables `_StateT_co` and `_DenseStateT_co` are declared as covariant. + That is, ``Backend[T1]`` is a subtype of ``Backend[T2]`` if ``T1`` is a subtype of ``T2``. + This means that `StatevecBackend`, `DensityMatrixBackend`, and `TensorNetworkBackend` are + all subtypes of ``Backend[BackendState]``. + This covariance is sound because backends are frozen dataclasses; thus, the type of + ``state`` cannot be changed after instantiation. + + The interface expected from a backend includes the following methods: + - `add_nodes`: which executes `N` commands. + - `apply_channel`: used for noisy simulations. + The class `Backend` provides a default implementation that + raises `NoiseNotSupportedError`, indicating that the backend + does not support noise. Backends that support noise (e.g., + `DensityMatrixBackend`) override this method to implement the + effect of noise. + - `apply_clifford`: executes `C` commands. + - `correct_byproduct`: executes `X` and `Z` commands. + - `entangle_nodes`: executes `E` commands. + - `finalize`: called at the end of pattern simulation to convey + the order of output nodes. + - `measure`: executes `M` commands. + + See Also + -------- + :class:`BackendState`, :`class:`DenseStateBackend`, :class:`StatevecBackend`, :class:`DensityMatrixBackend`, :class:`TensorNetworkBackend` + """ + + # `init=False` is required because `state` cannot appear in a contravariant position + # (specifically, as a parameter of `__init__`) since `_StateT_co` is covariant. + state: _StateT_co = dataclasses.field(init=False) + + @abstractmethod + def add_nodes(self, nodes: Sequence[int], data: Data = BasicStates.PLUS) -> None: + r""" + Add new nodes (qubits) to the backend and initialize them in a specified state. Parameters ---------- - pr_calc : bool - whether or not to compute the probability distribution before choosing the measurement result. - if False, measurements yield results 0/1 with 50% probabilities each. - Optional, default is `True`. - node_index : NodeIndex - mapping between node numbers and qubit indices in the internal state of the backend. - state : State - internal state of the backend: instance of Statevec, DensityMatrix, or MBQCTensorNet. - symbolic : bool - If `False`, matrice data-type is `np.complex128`, for efficiency. - If `True`, matrice data-type is `O` (arbitrary Python object), to allow symbolic computation, at the price of performance cost. - Optional, default is `False`. + nodes : Sequence[int] + A list of node indices to add to the backend. These indices can be any + integer values but must be fresh: each index must be distinct from all + previously added nodes. + + data : Data, optional + The state in which to initialize the newly added nodes. The supported forms + of state specification depend on the backend implementation. + + All backends must support the basic predefined states in ``BasicStates``. + + - If a single basic state is provided, all new nodes are initialized in that state. + - If a list of basic states is provided, it must match the length of ``nodes``, and + each node is initialized with its corresponding state. + + Some backends support other forms of state specification. + + - ``StatevecBackend`` supports arbitrary state vectors: + - A single-qubit state vector will be broadcast to all nodes. + - A multi-qubit state vector of dimension :math:`2^n`, where :math:`n = \mathrm{len}(nodes)`, + initializes the new nodes jointly. + + - ``DensityMatrixBackend`` supports both state vectors and density matrices: + - State vectors are handled as in ``StatevecBackend``, and converted to + density matrices. + - A density matrix must have shape :math:`2^n \times 2^n`, where :math:`n = \mathrm{len}(nodes)`, + and is used to jointly initialize the new nodes. + + Notes + ----- + Previously existing nodes remain unchanged. """ - self.__state = state - if node_index is None: - self.__node_index = NodeIndex() - else: - self.__node_index = node_index.copy() - if not isinstance(pr_calc, bool): - raise TypeError("`pr_calc` should be bool") - # whether to compute the probability - self.__pr_calc = pr_calc - self.__rng = ensure_rng(rng) - self.__symbolic = symbolic - - def copy(self) -> Backend: - """Return a copy of the backend.""" - return Backend(self.__state, self.__node_index, self.__pr_calc, self.__rng) - @property - def rng(self) -> Generator: - """Return the associated random-number generator.""" - return self.__rng + def apply_channel(self, channel: KrausChannel, qargs: Collection[int]) -> None: # noqa: ARG002,PLR6301 + """Apply channel to the state. - @property - def state(self) -> State: - """Return the state of the backend.""" - return self.__state + The default implementation of this method raises + `NoiseNotSupportedError`, indicating that the backend does not + support noise. Backends that support noise (e.g., + `DensityMatrixBackend`) override this method to implement + the effect of noise. - @property - def node_index(self) -> NodeIndex: - """Return the node index table of the backend.""" - return self.__node_index + Parameters + ---------- + qargs : list of ints. Target qubits + """ + raise NoiseNotSupportedError - @property - def symbolic(self) -> bool: - """Return whether the backend supports symbolic computation.""" - return self.__symbolic + @abstractmethod + def apply_clifford(self, node: int, clifford: Clifford) -> None: + """Apply single-qubit Clifford gate, specified by vop index specified in graphix.clifford.CLIFFORD.""" + + @abstractmethod + def correct_byproduct(self, cmd: command.X | command.Z, measure_method: MeasureMethod) -> None: + """Byproduct correction correct for the X or Z byproduct operators, by applying the X or Z gate.""" - def add_nodes(self, nodes, data=BasicStates.PLUS) -> None: - """Add new qubit(s) to statevector in argument and assign the corresponding node number to list self.node_index. + @abstractmethod + def entangle_nodes(self, edge: tuple[int, int]) -> None: + """Apply CZ gate to two connected nodes. Parameters ---------- - nodes : list of node indices + edge : tuple (i, j) + a pair of node indices + """ + + @abstractmethod + def finalize(self, output_nodes: Iterable[int]) -> None: + """To be run at the end of pattern simulation to convey the order of output nodes.""" + + @abstractmethod + def measure(self, node: int, measurement: Measurement) -> Outcome: + """Perform measurement of a node and trace out the qubit. + + Parameters + ---------- + node: int + measurement: Measurement + """ + + +_DenseStateT_co = TypeVar("_DenseStateT_co", bound="DenseState", covariant=True) + + +@dataclass(frozen=True) +class DenseStateBackend(Backend[_DenseStateT_co], Generic[_DenseStateT_co]): + """ + Abstract base class for backends that represent quantum states explicitly in memory. + + This class defines common functionality for backends that store the entire quantum + state as a dense array—either as a state vector (pure state) or a density matrix + (mixed state)—and perform quantum operations using standard linear algebra. It is + designed to be the shared base class of `StatevecBackend` and `DensityMatrixBackend`. + + In contrast to :class:`TensorNetworkBackend`, which uses structured and compressed + representations (e.g., matrix product states) to scale to larger systems, + `DenseStateBackend` subclasses simulate quantum systems by maintaining the full + state in memory. This approach enables straightforward implementation of gates, + measurements, and noise models, but scales exponentially with the number of qubits. + + This class is not meant to be instantiated directly. + + Parameters + ---------- + node_index : NodeIndex, optional + Mapping between node numbers and qubit indices in the internal state of the backend. + pr_calc : bool, optional + Whether or not to compute the probability distribution before choosing the measurement outcome. + If False, measurements yield results 0/1 with 50% probabilities each. + rng : Generator, optional + Random number generator used to sample measurement outcomes. + symbolic : bool, optional + If True, support arbitrary objects (typically, symbolic expressions) in matrices. + + See Also + -------- + :class:`StatevecBackend`, :class:`DensityMatrixBackend`, :class:`TensorNetworkBackend` + """ + + node_index: NodeIndex = dataclasses.field(default_factory=NodeIndex) + pr_calc: bool = True + rng: Generator = dataclasses.field(default_factory=ensure_rng) + symbolic: bool = False + + @override + def add_nodes(self, nodes: Sequence[int], data: Data = BasicStates.PLUS) -> None: + """ + Add new nodes (qubits) to the backend and initialize them in a specified state. + + Parameters + ---------- + nodes : Sequence[int] + A list of node indices to add to the backend. These indices can be any + integer values but must be fresh: each index must be distinct from all + previously added nodes. + + data : Data, optional + The state in which to initialize the newly added nodes. The supported forms + of state specification depend on the backend implementation. + + See :meth:`Backend.add_nodes` for further details. """ self.state.add_nodes(nqubit=len(nodes), data=data) self.node_index.extend(nodes) + @override def entangle_nodes(self, edge: tuple[int, int]) -> None: """Apply CZ gate to two connected nodes. @@ -265,7 +789,8 @@ def entangle_nodes(self, edge: tuple[int, int]) -> None: control = self.node_index.index(edge[1]) self.state.entangle((target, control)) - def measure(self, node: int, measurement: Measurement) -> bool: + @override + def measure(self, node: int, measurement: Measurement) -> Outcome: """Perform measurement of a node and trace out the qubit. Parameters @@ -279,28 +804,38 @@ def measure(self, node: int, measurement: Measurement) -> bool: measurement.plane, measurement.angle, self.state, - rng=self.__rng, - pr_calc=self.__pr_calc, - symbolic=self.__symbolic, + rng=self.rng, + pr_calc=self.pr_calc, + symbolic=self.symbolic, ) self.node_index.remove(node) self.state.remove_qubit(loc) return result - def correct_byproduct(self, cmd: command.M, measure_method: MeasureMethod) -> None: + @override + def correct_byproduct(self, cmd: command.X | command.Z, measure_method: MeasureMethod) -> None: """Byproduct correction correct for the X or Z byproduct operators, by applying the X or Z gate.""" if np.mod(sum(measure_method.get_measure_result(j) for j in cmd.domain), 2) == 1: - if cmd.kind == CommandKind.X: - op = Ops.X - elif cmd.kind == CommandKind.Z: - op = Ops.Z + op = Ops.X if cmd.kind == CommandKind.X else Ops.Z self.apply_single(node=cmd.node, op=op) - def apply_single(self, node: int, op: npt.NDArray) -> None: + @override + def apply_channel(self, channel: KrausChannel, qargs: Collection[int]) -> None: + """Apply channel to the state. + + Parameters + ---------- + qargs : list of ints. Target qubits + """ + indices = [self.node_index.index(i) for i in qargs] + self.state.apply_channel(channel, indices) + + def apply_single(self, node: int, op: Matrix) -> None: """Apply a single gate to the state.""" index = self.node_index.index(node) self.state.evolve_single(op=op, i=index) + @override def apply_clifford(self, node: int, clifford: Clifford) -> None: """Apply single-qubit Clifford gate, specified by vop index specified in graphix.clifford.CLIFFORD.""" loc = self.node_index.index(node) @@ -314,6 +849,7 @@ def sort_qubits(self, output_nodes: Iterable[int]) -> None: self.state.swap((i, move_from)) self.node_index.swap(i, move_from) + @override def finalize(self, output_nodes: Iterable[int]) -> None: """To be run at the end of pattern simulation.""" self.sort_qubits(output_nodes) diff --git a/graphix/sim/data.py b/graphix/sim/data.py new file mode 100644 index 000000000..4e255591f --- /dev/null +++ b/graphix/sim/data.py @@ -0,0 +1,41 @@ +"""Type `Data` for initializing nodes in backends. + +The type `Data` is declared here to support type-checking +`base_backend`, but its definition requires importing the `statevec` +and `density_matrix` modules, both of which import `base_backend`. To +break this import cycle, `data` is only imported within the +type-checking block of `base_backend`. +""" + +from __future__ import annotations + +import sys +from collections.abc import Iterable + +from typing_extensions import TypeAlias # TypeAlias introduced in Python 3.10 + +from graphix.parameter import ExpressionOrSupportsComplex +from graphix.sim.density_matrix import DensityMatrix +from graphix.sim.statevec import Statevec +from graphix.states import State + +if sys.version_info >= (3, 10): + Data: TypeAlias = ( + State + | DensityMatrix + | Statevec + | Iterable[State] + | Iterable[ExpressionOrSupportsComplex] + | Iterable[Iterable[ExpressionOrSupportsComplex]] + ) +else: + from typing import Union + + Data: TypeAlias = Union[ + State, + DensityMatrix, + Statevec, + Iterable[State], + Iterable[ExpressionOrSupportsComplex], + Iterable[Iterable[ExpressionOrSupportsComplex]], + ] diff --git a/graphix/sim/density_matrix.py b/graphix/sim/density_matrix.py index 134773f58..a25e4ce6b 100644 --- a/graphix/sim/density_matrix.py +++ b/graphix/sim/density_matrix.py @@ -6,36 +6,40 @@ from __future__ import annotations import copy -import sys +import dataclasses +import math from collections.abc import Collection, Iterable -from typing import TYPE_CHECKING, SupportsComplex, SupportsFloat +from dataclasses import dataclass +from typing import TYPE_CHECKING, SupportsComplex import numpy as np +from typing_extensions import override from graphix import linalg_validations as lv -from graphix import parameter, states +from graphix import parameter from graphix.channels import KrausChannel -from graphix.parameter import Expression, ExpressionOrSupportsComplex -from graphix.sim.base_backend import Backend, State +from graphix.parameter import Expression, ExpressionOrFloat, ExpressionOrSupportsComplex +from graphix.sim.base_backend import DenseState, DenseStateBackend, Matrix, kron, matmul, outer, tensordot, vdot from graphix.sim.statevec import CNOT_TENSOR, CZ_TENSOR, SWAP_TENSOR, Statevec -from graphix.states import BasicStates +from graphix.states import BasicStates, State if TYPE_CHECKING: - from collections.abc import Mapping - - import numpy.typing as npt + from collections.abc import Mapping, Sequence from graphix.parameter import ExpressionOrSupportsFloat, Parameter + from graphix.sim.data import Data -class DensityMatrix(State): +class DensityMatrix(DenseState): """DensityMatrix object.""" + rho: Matrix + def __init__( self, data: Data = BasicStates.PLUS, nqubit: int | None = None, - ): + ) -> None: """Initialize density matrix objects. The behaviour builds on the one of *graphix.statevec.Statevec*. @@ -61,40 +65,42 @@ def __init__( if nqubit is not None and nqubit < 0: raise ValueError("nqubit must be a non-negative integer.") - def check_size_consistency(mat) -> None: + def check_size_consistency(mat: Matrix) -> None: if nqubit is not None and mat.shape != (2**nqubit, 2**nqubit): raise ValueError( f"Inconsistent parameters between nqubit = {nqubit} and the shape of the provided density matrix = {mat.shape}." ) if isinstance(data, DensityMatrix): - check_size_consistency(data) + check_size_consistency(data.rho) # safe: https://numpy.org/doc/stable/reference/generated/numpy.ndarray.copy.html self.rho = data.rho.copy() return if isinstance(data, Iterable): input_list = list(data) - if len(input_list) != 0: - # needed since Object is iterable but not subscribable! - try: - if isinstance(input_list[0], Iterable) and isinstance( - input_list[0][0], (Expression, SupportsComplex, SupportsFloat) - ): - self.rho = np.array(input_list) - if not lv.is_qubitop(self.rho): - raise ValueError("Cannot interpret the provided density matrix as a qubit operator.") - check_size_consistency(self.rho) - if self.rho.dtype != "O": - if not lv.is_unit_trace(self.rho): - raise ValueError("Density matrix must have unit trace.") - if not lv.is_psd(self.rho): - raise ValueError("Density matrix must be positive semi-definite.") - return - except TypeError: - pass + if len(input_list) != 0 and isinstance(input_list[0], Iterable): + + def get_row( + item: Iterable[ExpressionOrSupportsComplex] | State | Expression | SupportsComplex, + ) -> list[ExpressionOrSupportsComplex]: + if isinstance(item, Iterable): + return list(item) + raise TypeError("Every row of a matrix should be iterable.") + + input_matrix: list[list[ExpressionOrSupportsComplex]] = [get_row(item) for item in input_list] + self.rho = np.array(input_matrix) + if not lv.is_qubitop(self.rho): + raise ValueError("Cannot interpret the provided density matrix as a qubit operator.") + check_size_consistency(self.rho) + if self.rho.dtype != np.object_: + if not lv.is_unit_trace(self.rho): + raise ValueError("Density matrix must have unit trace.") + if not lv.is_psd(self.rho): + raise ValueError("Density matrix must be positive semi-definite.") + return statevec = Statevec(data, nqubit) # NOTE this works since np.outer flattens the inputs! - self.rho = np.outer(statevec.psi, statevec.psi.conj()) + self.rho = outer(statevec.psi, statevec.psi.conj()) @property def nqubit(self) -> int: @@ -105,12 +111,36 @@ def __str__(self) -> str: """Return a string description.""" return f"DensityMatrix object, with density matrix {self.rho} and shape {self.dims()}." - def add_nodes(self, nqubit, data) -> None: - """Add nodes to the density matrix.""" + @override + def add_nodes(self, nqubit: int, data: Data) -> None: + r""" + Add nodes (qubits) to the density matrix and initialize them in a specified state. + + Parameters + ---------- + nqubit : int + The number of qubits to add to the density matrix. + + data : Data, optional + The state in which to initialize the newly added nodes. + + - If a single basic state is provided, all new nodes are initialized in that state. + - If a list of basic states is provided, it must match the length of ``nodes``, and + each node is initialized with its corresponding state. + - A single-qubit state vector will be broadcast to all nodes. + - A multi-qubit state vector of dimension :math:`2^n` initializes the new nodes jointly. + - A density matrix must have shape :math:`2^n \times 2^n`, + and is used to jointly initialize the new nodes. + + Notes + ----- + Previously existing nodes remain unchanged. + """ dm_to_add = DensityMatrix(nqubit=nqubit, data=data) self.tensor(dm_to_add) - def evolve_single(self, op, i) -> None: + @override + def evolve_single(self, op: Matrix, i: int) -> None: """Single-qubit operation. Parameters @@ -126,11 +156,12 @@ def evolve_single(self, op, i) -> None: raise ValueError("op must be 2*2 matrix.") rho_tensor = self.rho.reshape((2,) * self.nqubit * 2) - rho_tensor = np.tensordot(np.tensordot(op, rho_tensor, axes=(1, i)), op.conj().T, axes=(i + self.nqubit, 0)) + rho_tensor = tensordot(tensordot(op, rho_tensor, axes=(1, i)), op.conj().T, axes=(i + self.nqubit, 0)) rho_tensor = np.moveaxis(rho_tensor, (0, -1), (i, i + self.nqubit)) self.rho = rho_tensor.reshape((2**self.nqubit, 2**self.nqubit)) - def evolve(self, op: npt.NDArray, qargs: Collection[int]) -> None: + @override + def evolve(self, op: Matrix, qargs: Sequence[int]) -> None: """Multi-qubit operation. Args: @@ -165,10 +196,10 @@ def evolve(self, op: npt.NDArray, qargs: Collection[int]) -> None: rho_tensor = self.rho.reshape((2,) * self.nqubit * 2) - rho_tensor = np.tensordot( - np.tensordot(op_tensor, rho_tensor, axes=[tuple(nqb_op + i for i in range(len(qargs))), tuple(qargs)]), + rho_tensor = tensordot( + tensordot(op_tensor, rho_tensor, axes=(tuple(nqb_op + i for i in range(len(qargs))), tuple(qargs))), op.conj().T.reshape((2,) * 2 * nqb_op), - axes=[tuple(i + self.nqubit for i in qargs), tuple(i for i in range(len(qargs)))], + axes=(tuple(i + self.nqubit for i in qargs), tuple(i for i in range(len(qargs)))), ) rho_tensor = np.moveaxis( rho_tensor, @@ -177,7 +208,8 @@ def evolve(self, op: npt.NDArray, qargs: Collection[int]) -> None: ) self.rho = rho_tensor.reshape((2**self.nqubit, 2**self.nqubit)) - def expectation_single(self, op: npt.NDArray, i: int) -> complex: + @override + def expectation_single(self, op: Matrix, loc: int) -> complex: """Return the expectation value of single-qubit operator. Args: @@ -188,8 +220,8 @@ def expectation_single(self, op: npt.NDArray, i: int) -> complex: ------- complex: expectation value (real for hermitian ops!). """ - if not (0 <= i < self.nqubit): - raise ValueError(f"Wrong target qubit {i}. Must between 0 and {self.nqubit - 1}.") + if not (0 <= loc < self.nqubit): + raise ValueError(f"Wrong target qubit {loc}. Must between 0 and {self.nqubit - 1}.") if op.shape != (2, 2): raise ValueError("op must be 2x2 matrix.") @@ -197,11 +229,13 @@ def expectation_single(self, op: npt.NDArray, i: int) -> complex: st1 = copy.copy(self) st1.normalize() - rho_tensor = st1.rho.reshape((2,) * st1.nqubit * 2) - rho_tensor = np.tensordot(op, rho_tensor, axes=[1, i]) - rho_tensor = np.moveaxis(rho_tensor, 0, i) + nqubit = self.nqubit + rho_tensor: Matrix = st1.rho.reshape((2,) * nqubit * 2) + rho_tensor = tensordot(op, rho_tensor, axes=((1,), (loc,))) + rho_tensor = np.moveaxis(rho_tensor, 0, loc) - return np.trace(rho_tensor.reshape((2**self.nqubit, 2**self.nqubit))) + # complex() needed with mypy strict mode (no-any-return) + return complex(np.trace(rho_tensor.reshape((2**nqubit, 2**nqubit)))) def dims(self) -> tuple[int, ...]: """Return the dimensions of the density matrix.""" @@ -219,7 +253,7 @@ def tensor(self, other: DensityMatrix) -> None: """ if not isinstance(other, DensityMatrix): other = DensityMatrix(other) - self.rho = np.kron(self.rho, other.rho) + self.rho = kron(self.rho, other.rho) def cnot(self, edge: tuple[int, int]) -> None: """Apply CNOT gate to density matrix. @@ -231,15 +265,16 @@ def cnot(self, edge: tuple[int, int]) -> None: """ self.evolve(CNOT_TENSOR.reshape(4, 4), edge) - def swap(self, edge: tuple[int, int]) -> None: + @override + def swap(self, qubits: tuple[int, int]) -> None: """Swap qubits. Parameters ---------- - edge : (int, int) or [int, int] + qubits : (int, int) (control, target) qubits indices. """ - self.evolve(SWAP_TENSOR.reshape(4, 4), edge) + self.evolve(SWAP_TENSOR.reshape(4, 4), qubits) def entangle(self, edge: tuple[int, int]) -> None: """Connect graph nodes. @@ -253,11 +288,21 @@ def entangle(self, edge: tuple[int, int]) -> None: def normalize(self) -> None: """Normalize density matrix.""" - self.rho /= np.trace(self.rho) + # Note that the following calls to `astype` are guaranteed to + # return the original NumPy array itself, since `copy=False` and + # the `dtype` matches. This is important because the array is + # then modified in place. + if self.rho.dtype == np.object_: + rho_o = self.rho.astype(np.object_, copy=False) + rho_o /= np.trace(rho_o) + else: + rho_c = self.rho.astype(np.complex128, copy=False) + rho_c /= np.trace(rho_c) - def remove_qubit(self, loc) -> None: + @override + def remove_qubit(self, qarg: int) -> None: """Remove a qubit.""" - self.ptrace(loc) + self.ptrace(qarg) self.normalize() def ptrace(self, qargs: Collection[int] | int) -> None: @@ -280,13 +325,12 @@ def ptrace(self, qargs: Collection[int] | int) -> None: rho_res = self.rho.reshape((2,) * n * 2) # ket, bra indices to trace out trace_axes = list(qargs) + [n + qarg for qarg in qargs] - rho_res = np.tensordot( - np.eye(2**qargs_num).reshape((2,) * qargs_num * 2), rho_res, axes=(list(range(2 * qargs_num)), trace_axes) - ) + op: Matrix = np.eye(2**qargs_num).reshape((2,) * qargs_num * 2).astype(np.complex128) + rho_res = tensordot(op, rho_res, axes=(range(2 * qargs_num), trace_axes)) self.rho = rho_res.reshape((2**nqubit_after, 2**nqubit_after)) - def fidelity(self, statevec: Statevec) -> float: + def fidelity(self, statevec: Statevec) -> ExpressionOrFloat: """Calculate the fidelity against reference statevector. Parameters @@ -294,13 +338,18 @@ def fidelity(self, statevec: Statevec) -> float: statevec : numpy array statevector (flattened numpy array) to compare with """ - return np.abs(statevec.transpose().conj() @ self.rho @ statevec) + result = vdot(statevec.psi, matmul(self.rho, statevec.psi)) + if isinstance(result, Expression): + return result + assert math.isclose(result.imag, 0) + return result.real - def flatten(self) -> npt.NDArray: + def flatten(self) -> Matrix: """Return flattened density matrix.""" return self.rho.flatten() - def apply_channel(self, channel: KrausChannel, qargs: Collection[int]) -> None: + @override + def apply_channel(self, channel: KrausChannel, qargs: Sequence[int]) -> None: """Apply a channel to a density matrix. Parameters @@ -350,41 +399,8 @@ def xreplace(self, assignment: Mapping[Parameter, ExpressionOrSupportsFloat]) -> return result -class DensityMatrixBackend(Backend): +@dataclass(frozen=True) +class DensityMatrixBackend(DenseStateBackend[DensityMatrix]): """MBQC simulator with density matrix method.""" - def __init__(self, **kwargs) -> None: - """Construct a density matrix backend.""" - super().__init__(DensityMatrix(nqubit=0), **kwargs) - - def apply_channel(self, channel: KrausChannel, qargs: Collection[int]) -> None: - """Apply channel to the state. - - Parameters - ---------- - qargs : list of ints. Target qubits - """ - indices = [self.node_index.index(i) for i in qargs] - self.state.apply_channel(channel, indices) - - -if sys.version_info >= (3, 10): - Data = ( - states.State - | DensityMatrix - | Statevec - | Iterable[states.State] - | Iterable[ExpressionOrSupportsComplex] - | Iterable[Iterable[ExpressionOrSupportsComplex]] - ) -else: - from typing import Union - - Data = Union[ - states.State, - DensityMatrix, - Statevec, - Iterable[states.State], - Iterable[ExpressionOrSupportsComplex], - Iterable[Iterable[ExpressionOrSupportsComplex]], - ] + state: DensityMatrix = dataclasses.field(init=False, default_factory=lambda: DensityMatrix(nqubit=0)) diff --git a/graphix/sim/statevec.py b/graphix/sim/statevec.py index eca7eca97..058260651 100644 --- a/graphix/sim/statevec.py +++ b/graphix/sim/statevec.py @@ -3,31 +3,27 @@ from __future__ import annotations import copy +import dataclasses import functools +import math from collections.abc import Iterable +from dataclasses import dataclass from typing import TYPE_CHECKING, SupportsComplex, SupportsFloat import numpy as np import numpy.typing as npt +from typing_extensions import override -from graphix import parameter, states, utils -from graphix.parameter import Expression, ExpressionOrSupportsComplex -from graphix.sim.base_backend import Backend, State +from graphix import parameter, states +from graphix.parameter import Expression, ExpressionOrSupportsComplex, check_expression_or_float +from graphix.sim.base_backend import DenseState, DenseStateBackend, Matrix, kron, tensordot from graphix.states import BasicStates if TYPE_CHECKING: - import collections - from collections.abc import Mapping + from collections.abc import Mapping, Sequence - from graphix.parameter import ExpressionOrSupportsFloat, Parameter - - -class StatevectorBackend(Backend): - """MBQC simulator with statevector method.""" - - def __init__(self, **kwargs) -> None: - """Construct a state vector backend.""" - super().__init__(Statevec(nqubit=0), **kwargs) + from graphix.parameter import ExpressionOrFloat, ExpressionOrSupportsFloat, Parameter + from graphix.sim.data import Data CZ_TENSOR = np.array( @@ -44,14 +40,16 @@ def __init__(self, **kwargs) -> None: ) -class Statevec(State): +class Statevec(DenseState): """Statevector object.""" + psi: Matrix + def __init__( self, data: Data = BasicStates.PLUS, nqubit: int | None = None, - ): + ) -> None: """Initialize statevector objects. `data` can be: @@ -66,11 +64,12 @@ def __init__( If both *nqubit* and *data* are provided, consistency of the dimensions is checked. If a *graphix.statevec.Statevec* is passed, returns a copy. - - :param data: input data to prepare the state. Can be a classical description or a numerical input, defaults to graphix.states.BasicStates.PLUS - :type data: Data, optional - :param nqubit: number of qubits to prepare, defaults to None - :type nqubit: int, optional + Parameters + ---------- + data : Data, optional + input data to prepare the state. Can be a classical description or a numerical input, defaults to graphix.states.BasicStates.PLUS + nqubit : int, optional + number of qubits to prepare, defaults to None """ if nqubit is not None and nqubit < 0: raise ValueError("nqubit must be a non-negative integer.") @@ -84,6 +83,11 @@ def __init__( self.psi = data.psi.copy() return + # The type + # list[states.State] | list[ExpressionOrSupportsComplex] | list[Iterable[ExpressionOrSupportsComplex]] + # would be more precise, but given a value X of type Iterable[A] | Iterable[B], + # mypy infers that list(X) has type list[A | B] instead of list[A] | list[B]. + input_list: list[states.State | ExpressionOrSupportsComplex | Iterable[ExpressionOrSupportsComplex]] if isinstance(data, states.State): if nqubit is None: nqubit = 1 @@ -100,18 +104,25 @@ def __init__( self.psi = np.array(1, dtype=np.complex128) elif isinstance(input_list[0], states.State): - utils.check_list_elements(input_list, states.State) if nqubit is None: nqubit = len(input_list) elif nqubit != len(input_list): raise ValueError("Mismatch between nqubit and length of input state.") - list_of_sv = [s.get_statevector() for s in input_list] - tmp_psi = functools.reduce(np.kron, list_of_sv) + + def get_statevector( + s: states.State | ExpressionOrSupportsComplex | Iterable[ExpressionOrSupportsComplex], + ) -> npt.NDArray[np.complex128]: + if not isinstance(s, states.State): + raise TypeError("Data should be an homogeneous sequence of states.") + return s.get_statevector() + + list_of_sv = [get_statevector(s) for s in input_list] + + tmp_psi = functools.reduce(lambda m0, m1: np.kron(m0, m1).astype(np.complex128), list_of_sv) # reshape self.psi = tmp_psi.reshape((2,) * nqubit) # `SupportsFloat` is needed because `numpy.float64` is not an instance of `SupportsComplex`! elif isinstance(input_list[0], (Expression, SupportsComplex, SupportsFloat)): - utils.check_list_elements(input_list, (Expression, SupportsComplex, SupportsFloat)) if nqubit is None: length = len(input_list) if length & (length - 1): @@ -131,12 +142,35 @@ def __str__(self) -> str: """Return a string description.""" return f"Statevec object with statevector {self.psi} and length {self.dims()}." - def add_nodes(self, nqubit, data) -> None: - """Add nodes to the state vector.""" + @override + def add_nodes(self, nqubit: int, data: Data) -> None: + r""" + Add nodes (qubits) to the state vector and initialize them in a specified state. + + Parameters + ---------- + nqubit : int + The number of qubits to add to the state vector. + + data : Data, optional + The state in which to initialize the newly added nodes. + + - If a single basic state is provided, all new nodes are initialized in that state. + - If a list of basic states is provided, it must match the length of ``nodes``, and + each node is initialized with its corresponding state. + - A single-qubit state vector will be broadcast to all nodes. + - A multi-qubit state vector of dimension :math:`2^n`, where :math:`n = \mathrm{len}(nodes)`, + initializes the new nodes jointly. + + Notes + ----- + Previously existing nodes remain unchanged. + """ sv_to_add = Statevec(nqubit=nqubit, data=data) self.tensor(sv_to_add) - def evolve_single(self, op: npt.NDArray, i: int) -> None: + @override + def evolve_single(self, op: Matrix, i: int) -> None: """Apply a single-qubit operation. Parameters @@ -146,10 +180,11 @@ def evolve_single(self, op: npt.NDArray, i: int) -> None: i : int qubit index """ - psi = np.tensordot(op, self.psi, (1, i)) + psi = tensordot(op, self.psi, (1, i)) self.psi = np.moveaxis(psi, 0, i) - def evolve(self, op: np.ndarray, qargs: list[int]) -> None: + @override + def evolve(self, op: Matrix, qargs: Sequence[int]) -> None: """Apply a multi-qubit operation. Parameters @@ -163,10 +198,10 @@ def evolve(self, op: np.ndarray, qargs: list[int]) -> None: # TODO shape = (2,)* 2 * op_dim shape = [2 for _ in range(2 * op_dim)] op_tensor = op.reshape(shape) - psi = np.tensordot( + psi = tensordot( op_tensor, self.psi, - (tuple(op_dim + i for i in range(len(qargs))), tuple(qargs)), + (tuple(op_dim + i for i in range(len(qargs))), qargs), ) self.psi = np.moveaxis(psi, range(len(qargs)), qargs) @@ -174,30 +209,14 @@ def dims(self) -> tuple[int, ...]: """Return the dimensions.""" return self.psi.shape - def ptrace(self, qargs) -> None: - """Perform partial trace of the selected qubits. - - .. warning:: - This method currently assumes qubits in qargs to be separable from the rest - (checks not implemented for speed). - Otherwise, the state returned will be forced to be pure which will result in incorrect output. - Correct behaviour will be implemented as soon as the densitymatrix class, currently under development - (PR #64), is merged. - - Parameters - ---------- - qargs : list of int - qubit indices to trace over - """ - nqubit_after = len(self.psi.shape) - len(qargs) - psi = self.psi - rho = np.tensordot(psi, psi.conj(), axes=(qargs, qargs)) # density matrix - rho = np.reshape(rho, (2**nqubit_after, 2**nqubit_after)) - evals, evecs = np.linalg.eig(rho) # back to statevector - # NOTE works since only one 1 in the eigenvalues corresponding to the state - # TODO use np.eigh since rho is Hermitian? - self.psi = np.reshape(evecs[:, np.argmax(evals)], (2,) * nqubit_after) + # Note that `@property` must appear before `@override` for pyright + @property + @override + def nqubit(self) -> int: + """Return the number of qubits.""" + return self.psi.size - 1 + @override def remove_qubit(self, qarg: int) -> None: r"""Remove a separable qubit from the system and assemble a statevector for remaining qubits. @@ -243,15 +262,17 @@ def remove_qubit(self, qarg: int) -> None: norm = _get_statevec_norm(self.psi) if isinstance(norm, SupportsFloat): assert not np.isclose(norm, 0) - psi = self.psi.take(indices=0, axis=qarg) + index: list[slice[int] | int] = [slice(None)] * self.psi.ndim + index[qarg] = 0 + psi = self.psi[tuple(index)] norm = _get_statevec_norm(psi) - self.psi = ( - psi - if not isinstance(norm, SupportsFloat) or not np.isclose(norm, 0) - else self.psi.take(indices=1, axis=qarg) - ) + if isinstance(norm, SupportsFloat) and math.isclose(norm, 0): + index[qarg] = 1 + psi = self.psi[tuple(index)] + self.psi = psi self.normalize() + @override def entangle(self, edge: tuple[int, int]) -> None: """Connect graph nodes. @@ -261,7 +282,7 @@ def entangle(self, edge: tuple[int, int]) -> None: (control, target) qubit indices """ # contraction: 2nd index - control index, and 3rd index - target index. - psi = np.tensordot(CZ_TENSOR, self.psi, ((2, 3), edge)) + psi = tensordot(CZ_TENSOR, self.psi, ((2, 3), edge)) # sort back axes self.psi = np.moveaxis(psi, (0, 1), edge) @@ -279,9 +300,9 @@ def tensor(self, other: Statevec) -> None: psi_other = other.psi.flatten() total_num = len(self.dims()) + len(other.dims()) - self.psi = np.kron(psi_self, psi_other).reshape((2,) * total_num) + self.psi = kron(psi_self, psi_other).reshape((2,) * total_num) - def cnot(self, qubits) -> None: + def cnot(self, qubits: tuple[int, int]) -> None: """Apply CNOT. Parameters @@ -290,11 +311,12 @@ def cnot(self, qubits) -> None: (control, target) qubit indices """ # contraction: 2nd index - control index, and 3rd index - target index. - psi = np.tensordot(CNOT_TENSOR, self.psi, ((2, 3), qubits)) + psi = tensordot(CNOT_TENSOR, self.psi, ((2, 3), qubits)) # sort back axes self.psi = np.moveaxis(psi, (0, 1), qubits) - def swap(self, qubits) -> None: + @override + def swap(self, qubits: tuple[int, int]) -> None: """Swap qubits. Parameters @@ -303,20 +325,31 @@ def swap(self, qubits) -> None: (control, target) qubit indices """ # contraction: 2nd index - control index, and 3rd index - target index. - psi = np.tensordot(SWAP_TENSOR, self.psi, ((2, 3), qubits)) + psi = tensordot(SWAP_TENSOR, self.psi, ((2, 3), qubits)) # sort back axes self.psi = np.moveaxis(psi, (0, 1), qubits) def normalize(self) -> None: """Normalize the state in-place.""" - norm = _get_statevec_norm(self.psi) - self.psi /= norm + # Note that the following calls to `astype` are guaranteed to + # return the original NumPy array itself, since `copy=False` and + # the `dtype` matches. This is important because the array is + # then modified in place. + if self.psi.dtype == np.object_: + psi_o = self.psi.astype(np.object_, copy=False) + norm_o = _get_statevec_norm_symbolic(psi_o) + psi_o /= norm_o + else: + psi_c = self.psi.astype(np.complex128, copy=False) + norm_c = _get_statevec_norm_numeric(psi_c) + psi_c /= norm_c - def flatten(self) -> npt.NDArray: + def flatten(self) -> Matrix: """Return flattened statevector.""" return self.psi.flatten() - def expectation_single(self, op: np.NDArray, loc: int) -> complex: + @override + def expectation_single(self, op: Matrix, loc: int) -> complex: """Return the expectation value of single-qubit operator. Parameters @@ -334,9 +367,9 @@ def expectation_single(self, op: np.NDArray, loc: int) -> complex: st1.normalize() st2 = copy.copy(st1) st1.evolve_single(op, loc) - return np.dot(st2.psi.flatten().conjugate(), st1.psi.flatten()) + return complex(np.dot(st2.psi.flatten().conjugate(), st1.psi.flatten())) - def expectation_value(self, op: np.NDArray, qargs: collections.abc.Iterable[int]) -> complex: + def expectation_value(self, op: Matrix, qargs: Sequence[int]) -> complex: """Return the expectation value of multi-qubit operator. Parameters @@ -354,7 +387,7 @@ def expectation_value(self, op: np.NDArray, qargs: collections.abc.Iterable[int] st2.normalize() st1 = copy.copy(st2) st1.evolve(op, qargs) - return np.dot(st2.psi.flatten().conjugate(), st1.psi.flatten()) + return complex(np.dot(st2.psi.flatten().conjugate(), st1.psi.flatten())) def subs(self, variable: Parameter, substitute: ExpressionOrSupportsFloat) -> Statevec: """Return a copy of the state vector where all occurrences of the given variable in measurement angles are substituted by the given value.""" @@ -369,22 +402,29 @@ def xreplace(self, assignment: Mapping[Parameter, ExpressionOrSupportsFloat]) -> return result -def _get_statevec_norm(psi): +@dataclass(frozen=True) +class StatevectorBackend(DenseStateBackend[Statevec]): + """MBQC simulator with statevector method.""" + + state: Statevec = dataclasses.field(init=False, default_factory=lambda: Statevec(nqubit=0)) + + +def _get_statevec_norm_symbolic(psi: npt.NDArray[np.object_]) -> ExpressionOrFloat: """Return norm of the state.""" - return np.sqrt(np.sum(psi.flatten().conj() * psi.flatten())) + flat = psi.flatten() + return check_expression_or_float(np.sqrt(np.sum(flat.conj() * flat))) -if TYPE_CHECKING: - from collections.abc import Iterable - - Data = states.State | Statevec | Iterable[states.State] | Iterable[ExpressionOrSupportsComplex] -else: - from collections.abc import Iterable - from typing import Union - - Data = Union[ - states.State, - Statevec, - Iterable[states.State], - Iterable[ExpressionOrSupportsComplex], - ] +def _get_statevec_norm_numeric(psi: npt.NDArray[np.complex128]) -> float: + flat = psi.flatten() + norm_sq = np.sum(flat.conj() * flat) + assert math.isclose(norm_sq.imag, 0, abs_tol=1e-15) + return math.sqrt(norm_sq.real) + + +def _get_statevec_norm(psi: Matrix) -> ExpressionOrFloat: + """Return norm of the state.""" + # Narrow psi to concrete dtype + if psi.dtype == np.object_: + return _get_statevec_norm_symbolic(psi.astype(np.object_, copy=False)) + return _get_statevec_norm_numeric(psi.astype(np.complex128, copy=False)) diff --git a/graphix/sim/tensornet.py b/graphix/sim/tensornet.py index 586fbac3b..fb6284dfe 100644 --- a/graphix/sim/tensornet.py +++ b/graphix/sim/tensornet.py @@ -3,224 +3,62 @@ from __future__ import annotations import string +import sys +import warnings +from abc import ABC from copy import deepcopy -from typing import TYPE_CHECKING +from dataclasses import dataclass +from typing import TYPE_CHECKING, SupportsComplex import numpy as np +import numpy.typing as npt import quimb.tensor as qtn from quimb.tensor import Tensor, TensorNetwork +# TypeAlias introduced in Python 3.10 +# override introduced in Python 3.12 +from typing_extensions import TypeAlias, override + from graphix import command from graphix.ops import Ops +from graphix.parameter import Expression from graphix.rng import ensure_rng -from graphix.sim.base_backend import Backend, State +from graphix.sim.base_backend import Backend, BackendState from graphix.states import BasicStates, PlanarState if TYPE_CHECKING: - import numpy.typing as npt + from collections.abc import Iterable, Sequence + + from cotengra.oe import PathOptimizer from numpy.random import Generator + from graphix import Pattern from graphix.clifford import Clifford - from graphix.measurements import Measurement + from graphix.measurements import Measurement, Outcome + from graphix.sim import Data from graphix.simulator import MeasureMethod +if sys.version_info >= (3, 10): + PrepareState: TypeAlias = str | npt.NDArray[np.complex128] +else: + from typing import Union -class TensorNetworkBackend(Backend): - """Tensor Network Simulator for MBQC. - - Executes the measurement pattern using TN expression of graph states. - """ - - def __init__( - self, pattern, graph_prep="auto", input_state=BasicStates.PLUS, rng: Generator | None = None, **kwargs - ): - """ - Construct a tensor network backend. - - Parameters - ---------- - pattern : graphix.Pattern - graph_prep : str - 'parallel' : - Faster method for preparing a graph state. - The expression of a graph state can be obtained from the graph geometry. - See https://journals.aps.org/pra/abstract/10.1103/PhysRevA.76.052315 for detail calculation. - Note that 'N' and 'E' commands in the measurement pattern are ignored. - 'sequential' : - Sequentially execute N and E commands, strictly following the measurement pattern. - In this strategy, All N and E commands executed sequentially. - 'auto'(default) : - Automatically select a preparation strategy based on the max degree of a graph - input_state : preparation for input states (only BasicStates.PLUS is supported for tensor networks yet), - rng: :class:`np.random.Generator` (default: *None*) - random number generator to use for measurements - **kwargs : Additional keyword args to be passed to quimb.tensor.TensorNetwork. - """ - if input_state != BasicStates.PLUS: - msg = "TensorNetworkBackend currently only supports BasicStates.PLUS as input state." - raise NotImplementedError(msg) - self.pattern = pattern - self.output_nodes = pattern.output_nodes - self.results = deepcopy(pattern.results) - if graph_prep in {"parallel", "sequential"}: - self.graph_prep = graph_prep - elif graph_prep == "opt": - self.graph_prep = "parallel" - print(f"graph preparation strategy '{graph_prep}' is deprecated and will be replaced by 'parallel'") - elif graph_prep == "auto": - max_degree = pattern.get_max_degree() - if max_degree > 5: - self.graph_prep = "sequential" - else: - self.graph_prep = "parallel" - else: - raise ValueError(f"Invalid graph preparation strategy: {graph_prep}") - - rng = ensure_rng(rng) - self.__rng = rng - if self.graph_prep == "parallel": - if not pattern.is_standard(): - raise ValueError("parallel preparation strategy does not support not-standardized pattern") - nodes, edges = pattern.get_graph() - state = MBQCTensorNet( - graph_nodes=nodes, - graph_edges=edges, - default_output_nodes=pattern.output_nodes, - rng=rng, - **kwargs, - ) - elif self.graph_prep == "sequential": - state = MBQCTensorNet(default_output_nodes=pattern.output_nodes, rng=rng, **kwargs) - self._decomposed_cz = _get_decomposed_cz() - self._isolated_nodes = pattern.get_isolated_nodes() - super().__init__(state) - - def add_nodes(self, nodes, data=BasicStates.PLUS) -> None: - """Add nodes to the network. - - Parameters - ---------- - nodes : iterator of int - index set of the new nodes. - """ - if data != BasicStates.PLUS: - raise NotImplementedError( - "TensorNetworkBackend currently only supports |+> input state (see https://github.com/TeamGraphix/graphix/issues/167)." - ) - if self.graph_prep == "sequential": - self.state.add_qubits(nodes) - elif self.graph_prep == "opt": - pass - - def entangle_nodes(self, edge) -> None: - """Make entanglement between nodes specified by edge. - - Parameters - ---------- - edge : tuple of int - edge specifies two target nodes of the CZ gate. - """ - if self.graph_prep == "sequential": - old_inds = [self.state._dangling[str(node)] for node in edge] - tids = self.state._get_tids_from_inds(old_inds, which="any") - tensors = [self.state.tensor_map[tid] for tid in tids] - new_inds = [gen_str() for _ in range(3)] - - # retag dummy indices - for i in range(2): - tensors[i].retag({"Open": "Close"}, inplace=True) - self.state._dangling[str(edge[i])] = new_inds[i] - cz_tn = TensorNetwork( - [ - qtn.Tensor( - self._decomposed_cz[0], - [new_inds[0], old_inds[0], new_inds[2]], - [str(edge[0]), "CZ", "Open"], - ), - qtn.Tensor( - self._decomposed_cz[1], - [new_inds[2], new_inds[1], old_inds[1]], - [str(edge[1]), "CZ", "Open"], - ), - ] - ) - self.state.add_tensor_network(cz_tn) - elif self.graph_prep == "opt": - pass - - def measure(self, node: int, measurement: Measurement) -> tuple[Backend, int]: - """Perform measurement of the node. - - In the context of tensornetwork, performing measurement equals to - applying measurement operator to the tensor. Here, directly contracted with the projected state. - - Parameters - ---------- - node : int - index of the node to measure - measurement : Measurement - measure plane and angle - """ - if node in self._isolated_nodes: - vector = self.state.get_open_tensor_from_index(node) - probs = np.abs(vector) ** 2 - probs /= np.sum(probs) - result = self.__rng.choice([0, 1], p=probs) - self.results[node] = result - buffer = 1 / probs[result] ** 0.5 - else: - # choose the measurement result randomly - result = self.__rng.choice([0, 1]) - self.results[node] = result - buffer = 2**0.5 - vec = PlanarState(measurement.plane, measurement.angle).get_statevector() - if result: - vec = measurement.plane.orth.matrix @ vec - proj_vec = vec * buffer - self.state.measure_single(node, basis=proj_vec) - return result - - def correct_byproduct(self, cmd: command.X | command.Z, measure_method: MeasureMethod) -> None: - """Perform byproduct correction. - - Parameters - ---------- - cmd : list - Byproduct command - i.e. ['X' or 'Z', node, signal_domain] - measure_method : MeasureMethod - The measure method to use - """ - if np.mod(sum(measure_method.get_measure_result(j) for j in cmd.domain), 2) == 1: - op = Ops.X if isinstance(cmd, command.X) else Ops.Z - self.state.evolve_single(cmd.node, op, cmd.kind) - - def apply_clifford(self, node: int, clifford: Clifford) -> None: - """Apply single-qubit Clifford gate. - - Parameters - ---------- - cmd : list - clifford command. - See https://arxiv.org/pdf/2212.11975.pdf for the detail. - """ - self.state.evolve_single(node, clifford.matrix) - - def finalize(self, output_nodes) -> None: - """Do nothing.""" + PrepareState: TypeAlias = Union[str, npt.NDArray[np.complex128]] -class MBQCTensorNet(State, TensorNetwork): +class MBQCTensorNet(BackendState, TensorNetwork): """Tensor Network Simulator interface for MBQC patterns, using quimb.tensor.core.TensorNetwork.""" + _dangling: dict[str, str] + def __init__( self, rng: Generator | None = None, - graph_nodes=None, - graph_edges=None, - default_output_nodes=None, - ts=None, - **kwargs, + graph_nodes: Iterable[int] | None = None, + graph_edges: Iterable[tuple[int, int]] | None = None, + default_output_nodes: Iterable[int] | None = None, + ts: list[TensorNetwork] | TensorNetwork | None = None, + virtual: bool = False, ) -> None: """ Initialize MBQCTensorNet. @@ -238,20 +76,15 @@ def __init__( """ if ts is None: ts = [] - if isinstance(ts, MBQCTensorNet): - super().__init__(ts=ts, **kwargs) - self._dangling = ts._dangling - self.default_output_nodes = default_output_nodes - else: - super().__init__(ts=ts, **kwargs) - self._dangling = {} - self.default_output_nodes = default_output_nodes + super().__init__(ts=ts, virtual=virtual) + self._dangling = ts._dangling if isinstance(ts, MBQCTensorNet) else {} + self.default_output_nodes = None if default_output_nodes is None else list(default_output_nodes) # prepare the graph state if graph_nodes and graph_edges are given if graph_nodes is not None and graph_edges is not None: self.set_graph_state(graph_nodes, graph_edges) self.__rng = ensure_rng(rng) - def get_open_tensor_from_index(self, index): + def get_open_tensor_from_index(self, index: int | str) -> npt.NDArray[np.complex128]: """Get tensor specified by node index. The tensor has a dangling edge. Parameters @@ -270,9 +103,9 @@ def get_open_tensor_from_index(self, index): tags = [index, "Open"] tid = next(iter(self._get_tids_from_tags(tags, which="all"))) tensor = self.tensor_map[tid] - return tensor.data + return tensor.data.astype(dtype=np.complex128) - def add_qubit(self, index, state="plus"): + def add_qubit(self, index: int, state: PrepareState = "plus") -> None: """Add a single qubit to the network. Parameters @@ -298,14 +131,18 @@ def add_qubit(self, index, state="plus"): elif state == "iminus": vec = BasicStates.MINUS_I.get_statevector() else: - assert state.shape == (2,), "state must be 2-element np.ndarray" - assert np.isclose(np.linalg.norm(state), 1), "state must be normalized" + if isinstance(state, str): + raise TypeError(f"Unknown state: {state}") + if state.shape != (2,): + raise ValueError("state must be 2-element np.ndarray") + if not np.isclose(np.linalg.norm(state), 1): + raise ValueError("state must be normalized") vec = state tsr = Tensor(vec, [ind], [tag, "Open"]) self.add_tensor(tsr) self._dangling[tag] = ind - def evolve_single(self, index, arr, label="U"): + def evolve_single(self, index: int, arr: npt.NDArray[np.complex128], label: str = "U") -> None: """Apply single-qubit operator to a qubit with the given index. Parameters @@ -332,24 +169,44 @@ def evolve_single(self, index, arr, label="U"): self._dangling[str(index)] = new_ind self.add_tensor(node_ts) - def add_qubits(self, indices, states="plus"): + def add_qubits(self, indices: Sequence[int], states: PrepareState | Iterable[PrepareState] = "plus") -> None: """Add qubits to the network. Parameters ---------- indices : iterator of int indices of the new qubits. - states (optional): str or 2*2 numpy.ndarray or list - initial states of the new qubits. - "plus", "minus", "zero", "one", "iplus", "iminus", or 1*2 np.ndarray (arbitrary state). - list of the above, to specify the initial state of each qubit. + states (optional): Data + initial state or list of initial states of the new qubits. """ - if not isinstance(states, list): - states = [states] * len(indices) - for i, ind in enumerate(indices): - self.add_qubit(ind, state=states[i]) + if isinstance(states, str): + states_iter: list[PrepareState] = [states] * len(indices) + else: + states_list = list(states) + # `states` is of type `PrepareState`, a type alias for + # `str | npt.NDArray[np.complex128]`. To distinguish + # between the two cases, we just need to check whether + # an element is a character or a complex number. + if len(states_list) == 0 or isinstance(states_list[0], SupportsComplex): + states_iter = [np.array(states_list)] * len(indices) + else: - def measure_single(self, index, basis="Z", bypass_probability_calculation=True, outcome=None): + def get_prepare_state(item: PrepareState | SupportsComplex) -> PrepareState: + if isinstance(item, SupportsComplex): + raise TypeError("Unexpected complex") + return item + + states_iter = [get_prepare_state(item) for item in states_list] + for ind, state in zip(indices, states_iter): + self.add_qubit(ind, state) + + def measure_single( + self, + index: int, + basis: str | npt.NDArray[np.complex128] = "Z", + bypass_probability_calculation: bool = True, + outcome: Outcome | None = None, + ) -> Outcome: """Measure a node in specified basis. Note this does not perform the partial trace. Parameters @@ -405,7 +262,7 @@ def measure_single(self, index, basis="Z", bypass_probability_calculation=True, self.add_tensor(proj_ts) return result - def set_graph_state(self, nodes, edges): + def set_graph_state(self, nodes: Iterable[int], edges: Iterable[tuple[int, int]]) -> None: """Prepare the graph state without directly applying CZ gates. Parameters @@ -417,8 +274,8 @@ def set_graph_state(self, nodes, edges): .. seealso:: :meth:`~graphix.sim.tensornet.TensorNetworkBackend.__init__()` """ - ind_dict = {} - vec_dict = {} + ind_dict: dict[int, list[str]] = {} + vec_dict: dict[int, list[bool]] = {} for edge in edges: for node in edge: if node not in ind_dict: @@ -453,7 +310,14 @@ def set_graph_state(self, nodes, edges): ) * 2 ** (dim_tensor / 4 - 1.0 / 2) self.add_tensor(Tensor(tensor, ind_dict[node], [str(node), "Open"])) - def get_basis_coefficient(self, basis, normalize=True, indices=None, **kwagrs): + def _require_default_output_nodes(self) -> list[int]: + if self.default_output_nodes is None: + raise ValueError("output_nodes is not set.") + return self.default_output_nodes + + def get_basis_coefficient( + self, basis: int | str, normalize: bool = True, indices: Sequence[int] | None = None + ) -> complex: """Calculate the coefficient of a given computational basis. Parameters @@ -471,7 +335,7 @@ def get_basis_coefficient(self, basis, normalize=True, indices=None, **kwagrs): coefficient """ if indices is None: - indices = self.default_output_nodes + indices = self._require_default_output_nodes() if isinstance(basis, str): basis = int(basis, 2) tn = self.copy() @@ -493,13 +357,13 @@ def get_basis_coefficient(self, basis, normalize=True, indices=None, **kwagrs): # contraction tn_simplified = tn.full_simplify("ADCR") - coef = tn_simplified.contract(output_inds=[], **kwagrs) + coef = tn_simplified.contract(output_inds=[]) if normalize: norm = self.get_norm() return coef / norm return coef - def get_basis_amplitude(self, basis, **kwagrs): + def get_basis_amplitude(self, basis: str | int) -> float: """Calculate the probability amplitude of the specified computational basis state. Parameters @@ -514,10 +378,10 @@ def get_basis_amplitude(self, basis, **kwagrs): """ if isinstance(basis, str): basis = int(basis, 2) - coef = self.get_basis_coefficient(basis, **kwagrs) + coef = self.get_basis_coefficient(basis) return abs(coef) ** 2 - def to_statevector(self, indices=None, **kwagrs) -> npt.NDArray: + def to_statevector(self, indices: Sequence[int] | None = None) -> npt.NDArray[np.complex128]: """Retrieve the statevector from the tensornetwork. This method tends to be slow however we plan to parallelize this. @@ -532,17 +396,17 @@ def to_statevector(self, indices=None, **kwagrs) -> npt.NDArray: numpy.ndarray : statevector """ - n_qubit = len(self.default_output_nodes) if indices is None else len(indices) - statevec = np.zeros(2**n_qubit, np.complex128) + n_qubit = len(self._require_default_output_nodes()) if indices is None else len(indices) + statevec: npt.NDArray[np.complex128] = np.zeros(2**n_qubit, np.complex128) for i in range(len(statevec)): - statevec[i] = self.get_basis_coefficient(i, normalize=False, indices=indices, **kwagrs) + statevec[i] = self.get_basis_coefficient(i, normalize=False, indices=indices) return statevec / np.linalg.norm(statevec) - def flatten(self) -> npt.NDArray: + def flatten(self) -> npt.NDArray[np.complex128]: """Return flattened statevector.""" return self.to_statevector().flatten() - def get_norm(self, **kwagrs): + def get_norm(self, optimize: str | PathOptimizer | None = None) -> float: """Calculate the norm of the state. Returns @@ -554,9 +418,16 @@ def get_norm(self, **kwagrs): tn_cp2 = tn_cp1.conj() tn = TensorNetwork([tn_cp1, tn_cp2]) tn_simplified = tn.full_simplify("ADCR") - return abs(tn_simplified.contract(output_inds=[], **kwagrs)) ** 0.5 + contraction = tn_simplified.contract(output_inds=[], optimize=optimize) + return float(abs(contraction) ** 0.5) - def expectation_value(self, op, qubit_indices, output_node_indices=None, **kwagrs): + def expectation_value( + self, + op: npt.NDArray[np.complex128], + qubit_indices: Sequence[int], + output_node_indices: Iterable[int] | None = None, + optimize: str | PathOptimizer | None = None, + ) -> float: """Calculate expectation value of the given operator. Parameters @@ -574,14 +445,8 @@ def expectation_value(self, op, qubit_indices, output_node_indices=None, **kwagr float : Expectation value """ - if output_node_indices is None: - if self.default_output_nodes is None: - raise ValueError("output_nodes is not set.") - target_nodes = [self.default_output_nodes[ind] for ind in qubit_indices] - out_inds = self.default_output_nodes - else: - target_nodes = [output_node_indices[ind] for ind in qubit_indices] - out_inds = output_node_indices + out_inds = self._require_default_output_nodes() if output_node_indices is None else list(output_node_indices) + target_nodes = [out_inds[ind] for ind in qubit_indices] op_dim = len(qubit_indices) op = op.reshape([2 for _ in range(2 * op_dim)]) new_ind_left = [gen_str() for _ in range(op_dim)] @@ -606,11 +471,11 @@ def expectation_value(self, op, qubit_indices, output_node_indices=None, **kwagr # contraction tn_cp_left = tn_cp_left.full_simplify("ADCR") - exp_val = tn_cp_left.contract(output_inds=[], **kwagrs) - norm = self.get_norm(**kwagrs) + exp_val = tn_cp_left.contract(output_inds=[], optimize=optimize) + norm = self.get_norm(optimize=optimize) return exp_val / norm**2 - def evolve(self, operator, qubit_indices, decompose=True, **kwagrs): + def evolve(self, operator: npt.NDArray[np.complex128], qubit_indices: list[int], decompose: bool = True) -> None: """Apply an arbitrary operator to the state. Parameters @@ -628,34 +493,35 @@ def evolve(self, operator, qubit_indices, decompose=True, **kwagrs): operator = operator.reshape(shape) # operator indices - node_indices = [self.default_output_nodes[index] for index in qubit_indices] + default_output_nodes = self._require_default_output_nodes() + node_indices = [default_output_nodes[index] for index in qubit_indices] old_ind_list = [self._dangling[str(index)] for index in node_indices] new_ind_list = [gen_str() for _ in range(len(node_indices))] for i in range(len(node_indices)): self._dangling[str(node_indices[i])] = new_ind_list[i] - ts = Tensor( + ts: Tensor | TensorNetwork = Tensor( operator, new_ind_list + old_ind_list, [str(index) for index in node_indices], ) if decompose: # decompose tensor into Matrix Product Operator(MPO) - tensors = [] - bond_inds = {0: None} + tensors: list[Tensor | TensorNetwork] = [] + bond_inds: dict[int, str | None] = {0: None} for i in range(len(node_indices) - 1): bond_inds[i + 1] = gen_str() - left_inds = [new_ind_list[i], old_ind_list[i]] - if bond_inds[i]: - left_inds.append(bond_inds[i]) - unit_tensor, ts = ts.split(left_inds=left_inds, bond_ind=bond_inds[i + 1], **kwagrs) + left_inds: list[str] = [new_ind_list[i], old_ind_list[i]] + bond_ind = bond_inds[i] + if bond_ind is not None: + left_inds.append(bond_ind) + unit_tensor, ts = ts.split(left_inds=left_inds, bond_ind=bond_inds[i + 1]) tensors.append(unit_tensor) tensors.append(ts) - op_tensor = TensorNetwork(tensors) - else: - op_tensor = ts - self.add(op_tensor) + ts = TensorNetwork(tensors) + self.add(ts) - def copy(self, deep=False): + @override + def copy(self, virtual: bool = False, deep: bool = False) -> MBQCTensorNet: """Return the copy of this object. Parameters @@ -674,7 +540,7 @@ def copy(self, deep=False): return self.__class__(rng=self.__rng, ts=self) -def _get_decomposed_cz(): +def _get_decomposed_cz() -> list[npt.NDArray[np.complex128]]: """Return the decomposed cz tensors. This is an internal method. @@ -698,23 +564,250 @@ def _get_decomposed_cz(): 4-rank x1 3-rank x2 """ cz_ts = Tensor( - Ops.CZ.reshape((2, 2, 2, 2)).astype(np.float64), + Ops.CZ.reshape((2, 2, 2, 2)).astype(np.complex128), ["O1", "O2", "I1", "I2"], ["CZ"], ) decomposed_cz = cz_ts.split(left_inds=["O1", "I1"], right_inds=["O2", "I2"], max_bond=4) return [ - decomposed_cz.tensors[0].data, - decomposed_cz.tensors[1].data, + decomposed_cz.tensors[0].data.astype(np.complex128), + decomposed_cz.tensors[1].data.astype(np.complex128), ] -def gen_str(): +@dataclass(frozen=True) +class _AbstractTensorNetworkBackend(Backend[MBQCTensorNet], ABC): + state: MBQCTensorNet + pattern: Pattern + graph_prep: str + input_state: Data + rng: Generator + output_nodes: list[int] + results: dict[int, Outcome] + _decomposed_cz: list[npt.NDArray[np.complex128]] + _isolated_nodes: set[int] + + +@dataclass(frozen=True) +class TensorNetworkBackend(_AbstractTensorNetworkBackend): + """Tensor Network Simulator for MBQC. + + Executes the measurement pattern using TN expression of graph states. + + Parameters + ---------- + pattern : graphix.Pattern + graph_prep : str + 'parallel' : + Faster method for preparing a graph state. + The expression of a graph state can be obtained from the graph geometry. + See https://journals.aps.org/pra/abstract/10.1103/PhysRevA.76.052315 for detail calculation. + Note that 'N' and 'E' commands in the measurement pattern are ignored. + 'sequential' : + Sequentially execute N and E commands, strictly following the measurement pattern. + In this strategy, All N and E commands executed sequentially. + 'auto'(default) : + Automatically select a preparation strategy based on the max degree of a graph + input_state : preparation for input states (only BasicStates.PLUS is supported for tensor networks yet), + rng: :class:`np.random.Generator` (default: *None*) + random number generator to use for measurements + """ + + def __init__( + self, pattern: Pattern, graph_prep: str = "auto", input_state: Data | None = None, rng: Generator | None = None + ) -> None: + """Construct a tensor network backend.""" + if input_state is None: + input_state = BasicStates.PLUS + elif input_state != BasicStates.PLUS: + msg = "TensorNetworkBackend currently only supports BasicStates.PLUS as input state." + raise NotImplementedError(msg) + rng = ensure_rng(rng) + if graph_prep in {"parallel", "sequential"}: + pass + elif graph_prep == "opt": + graph_prep = "parallel" + warnings.warn( + f"graph preparation strategy '{graph_prep}' is deprecated and will be replaced by 'parallel'", + stacklevel=1, + ) + elif graph_prep == "auto": + max_degree = pattern.get_max_degree() + # "parallel" does not support non standard pattern + graph_prep = "sequential" if max_degree > 5 or not pattern.is_standard() else "parallel" + else: + raise ValueError(f"Invalid graph preparation strategy: {graph_prep}") + results = deepcopy(pattern.results) + if graph_prep == "parallel": + if not pattern.is_standard(): + raise ValueError("parallel preparation strategy does not support not-standardized pattern") + nodes, edges = pattern.get_graph() + state = MBQCTensorNet( + graph_nodes=nodes, + graph_edges=edges, + default_output_nodes=pattern.output_nodes, + rng=rng, + ) + decomposed_cz = [] + else: # graph_prep == "sequential": + state = MBQCTensorNet(default_output_nodes=pattern.output_nodes, rng=rng) + decomposed_cz = _get_decomposed_cz() + isolated_nodes = pattern.get_isolated_nodes() + super().__init__( + state, + pattern, + graph_prep, + input_state, + rng, + pattern.output_nodes, + results, + decomposed_cz, + isolated_nodes, + ) + + @override + def add_nodes(self, nodes: Sequence[int], data: Data = BasicStates.PLUS) -> None: + """ + Add new nodes (qubits) to the network and initialize them in a specified state. + + Parameters + ---------- + nodes : Sequence[int] + A list of node indices to add to the backend. These indices can be any + integer values but must be fresh: each index must be distinct from all + previously added nodes. + + data : Data, optional + The state in which to initialize the newly added nodes. + + - If a single basic state is provided, all new nodes are initialized in that state. + - If a list of basic states is provided, it must match the length of ``nodes``, and + each node is initialized with its corresponding state. + + Notes + ----- + Previously existing nodes remain unchanged. + """ + if data != BasicStates.PLUS: + raise NotImplementedError( + "TensorNetworkBackend currently only supports |+> input state (see https://github.com/TeamGraphix/graphix/issues/167)." + ) + if self.graph_prep == "sequential": + self.state.add_qubits(nodes) + elif self.graph_prep == "opt": + pass + + @override + def entangle_nodes(self, edge: tuple[int, int]) -> None: + """Make entanglement between nodes specified by edge. + + Parameters + ---------- + edge : tuple of int + edge specifies two target nodes of the CZ gate. + """ + if self.graph_prep == "sequential": + old_inds = [self.state._dangling[str(node)] for node in edge] + tids = self.state._get_tids_from_inds(old_inds, which="any") + tensors = [self.state.tensor_map[tid] for tid in tids] + new_inds = [gen_str() for _ in range(3)] + + # retag dummy indices + for i in range(2): + tensors[i].retag({"Open": "Close"}, inplace=True) + self.state._dangling[str(edge[i])] = new_inds[i] + cz_tn = TensorNetwork( + [ + qtn.Tensor( + self._decomposed_cz[0], + [new_inds[0], old_inds[0], new_inds[2]], + [str(edge[0]), "CZ", "Open"], + ), + qtn.Tensor( + self._decomposed_cz[1], + [new_inds[2], new_inds[1], old_inds[1]], + [str(edge[1]), "CZ", "Open"], + ), + ] + ) + self.state.add_tensor_network(cz_tn) + elif self.graph_prep == "opt": + pass + + @override + def measure(self, node: int, measurement: Measurement) -> Outcome: + """Perform measurement of the node. + + In the context of tensornetwork, performing measurement equals to + applying measurement operator to the tensor. Here, directly contracted with the projected state. + + Parameters + ---------- + node : int + index of the node to measure + measurement : Measurement + measure plane and angle + """ + if node in self._isolated_nodes: + vector: npt.NDArray[np.complex128] = self.state.get_open_tensor_from_index(node) + probs = (np.abs(vector) ** 2).astype(np.float64) + probs /= np.sum(probs) + result: Outcome = self.rng.choice([0, 1], p=probs) + self.results[node] = result + buffer = 1 / probs[result] ** 0.5 + else: + # choose the measurement result randomly + result = self.rng.choice([0, 1]) + self.results[node] = result + buffer = 2**0.5 + if isinstance(measurement.angle, Expression): + raise TypeError("Parameterized pattern unsupported.") + vec = PlanarState(measurement.plane, measurement.angle).get_statevector() + if result: + vec = measurement.plane.orth.matrix @ vec + proj_vec = vec * buffer + self.state.measure_single(node, basis=proj_vec) + return result + + @override + def correct_byproduct(self, cmd: command.X | command.Z, measure_method: MeasureMethod) -> None: + """Perform byproduct correction. + + Parameters + ---------- + cmd : list + Byproduct command + i.e. ['X' or 'Z', node, signal_domain] + measure_method : MeasureMethod + The measure method to use + """ + if sum(measure_method.get_measure_result(j) for j in cmd.domain) % 2 == 1: + op = Ops.X if isinstance(cmd, command.X) else Ops.Z + self.state.evolve_single(cmd.node, op, str(cmd.kind)) + + @override + def apply_clifford(self, node: int, clifford: Clifford) -> None: + """Apply single-qubit Clifford gate. + + Parameters + ---------- + cmd : list + clifford command. + See https://arxiv.org/pdf/2212.11975.pdf for the detail. + """ + self.state.evolve_single(node, clifford.matrix) + + @override + def finalize(self, output_nodes: Iterable[int]) -> None: + """Do nothing.""" + + +def gen_str() -> str: """Generate dummy string for einsum.""" return qtn.rand_uuid() -def outer_product(vectors): +def outer_product(vectors: Sequence[npt.NDArray[np.complex128]]) -> npt.NDArray[np.complex128]: """Return the outer product of the given vectors. Parameters @@ -729,4 +822,4 @@ def outer_product(vectors): """ subscripts = string.ascii_letters[: len(vectors)] subscripts = ",".join(subscripts) + "->" + subscripts - return np.einsum(subscripts, *vectors) + return np.array(np.einsum(subscripts, *vectors), dtype=np.complex128) diff --git a/graphix/simulator.py b/graphix/simulator.py index 0aedb20eb..48e982f95 100644 --- a/graphix/simulator.py +++ b/graphix/simulator.py @@ -8,13 +8,14 @@ import abc import warnings -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, TypeVar import numpy as np +from graphix import command from graphix.clifford import Clifford -from graphix.command import BaseM, CommandKind, M, MeasureUpdate -from graphix.measurements import Measurement +from graphix.command import BaseM, CommandKind, MeasureUpdate +from graphix.measurements import Measurement, Outcome from graphix.sim.base_backend import Backend from graphix.sim.density_matrix import DensityMatrixBackend from graphix.sim.statevec import StatevectorBackend @@ -22,11 +23,15 @@ from graphix.states import BasicStates if TYPE_CHECKING: + from collections.abc import Mapping from typing import Any from graphix.noise_models.noise_model import NoiseModel from graphix.pattern import Pattern - from graphix.sim.density_matrix import Data + from graphix.sim import BackendState, Data + + +_StateT_co = TypeVar("_StateT_co", bound="BackendState", covariant=True) class MeasureMethod(abc.ABC): @@ -37,7 +42,7 @@ class MeasureMethod(abc.ABC): Example: class `ClientMeasureMethod` in https://github.com/qat-inria/veriphix """ - def measure(self, backend: Backend, cmd, noise_model=None) -> None: + def measure(self, backend: Backend[_StateT_co], cmd: BaseM, noise_model: NoiseModel | None = None) -> None: """Perform a measure.""" description = self.get_measurement_description(cmd) result = backend.measure(cmd.node, description) @@ -62,7 +67,7 @@ def get_measurement_description(self, cmd: BaseM) -> Measurement: ... @abc.abstractmethod - def get_measure_result(self, node: int) -> bool: + def get_measure_result(self, node: int) -> Outcome: """Return the result of a previous measurement. Parameters @@ -78,7 +83,7 @@ def get_measure_result(self, node: int) -> bool: ... @abc.abstractmethod - def set_measure_result(self, node: int, result: bool) -> None: + def set_measure_result(self, node: int, result: Outcome) -> None: """Store the result of a previous measurement. Parameters @@ -94,18 +99,23 @@ def set_measure_result(self, node: int, result: bool) -> None: class DefaultMeasureMethod(MeasureMethod): """Default measurement method implementing standard measurement plane/angle update for MBQC.""" - def __init__(self, results=None): + def __init__(self, results: Mapping[int, Outcome] | None = None): """Initialize with an optional result dictionary. Parameters ---------- - results : dict[int, bool] | None, optional + results : Mapping[int, Outcome] | None, optional Mapping of previously measured nodes to their results. If ``None``, an empty dictionary is created. + + Notes + ----- + If a mapping is provided, it is treated as read-only. Measurements + performed during simulation are stored in `self.results`, which is a copy + of the given mapping. The original `results` mapping is not modified. """ - if results is None: - results = {} - self.results = results + # results is coerced into dict, since `set_measure_result` mutates it. + self.results = {} if results is None else dict(results) def get_measurement_description(self, cmd: BaseM) -> Measurement: """Return the description of the measurement performed by ``cmd``. @@ -120,7 +130,7 @@ def get_measurement_description(self, cmd: BaseM) -> Measurement: Measurement Updated measurement specification. """ - assert isinstance(cmd, M) + assert isinstance(cmd, command.M) angle = cmd.angle * np.pi # extract signals for adaptive angle s_signal = sum(self.results[j] for j in cmd.s_domain) @@ -129,7 +139,7 @@ def get_measurement_description(self, cmd: BaseM) -> Measurement: angle = angle * measure_update.coeff + measure_update.add_term return Measurement(angle, measure_update.new_plane) - def get_measure_result(self, node: int) -> bool: + def get_measure_result(self, node: int) -> Outcome: """Return the result of a previous measurement. Parameters @@ -139,12 +149,12 @@ def get_measure_result(self, node: int) -> bool: Returns ------- - bool + Outcome Stored measurement outcome. """ return self.results[node] - def set_measure_result(self, node: int, result: bool) -> None: + def set_measure_result(self, node: int, result: Outcome) -> None: """Store the result of a previous measurement. Parameters @@ -163,10 +173,12 @@ class PatternSimulator: Executes the measurement pattern. """ + noise_model: NoiseModel | None + def __init__( self, pattern: Pattern, - backend: Backend | str = "statevector", + backend: Backend[BackendState] | str = "statevector", measure_method: MeasureMethod | None = None, noise_model: NoiseModel | None = None, **kwargs: Any, @@ -225,7 +237,7 @@ def measure_method(self) -> MeasureMethod: """Return the measure method.""" return self.__measure_method - def set_noise_model(self, model): + def set_noise_model(self, model: NoiseModel | None) -> None: """Set a noise model.""" if not isinstance(self.backend, DensityMatrixBackend) and model is not None: self.noise_model = None # if not initialized yet @@ -251,7 +263,8 @@ def run(self, input_state: Data = BasicStates.PLUS) -> None: self.backend.entangle_nodes(edge=cmd.nodes) elif cmd.kind == CommandKind.M: self.__measure_method.measure(self.backend, cmd) - elif cmd.kind in {CommandKind.X, CommandKind.Z}: + # Use of `==` here for mypy + elif cmd.kind == CommandKind.X or cmd.kind == CommandKind.Z: # noqa: PLR1714 self.backend.correct_byproduct(cmd, self.__measure_method) elif cmd.kind == CommandKind.C: self.backend.apply_clifford(cmd.node, cmd.clifford) @@ -274,11 +287,11 @@ def run(self, input_state: Data = BasicStates.PLUS) -> None: self.__measure_method.measure(self.backend, cmd, noise_model=self.noise_model) elif cmd.kind == CommandKind.X: self.backend.correct_byproduct(cmd, self.__measure_method) - if np.mod(sum(self.__measure_method.results[j] for j in cmd.domain), 2) == 1: + if np.mod(sum(self.__measure_method.get_measure_result(j) for j in cmd.domain), 2) == 1: self.backend.apply_channel(self.noise_model.byproduct_x(), [cmd.node]) elif cmd.kind == CommandKind.Z: self.backend.correct_byproduct(cmd, self.__measure_method) - if np.mod(sum(self.__measure_method.results[j] for j in cmd.domain), 2) == 1: + if np.mod(sum(self.__measure_method.get_measure_result(j) for j in cmd.domain), 2) == 1: self.backend.apply_channel(self.noise_model.byproduct_z(), [cmd.node]) elif cmd.kind == CommandKind.C: self.backend.apply_clifford(cmd.node, cmd.clifford) diff --git a/graphix/transpiler.py b/graphix/transpiler.py index 1c52d7e55..2b3c675a6 100644 --- a/graphix/transpiler.py +++ b/graphix/transpiler.py @@ -7,7 +7,7 @@ from __future__ import annotations import dataclasses -from typing import TYPE_CHECKING, Callable +from typing import TYPE_CHECKING, Callable, SupportsFloat import numpy as np from typing_extensions import assert_never @@ -19,12 +19,14 @@ from graphix.ops import Ops from graphix.parameter import ExpressionOrFloat, Parameter from graphix.pattern import Pattern -from graphix.sim import base_backend -from graphix.sim.statevec import Data, Statevec +from graphix.rng import ensure_rng +from graphix.sim import Data, Statevec, base_backend if TYPE_CHECKING: from collections.abc import Iterable, Mapping, Sequence + from numpy.random import Generator + from graphix.command import Command @@ -874,18 +876,22 @@ def _ccx_command( ) return ancilla[17], ancilla[15], ancilla[13], seq - def simulate_statevector(self, input_state: Data | None = None) -> SimulateResult: + def simulate_statevector(self, input_state: Data | None = None, rng: Generator | None = None) -> SimulateResult: """Run statevector simulation of the gate sequence. Parameters ---------- - input_state : :class:`graphix.sim.statevec.Statevec` + input_state : Data + rng : Generator + Random number generator used to sample measurement outcomes. Returns ------- result : :class:`SimulateResult` output state of the statevector simulation and results of classical measures. """ + symbolic = self.is_parameterized() + rng = ensure_rng(rng) state = Statevec(nqubit=self.width) if input_state is None else Statevec(nqubit=self.width, data=input_state) classical_measures = [] @@ -919,7 +925,9 @@ def simulate_statevector(self, input_state: Data | None = None) -> SimulateResul elif instr.kind == instruction.InstructionKind.CCX: state.evolve(Ops.CCX, [instr.controls[0], instr.controls[1], instr.target]) elif instr.kind == instruction.InstructionKind.M: - result = base_backend.perform_measure(instr.target, instr.plane, instr.angle * np.pi, state, np.random) + result = base_backend.perform_measure( + instr.target, instr.plane, instr.angle * np.pi, state, rng, symbolic + ) classical_measures.append(result) else: raise ValueError(f"Unknown instruction: {instr}") @@ -943,6 +951,27 @@ def map_angle(self, f: Callable[[Angle], Angle]) -> Circuit: result.instruction.append(instr) return result + def is_parameterized(self) -> bool: + """ + Return `True` if there is at least one measurement angle that is not just an instance of `SupportsFloat`. + + A parameterized circuit is a circuit where at least one + measurement angle is an expression that is not a number, + typically an instance of `sympy.Expr` (but we don't force to + choose `sympy` here). + + """ + # Use of `==` here for mypy + return any( + not isinstance(instr.angle, SupportsFloat) + for instr in self.instruction + if instr.kind == InstructionKind.RZZ # noqa: PLR1714 + or instr.kind == InstructionKind.M + or instr.kind == InstructionKind.RX + or instr.kind == InstructionKind.RY + or instr.kind == InstructionKind.RZ + ) + def subs(self, variable: Parameter, substitute: ExpressionOrFloat) -> Circuit: """Return a copy of the circuit where all occurrences of the given variable in measurement angles are substituted by the given value.""" return self.map_angle(lambda angle: parameter.subs(angle, variable, substitute)) diff --git a/noxfile.py b/noxfile.py index 57c58cd3b..b64c9a64a 100644 --- a/noxfile.py +++ b/noxfile.py @@ -49,8 +49,7 @@ def tests_all(session: Session) -> None: session.run("pytest", "--doctest-modules") -# TODO: Add 3.13 CI -@nox.session(python=["3.9", "3.10", "3.11", "3.12"]) +@nox.session(python=["3.9", "3.10", "3.11", "3.12", "3.13"]) def tests_symbolic(session: Session) -> None: """Run the test suite of graphix-symbolic.""" session.install("-e", ".") diff --git a/pyproject.toml b/pyproject.toml index ed535492f..22af33377 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,9 @@ dev = { file = ["requirements-dev.txt"] } extra = { file = ["requirements-extra.txt"] } doc = { file = ["requirements-doc.txt"] } +[tool.setuptools.packages.find] +include = ["graphix", "stubs"] + [tool.ruff] line-length = 120 extend-exclude = ["docs"] @@ -144,11 +147,6 @@ exclude = [ '^graphix/gflow\.py$', '^graphix/linalg\.py$', '^graphix/random_objects\.py$', - '^graphix/simulator\.py$', - '^graphix/sim/base_backend\.py$', - '^graphix/sim/density_matrix\.py$', - '^graphix/sim/statevec\.py$', - '^graphix/sim/tensornet\.py$', '^graphix/visualization\.py$', '^tests/test_density_matrix\.py$', '^tests/test_gflow\.py$', @@ -164,6 +162,7 @@ exclude = [ follow_imports = "silent" follow_untyped_imports = true # required for qiskit, requires mypy >=1.14 strict = true +mypy_path = "./stubs" [tool.pyright] # Keep in sync with mypy @@ -182,12 +181,6 @@ exclude = [ "graphix/gflow.py", "graphix/linalg.py", "graphix/random_objects.py", - "graphix/simulator.py", - "graphix/sim/base_backend.py", - "graphix/sim/density_matrix.py", - "graphix/sim/statevec.py", - "graphix/sim/tensornet.py", - "graphix/transpiler.py", "graphix/visualization.py", "tests/test_density_matrix.py", "tests/test_gflow.py", @@ -206,6 +199,7 @@ reportUnknownLambdaType = "information" reportUnknownMemberType = "information" reportUnknownParameterType = "information" reportUnknownVariableType = "information" +extraPaths = ["./stubs"] [tool.coverage.report] exclude_also = [ diff --git a/stubs/quimb/__init__.pyi b/stubs/quimb/__init__.pyi new file mode 100644 index 000000000..e69de29bb diff --git a/stubs/quimb/tensor/__init__.pyi b/stubs/quimb/tensor/__init__.pyi new file mode 100644 index 000000000..9ab686443 --- /dev/null +++ b/stubs/quimb/tensor/__init__.pyi @@ -0,0 +1,83 @@ +from collections.abc import Iterator, Mapping, Sequence +from typing import Literal + +import numpy as np +import numpy.typing as npt +from cotengra.oe import PathOptimizer +from typing_extensions import Self + +class Tensor: + data: npt.NDArray[np.generic] + + def __init__( + self, + data: float | Tensor | npt.NDArray[np.generic] = 1.0, + inds: Sequence[str] = (), + tags: Sequence[str] | None = None, + left_inds: Sequence[str] | None = None, + ) -> None: ... + def reindex(self, retag_map: Mapping[str, str], inplace: bool = False) -> Tensor: ... + def retag(self, retag_map: Mapping[str, str], inplace: bool = False) -> Tensor: ... + def split( + self, + left_inds: str | Sequence[str], + max_bond: int | None = None, + bond_ind: str | None = None, + right_inds: str | Sequence[str] | None = None, + ) -> TensorNetwork: ... + @property + def H(self) -> Tensor: ... # noqa: N802 + +class TensorNetwork: + tensor_map: dict[str, Tensor] + + def __init__( + self, + ts: Sequence[Tensor | TensorNetwork] | TensorNetwork = (), + *, + virtual: bool = False, + check_collisions: bool = True, + ) -> None: ... + def _get_tids_from_tags( + self, tags: Sequence[str] | str | int | None, which: Literal["all", "any", "!all", "!any"] = "all" + ) -> set[str]: ... + def _get_tids_from_inds( + self, inds: Sequence[str] | str | int | None, which: Literal["all", "any", "!all", "!any"] = "all" + ) -> set[str]: ... + def __iter__(self) -> Iterator[Tensor]: ... + def add( + self, + t: Tensor | TensorNetwork | Sequence[Tensor | TensorNetwork], + virtual: bool = False, + check_collisions: bool = True, + ) -> None: ... + def add_tensor(self, tensor: Tensor, tid: str | None = None, virtual: bool = False) -> None: ... + def add_tensor_network(self, tn: TensorNetwork, virtual: bool = False, check_collisions: bool = False) -> None: ... + def conj(self) -> TensorNetwork: ... + def contract( + self, + *others: Sequence[TensorNetwork], + output_inds: Sequence[str] | None = None, + optimize: str | PathOptimizer | None = None, + ) -> float: ... + def copy(self, virtual: bool = False, deep: bool = False) -> TensorNetwork: ... + def full_simplify( + self, + seq: str = "ADCR", + output_inds: Sequence[str] | None = None, + atol: float = 1e-12, + equalize_norms: bool = False, + inplace: bool = False, + progbar: bool = False, + ) -> Self: ... + def split( + self, + left_inds: str | Sequence[str], + max_bond: int | None = None, + bond_ind: str | None = None, + right_inds: str | Sequence[str] | None = None, + ) -> TensorNetwork: ... + @property + def tensors(self) -> tuple[Tensor, ...]: ... + +def rand_uuid(base: str = "") -> str: ... diff --git a/tests/test_density_matrix.py b/tests/test_density_matrix.py index 38bc348d5..35b376b7a 100644 --- a/tests/test_density_matrix.py +++ b/tests/test_density_matrix.py @@ -41,12 +41,16 @@ def test_init_without_data_fail(self) -> None: DensityMatrix(nqubit=-2) def test_init_with_invalid_data_fail(self, fx_rng: Generator) -> None: - with pytest.raises(TypeError): + with pytest.raises(ValueError): DensityMatrix("hello") with pytest.raises(TypeError): DensityMatrix(1) - with pytest.raises(TypeError): + # Length is not a power of two + with pytest.raises(ValueError): DensityMatrix([1, 2, [3]]) + # setting an array element with a sequence (numpy error) + with pytest.raises(ValueError): + DensityMatrix([1, 2, [3], 4]) # check with hermitian dm but not unit trace with pytest.raises(ValueError): @@ -245,7 +249,7 @@ def test_expectation_single_success(self, fx_rng: Generator) -> None: def test_tensor_fail(self) -> None: dm = DensityMatrix(nqubit=1) - with pytest.raises(TypeError): + with pytest.raises(ValueError): dm.tensor("hello") with pytest.raises(TypeError): dm.tensor(1) diff --git a/tests/test_pattern.py b/tests/test_pattern.py index 6f3443e45..4f452894a 100644 --- a/tests/test_pattern.py +++ b/tests/test_pattern.py @@ -28,10 +28,10 @@ from collections.abc import Sequence from typing import Literal - from graphix.sim.base_backend import Backend, State + from graphix.sim.base_backend import BackendState -def compare_backend_result_with_statevec(backend_state: State, statevec: Statevec) -> float: +def compare_backend_result_with_statevec(backend_state: BackendState, statevec: Statevec) -> float: if isinstance(backend_state, Statevec): return float(np.abs(np.dot(backend_state.flatten().conjugate(), statevec.flatten()))) if isinstance(backend_state, DensityMatrix): @@ -185,7 +185,7 @@ def test_shift_signals(self, fx_bg: PCG64, jumps: int) -> None: @pytest.mark.parametrize("jumps", range(1, 11)) @pytest.mark.parametrize("backend", ["statevector", "densitymatrix"]) # TODO: tensor network backend is excluded because "parallel preparation strategy does not support not-standardized pattern". - def test_pauli_measurement_random_circuit(self, fx_bg: PCG64, jumps: int, backend: Backend) -> None: + def test_pauli_measurement_random_circuit(self, fx_bg: PCG64, jumps: int, backend: str) -> None: rng = Generator(fx_bg.jumped(jumps)) nqubits = 3 depth = 3 diff --git a/tests/test_pyzx.py b/tests/test_pyzx.py index cd0d0486d..53e8644c6 100644 --- a/tests/test_pyzx.py +++ b/tests/test_pyzx.py @@ -98,7 +98,8 @@ def test_rz() -> None: circuit = Circuit(2) circuit.rz(0, np.pi / 4) pattern = circuit.transpile().pattern - circ = zx.qasm("qreg q[2]; rz(pi / 4) q[0];") # type: ignore[attr-defined] + # pyzx 0.8 does not support arithmetic expressions such as `pi / 4`. + circ = zx.qasm(f"qreg q[2]; rz({np.pi / 4}) q[0];") # type: ignore[attr-defined] g = circ.to_graph() og = from_pyzx_graph(g) pattern_zx = og.to_pattern() diff --git a/tests/test_statevec_backend.py b/tests/test_statevec_backend.py index 3a83ce20f..9d14f372b 100644 --- a/tests/test_statevec_backend.py +++ b/tests/test_statevec_backend.py @@ -1,6 +1,5 @@ from __future__ import annotations -from copy import deepcopy from typing import TYPE_CHECKING import numpy as np @@ -12,30 +11,12 @@ from graphix.pauli import Pauli from graphix.sim.statevec import Statevec, StatevectorBackend from graphix.states import BasicStates, PlanarState -from tests.test_graphsim import meas_op if TYPE_CHECKING: from numpy.random import Generator class TestStatevec: - def test_remove_one_qubit(self) -> None: - n = 10 - k = 3 - - sv = Statevec(nqubit=n) - for i in range(n): - sv.entangle([i, (i + 1) % n]) - m_op = meas_op(np.pi / 5) - sv.evolve(m_op, [k]) - sv2 = deepcopy(sv) - - sv.remove_qubit(k) - sv2.ptrace([k]) - sv2.normalize() - - assert np.abs(sv.psi.flatten().dot(sv2.psi.flatten().conj())) == pytest.approx(1) - @pytest.mark.parametrize( "state", [BasicStates.PLUS, BasicStates.ZERO, BasicStates.ONE, BasicStates.PLUS_I, BasicStates.MINUS_I] ) diff --git a/tests/test_tnsim.py b/tests/test_tnsim.py index ddb08159a..0410f7f71 100644 --- a/tests/test_tnsim.py +++ b/tests/test_tnsim.py @@ -33,7 +33,7 @@ def random_op(sites: int, dtype: type, rng: Generator) -> npt.NDArray: class TestTN: def test_add_node(self, fx_rng: Generator) -> None: node_index = fx_rng.integers(0, 1000) - tn = MBQCTensorNet(fx_rng) + tn = MBQCTensorNet(rng=fx_rng) tn.add_qubit(node_index) @@ -42,7 +42,7 @@ def test_add_node(self, fx_rng: Generator) -> None: def test_add_nodes(self, fx_rng: Generator) -> None: node_index = set(fx_rng.integers(0, 1000, 20)) - tn = MBQCTensorNet(fx_rng) + tn = MBQCTensorNet(rng=fx_rng) tn.graph_prep = "sequential" tn.add_qubits(node_index) diff --git a/tests/test_transpiler.py b/tests/test_transpiler.py index 8168a7b3a..d645d9ddf 100644 --- a/tests/test_transpiler.py +++ b/tests/test_transpiler.py @@ -115,7 +115,7 @@ def test_transpiled(self, fx_rng: Generator) -> None: state_mbqc = pattern.simulate_pattern(rng=fx_rng) assert np.abs(np.dot(state_mbqc.flatten().conjugate(), state.flatten())) == pytest.approx(1) - def test_measure(self) -> None: + def test_measure(self, fx_rng: Generator) -> None: circuit = Circuit(2) circuit.h(1) circuit.cnot(0, 1) @@ -123,7 +123,7 @@ def test_measure(self) -> None: _ = circuit.transpile() def simulate_and_measure() -> int: - circuit_simulate = circuit.simulate_statevector() + circuit_simulate = circuit.simulate_statevector(rng=fx_rng) assert circuit_simulate.classical_measures[0] == (circuit_simulate.statevec.psi[0][1].imag > 0) return circuit_simulate.classical_measures[0]