Skip to content

Commit

Permalink
feat: anomaly detection simulator (#163) (#2010)
Browse files Browse the repository at this point in the history
* feat: anomaly detection simulator (#163)
  • Loading branch information
baturayo authored Feb 6, 2024
1 parent 70b8753 commit 1d2e8ac
Show file tree
Hide file tree
Showing 13 changed files with 646 additions and 3 deletions.
6 changes: 6 additions & 0 deletions .streamlit/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[theme]
primaryColor = "#00D891" # Primary color
backgroundColor = "#F5F7F7" # Background color
# secondaryBackgroundColor = "#00D891" # Color for the sidebar and other secondary backgrounds
textColor = "#262730" # Primary text color
font = "sans serif" # Font style (e.g., "sans serif", "serif", "monospace")
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
./soda/spark[odbc]
./soda/spark[databricks]
./soda/spark_df
./soda/scientific
./soda/scientific[simulator]
./soda/sqlserver
./soda/mysql
./soda/dask
Expand Down
59 changes: 59 additions & 0 deletions soda/core/soda/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from __future__ import annotations

import logging
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
Expand Down Expand Up @@ -608,6 +610,63 @@ def test_connection(
sys.exit(result)


@main.command(
    short_help="Simulates anomaly detection parameters",
)
@click.option(
    "-c",
    "--configuration",
    required=True,
    multiple=True,
    type=click.STRING,
)
@soda_trace
def simulate_anomaly_detection(configuration: list[str]) -> None:
    """Validate the given configuration and launch the anomaly detection simulator.

    The command checks that the optional simulator dependency is installed,
    that every configuration file exists, and that a Soda Cloud login
    succeeds. It then starts the simulator's Streamlit app in a subprocess
    and blocks until that process exits.

    Args:
        configuration: One or more configuration YAML file paths. All of them
            are loaded into the scan used for the login check, but only the
            first one is exported to the Streamlit app through the
            ``SODA_CONFIG_FILE_PATH`` environment variable.

    Returns:
        None. Every failure is reported through ``logging.error`` followed by
        an early return rather than an exception.
    """
    # Function-scope import keeps this command self-contained.
    import importlib.util

    # Streamlit is only pulled in by the optional "simulator" extra, so probe
    # for it without importing it (importing would slow down the CLI).
    if importlib.util.find_spec("streamlit") is None:
        logging.error(
            " soda-scientific[simulator] is not installed. "
            "Please install the simulator sub package by running the following command: \n"
            ' pip install "soda-scientific[simulator]" -i https://pypi.cloud.soda.io'
        )
        return
    configure_logging()

    # Every configuration file must exist before it is handed to the scan.
    fs = file_system()
    scan = Scan()
    for configuration_path in configuration:
        if not fs.exists(configuration_path):
            logging.error(
                "Configuration File Path Error: "
                f"Configuration path '{configuration_path}' does not exist. "
                "Please provide a valid configuration file path. Exiting.."
            )
            return
        scan.add_configuration_yaml_file(file_path=configuration_path)

    # Fail fast on bad credentials instead of surfacing them inside the app.
    try:
        scan._configuration.soda_cloud.login()
    except Exception as e:
        logging.error(
            "Soda Cloud Authentication Error: "
            "Unable to login to Soda Cloud. Please provide a valid Soda Cloud credentials. "
            f"\n{e}"
        )
        return

    logging.info("Starting Soda Anomaly Detection Simulator.. It might take a few seconds to start.")

    # The Streamlit app runs in a separate process and reads the configuration
    # location from this environment variable (inherited by the subprocess).
    os.environ["SODA_CONFIG_FILE_PATH"] = configuration[0]

    # Locate the app relative to this file: three directory levels above the
    # directory containing this module, then into the scientific package.
    file_path = Path(__file__).parent.absolute()
    src_dir = file_path.parent.parent.parent.absolute()
    streamlit_app_path = src_dir / "scientific" / "soda" / "scientific" / "anomaly_detection_v2" / "simulate" / "app.py"

    subprocess.run(["streamlit", "run", streamlit_app_path])


def __execute_query(connection, sql: str) -> list[tuple]:
try:
cursor = connection.cursor()
Expand Down
2 changes: 1 addition & 1 deletion soda/core/soda/cloud/soda_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ def _get_historic_check_results(self, hd: HistoricCheckResultsDescriptor):
query_name="get_hisotric_check_results",
)

def _get_token(self):
def _get_token(self) -> str:
if not self.token:
login_command = {"type": "login"}
if self.api_key_id and self.api_key_secret:
Expand Down
9 changes: 8 additions & 1 deletion soda/scientific/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
"prophet>=1.1.5,<2.0.0",
]

# TODO Fix the params
# Dependencies needed only by the anomaly detection simulator; they are
# exposed through the "simulator" extra in the setup() call below.
simulator_deps = [
    "streamlit>=1.30.0,<2.0.0",
    "plotly>=5.18.0",
]

setup(
name=package_name,
version=package_version,
Expand All @@ -28,4 +32,7 @@
package_data={
"": ["detector_config.yaml"],
},
extras_require={
"simulator": simulator_deps,
},
)
15 changes: 15 additions & 0 deletions soda/scientific/soda/scientific/anomaly_detection_v2/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,18 @@ class WindowLengthError(Exception):
To be raised and passed as a result error message down the line.
"""


class AuthenticationException(Exception):
    """Thrown in case of authentication failure.

    To be raised and passed as a result error message down the line.
    """


class CheckIDNotFoundException(Exception):
    """Thrown in case of check id not found.

    To be raised and passed as a result error message down the line.
    """
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from __future__ import annotations

import json
import os
from typing import Any, Dict

import requests
from soda.scan import Scan

from soda.scientific.anomaly_detection_v2.exceptions import (
AuthenticationException,
CheckIDNotFoundException,
)
from soda.scientific.anomaly_detection_v2.pydantic_models import (
AnomalyHistoricalMeasurement,
)


class AnomalyDetectionData:
    """Historical results of one Soda Cloud check, prepared for anomaly detection.

    On construction the check's results are fetched from Soda Cloud, sorted
    by scan time and converted into serialized
    ``AnomalyHistoricalMeasurement`` dictionaries.

    Attributes are set in ``__init__``:
        check_id: The identifier of the check whose history is loaded.
        check_results: The decoded response, with ``"results"`` sorted by
            ``"scanTime"``.
        measurements: ``{"results": [...]}`` holding one dumped measurement
            per check result.
    """

    # Upper bound in seconds for the Soda Cloud request, so that a stalled
    # connection cannot block the caller forever.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, check_id: str) -> None:
        """Fetch and convert the history for ``check_id``.

        Raises whatever ``get_check_results`` and ``create_measurements``
        raise; both run eagerly here.
        """
        self.check_id = check_id
        self.check_results = self.get_check_results()
        self.measurements = self.create_measurements()

    def get_check_results(self) -> Dict[str, Any]:
        """Query Soda Cloud for the results of ``self.check_id``.

        Returns:
            The decoded JSON response, with its ``"results"`` list sorted in
            ascending ``"scanTime"`` order.

        Raises:
            CheckIDNotFoundException: The API answered with HTTP 404.
            requests.HTTPError: The API answered with any other error status.
            requests.Timeout: No answer within ``REQUEST_TIMEOUT_SECONDS``.
            AuthenticationException: No Soda Cloud token could be obtained.
        """
        soda_cloud_token, api_url = self.get_soda_cloud_token_and_api_url()
        url = f"{api_url}/query?testResults"
        payload = json.dumps({"type": "testResults", "token": soda_cloud_token, "testId": self.check_id})
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        response = requests.request(
            "POST",
            url,
            headers=headers,
            data=payload,
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )
        if response.status_code == 404:
            raise CheckIDNotFoundException(
                f"Check ID {self.check_id} does not point to an existing "
                "check or points to a check that you do not have access to. "
                "Please verify that the check URL is correct and "
                "that you have access to it."
            )
        # Surface any other HTTP failure explicitly, rather than as a JSON
        # decoding error or a KeyError on "results" further down.
        response.raise_for_status()
        check_results = response.json()

        # Downstream consumers receive the results in chronological order.
        check_results["results"] = sorted(check_results["results"], key=lambda k: k["scanTime"])
        return check_results

    @staticmethod
    def get_soda_cloud_token_and_api_url() -> tuple[str, str]:
        """Resolve the Soda Cloud token and API URL from the configuration file.

        The configuration file path is read from the ``SODA_CONFIG_FILE_PATH``
        environment variable and loaded into a fresh ``Scan``.

        Returns:
            A ``(token, api_url)`` tuple.

        Raises:
            AuthenticationException: Obtaining the token raised
                ``AttributeError`` (presumably because the configuration
                holds no usable Soda Cloud section; confirm against ``Scan``).
        """
        config_file_path = os.getenv("SODA_CONFIG_FILE_PATH")
        scan = Scan()
        scan.add_configuration_yaml_file(file_path=config_file_path)
        soda_cloud = scan._configuration.soda_cloud
        try:
            soda_cloud_token = soda_cloud._get_token()
        except AttributeError as err:
            # Chain the original error so the root cause stays in the traceback.
            raise AuthenticationException(
                f"Soda Cloud token not found. Please check your {config_file_path}"
                " file and make sure you have a valid api_key_id and api_key_secret."
            ) from err
        api_url = soda_cloud.api_url
        return soda_cloud_token, api_url

    def create_measurements(self) -> Dict[str, Any]:
        """Convert ``self.check_results`` into dumped measurement models.

        Each result becomes an ``AnomalyHistoricalMeasurement`` built from its
        ``"value"`` and ``"scanTime"``; a missing ``"measurementId"`` falls
        back to ``"dummy_id"`` and the identity is always
        ``"dummy_identity"``.

        Returns:
            ``{"results": [...]}`` with one ``model_dump()`` dictionary per
            check result, in the same order as ``self.check_results``.
        """
        measurements = {
            "results": [
                AnomalyHistoricalMeasurement(
                    id=check_result.get("measurementId", "dummy_id"),
                    identity="dummy_identity",
                    value=check_result["value"],
                    dataTime=check_result["scanTime"],
                ).model_dump()
                for check_result in self.check_results["results"]
            ]
        }
        return measurements
Loading

0 comments on commit 1d2e8ac

Please sign in to comment.