Skip to content
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ v1.11.0-rc.3

- Fix panic in `loki.write` when component is shutting down and `external_labels` are configured. (@kalleep)

- (_Experimental_) Remove `backend_time` field from query samples, and add `user` field to wait events within `database_observability.postgres` `query_samples` collector.

v1.10.2
-----------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,11 +397,6 @@ func (c *ExplainPlan) fetchExplainPlans(ctx context.Context) error {
}

redactedByteExplainPlanJSON := redact(string(byteExplainPlanJSON))
if err != nil {
level.Error(logger).Log("msg", "failed to redact explain plan json", "err", err)
nonRecoverableFailureOccurred = true
continue
}

level.Debug(logger).Log("msg", "db native explain plan", "db_native_explain_plan", base64.StdEncoding.EncodeToString([]byte(redactedByteExplainPlanJSON)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ const selectPgStatActivity = `
s.application_name,
s.client_addr,
s.client_port,
s.backend_type,
s.backend_start,
s.backend_type,
s.backend_xid,
s.backend_xmin,
s.xact_start,
Expand All @@ -54,8 +53,7 @@ const selectPgStatActivity = `
(
coalesce(TRIM(s.query), '') != '' AND s.query_start IS NOT NULL AND
(
s.state != 'idle' OR
(s.state = 'idle' AND s.state_change > $1)
s.state != 'idle' OR
) AND
coalesce(TRIM(s.state), '') != ''
)
Expand Down Expand Up @@ -176,7 +174,6 @@ func calculateDuration(nullableTime sql.NullTime, currentTime time.Time) string
}

func (c *QuerySamples) fetchQuerySample(ctx context.Context) error {
scrapeTime := time.Now()
rows, err := c.dbConnection.QueryContext(ctx, selectPgStatActivity, c.lastScrape)
if err != nil {
return fmt.Errorf("failed to query pg_stat_activity: %w", err)
Expand All @@ -195,7 +192,6 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error {
&sample.ClientAddr,
&sample.ClientPort,
&sample.BackendType,
&sample.BackendStart,
&sample.BackendXID,
&sample.BackendXmin,
&sample.XactStart,
Expand Down Expand Up @@ -227,7 +223,6 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error {
stateDuration := calculateDuration(sample.StateChange, sample.Now)
queryDuration := calculateDuration(sample.QueryStart, sample.Now)
xactDuration := calculateDuration(sample.XactStart, sample.Now)
backendDuration := calculateDuration(sample.BackendStart, sample.Now)

clientAddr := ""
if sample.ClientAddr.Valid {
Expand All @@ -252,15 +247,14 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error {

// Build query sample entry
sampleLabels := fmt.Sprintf(
`datname="%s" pid="%d" leader_pid="%s" user="%s" app="%s" client="%s" backend_type="%s" backend_time="%s" xid="%d" xmin="%d" xact_time="%s" state="%s" query_time="%s" queryid="%d" query="%s" engine="postgres"`,
`datname="%s" pid="%d" leader_pid="%s" user="%s" app="%s" client="%s" backend_type="%s" xid="%d" xmin="%d" xact_time="%s" state="%s" query_time="%s" queryid="%d" query="%s" engine="postgres"`,
sample.DatabaseName.String,
sample.PID,
leaderPID,
sample.Username.String,
sample.ApplicationName.String,
clientAddr,
sample.BackendType.String,
backendDuration,
sample.BackendXID.Int32,
sample.BackendXmin.Int32,
xactDuration,
Expand All @@ -285,8 +279,9 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error {

if waitEvent != "" {
waitEventLabels := fmt.Sprintf(
`datname="%s" backend_type="%s" state="%s" wait_time="%s" wait_event_type="%s" wait_event="%s" wait_event_name="%s" blocked_by_pids="%v" queryid="%d" query="%s" engine="postgres"`,
`datname="%s" user="%s" backend_type="%s" state="%s" wait_time="%s" wait_event_type="%s" wait_event="%s" wait_event_name="%s" blocked_by_pids="%v" queryid="%d" query="%s" engine="postgres"`,
sample.DatabaseName.String,
sample.Username.String,
sample.BackendType.String,
sample.State.String,
stateDuration,
Expand All @@ -312,9 +307,6 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error {
return err
}

// Update last scrape time after successful scrape
c.lastScrape = scrapeTime

return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
stateChangeTime := now.Add(-10 * time.Second) // 10 seconds ago
queryStartTime := now.Add(-30 * time.Second) // 30 seconds ago
xactStartTime := now.Add(-2 * time.Minute) // 2 minutes ago
backendStartTime := now.Add(-1 * time.Hour) // 1 hour ago

testCases := []struct {
name string
Expand All @@ -44,14 +43,14 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
WillReturnRows(sqlmock.NewRows([]string{
"now", "datname", "pid", "leader_pid",
"usename", "application_name", "client_addr", "client_port",
"backend_type", "backend_start", "backend_xid", "backend_xmin",
"backend_type", "backend_xid", "backend_xmin",
"xact_start", "state", "state_change", "wait_event_type",
"wait_event", "blocked_by_pids", "query_start", "query_id",
"query",
}).AddRow(
now, "testdb", 100, sql.NullInt64{},
"testuser", "testapp", "127.0.0.1", 5432,
"client backend", backendStartTime, sql.NullInt32{Int32: 500, Valid: true}, sql.NullInt32{Int32: 400, Valid: true},
"client backend", sql.NullInt32{Int32: 500, Valid: true}, sql.NullInt32{Int32: 400, Valid: true},
xactStartTime, "active", stateChangeTime, sql.NullString{},
sql.NullString{}, nil, queryStartTime, sql.NullInt64{Int64: 123, Valid: true},
"SELECT * FROM users",
Expand All @@ -62,7 +61,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
{"op": OP_QUERY_SAMPLE},
},
expectedLines: []string{
`level="info" datname="testdb" pid="100" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="1h0m0s" xid="500" xmin="400" xact_time="2m0s" state="active" query_time="30s" queryid="123" query="SELECT * FROM users" engine="postgres" cpu_time="10s"`,
`level="info" datname="testdb" pid="100" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" xid="500" xmin="400" xact_time="2m0s" state="active" query_time="30s" queryid="123" query="SELECT * FROM users" engine="postgres" cpu_time="10s"`,
},
},
{
Expand All @@ -72,14 +71,14 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
WillReturnRows(sqlmock.NewRows([]string{
"now", "datname", "pid", "leader_pid",
"usename", "application_name", "client_addr", "client_port",
"backend_type", "backend_start", "backend_xid", "backend_xmin",
"backend_type", "backend_xid", "backend_xmin",
"xact_start", "state", "state_change", "wait_event_type",
"wait_event", "blocked_by_pids", "query_start", "query_id",
"query",
}).AddRow(
now, "testdb", 101, sql.NullInt64{Int64: 100, Valid: true},
"testuser", "testapp", "127.0.0.1", 5432,
"parallel worker", now, sql.NullInt32{}, sql.NullInt32{},
"parallel worker", sql.NullInt32{}, sql.NullInt32{},
now, "active", now, sql.NullString{},
sql.NullString{}, nil, now, sql.NullInt64{Int64: 123, Valid: true},
"SELECT * FROM large_table",
Expand All @@ -90,8 +89,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
{"op": OP_QUERY_SAMPLE},
},
expectedLines: []string{
fmt.Sprintf(`level="info" datname="testdb" pid="101" leader_pid="100" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="parallel worker" backend_time="%s" xid="0" xmin="0" xact_time="%s" state="active" query_time="%s" queryid="123" query="SELECT * FROM large_table" engine="postgres" cpu_time="%s"`,
time.Duration(0).String(),
fmt.Sprintf(`level="info" datname="testdb" pid="101" leader_pid="100" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="parallel worker" xid="0" xmin="0" xact_time="%s" state="active" query_time="%s" queryid="123" query="SELECT * FROM large_table" engine="postgres" cpu_time="%s"`,
time.Duration(0).String(),
time.Duration(0).String(),
time.Duration(0).String(),
Expand All @@ -105,14 +103,14 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
WillReturnRows(sqlmock.NewRows([]string{
"now", "datname", "pid", "leader_pid",
"usename", "application_name", "client_addr", "client_port",
"backend_type", "backend_start", "backend_xid", "backend_xmin",
"backend_type", "backend_xid", "backend_xmin",
"xact_start", "state", "state_change", "wait_event_type",
"wait_event", "blocked_by_pids", "query_start", "query_id",
"query",
}).AddRow(
now, "testdb", 102, sql.NullInt64{},
"testuser", "testapp", "127.0.0.1", 5432,
"client backend", backendStartTime, sql.NullInt32{}, sql.NullInt32{},
"client backend", sql.NullInt32{}, sql.NullInt32{},
xactStartTime, "waiting", stateChangeTime, sql.NullString{String: "Lock", Valid: true},
sql.NullString{String: "relation", Valid: true}, pq.Int64Array{103, 104}, now, sql.NullInt64{Int64: 124, Valid: true},
"UPDATE users SET status = 'active'",
Expand All @@ -124,8 +122,8 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
{"op": OP_WAIT_EVENT},
},
expectedLines: []string{
`level="info" datname="testdb" pid="102" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="1h0m0s" xid="0" xmin="0" xact_time="2m0s" state="waiting" query_time="0s" queryid="124" query="UPDATE users SET status = ?" engine="postgres"`,
`level="info" datname="testdb" backend_type="client backend" state="waiting" wait_time="10s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103 104]" queryid="124" query="UPDATE users SET status = ?" engine="postgres"`,
`level="info" datname="testdb" pid="102" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" xid="0" xmin="0" xact_time="2m0s" state="waiting" query_time="0s" queryid="124" query="UPDATE users SET status = ?" engine="postgres"`,
`level="info" datname="testdb" user="testuser" backend_type="client backend" state="waiting" wait_time="10s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103 104]" queryid="124" query="UPDATE users SET status = ?" engine="postgres"`,
},
},
{
Expand All @@ -135,14 +133,14 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
WillReturnRows(sqlmock.NewRows([]string{
"now", "datname", "pid", "leader_pid",
"usename", "application_name", "client_addr", "client_port",
"backend_type", "backend_start", "backend_xid", "backend_xmin",
"backend_type", "backend_xid", "backend_xmin",
"xact_start", "state", "state_change", "wait_event_type",
"wait_event", "blocked_by_pids", "query_start", "query_id",
"query",
}).AddRow(
now, "testdb", 103, sql.NullInt64{},
"testuser", "testapp", "127.0.0.1", 5432,
"client backend", now, sql.NullInt32{}, sql.NullInt32{},
"client backend", sql.NullInt32{}, sql.NullInt32{},
now, "active", now, sql.NullString{},
sql.NullString{}, nil, now, sql.NullInt64{Int64: 125, Valid: true},
"<insufficient privilege>",
Expand All @@ -159,14 +157,14 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
WillReturnRows(sqlmock.NewRows([]string{
"now", "datname", "pid", "leader_pid",
"usename", "application_name", "client_addr", "client_port",
"backend_type", "backend_start", "backend_xid", "backend_xmin",
"backend_type", "backend_xid", "backend_xmin",
"xact_start", "state", "state_change", "wait_event_type",
"wait_event", "blocked_by_pids", "query_start", "query_id",
"query",
}).AddRow(
now, sql.NullString{Valid: false}, 104, sql.NullInt64{},
"testuser", "testapp", "127.0.0.1", 5432,
"client backend", now, sql.NullInt32{}, sql.NullInt32{},
"client backend", sql.NullInt32{}, sql.NullInt32{},
now, "active", now, sql.NullString{},
sql.NullString{}, nil, now, sql.NullInt64{Int64: 126, Valid: true},
"SELECT * FROM users",
Expand All @@ -183,14 +181,14 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
WillReturnRows(sqlmock.NewRows([]string{
"now", "datname", "pid", "leader_pid",
"usename", "application_name", "client_addr", "client_port",
"backend_type", "backend_start", "backend_xid", "backend_xmin",
"backend_type", "backend_xid", "backend_xmin",
"xact_start", "state", "state_change", "wait_event_type",
"wait_event", "blocked_by_pids", "query_start", "query_id",
"query",
}).AddRow(
now, "testdb", 106, sql.NullInt64{},
"testuser", "testapp", "127.0.0.1", 5432,
"client backend", backendStartTime, sql.NullInt32{}, sql.NullInt32{},
"client backend", sql.NullInt32{}, sql.NullInt32{},
xactStartTime, "active", stateChangeTime, sql.NullString{},
sql.NullString{}, nil, queryStartTime, sql.NullInt64{Int64: 128, Valid: true},
"SELECT * FROM users WHERE id = 123 AND email = '[email protected]'",
Expand All @@ -201,7 +199,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
{"op": OP_QUERY_SAMPLE},
},
expectedLines: []string{
`level="info" datname="testdb" pid="106" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="1h0m0s" xid="0" xmin="0" xact_time="2m0s" state="active" query_time="30s" queryid="128" query="SELECT * FROM users WHERE id = 123 AND email = '[email protected]'" engine="postgres" cpu_time="10s"`,
`level="info" datname="testdb" pid="106" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" xid="0" xmin="0" xact_time="2m0s" state="active" query_time="30s" queryid="128" query="SELECT * FROM users WHERE id = 123 AND email = '[email protected]'" engine="postgres" cpu_time="10s"`,
},
},
}
Expand Down
Loading