Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Main (unreleased)

- (_Experimental_) Additions to experimental `database_observability.postgres` component:
- `explain_plans` added the explain plan collector (@rgeyer)
- add `user` field to wait events within `query_samples` collector (@gaantunes)

- Add `otelcol.exporter.googlecloudpubsub` community component to export metrics, traces, and logs to Google Cloud Pub/Sub topic. (@eraac)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,11 +397,6 @@ func (c *ExplainPlan) fetchExplainPlans(ctx context.Context) error {
}

redactedByteExplainPlanJSON := redact(string(byteExplainPlanJSON))
if err != nil {
level.Error(logger).Log("msg", "failed to redact explain plan json", "err", err)
nonRecoverableFailureOccurred = true
continue
}

level.Debug(logger).Log("msg", "db native explain plan", "db_native_explain_plan", base64.StdEncoding.EncodeToString([]byte(redactedByteExplainPlanJSON)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,13 @@ const selectPgStatActivity = `
FROM pg_stat_activity s
JOIN pg_database d ON s.datid = d.oid AND NOT d.datistemplate AND d.datallowconn
WHERE
s.pid <> pg_backend_pid() AND
s.backend_type != 'client backend' OR
(
s.backend_type != 'client backend' OR
(
coalesce(TRIM(s.query), '') != '' AND s.query_start IS NOT NULL AND
(
s.state != 'idle' OR
(s.state = 'idle' AND s.state_change > $1)
) AND
coalesce(TRIM(s.state), '') != ''
)
s.pid != pg_backend_pid() AND
coalesce(TRIM(s.query), '') != '' AND
s.query_id != 0 AND
s.state != 'idle'
)
AND query_id > 0
`

type QuerySamplesInfo struct {
Expand Down Expand Up @@ -103,11 +97,10 @@ type QuerySamples struct {
entryHandler loki.EntryHandler
disableQueryRedaction bool

logger log.Logger
running *atomic.Bool
ctx context.Context
cancel context.CancelFunc
lastScrape time.Time
logger log.Logger
running *atomic.Bool
ctx context.Context
cancel context.CancelFunc
}

func NewQuerySamples(args QuerySamplesArguments) (*QuerySamples, error) {
Expand Down Expand Up @@ -176,8 +169,7 @@ func calculateDuration(nullableTime sql.NullTime, currentTime time.Time) string
}

func (c *QuerySamples) fetchQuerySample(ctx context.Context) error {
scrapeTime := time.Now()
rows, err := c.dbConnection.QueryContext(ctx, selectPgStatActivity, c.lastScrape)
rows, err := c.dbConnection.QueryContext(ctx, selectPgStatActivity)
if err != nil {
return fmt.Errorf("failed to query pg_stat_activity: %w", err)
}
Expand Down Expand Up @@ -285,8 +277,9 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error {

if waitEvent != "" {
waitEventLabels := fmt.Sprintf(
`datname="%s" backend_type="%s" state="%s" wait_time="%s" wait_event_type="%s" wait_event="%s" wait_event_name="%s" blocked_by_pids="%v" queryid="%d" query="%s" engine="postgres"`,
`datname="%s" user="%s" backend_type="%s" state="%s" wait_time="%s" wait_event_type="%s" wait_event="%s" wait_event_name="%s" blocked_by_pids="%v" queryid="%d" query="%s" engine="postgres"`,
sample.DatabaseName.String,
sample.Username.String,
sample.BackendType.String,
sample.State.String,
stateDuration,
Expand All @@ -312,9 +305,6 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error {
return err
}

// Update last scrape time after successful scrape
c.lastScrape = scrapeTime

return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
{
name: "active query without wait event",
setupMock: func(mock sqlmock.Sqlmock) {
mock.ExpectQuery(selectPgStatActivity).WithArgs(sqlmock.AnyArg()).RowsWillBeClosed().
mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed().
WillReturnRows(sqlmock.NewRows([]string{
"now", "datname", "pid", "leader_pid",
"usename", "application_name", "client_addr", "client_port",
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
{
name: "parallel query with leader PID",
setupMock: func(mock sqlmock.Sqlmock) {
mock.ExpectQuery(selectPgStatActivity).WithArgs(sqlmock.AnyArg()).RowsWillBeClosed().
mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed().
WillReturnRows(sqlmock.NewRows([]string{
"now", "datname", "pid", "leader_pid",
"usename", "application_name", "client_addr", "client_port",
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
{
name: "query with wait event",
setupMock: func(mock sqlmock.Sqlmock) {
mock.ExpectQuery(selectPgStatActivity).WithArgs(sqlmock.AnyArg()).RowsWillBeClosed().
mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed().
WillReturnRows(sqlmock.NewRows([]string{
"now", "datname", "pid", "leader_pid",
"usename", "application_name", "client_addr", "client_port",
Expand All @@ -125,13 +125,13 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
},
expectedLines: []string{
`level="info" datname="testdb" pid="102" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" backend_time="1h0m0s" xid="0" xmin="0" xact_time="2m0s" state="waiting" query_time="0s" queryid="124" query="UPDATE users SET status = ?" engine="postgres"`,
`level="info" datname="testdb" backend_type="client backend" state="waiting" wait_time="10s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103 104]" queryid="124" query="UPDATE users SET status = ?" engine="postgres"`,
`level="info" datname="testdb" user="testuser" backend_type="client backend" state="waiting" wait_time="10s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103 104]" queryid="124" query="UPDATE users SET status = ?" engine="postgres"`,
},
},
{
name: "insufficient privilege query - no loki entries expected",
setupMock: func(mock sqlmock.Sqlmock) {
mock.ExpectQuery(selectPgStatActivity).WithArgs(sqlmock.AnyArg()).RowsWillBeClosed().
mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed().
WillReturnRows(sqlmock.NewRows([]string{
"now", "datname", "pid", "leader_pid",
"usename", "application_name", "client_addr", "client_port",
Expand All @@ -155,7 +155,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
{
name: "null database name - no loki entries expected",
setupMock: func(mock sqlmock.Sqlmock) {
mock.ExpectQuery(selectPgStatActivity).WithArgs(sqlmock.AnyArg()).RowsWillBeClosed().
mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed().
WillReturnRows(sqlmock.NewRows([]string{
"now", "datname", "pid", "leader_pid",
"usename", "application_name", "client_addr", "client_port",
Expand All @@ -179,7 +179,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) {
{
name: "query with redaction disabled",
setupMock: func(mock sqlmock.Sqlmock) {
mock.ExpectQuery(selectPgStatActivity).WithArgs(sqlmock.AnyArg()).RowsWillBeClosed().
mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed().
WillReturnRows(sqlmock.NewRows([]string{
"now", "datname", "pid", "leader_pid",
"usename", "application_name", "client_addr", "client_port",
Expand Down
Loading