Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNAP-1950 #361

Open
wants to merge 12 commits into
base: snappy/master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -187,5 +187,7 @@ public interface VersionTagEntry {
public VersionSource getMemberID();
public long getRegionVersion();
}


//public boolean isWriteLockedBySameThread();

}
Original file line number Diff line number Diff line change
Expand Up @@ -157,24 +157,45 @@ final void apply(final TXStateProxy tx) {
region = baseRegion = null;
}
final int numOps = this.pendingOps.size();
// take pendingTXRegionStates lock first so that
// GII thread doesn't block on TXRegionState.
//ArrayList<LocalRegion> lockedRegions = new ArrayList<>();
/*for (LocalRegion r : pendingOpsRegions) {
if (!r.isInitialized() && r.getImageState().lockPendingTXRegionStates(true, false)) {
lockedRegions.add(r);
}
}*/
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commented portion can be removed to avoid confusion

boolean lockedPendingTXregionState = false;
for (int index = 0; index < numOps; index++) {
entry = this.pendingOps.get(index);
if (pendingOpsRegion == null) {
region = this.pendingOpsRegions.get(index);
if (region.isUsedForPartitionedRegionBucket()) {
baseRegion = region.getPartitionedRegion();
}
else {
} else {
baseRegion = region;
}
}
if (txState.isCoordinator()) {
region.waitForData();
}
txrs = txState.writeRegion(region);
if (txrs != null) {
txState.applyPendingOperation(entry, lockFlags, txrs, region,
baseRegion, eventTemplate, true, Boolean.TRUE, this);
if (!region.isInitialized() &&
region.getImageState().lockPendingTXRegionStates(true, false)) {
//lockedRegions.add(region);
lockedPendingTXregionState = true;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cleaner to do "boolean lockPendingTXRegionState = !region.isInitialized() && region.getImageState().lock..."

try {
txrs = txState.writeRegion(region);
if (txrs != null) {
txState.applyPendingOperation(entry, lockFlags, txrs, region,
baseRegion, eventTemplate, true, Boolean.TRUE, this);
}
} finally {
//for (LocalRegion r : lockedRegions) {
if (lockedPendingTXregionState) {
region.getImageState().unlockPendingTXRegionStates(true);
lockedPendingTXregionState = false;
}
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,14 @@ public TXRegionState(final LocalRegion r, final TXState tx) {
this.expiryReadLock = r.getTxEntryExpirationReadLock();
this.isValid = true;

if (!r.isInitialized() && r.getImageState().lockPendingTXRegionStates(true, false)) {

// check if lock already taken by the same thread
// in that case don't take lock
ImageState imgState = r.getImageState();

//boolean alreadyLocked = imgState.isWriteLockedBySameThread();
if (!r.isInitialized()
&& (imgState.lockPendingTXRegionStates(true, false))) {
try {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the lock can be taken multiple times by same thread, won't it be better to make this a ReentrantReadWriteLock instead of "isWriteLockedBySameThread" check? The block inside will add TXRegionState in pending list and initialize pendingTXOps lists which I think is required.

if (!r.getImageState().addPendingTXRegionState(this)) {
this.pendingTXOps = null;
Expand All @@ -160,9 +167,9 @@ public TXRegionState(final LocalRegion r, final TXState tx) {
this.pendingTXLockFlags = new TIntArrayList();
}
} finally {
r.getImageState().unlockPendingTXRegionStates(true);
//if (!alreadyLocked)
r.getImageState().unlockPendingTXRegionStates(true);
}

} else {
this.pendingTXOps = null;
this.pendingTXLockFlags = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.i18n.LogWriterI18n;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.cache.locks.LockMode;
import com.gemstone.gemfire.internal.cache.locks.ReentrantReadWriteWriteShareLock;
import com.gemstone.gemfire.internal.util.concurrent.StoppableNonReentrantLock;
import com.gemstone.gemfire.internal.util.concurrent.StoppableReentrantReadWriteLock;
import com.gemstone.gemfire.internal.cache.locks.NonReentrantReadWriteLock;
Expand Down Expand Up @@ -75,8 +77,10 @@ public class UnsharedImageState implements ImageState {
* GII is complete
*/
private volatile THashMapWithCreate pendingTXRegionStates;
private final NonReentrantReadWriteLock pendingTXRegionStatesLock;
private volatile Thread pendingTXRegionStatesLockOwner;
private final ReentrantReadWriteWriteShareLock pendingTXRegionStatesLock;


//private volatile Thread pendingTXRegionStatesLockOwner;
private final AtomicInteger pendingTXOrder;
private volatile TObjectIntHashMap finishedTXIdOrders;

Expand All @@ -101,7 +105,7 @@ public class UnsharedImageState implements ImageState {
this.failedEvents = new ConcurrentTHashSet<EventID>(2);
this.pendingTXRegionStates = isLocal ? null : new THashMapWithCreate();
this.pendingTXRegionStatesLock = isLocal ? null
: new NonReentrantReadWriteLock(stopper);
: new ReentrantReadWriteWriteShareLock();
this.pendingTXOrder = new AtomicInteger(0);
}

Expand Down Expand Up @@ -133,6 +137,12 @@ public void setRequestedUnappliedDelta(boolean flag) {
requestedDelta = flag;
}

/*@Override
public boolean isWriteLockedBySameThread() {
return this.pendingTXRegionStatesLock.isWriteLocked() &&
(this.pendingTXRegionStatesLockOwner == Thread.currentThread());
}*/

@Override
public boolean requestedUnappliedDelta() {
return requestedDelta;
Expand Down Expand Up @@ -401,12 +411,13 @@ public TXRegionState getPendingTXRegionState(TXId txId, boolean lock) {
}
TXRegionState txrs = null;
if (lock) {
if (Thread.currentThread() != this.pendingTXRegionStatesLockOwner) {
this.pendingTXRegionStatesLock.attemptReadLock(-1);
}
else {
//if (Thread.currentThread() != this.pendingTXRegionStatesLockOwner) {
this.pendingTXRegionStatesLock.attemptLock(LockMode.SH,-1, this);
//this.pendingTXRegionStatesLock.attemptReadLock(-1);
//}
//else {
lock = false;
}
//}
}
try {
if (this.pendingTXRegionStates != null) {
Expand All @@ -421,7 +432,8 @@ public TXRegionState getPendingTXRegionState(TXId txId, boolean lock) {
}
} finally {
if (lock) {
this.pendingTXRegionStatesLock.releaseReadLock();
this.pendingTXRegionStatesLock.releaseLock(LockMode.SH,false, this);
//this.pendingTXRegionStatesLock.releaseReadLock();
}
}
return txrs;
Expand All @@ -445,11 +457,13 @@ public boolean lockPendingTXRegionStates(final boolean forWrite,
}
}
if (forWrite) {
this.pendingTXRegionStatesLock.attemptWriteLock(-1);
this.pendingTXRegionStatesLockOwner = Thread.currentThread();
this.pendingTXRegionStatesLock.attemptLock(LockMode.EX,-1, this);
//this.pendingTXRegionStatesLock.attemptWriteLock(-1);
//this.pendingTXRegionStatesLockOwner = Thread.currentThread();
}
else {
this.pendingTXRegionStatesLock.attemptReadLock(-1);
this.pendingTXRegionStatesLock.attemptLock(LockMode.SH,-1, this);
//this.pendingTXRegionStatesLock.attemptReadLock(-1);
}
if (this.pendingTXRegionStates != null) {
if (TXStateProxy.LOG_FINE) {
Expand All @@ -464,11 +478,14 @@ public boolean lockPendingTXRegionStates(final boolean forWrite,
}
else {
if (forWrite) {
this.pendingTXRegionStatesLockOwner = null;
this.pendingTXRegionStatesLock.releaseWriteLock();
//this.pendingTXRegionStatesLockOwner = null;
//this.pendingTXRegionStatesLock.releaseWriteLock();
this.pendingTXRegionStatesLock.releaseLock(LockMode.EX, false, this);
}
else {
this.pendingTXRegionStatesLock.releaseReadLock();
this.pendingTXRegionStatesLock.releaseLock(LockMode.SH, false, this);

//this.pendingTXRegionStatesLock.releaseReadLock();
}
return false;
}
Expand All @@ -481,11 +498,15 @@ public boolean lockPendingTXRegionStates(final boolean forWrite,
public void unlockPendingTXRegionStates(final boolean forWrite) {
if (this.pendingTXRegionStatesLock != null) {
if (forWrite) {
this.pendingTXRegionStatesLockOwner = null;
this.pendingTXRegionStatesLock.releaseWriteLock();
//this.pendingTXRegionStatesLockOwner = null;
//this.pendingTXRegionStatesLock.releaseWriteLock();
this.pendingTXRegionStatesLock.releaseLock(LockMode.EX, false, this);

}
else {
this.pendingTXRegionStatesLock.releaseReadLock();
//this.pendingTXRegionStatesLock.releaseReadLock();
this.pendingTXRegionStatesLock.releaseLock(LockMode.SH, false, this);

}
if (TXStateProxy.LOG_FINE) {
final LogWriterI18n logger = InternalDistributedSystem.getLoggerI18n();
Expand Down Expand Up @@ -529,7 +550,8 @@ public void setTXOrderForFinish(TXRegionState txrs) {
if (this.pendingTXRegionStatesLock != null) {
TObjectIntHashMap finishedOrders;
// assume read lock on pendingTXRegionStates is already held
Assert.assertTrue(this.pendingTXRegionStatesLock.numReaders() > 0);
//Assert.assertTrue(this.pendingTXRegionStatesLock.numReaders() > 0);
Assert.assertTrue(this.pendingTXRegionStatesLock.numReadOnlyLocks() > 0);
if ((finishedOrders = this.finishedTXIdOrders) != null) {
int order = finishedOrders.get(txrs.getTXState().getTransactionId());
if (order != 0) {
Expand Down Expand Up @@ -563,7 +585,8 @@ public void mergeFinishedTXOrders(final LocalRegion region,
final Collection<TXId> txIds) {
final THashMapWithCreate pendingTXRS = this.pendingTXRegionStates;
if (pendingTXRS != null) {
this.pendingTXRegionStatesLock.attemptWriteLock(-1);
//this.pendingTXRegionStatesLock.attemptWriteLock(-1);
this.pendingTXRegionStatesLock.attemptLock(LockMode.EX, -1, this);
try {
// first get the ordering for finished transactions from TX manager;
// this is deliberately invoked under the lock to sync against
Expand Down Expand Up @@ -597,7 +620,8 @@ public void mergeFinishedTXOrders(final LocalRegion region,
this.pendingTXOrder.addAndGet(increment);
this.finishedTXIdOrders = txIdOrders;
} finally {
this.pendingTXRegionStatesLock.releaseWriteLock();
//this.pendingTXRegionStatesLock.releaseWriteLock();
this.pendingTXRegionStatesLock.releaseLock(LockMode.EX, false, this);
}
}
}
Expand Down
32 changes: 17 additions & 15 deletions tests/sql/src/main/java/sql/sqlTx/thinClient/thinClientTx.bt
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
sql/sqlTx/thinClient/concUpdateTxClientHA.conf
locatorHosts = 1 locatorVMsPerHost =1 locatorThreadsPerVM = 1
serverHosts=1 serverVMsPerHost=5 serverThreadsPerVM=1
clientHosts = 1 clientVMsPerHost =6 clientThreadsPerVM = 5
nobatching=true
redundantCopies = 1

sql/sqlTx/thinClient/concUpdateTxClientHA.conf
locatorHosts = 1 locatorVMsPerHost =1 locatorThreadsPerVM = 1
serverHosts=1 serverVMsPerHost=5 serverThreadsPerVM=1
clientHosts = 1 clientVMsPerHost =6 clientThreadsPerVM = 5
nobatching=false
redundantCopies = 1


sql/sqlTx/thinClient/multiTablesTxClient.conf
locatorHosts = 1 locatorVMsPerHost =1 locatorThreadsPerVM = 1
serverHosts=1 serverVMsPerHost=2, 5 serverThreadsPerVM=1
Expand Down Expand Up @@ -78,20 +93,7 @@ sql/sqlTx/thinClient/randomPartitionTablesIndexTxNoBatchingClientHA.conf
workIterationsPerThread=500
redundantCopies = 1

sql/sqlTx/thinClient/concUpdateTxClientHA.conf
locatorHosts = 1 locatorVMsPerHost =1 locatorThreadsPerVM = 1
serverHosts=1 serverVMsPerHost=5 serverThreadsPerVM=1
clientHosts = 1 clientVMsPerHost =6 clientThreadsPerVM = 5
nobatching=true
redundantCopies = 1

sql/sqlTx/thinClient/concUpdateTxClientHA.conf
locatorHosts = 1 locatorVMsPerHost =1 locatorThreadsPerVM = 1
serverHosts=1 serverVMsPerHost=5 serverThreadsPerVM=1
clientHosts = 1 clientVMsPerHost =6 clientThreadsPerVM = 5
nobatching=false
redundantCopies = 1


sql/sqlTx/thinClient/randomPartitionTablesIndexUniqTxClient.conf
locatorHosts = 1 locatorVMsPerHost =1 locatorThreadsPerVM = 1
serverHosts=1 serverVMsPerHost=5 serverThreadsPerVM=1
Expand Down Expand Up @@ -138,4 +140,4 @@ sql/sqlTx/thinClient/randomPartitionNewTablesBatchingTxClientHA.conf
clientHosts = 1 clientVMsPerHost =6 clientThreadsPerVM = 8
workIterationsPerThread=500
redundantCopies = 1, 2, 3