Skip to content

Commit a025112

Browse files
committed
GH-5442 Further optimize multi-threading performance and reduce copy operations.
1 parent 18f1bef commit a025112

File tree

2 files changed

+90
-54
lines changed

2 files changed

+90
-54
lines changed

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ void rollback() throws SailException {
216216
if (tripleStoreException != null) {
217217
throw wrapTripleStoreException();
218218
} else {
219-
Thread.yield();
219+
Thread.onSpinWait();
220220
}
221221
}
222222
} else {
@@ -478,7 +478,7 @@ public void flush() throws SailException {
478478
if (tripleStoreException != null) {
479479
throw wrapTripleStoreException();
480480
} else {
481-
Thread.yield();
481+
Thread.onSpinWait();
482482
}
483483
}
484484
}
@@ -491,7 +491,7 @@ public void flush() throws SailException {
491491
if (tripleStoreException != null) {
492492
throw wrapTripleStoreException();
493493
} else {
494-
Thread.yield();
494+
Thread.onSpinWait();
495495
}
496496
}
497497
}
@@ -676,30 +676,30 @@ private void startTransaction(boolean preferThreading) throws SailException {
676676
} else if (Thread.interrupted()) {
677677
throw new InterruptedException();
678678
} else {
679-
Thread.yield();
679+
Thread.onSpinWait();
680680
}
681681
}
682682
}
683683

684-
// keep thread running for at least 2ms to lock-free wait for the next
684+
// keep thread running for a short while to lock-free wait for the next
685685
// transaction
686686
long start = 0;
687687
while (running.get() && !nextTransactionAsync) {
688688
if (start == 0) {
689689
// System.currentTimeMillis() is expensive, so only call it when we
690690
// are sure we need to wait
691-
start = System.currentTimeMillis();
691+
start = System.nanoTime();
692692
}
693693

694-
if (System.currentTimeMillis() - start > 2) {
694+
if (System.nanoTime() - start > 100000) {
695695
synchronized (storeTxnStarted) {
696696
if (!nextTransactionAsync) {
697697
running.set(false);
698698
return;
699699
}
700700
}
701701
} else {
702-
Thread.yield();
702+
Thread.onSpinWait();
703703
}
704704
}
705705
}
@@ -846,15 +846,15 @@ public void execute() throws Exception {
846846
if (tripleStoreException != null) {
847847
throw wrapTripleStoreException();
848848
} else {
849-
Thread.yield();
849+
Thread.onSpinWait();
850850
}
851851
}
852852

853853
while (!removeOp.finished) {
854854
if (tripleStoreException != null) {
855855
throw wrapTripleStoreException();
856856
} else {
857-
Thread.yield();
857+
Thread.onSpinWait();
858858
}
859859
}
860860
return removeCount[0];
@@ -932,7 +932,7 @@ public CloseableIteration<? extends Statement> getStatements(Resource subj, IRI
932932
try {
933933
logger.warn("Failed to get statements, retrying", e);
934934
// try once more before giving up
935-
Thread.yield();
935+
Thread.onSpinWait();
936936
return createStatementIterator(txn, subj, pred, obj, explicit, contexts);
937937
} catch (IOException e2) {
938938
throw new SailException("Unable to get statements", e);

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java

Lines changed: 79 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import org.eclipse.rdf4j.sail.lmdb.model.LmdbValue;
8383
import org.lwjgl.PointerBuffer;
8484
import org.lwjgl.system.MemoryStack;
85+
import org.lwjgl.system.MemoryUtil;
8586
import org.lwjgl.util.lmdb.MDBEnvInfo;
8687
import org.lwjgl.util.lmdb.MDBStat;
8788
import org.lwjgl.util.lmdb.MDBVal;
@@ -127,6 +128,8 @@ class ValueStore extends AbstractValueFactory {
127128
* A simple cache containing the [VALUE_CACHE_SIZE] most-recently used values stored by their ID.
128129
*/
129130
private final AtomicReferenceArray<LmdbValue> valueCache;
131+
private final int VALUE_CACHE_SIZE;
132+
130133
/**
131134
* A simple cache containing the [ID_CACHE_SIZE] most-recently used value-IDs stored by their value.
132135
*/
@@ -194,7 +197,8 @@ class ValueStore extends AbstractValueFactory {
194197
this.mapSize = config.getValueDBSize();
195198
open();
196199

197-
valueCache = new AtomicReferenceArray<>(config.getValueCacheSize());
200+
VALUE_CACHE_SIZE = config.getValueCacheSize();
201+
valueCache = new AtomicReferenceArray<>(VALUE_CACHE_SIZE);
198202
valueIDCache = new ConcurrentCache<>(config.getValueIDCacheSize());
199203
namespaceCache = new ConcurrentCache<>(config.getNamespaceCacheSize());
200204
namespaceIDCache = new ConcurrentCache<>(config.getNamespaceIDCacheSize());
@@ -442,8 +446,8 @@ protected byte[] getData(long id) throws IOException {
442446
* @param id ID of a value object
443447
* @return the value object or <code>null</code> if not found
444448
*/
445-
LmdbValue cachedValue(long id) {
446-
LmdbValue value = valueCache.get((int) (id % valueCache.length()));
449+
LmdbValue cachedValue(final long id) {
450+
LmdbValue value = valueCache.get((int) (id % VALUE_CACHE_SIZE));
447451
if (value != null && value.getInternalID() == id) {
448452
return value;
449453
}
@@ -457,10 +461,9 @@ LmdbValue cachedValue(long id) {
457461
*
458462
* @param id ID of a value object
459463
* @param value ID of a value object
460-
* @return the value object or <code>null</code> if not found
461464
*/
462-
void cacheValue(long id, LmdbValue value) {
463-
valueCache.lazySet((int) (id % valueCache.length()), value);
465+
void cacheValue(final long id, final LmdbValue value) {
466+
valueCache.lazySet((int) (id % VALUE_CACHE_SIZE), value);
464467
}
465468

466469
/**
@@ -593,17 +596,16 @@ private void resizeMap(long txn, long requiredSize) throws IOException {
593596
}
594597
}
595598

596-
private void incrementRefCount(MemoryStack stack, long writeTxn, byte[] data) throws IOException {
599+
private void incrementRefCount(MemoryStack stack, long writeTxn, ByteBuffer data) throws IOException {
597600
// literals have a datatype id and URIs have a namespace id
598-
if (data[0] == LITERAL_VALUE || data[0] == URI_VALUE) {
601+
if (data.get(0) == LITERAL_VALUE || data.get(0) == URI_VALUE) {
599602
try {
600603
stack.push();
601-
ByteBuffer bb = ByteBuffer.wrap(data);
602604
// skip type marker
603-
int idLength = Varint.firstToLength(bb.get(1));
605+
int idLength = Varint.firstToLength(data.get(1));
604606
MDBVal idVal = MDBVal.calloc(stack);
605607
MDBVal dataVal = MDBVal.calloc(stack);
606-
idVal.mv_data(idBuffer(stack).put(ID_KEY).put(data, 1, idLength).flip());
608+
idVal.mv_data(idBuffer(stack).put(ID_KEY).put(data.duplicate().position(1).limit(1 + idLength)).flip());
607609
long newCount = 1;
608610
if (mdb_get(writeTxn, refCountsDbi, idVal, dataVal) == MDB_SUCCESS) {
609611
// update count
@@ -647,11 +649,12 @@ private boolean decrementRefCount(MemoryStack stack, long writeTxn, ByteBuffer i
647649
}
648650
}
649651

650-
private long findId(byte[] data, boolean create) throws IOException {
652+
private long findId(ByteBuffer data, boolean create) throws IOException {
651653
Long id = readTransaction(env, (stack, txn) -> {
652-
if (data.length <= MAX_KEY_SIZE) {
654+
int dataLength = data.remaining();
655+
if (dataLength <= MAX_KEY_SIZE) {
653656
MDBVal dataVal = MDBVal.calloc(stack);
654-
dataVal.mv_data(stack.bytes(data));
657+
dataVal.mv_data(data);
655658
MDBVal idVal = MDBVal.calloc(stack);
656659
if (mdb_get(txn, dbi, dataVal, idVal) == MDB_SUCCESS) {
657660
return data2id(idVal.mv_data());
@@ -660,15 +663,13 @@ private long findId(byte[] data, boolean create) throws IOException {
660663
return null;
661664
}
662665
// id was not found, create a new one
663-
resizeMap(txn, 2L * data.length + 2L * (2L + Long.BYTES));
666+
resizeMap(txn, 2L * dataLength + 2L * (2L + Long.BYTES));
664667

665-
long newId = nextId(data[0]);
668+
long newId = nextId(data.get(0));
666669
writeTransaction((stack2, writeTxn) -> {
667670
idVal.mv_data(id2data(idBuffer(stack), newId).flip());
668-
669671
E(mdb_put(writeTxn, dbi, dataVal, idVal, 0));
670672
E(mdb_put(writeTxn, dbi, idVal, dataVal, 0));
671-
672673
// update ref count if necessary
673674
incrementRefCount(stack2, writeTxn, data);
674675
return null;
@@ -677,7 +678,6 @@ private long findId(byte[] data, boolean create) throws IOException {
677678
} else {
678679
MDBVal idVal = MDBVal.calloc(stack);
679680

680-
ByteBuffer dataBb = ByteBuffer.wrap(data);
681681
long dataHash = hash(data);
682682
int maxHashKeyLength = 2 + 2 * Long.BYTES + 2;
683683
ByteBuffer hashBb = stack.malloc(maxHashKeyLength);
@@ -693,7 +693,8 @@ private long findId(byte[] data, boolean create) throws IOException {
693693
// ID of first value is directly stored with hash as key
694694
if (mdb_get(txn, dbi, hashVal, dataVal) == MDB_SUCCESS) {
695695
idVal.mv_data(dataVal.mv_data());
696-
if (mdb_get(txn, dbi, idVal, dataVal) == MDB_SUCCESS && dataVal.mv_data().compareTo(dataBb) == 0) {
696+
if (mdb_get(txn, dbi, idVal, dataVal) == MDB_SUCCESS
697+
&& dataVal.mv_data().compareTo(data.duplicate()) == 0) {
697698
return data2id(idVal.mv_data());
698699
}
699700
} else {
@@ -702,18 +703,17 @@ private long findId(byte[] data, boolean create) throws IOException {
702703
return null;
703704
}
704705

705-
resizeMap(txn, 2L * data.length + 2L * (2L + Long.BYTES));
706+
resizeMap(txn, 2L * dataLength + 2L * (2L + Long.BYTES));
706707

707-
long newId = nextId(data[0]);
708+
long newId = nextId(data.get(0));
708709
writeTransaction((stack2, writeTxn) -> {
709-
dataVal.mv_size(data.length);
710710
idVal.mv_data(id2data(idBuffer(stack), newId).flip());
711711

712712
// store mapping of hash -> ID
713713
E(mdb_put(txn, dbi, hashVal, idVal, 0));
714714
// store mapping of ID -> data
715-
E(mdb_put(writeTxn, dbi, idVal, dataVal, MDB_RESERVE));
716-
dataVal.mv_data().put(data);
715+
dataVal.mv_data(data);
716+
E(mdb_put(writeTxn, dbi, idVal, dataVal, 0));
717717

718718
// update ref count if necessary
719719
incrementRefCount(stack2, writeTxn, data);
@@ -744,7 +744,7 @@ private long findId(byte[] data, boolean create) throws IOException {
744744
hashIdBb.position(hashLength);
745745
idVal.mv_data(hashIdBb);
746746
if (mdb_get(txn, dbi, idVal, dataVal) == MDB_SUCCESS
747-
&& dataVal.mv_data().compareTo(dataBb) == 0) {
747+
&& dataVal.mv_data().compareTo(data.duplicate()) == 0) {
748748
// id was found if stored value is equal to requested value
749749
return data2id(hashIdBb);
750750
}
@@ -761,9 +761,9 @@ private long findId(byte[] data, boolean create) throws IOException {
761761
}
762762

763763
// id was not found, create a new one
764-
resizeMap(txn, 1 + Long.BYTES + maxHashKeyLength + 2L * data.length);
764+
resizeMap(txn, 1 + Long.BYTES + maxHashKeyLength + 2L * dataLength);
765765

766-
long newId = nextId(data[0]);
766+
long newId = nextId(data.get(0));
767767
writeTransaction((stack2, writeTxn) -> {
768768
// encode ID
769769
ByteBuffer idBb = id2data(idBuffer(stack), newId).flip();
@@ -781,10 +781,9 @@ private long findId(byte[] data, boolean create) throws IOException {
781781
dataVal.mv_data(stack.bytes());
782782
E(mdb_put(txn, dbi, hashVal, dataVal, 0));
783783

784-
dataVal.mv_size(data.length);
784+
dataVal.mv_data(data);
785785
// store mapping of ID -> data
786-
E(mdb_put(txn, dbi, idVal, dataVal, MDB_RESERVE));
787-
dataVal.mv_data().put(data);
786+
E(mdb_put(txn, dbi, idVal, dataVal, 0));
788787

789788
// update ref count if necessary
790789
incrementRefCount(stack2, writeTxn, data);
@@ -885,7 +884,24 @@ public long getId(Value value, boolean create) throws IOException {
885884
}
886885

887886
if (data != null) {
888-
long id = findId(data, create);
887+
long id;
888+
try (MemoryStack stack = MemoryStack.stackPush()) {
889+
int stackSize = stack.getSize();
890+
boolean allocateOnStack = data.length < stackSize - 8;
891+
ByteBuffer bb;
892+
if (allocateOnStack) {
893+
bb = stack.bytes(data);
894+
} else {
895+
bb = MemoryUtil.memAlloc(data.length).put(data).flip();
896+
}
897+
try {
898+
id = findId(bb, create);
899+
} finally {
900+
if (!allocateOnStack) {
901+
MemoryUtil.memFree(bb);
902+
}
903+
}
904+
}
889905

890906
if (id != LmdbValue.UNKNOWN_ID) {
891907
if (isOwnValue) {
@@ -1006,9 +1022,7 @@ protected void deleteValueToIdMappings(MemoryStack stack, long txn, Collection<L
10061022

10071023
int dataLength = dataBuffer.remaining();
10081024
if (dataLength > MAX_KEY_SIZE) {
1009-
byte[] data = new byte[dataLength];
1010-
dataBuffer.get(data);
1011-
long dataHash = hash(data);
1025+
long dataHash = hash(dataBuffer);
10121026

10131027
hashBb.clear();
10141028
hashBb.put(HASH_KEY);
@@ -1214,10 +1228,15 @@ public long storeValue(Value value) throws IOException {
12141228
* @param data The data to calculate the hash code for.
12151229
* @return A hash code for the supplied data.
12161230
*/
1217-
private long hash(byte[] data) {
1218-
CRC32 crc32 = new CRC32();
1219-
crc32.update(data);
1220-
return crc32.getValue();
1231+
private long hash(ByteBuffer data) {
1232+
try {
1233+
data.mark();
1234+
CRC32 crc32 = new CRC32();
1235+
crc32.update(data);
1236+
return crc32.getValue();
1237+
} finally {
1238+
data.reset();
1239+
}
12211240
}
12221241

12231242
/**
@@ -1467,11 +1486,28 @@ private long getNamespaceID(String namespace, boolean create) throws IOException
14671486
}
14681487

14691488
byte[] namespaceBytes = namespace.getBytes(StandardCharsets.UTF_8);
1470-
byte[] namespaceData = new byte[namespaceBytes.length + 1];
1471-
namespaceData[0] = NAMESPACE_VALUE;
1472-
System.arraycopy(namespaceBytes, 0, namespaceData, 1, namespaceBytes.length);
14731489

1474-
long id = findId(namespaceData, create);
1490+
long id;
1491+
try (MemoryStack stack = MemoryStack.stackPush()) {
1492+
int dataLength = 1 + namespaceBytes.length;
1493+
int stackSize = stack.getSize();
1494+
boolean allocateOnStack = dataLength < stackSize - 8;
1495+
ByteBuffer bb;
1496+
if (allocateOnStack) {
1497+
bb = stack.malloc(dataLength);
1498+
} else {
1499+
bb = MemoryUtil.memAlloc(dataLength);
1500+
}
1501+
try {
1502+
bb.put(NAMESPACE_VALUE).put(namespaceBytes).flip();
1503+
id = findId(bb, create);
1504+
} finally {
1505+
if (!allocateOnStack) {
1506+
MemoryUtil.memFree(bb);
1507+
}
1508+
}
1509+
}
1510+
14751511
if (id != LmdbValue.UNKNOWN_ID) {
14761512
namespaceIDCache.put(namespace, id);
14771513
}

0 commit comments

Comments
 (0)