Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ Other contributors, listed alphabetically, are:
* Jelle Van De Sijpe
* Nick Seeuws (Leuven.AI, KU Leuven)
* Pablo Ruiz Morales (DTAI, KU Leuven)
* Xiaokun Zhu (DTAI, KU Leuven)
2 changes: 2 additions & 0 deletions docs/additional_information/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ Added
- Added marks to the test: ``slow`` and ``numba``.
- Integrated numpydocs documentation formatting rules.
- Implement ``HybridKNearestNeighbors`` anomaly detector.
- Implement ``SquaredDifference`` baseline anomaly detector.
- Implement ``MovingWindowVariance`` baseline anomaly detector.

Changed
^^^^^^^
Expand Down
2 changes: 2 additions & 0 deletions docs/api/anomaly_detection.rst
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,6 @@ Baselines

AlwaysNormal
AlwaysAnomalous
MovingWindowVariance
RandomDetector
SquaredDifference
10 changes: 9 additions & 1 deletion dtaianomaly/anomaly_detection/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,13 @@
TimeSeriesDataset,
)
from ._Transformer import Transformer
from .baselines import AlwaysAnomalous, AlwaysNormal, RandomDetector
from .baselines import (
AlwaysAnomalous,
AlwaysNormal,
MovingWindowVariance,
RandomDetector,
SquaredDifference,
)

__all__ = [
# Base
Expand All @@ -90,6 +96,8 @@
"AlwaysNormal",
"AlwaysAnomalous",
"RandomDetector",
"MovingWindowVariance",
"SquaredDifference",
# Detectors
"ClusterBasedLocalOutlierFactor",
"CopulaBasedOutlierDetector",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import numpy as np

from dtaianomaly import utils
from dtaianomaly.anomaly_detection._BaseDetector import BaseDetector, Supervision
from dtaianomaly.type_validation import WindowSizeAttribute
from dtaianomaly.windowing import (
WINDOW_SIZE_TYPE,
compute_window_size,
reverse_sliding_window,
sliding_window,
)

__all__ = ["MovingWindowVariance"]


class MovingWindowVariance(BaseDetector):
"""
Detect anomalies based on the variance within a window.

Baseline anomaly detector, assigns an anomaly score purely based on the
variance within a sliding window. This detector does not look at any
recurring patterns within the data. Formally, the anomaly score :math:`s_i`
for a window :math:`T_{i,i+w-1}` equals :math:`s_i = var(T_{i,i+w-1})`.

Parameters
----------
window_size : int or str
The window size to use for extracting sliding windows from the time series. This
value will be passed to :py:meth:`~dtaianomaly.anomaly_detection.compute_window_size`.

Attributes
----------
window_size_ : int
The effectively used window size for this anomaly detector.

Examples
--------
>>> from dtaianomaly.anomaly_detection import MovingWindowVariance
>>> from dtaianomaly.data import demonstration_time_series
>>> x, y = demonstration_time_series()
>>> baseline = MovingWindowVariance(16).fit(x)
>>> baseline.decision_function(x)
array([0.06820711, 0.07130246, 0.07286874, ..., 0.01125165, 0.00984333,
0.00986772])
"""

window_size: WINDOW_SIZE_TYPE
window_size_: int
attribute_validation = {"window_size": WindowSizeAttribute()}

def __init__(self, window_size: WINDOW_SIZE_TYPE):
super().__init__(Supervision.UNSUPERVISED)
self.window_size = window_size

def _fit(self, X: np.ndarray, y: np.ndarray = None, **kwargs) -> None:
if not utils.is_univariate(X):
raise ValueError("Input must be univariate!")
self.window_size_ = compute_window_size(X, self.window_size, **kwargs)

def _decision_function(self, X: np.ndarray) -> np.array:
# Check if the given dataset is univariate
if not utils.is_univariate(X):
raise ValueError("Input must be univariate!")

variances = np.var(sliding_window(X, self.window_size_, 1), axis=1)
return reverse_sliding_window(variances, self.window_size_, 1, X.shape[0])
72 changes: 72 additions & 0 deletions dtaianomaly/anomaly_detection/baselines/_SquaredDifference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import numpy as np

from dtaianomaly import utils
from dtaianomaly.anomaly_detection._BaseDetector import BaseDetector, Supervision
from dtaianomaly.type_validation import BoolAttribute

__all__ = ["SquaredDifference"]


class SquaredDifference(BaseDetector):
"""
Compute anomaly scores based on the squared difference.

Baseline anomaly detector, assings an anomaly score as the squared
difference of an observation and the previous observation. Formally,
the anomaly score :math:`s_i` for observation :math:`x_i` equals
:math:`s_i = (x_i - x_{i-1})^2`.

Parameters
----------
square_errors : bool, default=True
If the differences should be squared. If False, this detector equals the
absolute difference.

Examples
--------
>>> from dtaianomaly.anomaly_detection import SquaredDifference
>>> from dtaianomaly.data import demonstration_time_series
>>> x, y = demonstration_time_series()
>>> baseline = SquaredDifference().fit(x)
>>> baseline.decision_function(x)
array([0.00779346, 0.00779346, 0.00260361, ..., 0.00286662, 0.05578398,
0.02683475])
"""

square_errors: bool
attribute_validation = {"square_errors": BoolAttribute()}

def __init__(self, square_errors: bool = True):
super().__init__(Supervision.UNSUPERVISED)
self.square_errors = square_errors

def _fit(self, X: np.ndarray, y: np.ndarray = None, **kwargs) -> None:
"""Requires no fitting."""

def _decision_function(self, X: np.ndarray) -> np.array:
# Check if the given dataset is univariate
if not utils.is_univariate(X):
raise ValueError("Input must be univariate!")

decision_scores = np.empty(shape=X.shape[0])
decision_scores[1:] = np.abs(np.diff(X.squeeze()))
decision_scores[0] = decision_scores[1] # Padding
if self.square_errors:
decision_scores = np.square(decision_scores)
return decision_scores


def main():
from dtaianomaly.data import demonstration_time_series
from dtaianomaly.visualization import plot_anomaly_scores

x, y = demonstration_time_series()
x = x.reshape(-1, 1)

baseline = SquaredDifference().fit(x)
y_pred = baseline.decision_function(x)
plot_anomaly_scores(x, y, y_pred, figsize=(20, 5)).show()


if __name__ == "__main__":
main()
4 changes: 4 additions & 0 deletions dtaianomaly/anomaly_detection/baselines/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from ._AlwaysAnomalous import AlwaysAnomalous
from ._AlwaysNormal import AlwaysNormal
from ._MovingWindowVariance import MovingWindowVariance
from ._RandomDetector import RandomDetector
from ._SquaredDifference import SquaredDifference

__all__ = [
"AlwaysAnomalous",
"AlwaysNormal",
"RandomDetector",
"SquaredDifference",
"MovingWindowVariance",
]
2 changes: 2 additions & 0 deletions tests/anomaly_detection/test_detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
anomaly_detection.SpectralResidual,
anomaly_detection.MOMENT,
anomaly_detection.TimeMoE,
anomaly_detection.SquaredDifference,
anomaly_detection.MovingWindowVariance,
]
DETECTORS_TO_EXCLUDE = [
anomaly_detection.MOMENT, # Due to dependency conflicts
Expand Down
2 changes: 2 additions & 0 deletions tests/utils/test_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
anomaly_detection.TimeMoE,
anomaly_detection.RobustRandomCutForestAnomalyDetector,
anomaly_detection.HybridKNearestNeighbors,
anomaly_detection.MovingWindowVariance,
anomaly_detection.SquaredDifference,
]
data_loaders = [
data.DemonstrationTimeSeriesLoader,
Expand Down