From dc2f1fbbf668b579bd0f77a9f0c33eb7060bf108 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Thu, 30 May 2024 08:59:37 +0200 Subject: [PATCH] Duplicating a duplicated context is supported but the semantic of the actual locals is not defined. This update the duplicated context duplication by doing a copy of each local in the duplicated duplicate. This introduce a duplicator for each local that is responsible for copying the object when it is not null. --- .../java/io/vertx/core/impl/ContextImpl.java | 2 ++ .../io/vertx/core/impl/ContextInternal.java | 3 +- .../io/vertx/core/impl/ContextLocalImpl.java | 23 ++++++++++--- .../io/vertx/core/impl/DuplicatedContext.java | 25 +++++++++++++-- .../java/io/vertx/core/impl/LocalSeq.java | 32 +++++++++++++++---- .../java/io/vertx/core/impl/VertxImpl.java | 9 ++++-- .../spi/context/storage/ContextLocal.java | 16 +++++++++- src/test/java/io/vertx/core/ContextTest.java | 14 ++++++++ 8 files changed, 105 insertions(+), 19 deletions(-) diff --git a/src/main/java/io/vertx/core/impl/ContextImpl.java b/src/main/java/io/vertx/core/impl/ContextImpl.java index 46b73cc6881..37a0e0d631a 100644 --- a/src/main/java/io/vertx/core/impl/ContextImpl.java +++ b/src/main/java/io/vertx/core/impl/ContextImpl.java @@ -17,10 +17,12 @@ import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; import io.vertx.core.json.JsonObject; +import io.vertx.core.spi.context.storage.ContextLocal; import io.vertx.core.spi.metrics.PoolMetrics; import io.vertx.core.spi.tracing.VertxTracer; import java.util.concurrent.*; +import java.util.function.UnaryOperator; /** * A base class for {@link Context} implementations. diff --git a/src/main/java/io/vertx/core/impl/ContextInternal.java b/src/main/java/io/vertx/core/impl/ContextInternal.java index 34bd5aa377e..d772e232c27 100644 --- a/src/main/java/io/vertx/core/impl/ContextInternal.java +++ b/src/main/java/io/vertx/core/impl/ContextInternal.java @@ -25,6 +25,7 @@ import java.util.Objects; import java.util.concurrent.*; import java.util.function.Supplier; +import java.util.function.UnaryOperator; /** * This interface provides an api for vert.x core internal use only @@ -35,7 +36,7 @@ */ public interface ContextInternal extends Context { - ContextLocal> LOCAL_MAP = new ContextLocalImpl<>(0); + ContextLocal> LOCAL_MAP = (ContextLocal) new ContextLocalImpl<>(0, ConcurrentMap.class, t -> new ConcurrentHashMap<>(t)); /** * @return the current context diff --git a/src/main/java/io/vertx/core/impl/ContextLocalImpl.java b/src/main/java/io/vertx/core/impl/ContextLocalImpl.java index 02671c9a58c..bf9e4810ea0 100644 --- a/src/main/java/io/vertx/core/impl/ContextLocalImpl.java +++ b/src/main/java/io/vertx/core/impl/ContextLocalImpl.java @@ -12,18 +12,31 @@ import io.vertx.core.spi.context.storage.ContextLocal; +import java.util.function.Function; +import java.util.function.UnaryOperator; + /** * @author Julien Viet */ public class ContextLocalImpl implements ContextLocal { - final int index; + public static ContextLocal registerLocal(Class type) { + return registerLocal(type, (UnaryOperator) ContextLocalImpl.IDENTITY); + } - public ContextLocalImpl(int index) { - this.index = index; + public static ContextLocal registerLocal(Class type, Function duplicator) { + return LocalSeq.add(idx -> new ContextLocalImpl<>(idx, type, duplicator)); } - public ContextLocalImpl() { - this.index = LocalSeq.next(); + public static final UnaryOperator IDENTITY = UnaryOperator.identity(); + + final int index; + final Class type; + final Function duplicator; + + ContextLocalImpl(int index, Class type, Function duplicator) { + this.index = index; + this.type = type; + this.duplicator = duplicator; } } diff --git a/src/main/java/io/vertx/core/impl/DuplicatedContext.java b/src/main/java/io/vertx/core/impl/DuplicatedContext.java index fc2fddfac6d..d142b83f073 100644 --- a/src/main/java/io/vertx/core/impl/DuplicatedContext.java +++ b/src/main/java/io/vertx/core/impl/DuplicatedContext.java @@ -16,10 +16,10 @@ import io.vertx.core.Handler; import io.vertx.core.ThreadingModel; import io.vertx.core.json.JsonObject; +import io.vertx.core.spi.context.storage.ContextLocal; import io.vertx.core.spi.tracing.VertxTracer; import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; @@ -164,7 +164,28 @@ public boolean isWorkerContext() { @Override public ContextInternal duplicate() { - return new DuplicatedContext(delegate, locals.length == 0 ? VertxImpl.EMPTY_CONTEXT_LOCALS : new Object[locals.length]); + Object[] localsDuplicate = locals.length == 0 ? VertxImpl.EMPTY_CONTEXT_LOCALS : locals.clone(); + ContextLocal[] contextLocals = ((VertxImpl) delegate.owner()).contextLocals; + for (int i = 0;i < localsDuplicate.length;i++) { + ContextLocalImpl contextLocal = (ContextLocalImpl) contextLocals[i]; + Object local = locals[i]; + if (local != null) { + if (local != ContextLocalImpl.IDENTITY) { + localsDuplicate[i] = duplicate(local, contextLocal); + } else { + localsDuplicate[i] = local; + } + } + } + return new DuplicatedContext(delegate, localsDuplicate); + } + + private static T duplicate(Object o, ContextLocalImpl contextLocal) { + if (contextLocal.type.isInstance(o)) { + return contextLocal.duplicator.apply(contextLocal.type.cast(o)); + } else { + throw new ClassCastException(); + } } @Override diff --git a/src/main/java/io/vertx/core/impl/LocalSeq.java b/src/main/java/io/vertx/core/impl/LocalSeq.java index b9b4a7f0d8a..6a716df250f 100644 --- a/src/main/java/io/vertx/core/impl/LocalSeq.java +++ b/src/main/java/io/vertx/core/impl/LocalSeq.java @@ -10,7 +10,11 @@ */ package io.vertx.core.impl; -import java.util.concurrent.atomic.AtomicInteger; +import io.vertx.core.spi.context.storage.ContextLocal; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.IntFunction; /** * @author Julien Viet @@ -18,20 +22,34 @@ class LocalSeq { // 0 : reserved slot for local context map - private static final AtomicInteger seq = new AtomicInteger(1); + private static final List> locals = new ArrayList<>(); + + static { + reset(); + } /** * Hook for testing purposes */ static void reset() { - seq.set((1)); + synchronized (locals) { + locals.clear(); + locals.add(ContextInternal.LOCAL_MAP); + } } - static int get() { - return seq.get(); + static ContextLocal[] get() { + synchronized (locals) { + return locals.toArray(new ContextLocal[0]); + } } - static int next() { - return seq.getAndIncrement(); + static ContextLocal add(IntFunction> provider) { + synchronized (locals) { + int idx = locals.size(); + ContextLocal local = provider.apply(idx); + locals.add(local); + return local; + } } } diff --git a/src/main/java/io/vertx/core/impl/VertxImpl.java b/src/main/java/io/vertx/core/impl/VertxImpl.java index 59734a3d856..86711dc15cb 100644 --- a/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -37,6 +37,7 @@ import io.vertx.core.net.*; import io.vertx.core.net.impl.*; import io.vertx.core.impl.transports.JDKTransport; +import io.vertx.core.spi.context.storage.ContextLocal; import io.vertx.core.spi.file.FileResolver; import io.vertx.core.file.impl.FileSystemImpl; import io.vertx.core.file.impl.WindowsFileSystem; @@ -73,6 +74,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import java.util.function.UnaryOperator; /** * @author Tim Fox @@ -136,7 +138,7 @@ private static ThreadFactory virtualThreadFactory() { private final VerticleManager verticleManager; private final FileResolver fileResolver; private final Map sharedNetServers = new HashMap<>(); - private final int contextLocals; + final ContextLocal[] contextLocals; final WorkerPool workerPool; final WorkerPool internalWorkerPool; final WorkerPool virtualThreaWorkerPool; @@ -503,10 +505,11 @@ public boolean cancelTimer(long id) { } private Object[] createContextLocals() { - if (contextLocals == 0) { + int len = contextLocals.length; + if (len == 0) { return EMPTY_CONTEXT_LOCALS; } else { - return new Object[contextLocals]; + return new Object[len]; } } diff --git a/src/main/java/io/vertx/core/spi/context/storage/ContextLocal.java b/src/main/java/io/vertx/core/spi/context/storage/ContextLocal.java index 8831b74098f..6811b8093d0 100644 --- a/src/main/java/io/vertx/core/spi/context/storage/ContextLocal.java +++ b/src/main/java/io/vertx/core/spi/context/storage/ContextLocal.java @@ -15,6 +15,7 @@ import io.vertx.core.impl.ContextLocalImpl; import java.util.function.Supplier; +import java.util.function.UnaryOperator; /** * A local storage for arbitrary data attached to a duplicated {@link Context}. @@ -35,7 +36,16 @@ public interface ContextLocal { * @return the context local storage */ static ContextLocal registerLocal(Class type) { - return new ContextLocalImpl<>(); + return ContextLocalImpl.registerLocal(type); + } + + /** + * Registers a context local storage. + * + * @return the context local storage + */ + static ContextLocal registerLocal(Class type, UnaryOperator duplicator) { + return ContextLocalImpl.registerLocal(type, duplicator); } /** @@ -58,6 +68,10 @@ default T get(Context context, Supplier initialValueSupplier) { return get(context, AccessMode.CONCURRENT, initialValueSupplier); } + default T duplicate(T value) { + return value; + } + /** * Put local data in the {@code context}. * diff --git a/src/test/java/io/vertx/core/ContextTest.java b/src/test/java/io/vertx/core/ContextTest.java index c41eb3a087a..2a64f2afb34 100644 --- a/src/test/java/io/vertx/core/ContextTest.java +++ b/src/test/java/io/vertx/core/ContextTest.java @@ -1092,4 +1092,18 @@ public void testConcurrentLocalAccess() throws Exception { } } + @Test + public void testNestedDuplicate() { + ContextInternal ctx = ((ContextInternal) vertx.getOrCreateContext()).duplicate(); + ctx.putLocal("foo", "bar"); + Object expected = new Object(); + ctx.putLocal(contextLocal, AccessMode.CONCURRENT, expected); + ContextInternal duplicate = ctx.duplicate(); + assertEquals("bar", duplicate.getLocal("foo")); + assertEquals(expected, duplicate.getLocal(contextLocal)); + ctx.removeLocal("foo"); + ctx.removeLocal(contextLocal, AccessMode.CONCURRENT); + assertEquals("bar", duplicate.getLocal("foo")); + assertEquals(expected, duplicate.getLocal(contextLocal)); + } }