Skip to content

Commit 3e0528e

Browse files
[#26504] YSQL, QueryDiagnostics: Resolve the race condition in creation and killing of yb_query_diagnostics bgworker
Summary: It is possible that we hit a race condition in the creation and killing of yb_query_diagnostics bgworker in the following scenario: a backend adds a new entry the bgworker process the entry and removes it from the hash table the bgworker checks that there are no entries, so it decides to kill itself. The bgworker releases the lock, but doesn't kill itself yet a backend acquires the lock the backend adds another entry the backend determines that the bgworker is currently running and releases the lock. ``` /* Worker was never initialized (invalid slot and generation) */ if (bg_worker_handle->slot == -1 && bg_worker_handle->generation == -1) BgWorkerRegister(); /* Worker was initialized but not currently running */ else if (GetBackgroundWorkerPid(bg_worker_handle, &pid) != BGWH_STARTED) BgWorkerRegister(); } LWLockRelease(bundles_in_progress_lock); ``` the bgworker then kills itself, as it had decided to do. At this point, we are stuck - a new bgworker will only be created if a new entry is added. To resolve this, we introduce a shared variable called bg_worker_should_be_active, which indicates the expected state of the background worker at any given time. When inserting a new bundle, if we expect the background worker to be inactive but find that it is still running, we wait for 5 seconds. If it remains active after this period, we raise an ereport(ERROR, ...). | backend | bgworker | | ---------------------------------------------------- | ---------------------------- | | Acquire exclusive lock | | | Start bgworker as it doesnt exist already | | | add a entry to hash table | | | release lock | | | collects the data | ... | | (interval over) | (interval over) | | | process,dump the data | | | remove entry from hash table | | | acquire exclusive lock | | | mark bgworker to be killed | | | release lock | | Tries to add another entry | | | Sees bgworker marked to be killed but not yet killed | | | So waits for 5 sec for it to be killed. | | | | Bgworker killed | | Acquire exclusive lock | | | Start bgworker as it doesnt exist already | | | add a entry to hash table | | | release lock | | Also note that, Since we take an EXCLUSIVE lock while creating the bgworker. Multiple clients should not be able to start bgworker. Jira: DB-15871 Test Plan: yb_build.sh --java-test 'org.yb.pgsql.TestYbQueryDiagnostics#testBgworkerRaceConditionResolved' yb_build.sh --java-test 'org.yb.pgsql.TestYbQueryDiagnostics#testBgworkerRaceConditionTimeout' Reviewers: asaha, telgersma Reviewed By: telgersma Subscribers: yql Differential Revision: https://phorge.dev.yugabyte.com/D42180
1 parent f58cded commit 3e0528e

File tree

5 files changed

+198
-45
lines changed

5 files changed

+198
-45
lines changed

java/yb-pgsql/src/test/java/org/yb/pgsql/TestYbQueryDiagnostics.java

+82-12
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,7 @@ public QueryDiagnosticsStatus(Path path, String status, String description,
119119
// Use regex to split on commas not inside quotes
120120
private static final String pgssRegex = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)";
121121

122-
@Before
123-
public void setUp() throws Exception {
124-
/* Set Gflags and restart cluster */
122+
private Map<String, String> queryDiagnosticsFlags() throws Exception {
125123
Map<String, String> flagMap = super.getTServerFlags();
126124
flagMap.put("allowed_preview_flags_csv", "ysql_yb_enable_query_diagnostics");
127125
flagMap.put("ysql_yb_enable_query_diagnostics", "true");
@@ -130,24 +128,34 @@ public void setUp() throws Exception {
130128
flagMap.put("ysql_beta_features", "true");
131129
flagMap.put("ysql_yb_ash_sampling_interval_ms", String.valueOf(ASH_SAMPLING_INTERVAL_MS));
132130

131+
return flagMap;
132+
}
133+
134+
@Before
135+
public void setUp() throws Exception {
136+
Map<String, String> flagMap = queryDiagnosticsFlags();
133137
restartClusterWithFlags(Collections.emptyMap(), flagMap);
134138

135139
setUpPreparedStatement();
136140
}
137141

142+
/* TODO(#26574): Fix triggering restartCluster twice for tests calling setUp manually */
138143
public void setUp(int queryDiagnosticsCircularBufferSize) throws Exception {
139-
/* Set Gflags and restart cluster */
140-
Map<String, String> flagMap = super.getTServerFlags();
141-
flagMap.put("allowed_preview_flags_csv", "ysql_yb_enable_query_diagnostics");
142-
flagMap.put("ysql_yb_enable_query_diagnostics", "true");
144+
Map<String, String> flagMap = queryDiagnosticsFlags();
145+
143146
appendToYsqlPgConf(flagMap,
144-
"yb_query_diagnostics_circular_buffer_size=" +
145-
queryDiagnosticsCircularBufferSize);
147+
"yb_query_diagnostics_circular_buffer_size=" +
148+
queryDiagnosticsCircularBufferSize);
149+
restartClusterWithFlags(Collections.emptyMap(), flagMap);
146150

147-
/* Required for some of the fields within schema details */
148-
flagMap.put("ysql_beta_features", "true");
149-
flagMap.put("ysql_yb_ash_sampling_interval_ms", String.valueOf(ASH_SAMPLING_INTERVAL_MS));
151+
setUpPreparedStatement();
152+
}
153+
154+
/* TODO(#26574): Fix triggering restartCluster twice for tests calling setUp manually */
155+
public void setUpWithBgworkerRaceConditionTestFlag() throws Exception {
156+
Map<String, String> flagMap = queryDiagnosticsFlags();
150157

158+
flagMap.put("TEST_ysql_yb_query_diagnostics_race_condition", "true");
151159
restartClusterWithFlags(Collections.emptyMap(), flagMap);
152160

153161
setUpPreparedStatement();
@@ -2085,4 +2093,66 @@ public void testDatabaseConnectionBackgroundWorker() throws Exception {
20852093
new String(Files.readAllBytes(schemaDetailsPath)));
20862094
}
20872095
}
2096+
2097+
/*
2098+
* Tests the case when bgworker doesn't gets killed for 10sec after marking bgworker inactive.
2099+
* This leads to insert query hitting 5s sleep and timing out.
2100+
*/
2101+
@Test
2102+
public void testBgworkerRaceConditionTimeout() throws Exception {
2103+
setUpWithBgworkerRaceConditionTestFlag();
2104+
2105+
final int diagnosticsInterval = 10;
2106+
final QueryDiagnosticsParams queryDiagnosticsParams = new QueryDiagnosticsParams(
2107+
diagnosticsInterval,
2108+
100 /* explainSampleRate */,
2109+
true /* explainAnalyze */,
2110+
true /* explainDist */,
2111+
false /* explainDebug */,
2112+
0 /* bindVarQueryMinDuration */);
2113+
2114+
try (Statement statement = connection.createStatement()) {
2115+
String queryid = generateUniqueQueryId();
2116+
runQueryDiagnostics(statement, queryid, queryDiagnosticsParams);
2117+
waitForBundleCompletion(queryid, statement, diagnosticsInterval);
2118+
2119+
/* bgworker should be waiting now */
2120+
runQueryDiagnostics(statement, queryid, queryDiagnosticsParams);
2121+
} catch (Exception e) {
2122+
assertTrue("Expected exception not thrown: " + e.getMessage(),
2123+
e.getMessage().contains("timed out after waiting 5 seconds"));
2124+
}
2125+
}
2126+
2127+
/*
2128+
* Tests the case when we hit 5s sleep while inserting the query but does not timeout.
2129+
*/
2130+
@Test
2131+
public void testBgworkerRaceConditionResolved() throws Exception {
2132+
setUpWithBgworkerRaceConditionTestFlag();
2133+
2134+
final int diagnosticsInterval = 10;
2135+
final QueryDiagnosticsParams queryDiagnosticsParams = new QueryDiagnosticsParams(
2136+
diagnosticsInterval,
2137+
100 /* explainSampleRate */,
2138+
true /* explainAnalyze */,
2139+
true /* explainDist */,
2140+
false /* explainDebug */,
2141+
0 /* bindVarQueryMinDuration */);
2142+
2143+
try (Statement statement = connection.createStatement()) {
2144+
String queryid = generateUniqueQueryId();
2145+
runQueryDiagnostics(statement, queryid, queryDiagnosticsParams);
2146+
waitForBundleCompletion(queryid, statement, diagnosticsInterval);
2147+
2148+
/*
2149+
* bgworker should be waiting now.
2150+
* We know that bgworker is waiting for 10sec, and timeout is 5sec so sleeping for 7s
2151+
* should avoid a timeout at the same time ensure that we did hit the race condition.
2152+
*/
2153+
Thread.sleep(7000);
2154+
runQueryDiagnostics(statement, queryid, queryDiagnosticsParams);
2155+
}
2156+
}
2157+
20882158
}

src/postgres/src/backend/utils/misc/yb_query_diagnostics.c

+97-33
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ typedef struct
109109
LocationLen locations[YB_QD_MAX_CONSTANTS];
110110
} YbQueryConstantsMetadata;
111111

112-
typedef struct DatabaseConnectionWorkerInfo
112+
typedef struct YbDatabaseConnectionWorkerInfo
113113
{
114114
/*
115115
* Create a deep copy of the data to prevent race conditions. The original
@@ -119,7 +119,7 @@ typedef struct DatabaseConnectionWorkerInfo
119119
*/
120120
YbQueryDiagnosticsEntry entry; /* Copy of the entry we're processing */
121121
bool initialized; /* Flag to check if the worker is initialized */
122-
} DatabaseConnectionWorkerInfo;
122+
} YbDatabaseConnectionWorkerInfo;
123123

124124
/* GUC variables */
125125
bool yb_enable_query_diagnostics;
@@ -143,11 +143,11 @@ static YbQueryConstantsMetadata query_constants = {
143143
.locations = {{0, 0}}
144144
};
145145

146-
typedef struct BackgroundWorkerHandle
146+
typedef struct YbBackgroundWorkerHandle
147147
{
148148
int slot;
149149
uint64 generation;
150-
} BackgroundWorkerHandle;
150+
} YbBackgroundWorkerHandle;
151151

152152
enum QueryOutputFormat
153153
{
@@ -162,14 +162,17 @@ static LWLock *bundles_in_progress_lock; /* protects bundles_in_progress
162162
static YbQueryDiagnosticsBundles *bundles_completed = NULL;
163163
static const char *status_msg[] = {"Success", "In Progress", "Error", "Cancelled"};
164164
static bool current_query_sampled = false;
165-
static BackgroundWorkerHandle *bg_worker_handle = NULL;
166-
static DatabaseConnectionWorkerInfo *database_connection_worker_info = NULL;
165+
static YbBackgroundWorkerHandle *bg_worker_handle = NULL;
166+
static YbDatabaseConnectionWorkerInfo *database_connection_worker_info = NULL;
167+
static bool *bg_worker_should_be_active = NULL;
167168

168169
static void YbQueryDiagnostics_post_parse_analyze(ParseState *pstate, Query *query,
169170
JumbleState *jstate);
170171
static void YbQueryDiagnostics_ExecutorStart(QueryDesc *queryDesc, int eflags);
171172
static void YbQueryDiagnostics_ExecutorEnd(QueryDesc *queryDesc);
172173

174+
static bool IsBgWorkerStopped();
175+
static void StartBgWorkerIfStopped();
173176
static void InsertNewBundleInfo(YbQueryDiagnosticsMetadata *metadata);
174177
static void FetchParams(YbQueryDiagnosticsParams *params, FunctionCallInfo fcinfo);
175178
static void ConstructDiagnosticsPath(YbQueryDiagnosticsMetadata *metadata);
@@ -256,8 +259,9 @@ YbQueryDiagnosticsShmemSize(void)
256259
sizeof(YbQueryDiagnosticsEntry)));
257260
size = add_size(size, YbQueryDiagnosticsBundlesShmemSize());
258261
size = add_size(size, sizeof(TimestampTz));
259-
size = add_size(size, sizeof(BackgroundWorkerHandle)); /* bg_worker_handle */
260-
size = add_size(size, sizeof(DatabaseConnectionWorkerInfo)); /* database_connection_worker_info */
262+
size = add_size(size, sizeof(YbBackgroundWorkerHandle)); /* bg_worker_handle */
263+
size = add_size(size, sizeof(YbDatabaseConnectionWorkerInfo)); /* database_connection_worker_info */
264+
size = add_size(size, sizeof(bool)); /* bg_worker_should_be_active */
261265

262266
return size;
263267
}
@@ -318,14 +322,23 @@ YbQueryDiagnosticsShmemInit(void)
318322
(*yb_pgss_last_reset_time) = 0;
319323

320324
/* Initialize the background worker handle */
321-
bg_worker_handle = (BackgroundWorkerHandle *) ShmemAlloc(sizeof(BackgroundWorkerHandle));
325+
bg_worker_handle = (YbBackgroundWorkerHandle *) ShmemAlloc(sizeof(YbBackgroundWorkerHandle));
322326
bg_worker_handle->slot = -1;
323327
bg_worker_handle->generation = -1;
324328

325329
/* Initialize the database connection worker info */
326-
database_connection_worker_info = (DatabaseConnectionWorkerInfo *)
327-
ShmemAlloc(sizeof(DatabaseConnectionWorkerInfo));
330+
database_connection_worker_info = (YbDatabaseConnectionWorkerInfo *)
331+
ShmemAlloc(sizeof(YbDatabaseConnectionWorkerInfo));
328332
database_connection_worker_info->initialized = false;
333+
334+
/* Initialize bg_worker_should_be_active */
335+
bg_worker_should_be_active = (bool *) ShmemAlloc(sizeof(bool));
336+
337+
/*
338+
* Initialize background worker as inactive until
339+
* explicitly triggered by a diagnostics request.
340+
*/
341+
(*bg_worker_should_be_active) = false;
329342
}
330343

331344
static inline int
@@ -598,8 +611,10 @@ BgWorkerRegister(void)
598611
Assert(status == BGWH_STARTED);
599612

600613
/* Save the handle for future use */
601-
bg_worker_handle->slot = handle->slot;
602-
bg_worker_handle->generation = handle->generation;
614+
memcpy(bg_worker_handle, handle, sizeof(YbBackgroundWorkerHandle));
615+
616+
/* Set the bg_worker_should_be_active to true */
617+
(*bg_worker_should_be_active) = true;
603618
}
604619

605620
/*
@@ -1141,21 +1156,83 @@ AccumulateExplain(QueryDesc *queryDesc, YbQueryDiagnosticsEntry *entry, bool exp
11411156
pfree(es);
11421157
}
11431158

1159+
static bool
1160+
IsBgWorkerStopped()
1161+
{
1162+
pid_t pid;
1163+
1164+
return (GetBackgroundWorkerPid((BackgroundWorkerHandle *)bg_worker_handle,
1165+
&pid) != BGWH_STARTED);
1166+
}
1167+
1168+
/*
1169+
* StartBgWorkerIfStopped()
1170+
* Starts the background worker if it is not already running.
1171+
*
1172+
* This function manages the background worker lifecycle with the following logic:
1173+
* 1. If no worker has ever been initialized (invalid slot and generation),
1174+
* registers a new worker immediately.
1175+
* 2. If a worker is marked as inactive and not running, registers a new worker.
1176+
* 3. If a worker is marked as inactive but still running (in termination process),
1177+
* waits for the worker to fully terminate before registering a new one.
1178+
* If the wait exceeds the timeout (5 seconds), raises an ERROR to prevent
1179+
* indefinite waiting.
1180+
*/
1181+
static void
1182+
StartBgWorkerIfStopped()
1183+
{
1184+
int max_attempts = 50;
1185+
int attempts = 0;
1186+
1187+
/* Worker was never initialized (invalid slot and generation) */
1188+
if (bg_worker_handle->slot == -1 && bg_worker_handle->generation == -1)
1189+
BgWorkerRegister();
1190+
1191+
/* Worker was initialized but not currently running */
1192+
else if ((*bg_worker_should_be_active) == false)
1193+
{
1194+
while (attempts < max_attempts)
1195+
{
1196+
if (IsBgWorkerStopped())
1197+
{
1198+
BgWorkerRegister();
1199+
break;
1200+
}
1201+
1202+
pg_usleep(100000); /* Sleep for 100ms */
1203+
attempts++;
1204+
}
1205+
1206+
if (attempts >= max_attempts)
1207+
ereport(ERROR,
1208+
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
1209+
errmsg("timed out after waiting 5 seconds for "
1210+
"query diagnostics background worker to terminate")));
1211+
}
1212+
}
1213+
11441214
/*
11451215
* InsertNewBundleInfo
11461216
* Adds the entry into bundles_in_progress hash table.
11471217
* Entry is inserted only if it is not already present,
1148-
* otherwise an error is raised.
1218+
* otherwise an error is raised. This function also starts
1219+
* yb_query_diagnostics bgworker if it is not already running.
11491220
*/
11501221
static void
11511222
InsertNewBundleInfo(YbQueryDiagnosticsMetadata *metadata)
11521223
{
11531224
int64 key = metadata->params.query_id;
11541225
bool found;
1155-
pid_t pid;
11561226
YbQueryDiagnosticsEntry *entry;
11571227

11581228
LWLockAcquire(bundles_in_progress_lock, LW_EXCLUSIVE);
1229+
1230+
/*
1231+
* Note that we need not worry about concurrent registration attempts
1232+
* from different sessions as we are within an EXCLUSIVE lock.
1233+
*/
1234+
StartBgWorkerIfStopped();
1235+
11591236
entry = (YbQueryDiagnosticsEntry *) hash_search(bundles_in_progress, &key,
11601237
HASH_ENTER, &found);
11611238

@@ -1177,19 +1254,6 @@ InsertNewBundleInfo(YbQueryDiagnosticsMetadata *metadata)
11771254
.query_offset = 0,
11781255
.query_len = 0,
11791256
};
1180-
1181-
/*
1182-
* Note that we need not worry about concurrent registration attempts
1183-
* from different sessions as we are within an EXCLUSIVE lock.
1184-
*/
1185-
1186-
/* Worker was never initialized (invalid slot and generation) */
1187-
if (bg_worker_handle->slot == -1 && bg_worker_handle->generation == -1)
1188-
BgWorkerRegister();
1189-
1190-
/* Worker was initialized but not currently running */
1191-
else if (GetBackgroundWorkerPid(bg_worker_handle, &pid) != BGWH_STARTED)
1192-
BgWorkerRegister();
11931257
}
11941258

11951259
LWLockRelease(bundles_in_progress_lock);
@@ -2357,9 +2421,6 @@ YbQueryDiagnosticsMain(Datum main_arg)
23572421
/* Check for expired entries within the shared hash table */
23582422
FlushAndCleanBundles();
23592423

2360-
/* Kill bgworker if there are no active bundles */
2361-
bool should_terminate = false;
2362-
23632424
/*
23642425
* Acquire the exclusive lock on the bundles_in_progress hash table.
23652426
* This is necessary to prevent other sessions from
@@ -2369,13 +2430,16 @@ YbQueryDiagnosticsMain(Datum main_arg)
23692430

23702431
/* If there are no bundles being diagnosed, switch off the bgworker */
23712432
if (hash_get_num_entries(bundles_in_progress) == 0)
2372-
should_terminate = true;
2433+
(*bg_worker_should_be_active) = false;
23732434

23742435
LWLockRelease(bundles_in_progress_lock);
23752436

23762437
/* Do this outside the lock to ensure we release the lock before exiting. */
2377-
if (should_terminate)
2438+
if (!(*bg_worker_should_be_active))
23782439
{
2440+
if (YBQueryDiagnosticsTestRaceCondition())
2441+
pg_usleep(10000000L); /* 10 seconds */
2442+
23792443
/* Kill the bgworker */
23802444
ereport(LOG,
23812445
(errmsg("stopping query diagnostics background worker "

src/postgres/src/common/pg_yb_common.c

+11
Original file line numberDiff line numberDiff line change
@@ -307,3 +307,14 @@ YBGetDatabaseOidFromEnv(const char *database_name)
307307
}
308308
return InvalidOid;
309309
}
310+
311+
bool
312+
YBQueryDiagnosticsTestRaceCondition()
313+
{
314+
static int cached_value = -1;
315+
if (cached_value == -1)
316+
{
317+
cached_value = YBCIsEnvVarTrue("FLAGS_TEST_ysql_yb_query_diagnostics_race_condition");
318+
}
319+
return cached_value;
320+
}

src/postgres/src/include/common/pg_yb_common.h

+5
Original file line numberDiff line numberDiff line change
@@ -152,4 +152,9 @@ extern bool YBColocateDatabaseByDefault();
152152
*/
153153
Oid YBGetDatabaseOidFromEnv(const char *database_name);
154154

155+
/**
156+
* Returns whether FLAGS_TEST_ysql_yb_query_diagnostics_race_condition is set.
157+
*/
158+
extern bool YBQueryDiagnosticsTestRaceCondition();
159+
155160
#endif /* PG_YB_COMMON_H */

src/yb/yql/pgwrapper/pg_wrapper.cc

+3
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ DEFINE_NON_RUNTIME_bool(yb_enable_valgrind, false,
8181
DEFINE_test_flag(bool, pg_collation_enabled, true,
8282
"True to enable collation support in YugaByte PostgreSQL.");
8383

84+
DEFINE_test_flag(bool, ysql_yb_query_diagnostics_race_condition, false,
85+
"If true, enables race condition testing for query diagnostics.");
86+
8487
// Default to 5MB
8588
DEFINE_UNKNOWN_string(
8689
pg_mem_tracker_tcmalloc_gc_release_bytes, std::to_string(5 * 1024 * 1024),

0 commit comments

Comments
 (0)