"""This module contains the ``PipelineSolver`` class."""
from __future__ import annotations
import time
from collections.abc import Mapping
from datetime import timedelta
from typing import Any
from tno.quantum.optimization.qubo.components import (
QUBO,
PostprocessorConfig,
PreprocessorConfig,
ResultInterface,
Solver,
SolverConfig,
)
from tno.quantum.optimization.qubo.solvers._pipeline._pipeline_result import (
PipelineResult,
)
from tno.quantum.optimization.qubo.solvers._pipeline._preprocess_result import (
PreprocessResult,
)
class PipelineSolver(Solver[PipelineResult]):
    """Solver class that represents a pipeline of a solver with pre- and postprocessors.

    A pipeline solver solves a QUBO problem in three stages:

    1. Preprocess the QUBO using the provided preprocessors.
    2. Solve the preprocessed QUBO using the provided main solver.
    3. Postprocess the obtained results using the provided postprocessors.

    To make use of a pipeline, create a :py:class:`PipelineSolver` and provide it the
    desired main solver, preprocessing and postprocessing. Then, a
    :py:class:`PipelineSolver` can be treated like any other
    :py:class:`~tno.quantum.optimization.qubo.components.Solver`, as shown in the
    following example.

    Example:
        >>> from tno.quantum.optimization.qubo.solvers import PipelineSolver
        >>> from tno.quantum.optimization.qubo.components import QUBO
        >>>
        >>> qubo = QUBO([
        ...     [ 7,  6, -2,  9,  5],
        ...     [ 8, -5,  9, -7, -5],
        ...     [ 7,  9, -6, -3, -4],
        ...     [ 7, -8,  1,  8,  6],
        ...     [ 1,  1,  7,  8, -2]
        ... ])
        >>>
        >>> # Construct PipelineSolver from configuration objects
        >>> solver_config = { "name": "bf_solver" }
        >>> preprocess_config = { "name": "q_pro_plus_preprocessor" }
        >>> postprocess_config = { "name": "steepest_descent_postprocessor" }
        >>>
        >>> pipeline = PipelineSolver(
        ...     solver_config,
        ...     preprocess = [ preprocess_config ],
        ...     postprocess = [ postprocess_config ]
        ... )  # doctest: +SKIP
        >>>
        >>> # Solve QUBO using pipeline like any other solver
        >>> result = pipeline.solve(qubo)  # doctest: +SKIP
        >>> result.best_bitvector  # doctest: +SKIP
        BitVector(01010)

        The `result` is a :py:class:`PipelineResult`. The intermediate results can be
        accessed as follows:

        >>> # The result of the preprocessor
        >>> result.preprocess_results[0]  # doctest: +SKIP
        <...>
        >>> # The result of the main solver
        >>> result.solver_result  # doctest: +SKIP
        <...>
        >>> # The result of the postprocessor
        >>> result.postprocess_results[0]  # doctest: +SKIP
        <...>
    """

    def __init__(
        self,
        solver_config: SolverConfig | Mapping[str, Any],
        *,
        preprocess: (
            PreprocessorConfig
            | Mapping[str, Any]
            | list[PreprocessorConfig | Mapping[str, Any]]
            | None
        ) = None,
        postprocess: (
            PostprocessorConfig
            | Mapping[str, Any]
            | list[PostprocessorConfig | Mapping[str, Any]]
            | None
        ) = None,
    ) -> None:
        """Init :py:class:`PipelineSolver`.

        Args:
            solver_config: Configuration of the main solver.
            preprocess: Configuration(s) of the preprocessors. Either a single
                configuration (config object or mapping), a list of configurations
                which are applied in the given order, or ``None`` for no
                preprocessing.
            postprocess: Configuration(s) of the postprocessors. Either a single
                configuration (config object or mapping), a list of configurations
                which are applied in the given order, or ``None`` for no
                postprocessing.
        """
        # Instantiate solver
        solver_config = SolverConfig.from_mapping(solver_config)
        self._solver = solver_config.get_instance()

        # Instantiate preprocessors. A single configuration may be supplied either
        # as a mapping or as a config object; both are wrapped in a list so that the
        # comprehension below never iterates over the configuration itself.
        if preprocess is None:
            preprocess = []
        elif isinstance(preprocess, (Mapping, PreprocessorConfig)):
            preprocess = [preprocess]
        self._preprocessors = [
            PreprocessorConfig.from_mapping(config).get_instance()
            for config in preprocess
        ]

        # Instantiate postprocessors (same normalization as for the preprocessors)
        if postprocess is None:
            postprocess = []
        elif isinstance(postprocess, (Mapping, PostprocessorConfig)):
            postprocess = [postprocess]
        self._postprocessors = [
            PostprocessorConfig.from_mapping(config).get_instance()
            for config in postprocess
        ]

    def _solve(self, qubo: QUBO) -> PipelineResult:
        """Solve QUBO using a pipeline of a solver with pre- and postprocessors.

        Args:
            qubo: QUBO to solve.

        Returns:
            A ``PipelineResult`` instance built from the final result of the
            pipeline, which also holds the main solver result and the lists of
            preprocess and postprocess results.
        """
        # Apply preprocessing steps in order. Each step replaces `qubo` by the
        # preprocessed QUBO; the partial solution and the measured execution time
        # are kept to construct the preprocess results afterwards.
        preprocess_steps = []
        for preprocessor in self._preprocessors:
            start_time = time.perf_counter()
            partial_solution, qubo = preprocessor.preprocess(qubo)
            execution_time = time.perf_counter() - start_time
            preprocess_steps.append((partial_solution, execution_time))

        # Solve preprocessed QUBO
        solver_result = self._solver.solve(qubo)
        result: ResultInterface = solver_result

        # Apply postprocessing steps. Note that `qubo` is the fully preprocessed
        # QUBO at this point, and each postprocessor receives the result of the
        # previous stage.
        postprocess_results: list[ResultInterface] = []
        for postprocessor in self._postprocessors:
            result = postprocessor.postprocess(qubo, result)
            postprocess_results.append(result)

        # Construct `PreprocessResult`s. They are created in reverse order, because
        # each one is built from the result of the stage that follows it.
        preprocess_results: list[PreprocessResult] = []
        for partial_solution, execution_time in reversed(preprocess_steps):
            preprocess_result = PreprocessResult.from_result(partial_solution, result)
            preprocess_result.execution_time = timedelta(seconds=execution_time)
            preprocess_results.append(preprocess_result)
            result = preprocess_result
        # Restore the order in which the preprocessors were applied
        preprocess_results.reverse()

        # Construct and return PipelineResult from `best_bitvector`, `best_value`
        # and `freq` of the final result
        return PipelineResult.from_result(
            result.best_bitvector,
            result.best_value,
            result.freq,
            solver_result=solver_result,
            preprocess_results=preprocess_results,
            postprocess_results=postprocess_results,
        )