24 changes: 24 additions & 0 deletions data/samples/sample_log_schema.json
@@ -0,0 +1,24 @@
{
"name": "CustomLogs",
"properties": {
"timestamp": {
"type": "string",
"format": "date",
"pattern": "(\\d\\d\\d\\d-([0-2])?\\d-([0-3])?\\dT?([0-2])?\\d:([0-5])?\\d:([0-5])?\\d\\.\\d?\\d?\\d?Z?)"
},
"some_property": {
"type": "string"
},
"some_other_property": {
"type": "string"
},
"an_integer_property": {
"type": "integer"
},
"a_numeric_property": {
"type": "string"
}
},
"required": ["timestamp", "some_property"],
"additionalProperties": false
}
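A minimal sketch (not part of this PR) of a record this schema accepts, assuming the standard jsonschema package; the field values are made up:

import json

import jsonschema

# Load the schema shown above (path as committed in this PR).
with open("data/samples/sample_log_schema.json") as f:
    schema = json.load(f)

# Made-up record: "timestamp" and "some_property" are required, the timestamp
# must match the pattern, and no keys outside the declared properties are
# allowed because additionalProperties is false.
record = {
    "timestamp": "2021-03-01T12:34:56.789Z",
    "some_property": "example",
    "an_integer_property": 42,
}

jsonschema.validate(record, schema)  # raises ValidationError if the record does not conform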
215 changes: 0 additions & 215 deletions src/baskerville/models/anomaly_detector.py

This file was deleted.

156 changes: 156 additions & 0 deletions src/baskerville/models/anomaly_model.py
@@ -0,0 +1,156 @@
from pyspark.ml.feature import StandardScaler, StandardScalerModel, StringIndexer, StringIndexerModel
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.storagelevel import StorageLevel
from pyspark.sql import functions as F
from pyspark.sql.functions import array

from baskerville.models.model_interface import ModelInterface
from baskerville.spark.helpers import map_to_array
from baskerville.spark.udfs import to_dense_vector_udf
from pyspark_iforest.ml.iforest import IForest, IForestModel
import os
import numpy as np

from baskerville.util.file_manager import FileManager


class AnomalyModel(ModelInterface):

def __init__(self, feature_map_column='features',
features=None,
categorical_features=[],
prediction_column="prediction",
threshold=0.5,
score_column="score",
num_trees=100, max_samples=1.0, max_features=1.0, max_depth=10,
contamination=0.1, bootstrap=False, approximate_quantile_relative_error=0.,
seed=777,
scaler_with_mean=False, scaler_with_std=True):
super().__init__()
self.prediction_column = prediction_column
self.score_column = score_column
self.num_trees = num_trees
self.max_samples = max_samples
self.max_features = max_features
self.max_depth = max_depth
self.contamination = contamination
self.bootstrap = bootstrap
self.approximate_quantile_relative_error = approximate_quantile_relative_error
self.seed = seed
self.scaler_with_mean = scaler_with_mean
self.scaler_with_std = scaler_with_std
self.features = features
self.categorical_features = categorical_features
self.feature_map_column = feature_map_column

self.storage_level = StorageLevel.OFF_HEAP
self.scaler_model = None
self.iforest_model = None
self.threshold = threshold
self.indexes = None
self.features_values_column = 'features_values'
self.features_values_scaled = 'features_values_scaled'

def set_storage_level(self, storage_level):
self.storage_level = storage_level

def build_features_vectors(self, df):
res = map_to_array(
df,
map_col=self.feature_map_column,
array_col=self.features_values_column,
map_keys=self.features
).persist(self.storage_level)
df.unpersist()

return res.withColumn(
self.features_values_column,
to_dense_vector_udf(self.features_values_column)
)

def _create_indexes(self, df):
self.indexes = []
for c in self.categorical_features:
indexer = StringIndexer(inputCol=c, outputCol=f'{c}_index') \
.setHandleInvalid('keep') \
.setStringOrderType('alphabetAsc')
self.indexes.append(indexer.fit(df))

def _add_categorical_features(self, df, feature_column):
index_columns = []
for index_model in self.indexes:
df = index_model.transform(df)
index_columns.append(index_model.getOutputCol())

add_categories = F.udf(lambda features, arr: Vectors.dense(np.append(features, [v for v in arr])),
VectorUDT())
Collaborator:

👍
Do you think this could be moved under spark/udfs, or is it one of those cases where we need to have it here for it to work properly?

Collaborator (Author):

I would rather do the opposite: move to_dense_vector_udf from spark/udfs into anomaly_model.py and drop the spark/udfs import here. I feel these two UDFs are not going to be used anywhere else and belong solely to the model implementation file. But you are right, we need to be consistent one way or the other. It's not a big deal, so I moved it.
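As a rough illustration of the change described above (hypothetical, not part of this diff), a locally defined to_dense_vector_udf might look roughly like this, given that build_features_vectors applies it to the array column produced by map_to_array:

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import functions as F

# Guessed body, based on the UDF's name and how build_features_vectors uses it:
# turn an array<double> column into a DenseVector so Spark ML stages can consume it.
to_dense_vector_udf = F.udf(lambda values: Vectors.dense(values), VectorUDT())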

df = df.withColumn('features_all', add_categories(feature_column, array(*index_columns))) \
.drop(*index_columns) \
.drop(feature_column) \
.withColumnRenamed('features_all', feature_column)
return df

def train(self, df):
df = self.build_features_vectors(df)

scaler = StandardScaler()
scaler.setInputCol(self.features_values_column)
scaler.setOutputCol(self.features_values_scaled)
scaler.setWithMean(self.scaler_with_mean)
scaler.setWithStd(self.scaler_with_std)
self.scaler_model = scaler.fit(df)
df = self.scaler_model.transform(df).persist(
self.storage_level
)
if len(self.categorical_features):
self._create_indexes(df)
df = self._add_categorical_features(df, self.features_values_scaled)

iforest = IForest(
featuresCol=self.features_values_scaled,
predictionCol=self.prediction_column,
# anomalyScore=self.score_column,
numTrees=self.num_trees,
maxSamples=self.max_samples,
maxFeatures=self.max_features,
maxDepth=self.max_depth,
contamination=self.contamination,
bootstrap=self.bootstrap,
approxQuantileRelativeError=self.approximate_quantile_relative_error,
numCategoricalFeatures=len(self.categorical_features)
)
iforest.setSeed(self.seed)
params = {'threshold': self.threshold}
self.iforest_model = iforest.fit(df, params)
df.unpersist()
Collaborator:

Is this the last place where df is used? (Wondering if it makes sense to unpersist here.)

Collaborator (Author):

Right, it should not be here. I call persist in build_features_vectors for both train() and predict(), but for train() the DataFrame no longer needs to stay persisted, so I moved the unpersist into the training pipeline. Do you think that's OK?
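A minimal sketch of the pattern described in the reply (names are illustrative, not taken from this PR): the pipeline, rather than AnomalyModel.train(), releases the cached DataFrame once fitting is done:

from pyspark.sql import DataFrame


def run_training_step(anomaly_model, training_df: DataFrame):
    # Hypothetical pipeline step: train the model, then unpersist the
    # DataFrame here instead of inside AnomalyModel.train().
    anomaly_model.train(training_df)
    training_df.unpersist()
    return anomaly_model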


def predict(self, df):
df = self.build_features_vectors(df)
df = self.scaler_model.transform(df)
if len(self.categorical_features):
df = self._add_categorical_features(df, self.features_values_scaled)
df = self.iforest_model.transform(df)
df = df.withColumnRenamed('anomalyScore', self.score_column)
return df

def save(self, path, spark_session=None):
file_manager = FileManager(path, spark_session)
file_manager.save_to_file(self.get_params(), os.path.join(path, 'params.json'), format='json')
self.iforest_model.write().overwrite().save(os.path.join(path, 'iforest'))
self.scaler_model.write().overwrite().save(os.path.join(path, 'scaler'))

if len(self.categorical_features):
for feature, index in zip(self.categorical_features, self.indexes):
index.write().overwrite().save(os.path.join(path, 'indexes', feature))
Collaborator:

Aren't we saving anything in the Model's table? Or did I miss it somewhere?


def load(self, path, spark_session=None):
self.iforest_model = IForestModel.load(os.path.join(path, 'iforest'))
self.scaler_model = StandardScalerModel().load(os.path.join(path, 'scaler'))
Collaborator:

Very minor: I think StandardScalerModel doesn't need to be instantiated for load?

Collaborator:

Also, we can avoid all of the os.path.join(path, 'iforest'/'scaler') calls by using get_classifier_load_path(path) and get_scaler_load_path(path) (perhaps the naming is not the best for these functions 🤔, but I just thought we could avoid having the os.path... everywhere).

Collaborator (Author):

Done. +1
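A hedged sketch combining the two suggestions above; the real get_classifier_load_path / get_scaler_load_path helpers live elsewhere in Baskerville and may differ, so the bodies below are only illustrative:

import os

from pyspark.ml.feature import StandardScalerModel
from pyspark_iforest.ml.iforest import IForestModel


def get_classifier_load_path(path):
    # Illustrative stand-in for the helper mentioned in the review.
    return os.path.join(path, 'iforest')


def get_scaler_load_path(path):
    # Illustrative stand-in for the helper mentioned in the review.
    return os.path.join(path, 'scaler')


def load_models(path):
    # load() is available on the model classes themselves, so no
    # StandardScalerModel instance needs to be created first.
    iforest_model = IForestModel.load(get_classifier_load_path(path))
    scaler_model = StandardScalerModel.load(get_scaler_load_path(path))
    return iforest_model, scaler_model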


file_manager = FileManager(path, spark_session)
params = file_manager.load_from_file(os.path.join(path, 'params.json'), format='json')
self.set_params(**params)

self.indexes = []
for feature in self.categorical_features:
self.indexes.append(StringIndexerModel.load(os.path.join(path, 'indexes', feature)))
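For context, a hedged end-to-end usage sketch of the new AnomalyModel; the Spark session setup, input path, and feature names are assumptions, not taken from this PR:

from pyspark.sql import SparkSession

from baskerville.models.anomaly_model import AnomalyModel

# Assumed input: a DataFrame with a map<string, double> column named
# 'features' holding the per-request-set feature values.
spark = SparkSession.builder.appName('anomaly-model-example').getOrCreate()
df = spark.read.parquet('request_sets.parquet')  # illustrative path

model = AnomalyModel(
    feature_map_column='features',
    features=['request_rate', 'response4xx_rate'],  # assumed feature names
    threshold=0.5,
)
model.train(df)
model.save('/models/anomaly', spark_session=spark)

# Later, e.g. in the prediction pipeline:
loaded = AnomalyModel()
loaded.load('/models/anomaly', spark_session=spark)
predictions = loaded.predict(df)
predictions.select('score', 'prediction').show()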