From b022a2b7696d54f1ae119d44bab34b8e12862d19 Mon Sep 17 00:00:00 2001 From: Dmitry Cherniachenko <2sabio@gmail.com> Date: Tue, 19 Sep 2023 12:02:45 +0200 Subject: [PATCH 1/2] Make IO tests more platform independent - Use TempDir() instead of hardcoded "/tmp" - Apply filepath.FromSlash() when comparing file paths - Close output files to allow their deletion --- input_file_test.go | 32 +++++++++++------- output_file_test.go | 82 +++++++++++++++++++++++++++++---------------- 2 files changed, 72 insertions(+), 42 deletions(-) diff --git a/input_file_test.go b/input_file_test.go index 59d057e1..c106245e 100644 --- a/input_file_test.go +++ b/input_file_test.go @@ -89,22 +89,23 @@ func TestInputFileWithGETAndPOST(t *testing.T) { func TestInputFileMultipleFilesWithRequestsOnly(t *testing.T) { rnd := rand.Int63() + tmpDir := t.TempDir() - file1, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_0", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) + file1, _ := os.OpenFile(fmt.Sprintf(tmpDir + "/%d_0", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) file1.Write([]byte("1 1 1\ntest1")) file1.Write([]byte(payloadSeparator)) file1.Write([]byte("1 1 3\ntest2")) file1.Write([]byte(payloadSeparator)) file1.Close() - file2, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_1", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) + file2, _ := os.OpenFile(fmt.Sprintf(tmpDir + "/%d_1", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) file2.Write([]byte("1 1 2\ntest3")) file2.Write([]byte(payloadSeparator)) file2.Write([]byte("1 1 4\ntest4")) file2.Write([]byte(payloadSeparator)) file2.Close() - input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, 0, false) + input := NewFileInput(fmt.Sprintf(tmpDir + "/%d*", rnd), false, 100, 0, false) for i := '1'; i <= '4'; i++ { msg, _ := input.PluginRead() @@ -119,8 +120,9 @@ func TestInputFileMultipleFilesWithRequestsOnly(t *testing.T) { func TestInputFileRequestsWithLatency(t *testing.T) { rnd := rand.Int63() + tmpDir := t.TempDir() - file, _ := os.OpenFile(fmt.Sprintf("/tmp/%d", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) + file, _ := os.OpenFile(fmt.Sprintf(tmpDir + "/%d", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) defer file.Close() file.Write([]byte("1 1 100000000\nrequest1")) @@ -130,7 +132,7 @@ func TestInputFileRequestsWithLatency(t *testing.T) { file.Write([]byte("1 3 250000000\nrequest3")) file.Write([]byte(payloadSeparator)) - input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), false, 100, 0, false) + input := NewFileInput(fmt.Sprintf(tmpDir + "/%d", rnd), false, 100, 0, false) start := time.Now().UnixNano() for i := 0; i < 3; i++ { @@ -147,8 +149,9 @@ func TestInputFileRequestsWithLatency(t *testing.T) { func TestInputFileMultipleFilesWithRequestsAndResponses(t *testing.T) { rnd := rand.Int63() + tmpDir := t.TempDir() - file1, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_0", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) + file1, _ := os.OpenFile(fmt.Sprintf(tmpDir + "/%d_0", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) file1.Write([]byte("1 1 1\nrequest1")) file1.Write([]byte(payloadSeparator)) file1.Write([]byte("2 1 1\nresponse1")) @@ -159,7 +162,7 @@ func TestInputFileMultipleFilesWithRequestsAndResponses(t *testing.T) { file1.Write([]byte(payloadSeparator)) file1.Close() - file2, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_1", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) + file2, _ := os.OpenFile(fmt.Sprintf(tmpDir + "/%d_1", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) file2.Write([]byte("1 3 2\nrequest3")) file2.Write([]byte(payloadSeparator)) file2.Write([]byte("2 3 2\nresponse3")) @@ -170,7 +173,7 @@ func TestInputFileMultipleFilesWithRequestsAndResponses(t *testing.T) { file2.Write([]byte(payloadSeparator)) file2.Close() - input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, 0, false) + input := NewFileInput(fmt.Sprintf(tmpDir + "/%d*", rnd), false, 100, 0, false) for i := '1'; i <= '4'; i++ { msg, _ := input.PluginRead() @@ -190,15 +193,16 @@ func TestInputFileMultipleFilesWithRequestsAndResponses(t *testing.T) { func TestInputFileLoop(t *testing.T) { rnd := rand.Int63() + tmpDir := t.TempDir() - file, _ := os.OpenFile(fmt.Sprintf("/tmp/%d", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) + file, _ := os.OpenFile(fmt.Sprintf(tmpDir + "/%d", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) file.Write([]byte("1 1 1\ntest1")) file.Write([]byte(payloadSeparator)) file.Write([]byte("1 1 2\ntest2")) file.Write([]byte(payloadSeparator)) file.Close() - input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), true, 100, 0, false) + input := NewFileInput(fmt.Sprintf(tmpDir + "/%d", rnd), true, 100, 0, false) // Even if we have just 2 requests in file, it should indifinitly loop for i := 0; i < 1000; i++ { @@ -211,22 +215,23 @@ func TestInputFileLoop(t *testing.T) { func TestInputFileCompressed(t *testing.T) { rnd := rand.Int63() + tmpDir := t.TempDir() - output := NewFileOutput(fmt.Sprintf("/tmp/%d_0.gz", rnd), &FileOutputConfig{FlushInterval: time.Minute, Append: true}) + output := NewFileOutput(fmt.Sprintf(tmpDir + "/%d_0.gz", rnd), &FileOutputConfig{FlushInterval: time.Minute, Append: true}) for i := 0; i < 1000; i++ { output.PluginWrite(&Message{Meta: []byte("1 1 1\r\n"), Data: []byte("test")}) } name1 := output.file.Name() output.Close() - output2 := NewFileOutput(fmt.Sprintf("/tmp/%d_1.gz", rnd), &FileOutputConfig{FlushInterval: time.Minute, Append: true}) + output2 := NewFileOutput(fmt.Sprintf(tmpDir + "/%d_1.gz", rnd), &FileOutputConfig{FlushInterval: time.Minute, Append: true}) for i := 0; i < 1000; i++ { output2.PluginWrite(&Message{Meta: []byte("1 1 1\r\n"), Data: []byte("test")}) } name2 := output2.file.Name() output2.Close() - input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, 0, false) + input := NewFileInput(fmt.Sprintf(tmpDir + "/%d*", rnd), false, 100, 0, false) for i := 0; i < 2000; i++ { input.PluginRead() } @@ -318,6 +323,7 @@ func CreateCaptureFile(requestGenerator *RequestGenerator) *CaptureFile { time.Sleep(100 * time.Millisecond) emitter.Close() + outputFile.flush() return NewExpectedCaptureFile(readPayloads, f) diff --git a/output_file_test.go b/output_file_test.go index 65d0cf7c..8cea7740 100644 --- a/output_file_test.go +++ b/output_file_test.go @@ -5,6 +5,7 @@ import ( "github.com/buger/goreplay/internal/size" "math/rand" "os" + "path/filepath" "reflect" "sort" "sync" @@ -15,9 +16,10 @@ import ( func TestFileOutput(t *testing.T) { wg := new(sync.WaitGroup) + tmpDir := t.TempDir() input := NewTestInput() - output := NewFileOutput("/tmp/test_requests.gor", &FileOutputConfig{FlushInterval: time.Minute, Append: true}) + output := NewFileOutput(tmpDir + "/test_requests.gor", &FileOutputConfig{FlushInterval: time.Minute, Append: true}) plugins := &InOutPlugins{ Inputs: []PluginReader{input}, @@ -38,7 +40,7 @@ func TestFileOutput(t *testing.T) { emitter.Close() var counter int64 - input2 := NewFileInput("/tmp/test_requests.gor", false, 100, 0, false) + input2 := NewFileInput(tmpDir + "/test_requests.gor", false, 100, 0, false) output2 := NewTestOutput(func(*Message) { atomic.AddInt64(&counter, 1) wg.Done() @@ -69,10 +71,12 @@ func TestFileOutputWithNameCleaning(t *testing.T) { } func TestFileOutputPathTemplate(t *testing.T) { - output := &FileOutput{pathTemplate: "/tmp/log-%Y-%m-%d-%S-%t", config: &FileOutputConfig{FlushInterval: time.Minute, Append: true}} + tmpDir := t.TempDir() + + output := &FileOutput{pathTemplate: tmpDir + "/log-%Y-%m-%d-%S-%t", config: &FileOutputConfig{FlushInterval: time.Minute, Append: true}} now := time.Now() output.payloadType = []byte("3") - expectedPath := fmt.Sprintf("/tmp/log-%s-%s-%s-%s-3", now.Format("2006"), now.Format("01"), now.Format("02"), now.Format("05")) + expectedPath := fmt.Sprintf(tmpDir + "/log-%s-%s-%s-%s-3", now.Format("2006"), now.Format("01"), now.Format("02"), now.Format("05")) path := output.filename() if expectedPath != path { @@ -81,7 +85,9 @@ func TestFileOutputPathTemplate(t *testing.T) { } func TestFileOutputMultipleFiles(t *testing.T) { - output := NewFileOutput("/tmp/log-%Y-%m-%d-%S", &FileOutputConfig{Append: true, FlushInterval: time.Minute}) + tmpDir := t.TempDir() + + output := NewFileOutput(tmpDir + "/log-%Y-%m-%d-%S", &FileOutputConfig{Append: true, FlushInterval: time.Minute}) if output.file != nil { t.Error("Should not initialize file if no writes") @@ -98,6 +104,7 @@ func TestFileOutputMultipleFiles(t *testing.T) { output.PluginWrite(&Message{Meta: []byte("1 1 1\r\n"), Data: []byte("test")}) name3 := output.file.Name() + output.Close() if name2 != name1 { t.Error("Fast changes should happen in same file:", name1, name2, name3) @@ -112,7 +119,9 @@ func TestFileOutputMultipleFiles(t *testing.T) { } func TestFileOutputFilePerRequest(t *testing.T) { - output := NewFileOutput("/tmp/log-%Y-%m-%d-%S-%r", &FileOutputConfig{Append: true}) + tmpDir := t.TempDir() + + output := NewFileOutput(tmpDir + "/log-%Y-%m-%d-%S-%r", &FileOutputConfig{Append: true}) if output.file != nil { t.Error("Should not initialize file if no writes") @@ -129,6 +138,7 @@ func TestFileOutputFilePerRequest(t *testing.T) { output.PluginWrite(&Message{Meta: []byte("1 3 1\r\n"), Data: []byte("test")}) name3 := output.file.Name() + output.Close() if name3 == name2 || name2 == name1 || name3 == name1 { t.Error("File name should change:", name1, name2, name3) @@ -140,7 +150,9 @@ func TestFileOutputFilePerRequest(t *testing.T) { } func TestFileOutputCompression(t *testing.T) { - output := NewFileOutput("/tmp/log-%Y-%m-%d-%S.gz", &FileOutputConfig{Append: true, FlushInterval: time.Minute}) + tmpDir := t.TempDir() + + output := NewFileOutput(tmpDir + "/log-%Y-%m-%d-%S.gz", &FileOutputConfig{Append: true, FlushInterval: time.Minute}) if output.file != nil { t.Error("Should not initialize file if no writes") @@ -162,14 +174,16 @@ func TestFileOutputCompression(t *testing.T) { } func TestGetFileIndex(t *testing.T) { + tmpDir := t.TempDir() + var tests = []struct { path string index int }{ - {"/tmp/logs", -1}, - {"/tmp/logs_1", 1}, - {"/tmp/logs_2.gz", 2}, - {"/tmp/logs_0.gz", 0}, + {tmpDir + "/logs", -1}, + {tmpDir + "/logs_1", 1}, + {tmpDir + "/logs_2.gz", 2}, + {tmpDir + "/logs_0.gz", 0}, } for _, c := range tests { @@ -180,17 +194,19 @@ func TestGetFileIndex(t *testing.T) { } func TestSetFileIndex(t *testing.T) { + tmpDir := t.TempDir() + var tests = []struct { path string index int newPath string }{ - {"/tmp/logs", 0, "/tmp/logs_0"}, - {"/tmp/logs.gz", 1, "/tmp/logs_1.gz"}, - {"/tmp/logs_1", 0, "/tmp/logs_0"}, - {"/tmp/logs_0", 10, "/tmp/logs_10"}, - {"/tmp/logs_0.gz", 10, "/tmp/logs_10.gz"}, - {"/tmp/logs_underscores.gz", 10, "/tmp/logs_underscores_10.gz"}, + {tmpDir + "/logs", 0, tmpDir + "/logs_0"}, + {tmpDir + "/logs.gz", 1, tmpDir + "/logs_1.gz"}, + {tmpDir + "/logs_1", 0, tmpDir + "/logs_0"}, + {tmpDir + "/logs_0", 10, tmpDir + "/logs_10"}, + {tmpDir + "/logs_0.gz", 10, tmpDir + "/logs_10.gz"}, + {tmpDir + "/logs_underscores.gz", 10, tmpDir + "/logs_underscores_10.gz"}, } for _, c := range tests { @@ -202,7 +218,8 @@ func TestSetFileIndex(t *testing.T) { func TestFileOutputAppendQueueLimitOverflow(t *testing.T) { rnd := rand.Int63() - name := fmt.Sprintf("/tmp/%d", rnd) + tmpDir := t.TempDir() + name := fmt.Sprintf(tmpDir + "/%d", rnd) output := NewFileOutput(name, &FileOutputConfig{Append: false, FlushInterval: time.Minute, QueueLimit: 2}) @@ -216,12 +233,13 @@ func TestFileOutputAppendQueueLimitOverflow(t *testing.T) { output.PluginWrite(&Message{Meta: []byte("1 1 1\r\n"), Data: []byte("test")}) name3 := output.file.Name() + output.Close() - if name2 != name1 || name1 != fmt.Sprintf("/tmp/%d_0", rnd) { + if name2 != name1 || name1 != filepath.FromSlash(fmt.Sprintf(tmpDir + "/%d_0", rnd)) { t.Error("Fast changes should happen in same file:", name1, name2, name3) } - if name3 == name1 || name3 != fmt.Sprintf("/tmp/%d_1", rnd) { + if name3 == name1 || name3 != filepath.FromSlash(fmt.Sprintf(tmpDir + "/%d_1", rnd)) { t.Error("File name should change:", name1, name2, name3) } @@ -231,7 +249,8 @@ func TestFileOutputAppendQueueLimitOverflow(t *testing.T) { func TestFileOutputAppendQueueLimitNoOverflow(t *testing.T) { rnd := rand.Int63() - name := fmt.Sprintf("/tmp/%d", rnd) + tmpDir := t.TempDir() + name := fmt.Sprintf(tmpDir + "/%d", rnd) output := NewFileOutput(name, &FileOutputConfig{Append: false, FlushInterval: time.Minute, QueueLimit: 3}) @@ -245,12 +264,13 @@ func TestFileOutputAppendQueueLimitNoOverflow(t *testing.T) { output.PluginWrite(&Message{Meta: []byte("1 1 1\r\n"), Data: []byte("test")}) name3 := output.file.Name() + output.Close() - if name2 != name1 || name1 != fmt.Sprintf("/tmp/%d_0", rnd) { + if name2 != name1 || name1 != filepath.FromSlash(fmt.Sprintf(tmpDir + "/%d_0", rnd)) { t.Error("Fast changes should happen in same file:", name1, name2, name3) } - if name3 != name1 || name3 != fmt.Sprintf("/tmp/%d_0", rnd) { + if name3 != name1 || name3 != filepath.FromSlash(fmt.Sprintf(tmpDir + "/%d_0", rnd)) { t.Error("File name should not change:", name1, name2, name3) } @@ -260,7 +280,8 @@ func TestFileOutputAppendQueueLimitNoOverflow(t *testing.T) { func TestFileOutputAppendQueueLimitGzips(t *testing.T) { rnd := rand.Int63() - name := fmt.Sprintf("/tmp/%d.gz", rnd) + tmpDir := t.TempDir() + name := fmt.Sprintf(tmpDir + "/%d.gz", rnd) output := NewFileOutput(name, &FileOutputConfig{Append: false, FlushInterval: time.Minute, QueueLimit: 2}) @@ -274,12 +295,13 @@ func TestFileOutputAppendQueueLimitGzips(t *testing.T) { output.PluginWrite(&Message{Meta: []byte("1 1 1\r\n"), Data: []byte("test")}) name3 := output.file.Name() + output.Close() - if name2 != name1 || name1 != fmt.Sprintf("/tmp/%d_0.gz", rnd) { + if name2 != name1 || name1 != filepath.FromSlash(fmt.Sprintf(tmpDir + "/%d_0.gz", rnd)) { t.Error("Fast changes should happen in same file:", name1, name2, name3) } - if name3 == name1 || name3 != fmt.Sprintf("/tmp/%d_1.gz", rnd) { + if name3 == name1 || name3 != filepath.FromSlash(fmt.Sprintf(tmpDir + "/%d_1.gz", rnd)) { t.Error("File name should change:", name1, name2, name3) } @@ -299,7 +321,8 @@ func TestFileOutputSort(t *testing.T) { func TestFileOutputAppendSizeLimitOverflow(t *testing.T) { rnd := rand.Int63() - name := fmt.Sprintf("/tmp/%d", rnd) + tmpDir := t.TempDir() + name := fmt.Sprintf(tmpDir + "/%d", rnd) message := []byte("1 1 1\r\ntest") @@ -317,12 +340,13 @@ func TestFileOutputAppendSizeLimitOverflow(t *testing.T) { output.PluginWrite(&Message{Meta: []byte("1 1 1\r\n"), Data: []byte("test")}) name3 := output.file.Name() + output.Close() - if name2 != name1 || name1 != fmt.Sprintf("/tmp/%d_0", rnd) { + if name2 != name1 || name1 != filepath.FromSlash(fmt.Sprintf(tmpDir + "/%d_0", rnd)) { t.Error("Fast changes should happen in same file:", name1, name2, name3) } - if name3 == name1 || name3 != fmt.Sprintf("/tmp/%d_1", rnd) { + if name3 == name1 || name3 != filepath.FromSlash(fmt.Sprintf(tmpDir + "/%d_1", rnd)) { t.Error("File name should change:", name1, name2, name3) } From bc8d0038f137b1252a457bdd7a354e419ce6171c Mon Sep 17 00:00:00 2001 From: Dmitry Cherniachenko <2sabio@gmail.com> Date: Tue, 19 Sep 2023 12:20:40 +0200 Subject: [PATCH 2/2] Add support for Zstandard compressed input and output files (.zst) --- input_file.go | 8 ++++++ input_file_test.go | 29 ++++++++++++++++++++- output_file.go | 10 ++++++++ output_file_test.go | 61 ++++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 106 insertions(+), 2 deletions(-) diff --git a/input_file.go b/input_file.go index feaf479d..f581270f 100644 --- a/input_file.go +++ b/input_file.go @@ -8,6 +8,7 @@ import ( "errors" "expvar" "fmt" + "github.com/klauspost/compress/zstd" "io" "math" "os" @@ -187,6 +188,13 @@ func newFileInputReader(path string, readDepth int, dryRun bool) *fileInputReade return nil } r.reader = bufio.NewReader(gzReader) + } else if strings.HasSuffix(path, ".zst") { + zstdReader, err := zstd.NewReader(file) + if err != nil { + Debug(0, fmt.Sprintf("[INPUT-FILE] err: %q", err)) + return nil + } + r.reader = bufio.NewReader(zstdReader) } else { r.reader = bufio.NewReader(file) } diff --git a/input_file_test.go b/input_file_test.go index c106245e..77ef73a5 100644 --- a/input_file_test.go +++ b/input_file_test.go @@ -213,7 +213,7 @@ func TestInputFileLoop(t *testing.T) { os.Remove(file.Name()) } -func TestInputFileCompressed(t *testing.T) { +func TestInputFileCompressedGzip(t *testing.T) { rnd := rand.Int63() tmpDir := t.TempDir() @@ -240,6 +240,33 @@ func TestInputFileCompressed(t *testing.T) { os.Remove(name2) } +func TestInputFileCompressedZstd(t *testing.T) { + rnd := rand.Int63() + tmpDir := t.TempDir() + + output := NewFileOutput(fmt.Sprintf(tmpDir + "/%d_0.zst", rnd), &FileOutputConfig{FlushInterval: time.Minute, Append: true}) + for i := 0; i < 1000; i++ { + output.PluginWrite(&Message{Meta: []byte("1 1 1\r\n"), Data: []byte("test")}) + } + name1 := output.file.Name() + output.Close() + + output2 := NewFileOutput(fmt.Sprintf(tmpDir + "/%d_1.zst", rnd), &FileOutputConfig{FlushInterval: time.Minute, Append: true}) + for i := 0; i < 1000; i++ { + output2.PluginWrite(&Message{Meta: []byte("1 1 1\r\n"), Data: []byte("test")}) + } + name2 := output2.file.Name() + output2.Close() + + input := NewFileInput(fmt.Sprintf(tmpDir + "/%d*", rnd), false, 100, 0, false) + for i := 0; i < 2000; i++ { + input.PluginRead() + } + + os.Remove(name1) + os.Remove(name2) +} + type CaptureFile struct { msgs []*Message file *os.File diff --git a/output_file.go b/output_file.go index 4b5a0401..de50b257 100644 --- a/output_file.go +++ b/output_file.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "github.com/buger/goreplay/internal/size" + "github.com/klauspost/compress/zstd" "io" "log" "math/rand" @@ -231,6 +232,11 @@ func (o *FileOutput) PluginWrite(msg *Message) (n int, err error) { if strings.HasSuffix(o.currentName, ".gz") { o.writer = gzip.NewWriter(o.file) + } else if strings.HasSuffix(o.currentName, ".zst") { + o.writer, err = zstd.NewWriter(o.file) + if err != nil { + log.Fatal(o, "Error opening file %q. Error: %s", o.currentName, err) + } } else { o.writer = bufio.NewWriter(o.file) } @@ -274,6 +280,8 @@ func (o *FileOutput) flush() { if o.file != nil { if strings.HasSuffix(o.currentName, ".gz") { o.writer.(*gzip.Writer).Flush() + } else if strings.HasSuffix(o.currentName, ".zst") { + o.writer.(*zstd.Encoder).Flush() } else { o.writer.(*bufio.Writer).Flush() } @@ -294,6 +302,8 @@ func (o *FileOutput) closeLocked() error { if o.file != nil { if strings.HasSuffix(o.currentName, ".gz") { o.writer.(*gzip.Writer).Close() + } else if strings.HasSuffix(o.currentName, ".zst") { + o.writer.(*zstd.Encoder).Close() } else { o.writer.(*bufio.Writer).Flush() } diff --git a/output_file_test.go b/output_file_test.go index 8cea7740..1a973d8f 100644 --- a/output_file_test.go +++ b/output_file_test.go @@ -149,7 +149,7 @@ func TestFileOutputFilePerRequest(t *testing.T) { os.Remove(name3) } -func TestFileOutputCompression(t *testing.T) { +func TestFileOutputCompressionGzip(t *testing.T) { tmpDir := t.TempDir() output := NewFileOutput(tmpDir + "/log-%Y-%m-%d-%S.gz", &FileOutputConfig{Append: true, FlushInterval: time.Minute}) @@ -173,6 +173,30 @@ func TestFileOutputCompression(t *testing.T) { os.Remove(name) } +func TestFileOutputCompressionZstd(t *testing.T) { + tmpDir := t.TempDir() + + output := NewFileOutput(tmpDir + "/log-%Y-%m-%d-%S.zst", &FileOutputConfig{Append: true, FlushInterval: time.Minute}) + + if output.file != nil { + t.Error("Should not initialize file if no writes") + } + + for i := 0; i < 1000; i++ { + output.PluginWrite(&Message{Meta: []byte("1 1 1\r\n"), Data: []byte("test")}) + } + + name := output.file.Name() + output.Close() + + s, _ := os.Stat(name) + if s.Size() == 12*1000 { + t.Error("Should be compressed file:", s.Size()) + } + + os.Remove(name) +} + func TestGetFileIndex(t *testing.T) { tmpDir := t.TempDir() @@ -184,6 +208,8 @@ func TestGetFileIndex(t *testing.T) { {tmpDir + "/logs_1", 1}, {tmpDir + "/logs_2.gz", 2}, {tmpDir + "/logs_0.gz", 0}, + {tmpDir + "/logs_7.zst", 7}, + {tmpDir + "/logs_5.zst", 5}, } for _, c := range tests { @@ -203,9 +229,11 @@ func TestSetFileIndex(t *testing.T) { }{ {tmpDir + "/logs", 0, tmpDir + "/logs_0"}, {tmpDir + "/logs.gz", 1, tmpDir + "/logs_1.gz"}, + {tmpDir + "/logs.zst", 1, tmpDir + "/logs_1.zst"}, {tmpDir + "/logs_1", 0, tmpDir + "/logs_0"}, {tmpDir + "/logs_0", 10, tmpDir + "/logs_10"}, {tmpDir + "/logs_0.gz", 10, tmpDir + "/logs_10.gz"}, + {tmpDir + "/logs_0.zst", 10, tmpDir + "/logs_10.zst"}, {tmpDir + "/logs_underscores.gz", 10, tmpDir + "/logs_underscores_10.gz"}, } @@ -309,6 +337,37 @@ func TestFileOutputAppendQueueLimitGzips(t *testing.T) { os.Remove(name3) } +func TestFileOutputAppendQueueLimitZstd(t *testing.T) { + rnd := rand.Int63() + tmpDir := t.TempDir() + name := fmt.Sprintf(tmpDir + "/%d.zst", rnd) + + output := NewFileOutput(name, &FileOutputConfig{Append: false, FlushInterval: time.Minute, QueueLimit: 2}) + + output.PluginWrite(&Message{Meta: []byte("1 1 1\r\n"), Data: []byte("test")}) + name1 := output.file.Name() + + output.PluginWrite(&Message{Meta: []byte("1 1 1\r\n"), Data: []byte("test")}) + name2 := output.file.Name() + + output.updateName() + + output.PluginWrite(&Message{Meta: []byte("1 1 1\r\n"), Data: []byte("test")}) + name3 := output.file.Name() + output.Close() + + if name2 != name1 || name1 != filepath.FromSlash(fmt.Sprintf(tmpDir + "/%d_0.zst", rnd)) { + t.Error("Fast changes should happen in same file:", name1, name2, name3) + } + + if name3 == name1 || name3 != filepath.FromSlash(fmt.Sprintf(tmpDir + "/%d_1.zst", rnd)) { + t.Error("File name should change:", name1, name2, name3) + } + + os.Remove(name1) + os.Remove(name3) +} + func TestFileOutputSort(t *testing.T) { var files = []string{"2016_0", "2014_10", "2015_0", "2015_10", "2015_2"} var expected = []string{"2014_10", "2015_0", "2015_2", "2015_10", "2016_0"}