From e203777fd01700fbdb5e6d103fbe95352935ccc5 Mon Sep 17 00:00:00 2001 From: mariofusco Date: Fri, 8 Sep 2023 10:25:18 +0200 Subject: [PATCH 1/7] Use a virtual threads friendly pool with Jackson --- bom/application/pom.xml | 2 +- extensions/jackson/runtime/pom.xml | 2 ++ .../java/io/quarkus/jackson/runtime/ObjectMapperProducer.java | 4 ++++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 021b108284740..e6f1ea9436434 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -92,7 +92,7 @@ 2.1.0 23.0.1 1.6.1.Final - 2.15.2 + 2.16.0-SNAPSHOT 1.0.0.Final 3.12.0 1.16.0 diff --git a/extensions/jackson/runtime/pom.xml b/extensions/jackson/runtime/pom.xml index c382137d4dd66..4c4e00ac20be7 100644 --- a/extensions/jackson/runtime/pom.xml +++ b/extensions/jackson/runtime/pom.xml @@ -24,10 +24,12 @@ com.fasterxml.jackson.datatype jackson-datatype-jdk8 + 2.15.2 com.fasterxml.jackson.module jackson-module-parameter-names + 2.15.2 io.quarkus diff --git a/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/ObjectMapperProducer.java b/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/ObjectMapperProducer.java index 0675633a7e99e..00b756ba06a07 100644 --- a/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/ObjectMapperProducer.java +++ b/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/ObjectMapperProducer.java @@ -11,6 +11,7 @@ import jakarta.inject.Singleton; import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.util.BufferRecyclerPool; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.MapperFeature; import com.fasterxml.jackson.databind.ObjectMapper; @@ -63,6 +64,9 @@ public ObjectMapper objectMapper(@All List customizers, for (ObjectMapperCustomizer customizer : sortedCustomizers) { customizer.customize(objectMapper); } + if (true) { + objectMapper.getFactory().setBufferRecyclerPool(BufferRecyclerPool.LockFreePool.shared()); + } return objectMapper; } From 87c87b12702f4f537ab4da925b65a6134573d267 Mon Sep 17 00:00:00 2001 From: mariofusco Date: Fri, 15 Sep 2023 10:33:35 +0200 Subject: [PATCH 2/7] introduce HybridJacksonPool --- extensions/jackson/runtime/pom.xml | 6 + .../jackson/runtime/HybridJacksonPool.java | 141 ++++++++++++++++++ .../jackson/runtime/ObjectMapperProducer.java | 6 +- 3 files changed, 150 insertions(+), 3 deletions(-) create mode 100644 extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java diff --git a/extensions/jackson/runtime/pom.xml b/extensions/jackson/runtime/pom.xml index 4c4e00ac20be7..f6da2bcae9cff 100644 --- a/extensions/jackson/runtime/pom.xml +++ b/extensions/jackson/runtime/pom.xml @@ -35,6 +35,12 @@ io.quarkus quarkus-arc + + + org.jctools + jctools-core + 4.0.1 + diff --git a/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java b/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java new file mode 100644 index 0000000000000..4ff38a881061d --- /dev/null +++ b/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java @@ -0,0 +1,141 @@ +package io.quarkus.jackson.runtime; + +import com.fasterxml.jackson.core.util.BufferRecycler; +import com.fasterxml.jackson.core.util.BufferRecyclerPool; +import org.jctools.queues.MpmcUnboundedXaddArrayQueue; +import org.jctools.util.Pow2; +import org.jctools.util.UnsafeAccess; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Predicate; + +public class HybridJacksonPool implements BufferRecyclerPool { + + static final BufferRecyclerPool INSTANCE = new HybridJacksonPool(); + + private HybridJacksonPool() { } + + private static final Predicate isVirtual = findIsVirtual(); + + private static Predicate findIsVirtual() { + try { + MethodHandle virtualMh = MethodHandles.publicLookup().findVirtual(Thread.class, "isVirtual", MethodType.methodType(boolean.class)); + return t -> { + try { + return (boolean) virtualMh.invokeExact(t); + } catch (Throwable e) { + throw new RuntimeException(e); + } + }; + } catch (Exception e) { + return t -> false; + } + } + + private final BufferRecyclerPool nativePool = BufferRecyclerPool.threadLocalPool(); + + static class VirtualPoolHolder { + // Lazy on-demand initialization + private static final BufferRecyclerPool virtualPool = new StripedJCToolsPool(4); + } + + @Override + public BufferRecycler acquireBufferRecycler() { + return isVirtual.test(Thread.currentThread()) ? + VirtualPoolHolder.virtualPool.acquireBufferRecycler() : + nativePool.acquireBufferRecycler(); + } + + @Override + public void releaseBufferRecycler(BufferRecycler bufferRecycler) { + if (bufferRecycler instanceof VThreadBufferRecycler) { + // if it is a PooledBufferRecycler it has been acquired by a virtual thread, so it has to be release to the same pool + VirtualPoolHolder.virtualPool.releaseBufferRecycler(bufferRecycler); + } + // the native thread pool is based on ThreadLocal, so it doesn't have anything to do on release + } + + private static class StripedJCToolsPool implements BufferRecyclerPool { + + private static final long PROBE = getProbeOffset(); + + private final int mask; + + private final MpmcUnboundedXaddArrayQueue[] queues; + + public StripedJCToolsPool(int stripesCount) { + if (stripesCount <= 0) { + throw new IllegalArgumentException("Expecting a stripesCount that is larger than 0"); + } + + int size = Pow2.roundToPowerOfTwo(stripesCount); + mask = (size - 1); + + this.queues = new MpmcUnboundedXaddArrayQueue[size]; + for (int i = 0; i < size; i++) { + this.queues[i] = new MpmcUnboundedXaddArrayQueue<>(128); + } + } + + private static long getProbeOffset() { + try { + return UnsafeAccess.UNSAFE.objectFieldOffset(Thread.class.getDeclaredField("threadLocalRandomProbe")); + } catch (NoSuchFieldException e) { + return -1L; + } + } + + private int index() { + return probe() & mask; + } + + private int probe() { + // Fast path for reliable well-distributed probe, available from JDK 7+. + // As long as PROBE is final static this branch will be constant folded + // (i.e removed). + if (PROBE != -1) { + int probe; + if ((probe = UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE)) == 0) { + ThreadLocalRandom.current(); // force initialization + probe = UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE); + } + return probe; + } + + /* + * Else use much worse (for values distribution) method: + * Mix thread id with golden ratio and then xorshift it + * to spread consecutive ids (see Knuth multiplicative method as reference). + */ + int probe = (int) ((Thread.currentThread().getId() * 0x9e3779b9) & Integer.MAX_VALUE); + // xorshift + probe ^= probe << 13; + probe ^= probe >>> 17; + probe ^= probe << 5; + return probe; + } + + @Override + public BufferRecycler acquireBufferRecycler() { + int index = index(); + BufferRecycler bufferRecycler = queues[index].poll(); + return bufferRecycler != null ? bufferRecycler : new VThreadBufferRecycler(index); + } + + @Override + public void releaseBufferRecycler(BufferRecycler recycler) { + queues[((VThreadBufferRecycler) recycler).slot].offer(recycler); + } + } + + private static class VThreadBufferRecycler extends BufferRecycler { + private final int slot; + + VThreadBufferRecycler(int slot) { + this.slot = slot; + } + } +} diff --git a/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/ObjectMapperProducer.java b/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/ObjectMapperProducer.java index 00b756ba06a07..ee13044a4bc46 100644 --- a/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/ObjectMapperProducer.java +++ b/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/ObjectMapperProducer.java @@ -64,9 +64,9 @@ public ObjectMapper objectMapper(@All List customizers, for (ObjectMapperCustomizer customizer : sortedCustomizers) { customizer.customize(objectMapper); } - if (true) { - objectMapper.getFactory().setBufferRecyclerPool(BufferRecyclerPool.LockFreePool.shared()); - } + + objectMapper.getFactory().setBufferRecyclerPool(HybridJacksonPool.INSTANCE); + return objectMapper; } From 4f7e8a8733c3930a6dbf50437aa79bbc614cdbb3 Mon Sep 17 00:00:00 2001 From: mariofusco Date: Fri, 22 Sep 2023 17:23:46 +0200 Subject: [PATCH 3/7] rewrite custom jackson pool to get rid of JCTools dependency and unsafe usage --- extensions/jackson/runtime/pom.xml | 6 - .../jackson/runtime/HybridJacksonPool.java | 176 +++++++++++------- 2 files changed, 105 insertions(+), 77 deletions(-) diff --git a/extensions/jackson/runtime/pom.xml b/extensions/jackson/runtime/pom.xml index f6da2bcae9cff..4c4e00ac20be7 100644 --- a/extensions/jackson/runtime/pom.xml +++ b/extensions/jackson/runtime/pom.xml @@ -35,12 +35,6 @@ io.quarkus quarkus-arc - - - org.jctools - jctools-core - 4.0.1 - diff --git a/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java b/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java index 4ff38a881061d..9448748248a41 100644 --- a/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java +++ b/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java @@ -2,44 +2,24 @@ import com.fasterxml.jackson.core.util.BufferRecycler; import com.fasterxml.jackson.core.util.BufferRecyclerPool; -import org.jctools.queues.MpmcUnboundedXaddArrayQueue; -import org.jctools.util.Pow2; -import org.jctools.util.UnsafeAccess; import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodType; -import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.function.Predicate; public class HybridJacksonPool implements BufferRecyclerPool { static final BufferRecyclerPool INSTANCE = new HybridJacksonPool(); - private HybridJacksonPool() { } - - private static final Predicate isVirtual = findIsVirtual(); - - private static Predicate findIsVirtual() { - try { - MethodHandle virtualMh = MethodHandles.publicLookup().findVirtual(Thread.class, "isVirtual", MethodType.methodType(boolean.class)); - return t -> { - try { - return (boolean) virtualMh.invokeExact(t); - } catch (Throwable e) { - throw new RuntimeException(e); - } - }; - } catch (Exception e) { - return t -> false; - } - } + private static final Predicate isVirtual = VirtualPredicate.findIsVirtualPredicate(); private final BufferRecyclerPool nativePool = BufferRecyclerPool.threadLocalPool(); static class VirtualPoolHolder { // Lazy on-demand initialization - private static final BufferRecyclerPool virtualPool = new StripedJCToolsPool(4); + private static final BufferRecyclerPool virtualPool = new StripedLockFreePool(4); } @Override @@ -58,58 +38,119 @@ public void releaseBufferRecycler(BufferRecycler bufferRecycler) { // the native thread pool is based on ThreadLocal, so it doesn't have anything to do on release } - private static class StripedJCToolsPool implements BufferRecyclerPool { + private static class StripedLockFreePool implements BufferRecyclerPool { - private static final long PROBE = getProbeOffset(); + private static final int CACHE_LINE_SHIFT = 4; - private final int mask; + private static final int CACHE_LINE_PADDING = 1 << CACHE_LINE_SHIFT; + + private final XorShiftThreadProbe threadProbe; - private final MpmcUnboundedXaddArrayQueue[] queues; + private final AtomicReferenceArray heads; - public StripedJCToolsPool(int stripesCount) { + public StripedLockFreePool(int stripesCount) { if (stripesCount <= 0) { throw new IllegalArgumentException("Expecting a stripesCount that is larger than 0"); } - int size = Pow2.roundToPowerOfTwo(stripesCount); - mask = (size - 1); + int size = roundToPowerOfTwo(stripesCount); + this.heads = new AtomicReferenceArray<>(size * CACHE_LINE_PADDING); + + int mask = (size - 1) << CACHE_LINE_SHIFT; + this.threadProbe = new XorShiftThreadProbe(mask); + } + + @Override + public BufferRecycler acquireBufferRecycler() { + int index = threadProbe.index(); + + Node currentHead = heads.get(index); + while (true) { + if (currentHead == null) { + return new VThreadBufferRecycler(index); + } - this.queues = new MpmcUnboundedXaddArrayQueue[size]; - for (int i = 0; i < size; i++) { - this.queues[i] = new MpmcUnboundedXaddArrayQueue<>(128); + Node witness = heads.compareAndExchange(index, currentHead, currentHead.next); + if (witness == currentHead) { + currentHead.next = null; + return currentHead.value; + } else { + currentHead = witness; + } } } - private static long getProbeOffset() { + @Override + public void releaseBufferRecycler(BufferRecycler recycler) { + VThreadBufferRecycler vThreadBufferRecycler = (VThreadBufferRecycler) recycler; + Node newHead = new Node(vThreadBufferRecycler); + + Node next = heads.get(vThreadBufferRecycler.slot); + while (true) { + Node witness = heads.compareAndExchange(vThreadBufferRecycler.slot, next, newHead); + if (witness == next) { + newHead.next = next; + return; + } else { + next = witness; + } + } + } + + private static class Node { + final VThreadBufferRecycler value; + Node next; + + Node(VThreadBufferRecycler value) { + this.value = value; + } + } + } + + private static class VThreadBufferRecycler extends BufferRecycler { + private final int slot; + + VThreadBufferRecycler(int slot) { + this.slot = slot; + } + } + + private static class VirtualPredicate { + private static final MethodHandle virtualMh = findVirtualMH(); + + private static MethodHandle findVirtualMH() { try { - return UnsafeAccess.UNSAFE.objectFieldOffset(Thread.class.getDeclaredField("threadLocalRandomProbe")); - } catch (NoSuchFieldException e) { - return -1L; + return MethodHandles.publicLookup().findVirtual(Thread.class, "isVirtual", MethodType.methodType(boolean.class)); + } catch (Exception e) { + return null; } } - private int index() { + private static Predicate findIsVirtualPredicate() { + return virtualMh != null ? t -> { + try { + return (boolean) virtualMh.invokeExact(t); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } : t -> false; + } + } + + private static class XorShiftThreadProbe { + + private final int mask; + + + XorShiftThreadProbe(int mask) { + this.mask = mask; + } + + public int index() { return probe() & mask; } private int probe() { - // Fast path for reliable well-distributed probe, available from JDK 7+. - // As long as PROBE is final static this branch will be constant folded - // (i.e removed). - if (PROBE != -1) { - int probe; - if ((probe = UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE)) == 0) { - ThreadLocalRandom.current(); // force initialization - probe = UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE); - } - return probe; - } - - /* - * Else use much worse (for values distribution) method: - * Mix thread id with golden ratio and then xorshift it - * to spread consecutive ids (see Knuth multiplicative method as reference). - */ int probe = (int) ((Thread.currentThread().getId() * 0x9e3779b9) & Integer.MAX_VALUE); // xorshift probe ^= probe << 13; @@ -117,25 +158,18 @@ private int probe() { probe ^= probe << 5; return probe; } - - @Override - public BufferRecycler acquireBufferRecycler() { - int index = index(); - BufferRecycler bufferRecycler = queues[index].poll(); - return bufferRecycler != null ? bufferRecycler : new VThreadBufferRecycler(index); - } - - @Override - public void releaseBufferRecycler(BufferRecycler recycler) { - queues[((VThreadBufferRecycler) recycler).slot].offer(recycler); - } } - private static class VThreadBufferRecycler extends BufferRecycler { - private final int slot; + private static final int MAX_POW2 = 1 << 30; - VThreadBufferRecycler(int slot) { - this.slot = slot; + private static int roundToPowerOfTwo(final int value) { + if (value > MAX_POW2) { + throw new IllegalArgumentException("There is no larger power of 2 int for value:"+value+" since it exceeds 2^31."); + } + if (value < 0) { + throw new IllegalArgumentException("Given value:"+value+". Expecting value >= 0."); } + final int nextPow2 = 1 << (32 - Integer.numberOfLeadingZeros(value - 1)); + return nextPow2; } } From fe0183a3f37a7f27187dbcddd378ff4ad23f3ade Mon Sep 17 00:00:00 2001 From: mariofusco Date: Thu, 28 Sep 2023 10:01:58 +0200 Subject: [PATCH 4/7] upgrade to new jackson pool API --- .../jackson/runtime/HybridJacksonPool.java | 27 ++++++++++--------- .../jackson/runtime/ObjectMapperProducer.java | 1 - 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java b/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java index 9448748248a41..0ff4b58f69d83 100644 --- a/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java +++ b/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java @@ -1,7 +1,8 @@ package io.quarkus.jackson.runtime; import com.fasterxml.jackson.core.util.BufferRecycler; -import com.fasterxml.jackson.core.util.BufferRecyclerPool; +import com.fasterxml.jackson.core.util.JsonBufferRecyclers; +import com.fasterxml.jackson.core.util.RecyclerPool; import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; @@ -9,36 +10,36 @@ import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.function.Predicate; -public class HybridJacksonPool implements BufferRecyclerPool { +public class HybridJacksonPool implements RecyclerPool { - static final BufferRecyclerPool INSTANCE = new HybridJacksonPool(); + static final RecyclerPool INSTANCE = new HybridJacksonPool(); private static final Predicate isVirtual = VirtualPredicate.findIsVirtualPredicate(); - private final BufferRecyclerPool nativePool = BufferRecyclerPool.threadLocalPool(); + private final RecyclerPool nativePool = JsonBufferRecyclers.threadLocalPool(); static class VirtualPoolHolder { // Lazy on-demand initialization - private static final BufferRecyclerPool virtualPool = new StripedLockFreePool(4); + private static final RecyclerPool virtualPool = new StripedLockFreePool(4); } @Override - public BufferRecycler acquireBufferRecycler() { + public BufferRecycler acquirePooled() { return isVirtual.test(Thread.currentThread()) ? - VirtualPoolHolder.virtualPool.acquireBufferRecycler() : - nativePool.acquireBufferRecycler(); + VirtualPoolHolder.virtualPool.acquirePooled() : + nativePool.acquirePooled(); } @Override - public void releaseBufferRecycler(BufferRecycler bufferRecycler) { + public void releasePooled(BufferRecycler bufferRecycler) { if (bufferRecycler instanceof VThreadBufferRecycler) { // if it is a PooledBufferRecycler it has been acquired by a virtual thread, so it has to be release to the same pool - VirtualPoolHolder.virtualPool.releaseBufferRecycler(bufferRecycler); + VirtualPoolHolder.virtualPool.releasePooled(bufferRecycler); } // the native thread pool is based on ThreadLocal, so it doesn't have anything to do on release } - private static class StripedLockFreePool implements BufferRecyclerPool { + private static class StripedLockFreePool implements RecyclerPool { private static final int CACHE_LINE_SHIFT = 4; @@ -61,7 +62,7 @@ public StripedLockFreePool(int stripesCount) { } @Override - public BufferRecycler acquireBufferRecycler() { + public BufferRecycler acquirePooled() { int index = threadProbe.index(); Node currentHead = heads.get(index); @@ -81,7 +82,7 @@ public BufferRecycler acquireBufferRecycler() { } @Override - public void releaseBufferRecycler(BufferRecycler recycler) { + public void releasePooled(BufferRecycler recycler) { VThreadBufferRecycler vThreadBufferRecycler = (VThreadBufferRecycler) recycler; Node newHead = new Node(vThreadBufferRecycler); diff --git a/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/ObjectMapperProducer.java b/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/ObjectMapperProducer.java index ee13044a4bc46..4f098f1583397 100644 --- a/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/ObjectMapperProducer.java +++ b/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/ObjectMapperProducer.java @@ -11,7 +11,6 @@ import jakarta.inject.Singleton; import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.core.util.BufferRecyclerPool; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.MapperFeature; import com.fasterxml.jackson.databind.ObjectMapper; From 238152c2dab3da77120e84ce9708bdc1cf1bef1d Mon Sep 17 00:00:00 2001 From: mariofusco Date: Thu, 28 Sep 2023 11:36:03 +0200 Subject: [PATCH 5/7] add javadoc for the pool --- .../jackson/runtime/HybridJacksonPool.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java b/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java index 0ff4b58f69d83..37dcb9a41c226 100644 --- a/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java +++ b/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java @@ -10,6 +10,25 @@ import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.function.Predicate; +/** + * This is a custom implementation of the Jackson's {@link RecyclerPool} intended to work equally well with both + * platform and virtual threads. This pool works regardless of the version of the JVM in use and internally uses + * 2 distinct pools one for platform threads (which is exactly the same {@link ThreadLocal} based one provided + * by Jackson out of the box) and the other designed for being virtual threads friendly. It switches between + * the 2 only depending on the nature of thread (virtual or not) requiring the acquisition of a pooled resource, + * obtained via {@link MethodHandle} to guarantee compatibility also with old JVM versions. The pool also guarantees + * that the pooled resource is always released to the same internal pool from where it has been acquired, regardless + * if the releasing thread is different from the one that originally made the acquisition. + *

+ * The virtual thread friendly inner pool is implemented with N striped linked lists using a simple lock free + * algorithm based on CAS. The striping is performed shuffling the id of the thread requiring to acquire a pooled + * resource with a xorshift based computation. The resulting of this computation is also stored in the pooled resource, + * bringing the twofold advantage of always releasing the resource in the same bucket from where it has been taken + * regardless if the releasing thread is different from the one that did the acquisition and avoiding the need of + * recalculating the position of that bucket also during the release. The heads of the linked lists are hold in an + * {@link AtomicReferenceArray} where each head has a distance of 16 positions from the adjacent ones to prevent + * the false sharing problem. + */ public class HybridJacksonPool implements RecyclerPool { static final RecyclerPool INSTANCE = new HybridJacksonPool(); From fb85b8ab6a1e281961a048d2fb62367a42954ea0 Mon Sep 17 00:00:00 2001 From: mariofusco Date: Thu, 28 Sep 2023 17:10:32 +0200 Subject: [PATCH 6/7] remove use of lambda expression --- .../jackson/runtime/HybridJacksonPool.java | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java b/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java index 37dcb9a41c226..f21bc4d49d88f 100644 --- a/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java +++ b/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java @@ -147,13 +147,25 @@ private static MethodHandle findVirtualMH() { } private static Predicate findIsVirtualPredicate() { - return virtualMh != null ? t -> { - try { - return (boolean) virtualMh.invokeExact(t); - } catch (Throwable e) { - throw new RuntimeException(e); + if (virtualMh != null) { + return new Predicate() { + @Override + public boolean test(Thread thread) { + try { + return (boolean) virtualMh.invokeExact(thread); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + }; + } + + return new Predicate() { + @Override + public boolean test(Thread thread) { + return false; } - } : t -> false; + }; } } From 15cc29c0906585fc1b7fe58f0f296f9e63b8548e Mon Sep 17 00:00:00 2001 From: mariofusco Date: Wed, 25 Oct 2023 12:16:11 +0200 Subject: [PATCH 7/7] add comment on Fibonacci hashing implementation --- .../jackson/runtime/HybridJacksonPool.java | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java b/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java index f21bc4d49d88f..d6b2ba1a43c15 100644 --- a/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java +++ b/extensions/jackson/runtime/src/main/java/io/quarkus/jackson/runtime/HybridJacksonPool.java @@ -1,15 +1,15 @@ package io.quarkus.jackson.runtime; -import com.fasterxml.jackson.core.util.BufferRecycler; -import com.fasterxml.jackson.core.util.JsonBufferRecyclers; -import com.fasterxml.jackson.core.util.RecyclerPool; - import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodType; import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.function.Predicate; +import com.fasterxml.jackson.core.util.BufferRecycler; +import com.fasterxml.jackson.core.util.JsonBufferRecyclers; +import com.fasterxml.jackson.core.util.RecyclerPool; + /** * This is a custom implementation of the Jackson's {@link RecyclerPool} intended to work equally well with both * platform and virtual threads. This pool works regardless of the version of the JVM in use and internally uses @@ -44,9 +44,8 @@ static class VirtualPoolHolder { @Override public BufferRecycler acquirePooled() { - return isVirtual.test(Thread.currentThread()) ? - VirtualPoolHolder.virtualPool.acquirePooled() : - nativePool.acquirePooled(); + return isVirtual.test(Thread.currentThread()) ? VirtualPoolHolder.virtualPool.acquirePooled() + : nativePool.acquirePooled(); } @Override @@ -140,7 +139,8 @@ private static class VirtualPredicate { private static MethodHandle findVirtualMH() { try { - return MethodHandles.publicLookup().findVirtual(Thread.class, "isVirtual", MethodType.methodType(boolean.class)); + return MethodHandles.publicLookup().findVirtual(Thread.class, "isVirtual", + MethodType.methodType(boolean.class)); } catch (Exception e) { return null; } @@ -173,7 +173,6 @@ private static class XorShiftThreadProbe { private final int mask; - XorShiftThreadProbe(int mask) { this.mask = mask; } @@ -183,6 +182,9 @@ public int index() { } private int probe() { + // Multiplicative Fibonacci hashing implementation + // 0x9e3779b9 is the integral part of the Golden Ratio's fractional part 0.61803398875… (sqrt(5)-1)/2 + // multiplied by 2^32, which has the best possible scattering properties. int probe = (int) ((Thread.currentThread().getId() * 0x9e3779b9) & Integer.MAX_VALUE); // xorshift probe ^= probe << 13; @@ -196,10 +198,11 @@ private int probe() { private static int roundToPowerOfTwo(final int value) { if (value > MAX_POW2) { - throw new IllegalArgumentException("There is no larger power of 2 int for value:"+value+" since it exceeds 2^31."); + throw new IllegalArgumentException( + "There is no larger power of 2 int for value:" + value + " since it exceeds 2^31."); } if (value < 0) { - throw new IllegalArgumentException("Given value:"+value+". Expecting value >= 0."); + throw new IllegalArgumentException("Given value:" + value + ". Expecting value >= 0."); } final int nextPow2 = 1 << (32 - Integer.numberOfLeadingZeros(value - 1)); return nextPow2;