Source code for portfolio_optimization.components.postprocess

"""This module implements required post processing steps."""
from __future__ import annotations

from collections.abc import Mapping

import numpy as np
from dimod import SampleSet
from numpy.typing import ArrayLike, NDArray
from scipy.spatial import ConvexHull  # pylint: disable=no-name-in-module

from tno.quantum.problems.portfolio_optimization.components.io import PortfolioData


[docs]class Decoder: """``Decoder`` class for decoding samples and samplesets."""
[docs] def __init__( self, portfolio_data: PortfolioData, k: int, ) -> None: """ Init for the ``Decoder`` Class. Args: portfolio_data: A ``PortfolioData`` object containing the portfolio to optimize. k: The number of bits that are used to represent the outstanding amount for each asset. A fixed point representation is used to represent `$2^k$` different equidistant values in the range `$[LB_i, UB_i]$` for asset i. """ self.number_of_assets = len(portfolio_data) self.k = k self.mantissa = np.power(2, np.arange(self.k)) self.l_bound = portfolio_data.get_l_bound() self.u_bound = portfolio_data.get_u_bound() self.multiplier = (self.u_bound - self.l_bound) / (2**self.k - 1)
[docs] def decode_sample(self, sample: Mapping[int, int]) -> NDArray[np.float_]: """Decodes a sample to the `oustanding_future` array. Args: sample: Sample as returned by D-Wave. Returns: Array containing all `outstanding future` values. """ sample_array = np.array( [sample[i] for i in range(self.number_of_assets * self.k)], dtype=np.uint8 ) sample_reshaped = sample_array.reshape((self.number_of_assets, self.k)) ints = np.sum(sample_reshaped * self.mantissa, axis=1) outstanding_future = self.l_bound + self.multiplier * ints if (self.l_bound > outstanding_future).any() or ( self.u_bound < outstanding_future ).any(): raise ValueError("Bounds not obeyed.") return np.asarray(outstanding_future, dtype=np.float_)
[docs] def decode_sampleset(self, sampleset: SampleSet) -> NDArray[np.float_]: """Efficiently decodes a `sampleset` create a matrix of `oustanding_future` values. Each row in the matrix corresponds to a different sample in the `sampleset`. Args: sampleset: ``SampleSet`` as returned by D-Wave. Returns: Matrix containing all `outstanding future` values. """ samples_matrix = sampleset.record.sample[:, : self.number_of_assets * self.k] samples_reshaped = samples_matrix.reshape( (len(sampleset), self.number_of_assets, self.k) ) ints = np.sum(samples_reshaped * self.mantissa, axis=2) outstanding_future = self.l_bound + self.multiplier * ints if (self.l_bound > outstanding_future).any() or ( self.u_bound < outstanding_future ).any(): raise ValueError("Bounds not obeyed.") return np.asarray(outstanding_future, dtype=np.float_)
[docs]def pareto_front( xvals: ArrayLike, yvals: ArrayLike, min_points: int = 50, upper_right_quadrant: bool = True, ) -> tuple[NDArray[np.float_], NDArray[np.float_]]: """Calculates the pareto front with at least min_points data points by repeatedly creating a convex hull around data points. Args: xvals: x-values of data points yvals: y-values of data points min_points: minimum number of points to be selected upper_right_quadrant: If ``True``, only show the upper right quadrant of the pareto front. Returns: x, y values of the points that are on the pareto front """ points = np.vstack((xvals, yvals)).T points = np.unique(points, axis=0) if len(points) < 3: return points.T[0], points.T[1] hull = ConvexHull(points) pareto_points = points[hull.vertices] # Expand the pareto front so that it contains at least min_points. for _ in range(min_points): if len(pareto_points) >= min_points: break # Remove current hull vertices from data and create a new hull points = np.delete(points, hull.vertices, axis=0) if len(points) < 3: break hull = ConvexHull(points) # Add the new hull vertices to the pareto front new_points = points[hull.vertices] pareto_points = np.vstack((pareto_points, new_points)) if upper_right_quadrant: pareto_points = _get_upper_quadrant(pareto_points) if upper_right_quadrant: pareto_points = _get_upper_quadrant(pareto_points) return pareto_points.T[0], pareto_points.T[1]
def _get_upper_quadrant(points: NDArray[np.float_]) -> NDArray[np.float_]: """Removes all values that are not in the upper right quadrant of the pareto front.""" x_values = points.T[0] y_values = points.T[1] x_bound = x_values[np.argmax(y_values)] y_bound = y_values[np.argmax(x_values)] mask = (x_values >= x_bound) & (y_values >= y_bound) x_values = x_values[mask] y_values = y_values[mask] return np.vstack((x_values, y_values)).T