diff --git a/mycore-base/src/main/java/org/mycore/datamodel/metadata/MCRMetadataManager.java b/mycore-base/src/main/java/org/mycore/datamodel/metadata/MCRMetadataManager.java index 6376a25688..d7d04ed0fc 100644 --- a/mycore-base/src/main/java/org/mycore/datamodel/metadata/MCRMetadataManager.java +++ b/mycore-base/src/main/java/org/mycore/datamodel/metadata/MCRMetadataManager.java @@ -33,6 +33,10 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -81,6 +85,17 @@ public final class MCRMetadataManager { public static final String DEFAULT_IMPLEMENTATION_KEY = "Default"; + /** + * Property key for the per-object lock acquisition timeout (in seconds) used by + * {@link #lock(MCRObjectID)}. When the timeout elapses without acquiring the lock, + * an {@link MCRPersistenceException} is thrown. + */ + public static final String LOCK_TIMEOUT_PROPERTY = "MCR.Metadata.Manager.LockTimeoutSeconds"; + + private static final int DEFAULT_LOCK_TIMEOUT_SECONDS = 60; + + private static final ConcurrentMap LOCKS = new ConcurrentHashMap<>(); + private MCRMetadataManager() { } @@ -105,19 +120,20 @@ public static void create(final MCRDerivate mcrDerivate) throws MCRPersistenceEx derivateId = assignNewIdIfNecessary(mcrDerivate); - validateDerivate(mcrDerivate, derivateId); + try (MCRObjectLock ignored = lock(derivateId)) { + validateDerivate(mcrDerivate, derivateId); - final MCRObjectID objectId = getObjectIDFromDerivate(mcrDerivate); - checkWritePermission(objectId, derivateId); + final MCRObjectID objectId = getObjectIDFromDerivate(mcrDerivate); + checkWritePermission(objectId, derivateId); - byte[] objectBackup = retrieveObjectBackup(objectId); + byte[] objectBackup = retrieveObjectBackup(objectId); - setDerivateMetadata(mcrDerivate); + setDerivateMetadata(mcrDerivate); - fireEvent(mcrDerivate, null, MCREvent.EventType.CREATE); - - createDataInIFS(mcrDerivate, derivateId, objectId, objectBackup); + fireEvent(mcrDerivate, null, MCREvent.EventType.CREATE); + createDataInIFS(mcrDerivate, derivateId, objectId, objectBackup); + } } private static MCRObjectID assignNewIdIfNecessary(MCRDerivate mcrDerivate) { @@ -266,17 +282,19 @@ public static void create(final MCRObject mcrObject) throws MCRPersistenceExcept MCRObjectID objectId = Objects.requireNonNull(mcrObject.getId(), "ObjectID must not be null"); checkCreatePrivilege(objectId); - // exist the object? - if (exists(objectId)) { - throw new MCRPersistenceException("The object " + objectId + " already exists, nothing done."); - } + try (MCRObjectLock ignored = lock(objectId)) { + // exist the object? + if (exists(objectId)) { + throw new MCRPersistenceException("The object " + objectId + " already exists, nothing done."); + } - normalizeObject(mcrObject); + normalizeObject(mcrObject); - validateObject(mcrObject); + validateObject(mcrObject); - // handle events - fireEvent(mcrObject, null, MCREvent.EventType.CREATE); + // handle events + fireEvent(mcrObject, null, MCREvent.EventType.CREATE); + } } public static void checkCreatePrivilege(MCRObjectID objectId) throws MCRAccessException { @@ -304,24 +322,26 @@ public static void delete(final MCRDerivate mcrDerivate) throws MCRPersistenceEx if (!MCRAccessManager.checkDerivateContentPermission(id, PERMISSION_DELETE)) { throw new MCRMissingPermissionException("Delete derivate", id.toString(), PERMISSION_DELETE); } - // mark for deletion - MCRMarkManager.getInstance().mark(id, Operation.DELETE); + try (MCRObjectLock ignored = lock(id)) { + // mark for deletion + MCRMarkManager.getInstance().mark(id, Operation.DELETE); - // delete data from IFS - if (mcrDerivate.getDerivate().getInternals() != null) { - try { - deleteDerivate(id.toString()); - LOGGER.info("IFS entries for MCRDerivate {} are deleted.", id); - } catch (final Exception e) { - throw new MCRPersistenceException("Error while delete MCRDerivate " + id + " in IFS", e); + // delete data from IFS + if (mcrDerivate.getDerivate().getInternals() != null) { + try { + deleteDerivate(id.toString()); + LOGGER.info("IFS entries for MCRDerivate {} are deleted.", id); + } catch (final Exception e) { + throw new MCRPersistenceException("Error while delete MCRDerivate " + id + " in IFS", e); + } } - } - // handle events - fireEvent(mcrDerivate, null, MCREvent.EventType.DELETE); + // handle events + fireEvent(mcrDerivate, null, MCREvent.EventType.DELETE); - // remove mark - MCRMarkManager.getInstance().remove(id); + // remove mark + MCRMarkManager.getInstance().remove(id); + } } /** @@ -379,27 +399,31 @@ private static void delete(final MCRObjectID id, MCRObject mcrObject) checkDeletePermission(id); - checkForActiveLinks(id); + try (MCRObjectLock ignored = lock(id)) { + checkForActiveLinks(id); - markForDeletion(id); + markForDeletion(id); - removeAllChildren(id); + removeAllChildren(id); - removeAllDerivates(id); + removeAllDerivates(id); - if (mcrObject == null) { - try { - mcrObject = retrieveMCRObject(id); - } catch (MCRPersistenceException e) { - LOGGER.error(() -> "Error while deleting " + id + ". Unable to retrieve MCRObject from disk. Create " + - "empty mycoreobject for event handling as fallback.", e); - mcrObject = new MCRObject(); - mcrObject.setId(id); + if (mcrObject == null) { + try { + mcrObject = retrieveMCRObject(id); + } catch (MCRPersistenceException e) { + LOGGER.error( + () -> "Error while deleting " + id + ". Unable to retrieve MCRObject from disk. Create " + + "empty mycoreobject for event handling as fallback.", + e); + mcrObject = new MCRObject(); + mcrObject.setId(id); + } } - } - fireEvent(mcrObject, null, MCREvent.EventType.DELETE); + fireEvent(mcrObject, null, MCREvent.EventType.DELETE); - removeMark(id); + removeMark(id); + } } private static void removeAllChildren(MCRObjectID id) @@ -659,21 +683,22 @@ public static void update(final MCRDerivate mcrDerivate) throws MCRPersistenceEx return; } - if (!exists(derivateId)) { - create(mcrDerivate); - return; - } - - checkUpdatePermission(derivateId); + try (MCRObjectLock ignored = lock(derivateId)) { + if (!exists(derivateId)) { + create(mcrDerivate); + return; + } - Path fileSourceDirectory = handleFileSourceDirectory(mcrDerivate); + checkUpdatePermission(derivateId); - MCRDerivate old = retrieveMCRDerivate(derivateId); + Path fileSourceDirectory = handleFileSourceDirectory(mcrDerivate); - updateDerivate(mcrDerivate, old); + MCRDerivate old = retrieveMCRDerivate(derivateId); - updateIFS(fileSourceDirectory, derivateId); + updateDerivate(mcrDerivate, old); + updateIFS(fileSourceDirectory, derivateId); + } } private static Path handleFileSourceDirectory(MCRDerivate mcrDerivate) { @@ -730,21 +755,23 @@ public static void update(final MCRObject mcrObject) throws MCRPersistenceExcept return; } - if (!exists(id)) { - create(mcrObject); - return; - } + try (MCRObjectLock ignored = lock(id)) { + if (!exists(id)) { + create(mcrObject); + return; + } - checkUpdatePermission(id); - normalizeObject(mcrObject); - validateObject(mcrObject); + checkUpdatePermission(id); + normalizeObject(mcrObject); + validateObject(mcrObject); - MCRObject old = retrieveMCRObject(id); + MCRObject old = retrieveMCRObject(id); - checkModificationDates(mcrObject, old); - retainCreatedDateAndFlags(mcrObject, old); + checkModificationDates(mcrObject, old); + retainCreatedDateAndFlags(mcrObject, old); - fireUpdateEvent(mcrObject); + fireUpdateEvent(mcrObject); + } } /** @@ -909,4 +936,75 @@ public static MCRObjectID getObjectId(MCRObjectID derivateID) { return null; }); } + + /** + * Acquires the per-object lock for {@code id}, blocking up to the timeout configured by + * {@link #LOCK_TIMEOUT_PROPERTY} (default 60s). The returned {@link MCRObjectLock} must be + * closed (use try-with-resources). Reentrant: the same thread may acquire the same id any + * number of times; only the outermost {@code close()} releases the lock for other threads. + * + * @param id the object id to lock + * @return an AutoCloseable lock handle + * @throws MCRPersistenceException if the timeout elapses or the wait is interrupted + */ + public static MCRObjectLock lock(MCRObjectID id) { + LockEntry entry = LOCKS.compute(id, (k, v) -> { + if (v == null) { + v = new LockEntry(); + } + v.refCount++; + return v; + }); + boolean acquired = false; + try { + acquired = entry.lock.tryLock(getLockTimeoutSeconds(), TimeUnit.SECONDS); + if (!acquired) { + throw new MCRPersistenceException("Lock timeout for " + id); + } + return new MCRObjectLock(id); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new MCRPersistenceException("Lock interrupted for " + id, e); + } finally { + if (!acquired) { + LOCKS.compute(id, (k, v) -> --v.refCount == 0 ? null : v); + } + } + } + + private static int getLockTimeoutSeconds() { + return MCRConfiguration2.getInt(LOCK_TIMEOUT_PROPERTY).orElse(DEFAULT_LOCK_TIMEOUT_SECONDS); + } + + private static final class LockEntry { + final ReentrantLock lock = new ReentrantLock(); + int refCount; // guarded per key by ConcurrentHashMap.compute on LOCKS + } + + /** + * AutoCloseable handle returned by {@link MCRMetadataManager#lock(MCRObjectID)}. + * Release by calling {@link #close()}, typically via try-with-resources. + */ + public static final class MCRObjectLock implements AutoCloseable { + + private final MCRObjectID id; + + private boolean closed; + + MCRObjectLock(MCRObjectID id) { + this.id = id; + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + LOCKS.compute(id, (k, v) -> { + v.lock.unlock(); + return --v.refCount == 0 ? null : v; + }); + } + } } diff --git a/mycore-base/src/main/resources/config/mycore.properties b/mycore-base/src/main/resources/config/mycore.properties index 5bf6831580..1d9c4fe9cd 100644 --- a/mycore-base/src/main/resources/config/mycore.properties +++ b/mycore-base/src/main/resources/config/mycore.properties @@ -290,6 +290,10 @@ MCR.Access.Facts.Condition.category=org.mycore.access.facts.condition.fact.MCRCa # Which metadata manager to use (dictates the available stores) MCR.Metadata.Manager.Class=org.mycore.datamodel.common.MCRDefaultXMLMetadataManager +# Timeout in seconds for acquiring the per-object write lock used by MCRMetadataManager. +# A timeout throws MCRPersistenceException; the caller can retry, surface, or abort. + MCR.Metadata.Manager.LockTimeoutSeconds=60 + # Metadata store for derivate XML MCR.IFS2.Store.derivate.Class=org.mycore.datamodel.ifs2.MCRVersioningMetadataStore MCR.IFS2.Store.derivate.SlotLayout=4-2-2 diff --git a/mycore-base/src/test/java/org/mycore/datamodel/metadata/MCRMetadataManagerConcurrentUpdateTest.java b/mycore-base/src/test/java/org/mycore/datamodel/metadata/MCRMetadataManagerConcurrentUpdateTest.java new file mode 100644 index 0000000000..b8cc1c7cc0 --- /dev/null +++ b/mycore-base/src/test/java/org/mycore/datamodel/metadata/MCRMetadataManagerConcurrentUpdateTest.java @@ -0,0 +1,240 @@ +/* + * This file is part of *** M y C o R e *** + * See https://www.mycore.de/ for details. + * + * MyCoRe is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * MyCoRe is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with MyCoRe. If not, see . + */ + +package org.mycore.datamodel.metadata; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mycore.access.MCRAccessBaseImpl; +import org.mycore.common.MCRException; +import org.mycore.common.MCRPersistenceException; +import org.mycore.common.MCRSession; +import org.mycore.common.MCRSessionMgr; +import org.mycore.common.MCRTestConfiguration; +import org.mycore.common.MCRTestProperty; +import org.mycore.common.events.MCREvent.ObjectType; +import org.mycore.common.events.MCREventManager; +import org.mycore.datamodel.common.MCRXMLMetadataEventHandler; +import org.mycore.datamodel.metadata.MCRMetadataManager.MCRObjectLock; +import org.mycore.test.MCRJPAExtension; +import org.mycore.test.MCRMetadataExtension; +import org.mycore.test.MyCoReTest; + +/** + * Integration tests for the per-object lock wired into {@link MCRMetadataManager}'s + * write methods. Both tests would fail without the automatic lock. + */ +@MyCoReTest +@ExtendWith(MCRJPAExtension.class) +@ExtendWith(MCRMetadataExtension.class) +@MCRTestConfiguration(properties = { + @MCRTestProperty(key = "MCR.Access.Class", classNameOf = MCRAccessBaseImpl.class), + @MCRTestProperty(key = "MCR.Metadata.Type.document", string = "true") +}) +public class MCRMetadataManagerConcurrentUpdateTest { + + private MCRObject obj; + + @BeforeEach + public void setUp() throws Exception { + MCREventManager.getInstance().clear(); + MCREventManager.getInstance().addEventHandler(ObjectType.OBJECT, new MCRXMLMetadataEventHandler()); + obj = newObject("test_document_00000001"); + MCRMetadataManager.create(obj); + } + + @AfterEach + public void tearDown() throws Exception { + if (MCRMetadataManager.exists(obj.getId())) { + MCRMetadataManager.delete(MCRMetadataManager.retrieveMCRObject(obj.getId())); + } + } + + private static MCRObject newObject(String id) { + MCRObject object = new MCRObject(); + object.setId(MCRObjectID.getInstance(id)); + object.setSchema("noSchema"); + return object; + } + + /** + * Holds {@code lock(id)} on a separate thread; main thread tries an + * {@code update(id)} which must block until the holder releases. + * Without the automatic lock inside {@code update}, the call returns + * immediately and the assertion fails. + */ + @Test + public void updateBlocksWhileExternalLockHeld() throws Exception { + MCRObjectID id = obj.getId(); + MCRSession parentSession = MCRSessionMgr.getCurrentSession(); + + CountDownLatch lockHeld = new CountDownLatch(1); + CountDownLatch releaseHolder = new CountDownLatch(1); + AtomicBoolean updateReturned = new AtomicBoolean(); + + ExecutorService pool = Executors.newFixedThreadPool(2); + try { + CompletableFuture holder = CompletableFuture.runAsync(() -> { + MCRSessionMgr.setCurrentSession(parentSession); + try (MCRObjectLock l = MCRMetadataManager.lock(id)) { + lockHeld.countDown(); + releaseHolder.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + MCRSessionMgr.releaseCurrentSession(); + } + }, pool); + + assertTrue(lockHeld.await(5, TimeUnit.SECONDS), "holder must acquire the lock"); + + CompletableFuture updater = CompletableFuture.runAsync(() -> { + MCRSessionMgr.setCurrentSession(parentSession); + try { + MCRObject loaded = MCRMetadataManager.retrieveMCRObject(id); + loaded.getService().addFlag("updated", "yes"); + MCRMetadataManager.update(loaded); + updateReturned.set(true); + } catch (Exception e) { + throw new MCRException("update failed", e); + } finally { + MCRSessionMgr.releaseCurrentSession(); + } + }, pool); + + // Give the updater enough time to enter update() and block on the lock. + // Without the automatic lock, update() would complete in well under 1s. + Thread.sleep(1000); + assertFalse(updateReturned.get(), + "update must block while another thread holds lock(id)"); + + releaseHolder.countDown(); + holder.get(5, TimeUnit.SECONDS); + updater.get(5, TimeUnit.SECONDS); + + assertTrue(updateReturned.get(), "update must complete after holder releases lock"); + + MCRObject finalState = MCRMetadataManager.retrieveMCRObject(id); + assertEquals(List.of("yes"), finalState.getService().getFlags("updated")); + } finally { + pool.shutdown(); + assertTrue(pool.awaitTermination(10, TimeUnit.SECONDS)); + } + } + + /** + * Forces both threads to retrieve the same {@code modifyDate} snapshot before + * either persists, then both call {@code update}. With the lock in place, + * one wins and the other's stale {@code modifyDate} is rejected by + * {@code checkModificationDates} (which now sits inside the locked critical + * section). Without the lock the two writes interleave and silently lose one + * caller's flag. + */ + @Test + public void forcedRaceProducesExactlyOneWinner() throws Exception { + MCRObjectID id = obj.getId(); + MCRSession parentSession = MCRSessionMgr.getCurrentSession(); + + CyclicBarrier afterRetrieve = new CyclicBarrier(2); + ExecutorService pool = Executors.newFixedThreadPool(2); + try { + CompletableFuture fA = racingUpdate(pool, parentSession, id, afterRetrieve, "A"); + CompletableFuture fB = racingUpdate(pool, parentSession, id, afterRetrieve, "B"); + + int succeeded = 0; + int rejected = 0; + for (CompletableFuture f : List.of(fA, fB)) { + try { + if (f.get(15, TimeUnit.SECONDS)) { + succeeded++; + } else { + rejected++; + } + } catch (ExecutionException ee) { + Throwable root = rootCause(ee); + if (root instanceof MCRPersistenceException) { + rejected++; + } else { + throw ee; + } + } + } + + assertEquals(1, succeeded, + "exactly one update must succeed; the other must be rejected as stale"); + assertEquals(1, rejected, "exactly one update must be rejected"); + + MCRObject finalState = MCRMetadataManager.retrieveMCRObject(id); + long winnerFlags = finalState.getService().getFlags("race").stream() + .filter(s -> s.equals("A") || s.equals("B")).count(); + assertEquals(1, winnerFlags, "persisted state must contain exactly the winner's flag"); + } finally { + pool.shutdown(); + assertTrue(pool.awaitTermination(10, TimeUnit.SECONDS)); + } + } + + private static CompletableFuture racingUpdate(ExecutorService pool, MCRSession session, + MCRObjectID id, CyclicBarrier afterRetrieve, String value) { + return CompletableFuture.supplyAsync(() -> { + MCRSessionMgr.setCurrentSession(session); + try { + MCRObject loaded = MCRMetadataManager.retrieveMCRObject(id); + loaded.getService().addFlag("race", value); + // wait until the other thread has also retrieved its (stale) snapshot + afterRetrieve.await(15, TimeUnit.SECONDS); + try { + MCRMetadataManager.update(loaded); + return true; + } catch (MCRPersistenceException e) { + return false; + } catch (Exception e) { + throw new MCRException("unexpected update failure", e); + } + } catch (Exception e) { + throw new MCRException("racing update failed", e); + } finally { + MCRSessionMgr.releaseCurrentSession(); + } + }, pool); + } + + private static Throwable rootCause(Throwable t) { + Throwable c = t; + while (c.getCause() != null && c.getCause() != c) { + c = c.getCause(); + } + return c; + } +} diff --git a/mycore-base/src/test/java/org/mycore/datamodel/metadata/MCRMetadataManagerLockTest.java b/mycore-base/src/test/java/org/mycore/datamodel/metadata/MCRMetadataManagerLockTest.java new file mode 100644 index 0000000000..a09c892731 --- /dev/null +++ b/mycore-base/src/test/java/org/mycore/datamodel/metadata/MCRMetadataManagerLockTest.java @@ -0,0 +1,185 @@ +/* + * This file is part of *** M y C o R e *** + * See https://www.mycore.de/ for details. + * + * MyCoRe is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * MyCoRe is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with MyCoRe. If not, see . + */ + +package org.mycore.datamodel.metadata; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; +import org.mycore.common.MCRPersistenceException; +import org.mycore.common.MCRTestConfiguration; +import org.mycore.common.MCRTestProperty; +import org.mycore.datamodel.metadata.MCRMetadataManager.MCRObjectLock; +import org.mycore.test.MyCoReTest; + +/** + * Tests for the per-object lock primitive on {@link MCRMetadataManager}. + * No metadata store is touched; only lock acquisition/release semantics are exercised. + */ +@MyCoReTest +@MCRTestConfiguration(properties = { + @MCRTestProperty(key = "MCR.Metadata.Type.test", string = "true") +}) +public class MCRMetadataManagerLockTest { + + private static MCRObjectID idA() { + return MCRObjectID.getInstance("MyCoRe_test_00000001"); + } + + private static MCRObjectID idB() { + return MCRObjectID.getInstance("MyCoRe_test_00000002"); + } + + @Test + public void reentrantSameThread() { + MCRObjectID id = idA(); + try (MCRObjectLock outer = MCRMetadataManager.lock(id)) { + try (MCRObjectLock inner = MCRMetadataManager.lock(id)) { + assertTrue(true); + } + } + } + + @Test + public void differentIdsRunInParallel() throws Exception { + MCRObjectID a = idA(); + MCRObjectID b = idB(); + CountDownLatch bothInside = new CountDownLatch(2); + CountDownLatch release = new CountDownLatch(1); + ExecutorService pool = Executors.newFixedThreadPool(2); + try { + pool.submit(() -> { + try (MCRObjectLock l = MCRMetadataManager.lock(a)) { + bothInside.countDown(); + release.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return null; + }); + pool.submit(() -> { + try (MCRObjectLock l = MCRMetadataManager.lock(b)) { + bothInside.countDown(); + release.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return null; + }); + assertTrue(bothInside.await(5, TimeUnit.SECONDS), + "both threads should hold their lock simultaneously"); + release.countDown(); + } finally { + pool.shutdown(); + assertTrue(pool.awaitTermination(5, TimeUnit.SECONDS)); + } + } + + @Test + @MCRTestConfiguration(properties = { + @MCRTestProperty(key = "MCR.Metadata.Type.test", string = "true"), + @MCRTestProperty(key = "MCR.Metadata.Manager.LockTimeoutSeconds", string = "1") + }) + public void timeoutThrows() throws Exception { + MCRObjectID id = idA(); + ExecutorService pool = Executors.newSingleThreadExecutor(); + CountDownLatch holding = new CountDownLatch(1); + CountDownLatch release = new CountDownLatch(1); + try { + pool.submit(() -> { + try (MCRObjectLock l = MCRMetadataManager.lock(id)) { + holding.countDown(); + release.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return null; + }); + assertTrue(holding.await(5, TimeUnit.SECONDS)); + assertThrows(MCRPersistenceException.class, () -> MCRMetadataManager.lock(id)); + release.countDown(); + } finally { + pool.shutdown(); + assertTrue(pool.awaitTermination(5, TimeUnit.SECONDS)); + } + } + + @Test + public void refcountCleansUpAfterRelease() throws Exception { + for (int i = 0; i < 100; i++) { + MCRObjectID id = MCRObjectID.getInstance(String.format("MyCoRe_test_%08d", 100 + i)); + MCRMetadataManager.lock(id).close(); + } + Field f = MCRMetadataManager.class.getDeclaredField("LOCKS"); + f.setAccessible(true); + Map map = (Map) f.get(null); + assertEquals(0, map.size(), "LOCKS map should be empty after all releases"); + } + + @Test + public void doubleCloseIsNoop() throws Exception { + MCRObjectID id = idA(); + MCRObjectLock l = MCRMetadataManager.lock(id); + l.close(); + l.close(); + Field f = MCRMetadataManager.class.getDeclaredField("LOCKS"); + f.setAccessible(true); + Map map = (Map) f.get(null); + assertEquals(0, map.size(), "second close must not corrupt LOCKS map"); + try (MCRObjectLock again = MCRMetadataManager.lock(id)) { + assertTrue(true); + } + } + + @Test + public void mutualExclusionSerialisesAccess() throws Exception { + MCRObjectID id = idA(); + AtomicInteger inside = new AtomicInteger(); + AtomicInteger maxObserved = new AtomicInteger(); + ExecutorService pool = Executors.newFixedThreadPool(8); + try { + for (int i = 0; i < 8; i++) { + pool.submit(() -> { + try (MCRObjectLock l = MCRMetadataManager.lock(id)) { + int now = inside.incrementAndGet(); + maxObserved.accumulateAndGet(now, Math::max); + Thread.sleep(20); + inside.decrementAndGet(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return null; + }); + } + } finally { + pool.shutdown(); + assertTrue(pool.awaitTermination(10, TimeUnit.SECONDS)); + } + assertEquals(1, maxObserved.get(), "only one thread may hold the lock at a time"); + } +} diff --git a/mycore-pi/src/main/java/org/mycore/pi/MCRPIService.java b/mycore-pi/src/main/java/org/mycore/pi/MCRPIService.java index e2714f4da1..95f3461cd5 100644 --- a/mycore-pi/src/main/java/org/mycore/pi/MCRPIService.java +++ b/mycore-pi/src/main/java/org/mycore/pi/MCRPIService.java @@ -52,6 +52,7 @@ import org.mycore.datamodel.metadata.MCRBase; import org.mycore.datamodel.metadata.MCRDerivate; import org.mycore.datamodel.metadata.MCRMetadataManager; +import org.mycore.datamodel.metadata.MCRMetadataManager.MCRObjectLock; import org.mycore.datamodel.metadata.MCRObject; import org.mycore.datamodel.metadata.MCRObjectID; import org.mycore.datamodel.metadata.MCRObjectService; @@ -340,24 +341,26 @@ public T register(MCRBase obj, String additional, boolean updateObject) // There are many querys that require the current database state. // So we start a new transaction within the synchronized block final MCRFixedUserCallable createPICallable = new MCRFixedUserCallable<>(() -> { - this.validateRegistration(obj, additional, updateObject); - final T identifier = getNewIdentifier(obj, additional); - MCRPIServiceDates dates = this.registerIdentifier(obj, additional, identifier); - this.getMetadataService().insertIdentifier(identifier, obj, additional); - - MCRPI databaseEntry = insertIdentifierToDatabase(obj, additional, identifier, dates); - - addFlagToObject(obj, databaseEntry); - - if (updateObject) { - if (obj instanceof MCRObject object) { - MCRMetadataManager.update(object); - } else if (obj instanceof MCRDerivate derivate) { - MCRMetadataManager.update(derivate); + try (MCRObjectLock ignored = MCRMetadataManager.lock(obj.getId())) { + this.validateRegistration(obj, additional, updateObject); + final T identifier = getNewIdentifier(obj, additional); + MCRPIServiceDates dates = this.registerIdentifier(obj, additional, identifier); + this.getMetadataService().insertIdentifier(identifier, obj, additional); + + MCRPI databaseEntry = insertIdentifierToDatabase(obj, additional, identifier, dates); + + addFlagToObject(obj, databaseEntry); + + if (updateObject) { + if (obj instanceof MCRObject object) { + MCRMetadataManager.update(object); + } else if (obj instanceof MCRDerivate derivate) { + MCRMetadataManager.update(derivate); + } } - } - return identifier; + return identifier; + } }, MCRSessionMgr.getCurrentSession().getUserInformation()); try { @@ -513,25 +516,27 @@ protected MCRPI getTableEntry(MCRObjectID id, String additional) { } public void updateFlag(MCRObjectID id, String additional, MCRPI mcrpi) { - MCRBase obj = MCRMetadataManager.retrieve(id); - MCRObjectService service = obj.getService(); - List flags = service.getFlags(PI_FLAG); - Gson gson = getGson(); - String stringFlag = flags.stream().filter(s -> { - MCRPI flag = gson.fromJson(s, MCRPI.class); - return Objects.equals(flag.getAdditional(), additional) && Objects - .equals(flag.getIdentifier(), mcrpi.getIdentifier()); - }).findAny().orElseThrow(() -> new MCRException(new MCRPersistentIdentifierException( - "Could find flag to update (" + id + "," + additional + "," + mcrpi.getIdentifier() + ")"))); - - int flagIndex = service.getFlagIndex(stringFlag); - service.removeFlag(flagIndex); - - addFlagToObject(obj, mcrpi); - try { - MCRMetadataManager.update(obj); - } catch (Exception e) { - throw new MCRException("Could not update flags of object " + id, e); + try (MCRObjectLock ignored = MCRMetadataManager.lock(id)) { + MCRBase obj = MCRMetadataManager.retrieve(id); + MCRObjectService service = obj.getService(); + List flags = service.getFlags(PI_FLAG); + Gson gson = getGson(); + String stringFlag = flags.stream().filter(s -> { + MCRPI flag = gson.fromJson(s, MCRPI.class); + return Objects.equals(flag.getAdditional(), additional) && Objects + .equals(flag.getIdentifier(), mcrpi.getIdentifier()); + }).findAny().orElseThrow(() -> new MCRException(new MCRPersistentIdentifierException( + "Could find flag to update (" + id + "," + additional + "," + mcrpi.getIdentifier() + ")"))); + + int flagIndex = service.getFlagIndex(stringFlag); + service.removeFlag(flagIndex); + + addFlagToObject(obj, mcrpi); + try { + MCRMetadataManager.update(obj); + } catch (Exception e) { + throw new MCRException("Could not update flags of object " + id, e); + } } }