Spaces:
Running
Running
File size: 16,300 Bytes
d03866e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 |
"""
This module is adapted from [eif] by [mgckind]
Original source: [https://github.com/sahandha/eif]
"""
# import eif as iso
from .base import BaseDetector
import numpy as np
import math
import random as rn
import os
import warnings
from ..utils.utility import zscore
def c_factor(n):
    """
    Average path length of an unsuccessful search in a binary search tree
    built from n points.

    Parameters
    ----------
    n : int
        Number of data points for the BST.

    Returns
    -------
    float
        Average path length of an unsuccessful search in a BST.
    """
    euler_gamma = 0.5772156649
    # ln(n - 1) + gamma stands in for the harmonic number H(n - 1).
    harmonic_estimate = np.log(n - 1) + euler_gamma
    correction = 2.0 * (n - 1.0) / (n * 1.0)
    return 2.0 * harmonic_estimate - correction
class iForest(object):
    """
    Creates an iForest object. This object holds the data as well as the
    trained trees (iTree objects).

    Attributes
    ----------
    X : numpy array
        Data used for training (the input coerced with ``np.asarray``).
    nobjs : int
        Size of the dataset.
    sample : int
        Size of the sample to be used for tree creation.
    Trees : list
        A list of tree objects.
    limit : int
        Maximum depth a tree can have.
    exlevel : int
        Extension level to be used in creating the splitting criteria.
    c : float
        Multiplicative factor used in computing the anomaly scores.

    Methods
    -------
    CheckExtensionLevel()
        Check the validity of the extension level provided by the user based on the data.
    compute_paths(X_in)
        Computes the anomaly score for data X_in.
    """

    def __init__(self, X, ntrees, sample_size, limit=None, ExtensionLevel=0):
        """
        iForest(X, ntrees, sample_size, limit=None, ExtensionLevel=0)
        Initialize a forest by passing in training data, number of trees to
        be used and the subsample size.

        Parameters
        ----------
        X : array-like of shape (n_points, n_dims)
            Training data. A list of [x1,x2,...,xn] coordinate points or a
            2-D numpy array.
        ntrees : int
            Number of trees to be used.
        sample_size : int
            The size of the subsample used to build each tree. Sampling is
            done without replacement, so it must not exceed |X|.
        limit : int
            The maximum allowed tree depth. Defaults to
            ceil(log2(sample_size)) when None.
        ExtensionLevel : int
            Specifies degree of freedom in choosing the hyperplanes for
            dividing up data. Must be smaller than the dimension n of the
            dataset.

        Raises
        ------
        ValueError
            If ExtensionLevel is negative or larger than n - 1.
        """
        # The code below relies on `.shape` and integer-array indexing, which
        # plain nested lists do not support; coerce once up front.
        X = np.asarray(X)
        self.ntrees = ntrees
        self.X = X
        self.nobjs = len(X)
        self.sample = sample_size
        self.Trees = []
        self.limit = limit
        self.exlevel = ExtensionLevel
        self.CheckExtensionLevel()  # Extension level check. See def for explanation.
        if limit is None:
            # Default depth limit derived from the subsample size.
            self.limit = int(np.ceil(np.log2(self.sample)))
        self.c = c_factor(self.sample)
        for _ in range(self.ntrees):  # Build the ensemble of iTrees (the forest).
            ix = rn.sample(range(self.nobjs), self.sample)
            self.Trees.append(iTree(X[ix], 0, self.limit, exlevel=self.exlevel))

    def CheckExtensionLevel(self):
        """
        Make sure the extension level provided by the user lies between 0 and
        the data dimension minus one.

        Raises
        ------
        ValueError
            If the extension level is outside that range.
        """
        dim = self.X.shape[1]
        if self.exlevel < 0:
            raise ValueError("Extension level has to be an integer between 0 and "+ str(dim-1)+".")
        if self.exlevel > dim-1:
            raise ValueError("Your data has "+ str(dim) + " dimensions. Extension level can't be higher than " + str(dim-1) + ".")

    def compute_paths(self, X_in=None):
        """
        compute_paths(X_in = None)
        Compute anomaly scores for all data points in a dataset X_in.

        Parameters
        ----------
        X_in : array-like of shape (n_points, n_dims), optional
            Data to be scored. iForest.Trees are used for computing the depth
            reached in each tree by each data point. Defaults to the
            training data.

        Returns
        -------
        numpy array of shape (n_points,)
            Anomaly score of every data point in X_in.
        """
        if X_in is None:
            X_in = self.X
        else:
            X_in = np.asarray(X_in)
        scores = np.zeros(len(X_in))
        for i, point in enumerate(X_in):
            # Sum of the path lengths travelled by the point over all trees.
            total_depth = sum(PathFactor(point, tree).path * 1.0 for tree in self.Trees)
            mean_depth = total_depth / self.ntrees
            scores[i] = 2.0 ** (-mean_depth / self.c)  # Anomaly score
        return scores
class Node(object):
    """
    One node of an iTree. A node records the hyperplane used to split the
    data that reached it, that data itself, its two children and whether it
    is an external (leaf) or internal node.

    Attributes
    ----------
    e : int
        Depth of the node inside its tree.
    size : int
        Number of data points present at the node.
    X : list
        Data at the node.
    n : list
        Normal vector of the hyperplane that splits the data in the node.
    p : list
        Intercept point through which the hyperplane passes.
    left : Node object
        Left child node.
    right : Node object
        Right child node.
    ntype : str
        The type of the node: 'exNode', 'inNode'.
    """

    def __init__(self, X, n, p, e, left, right, node_type = '' ):
        """
        Node(X, n, p, e, left, right, node_type = '' )
        Create a node in a given tree (iTree object).

        Parameters
        ----------
        X : list of list of floats
            Training data available to the node. List of [x1,x2,...,xn] coordinate points.
        n : list of floats
            Normal vector of the splitting hyperplane.
        p : list of floats
            Intercept point of the splitting hyperplane.
        e : int
            Depth of the node.
        left : Node object
            Left child node.
        right : Node object
            Right child node.
        node_type : str
            Whether the node is external or internal: 'exNode' or 'inNode'.
        """
        # Kind and position of the node within the tree.
        self.ntype = node_type
        self.e = e
        # Data that reached the node (kept for now; marked for removal upstream).
        self.X = X
        self.size = len(X)
        # Splitting hyperplane.
        self.n = n
        self.p = p
        # Children.
        self.left = left
        self.right = right
class iTree(object):
    """
    A single tree of the forest, built from one subsample.

    Attributes
    ----------
    exlevel : int
        Extension level used in the splitting criteria.
    e : int
        Depth passed to the most recent make_tree call.
    X : list
        Data present at the root node of this tree.
    size : int
        Size of the dataset.
    dim : int
        Dimension of the dataset.
    Q : list
        Ordered integers 0..dim-1.
    l : int
        Maximum depth the tree can reach before its creation is terminated.
    n : list
        Normal vector of the most recently drawn splitting hyperplane.
    p : list
        Intercept point of the most recently drawn splitting hyperplane.
    exnodes : int
        The number of external nodes this tree has.
    root : Node object
        Root node of the tree.

    Methods
    -------
    make_tree(X, e, l)
        Builds the tree recursively from a given node. Returns a Node object.
    """

    def __init__(self,X,e,l, exlevel=0):
        """
        iTree(X, e, l, exlevel=0)
        Create a tree.

        Parameters
        ----------
        X : numpy array of shape (n_points, n_dims)
            Subsample of training data.
        e : int
            Depth at which construction starts. e <= l.
        l : int
            The maximum depth the tree can reach before its creation is terminated.
        exlevel : int
            Specifies degree of freedom in choosing the hyperplanes for
            dividing up data. Must be smaller than the dimension n of the dataset.
        """
        self.exlevel = exlevel
        self.e = e
        self.X = X  # Kept for now; not strictly needed.
        self.size = len(X)
        self.dim = self.X.shape[1]
        self.Q = np.arange(self.dim, dtype='int')  # One index per dimension.
        self.l = l
        # Hyperplane of the split currently being built; filled in by make_tree.
        self.p = None
        self.n = None
        self.exnodes = 0
        self.root = self.make_tree(X, e, l)

    def make_tree(self,X,e,l):
        """
        make_tree(X,e,l)
        Builds the tree recursively from a given node. Returns a Node object.

        Parameters
        ----------
        X : numpy array of shape (n_points, n_dims)
            Data that reached this node.
        e : int
            Depth of the node being built. e <= l.
        l : int
            The maximum depth the tree can reach before its creation is terminated.

        Returns
        -------
        Node object
        """
        self.e = e
        stop_here = e >= l or len(X) <= 1
        if stop_here:
            # Depth limit reached, or at most one point left: external node.
            self.exnodes += 1
            return Node(X, self.n, self.p, e, None, None, node_type = 'exNode')

        # Internal node: draw a random hyperplane inside the bounding box of X.
        lower = X.min(axis=0)
        upper = X.max(axis=0)
        # Components of the normal vector that the extension level forces to zero.
        n_zeroed = self.dim - self.exlevel - 1
        zeroed = np.random.choice(range(self.dim), n_zeroed, replace=False)
        normal = np.random.normal(0, 1, self.dim)  # One standard-normal draw per component.
        self.n = normal
        normal[zeroed] = 0
        intercept = np.random.uniform(lower, upper)  # Random point the hyperplane passes through.
        self.p = intercept
        goes_left = (X - intercept).dot(normal) < 0  # True for points sent to the left child.
        left_child = self.make_tree(X[goes_left], e + 1, l)
        right_child = self.make_tree(X[~goes_left], e + 1, l)
        return Node(X, normal, intercept, e,
                    left=left_child,
                    right=right_child,
                    node_type = 'inNode')
class PathFactor(object):
    """
    Given a single tree (iTree object) and a data point x = [x1,x2,...,xn],
    compute the length of the path traversed by the point on the tree until
    it reaches an external node.

    Attributes
    ----------
    path_list : list
        A list of strings 'L' or 'R' which traces the path a data point travels down a tree.
    x : list
        A single data point, represented as a list of floats.
    e : int
        The depth accumulated so far while walking the tree.
    path : float
        The value returned by find_path for the tree's root.

    Methods
    -------
    find_path(T)
        Given a tree node, finds the path a single data point takes from it.
    """

    def __init__(self,x,itree):
        """
        PathFactor(x, itree)
        Walk the data point x down itree and record the path and its length.

        Parameters
        ----------
        x : list of floats
            A data point x = [x1, x2, ..., xn].
        itree : iTree object
            A single tree.
        """
        self.path_list=[]
        self.x = x
        self.e = 0
        self.path = self.find_path(itree.root)

    def find_path(self,T):
        """
        find_path(T)
        Starting at node T, follow the splitting criteria stored at each node
        until an external node is reached.

        Parameters
        ----------
        T : Node object
            Node from which the walk starts.

        Returns
        -------
        float
            The depth reached by the data point, plus c_factor(size) when the
            external node holds more than one point.
        """
        node = T
        while node.ntype != 'exNode':
            intercept = node.p  # Intercept of the node's splitting hyperplane.
            normal = node.n     # Normal vector of the node's splitting hyperplane.
            self.e += 1
            if (self.x - intercept).dot(normal) < 0:
                self.path_list.append('L')
                node = node.left
            else:
                self.path_list.append('R')
                node = node.right
        if node.size > 1:
            # The leaf still holds several points: add the c_factor adjustment.
            self.e = self.e + c_factor(node.size)
        return self.e
def all_branches(node, current=None, branches=None):
    """
    Utility function used in generating a graph visualization. It returns all
    the branches of a given tree so they can be visualized.

    Parameters
    ----------
    node : Node object
        Node from which branches are collected.
    current : list of str, optional
        'L'/'R' steps taken to reach `node`. Only the first `node.e` entries
        are used. Defaults to an empty path.
    branches : list, optional
        Accumulator that receives one path per external node. A new list is
        created when omitted.

    Returns
    -------
    list
        List of branches that were reached, each a list of 'L'/'R' steps.
    """
    # Work on a private copy truncated to this node's depth so the caller's
    # list (and the default) is never mutated.
    path = [] if current is None else current[:node.e]
    if branches is None:
        branches = []
    if node.ntype == 'inNode':
        all_branches(node.left, current=path + ['L'], branches=branches)
        all_branches(node.right, current=path + ['R'], branches=branches)
    else:
        branches.append(path)
    return branches
class EIF(BaseDetector):
    """
    Extension to the basic isolation forest. Implementation of
    https://doi.org/10.1109/TKDE.2019.2947676. Code from
    https://github.com/sahandha/eif

    Parameters
    ----------
    n_trees : int
        Number of trees in the forest.
    max_samples : float or None
        Multiplied by the number of rows of X to obtain the subsample size
        used per tree. When None, min(256, n_samples) is used.
    extension_level : int or None
        Extension level of the forest. Level 0 resembles the standard
        isolation forest. When None, it is set to n_features - 1 during fit.
    n_jobs : int
        Stored on the instance; not read by the code in this class.
    normalize : bool
        If True, X is passed through zscore(X, axis=1, ddof=1) before fitting.

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_samples,)
        Anomaly scores of the data passed to fit.
    """

    def __init__(self, n_trees = 100, max_samples=None, extension_level=None, n_jobs=1, normalize=True):
        self.model_name = 'EIF'
        self.n_trees = n_trees
        self.max_samples = max_samples
        self.extension_level = extension_level
        self.n_jobs = n_jobs
        self.normalize = normalize

    def fit(self, X, y=None):
        """Fit detector. y is ignored in unsupervised methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.
        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Fitted estimator.
        """
        if self.max_samples is not None:
            self.sample_size = int(self.max_samples * X.shape[0])
        else:
            self.sample_size = min(256, X.shape[0])
        self.limit = int(np.ceil(np.log2(self.sample_size)))
        # Extension level 0 resembles standard isolation forest. If
        # unspecified, the maximum level X.shape[1] - 1 is used.
        # NOTE: this overwrites the constructor argument, so later calls to
        # fit reuse the value resolved here.
        if self.extension_level is None:
            self.extension_level = X.shape[1] - 1
        if self.normalize:
            X = zscore(X, axis=1, ddof=1)
        eif = iForest(
            X,
            ntrees=self.n_trees,
            sample_size=self.sample_size,
            limit=self.limit,
            ExtensionLevel=self.extension_level,
        )
        self.decision_scores_ = eif.compute_paths(X_in=X)
        # Return the estimator, as the docstring promises, so calls can be chained.
        return self

    def decision_function(self, X):
        """
        Not used, present for API consistency by convention.
        """
        pass