diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ImageState.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ImageState.java index 3c7112037..a86be9b7f 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ImageState.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ImageState.java @@ -187,5 +187,7 @@ public interface VersionTagEntry { public VersionSource getMemberID(); public long getRegionVersion(); } - + + //public boolean isWriteLockedBySameThread(); + } diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXBatchMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXBatchMessage.java index 146504496..e71058d0a 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXBatchMessage.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXBatchMessage.java @@ -157,24 +157,45 @@ final void apply(final TXStateProxy tx) { region = baseRegion = null; } final int numOps = this.pendingOps.size(); + // take pendingTXRegionStates lock first so that + // GII thread doesn't block on TXRegionState. + //ArrayList lockedRegions = new ArrayList<>(); + /*for (LocalRegion r : pendingOpsRegions) { + if (!r.isInitialized() && r.getImageState().lockPendingTXRegionStates(true, false)) { + lockedRegions.add(r); + } + }*/ + boolean lockedPendingTXregionState = false; for (int index = 0; index < numOps; index++) { entry = this.pendingOps.get(index); if (pendingOpsRegion == null) { region = this.pendingOpsRegions.get(index); if (region.isUsedForPartitionedRegionBucket()) { baseRegion = region.getPartitionedRegion(); - } - else { + } else { baseRegion = region; } } if (txState.isCoordinator()) { region.waitForData(); } - txrs = txState.writeRegion(region); - if (txrs != null) { - txState.applyPendingOperation(entry, lockFlags, txrs, region, - baseRegion, eventTemplate, true, Boolean.TRUE, this); + if (!region.isInitialized() && + region.getImageState().lockPendingTXRegionStates(true, false)) { + //lockedRegions.add(region); + lockedPendingTXregionState = true; + } + try { + txrs = txState.writeRegion(region); + if (txrs != null) { + txState.applyPendingOperation(entry, lockFlags, txrs, region, + baseRegion, eventTemplate, true, Boolean.TRUE, this); + } + } finally { + //for (LocalRegion r : lockedRegions) { + if (lockedPendingTXregionState) { + region.getImageState().unlockPendingTXRegionStates(true); + lockedPendingTXregionState = false; + } } } } finally { diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRegionState.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRegionState.java index 33b3a00b6..1d391a262 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRegionState.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRegionState.java @@ -150,7 +150,14 @@ public TXRegionState(final LocalRegion r, final TXState tx) { this.expiryReadLock = r.getTxEntryExpirationReadLock(); this.isValid = true; - if (!r.isInitialized() && r.getImageState().lockPendingTXRegionStates(true, false)) { + + // check if lock already taken by the same thread + // in that case don't take lock + ImageState imgState = r.getImageState(); + + //boolean alreadyLocked = imgState.isWriteLockedBySameThread(); + if (!r.isInitialized() + && (imgState.lockPendingTXRegionStates(true, false))) { try { if (!r.getImageState().addPendingTXRegionState(this)) { this.pendingTXOps = null; @@ -160,9 +167,9 @@ public TXRegionState(final LocalRegion r, final TXState tx) { this.pendingTXLockFlags = new TIntArrayList(); } } finally { - r.getImageState().unlockPendingTXRegionStates(true); + //if (!alreadyLocked) + r.getImageState().unlockPendingTXRegionStates(true); } - } else { this.pendingTXOps = null; this.pendingTXLockFlags = null; diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/UnsharedImageState.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/UnsharedImageState.java index 6222906fa..bd50fbd3e 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/UnsharedImageState.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/UnsharedImageState.java @@ -21,6 +21,8 @@ import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; import com.gemstone.gemfire.i18n.LogWriterI18n; import com.gemstone.gemfire.internal.Assert; +import com.gemstone.gemfire.internal.cache.locks.LockMode; +import com.gemstone.gemfire.internal.cache.locks.ReentrantReadWriteWriteShareLock; import com.gemstone.gemfire.internal.util.concurrent.StoppableNonReentrantLock; import com.gemstone.gemfire.internal.util.concurrent.StoppableReentrantReadWriteLock; import com.gemstone.gemfire.internal.cache.locks.NonReentrantReadWriteLock; @@ -75,8 +77,10 @@ public class UnsharedImageState implements ImageState { * GII is complete */ private volatile THashMapWithCreate pendingTXRegionStates; - private final NonReentrantReadWriteLock pendingTXRegionStatesLock; - private volatile Thread pendingTXRegionStatesLockOwner; + private final ReentrantReadWriteWriteShareLock pendingTXRegionStatesLock; + + + //private volatile Thread pendingTXRegionStatesLockOwner; private final AtomicInteger pendingTXOrder; private volatile TObjectIntHashMap finishedTXIdOrders; @@ -101,7 +105,7 @@ public class UnsharedImageState implements ImageState { this.failedEvents = new ConcurrentTHashSet(2); this.pendingTXRegionStates = isLocal ? null : new THashMapWithCreate(); this.pendingTXRegionStatesLock = isLocal ? null - : new NonReentrantReadWriteLock(stopper); + : new ReentrantReadWriteWriteShareLock(); this.pendingTXOrder = new AtomicInteger(0); } @@ -133,6 +137,12 @@ public void setRequestedUnappliedDelta(boolean flag) { requestedDelta = flag; } + /*@Override + public boolean isWriteLockedBySameThread() { + return this.pendingTXRegionStatesLock.isWriteLocked() && + (this.pendingTXRegionStatesLockOwner == Thread.currentThread()); + }*/ + @Override public boolean requestedUnappliedDelta() { return requestedDelta; @@ -401,12 +411,13 @@ public TXRegionState getPendingTXRegionState(TXId txId, boolean lock) { } TXRegionState txrs = null; if (lock) { - if (Thread.currentThread() != this.pendingTXRegionStatesLockOwner) { - this.pendingTXRegionStatesLock.attemptReadLock(-1); - } - else { + //if (Thread.currentThread() != this.pendingTXRegionStatesLockOwner) { + this.pendingTXRegionStatesLock.attemptLock(LockMode.SH,-1, this); + //this.pendingTXRegionStatesLock.attemptReadLock(-1); + //} + //else { lock = false; - } + //} } try { if (this.pendingTXRegionStates != null) { @@ -421,7 +432,8 @@ public TXRegionState getPendingTXRegionState(TXId txId, boolean lock) { } } finally { if (lock) { - this.pendingTXRegionStatesLock.releaseReadLock(); + this.pendingTXRegionStatesLock.releaseLock(LockMode.SH,false, this); + //this.pendingTXRegionStatesLock.releaseReadLock(); } } return txrs; @@ -445,11 +457,13 @@ public boolean lockPendingTXRegionStates(final boolean forWrite, } } if (forWrite) { - this.pendingTXRegionStatesLock.attemptWriteLock(-1); - this.pendingTXRegionStatesLockOwner = Thread.currentThread(); + this.pendingTXRegionStatesLock.attemptLock(LockMode.EX,-1, this); + //this.pendingTXRegionStatesLock.attemptWriteLock(-1); + //this.pendingTXRegionStatesLockOwner = Thread.currentThread(); } else { - this.pendingTXRegionStatesLock.attemptReadLock(-1); + this.pendingTXRegionStatesLock.attemptLock(LockMode.SH,-1, this); + //this.pendingTXRegionStatesLock.attemptReadLock(-1); } if (this.pendingTXRegionStates != null) { if (TXStateProxy.LOG_FINE) { @@ -464,11 +478,14 @@ public boolean lockPendingTXRegionStates(final boolean forWrite, } else { if (forWrite) { - this.pendingTXRegionStatesLockOwner = null; - this.pendingTXRegionStatesLock.releaseWriteLock(); + //this.pendingTXRegionStatesLockOwner = null; + //this.pendingTXRegionStatesLock.releaseWriteLock(); + this.pendingTXRegionStatesLock.releaseLock(LockMode.EX, false, this); } else { - this.pendingTXRegionStatesLock.releaseReadLock(); + this.pendingTXRegionStatesLock.releaseLock(LockMode.SH, false, this); + + //this.pendingTXRegionStatesLock.releaseReadLock(); } return false; } @@ -481,11 +498,15 @@ public boolean lockPendingTXRegionStates(final boolean forWrite, public void unlockPendingTXRegionStates(final boolean forWrite) { if (this.pendingTXRegionStatesLock != null) { if (forWrite) { - this.pendingTXRegionStatesLockOwner = null; - this.pendingTXRegionStatesLock.releaseWriteLock(); + //this.pendingTXRegionStatesLockOwner = null; + //this.pendingTXRegionStatesLock.releaseWriteLock(); + this.pendingTXRegionStatesLock.releaseLock(LockMode.EX, false, this); + } else { - this.pendingTXRegionStatesLock.releaseReadLock(); + //this.pendingTXRegionStatesLock.releaseReadLock(); + this.pendingTXRegionStatesLock.releaseLock(LockMode.SH, false, this); + } if (TXStateProxy.LOG_FINE) { final LogWriterI18n logger = InternalDistributedSystem.getLoggerI18n(); @@ -529,7 +550,8 @@ public void setTXOrderForFinish(TXRegionState txrs) { if (this.pendingTXRegionStatesLock != null) { TObjectIntHashMap finishedOrders; // assume read lock on pendingTXRegionStates is already held - Assert.assertTrue(this.pendingTXRegionStatesLock.numReaders() > 0); + //Assert.assertTrue(this.pendingTXRegionStatesLock.numReaders() > 0); + Assert.assertTrue(this.pendingTXRegionStatesLock.numReadOnlyLocks() > 0); if ((finishedOrders = this.finishedTXIdOrders) != null) { int order = finishedOrders.get(txrs.getTXState().getTransactionId()); if (order != 0) { @@ -563,7 +585,8 @@ public void mergeFinishedTXOrders(final LocalRegion region, final Collection txIds) { final THashMapWithCreate pendingTXRS = this.pendingTXRegionStates; if (pendingTXRS != null) { - this.pendingTXRegionStatesLock.attemptWriteLock(-1); + //this.pendingTXRegionStatesLock.attemptWriteLock(-1); + this.pendingTXRegionStatesLock.attemptLock(LockMode.EX, -1, this); try { // first get the ordering for finished transactions from TX manager; // this is deliberately invoked under the lock to sync against @@ -597,7 +620,8 @@ public void mergeFinishedTXOrders(final LocalRegion region, this.pendingTXOrder.addAndGet(increment); this.finishedTXIdOrders = txIdOrders; } finally { - this.pendingTXRegionStatesLock.releaseWriteLock(); + //this.pendingTXRegionStatesLock.releaseWriteLock(); + this.pendingTXRegionStatesLock.releaseLock(LockMode.EX, false, this); } } } diff --git a/tests/sql/src/main/java/sql/sqlTx/thinClient/thinClientTx.bt b/tests/sql/src/main/java/sql/sqlTx/thinClient/thinClientTx.bt index 680b5629b..b32d54916 100644 --- a/tests/sql/src/main/java/sql/sqlTx/thinClient/thinClientTx.bt +++ b/tests/sql/src/main/java/sql/sqlTx/thinClient/thinClientTx.bt @@ -1,3 +1,18 @@ +sql/sqlTx/thinClient/concUpdateTxClientHA.conf + locatorHosts = 1 locatorVMsPerHost =1 locatorThreadsPerVM = 1 + serverHosts=1 serverVMsPerHost=5 serverThreadsPerVM=1 + clientHosts = 1 clientVMsPerHost =6 clientThreadsPerVM = 5 + nobatching=true + redundantCopies = 1 + +sql/sqlTx/thinClient/concUpdateTxClientHA.conf + locatorHosts = 1 locatorVMsPerHost =1 locatorThreadsPerVM = 1 + serverHosts=1 serverVMsPerHost=5 serverThreadsPerVM=1 + clientHosts = 1 clientVMsPerHost =6 clientThreadsPerVM = 5 + nobatching=false + redundantCopies = 1 + + sql/sqlTx/thinClient/multiTablesTxClient.conf locatorHosts = 1 locatorVMsPerHost =1 locatorThreadsPerVM = 1 serverHosts=1 serverVMsPerHost=2, 5 serverThreadsPerVM=1 @@ -78,20 +93,7 @@ sql/sqlTx/thinClient/randomPartitionTablesIndexTxNoBatchingClientHA.conf workIterationsPerThread=500 redundantCopies = 1 -sql/sqlTx/thinClient/concUpdateTxClientHA.conf - locatorHosts = 1 locatorVMsPerHost =1 locatorThreadsPerVM = 1 - serverHosts=1 serverVMsPerHost=5 serverThreadsPerVM=1 - clientHosts = 1 clientVMsPerHost =6 clientThreadsPerVM = 5 - nobatching=true - redundantCopies = 1 - -sql/sqlTx/thinClient/concUpdateTxClientHA.conf - locatorHosts = 1 locatorVMsPerHost =1 locatorThreadsPerVM = 1 - serverHosts=1 serverVMsPerHost=5 serverThreadsPerVM=1 - clientHosts = 1 clientVMsPerHost =6 clientThreadsPerVM = 5 - nobatching=false - redundantCopies = 1 - + sql/sqlTx/thinClient/randomPartitionTablesIndexUniqTxClient.conf locatorHosts = 1 locatorVMsPerHost =1 locatorThreadsPerVM = 1 serverHosts=1 serverVMsPerHost=5 serverThreadsPerVM=1 @@ -138,4 +140,4 @@ sql/sqlTx/thinClient/randomPartitionNewTablesBatchingTxClientHA.conf clientHosts = 1 clientVMsPerHost =6 clientThreadsPerVM = 8 workIterationsPerThread=500 redundantCopies = 1, 2, 3 - \ No newline at end of file +