Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM python:2.7.18

RUN apt-get update -y && \
apt-get install -y zip unzip netcat

RUN mkdir /app
WORKDIR /app

CMD ["./run_tests.sh", "-p"]
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,31 @@ Network Protocol

Well, this is pretty "protocol-less", in the sense that when forwarding to a tcp address it will just open the connection and output raw traffic in the same way [netcat](http://en.wikipedia.org/wiki/Netcat) does.

Run Tests
---------

The easiest way to run the tests is using docker:
```shell
docker run --rm -it -v $(pwd):/app $(docker build -q .)
```

To help debug, you can also make the test intermediary persistent by setting a volume for /tmp dir:
```shell
mkdir tmp
docker run --rm -it -v $(pwd):/app -v $(pwd)/tmp:/tmp $(docker build -q .)
```

that will leave the intermediary files in the `tmp` folder.

To run test manually from from within the container:
```shell
docker run --rm -it -v $(pwd):/app $(docker build -q .) bash
. prepare_tests_env.sh
```
After that, you can run the tests manually, like:
```shell
. tests/test-filter.sh
```

License
-------
Expand Down
79 changes: 79 additions & 0 deletions detect-exceptions
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#!/usr/bin/env python

import sys
import time
import json
import threading
import signal
import optparse

exceptions = {}
in_exception = False
exception_lines = []
running_lock = threading.Lock()

parser = optparse.OptionParser(usage="Usage: %prog [options]")

parser.add_option('-n', '--namespace', action="store", dest="ns", help="ns parameter of the json output.")
parser.add_option('-i', '--interval', action="store", dest="interval",
help="How often (in seconds) should flush exceptions.", default=5)

options, args = parser.parse_args()

def exception_ended():
global in_exception, exception_lines, running_lock
exception_string = "\n".join(exception_lines)
running_lock.acquire(True)
try:
exceptions[exception_string] = exceptions.get(exception_string, 0) + 1
finally:
running_lock.release()

exception_lines = []
in_exception = False

def process_input():
global in_exception, exception_lines
for line in sys.stdin:
line = line.rstrip()
if not in_exception and (line.startswith("Error") or line.startswith("Exception")):
in_exception = True
exception_lines.append(line)
continue

if in_exception:
if line.startswith(' '):
exception_lines.append(line)
else:
exception_ended()
in_exception = False
flush()


def print_exceptions():
global exceptions, running_lock
interval = float(options.interval)
while True:
time.sleep(interval)
flush()


def flush():
global exceptions, running_lock
localExceptions = {}
running_lock.acquire(True)
try:
localExceptions = exceptions
exceptions = {}
finally:
running_lock.release()
for exception, count in localExceptions.items():
print(json.dumps({"ns": options.ns, "count": count, "exception": exception}))
sys.stdout.flush()


thread = threading.Thread(target=print_exceptions)
thread.daemon = True
thread.start()

process_input()
7 changes: 6 additions & 1 deletion forwarder
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
#!/usr/bin/env python

from __future__ import print_function
import ConfigParser

try:
from configparser import ConfigParser # Python 3 import
except ImportError:
from ConfigParser import ConfigParser # If ConfigParser missing, we're on Py3, import Py2 as Py3 name

import os
import optparse
import sys
Expand Down
1 change: 1 addition & 0 deletions offsets
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
b'bd4a6765a4aa9dd566cbe16e2d8ac81e' 11612437 /Users/gopi.thumati/lem-logs/log
143 changes: 77 additions & 66 deletions send_to_es
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
#!/usr/bin/env python

import sys, json, os, urllib2, datetime, re, Queue, threading, time, optparse
import sys, json, os, datetime, re, threading, time, optparse

try:
import queue # Python 3 import
except ImportError:
import Queue as queue # If queue missing, we're on Py2, import Py2 as Py3 name

try:
# For Python 3.0 and later
import urllib.request as urlopen
except ImportError:
# Fall back to Python 2's urllib2
import urllib2 as urlopen

class ExitToken:
pass
pass
exit_token = ExitToken()

q = Queue.Queue(1000)
q = queue.Queue(1000)

hostname = os.popen("hostname").read().strip()

Expand All @@ -22,81 +33,81 @@ options, args = parser.parse_args()
es_url = args[0]

def to_event(event):
try:
match = re.match(r'(\d\d\d\d-\d\d-\d\d[ T]\d\d:\d\d:\d\d[.,])(\d*)([+-]\d\d\d\d)?.*', event['line'])
if match:
timestamp = match.group(1) + match.group(2)[0:3]
timestamp += match.group(3) if match.group(3) else "+0000"
if timestamp[10] == ' ':
timestamp = timestamp[0:10] + 'T' + timestamp[11:]
else:
timestamp = "%s%s%02d00" % (event['time'][0:-3], "+" if time.altzone <= 0 else "-", time.altzone/60/60*-1)
except ValueError, e:
timestamp = event['time']
data = {
"@timestamp": timestamp,
"host": options.hostname,
"message": event_line(event['line'].strip()),
}
return json.dumps(data)
try:
match = re.match(r'(\d\d\d\d-\d\d-\d\d[ T]\d\d:\d\d:\d\d[.,])(\d*)([+-]\d\d\d\d)?.*', event['line'])
if match:
timestamp = match.group(1) + match.group(2)[0:3]
timestamp += match.group(3) if match.group(3) else "+0000"
if timestamp[10] == ' ':
timestamp = timestamp[0:10] + 'T' + timestamp[11:]
else:
timestamp = "%s%s%02d00" % (event['time'][0:-3], "+" if time.altzone <= 0 else "-", time.altzone/60/60*-1)
except ValueError as e:
timestamp = event['time']
data = {
"@timestamp": timestamp,
"host": options.hostname,
"message": event_line(event['line'].strip()),
}
return json.dumps(data)


def starts_with_space(line):
return len(line) > options.offset and line[options.offset] in [' ', '\t']
return len(line) > options.offset and line[options.offset] in [' ', '\t']

def event_line(line):
return line[options.offset:] if options.cut else line
return line[options.offset:] if options.cut else line

def sending():
running = True
lastEvent = None
event = None
while event != exit_token:
count = 0;
payload = ''
while count < 1000 and event != exit_token and (not q.empty() or count == 0):
try:
event = q.get(True, 1)
except Queue.Empty, e:
event = None
if event and event != exit_token and starts_with_space(event['line']):
lastEvent['line'] += "\n" + event_line(event['line'])
else:
if lastEvent:
payload += "{\"index\": {}}\n" + to_event(lastEvent) + "\n"
count += 1
lastEvent = None
if not event:
break
lastEvent = event

if count > 0:
# print "----------------------"
# print payload[0:-1]
while True:
try:
urllib2.urlopen("%s/_bulk" % es_url, payload)
break
except URLError, e:
print >> sys.stderr, "Failed sending bulk to ES: %s" % str(e)
sleep(3)
running = True
lastEvent = None
event = None
while event != exit_token:
count = 0
payload = ''
while count < 1000 and event != exit_token and (not q.empty() or count == 0):
try:
event = q.get(True, 1)
except queue.Empty as e:
event = None
if event and event != exit_token and starts_with_space(event['line']):
lastEvent['line'] += "\n" + event_line(event['line'])
else:
if lastEvent:
payload += "{\"index\": {}}\n" + to_event(lastEvent) + "\n"
count += 1
lastEvent = None
if not event:
break
lastEvent = event

if count > 0:
# print "----------------------"
# print payload[0:-1]
while True:
try:
urlopen.urlopen("%s/_bulk" % es_url, payload)
break
except URLError as e:
print >> sys.stderr, "Failed sending bulk to ES: %s" % str(e)
time.sleep(3)


t = threading.Thread(target=sending)
t.daemon = True
t.start()

try:
while 1:
line = sys.stdin.readline()
if not line:
q.put(exit_token)
break
line = line[0:-1]
if len(line) > 0:
q.put({'time': datetime.datetime.now().isoformat(), 'line': line})
# q.put(to_event(line))
t.join()
except KeyboardInterrupt, e:
pass
while 1:
line = sys.stdin.readline()
if not line:
q.put(exit_token)
break
line = line[0:-1]
if len(line) > 0:
q.put({'time': datetime.datetime.now().isoformat(), 'line': line})
# q.put(to_event(line))
t.join()
except KeyboardInterrupt as e:
pass

Loading