Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 15 additions & 18 deletions src/simulated_bifurcation/core/ising.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
from numpy import ndarray

from ..optimizer import SimulatedBifurcationEngine, SimulatedBifurcationOptimizer
from .utils import safe_get_device, safe_get_dtype
from .tensor_bearer import TensorBearer

# Workaround because `Self` type is only available in Python >= 3.11
SelfIsing = TypeVar("SelfIsing", bound="Ising")


class Ising(object):
class Ising(TensorBearer):
"""
Internal implementation of the Ising model.

Expand Down Expand Up @@ -92,13 +92,10 @@ def __init__(
dtype: Optional[torch.dtype] = None,
device: Optional[Union[str, torch.device]] = None,
) -> None:
self._dtype = safe_get_dtype(dtype)
self._device = safe_get_device(device)

if isinstance(J, ndarray):
J = torch.from_numpy(J)
if isinstance(h, ndarray):
h = torch.from_numpy(h)
super().__init__(dtype=dtype, device=device)
J = self._safe_get_tensor(J)
if h is not None:
h = self._safe_get_tensor(h)

if J.ndim != 2:
raise ValueError(
Expand All @@ -110,27 +107,25 @@ def __init__(
f"Expected J to be square, but got {rows} rows and {cols} columns."
)

self._J = J.to(dtype=self._dtype, device=self._device)
self._J = J
self._dimension = rows

if h is None:
self._h = torch.zeros(
self._dimension, dtype=self._dtype, device=self._device
)
self._h = torch.zeros(self._dimension, dtype=self.dtype, device=self.device)
elif h.shape != (self._dimension,):
raise ValueError(
f"Expected the shape of h to be {self._dimension}, but got {tuple(h.shape)}."
)
else:
self._h = h.to(dtype=self._dtype, device=self._device)
self._h = h

self._has_linear_term = not torch.equal(
self._h,
torch.zeros(self._dimension, dtype=self._dtype, device=self._device),
torch.zeros(self._dimension, dtype=self.dtype, device=self.device),
)

def __neg__(self) -> SelfIsing:
return self.__class__(-self._J, -self._h, self._dtype, self._device)
return self.__class__(-self._J, -self._h, self.dtype, self.device)

def as_simulated_bifurcation_tensor(self) -> torch.Tensor:
"""
Expand Down Expand Up @@ -179,8 +174,8 @@ def as_simulated_bifurcation_tensor(self) -> torch.Tensor:
if self._has_linear_term:
sb_tensor = torch.zeros(
(self._dimension + 1, self._dimension + 1),
dtype=self._dtype,
device=self._device,
dtype=self.dtype,
device=self.device,
)
sb_tensor[: self._dimension, : self._dimension] = symmetrical_J
sb_tensor[: self._dimension, self._dimension] = -self._h
Expand Down Expand Up @@ -351,6 +346,8 @@ def minimize(
verbose,
sampling_period,
convergence_threshold,
self.dtype,
self.device,
)
tensor = self.as_simulated_bifurcation_tensor()
spins = optimizer.run_integrator(tensor, early_stopping)
Expand Down
118 changes: 49 additions & 69 deletions src/simulated_bifurcation/core/quadratic_polynomial.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,11 @@
from sympy import Poly

from .ising import Ising
from .utils import safe_get_device, safe_get_dtype
from .tensor_bearer import TensorBearer
from .variable import Variable

INTEGER_REGEX = re.compile("^int[1-9][0-9]*$")
DOMAIN_ERROR = ValueError(
f'Input type must be one of "spin" or "binary", or be a string starting'
f'with "int" and be followed by a positive integer.\n'
f"More formally, it should match the following regular expression.\n"
f"{INTEGER_REGEX}\n"
f'Examples: "int7", "int42", ...'
)


class QuadraticPolynomial(object):
class QuadraticPolynomial(TensorBearer):
"""
Internal implementation of a multivariate quadratic polynomial.

Expand Down Expand Up @@ -134,7 +125,7 @@ class QuadraticPolynomial(object):
Maximize this polynomial over {0, 1, ..., 14, 15} x {0, 1, ..., 14, 15}
(outputs are located on the GPU)

>>> best_vector, best_value = poly.maximize(domain="int4)
>>> best_vector, best_value = poly.maximize(domain="int4")
>>> best_vector
tensor([ 0., 15.], device='cuda:0')
>>> best_value
Expand All @@ -157,8 +148,7 @@ def __init__(
dtype: Optional[torch.dtype] = None,
device: Optional[Union[str, torch.device]] = None,
):
self._dtype = safe_get_dtype(dtype)
self._device = safe_get_device(device)
super().__init__(dtype=dtype, device=device)
self.sb_result = None

if len(polynomial_data) == 1 and isinstance(polynomial_data[0], Poly):
Expand All @@ -169,17 +159,17 @@ def __init__(
)
dimension = len(polynomial.gens)
self._quadratic_coefficients = torch.zeros(
dimension, dimension, dtype=self._dtype, device=self._device
dimension, dimension, dtype=self.dtype, device=self.device
)
self._linear_coefficients = torch.zeros(
dimension, dtype=self._dtype, device=self._device
dimension, dtype=self.dtype, device=self.device
)
self._bias = torch.tensor(0.0, dtype=self._dtype, device=self._device)
self._bias = torch.tensor(0.0, dtype=self.dtype, device=self.device)
for monom, coeff in polynomial.terms():
coeff = float(coeff)
if sum(monom) == 0:
self._bias = torch.tensor(
coeff, dtype=self._dtype, device=self._device
coeff, dtype=self.dtype, device=self.device
)
elif sum(monom) == 1:
self._linear_coefficients[monom.index(1)] = coeff
Expand All @@ -196,67 +186,58 @@ def __init__(
self._quadratic_coefficients = None
self._linear_coefficients = None
self._bias = None
for tensor_like in polynomial_data:
if isinstance(tensor_like, np.ndarray):
tensor_like = torch.from_numpy(tensor_like)
elif isinstance(tensor_like, (int, float)):
tensor_like = torch.tensor(
tensor_like, dtype=self._dtype, device=self._device
)
if isinstance(tensor_like, torch.Tensor):
if tensor_like.ndim == 0:
attribute_to_set = "_bias"
elif tensor_like.ndim == 1:
attribute_to_set = "_linear_coefficients"
elif tensor_like.ndim == 2:
attribute_to_set = "_quadratic_coefficients"
rows, cols = tensor_like.shape
if rows != cols:
raise ValueError(
"Provided quadratic coefficients tensor is not square."
)
else:
for polynomial_data_element in polynomial_data:
# noinspection PyTypeChecker
tensor_like = self._safe_get_tensor(polynomial_data_element)
if tensor_like.ndim == 0:
attribute_to_set = "_bias"
elif tensor_like.ndim == 1:
attribute_to_set = "_linear_coefficients"
elif tensor_like.ndim == 2:
attribute_to_set = "_quadratic_coefficients"
rows, cols = tensor_like.shape
if rows != cols:
raise ValueError(
f"Expected a tensor with at most 2 dimensions, got {tensor_like.ndim}."
)
if getattr(self, attribute_to_set) is not None:
raise ValueError(
f"Providing two tensors for the same degree is ambiguous. Got at least two tensors for degree {tensor_like.ndim}."
)
else:
if tensor_like.ndim > 0:
if dimension is None:
dimension = tensor_like.shape[0]
elif dimension != tensor_like.shape[0]:
raise ValueError(
f"Inconsistant shape among provided tensors. Expected {dimension} but got {tensor_like.shape[0]}."
)
setattr(
self,
attribute_to_set,
tensor_like.to(dtype=self._dtype, device=self._device),
"Provided quadratic coefficients tensor is not square."
)
else:
raise ValueError(
f"Unsupported coefficient tensor type: {type(tensor_like)}. Expected a torch.Tensor or a numpy.ndarray."
f"Expected a tensor with at most 2 dimensions, got {tensor_like.ndim}."
)
if getattr(self, attribute_to_set) is not None:
raise ValueError(
f"Providing two tensors for the same degree is ambiguous. Got at least two tensors for degree {tensor_like.ndim}."
)
else:
if tensor_like.ndim > 0:
if dimension is None:
dimension = tensor_like.shape[0]
elif dimension != tensor_like.shape[0]:
raise ValueError(
f"Inconsistent shape among provided tensors. Expected {dimension} but got {tensor_like.shape[0]}."
)
setattr(
self,
attribute_to_set,
tensor_like,
)
if self._quadratic_coefficients is None:
self._quadratic_coefficients = torch.zeros(
dimension, dimension, dtype=self._dtype, device=self._device
dimension, dimension, dtype=self.dtype, device=self.device
)
if self._linear_coefficients is None:
self._linear_coefficients = torch.zeros(
dimension, dtype=self._dtype, device=self._device
dimension, dtype=self.dtype, device=self.device
)
if self._bias is None:
self._bias = torch.tensor(0.0, dtype=self._dtype, device=self._device)
self._bias = torch.tensor(0.0, dtype=self.dtype, device=self.device)

self._dimension = self._quadratic_coefficients.shape[0]

def __call__(self, value: Union[torch.Tensor, np.ndarray]) -> torch.Tensor:
if not isinstance(value, torch.Tensor):
try:
value = torch.tensor(value, dtype=self._dtype, device=self._device)
value = torch.tensor(value, dtype=self.dtype, device=self.device)
except Exception as err:
raise TypeError("Input value cannot be cast to Tensor.") from err

Expand Down Expand Up @@ -368,12 +349,12 @@ def to_ising(self, domain: Union[str, List[str]]) -> Ising:
"""
variables = self.__get_variables(domain=domain)
spin_identity_vector = QuadraticPolynomial.__spin_identity_vector(
variables=variables, dtype=self._dtype, device=self._device
variables=variables, dtype=self.dtype, device=self.device
)
spin_weighted_integer_to_binary_matrix = (
spin_identity_vector + 1
) * QuadraticPolynomial.__integer_to_binary_matrix(
variables=variables, dtype=self._dtype, device=self._device
variables=variables, dtype=self.dtype, device=self.device
)
symmetric_quadratic_tensor = (
self._quadratic_coefficients + self._quadratic_coefficients.t()
Expand All @@ -396,7 +377,7 @@ def to_ising(self, domain: Union[str, List[str]]) -> Ising:
-1,
)
torch.diag(J)[...] = 0
return Ising(J, h, self._dtype, self._device)
return Ising(J, h, self.dtype, self.device)

def convert_spins(
self, optimized_spins: torch.Tensor, domain: Union[str, List[str]]
Expand Down Expand Up @@ -441,12 +422,12 @@ def convert_spins(
"""
variables = self.__get_variables(domain=domain)
spin_identity_vector = QuadraticPolynomial.__spin_identity_vector(
variables=variables, dtype=self._dtype, device=self._device
variables=variables, dtype=self.dtype, device=self.device
)
spin_weighted_integer_to_binary_matrix = (
spin_identity_vector + 1
) * QuadraticPolynomial.__integer_to_binary_matrix(
variables=variables, dtype=self._dtype, device=self._device
variables=variables, dtype=self.dtype, device=self.device
)
return (
None
Expand Down Expand Up @@ -581,9 +562,8 @@ def __optimize(
convergence_threshold=convergence_threshold,
timeout=timeout,
)
self.sb_result = self.convert_spins(optimized_spins, domain).to(
dtype=self._dtype, device=self._device
)
self.sb_result = self._cast_tensor(self.convert_spins(optimized_spins, domain))

result = self.sb_result.t()
evaluation = self(result)
if best_only:
Expand Down
57 changes: 57 additions & 0 deletions src/simulated_bifurcation/core/tensor_bearer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from typing import Optional, Union

import numpy as np
import torch


class TensorBearer:
"""
Utility abstract class to use as a parent class for objects relying on tensors.
"""

def __init__(
self,
dtype: Optional[torch.dtype] = None,
device: Optional[Union[str, torch.device]] = None,
):
self.__dtype = self.__safe_get_dtype(dtype)
self.__device = self.__safe_get_device(device)

@property
def dtype(self) -> torch.dtype:
return self.__dtype

@property
def device(self) -> torch.device:
return self.__device

@staticmethod
def __safe_get_dtype(dtype: Optional[torch.dtype]) -> torch.dtype:
if dtype is None:
return torch.float32
elif dtype == torch.float32 or dtype == torch.float64:
return dtype
raise ValueError(
"The Simulated Bifurcation algorithm can only run with a torch.float32 or a torch.float64 dtype."
)

@staticmethod
def __safe_get_device(device: Optional[Union[str, torch.device]]) -> torch.device:
return torch.get_default_device() if device is None else torch.device(device)

def _safe_get_tensor(
self, data: Union[torch.Tensor, np.ndarray, int, float]
) -> torch.Tensor:
if isinstance(data, torch.Tensor):
return self._cast_tensor(data)
elif isinstance(data, np.ndarray):
return self._cast_tensor(torch.from_numpy(data))
elif isinstance(data, (int, float)):
return self._cast_tensor(torch.tensor(data))
else:
raise TypeError(
"Tensors can only be interpreted from NumPy arrays or int/float values."
)

def _cast_tensor(self, tensor: torch.Tensor) -> torch.Tensor:
return tensor.to(dtype=self.dtype, device=self.device)
17 changes: 0 additions & 17 deletions src/simulated_bifurcation/core/utils.py

This file was deleted.

Loading