Source code for tno.quantum.problems.mot.re_id

"""This module implements a re-id feature for tracking in ultralytics 8."""

import copy
from collections.abc import Mapping, Sequence
from time import perf_counter
from typing import Any, cast

import numpy as np
from numpy.typing import NDArray
from tno.quantum.optimization.qubo.components import SolverConfig

from tno.quantum.problems.mot import DetectionSequence, calculate_cost, compute_iou
from tno.quantum.problems.mot._bqm import BQMResult, BQMSolver
from tno.quantum.problems.mot._utils import (
    boundary_constraints,
    capacity_constraints,
    flow_constraints,
    solve_binary_program,
)


[docs] class FlowReID: """Class to initialize the flow re-identification object."""
[docs] def __init__(self) -> None: """Initialize FlowReID object.""" self.result: None | BQMResult = None
def _solve_network_flow_reid_milp( self, det_sequence: DetectionSequence ) -> list[NDArray[np.float64]]: """Constructs and solves network flow optimization for re-id. Args: det_sequence: the sequence of detections (FrameDets) for each frame. Returns: The solution to the optimization problem. """ start_algorithm = perf_counter() T = len(det_sequence) # noqa: N806 nd = [d.size for d in det_sequence] # Assumption: T>2, i.e., there are intermediate nodes if T < 3: msg = f"The length of the detection sequence is {T} but must be >2." raise ValueError(msg) # compute cost function c = calculate_cost(det_sequence) # capacity constraints intermediate nodes a1, b_l1, b_u1 = capacity_constraints(nd) # force flow on t=0 and t=T-1 a2, b_l2, b_u2 = boundary_constraints(nd) # flow constraints a3, b_l3, b_u3 = flow_constraints(nd) # milp A = np.concatenate((a1, a2, a3)) # noqa: N806 b_l = np.concatenate((b_l1, b_l2, b_l3)) b_u = np.concatenate((b_u1, b_u2, b_u3)) x = np.array(solve_binary_program(A, b_l, b_u, c), dtype=np.float64) # post-process solution x_tensor = [] pt = 0 for t in range(T - 1): sub_x = x[pt : pt + nd[t] * nd[t + 1]] sub_x = sub_x.reshape((nd[t], nd[t + 1])) x_tensor.append(sub_x) pt += nd[t] * nd[t + 1] exec_time = perf_counter() - start_algorithm self.result = BQMResult( exec_time=exec_time, cost=float(np.dot(c, x)), solution=x_tensor, converged=True, ) return x_tensor def _solve_network_flow_reid_qubo( self, det_sequence: DetectionSequence, solver_config: SolverConfig | Mapping[str, Any] | None = None, ) -> list[NDArray[np.float64]]: bqm = BQMSolver(det_sequence) self.result = bqm.solve(solver_config) if self.result.solution is None: msg = "No BQM supplied" raise ValueError(msg) return self.result.solution def _transform_tensor_to_det_sequence( self, x_tensor: list[NDArray[np.float64]], det_sequence: DetectionSequence ) -> DetectionSequence: """Transforms output of the network flow optimization into a DetectionSequence. Args: x_tensor: The solution tensor from the network flow optimization. det_sequence: The sequence of detections (FrameDets) used in the optimization. Returns: The updated sequence of detections. Raises: ValueError: If x_tensor or det_sequence is empty. """ if len(x_tensor) == 0 or len(det_sequence) == 0: msg = "x_tensor and det_sequence must not be empty." raise ValueError(msg) # assume the first frameDet is correct updated_det_sequence = DetectionSequence([det_sequence[0]]) # iterate through the frames to update ids using the transition matrices for frame, transition_matrix in enumerate(x_tensor): updated_det_sequence.append(copy.deepcopy(det_sequence[frame + 1])) updated_det_sequence[frame + 1].ids = np.dot( transition_matrix.T, updated_det_sequence[frame].ids ) return updated_det_sequence def _sequence_checker(self, nd: list[int]) -> bool: """Check if a detection sequence is suitable for re-id. Based on the number of detections, the checks are: 1. The length of the sequence is between 1 and 200. 2. The number of detections changes at some point. 3. The number of detections does not go below the initial one. Args: nd: list of number of detections per frame Returns: True if the sequence is suitable. False otherwise. """ # Check 1 if len(nd) < 1 or len(nd) > 200: return False # Check 2 unique = list(set(nd)) if len(unique) == 1: return False # Check 3 return min(unique) >= nd[0]
[docs] def pick_sequences( self, det_sequence: DetectionSequence, min_len: int = 50, max_len: int = 80 ) -> list[tuple[int, int]]: """Picks frame sequences to run re-id on. Sequences containing new id's are selected for the re-id module. The sequences are checked for compatibility with the re-id module. Args: det_sequence: detection sequence min_len: minimum length of picked sequences max_len: maximum length of picked sequences Returns: List with start and end frames for the selected sequences. """ sequences = [] interesting_frames = [] known_ids = list(det_sequence[0].ids) # find frames where new ids appear and they intersect another detection for t, frame in enumerate(det_sequence): for _id, box1 in zip(frame.ids, frame.xyxy, strict=True): if _id not in known_ids: iou = np.array( [ compute_iou( cast("Sequence[float]", box1), cast("Sequence[float]", box2), ) for box2 in frame.xyxy ] ) matches = np.where(iou > 0.0)[0] if len(matches) > 1: interesting_frames.append(t) known_ids.append(_id) # find sequences around interesting frames nd = [d.size for d in det_sequence] for f in interesting_frames: for m in range(min_len, max_len + 1, 5): trial_sequence = nd[f - 2 : f - 2 + m] if self._sequence_checker(trial_sequence): sequences.append((f - 2, f - 2 + m)) break return sequences
[docs] def run_classic(self, det_sequence: DetectionSequence) -> DetectionSequence: r"""Runs re-id using a classic MILP solver from SciPy. The classic approach solves the following *network flow model*: - *Capacity constraint*: At most one unit of flow per edge. - *Flow conservation*: Incoming flow equals outgoing flow for intermediate nodes. - *Boundary condition*: One unit of flow originates from each initial node. This approach uses SciPy's MILP solver :func:`~scipy.optimize.milp`. The decision variables are: 1. :math:`x_{ijt} \in \{0, 1\}` representing whether detection :math:`i` in frame :math:`t` is associated with detection :math:`j` in frame :math:`t+1`. - :math:`x_{ijt} = 1` Detection :math:`i` at time :math:`t` and detection :math:`j` at time :math:`t+1` belong to the same object. - :math:`x_{ijt} = 0` No association. 2. :math:`y_{ij} \in \{0, 1\}` representing additional edges introduced when the number of detections in intermediate frames drops below the initial frame count. - :math:`y_{ij} = 1` Detection :math:`i` in frame :math:`f_1` is linked to detection :math:`j` in frame :math:`f_2` to maintain feasibility. - :math:`y_{ij} = 0` No link between these detections. When the number of detections drops below the initial layer, additional edges are introduced between frames :math:`f_1` and :math:`f_2` to maintain feasibility: .. math:: \begin{split} & \sum_{j=1}^{nd_{f_1+1}} x_{ijf_1} + \sum_{j=1}^{nd_{f_2}} y_{ij} \leq 1, \quad i = 1, \dots, nd_{f_1} \\ & \sum_{j=1}^{nd_{f_1-1}} x_{jif_1-1} = \sum_{j=1}^{nd_{f_1+1}} x_{ijf_1} + \sum_{j=1}^{nd_{f_2}} y_{ij}, \quad i = 1, \dots, nd_{f_1} \\ & \sum_{j=1}^{nd_{f_2-1}} x_{jif_2-1} + \sum_{j=1}^{nd_{f_1}} y_{ji} = \sum_{j=1}^{nd_{f_2+1}} x_{ijf_2}, \quad i = 1, \dots, nd_{f_2}. \end{split} Args: det_sequence: detection sequence to re-id. Returns: The detection sequence after re-id. """ # noqa: E501 x_tensor = self._solve_network_flow_reid_milp(det_sequence) if x_tensor: return self._transform_tensor_to_det_sequence(x_tensor, det_sequence) return det_sequence
[docs] def run_qubo( self, det_sequence: DetectionSequence, solver_config: SolverConfig | Mapping[str, Any] | None = None, ) -> DetectionSequence: r"""Runs re-id using a qubo model. Instead of handling constraints explicitly, we add **penalty terms** to the \ objective function: .. math:: \\\mathbf{c}^T \mathbf{x} + \lambda H(\mathbf{x}) - :math:`\mathbf{c}^T \mathbf{x}`: Original cost function. - :math:`H(\mathbf{x})`: Penalty function encoding constraints. - :math:`\lambda`: Penalty weight ensuring feasibility. By tuning :math:`\lambda`, minimizing this unconstrained quadratic objective \ yields feasible solutions. For a constraint of the form :math:`\mathbf{a}^T \mathbf{x} = b`: .. math:: \\H_1(\mathbf{x}) = (\mathbf{a}^T \mathbf{x} - b)^2 This penalty is zero when the constraint is satisfied and positive otherwise. For a constraint of the form :math:`\sum{x_i} \leq 1`: .. math:: \\H_2( \mathbf{x} ) = \sum_{i<j}{x_i x_j} This ensures that at most one variable in the set can take the value 1. Args: det_sequence: detection sequence to re-id. solver_config: Configuration for the qubo solver to use. Must be a ``SolverConfig`` or a mapping with ``"name"`` and ``"options"`` keys. If ``None`` (default) is provided, the :class:`SimulatedAnnealingSolver <tno.quantum.optimization.qubo.solvers.SimulatedAnnealingSolver>` will be used, i.e. ``{"name": "simulated_annealing_solver", "options": {}}``. Returns: The detection sequence after re-id. If the solver did not converge to a feasible solution, returns the input detection sequence. """ x_tensor = self._solve_network_flow_reid_qubo(det_sequence, solver_config) if x_tensor: return self._transform_tensor_to_det_sequence(x_tensor, det_sequence) return det_sequence
[docs] def run_continuous_reid(self, det_sequence: DetectionSequence) -> DetectionSequence: """Runs re-id continuously. Args: det_sequence: detection sequence Returns: The detection sequence after re-id on several frame sequences. """ sequences = self.pick_sequences(det_sequence) for start, end in sequences: snippet = DetectionSequence(det_sequence[start:end]) new_snippet = self.run_classic(snippet) det_sequence[start:end] = new_snippet # if an id switch was found, fix it in the rest of the frames if not np.array_equal(snippet[-1].ids, new_snippet[-1].ids): diff_indices = np.where(snippet[-1].ids != new_snippet[-1].ids)[0] for idx in diff_indices: old_id = snippet[-1].ids[idx] new_id = new_snippet[-1].ids[idx] for frame_det in det_sequence[end:]: frame_det.ids = np.array( [new_id if _id == old_id else _id for _id in frame_det.ids] ) return det_sequence