Skip to content

Commit

Permalink
[#24713] docdb: PG Integration with the DocDB advisory lock functiona…
Browse files Browse the repository at this point in the history
…lity

Summary:
This diff adds PG integration with the DocDB advisory lock functionality.
Only transaction level locks are supported.

When PG flag `ysql_yb_enable_advisory_lock` has been set to true, user can use transaction level advisory locks.
```
void pg_advisory_xact_lock(key bigint)
void pg_advisory_xact_lock(key1 int, key2 int)
void pg_advisory_xact_lock_shared(key bigint)
void pg_advisory_xact_lock_shared(key1 int, key2 int)
boolean pg_try_advisory_xact_lock(key bigint)
boolean pg_try_advisory_xact_lock(key1 int, key2 int)
boolean pg_try_advisory_xact_lock_shared(key bigint)
boolean pg_try_advisory_xact_lock_shared(key1 int, key2 int)

```

If session level advisory lock functions are called, it will raise an error from pg_client_session layer.
```
ERROR: session-level advisory locks are not yet implemented
```

In this diff, also abandoned the gflag `yb_enable_advisory_lock`, introduced a new preview PG gflag `ysql_yb_enable_advisory_locks` to guard this feature instead.

To enable this feature when starting a cluster, should set the following gflags on both master and tserver:
`ysql_yb_enable_advisory_locks=true,allowed_preview_flags_csv=ysql_yb_enable_advisory_locks`

**Upgrade/Rollback safety:**
The advisory lock feature is guarded by ysql_yb_enable_advisory_lock (false by default).
Jira: DB-13789

Test Plan:
Jenkins: urgent
advisory_lock-test
pg_advisory_lock-test
org.yb.pgsql.TestPgRegressIsolationWithoutWaitQueues#testPgRegress
org.yb.pgsql.TestPgRegressIsolation#testPgRegress

Reviewers: pjain, hsunder, bkolagani, yyan

Reviewed By: pjain, hsunder, bkolagani

Subscribers: svc_phabricator, ybase, yql

Differential Revision: https://phorge.dev.yugabyte.com/D40617
  • Loading branch information
yusong-yan committed Dec 17, 2024
1 parent 7497dd7 commit 8ce8475
Show file tree
Hide file tree
Showing 33 changed files with 1,136 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ public class TestPgRegressIsolation extends BasePgRegressTest {
protected Map<String, String> getTServerFlags() {
Map<String, String> flagMap = super.getTServerFlags();
flagMap.put("yb_enable_read_committed_isolation", "true");
flagMap.put("allowed_preview_flags_csv", "ysql_yb_enable_advisory_locks");
flagMap.put("ysql_yb_enable_advisory_locks", "true");
return flagMap;
}

@Override
protected Map<String, String> getMasterFlags() {
Map<String, String> flagMap = super.getMasterFlags();
flagMap.put("allowed_preview_flags_csv", "ysql_yb_enable_advisory_locks");
flagMap.put("ysql_yb_enable_advisory_locks", "true");
return flagMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ protected Map<String, String> getTServerFlags() {
Map<String, String> flagMap = super.getTServerFlags();
flagMap.put("enable_wait_queues", "false");
flagMap.put("yb_enable_read_committed_isolation", "true");
flagMap.put("allowed_preview_flags_csv", "ysql_yb_enable_advisory_locks");
flagMap.put("ysql_yb_enable_advisory_locks", "true");
return flagMap;
}

@Override
protected Map<String, String> getMasterFlags() {
Map<String, String> flagMap = super.getMasterFlags();
flagMap.put("allowed_preview_flags_csv", "ysql_yb_enable_advisory_locks");
flagMap.put("ysql_yb_enable_advisory_locks", "true");
return flagMap;
}

Expand Down
120 changes: 96 additions & 24 deletions src/postgres/src/backend/utils/adt/lockfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,80 @@
/* YB includes. */
#include "pg_yb_utils.h"

bool
ShouldAcquireYBAdvisoryLocks()
{
return IsYugaByteEnabled() && yb_enable_advisory_locks;
}

static void
YbPreventAdvisoryLocks(void)
YbRaiseAdvisoryLocksNotSupported(void)
{
if (!yb_silence_advisory_locks_not_supported_error)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("advisory locks are not yet implemented"),
errhint(
"If the app doesn't need strict functionality, this error can be silenced "
"by using the GFlag yb_silence_advisory_locks_not_supported_error. "
"See https://github.com/yugabyte/yugabyte-db/issues/3642 for details.")));
"If the app doesn't need strict functionality, this error can be silenced "
"by using the GFlag yb_silence_advisory_locks_not_supported_error. "
"See https://github.com/yugabyte/yugabyte-db/issues/3642 for details.")));
}

YBAdvisoryLockId
GetYBAdvisoryLockId(LOCKTAG tag)
{
YBAdvisoryLockId lock;
// Current database oid.
lock.database_id = tag.locktag_field1;
// First of 2 int4 keys, or high-order half of an int8 key.
lock.classid = tag.locktag_field2;
// Second of 2 int4 keys, or low-order half of an int8 key.
lock.objid = tag.locktag_field3;
// 1 if using one int8 key. 2 if using two int4 keys.
lock.objsubid = tag.locktag_field4;
return lock;
}

// Returns true if lock is acquired, false if lock is skipped.
bool
HandleStatusIgnoreSkipLocking(YBCStatus status)
{
if (status && YBCIsTxnSkipLockingError(YBCStatusTransactionError(status)))
{
YBCFreeStatus(status);
return false;
}
HandleYBStatus(status);
return true;
}

#define TryAcquireYBAdvisoryLock(tag, mode, session_level) \
do { \
if (ShouldAcquireYBAdvisoryLocks()) \
PG_RETURN_BOOL(HandleStatusIgnoreSkipLocking(YBCAcquireAdvisoryLock( \
GetYBAdvisoryLockId(tag), mode, /* wait= */ false, session_level))); \
YbRaiseAdvisoryLocksNotSupported(); \
} while(0)

#define AcquireYBAdvisoryLock(tag, mode, session_level) \
do { \
if (ShouldAcquireYBAdvisoryLocks()) \
{ \
HandleYBStatus(YBCAcquireAdvisoryLock( \
GetYBAdvisoryLockId(tag), mode, /* wait= */ true, session_level)); \
PG_RETURN_VOID(); \
} \
YbRaiseAdvisoryLocksNotSupported(); \
} while(0)

#define ReleaseYBAdvisoryLock(tag, mode) \
do { \
if (ShouldAcquireYBAdvisoryLocks()) \
PG_RETURN_BOOL(HandleStatusIgnoreSkipLocking( \
YBCReleaseAdvisoryLock(GetYBAdvisoryLockId(tag), mode))); \
YbRaiseAdvisoryLocksNotSupported(); \
} while(0)

/*
* This must match enum LockTagType! Also, be sure to document any changes
* in the docs for the pg_locks view and for wait event types.
Expand Down Expand Up @@ -707,8 +768,8 @@ pg_advisory_lock_int8(PG_FUNCTION_ARGS)
int64 key = PG_GETARG_INT64(0);
LOCKTAG tag;

YbPreventAdvisoryLocks();
SET_LOCKTAG_INT64(tag, key);
AcquireYBAdvisoryLock(tag, YB_ADVISORY_LOCK_EXCLUSIVE, /* session_level= */ true);

(void) LockAcquire(&tag, ExclusiveLock, true, false);

Expand All @@ -725,8 +786,8 @@ pg_advisory_xact_lock_int8(PG_FUNCTION_ARGS)
int64 key = PG_GETARG_INT64(0);
LOCKTAG tag;

YbPreventAdvisoryLocks();
SET_LOCKTAG_INT64(tag, key);
AcquireYBAdvisoryLock(tag, YB_ADVISORY_LOCK_EXCLUSIVE, /* session_level= */ false);

(void) LockAcquire(&tag, ExclusiveLock, false, false);

Expand All @@ -742,8 +803,8 @@ pg_advisory_lock_shared_int8(PG_FUNCTION_ARGS)
int64 key = PG_GETARG_INT64(0);
LOCKTAG tag;

YbPreventAdvisoryLocks();
SET_LOCKTAG_INT64(tag, key);
AcquireYBAdvisoryLock(tag, YB_ADVISORY_LOCK_SHARED, /* session_level= */ true);

(void) LockAcquire(&tag, ShareLock, true, false);

Expand All @@ -760,8 +821,8 @@ pg_advisory_xact_lock_shared_int8(PG_FUNCTION_ARGS)
int64 key = PG_GETARG_INT64(0);
LOCKTAG tag;

YbPreventAdvisoryLocks();
SET_LOCKTAG_INT64(tag, key);
AcquireYBAdvisoryLock(tag, YB_ADVISORY_LOCK_SHARED, /* session_level= */ false);

(void) LockAcquire(&tag, ShareLock, false, false);

Expand All @@ -780,8 +841,9 @@ pg_try_advisory_lock_int8(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;

YbPreventAdvisoryLocks();
SET_LOCKTAG_INT64(tag, key);
TryAcquireYBAdvisoryLock(
tag, YB_ADVISORY_LOCK_EXCLUSIVE, /* session_level= */ true);

res = LockAcquire(&tag, ExclusiveLock, true, true);

Expand All @@ -801,8 +863,9 @@ pg_try_advisory_xact_lock_int8(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;

YbPreventAdvisoryLocks();
SET_LOCKTAG_INT64(tag, key);
TryAcquireYBAdvisoryLock(
tag, YB_ADVISORY_LOCK_EXCLUSIVE, /* session_level= */ false);

res = LockAcquire(&tag, ExclusiveLock, false, true);

Expand All @@ -821,8 +884,8 @@ pg_try_advisory_lock_shared_int8(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;

YbPreventAdvisoryLocks();
SET_LOCKTAG_INT64(tag, key);
TryAcquireYBAdvisoryLock(tag, YB_ADVISORY_LOCK_SHARED, /* session_level= */ true);

res = LockAcquire(&tag, ShareLock, true, true);

Expand All @@ -842,8 +905,8 @@ pg_try_advisory_xact_lock_shared_int8(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;

YbPreventAdvisoryLocks();
SET_LOCKTAG_INT64(tag, key);
TryAcquireYBAdvisoryLock(tag, YB_ADVISORY_LOCK_SHARED, /* session_level= */ false);

res = LockAcquire(&tag, ShareLock, false, true);

Expand All @@ -862,8 +925,8 @@ pg_advisory_unlock_int8(PG_FUNCTION_ARGS)
LOCKTAG tag;
bool res;

YbPreventAdvisoryLocks();
SET_LOCKTAG_INT64(tag, key);
ReleaseYBAdvisoryLock(tag, YB_ADVISORY_LOCK_EXCLUSIVE);

res = LockRelease(&tag, ExclusiveLock, true);

Expand All @@ -882,8 +945,8 @@ pg_advisory_unlock_shared_int8(PG_FUNCTION_ARGS)
LOCKTAG tag;
bool res;

YbPreventAdvisoryLocks();
SET_LOCKTAG_INT64(tag, key);
ReleaseYBAdvisoryLock(tag, YB_ADVISORY_LOCK_SHARED);

res = LockRelease(&tag, ShareLock, true);

Expand All @@ -900,8 +963,8 @@ pg_advisory_lock_int4(PG_FUNCTION_ARGS)
int32 key2 = PG_GETARG_INT32(1);
LOCKTAG tag;

YbPreventAdvisoryLocks();
SET_LOCKTAG_INT32(tag, key1, key2);
ReleaseYBAdvisoryLock(tag, YB_ADVISORY_LOCK_EXCLUSIVE);

(void) LockAcquire(&tag, ExclusiveLock, true, false);

Expand All @@ -919,8 +982,8 @@ pg_advisory_xact_lock_int4(PG_FUNCTION_ARGS)
int32 key2 = PG_GETARG_INT32(1);
LOCKTAG tag;

YbPreventAdvisoryLocks();
SET_LOCKTAG_INT32(tag, key1, key2);
AcquireYBAdvisoryLock(tag, YB_ADVISORY_LOCK_EXCLUSIVE, /* session_level= */ false);

(void) LockAcquire(&tag, ExclusiveLock, false, false);

Expand All @@ -937,8 +1000,8 @@ pg_advisory_lock_shared_int4(PG_FUNCTION_ARGS)
int32 key2 = PG_GETARG_INT32(1);
LOCKTAG tag;

YbPreventAdvisoryLocks();
SET_LOCKTAG_INT32(tag, key1, key2);
AcquireYBAdvisoryLock(tag, YB_ADVISORY_LOCK_SHARED, /* session_level= */ true);

(void) LockAcquire(&tag, ShareLock, true, false);

Expand All @@ -956,8 +1019,9 @@ pg_advisory_xact_lock_shared_int4(PG_FUNCTION_ARGS)
int32 key2 = PG_GETARG_INT32(1);
LOCKTAG tag;

YbPreventAdvisoryLocks();
SET_LOCKTAG_INT32(tag, key1, key2);
AcquireYBAdvisoryLock(tag, YB_ADVISORY_LOCK_SHARED, /* session_level= */ false);


(void) LockAcquire(&tag, ShareLock, false, false);

Expand All @@ -977,8 +1041,9 @@ pg_try_advisory_lock_int4(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;

YbPreventAdvisoryLocks();
SET_LOCKTAG_INT32(tag, key1, key2);
TryAcquireYBAdvisoryLock(
tag, YB_ADVISORY_LOCK_EXCLUSIVE, /* session_level= */ true);

res = LockAcquire(&tag, ExclusiveLock, true, true);

Expand All @@ -999,8 +1064,9 @@ pg_try_advisory_xact_lock_int4(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;

YbPreventAdvisoryLocks();
SET_LOCKTAG_INT32(tag, key1, key2);
TryAcquireYBAdvisoryLock(
tag, YB_ADVISORY_LOCK_EXCLUSIVE, /* session_level= */ false);

res = LockAcquire(&tag, ExclusiveLock, false, true);

Expand All @@ -1020,8 +1086,8 @@ pg_try_advisory_lock_shared_int4(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;

YbPreventAdvisoryLocks();
SET_LOCKTAG_INT32(tag, key1, key2);
TryAcquireYBAdvisoryLock(tag, YB_ADVISORY_LOCK_SHARED, /* session_level= */ true);

res = LockAcquire(&tag, ShareLock, true, true);

Expand All @@ -1042,8 +1108,8 @@ pg_try_advisory_xact_lock_shared_int4(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;

YbPreventAdvisoryLocks();
SET_LOCKTAG_INT32(tag, key1, key2);
TryAcquireYBAdvisoryLock(tag, YB_ADVISORY_LOCK_SHARED, /* session_level= */ false);

res = LockAcquire(&tag, ShareLock, false, true);

Expand All @@ -1063,8 +1129,8 @@ pg_advisory_unlock_int4(PG_FUNCTION_ARGS)
LOCKTAG tag;
bool res;

YbPreventAdvisoryLocks();
SET_LOCKTAG_INT32(tag, key1, key2);
ReleaseYBAdvisoryLock(tag, YB_ADVISORY_LOCK_EXCLUSIVE);

res = LockRelease(&tag, ExclusiveLock, true);

Expand All @@ -1084,8 +1150,8 @@ pg_advisory_unlock_shared_int4(PG_FUNCTION_ARGS)
LOCKTAG tag;
bool res;

YbPreventAdvisoryLocks();
SET_LOCKTAG_INT32(tag, key1, key2);
ReleaseYBAdvisoryLock(tag, YB_ADVISORY_LOCK_SHARED);

res = LockRelease(&tag, ShareLock, true);

Expand All @@ -1098,6 +1164,12 @@ pg_advisory_unlock_shared_int4(PG_FUNCTION_ARGS)
Datum
pg_advisory_unlock_all(PG_FUNCTION_ARGS)
{
if (ShouldAcquireYBAdvisoryLocks())
{
HandleYBStatus(YBCReleaseAllAdvisoryLocks(MyDatabaseId));
PG_RETURN_VOID();
}
YbRaiseAdvisoryLocksNotSupported();
LockReleaseSession(USER_LOCKMETHOD);

PG_RETURN_VOID();
Expand Down
13 changes: 12 additions & 1 deletion src/postgres/src/backend/utils/misc/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -3049,6 +3049,17 @@ static struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},

{
{"yb_enable_advisory_locks", PGC_SIGHUP, LOCK_MANAGEMENT,
gettext_noop("Enable advisory lock feature"),
NULL,
GUC_NOT_IN_SAMPLE
},
&yb_enable_advisory_locks,
false,
NULL, NULL, NULL
},

/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
Expand Down Expand Up @@ -10912,7 +10923,7 @@ void
ExecSetVariableStmt(VariableSetStmt *stmt, bool isTopLevel)
{
GucAction action = stmt->is_local ? GUC_ACTION_LOCAL : GUC_ACTION_SET;
bool YbDbAdminCanSet = false;
bool YbDbAdminCanSet = false;

if (IsYbDbAdminUser(GetUserId()))
{
Expand Down
Loading

0 comments on commit 8ce8475

Please sign in to comment.