From b3303fb4e4079b4d90217bbaab32b614c40d5401 Mon Sep 17 00:00:00 2001 From: Tolya Korniltsev Date: Wed, 17 Sep 2025 14:06:15 +0700 Subject: [PATCH 1/8] pyroscope.java: decouple archive extraction from the profiler, remove Profiler(use Distribution). --- internal/component/pyroscope/java/args.go | 3 + .../component/pyroscope/java/asprof/asprof.go | 111 ++++-------------- .../pyroscope/java/asprof/asprof_darwin.go | 8 +- .../pyroscope/java/asprof/asprof_linux.go | 28 ++--- .../pyroscope/java/asprof/asprof_test.go | 42 +++---- .../pyroscope/java/asprof/extract.go | 45 +++++++ internal/component/pyroscope/java/java.go | 26 ++-- internal/component/pyroscope/java/loop.go | 18 ++- .../component/pyroscope/java/loop_test.go | 22 ++-- 9 files changed, 135 insertions(+), 168 deletions(-) diff --git a/internal/component/pyroscope/java/args.go b/internal/component/pyroscope/java/args.go index 8f1e69696a..ed8d313636 100644 --- a/internal/component/pyroscope/java/args.go +++ b/internal/component/pyroscope/java/args.go @@ -14,6 +14,9 @@ type Arguments struct { TmpDir string `alloy:"tmp_dir,attr,optional"` ProfilingConfig ProfilingConfig `alloy:"profiling_config,block,optional"` + + // undocumented + Dist string `alloy:"dist,block,optional"` } type ProfilingConfig struct { diff --git a/internal/component/pyroscope/java/asprof/asprof.go b/internal/component/pyroscope/java/asprof/asprof.go index 9ebd93e1cf..da541d6b94 100644 --- a/internal/component/pyroscope/java/asprof/asprof.go +++ b/internal/component/pyroscope/java/asprof/asprof.go @@ -8,11 +8,9 @@ import ( _ "embed" "encoding/hex" "fmt" - "io/fs" "os" "os/exec" "path/filepath" - "strings" "sync" ) @@ -20,27 +18,35 @@ var fsMutex sync.Mutex type Distribution struct { extractedDir string - version int } -func (d *Distribution) LauncherPath() string { - return filepath.Join(d.extractedDir, "bin/asprof") +func NewExtractedDistribution(extractedDir string) (Distribution, error) { + d := Distribution{extractedDir: extractedDir} + if _, err := os.Stat(d.LauncherPath()); err != nil { + return d, fmt.Errorf("asprof launcher not found: %w", err) + } + if _, err := os.Stat(d.LibPath()); err != nil { + return d, fmt.Errorf("asprof lib not found: %w", err) + } + return d, nil } -type Profiler struct { - tmpDir string - extractOnce sync.Once - dist *Distribution - extractError error - tmpDirMarker any - archiveHash string - archive Archive +func (d Distribution) LauncherPath() string { + return filepath.Join(d.extractedDir, "bin/asprof") } type Archive struct { - data []byte - version int - format int + data []byte + format int +} + +func (a *Archive) SHA1() string { + sum := sha1.Sum(a.data) + return hex.EncodeToString(sum[:]) +} + +func (a *Archive) DistName() string { + return fmt.Sprintf("alloy-asprof-%s", a.SHA1()) } const ( @@ -48,21 +54,11 @@ const ( ArchiveFormatZip ) -func NewProfiler(tmpDir string, archive Archive) *Profiler { - res := &Profiler{tmpDir: tmpDir, dist: new(Distribution), tmpDirMarker: "alloy-asprof"} - sum := sha1.Sum(archive.data) - hexSum := hex.EncodeToString(sum[:]) - res.archiveHash = hexSum - res.dist.version = archive.version - res.archive = archive - return res -} - -func (p *Profiler) Execute(dist *Distribution, argv []string) (string, string, error) { +func (d Distribution) Execute(argv []string) (string, string, error) { stdout := bytes.NewBuffer(nil) stderr := bytes.NewBuffer(nil) - exe := dist.LauncherPath() + exe := d.LauncherPath() cmd := exec.Command(exe, argv...) cmd.Stdout = stdout @@ -77,62 +73,3 @@ func (p *Profiler) Execute(dist *Distribution, argv []string) (string, string, e } return stdout.String(), stderr.String(), nil } - -func (p *Profiler) Distribution() *Distribution { - return p.dist -} - -func (p *Profiler) ExtractDistributions() error { - p.extractOnce.Do(func() { - p.extractError = p.extractDistributions() - }) - return p.extractError -} - -func (p *Profiler) extractDistributions() error { - fsMutex.Lock() - defer fsMutex.Unlock() - distName := p.getDistName() - - var launcher, dist []byte - err := readArchive(p.archive.data, p.archive.format, func(name string, fi fs.FileInfo, data []byte) error { - if strings.Contains(name, "asprof") { - launcher = data - } - if strings.Contains(name, "libasyncProfiler") { - dist = data - } - return nil - }) - if err != nil { - return err - } - if launcher == nil || dist == nil { - return fmt.Errorf("failed to find libasyncProfiler in archive %s", distName) - } - - fileMap := map[string][]byte{} - fileMap[filepath.Join(distName, p.dist.LauncherPath())] = launcher - fileMap[filepath.Join(distName, p.dist.LibPath())] = dist - tmpDirFile, err := os.Open(p.tmpDir) - if err != nil { - return fmt.Errorf("failed to open tmp dir %s: %w", p.tmpDir, err) - } - defer tmpDirFile.Close() - - if err = checkTempDirPermissions(tmpDirFile); err != nil { - return err - } - - for path, data := range fileMap { - if err = writeFile(tmpDirFile, path, data, true); err != nil { - return err - } - } - p.dist.extractedDir = filepath.Join(p.tmpDir, distName) - return nil -} - -func (p *Profiler) getDistName() string { - return fmt.Sprintf("%s-%s", p.tmpDirMarker, p.archiveHash) -} diff --git a/internal/component/pyroscope/java/asprof/asprof_darwin.go b/internal/component/pyroscope/java/asprof/asprof_darwin.go index be46d1c915..e123e18e27 100644 --- a/internal/component/pyroscope/java/asprof/asprof_darwin.go +++ b/internal/component/pyroscope/java/asprof/asprof_darwin.go @@ -13,15 +13,13 @@ var embeddedArchiveData []byte // bin/asprof // lib/libasyncProfiler.dylib -var embeddedArchiveVersion = 300 +var EmbeddedArchive = Archive{data: embeddedArchiveData, format: ArchiveFormatZip} -var EmbeddedArchive = Archive{data: embeddedArchiveData, version: embeddedArchiveVersion, format: ArchiveFormatZip} - -func (d *Distribution) LibPath() string { +func (d Distribution) LibPath() string { return filepath.Join(d.extractedDir, "lib/libasyncProfiler.dylib") } -func (p *Profiler) CopyLib(dist *Distribution, pid int) error { +func (d Distribution) CopyLib(pid int) error { return nil } diff --git a/internal/component/pyroscope/java/asprof/asprof_linux.go b/internal/component/pyroscope/java/asprof/asprof_linux.go index d12fa890fc..6b16d79993 100644 --- a/internal/component/pyroscope/java/asprof/asprof_linux.go +++ b/internal/component/pyroscope/java/asprof/asprof_linux.go @@ -10,22 +10,20 @@ import ( "strings" ) -var embeddedArchiveVersion = 300 +var EmbeddedArchive = Archive{data: embeddedArchiveData, format: ArchiveFormatTarGz} -var EmbeddedArchive = Archive{data: embeddedArchiveData, version: embeddedArchiveVersion, format: ArchiveFormatTarGz} - -func (d *Distribution) LibPath() string { +func (d Distribution) LibPath() string { return filepath.Join(d.extractedDir, "lib/libasyncProfiler.so") } -func (p *Profiler) CopyLib(dist *Distribution, pid int) error { +func (d Distribution) CopyLib(pid int) error { fsMutex.Lock() defer fsMutex.Unlock() - libData, err := os.ReadFile(dist.LibPath()) + libData, err := os.ReadFile(d.LibPath()) if err != nil { return err } - launcherData, err := os.ReadFile(dist.LauncherPath()) + launcherData, err := os.ReadFile(d.LauncherPath()) if err != nil { return err } @@ -35,8 +33,8 @@ func (p *Profiler) CopyLib(dist *Distribution, pid int) error { return fmt.Errorf("failed to open proc root %s: %w", procRoot, err) } defer procRootFile.Close() - dstLibPath := strings.TrimPrefix(dist.LibPath(), "/") - dstLauncherPath := strings.TrimPrefix(dist.LauncherPath(), "/") + dstLibPath := strings.TrimPrefix(d.LibPath(), "/") + dstLauncherPath := strings.TrimPrefix(d.LauncherPath(), "/") if err = writeFile(procRootFile, dstLibPath, libData, false); err != nil { return err } @@ -48,15 +46,5 @@ func (p *Profiler) CopyLib(dist *Distribution, pid int) error { } func ProcessPath(path string, pid int) string { - f := procFile{path, pid} - return f.procRootPath() -} - -type procFile struct { - path string - pid int -} - -func (f *procFile) procRootPath() string { - return filepath.Join("/proc", strconv.Itoa(f.pid), "root", f.path) + return filepath.Join("/proc", strconv.Itoa(pid), "root", path) } diff --git a/internal/component/pyroscope/java/asprof/asprof_test.go b/internal/component/pyroscope/java/asprof/asprof_test.go index ee9628b0ca..2b5fffc710 100644 --- a/internal/component/pyroscope/java/asprof/asprof_test.go +++ b/internal/component/pyroscope/java/asprof/asprof_test.go @@ -9,7 +9,7 @@ import ( "testing" "github.com/google/uuid" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // extracting to /tmp @@ -23,45 +23,45 @@ import ( // write skippable tests with uid=0 func TestStickyDir(t *testing.T) { dir := "/tmp" - p := NewProfiler(dir, EmbeddedArchive) - p.tmpDirMarker = fmt.Sprintf("alloy-asprof-%s", uuid.NewString()) - t.Logf("tmpDirMarker: %s", p.tmpDirMarker) - err := p.ExtractDistributions() - assert.NoError(t, err) + tmpDirMarker := fmt.Sprintf("alloy-asprof-%s", uuid.NewString()) + t.Logf("tmpDirMarker: %s", tmpDirMarker) + dist, err := ExtractDistribution(EmbeddedArchive, dir, tmpDirMarker) + require.NoError(t, err) + require.NotNil(t, dist) } func TestOwnedDir(t *testing.T) { dir := t.TempDir() err := os.Chmod(dir, 0755) - assert.NoError(t, err) - p := NewProfiler(dir, EmbeddedArchive) - err = p.ExtractDistributions() - assert.NoError(t, err) + require.NoError(t, err) + dist, err := ExtractDistribution(EmbeddedArchive, dir, "alloy-asprof") + require.NoError(t, err) + require.NotNil(t, dist) } func TestOwnedDirWrongPermission(t *testing.T) { dir := t.TempDir() err := os.Chmod(dir, 0777) - assert.NoError(t, err) - p := NewProfiler(dir, EmbeddedArchive) - err = p.ExtractDistributions() - assert.Error(t, err) + require.NoError(t, err) + dist, err := ExtractDistribution(EmbeddedArchive, dir, "alloy-asprof-") + require.Error(t, err) + require.Empty(t, dist.extractedDir) } func TestDistSymlink(t *testing.T) { root := t.TempDir() err := os.Chmod(root, 0755) - assert.NoError(t, err) + require.NoError(t, err) manipulated := t.TempDir() err = os.Chmod(manipulated, 0755) - assert.NoError(t, err) - p := NewProfiler(root, EmbeddedArchive) - distName := p.getDistName() + require.NoError(t, err) + distName := "dist" err = os.Symlink(manipulated, filepath.Join(root, distName)) - assert.NoError(t, err) + require.NoError(t, err) - err = p.ExtractDistributions() + dist, err := ExtractDistribution(EmbeddedArchive, root, distName) t.Logf("expected %s", err) - assert.Error(t, err) + require.Error(t, err) + require.Empty(t, dist.extractedDir) } diff --git a/internal/component/pyroscope/java/asprof/extract.go b/internal/component/pyroscope/java/asprof/extract.go index 372c108741..167841b167 100644 --- a/internal/component/pyroscope/java/asprof/extract.go +++ b/internal/component/pyroscope/java/asprof/extract.go @@ -223,3 +223,48 @@ func checkExtractFile(f *os.File, parent *os.File) error { } return nil } + +func ExtractDistribution(a Archive, tmpDir, distName string) (Distribution, error) { + + d := Distribution{} + fsMutex.Lock() + defer fsMutex.Unlock() + + var launcher, lib []byte + err := readArchive(a.data, a.format, func(name string, fi fs.FileInfo, data []byte) error { + if strings.Contains(name, "asprof") { + launcher = data + } + if strings.Contains(name, "libasyncProfiler") { + lib = data + } + return nil + }) + if err != nil { + return d, err + } + if launcher == nil || lib == nil { + return d, fmt.Errorf("failed to find libasyncProfiler in archive %s", distName) + } + + fileMap := map[string][]byte{} + fileMap[filepath.Join(distName, d.LauncherPath())] = launcher + fileMap[filepath.Join(distName, d.LibPath())] = lib + tmpDirFile, err := os.Open(tmpDir) + if err != nil { + return d, fmt.Errorf("failed to open tmp dir %s: %w", tmpDir, err) + } + defer tmpDirFile.Close() + + if err = checkTempDirPermissions(tmpDirFile); err != nil { + return d, err + } + + for path, data := range fileMap { + if err = writeFile(tmpDirFile, path, data, true); err != nil { + return d, err + } + } + d.extractedDir = filepath.Join(tmpDir, distName) + return d, nil +} diff --git a/internal/component/pyroscope/java/java.go b/internal/component/pyroscope/java/java.go index c7e76f7112..4e0343dbd0 100644 --- a/internal/component/pyroscope/java/java.go +++ b/internal/component/pyroscope/java/java.go @@ -34,10 +34,20 @@ func init() { return nil, fmt.Errorf("java profiler: must be run as root") } a := args.(Arguments) - var profiler = asprof.NewProfiler(a.TmpDir, asprof.EmbeddedArchive) - err := profiler.ExtractDistributions() - if err != nil { - return nil, fmt.Errorf("extract async profiler: %w", err) + var ( + dist asprof.Distribution + err error + ) + if a.Dist != "" { + dist, err = asprof.NewExtractedDistribution(a.Dist) + if err != nil { + return nil, fmt.Errorf("invalid asprof dist: %w", err) + } + } else { + dist, err = asprof.ExtractDistribution(asprof.EmbeddedArchive, a.TmpDir, asprof.EmbeddedArchive.DistName()) + if err != nil { + return nil, fmt.Errorf("extract asprof: %w", err) + } } forwardTo := pyroscope.NewFanout(a.ForwardTo, opts.ID, opts.Registerer) @@ -45,7 +55,7 @@ func init() { opts: opts, args: a, forwardTo: forwardTo, - profiler: profiler, + profiler: dist, pid2process: make(map[int]*profilingLoop), } c.updateTargets(a) @@ -86,7 +96,7 @@ type javaComponent struct { mutex sync.Mutex pid2process map[int]*profilingLoop - profiler *asprof.Profiler + profiler asprof.Distribution } func (j *javaComponent) Run(ctx context.Context) error { @@ -141,10 +151,6 @@ func (j *javaComponent) updateTargets(args Arguments) { _ = level.Debug(j.opts.Logger).Log("msg", "active target", "target", fmt.Sprintf("%+v", target), "pid", pid) - if err != nil { - _ = level.Error(j.opts.Logger).Log("msg", "invalid target", "target", fmt.Sprintf("%v", target), "err", err) - continue - } proc := j.pid2process[pid] if proc == nil { proc = newProfilingLoop(pid, target, j.opts.Logger, j.profiler, j.forwardTo, j.args.ProfilingConfig) diff --git a/internal/component/pyroscope/java/loop.go b/internal/component/pyroscope/java/loop.go index 357a0f2931..d7f95d67c3 100644 --- a/internal/component/pyroscope/java/loop.go +++ b/internal/component/pyroscope/java/loop.go @@ -35,7 +35,6 @@ type profilingLoop struct { pid int target discovery.Target cancel context.CancelFunc - dist *asprof.Distribution jfrFile string startTime time.Time profiler Profiler @@ -50,21 +49,18 @@ type profilingLoop struct { } type Profiler interface { - CopyLib(dist *asprof.Distribution, pid int) error - Execute(dist *asprof.Distribution, argv []string) (string, string, error) - Distribution() *asprof.Distribution + CopyLib(pid int) error + Execute(argv []string) (string, string, error) } func newProfilingLoop(pid int, target discovery.Target, logger log.Logger, profiler Profiler, output *pyroscope.Fanout, cfg ProfilingConfig) *profilingLoop { ctx, cancel := context.WithCancel(context.Background()) - dist := profiler.Distribution() p := &profilingLoop{ logger: log.With(logger, "pid", pid), output: output, pid: pid, target: target, cancel: cancel, - dist: dist, jfrFile: fmt.Sprintf("/tmp/asprof-%d-%d.jfr", os.Getpid(), pid), cfg: cfg, profiler: profiler, @@ -80,7 +76,7 @@ func newProfilingLoop(pid int, target discovery.Target, logger log.Logger, profi } func (p *profilingLoop) loop(ctx context.Context) { - if err := p.profiler.CopyLib(p.dist, p.pid); err != nil { + if err := p.profiler.CopyLib(p.pid); err != nil { p.onError(fmt.Errorf("failed to copy libasyncProfiler.so: %w", err)) return } @@ -256,8 +252,8 @@ func (p *profilingLoop) start() error { strconv.Itoa(p.pid), ) - _ = level.Debug(p.logger).Log("cmd", fmt.Sprintf("%s %s", p.dist.LauncherPath(), strings.Join(argv, " "))) - stdout, stderr, err := p.profiler.Execute(p.dist, argv) + _ = level.Debug(p.logger).Log("cmd", strings.Join(argv, " ")) + stdout, stderr, err := p.profiler.Execute(argv) if err != nil { return fmt.Errorf("asprof failed to run: %w %s %s", err, stdout, stderr) } @@ -276,8 +272,8 @@ func (p *profilingLoop) stop() error { "-o", "jfr", strconv.Itoa(p.pid), } - _ = level.Debug(p.logger).Log("msg", "asprof", "cmd", fmt.Sprintf("%s %s", p.dist.LauncherPath(), strings.Join(argv, " "))) - stdout, stderr, err := p.profiler.Execute(p.dist, argv) + _ = level.Debug(p.logger).Log("msg", "asprof", "cmd", strings.Join(argv, " ")) + stdout, stderr, err := p.profiler.Execute(argv) if err != nil { return fmt.Errorf("asprof failed to run: %w %s %s", err, stdout, stderr) } diff --git a/internal/component/pyroscope/java/loop_test.go b/internal/component/pyroscope/java/loop_test.go index 241561ef5e..3188c26458 100644 --- a/internal/component/pyroscope/java/loop_test.go +++ b/internal/component/pyroscope/java/loop_test.go @@ -18,28 +18,22 @@ import ( "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/component/pyroscope" - "github.com/grafana/alloy/internal/component/pyroscope/java/asprof" ) type mockProfiler struct { mock.Mock - dist *asprof.Distribution } -func (m *mockProfiler) CopyLib(dist *asprof.Distribution, pid int) error { - args := m.Called(dist, pid) +func (m *mockProfiler) CopyLib(pid int) error { + args := m.Called(pid) return args.Error(0) } -func (m *mockProfiler) Execute(dist *asprof.Distribution, argv []string) (string, string, error) { - args := m.Called(dist, argv) +func (m *mockProfiler) Execute(argv []string) (string, string, error) { + args := m.Called(argv) return args.String(0), args.String(1), args.Error(2) } -func (m *mockProfiler) Distribution() *asprof.Distribution { - return m.dist -} - type mockAppendable struct { mock.Mock } @@ -73,17 +67,17 @@ func newTestProfilingLoop(_ *testing.T, profiler *mockProfiler, appendable pyros } func TestProfilingLoop_StartStop(t *testing.T) { - profiler := &mockProfiler{dist: &asprof.Distribution{}} + profiler := &mockProfiler{} appendable := &mockAppendable{} pid := os.Getpid() jfrPath := fmt.Sprintf("/tmp/asprof-%d-%d.jfr", pid, pid) pCh := make(chan *profilingLoop) - profiler.On("CopyLib", profiler.dist, pid).Return(nil).Once() + profiler.On("CopyLib", pid).Return(nil).Once() // expect the profiler to be executed with the correct arguments to start it - profiler.On("Execute", profiler.dist, []string{ + profiler.On("Execute", []string{ "-f", jfrPath, "-o", "jfr", @@ -103,7 +97,7 @@ func TestProfilingLoop_StartStop(t *testing.T) { }).Return("", "", nil).Once() // expect the profiler to be executed with the correct arguments to stop it - profiler.On("Execute", profiler.dist, []string{ + profiler.On("Execute", []string{ "stop", "-o", "jfr", strconv.Itoa(pid), From 2dc42b843cb82ce0a6244915347cd8c96666ed67 Mon Sep 17 00:00:00 2001 From: Tolya Korniltsev Date: Wed, 17 Sep 2025 14:44:56 +0700 Subject: [PATCH 2/8] playground --- internal/component/pyroscope/java/args.go | 4 +- internal/component/pyroscope/java/java.go | 102 +++++++++--------- internal/component/pyroscope/java/loop.go | 6 +- .../util/internal/cmd/playground/main.go | 82 ++++++++++++-- 4 files changed, 131 insertions(+), 63 deletions(-) diff --git a/internal/component/pyroscope/java/args.go b/internal/component/pyroscope/java/args.go index ed8d313636..eddffeb56a 100644 --- a/internal/component/pyroscope/java/args.go +++ b/internal/component/pyroscope/java/args.go @@ -32,7 +32,7 @@ type ProfilingConfig struct { } func (rc *Arguments) UnmarshalAlloy(f func(interface{}) error) error { - *rc = defaultArguments() + *rc = DefaultArguments() type config Arguments return f((*config)(rc)) } @@ -46,7 +46,7 @@ func (arg *Arguments) Validate() error { } } -func defaultArguments() Arguments { +func DefaultArguments() Arguments { return Arguments{ TmpDir: "/tmp", ProfilingConfig: ProfilingConfig{ diff --git a/internal/component/pyroscope/java/java.go b/internal/component/pyroscope/java/java.go index 4e0343dbd0..6a5f302143 100644 --- a/internal/component/pyroscope/java/java.go +++ b/internal/component/pyroscope/java/java.go @@ -11,16 +11,18 @@ import ( "sync" "time" + "github.com/go-kit/log" "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/component/pyroscope" "github.com/grafana/alloy/internal/component/pyroscope/java/asprof" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/prometheus/client_golang/prometheus" ) const ( - labelProcessID = "__process_pid__" + LabelProcessID = "__process_pid__" ) func init() { @@ -30,40 +32,42 @@ func init() { Args: Arguments{}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { - if os.Getuid() != 0 { - return nil, fmt.Errorf("java profiler: must be run as root") - } - a := args.(Arguments) - var ( - dist asprof.Distribution - err error - ) - if a.Dist != "" { - dist, err = asprof.NewExtractedDistribution(a.Dist) - if err != nil { - return nil, fmt.Errorf("invalid asprof dist: %w", err) - } - } else { - dist, err = asprof.ExtractDistribution(asprof.EmbeddedArchive, a.TmpDir, asprof.EmbeddedArchive.DistName()) - if err != nil { - return nil, fmt.Errorf("extract asprof: %w", err) - } - } - - forwardTo := pyroscope.NewFanout(a.ForwardTo, opts.ID, opts.Registerer) - c := &javaComponent{ - opts: opts, - args: a, - forwardTo: forwardTo, - profiler: dist, - pid2process: make(map[int]*profilingLoop), - } - c.updateTargets(a) - return c, nil + return New(opts.Logger, opts.Registerer, opts.ID, args.(Arguments)) }, }) } +func New(logger log.Logger, reg prometheus.Registerer, id string, a Arguments) (*Component, error) { + if os.Getuid() != 0 { + return nil, fmt.Errorf("java profiler: must be run as root") + } + var ( + dist asprof.Distribution + err error + ) + if a.Dist != "" { + dist, err = asprof.NewExtractedDistribution(a.Dist) + if err != nil { + return nil, fmt.Errorf("invalid asprof dist: %w", err) + } + } else { + dist, err = asprof.ExtractDistribution(asprof.EmbeddedArchive, a.TmpDir, asprof.EmbeddedArchive.DistName()) + if err != nil { + return nil, fmt.Errorf("extract asprof: %w", err) + } + } + forwardTo := pyroscope.NewFanout(a.ForwardTo, id, reg) + c := &Component{ + logger: logger, + args: a, + forwardTo: forwardTo, + profiler: dist, + pid2process: make(map[int]*profilingLoop), + } + c.updateTargets(a) + return c, nil +} + type debugInfo struct { ProfiledTargets []*debugInfoProfiledTarget `alloy:"profiled_targets,block"` } @@ -85,12 +89,12 @@ type debugInfoProfiledTarget struct { } var ( - _ component.DebugComponent = (*javaComponent)(nil) - _ component.Component = (*javaComponent)(nil) + _ component.DebugComponent = (*Component)(nil) + _ component.Component = (*Component)(nil) ) -type javaComponent struct { - opts component.Options +type Component struct { + logger log.Logger args Arguments forwardTo *pyroscope.Fanout @@ -99,7 +103,7 @@ type javaComponent struct { profiler asprof.Distribution } -func (j *javaComponent) Run(ctx context.Context) error { +func (j *Component) Run(ctx context.Context) error { defer func() { j.stop() }() @@ -107,7 +111,7 @@ func (j *javaComponent) Run(ctx context.Context) error { return nil } -func (j *javaComponent) DebugInfo() interface{} { +func (j *Component) DebugInfo() interface{} { j.mutex.Lock() defer j.mutex.Unlock() var di debugInfo @@ -122,39 +126,39 @@ func (j *javaComponent) DebugInfo() interface{} { return &di } -func (j *javaComponent) Update(args component.Arguments) error { +func (j *Component) Update(args component.Arguments) error { newArgs := args.(Arguments) j.forwardTo.UpdateChildren(newArgs.ForwardTo) j.updateTargets(newArgs) return nil } -func (j *javaComponent) updateTargets(args Arguments) { +func (j *Component) updateTargets(args Arguments) { j.mutex.Lock() defer j.mutex.Unlock() j.args = args active := make(map[int]struct{}) for _, target := range args.Targets { - pidStr, ok := target.Get(labelProcessID) + pidStr, ok := target.Get(LabelProcessID) if !ok { - _ = level.Error(j.opts.Logger).Log("msg", "could not find PID label", "pid", pidStr) + _ = level.Error(j.logger).Log("msg", "could not find PID label", "pid", pidStr) continue } pid64, err := strconv.ParseInt(pidStr, 10, 32) if err != nil { - _ = level.Error(j.opts.Logger).Log("msg", "could not convert process ID to a 32 bit integer", "pid", pidStr, "err", err) + _ = level.Error(j.logger).Log("msg", "could not convert process ID to a 32 bit integer", "pid", pidStr, "err", err) continue } pid := int(pid64) - _ = level.Debug(j.opts.Logger).Log("msg", "active target", + _ = level.Debug(j.logger).Log("msg", "active target", "target", fmt.Sprintf("%+v", target), "pid", pid) proc := j.pid2process[pid] if proc == nil { - proc = newProfilingLoop(pid, target, j.opts.Logger, j.profiler, j.forwardTo, j.args.ProfilingConfig) - _ = level.Debug(j.opts.Logger).Log("msg", "new process", "target", fmt.Sprintf("%+v", target)) + proc = newProfilingLoop(pid, target, j.logger, j.profiler, j.forwardTo, j.args.ProfilingConfig) + _ = level.Debug(j.logger).Log("msg", "new process", "target", fmt.Sprintf("%+v", target)) j.pid2process[pid] = proc } else { proc.update(target, j.args.ProfilingConfig) @@ -165,19 +169,19 @@ func (j *javaComponent) updateTargets(args Arguments) { if _, ok := active[pid]; ok { continue } - _ = level.Debug(j.opts.Logger).Log("msg", "inactive target", "pid", pid) + _ = level.Debug(j.logger).Log("msg", "inactive target", "pid", pid) _ = j.pid2process[pid].Close() delete(j.pid2process, pid) } } -func (j *javaComponent) stop() { - _ = level.Debug(j.opts.Logger).Log("msg", "stopping") +func (j *Component) stop() { + _ = level.Debug(j.logger).Log("msg", "stopping") j.mutex.Lock() defer j.mutex.Unlock() for _, proc := range j.pid2process { proc.Close() - _ = level.Debug(j.opts.Logger).Log("msg", "stopped", "pid", proc.pid) + _ = level.Debug(j.logger).Log("msg", "stopped", "pid", proc.pid) delete(j.pid2process, proc.pid) } } diff --git a/internal/component/pyroscope/java/loop.go b/internal/component/pyroscope/java/loop.go index d7f95d67c3..c486f2048d 100644 --- a/internal/component/pyroscope/java/loop.go +++ b/internal/component/pyroscope/java/loop.go @@ -241,11 +241,9 @@ func (p *profilingLoop) start() error { argv = append(argv, "--lock", cfg.Lock) } if cfg.LogLevel != "" { - argv = append(argv, "--loglevel", cfg.LogLevel) - } - if cfg.Quiet { - argv = append(argv, "--quiet") + argv = append(argv, "-L", cfg.LogLevel) } + argv = append(argv, "start", "--timeout", strconv.Itoa(int(p.interval().Seconds())), diff --git a/internal/component/pyroscope/util/internal/cmd/playground/main.go b/internal/component/pyroscope/util/internal/cmd/playground/main.go index 7c802a3136..45e1d293dd 100644 --- a/internal/component/pyroscope/util/internal/cmd/playground/main.go +++ b/internal/component/pyroscope/util/internal/cmd/playground/main.go @@ -4,11 +4,16 @@ package main import ( "context" + "flag" + "fmt" "os" + "strconv" "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/component/pyroscope" "github.com/grafana/alloy/internal/component/pyroscope/ebpf" + "github.com/grafana/alloy/internal/component/pyroscope/java" "github.com/grafana/alloy/internal/component/pyroscope/write" "github.com/oklog/run" "github.com/prometheus/client_golang/prometheus" @@ -20,6 +25,19 @@ var ( reg = prometheus.NewRegistry() ) +type config struct { + ebpfEnabled bool + javaPids pids +} + +func parseConfig() *config { + c := &config{} + flag.BoolVar(&c.ebpfEnabled, "ebpf", true, "enable ebpf") + flag.Var(&c.javaPids, "java", "java process id") + flag.Parse() + return c +} + func newWrite() pyroscope.Appendable { var receiver pyroscope.Appendable e := write.GetDefaultEndpointOptions() @@ -62,20 +80,68 @@ func newEbpf(forward pyroscope.Appendable) *ebpf.Component { } func main() { - w := newWrite() - e := newEbpf(w) - g := run.Group{} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - - g.Add(func() error { - return e.Run(ctx) - }, func(err error) { + cancel2 := func(err error) { cancel() - }) + } + + cfg := parseConfig() + w := newWrite() + + if cfg.ebpfEnabled { + e := newEbpf(w) + g.Add(func() error { + return e.Run(ctx) + }, cancel2) + } + if len(cfg.javaPids) > 0 { + j := newJava(cfg.javaPids, w) + g.Add(func() error { + return j.Run(ctx) + }, cancel2) + } + if err := g.Run(); err != nil { _ = l.Log("msg", "error running component", "err", err) os.Exit(1) } } + +func newJava(ps pids, w pyroscope.Appendable) *java.Component { + args := java.DefaultArguments() + args.ForwardTo = []pyroscope.Appendable{w} + for _, pid := range ps { + exe, _ := os.Readlink(fmt.Sprintf("/proc/%d/exe", pid)) + cwd, _ := os.ReadFile(fmt.Sprintf("/proc/%d/cwd", pid)) + t := discovery.NewTargetFromMap(map[string]string{ + java.LabelProcessID: strconv.Itoa(pid), + "exe": exe, + "cwd": string(cwd), + }) + args.Targets = append(args.Targets, t) + } + + j, err := java.New(l, reg, "java", args) + if err != nil { + _ = l.Log("msg", "error creating java component", "err", err) + os.Exit(1) + } + return j +} + +type pids []int + +func (p *pids) String() string { + return fmt.Sprintf("%+v", *p) +} + +func (p *pids) Set(value string) error { + pid, err := strconv.Atoi(value) + if err != nil { + return err + } + *p = append(*p, pid) + return nil +} From 39dc15af99fb2c860e8aa76d683bdb6471f686c8 Mon Sep 17 00:00:00 2001 From: Tolya Korniltsev Date: Wed, 17 Sep 2025 15:45:35 +0700 Subject: [PATCH 3/8] fix args --- internal/component/pyroscope/java/args.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/component/pyroscope/java/args.go b/internal/component/pyroscope/java/args.go index eddffeb56a..d98a7742a7 100644 --- a/internal/component/pyroscope/java/args.go +++ b/internal/component/pyroscope/java/args.go @@ -16,7 +16,7 @@ type Arguments struct { ProfilingConfig ProfilingConfig `alloy:"profiling_config,block,optional"` // undocumented - Dist string `alloy:"dist,block,optional"` + Dist string `alloy:"dist,attr,optional"` } type ProfilingConfig struct { From 708ffbf41a326e40b6356b9fd934317727cc51f5 Mon Sep 17 00:00:00 2001 From: Tolya Korniltsev Date: Wed, 17 Sep 2025 15:47:22 +0700 Subject: [PATCH 4/8] cleanup --- .../component/pyroscope/java/asprof/asprof.go | 46 +++++++++++++++++++ .../pyroscope/java/asprof/extract.go | 45 ------------------ 2 files changed, 46 insertions(+), 45 deletions(-) diff --git a/internal/component/pyroscope/java/asprof/asprof.go b/internal/component/pyroscope/java/asprof/asprof.go index da541d6b94..8f6d9fb75e 100644 --- a/internal/component/pyroscope/java/asprof/asprof.go +++ b/internal/component/pyroscope/java/asprof/asprof.go @@ -8,9 +8,11 @@ import ( _ "embed" "encoding/hex" "fmt" + "io/fs" "os" "os/exec" "path/filepath" + "strings" "sync" ) @@ -73,3 +75,47 @@ func (d Distribution) Execute(argv []string) (string, string, error) { } return stdout.String(), stderr.String(), nil } + +func ExtractDistribution(a Archive, tmpDir, distName string) (Distribution, error) { + d := Distribution{} + fsMutex.Lock() + defer fsMutex.Unlock() + + var launcher, lib []byte + err := readArchive(a.data, a.format, func(name string, fi fs.FileInfo, data []byte) error { + if strings.Contains(name, "asprof") { + launcher = data + } + if strings.Contains(name, "libasyncProfiler") { + lib = data + } + return nil + }) + if err != nil { + return d, err + } + if launcher == nil || lib == nil { + return d, fmt.Errorf("failed to find libasyncProfiler in archive %s", distName) + } + + fileMap := map[string][]byte{} + fileMap[filepath.Join(distName, d.LauncherPath())] = launcher + fileMap[filepath.Join(distName, d.LibPath())] = lib + tmpDirFile, err := os.Open(tmpDir) + if err != nil { + return d, fmt.Errorf("failed to open tmp dir %s: %w", tmpDir, err) + } + defer tmpDirFile.Close() + + if err = checkTempDirPermissions(tmpDirFile); err != nil { + return d, err + } + + for path, data := range fileMap { + if err = writeFile(tmpDirFile, path, data, true); err != nil { + return d, err + } + } + d.extractedDir = filepath.Join(tmpDir, distName) + return d, nil +} diff --git a/internal/component/pyroscope/java/asprof/extract.go b/internal/component/pyroscope/java/asprof/extract.go index 167841b167..372c108741 100644 --- a/internal/component/pyroscope/java/asprof/extract.go +++ b/internal/component/pyroscope/java/asprof/extract.go @@ -223,48 +223,3 @@ func checkExtractFile(f *os.File, parent *os.File) error { } return nil } - -func ExtractDistribution(a Archive, tmpDir, distName string) (Distribution, error) { - - d := Distribution{} - fsMutex.Lock() - defer fsMutex.Unlock() - - var launcher, lib []byte - err := readArchive(a.data, a.format, func(name string, fi fs.FileInfo, data []byte) error { - if strings.Contains(name, "asprof") { - launcher = data - } - if strings.Contains(name, "libasyncProfiler") { - lib = data - } - return nil - }) - if err != nil { - return d, err - } - if launcher == nil || lib == nil { - return d, fmt.Errorf("failed to find libasyncProfiler in archive %s", distName) - } - - fileMap := map[string][]byte{} - fileMap[filepath.Join(distName, d.LauncherPath())] = launcher - fileMap[filepath.Join(distName, d.LibPath())] = lib - tmpDirFile, err := os.Open(tmpDir) - if err != nil { - return d, fmt.Errorf("failed to open tmp dir %s: %w", tmpDir, err) - } - defer tmpDirFile.Close() - - if err = checkTempDirPermissions(tmpDirFile); err != nil { - return d, err - } - - for path, data := range fileMap { - if err = writeFile(tmpDirFile, path, data, true); err != nil { - return d, err - } - } - d.extractedDir = filepath.Join(tmpDir, distName) - return d, nil -} From 97f9a65547bfec13950e498b76581098156bb549 Mon Sep 17 00:00:00 2001 From: Tolya Korniltsev Date: Wed, 17 Sep 2025 15:50:03 +0700 Subject: [PATCH 5/8] todo --- internal/component/pyroscope/java/loop.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/component/pyroscope/java/loop.go b/internal/component/pyroscope/java/loop.go index c486f2048d..8c96ab9ff5 100644 --- a/internal/component/pyroscope/java/loop.go +++ b/internal/component/pyroscope/java/loop.go @@ -180,6 +180,7 @@ func (p *profilingLoop) push(jfrBytes []byte, startTime time.Time, endTime time. sz := req.Profile.SizeVT() l := log.With(p.logger, "metric", metric, "sz", sz) ls := labels.NewBuilder(nil) + // todo do not use AsMap() for _, l := range jfrpprofPyroscope.Labels(target.AsMap(), profiles.JFREvent, req.Metric, "", spyName) { ls.Set(l.Name, l.Value) } From a7ed0b9750999cb8c63ac539b96e6ce64fc67aab Mon Sep 17 00:00:00 2001 From: Tolya Korniltsev Date: Thu, 9 Oct 2025 21:22:50 +0700 Subject: [PATCH 6/8] review fix --- internal/component/pyroscope/java/asprof/asprof.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/component/pyroscope/java/asprof/asprof.go b/internal/component/pyroscope/java/asprof/asprof.go index 8f6d9fb75e..d5a813fb6f 100644 --- a/internal/component/pyroscope/java/asprof/asprof.go +++ b/internal/component/pyroscope/java/asprof/asprof.go @@ -42,13 +42,13 @@ type Archive struct { format int } -func (a *Archive) SHA1() string { +func (a *Archive) sha1() string { sum := sha1.Sum(a.data) return hex.EncodeToString(sum[:]) } func (a *Archive) DistName() string { - return fmt.Sprintf("alloy-asprof-%s", a.SHA1()) + return fmt.Sprintf("alloy-asprof-%s", a.sha1()) } const ( From 60ffef127df71aab952a5ee534be080d33e60259 Mon Sep 17 00:00:00 2001 From: Tolya Korniltsev Date: Thu, 9 Oct 2025 21:23:45 +0700 Subject: [PATCH 7/8] review fixes --- internal/component/pyroscope/java/java.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/component/pyroscope/java/java.go b/internal/component/pyroscope/java/java.go index 6a5f302143..5112b21d2d 100644 --- a/internal/component/pyroscope/java/java.go +++ b/internal/component/pyroscope/java/java.go @@ -50,11 +50,13 @@ func New(logger log.Logger, reg prometheus.Registerer, id string, a Arguments) ( if err != nil { return nil, fmt.Errorf("invalid asprof dist: %w", err) } + _ = logger.Log("msg", "using extracted asprof dist", "dist", a.Dist) } else { dist, err = asprof.ExtractDistribution(asprof.EmbeddedArchive, a.TmpDir, asprof.EmbeddedArchive.DistName()) if err != nil { return nil, fmt.Errorf("extract asprof: %w", err) } + _ = logger.Log("msg", "using embedded asprof dist") } forwardTo := pyroscope.NewFanout(a.ForwardTo, id, reg) c := &Component{ From ca9ce814561581c9e684f4f63a2fc045e8fe45ec Mon Sep 17 00:00:00 2001 From: korniltsev-grafanista Date: Thu, 9 Oct 2025 21:26:36 +0700 Subject: [PATCH 8/8] pyroscope.java: add integration tests (#4454) * pyroscope.java: add integration tests fix package name Revert "pyroscope.java: Fix java log level parameter (#4440)" This reverts commit 4909877427447efb4132ea64801571672f985bc9. move the helper to pyroscope package * second integration test * revert compose tests * revert unneeded changes * fix buildtag * fix buildtag * improve start time for pyroscope container * skip integration test if it's not pyoroscope job * update makefile * pyroscope.java: Fix java log level parameter (#4440) * pyroscope.java: Fix java log level parameter The version bundled of the async profiler has no loglevel parameter: ``` ts=2025-09-16T08:16:50.898924708Z level=error component_path=/profiling.feature component_id=pyroscope.java.java_pods pid=1184752 err="failed to start: asprof failed to run: asprof failed to run /tmp/alloy-asprof-ae0261b1093f2bc4df44a87300fef98dcdebccb5/bin/asprof: exit status 1 Unrecognized option: --loglevel\n" ``` * Quiet is not a valid argument for the async-profiler cli It can only be used for attaching using agent * remove comments * fix --------- Co-authored-by: Christian Simon --- .dockerignore | 1 + .github/workflows/test_pyroscope_pr.yml | 2 +- .gitignore | 1 + .../java/integration/integration_test.go | 137 ++++++++++++++++++ .../pyroscope/testutil/components.go | 36 +++++ .../util/internal/cmd/playground/main.go | 18 +-- .../pyroscope/util/test/container/java.go | 54 +++++++ .../util/test/container/pyroscope.go | 46 ++++++ .../component/pyroscope/util/test/query.go | 24 +++ 9 files changed, 302 insertions(+), 17 deletions(-) create mode 100644 internal/component/pyroscope/java/integration/integration_test.go create mode 100644 internal/component/pyroscope/testutil/components.go create mode 100644 internal/component/pyroscope/util/test/container/java.go create mode 100644 internal/component/pyroscope/util/test/container/pyroscope.go create mode 100644 internal/component/pyroscope/util/test/query.go diff --git a/.dockerignore b/.dockerignore index 6b1a85acb8..7bddf77b7c 100644 --- a/.dockerignore +++ b/.dockerignore @@ -7,3 +7,4 @@ internal/web/ui/build/ packaging/windows/LICENSE packaging/windows/agent-windows-amd64.exe cmd/grafana-agent/Dockerfile +alloy diff --git a/.github/workflows/test_pyroscope_pr.yml b/.github/workflows/test_pyroscope_pr.yml index 46ee062364..8606937e3e 100644 --- a/.github/workflows/test_pyroscope_pr.yml +++ b/.github/workflows/test_pyroscope_pr.yml @@ -26,4 +26,4 @@ jobs: go-version-file: go.mod cache: false - - run: make GO_TAGS="nodocker" test-pyroscope \ No newline at end of file + - run: sudo make test-pyroscope diff --git a/.gitignore b/.gitignore index 9ddcfe6ade..905fa90beb 100644 --- a/.gitignore +++ b/.gitignore @@ -33,3 +33,4 @@ node_modules # file of the pair will detect a dirty work tree and detect the wrong tag name. .tag-only .image-tag +alloy diff --git a/internal/component/pyroscope/java/integration/integration_test.go b/internal/component/pyroscope/java/integration/integration_test.go new file mode 100644 index 0000000000..e7186b403d --- /dev/null +++ b/internal/component/pyroscope/java/integration/integration_test.go @@ -0,0 +1,137 @@ +//go:build linux && (amd64 || arm64) + +package integration + +import ( + "context" + "fmt" + "net/http" + "os" + "strings" + "sync" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component/discovery" + "github.com/grafana/alloy/internal/component/pyroscope" + "github.com/grafana/alloy/internal/component/pyroscope/java" + "github.com/grafana/alloy/internal/component/pyroscope/testutil" + "github.com/grafana/alloy/internal/component/pyroscope/util/test" + pyroutil "github.com/grafana/alloy/internal/component/pyroscope/util/test/container" + querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" +) + +func TestPyroscopeJavaIntegration(t *testing.T) { + if os.Getenv("GITHUB_ACTIONS") == "true" && os.Getenv("GITHUB_JOB") != "test_pyroscope" { + t.Skip("Skipping Pyroscope Java integration test in GitHub Actions (job name is not test_pyroscope)") + } + wg := sync.WaitGroup{} + defer func() { + wg.Wait() + }() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + l := log.NewSyncLogger(log.NewLogfmtLogger(os.Stderr)) + l = log.WithPrefix(l, + "test", t.Name(), + "ts", log.DefaultTimestampUTC, + ) + + _, pyroscopeEndpoint := pyroutil.StartPyroscopeContainer(t, ctx, l) + + _, javaEndpoint, pid := pyroutil.StartJavaApplicationContainer(t, ctx, l) + + t.Logf("Pyroscope endpoint: %s", pyroscopeEndpoint) + t.Logf("Java application endpoint: %s", javaEndpoint) + t.Logf("Java process PID in container: %d", pid) + + reg := prometheus.NewRegistry() + + writeComponent, err := testutil.CreateWriteComponent(l, reg, pyroscopeEndpoint) + require.NoError(t, err, "Failed to create write component") + + args := java.DefaultArguments() + args.ForwardTo = []pyroscope.Appendable{writeComponent} + args.ProfilingConfig.Interval = time.Second + args.Targets = []discovery.Target{ + discovery.NewTargetFromMap(map[string]string{ + java.LabelProcessID: fmt.Sprintf("%d", pid), + "service_name": "spring-petclinic", + }), + } + javaComponent, err := java.New( + log.With(l, "component", "pyroscope.java"), + reg, + "test-java", + args, + ) + require.NoError(t, err, "Failed to create java component") + + wg.Add(2) + go func() { + defer wg.Done() + _ = javaComponent.Run(ctx) + }() + go func() { + defer wg.Done() + for ctx.Err() == nil { + burn(javaEndpoint) + time.Sleep(100 * time.Millisecond) + } + }() + + require.Eventually(t, func() bool { + req := &querierv1.SelectMergeProfileRequest{ + ProfileTypeID: `process_cpu:cpu:nanoseconds:cpu:nanoseconds`, + LabelSelector: `{service_name="spring-petclinic"}`, + Start: time.Now().Add(-time.Hour).UnixMilli(), + End: time.Now().UnixMilli(), + } + res, err := test.Query(pyroscopeEndpoint, req) + if err != nil { + t.Logf("Error querying endpoint: %v", err) + return false + } + ss := res.String() + if !strings.Contains(ss, `org/springframework/samples/petclinic/web/VetController.showVetList`) { + return false + } + if !strings.Contains(ss, `libjvm.so.JavaThread::thread_main_inner`) { + return false + } + return true + }, 90*time.Second, 100*time.Millisecond) + + require.Eventually(t, func() bool { + req := &querierv1.SelectMergeProfileRequest{ + ProfileTypeID: `memory:alloc_in_new_tlab_bytes:bytes:space:bytes`, + LabelSelector: `{service_name="spring-petclinic"}`, + Start: time.Now().Add(-time.Hour).UnixMilli(), + End: time.Now().UnixMilli(), + } + res, err := test.Query(pyroscopeEndpoint, req) + if err != nil { + t.Logf("Error querying endpoint: %v", err) + return false + } + ss := res.String() + if !strings.Contains(ss, `org/springframework/samples/petclinic/web/VetController.showVetList`) { + return false + } + if strings.Contains(ss, `libjvm.so.JavaThread::thread_main_inner`) { + return false + } + return true + }, 90*time.Second, 100*time.Millisecond) + cancel() +} + +func burn(url string) { + _, _ = http.DefaultClient.Get(url + "/") + _, _ = http.DefaultClient.Get(url + "/owners/find") + _, _ = http.DefaultClient.Get(url + "/vets") + _, _ = http.DefaultClient.Get(url + "/oups") +} diff --git a/internal/component/pyroscope/testutil/components.go b/internal/component/pyroscope/testutil/components.go new file mode 100644 index 0000000000..fbf0ef75eb --- /dev/null +++ b/internal/component/pyroscope/testutil/components.go @@ -0,0 +1,36 @@ +//go:build linux && (arm64 || amd64) + +package testutil + +import ( + "fmt" + + "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component/pyroscope" + "github.com/grafana/alloy/internal/component/pyroscope/write" + "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/otel/trace/noop" +) + +// CreateWriteComponent creates a pyroscope.write component that forwards to the given endpoint +func CreateWriteComponent(l log.Logger, reg prometheus.Registerer, endpoint string) (pyroscope.Appendable, error) { + var receiver pyroscope.Appendable + e := write.GetDefaultEndpointOptions() + e.URL = endpoint + + _, err := write.New( + log.With(l, "component", "pyroscope.write"), + noop.Tracer{}, + reg, + func(exports write.Exports) { + receiver = exports.Receiver + }, + "test", + "", + write.Arguments{Endpoints: []*write.EndpointOptions{&e}}, + ) + if err != nil { + return nil, fmt.Errorf("error creating write component: %w", err) + } + return receiver, nil +} diff --git a/internal/component/pyroscope/util/internal/cmd/playground/main.go b/internal/component/pyroscope/util/internal/cmd/playground/main.go index 45e1d293dd..400c88d351 100644 --- a/internal/component/pyroscope/util/internal/cmd/playground/main.go +++ b/internal/component/pyroscope/util/internal/cmd/playground/main.go @@ -14,10 +14,9 @@ import ( "github.com/grafana/alloy/internal/component/pyroscope" "github.com/grafana/alloy/internal/component/pyroscope/ebpf" "github.com/grafana/alloy/internal/component/pyroscope/java" - "github.com/grafana/alloy/internal/component/pyroscope/write" + "github.com/grafana/alloy/internal/component/pyroscope/testutil" "github.com/oklog/run" "github.com/prometheus/client_golang/prometheus" - "go.opentelemetry.io/otel/trace/noop" ) var ( @@ -39,20 +38,7 @@ func parseConfig() *config { } func newWrite() pyroscope.Appendable { - var receiver pyroscope.Appendable - e := write.GetDefaultEndpointOptions() - e.URL = "http://localhost:4040" - _, err := write.New( - log.With(l, "component", "write"), - noop.Tracer{}, - reg, - func(exports write.Exports) { - receiver = exports.Receiver - }, - "playground", - "", - write.Arguments{Endpoints: []*write.EndpointOptions{&e}}, - ) + receiver, err := testutil.CreateWriteComponent(l, reg, "http://localhost:4040") if err != nil { _ = l.Log("msg", "error creating write component", "err", err) os.Exit(1) diff --git a/internal/component/pyroscope/util/test/container/java.go b/internal/component/pyroscope/util/test/container/java.go new file mode 100644 index 0000000000..2942dd0eef --- /dev/null +++ b/internal/component/pyroscope/util/test/container/java.go @@ -0,0 +1,54 @@ +package container + +import ( + "context" + "fmt" + stdlog "log" + "testing" + "time" + + "github.com/docker/docker/api/types/container" + "github.com/docker/go-connections/nat" + "github.com/go-kit/log" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" +) + +func StartJavaApplicationContainer(t *testing.T, ctx context.Context, l log.Logger) (testcontainers.Container, string, int) { + req := testcontainers.ContainerRequest{ + Image: "springcommunity/spring-framework-petclinic:latest", + ExposedPorts: []string{"8080/tcp"}, + WaitingFor: wait.ForHTTP("/").WithPort("8080/tcp").WithStartupTimeout(3 * time.Minute), + Env: map[string]string{ + "JAVA_OPTS": "-Xmx512m -Xms256m", + }, + HostConfigModifier: func(hc *container.HostConfig) { + hc.PidMode = "host" + }, + } + + c, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + Logger: stdlog.New(log.NewStdlibAdapter(l), "", 0), + }) + require.NoError(t, err) + + t.Cleanup(func() { + err := testcontainers.TerminateContainer(c) + require.NoError(t, err) + }) + + mappedPort, err := c.MappedPort(ctx, nat.Port("8080/tcp")) + require.NoError(t, err) + + host, err := c.Host(ctx) + require.NoError(t, err) + + endpoint := fmt.Sprintf("http://%s:%s", host, mappedPort.Port()) + inspected, err := c.Inspect(t.Context()) + require.NoError(t, err) + + return c, endpoint, inspected.State.Pid +} diff --git a/internal/component/pyroscope/util/test/container/pyroscope.go b/internal/component/pyroscope/util/test/container/pyroscope.go new file mode 100644 index 0000000000..cfde5ae261 --- /dev/null +++ b/internal/component/pyroscope/util/test/container/pyroscope.go @@ -0,0 +1,46 @@ +package container + +import ( + "context" + "fmt" + stdlog "log" + "testing" + + "github.com/go-kit/log" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" +) + +func StartPyroscopeContainer(t *testing.T, ctx context.Context, l log.Logger) (testcontainers.Container, string) { + req := testcontainers.ContainerRequest{ + Image: "grafana/pyroscope:latest", + Cmd: []string{"--ingester.min-ready-duration=0s"}, + ExposedPorts: []string{"4040/tcp"}, + WaitingFor: wait.ForHTTP("/ready").WithPort("4040/tcp"), + Env: map[string]string{ + "PYROSCOPE_LOG_LEVEL": "debug", + }, + } + + c, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + Logger: stdlog.New(log.NewStdlibAdapter(l), "", 0), + }) + require.NoError(t, err) + + t.Cleanup(func() { + err := testcontainers.TerminateContainer(c) + require.NoError(t, err) + }) + + mappedPort, err := c.MappedPort(ctx, "4040/tcp") + require.NoError(t, err) + + host, err := c.Host(ctx) + require.NoError(t, err) + + endpoint := fmt.Sprintf("http://%s:%s", host, mappedPort.Port()) + return c, endpoint +} diff --git a/internal/component/pyroscope/util/test/query.go b/internal/component/pyroscope/util/test/query.go new file mode 100644 index 0000000000..9cb8ed3e3c --- /dev/null +++ b/internal/component/pyroscope/util/test/query.go @@ -0,0 +1,24 @@ +package test + +import ( + "context" + "net/http" + + "connectrpc.com/connect" + "github.com/google/pprof/profile" + querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1" + "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1/querierv1connect" +) + +func Query(url string, q *querierv1.SelectMergeProfileRequest) (*profile.Profile, error) { + client := querierv1connect.NewQuerierServiceClient(http.DefaultClient, url) + res, err := client.SelectMergeProfile(context.Background(), connect.NewRequest(q)) + if err != nil { + return nil, err + } + bs, err := res.Msg.MarshalVT() + if err != nil { + return nil, err + } + return profile.ParseData(bs) +}