diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index c029d61..f50d34b 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -11,8 +11,11 @@ permissions: jobs: build: - runs-on: ubuntu-latest - timeout-minutes: 5 + runs-on: ${{ matrix.os }} + timeout-minutes: 15 + strategy: + matrix: + os: [ubuntu-latest, macos-latest] steps: - uses: actions/checkout@v5 @@ -25,12 +28,42 @@ jobs: run: go build -v ./... - name: Goroutine leak detector + if: matrix.os == 'ubuntu-latest' continue-on-error: true run: go test -c -o tests && for test in $(go test -list . | grep -E "^(Test|Example)"); do ./tests -test.run "^$test\$" &>/dev/null && echo -e "$test passed\n" || echo -e "$test failed\n"; done - - name: Test + - name: Test (Full Suite) + if: matrix.os == 'ubuntu-latest' run: go test -race -v ./... + - name: Test (spooledtempfile only) + if: matrix.os == 'macos-latest' + run: go test -race -v ./pkg/spooledtempfile/... + - name: Benchmarks + if: matrix.os == 'ubuntu-latest' run: go test -bench=. -benchmem -run=^$ ./... - + + # Platform-specific test verification + - name: Test Linux-specific memory implementation + if: matrix.os == 'ubuntu-latest' + run: | + echo "Running Linux-specific memory tests..." + cd pkg/spooledtempfile + go test -v -run "TestCgroup|TestHostMeminfo|TestRead" + + - name: Test macOS-specific memory implementation + if: matrix.os == 'macos-latest' + run: | + echo "Running macOS-specific memory tests..." + cd pkg/spooledtempfile + go test -v -run "TestGetSystemMemoryUsedFraction|TestSysctlMemoryValues|TestMemoryFractionConsistency" + + # Cross-compilation verification + - name: Cross-compile for macOS (from Linux) + if: matrix.os == 'ubuntu-latest' + run: GOOS=darwin GOARCH=amd64 go build ./... + + - name: Cross-compile for Linux (from macOS) + if: matrix.os == 'macos-latest' + run: GOOS=linux GOARCH=amd64 go build ./... 
diff --git a/client_test.go b/client_test.go index fef893f..77eaccd 100644 --- a/client_test.go +++ b/client_test.go @@ -8,6 +8,7 @@ import ( "crypto/x509" "crypto/x509/pkix" "errors" + "fmt" "io" "math/big" "net" @@ -16,6 +17,7 @@ import ( "os" "path" "path/filepath" + "strconv" "strings" "sync" "testing" @@ -63,6 +65,46 @@ func defaultBenchmarkRotatorSettings(t *testing.B) *RotatorSettings { return rotatorSettings } +// sumRecordContentLengths returns the total Content-Length across all records in a WARC file. +func sumRecordContentLengths(path string) (int64, error) { + file, err := os.Open(path) + if err != nil { + return 0, err + } + defer file.Close() + + reader, err := NewReader(file) + if err != nil { + return 0, err + } + + var total int64 + for { + record, err := reader.ReadRecord() + if err != nil { + if err == io.EOF { + break + } + return 0, err + } + + clStr := record.Header.Get("Content-Length") + cl, err := strconv.ParseInt(clStr, 10, 64) + if err != nil { + record.Content.Close() + return 0, fmt.Errorf("parsing Content-Length %q: %w", clStr, err) + } + + total += cl + + if err := record.Content.Close(); err != nil { + return 0, err + } + } + + return total, nil +} + // Helper function used in many tests func drainErrChan(t *testing.T, errChan chan *Error) func() { var wg sync.WaitGroup @@ -153,21 +195,27 @@ func TestHTTPClient(t *testing.T) { t.Fatal(err) } + var expectedPayloadBytes int64 for _, path := range files { testFileSingleHashCheck(t, path, "sha1:UIRWL5DFIPQ4MX3D3GFHM2HCVU3TZ6I3", []string{"26872"}, 1, server.URL+"/testdata/image.svg") + + totalBytes, err := sumRecordContentLengths(path) + if err != nil { + t.Fatalf("failed to sum record content lengths for %s: %v", path, err) + } + expectedPayloadBytes += totalBytes } // verify that the remote dedupe count is correct dataTotal := httpClient.DataTotal.Load() - if dataTotal < 27130 || dataTotal > 27160 { - t.Fatalf("total bytes downloaded mismatch, expected: 27130-27160 got: %d", 
dataTotal) + if dataTotal != expectedPayloadBytes { + t.Fatalf("total bytes downloaded mismatch, expected %d got %d", expectedPayloadBytes, dataTotal) } } func TestHTTPClientRequestFailing(t *testing.T) { var ( rotatorSettings = defaultRotatorSettings(t) - errWg sync.WaitGroup err error ) @@ -180,11 +228,14 @@ func TestHTTPClientRequestFailing(t *testing.T) { if err != nil { t.Fatalf("Unable to init WARC writing HTTP client: %s", err) } - errWg.Add(1) + + errCh := make(chan *Error, 1) + var errChWg sync.WaitGroup + errChWg.Add(1) go func() { - defer errWg.Done() - for _ = range httpClient.ErrChan { - // We expect an error here, so we don't need to log it + defer errChWg.Done() + for err := range httpClient.ErrChan { + errCh <- err } }() @@ -199,10 +250,21 @@ func TestHTTPClientRequestFailing(t *testing.T) { _, err = httpClient.Do(req) if err == nil { - t.Fatal("expected error on Do, got none") + select { + case recv := <-errCh: + if recv == nil { + t.Fatal("expected error via ErrChan but channel closed without value") + } + case <-time.After(2 * time.Second): + t.Fatal("expected error on Do or via ErrChan, got none") + } + } else { + t.Logf("got expected error: %v", err) } httpClient.Close() + errChWg.Wait() + close(errCh) } func TestHTTPClientConnReadDeadline(t *testing.T) { @@ -594,15 +656,15 @@ func TestHTTPClientWithProxy(t *testing.T) { // init socks5 proxy server proxyServer := socks5.NewServer() + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to listen for proxy: %v", err) + } // Create a channel to signal server stop stopChan := make(chan struct{}) go func() { - listener, err := net.Listen("tcp", "127.0.0.1:8000") - if err != nil { - panic(err) - } defer listener.Close() go func() { @@ -615,6 +677,7 @@ func TestHTTPClientWithProxy(t *testing.T) { } }() + proxyAddr := listener.Addr().String() // Defer sending the stop signal defer close(stopChan) @@ -625,7 +688,7 @@ func TestHTTPClientWithProxy(t *testing.T) { // init 
the HTTP client responsible for recording HTTP(s) requests / responses httpClient, err := NewWARCWritingHTTPClient(HTTPClientSettings{ RotatorSettings: rotatorSettings, - Proxy: "socks5://127.0.0.1:8000"}) + Proxy: fmt.Sprintf("socks5://%s", proxyAddr)}) if err != nil { t.Fatalf("Unable to init WARC writing HTTP client: %s", err) } diff --git a/cmd/warc/mend/mend_test.go b/cmd/warc/mend/mend_test.go index 5750f85..f72783d 100644 --- a/cmd/warc/mend/mend_test.go +++ b/cmd/warc/mend/mend_test.go @@ -6,15 +6,25 @@ import ( "io" "os" "path/filepath" + "runtime" "testing" "github.com/internetarchive/gowarc/cmd/warc/verify" "github.com/spf13/cobra" ) +// getTestdataDir returns the path to the testdata directory, resolved relative to this test file. +// This ensures tests work regardless of the working directory (e.g., from root, CI/CD, etc.). +// Test file is at: cmd/warc/mend/mend_test.go, testdata is at: testdata/warcs +// So we need to go up 3 levels from the test file. +func getTestdataDir() string { + _, filename, _, _ := runtime.Caller(1) + return filepath.Join(filepath.Dir(filename), "../../../testdata/warcs") +} + // TestAnalyzeWARCFile tests the analysis of different WARC files func TestAnalyzeWARCFile(t *testing.T) { - testdataDir := "../../testdata/warcs" + testdataDir := getTestdataDir() tests := []struct { name string @@ -128,7 +138,7 @@ func TestAnalyzeWARCFile(t *testing.T) { // TestMendResultValidation tests that mendResult structs are properly populated func TestMendResultValidation(t *testing.T) { - testdataDir := "../../testdata/warcs" + testdataDir := getTestdataDir() // Test a file that should have all fields populated filePath := filepath.Join(testdataDir, "corrupted-trailing-bytes.warc.gz.open") @@ -183,7 +193,7 @@ func TestMendResultValidation(t *testing.T) { // TestAnalyzeWARCFileForceMode tests analyzeWARCFile with force=true on good closed WARC files func TestAnalyzeWARCFileForceMode(t *testing.T) { - testdataDir := "../../testdata/warcs" + 
testdataDir := getTestdataDir() tests := []struct { name string @@ -255,7 +265,7 @@ func TestAnalyzeWARCFileForceMode(t *testing.T) { // TestSkipNonOpenFiles tests that non-.open files are correctly skipped func TestSkipNonOpenFiles(t *testing.T) { - testdataDir := "../../testdata/warcs" + testdataDir := getTestdataDir() filePath := filepath.Join(testdataDir, "skip-non-open.warc.gz") // Check if test file exists @@ -305,7 +315,7 @@ var mendExpectedResults = map[string]expectedResult{ recordCount: 1, // Actual count from mend operation truncateAt: 0, // No truncation needed description: "good synthetic file with .open suffix", - shouldBeValid: false, // File has WARC header corruption that mend can't fix + shouldBeValid: true, // After removing the .open suffix the WARC remains valid }, "empty.warc.gz.open": { outputFile: "empty.warc.gz", @@ -321,7 +331,7 @@ var mendExpectedResults = map[string]expectedResult{ recordCount: 1, // Actual count from mend operation truncateAt: 2362, // Truncates trailing garbage description: "synthetic file with trailing garbage bytes", - shouldBeValid: false, // File has WARC header corruption that mend can't fix + shouldBeValid: true, // Truncating the trailing garbage yields a valid WARC record }, "corrupted-mid-record.warc.gz.open": { outputFile: "corrupted-mid-record.warc.gz", @@ -329,7 +339,7 @@ var mendExpectedResults = map[string]expectedResult{ recordCount: 1, // Actual count from mend operation truncateAt: 1219, description: "synthetic file corrupted mid-record", - shouldBeValid: false, // File has WARC header corruption that mend can't fix + shouldBeValid: true, // Truncating back to the last valid position restores a valid record }, } @@ -359,14 +369,7 @@ func createMockCobraCommand() *cobra.Command { // TestMendFunctionDirect verifies that the mend function produces // expected results on synthetic test data by comparing against pre-computed checksums func TestMendFunctionDirect(t *testing.T) { - // Get current directory 
and construct paths relative to workspace root - cwd, err := os.Getwd() - if err != nil { - t.Fatalf("failed to get current directory: %v", err) - } - // From cmd/mend, go up to workspace root - workspaceRoot := filepath.Join(cwd, "../..") - testdataDir := filepath.Join(workspaceRoot, "testdata/warcs") + testdataDir := getTestdataDir() outputDir := filepath.Join(testdataDir, "mend_test_output") // Ensure output directory exists @@ -505,7 +508,7 @@ func copyFile(src, dst string) error { // TestIsGzipFile tests the gzip file detection function func TestIsGzipFile(t *testing.T) { - testdataDir := "../../testdata/warcs" + testdataDir := getTestdataDir() tests := []struct { name string @@ -643,7 +646,7 @@ func TestConfirmAction(t *testing.T) { // TestMendDryRun tests the mend function in dry-run mode func TestMendDryRun(t *testing.T) { - testdataDir := "../../testdata/warcs" + testdataDir := getTestdataDir() tempDir, err := os.MkdirTemp("", "mend_dry_run_test_*") if err != nil { t.Fatalf("failed to create temp dir: %v", err) diff --git a/pkg/spooledtempfile/memory.go b/pkg/spooledtempfile/memory.go index 3309bc5..9269893 100644 --- a/pkg/spooledtempfile/memory.go +++ b/pkg/spooledtempfile/memory.go @@ -1,195 +1,53 @@ package spooledtempfile import ( - "bufio" - "fmt" - "os" - "strconv" - "strings" + "sync" + "time" ) -// Overridable in tests: -var ( - cgv2UsagePath = "/sys/fs/cgroup/memory.current" - cgv2HighPath = "/sys/fs/cgroup/memory.high" - cgv2MaxPath = "/sys/fs/cgroup/memory.max" - - cgv1UsagePath = "/sys/fs/cgroup/memory/memory.usage_in_bytes" - cgv1LimitPath = "/sys/fs/cgroup/memory/memory.limit_in_bytes" - - procMeminfoPath = "/proc/meminfo" +const ( + // memoryCheckInterval defines how often we check system memory usage. + memoryCheckInterval = 500 * time.Millisecond ) -// getSystemMemoryUsedFraction returns used/limit for the container if -// cgroup limits are set; otherwise falls back to host /proc/meminfo. 
-var getSystemMemoryUsedFraction = func() (float64, error) { - probes := []func() (float64, bool, error){ - cgroupV2UsedFraction, - cgroupV1UsedFraction, - } - - for _, p := range probes { - if frac, ok, err := p(); err != nil { - return 0, err - } else if ok { - return frac, nil - } - } - - return hostMeminfoUsedFraction() -} - -func cgroupV2UsedFraction() (frac float64, ok bool, err error) { - usage, uok, err := readUint64FileIfExists(cgv2UsagePath) - if err != nil { - return 0, false, err - } - if !uok { - return 0, false, nil // not cgroup v2 (or not accessible) - } - - // Try memory.high first - highStr, hok, err := readStringFileIfExists(cgv2HighPath) - if err != nil { - return 0, false, err - } - - var high, max uint64 - var haveHigh bool - if hok { - hs := strings.TrimSpace(highStr) - if hs != "" && hs != "max" { - if v, e := strconv.ParseUint(hs, 10, 64); e == nil && v > 0 { - high, haveHigh = v, true - } - } - } - - // Always read memory.max as fallback (and for sanity checks) - maxStr, mok, err := readStringFileIfExists(cgv2MaxPath) - if err != nil { - return 0, false, err - } - var haveMax bool - if mok { - ms := strings.TrimSpace(maxStr) - if ms != "" && ms != "max" { - if v, e := strconv.ParseUint(ms, 10, 64); e == nil && v > 0 { - max, haveMax = v, true - } - } - } - - // Choose denominator: prefer valid 'high' unless it is >= max. 
- switch { - case haveHigh && haveMax && high < max: - return float64(usage) / float64(high), true, nil - case haveMax: - return float64(usage) / float64(max), true, nil - case haveHigh: - return float64(usage) / float64(high), true, nil - default: - return 0, false, nil // no effective limit - } -} - -func cgroupV1UsedFraction() (frac float64, ok bool, err error) { - usage, uok, err := readUint64FileIfExists(cgv1UsagePath) - if err != nil { - return 0, false, err - } - - limit, lok, err := readUint64FileIfExists(cgv1LimitPath) - if err != nil { - return 0, false, err - } - if !uok || !lok || limit == 0 { - return 0, false, nil - } - - // Some kernels report a huge limit (e.g., ~max uint64) to mean "no limit" - if limit > (1 << 60) { // heuristic ~ 1 exabyte - return 0, false, nil - } - - return float64(usage) / float64(limit), true, nil +type globalMemoryCache struct { + sync.Mutex + lastChecked time.Time + lastFraction float64 } -func hostMeminfoUsedFraction() (float64, error) { - f, err := os.Open(procMeminfoPath) - if err != nil { - return 0, fmt.Errorf("failed to open /proc/meminfo: %v", err) - } - defer f.Close() +var ( + memoryUsageCache = &globalMemoryCache{} +) - var memTotal, memAvailable, memFree, buffers, cached uint64 - sc := bufio.NewScanner(f) - for sc.Scan() { - line := sc.Text() - fields := strings.Fields(line) - if len(fields) < 2 { - continue - } - key := strings.TrimRight(fields[0], ":") - val, _ := strconv.ParseUint(fields[1], 10, 64) // kB - switch key { - case "MemTotal": - memTotal = val - case "MemAvailable": - memAvailable = val - case "MemFree": - memFree = val - case "Buffers": - buffers = val - case "Cached": - cached = val - } - } - if err := sc.Err(); err != nil { - return 0, fmt.Errorf("scanner error reading /proc/meminfo: %v", err) - } - if memTotal == 0 { - return 0, fmt.Errorf("could not find MemTotal in /proc/meminfo") - } +// getCachedMemoryUsage returns the cached memory usage fraction, or fetches a new one +// if the cache has 
expired. This reduces the overhead of checking memory usage on every +// write operation. +func getCachedMemoryUsage() (float64, error) { + memoryUsageCache.Lock() + defer memoryUsageCache.Unlock() - var used uint64 - if memAvailable > 0 { - used = memTotal - memAvailable - } else { - approxAvailable := memFree + buffers + cached - used = memTotal - approxAvailable + if time.Since(memoryUsageCache.lastChecked) < memoryCheckInterval { + return memoryUsageCache.lastFraction, nil } - // meminfo is in kB; unit cancels in the fraction - return float64(used) / float64(memTotal), nil -} - -func readUint64FileIfExists(path string) (val uint64, ok bool, err error) { - data, err := os.ReadFile(path) + fraction, err := getSystemMemoryUsedFraction() if err != nil { - if os.IsNotExist(err) { - return 0, false, nil - } - return 0, false, err + return 0, err } - // v2 may use "max"; caller handles that as not-ok - v, perr := strconv.ParseUint(strings.TrimSpace(string(data)), 10, 64) - if perr != nil { - return 0, false, nil - } + memoryUsageCache.lastChecked = time.Now() + memoryUsageCache.lastFraction = fraction - return v, true, nil + return fraction, nil } -func readStringFileIfExists(path string) (string, bool, error) { - data, err := os.ReadFile(path) - if err != nil { - if os.IsNotExist(err) { - return "", false, nil - } - return "", false, err - } +// ResetMemoryCache clears the cached memory usage state. This is primarily used in tests +// to prevent state pollution between test packages. 
+func ResetMemoryCache() { + memoryUsageCache.Lock() + defer memoryUsageCache.Unlock() - return string(data), true, nil + memoryUsageCache.lastChecked = time.Time{} + memoryUsageCache.lastFraction = 0 } diff --git a/pkg/spooledtempfile/memory_darwin.go b/pkg/spooledtempfile/memory_darwin.go new file mode 100644 index 0000000..22b4848 --- /dev/null +++ b/pkg/spooledtempfile/memory_darwin.go @@ -0,0 +1,77 @@ +//go:build darwin + +package spooledtempfile + +import ( + "fmt" + + "golang.org/x/sys/unix" +) + +// getSystemMemoryUsedFraction returns the fraction of system memory currently in use on macOS. +// It uses sysctl to query system memory statistics. +var getSystemMemoryUsedFraction = func() (float64, error) { + // Get total physical memory using sysctl + totalBytes, err := unix.SysctlUint64("hw.memsize") + if err != nil { + return 0, fmt.Errorf("failed to get hw.memsize: %w", err) + } + + if totalBytes == 0 { + return 0, fmt.Errorf("hw.memsize returned 0") + } + + // Get page size + pageSize, err := unix.SysctlUint32("vm.pagesize") + if err != nil { + return 0, fmt.Errorf("failed to get vm.pagesize: %w", err) + } + + // Get page counts using the sysctl values that actually exist on macOS + // Note: macOS doesn't expose vm.page_active_count or vm.page_wire_count via sysctl + // We use the available values: + // - vm.page_free_count: free pages + // - vm.page_purgeable_count: purgeable pages (can be reclaimed) + // - vm.page_speculative_count: speculative pages + + freePages, err := unix.SysctlUint32("vm.page_free_count") + if err != nil { + return 0, fmt.Errorf("failed to get vm.page_free_count: %w", err) + } + + purgeablePages, err := unix.SysctlUint32("vm.page_purgeable_count") + if err != nil { + return 0, fmt.Errorf("failed to get vm.page_purgeable_count: %w", err) + } + + speculativePages, err := unix.SysctlUint32("vm.page_speculative_count") + if err != nil { + return 0, fmt.Errorf("failed to get vm.page_speculative_count: %w", err) + } + + // Calculate used 
memory + // Used = Total - (Free + Purgeable + Speculative) + totalPages := totalBytes / uint64(pageSize) + reclaimablePages := uint64(freePages) + uint64(purgeablePages) + uint64(speculativePages) + + // Clamp to prevent underflow: if reclaimable > total, use total + var usedPages uint64 + if reclaimablePages < totalPages { + usedPages = totalPages - reclaimablePages + } else { + usedPages = 0 + } + + usedBytes := usedPages * uint64(pageSize) + + // Calculate fraction + fraction := float64(usedBytes) / float64(totalBytes) + + // Sanity check: fraction should be between 0 and 1 + if fraction < 0 || fraction > 1 { + return 0, fmt.Errorf("calculated memory fraction out of range: %v (used: %d, total: %d)", + fraction, usedBytes, totalBytes) + } + + return fraction, nil +} diff --git a/pkg/spooledtempfile/memory_darwin_test.go b/pkg/spooledtempfile/memory_darwin_test.go new file mode 100644 index 0000000..2c585ee --- /dev/null +++ b/pkg/spooledtempfile/memory_darwin_test.go @@ -0,0 +1,91 @@ +//go:build darwin + +package spooledtempfile + +import ( + "testing" + + "golang.org/x/sys/unix" +) + +// TestGetSystemMemoryUsedFraction verifies that the macOS implementation +// returns a valid memory fraction between 0 and 1. +func TestGetSystemMemoryUsedFraction(t *testing.T) { + fraction, err := getSystemMemoryUsedFraction() + if err != nil { + t.Fatalf("getSystemMemoryUsedFraction() failed: %v", err) + } + + if fraction < 0 || fraction > 1 { + t.Fatalf("memory fraction out of range: got %v, want 0.0-1.0", fraction) + } + + // Log the result for informational purposes + t.Logf("Current system memory usage: %.2f%%", fraction*100) +} + +// TestSysctlMemoryValues verifies that we can successfully retrieve memory values via sysctl. 
+func TestSysctlMemoryValues(t *testing.T) {
+	// Test hw.memsize
+	totalBytes, err := unix.SysctlUint64("hw.memsize")
+	if err != nil {
+		t.Fatalf("failed to get hw.memsize: %v", err)
+	}
+	if totalBytes == 0 {
+		t.Fatal("hw.memsize returned 0")
+	}
+	t.Logf("Total memory: %d bytes (%.2f GB)", totalBytes, float64(totalBytes)/(1024*1024*1024))
+
+	// Test vm.pagesize
+	pageSize, err := unix.SysctlUint32("vm.pagesize")
+	if err != nil {
+		t.Fatalf("failed to get vm.pagesize: %v", err)
+	}
+	if pageSize == 0 {
+		t.Fatal("vm.pagesize returned 0")
+	}
+	t.Logf("Page size: %d bytes", pageSize)
+
+	// Test page counts (widen to float64 before multiplying: a uint32 pages*pageSize product wraps past 4 GiB)
+	freePages, err := unix.SysctlUint32("vm.page_free_count")
+	if err != nil {
+		t.Fatalf("failed to get vm.page_free_count: %v", err)
+	}
+	t.Logf("Free pages: %d (%.2f MB)", freePages, float64(freePages)*float64(pageSize)/(1024*1024))
+
+	pageableInternal, err := unix.SysctlUint32("vm.page_pageable_internal_count")
+	if err != nil {
+		t.Fatalf("failed to get vm.page_pageable_internal_count: %v", err)
+	}
+	t.Logf("Pageable internal pages: %d (%.2f MB)", pageableInternal, float64(pageableInternal)*float64(pageSize)/(1024*1024))
+
+	pageableExternal, err := unix.SysctlUint32("vm.page_pageable_external_count")
+	if err != nil {
+		t.Fatalf("failed to get vm.page_pageable_external_count: %v", err)
+	}
+	t.Logf("Pageable external pages: %d (%.2f MB)", pageableExternal, float64(pageableExternal)*float64(pageSize)/(1024*1024))
+}
+
+// TestMemoryFractionConsistency verifies that multiple calls return consistent values.
+func TestMemoryFractionConsistency(t *testing.T) { + const calls = 5 + var fractions [calls]float64 + + for i := 0; i < calls; i++ { + frac, err := getSystemMemoryUsedFraction() + if err != nil { + t.Fatalf("call %d failed: %v", i, err) + } + fractions[i] = frac + } + + // Check that all values are within a reasonable range of each other + // Memory usage shouldn't vary wildly between consecutive calls + for i := 1; i < calls; i++ { + diff := fractions[i] - fractions[i-1] + if diff < -0.2 || diff > 0.2 { + t.Errorf("memory fraction changed too much between calls: %v -> %v (diff: %v)", + fractions[i-1], fractions[i], diff) + } + } +} diff --git a/pkg/spooledtempfile/memory_linux.go b/pkg/spooledtempfile/memory_linux.go new file mode 100644 index 0000000..8ccd101 --- /dev/null +++ b/pkg/spooledtempfile/memory_linux.go @@ -0,0 +1,197 @@ +//go:build linux + +package spooledtempfile + +import ( + "bufio" + "fmt" + "os" + "strconv" + "strings" +) + +// Overridable in tests: +var ( + cgv2UsagePath = "/sys/fs/cgroup/memory.current" + cgv2HighPath = "/sys/fs/cgroup/memory.high" + cgv2MaxPath = "/sys/fs/cgroup/memory.max" + + cgv1UsagePath = "/sys/fs/cgroup/memory/memory.usage_in_bytes" + cgv1LimitPath = "/sys/fs/cgroup/memory/memory.limit_in_bytes" + + procMeminfoPath = "/proc/meminfo" +) + +// getSystemMemoryUsedFraction returns used/limit for the container if +// cgroup limits are set; otherwise falls back to host /proc/meminfo. 
+var getSystemMemoryUsedFraction = func() (float64, error) { + probes := []func() (float64, bool, error){ + cgroupV2UsedFraction, + cgroupV1UsedFraction, + } + + for _, p := range probes { + if frac, ok, err := p(); err != nil { + return 0, err + } else if ok { + return frac, nil + } + } + + return hostMeminfoUsedFraction() +} + +func cgroupV2UsedFraction() (frac float64, ok bool, err error) { + usage, uok, err := readUint64FileIfExists(cgv2UsagePath) + if err != nil { + return 0, false, err + } + if !uok { + return 0, false, nil // not cgroup v2 (or not accessible) + } + + // Try memory.high first + highStr, hok, err := readStringFileIfExists(cgv2HighPath) + if err != nil { + return 0, false, err + } + + var high, max uint64 + var haveHigh bool + if hok { + hs := strings.TrimSpace(highStr) + if hs != "" && hs != "max" { + if v, e := strconv.ParseUint(hs, 10, 64); e == nil && v > 0 { + high, haveHigh = v, true + } + } + } + + // Always read memory.max as fallback (and for sanity checks) + maxStr, mok, err := readStringFileIfExists(cgv2MaxPath) + if err != nil { + return 0, false, err + } + var haveMax bool + if mok { + ms := strings.TrimSpace(maxStr) + if ms != "" && ms != "max" { + if v, e := strconv.ParseUint(ms, 10, 64); e == nil && v > 0 { + max, haveMax = v, true + } + } + } + + // Choose denominator: prefer valid 'high' unless it is >= max. 
+ switch { + case haveHigh && haveMax && high < max: + return float64(usage) / float64(high), true, nil + case haveMax: + return float64(usage) / float64(max), true, nil + case haveHigh: + return float64(usage) / float64(high), true, nil + default: + return 0, false, nil // no effective limit + } +} + +func cgroupV1UsedFraction() (frac float64, ok bool, err error) { + usage, uok, err := readUint64FileIfExists(cgv1UsagePath) + if err != nil { + return 0, false, err + } + + limit, lok, err := readUint64FileIfExists(cgv1LimitPath) + if err != nil { + return 0, false, err + } + if !uok || !lok || limit == 0 { + return 0, false, nil + } + + // Some kernels report a huge limit (e.g., ~max uint64) to mean "no limit" + if limit > (1 << 60) { // heuristic ~ 1 exabyte + return 0, false, nil + } + + return float64(usage) / float64(limit), true, nil +} + +func hostMeminfoUsedFraction() (float64, error) { + f, err := os.Open(procMeminfoPath) + if err != nil { + return 0, fmt.Errorf("failed to open %s: %v", procMeminfoPath, err) + } + defer f.Close() + + var memTotal, memAvailable, memFree, buffers, cached uint64 + sc := bufio.NewScanner(f) + for sc.Scan() { + line := sc.Text() + fields := strings.Fields(line) + if len(fields) < 2 { + continue + } + key := strings.TrimRight(fields[0], ":") + val, _ := strconv.ParseUint(fields[1], 10, 64) // kB + switch key { + case "MemTotal": + memTotal = val + case "MemAvailable": + memAvailable = val + case "MemFree": + memFree = val + case "Buffers": + buffers = val + case "Cached": + cached = val + } + } + if err := sc.Err(); err != nil { + return 0, fmt.Errorf("scanner error reading /proc/meminfo: %v", err) + } + if memTotal == 0 { + return 0, fmt.Errorf("could not find MemTotal in /proc/meminfo") + } + + var used uint64 + if memAvailable > 0 { + used = memTotal - memAvailable + } else { + approxAvailable := memFree + buffers + cached + used = memTotal - approxAvailable + } + + // meminfo is in kB; unit cancels in the fraction + return 
float64(used) / float64(memTotal), nil +} + +func readUint64FileIfExists(path string) (val uint64, ok bool, err error) { + data, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + return 0, false, nil + } + return 0, false, err + } + + // v2 may use "max"; caller handles that as not-ok + v, perr := strconv.ParseUint(strings.TrimSpace(string(data)), 10, 64) + if perr != nil { + return 0, false, nil + } + + return v, true, nil +} + +func readStringFileIfExists(path string) (string, bool, error) { + data, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + return "", false, nil + } + return "", false, err + } + + return string(data), true, nil +} diff --git a/pkg/spooledtempfile/memory_linux_test.go b/pkg/spooledtempfile/memory_linux_test.go new file mode 100644 index 0000000..c0e3efb --- /dev/null +++ b/pkg/spooledtempfile/memory_linux_test.go @@ -0,0 +1,364 @@ +//go:build linux + +package spooledtempfile + +import ( + "os" + "path/filepath" + "strconv" + "strings" + "testing" +) + +// --- helpers --- + +type savedPaths struct { + v2Usage, v2High, v2Max string + v1Usage, v1Limit string + meminfo string +} + +func writeFile(t *testing.T, dir, name, content string) string { + t.Helper() + p := filepath.Join(dir, name) + if err := os.MkdirAll(filepath.Dir(p), 0o755); err != nil { + t.Fatalf("mkdir: %v", err) + } + if err := os.WriteFile(p, []byte(content), 0o644); err != nil { + t.Fatalf("write %s: %v", p, err) + } + return p +} + +func saveAndRedirectPaths(t *testing.T, base string) (restore func()) { + t.Helper() + + old := savedPaths{ + v2Usage: cgv2UsagePath, v2High: cgv2HighPath, v2Max: cgv2MaxPath, + v1Usage: cgv1UsagePath, v1Limit: cgv1LimitPath, + meminfo: procMeminfoPath, + } + + // Point to files under base; tests will create only what they need. 
+ cgv2UsagePath = filepath.Join(base, "sys/fs/cgroup/memory.current") + cgv2HighPath = filepath.Join(base, "sys/fs/cgroup/memory.high") + cgv2MaxPath = filepath.Join(base, "sys/fs/cgroup/memory.max") + cgv1UsagePath = filepath.Join(base, "sys/fs/cgroup/memory/memory.usage_in_bytes") + cgv1LimitPath = filepath.Join(base, "sys/fs/cgroup/memory/memory.limit_in_bytes") + procMeminfoPath = filepath.Join(base, "proc/meminfo") + + return func() { + cgv2UsagePath, cgv2HighPath, cgv2MaxPath = old.v2Usage, old.v2High, old.v2Max + cgv1UsagePath, cgv1LimitPath = old.v1Usage, old.v1Limit + procMeminfoPath = old.meminfo + } +} + +// --- cgroup v2 --- + +func TestCgroupV2_UsesHighWhenStricterThanMax(t *testing.T) { + dir := t.TempDir() + restore := saveAndRedirectPaths(t, dir) + defer restore() + + // usage=400 + writeFile(t, dir, "sys/fs/cgroup/memory.current", "400") + // high=800, max=1000 -> use high, frac = 0.5 + writeFile(t, dir, "sys/fs/cgroup/memory.high", "800") + writeFile(t, dir, "sys/fs/cgroup/memory.max", "1000") + + frac, ok, err := cgroupV2UsedFraction() + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("expected ok") + } + if got, want := frac, 0.5; got != want { + t.Fatalf("frac=%v want=%v", got, want) + } +} + +func TestCgroupV2_FallbackToMaxWhenHighIsMax(t *testing.T) { + dir := t.TempDir() + restore := saveAndRedirectPaths(t, dir) + defer restore() + + writeFile(t, dir, "sys/fs/cgroup/memory.current", "300") + writeFile(t, dir, "sys/fs/cgroup/memory.high", "max") // unset + writeFile(t, dir, "sys/fs/cgroup/memory.max", "600") // use this + + frac, ok, err := cgroupV2UsedFraction() + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("expected ok") + } + if got, want := frac, 0.5; got != want { + t.Fatalf("frac=%v want=%v", got, want) + } +} + +func TestCgroupV2_FallbackToMaxWhenHighInvalid(t *testing.T) { + dir := t.TempDir() + restore := saveAndRedirectPaths(t, dir) + defer restore() + + writeFile(t, dir, 
"sys/fs/cgroup/memory.current", "256") + writeFile(t, dir, "sys/fs/cgroup/memory.high", "not-a-number") + writeFile(t, dir, "sys/fs/cgroup/memory.max", "512") + + frac, ok, err := cgroupV2UsedFraction() + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("expected ok") + } + if got, want := frac, 0.5; got != want { + t.Fatalf("frac=%v want=%v", got, want) + } +} + +func TestCgroupV2_UseMaxWhenHighGTE_Max(t *testing.T) { + dir := t.TempDir() + restore := saveAndRedirectPaths(t, dir) + defer restore() + + writeFile(t, dir, "sys/fs/cgroup/memory.current", "900") + writeFile(t, dir, "sys/fs/cgroup/memory.high", "1000") // >= max + writeFile(t, dir, "sys/fs/cgroup/memory.max", "1000") + + frac, ok, err := cgroupV2UsedFraction() + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("expected ok") + } + if got, want := frac, 0.9; got != want { + t.Fatalf("frac=%v want=%v", got, want) + } +} + +func TestCgroupV2_OnlyHighSet(t *testing.T) { + dir := t.TempDir() + restore := saveAndRedirectPaths(t, dir) + defer restore() + + writeFile(t, dir, "sys/fs/cgroup/memory.current", "50") + writeFile(t, dir, "sys/fs/cgroup/memory.high", "100") + // no max file + + frac, ok, err := cgroupV2UsedFraction() + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("expected ok") + } + if got, want := frac, 0.5; got != want { + t.Fatalf("frac=%v want=%v", got, want) + } +} + +func TestCgroupV2_NoLimitsOrUsageFile(t *testing.T) { + dir := t.TempDir() + restore := saveAndRedirectPaths(t, dir) + defer restore() + + // usage missing -> ok=false (not cgroup v2 / not accessible) + _, ok, err := cgroupV2UsedFraction() + if err != nil { + t.Fatalf("err: %v", err) + } + if ok { + t.Fatalf("expected ok=false when usage missing") + } + + // Now create usage but high=max and no max => no effective limit + writeFile(t, dir, "sys/fs/cgroup/memory.current", "123") + writeFile(t, dir, "sys/fs/cgroup/memory.high", "max") + frac, ok, err := 
cgroupV2UsedFraction() + if err != nil { + t.Fatalf("err: %v", err) + } + if ok { + t.Fatalf("expected ok=false with no effective limit, got ok and frac=%v", frac) + } +} + +// --- cgroup v1 --- + +func TestCgroupV1_NormalFraction(t *testing.T) { + dir := t.TempDir() + restore := saveAndRedirectPaths(t, dir) + defer restore() + + writeFile(t, dir, "sys/fs/cgroup/memory/memory.usage_in_bytes", "200") + writeFile(t, dir, "sys/fs/cgroup/memory/memory.limit_in_bytes", "400") + + frac, ok, err := cgroupV1UsedFraction() + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("expected ok") + } + if got, want := frac, 0.5; got != want { + t.Fatalf("frac=%v want=%v", got, want) + } +} + +func TestCgroupV1_HugeLimitMeansNoLimit(t *testing.T) { + dir := t.TempDir() + restore := saveAndRedirectPaths(t, dir) + defer restore() + + writeFile(t, dir, "sys/fs/cgroup/memory/memory.usage_in_bytes", "42") + // > 1<<60 + writeFile(t, dir, "sys/fs/cgroup/memory/memory.limit_in_bytes", strconv.FormatUint((1<<60)+1, 10)) + + _, ok, err := cgroupV1UsedFraction() + if err != nil { + t.Fatalf("err: %v", err) + } + if ok { + t.Fatalf("expected ok=false for huge limit (no limit)") + } +} + +func TestCgroupV1_MissingFilesOrZeroLimit(t *testing.T) { + dir := t.TempDir() + restore := saveAndRedirectPaths(t, dir) + defer restore() + + // Only usage present -> not ok + writeFile(t, dir, "sys/fs/cgroup/memory/memory.usage_in_bytes", "7") + _, ok, err := cgroupV1UsedFraction() + if err != nil { + t.Fatalf("err: %v", err) + } + if ok { + t.Fatalf("expected ok=false when limit missing") + } + + // Now add zero limit -> not ok + writeFile(t, dir, "sys/fs/cgroup/memory/memory.limit_in_bytes", "0") + _, ok, err = cgroupV1UsedFraction() + if err != nil { + t.Fatalf("err: %v", err) + } + if ok { + t.Fatalf("expected ok=false for zero limit") + } +} + +// --- /proc/meminfo fallback --- + +func TestHostMeminfo_UsesMemAvailableWhenPresent(t *testing.T) { + dir := t.TempDir() + restore := 
saveAndRedirectPaths(t, dir) + defer restore() + + // MemTotal and MemAvailable are in kB + meminfo := strings.Join([]string{ + "MemTotal: 1000000 kB", + "MemAvailable: 250000 kB", + "Buffers: 10000 kB", + "Cached: 20000 kB", + "MemFree: 50000 kB", + }, "\n") + writeFile(t, dir, "proc/meminfo", meminfo) + + frac, err := hostMeminfoUsedFraction() + if err != nil { + t.Fatalf("err: %v", err) + } + + // used = total - available = 1000000 - 250000 = 750000 => 0.75 + if got, want := frac, 0.75; got != want { + t.Fatalf("frac=%v want=%v", got, want) + } +} + +func TestHostMeminfo_FallbackWithoutMemAvailable(t *testing.T) { + dir := t.TempDir() + restore := saveAndRedirectPaths(t, dir) + defer restore() + + meminfo := strings.Join([]string{ + "MemTotal: 1000000 kB", + "MemFree: 100000 kB", + "Buffers: 50000 kB", + "Cached: 150000 kB", + }, "\n") + writeFile(t, dir, "proc/meminfo", meminfo) + + frac, err := hostMeminfoUsedFraction() + if err != nil { + t.Fatalf("err: %v", err) + } + + // approxAvailable = 100000 + 50000 + 150000 = 300000 + // used = 1000000 - 300000 = 700000 => 0.7 + if got, want := frac, 0.7; got != want { + t.Fatalf("frac=%v want=%v", got, want) + } +} + +func TestHostMeminfo_Errors(t *testing.T) { + dir := t.TempDir() + restore := saveAndRedirectPaths(t, dir) + defer restore() + + // Missing file + if _, err := hostMeminfoUsedFraction(); err == nil { + t.Fatalf("expected error when /proc/meminfo missing") + } + + // Present but missing MemTotal + writeFile(t, dir, "proc/meminfo", "MemFree: 1 kB\n") + if _, err := hostMeminfoUsedFraction(); err == nil { + t.Fatalf("expected error when MemTotal missing") + } +} + +// --- read helpers --- + +func TestReadUint64FileIfExists(t *testing.T) { + dir := t.TempDir() + + // Missing -> ok=false, err=nil + if _, ok, err := readUint64FileIfExists(filepath.Join(dir, "nope")); err != nil || ok { + t.Fatalf("expected ok=false, err=nil for missing file; got ok=%v err=%v", ok, err) + } + + // Present & valid + p := 
writeFile(t, dir, "n.txt", "123\n") + v, ok, err := readUint64FileIfExists(p) + if err != nil || !ok || v != 123 { + t.Fatalf("got v=%d ok=%v err=%v; want 123,true,nil", v, ok, err) + } + + // Present & invalid -> ok=false, err=nil + p = writeFile(t, dir, "bad.txt", "not-a-number") + if _, ok, err := readUint64FileIfExists(p); err != nil || ok { + t.Fatalf("expected ok=false, err=nil for invalid number; got ok=%v err=%v", ok, err) + } +} + +func TestReadStringFileIfExists(t *testing.T) { + dir := t.TempDir() + + if _, ok, err := readStringFileIfExists(filepath.Join(dir, "nope")); err != nil || ok { + t.Fatalf("expected ok=false, err=nil for missing file; got ok=%v err=%v", ok, err) + } + + p := writeFile(t, dir, "s.txt", " hello \n") + s, ok, err := readStringFileIfExists(p) + if err != nil || !ok || strings.TrimSpace(s) != "hello" { + t.Fatalf("got s=%q ok=%v err=%v; want 'hello',true,nil", s, ok, err) + } +} diff --git a/pkg/spooledtempfile/memory_test.go b/pkg/spooledtempfile/memory_test.go index 63a7c45..68a406d 100644 --- a/pkg/spooledtempfile/memory_test.go +++ b/pkg/spooledtempfile/memory_test.go @@ -1,362 +1,83 @@ package spooledtempfile import ( - "os" - "path/filepath" - "strconv" - "strings" "testing" + "time" ) -// --- helpers --- +// TestGetCachedMemoryUsage verifies that the caching mechanism works correctly. 
+func TestGetCachedMemoryUsage(t *testing.T) { + // Save original function + originalFn := getSystemMemoryUsedFraction -type savedPaths struct { - v2Usage, v2High, v2Max string - v1Usage, v1Limit string - meminfo string -} - -func writeFile(t *testing.T, dir, name, content string) string { - t.Helper() - p := filepath.Join(dir, name) - if err := os.MkdirAll(filepath.Dir(p), 0o755); err != nil { - t.Fatalf("mkdir: %v", err) - } - if err := os.WriteFile(p, []byte(content), 0o644); err != nil { - t.Fatalf("write %s: %v", p, err) - } - return p -} - -func saveAndRedirectPaths(t *testing.T, base string) (restore func()) { - t.Helper() - - old := savedPaths{ - v2Usage: cgv2UsagePath, v2High: cgv2HighPath, v2Max: cgv2MaxPath, - v1Usage: cgv1UsagePath, v1Limit: cgv1LimitPath, - meminfo: procMeminfoPath, - } - - // Point to files under base; tests will create only what they need. - cgv2UsagePath = filepath.Join(base, "sys/fs/cgroup/memory.current") - cgv2HighPath = filepath.Join(base, "sys/fs/cgroup/memory.high") - cgv2MaxPath = filepath.Join(base, "sys/fs/cgroup/memory.max") - cgv1UsagePath = filepath.Join(base, "sys/fs/cgroup/memory/memory.usage_in_bytes") - cgv1LimitPath = filepath.Join(base, "sys/fs/cgroup/memory/memory.limit_in_bytes") - procMeminfoPath = filepath.Join(base, "proc/meminfo") - - return func() { - cgv2UsagePath, cgv2HighPath, cgv2MaxPath = old.v2Usage, old.v2High, old.v2Max - cgv1UsagePath, cgv1LimitPath = old.v1Usage, old.v1Limit - procMeminfoPath = old.meminfo - } -} - -// --- cgroup v2 --- - -func TestCgroupV2_UsesHighWhenStricterThanMax(t *testing.T) { - dir := t.TempDir() - restore := saveAndRedirectPaths(t, dir) - defer restore() - - // usage=400 - writeFile(t, dir, "sys/fs/cgroup/memory.current", "400") - // high=800, max=1000 -> use high, frac = 0.5 - writeFile(t, dir, "sys/fs/cgroup/memory.high", "800") - writeFile(t, dir, "sys/fs/cgroup/memory.max", "1000") - - frac, ok, err := cgroupV2UsedFraction() - if err != nil { - t.Fatalf("err: %v", err) 
- } - if !ok { - t.Fatalf("expected ok") - } - if got, want := frac, 0.5; got != want { - t.Fatalf("frac=%v want=%v", got, want) - } -} - -func TestCgroupV2_FallbackToMaxWhenHighIsMax(t *testing.T) { - dir := t.TempDir() - restore := saveAndRedirectPaths(t, dir) - defer restore() - - writeFile(t, dir, "sys/fs/cgroup/memory.current", "300") - writeFile(t, dir, "sys/fs/cgroup/memory.high", "max") // unset - writeFile(t, dir, "sys/fs/cgroup/memory.max", "600") // use this - - frac, ok, err := cgroupV2UsedFraction() - if err != nil { - t.Fatalf("err: %v", err) + // Track how many times the function is called + callCount := 0 + getSystemMemoryUsedFraction = func() (float64, error) { + callCount++ + return 0.5, nil } - if !ok { - t.Fatalf("expected ok") - } - if got, want := frac, 0.5; got != want { - t.Fatalf("frac=%v want=%v", got, want) - } -} - -func TestCgroupV2_FallbackToMaxWhenHighInvalid(t *testing.T) { - dir := t.TempDir() - restore := saveAndRedirectPaths(t, dir) - defer restore() - - writeFile(t, dir, "sys/fs/cgroup/memory.current", "256") - writeFile(t, dir, "sys/fs/cgroup/memory.high", "not-a-number") - writeFile(t, dir, "sys/fs/cgroup/memory.max", "512") - - frac, ok, err := cgroupV2UsedFraction() - if err != nil { - t.Fatalf("err: %v", err) - } - if !ok { - t.Fatalf("expected ok") - } - if got, want := frac, 0.5; got != want { - t.Fatalf("frac=%v want=%v", got, want) - } -} - -func TestCgroupV2_UseMaxWhenHighGTE_Max(t *testing.T) { - dir := t.TempDir() - restore := saveAndRedirectPaths(t, dir) - defer restore() - - writeFile(t, dir, "sys/fs/cgroup/memory.current", "900") - writeFile(t, dir, "sys/fs/cgroup/memory.high", "1000") // >= max - writeFile(t, dir, "sys/fs/cgroup/memory.max", "1000") - frac, ok, err := cgroupV2UsedFraction() - if err != nil { - t.Fatalf("err: %v", err) - } - if !ok { - t.Fatalf("expected ok") - } - if got, want := frac, 0.9; got != want { - t.Fatalf("frac=%v want=%v", got, want) - } -} - -func TestCgroupV2_OnlyHighSet(t *testing.T) 
{ - dir := t.TempDir() - restore := saveAndRedirectPaths(t, dir) - defer restore() - - writeFile(t, dir, "sys/fs/cgroup/memory.current", "50") - writeFile(t, dir, "sys/fs/cgroup/memory.high", "100") - // no max file - - frac, ok, err := cgroupV2UsedFraction() - if err != nil { - t.Fatalf("err: %v", err) - } - if !ok { - t.Fatalf("expected ok") - } - if got, want := frac, 0.5; got != want { - t.Fatalf("frac=%v want=%v", got, want) - } -} + // Restore at the end + defer func() { + getSystemMemoryUsedFraction = originalFn + }() -func TestCgroupV2_NoLimitsOrUsageFile(t *testing.T) { - dir := t.TempDir() - restore := saveAndRedirectPaths(t, dir) - defer restore() + // Reset cache + ResetMemoryCache() - // usage missing -> ok=false (not cgroup v2 / not accessible) - _, ok, err := cgroupV2UsedFraction() + // First call should invoke the function + frac1, err := getCachedMemoryUsage() if err != nil { - t.Fatalf("err: %v", err) - } - if ok { - t.Fatalf("expected ok=false when usage missing") + t.Fatalf("first call failed: %v", err) } - - // Now create usage but high=max and no max => no effective limit - writeFile(t, dir, "sys/fs/cgroup/memory.current", "123") - writeFile(t, dir, "sys/fs/cgroup/memory.high", "max") - frac, ok, err := cgroupV2UsedFraction() - if err != nil { - t.Fatalf("err: %v", err) + if frac1 != 0.5 { + t.Fatalf("expected 0.5, got %v", frac1) } - if ok { - t.Fatalf("expected ok=false with no effective limit, got ok and frac=%v", frac) + if callCount != 1 { + t.Fatalf("expected 1 call, got %d", callCount) } -} - -// --- cgroup v1 --- - -func TestCgroupV1_NormalFraction(t *testing.T) { - dir := t.TempDir() - restore := saveAndRedirectPaths(t, dir) - defer restore() - writeFile(t, dir, "sys/fs/cgroup/memory/memory.usage_in_bytes", "200") - writeFile(t, dir, "sys/fs/cgroup/memory/memory.limit_in_bytes", "400") - - frac, ok, err := cgroupV1UsedFraction() + // Second immediate call should use cache + frac2, err := getCachedMemoryUsage() if err != nil { - 
t.Fatalf("err: %v", err) + t.Fatalf("second call failed: %v", err) } - if !ok { - t.Fatalf("expected ok") + if frac2 != 0.5 { + t.Fatalf("expected 0.5, got %v", frac2) } - if got, want := frac, 0.5; got != want { - t.Fatalf("frac=%v want=%v", got, want) + if callCount != 1 { + t.Fatalf("expected still 1 call (cached), got %d", callCount) } -} - -func TestCgroupV1_HugeLimitMeansNoLimit(t *testing.T) { - dir := t.TempDir() - restore := saveAndRedirectPaths(t, dir) - defer restore() - writeFile(t, dir, "sys/fs/cgroup/memory/memory.usage_in_bytes", "42") - // > 1<<60 - writeFile(t, dir, "sys/fs/cgroup/memory/memory.limit_in_bytes", strconv.FormatUint((1<<60)+1, 10)) + // Simulate cache expiration + memoryUsageCache.Lock() + memoryUsageCache.lastChecked = time.Now().Add(-memoryCheckInterval - time.Millisecond) + memoryUsageCache.Unlock() - _, ok, err := cgroupV1UsedFraction() + // Next call should invoke the function again + frac3, err := getCachedMemoryUsage() if err != nil { - t.Fatalf("err: %v", err) + t.Fatalf("third call failed: %v", err) } - if ok { - t.Fatalf("expected ok=false for huge limit (no limit)") - } -} - -func TestCgroupV1_MissingFilesOrZeroLimit(t *testing.T) { - dir := t.TempDir() - restore := saveAndRedirectPaths(t, dir) - defer restore() - - // Only usage present -> not ok - writeFile(t, dir, "sys/fs/cgroup/memory/memory.usage_in_bytes", "7") - _, ok, err := cgroupV1UsedFraction() - if err != nil { - t.Fatalf("err: %v", err) - } - if ok { - t.Fatalf("expected ok=false when limit missing") - } - - // Now add zero limit -> not ok - writeFile(t, dir, "sys/fs/cgroup/memory/memory.limit_in_bytes", "0") - _, ok, err = cgroupV1UsedFraction() - if err != nil { - t.Fatalf("err: %v", err) - } - if ok { - t.Fatalf("expected ok=false for zero limit") - } -} - -// --- /proc/meminfo fallback --- - -func TestHostMeminfo_UsesMemAvailableWhenPresent(t *testing.T) { - dir := t.TempDir() - restore := saveAndRedirectPaths(t, dir) - defer restore() - - // MemTotal and 
MemAvailable are in kB - meminfo := strings.Join([]string{ - "MemTotal: 1000000 kB", - "MemAvailable: 250000 kB", - "Buffers: 10000 kB", - "Cached: 20000 kB", - "MemFree: 50000 kB", - }, "\n") - writeFile(t, dir, "proc/meminfo", meminfo) - - frac, err := hostMeminfoUsedFraction() - if err != nil { - t.Fatalf("err: %v", err) + if frac3 != 0.5 { + t.Fatalf("expected 0.5, got %v", frac3) } - - // used = total - available = 1000000 - 250000 = 750000 => 0.75 - if got, want := frac, 0.75; got != want { - t.Fatalf("frac=%v want=%v", got, want) + if callCount != 2 { + t.Fatalf("expected 2 calls (cache expired), got %d", callCount) } } -func TestHostMeminfo_FallbackWithoutMemAvailable(t *testing.T) { - dir := t.TempDir() - restore := saveAndRedirectPaths(t, dir) - defer restore() - - meminfo := strings.Join([]string{ - "MemTotal: 1000000 kB", - "MemFree: 100000 kB", - "Buffers: 50000 kB", - "Cached: 150000 kB", - }, "\n") - writeFile(t, dir, "proc/meminfo", meminfo) - - frac, err := hostMeminfoUsedFraction() +// TestGetSystemMemoryUsedFraction_Integration verifies that the actual implementation +// returns a valid value on the current platform. 
+func TestGetSystemMemoryUsedFraction_Integration(t *testing.T) { + fraction, err := getSystemMemoryUsedFraction() if err != nil { - t.Fatalf("err: %v", err) + t.Fatalf("getSystemMemoryUsedFraction() failed: %v", err) } - // approxAvailable = 100000 + 50000 + 150000 = 300000 - // used = 1000000 - 300000 = 700000 => 0.7 - if got, want := frac, 0.7; got != want { - t.Fatalf("frac=%v want=%v", got, want) + if fraction < 0 || fraction > 1 { + t.Fatalf("memory fraction out of range: got %v, want 0.0-1.0", fraction) } -} - -func TestHostMeminfo_Errors(t *testing.T) { - dir := t.TempDir() - restore := saveAndRedirectPaths(t, dir) - defer restore() - - // Missing file - if _, err := hostMeminfoUsedFraction(); err == nil { - t.Fatalf("expected error when /proc/meminfo missing") - } - - // Present but missing MemTotal - writeFile(t, dir, "proc/meminfo", "MemFree: 1 kB\n") - if _, err := hostMeminfoUsedFraction(); err == nil { - t.Fatalf("expected error when MemTotal missing") - } -} - -// --- read helpers --- -func TestReadUint64FileIfExists(t *testing.T) { - dir := t.TempDir() - - // Missing -> ok=false, err=nil - if _, ok, err := readUint64FileIfExists(filepath.Join(dir, "nope")); err != nil || ok { - t.Fatalf("expected ok=false, err=nil for missing file; got ok=%v err=%v", ok, err) - } - - // Present & valid - p := writeFile(t, dir, "n.txt", "123\n") - v, ok, err := readUint64FileIfExists(p) - if err != nil || !ok || v != 123 { - t.Fatalf("got v=%d ok=%v err=%v; want 123,true,nil", v, ok, err) - } - - // Present & invalid -> ok=false, err=nil - p = writeFile(t, dir, "bad.txt", "not-a-number") - if _, ok, err := readUint64FileIfExists(p); err != nil || ok { - t.Fatalf("expected ok=false, err=nil for invalid number; got ok=%v err=%v", ok, err) - } -} - -func TestReadStringFileIfExists(t *testing.T) { - dir := t.TempDir() - - if _, ok, err := readStringFileIfExists(filepath.Join(dir, "nope")); err != nil || ok { - t.Fatalf("expected ok=false, err=nil for missing file; got 
ok=%v err=%v", ok, err) - } - - p := writeFile(t, dir, "s.txt", " hello \n") - s, ok, err := readStringFileIfExists(p) - if err != nil || !ok || strings.TrimSpace(s) != "hello" { - t.Fatalf("got s=%q ok=%v err=%v; want 'hello',true,nil", s, ok, err) - } + t.Logf("Current system memory usage: %.2f%%", fraction*100) } diff --git a/pkg/spooledtempfile/spooled.go b/pkg/spooledtempfile/spooled.go index f351b29..bf917c6 100644 --- a/pkg/spooledtempfile/spooled.go +++ b/pkg/spooledtempfile/spooled.go @@ -7,8 +7,6 @@ import ( "io" "log" "os" - "sync" - "time" "github.com/valyala/bytebufferpool" ) @@ -20,18 +18,6 @@ const ( MaxInMemorySize = 1024 * 1024 // DefaultMaxRAMUsageFraction is the default fraction of system RAM above which we'll force spooling to disk DefaultMaxRAMUsageFraction = 0.50 - // memoryCheckInterval defines how often we check system memory usage. - memoryCheckInterval = 500 * time.Millisecond -) - -type globalMemoryCache struct { - sync.Mutex - lastChecked time.Time - lastFraction float64 -} - -var ( - memoryUsageCache = &globalMemoryCache{} ) // ReaderAt is the interface for ReadAt - read at position, without moving pointer. 
@@ -269,22 +255,3 @@ func (s *spooledTempFile) isSystemMemoryUsageHigh() bool { } return usedFraction >= s.maxRAMUsageFraction } - -func getCachedMemoryUsage() (float64, error) { - memoryUsageCache.Lock() - defer memoryUsageCache.Unlock() - - if time.Since(memoryUsageCache.lastChecked) < memoryCheckInterval { - return memoryUsageCache.lastFraction, nil - } - - fraction, err := getSystemMemoryUsedFraction() - if err != nil { - return 0, err - } - - memoryUsageCache.lastChecked = time.Now() - memoryUsageCache.lastFraction = fraction - - return fraction, nil -} diff --git a/pkg/spooledtempfile/spooled_test.go b/pkg/spooledtempfile/spooled_test.go index cd1aefb..81d8b31 100644 --- a/pkg/spooledtempfile/spooled_test.go +++ b/pkg/spooledtempfile/spooled_test.go @@ -11,12 +11,37 @@ import ( "testing" ) +// mockMemoryUsage mocks system memory to the specified fraction for the duration of the test. +// It uses t.Cleanup to automatically restore the original function and cache state. +// fraction should be between 0.0 (0% used) and 1.0 (100% used). +func mockMemoryUsage(t *testing.T, fraction float64) { + t.Helper() + + // Save original function + originalFn := getSystemMemoryUsedFraction + + // Reset cache and mock function + ResetMemoryCache() + getSystemMemoryUsedFraction = func() (float64, error) { + return fraction, nil + } + + // Auto-restore on test completion + t.Cleanup(func() { + getSystemMemoryUsedFraction = originalFn + // Ensure global cache is clean to prevent state pollution to other test packages + ResetMemoryCache() + }) +} + func generateTestDataInKB(size int) []byte { return bytes.Repeat([]byte("A"), size*1024) } // TestInMemoryBasic writes data below threshold and verifies it remains in memory. 
func TestInMemoryBasic(t *testing.T) { + mockMemoryUsage(t, 0.30) // Mock low memory usage (30%) + spool := NewSpooledTempFile("test", os.TempDir(), 100, false, -1) defer spool.Close() @@ -66,6 +91,8 @@ func TestInMemoryBasic(t *testing.T) { // TestThresholdCrossing writes enough data to switch from in-memory to disk. func TestThresholdCrossing(t *testing.T) { + mockMemoryUsage(t, 0.30) // Mock low memory usage (30%) + spool := NewSpooledTempFile("test", os.TempDir(), 64*1024, false, -1) defer spool.Close() @@ -114,6 +141,7 @@ func TestThresholdCrossing(t *testing.T) { } // TestForceOnDisk checks the fullOnDisk parameter. +// Note: This test doesn't mock memory because fullOnDisk=true forces disk behavior regardless. func TestForceOnDisk(t *testing.T) { spool := NewSpooledTempFile("test", os.TempDir(), 64*1024, true, -1) defer spool.Close() @@ -142,6 +170,8 @@ func TestForceOnDisk(t *testing.T) { // TestReadAtAndSeekInMemory tests seeking and ReadAt on an in-memory spool. func TestReadAtAndSeekInMemory(t *testing.T) { + mockMemoryUsage(t, 0.30) // Mock low memory usage (30%) + spool := NewSpooledTempFile("test", os.TempDir(), 64*1024, false, -1) defer spool.Close() @@ -185,6 +215,7 @@ func TestReadAtAndSeekInMemory(t *testing.T) { } // TestReadAtAndSeekOnDisk tests seeking and ReadAt on a spool that has switched to disk. +// Note: This test doesn't mock memory because it writes 65KB to intentionally cross the 64KB threshold. func TestReadAtAndSeekOnDisk(t *testing.T) { spool := NewSpooledTempFile("test", os.TempDir(), 64*1024, false, -1) defer spool.Close() @@ -255,6 +286,8 @@ func TestWriteAfterReadPanic(t *testing.T) { // TestCloseInMemory checks closing while still in-memory. 
func TestCloseInMemory(t *testing.T) { + mockMemoryUsage(t, 0.30) // Mock low memory usage (30%) + spool := NewSpooledTempFile("test", os.TempDir(), 64*1024, false, -1) _, err := spool.Write([]byte("Small data")) @@ -284,6 +317,7 @@ func TestCloseInMemory(t *testing.T) { } // TestCloseOnDisk checks closing after spool has switched to disk. +// Note: This test doesn't mock memory because it writes 65KB to intentionally cross the threshold. func TestCloseOnDisk(t *testing.T) { spool := NewSpooledTempFile("test", os.TempDir(), 64*1024, false, -1) @@ -327,6 +361,8 @@ func TestCloseOnDisk(t *testing.T) { // TestLen verifies Len() for both in-memory and on-disk states. func TestLen(t *testing.T) { + mockMemoryUsage(t, 0.30) // Mock low memory usage (30%) + spool := NewSpooledTempFile("test", os.TempDir(), 64*1024, false, -1) defer spool.Close() @@ -351,6 +387,8 @@ func TestLen(t *testing.T) { // TestFileName checks correctness of FileName in both modes. func TestFileName(t *testing.T) { + mockMemoryUsage(t, 0.30) // Mock low memory usage (30%) + spool := NewSpooledTempFile("testprefix", os.TempDir(), 64*1024, false, -1) defer spool.Close() @@ -383,17 +421,7 @@ func TestFileName(t *testing.T) { // TestSkipInMemoryAboveRAMUsage verifies that if `isSystemMemoryUsageHigh()` // returns true, the spool goes directly to disk even for small writes. func TestSkipInMemoryAboveRAMUsage(t *testing.T) { - memoryUsageCache = &globalMemoryCache{} - // Save the old function so we can restore it later - oldGetSystemMemoryUsedFraction := getSystemMemoryUsedFraction - // Force system memory usage to appear above 50% - getSystemMemoryUsedFraction = func() (float64, error) { - return 0.60, nil // 60% used => above the 50% threshold - } - // Restore after test - defer func() { - getSystemMemoryUsedFraction = oldGetSystemMemoryUsedFraction - }() + mockMemoryUsage(t, 0.60) // Mock memory usage at 60% (above 50% threshold) // Even though threshold is large (e.g. 
1MB), because our mock usage is 60%, // spool should skip memory and go straight to disk. @@ -427,9 +455,67 @@ func TestSkipInMemoryAboveRAMUsage(t *testing.T) { } } +// TestMemoryThresholdBelowLimit verifies behavior when memory is just below threshold (49%). +func TestMemoryThresholdBelowLimit(t *testing.T) { + mockMemoryUsage(t, 0.49) // Mock memory at 49% (below 50% threshold) + + spool := NewSpooledTempFile("test", os.TempDir(), 1024*1024, false, 0.50) + defer spool.Close() + + data := []byte("Should stay in memory") + _, err := spool.Write(data) + if err != nil { + t.Fatalf("Write error: %v", err) + } + + // Should stay in memory since 49% < 50% + if spool.FileName() != "" { + t.Errorf("Expected spool to stay in memory (49%% < 50%%), but got file: %s", spool.FileName()) + } +} + +// TestMemoryThresholdAtLimit verifies behavior when memory is exactly at threshold (50%). +func TestMemoryThresholdAtLimit(t *testing.T) { + mockMemoryUsage(t, 0.50) // Mock memory at exactly 50% (at threshold) + + spool := NewSpooledTempFile("test", os.TempDir(), 1024*1024, false, 0.50) + defer spool.Close() + + data := []byte("Should go to disk") + _, err := spool.Write(data) + if err != nil { + t.Fatalf("Write error: %v", err) + } + + // Should go to disk since 50% >= 50% + if spool.FileName() == "" { + t.Error("Expected spool to go to disk (50%% >= 50%%), but stayed in memory") + } +} + +// TestMemoryThresholdAboveLimit verifies behavior when memory is above threshold (51%). 
+func TestMemoryThresholdAboveLimit(t *testing.T) { + mockMemoryUsage(t, 0.51) // Mock memory at 51% (above 50% threshold) + + spool := NewSpooledTempFile("test", os.TempDir(), 1024*1024, false, 0.50) + defer spool.Close() + + data := []byte("Should go to disk") + _, err := spool.Write(data) + if err != nil { + t.Fatalf("Write error: %v", err) + } + + // Should go to disk since 51% > 50% + if spool.FileName() == "" { + t.Error("Expected spool to go to disk (51%% > 50%%), but stayed in memory") + } +} + // TestBufferGrowthWithinLimits verifies that the buffer grows dynamically but never exceeds MaxInMemorySize. func TestBufferGrowthWithinLimits(t *testing.T) { - memoryUsageCache = &globalMemoryCache{} + mockMemoryUsage(t, 0.30) // Mock low memory usage (30%) + spool := NewSpooledTempFile("test", os.TempDir(), 128*1024, false, -1) defer spool.Close() @@ -473,6 +559,8 @@ func TestBufferGrowthWithinLimits(t *testing.T) { // TestPoolBehavior verifies that buffers exceeding InitialBufferSize are not returned to the pool. func TestPoolBehavior(t *testing.T) { + mockMemoryUsage(t, 0.30) // Mock low memory to ensure in-memory pooling behavior is tested + spool := NewSpooledTempFile("test", os.TempDir(), 150*1024, false, -1) defer spool.Close() @@ -503,6 +591,8 @@ func TestPoolBehavior(t *testing.T) { } } +// TestBufferGrowthBeyondNewCap verifies buffer behavior when growth exceeds threshold. +// Note: This test doesn't mock memory because it writes 101KB to intentionally exceed the 100KB threshold. func TestBufferGrowthBeyondNewCap(t *testing.T) { spool := NewSpooledTempFile("test", os.TempDir(), 100*1024, false, -1) defer spool.Close() @@ -546,6 +636,8 @@ func TestBufferGrowthBeyondNewCap(t *testing.T) { } } +// TestSpoolingWhenIOCopy verifies spooling behavior with io.Copy for large data. +// Note: This test doesn't mock memory because it writes 500KB to intentionally trigger disk spooling. 
func TestSpoolingWhenIOCopy(t *testing.T) { spool := NewSpooledTempFile("test", os.TempDir(), 100*1024, false, -1) defer spool.Close() diff --git a/read.go b/read.go index 478ae9c..6b7b2e6 100644 --- a/read.go +++ b/read.go @@ -262,6 +262,10 @@ func (r *Reader) ReadRecord(opts ...ReadOpts) (*Record, error) { } return nil, fmt.Errorf("reading WARC version: %w", err) } + // Copy the WARC version before parsing headers since readUntilDelim reuses + // its backing buffer via a sync.Pool. Without copying, subsequent calls may + // overwrite the data referenced by warcVer, resulting in corrupted versions. + warcVersion := string(warcVer) header := NewHeader() for { @@ -330,7 +334,7 @@ func (r *Reader) ReadRecord(opts ...ReadOpts) (*Record, error) { record := &Record{ Header: header, Content: buf, - Version: string(warcVer), + Version: warcVersion, Offset: offset, Size: size, } diff --git a/smoke_test.go b/smoke_test.go new file mode 100644 index 0000000..6411001 --- /dev/null +++ b/smoke_test.go @@ -0,0 +1,154 @@ +package warc + +import ( + "io" + "os" + "strconv" + "testing" +) + +// TestSmokeWARCFormatRegression validates that the WARC format remains consistent +// by checking a frozen reference file (testdata/warcs/test.warc.gz) against known-good values. +// +// This test serves as a regression detector for WARC format changes, complementing the +// dynamic tests in client_test.go. It addresses the concern that byte-level format +// changes should be explicitly validated against a known-good snapshot. +// +// If this test fails, it indicates that either: +// 1. The WARC writing logic has changed in a way that affects the format +// 2. The reference file has been modified +// 3. 
There's a bug in the record serialization +func TestSmokeWARCFormatRegression(t *testing.T) { + const testFile = "testdata/warcs/test.warc.gz" + + // Expected file-level metrics + const expectedFileSize = 22350 // bytes (compressed) + const expectedTotalRecords = 3 + const expectedTotalContentLength = 22083 // sum of all Content-Length values + + // Expected record-level metrics + // These values were extracted from a known-good WARC file and serve as + // a snapshot of correct format behavior. + expectedRecords := []struct { + warcType string + contentLength int64 + blockDigest string + payloadDigest string // only for response records + targetURI string // only for response records + }{ + { + warcType: "warcinfo", + contentLength: 143, + blockDigest: "sha1:IYWIATZSPEOF7U5W7VGGJOSQTIWUDXQ6", + }, + { + warcType: "request", + contentLength: 110, + blockDigest: "sha1:JNDMG56JVTVVOQSDQRD25XWTGMRQAQDB", + }, + { + warcType: "response", + contentLength: 21830, + blockDigest: "sha1:LCKC4TTRSBWYHGYT5P22ON4DWY65WHDZ", + targetURI: "https://apis.google.com/js/platform.js", + }, + } + + // Validate file size + stat, err := os.Stat(testFile) + if err != nil { + t.Fatalf("failed to stat test file: %v", err) + } + if stat.Size() != expectedFileSize { + t.Errorf("file size mismatch: expected %d bytes, got %d bytes", expectedFileSize, stat.Size()) + } + + // Open and read WARC file + file, err := os.Open(testFile) + if err != nil { + t.Fatalf("failed to open test file: %v", err) + } + defer file.Close() + + reader, err := NewReader(file) + if err != nil { + t.Fatalf("failed to create WARC reader: %v", err) + } + + var recordCount int + var totalContentLength int64 + + // Read and validate each record + for recordCount < expectedTotalRecords { + record, err := reader.ReadRecord() + if err != nil { + if err == io.EOF { + break + } + t.Fatalf("failed to read record %d: %v", recordCount+1, err) + } + if record == nil { + break + } + + expected := expectedRecords[recordCount] + + // 
Validate WARC-Type + warcType := record.Header.Get("WARC-Type") + if warcType != expected.warcType { + t.Errorf("record %d: WARC-Type mismatch: expected %q, got %q", + recordCount+1, expected.warcType, warcType) + } + + // Validate Content-Length + contentLengthStr := record.Header.Get("Content-Length") + contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64) + if err != nil { + t.Errorf("record %d: failed to parse Content-Length %q: %v", + recordCount+1, contentLengthStr, err) + } else { + if contentLength != expected.contentLength { + t.Errorf("record %d: Content-Length mismatch: expected %d, got %d", + recordCount+1, expected.contentLength, contentLength) + } + totalContentLength += contentLength + } + + // Validate WARC-Block-Digest + blockDigest := record.Header.Get("WARC-Block-Digest") + if blockDigest != expected.blockDigest { + t.Errorf("record %d: WARC-Block-Digest mismatch: expected %q, got %q", + recordCount+1, expected.blockDigest, blockDigest) + } + + // Validate response-specific fields + if warcType == "response" { + if expected.targetURI != "" { + targetURI := record.Header.Get("WARC-Target-URI") + if targetURI != expected.targetURI { + t.Errorf("record %d: WARC-Target-URI mismatch: expected %q, got %q", + recordCount+1, expected.targetURI, targetURI) + } + } + } + + // Close record content + if err := record.Content.Close(); err != nil { + t.Errorf("record %d: failed to close content: %v", recordCount+1, err) + } + + recordCount++ + } + + // Validate total record count + if recordCount != expectedTotalRecords { + t.Errorf("total record count mismatch: expected %d, got %d", + expectedTotalRecords, recordCount) + } + + // Validate total content length + if totalContentLength != expectedTotalContentLength { + t.Errorf("total content length mismatch: expected %d bytes, got %d bytes", + expectedTotalContentLength, totalContentLength) + } +}