"""This module implements a re-id feature for tracking in ultralytics 8."""
import copy
from collections.abc import Mapping, Sequence
from time import perf_counter
from typing import Any, cast
import numpy as np
from numpy.typing import NDArray
from tno.quantum.optimization.qubo.components import SolverConfig
from tno.quantum.problems.mot import DetectionSequence, calculate_cost, compute_iou
from tno.quantum.problems.mot._bqm import BQMResult, BQMSolver
from tno.quantum.problems.mot._utils import (
boundary_constraints,
capacity_constraints,
flow_constraints,
solve_binary_program,
)
[docs]
class FlowReID:
"""Class to initialize the flow re-identification object."""
[docs]
def __init__(self) -> None:
"""Initialize FlowReID object."""
self.result: None | BQMResult = None
def _solve_network_flow_reid_milp(
self, det_sequence: DetectionSequence
) -> list[NDArray[np.float64]]:
"""Constructs and solves network flow optimization for re-id.
Args:
det_sequence: the sequence of detections (FrameDets) for each frame.
Returns:
The solution to the optimization problem.
"""
start_algorithm = perf_counter()
T = len(det_sequence) # noqa: N806
nd = [d.size for d in det_sequence]
# Assumption: T>2, i.e., there are intermediate nodes
if T < 3:
msg = f"The length of the detection sequence is {T} but must be >2."
raise ValueError(msg)
# compute cost function
c = calculate_cost(det_sequence)
# capacity constraints intermediate nodes
a1, b_l1, b_u1 = capacity_constraints(nd)
# force flow on t=0 and t=T-1
a2, b_l2, b_u2 = boundary_constraints(nd)
# flow constraints
a3, b_l3, b_u3 = flow_constraints(nd)
# milp
A = np.concatenate((a1, a2, a3)) # noqa: N806
b_l = np.concatenate((b_l1, b_l2, b_l3))
b_u = np.concatenate((b_u1, b_u2, b_u3))
x = np.array(solve_binary_program(A, b_l, b_u, c), dtype=np.float64)
# post-process solution
x_tensor = []
pt = 0
for t in range(T - 1):
sub_x = x[pt : pt + nd[t] * nd[t + 1]]
sub_x = sub_x.reshape((nd[t], nd[t + 1]))
x_tensor.append(sub_x)
pt += nd[t] * nd[t + 1]
exec_time = perf_counter() - start_algorithm
self.result = BQMResult(
exec_time=exec_time,
cost=float(np.dot(c, x)),
solution=x_tensor,
converged=True,
)
return x_tensor
def _solve_network_flow_reid_qubo(
self,
det_sequence: DetectionSequence,
solver_config: SolverConfig | Mapping[str, Any] | None = None,
) -> list[NDArray[np.float64]]:
bqm = BQMSolver(det_sequence)
self.result = bqm.solve(solver_config)
if self.result.solution is None:
msg = "No BQM supplied"
raise ValueError(msg)
return self.result.solution
def _transform_tensor_to_det_sequence(
self, x_tensor: list[NDArray[np.float64]], det_sequence: DetectionSequence
) -> DetectionSequence:
"""Transforms output of the network flow optimization into a DetectionSequence.
Args:
x_tensor: The solution tensor from the network flow optimization.
det_sequence: The sequence of detections (FrameDets) used in the
optimization.
Returns:
The updated sequence of detections.
Raises:
ValueError: If x_tensor or det_sequence is empty.
"""
if len(x_tensor) == 0 or len(det_sequence) == 0:
msg = "x_tensor and det_sequence must not be empty."
raise ValueError(msg)
# assume the first frameDet is correct
updated_det_sequence = DetectionSequence([det_sequence[0]])
# iterate through the frames to update ids using the transition matrices
for frame, transition_matrix in enumerate(x_tensor):
updated_det_sequence.append(copy.deepcopy(det_sequence[frame + 1]))
updated_det_sequence[frame + 1].ids = np.dot(
transition_matrix.T, updated_det_sequence[frame].ids
)
return updated_det_sequence
def _sequence_checker(self, nd: list[int]) -> bool:
"""Check if a detection sequence is suitable for re-id.
Based on the number of detections, the checks are:
1. The length of the sequence is between 1 and 200.
2. The number of detections changes at some point.
3. The number of detections does not go below the initial one.
Args:
nd: list of number of detections per frame
Returns:
True if the sequence is suitable. False otherwise.
"""
# Check 1
if len(nd) < 1 or len(nd) > 200:
return False
# Check 2
unique = list(set(nd))
if len(unique) == 1:
return False
# Check 3
return min(unique) >= nd[0]
[docs]
def pick_sequences(
self, det_sequence: DetectionSequence, min_len: int = 50, max_len: int = 80
) -> list[tuple[int, int]]:
"""Picks frame sequences to run re-id on.
Sequences containing new id's are selected for the re-id module. The sequences
are checked for compatibility with the re-id module.
Args:
det_sequence: detection sequence
min_len: minimum length of picked sequences
max_len: maximum length of picked sequences
Returns:
List with start and end frames for the selected sequences.
"""
sequences = []
interesting_frames = []
known_ids = list(det_sequence[0].ids)
# find frames where new ids appear and they intersect another detection
for t, frame in enumerate(det_sequence):
for _id, box1 in zip(frame.ids, frame.xyxy, strict=True):
if _id not in known_ids:
iou = np.array(
[
compute_iou(
cast("Sequence[float]", box1),
cast("Sequence[float]", box2),
)
for box2 in frame.xyxy
]
)
matches = np.where(iou > 0.0)[0]
if len(matches) > 1:
interesting_frames.append(t)
known_ids.append(_id)
# find sequences around interesting frames
nd = [d.size for d in det_sequence]
for f in interesting_frames:
for m in range(min_len, max_len + 1, 5):
trial_sequence = nd[f - 2 : f - 2 + m]
if self._sequence_checker(trial_sequence):
sequences.append((f - 2, f - 2 + m))
break
return sequences
[docs]
def run_classic(self, det_sequence: DetectionSequence) -> DetectionSequence:
r"""Runs re-id using a classic MILP solver from SciPy.
The classic approach solves the following *network flow model*:
- *Capacity constraint*: At most one unit of flow per edge.
- *Flow conservation*: Incoming flow equals outgoing flow for intermediate nodes.
- *Boundary condition*: One unit of flow originates from each initial node.
This approach uses SciPy's MILP solver :func:`~scipy.optimize.milp`.
The decision variables are:
1. :math:`x_{ijt} \in \{0, 1\}` representing whether detection :math:`i` in
frame :math:`t` is associated with detection :math:`j` in frame :math:`t+1`.
- :math:`x_{ijt} = 1` Detection :math:`i` at time :math:`t` and detection :math:`j` at time :math:`t+1` belong to the same object.
- :math:`x_{ijt} = 0` No association.
2. :math:`y_{ij} \in \{0, 1\}` representing additional edges introduced when the
number of detections in intermediate frames drops below the initial frame count.
- :math:`y_{ij} = 1` Detection :math:`i` in frame :math:`f_1` is linked to detection :math:`j` in frame :math:`f_2` to maintain feasibility.
- :math:`y_{ij} = 0` No link between these detections.
When the number of detections drops below the initial layer, additional edges
are introduced between frames :math:`f_1` and :math:`f_2` to maintain feasibility:
.. math::
\begin{split}
& \sum_{j=1}^{nd_{f_1+1}} x_{ijf_1} + \sum_{j=1}^{nd_{f_2}} y_{ij} \leq 1, \quad i = 1, \dots, nd_{f_1} \\
& \sum_{j=1}^{nd_{f_1-1}} x_{jif_1-1} = \sum_{j=1}^{nd_{f_1+1}} x_{ijf_1} + \sum_{j=1}^{nd_{f_2}} y_{ij}, \quad i = 1, \dots, nd_{f_1} \\
& \sum_{j=1}^{nd_{f_2-1}} x_{jif_2-1} + \sum_{j=1}^{nd_{f_1}} y_{ji} = \sum_{j=1}^{nd_{f_2+1}} x_{ijf_2}, \quad i = 1, \dots, nd_{f_2}.
\end{split}
Args:
det_sequence: detection sequence to re-id.
Returns:
The detection sequence after re-id.
""" # noqa: E501
x_tensor = self._solve_network_flow_reid_milp(det_sequence)
if x_tensor:
return self._transform_tensor_to_det_sequence(x_tensor, det_sequence)
return det_sequence
[docs]
def run_qubo(
self,
det_sequence: DetectionSequence,
solver_config: SolverConfig | Mapping[str, Any] | None = None,
) -> DetectionSequence:
r"""Runs re-id using a qubo model.
Instead of handling constraints explicitly, we add **penalty terms** to the \
objective function:
.. math::
\\\mathbf{c}^T \mathbf{x} + \lambda H(\mathbf{x})
- :math:`\mathbf{c}^T \mathbf{x}`: Original cost function.
- :math:`H(\mathbf{x})`: Penalty function encoding constraints.
- :math:`\lambda`: Penalty weight ensuring feasibility.
By tuning :math:`\lambda`, minimizing this unconstrained quadratic objective \
yields feasible solutions.
For a constraint of the form :math:`\mathbf{a}^T \mathbf{x} = b`:
.. math::
\\H_1(\mathbf{x}) = (\mathbf{a}^T \mathbf{x} - b)^2
This penalty is zero when the constraint is satisfied and positive otherwise.
For a constraint of the form :math:`\sum{x_i} \leq 1`:
.. math::
\\H_2( \mathbf{x} ) = \sum_{i<j}{x_i x_j}
This ensures that at most one variable in the set can take the value 1.
Args:
det_sequence: detection sequence to re-id.
solver_config: Configuration for the qubo solver to use. Must be a
``SolverConfig`` or a mapping with ``"name"`` and ``"options"`` keys. If
``None`` (default) is provided, the
:class:`SimulatedAnnealingSolver
<tno.quantum.optimization.qubo.solvers.SimulatedAnnealingSolver>`
will be used, i.e. ``{"name": "simulated_annealing_solver",
"options": {}}``.
Returns:
The detection sequence after re-id. If the solver did not converge to a
feasible solution, returns the input detection sequence.
"""
x_tensor = self._solve_network_flow_reid_qubo(det_sequence, solver_config)
if x_tensor:
return self._transform_tensor_to_det_sequence(x_tensor, det_sequence)
return det_sequence
[docs]
def run_continuous_reid(self, det_sequence: DetectionSequence) -> DetectionSequence:
"""Runs re-id continuously.
Args:
det_sequence: detection sequence
Returns:
The detection sequence after re-id on several frame sequences.
"""
sequences = self.pick_sequences(det_sequence)
for start, end in sequences:
snippet = DetectionSequence(det_sequence[start:end])
new_snippet = self.run_classic(snippet)
det_sequence[start:end] = new_snippet
# if an id switch was found, fix it in the rest of the frames
if not np.array_equal(snippet[-1].ids, new_snippet[-1].ids):
diff_indices = np.where(snippet[-1].ids != new_snippet[-1].ids)[0]
for idx in diff_indices:
old_id = snippet[-1].ids[idx]
new_id = new_snippet[-1].ids[idx]
for frame_det in det_sequence[end:]:
frame_det.ids = np.array(
[new_id if _id == old_id else _id for _id in frame_det.ids]
)
return det_sequence