-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
27 lines (19 loc) · 774 Bytes
/
main.py
File metadata and controls
27 lines (19 loc) · 774 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from event_analyzer.generator import event_stream
from event_analyzer.processor import EventProcessor
from event_analyzer.detector import BurstDetector
def run():
processor = EventProcessor(window_seconds=30)
error_detector = BurstDetector(threshold=5)
try:
for event in event_stream(delay_seconds=1):
processor.process_event(event)
error_count = processor.count("ERROR")
if error_detector.check(error_count):
print(
f"[HIGH] ERROR_BURST: {error_count} ERROR events in sliding window"
)
except KeyboardInterrupt:
print("\nShutting down cleanly.")
print("Final counts:", dict(processor.count_by_type))
if __name__ == "__main__":
run()