diff --git a/core/sail/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java b/core/sail/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java index f2d96ecc435..a5a233cce54 100644 --- a/core/sail/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java +++ b/core/sail/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java @@ -289,6 +289,36 @@ public class LuceneSail extends NotifyingSailWrapper { */ public static final String LUCENE_RAMDIR_KEY = "useramdir"; + /** + * Whether the LuceneIndex should support transactions and rollbacks (true by default). Set the key + * "transactional=false" as sail parameter to DISABLE rollback support. This will improve performance if you are + * doing a lot of inserts/updates and don't need rollback support. It will also run fsync only at an interval (see + * {@link #FSYNC_INTERVAL_KEY}), instead of after each transaction commit. Changes in the index will be only visible + * to readers after the periodic fsync is done. + *

+ * See the original issue. + */ + public static final String TRANSACTIONAL_KEY = "transactional"; + + /** + * Default value for {@link #TRANSACTIONAL_KEY}, true (rollbacks enabled, fsync after each commit). + */ + public static final boolean DEFAULT_TRANSACTIONAL = true; + + /** + * Set the key "fsyncInterval=<t>" as sail parameter to configure the interval in milliseconds in which fsync + * is called on the Lucene index, default is defined in {@link #DEFAULT_FSYNC_INTERVAL}. Changes in the index will + * become visible to readers at most after the interval is elapsed. Only used when {@link #TRANSACTIONAL_KEY} is set + * to false (rollbacks disabled). + */ + public static final String FSYNC_INTERVAL_KEY = "fsyncInterval"; + + /** + * Default fsync interval in milliseconds, used when {@link #FSYNC_INTERVAL_KEY} is not set and + * {@link #TRANSACTIONAL_KEY} is set to false (rollbacks disabled). + */ + public static final long DEFAULT_FSYNC_INTERVAL = 10_000L; + /** * Set the key "defaultNumDocs=<n>" as sail parameter to limit the maximum number of documents to return from * a search query. The default is to return all documents. 
NB: this may involve extra cost for some SearchIndex diff --git a/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneIndex.java b/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneIndex.java index 5999a91cbe8..7b6c3264619 100644 --- a/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneIndex.java +++ b/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneIndex.java @@ -25,9 +25,11 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Properties; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -150,6 +152,12 @@ public class LuceneIndex extends AbstractLuceneIndex { private volatile int fuzzyPrefixLength; + private boolean transactionsEnabled = LuceneSail.DEFAULT_TRANSACTIONAL; + + private long fsyncIntervalMillis = LuceneSail.DEFAULT_FSYNC_INTERVAL; + + private ScheduledThreadPoolExecutor fsyncScheduler; + /** * The IndexWriter that can be used to alter the index' contents. Created lazily. 
*/ @@ -213,6 +221,16 @@ public synchronized void initialize(Properties parameters) throws Exception { this.fuzzyPrefixLength = NumberUtils.toInt(parameters.getProperty(FUZZY_PREFIX_LENGTH_KEY), 0); } + if (parameters.containsKey(LuceneSail.TRANSACTIONAL_KEY)) { + this.transactionsEnabled = Boolean.parseBoolean( + parameters.getProperty(LuceneSail.TRANSACTIONAL_KEY)); + } + if (parameters.containsKey(LuceneSail.FSYNC_INTERVAL_KEY)) { + this.fsyncIntervalMillis = NumberUtils.toLong( + parameters.getProperty(LuceneSail.FSYNC_INTERVAL_KEY), + LuceneSail.DEFAULT_FSYNC_INTERVAL); + } + postInit(); } @@ -270,6 +288,43 @@ private void postInit() throws IOException { IndexWriter writer = new IndexWriter(directory, indexWriterConfig); writer.close(); } + + setUpFsyncScheduler(); + } + + private void setUpFsyncScheduler() { + // If transactions are disabled, launch a background thread to fsync periodically. + if (this.transactionsEnabled || this.fsyncIntervalMillis <= 0) { + return; + } + + // Use a daemon thread so the scheduler does not prevent JVM shutdown. + this.fsyncScheduler = new ScheduledThreadPoolExecutor(1, r -> { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + t.setName("rdf4j-lucene-fsync-" + t.getId()); + return t; + }); + // Help GC by removing cancelled tasks from the queue. + this.fsyncScheduler.setRemoveOnCancelPolicy(true); + this.fsyncScheduler.scheduleAtFixedRate( + () -> { + try { + if (this.getIndexWriter().hasUncommittedChanges()) { + this.getIndexWriter().commit(); + invalidateReaders(); + } + } catch (Throwable e) { + // We just log errors here, there's not much else we can do. + // Rethrowing exceptions on the next commit would be confusing, especially if + // the error is transient. 
+ logger.error("Exception during Lucene index fsync", e); + } + }, + this.fsyncIntervalMillis, + this.fsyncIntervalMillis, + TimeUnit.MILLISECONDS + ); } protected Function createSpatialStrategyMapper(Map parameters) { @@ -384,15 +439,42 @@ public void shutDown() throws IOException { oldmonitors.clear(); } } finally { + // shutdown fsync scheduler try { - IndexWriter toCloseIndexWriter = indexWriter; - indexWriter = null; - if (toCloseIndexWriter != null) { - toCloseIndexWriter.close(); + if (fsyncScheduler != null) { + fsyncScheduler.shutdown(); + if (!fsyncScheduler.awaitTermination(10, TimeUnit.SECONDS)) { + logger.error("Failed to shut down Lucene fsync scheduler within 10s"); + } } + } catch (InterruptedException e) { + logger.error("Interrupted while trying to shut down Lucene fsync scheduler", e); + Thread.currentThread().interrupt(); + } catch (Throwable e) { + exceptions.add(e); } finally { - if (!exceptions.isEmpty()) { - throw new UndeclaredThrowableException(exceptions.get(0)); + // Do a final sync of any uncommitted changes + IndexWriter toCloseIndexWriter = indexWriter; + try { + if (!this.transactionsEnabled && toCloseIndexWriter != null && toCloseIndexWriter.isOpen() + && toCloseIndexWriter.hasUncommittedChanges()) { + toCloseIndexWriter.commit(); + } + } catch (Throwable e) { + logger.error("Failed to commit Lucene IndexWriter while closing", e); + exceptions.add(e); + } finally { + // shutdown the index writer + try { + indexWriter = null; + if (toCloseIndexWriter != null) { + toCloseIndexWriter.close(); + } + } finally { + if (!exceptions.isEmpty()) { + throw new UndeclaredThrowableException(exceptions.get(0)); + } + } } } } @@ -708,14 +790,21 @@ public synchronized void begin() throws IOException { */ @Override public synchronized void commit() throws IOException { - getIndexWriter().commit(); - // the old IndexReaders/Searchers are not outdated - invalidateReaders(); + if (this.transactionsEnabled) { + getIndexWriter().commit(); + // the old 
IndexReaders/Searchers are now outdated + invalidateReaders(); + } else { + getIndexWriter().flush(); + } } @Override public synchronized void rollback() throws IOException { - getIndexWriter().rollback(); + if (this.transactionsEnabled) { + getIndexWriter().rollback(); + } + // If transactions are disabled, we cannot rollback changes, so we just ignore the rollback request } // //////////////////////////////// Methods for querying the index diff --git a/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/AbstractGenericLuceneTest.java b/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/AbstractGenericLuceneTest.java index 1e09c30169e..97cac07d1cb 100644 --- a/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/AbstractGenericLuceneTest.java +++ b/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/AbstractGenericLuceneTest.java @@ -119,6 +119,26 @@ public abstract class AbstractGenericLuceneTest { protected abstract void configure(LuceneSail sail) throws IOException; + /** + * How long to wait after a commit() to ensure index is updated. 
+ * + * @return milliseconds to wait + */ + protected long waitAfterCommitMillis() { + return 0; + } + + protected final void sleepAfterCommitIfNeeded() { + long waitMillis = waitAfterCommitMillis(); + if (waitMillis > 0) { + try { + Thread.sleep(waitMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + @BeforeEach public void setUp() throws Exception { // set logging, uncomment this to get better logging for debugging @@ -166,6 +186,7 @@ public void tearDown() throws RepositoryException { @Test public void testComplexQueryTwo() throws MalformedQueryException, RepositoryException, QueryEvaluationException { + sleepAfterCommitIfNeeded(); // prepare the query StringBuilder buffer = new StringBuilder(); buffer.append("SELECT ?Resource ?Matching ?Score "); @@ -275,6 +296,7 @@ private void evaluate(String[] queries, ArrayList>> exp @Test public void testPredicateLuceneQueries() throws MalformedQueryException, RepositoryException, QueryEvaluationException { + sleepAfterCommitIfNeeded(); // prepare the query String[] queries = new String[] { "SELECT ?Resource ?Score ?Snippet \n" @@ -337,6 +359,7 @@ public void testPredicateLuceneQueries() @Test public void testSnippetQueries() throws MalformedQueryException, RepositoryException, QueryEvaluationException { + sleepAfterCommitIfNeeded(); // prepare the query // search for the term "one", but only in predicate 1 StringBuilder buffer = new StringBuilder(); @@ -396,6 +419,7 @@ public void testSnippetLimitedToPredicate() localConnection.add(SUBJECT_1, PREDICATE_1, vf.createLiteral("but the unicorn charly said to goaway")); localConnection.add(SUBJECT_1, PREDICATE_2, vf.createLiteral("there was poor charly without a kidney")); localConnection.commit(); + sleepAfterCommitIfNeeded(); } // prepare the query @@ -471,6 +495,7 @@ public void testCharlyTerm() { localConnection.add(SUBJECT_1, PREDICATE_1, vf.createLiteral("but the unicorn charly said to goaway")); localConnection.add(SUBJECT_1, 
PREDICATE_2, vf.createLiteral("there was poor charly without a kidney")); localConnection.commit(); + sleepAfterCommitIfNeeded(); } // search for the term "charly" in all predicates StringBuilder buffer = new StringBuilder(); @@ -529,6 +554,7 @@ public void testCharlyTerm() { @Test public void testGraphQuery() throws QueryEvaluationException, MalformedQueryException, RepositoryException { + sleepAfterCommitIfNeeded(); IRI score = vf.createIRI(LuceneSailSchema.NAMESPACE + "score"); StringBuilder query = new StringBuilder(); @@ -577,6 +603,7 @@ public void testGraphQuery() throws QueryEvaluationException, MalformedQueryExce @Test public void testQueryWithSpecifiedSubject() throws RepositoryException, MalformedQueryException, QueryEvaluationException { + sleepAfterCommitIfNeeded(); // fire a query with the subject pre-specified TupleQuery query = connection.prepareTupleQuery(QUERY_STRING); query.setBinding("Subject", SUBJECT_1); @@ -594,6 +621,7 @@ public void testQueryWithSpecifiedSubject() @Test public void testUnionQuery() throws RepositoryException, MalformedQueryException, QueryEvaluationException { + sleepAfterCommitIfNeeded(); String queryStr = ""; queryStr += "PREFIX search: "; queryStr += "PREFIX rdfs: "; @@ -626,6 +654,7 @@ public void testContextHandling() { connection.add(SUBJECT_5, PREDICATE_1, vf.createLiteral("sfiveponectwo"), CONTEXT_2); connection.add(SUBJECT_5, PREDICATE_2, vf.createLiteral("sfiveptwoctwo"), CONTEXT_2); connection.commit(); + sleepAfterCommitIfNeeded(); // connection.close(); // connection = repository.getConnection(); // connection.setAutoCommit(false); @@ -640,6 +669,7 @@ public void testContextHandling() { // remove a context connection.clear(CONTEXT_1); connection.commit(); + sleepAfterCommitIfNeeded(); assertNoQueryResult("sfourponecone"); assertNoQueryResult("sfourptwocone"); assertNoQueryResult("sfiveponecone"); @@ -659,6 +689,7 @@ public void testNullContextHandling() { connection.add(SUBJECT_5, PREDICATE_1, 
vf.createLiteral("sfiveponectwo"), CONTEXT_2); connection.add(SUBJECT_5, PREDICATE_2, vf.createLiteral("sfiveptwoctwo"), CONTEXT_2); connection.commit(); + sleepAfterCommitIfNeeded(); // connection.close(); // connection = repository.getConnection(); // connection.setAutoCommit(false); @@ -673,6 +704,7 @@ public void testNullContextHandling() { // remove a context connection.clear((Resource) null); connection.commit(); + sleepAfterCommitIfNeeded(); assertNoQueryResult("sfourponecone"); assertNoQueryResult("sfourptwocone"); assertNoQueryResult("sfiveponecone"); @@ -682,6 +714,7 @@ public void testNullContextHandling() { @Test public void testFuzzyQuery() throws MalformedQueryException, RepositoryException, QueryEvaluationException { + sleepAfterCommitIfNeeded(); // prepare the query // search for the term "one" with 80% fuzzyness StringBuilder buffer = new StringBuilder(); @@ -729,11 +762,13 @@ public void testFuzzyQuery() throws MalformedQueryException, RepositoryException @Test public void testReindexing() { sail.reindex(); + sleepAfterCommitIfNeeded(); testComplexQueryTwo(); } @Test public void testPropertyVar() throws MalformedQueryException, RepositoryException, QueryEvaluationException { + sleepAfterCommitIfNeeded(); StringBuilder buffer = new StringBuilder(); buffer.append("SELECT ?Resource ?Property \n"); buffer.append("WHERE { \n"); diff --git a/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneNonTransactionalTest.java b/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneNonTransactionalTest.java new file mode 100644 index 00000000000..7cea00b8d26 --- /dev/null +++ b/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneNonTransactionalTest.java @@ -0,0 +1,67 @@ +/******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. 
This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.sail.lucene.impl; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.store.NIOFSDirectory; +import org.eclipse.rdf4j.sail.lucene.LuceneSail; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Test to verify that when transactional support is disabled, the Lucene index is configured with a periodic fsync. It + * also checks if the index still works correctly in this mode. 
+ * + * @author Piotr Sowiński + */ +public class LuceneNonTransactionalTest extends AbstractGenericLuceneTest { + + @TempDir + public File dataDir; + + private LuceneIndex index; + + @Override + protected void configure(LuceneSail sail) throws IOException { + index = new LuceneIndex(new NIOFSDirectory(dataDir.toPath()), new StandardAnalyzer()); + var params = new Properties(); + params.setProperty(LuceneSail.TRANSACTIONAL_KEY, "false"); + params.setProperty(LuceneSail.FSYNC_INTERVAL_KEY, "100"); + params.setProperty(LuceneSail.LUCENE_DIR_KEY, dataDir.getAbsolutePath()); + try { + index.initialize(params); + } catch (Exception e) { + throw new RuntimeException(e); + } + sail.setLuceneIndex(index); + } + + @Override + protected long waitAfterCommitMillis() { + return 200; + } + + @Test + public void testIndexSettings() { + assertNotNull(index); + // Make sure the thread for periodic fsync is running + var allThreads = Thread.getAllStackTraces().keySet(); + assertThat(allThreads.stream().filter(t -> t.getName().startsWith("rdf4j-lucene-fsync-")).count()) + .isGreaterThan(0); + } +} diff --git a/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneSailTest.java b/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneSailTest.java index 088f6452900..ebfbd2b1201 100644 --- a/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneSailTest.java +++ b/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneSailTest.java @@ -10,11 +10,15 @@ *******************************************************************************/ package org.eclipse.rdf4j.sail.lucene.impl; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; + import java.io.IOException; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.store.RAMDirectory; import org.eclipse.rdf4j.sail.lucene.LuceneSail; +import 
org.junit.jupiter.api.Test; public class LuceneSailTest extends AbstractGenericLuceneTest { @@ -25,4 +29,13 @@ protected void configure(LuceneSail sail) throws IOException { index = new LuceneIndex(new RAMDirectory(), new StandardAnalyzer()); sail.setLuceneIndex(index); } + + @Test + public void testIndexSettings() { + assertNotNull(index); + // Make sure the thread for periodic fsync is NOT running + var allThreads = Thread.getAllStackTraces().keySet(); + assertThat(allThreads.stream().filter(t -> t.getName().startsWith("rdf4j-lucene-fsync-")).count()) + .isEqualTo(0); + } }