Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/api/visualization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,19 @@ Visualization module
>>> fig = plot_anomaly_scores(X, y, y_pred, figsize=(10, 3), method_to_plot=plot_time_series_colored_by_score)
>>> fig.suptitle("Example of 'plot_anomaly_scores'") # doctest: +SKIP

.. plot::
:context: close-figs

>>> from dtaianomaly.data import demonstration_time_series
>>> from dtaianomaly.visualization import plot_anomaly_scores, plot_time_series_colored_by_score
>>> from dtaianomaly.anomaly_detection import IsolationForest
>>> X, y = demonstration_time_series()
>>> detector = IsolationForest(window_size=100).fit(X)
>>> y_pred = detector.predict_proba(X)
>>> confidence = detector.predict_confidence(X)
>>> fig = plot_anomaly_scores(X, y, y_pred, confidence=confidence, figsize=(10, 3), method_to_plot=plot_time_series_colored_by_score)
>>> fig.suptitle("Example of 'plot_anomaly_scores' with confidence ranges") # doctest: +SKIP


.. autofunction:: dtaianomaly.visualization.plot_time_series_anomalies

Expand Down
79 changes: 79 additions & 0 deletions dtaianomaly/anomaly_detection/BaseDetector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import pickle
import enum
import numpy as np
import scipy
from pathlib import Path
from typing import Optional, Union

from dtaianomaly.thresholding.thresholding import ContaminationRate
from dtaianomaly import utils
from dtaianomaly.PrettyPrintable import PrettyPrintable

Expand Down Expand Up @@ -125,6 +127,83 @@ def predict_proba(self, X: np.ndarray) -> np.ndarray:
else:
return (raw_scores - min_score) / (max_score - min_score)

def predict_confidence(self, X: np.ndarray, X_train: np.ndarray = None, contamination: float = 0.05, decision_scores_given: bool = False) -> np.ndarray:
    """
    Predict the confidence of the anomaly scores for the given test data.

    This method implements ExCeeD [perini2020quantifying]_ (Example-wise Confidence
    of anomaly Detectors) to estimate the confidence. ExCeeD transforms the predicted
    decision scores to probability estimates using a Bayesian approach, which enables
    to assign a confidence score to each prediction which captures the uncertainty
    of the anomaly detector in that prediction.

    Parameters
    ----------
    X: array-like of shape (n_samples, n_attributes)
        The test time series for which the confidence of anomaly scores
        should be predicted.
    X_train: array-like of shape (n_samples_train, n_attributes), default=None
        The training time series, which can be used as reference. If
        ``X_train=None``, the test set is used as reference set.
    contamination: float, default=0.05
        The (estimated) contamination rate for the data, i.e., the expected
        percentage of anomalies.
    decision_scores_given: bool, default=False
        Whether the given ``X`` and ``X_train`` represent time series data
        or decision scores. If ``decision_scores_given=False`` (default),
        then the given arrays are interpreted as time series. Otherwise,
        they are interpreted as decision scores, as computed by
        ``decision_function()``.

    Returns
    -------
    confidence: array-like of shape (n_samples)
        The confidence of this anomaly detector in each prediction in the
        given test time series.

    Raises
    ------
    ValueError
        If ``decision_scores_given=True`` but ``X`` (or ``X_train``) has more
        than one dimension, and thus cannot be an array of decision scores.

    References
    ----------
    .. [perini2020quantifying] Perini, L., Vercruyssen, V., Davis, J. Quantifying
       the Confidence of Anomaly Detectors in Their Example-Wise Predictions. In:
       Machine Learning and Knowledge Discovery in Databases. ECML PKDD 2020.
       Springer, Cham, doi: `10.1007/978-3-030-67664-3_14 <https://doi.org/10.1007/978-3-030-67664-3_14>`_.
    """
    # Set the decision scores
    if decision_scores_given:
        # Decision scores must be one-dimensional arrays.
        if len(X.shape) > 1:
            raise ValueError("In the 'predict_confidence()' method, it was indicated that the decision scores are provided "
                             "as X (decision_scores_given=True), but the shape of X does not correspond to the shape of decision "
                             f"scores: {X.shape}!")
        if X_train is not None and len(X_train.shape) > 1:
            raise ValueError("In the 'predict_confidence()' method, it was indicated that the decision scores are provided "
                             "as X (decision_scores_given=True), but the shape of X_train does not correspond to the shape of decision "
                             f"scores: {X_train.shape}!")
        decision_scores = X
        decision_scores_train = X_train if X_train is not None else decision_scores

    else:
        # Compute the decision scores
        decision_scores = self.decision_function(X)
        decision_scores_train = self.decision_function(X_train) if X_train is not None else decision_scores

    # Convert the decision scores to binary predictions
    prediction = ContaminationRate(contamination_rate=contamination).threshold(decision_scores)

    # Apply the ExCeeD method (https://github.com/Lorenzo-Perini/Confidence_AD/blob/master/ExCeeD.py)
    # NOTE(review): 'n' is the number of *test* scores; the reference implementation
    # uses the training-set size — confirm this is intentional when X_train differs.
    n = decision_scores.shape[0]

    # For each test score, count how many reference (training) scores are <= it.
    # Sorting once and binary-searching is O(n log n), replacing the previous
    # O(n^2) np.vectorize(count_nonzero) formulation with identical results.
    n_instances = np.searchsorted(np.sort(decision_scores_train), decision_scores, side='right')

    # Bayesian posterior outlier probability according to ExCeeD.
    posterior_prob = (1 + n_instances) / (2 + n)

    # Example-wise confidence: probability that the example's rank exceeds the
    # contamination threshold; binom.cdf broadcasts over the probability array.
    exWise_conf = 1 - scipy.stats.binom.cdf(n - int(n * contamination), n, posterior_prob)
    # If the example is classified as normal, use 1 - confidence.
    np.place(exWise_conf, prediction == 0, 1 - exWise_conf[prediction == 0])

    return exWise_conf

def save(self, path: Union[str, Path]) -> None:
"""
Save detector to disk as a pickle file with extension `.dtai`. If the given
Expand Down
10 changes: 9 additions & 1 deletion dtaianomaly/visualization/visualization.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ def plot_with_zoom(
def plot_anomaly_scores(
X: np.array, y: np.array, y_pred: np.array,
time_steps: np.array = None, method_to_plot=plot_demarcated_anomalies,
confidence: np.array = None,
**kwargs) -> plt.Figure:
"""
Plot the given data with the ground truth anomalies, and compare the
Expand All @@ -290,6 +291,8 @@ def plot_anomaly_scores(
time series data), ``y`` (the anomaly labels``), ``time_steps``
(the time steps at which there was an observation) and ``ax``
(the axis on which the plot should be made).
confidence: np.array of shape (n_samples), default=None
The confidence of the anomaly scores.
**kwargs:
Arguments to be passed to plt.subplots().

Expand All @@ -310,7 +313,12 @@ def plot_anomaly_scores(

# Plot the anomaly scores
ax_pred.set_title('Predicted anomaly scores')
ax_pred.plot(time_steps, y_pred)
ax_pred.plot(time_steps, y_pred, label='Anomaly scores')

# Predict the confidence interval
if confidence is not None:
ax_pred.fill_between(time_steps, y_pred - (1-confidence), y_pred + (1-confidence), color='gray', alpha=0.5, label='Confidence range')
ax_pred.legend()

# Return the figure
return fig
Expand Down
78 changes: 78 additions & 0 deletions tests/anomaly_detection/test_BaseDetector.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,81 @@ def test_save_and_load(self, tmp_path):
def test_str(self):
assert str(baselines.RandomDetector()) == 'RandomDetector()'
assert str(baselines.AlwaysNormal()) == 'AlwaysNormal()'


class TestConfidence:
    """Tests for ``BaseDetector.predict_confidence()``."""

    def test_predict_confidence(self, univariate_time_series):
        """Confidence has one value per test sample for univariate data."""
        X_train = univariate_time_series[:int(univariate_time_series.shape[0]*0.3)]
        X_test = univariate_time_series[int(univariate_time_series.shape[0]*0.3):]

        detector = baselines.RandomDetector().fit(X_train)
        confidence = detector.predict_confidence(X_test, X_train)
        assert confidence.shape[0] == X_test.shape[0]
        assert len(confidence.shape) == 1

    def test_predict_confidence_multivariate(self, multivariate_time_series):
        """Confidence has one value per test sample for multivariate data."""
        X_train = multivariate_time_series[:int(multivariate_time_series.shape[0]*0.3), :]
        X_test = multivariate_time_series[int(multivariate_time_series.shape[0]*0.3):, :]

        detector = baselines.RandomDetector().fit(X_train)
        confidence = detector.predict_confidence(X_test, X_train)
        assert confidence.shape[0] == X_test.shape[0]
        assert len(confidence.shape) == 1

    def test_predict_confidence_no_train_data(self, univariate_time_series):
        """Without X_train, the test set itself serves as reference."""
        detector = baselines.RandomDetector().fit(univariate_time_series)
        confidence = detector.predict_confidence(univariate_time_series)
        assert confidence.shape[0] == univariate_time_series.shape[0]
        assert len(confidence.shape) == 1

    def test_predict_confidence_decision_scores_given(self, univariate_time_series):
        """Passing precomputed decision scores must match passing raw data."""
        detector = baselines.RandomDetector(seed=42).fit(univariate_time_series)
        decision_scores = detector.decision_function(univariate_time_series)
        confidence = detector.predict_confidence(decision_scores, decision_scores_given=True)
        assert confidence.shape[0] == univariate_time_series.shape[0]
        assert len(confidence.shape) == 1

        confidence_other = detector.predict_confidence(univariate_time_series)
        assert np.array_equal(confidence, confidence_other)

    def test_predict_confidence_decision_scores_train_and_test_given(self, univariate_time_series):
        """Precomputed train and test scores must match passing raw data."""
        X_train = univariate_time_series[:int(univariate_time_series.shape[0]*0.3)]
        X_test = univariate_time_series[int(univariate_time_series.shape[0]*0.3):]
        detector = baselines.RandomDetector(seed=42).fit(X_train)
        decision_scores = detector.decision_function(X_test)
        decision_scores_train = detector.decision_function(X_train)
        confidence = detector.predict_confidence(decision_scores, decision_scores_train, decision_scores_given=True)
        assert confidence.shape[0] == X_test.shape[0]
        assert len(confidence.shape) == 1

        confidence_other = detector.predict_confidence(X_test, X_train)
        assert np.array_equal(confidence, confidence_other)

    def test_predict_confidence_invalid_decision_scores_given(self, univariate_time_series):
        """A 2D X with decision_scores_given=True must raise a ValueError."""
        univariate_time_series = univariate_time_series.reshape(-1, 1)  # To make sure it has two dimensions
        assert len(univariate_time_series.shape) > 1

        detector = baselines.RandomDetector().fit(univariate_time_series)
        with pytest.raises(ValueError):
            detector.predict_confidence(univariate_time_series, decision_scores_given=True)

    def test_predict_confidence_invalid_decision_scores_train_given(self, univariate_time_series):
        """A 2D X_train with decision_scores_given=True must raise a ValueError."""
        univariate_time_series = univariate_time_series.reshape(-1, 1)  # To make sure it has two dimensions
        assert len(univariate_time_series.shape) > 1

        X_train = univariate_time_series[:int(univariate_time_series.shape[0]*0.3), :]
        X_test = univariate_time_series[int(univariate_time_series.shape[0]*0.3):, :]

        detector = baselines.RandomDetector().fit(X_train)
        decision_scores = detector.decision_function(X_test)

        with pytest.raises(ValueError):
            detector.predict_confidence(decision_scores, X_train, decision_scores_given=True)

    def test_repeatability(self, univariate_time_series):
        """Two identical calls must yield identical confidence values."""
        detector = baselines.RandomDetector(seed=42).fit(univariate_time_series)
        confidence1 = detector.predict_confidence(univariate_time_series)
        confidence2 = detector.predict_confidence(univariate_time_series)
        # Removed leftover debug print of the number of differing entries.
        assert np.array_equal(confidence1, confidence2)
5 changes: 5 additions & 0 deletions tests/anomaly_detection/test_detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ def test_fit_predict_on_different_time_series(self, detector, univariate_time_se
decision_function = detector.predict_proba(X_test)
assert decision_function.shape[0] == X_test.shape[0]

def test_predict_confidence(self, detector, univariate_time_series):
    """Fitting then predicting confidence yields one value per sample."""
    detector.fit(univariate_time_series)
    predicted_confidence = detector.predict_confidence(univariate_time_series)
    assert len(predicted_confidence) == univariate_time_series.shape[0]


@pytest.mark.parametrize('detector_class,additional_args', [
(anomaly_detection.ClusterBasedLocalOutlierFactor, {'n_clusters': 20}),
Expand Down
15 changes: 15 additions & 0 deletions tests/visualization/test_visualization.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,18 @@ def test_given_time_steps(self, plot_function, additional_args, obligated_y_pred
if obligated_y_pred:
additional_args['y_pred'] = np.random.choice([0, 1], size=univariate_time_series.shape[0], replace=True)
visualization.plot_with_zoom(univariate_time_series, y, start_zoom=100, end_zoom=200, time_steps=time_steps, method_to_plot=plot_function, **additional_args)


class TestPlotConfidence:
    """Smoke tests for ``plot_anomaly_scores`` with a confidence band."""

    def _plot_with_confidence(self, time_series):
        # Shared driver: random labels, scores and confidences for the series.
        n_samples = time_series.shape[0]
        labels = np.random.choice([0, 1], size=n_samples, replace=True)
        scores = np.random.uniform(size=n_samples)
        confidence = np.random.normal(0, 0.05, size=n_samples)
        visualization.plot_anomaly_scores(time_series, labels, scores, confidence=confidence)

    def test_univariate(self, univariate_time_series):
        self._plot_with_confidence(univariate_time_series)

    def test_multivariate(self, multivariate_time_series):
        self._plot_with_confidence(multivariate_time_series)
Loading