From d13249a24942a81012309dd28593b17c39cf1a9b Mon Sep 17 00:00:00 2001
From: LouisCarpentier42
Date: Fri, 3 Oct 2025 10:47:16 +0200
Subject: [PATCH] feat: Implement HybridKNearestNeighbors

Signed-off-by: LouisCarpentier42
---
 docs/additional_information/changelog.rst    |   1 +
 docs/additional_information/contributing.rst |   2 +-
 docs/api/anomaly_detection.rst               |   1 +
 docs/bibliography.bib                        |  14 +
 .../_BasePyODAnomalyDetector.py              |   5 +-
 .../_HybridKNearestNeighbors.py              | 392 ++++++++++++++++++
 .../anomaly_detection/_KNearestNeighbors.py  |   2 +-
 dtaianomaly/anomaly_detection/__init__.py    |   2 +
 .../test_HybridKNearestNeighbors.py          | 126 ++++++
 tests/utils/test_discovery.py                |   1 +
 10 files changed, 542 insertions(+), 4 deletions(-)
 create mode 100644 dtaianomaly/anomaly_detection/_HybridKNearestNeighbors.py
 create mode 100644 tests/anomaly_detection/test_HybridKNearestNeighbors.py

diff --git a/docs/additional_information/changelog.rst b/docs/additional_information/changelog.rst
index dad863b..a6e2751 100644
--- a/docs/additional_information/changelog.rst
+++ b/docs/additional_information/changelog.rst
@@ -27,6 +27,7 @@ Added
 - Implement and integrate type validation into ``dtaianomaly``.
 - Added marks to the test: ``slow`` and ``numba``.
 - Integrated numpydocs documentation formatting rules.
+- Implement ``HybridKNearestNeighbors`` anomaly detector.
 
 Changed
 ^^^^^^^
diff --git a/docs/additional_information/contributing.rst b/docs/additional_information/contributing.rst
index 3a344dd..c389a0d 100644
--- a/docs/additional_information/contributing.rst
+++ b/docs/additional_information/contributing.rst
@@ -268,4 +268,4 @@ Documentation
 | |check_box| Has a citation to the relevant paper(s) been added in the class documentation, and the bibtex added to ``docs/bibliography.bib``?
 | |check_box| Did you add the component in tha API-documentation in ``docs/api/``?
 | |check_box| If you added a :py:class:`~dtaianomaly.data.LazyDataLoader`, did you update `data/README.rst `_?
-
+| |check_box| Have you updated the changelog?
diff --git a/docs/api/anomaly_detection.rst b/docs/api/anomaly_detection.rst
index 519401b..abcdd4a 100644
--- a/docs/api/anomaly_detection.rst
+++ b/docs/api/anomaly_detection.rst
@@ -72,6 +72,7 @@ Neural methods
 
    AutoEncoder
    ConvolutionalNeuralNetwork
+   HybridKNearestNeighbors
    LongShortTermMemoryNetwork
    MultilayerPerceptron
    Transformer
diff --git a/docs/bibliography.bib b/docs/bibliography.bib
index ff77562..003f1b4 100644
--- a/docs/bibliography.bib
+++ b/docs/bibliography.bib
@@ -525,3 +525,17 @@ @InProceedings{perini2021quantifying
 }
 
+@article{song2017hybrid,
+  author = {Song, Hongchao and Jiang, Zhuqing and Men, Aidong and Yang, Bo},
+  title = {A Hybrid Semi-Supervised Anomaly Detection Model for High-Dimensional Data},
+  journal = {Computational Intelligence and Neuroscience},
+  volume = {2017},
+  number = {1},
+  pages = {8501683},
+  doi = {10.1155/2017/8501683},
+  year = {2017}
+}
+
+
+
+
 
diff --git a/dtaianomaly/anomaly_detection/_BasePyODAnomalyDetector.py b/dtaianomaly/anomaly_detection/_BasePyODAnomalyDetector.py
index d471bdb..324c03d 100644
--- a/dtaianomaly/anomaly_detection/_BasePyODAnomalyDetector.py
+++ b/dtaianomaly/anomaly_detection/_BasePyODAnomalyDetector.py
@@ -6,6 +6,7 @@
 from dtaianomaly.anomaly_detection._BaseDetector import BaseDetector, Supervision
 from dtaianomaly.type_validation import IntegerAttribute, WindowSizeAttribute
 from dtaianomaly.windowing import (
+    WINDOW_SIZE_TYPE,
     compute_window_size,
     reverse_sliding_window,
     sliding_window,
@@ -42,7 +43,7 @@ class BasePyODAnomalyDetector(BaseDetector, abc.ABC):
         The PyOD anomaly detector
     """
 
-    window_size: int | str
+    window_size: WINDOW_SIZE_TYPE
     stride: int
     kwargs: dict
     window_size_: int
@@ -53,7 +54,7 @@ class BasePyODAnomalyDetector(BaseDetector, abc.ABC):
         "stride": IntegerAttribute(1),
     }
 
-    def __init__(self, window_size: int | str, stride: int = 1, **kwargs):
+    def __init__(self, window_size: WINDOW_SIZE_TYPE, stride: int = 1, **kwargs):
         super().__init__(self._supervision())
         self.window_size = window_size
         self.stride = stride
diff --git a/dtaianomaly/anomaly_detection/_HybridKNearestNeighbors.py b/dtaianomaly/anomaly_detection/_HybridKNearestNeighbors.py
new file mode 100644
index 0000000..0489a47
--- /dev/null
+++ b/dtaianomaly/anomaly_detection/_HybridKNearestNeighbors.py
@@ -0,0 +1,392 @@
+import random
+from typing import Literal
+
+import numpy as np
+import torch
+from sklearn.neighbors import NearestNeighbors
+
+from dtaianomaly.anomaly_detection._BaseDetector import BaseDetector, Supervision
+from dtaianomaly.anomaly_detection._BaseNeuralDetector import (
+    ACTIVATION_FUNCTION_TYPE,
+    ACTIVATION_FUNCTIONS,
+    BaseNeuralDetector,
+)
+from dtaianomaly.type_validation import (
+    FloatAttribute,
+    IntegerAttribute,
+    ListAttribute,
+    LiteralAttribute,
+    NoneAttribute,
+    WindowSizeAttribute,
+)
+from dtaianomaly.windowing import (
+    WINDOW_SIZE_TYPE,
+    compute_window_size,
+    reverse_sliding_window,
+    sliding_window,
+)
+
+__all__ = ["HybridKNearestNeighbors"]
+
+
+class HybridKNearestNeighbors(BaseDetector):
+    """
+    Anomaly detection based on a hybrid K-NN with AutoEncoder embedding :cite:`song2017hybrid`.
+
+    Combines an autoencoder, which learns a latent space representation of the
+    subsequences, with an ensemble of K-NN instances. During training, an autoencoder
+    is fitted on subsequences of the training time series to embed them into a latent
+    space. Then, the latent space embeddings are split into multiple subsets in a
+    bagging-like manner.
+    For each subset, a K-NN instance is initialized, and the average K-th nearest
+    neighbor distance of each sample is computed across all the subsets. At prediction
+    time, the autoencoder creates the latent space embedding of the sequences, and the
+    average K-th nearest neighbor distance of each test-sequence across all subsets is
+    computed. The anomaly score of a test-sequence with respect to a subset is then
+    computed as the proportion of samples in the subset that have a smaller average
+    distance. The final anomaly score equals the average anomaly score across all
+    subsets.
+
+    Parameters
+    ----------
+    window_size : int or str
+        The window size to use for extracting sliding windows from the time series. This
+        value will be passed to :py:meth:`~dtaianomaly.anomaly_detection.compute_window_size`.
+    stride : int, default=1
+        The stride, i.e., the step size for extracting sliding windows from the time series.
+    n_neighbors : int, default=5
+        The number of neighbors to use for the nearest neighbor queries.
+    n_estimators : int, default=3
+        The number of K-NN instances and, consequently, the number of subsets to use.
+    max_samples : int, float or 'auto', default='auto'
+        The number of samples to draw for each subset:
+
+        - if ``int``: Draw at most ``max_samples`` samples.
+        - if ``float``: Draw at most a fraction ``max_samples`` of the samples.
+        - if ``'auto'``: Set ``max_samples=n_windows/n_estimators``.
+
+    metric : str, default='euclidean'
+        Distance metric for distance computations. Any metric of scikit-learn and
+        scipy.spatial.distance can be used.
+    hidden_layer_dimensions : list of ints, default=[64]
+        The number of neurons in each hidden layer of the encoder and decoder. The given
+        list equals the ordered sequence of neurons in the encoder. The layers in the
+        decoder have the same dimensions, but mirrored.
+    latent_space_dimension : int, default=16
+        The dimension of the latent space.
+    activation_function : {"linear", "relu", "sigmoid", "tanh"}, default="relu"
+        The activation function to use at the end of each layer.
+    batch_size : int, default=32
+        The size of the batches to feed to the network.
+    n_epochs : int, default=5
+        The number of epochs for which the neural network should be trained.
+    learning_rate : float, default=1e-3
+        The learning rate to use for training the network.
+    device : str, default="cpu"
+        The device on which the neural network should be trained.
+        For more information, see: https://docs.pytorch.org/docs/stable/tensor_attributes.html#torch-device.
+    seed : int or None, default=None
+        The seed used for training the autoencoder and sampling the subsets.
+
+    Attributes
+    ----------
+    window_size_ : int
+        The effectively used window size for this anomaly detector.
+    nearest_neighbors_ : list[NearestNeighbors]
+        The scikit-learn nearest neighbor instances for each subset.
+    g_ : list[np.ndarray]
+        For each subset, a vector containing the average distance of each
+        sample within the subset to its K-th nearest neighbor across all
+        subsets.
+    auto_encoder_ : torch.nn.Module
+        The auto encoder used to embed the windows in the time series.
+
+    Notes
+    -----
+    - :cite:t:`song2017hybrid` assigns a binary anomaly score for a test-sequence
+      with respect to a subset by checking if the number of sequences within the
+      subset that have a greater distance than the test-sequence exceeds some
+      predefined threshold :math:`\\alpha` (Equation 10). We drop this part to
+      reduce the number of parameters and to allow for a more fine-grained anomaly
+      score computation.
+    - Currently, a very simple feed-forward auto encoder is implemented. If you want
+      to use a more advanced model, you can extend this class and override the
+      :py:meth:`~dtaianomaly.anomaly_detection.HybridKNearestNeighbors.build_auto_encoder`
+      method, which must return a ``torch.nn.Module`` with ``fit(windows)`` and
+      ``encode(windows)`` methods. The given windows are those computed by
+      :py:meth:`~dtaianomaly.windowing.sliding_window`.
+
+    Examples
+    --------
+    >>> from dtaianomaly.anomaly_detection import HybridKNearestNeighbors
+    >>> from dtaianomaly.data import demonstration_time_series
+    >>> x, y = demonstration_time_series()
+    >>> hybrid_knn = HybridKNearestNeighbors(64, seed=0).fit(x)
+    >>> hybrid_knn.decision_function(x)  # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
+    array([0.12284644, 0.38202247, 0.43220974, ..., 0.83470662, 0.81722846,
+           0.85243446])
+    """
+
+    window_size: WINDOW_SIZE_TYPE
+    stride: int
+    n_neighbors: int
+    n_estimators: int
+    max_samples: float | int | Literal["auto"]
+    metric: str
+    hidden_layer_dimensions: list[int]
+    latent_space_dimension: int
+    activation_function: ACTIVATION_FUNCTION_TYPE
+    batch_size: int
+    n_epochs: int
+    learning_rate: float
+    device: str
+    seed: int | None
+
+    window_size_: int
+    nearest_neighbors_: list[NearestNeighbors]
+    g_: list[np.ndarray]
+    auto_encoder_: torch.nn.Module
+
+    attribute_validation = {
+        "window_size": WindowSizeAttribute(),
+        "stride": IntegerAttribute(1),
+        "n_neighbors": IntegerAttribute(minimum=1),
+        "n_estimators": IntegerAttribute(minimum=1),
+        "max_samples": IntegerAttribute(minimum=1)
+        | FloatAttribute(0.0, 1.0, inclusive_minimum=False)
+        | LiteralAttribute("auto"),
+        "hidden_layer_dimensions": ListAttribute(IntegerAttribute(minimum=1)),
+        "latent_space_dimension": IntegerAttribute(minimum=1),
+        "activation_function": LiteralAttribute(ACTIVATION_FUNCTIONS),
+        "batch_size": IntegerAttribute(minimum=1),
+        "n_epochs": IntegerAttribute(minimum=1),
+        "learning_rate": FloatAttribute(minimum=0.0, inclusive_minimum=False),
+        "seed": IntegerAttribute() | NoneAttribute(),
+    }
+
+    def __init__(
+        self,
+        window_size: WINDOW_SIZE_TYPE,
+        stride: int = 1,
+        n_neighbors: int = 5,
+        n_estimators: int = 3,
+        max_samples: float | int | Literal["auto"] = "auto",
+        metric: str = "euclidean",
+        hidden_layer_dimensions: list[int] = (64,),
+        latent_space_dimension: int = 16,
+        activation_function: ACTIVATION_FUNCTION_TYPE = "relu",
+        batch_size: int = 32,
+        n_epochs: int = 5,
+        learning_rate: float = 1e-3,
+        device: str = "cpu",
+        seed: int | None = None,
+    ):
+        super().__init__(Supervision.SEMI_SUPERVISED)
+        self.window_size = window_size
+        self.stride = stride
+        self.n_neighbors = n_neighbors
+        self.n_estimators = n_estimators
+        self.max_samples = max_samples
+        self.metric = metric
+        self.hidden_layer_dimensions = list(hidden_layer_dimensions)
+        self.latent_space_dimension = latent_space_dimension
+        self.activation_function = activation_function
+        self.batch_size = batch_size
+        self.n_epochs = n_epochs
+        self.learning_rate = learning_rate
+        self.device = device
+        self.seed = seed
+
+    def _fit(self, X: np.ndarray, y: np.ndarray = None, **kwargs) -> None:
+        # Compute the window size
+        self.window_size_ = compute_window_size(X, self.window_size, **kwargs)
+        windows = sliding_window(X, self.window_size_, self.stride)
+
+        # Fit the auto encoder and embed the windows into the latent space
+        self.auto_encoder_ = self.build_auto_encoder(windows)
+        self.auto_encoder_.fit(windows)
+        X_ = self.auto_encoder_.encode(windows)
+
+        # Create the subsets
+        subsets = self._create_subsets(X_)
+
+        # Fit the nearest neighbor instances
+        self.nearest_neighbors_ = [
+            NearestNeighbors(n_neighbors=self.n_neighbors, metric=self.metric).fit(
+                subset
+            )
+            for subset in subsets
+        ]
+
+        # Compute the g-array
+        self.g_ = [self._compute_g(subset).reshape(1, -1) for subset in subsets]
+
+    def _decision_function(self, X: np.ndarray) -> np.ndarray:
+        # Compute the windows
+        windows = sliding_window(X, self.window_size_, self.stride)
+        X_ = self.auto_encoder_.encode(windows)
+
+        # Compute the G-values
+        g = self._compute_g(X_).reshape(-1, 1)
+
+        # Compute the p-values (Equation 9)
+        p = np.array(
+            [
+                # The paper states less_equal, but then detects anomalies based on the smallest values.
+                # We compute greater to have higher scores for more anomalous points.
+                np.greater(g, self.g_[i]).mean(axis=1)
+                for i in range(self.n_estimators)
+            ]
+        )
+
+        # Apply reverse sliding window
+        return reverse_sliding_window(
+            p.mean(axis=0), self.window_size_, self.stride, X.shape[0]
+        )
+
+    def _create_subsets(self, X: np.ndarray) -> list[np.ndarray]:
+        rng = np.random.default_rng(self.seed)
+        if self.max_samples == "auto":
+            nb_samples = X.shape[0] / self.n_estimators
+        elif isinstance(self.max_samples, float):
+            nb_samples = self.max_samples * X.shape[0]
+        else:
+            nb_samples = self.max_samples
+
+        # Draw at most the number of available windows, because the windows
+        # are sampled without replacement.
+        nb_samples = min(int(nb_samples), X.shape[0])
+
+        return [
+            X[rng.choice(X.shape[0], size=nb_samples, replace=False)]
+            for _ in range(self.n_estimators)
+        ]
+
+    def _compute_g(self, X: np.ndarray) -> np.ndarray:
+        # Equation 8
+        return np.array(
+            [
+                nearest_neighbors.kneighbors(X)[0][:, -1]
+                for nearest_neighbors in self.nearest_neighbors_
+            ]
+        ).mean(axis=0)
+
+    def build_auto_encoder(self, windows: np.ndarray) -> torch.nn.Module:
+        """
+        Build an auto encoder.
+
+        Build an auto encoder module that takes as input the given windows
+        and learns to reconstruct them.
+
+        Parameters
+        ----------
+        windows : array-like of shape (n_windows, n_attributes x window_size)
+            The windows that will be fed to the auto encoder.
+
+        Returns
+        -------
+        torch.nn.Module
+            Returns a torch neural network module which takes the windows
+            as input and learns to reconstruct them. The torch module
+            has a ``.fit(windows)`` method to learn the weights and an
+            ``.encode(windows)`` method to transform the windows into a
+            latent space embedding.
+        """
+        return _AutoEncoder(
+            input_size=windows.shape[1],
+            hidden_layer_dimensions=self.hidden_layer_dimensions,
+            latent_space_dimension=self.latent_space_dimension,
+            activation_function=self.activation_function,
+            batch_size=self.batch_size,
+            n_epochs=self.n_epochs,
+            learning_rate=self.learning_rate,
+            device=self.device,
+            seed=self.seed,
+        )
+
+
+class _AutoEncoder(torch.nn.Module):
+
+    encoder: torch.nn.Sequential
+    decoder: torch.nn.Sequential
+    batch_size: int
+    n_epochs: int
+    learning_rate: float
+    device: str
+    seed: int | None
+
+    def __init__(
+        self,
+        input_size: int,
+        hidden_layer_dimensions: list[int],
+        latent_space_dimension: int,
+        activation_function: ACTIVATION_FUNCTION_TYPE,
+        batch_size: int,
+        n_epochs: int,
+        learning_rate: float,
+        device: str,
+        seed: int | None,
+    ):
+        super().__init__()
+        self.seed = seed
+        self._set_seed()
+
+        # Build the encoder: the hidden layers followed by the latent space projection
+        encoder_layers = []
+        prev_d = input_size
+        for d in hidden_layer_dimensions:
+            encoder_layers.append(torch.nn.Linear(prev_d, d))
+            encoder_layers.append(
+                BaseNeuralDetector._build_activation_function(activation_function)
+            )
+            prev_d = d
+        encoder_layers.append(
+            torch.nn.Linear(hidden_layer_dimensions[-1], latent_space_dimension)
+        )
+        self.encoder = torch.nn.Sequential(*encoder_layers)
+
+        # Build the decoder: the mirrored hidden layers followed by the reconstruction
+        decoder_layers = []
+        prev_d = latent_space_dimension
+        for d in reversed(hidden_layer_dimensions):
+            decoder_layers.append(torch.nn.Linear(prev_d, d))
+            decoder_layers.append(
+                BaseNeuralDetector._build_activation_function(activation_function)
+            )
+            prev_d = d
+        decoder_layers.append(torch.nn.Linear(hidden_layer_dimensions[0], input_size))
+        self.decoder = torch.nn.Sequential(*decoder_layers)
+
+        self.batch_size = batch_size
+        self.n_epochs = n_epochs
+        self.learning_rate = learning_rate
+        self.device = device
+        self.to(device)
+
+    def _set_seed(self):
+        if self.seed is not None:
+            random.seed(self.seed)
+            np.random.seed(self.seed)
+            torch.manual_seed(self.seed)
+            torch.cuda.manual_seed_all(self.seed)
+            torch.backends.cudnn.deterministic = True
+            torch.backends.cudnn.benchmark = False
+
+    def forward(self, x):
+        encoded = self.encoder(x)
+        decoded = self.decoder(encoded)
+        return decoded
+
+    def encode(self, windows: np.ndarray) -> np.ndarray:
+        with torch.no_grad():
+            x = torch.Tensor(windows).to(self.device)
+            encoded = self.encoder(x)
+            return encoded.cpu().numpy()
+
+    def fit(self, windows: np.ndarray):
+        self._set_seed()
+
+        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
+        criterion = torch.nn.MSELoss()
+
+        dataset = torch.utils.data.TensorDataset(torch.Tensor(windows))
+        dataloader = torch.utils.data.DataLoader(dataset, batch_size=self.batch_size)
+
+        self.train()
+        for epoch in range(self.n_epochs):
+            for batch in dataloader:
+                batch = batch[0].to(self.device)
+                self.zero_grad()
+                loss = criterion(self.forward(batch), batch)
+                loss.backward()
+                optimizer.step()
+        self.eval()
diff --git a/dtaianomaly/anomaly_detection/_KNearestNeighbors.py b/dtaianomaly/anomaly_detection/_KNearestNeighbors.py
index 4a32e01..ce01369 100644
--- a/dtaianomaly/anomaly_detection/_KNearestNeighbors.py
+++ b/dtaianomaly/anomaly_detection/_KNearestNeighbors.py
@@ -46,7 +46,7 @@ class KNearestNeighbors(BasePyODAnomalyDetector):
         Distance metric for distance computations. Any metric of scikit-learn and
         scipy.spatial.distance can be used.
     **kwargs
-        Arguments to be passed to the PyOD isolation forest.
+        Arguments to be passed to the PyOD K-NN.
 
     Attributes
     ----------
diff --git a/dtaianomaly/anomaly_detection/__init__.py b/dtaianomaly/anomaly_detection/__init__.py
index 1611e21..b4f8caf 100644
--- a/dtaianomaly/anomaly_detection/__init__.py
+++ b/dtaianomaly/anomaly_detection/__init__.py
@@ -53,6 +53,7 @@
 from ._CopulaBasedOutlierDetector import CopulaBasedOutlierDetector
 from ._DWT_MLEAD import DWT_MLEAD
 from ._HistogramBasedOutlierScore import HistogramBasedOutlierScore
+from ._HybridKNearestNeighbors import HybridKNearestNeighbors
 from ._IsolationForest import IsolationForest
 from ._KernelPrincipalComponentAnalysis import KernelPrincipalComponentAnalysis
 from ._KMeansAnomalyDetector import KMeansAnomalyDetector
@@ -124,4 +125,5 @@
     "MOMENT",
     "TimeMoE",
     "RobustRandomCutForestAnomalyDetector",
+    "HybridKNearestNeighbors",
 ]
diff --git a/tests/anomaly_detection/test_HybridKNearestNeighbors.py b/tests/anomaly_detection/test_HybridKNearestNeighbors.py
new file mode 100644
index 0000000..b581139
--- /dev/null
+++ b/tests/anomaly_detection/test_HybridKNearestNeighbors.py
@@ -0,0 +1,126 @@
+import numpy as np
+import pytest
+from conftest import is_activation, is_linear, is_sequential
+
+from dtaianomaly.anomaly_detection import HybridKNearestNeighbors, Supervision
+from dtaianomaly.anomaly_detection._BaseNeuralDetector import ACTIVATION_FUNCTIONS
+from dtaianomaly.anomaly_detection._HybridKNearestNeighbors import _AutoEncoder
+from dtaianomaly.windowing import sliding_window
+
+
+class TestHybridKNearestNeighbors:
+
+    def test_supervision(self):
+        assert HybridKNearestNeighbors(1).supervision == Supervision.SEMI_SUPERVISED
+
+    def test_build_architecture_default(self, univariate_time_series):
+        window_size = 32
+        detector = HybridKNearestNeighbors(window_size=window_size)
+        windows = sliding_window(
+            univariate_time_series, window_size=window_size, stride=1
+        )
+        auto_encoder = detector.build_auto_encoder(windows)
+
+        assert isinstance(auto_encoder, _AutoEncoder)
+
+        encoder_modules = auto_encoder.encoder.modules()
+        assert is_sequential(next(encoder_modules))
+        assert is_linear(next(encoder_modules), window_size, 64)
+        assert is_activation(next(encoder_modules), "relu")
+        assert is_linear(next(encoder_modules), 64, 16)
+        with pytest.raises(StopIteration):
+            next(encoder_modules)
+
+        decoder_modules = auto_encoder.decoder.modules()
+        assert is_sequential(next(decoder_modules))
+        assert is_linear(next(decoder_modules), 16, 64)
+        assert is_activation(next(decoder_modules), "relu")
+        assert is_linear(next(decoder_modules), 64, window_size)
+        with pytest.raises(StopIteration):
+            next(decoder_modules)
+
+    def test_build_architecture_custom_layers(self, univariate_time_series):
+        window_size = 32
+        detector = HybridKNearestNeighbors(
+            window_size=window_size,
+            hidden_layer_dimensions=[64, 48, 32, 16],
+            latent_space_dimension=8,
+        )
+        windows = sliding_window(
+            univariate_time_series, window_size=window_size, stride=1
+        )
+        auto_encoder = detector.build_auto_encoder(windows)
+
+        assert isinstance(auto_encoder, _AutoEncoder)
+
+        encoder_modules = auto_encoder.encoder.modules()
+        assert is_sequential(next(encoder_modules))
+        assert is_linear(next(encoder_modules), window_size, 64)
+        assert is_activation(next(encoder_modules), "relu")
+        assert is_linear(next(encoder_modules), 64, 48)
+        assert is_activation(next(encoder_modules), "relu")
+        assert is_linear(next(encoder_modules), 48, 32)
+        assert is_activation(next(encoder_modules), "relu")
+        assert is_linear(next(encoder_modules), 32, 16)
+        assert is_activation(next(encoder_modules), "relu")
+        assert is_linear(next(encoder_modules), 16, 8)
+        with pytest.raises(StopIteration):
+            next(encoder_modules)
+
+        decoder_modules = auto_encoder.decoder.modules()
+        assert is_sequential(next(decoder_modules))
+        assert is_linear(next(decoder_modules), 8, 16)
+        assert is_activation(next(decoder_modules), "relu")
+        assert is_linear(next(decoder_modules), 16, 32)
+        assert is_activation(next(decoder_modules), "relu")
+        assert is_linear(next(decoder_modules), 32, 48)
+        assert is_activation(next(decoder_modules), "relu")
+        assert is_linear(next(decoder_modules), 48, 64)
+        assert is_activation(next(decoder_modules), "relu")
+        assert is_linear(next(decoder_modules), 64, window_size)
+        with pytest.raises(StopIteration):
+            next(decoder_modules)
+
+    @pytest.mark.parametrize("activation_function", ACTIVATION_FUNCTIONS)
+    def test_build_architecture_custom_activation_function(
+        self, univariate_time_series, activation_function
+    ):
+        window_size = 32
+        detector = HybridKNearestNeighbors(
+            window_size=window_size, activation_function=activation_function
+        )
+        windows = sliding_window(
+            univariate_time_series, window_size=window_size, stride=1
+        )
+        auto_encoder = detector.build_auto_encoder(windows)
+
+        assert isinstance(auto_encoder, _AutoEncoder)
+
+        encoder_modules = auto_encoder.encoder.modules()
+        assert is_sequential(next(encoder_modules))
+        assert is_linear(next(encoder_modules), window_size, 64)
+        assert is_activation(next(encoder_modules), activation_function)
+        assert is_linear(next(encoder_modules), 64, 16)
+        with pytest.raises(StopIteration):
+            next(encoder_modules)
+
+        decoder_modules = auto_encoder.decoder.modules()
+        assert is_sequential(next(decoder_modules))
+        assert is_linear(next(decoder_modules), 16, 64)
+        assert is_activation(next(decoder_modules), activation_function)
+        assert is_linear(next(decoder_modules), 64, window_size)
+        with pytest.raises(StopIteration):
+            next(decoder_modules)
+
+    def test_seed(self, univariate_time_series):
+        detector1 = HybridKNearestNeighbors(1, seed=0)
+        y_pred1 = detector1.fit(univariate_time_series).decision_function(
+            univariate_time_series
+        )
+
+        detector2 = HybridKNearestNeighbors(1, seed=0)
+        y_pred2 = detector2.fit(univariate_time_series).decision_function(
+            univariate_time_series
+        )
+
+        assert np.array_equal(y_pred1, y_pred2)
diff --git a/tests/utils/test_discovery.py b/tests/utils/test_discovery.py
index 7eacdb2..d5e1c4b 100644
--- a/tests/utils/test_discovery.py
+++ b/tests/utils/test_discovery.py
@@ -35,6 +35,7 @@
     anomaly_detection.MOMENT,
     anomaly_detection.TimeMoE,
     anomaly_detection.RobustRandomCutForestAnomalyDetector,
+    anomaly_detection.HybridKNearestNeighbors,
 ]
 data_loaders = [
     data.DemonstrationTimeSeriesLoader,
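
The score computation implemented in ``_decision_function`` reduces to the following standalone numpy sketch. It is illustrative only: the subset and test embeddings are randomly generated placeholders, and the helper ``average_kth_distance`` is a hypothetical name. It mirrors Equations 8 and 9 of :cite:`song2017hybrid` as implemented in the patch: the distance of a point is its K-th nearest neighbor distance averaged across all K-NN instances, and the score of a test point with respect to a subset is the proportion of training samples with a smaller average distance.

import numpy as np
from sklearn.neighbors import NearestNeighbors

rng = np.random.default_rng(0)

# Toy latent embeddings: three subsets of 50 samples with 16 latent dimensions.
subsets = [rng.normal(size=(50, 16)) for _ in range(3)]
knns = [NearestNeighbors(n_neighbors=5).fit(subset) for subset in subsets]

def average_kth_distance(points: np.ndarray) -> np.ndarray:
    # Equation 8: the K-th nearest neighbor distance of each point,
    # averaged across all fitted K-NN instances.
    return np.array(
        [knn.kneighbors(points)[0][:, -1] for knn in knns]
    ).mean(axis=0)

# Reference distances of the training samples, one vector per subset.
g_train = [average_kth_distance(subset).reshape(1, -1) for subset in subsets]

# Distances of ten (toy) test embeddings.
g_test = average_kth_distance(rng.normal(size=(10, 16))).reshape(-1, 1)

# Equation 9: per subset, the proportion of training samples with a smaller
# average distance; the final score averages these proportions over the subsets.
scores = np.array(
    [np.greater(g_test, g).mean(axis=1) for g in g_train]
).mean(axis=0)
print(scores)  # ten values in [0, 1]; higher means more anomalous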
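
The ``build_auto_encoder`` hook described in the class notes can be exercised by subclassing. The sketch below is a minimal illustration, not part of the patch: the names ``_TinyAutoEncoder`` and ``CustomHybridKNN`` are hypothetical, and any ``torch.nn.Module`` exposing ``fit(windows)`` and ``encode(windows)`` would do.

import numpy as np
import torch

from dtaianomaly.anomaly_detection import HybridKNearestNeighbors


class _TinyAutoEncoder(torch.nn.Module):
    # Hypothetical replacement auto encoder with the required interface.
    def __init__(self, input_size: int, latent_size: int = 8):
        super().__init__()
        self.encoder = torch.nn.Sequential(
            torch.nn.Linear(input_size, 32),
            torch.nn.ReLU(),
            torch.nn.Linear(32, latent_size),
        )
        self.decoder = torch.nn.Sequential(
            torch.nn.Linear(latent_size, 32),
            torch.nn.ReLU(),
            torch.nn.Linear(32, input_size),
        )

    def fit(self, windows: np.ndarray) -> None:
        # Train the reconstruction objective for a few epochs.
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        criterion = torch.nn.MSELoss()
        x = torch.Tensor(windows)
        for _ in range(5):
            optimizer.zero_grad()
            loss = criterion(self.decoder(self.encoder(x)), x)
            loss.backward()
            optimizer.step()

    def encode(self, windows: np.ndarray) -> np.ndarray:
        # Map the windows to their latent space embedding.
        with torch.no_grad():
            return self.encoder(torch.Tensor(windows)).numpy()


class CustomHybridKNN(HybridKNearestNeighbors):
    def build_auto_encoder(self, windows: np.ndarray) -> torch.nn.Module:
        return _TinyAutoEncoder(input_size=windows.shape[1])

Because ``_fit`` only ever calls ``fit`` and ``encode`` on the module returned by ``build_auto_encoder``, no other changes are needed for the K-NN ensemble to operate on the custom latent space.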