From 11d5883d96836d9cad5963d28a18014ed87e0721 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 24 Sep 2025 15:48:28 -0300 Subject: [PATCH 01/10] removing unnecessary fields from samples, adding user to wait events --- .../postgres/collector/query_samples.go | 19 ++++------ .../postgres/collector/query_samples_test.go | 36 +++++++++---------- 2 files changed, 23 insertions(+), 32 deletions(-) diff --git a/internal/component/database_observability/postgres/collector/query_samples.go b/internal/component/database_observability/postgres/collector/query_samples.go index d8481b4954..5f006b6994 100644 --- a/internal/component/database_observability/postgres/collector/query_samples.go +++ b/internal/component/database_observability/postgres/collector/query_samples.go @@ -32,8 +32,7 @@ const selectPgStatActivity = ` s.application_name, s.client_addr, s.client_port, - s.backend_type, - s.backend_start, + s.backend_type, s.backend_xid, s.backend_xmin, s.xact_start, @@ -54,8 +53,7 @@ const selectPgStatActivity = ` ( coalesce(TRIM(s.query), '') != '' AND s.query_start IS NOT NULL AND ( - s.state != 'idle' OR - (s.state = 'idle' AND s.state_change > $1) + s.state != 'idle' OR ) AND coalesce(TRIM(s.state), '') != '' ) @@ -176,7 +174,7 @@ func calculateDuration(nullableTime sql.NullTime, currentTime time.Time) string } func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { - scrapeTime := time.Now() + rows, err := c.dbConnection.QueryContext(ctx, selectPgStatActivity, c.lastScrape) if err != nil { return fmt.Errorf("failed to query pg_stat_activity: %w", err) @@ -195,7 +193,6 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { &sample.ClientAddr, &sample.ClientPort, &sample.BackendType, - &sample.BackendStart, &sample.BackendXID, &sample.BackendXmin, &sample.XactStart, @@ -227,7 +224,6 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { stateDuration := calculateDuration(sample.StateChange, sample.Now) queryDuration := calculateDuration(sample.QueryStart, sample.Now) xactDuration := calculateDuration(sample.XactStart, sample.Now) - backendDuration := calculateDuration(sample.BackendStart, sample.Now) clientAddr := "" if sample.ClientAddr.Valid { @@ -252,7 +248,7 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { // Build query sample entry sampleLabels := fmt.Sprintf( - `datname="%s" pid="%d" leader_pid="%s" user="%s" app="%s" client="%s" backend_type="%s" backend_time="%s" xid="%d" xmin="%d" xact_time="%s" state="%s" query_time="%s" queryid="%d" query="%s" engine="postgres"`, + `datname="%s" pid="%d" leader_pid="%s" user="%s" app="%s" client="%s" backend_type="%s" xid="%d" xmin="%d" xact_time="%s" state="%s" query_time="%s" queryid="%d" query="%s" engine="postgres"`, sample.DatabaseName.String, sample.PID, leaderPID, @@ -260,7 +256,6 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { sample.ApplicationName.String, clientAddr, sample.BackendType.String, - backendDuration, sample.BackendXID.Int32, sample.BackendXmin.Int32, xactDuration, @@ -285,8 +280,9 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { if waitEvent != "" { waitEventLabels := fmt.Sprintf( - `datname="%s" backend_type="%s" state="%s" wait_time="%s" wait_event_type="%s" wait_event="%s" wait_event_name="%s" blocked_by_pids="%v" queryid="%d" query="%s" engine="postgres"`, + `datname="%s" user="%s" backend_type="%s" state="%s" wait_time="%s" wait_event_type="%s" wait_event="%s" wait_event_name="%s" blocked_by_pids="%v" queryid="%d" query="%s" engine="postgres"`, sample.DatabaseName.String, + sample.Username.String, sample.BackendType.String, sample.State.String, stateDuration, @@ -312,9 +308,6 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { return err } - // Update last scrape time after successful scrape - c.lastScrape = scrapeTime - return nil } diff --git a/internal/component/database_observability/postgres/collector/query_samples_test.go b/internal/component/database_observability/postgres/collector/query_samples_test.go index fd31184c2d..78575e792c 100644 --- a/internal/component/database_observability/postgres/collector/query_samples_test.go +++ b/internal/component/database_observability/postgres/collector/query_samples_test.go @@ -27,7 +27,6 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { stateChangeTime := now.Add(-10 * time.Second) // 10 seconds ago queryStartTime := now.Add(-30 * time.Second) // 30 seconds ago xactStartTime := now.Add(-2 * time.Minute) // 2 minutes ago - backendStartTime := now.Add(-1 * time.Hour) // 1 hour ago testCases := []struct { name string @@ -44,14 +43,14 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_start", "backend_xid", "backend_xmin", + "backend_type", "backend_xid", "backend_xmin", "xact_start", "state", "state_change", "wait_event_type", "wait_event", "blocked_by_pids", "query_start", "query_id", "query", }).AddRow( now, "testdb", 100, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, - "client backend", backendStartTime, sql.NullInt32{Int32: 500, Valid: true}, sql.NullInt32{Int32: 400, Valid: true}, + "client backend", sql.NullInt32{Int32: 500, Valid: true}, sql.NullInt32{Int32: 400, Valid: true}, xactStartTime, "active", stateChangeTime, sql.NullString{}, sql.NullString{}, nil, queryStartTime, sql.NullInt64{Int64: 123, Valid: true}, "SELECT * FROM users", @@ -62,7 +61,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { {"op": OP_QUERY_SAMPLE}, }, expectedLines: []string{ - `level="info" datname="testdb" pid="100" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="1h0m0s" xid="500" xmin="400" xact_time="2m0s" state="active" query_time="30s" queryid="123" query="SELECT * FROM users" engine="postgres" cpu_time="10s"`, + `level="info" datname="testdb" pid="100" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" xid="500" xmin="400" xact_time="2m0s" state="active" query_time="30s" queryid="123" query="SELECT * FROM users" engine="postgres" cpu_time="10s"`, }, }, { @@ -72,14 +71,14 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_start", "backend_xid", "backend_xmin", + "backend_type", "backend_xid", "backend_xmin", "xact_start", "state", "state_change", "wait_event_type", "wait_event", "blocked_by_pids", "query_start", "query_id", "query", }).AddRow( now, "testdb", 101, sql.NullInt64{Int64: 100, Valid: true}, "testuser", "testapp", "127.0.0.1", 5432, - "parallel worker", now, sql.NullInt32{}, sql.NullInt32{}, + "parallel worker", sql.NullInt32{}, sql.NullInt32{}, now, "active", now, sql.NullString{}, sql.NullString{}, nil, now, sql.NullInt64{Int64: 123, Valid: true}, "SELECT * FROM large_table", @@ -90,8 +89,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { {"op": OP_QUERY_SAMPLE}, }, expectedLines: []string{ - fmt.Sprintf(`level="info" datname="testdb" pid="101" leader_pid="100" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="parallel worker" backend_time="%s" xid="0" xmin="0" xact_time="%s" state="active" query_time="%s" queryid="123" query="SELECT * FROM large_table" engine="postgres" cpu_time="%s"`, - time.Duration(0).String(), + fmt.Sprintf(`level="info" datname="testdb" pid="101" leader_pid="100" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="parallel worker" xid="0" xmin="0" xact_time="%s" state="active" query_time="%s" queryid="123" query="SELECT * FROM large_table" engine="postgres" cpu_time="%s"`, time.Duration(0).String(), time.Duration(0).String(), time.Duration(0).String(), @@ -105,14 +103,14 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_start", "backend_xid", "backend_xmin", + "backend_type", "backend_xid", "backend_xmin", "xact_start", "state", "state_change", "wait_event_type", "wait_event", "blocked_by_pids", "query_start", "query_id", "query", }).AddRow( now, "testdb", 102, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, - "client backend", backendStartTime, sql.NullInt32{}, sql.NullInt32{}, + "client backend", sql.NullInt32{}, sql.NullInt32{}, xactStartTime, "waiting", stateChangeTime, sql.NullString{String: "Lock", Valid: true}, sql.NullString{String: "relation", Valid: true}, pq.Int64Array{103, 104}, now, sql.NullInt64{Int64: 124, Valid: true}, "UPDATE users SET status = 'active'", @@ -124,8 +122,8 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { {"op": OP_WAIT_EVENT}, }, expectedLines: []string{ - `level="info" datname="testdb" pid="102" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="1h0m0s" xid="0" xmin="0" xact_time="2m0s" state="waiting" query_time="0s" queryid="124" query="UPDATE users SET status = ?" engine="postgres"`, - `level="info" datname="testdb" backend_type="client backend" state="waiting" wait_time="10s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103 104]" queryid="124" query="UPDATE users SET status = ?" engine="postgres"`, + `level="info" datname="testdb" pid="102" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" xid="0" xmin="0" xact_time="2m0s" state="waiting" query_time="0s" queryid="124" query="UPDATE users SET status = ?" engine="postgres"`, + `level="info" datname="testdb" user="testuser" backend_type="client backend" state="waiting" wait_time="10s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103 104]" queryid="124" query="UPDATE users SET status = ?" engine="postgres"`, }, }, { @@ -135,14 +133,14 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_start", "backend_xid", "backend_xmin", + "backend_type", "backend_xid", "backend_xmin", "xact_start", "state", "state_change", "wait_event_type", "wait_event", "blocked_by_pids", "query_start", "query_id", "query", }).AddRow( now, "testdb", 103, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, - "client backend", now, sql.NullInt32{}, sql.NullInt32{}, + "client backend", sql.NullInt32{}, sql.NullInt32{}, now, "active", now, sql.NullString{}, sql.NullString{}, nil, now, sql.NullInt64{Int64: 125, Valid: true}, "", @@ -159,14 +157,14 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_start", "backend_xid", "backend_xmin", + "backend_type", "backend_xid", "backend_xmin", "xact_start", "state", "state_change", "wait_event_type", "wait_event", "blocked_by_pids", "query_start", "query_id", "query", }).AddRow( now, sql.NullString{Valid: false}, 104, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, - "client backend", now, sql.NullInt32{}, sql.NullInt32{}, + "client backend", sql.NullInt32{}, sql.NullInt32{}, now, "active", now, sql.NullString{}, sql.NullString{}, nil, now, sql.NullInt64{Int64: 126, Valid: true}, "SELECT * FROM users", @@ -183,14 +181,14 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_start", "backend_xid", "backend_xmin", + "backend_type", "backend_xid", "backend_xmin", "xact_start", "state", "state_change", "wait_event_type", "wait_event", "blocked_by_pids", "query_start", "query_id", "query", }).AddRow( now, "testdb", 106, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, - "client backend", backendStartTime, sql.NullInt32{}, sql.NullInt32{}, + "client backend", sql.NullInt32{}, sql.NullInt32{}, xactStartTime, "active", stateChangeTime, sql.NullString{}, sql.NullString{}, nil, queryStartTime, sql.NullInt64{Int64: 128, Valid: true}, "SELECT * FROM users WHERE id = 123 AND email = 'test@example.com'", @@ -201,7 +199,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { {"op": OP_QUERY_SAMPLE}, }, expectedLines: []string{ - `level="info" datname="testdb" pid="106" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="1h0m0s" xid="0" xmin="0" xact_time="2m0s" state="active" query_time="30s" queryid="128" query="SELECT * FROM users WHERE id = 123 AND email = 'test@example.com'" engine="postgres" cpu_time="10s"`, + `level="info" datname="testdb" pid="106" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" xid="0" xmin="0" xact_time="2m0s" state="active" query_time="30s" queryid="128" query="SELECT * FROM users WHERE id = 123 AND email = 'test@example.com'" engine="postgres" cpu_time="10s"`, }, }, } From c3f60a1652ce89bc2d7080026552ce697226be6c Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 24 Sep 2025 15:53:26 -0300 Subject: [PATCH 02/10] fix --- .../postgres/collector/explain_plan.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/component/database_observability/postgres/collector/explain_plan.go b/internal/component/database_observability/postgres/collector/explain_plan.go index f1a322f338..0990bfbfd2 100644 --- a/internal/component/database_observability/postgres/collector/explain_plan.go +++ b/internal/component/database_observability/postgres/collector/explain_plan.go @@ -397,11 +397,6 @@ func (c *ExplainPlan) fetchExplainPlans(ctx context.Context) error { } redactedByteExplainPlanJSON := redact(string(byteExplainPlanJSON)) - if err != nil { - level.Error(logger).Log("msg", "failed to redact explain plan json", "err", err) - nonRecoverableFailureOccurred = true - continue - } level.Debug(logger).Log("msg", "db native explain plan", "db_native_explain_plan", base64.StdEncoding.EncodeToString([]byte(redactedByteExplainPlanJSON))) From 66e192beb324c4e97776badbea0d4452928ee365 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 24 Sep 2025 17:09:41 -0300 Subject: [PATCH 03/10] lint --- .../database_observability/postgres/collector/query_samples.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/component/database_observability/postgres/collector/query_samples.go b/internal/component/database_observability/postgres/collector/query_samples.go index 5f006b6994..c9d8084391 100644 --- a/internal/component/database_observability/postgres/collector/query_samples.go +++ b/internal/component/database_observability/postgres/collector/query_samples.go @@ -174,7 +174,6 @@ func calculateDuration(nullableTime sql.NullTime, currentTime time.Time) string } func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { - rows, err := c.dbConnection.QueryContext(ctx, selectPgStatActivity, c.lastScrape) if err != nil { return fmt.Errorf("failed to query pg_stat_activity: %w", err) From ebc3f160ea34f2bd3f60654818db00b4a9342a6a Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 26 Sep 2025 10:45:23 -0300 Subject: [PATCH 04/10] update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a28a3a779f..e90dc9526e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -210,6 +210,8 @@ v1.11.0-rc.3 - Fix panic in `loki.write` when component is shutting down and `external_labels` are configured. (@kalleep) +- (_Experimental_) Remove `backend_time` field from query samples, and add `user` field to wait events within `database_observability.postgres` `query_samples` collector. + v1.10.2 ----------------- From 842694fd742d0224760433d97fce8a3bd994174b Mon Sep 17 00:00:00 2001 From: Gabriel Date: Mon, 29 Sep 2025 18:40:38 -0300 Subject: [PATCH 05/10] cleanup change --- .../postgres/collector/query_samples.go | 34 +++++++------- .../postgres/collector/query_samples_test.go | 46 ++++++++++--------- 2 files changed, 40 insertions(+), 40 deletions(-) diff --git a/internal/component/database_observability/postgres/collector/query_samples.go b/internal/component/database_observability/postgres/collector/query_samples.go index c9d8084391..aa16c63798 100644 --- a/internal/component/database_observability/postgres/collector/query_samples.go +++ b/internal/component/database_observability/postgres/collector/query_samples.go @@ -32,7 +32,8 @@ const selectPgStatActivity = ` s.application_name, s.client_addr, s.client_port, - s.backend_type, + s.backend_type, + s.backend_start, s.backend_xid, s.backend_xmin, s.xact_start, @@ -47,18 +48,13 @@ const selectPgStatActivity = ` FROM pg_stat_activity s JOIN pg_database d ON s.datid = d.oid AND NOT d.datistemplate AND d.datallowconn WHERE - s.pid <> pg_backend_pid() AND + backend_type != 'client backend' OR ( - s.backend_type != 'client backend' OR - ( - coalesce(TRIM(s.query), '') != '' AND s.query_start IS NOT NULL AND - ( - s.state != 'idle' OR - ) AND - coalesce(TRIM(s.state), '') != '' - ) + s.pid != pg_backend_pid() AND + coalesce(TRIM(s.query), '') != '' AND + s.query_id != 0 AND + s.state != 'idle' ) - AND query_id > 0 ` type QuerySamplesInfo struct { @@ -101,11 +97,10 @@ type QuerySamples struct { entryHandler loki.EntryHandler disableQueryRedaction bool - logger log.Logger - running *atomic.Bool - ctx context.Context - cancel context.CancelFunc - lastScrape time.Time + logger log.Logger + running *atomic.Bool + ctx context.Context + cancel context.CancelFunc } func NewQuerySamples(args QuerySamplesArguments) (*QuerySamples, error) { @@ -174,7 +169,7 @@ func calculateDuration(nullableTime sql.NullTime, currentTime time.Time) string } func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { - rows, err := c.dbConnection.QueryContext(ctx, selectPgStatActivity, c.lastScrape) + rows, err := c.dbConnection.QueryContext(ctx, selectPgStatActivity) if err != nil { return fmt.Errorf("failed to query pg_stat_activity: %w", err) } @@ -192,6 +187,7 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { &sample.ClientAddr, &sample.ClientPort, &sample.BackendType, + &sample.BackendStart, &sample.BackendXID, &sample.BackendXmin, &sample.XactStart, @@ -223,6 +219,7 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { stateDuration := calculateDuration(sample.StateChange, sample.Now) queryDuration := calculateDuration(sample.QueryStart, sample.Now) xactDuration := calculateDuration(sample.XactStart, sample.Now) + backendDuration := calculateDuration(sample.BackendStart, sample.Now) clientAddr := "" if sample.ClientAddr.Valid { @@ -247,7 +244,7 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { // Build query sample entry sampleLabels := fmt.Sprintf( - `datname="%s" pid="%d" leader_pid="%s" user="%s" app="%s" client="%s" backend_type="%s" xid="%d" xmin="%d" xact_time="%s" state="%s" query_time="%s" queryid="%d" query="%s" engine="postgres"`, + `datname="%s" pid="%d" leader_pid="%s" user="%s" app="%s" client="%s" backend_type="%s" backend_time="%s" xid="%d" xmin="%d" xact_time="%s" state="%s" query_time="%s" queryid="%d" query="%s" engine="postgres"`, sample.DatabaseName.String, sample.PID, leaderPID, @@ -255,6 +252,7 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { sample.ApplicationName.String, clientAddr, sample.BackendType.String, + backendDuration, sample.BackendXID.Int32, sample.BackendXmin.Int32, xactDuration, diff --git a/internal/component/database_observability/postgres/collector/query_samples_test.go b/internal/component/database_observability/postgres/collector/query_samples_test.go index 78575e792c..e309c95dca 100644 --- a/internal/component/database_observability/postgres/collector/query_samples_test.go +++ b/internal/component/database_observability/postgres/collector/query_samples_test.go @@ -27,6 +27,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { stateChangeTime := now.Add(-10 * time.Second) // 10 seconds ago queryStartTime := now.Add(-30 * time.Second) // 30 seconds ago xactStartTime := now.Add(-2 * time.Minute) // 2 minutes ago + backendStartTime := now.Add(-1 * time.Hour) // 1 hour ago testCases := []struct { name string @@ -39,18 +40,18 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "active query without wait event", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(selectPgStatActivity).WithArgs(sqlmock.AnyArg()).RowsWillBeClosed(). + mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_xid", "backend_xmin", + "backend_type", "backend_start", "backend_xid", "backend_xmin", "xact_start", "state", "state_change", "wait_event_type", "wait_event", "blocked_by_pids", "query_start", "query_id", "query", }).AddRow( now, "testdb", 100, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, - "client backend", sql.NullInt32{Int32: 500, Valid: true}, sql.NullInt32{Int32: 400, Valid: true}, + "client backend", backendStartTime, sql.NullInt32{Int32: 500, Valid: true}, sql.NullInt32{Int32: 400, Valid: true}, xactStartTime, "active", stateChangeTime, sql.NullString{}, sql.NullString{}, nil, queryStartTime, sql.NullInt64{Int64: 123, Valid: true}, "SELECT * FROM users", @@ -61,24 +62,24 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { {"op": OP_QUERY_SAMPLE}, }, expectedLines: []string{ - `level="info" datname="testdb" pid="100" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" xid="500" xmin="400" xact_time="2m0s" state="active" query_time="30s" queryid="123" query="SELECT * FROM users" engine="postgres" cpu_time="10s"`, + `level="info" datname="testdb" pid="100" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="1h0m0s" xid="500" xmin="400" xact_time="2m0s" state="active" query_time="30s" queryid="123" query="SELECT * FROM users" engine="postgres" cpu_time="10s"`, }, }, { name: "parallel query with leader PID", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(selectPgStatActivity).WithArgs(sqlmock.AnyArg()).RowsWillBeClosed(). + mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_xid", "backend_xmin", + "backend_type", "backend_start", "backend_xid", "backend_xmin", "xact_start", "state", "state_change", "wait_event_type", "wait_event", "blocked_by_pids", "query_start", "query_id", "query", }).AddRow( now, "testdb", 101, sql.NullInt64{Int64: 100, Valid: true}, "testuser", "testapp", "127.0.0.1", 5432, - "parallel worker", sql.NullInt32{}, sql.NullInt32{}, + "parallel worker", now, sql.NullInt32{}, sql.NullInt32{}, now, "active", now, sql.NullString{}, sql.NullString{}, nil, now, sql.NullInt64{Int64: 123, Valid: true}, "SELECT * FROM large_table", @@ -89,7 +90,8 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { {"op": OP_QUERY_SAMPLE}, }, expectedLines: []string{ - fmt.Sprintf(`level="info" datname="testdb" pid="101" leader_pid="100" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="parallel worker" xid="0" xmin="0" xact_time="%s" state="active" query_time="%s" queryid="123" query="SELECT * FROM large_table" engine="postgres" cpu_time="%s"`, + fmt.Sprintf(`level="info" datname="testdb" pid="101" leader_pid="100" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="parallel worker" backend_time="%s" xid="0" xmin="0" xact_time="%s" state="active" query_time="%s" queryid="123" query="SELECT * FROM large_table" engine="postgres" cpu_time="%s"`, + time.Duration(0).String(), time.Duration(0).String(), time.Duration(0).String(), time.Duration(0).String(), @@ -99,18 +101,18 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "query with wait event", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(selectPgStatActivity).WithArgs(sqlmock.AnyArg()).RowsWillBeClosed(). + mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_xid", "backend_xmin", + "backend_type", "backend_start", "backend_xid", "backend_xmin", "xact_start", "state", "state_change", "wait_event_type", "wait_event", "blocked_by_pids", "query_start", "query_id", "query", }).AddRow( now, "testdb", 102, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, - "client backend", sql.NullInt32{}, sql.NullInt32{}, + "client backend", backendStartTime, sql.NullInt32{}, sql.NullInt32{}, xactStartTime, "waiting", stateChangeTime, sql.NullString{String: "Lock", Valid: true}, sql.NullString{String: "relation", Valid: true}, pq.Int64Array{103, 104}, now, sql.NullInt64{Int64: 124, Valid: true}, "UPDATE users SET status = 'active'", @@ -122,25 +124,25 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { {"op": OP_WAIT_EVENT}, }, expectedLines: []string{ - `level="info" datname="testdb" pid="102" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" xid="0" xmin="0" xact_time="2m0s" state="waiting" query_time="0s" queryid="124" query="UPDATE users SET status = ?" engine="postgres"`, + `level="info" datname="testdb" pid="102" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="1h0m0s" xid="0" xmin="0" xact_time="2m0s" state="waiting" query_time="0s" queryid="124" query="UPDATE users SET status = ?" engine="postgres"`, `level="info" datname="testdb" user="testuser" backend_type="client backend" state="waiting" wait_time="10s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103 104]" queryid="124" query="UPDATE users SET status = ?" engine="postgres"`, }, }, { name: "insufficient privilege query - no loki entries expected", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(selectPgStatActivity).WithArgs(sqlmock.AnyArg()).RowsWillBeClosed(). + mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_xid", "backend_xmin", + "backend_type", "backend_start", "backend_xid", "backend_xmin", "xact_start", "state", "state_change", "wait_event_type", "wait_event", "blocked_by_pids", "query_start", "query_id", "query", }).AddRow( now, "testdb", 103, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, - "client backend", sql.NullInt32{}, sql.NullInt32{}, + "client backend", now, sql.NullInt32{}, sql.NullInt32{}, now, "active", now, sql.NullString{}, sql.NullString{}, nil, now, sql.NullInt64{Int64: 125, Valid: true}, "", @@ -153,18 +155,18 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "null database name - no loki entries expected", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(selectPgStatActivity).WithArgs(sqlmock.AnyArg()).RowsWillBeClosed(). + mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_xid", "backend_xmin", + "backend_type", "backend_start", "backend_xid", "backend_xmin", "xact_start", "state", "state_change", "wait_event_type", "wait_event", "blocked_by_pids", "query_start", "query_id", "query", }).AddRow( now, sql.NullString{Valid: false}, 104, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, - "client backend", sql.NullInt32{}, sql.NullInt32{}, + "client backend", now, sql.NullInt32{}, sql.NullInt32{}, now, "active", now, sql.NullString{}, sql.NullString{}, nil, now, sql.NullInt64{Int64: 126, Valid: true}, "SELECT * FROM users", @@ -177,18 +179,18 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "query with redaction disabled", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(selectPgStatActivity).WithArgs(sqlmock.AnyArg()).RowsWillBeClosed(). + mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_xid", "backend_xmin", + "backend_type", "backend_start", "backend_xid", "backend_xmin", "xact_start", "state", "state_change", "wait_event_type", "wait_event", "blocked_by_pids", "query_start", "query_id", "query", }).AddRow( now, "testdb", 106, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, - "client backend", sql.NullInt32{}, sql.NullInt32{}, + "client backend", backendStartTime, sql.NullInt32{}, sql.NullInt32{}, xactStartTime, "active", stateChangeTime, sql.NullString{}, sql.NullString{}, nil, queryStartTime, sql.NullInt64{Int64: 128, Valid: true}, "SELECT * FROM users WHERE id = 123 AND email = 'test@example.com'", @@ -199,7 +201,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { {"op": OP_QUERY_SAMPLE}, }, expectedLines: []string{ - `level="info" datname="testdb" pid="106" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" xid="0" xmin="0" xact_time="2m0s" state="active" query_time="30s" queryid="128" query="SELECT * FROM users WHERE id = 123 AND email = 'test@example.com'" engine="postgres" cpu_time="10s"`, + `level="info" datname="testdb" pid="106" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="1h0m0s" xid="0" xmin="0" xact_time="2m0s" state="active" query_time="30s" queryid="128" query="SELECT * FROM users WHERE id = 123 AND email = 'test@example.com'" engine="postgres" cpu_time="10s"`, }, }, } From 06354751d4a02b2d565429e27c0c8a7ad767a9e5 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Mon, 29 Sep 2025 18:51:52 -0300 Subject: [PATCH 06/10] update changelog --- CHANGELOG.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 64e357add4..0fcf8f2fa8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -93,6 +93,7 @@ v1.11.0-rc.3 - add `query_sample` collector for postgres (@gaantunes) - add `schema_details` collector for postgres (@fridgepoet) - include `server_id` label on logs and metrics (@matthewnolf) + - add `user` field to wait events within `query_samples` collector. - Add `otelcol.receiver.googlecloudpubsub` community component to receive metrics, traces, and logs from Google Cloud Pub/Sub subscription. (@eraac) @@ -212,8 +213,6 @@ v1.11.0-rc.3 - Fix panic in `loki.write` when component is shutting down and `external_labels` are configured. (@kalleep) -- (_Experimental_) Remove `backend_time` field from query samples, and add `user` field to wait events within `database_observability.postgres` `query_samples` collector. - v1.10.2 ----------------- From f921f38b88727c552b76a7d889ab4fd44f812d43 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Mon, 29 Sep 2025 18:53:40 -0300 Subject: [PATCH 07/10] update changelog --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0fcf8f2fa8..1f3d19484f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ Main (unreleased) - (_Experimental_) Additions to experimental `database_observability.postgres` component: - `explain_plans` added the explain plan collector (@rgeyer) + - add `user` field to wait events within `query_samples` collector. {@gaantunes} - Add `otelcol.exporter.googlecloudpubsub` community component to export metrics, traces, and logs to Google Cloud Pub/Sub topic. (@eraac) @@ -92,8 +93,7 @@ v1.11.0-rc.3 - add `cloud_provider.aws` configuration that enables optionally supplying the ARN of the database under observation. The ARN is appended to metric samples as labels for easier filtering and grouping of resources. - add `query_sample` collector for postgres (@gaantunes) - add `schema_details` collector for postgres (@fridgepoet) - - include `server_id` label on logs and metrics (@matthewnolf) - - add `user` field to wait events within `query_samples` collector. + - include `server_id` label on logs and metrics (@matthewnolf) - Add `otelcol.receiver.googlecloudpubsub` community component to receive metrics, traces, and logs from Google Cloud Pub/Sub subscription. (@eraac) From 9c4411fd982776c005eee67458e298404bc7a424 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Mon, 29 Sep 2025 18:54:17 -0300 Subject: [PATCH 08/10] update changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f3d19484f..bc4a8742a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -93,7 +93,7 @@ v1.11.0-rc.3 - add `cloud_provider.aws` configuration that enables optionally supplying the ARN of the database under observation. The ARN is appended to metric samples as labels for easier filtering and grouping of resources. - add `query_sample` collector for postgres (@gaantunes) - add `schema_details` collector for postgres (@fridgepoet) - - include `server_id` label on logs and metrics (@matthewnolf) + - include `server_id` label on logs and metrics (@matthewnolf) - Add `otelcol.receiver.googlecloudpubsub` community component to receive metrics, traces, and logs from Google Cloud Pub/Sub subscription. (@eraac) From 4d538e3fc8c6c3fcf19acf2f0f29710d66b4dda0 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 30 Sep 2025 08:59:02 -0300 Subject: [PATCH 09/10] nit --- CHANGELOG.md | 2 +- .../database_observability/postgres/collector/query_samples.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bc4a8742a9..e075da3434 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,7 @@ Main (unreleased) - (_Experimental_) Additions to experimental `database_observability.postgres` component: - `explain_plans` added the explain plan collector (@rgeyer) - - add `user` field to wait events within `query_samples` collector. {@gaantunes} + - add `user` field to wait events within `query_samples` collector (@gaantunes) - Add `otelcol.exporter.googlecloudpubsub` community component to export metrics, traces, and logs to Google Cloud Pub/Sub topic. (@eraac) diff --git a/internal/component/database_observability/postgres/collector/query_samples.go b/internal/component/database_observability/postgres/collector/query_samples.go index aa16c63798..70eef74925 100644 --- a/internal/component/database_observability/postgres/collector/query_samples.go +++ b/internal/component/database_observability/postgres/collector/query_samples.go @@ -48,13 +48,14 @@ const selectPgStatActivity = ` FROM pg_stat_activity s JOIN pg_database d ON s.datid = d.oid AND NOT d.datistemplate AND d.datallowconn WHERE - backend_type != 'client backend' OR + s.backend_type != 'client backend' OR ( s.pid != pg_backend_pid() AND coalesce(TRIM(s.query), '') != '' AND s.query_id != 0 AND s.state != 'idle' ) + ` type QuerySamplesInfo struct { From 7ea54928cfd6a9ab09938f68badc570796eb2011 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 30 Sep 2025 08:59:36 -0300 Subject: [PATCH 10/10] nit --- .../database_observability/postgres/collector/query_samples.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/component/database_observability/postgres/collector/query_samples.go b/internal/component/database_observability/postgres/collector/query_samples.go index 70eef74925..3ecc7ae221 100644 --- a/internal/component/database_observability/postgres/collector/query_samples.go +++ b/internal/component/database_observability/postgres/collector/query_samples.go @@ -55,7 +55,6 @@ const selectPgStatActivity = ` s.query_id != 0 AND s.state != 'idle' ) - ` type QuerySamplesInfo struct {