# Time_RCD / models / POLY.py
# Oliver Le
# Initial commit
# d03866e
"""Polynomial Anomoly Detector with GARCH method and raw error method
"""
# Author: Yinchen Wu <[email protected]>
import numpy as np
import math
from scipy.special import erf
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils.validation import check_is_fitted
from .base import BaseDetector
from .distance import Fourier
from ..utils.utility import zscore
class POLY(BaseDetector):
    """Pointwise anomaly detector based on local polynomial approximation.

    The series is cut into consecutive windows of ``window`` samples. For
    every window a polynomial of degree ``power`` is fitted to the samples
    surrounding the window (its neighborhood, with the window itself left
    out) and evaluated inside the window to obtain an estimate of the
    series. Each window is then scored by handing the observed and the
    estimated samples to a distance-measure object (``Fourier`` from
    ``.distance`` unless another one is supplied); the value returned by the
    measure is stored for the samples of that window.

    The leading ``n_initial_`` samples (10% of the series, at most 500) are
    not modelled; their estimate and their score stay at 0.

    NOTE(review): earlier documentation of this class mentions a GARCH-based
    volatility estimate. No such step is visible in this class; scoring is
    delegated entirely to the measure object.

    Parameters
    ----------
    power : int, optional (default=1)
        The degree of the polynomial fitted to the data.

    window : int, optional (default=200)
        The length of the window scored as one unit.

    neighborhood : int, optional (default=max(100, 10 * window))
        The number of samples around a window used to fit its polynomial.
        It is clamped to the length of the series when fitting.

    contamination : float, optional (default=0.1)
        The expected proportion of outliers in the data set. Used when
        fitting to derive ``threshold_``. It is expected to be a proportion
        in (0, 1); this class does not validate it.

    normalize : bool, optional (default=True)
        Whether to min-max scale the input to [0, 1] before fitting.

    Attributes
    ----------
    estimator : dict
        Fitted ``numpy.poly1d`` models keyed by ``'model<start index>'``.

    estimation : numpy array of shape (n_samples,)
        The polynomial estimate of the training series.

    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is
        fitted.

    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``100 * (1 - contamination)`` percentile of ``decision_scores_``.

    labels_ : numpy array of shape (n_samples,)
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.
    """

    def __init__(self, power=1, window=200, neighborhood=None,
                 contamination=0.1, normalize=True):
        self.power = power
        self.window = window
        # Optional hint read by ``fit``: when set, the neighborhood is
        # widened to at least ``6 * avg_window``.
        self.avg_window = None
        if neighborhood is None:
            self.neighborhood = max(10 * window, 100)
        else:
            self.neighborhood = neighborhood
        self.contamination = contamination
        self.normalize = normalize
        self.model_name = 'POLY'

    def _default_measure(self):
        """Build the ``Fourier`` measure used when the caller supplies none."""
        measure = Fourier()
        measure.detector = self
        measure.set_param()
        return measure

    def _fit_local_model(self, data, index):
        """Fit the polynomial for the window that starts at ``index``.

        The polynomial is fitted on the samples around the window, never on
        the window itself. Which samples are used depends on how close the
        window is to either end of the series.

        Parameters
        ----------
        data : numpy array of shape (n_samples,)
            The (possibly normalized) training series.
        index : int
            The first sample of the window.

        Returns
        -------
        model : numpy.poly1d
            The fitted polynomial, expressed in absolute sample indices.
        """
        n = self.n_train_
        window = self.window
        neighborhood = self.neighborhood
        half = int(neighborhood / 2)
        retry_from_zero = False

        # Each span is a half-open [start, stop) range of sample indices.
        if index + half < n and index - half > 0:
            # Interior window: symmetric neighborhood around the window.
            spans = [(index - half, index), (index + window, index + half)]
        elif index + half >= n and index + window < n:
            # Close to the end: use the last ``neighborhood`` samples.
            spans = [(n - neighborhood, index), (index + window, n)]
        elif index + window >= n:
            # Last window: only the samples before it are available.
            spans = [(n - neighborhood, index)]
        elif index + window < neighborhood:
            # Close to the start: use the first ``neighborhood`` samples.
            spans = [(0, index), (index + window, neighborhood)]
            retry_from_zero = True
        else:
            # The start is clamped to 0: a negative start made the slice
            # wrap around while ``arange`` did not, so x and y ended up with
            # different lengths and the fit always failed.
            spans = [(max(index - half, 0), index),
                     (index + window, index + half)]

        x = np.concatenate([np.arange(lo, hi) for lo, hi in spans])
        y = np.concatenate([data[lo:hi] for lo, hi in spans])

        try:
            coefficients = np.polyfit(x, y, self.power)
        except (TypeError, ValueError, np.linalg.LinAlgError):
            if not retry_from_zero:
                raise
            # Best-effort retry kept from the original implementation: the
            # trailing segment is re-indexed from 0 before fitting again.
            trailing = len(data[index + window: neighborhood])
            x = np.concatenate((np.arange(0, index), np.arange(trailing)))
            coefficients = np.polyfit(x, y, self.power)
        return np.poly1d(coefficients)

    def fit(self, X, y=None):
        """Fit detector. y is ignored in unsupervised methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, )
            The input samples.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Fitted estimator.

        Raises
        ------
        ValueError
            If the series has fewer samples than ``window``.
        """
        X = np.asarray(X)
        if self.normalize:
            low = X.min()
            span = X.max() - low
            if span != 0:
                X = (X - low) / span
            else:
                # A constant series has no range; map it to zeros instead
                # of dividing by zero.
                X = np.zeros_like(X, dtype=float)
        X = X.squeeze()

        # validate inputs X and y (optional)
        self._set_n_classes(y)

        self.X_train_ = X
        self.n_train_ = len(X)
        window = self.window
        if self.n_train_ < window:
            # This case used to fail later with an unrelated
            # ZeroDivisionError / TypeError.
            raise ValueError(
                f"POLY needs at least `window` ({window}) samples, "
                f"got {self.n_train_}")

        # Placeholder; ``decision_function`` reads this attribute.
        self.decision_scores_ = np.zeros([self.n_train_, 1])
        self.n_initial_ = min(500, int(0.1 * self.n_train_))
        self.X_initial_ = X[:self.n_initial_]

        if self.avg_window is not None:
            self.neighborhood = max(self.neighborhood, 6 * self.avg_window)
        if self.neighborhood > len(X):
            self.neighborhood = len(X)

        n_windows = math.floor(self.n_train_ / window)
        first_window = math.ceil(self.n_initial_ / window)

        # One model per window start, including the (possibly partial or
        # empty) window that begins at ``n_windows * window``.
        models = {}
        for i in range(first_window, n_windows + 1):
            index = int(i * window)
            models['model' + str(index)] = self._fit_local_model(X, index)

        estimate = np.zeros(self.n_train_)
        for i in range(first_window, n_windows):
            start = window * i
            stop = start + window
            # Evaluate on the integer sample indices the model was fitted
            # on. ``np.linspace(start, stop, window)`` includes the endpoint
            # and therefore stretched the grid by window / (window - 1).
            estimate[start:stop] = models['model' + str(start)](
                np.arange(start, stop))

        # Fill the partial window at the end whenever one exists. The old
        # test ``n_train_ % n_windows`` compared against the wrong quantity.
        tail_start = n_windows * window
        if tail_start < self.n_train_:
            estimate[tail_start:] = models['model' + str(tail_start)](
                np.arange(tail_start, self.n_train_))

        self.estimation = estimate
        self.estimator = models

        self.decision_scores_ = self.decision_function(
            X, measure=self._default_measure())

        # Recompute the statistics from the real scores: the call above
        # derived them from the all-zero placeholder.
        self._mu = np.mean(self.decision_scores_)
        self._sigma = np.std(self.decision_scores_)
        self.threshold_ = np.percentile(
            self.decision_scores_, 100 * (1 - self.contamination))
        self.labels_ = (
            self.decision_scores_ > self.threshold_).astype('int').ravel()
        return self

    def decision_function(self, X=False, measure=None):
        """Derive the decision score based on the given distance measure.

        The window layout and the polynomial estimate come from the last
        call to ``fit``; the detector must be fitted first.

        Parameters
        ----------
        X : numpy array of shape (n_samples, ), optional
            The input samples. When given, it replaces the stored series.
            A bool (the default ``False``) means "use the stored series".

        measure : object, optional
            Object with a ``measure(actual, estimate, index)`` method. A
            ``Fourier`` measure is used when omitted.

        Returns
        -------
        score : numpy array of shape (n_samples,)
            The score of every sample. Samples in the leading, unmodelled
            part of the series keep a score of 0.
        """
        if not isinstance(X, bool):
            self.X_train_ = X
        if measure is None:
            measure = self._default_measure()

        estimation = self.estimation
        window = self.window
        score = np.zeros(self.n_train_)
        n_windows = math.floor(self.n_train_ / window)
        first_window = math.ceil(self.n_initial_ / window)

        for i in range(first_window, n_windows + 1):
            index = i * window
            # The last window is cut at the end of the series.
            end = self.n_train_ if i == n_windows else index + window
            score[index:end] = measure.measure(
                self.X_train_[index:end], estimation[index:end], index)

        # Statistics of the scores currently stored on the detector.
        self._mu = np.mean(self.decision_scores_)
        self._sigma = np.std(self.decision_scores_)
        return score

    def predict_proba(self, X, method='linear', measure=None):
        """Predict the probability of a sample being outlier. Two approaches
        are possible:

        1. simply use Min-max conversion to linearly transform the outlier
           scores into the range of [0,1]. The model must be
           fitted first.
        2. use unifying scores, see :cite:`kriegel2011interpreting`.

        Note that this method refits the detector on ``X``; the scores of
        the previous fit are only used as the reference distribution.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        method : str, optional (default='linear')
            probability conversion method. It must be one of
            'linear' or 'unify'.

        measure : object, optional
            When given, ``decision_function`` is additionally run with this
            measure on the refitted detector. Its return value is not used
            for the probabilities.

        Returns
        -------
        outlier_probability : numpy array of shape (n_samples, n_classes)
            Column 1 holds the outlier probability of each observation,
            ranging in [0,1]; column 0 holds its complement.

        Raises
        ------
        ValueError
            If ``method`` is neither 'linear' nor 'unify'.
        """
        check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_'])

        # Capture the reference distribution before the refit overwrites it.
        train_scores = self.decision_scores_
        train_mu = np.mean(train_scores)
        train_sigma = np.std(train_scores)

        self.fit(X)
        if measure is not None:
            # Previously this ran unconditionally and crashed on the
            # default ``measure=None``.
            self.decision_function(measure=measure)
        test_scores = self.decision_scores_

        probs = np.zeros([np.asarray(X).shape[0], int(self._classes)])
        if method == 'linear':
            scaler = MinMaxScaler().fit(train_scores.reshape(-1, 1))
            probs[:, 1] = scaler.transform(
                test_scores.reshape(-1, 1)).ravel().clip(0, 1)
            probs[:, 0] = 1 - probs[:, 1]
            return probs

        elif method == 'unify':
            # turn output into probability
            pre_erf_score = (test_scores - train_mu) / (
                train_sigma * np.sqrt(2))
            erf_score = erf(pre_erf_score)
            probs[:, 1] = erf_score.clip(0, 1).ravel()
            probs[:, 0] = 1 - probs[:, 1]
            return probs
        else:
            raise ValueError(method,
                             'is not a valid probability conversion method')