Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,36 @@ public class LuceneSail extends NotifyingSailWrapper {
*/
public static final String LUCENE_RAMDIR_KEY = "useramdir";

/**
* Whether the LuceneIndex should support transactions and rollbacks (true by default). Set the key
* "transactional=false" as sail parameter to DISABLE rollback support. This will improve performance if you are
* doing a lot of inserts/updates and don't need rollback support. It will also run fsync only at an interval (see
* {@link #FSYNC_INTERVAL_KEY}), instead of after each transaction commit. Changes in the index will be only visible
* to readers after the periodic fsync is done.
* <p>
* See <a href="https://github.com/eclipse-rdf4j/rdf4j/issues/5291">the original issue</a>.
*/
public static final String TRANSACTIONAL_KEY = "transactional";

/**
* Default value for {@link #TRANSACTIONAL_KEY}, true (rollbacks enabled, fsync after each commit).
*/
public static final boolean DEFAULT_TRANSACTIONAL = true;

/**
* Set the key "fsyncInterval=&lt;t&gt;" as sail parameter to configure the interval in milliseconds in which fsync
* is called on the Lucene index, default is defined in {@link #DEFAULT_FSYNC_INTERVAL}. Changes in the index will
* become visible to readers at most after the interval is elapsed. Only used when {@link #TRANSACTIONAL_KEY} is set
* to false (rollbacks disabled).
*/
public static final String FSYNC_INTERVAL_KEY = "fsyncInterval";

/**
* Default fsync interval in milliseconds, used when {@link #FSYNC_INTERVAL_KEY} is not set and
* {@link #TRANSACTIONAL_KEY} is set to false (rollbacks disabled).
*/
public static final long DEFAULT_FSYNC_INTERVAL = 10_000L;

/**
* Set the key "defaultNumDocs=&lt;n&gt;" as sail parameter to limit the maximum number of documents to return from
* a search query. The default is to return all documents. NB: this may involve extra cost for some SearchIndex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

Expand Down Expand Up @@ -150,6 +152,12 @@ public class LuceneIndex extends AbstractLuceneIndex {

private volatile int fuzzyPrefixLength;

private boolean transactionsEnabled = LuceneSail.DEFAULT_TRANSACTIONAL;

private long fsyncIntervalMillis = LuceneSail.DEFAULT_FSYNC_INTERVAL;

private ScheduledThreadPoolExecutor fsyncScheduler;

/**
* The IndexWriter that can be used to alter the index' contents. Created lazily.
*/
Expand Down Expand Up @@ -213,6 +221,16 @@ public synchronized void initialize(Properties parameters) throws Exception {
this.fuzzyPrefixLength = NumberUtils.toInt(parameters.getProperty(FUZZY_PREFIX_LENGTH_KEY), 0);
}

if (parameters.containsKey(LuceneSail.TRANSACTIONAL_KEY)) {
this.transactionsEnabled = Boolean.parseBoolean(
parameters.getProperty(LuceneSail.TRANSACTIONAL_KEY));
}
if (parameters.containsKey(LuceneSail.FSYNC_INTERVAL_KEY)) {
this.fsyncIntervalMillis = NumberUtils.toLong(
parameters.getProperty(LuceneSail.FSYNC_INTERVAL_KEY),
LuceneSail.DEFAULT_FSYNC_INTERVAL);
}

postInit();
}

Expand Down Expand Up @@ -270,6 +288,43 @@ private void postInit() throws IOException {
IndexWriter writer = new IndexWriter(directory, indexWriterConfig);
writer.close();
}

setUpFsyncScheduler();
}

private void setUpFsyncScheduler() {
// If transactions are disabled, launch a background thread to fsync periodically.
if (this.transactionsEnabled || this.fsyncIntervalMillis <= 0) {
return;
}

// Use a daemon thread so the scheduler does not prevent JVM shutdown.
this.fsyncScheduler = new ScheduledThreadPoolExecutor(1, r -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
t.setName("rdf4j-lucene-fsync-" + t.getId());
return t;
});
// Help GC by removing cancelled tasks from the queue.
this.fsyncScheduler.setRemoveOnCancelPolicy(true);
this.fsyncScheduler.scheduleAtFixedRate(
() -> {
try {
if (this.getIndexWriter().hasUncommittedChanges()) {
this.getIndexWriter().commit();
invalidateReaders();
}
} catch (Throwable e) {
// We just log errors here, there's not much else we can do.
// Rethrowing exceptions on the next commit would be confusing, especially if
// the error is transient.
logger.error("Exception during Lucene index fsync", e);
}
},
this.fsyncIntervalMillis,
this.fsyncIntervalMillis,
TimeUnit.MILLISECONDS
);
}

protected Function<String, ? extends SpatialStrategy> createSpatialStrategyMapper(Map<String, String> parameters) {
Expand Down Expand Up @@ -384,15 +439,42 @@ public void shutDown() throws IOException {
oldmonitors.clear();
}
} finally {
// shutdown fsync scheduler
try {
IndexWriter toCloseIndexWriter = indexWriter;
indexWriter = null;
if (toCloseIndexWriter != null) {
toCloseIndexWriter.close();
if (fsyncScheduler != null) {
fsyncScheduler.shutdown();
if (!fsyncScheduler.awaitTermination(10, TimeUnit.SECONDS)) {
logger.error("Failed to shut down Lucene fsync scheduler within 10s");
}
}
} catch (InterruptedException e) {
logger.error("Interrupted while trying to shut down Lucene fsync scheduler", e);
Thread.currentThread().interrupt();
} catch (Throwable e) {
exceptions.add(e);
} finally {
if (!exceptions.isEmpty()) {
throw new UndeclaredThrowableException(exceptions.get(0));
// Do a final sync of any uncommitted changes
IndexWriter toCloseIndexWriter = indexWriter;
try {
if (!this.transactionsEnabled && toCloseIndexWriter != null && toCloseIndexWriter.isOpen()
&& toCloseIndexWriter.hasUncommittedChanges()) {
toCloseIndexWriter.commit();
}
} catch (Throwable e) {
logger.error("Failed to commit Lucene IndexWriter while closing", e);
exceptions.add(e);
} finally {
// shutdown the index writer
try {
indexWriter = null;
if (toCloseIndexWriter != null) {
toCloseIndexWriter.close();
}
} finally {
if (!exceptions.isEmpty()) {
throw new UndeclaredThrowableException(exceptions.get(0));
}
}
}
}
}
Expand Down Expand Up @@ -708,14 +790,21 @@ public synchronized void begin() throws IOException {
*/
@Override
public synchronized void commit() throws IOException {
getIndexWriter().commit();
// the old IndexReaders/Searchers are not outdated
invalidateReaders();
if (this.transactionsEnabled) {
getIndexWriter().commit();
// the old IndexReaders/Searchers are not outdated
invalidateReaders();
} else {
getIndexWriter().flush();
}
}

@Override
public synchronized void rollback() throws IOException {
getIndexWriter().rollback();
if (this.transactionsEnabled) {
getIndexWriter().rollback();
}
// If transactions are disabled, we cannot rollback changes, so we just ignore the rollback request
}

// //////////////////////////////// Methods for querying the index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,26 @@ public abstract class AbstractGenericLuceneTest {

protected abstract void configure(LuceneSail sail) throws IOException;

/**
* How long to wait after a commit() to ensure index is updated.
*
* @return milliseconds to wait
*/
protected long waitAfterCommitMillis() {
return 0;
}

protected final void sleepAfterCommitIfNeeded() {
long waitMillis = waitAfterCommitMillis();
if (waitMillis > 0) {
try {
Thread.sleep(waitMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

@BeforeEach
public void setUp() throws Exception {
// set logging, uncomment this to get better logging for debugging
Expand Down Expand Up @@ -166,6 +186,7 @@ public void tearDown() throws RepositoryException {

@Test
public void testComplexQueryTwo() throws MalformedQueryException, RepositoryException, QueryEvaluationException {
sleepAfterCommitIfNeeded();
// prepare the query
StringBuilder buffer = new StringBuilder();
buffer.append("SELECT ?Resource ?Matching ?Score ");
Expand Down Expand Up @@ -275,6 +296,7 @@ private void evaluate(String[] queries, ArrayList<List<Map<String, String>>> exp
@Test
public void testPredicateLuceneQueries()
throws MalformedQueryException, RepositoryException, QueryEvaluationException {
sleepAfterCommitIfNeeded();
// prepare the query
String[] queries = new String[] {
"SELECT ?Resource ?Score ?Snippet \n"
Expand Down Expand Up @@ -337,6 +359,7 @@ public void testPredicateLuceneQueries()

@Test
public void testSnippetQueries() throws MalformedQueryException, RepositoryException, QueryEvaluationException {
sleepAfterCommitIfNeeded();
// prepare the query
// search for the term "one", but only in predicate 1
StringBuilder buffer = new StringBuilder();
Expand Down Expand Up @@ -396,6 +419,7 @@ public void testSnippetLimitedToPredicate()
localConnection.add(SUBJECT_1, PREDICATE_1, vf.createLiteral("but the unicorn charly said to goaway"));
localConnection.add(SUBJECT_1, PREDICATE_2, vf.createLiteral("there was poor charly without a kidney"));
localConnection.commit();
sleepAfterCommitIfNeeded();
}

// prepare the query
Expand Down Expand Up @@ -471,6 +495,7 @@ public void testCharlyTerm() {
localConnection.add(SUBJECT_1, PREDICATE_1, vf.createLiteral("but the unicorn charly said to goaway"));
localConnection.add(SUBJECT_1, PREDICATE_2, vf.createLiteral("there was poor charly without a kidney"));
localConnection.commit();
sleepAfterCommitIfNeeded();
}
// search for the term "charly" in all predicates
StringBuilder buffer = new StringBuilder();
Expand Down Expand Up @@ -529,6 +554,7 @@ public void testCharlyTerm() {

@Test
public void testGraphQuery() throws QueryEvaluationException, MalformedQueryException, RepositoryException {
sleepAfterCommitIfNeeded();
IRI score = vf.createIRI(LuceneSailSchema.NAMESPACE + "score");
StringBuilder query = new StringBuilder();

Expand Down Expand Up @@ -577,6 +603,7 @@ public void testGraphQuery() throws QueryEvaluationException, MalformedQueryExce
@Test
public void testQueryWithSpecifiedSubject()
throws RepositoryException, MalformedQueryException, QueryEvaluationException {
sleepAfterCommitIfNeeded();
// fire a query with the subject pre-specified
TupleQuery query = connection.prepareTupleQuery(QUERY_STRING);
query.setBinding("Subject", SUBJECT_1);
Expand All @@ -594,6 +621,7 @@ public void testQueryWithSpecifiedSubject()

@Test
public void testUnionQuery() throws RepositoryException, MalformedQueryException, QueryEvaluationException {
sleepAfterCommitIfNeeded();
String queryStr = "";
queryStr += "PREFIX search: <http://www.openrdf.org/contrib/lucenesail#> ";
queryStr += "PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#> ";
Expand Down Expand Up @@ -626,6 +654,7 @@ public void testContextHandling() {
connection.add(SUBJECT_5, PREDICATE_1, vf.createLiteral("sfiveponectwo"), CONTEXT_2);
connection.add(SUBJECT_5, PREDICATE_2, vf.createLiteral("sfiveptwoctwo"), CONTEXT_2);
connection.commit();
sleepAfterCommitIfNeeded();
// connection.close();
// connection = repository.getConnection();
// connection.setAutoCommit(false);
Expand All @@ -640,6 +669,7 @@ public void testContextHandling() {
// remove a context
connection.clear(CONTEXT_1);
connection.commit();
sleepAfterCommitIfNeeded();
assertNoQueryResult("sfourponecone");
assertNoQueryResult("sfourptwocone");
assertNoQueryResult("sfiveponecone");
Expand All @@ -659,6 +689,7 @@ public void testNullContextHandling() {
connection.add(SUBJECT_5, PREDICATE_1, vf.createLiteral("sfiveponectwo"), CONTEXT_2);
connection.add(SUBJECT_5, PREDICATE_2, vf.createLiteral("sfiveptwoctwo"), CONTEXT_2);
connection.commit();
sleepAfterCommitIfNeeded();
// connection.close();
// connection = repository.getConnection();
// connection.setAutoCommit(false);
Expand All @@ -673,6 +704,7 @@ public void testNullContextHandling() {
// remove a context
connection.clear((Resource) null);
connection.commit();
sleepAfterCommitIfNeeded();
assertNoQueryResult("sfourponecone");
assertNoQueryResult("sfourptwocone");
assertNoQueryResult("sfiveponecone");
Expand All @@ -682,6 +714,7 @@ public void testNullContextHandling() {

@Test
public void testFuzzyQuery() throws MalformedQueryException, RepositoryException, QueryEvaluationException {
sleepAfterCommitIfNeeded();
// prepare the query
// search for the term "one" with 80% fuzzyness
StringBuilder buffer = new StringBuilder();
Expand Down Expand Up @@ -729,11 +762,13 @@ public void testFuzzyQuery() throws MalformedQueryException, RepositoryException
@Test
public void testReindexing() {
sail.reindex();
sleepAfterCommitIfNeeded();
testComplexQueryTwo();
}

@Test
public void testPropertyVar() throws MalformedQueryException, RepositoryException, QueryEvaluationException {
sleepAfterCommitIfNeeded();
StringBuilder buffer = new StringBuilder();
buffer.append("SELECT ?Resource ?Property \n");
buffer.append("WHERE { \n");
Expand Down
Loading
Loading