Skip to content
Open
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
7d0974f
IPCache class added(WIP)
mazhurin Sep 23, 2020
783f4ba
ip_cache first experiment.
mazhurin Sep 23, 2020
ddd8228
Merge branch 'develop' into ip_cache
mazhurin Sep 23, 2020
ee9ba36
ip_cache logs + single update()
mazhurin Sep 23, 2020
0f7b001
ip_cache update() fix
mazhurin Sep 23, 2020
0cbc3f6
ip_cache log typo fix
mazhurin Sep 23, 2020
b97ed33
ip_cache persist added
mazhurin Sep 23, 2020
4562e8c
no counts in logging
mazhurin Sep 23, 2020
843cb61
last count() in logging removed
mazhurin Sep 23, 2020
6cf4d8d
spark ip_cache replaced with the local cache
mazhurin Sep 24, 2020
99b0e01
ip cache log message
mazhurin Sep 24, 2020
fac8c39
ip cache log message
mazhurin Sep 24, 2020
ab6fbf4
Full cache exception.
mazhurin Sep 24, 2020
6bc8997
Full cache warning instead of exception.
mazhurin Sep 24, 2020
9762959
IP cache persisting
mazhurin Sep 25, 2020
e528a45
logs message corrected with total
mazhurin Sep 25, 2020
39addb2
creating ip cache folder if not exists
mazhurin Sep 25, 2020
0c5481e
Thread safe singleton ip cache
mazhurin Sep 25, 2020
941107c
config fix in banjax_report
mazhurin Sep 25, 2020
ef69ce2
banjax_report with cache experiment
mazhurin Sep 25, 2020
6a7e961
banjax_report logs
mazhurin Sep 25, 2020
0564307
banjax_report logs type
mazhurin Sep 25, 2020
8204a97
incrementing fails in ip_cache
mazhurin Sep 25, 2020
0dc3507
logs if ip is not in the cache
mazhurin Sep 25, 2020
f966986
cache error handling
mazhurin Sep 25, 2020
05a1532
cache error handling 2
mazhurin Sep 25, 2020
4421776
Typo fix
mazhurin Sep 28, 2020
147427e
Merge branch 'develop' into ip_cache
mazhurin Sep 28, 2020
16824bf
Merge branch 'develop' into ip_cache
mazhurin Sep 28, 2020
c5fae7f
Setting ip_cache ttl when loading from a file.
mazhurin Sep 29, 2020
57d903f
rollback of setting ip_cache ttl when loading from a file.
mazhurin Sep 29, 2020
b86acc3
Simplifying IPCache constructor
mazhurin Sep 30, 2020
d5f4a23
Challenged column added to request_set table.
mazhurin Sep 30, 2020
0213b6e
typo
mazhurin Sep 30, 2020
2fad4a8
typo 2
mazhurin Sep 30, 2020
42c10eb
Sql Update for ip failed message.
mazhurin Sep 30, 2020
3e27d77
Sql Update typo fix
mazhurin Sep 30, 2020
288be2d
num_fails = 1
mazhurin Sep 30, 2020
0905262
White list fix.
mazhurin Oct 1, 2020
a7082bd
Empty challenge fix
mazhurin Oct 1, 2020
54cb126
Sliding window fix.
mazhurin Oct 1, 2020
4ec694c
White list fix. Challenged default zero fix.
mazhurin Oct 1, 2020
47ddd3d
Low rate attack default zero fix.
mazhurin Oct 1, 2020
68beba2
Low rate attack default zero fix 2.
mazhurin Oct 1, 2020
f765d6d
Low rate attack default zero fix 3.
mazhurin Oct 1, 2020
d470e39
Banned/passed reports from banjax
mazhurin Oct 5, 2020
8ba115b
Extra loggint removed
mazhurin Oct 5, 2020
2a95243
Report processing is commented out for now.
mazhurin Oct 5, 2020
ca697d3
Two ip caches: passed and pending
mazhurin Oct 6, 2020
129d6a5
a log removed
mazhurin Oct 6, 2020
e4005b7
Fix in ip_cache_passed processing
mazhurin Oct 6, 2020
fa28f7c
Fix in return ip_cache_passed processing
mazhurin Oct 6, 2020
9b2cd9a
Remove from pending if banned.
mazhurin Oct 6, 2020
89570ad
Saving ip_passed cache in the file.
mazhurin Oct 7, 2020
ca5416a
start report consumer even without -e
mazhurin Oct 7, 2020
a111070
Banjax thread moved into AttackDetectin task
mazhurin Oct 8, 2020
b9369c9
Unit test fix for permit in request_set_cache
mazhurin Oct 14, 2020
df9e7f5
IP_cache: docstring added, init_cache method. Switched to _pickle.
mazhurin Oct 15, 2020
074293e
The new version of spark-iforest
mazhurin Oct 19, 2020
d302c1e
Full path pending fix.
mazhurin Oct 19, 2020
d254ca6
Full path pending fix 2.
mazhurin Oct 19, 2020
f623ac4
White list ips optimized.
mazhurin Oct 20, 2020
3e7990c
Column name fix in host white list.
mazhurin Oct 20, 2020
7217a7a
IP cache update() accept a list of ips now.
mazhurin Oct 20, 2020
1a47421
Host white listing is moved to send_challenge()
mazhurin Oct 20, 2020
37c6e31
empty host white list fix
mazhurin Oct 23, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ plotly==4.5.0
pdoc==0.3.2
markdown>=3.0
kafka-python==2.0.1
cachetools
1 change: 1 addition & 0 deletions requirements_unit_tests.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ isoweek==1.3.3
pdoc==0.3.2
spark-testing-base
kafka-python==2.0.1
cachetools
5 changes: 5 additions & 0 deletions src/baskerville/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ class RequestSet(Base, SerializableMixin):
process_flag = Column(Boolean, default=True)
prediction = Column(Integer)
attack_prediction = Column(Integer)
# Challenge bookkeeping columns fed by the banjax report consumer:
# challenged: set by the pipeline when a challenge was issued for this request set
challenged = Column(Integer)
# challenge_failed: running count of failed challenge attempts (SQL-updated per report)
challenge_failed = Column(Integer)
# challenge_passed: 1 once an 'ip_passed_challenge' report is consumed
challenge_passed = Column(Integer)
# banned: 1 once the fail threshold is reached or an 'ip_banned' report arrives
banned = Column(Integer)
low_rate_attack = Column(Integer)
score = Column(Float)
features = Column(JSON)
Expand Down Expand Up @@ -159,6 +163,7 @@ class RequestSet(Base, SerializableMixin):
'prediction',
'attack_prediction',
'low_rate_attack',
'challenged',
'score',
'label',
'id_attribute',
Expand Down
89 changes: 80 additions & 9 deletions src/baskerville/models/banjax_report_consumer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import datetime
import threading
import json
from kafka import KafkaConsumer, KafkaProducer
import time
import logging
import sys
import types

from baskerville.db import set_up_db
from baskerville.models.config import KafkaConfig
from baskerville.models.ip_cache import IPCache
from baskerville.util.helpers import parse_config
import argparse
import os
Expand Down Expand Up @@ -35,9 +39,12 @@ class BanjaxReportConsumer(object):
"proxy.process.eventloop.time.max"
]

def __init__(self, kafka_config, logger):
self.config = kafka_config
def __init__(self, config, logger):
self.config = config
self.kafka_config = config.kafka
self.logger = logger
self.ip_cache = IPCache(config, self.logger)
self.session, self.engine = set_up_db(config.database.__dict__)

# XXX i think the metrics registry swizzling code is passing
# an extra argument here mistakenly?.?.
Expand All @@ -49,14 +56,14 @@ def _tmp_fun(_, _2, message):

def run(self):
consumer = KafkaConsumer(
self.config.banjax_report_topic,
self.kafka_config.banjax_report_topic,
group_id=None,
bootstrap_servers=self.config.bootstrap_servers,
security_protocol=self.config.security_protocol,
ssl_check_hostname=self.config.ssl_check_hostname,
ssl_cafile=self.config.ssl_cafile,
ssl_certfile=self.config.ssl_certfile,
ssl_keyfile=self.config.ssl_keyfile,
bootstrap_servers=self.kafka_config.bootstrap_servers,
security_protocol=self.kafka_config.security_protocol,
ssl_check_hostname=self.kafka_config.ssl_check_hostname,
ssl_cafile=self.kafka_config.ssl_cafile,
ssl_certfile=self.kafka_config.ssl_certfile,
ssl_keyfile=self.kafka_config.ssl_keyfile,
)

for message in consumer:
Expand Down Expand Up @@ -91,8 +98,72 @@ def consume_message(self, message):
# 'ip_failed_challenge'-type messages are reported when a challenge is failed
elif d.get("name") == "ip_failed_challenge":
self.consume_ip_failed_challenge_message(d)
elif d.get("name") == "ip_passed_challenge":
self.consume_ip_passed_challenge_message(d)
elif d.get("name") == "ip_banned":
self.consume_ip_banned_message(d)

def get_time_filter(self):
    """Return the UTC cutoff timestamp string for SQL update filters.

    The cutoff is ``banjax_sql_update_filter_minutes`` before now, so the
    updates only touch recent request_sets rows.

    NOTE(review): ``utcnow()`` is naive, so the trailing ``%z`` renders as
    an empty string — the result ends with a space. Kept as-is to preserve
    the exact SQL text; confirm the DB comparison is unaffected.
    """
    window = datetime.timedelta(
        minutes=self.config.engine.banjax_sql_update_filter_minutes)
    cutoff = datetime.datetime.utcnow() - window
    return cutoff.strftime("%Y-%m-%d %H:%M:%S %z")

def consume_ip_failed_challenge_message(self, message):
    """Handle a banjax 'ip_failed_challenge' report.

    Increments the fail counter for the IP in the pending cache and mirrors
    the new count into request_sets; once the count reaches
    ``banjax_num_fails_to_ban`` the IP is marked banned instead.

    :param dict message: decoded Kafka report; IP is under 'value_ip'.
    :return: the message (for metric hooks chained on this method).
    :raises Exception: re-raises any DB error after rollback.
    """
    ip = message['value_ip']
    num_fails = self.ip_cache.ip_failed_challenge(ip)
    # Covers both 0 (ip not pending) and None (cache KeyError path) —
    # previously None reached the >= comparison below and raised TypeError.
    if not num_fails:
        return message

    try:
        if num_fails >= self.config.engine.banjax_num_fails_to_ban:
            self.ip_cache.ip_banned(ip)
            column_update = 'banned = 1'
        else:
            column_update = f'challenge_failed = {num_fails}'
        # NOTE(review): string-built SQL; ip originates from an external
        # Kafka message — prefer bound parameters (session.execute(text(...), {...})).
        sql = f'update request_sets set {column_update} where ' \
              f'stop > \'{self.get_time_filter()}\' and challenged = 1 and ip = \'{ip}\''
        self.session.execute(sql)
        self.session.commit()

    except Exception as e:
        self.session.rollback()
        # was logger.error(Exception): logged the builtin class, not the error
        self.logger.error(e)
        raise

    return message

def consume_ip_passed_challenge_message(self, message):
    """Handle a banjax 'ip_passed_challenge' report.

    If the IP cache accepts the transition (ip was pending, not already
    passed), mark recent challenged request_sets rows as passed.

    :param dict message: decoded Kafka report; IP is under 'value_ip'.
    :return: the message (for metric hooks chained on this method).
    :raises Exception: re-raises any DB error after rollback.
    """
    ip = message['value_ip']
    if not self.ip_cache.ip_passed_challenge(ip):
        # ip was not pending or already recorded as passed: nothing to do
        return message
    try:
        # NOTE(review): string-built SQL with an externally supplied ip —
        # prefer bound parameters.
        sql = f'update request_sets set challenge_passed = 1 where ' \
              f'stop > \'{self.get_time_filter()}\' and challenged = 1 and ip = \'{ip}\''
        self.session.execute(sql)
        self.session.commit()

    except Exception as e:
        self.session.rollback()
        # was logger.error(Exception): logged the builtin class, not the error
        self.logger.error(e)
        raise

    return message

def consume_ip_banned_message(self, message):
    """Handle a banjax 'ip_banned' report: mark recent challenged
    request_sets rows for this IP as banned.

    :param dict message: decoded Kafka report; IP is under 'value_ip'.
    :return: the message (for metric hooks chained on this method).
    :raises Exception: re-raises any DB error after rollback.
    """
    ip = message['value_ip']
    self.logger.info(f'Banjax ip_banned {ip} ...')
    try:
        # NOTE(review): string-built SQL with an externally supplied ip —
        # prefer bound parameters.
        sql = f'update request_sets set banned = 1 where ' \
              f'stop > \'{self.get_time_filter()}\' and challenged = 1 and ip = \'{ip}\''
        self.session.execute(sql)
        self.session.commit()

    except Exception as e:
        self.session.rollback()
        # was logger.error(Exception): logged the builtin class, not the error
        self.logger.error(e)
        raise

    return message


Expand Down
8 changes: 8 additions & 0 deletions src/baskerville/models/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,15 @@ class EngineConfig(Config):
sliding_window = 360
low_rate_attack_period = [600, 3600]
low_rate_attack_total_request = [400, 2000]
# TTL/size of the IP cache holding ips that passed a challenge
ip_cache_passed_challenge_ttl = 60*60*24  # 24h
ip_cache_passed_challenge_size = 100000
# TTL/size of the IP cache holding ips with a pending (unanswered) challenge
ip_cache_pending_ttl = 60*60*1  # 1h
ip_cache_pending_size = 100000

white_list = None
# only request_sets rows with stop within this many minutes are SQL-updated
banjax_sql_update_filter_minutes = 30
# failed challenges before an ip is marked banned
banjax_num_fails_to_ban = 9
register_banjax_metrics = False

def __init__(self, config, parent=None):
super(EngineConfig, self).__init__(config, parent)
Expand Down
56 changes: 1 addition & 55 deletions src/baskerville/models/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import threading

from baskerville.models.banjax_report_consumer import BanjaxReportConsumer
from baskerville.models.base import BaskervilleBase
from baskerville.models.config import BaskervilleConfig
from baskerville.models.pipeline_factory import PipelineFactory
Expand Down Expand Up @@ -34,9 +32,7 @@ def __init__(self, run_type, conf, register_metrics=True):
)
self.config = BaskervilleConfig(self.config).validate()

self.register_metrics = (
self.config.engine.metrics and register_metrics
)
self.register_metrics = self.config.engine.metrics and register_metrics

self.logger = get_logger(
self.__class__.__name__,
Expand Down Expand Up @@ -216,44 +212,6 @@ def register_pipeline_metrics(self):

self.logger.info('Registered metrics.')

def register_banjax_metrics(self):
from baskerville.util.enums import MetricClassEnum

def incr_counter_for_ip_failed_challenge(metric, self, return_value):
metric.labels(return_value.get('value_ip'), return_value.get('value_site')).inc()
return return_value

consume_ip_failed_challenge_message = metrics_registry.register_action_hook(
self.report_consumer.consume_ip_failed_challenge_message,
incr_counter_for_ip_failed_challenge,
metric_name='ip_failed_challenge_on_website',
metric_cls=MetricClassEnum.counter,
labelnames=['ip', 'website']
)

setattr(self.report_consumer, 'consume_ip_failed_challenge_message', consume_ip_failed_challenge_message)

for field_name in self.report_consumer.status_message_fields:
target_method = getattr(self.report_consumer, f"consume_{field_name}")

def setter_for_field(field_name_inner):
def label_with_id_and_set(metric, self, return_value):
metric.labels(return_value.get('id')).set(return_value.get(field_name_inner))
return return_value

return label_with_id_and_set

patched_method = metrics_registry.register_action_hook(
target_method,
setter_for_field(field_name),
metric_name=field_name.replace('.', '_'),
metric_cls=MetricClassEnum.gauge,
labelnames=['banjax_id']
)

setattr(self.report_consumer, f"consume_{field_name}", patched_method)
self.logger.info(f"Registered metric for {field_name}")

def run(self) -> None:
"""
Run steps:
Expand All @@ -266,14 +224,6 @@ def run(self) -> None:
self.pipeline = self._set_up_pipeline()
self.pipeline.initialize()

# self._register_metrics()

if self.register_metrics:
self.report_consumer = BanjaxReportConsumer(self.config.kafka, self.logger)
self.register_banjax_metrics()
self.banjax_thread = threading.Thread(target=self.report_consumer.run)
self.banjax_thread.start()

self.pipeline.run()

def finish_up(self):
Expand All @@ -286,10 +236,6 @@ def finish_up(self):
if self.pipeline:
self.pipeline.finish_up()

if self.banjax_thread:
self.banjax_thread.kill()
self.banjax_thread.join()

self.logger.info('{} says \'Goodbye\'.'.format(
self.__class__.__name__
)
Expand Down
112 changes: 112 additions & 0 deletions src/baskerville/models/ip_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Copyright (c) 2020, eQualit.ie inc.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import os
import pickle
import threading

from cachetools import TTLCache

from baskerville.util.helpers import get_default_ip_cache_path
from baskerville.util.singleton_thread_safe import SingletonThreadSafe


class IPCache(metaclass=SingletonThreadSafe):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 for the ThreadSafe and all the lock usage, cool!


def __init__(self, config, logger):
    """Create (or restore from disk) the two TTL caches.

    ``cache_passed`` holds ips that passed a challenge; ``cache_pending``
    holds ips that were challenged and have not answered yet. Each cache is
    persisted to its own pickle file so state survives restarts.

    :param config: Baskerville config; reads ``config.engine.ip_cache_*``.
    :param logger: logger used for cache lifecycle messages.
    """
    super().__init__()

    self.logger = logger
    self.lock = threading.Lock()

    folder_path = get_default_ip_cache_path()
    # makedirs(exist_ok=True): also creates missing parents and does not
    # race with a concurrent process creating the folder (os.mkdir did both)
    os.makedirs(folder_path, exist_ok=True)

    # NOTE(review): pickle.load on these files is only safe while they stay
    # strictly local and trusted — confirm nothing else can write them.
    self.full_path_passed_challenge = os.path.join(folder_path, 'ip_cache_passed_challenge.bin')
    if os.path.exists(self.full_path_passed_challenge):
        self.logger.info(f'Loading passed challenge IP cache from file {self.full_path_passed_challenge}...')
        with open(self.full_path_passed_challenge, 'rb') as f:
            self.cache_passed = pickle.load(f)
    else:
        self.cache_passed = TTLCache(
            maxsize=config.engine.ip_cache_passed_challenge_size,
            ttl=config.engine.ip_cache_passed_challenge_ttl)
        # log-message typo fixed: 'challege' -> 'challenge'
        self.logger.info('A new instance of passed challenge IP cache has been created')

    self.full_path_pending = os.path.join(folder_path, 'ip_cache_pending.bin')
    if os.path.exists(self.full_path_pending):
        self.logger.info(f'Loading pending challenge IP cache from file {self.full_path_pending}...')
        with open(self.full_path_pending, 'rb') as f:
            self.cache_pending = pickle.load(f)
    else:
        self.cache_pending = TTLCache(
            maxsize=config.engine.ip_cache_pending_size,
            ttl=config.engine.ip_cache_pending_ttl)
        self.logger.info('A new instance of pending IP cache has been created')

def update(self, records):
with self.lock:
self.logger.info('IP cache updating...')
if len(self.cache_passed) > 0.98 * self.cache_passed.maxsize:
self.logger.warning('IP cache passed challenge is 98% full. ')
if len(self.cache_pending) > 0.98 * self.cache_pending.maxsize:
self.logger.warning('IP cache pending challenge is 98% full. ')
result = []
for r in records:
if r['ip'] not in self.cache_passed and r['ip'] not in self.cache_pending:
result.append(r)

for r in result:
self.cache_pending[r['ip']] = {
'fails': 0
}

with open(self.full_path_pending, 'wb') as f:
pickle.dump(self.cache_pending, f)
self.logger.info(f'IP cache pending: {len(self.cache_pending)}, {len(result)} added')

return result

def ip_failed_challenge(self, ip):
    """Record one more failed challenge for *ip*.

    :param ip: ip address reported by banjax.
    :return: the new fail count, or 0 when the ip is not pending (or its
        entry expired between the membership test and the lookup).
    """
    with self.lock:
        # __contains__ directly instead of scanning .keys()
        if ip not in self.cache_pending:
            return 0

        try:
            value = self.cache_pending[ip]
            value['fails'] += 1
            num_fails = value['fails']
            # BUG FIX: was self.cache_pending['ip'] = value — stored under
            # the literal key 'ip', polluting the cache and never touching
            # the real entry; re-assign under the actual ip (also refreshes
            # its TTL in a TTLCache).
            self.cache_pending[ip] = value
            return num_fails

        except KeyError as er:
            # entry can expire mid-operation under TTLCache
            self.logger.info(f'IP cache key error {er}')
            # was an implicit None, which crashed the >= comparison in the
            # report consumer
            return 0

def ip_passed_challenge(self, ip):
    """Move *ip* from the pending cache to the passed cache and persist.

    :param ip: ip address reported by banjax.
    :return: True when the transition happened; False when the ip was
        already recorded as passed or was never pending.
    """
    with self.lock:
        # direct __contains__ instead of scanning .keys() (review feedback)
        if ip in self.cache_passed:
            return False
        if ip not in self.cache_pending:
            return False
        self.cache_passed[ip] = self.cache_pending[ip]
        del self.cache_pending[ip]
        self.logger.info(f'IP {ip} passed challenge. Total IP in cache_passed: {len(self.cache_passed)}')

        # persist under the lock so the file matches the in-memory state
        with open(self.full_path_passed_challenge, 'wb') as f:
            pickle.dump(self.cache_passed, f)
        self.logger.info(f'IP cache passed: {len(self.cache_passed)}, 1 added')
        return True

def ip_banned(self, ip):
    """Drop *ip* from the pending cache once banjax reports it banned.

    Missing entries (already expired or removed) are logged and ignored.
    """
    with self.lock:
        try:
            del self.cache_pending[ip]
        except KeyError as er:
            # entry already gone — nothing to remove
            self.logger.info(f'IP cache key error {er}')
Loading