Source code for tno.quantum.ml.datasets._anomalous_spiky_time_series

"""Module for time series dataset."""

from __future__ import annotations

from math import ceil

import numpy as np
from numpy.typing import NDArray
from tno.quantum.utils.validation import check_int, check_real

from tno.quantum.ml.datasets._utils import _safe_train_test_split


[docs] def get_anomalous_spiky_time_series_dataset( # noqa: PLR0913 n_samples: int = 100, n_features: int = 4, n_times: int = 200, anomaly_proportion: float = 0.5, random_seed: int = 42, test_size: float | int | None = None, ) -> tuple[ NDArray[np.float64], NDArray[np.int_], NDArray[np.float64], NDArray[np.int_] ]: r"""Create a time series dataset. This uses normally distributed spikes centered around zero. This function generates non-anomalous time series' with spikes for each feature which come from distributions with random standard deviations between `0` and `0.5`. Anomalous time series' have alternating intervals of small spikes and large spikes. These intervals are of length `10` time units. The small spikes for each feature come from distributions with random standard deviations between `0` and `0.3` and the large spikes for each features come from distributions with random standard deviations between `0.8` and `1.6`. There is an `80%` chance of a small spike and a `20%` chance of a large spike at each time for each feature. Example usage: >>> from tno.quantum.ml.datasets import get_anomalous_spiky_time_series_dataset >>> X_train, y_train, X_val, y_val = get_anomalous_spiky_time_series_dataset( ... n_samples=100, n_features=2, n_times=100, anomaly_proportion=0.5 ... ) >>> print(f"{X_train.shape=}\n{y_train.shape=}\n{X_val.shape=}\n{y_val.shape=}") X_train.shape=(75, 2, 100) y_train.shape=(75,) X_val.shape=(25, 2, 100) y_val.shape=(25,) Args: n_samples: Number of samples n_features: Number of features n_times: Number of evenly spaced times. anomaly_proportion: Percentage of the dataset that contains anomalies. random_seed: Seed to give to the random number generator. Defaults to `42`. test_size: The proportion of the dataset that is included in the test-split. Either represented by a percentage in the range [0.0, 1.0) or as absolute number of test samples in the range [1, inf). Defaults to 0.25. Returns: A tuple containing ``X_training``, ``y_training``, `X_validation`` and ``y_validation``. """ rng = np.random.default_rng(random_seed) # Validate input n_samples = check_int(n_samples, "n_samples", l_bound=1) n_features = check_int(n_features, "n_features", l_bound=1) n_times = check_int(n_times, "n_times", l_bound=1) anomaly_proportion = check_real( anomaly_proportion, "anomaly_proportion", l_bound=0, u_bound=1 ) # Generate non-anomalous time series. n_samples_anomalous = int(anomaly_proportion * n_samples) n_samples_non_anomalous = n_samples - n_samples_anomalous # Define noise levels noise_levels_non_anomalous = rng.uniform(low=0, high=0.5, size=n_features) low_noise_levels = rng.uniform(low=0, high=0.3, size=n_features) high_noise_levels = rng.uniform(low=0.8, high=1.6, size=n_features) # Generate non-anomalous time series X_non_anomalous = np.zeros(shape=(n_samples_non_anomalous, n_features, n_times)) y_non_anomalous = np.zeros(shape=(n_samples_non_anomalous), dtype=np.int_) for feature, noise_level in enumerate(noise_levels_non_anomalous): X_non_anomalous[:, feature, :] = rng.normal( 0, noise_level, (n_samples_non_anomalous, n_times) ) # Generate anomalous time series. X_anomalous = np.zeros(shape=(n_samples_anomalous, n_features, n_times)) y_anomalous = np.ones(shape=(n_samples_anomalous), dtype=np.int_) probability_large_spike = 0.2 spike_types = rng.binomial(n=1, p=probability_large_spike, size=ceil(n_times / 10)) for sample in range(n_samples_anomalous): for feature in range(n_features): low_noise_level = low_noise_levels[feature] high_noise_level = high_noise_levels[feature] X_anomalous[sample, feature, :] = np.array( [ rng.normal( 0, high_noise_level if spike_type else low_noise_level, size=10, ) for spike_type in spike_types ] ).flatten()[:n_times] X = np.concatenate((X_non_anomalous, X_anomalous), axis=0) y = np.concatenate((y_non_anomalous, y_anomalous), axis=0) # Split into training and validation data sets return _safe_train_test_split(X, y, test_size=test_size, random_state=random_seed)