diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..9fe4f54 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,9 @@ +FROM python:2.7.18 + +RUN apt-get update -y && \ + apt-get install -y zip unzip netcat + +RUN mkdir /app +WORKDIR /app + +CMD ["./run_tests.sh", "-p"] diff --git a/README.md b/README.md index cbf38de..8f9f2e7 100644 --- a/README.md +++ b/README.md @@ -129,6 +129,31 @@ Network Protocol Well, this is pretty "protocol-less", in the sense that when forwarding to a tcp address it will just open the connection and output raw traffic in the same way [netcat](http://en.wikipedia.org/wiki/Netcat) does. +Run Tests +--------- + +The easiest way to run the tests is using docker: +```shell +docker run --rm -it -v $(pwd):/app $(docker build -q .) +``` + +To help debug, you can also make the test intermediary persistent by setting a volume for /tmp dir: +```shell +mkdir tmp +docker run --rm -it -v $(pwd):/app -v $(pwd)/tmp:/tmp $(docker build -q .) +``` + +that will leave the intermediary files in the `tmp` folder. + +To run the tests manually from within the container: +```shell +docker run --rm -it -v $(pwd):/app $(docker build -q .) bash +. prepare_tests_env.sh +``` +After that, you can run the tests manually, like: +```shell +. 
tests/test-filter.sh +``` License ------- diff --git a/detect-exceptions b/detect-exceptions new file mode 100755 index 0000000..5f75054 --- /dev/null +++ b/detect-exceptions @@ -0,0 +1,79 @@ +#!/usr/bin/env python + +import sys +import time +import json +import threading +import signal +import optparse + +exceptions = {} +in_exception = False +exception_lines = [] +running_lock = threading.Lock() + +parser = optparse.OptionParser(usage="Usage: %prog [options]") + +parser.add_option('-n', '--namespace', action="store", dest="ns", help="ns parameter of the json output.") +parser.add_option('-i', '--interval', action="store", dest="interval", + help="How often (in seconds) should flush exceptions.", default=5) + +options, args = parser.parse_args() + +def exception_ended(): + global in_exception, exception_lines, running_lock + exception_string = "\n".join(exception_lines) + running_lock.acquire(True) + try: + exceptions[exception_string] = exceptions.get(exception_string, 0) + 1 + finally: + running_lock.release() + + exception_lines = [] + in_exception = False + +def process_input(): + global in_exception, exception_lines + for line in sys.stdin: + line = line.rstrip() + if not in_exception and (line.startswith("Error") or line.startswith("Exception")): + in_exception = True + exception_lines.append(line) + continue + + if in_exception: + if line.startswith(' '): + exception_lines.append(line) + else: + exception_ended() + in_exception = False + flush() + + +def print_exceptions(): + global exceptions, running_lock + interval = float(options.interval) + while True: + time.sleep(interval) + flush() + + +def flush(): + global exceptions, running_lock + localExceptions = {} + running_lock.acquire(True) + try: + localExceptions = exceptions + exceptions = {} + finally: + running_lock.release() + for exception, count in localExceptions.items(): + print(json.dumps({"ns": options.ns, "count": count, "exception": exception})) + sys.stdout.flush() + + +thread = 
threading.Thread(target=print_exceptions) +thread.daemon = True +thread.start() + +process_input() diff --git a/forwarder b/forwarder index 80eef8f..fcff401 100755 --- a/forwarder +++ b/forwarder @@ -1,7 +1,12 @@ #!/usr/bin/env python from __future__ import print_function -import ConfigParser + +try: + from configparser import ConfigParser # Python 3 import +except ImportError: + from ConfigParser import ConfigParser # If ConfigParser missing, we're on Py3, import Py2 as Py3 name + import os import optparse import sys diff --git a/offsets b/offsets new file mode 100644 index 0000000..2a70637 --- /dev/null +++ b/offsets @@ -0,0 +1 @@ +b'bd4a6765a4aa9dd566cbe16e2d8ac81e' 11612437 /Users/gopi.thumati/lem-logs/log diff --git a/send_to_es b/send_to_es index 8e9f0ee..79bd525 100755 --- a/send_to_es +++ b/send_to_es @@ -1,13 +1,24 @@ #!/usr/bin/env python -import sys, json, os, urllib2, datetime, re, Queue, threading, time, optparse +import sys, json, os, datetime, re, threading, time, optparse +try: + import queue # Python 3 import +except ImportError: + import Queue as queue # If queue missing, we're on Py2, import Py2 as Py3 name + +try: + # For Python 3.0 and later + import urllib.request as urlopen +except ImportError: + # Fall back to Python 2's urllib2 + import urllib2 as urlopen class ExitToken: - pass + pass exit_token = ExitToken() -q = Queue.Queue(1000) +q = queue.Queue(1000) hostname = os.popen("hostname").read().strip() @@ -22,64 +33,64 @@ options, args = parser.parse_args() es_url = args[0] def to_event(event): - try: - match = re.match(r'(\d\d\d\d-\d\d-\d\d[ T]\d\d:\d\d:\d\d[.,])(\d*)([+-]\d\d\d\d)?.*', event['line']) - if match: - timestamp = match.group(1) + match.group(2)[0:3] - timestamp += match.group(3) if match.group(3) else "+0000" - if timestamp[10] == ' ': - timestamp = timestamp[0:10] + 'T' + timestamp[11:] - else: - timestamp = "%s%s%02d00" % (event['time'][0:-3], "+" if time.altzone <= 0 else "-", time.altzone/60/60*-1) - except ValueError, e: 
- timestamp = event['time'] - data = { - "@timestamp": timestamp, - "host": options.hostname, - "message": event_line(event['line'].strip()), - } - return json.dumps(data) + try: + match = re.match(r'(\d\d\d\d-\d\d-\d\d[ T]\d\d:\d\d:\d\d[.,])(\d*)([+-]\d\d\d\d)?.*', event['line']) + if match: + timestamp = match.group(1) + match.group(2)[0:3] + timestamp += match.group(3) if match.group(3) else "+0000" + if timestamp[10] == ' ': + timestamp = timestamp[0:10] + 'T' + timestamp[11:] + else: + timestamp = "%s%s%02d00" % (event['time'][0:-3], "+" if time.altzone <= 0 else "-", time.altzone/60/60*-1) + except ValueError as e: + timestamp = event['time'] + data = { + "@timestamp": timestamp, + "host": options.hostname, + "message": event_line(event['line'].strip()), + } + return json.dumps(data) def starts_with_space(line): - return len(line) > options.offset and line[options.offset] in [' ', '\t'] + return len(line) > options.offset and line[options.offset] in [' ', '\t'] def event_line(line): - return line[options.offset:] if options.cut else line + return line[options.offset:] if options.cut else line def sending(): - running = True - lastEvent = None - event = None - while event != exit_token: - count = 0; - payload = '' - while count < 1000 and event != exit_token and (not q.empty() or count == 0): - try: - event = q.get(True, 1) - except Queue.Empty, e: - event = None - if event and event != exit_token and starts_with_space(event['line']): - lastEvent['line'] += "\n" + event_line(event['line']) - else: - if lastEvent: - payload += "{\"index\": {}}\n" + to_event(lastEvent) + "\n" - count += 1 - lastEvent = None - if not event: - break - lastEvent = event - - if count > 0: - # print "----------------------" - # print payload[0:-1] - while True: - try: - urllib2.urlopen("%s/_bulk" % es_url, payload) - break - except URLError, e: - print >> sys.stderr, "Failed sending bulk to ES: %s" % str(e) - sleep(3) + running = True + lastEvent = None + event = None + while event 
!= exit_token: + count = 0 + payload = '' + while count < 1000 and event != exit_token and (not q.empty() or count == 0): + try: + event = q.get(True, 1) + except queue.Empty as e: + event = None + if event and event != exit_token and starts_with_space(event['line']): + lastEvent['line'] += "\n" + event_line(event['line']) + else: + if lastEvent: + payload += "{\"index\": {}}\n" + to_event(lastEvent) + "\n" + count += 1 + lastEvent = None + if not event: + break + lastEvent = event + + if count > 0: + # print "----------------------" + # print payload[0:-1] + while True: + try: + urlopen.urlopen("%s/_bulk" % es_url, payload) + break + except URLError as e: + print >> sys.stderr, "Failed sending bulk to ES: %s" % str(e) + time.sleep(3) t = threading.Thread(target=sending) @@ -87,16 +98,16 @@ t.daemon = True t.start() try: - while 1: - line = sys.stdin.readline() - if not line: - q.put(exit_token) - break - line = line[0:-1] - if len(line) > 0: - q.put({'time': datetime.datetime.now().isoformat(), 'line': line}) - # q.put(to_event(line)) - t.join() -except KeyboardInterrupt, e: - pass + while 1: + line = sys.stdin.readline() + if not line: + q.put(exit_token) + break + line = line[0:-1] + if len(line) > 0: + q.put({'time': datetime.datetime.now().isoformat(), 'line': line}) + # q.put(to_event(line)) + t.join() +except KeyboardInterrupt as e: + pass diff --git a/send_to_rabbitmq b/send_to_rabbitmq index a662460..94c1f1e 100755 --- a/send_to_rabbitmq +++ b/send_to_rabbitmq @@ -1,13 +1,29 @@ #!/usr/bin/env python -import sys, json, os, urllib, urllib2, datetime, re, Queue, threading, time, optparse, base64 +import sys, json, os, datetime, re, threading, time, optparse, base64 +try: + import urllib.parse as quote # Python 3+ +except ImportError: + import urllib as quote # Python 2.X + +try: + # For Python 3.0 and later + import urllib.request as urlopen +except ImportError: + # Fall back to Python 2's urllib2 + import urllib2 as urlopen + +try: + import queue # Python 3 
import +except ImportError: + import Queue as queue # If queue missing, we're on Py2, import Py2 as Py3 name class ExitToken: - pass + pass exit_token = ExitToken() -q = Queue.Queue(1000) +q = queue.Queue(1000) hostname = os.popen("hostname").read().strip() @@ -23,13 +39,13 @@ parser.add_option('-u', '--credentials USERNAME:PASSWORD', action="store", dest= options, args = parser.parse_args() if not options.exchange: - parser.error('Exchange not given') + parser.error('Exchange not given') if len(args) == 0: - parser.error('RabbitMQ URL not given') + parser.error('RabbitMQ URL not given') -encoded_vhost = urllib.quote(options.vhost, safe='') -encoded_exchange = urllib.quote(options.exchange, safe='') +encoded_vhost = quote.quote(options.vhost, safe='') +encoded_exchange = quote.quote(options.exchange, safe='') rabbitmq_url = args[0] @@ -38,85 +54,83 @@ base64string = base64.standard_b64encode('%s:%s' % (cred[0], cred[1])).replace(' uri = "%s/api/exchanges/%s/%s/publish" % (rabbitmq_url, encoded_vhost, encoded_exchange) - def to_event(event): - try: - match = re.match(r'(\d\d\d\d-\d\d-\d\d[ T]\d\d:\d\d:\d\d[.,])(\d*)([+-]\d\d\d\d)?.*', event['line']) - if match: - timestamp = match.group(1) + match.group(2)[0:3] - timestamp += match.group(3) if match.group(3) else "+0000" - if timestamp[10] == ' ': - timestamp = timestamp[0:10] + 'T' + timestamp[11:] - else: - timestamp = "%s%s%02d00" % (event['time'][0:-3], "+" if time.altzone <= 0 else "-", time.altzone/60/60*-1) - except ValueError, e: - timestamp = event['time'] - data = { - "@timestamp": timestamp, - "host": options.hostname, - "message": event['line'].strip(), - } - return json.dumps(data) - + try: + match = re.match(r'(\d\d\d\d-\d\d-\d\d[ T]\d\d:\d\d:\d\d[.,])(\d*)([+-]\d\d\d\d)?.*', event['line']) + if match: + timestamp = match.group(1) + match.group(2)[0:3] + timestamp += match.group(3) if match.group(3) else "+0000" + if timestamp[10] == ' ': + timestamp = timestamp[0:10] + 'T' + timestamp[11:] + else: + timestamp = 
"%s%s%02d00" % (event['time'][0:-3], "+" if time.altzone <= 0 else "-", time.altzone/60/60*-1) + except ValueError as e: + timestamp = event['time'] + data = { + "@timestamp": timestamp, + "host": options.hostname, + "message": event['line'].strip(), + } + return json.dumps(data) def starts_with_space(line): - return len(line) > options.offset and line[options.offset] in [' ', '\t'] + return len(line) > options.offset and line[options.offset] in [' ', '\t'] def sending(): - running = True - lastEvent = None - event = None - while event != exit_token: - count = 0; - payload = '' - while count < 1 and event != exit_token and (not q.empty() or count == 0): - try: - event = q.get(True, 1) - except Queue.Empty, e: - event = None - if event and event != exit_token and starts_with_space(event['line']): - lastEvent['line'] += "\n" + event['line'] - else: - if lastEvent: - payload += json.dumps({ - "properties":{}, - "routing_key":options.routing_key, - "payload":to_event(lastEvent), - "payload_encoding":"string"} - ) - # payload += "{\"index\": {}}\n" + to_event(lastEvent) + "\n" - count += 1 - lastEvent = None - if not event: - break - lastEvent = event - - if count > 0: - request = urllib2.Request(uri) - request.add_header("Authorization", "Basic %s" % base64string) - # print "----------------------" - # print "sending to: %s" % uri - # print payload[0:-1] - - urllib2.urlopen(request, payload) - # print "done" - - # urllib2.urlopen(uri, payload) + running = True + lastEvent = None + event = None + while event != exit_token: + count = 0 + payload = '' + while count < 1 and event != exit_token and (not q.empty() or count == 0): + try: + event = q.get(True, 1) + except queue.Empty as e: + event = None + if event and event != exit_token and starts_with_space(event['line']): + lastEvent['line'] += "\n" + event['line'] + else: + if lastEvent: + payload += json.dumps({ + "properties":{}, + "routing_key":options.routing_key, + "payload":to_event(lastEvent), + 
"payload_encoding":"string"} + ) + # payload += "{\"index\": {}}\n" + to_event(lastEvent) + "\n" + count += 1 + lastEvent = None + if not event: + break + lastEvent = event + + if count > 0: + request = urlopen.Request(uri) + request.add_header("Authorization", "Basic %s" % base64string) + # print "----------------------" + # print "sending to: %s" % uri + # print payload[0:-1] + + urlopen.urlopen(request, payload) + # print "done" + + # urlopen.urlopen(uri, payload) t = threading.Thread(target=sending) t.daemon = True t.start() try: - while 1: - line = sys.stdin.readline() - if not line: - q.put(exit_token) - break - line = line[0:-1] - if len(line) > 0: - q.put({'time': datetime.datetime.now().isoformat(), 'line': line}) - t.join() -except KeyboardInterrupt, e: - pass + while 1: + line = sys.stdin.readline() + if not line: + q.put(exit_token) + break + line = line[0:-1] + if len(line) > 0: + q.put({'time': datetime.datetime.now().isoformat(), 'line': line}) + t.join() +except KeyboardInterrupt as e: + pass diff --git a/send_to_stomp b/send_to_stomp index f7968c7..daf3996 100755 --- a/send_to_stomp +++ b/send_to_stomp @@ -1,16 +1,21 @@ #!/usr/bin/env python from __future__ import print_function -import sys, json, os, datetime, re, Queue, threading, time, optparse, base64 +import sys, json, os, datetime, re, threading, time, optparse, base64 +try: + import queue # Python 3 import +except ImportError: + import Queue as queue # If queue missing, we're on Py2, import Py2 as Py3 name + from select import select from stompclient import StompClient import sys class ExitToken: - pass + pass exit_token = ExitToken() -q = Queue.Queue(1000) +q = queue.Queue(1000) hostname = os.popen("hostname").read().strip() @@ -28,27 +32,26 @@ parser.add_option('-b', '--heartbeat INTERVAL', action="store", dest="heartbeat" options, args = parser.parse_args() if not options.exchange: - parser.error('Exchange not given') + parser.error('Exchange not given') if len(args) == 0: - 
parser.error('Stomp host:port not given') + parser.error('Stomp host:port not given') def eprint(*args, **kwargs): - print(*args, file=sys.stderr, **kwargs) + print(*args, file=sys.stderr, **kwargs) def log_json(name, props={}): - a = { - 'timestamp': datetime.datetime.now().isoformat(), - 'ns': 'forwarder.stomp', - 'hostname': hostname, - 'name': name - } - z = props.copy() - z.update(a) - eprint(json.dumps(z)) - - -log_json('starting', dict([(property, value) for property, value in vars(options).iteritems()])) + a = { + 'timestamp': datetime.datetime.now().isoformat(), + 'ns': 'forwarder.stomp', + 'hostname': hostname, + 'name': name + } + z = props.copy() + z.update(a) + eprint(json.dumps(z)) + +log_json('starting', dict([(property, value) for property, value in vars(options).items()])) stomp_address = args[0].split(":") @@ -57,81 +60,79 @@ stomp = StompClient(stomp_address[0], int(stomp_address[1])) cred = options.credentials.split(":") stomp.connect(cred[0], cred[1], options.vhost) - def to_event(event): - try: - match = re.match(r'(\d\d\d\d-\d\d-\d\d[ T]\d\d:\d\d:\d\d[.,])(\d*)([+-]\d\d\d\d)?.*', event['line']) - if match: - timestamp = match.group(1) + match.group(2)[0:3] - timestamp += match.group(3) if match.group(3) else "+0000" - if timestamp[10] == ' ': - timestamp = timestamp[0:10] + 'T' + timestamp[11:] - else: - timestamp = "%s%s%02d00" % (event['time'][0:-3], "+" if time.altzone <= 0 else "-", time.altzone/60/60*-1) - except ValueError, e: - timestamp = event['time'] - return { - "@timestamp": timestamp, - "host": options.hostname, - "message": event_line(event['line'].strip()), - } - + try: + match = re.match(r'(\d\d\d\d-\d\d-\d\d[ T]\d\d:\d\d:\d\d[.,])(\d*)([+-]\d\d\d\d)?.*', event['line']) + if match: + timestamp = match.group(1) + match.group(2)[0:3] + timestamp += match.group(3) if match.group(3) else "+0000" + if timestamp[10] == ' ': + timestamp = timestamp[0:10] + 'T' + timestamp[11:] + else: + timestamp = "%s%s%02d00" % 
(event['time'][0:-3], "+" if time.altzone <= 0 else "-", time.altzone/60/60*-1) + except ValueError as e: + timestamp = event['time'] + return { + "@timestamp": timestamp, + "host": options.hostname, + "message": event_line(event['line'].strip()), + } def starts_with_space(line): - return len(line) > options.offset and line[options.offset] in [' ', '\t'] + return len(line) > options.offset and line[options.offset] in [' ', '\t'] def event_line(line): - return line[options.offset:] if options.cut else line + return line[options.offset:] if options.cut else line def sending(): - lastEvent = None - event = None - while event != exit_token: - count = 0; - payload = '' - while count < 1 and event != exit_token and (not q.empty() or count == 0): - try: - event = q.get(True, 1) - except Queue.Empty, e: - event = None - if event and event != exit_token and starts_with_space(event['line']): - lastEvent['line'] += "\n" + event_line(event['line']) - else: - if lastEvent: - payload += json.dumps(to_event(lastEvent)) - count += 1 - lastEvent = None - if not event: - break - lastEvent = event - - if count > 0: - stomp.send(options.exchange, options.routing_key, payload) + lastEvent = None + event = None + while event != exit_token: + count = 0 + payload = '' + while count < 1 and event != exit_token and (not q.empty() or count == 0): + try: + event = q.get(True, 1) + except queue.Empty as e: + event = None + if event and event != exit_token and starts_with_space(event['line']): + lastEvent['line'] += "\n" + event_line(event['line']) + else: + if lastEvent: + payload += json.dumps(to_event(lastEvent)) + count += 1 + lastEvent = None + if not event: + break + lastEvent = event + + if count > 0: + stomp.send(options.exchange, options.routing_key, payload) t = threading.Thread(target=sending) t.daemon = True t.start() try: - last_heartbeat = time.time(); - while 1: - rlist, _, _ = select([sys.stdin], [], [], 0.1) - if rlist: - line = sys.stdin.readline() - if not line: - 
q.put(exit_token) - break - line = line[0:-1] - if len(line) > 0: - q.put({'time': datetime.datetime.now().isoformat(), 'line': line}) - - now = time.time(); - if now - last_heartbeat >= options.heartbeat: - log_json('heartbeat') - last_heartbeat += options.heartbeat - - - t.join() -except KeyboardInterrupt, e: - pass + last_heartbeat = time.time() + while 1: + rlist, _, _ = select([sys.stdin], [], [], 0.1) + if rlist: + line = sys.stdin.readline() + if not line: + q.put(exit_token) + break + line = line[0:-1] + if len(line) > 0: + q.put({'time': datetime.datetime.now().isoformat(), 'line': line}) + + now = time.time() + if now - last_heartbeat >= options.heartbeat: + log_json('heartbeat') + last_heartbeat += options.heartbeat + + t.join() +except KeyboardInterrupt as e: + pass + diff --git a/sender b/sender index f1790f7..0c72822 100755 --- a/sender +++ b/sender @@ -7,27 +7,44 @@ import os import glob import socket import optparse -import md5 +import hashlib import zipfile import gzip import tempfile import shutil import subprocess import threading -import Queue import signal import traceback +import json DEFAULT_SIGNAGURE_LENGTH=256 -filter_stopped = Queue.Queue() -stop_signal = Queue.Queue() +try: + import queue # Python 3 import +except ImportError: + import Queue as queue # If queue missing, we're on Py2, import Py2 as Py3 name + +filter_stopped = queue.Queue() +stop_signal = queue.Queue() running_lock = threading.Lock() +debug_on = False +running = True def should_stop(): return not stop_signal.empty() +def heartbeat(): + global running + while running: + if running: + log("METRIC ns=forwarder.heartbeat result=ok"); + else: + log("METRIC ns=forwarder.heartbeat result=fail"); + time.sleep(3600) + def main(): + global running parser = optparse.OptionParser(usage="Usage: %prog [options] ... ", description= "Outputs the content of files resolved by the patterns passed as " "parameters and keep monitoring them for new content. 
" @@ -36,6 +53,7 @@ def main(): "IMPORTANT: only files with size >= signature-length (default %d) bytes will be processed. " "Zip files will be open recursively, and only once." % DEFAULT_SIGNAGURE_LENGTH) + parser.add_option('-D', '--debug', action="store_true", dest="debug", help="Enable debug output", default=False) parser.add_option('-f', '--follow', action="store_true", dest="follow", help="Pools the file very second for changes in an infinite loop.", default=False) parser.add_option('-p', '--offsets', action="store", dest="offsets", help="File to persist the last offsets read for each file. If it doesn't exist, the files are read from beginning.", default='/dev/null') parser.add_option('-t', '--tcp', action="store", dest="host", help="Sends the output to the host:port via TCP.", metavar="HOST:PORT") @@ -49,6 +67,11 @@ def main(): options, args = parser.parse_args() + if options.debug: + log("DEBUG ON") + global debug_on + debug_on = True + if options.dump_pid: f = open(options.dump_pid,'w') f.write("%d" % os.getpid()) @@ -68,7 +91,9 @@ def main(): a = options.host.split(":") try: output = Netcat(a[0], int(a[1]), options.retry_on_network_error) - except socket.error, e: + except socket.error as e: + global running + running=False log("Socket error: %s" % str(e)) sys.exit(1) @@ -86,7 +111,7 @@ def main(): try: last_output.write(line) last_output.flush() - except socket.error, e: + except socket.error as e: log("Socket error in filer: %s" % e) break except: @@ -115,11 +140,13 @@ def main(): log("That does it. 
Sending SIGINT.") os.kill(filter.pid, sig) break - except OSError, e: + except OSError as e: log("An exception happened while waiting for process to finish: %s" % e) def signal_handler(signal, frame): + global running + running = False log("%s caught" % signal_names[signal]) stop_signal.put(True) # if not filter.stdin.closed: @@ -153,6 +180,8 @@ def log(msg): sys.stderr.flush() def debug(msg): + if debug_on: + log("DEBUG: "+ msg); # sys.stderr.write(str(msg)) # sys.stderr.write("\n") # sys.stderr.flush() @@ -184,21 +213,25 @@ class Netcat(object): self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.s.connect(self.addr) self.show_error_message = True - break; - except socket.error, e: + break + except socket.error as e: self.error("Socket error: %s" % e) - except IOError, e: + except IOError as e: self.error("IOError: %s" % e) self.exit_or_retry() def error(self, msg): + global running + running=False if self.show_error_message: log(msg) def exit_or_retry(self): + global running + running = True if not self.retry_on_network_error: + running = False sys.exit(1) - self.error("Retrying every 3 seconds.") self.show_error_message = False time.sleep(3) @@ -213,9 +246,9 @@ class Netcat(object): self.last_msg = msg self.send_last = False break - except socket.error, e: + except socket.error as e: self.error("Socket error: %s" % e) - except IOError, e: + except IOError as e: self.error("IOError: %s" % e) self.send_last = True self.exit_or_retry() @@ -264,6 +297,8 @@ class Tail(object): self.starting = options.starting self.start_from_tail = options.start_from_tail self.readOffsets() + self.duplicates = {} + def persistOffsets(self): if self.offsetsfn == '/dev/null': @@ -272,7 +307,7 @@ class Tail(object): temp = '%s-temp' % self.offsetsfn f = open(temp,'w') try: - for sig, info in self.offsets.iteritems(): + for sig, info in self.offsets.items(): f.write(info.dump()) f.write('\n') f.flush() @@ -299,13 +334,13 @@ class Tail(object): f.close() def 
generateSignature(self, f): - offset = f.tell(); + offset = f.tell() header = f.read(self.signatureLength) f.seek(offset) if len(header) == 0: return None else: - return md5.new(header).hexdigest() + return hashlib.md5(header).hexdigest().encode('utf-8') def purgeOffsetsNotIn(self, existing): newOffsets = {} @@ -319,10 +354,11 @@ class Tail(object): if should_stop(): return for line in f: + if debug_on: debug("line %s" % line.replace("\n","")) if not self.starting or line >= self.starting: self.output.write(line) if should_stop(): - break; + break self.output.flush() def isCompressed(self, fn): @@ -355,10 +391,11 @@ class Tail(object): self.processZipFile(f, fn) def processGzipFile(self, fn): - debug("gz: %s" % fn) + if debug_on: debug("gz: %s" % fn) f = gzip.open(fn) try: - self.processFile(f, '/var/tmp/fake.log', {}) + sig = self.generateSignature(f) + self.processFile(f, '/var/tmp/fake.log', sig) finally: f.close() @@ -369,46 +406,41 @@ class Tail(object): shutil.rmtree(path) def processFileByName(self, fn, existing): - debug("processFileByName: %s" % fn) + if debug_on: debug("processFileByName: %s" % fn) f = open(fn, 'rb') try: self.processFile(f, fn, existing) finally: f.close() + debug("processFileByName, close file"); - def processFile(self, f, fn, existing): - debug("processFile: %s" % fn) - sig = self.generateSignature(f) - if not sig: - return - - if sig in existing and os.path.getsize(fn) != os.path.getsize(existing[sig]): - log("WARN Files '%s' and '%s' have same signature and different sizes" % (fn, existing[sig])) - + def processFile(self, f, fn, sig): info = self.offsets.get(sig, Info(sig=sig, name=fn)) + if debug_on: debug("processFile %s %s" % (fn, info.dump())) lastOffset = info.offset - info.name = fn if self.isCompressed(fn): - debug("compressed: %s" % fn) + if debug_on: debug("compressed: %s" % fn) if info.offset == 0: if not self.start_from_tail: self.processCompressedFile(f, fn) info.offset = -1 else: if self.start_from_tail: + if debug_on: 
debug("starting from tail %s" % fn) + info.offset = os.path.getsize(fn) if os.path.exists(fn) and os.path.getsize(fn) < info.offset: - log("WARN file %s was truncated" % fn) + log("METRIC ns=forwarder.truncated file=%s filesize=%s offset=%s" % (fn, os.path.getsize(fn), info.offset)) info.offset = os.path.getsize(fn) else: + if debug_on: debug("Seeking to %s in %s, currently at %s" % (info.offset, fn, f.tell())) f.seek(info.offset) self.copy(f) info.offset = f.tell() + if debug_on: debug("Setting offset for: %s to %s (info: %s)" % (fn, info.offset, info.dump())) - existing[sig] = fn if lastOffset != info.offset: self.offsets[sig] = info @@ -416,29 +448,71 @@ class Tail(object): def run(self): + global running running_lock.acquire(True) try: while not should_stop(): if not filter_stopped.empty(): sys.exit(1) existing = {} + to_process = {} + filehandles = {} for fnpattern in self.fnpatterns: for fn in glob.glob(fnpattern): + if debug_on: debug("Checking fn %s" % fn) + f = None try: if not os.path.isfile(fn): log("File no longer exists: %s" % fn) continue if os.path.getsize(fn) < self.signatureLength and not self.isCompressed(fn): + log("Skipping as file too short to generate sig or file is compressed: %s" % fn) continue - self.processFileByName(fn, existing) - except Exception, e: - log("Exception: %s" % e) + f = open(fn, 'rb') + sig = self.generateSignature(f) + if debug_on: debug("Sig for fn %s is %s" % (fn, sig)) + sig_fn = sig+fn.encode('utf-8'); + if sig in existing: + if not sig_fn in self.duplicates: + log("METRIC ns=forwarder.duplicatesig file=%s dupe_of=%s sig=%s" % (fn, existing[sig], sig)) + self.duplicates[sig_fn] = True; + if sig in to_process: #take original duplicate out of to_process + del to_process[sig] + f.close() + else: + if debug_on: debug("Adding file %s %s" % (fn, sig)) + existing[sig] = fn #leave in existing in case more than 2 duplicates + to_process[sig] = fn + filehandles[sig] = f + + except Exception as e: + log("METRIC 
ns=forwarder.error.preprocess file=\"%s\" exception=\"%s\"" % (fn, str(e).replace("\n", ""))) + log("Exception=\"%s\"" % e) + if f: f.close() exc_type, exc_value, exc_traceback = sys.exc_info() traceback.print_tb(exc_traceback) + if debug_on: debug("To Process %s" % to_process) + sigs = to_process.keys() + for sig in sigs: + f = None + try: + fn = to_process[sig] + if debug_on: debug("Processing file %s %s" % (fn, sig)) + f = filehandles[sig] + self.processFile(f, fn, sig) + except Exception as e: + log("METRIC ns=forwarder.error.process file=\"%s\" exception=\"%s\"" % (fn, str(e).replace("\n", ""))) + log("Exception=\"%s\"" % e) + exc_type, exc_value, exc_traceback = sys.exc_info() + traceback.print_tb(exc_traceback) + f.close() + finally: + if f and not f.closed: f.close() + if len(existing) != len(self.offsets): - self.purgeOffsetsNotIn(existing) + self.purgeOffsetsNotIn(to_process) if not self.follow: break time.sleep(1) @@ -448,6 +522,8 @@ class Tail(object): if __name__ == '__main__': - main() - + heartbeat_thread = threading.Thread(target=heartbeat) + heartbeat_thread.daemon=True + heartbeat_thread.start() + main() #import pdb ; pdb.set_trace() diff --git a/tests/assert.sh b/tests/assert.sh index ffd2b95..3396bf7 100644 --- a/tests/assert.sh +++ b/tests/assert.sh @@ -102,7 +102,8 @@ assert() { # assert [stdin] (( tests_ran++ )) || : [[ -z "$DISCOVERONLY" ]] || return - expected=$(echo -ne "${2:-}") + # expected=$(echo -ne "${2:-}") + expected=${2:-} result="$(eval 2>/dev/null $1 <<< ${3:-})" || true if [[ "$result" == "$expected" ]]; then [[ -z "$DEBUG" ]] || echo -n . diff --git a/tests/test-kill-int-forwarder.sh b/tests/test-kill-int-forwarder.sh index 87ae499..cb8e314 100755 --- a/tests/test-kill-int-forwarder.sh +++ b/tests/test-kill-int-forwarder.sh @@ -5,11 +5,13 @@ uuid=$(last_uuid) sender -f 'myapp.*' -l "grep --line-buffered -v ${uuid}" > /dev/null & sender_pid=$! 
-sleep 0.1 # to avoid race condition +sleep 1 # to avoid race condition assert "ps ax | grep ${uuid} | grep -v sender | grep -v filter_wrapper | grep line-buffered | count_lines" 1 -kill -SIGINT $sender_pid +if ps -p $sender_pid > /dev/null; then + kill -2 $sender_pid +fi wait $sender_pid diff --git a/tests/test-kill-term-forwarder.sh b/tests/test-kill-term-forwarder.sh index 1b6b512..7980334 100755 --- a/tests/test-kill-term-forwarder.sh +++ b/tests/test-kill-term-forwarder.sh @@ -5,11 +5,13 @@ uuid=$(last_uuid) sender -f 'myapp.*' -l "grep --line-buffered -v ${uuid}" > /dev/null & sender_pid=$! -sleep 0.1 # to avoid race condition +sleep 1 # to avoid race condition assert "ps ax | grep ${uuid} | grep -v sender | grep -v filter_wrapper | grep line-buffered | count_lines" 1 -kill -SIGTERM $sender_pid +if ps -p $sender_pid > /dev/null; then + kill -15 $sender_pid +fi wait $sender_pid diff --git a/tests/test-opt-greater-or-equal-than.sh b/tests/test-opt-greater-or-equal-than.sh index 0ed4eaa..bc7d21d 100644 --- a/tests/test-opt-greater-or-equal-than.sh +++ b/tests/test-opt-greater-or-equal-than.sh @@ -1,5 +1,7 @@ for i in {1..10}; do log_random >> myapp.log; done +sleep 1 # to avoid race condition + log_random >> myapp.log ts=$(last_timestamp) log_random >> myapp.log diff --git a/tests/test-signature-size.sh b/tests/test-signature-size.sh index 97dce52..17df4f0 100644 --- a/tests/test-signature-size.sh +++ b/tests/test-signature-size.sh @@ -1,5 +1,14 @@ echo "AAAAAAAAAAAAAAAAAA BBBBBBBBBBBBBBB" > myapp.log echo "AAAAAAAAAAAAAAAAAA CCCCCCCCCCCCCCC" > myapp.log.1 -assert "sender -s 15 'myapp.*' | count_lines" 1 +assert "sender -s 15 'myapp.*' | count_lines" 0 assert "sender -s 30 'myapp.*' | count_lines" 2 + +echo "AAAAAAAAAAAAAAAAAA DDDDDDDDDDDDDDD" > myapp.log.2 +echo "AAAAAAAAAAAAAAAAAA DDDDDDDDDDDDDDD" > myapp.log.3 +echo "AAAAAAAAAAAAAAAAAA DDDDDDDDDDDDDDD" > myapp.log.4 +echo "AAAAAAAAAAAAAAAAAA DDDDDDDDDDDDDDD" > myapp.log.5 + +assert "sender -s 30 'myapp.*' | 
count_lines" 2 + +assert "sender -s 30 'myapp.*' 2>&1 | grep forwarder.duplicatesig | count_lines" 3