"""This module contains the ``QProPlusPreprocessor`` class."""
from __future__ import annotations
import logging
from typing import Literal
import numpy as np
from tno.quantum.optimization.qubo.components import QUBO, PartialSolution, Preprocessor
class QProPlusPreprocessor(Preprocessor):
"""Preprocessor class that partially solves QUBOs using the QPro+ algorithm.
The :py:class:`QProPlusPreprocessor` is a preprocessor that applies the `QPro+
algorithm` to reduce the size of a QUBO, while maintaining an optimal solution, as
described in: `'Logical and inequality implications for reducing the size and
difficulty of QUBO problems'` by Fred Glover, Mark Lewis, Gary Kochenberger
(https://www.sciencedirect.com/science/article/pii/S0377221717307567).
.. note::
In our implementation, the reduction rules are applied with the aim of
minimizing the QUBO objective, whereas the convention in the paper is to
maximize the QUBO objective.
Example:
>>> from tno.quantum.optimization.qubo.components import QUBO
>>> from tno.quantum.optimization.qubo.preprocessors import QProPlusPreprocessor
>>>
>>> # Create an example QUBO
>>> qubo = QUBO([
... [ 7, 6, -2, 9, 5],
... [ 8, -5, 9, -7, -5],
... [ 7, 9, -6, -3, -4],
... [ 7, -8, 1, 8, 6],
... [ 1, 1, 7, 8, -2]
... ])
>>>
>>> # Preprocess QUBO using QProPlusPreprocessor
>>> preprocessor = QProPlusPreprocessor(max_iter=10)
>>> partial_solution, qubo_reduced = preprocessor.preprocess(qubo)
>>> partial_solution
PartialSolution {x_0 = 0, x_1 = 1 - x_2}
>>> qubo_reduced
QUBO of dim: 3x3, with 9 non zero elements.
>>>
>>> # Once the solution of the reduced QUBO is found, it can be expanded to the
>>> # solution of the original QUBO.
>>> solution_reduced = [0, 1, 0]
>>> solution = partial_solution.expand(solution_reduced)
>>> solution
BitVector(01010)
Attributes:
is_converged: If ``True``, indicates that no more rules can be applied.
"""
def __init__(self, *, max_iter: int | None = None, verbose: bool = False) -> None:
"""Init :py:class:`QProPlusPreprocessor`.
Args:
max_iter: Maximum number of iterations of applying the rules.
One iteration reduces the size of the QUBO by at most one. If not
provided, it will be set to the size of the QUBO.
verbose: If ``True``, the rules which are being applied are logged (info level).
"""
# Flag to keep track of whether the algorithm has converged
self.is_converged = False
# Flag to enable verbose logging of the applied rules
self.verbose = verbose
# Maximum number of iterations; if None, it defaults to the QUBO size during
# preprocessing, since every (effective) iteration removes one variable
self._max_iter = max_iter
# Create logger
self._logger = logging.getLogger("QProPlus")
self._logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(name)s: %(message)s"))
self._logger.addHandler(handler)
def preprocess(self, qubo: QUBO) -> tuple[PartialSolution, QUBO]:
"""Performs the reduction algorithm for a maximum number of iterations.
Args:
qubo: QUBO to be preprocessed.
Returns:
Partial solution and corresponding preprocessed QUBO.
"""
# Get matrix and offset from QUBO, adding a minus sign to switch max/min
self._Q = -(qubo.matrix + qubo.matrix.T) / 2
self._c0 = -qubo.offset
# Store values for variables which are solved for in a PartialSolution
# Keep track of the (indices of the) free variables in a list
self._partial_solution = PartialSolution(qubo.size)
self._free_variables = list(range(qubo.size))
# Compute D_i^+ and D_i^-
self._compute_D_positive_negative()
# Flag to keep track of whether the algorithm has converged
self.is_converged = False
# Since every (effective) iteration removes one variable, no more than qubo.size
# iterations are required
max_iter = self._max_iter or qubo.size
for _it in range(max_iter):
if self.is_converged:
break
if self.verbose:
self._logger.info("Iteration %d:", _it)
self._logger.info(
" Remaining variables: %s", str(self._free_variables)
)
self._do_iteration()
# Construct QUBO object from the resulting Q and c0.
# Again, minus signs are added to switch max/min
qubo = QUBO(-self._Q, -self._c0)
return self._partial_solution, qubo
def _do_iteration(self) -> None:
"""Perform a single iteration of the algorithm.
A single iteration can remove at most one variable.
"""
# i: index of variable w.r.t. self.Q
# i_orig: index of variable in terms of the original QUBO
for i, i_orig in enumerate(self._free_variables):
if self.verbose:
self._logger.info(" Considering variable %d", i_orig)
# Apply rule 1.0 to x_i
if self._check_rule_1(i): # x_i = 1
self._apply_rule_1(i)
return
# Apply rule 2.0 to x_i
if self._check_rule_2(i): # x_i = 0
self._apply_rule_2(i)
return
# Apply rules to x_i, x_h
for h, h_orig in enumerate(self._free_variables[:i]):
if self.verbose:
self._logger.info(
" Considering variables %i and %d", i_orig, h_orig
)
# Apply rules 3.1 to 3.4
outcome_rule_3 = self._check_rule_3(i, h)
if outcome_rule_3:
[to_set_i, to_set_h] = outcome_rule_3
self._apply_rule_3(i, h, to_set_i, to_set_h)
return
# Apply rules 2.5 and 2.6
outcome_rule_2 = self._check_rule_2_extended(i, h)
if outcome_rule_2 == 1: # 2.5: x_i + x_h = 1
self._apply_rule_2_5(i, h)
return
if outcome_rule_2 == 2: # 2.6: x_i = x_h
self._apply_rule_2_6(i, h)
return
self.is_converged = True
def _compute_D_positive_negative(self) -> None: # noqa: N802
"""Computes D_i^+ and D_i^-."""
# NOTE: This method can possibly be optimized
off_diagonal = self._Q.copy()
np.fill_diagonal(off_diagonal, 0) # Set diagonal elements to 0
positive_off_diagonal_elements = np.where(off_diagonal > 0, off_diagonal, 0)
negative_off_diagonal_elements = np.where(off_diagonal < 0, off_diagonal, 0)
self.D_plus = np.sum(positive_off_diagonal_elements, axis=0) + np.sum(
positive_off_diagonal_elements, axis=1
)
self.D_minus = np.sum(negative_off_diagonal_elements, axis=0) + np.sum(
negative_off_diagonal_elements, axis=1
)
def _update_set_variable_value(self, i: int, value: int) -> None:
r"""Update to solve for x_i := value, removing the variable x_i.
If value = 1:
    c_j := c_j + d_ij for all j ∈ N \ {i}
    c_0 := c_0 + c_i
N := N \ {i}
"""
d_i = self._Q[i] + self._Q[:, i]
if value == 1:
self._c0 += self._Q[i, i]
# add d_i to the diagonal
np.fill_diagonal(self._Q, self._Q.diagonal() + d_i)
positive_idx = np.where(d_i > 0)
negative_idx = np.where(d_i < 0)
self.D_plus[positive_idx] -= d_i[positive_idx]
self.D_minus[negative_idx] -= d_i[negative_idx]
self.D_minus = np.delete(self.D_minus, i)
self.D_plus = np.delete(self.D_plus, i)
self._Q = np.delete(np.delete(self._Q, i, 0), i, 1)
# Remove variable i and update partial solution
i_orig = self._free_variables.pop(i) # N := N \ {i}
self._partial_solution.assign_value(i_orig, value)
def _update_set_variable_equal(self, i: int, h: int) -> None:
r"""Update to solve for x_h := x_i, removing the variable x_h.
c_i := c_i + c_h + d_ih
d_ij := d_ij + d_hj for all j ∈ N \ {i,h}
Q_ij := Q_ij + Q_hj
Q_ji := Q_ji + Q_jh
d_ij := Q_ij + Q_ji = Q_ij + Q_ji + Q_hj + Q_jh = d_ij + d_hj
N := N \ {h}
"""
self._Q[i] += self._Q[h]
self._Q[:, i] += self._Q[:, h]
self._Q = np.delete(np.delete(self._Q, h, 0), h, 1)
# Recalculate D_i^+ and D_i^-
self._compute_D_positive_negative()
# Remove variable h and update partial solution
i_orig = self._free_variables[i]
h_orig = self._free_variables.pop(h) # N := N \ {h}
self._partial_solution.assign_variable(h_orig, i_orig, conj=False)
def _update_set_variable_unequal(self, i: int, h: int) -> None:
r"""Update to solve for x_h := 1 - x_i, removing the variable x_h.
c_i := c_i - c_h
c_0 := c_0 + c_h
c_j := c_j + d_jh for all j ∈ N \ {i, h}
d_ij := d_ij - d_hj for all j ∈ N \ {i, h}
Q_ij := Q_ij - Q_hj
Q_ji := Q_ji - Q_jh
d_ij := Q_ij + Q_ji = Q_ij + Q_ji - Q_hj - Q_jh = d_ij - d_hj
N := N \ {h}
"""
self._c0 += self._Q[h, h]
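# Subtract row and column h from row and column i (d_ij := d_ij - d_hj)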
self._Q[i] -= self._Q[h]
self._Q[:, i] -= self._Q[:, h]
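# Add the h-th row and column entries to the diagonal; together with the
# subtraction above this realizes c_j := c_j + d_jh and c_i := c_i - c_h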
np.fill_diagonal(self._Q, self._Q.diagonal() + self._Q[h] + self._Q[:, h])
self._Q = np.delete(np.delete(self._Q, h, 0), h, 1)
# Recalculate D_i^+ and D_i^-
self._compute_D_positive_negative()
# Remove variable h and update partial solution
i_orig = self._free_variables[i]
h_orig = self._free_variables.pop(h) # N := N \ {h}
self._partial_solution.assign_variable(h_orig, i_orig, conj=True)
def _check_rule_1(self, i: int) -> bool:
"""Check the conditions for rule 1.0.
Rule 1.0:
If c_i + D_i^- >= 0, then x_i = 1 is optimal.
"""
c_i: float = self._Q[i, i]
D_i_negative: float = self.D_minus[i] # noqa: N806
return c_i + D_i_negative >= 0
def _check_rule_2(self, i: int) -> bool:
"""Check the conditions for rule 2.0.
Rule 2.0:
If c_i + D_i^+ <= 0, then x_i = 0 is optimal.
"""
c_i: float = self._Q[i, i]
D_i_positive: float = self.D_plus[i] # noqa: N806
return c_i + D_i_positive <= 0
def _check_rule_3(self, i: int, h: int) -> Literal[False] | tuple[int, int]:
"""Check the conditions for rules 3.1-3.4.
Rule 3.1 (d_{ih} >= 0):
If c_i + c_h - d_{ih} + D_i^+ + D_h^+ <= 0, then x_i = x_h = 0 in an
optimal solution.
Rule 3.2 (d_{ih} < 0):
If -c_i + c_h + d_{ih} - D_i^- + D_h^+ <= 0, then x_i = 1 and x_h=0 in an
optimal solution.
Rule 3.3 (d_{ih} < 0):
If c_i - c_h + d_{ih} + D_i^+ - D_h^- <= 0, then x_i = 0 and x_h = 1 in an
optimal solution.
Rule 3.4 (d_{ih} >= 0):
If -c_i - c_h - d_{ih} - D_i^- - D_h^- <= 0 (equivalently,
c_i + c_h + d_{ih} + D_i^- + D_h^- >= 0), then x_i = x_h = 1 in an
optimal solution.
"""
c_i, c_h = self._Q[i, i], self._Q[h, h]
D_i_positive, D_h_positive = self.D_plus[i], self.D_plus[h] # noqa: N806
D_i_negative, D_h_negative = self.D_minus[i], self.D_minus[h] # noqa: N806
d_ih = self._Q[i, h] + self._Q[h, i]
# Rule 3.1: x_i = x_h = 0
if d_ih >= 0 and c_i + c_h - d_ih + D_i_positive + D_h_positive <= 0:
return 0, 0
# Rule 3.2: x_i = 1, x_h = 0
if d_ih < 0 and -c_i + c_h + d_ih - D_i_negative + D_h_positive <= 0:
return 1, 0
# Rule 3.3: x_i = 0, x_h = 1
if d_ih < 0 and c_i - c_h + d_ih + D_i_positive - D_h_negative <= 0:
return 0, 1
# Rule 3.4: x_i = x_h = 1
if d_ih >= 0 and -c_i - c_h - d_ih - D_i_negative - D_h_negative <= 0:
return 1, 1
return False
def _check_rule_2_extended(self, i: int, h: int) -> Literal[False] | int:
"""Check the conditions for rules 2.5 and 2.6.
Rule 2.5 (d_{ih} < 0):
If c_i - d_{ih} + D_i^- >= 0 and c_i + d_{ih} + D_i^+ <= 0 (or swap i, h),
then x_i + x_h = 1 in an optimal solution.
Rule 2.6 (d_{ih} > 0):
If c_i - d_{ih} + D_i^+ <= 0 and c_i + d_{ih} + D_i^- >= 0 (or swap i, h),
then x_i = x_h in an optimal solution.
"""
c_i, c_h = self._Q[i, i], self._Q[h, h]
D_i_positive, D_h_positive = self.D_plus[i], self.D_plus[h] # noqa: N806
D_i_negative, D_h_negative = self.D_minus[i], self.D_minus[h] # noqa: N806
d_ih = self._Q[i, h] + self._Q[h, i]
if d_ih < 0:
a1 = c_i - d_ih + D_i_negative >= 0
a2 = c_h - d_ih + D_h_negative >= 0
b1 = c_i + d_ih + D_i_positive <= 0
b2 = c_h + d_ih + D_h_positive <= 0
if (a1 or a2) and (b1 or b2):
# Optimal solution has x_i + x_h = 1
return 1
elif d_ih > 0:
c1 = c_i - d_ih + D_i_positive <= 0
c2 = c_h + d_ih + D_h_negative >= 0
d1 = c_i + d_ih + D_i_negative >= 0
d2 = c_h - d_ih + D_h_positive <= 0
if (c1 or c2) and (d1 or d2):
# Optimal solution has x_i = x_h
return 2
return False
def _apply_rule_1(self, i: int) -> None:
"""Apply rule 1."""
if self.verbose:
i_orig = self._free_variables[i]
msg = f" Rule 1.0 applies for variable {i_orig} (current index {i}) as "
msg += f"{self._Q[i, i] + self.D_minus[i]} >= 0"
self._logger.info(msg)
self._update_set_variable_value(i, 1)
def _apply_rule_2(self, i: int) -> None:
"""Apply rule 2."""
if self.verbose:
i_orig = self._free_variables[i]
msg = f" Rule 2.0 applies for variable {i_orig} (current index {i}) as "
msg += f"{self._Q[i, i] + self.D_plus[i]} <= 0"
self._logger.info(msg)
self._update_set_variable_value(i, 0)
def _apply_rule_3(self, i: int, h: int, to_set_i: int, to_set_h: int) -> None:
"""Apply rule 3."""
h_orig = self._free_variables[h]
if self.verbose:
rule_idx = 1 + to_set_i + 2 * to_set_h
i_orig = self._free_variables[i]
msg = f" Rule 3.{rule_idx} applies for variables {i_orig} and {h_orig}:"
msg += f" set x_{i_orig} = {to_set_i} and x_{h_orig} = {to_set_h}"
self._logger.info(msg)
self._update_set_variable_value(i, to_set_i)
# Note: index h may have changed by deleting variable i
h = self._free_variables.index(h_orig)
self._update_set_variable_value(h, to_set_h)
def _apply_rule_2_5(self, i: int, h: int) -> None:
"""Apply rule 2.5."""
i_orig = self._free_variables[i]
h_orig = self._free_variables[h]
if self.verbose:
self._logger.info(
" Rule 2.5 applies for variables %d and %d", i_orig, h_orig
)
self._update_set_variable_unequal(i, h)
def _apply_rule_2_6(self, i: int, h: int) -> None:
"""Apply rule 2.6."""
i_orig = self._free_variables[i]
h_orig = self._free_variables[h]
if self.verbose:
self._logger.info(
" Rule 2.6 applies for variables %d and %d", i_orig, h_orig
)
self._update_set_variable_equal(i, h)
def reduce_qpro_plus(
qubo: QUBO, max_iter: int | None = None, *, verbose: bool = False
) -> tuple[PartialSolution, QUBO]:
"""Reduce dimensionality of QUBO, using the ``QProPlusPreprocessor``.
A wrapper function that constructs a ``QProPlusPreprocessor`` and uses it to reduce
the QUBO.
Args:
qubo: The QUBO to be reduced.
max_iter: Maximum number of iterations of applying the rules.
One iteration reduces the size of the QUBO by at most one. If not provided,
it will be set to the size of the QUBO.
verbose: If ``True``, the rules which are being applied are logged (info
level).
Returns:
A partial solution object and the corresponding lower-dimensional QUBO.
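Example:
An illustrative sketch, mirroring the class-level example of
:py:class:`QProPlusPreprocessor`; it assumes ``reduce_qpro_plus`` is
importable from ``tno.quantum.optimization.qubo.preprocessors``.
>>> from tno.quantum.optimization.qubo.components import QUBO
>>> from tno.quantum.optimization.qubo.preprocessors import reduce_qpro_plus
>>>
>>> qubo = QUBO([
... [ 7, 6, -2, 9, 5],
... [ 8, -5, 9, -7, -5],
... [ 7, 9, -6, -3, -4],
... [ 7, -8, 1, 8, 6],
... [ 1, 1, 7, 8, -2]
... ])
>>> partial_solution, qubo_reduced = reduce_qpro_plus(qubo, max_iter=10)
>>> partial_solution
PartialSolution {x_0 = 0, x_1 = 1 - x_2}
>>> qubo_reduced
QUBO of dim: 3x3, with 9 non zero elements.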
"""
if max_iter is None:
max_iter = qubo.size
preprocessor = QProPlusPreprocessor(max_iter=max_iter, verbose=verbose)
return preprocessor.preprocess(qubo)