update track 2
bsteenwi committed Apr 8, 2024
1 parent 981c872 commit e27aa0a
Showing 19,027 changed files with 28,390,321 additions and 1 deletion.
25 changes: 24 additions & 1 deletion README.md
@@ -15,7 +15,6 @@ Within the Data folder in this repository, you will find DKGs separated by timesta
This data was constructed from an e-commerce microservice platform. In a microservice cluster, different services work together, and the software components must continuously adapt to new, unknown behaviour.



## <img src="https://eval.ai/dist/images/evalai-logo-single.png" alt="Track 1" width="25"/> [Track 1](https://eval.ai/web/challenges/challenge-page/2267/overview)
The goal of this track is to predict occurring anomalies within an unseen test set. The data can be found in the Track 1 folder. For each round, the knowledge graphs, a label.csv file and an example submission are provided.</br>
Submissions can be made using the [challenge eval.ai platform](https://eval.ai/web/challenges/challenge-page/2267/overview)
@@ -26,4 +25,28 @@ In this round, training labels for 3 days are provided within this repository. T
To be announced

## Track 2
The objective of Track 2 is to identify anomalies promptly as they emerge. In the realm of dynamic knowledge graphs, new graphs are continuously generated over time. For each graph, a detector is tasked with assessing whether it exhibits anomalous behavior. Your challenge in this track is to create detectors capable of analyzing incoming graphs and detecting anomalies at each timestamp. </br>
</br>
To do this, we provide participants with a full system to replay dynamic graphs. The code can be found within the Track 2 folder.
Our system consists of three components:
* A replayer, which is responsible for replaying a dynamic knowledge graph dataset
* A detector, the anomaly detection module
* A monitor, which is responsible for scoring the detector against the ground truth labels

The whole system is built on Kafka to send and receive messages. The replayer produces knowledge graph events at defined time intervals, the detector consumes these knowledge graphs and produces new events based on its outcome (anomalous or not), and the monitor then consumes these anomaly messages and compares them against the ground truth (a real operator or a predefined ground truth set). The monitor outputs the true positives, false positives and the average time until detection.</br>
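
For reference, the payloads exchanged on the two Kafka topics used by the provided template code (`kg` and `result`, see the consumer.py further below) have the following structure; the field names are taken from the template, while the concrete values here are purely illustrative:

```python
# Illustrative payloads based on the provided consumer.py template.
# The values are made up; the actual graph content depends on the replayed dataset.

# Message consumed from the 'kg' topic (produced by the replayer):
kg_message = {
    "timestamp": 1712534400,                        # timestamp of the replayed knowledge graph
    "content": "...serialized knowledge graph...",  # graph payload handed to the detector
}

# Message produced on the 'result' topic when the detector flags an anomaly:
anomaly_message = {
    "event_ts": 1712534401000,  # Kafka event timestamp of the consumed 'kg' message
    "graph_ts": 1712534400,     # timestamp of the graph judged anomalous
}
```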

To make sure everything works harmoniously together, all components are provided in Docker containers and a docker-compose.yaml file is available to automate all necessary building steps.

**While we have implemented the replayer and monitor, the goal for the participants is to create a detector Docker container.**
They can either extend the provided detector with new functionality or create a new one based on the provided template.
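
As an illustration, a custom detector could replace the body of `determine_anomaly` in the provided `anomaly_detector.py` template (shown further below). The following is a minimal, purely hypothetical sketch: the size-based heuristic and the `SIZE_THRESHOLD` constant are assumptions for illustration only, not part of the challenge code.

```python
# Hypothetical detector sketch; SIZE_THRESHOLD and the size heuristic are
# illustrative assumptions, not taken from the challenge code.
SIZE_THRESHOLD = 100_000  # assumed heuristic: unusually large graphs are flagged

def determine_anomaly(graph_timestamp, graph_content):
    # Inspect the incoming graph content (its exact format depends on the replayed dataset)
    graph_size = len(graph_content)

    # Flag the graph as anomalous when it exceeds the assumed threshold
    is_anomalous = graph_size > SIZE_THRESHOLD

    # Return the boolean decision together with the graph timestamp,
    # matching the interface expected by consumer.py
    return is_anomalous, graph_timestamp
```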

We would like to ask the participants to host their detector containers in a public repository (e.g., using Docker Hub) and send us this link through the provided Google submission forms. Evaluation against a hidden replay dataset for each round will be performed after the defined submission dates.

### Round 1
Submissions can be made using [this form](https://forms.gle/sQDAZAzNdRZBt4SXA)

### Round 2
To be announced



16 changes: 16 additions & 0 deletions Track 2/README
@@ -0,0 +1,16 @@
## Track 2

Within each round folder, you will find a detector, monitor and replayer folder containing all the code needed to create a streaming knowledge graph environment.

Within the replayer folder, the replay.py functionality can be adapted to your needs.
You can:
* Load different data segments based on regions where anomalies do occur
* Change the time between knowledge graph events to speed up local evaluations (see the sketch below).
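
As a purely hypothetical sketch of the second point: replay.py is not shown here, so the names below (`EVENT_INTERVAL_SECONDS`, `replay_dataset`, `send_graph`) are assumptions about what a sleep-based replay loop typically looks like, not the actual code.

```python
import time

# Hypothetical replay loop; EVENT_INTERVAL_SECONDS, replay_dataset and send_graph
# are assumed names and do not come from the actual replay.py.
EVENT_INTERVAL_SECONDS = 1  # lower this value to speed up local evaluations

def replay_dataset(graphs, send_graph):
    for timestamp, content in graphs:
        send_graph(timestamp, content)      # publish one knowledge graph event
        time.sleep(EVENT_INTERVAL_SECONDS)  # wait before emitting the next event
```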

Do take into account that the challenge organizers will use the default replayer functionality to score the different systems.</br>

Two docker compose yaml files are also made available:
* a full-application file, which can build and run the detector, monitor, replayer and Kafka in one go
* a simpler docker compose file, which only provides the Kafka functionality for local development. Running this docker compose file enables participants to start the replayer, monitor and detector as individual Python runs, for debugging purposes outside the Docker environment.

Participants can extend or create a new detector based on the code within the detector folder. Participants are allowed to use languages other than Python to create their detectors. Please contact the challenge organizers through Slack if you need additional assistance.
16 changes: 16 additions & 0 deletions Track 2/Round 1/detector/Dockerfile
@@ -0,0 +1,16 @@
FROM python:3.11-slim-bookworm

# Set the working directory inside the container
WORKDIR /app

# Copy the requirements file into the container
COPY requirements.txt .

# Install the Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the application code into the container
COPY . .

# Define the command to run your application
CMD ["python","-u", "consumer.py"]
7 changes: 7 additions & 0 deletions Track 2/Round 1/detector/anomaly_detector.py
@@ -0,0 +1,7 @@
def determine_anomaly(graph_timestamp, graph_content):
    # Load the graph from graph_content

    # Determine anomalous behaviour

    # Return a boolean + the graph timestamp
    return True, graph_timestamp
27 changes: 27 additions & 0 deletions Track 2/Round 1/detector/consumer.py
@@ -0,0 +1,27 @@
from kafka import KafkaConsumer, KafkaProducer
from anomaly_detector import determine_anomaly
import os
import json

# Kafka broker configuration
bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
input_topic = 'kg'
output_topic = 'result'

# Function to consume graphs from the input topic, process them, and produce results to the output topic
def process_files(consumer, producer):
    print("Ready, start listening:")
    for message in consumer:
        event_timestamp = message.timestamp
        data = json.loads(message.value.decode('utf-8'))
        anomaly, graph_timestamp = determine_anomaly(data['timestamp'], data['content'])
        if anomaly:
            anomaly_message = {'event_ts': event_timestamp, 'graph_ts': graph_timestamp}
            producer.send(output_topic, value=json.dumps(anomaly_message).encode('utf-8'))
            producer.flush()


if __name__ == '__main__':
    consumer = KafkaConsumer(input_topic, bootstrap_servers=bootstrap_servers)
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    process_files(consumer, producer)
1 change: 1 addition & 0 deletions Track 2/Round 1/detector/requirements.txt
@@ -0,0 +1 @@
kafka-python==2.0.2
73 changes: 73 additions & 0 deletions Track 2/Round 1/docker-compose-full-application.yml
@@ -0,0 +1,73 @@
version: '2.1'

services:
  consumer:
    build:
      context: ./detector
      dockerfile: ./Dockerfile
    depends_on:
      kafka1:
        condition: service_healthy
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka1:19092

  replayer:
    build:
      context: ./replayer
      dockerfile: ./Dockerfile
    depends_on:
      kafka1:
        condition: service_healthy
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka1:19092

  monitor:
    build:
      context: ./monitor
      dockerfile: ./Dockerfile
    depends_on:
      kafka1:
        condition: service_healthy
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka1:19092

  zoo1:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888

  kafka1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    healthcheck:
      test: kafka-topics --bootstrap-server kafka1:29092 --list
      interval: 5s
      timeout: 10s
      retries: 3
    depends_on:
      - zoo1
38 changes: 38 additions & 0 deletions Track 2/Round 1/docker-compose.yml
@@ -0,0 +1,38 @@
version: '2.1'

services:
  zoo1:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888

  kafka1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1
16 changes: 16 additions & 0 deletions Track 2/Round 1/monitor/Dockerfile
@@ -0,0 +1,16 @@
FROM python:3.11-slim-bookworm

# Set the working directory inside the container
WORKDIR /app

# Copy the requirements file into the container
COPY requirements.txt .

# Install the Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the application code into the container
COPY . .

# Define the command to run your application
CMD ["python","-u", "monitor.py"]
71 changes: 71 additions & 0 deletions Track 2/Round 1/monitor/monitor.py
@@ -0,0 +1,71 @@
from kafka import KafkaConsumer, KafkaProducer
import json
import os
import pandas as pd

# Kafka broker configuration
bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
input_topic = 'result'

found_true_positive = set()
found_false_positive = set()
true_regions_frame = pd.read_csv('train_labels.csv')[['started_at', 'finished_at', 'fault_name']]

true_positives = 0
false_positives = 0
average_detection_time = 0

def calculate_score(graph_timestamp, false_pos_region_delta=300):
    global average_detection_time, true_positives, false_positives
    true_regions = true_regions_frame.values
    timestamp_found = False
    # Iterate over each true window
    for true_start, true_end, _ in true_regions:
        # Check if the predicted timestamp falls within the current true window
        if int(pd.Timestamp(true_start).timestamp()) <= graph_timestamp <= int(pd.Timestamp(true_end).timestamp()):
            # If it falls within the window and a true positive within the window was already found,
            # just continue: these are predictions related and close to each other
            if (true_start, true_end) in found_true_positive:
                timestamp_found = True
                break
            # If it falls within the window and a true positive within the window has not been found yet,
            # count it as a true positive and set the flag to True
            true_positives += 1
            found_true_positive.add((true_start, true_end))
            timestamp_found = True
            break
    # If the predicted timestamp falls outside every window, count it as a false positive
    if not timestamp_found:
        # but first check whether it falls within the range of an already detected false positive
        in_region = False
        for region in found_false_positive:
            if region[0] <= graph_timestamp <= region[1]:
                # if so, we just neglect it
                in_region = True
                break
        if not in_region:
            # otherwise, we add it, together with its range
            false_positives += 1
            if false_pos_region_delta is not None:
                found_false_positive.add((graph_timestamp, graph_timestamp + false_pos_region_delta))

# Function to consume anomaly messages from the input topic and score them against the ground truth
def process_files(consumer):
    global average_detection_time, true_positives, false_positives
    print("Ready, start listening:")
    for message in consumer:
        event_timestamp = message.timestamp
        data = json.loads(message.value.decode('utf-8'))
        if average_detection_time == 0:
            average_detection_time = event_timestamp - data['event_ts']
        else:
            average_detection_time = (average_detection_time + event_timestamp - data['event_ts']) / 2

        calculate_score(data['graph_ts'])

        print(true_positives, false_positives, average_detection_time)


if __name__ == '__main__':
    consumer = KafkaConsumer(input_topic, bootstrap_servers=bootstrap_servers)
    process_files(consumer)
2 changes: 2 additions & 0 deletions Track 2/Round 1/monitor/requirements.txt
@@ -0,0 +1,2 @@
kafka-python==2.0.2
pandas==2.0.3