Skip to content
3 changes: 1 addition & 2 deletions run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export PATH=$base:$PATH

function run_tests() {
current_dir=$(pwd)
for test in $(find tests -name 'test*.sh' | sort); do
for test in $(find tests -name 'test*.sh' | sort | grep "$ONLY"); do
mkdir -p $workdir/$test
cd $workdir/$test
echo Running tests/$test ...
Expand All @@ -70,4 +70,3 @@ function run_tests() {
}

run_tests

85 changes: 67 additions & 18 deletions sender
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ import threading
import Queue
import signal
import traceback
import json

DEFAULT_SIGNAGURE_LENGTH=256

filter_stopped = Queue.Queue()
stop_signal = Queue.Queue()
running_lock = threading.Lock()
debug_on = False

def should_stop():
return not stop_signal.empty()
Expand All @@ -36,6 +38,7 @@ def main():
"IMPORTANT: only files with size >= signature-length (default %d) bytes will be processed. "
"Zip files will be open recursively, and only once." % DEFAULT_SIGNAGURE_LENGTH)

parser.add_option('-D', '--debug', action="store_true", dest="debug", help="Enable debug output", default=False)
parser.add_option('-f', '--follow', action="store_true", dest="follow", help="Pools the file very second for changes in an infinite loop.", default=False)
parser.add_option('-p', '--offsets', action="store", dest="offsets", help="File to persist the last offsets read for each file. If it doesn't exist, the files are read from beginning.", default='/dev/null')
parser.add_option('-t', '--tcp', action="store", dest="host", help="Sends the output to the host:port via TCP.", metavar="HOST:PORT")
Expand All @@ -49,6 +52,11 @@ def main():

options, args = parser.parse_args()

if options.debug:
log("DEBUG ON")
global debug_on
debug_on = True

if options.dump_pid:
f = open(options.dump_pid,'w')
f.write("%d" % os.getpid())
Expand Down Expand Up @@ -153,6 +161,8 @@ def log(msg):
sys.stderr.flush()

def debug(msg):
if debug_on:
log("DEBUG: "+ msg);
# sys.stderr.write(str(msg))
# sys.stderr.write("\n")
# sys.stderr.flush()
Expand Down Expand Up @@ -264,6 +274,8 @@ class Tail(object):
self.starting = options.starting
self.start_from_tail = options.start_from_tail
self.readOffsets()
self.duplicates = {}


def persistOffsets(self):
if self.offsetsfn == '/dev/null':
Expand Down Expand Up @@ -319,6 +331,7 @@ class Tail(object):
if should_stop():
return
for line in f:
if debug_on: debug("line %s" % line.replace("\n",""))
if not self.starting or line >= self.starting:
self.output.write(line)
if should_stop():
Expand Down Expand Up @@ -355,7 +368,7 @@ class Tail(object):
self.processZipFile(f, fn)

def processGzipFile(self, fn):
debug("gz: %s" % fn)
if debug_on: debug("gz: %s" % fn)
f = gzip.open(fn)
try:
self.processFile(f, '/var/tmp/fake.log', {})
Expand All @@ -369,46 +382,41 @@ class Tail(object):
shutil.rmtree(path)

def processFileByName(self, fn, existing):
debug("processFileByName: %s" % fn)
if debug_on: debug("processFileByName: %s" % fn)
f = open(fn, 'rb')
try:
self.processFile(f, fn, existing)
finally:
f.close()
debug("processFileByName, close file");

def processFile(self, f, fn, existing):
debug("processFile: %s" % fn)
sig = self.generateSignature(f)
if not sig:
return

if sig in existing and os.path.getsize(fn) != os.path.getsize(existing[sig]):
log("WARN Files '%s' and '%s' have same signature and different sizes" % (fn, existing[sig]))

def processFile(self, f, fn, sig):
info = self.offsets.get(sig, Info(sig=sig, name=fn))
if debug_on: debug("processFile %s %s" % (fn, info.dump()))

lastOffset = info.offset

info.name = fn

if self.isCompressed(fn):
debug("compressed: %s" % fn)
if debug_on: debug("compressed: %s" % fn)
if info.offset == 0:
if not self.start_from_tail:
self.processCompressedFile(f, fn)
info.offset = -1
else:
if self.start_from_tail:
if debug_on: debug("starting from tail %s" % fn)
info.offsets = os.path.getsize(fn)
if os.path.exists(fn) and os.path.getsize(fn) < info.offset:
log("WARN file %s was truncated" % fn)
log("METRIC ns=forwarder.truncated file=%s filesize=%s offset=%s" % (fn, os.path.getsize(fn), info.offset))
info.offset = os.path.getsize(fn)
else:
if debug_on: debug("Seeking to %s in %s, currently at %s" % (info.offset, fn, f.tell()))
f.seek(info.offset)
self.copy(f)
info.offset = f.tell()
if debug_on: debug("Setting offset for: %s to %s (info: %s)" % (fn, info.offset, info.dump()))

existing[sig] = fn

if lastOffset != info.offset:
self.offsets[sig] = info
Expand All @@ -422,23 +430,64 @@ class Tail(object):
if not filter_stopped.empty():
sys.exit(1)
existing = {}
to_process = {}
filehandles = {}
for fnpattern in self.fnpatterns:
for fn in glob.glob(fnpattern):
if debug_on: debug("Checking fn %s" % fn)
f = None
try:
if not os.path.isfile(fn):
log("File no longer exists: %s" % fn)
continue
if os.path.getsize(fn) < self.signatureLength and not self.isCompressed(fn):
log("Skipping as file too short to generate sig or file is compressed: %s" % fn)
continue
self.processFileByName(fn, existing)
f = open(fn, 'rb')
sig = self.generateSignature(f)
if debug_on: debug("Sig for fn %s is %s" % (fn, sig))
sig_fn = sig+fn;
if sig in existing:
if not sig_fn in self.duplicates:
log("METRIC ns=forwarder.duplicatesig file=%s dupe_of=%s sig=%s" % (fn, existing[sig], sig))
self.duplicates[sig_fn] = True;
if sig in to_process: #take original duplicate out of to_process
del to_process[sig]
f.close()
else:
if debug_on: debug("Adding file %s %s" % (fn, sig))
existing[sig] = fn #leave in existing in case more than 2 duplicates
to_process[sig] = fn
filehandles[sig] = f

except Exception, e:
log("Exception: %s" % e)
log("METRIC ns=forwarder.error.preprocess file=\"%s\" exception=\"%s\"" % (fn, str(e).replace("\n", "")))
log("Exception=\"%s\"" % e)
if f: f.close()
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_tb(exc_traceback)


if debug_on: debug("To Process %s" % to_process)
sigs = to_process.keys()
for sig in sigs:
f = None
try:
fn = to_process[sig]
if debug_on: debug("Processing file %s %s" % (fn, sig))
f = filehandles[sig]
self.processFile(f, fn, sig)
except Exception, e:
log("METRIC ns=forwarder.error.process file=\"%s\" exception=\"%s\"" % (fn, str(e).replace("\n", "")))
log("Exception=\"%s\"" % e)
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_tb(exc_traceback)
f.close()
finally:
if f and not f.closed: f.close()

if len(existing) != len(self.offsets):
self.purgeOffsetsNotIn(existing)
self.purgeOffsetsNotIn(to_process)
if not self.follow:
break
time.sleep(1)
Expand Down
3 changes: 2 additions & 1 deletion tests/test-kill-int-forwarder.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ uuid=$(last_uuid)

sender -f 'myapp.*' -l "grep --line-buffered -v ${uuid}" > /dev/null &
sender_pid=$!
sleep 0.1 # to avoid race condition
sleep 1 # to avoid race condition

assert "ps ax | grep ${uuid} | grep -v sender | grep -v filter_wrapper | grep line-buffered | count_lines" 1


kill -SIGINT $sender_pid

wait $sender_pid
Expand Down
2 changes: 1 addition & 1 deletion tests/test-kill-term-forwarder.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ uuid=$(last_uuid)

sender -f 'myapp.*' -l "grep --line-buffered -v ${uuid}" > /dev/null &
sender_pid=$!
sleep 0.1 # to avoid race condition
sleep 1 # to avoid race condition

assert "ps ax | grep ${uuid} | grep -v sender | grep -v filter_wrapper | grep line-buffered | count_lines" 1

Expand Down
11 changes: 10 additions & 1 deletion tests/test-signature-size.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
echo "AAAAAAAAAAAAAAAAAA BBBBBBBBBBBBBBB" > myapp.log
echo "AAAAAAAAAAAAAAAAAA CCCCCCCCCCCCCCC" > myapp.log.1

assert "sender -s 15 'myapp.*' | count_lines" 1
assert "sender -s 15 'myapp.*' | count_lines" 0
assert "sender -s 30 'myapp.*' | count_lines" 2

echo "AAAAAAAAAAAAAAAAAA DDDDDDDDDDDDDDD" > myapp.log.2
echo "AAAAAAAAAAAAAAAAAA DDDDDDDDDDDDDDD" > myapp.log.3
echo "AAAAAAAAAAAAAAAAAA DDDDDDDDDDDDDDD" > myapp.log.4
echo "AAAAAAAAAAAAAAAAAA DDDDDDDDDDDDDDD" > myapp.log.5

assert "sender -s 30 'myapp.*' | count_lines" 2

assert "sender -s 30 'myapp.*' 2>&1 | grep forwarder.duplicatesig | count_lines" 3
4 changes: 2 additions & 2 deletions tests/test-tcp-raw.sh
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
for i in {1..10}; do log_random >> myapp.log; done

nc -l -p 19501 2>/dev/null > received.log || nc -l 19501 > received.log 2>/dev/null &
nc -l -p 19501 2>/dev/null > received.log || nc -l 19501 > received.log 2>/dev/null &

server_pid=$!

sleep 0.1
sleep 1

sender 'myapp.*' -t localhost:19501

Expand Down