Skip to content

Commit

Permalink
Merge branch 'snappy/master' into SNAP-3104
Browse files Browse the repository at this point in the history
  • Loading branch information
Vatsal Mevada committed Feb 11, 2020
2 parents 058c2c6 + d0f2973 commit ca4261c
Show file tree
Hide file tree
Showing 77 changed files with 1,616 additions and 407 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ allprojects {

PRODUCT_MAJOR = '1'
PRODUCT_MINOR = '6'
PRODUCT_MAINT = '4'
PRODUCT_MAINT = '5'
PRODUCT_CLASSIFIER = ''
PRODUCT_RELEASE_STAGE = ''
PRODUCT_VERSION = "${PRODUCT_MAJOR}.${PRODUCT_MINOR}.${PRODUCT_MAINT}${PRODUCT_CLASSIFIER}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ protected void restoreStdOut( ) {
protected static final String PROPERTIES = "properties";
public static final String REBALANCE = "rebalance";
public static final String RECOVER = "recover";
public static final String RECOVERY_STATE_CHUNK_SIZE = "recovery-state-chunk-size";
public static final String SERVER_PORT = "server-port";
public static final String SERVER_BIND_ADDRESS = "server-bind-address";
public static final String DISABLE_DEFAULT_SERVER = "disable-default-server";
Expand Down Expand Up @@ -337,6 +338,9 @@ else if (arg.startsWith("-lock-memory")) {
else if (arg.startsWith("-rebalance")) {
options.put(REBALANCE, Boolean.TRUE);
}
else if (arg.startsWith("-recovery-state-chunk-size")) {
options.put(RECOVERY_STATE_CHUNK_SIZE, arg.substring(arg.indexOf("=") + 1));
}
else if (arg.startsWith("-recover")) {
options.put(RECOVER, Boolean.TRUE);
}
Expand Down Expand Up @@ -469,6 +473,9 @@ else if (arg.startsWith("-dir=")) {
else if (arg.startsWith("-rebalance")) {
options.put(REBALANCE, Boolean.TRUE);
}
else if (arg.startsWith("-recovery-state-chunk-size")) {
options.put(RECOVERY_STATE_CHUNK_SIZE, arg.substring(arg.indexOf("=") + 1));
}
else if (arg.startsWith("-recover")) {
options.put(RECOVER, "true");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4935,4 +4935,5 @@ static int calcMemSize(Object value) {
return 0;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
/** indicates whether the snappy system has been booted in recovery mode */
private static boolean snappyRecoverMode;

private static int recoveryStateChunkSize;

private final CacheConfig cacheConfig;

// Stores the properties used to initialize declarables.
Expand Down Expand Up @@ -6123,6 +6125,13 @@ public final boolean isSnappyRecoveryMode() {
return snappyRecoverMode;
}

public void setRecoveryStateChunkSize(int size) {
recoveryStateChunkSize = size;
}
public int getRecoveryStateChunkSize() {
return recoveryStateChunkSize;
}

public final boolean isGFXDSystem() {
return gfxdSystem;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ private long recoverOplogs(long byteCount, boolean initialRecovery) {
if (!parent.isOffline()) {
// schedule GFXD index recovery first
parent.scheduleIndexRecovery(oplogSet, false);
if(recoverValues() && !recoverValuesSync()) {
if(!parent.getCache().isSnappyRecoveryMode() && recoverValues() && !recoverValuesSync()) {
//TODO DAN - should we defer compaction until after
//value recovery is complete? Or at least until after
//value recovery for a given oplog is complete?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@
import com.gemstone.gemfire.internal.cache.locks.NonReentrantLock;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
import com.gemstone.gemfire.internal.cache.versions.RegionVersionHolder;
import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
import com.gemstone.gemfire.internal.cache.versions.VersionSource;
import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.cache.versions.*;
import com.gemstone.gemfire.internal.concurrent.ConcurrentTHashSet;
import com.gemstone.gemfire.internal.concurrent.CustomEntryConcurrentHashMap;
import com.gemstone.gemfire.internal.concurrent.MapCallback;
Expand Down Expand Up @@ -1146,18 +1142,27 @@ private void publishRecordedVersions() {
if (isSnapshot() || cache.snapshotEnabledForTest()) {
// first take a lock at cache level so that we don't go into deadlock or sort array before
// This is for tx RC, for snapshot just record all the versions from the queue
//TODO: this is performance issue: Need to make the lock granular at region level.
// also write a different recordVersion which will record without making clone

cache.acquireWriteLockOnSnapshotRvv();
try {
Map<LocalRegion, Map<VersionSource, VersionHolder>> s = new HashMap();
for (VersionInformation vi : queue) {
if (TXStateProxy.LOG_FINE) {
logger.info(LocalizedStrings.DEBUG, "Recording version " + vi + " from snapshot to " +
"region.");
"region.");
}

if (vi.region.isSnapshotEnabledRegion()) {
Map versionV = s.get(vi.region);
if (versionV == null) {
versionV = vi.region.getVersionVector().getMemToVSnapshotCopy();
s.put(vi.region, versionV);
}
vi.region.getVersionVector().
recordVersionForSnapshotWithoutPublish((VersionSource) vi.member, vi.version, versionV);
}
((LocalRegion)vi.region).getVersionVector().
recordVersionForSnapshot((VersionSource)vi.member, vi.version, null);
}
for (Map.Entry<LocalRegion, Map<VersionSource, VersionHolder>> entry : s.entrySet()) {
entry.getKey().getVersionVector().recordAllVersion(entry.getValue());
}
} finally {
cache.releaseWriteLockOnSnapshotRvv();
Expand Down Expand Up @@ -4270,8 +4275,9 @@ public boolean isSnapshot() {
}

@Override
public void recordVersionForSnapshot(Object member, long version, Region region) {
public void recordVersionForSnapshot(Object member, long version, LocalRegion region) {
queue.add(new VersionInformation(member, version, region));

Boolean wasPresent = writeRegions.putIfAbsent(region, true);
if (wasPresent == null) {
if (region instanceof BucketRegion) {
Expand All @@ -4284,8 +4290,8 @@ public void recordVersionForSnapshot(Object member, long version, Region region)
class VersionInformation {
Object member;
long version;
Region region;
public VersionInformation(Object member, long version, Region reg){
LocalRegion region;
public VersionInformation(Object member, long version, LocalRegion reg){
this.member = member;
this.version = version;
this.region = reg;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,5 +177,5 @@ public Object lockEntry(RegionEntry entry, Object key, Object callbackArg,

public boolean isSnapshot();

public void recordVersionForSnapshot(Object member, long version, Region region);
public void recordVersionForSnapshot(Object member, long version, LocalRegion region);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4792,7 +4792,7 @@ public boolean isSnapshot() {
}

@Override
public void recordVersionForSnapshot(Object member, long version, Region region) {
public void recordVersionForSnapshot(Object member, long version, LocalRegion region) {
final TXState localState = getTXStateForRead();
localState.recordVersionForSnapshot(member, version, region);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -784,7 +779,9 @@ public void recordVersionForSnapshot(T member, long version, EntryEventImpl even
}

holder.recordVersion(version, logger);

memberToVersionSnapshot.put(holder.id, holder);

forPrinting = memberToVersionSnapshot;
}

Expand All @@ -800,6 +797,67 @@ public void recordVersionForSnapshot(T member, long version, EntryEventImpl even
}
}

public Map<T, RegionVersionHolder<T>> getMemToVSnapshotCopy() {
Map<T, RegionVersionHolder<T>> m = new HashMap();
for (Map.Entry<T, RegionVersionHolder<T>> entry : memberToVersionSnapshot.entrySet()) {
m.put(entry.getKey(), entry.getValue().clone());
}
return m;
}

public Map<T, RegionVersionHolder<T>> recordVersionForSnapshotWithoutPublish(T member, long version,
Map<T, RegionVersionHolder<T>> m) {
LogWriterI18n logger = getLoggerI18n();
T mbr = member;
GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
if (cache != null && !cache.snapshotEnabled()) {
return null;
}
RegionVersionHolder<T> holder;
//Find the version holder object
synchronized (memberToVersionSnapshot) {
holder = m.get(mbr);
if (holder == null) {
mbr = getCanonicalId(mbr);
holder = new RegionVersionHolder<T>(mbr);
}
holder.recordVersion(version, logger);

// instead of putting the holder save it
// and do a putAll
m.put(holder.id, holder);
}

if (logger!= null && logger.fineEnabled()) {

logger.fine("Recorded version: " + version + " for member " + member + " in the snapshot region : "
+ " the snapshot is " + m +
" it contains version after recording "
+ m.get(member).contains(version));
}
return m;

}

public void recordAllVersion(Map<T, RegionVersionHolder<T>> versions) {
LogWriterI18n logger = getLoggerI18n();

GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
if (cache != null && !cache.snapshotEnabled()) {
return;
}
RegionVersionHolder<T> holder;
Map<T, RegionVersionHolder<T>> forPrinting;
//Find the version holder object
synchronized (memberToVersionSnapshot) {
memberToVersionSnapshot.putAll(versions);
forPrinting = memberToVersionSnapshot;
}
if (logger!= null && logger.fineEnabled()) {

logger.fine("Recorded version: " + versions + " After record:" + forPrinting);
}
}
/**
* Records a received region-version. These are transmitted in VersionTags
* in messages between peers and from servers to clients. In general you
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*/
public class StringPrintWriter extends PrintWriter {

private final StringBuilder sb;
private StringBuilder sb;

private final static Writer dummyLock = new StringWriter();

Expand Down Expand Up @@ -223,6 +223,10 @@ public void flush() {
// nothing to be done
}

public void reset() {
this.sb.setLength(0);
}

@Override
public void close() {
// nothing to be done
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Changes for SnappyData data platform.
*
* Portions Copyright (c) 2017-2019 TIBCO Software Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.snappydata.jdbc;

/**
* This allows using com.snappydata.jdbc.ClientDriver instead of
* {@link io.snappydata.jdbc.ClientDriver}
*/
public class ClientDriver extends io.snappydata.jdbc.ClientDriver {
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.stream.Collectors;

import com.pivotal.gemfirexd.Attribute;
import com.sun.xml.internal.fastinfoset.stax.events.Util;
import org.apache.tomcat.jdbc.pool.DataSource;
import org.apache.tomcat.jdbc.pool.PoolProperties;
import org.apache.tomcat.jdbc.pool.interceptor.StatementFinalizer;
Expand Down Expand Up @@ -162,12 +161,12 @@ private PoolProperties getPoolProperties(Properties prop) {
poolProperties.setDriverClassName(driverClassName);

String username = prop.getProperty(PoolProps.USER.key);
if (!Util.isEmptyString(username)) {
if (username != null && !username.isEmpty()) {
poolProperties.setUsername(username);
}

String password = prop.getProperty(PoolProps.PASSWORD.key);
if (!Util.isEmptyString(password)) {
if (password != null && !password.isEmpty()) {
poolProperties.setPassword(password);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,13 @@ private ClientService(HostAddress hostAddr, OpenConnectionArgs connArgs)
throws SnappyException {

ClientConfiguration config = ClientConfiguration.getInstance();
ClientSharedUtils.getLogger(getClass()).info("Starting client on '" + hostName +
"' with ID='" + hostId + "' Source-Revision=" +
config.getSourceRevision());

String intpModeProperty = System.getProperty("LAUNCHER_INTERPRETER_MODE");
boolean isInterpreterMode = intpModeProperty != null && intpModeProperty.equals("true");
if (!isInterpreterMode) {
ClientSharedUtils.getLogger(getClass()).info("Starting client on '" + hostName +
"' with ID='" + hostId + "' Source-Revision=" +
config.getSourceRevision());
}
this.isClosed = true;

this.currentHostConnection = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public void close() throws IOException {
if (!isClosed) {
GfxdConnectionWrapper.restoreContextStack(es, rs);
try {
rs.lightWeightClose();
rs.close();
} catch (SQLException e) {
logger.warn("Error while trying to free reader resources", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public static synchronized boolean initTypes() {
DSFIDFactory.registerGemFireXDClass(PROJECTION_ROW,
() -> new ProjectionRow());
DSFIDFactory.registerGemFireXDClass(LEAD_NODE_DATA_MSG,
() -> new GetLeadNodeInfoAsStringMessage());
() -> new GetLeadNodeInfoMsg());
DSFIDFactory.registerGemFireXDClass(LEAD_DISK_STATE_MSG,
() -> new RecoveredMetadataRequestMessage());

Expand Down
Loading

0 comments on commit ca4261c

Please sign in to comment.