Skip to content

Commit

Permalink
Merge branch 'snappy/master' into spark_2.3_merge
Browse files Browse the repository at this point in the history
  • Loading branch information
ymahajan committed May 25, 2018
2 parents b1a43e2 + e806f70 commit d48e9f9
Show file tree
Hide file tree
Showing 19 changed files with 133 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ protected AbstractDiskRegionEntry(RegionEntryContext context, Object value) {
public void setValue(RegionEntryContext context, Object v) throws RegionClearedException {
Helper.update(this, (LocalRegion) context, v);
setRecentlyUsed(); // fix for bug #42284 - entry just put into the cache is evicted
initDiskIdForOffHeap(context, v);
initDiskIdForDiskBuffer(context, v);
}

/**
Expand All @@ -58,7 +58,7 @@ public void setValue(RegionEntryContext context, Object v) throws RegionCleared
@Override
public void setValueWithContext(RegionEntryContext context, Object value) {
_setValue(context, value);
initDiskIdForOffHeap(context, value);
initDiskIdForDiskBuffer(context, value);
if (value != null && context != null && context instanceof LocalRegion
&& ((LocalRegion)context).isThisRegionBeingClosedOrDestroyed()
&& isOffHeap()) {
Expand All @@ -71,7 +71,7 @@ && isOffHeap()) {
* Set the RegionEntry DiskId into SerializedDiskBuffer value, if present,
* so that the value can access data from disk when required independently.
*/
protected abstract void initDiskIdForOffHeap(RegionEntryContext context,
protected abstract void initDiskIdForDiskBuffer(RegionEntryContext context,
Object value);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,19 @@ protected AbstractOplogDiskRegionEntry(RegionEntryContext context, Object value)
protected abstract void setDiskId(RegionEntry oldRe);

@Override
protected final void initDiskIdForOffHeap(RegionEntryContext context,
protected final void initDiskIdForDiskBuffer(RegionEntryContext context,
Object value) {
// copy self to value if required
if (value instanceof SerializedDiskBuffer) {
((SerializedDiskBuffer)value).setDiskEntry(this, context);
}
}

public final void setDiskIdForRegion(RegionEntry oldRe) {
public final void setDiskIdForRegion(RegionEntryContext context,
RegionEntry oldRe) {
setDiskId(oldRe);
if (GemFireCacheImpl.hasNewOffHeap()) {
initDiskIdForOffHeap(null, getValueField());
if (!isOffHeap()) {
initDiskIdForDiskBuffer(context, getValueField());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ public void copyRecoveredEntries(RegionMap rm, boolean entriesIncompatible) {
continue;
}
RegionEntry newRe = getEntryFactory().createEntry(owner, key, value);
copyRecoveredEntry(oldRe, newRe, currentTime);
copyRecoveredEntry(oldRe, newRe, owner, currentTime);
// newRe is now in this._getMap().
if (newRe.isTombstone()) {
VersionTag tag = newRe.getVersionStamp().asVersionTag();
Expand Down Expand Up @@ -935,7 +935,8 @@ public void copyRecoveredEntries(RegionMap rm, boolean entriesIncompatible) {

}

protected void copyRecoveredEntry(RegionEntry oldRe, RegionEntry newRe, long dummyVersionTs) {
protected void copyRecoveredEntry(RegionEntry oldRe, RegionEntry newRe,
LocalRegion owner, long dummyVersionTs) {
long lastModifiedTime = oldRe.getLastModified();
if (lastModifiedTime != 0) {
newRe.setLastModified(lastModifiedTime);
Expand All @@ -956,7 +957,7 @@ protected void copyRecoveredEntry(RegionEntry oldRe, RegionEntry newRe, long dum

if (newRe instanceof AbstractOplogDiskRegionEntry) {
AbstractOplogDiskRegionEntry newDe = (AbstractOplogDiskRegionEntry)newRe;
newDe.setDiskIdForRegion(oldRe);
newDe.setDiskIdForRegion(owner, oldRe);
_getOwner().getDiskRegion().replaceIncompatibleEntry((DiskEntry) oldRe, newDe);
}
_getMap().put(newRe.getKey(), newRe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import java.nio.ByteBuffer;

import com.gemstone.gemfire.internal.shared.unsafe.FreeMemory;
import com.gemstone.gemfire.internal.shared.unsafe.UnsafeHolder;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryAllocator;

/**
* Allocate, release and expand ByteBuffers (in-place if possible).
Expand All @@ -30,7 +32,9 @@ public abstract class BufferAllocator implements Closeable {
public static final String STORE_DATA_FRAME_OUTPUT =
"STORE_DATA_FRAME_OUTPUT";

/** special owner indicating execution pool memory */
/**
* Special owner indicating execution pool memory.
*/
public static final String EXECUTION = "EXECUTION";

/**
Expand Down Expand Up @@ -125,7 +129,39 @@ public ByteBuffer transfer(ByteBuffer buffer, String owner) {
* For direct ByteBuffers the release method is preferred to eagerly release
* the memory instead of depending on heap GC which can be delayed.
*/
public abstract void release(ByteBuffer buffer);
public final void release(ByteBuffer buffer) {
releaseBuffer(buffer);
}

/**
* For direct ByteBuffers the release method is preferred to eagerly release
* the memory instead of depending on heap GC which can be delayed.
*/
public static boolean releaseBuffer(ByteBuffer buffer) {
final boolean hasArray = buffer.hasArray();
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
Object baseObject;
long baseOffset;
if (hasArray) {
baseObject = buffer.array();
baseOffset = Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset();
} else {
baseObject = null;
baseOffset = UnsafeHolder.getDirectBufferAddress(buffer);
}
Platform.setMemory(baseObject, baseOffset, buffer.capacity(),
MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
}
// Actual release should depend on buffer type and not allocator type.
// Reserved off-heap space will be decremented by FreeMemory implementation.
if (hasArray) {
buffer.rewind().limit(0);
return false;
} else {
UnsafeHolder.releaseDirectBuffer(buffer);
return true;
}
}

/**
* Indicates if this allocator will produce direct ByteBuffers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,6 @@ public ByteBuffer transfer(ByteBuffer buffer, String owner) {
}
}

@Override
public void release(ByteBuffer buffer) {
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
buffer.rewind();
fill(buffer, MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
}
}

@Override
public boolean isDirect() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,6 @@ public ByteBuffer transfer(ByteBuffer buffer, String owner) {
}
}

@Override
public void release(ByteBuffer buffer) {
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
buffer.rewind();
fill(buffer, MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
}
// reserved bytes will be decremented via FreeMemory implementations
UnsafeHolder.releaseDirectBuffer(buffer);
}

@Override
public boolean isDirect() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,19 +297,9 @@ public static void changeDirectBufferCleaner(
}

/**
* Release explicitly if the passed ByteBuffer is a direct one. Avoid using
* Release explicitly assuming passed ByteBuffer is a direct one. Avoid using
* this directly rather use BufferAllocator.allocate/release where possible.
*/
public static void releaseIfDirectBuffer(ByteBuffer buffer) {
if (buffer != null) {
if (buffer.isDirect()) {
releaseDirectBuffer(buffer);
} else {
buffer.rewind().limit(0);
}
}
}

public static void releaseDirectBuffer(ByteBuffer buffer) {
sun.misc.Cleaner cleaner = ((sun.nio.ch.DirectBuffer)buffer).cleaner();
if (cleaner != null) {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import java.nio.ByteBuffer;
import java.util.*;
import javax.annotation.Generated;

import com.gemstone.gemfire.internal.shared.BufferAllocator;
import com.gemstone.gemfire.internal.shared.ByteBufferReference;
import com.gemstone.gemfire.internal.shared.ClientSharedData;
import com.gemstone.gemfire.internal.shared.FetchRequest;
import com.gemstone.gemfire.internal.shared.unsafe.UnsafeHolder;
import io.snappydata.thrift.common.SocketTimeout;
import io.snappydata.thrift.common.TProtocolDirectBinary;
import io.snappydata.thrift.common.ThriftUtils;
Expand Down Expand Up @@ -113,8 +113,8 @@ public class BlobChunk implements org.apache.thrift.TBase<BlobChunk, BlobChunk._
reference.release();
}
this.chunkReference = null;
} else {
UnsafeHolder.releaseIfDirectBuffer(this.chunk);
} else if (this.chunk != null) {
BufferAllocator.releaseBuffer(this.chunk);
}
this.chunk = ClientSharedData.NULL_BUFFER;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ exception SnappyException {
}

// default batch size
const i32 DEFAULT_RESULTSET_BATCHSIZE = 262144
const i32 DEFAULT_RESULTSET_BATCHSIZE = 8192
// default LOB chunk size
const i32 DEFAULT_LOB_CHUNKSIZE = 2097152 // 2MB

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@
import javax.xml.transform.TransformerException;

import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.cache.*;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheListener;
import com.gemstone.gemfire.cache.DiskAccessException;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
import com.gemstone.gemfire.cache.wan.GatewayReceiver;
Expand All @@ -48,7 +54,6 @@
import com.gemstone.gemfire.internal.cache.PartitionAttributesImpl;
import com.gemstone.gemfire.internal.concurrent.ConcurrentHashSet;
import com.pivotal.gemfirexd.NetworkInterface.ConnectionListener;
import com.pivotal.gemfirexd.ddl.IndexPersistenceDUnit;
import com.pivotal.gemfirexd.internal.engine.GemFireXDQueryObserverAdapter;
import com.pivotal.gemfirexd.internal.engine.GemFireXDQueryObserverHolder;
import com.pivotal.gemfirexd.internal.engine.GfxdConstants;
Expand Down Expand Up @@ -152,11 +157,6 @@ public class DistributedSQLTestBase extends DistributedTestBase {
public static final char fileSeparator = System.getProperty("file.separator")
.charAt(0);

/** this indicates whether beforeClass has been executed for current class */
protected static boolean beforeClassDone;
/** this stores the last test method in the current class for afterClass */
protected static String lastTest;

private static transient DistributedSQLTestBase testInstance = null;

private volatile boolean configureDefaultOffHeap = false;
Expand Down Expand Up @@ -347,7 +347,54 @@ protected void baseSetUp() throws Exception {
@Override
public void setUp() throws Exception {
baseSetUp();
IndexPersistenceDUnit.deleteAllOplogFiles();
deleteAllOplogFiles();
}

public static void deleteAllOplogFiles() throws IOException {
try {
File currDir = new File(".");
File[] files = currDir.listFiles();
getGlobalLogger().info("current dir is: " + currDir.getCanonicalPath());

if (files != null) {
for (File f : files) {
if (f.getAbsolutePath().contains("BACKUPGFXD-DEFAULT-DISKSTORE")) {
getGlobalLogger().info("deleting file: " + f + " from dir: " + currDir);
f.delete();
}
if (f.isDirectory()) {
File newDir = new File(f.getCanonicalPath());
File[] newFiles = newDir.listFiles();
for (File nf : newFiles) {
if (nf.getAbsolutePath().contains("BACKUPGFXD-DEFAULT-DISKSTORE")) {
getGlobalLogger().info(
"deleting file: " + nf + " from dir: " + newDir);
nf.delete();
}
}
}
}
for (File f : files) {
if (f.getAbsolutePath().contains("GFXD-DD-DISKSTORE")) {
getGlobalLogger().info("deleting file: " + f + " from dir: " + currDir);
f.delete();
}
if (f.isDirectory()) {
File newDir = new File(f.getCanonicalPath());
File[] newFiles = newDir.listFiles();
for (File nf : newFiles) {
if (nf.getAbsolutePath().contains("GFXD-DD-DISKSTORE")) {
getGlobalLogger().info(
"deleting file: " + nf + " from dir: " + newDir);
nf.delete();
}
}
}
}
}
} catch (IOException e) {
// ignore ...
}
}

protected void reduceLogLevelForTest(String logLevel) {
Expand Down
Loading

0 comments on commit d48e9f9

Please sign in to comment.