Skip to content

Commit 4be12b3

Browse files
authored
fix(indexworker): detect which schema pg_trgm exists in (#2260)
The `pg_trgm` extension might already exist under a different schema, in which case, the create index statements will need to reference the schema when using the extension if the extension is not part of the `search_path`. This PR: - removes the migration to create the `pg_trgm` extension - looks for which schema the extension might live in - adds a check in the indexworker to ensure the extension exists before proceeding - this is mostly a nice-to-have as any failed indexes would be transparently re-created next time the worker runs
1 parent 61632f8 commit 4be12b3

File tree

3 files changed

+75
-14
lines changed

3 files changed

+75
-14
lines changed

internal/indexworker/indexworker.go

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
// ErrAdvisoryLockAlreadyAcquired is returned when another process already holds the advisory lock
1818
var ErrAdvisoryLockAlreadyAcquired = errors.New("advisory lock already acquired by another process")
19+
var ErrExtensionNotFound = errors.New("extension not found")
1920

2021
// CreateIndexes ensures that the necessary indexes on the users table exist.
2122
// If the indexes already exist and are valid, it skips creation.
@@ -92,7 +93,14 @@ func CreateIndexes(ctx context.Context, config *conf.GlobalConfiguration, le *lo
9293
}
9394
}()
9495

95-
indexes := getUsersIndexes(config.DB.Namespace)
96+
// Look up which schema the pg_trgm extension is installed in
97+
trgmSchema, err := getTrgmExtensionSchema(db)
98+
if err != nil {
99+
le.Errorf("Failed to find pg_trgm extension schema: %+v", err)
100+
return ErrExtensionNotFound
101+
}
102+
103+
indexes := getUsersIndexes(config.DB.Namespace, trgmSchema)
96104
indexNames := make([]string, len(indexes))
97105
for i, idx := range indexes {
98106
indexNames[i] = idx.name
@@ -162,8 +170,25 @@ func CreateIndexes(ctx context.Context, config *conf.GlobalConfiguration, le *lo
162170
return nil
163171
}
164172

173+
// getTrgmExtensionSchema looks up which schema the pg_trgm extension is installed in
174+
func getTrgmExtensionSchema(db *pop.Connection) (string, error) {
175+
var schema string
176+
query := `
177+
SELECT extnamespace::regnamespace::text AS schema_name
178+
FROM pg_extension
179+
WHERE extname = 'pg_trgm'
180+
LIMIT 1
181+
`
182+
183+
if err := db.RawQuery(query).First(&schema); err != nil {
184+
return "", fmt.Errorf("failed to find pg_trgm extension schema: %w", err)
185+
}
186+
187+
return schema, nil
188+
}
189+
165190
// getUsersIndexes returns the list of indexes to create on the users table
166-
func getUsersIndexes(namespace string) []struct {
191+
func getUsersIndexes(namespace, trgmSchema string) []struct {
167192
name string
168193
query string
169194
} {
@@ -182,7 +207,7 @@ func getUsersIndexes(namespace string) []struct {
182207
{
183208
name: "idx_users_email_trgm",
184209
query: fmt.Sprintf(`CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_email_trgm
185-
ON %q.users USING gin (email gin_trgm_ops);`, namespace),
210+
ON %q.users USING gin (email %s.gin_trgm_ops);`, namespace, trgmSchema),
186211
},
187212
// enables exact-match and prefix searches and sorting by phone number
188213
{
@@ -205,8 +230,8 @@ func getUsersIndexes(namespace string) []struct {
205230
{
206231
name: "idx_users_name_trgm",
207232
query: fmt.Sprintf(`CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_name_trgm
208-
ON %q.users USING gin ((raw_user_meta_data->>'name') gin_trgm_ops)
209-
WHERE raw_user_meta_data->>'name' IS NOT NULL;`, namespace),
233+
ON %q.users USING gin ((raw_user_meta_data->>'name') %s.gin_trgm_ops)
234+
WHERE raw_user_meta_data->>'name' IS NOT NULL;`, namespace, trgmSchema),
210235
},
211236
}
212237
}

internal/indexworker/indexworker_test.go

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func (ts *IndexWorkerTestSuite) SetupTest() {
7777
}
7878

7979
func (ts *IndexWorkerTestSuite) cleanupIndexes() {
80-
indexes := getUsersIndexes(ts.namespace)
80+
indexes := getUsersIndexes(ts.namespace, ts.namespace)
8181
for _, idx := range indexes {
8282
// Drop any existing indexes (valid or invalid)
8383
dropQuery := fmt.Sprintf("DROP INDEX IF EXISTS %q.%s", ts.namespace, idx.name)
@@ -91,7 +91,7 @@ func (ts *IndexWorkerTestSuite) TestCreateIndexesHappyPath() {
9191
err := CreateIndexes(ctx, ts.config, ts.logger)
9292
require.NoError(ts.T(), err)
9393

94-
indexes := getUsersIndexes(ts.namespace)
94+
indexes := getUsersIndexes(ts.namespace, ts.namespace)
9595
existingIndexes, err := getIndexStatuses(ts.popDB, ts.namespace, getIndexNames(indexes))
9696
require.NoError(ts.T(), err)
9797

@@ -135,7 +135,7 @@ func (ts *IndexWorkerTestSuite) TestIdempotency() {
135135
require.NoError(ts.T(), err)
136136

137137
// Get the state after first run
138-
indexes := getUsersIndexes(ts.namespace)
138+
indexes := getUsersIndexes(ts.namespace, ts.namespace)
139139
firstRunIndexes, err := getIndexStatuses(ts.popDB, ts.namespace, getIndexNames(indexes))
140140
require.NoError(ts.T(), err)
141141
require.Equal(ts.T(), len(indexes), len(firstRunIndexes))
@@ -191,7 +191,7 @@ func (ts *IndexWorkerTestSuite) TestOutOfBandIndexRemoval() {
191191
require.NoError(ts.T(), err)
192192

193193
// Verify all indexes exist
194-
indexes := getUsersIndexes(ts.namespace)
194+
indexes := getUsersIndexes(ts.namespace, ts.namespace)
195195
existingIndexes, err := getIndexStatuses(ts.popDB, ts.namespace, getIndexNames(indexes))
196196
require.NoError(ts.T(), err)
197197
assert.Equal(ts.T(), len(indexes), len(existingIndexes))
@@ -277,7 +277,7 @@ func (ts *IndexWorkerTestSuite) TestConcurrentWorkers() {
277277
assert.Equal(ts.T(), numWorkers-1, lockSkipCount, "Other workers should skip due to lock")
278278

279279
// Verify all indexes were created successfully
280-
indexes := getUsersIndexes(ts.namespace)
280+
indexes := getUsersIndexes(ts.namespace, ts.namespace)
281281
existingIndexes, err := getIndexStatuses(ts.popDB, ts.namespace, getIndexNames(indexes))
282282
require.NoError(ts.T(), err)
283283
assert.Equal(ts.T(), len(indexes), len(existingIndexes), "All indexes should be created")
@@ -306,7 +306,7 @@ func (ts *IndexWorkerTestSuite) TestCreateIndexesWithInvalidIndexes() {
306306
require.NoError(ts.T(), err, "Initial CreateIndexes should succeed")
307307

308308
// Verify all indexes were created and are valid
309-
indexes := getUsersIndexes(ts.namespace)
309+
indexes := getUsersIndexes(ts.namespace, ts.namespace)
310310
initialIndexes, err := getIndexStatuses(ts.popDB, ts.namespace, getIndexNames(indexes))
311311
require.NoError(ts.T(), err)
312312
assert.Equal(ts.T(), len(indexes), len(initialIndexes), "All indexes should be created initially")
@@ -337,7 +337,7 @@ func (ts *IndexWorkerTestSuite) TestCreateIndexesWithInvalidIndexes() {
337337
defer manipulatorDB.Close()
338338

339339
// Select the first 2 indexes to mark as invalid
340-
allIndexes := getUsersIndexes(ts.namespace)
340+
allIndexes := getUsersIndexes(ts.namespace, ts.namespace)
341341
indexesToInvalidate := []string{allIndexes[0].name, allIndexes[1].name}
342342

343343
for _, indexName := range indexesToInvalidate {
@@ -393,6 +393,44 @@ func (ts *IndexWorkerTestSuite) TestCreateIndexesWithInvalidIndexes() {
393393
ts.logger.Infof("Successfully recovered from %d invalid indexes", len(indexesToInvalidate))
394394
}
395395

396+
// TestCreateIndexesWithoutTrgmExtension tests that CreateIndexes fails when pg_trgm extension doesn't exist
397+
// and that no indexes are created when this prerequisite check fails.
398+
func (ts *IndexWorkerTestSuite) TestCreateIndexesWithoutTrgmExtension() {
399+
ctx := context.Background()
400+
401+
// Drop the pg_trgm extension to simulate it not being available
402+
dropExtQuery := "DROP EXTENSION IF EXISTS pg_trgm CASCADE"
403+
err := ts.db.RawQuery(dropExtQuery).Exec()
404+
require.NoError(ts.T(), err, "Should be able to drop pg_trgm extension")
405+
406+
// Verify the extension is dropped
407+
var extensionExists bool
408+
checkExtQuery := "SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm')"
409+
err = ts.db.RawQuery(checkExtQuery).First(&extensionExists)
410+
require.NoError(ts.T(), err)
411+
assert.False(ts.T(), extensionExists, "pg_trgm extension should not exist")
412+
413+
// Verify no indexes exist initially
414+
indexes := getUsersIndexes(ts.namespace, ts.namespace)
415+
existingIndexes, err := getIndexStatuses(ts.popDB, ts.namespace, getIndexNames(indexes))
416+
require.NoError(ts.T(), err)
417+
assert.Empty(ts.T(), existingIndexes, "No indexes should exist initially")
418+
419+
// Try to create indexes without pg_trgm extension
420+
err = CreateIndexes(ctx, ts.config, ts.logger)
421+
assert.Error(ts.T(), err, "CreateIndexes should fail when pg_trgm extension doesn't exist")
422+
assert.ErrorIs(ts.T(), err, ErrExtensionNotFound)
423+
424+
existingIndexes, err = getIndexStatuses(ts.popDB, ts.namespace, getIndexNames(indexes))
425+
require.NoError(ts.T(), err)
426+
assert.Empty(ts.T(), existingIndexes, "No indexes should have been created when pg_trgm is missing")
427+
428+
// Restore pg_trgm extension for other tests
429+
createExtQuery := "CREATE EXTENSION IF NOT EXISTS pg_trgm"
430+
err = ts.db.RawQuery(createExtQuery).Exec()
431+
require.NoError(ts.T(), err, "Should be able to restore pg_trgm extension")
432+
}
433+
396434
// Run the test suite
397435
func TestIndexWorker(t *testing.T) {
398436
suite.Run(t, new(IndexWorkerTestSuite))

migrations/20251103165243_add_pg_trgm_extension.postgres.up.sql

Lines changed: 0 additions & 2 deletions
This file was deleted.

0 commit comments

Comments
 (0)