Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ services:
max-file: "3"

rdf4j:
image: nanopub/rdf4j-workbench:5.1.3-SNAPSHOT
# Image with additional fixes:
# - JDK 25 for better performance
# - Fixed issue with federated query deadlocks
# - Allowed for disabling transactions in text repos for faster writes
# Source branch: https://github.com/Ostrzyciel/rdf4j/tree/piotr/merged-fixes-2025-12-10
image: ostrzyciel/rdf4j-workbench-tomcat:5.2.2-SNAPSHOT-b
# image: eclipse/rdf4j-workbench:5.0.3
restart: unless-stopped
environment:
Expand Down
21 changes: 9 additions & 12 deletions src/main/java/com/knowledgepixels/query/MetricsCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -50,28 +51,24 @@ public MetricsCollector(MeterRegistry meterRegistry) {
public void updateMetrics() {
// Update numeric metrics
loadCounter.set((int) StatusController.get().getState().loadCounter);
// Request repository names once, to avoid multiple calls
var repoNames = TripleStore.get().getRepositoryNames();
if (repoNames == null) {
repoNames = Set.of();
}
typeRepositoriesCounter.set(
(int) Optional
.ofNullable(TripleStore.get().getRepositoryNames())
.orElse(Set.of())
(int) repoNames
.stream()
.filter(repo -> repo.startsWith("type_"))
.count()
);
pubkeyRepositoriesCounter.set(
(int) Optional
.ofNullable(TripleStore.get().getRepositoryNames())
.orElse(Set.of())
(int) repoNames
.stream()
.filter(repo -> repo.startsWith("pubkey_"))
.count()
);
fullRepositoriesCounter.set(
Optional
.ofNullable(TripleStore.get().getRepositoryNames())
.orElse(Set.of())
.size()
);
fullRepositoriesCounter.set(repoNames.size());

// Update status gauge
final var currentStatus = StatusController.get().getState().state;
Expand Down
138 changes: 114 additions & 24 deletions src/main/java/com/knowledgepixels/query/TripleStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.io.InputStreamReader;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* Class to access the database in the form of triple stores.
Expand Down Expand Up @@ -172,10 +174,55 @@ private void createRepo(String repoName) {
indexTypes = "spoc,posc,ospc";
}

String createRegularRepoQueryString = "@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>.\n" + "@prefix rep: <http://www.openrdf.org/config/repository#>.\n" + "@prefix sr: <http://www.openrdf.org/config/repository/sail#>.\n" + "@prefix sail: <http://www.openrdf.org/config/sail#>.\n" + "@prefix sail-luc: <http://www.openrdf.org/config/sail/lucene#>.\n" + "@prefix lmdb: <http://rdf4j.org/config/sail/lmdb#>.\n" + "@prefix sb: <http://www.openrdf.org/config/sail/base#>.\n" + "\n" + "[] a rep:Repository ;\n" + " rep:repositoryID \"" + repoName + "\" ;\n" + " rdfs:label \"" + repoName + " LMDB store\" ;\n" + " rep:repositoryImpl [\n" + " rep:repositoryType \"openrdf:SailRepository\" ;\n" + " sr:sailImpl [\n" + " sail:sailType \"rdf4j:LmdbStore\" ;\n" + " sail:iterationCacheSyncThreshold \"10000\";\n" + " lmdb:tripleIndexes \"" + indexTypes + "\" ;\n" + " sb:defaultQueryEvaluationMode \"STANDARD\"\n" + " ]\n" + " ].\n";
String createRegularRepoQueryString =
"@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>.\n" +
"@prefix rep: <http://www.openrdf.org/config/repository#>.\n" +
"@prefix sr: <http://www.openrdf.org/config/repository/sail#>.\n" +
"@prefix sail: <http://www.openrdf.org/config/sail#>.\n" +
"@prefix sail-luc: <http://www.openrdf.org/config/sail/lucene#>.\n" +
"@prefix lmdb: <http://rdf4j.org/config/sail/lmdb#>.\n" +
"@prefix sb: <http://www.openrdf.org/config/sail/base#>.\n" +
"\n" +
"[] a rep:Repository ;\n" +
" rep:repositoryID \"" + repoName + "\" ;\n" +
" rdfs:label \"" + repoName + " LMDB store\" ;\n" +
" rep:repositoryImpl [\n" +
" rep:repositoryType \"openrdf:SailRepository\" ;\n" +
" sr:sailImpl [\n" +
" sail:sailType \"rdf4j:LmdbStore\" ;\n" +
" sail:iterationCacheSyncThreshold \"10000\";\n" +
" lmdb:tripleIndexes \"" + indexTypes + "\" ;\n" +
" sb:defaultQueryEvaluationMode \"STANDARD\"\n" +
" ]\n"
+ " ].\n";

// TODO Index npa:hasFilterLiteral predicate too (see https://groups.google.com/g/rdf4j-users/c/epF4Af1jXGU):
String createTextRepoQueryString = "@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>.\n" + "@prefix rep: <http://www.openrdf.org/config/repository#>.\n" + "@prefix sr: <http://www.openrdf.org/config/repository/sail#>.\n" + "@prefix sail: <http://www.openrdf.org/config/sail#>.\n" + "@prefix sail-luc: <http://www.openrdf.org/config/sail/lucene#>.\n" + "@prefix lmdb: <http://rdf4j.org/config/sail/lmdb#>.\n" + "@prefix sb: <http://www.openrdf.org/config/sail/base#>.\n" + "\n" + "[] a rep:Repository ;\n" + " rep:repositoryID \"" + repoName + "\" ;\n" + " rdfs:label \"" + repoName + " store\" ;\n" + " rep:repositoryImpl [\n" + " rep:repositoryType \"openrdf:SailRepository\" ;\n" + " sr:sailImpl [\n" + " sail:sailType \"openrdf:LuceneSail\" ;\n" + " sail-luc:indexDir \"index/\" ;\n" + " sail:delegate [" + " sail:sailType \"rdf4j:LmdbStore\" ;\n" + " sail:iterationCacheSyncThreshold \"10000\";\n" + " lmdb:tripleIndexes \"" + indexTypes + "\" ;\n" + " sb:defaultQueryEvaluationMode \"STANDARD\"\n" + " ]\n" + " ]\n" + " ].";
String createTextRepoQueryString =
"@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>.\n" +
"@prefix rep: <http://www.openrdf.org/config/repository#>.\n" +
"@prefix sr: <http://www.openrdf.org/config/repository/sail#>.\n" +
"@prefix sail: <http://www.openrdf.org/config/sail#>.\n" +
"@prefix sail-luc: <http://www.openrdf.org/config/sail/lucene#>.\n" +
"@prefix lmdb: <http://rdf4j.org/config/sail/lmdb#>.\n" +
"@prefix sb: <http://www.openrdf.org/config/sail/base#>.\n" +
"\n"
+ "[] a rep:Repository ;\n" +
" rep:repositoryID \"" + repoName + "\" ;\n" +
" rdfs:label \"" + repoName + " store\" ;\n" +
" rep:repositoryImpl [\n" +
" rep:repositoryType \"openrdf:SailRepository\" ;\n" +
" sr:sailImpl [\n" +
" sail:sailType \"openrdf:LuceneSail\" ;\n" +
" sail-luc:indexDir \"index/\" ;\n" +
" sail-luc:transactional false ;\n" +
" sail:delegate [\n" +
" sail:sailType \"rdf4j:LmdbStore\" ;\n" +
" sail:iterationCacheSyncThreshold \"10000\";\n" +
" lmdb:tripleIndexes \"" + indexTypes + "\" ;\n" +
" sb:defaultQueryEvaluationMode \"STANDARD\"\n" +
" ]\n" +
" ]\n" +
" ].";

String createRepoQueryString = createRegularRepoQueryString;
if (repoName.startsWith("text")) {
Expand Down Expand Up @@ -225,37 +272,78 @@ public RepositoryConnection getAdminRepoConnection() {
return get().getRepoConnection(ADMIN_REPO);
}

// Last repository name set returned by the server; only meaningful while repoNamesCacheValid is true.
private Set<String> cachedRepositoryNames = Set.of();
// Whether cachedRepositoryNames may be served; cleared by invalidateRepositoryNamesCache().
private boolean repoNamesCacheValid = false;
// Guards both cache fields above: readers take the read lock, updates and invalidation take the write lock.
private final ReadWriteLock repoNamesCacheLock = new ReentrantReadWriteLock();

/**
* Returns set of all repository names.
*
* @return Repository name set
*/
public Set<String> getRepositoryNames() {
Map<String, Boolean> repositoryNames = null;
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
HttpResponse resp = httpclient.execute(RequestBuilder.get()
.setUri(endpointBase + "/repositories")
.addHeader("Content-Type", "text/csv")
.build());
BufferedReader reader = new BufferedReader(new InputStreamReader(resp.getEntity().getContent()));
int code = resp.getStatusLine().getStatusCode();
if (code < 200 || code >= 300) return null;
repositoryNames = new HashMap<>();
int lineCount = 0;
while (true) {
String line = reader.readLine();
if (line == null) break;
if (lineCount > 0) {
String repoName = line.split(",")[1];
repositoryNames.put(repoName, true);
// See if the repository names are cached:
final var readLock = repoNamesCacheLock.readLock();
try {
readLock.lock();
if (repoNamesCacheValid) {
return cachedRepositoryNames;
}
} finally {
readLock.unlock();
}

// Not cached, get from server:
final var writeLock = repoNamesCacheLock.writeLock();
try {
writeLock.lock();
// Check again if another thread has already updated the cache:
if (repoNamesCacheValid) {
return cachedRepositoryNames;
}
Map<String, Boolean> repositoryNames = null;
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
HttpResponse resp = httpclient.execute(RequestBuilder.get()
.setUri(endpointBase + "/repositories")
.addHeader("Content-Type", "text/csv")
.build());
BufferedReader reader = new BufferedReader(new InputStreamReader(resp.getEntity().getContent()));
int code = resp.getStatusLine().getStatusCode();
if (code < 200 || code >= 300) return null;
repositoryNames = new HashMap<>();
int lineCount = 0;
while (true) {
String line = reader.readLine();
if (line == null) break;
if (lineCount > 0) {
String repoName = line.split(",")[1];
repositoryNames.put(repoName, true);
}
lineCount = lineCount + 1;
}
lineCount = lineCount + 1;
} catch (IOException ex) {
log.info("Could not get repository names.", ex);
return null;
}
} catch (IOException ex) {
log.info("Could not get repository names.", ex);
return null;
cachedRepositoryNames = repositoryNames.keySet();
repoNamesCacheValid = true;
return cachedRepositoryNames;
} finally {
writeLock.unlock();
}
}

/**
 * Invalidates the repository names cache. Call this method when a repository is created or deleted.
 *
 * <p>Only the validity flag is cleared; the next call to {@link #getRepositoryNames()} then
 * requests the names from the server again.
 */
private void invalidateRepositoryNamesCache() {
    final var writeLock = repoNamesCacheLock.writeLock();
    // Acquire outside of try, so that finally never unlocks a lock that was not obtained.
    writeLock.lock();
    try {
        repoNamesCacheValid = false;
    } finally {
        writeLock.unlock();
    }
}

@GeneratedFlagForDependentElements
Expand All @@ -278,6 +366,8 @@ private void initNewRepo(String repoName) {
}
conn.commit();
}
// Invalidate the repository names cache, so that the next lookup fetches the names again
invalidateRepositoryNamesCache();
}
}

Expand Down
63 changes: 62 additions & 1 deletion src/test/java/com/knowledgepixels/query/TripleStoreTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,44 @@
import org.apache.http.impl.client.HttpClients;
import org.eclipse.rdf4j.repository.Repository;
import org.junit.jupiter.api.Test;
import org.junit.platform.commons.support.HierarchyTraversalMode;
import org.junit.platform.commons.support.ReflectionSupport;
import org.mockito.MockSettings;
import org.mockito.internal.creation.MockSettingsImpl;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Mockito.*;

class TripleStoreTest {

/**
 * Installs a fresh lock into the {@code repoNamesCacheLock} field of the given TripleStore mock
 * and hands that lock back, so that tests can inspect its state afterwards.
 * Without this, all calls to getRepositoryNames() would result in a NullPointerException.
 */
private ReentrantReadWriteLock initRepoNamesCacheLock(TripleStore mock) {
    final var matchingFields = ReflectionSupport.findFields(
            TripleStore.class,
            field -> "repoNamesCacheLock".equals(field.getName()),
            HierarchyTraversalMode.TOP_DOWN
    );
    final var cacheLockField = matchingFields.getFirst();
    cacheLockField.setAccessible(true);

    final var freshLock = new ReentrantReadWriteLock();
    try {
        cacheLockField.set(mock, freshLock);
    } catch (IllegalAccessException e) {
        throw new RuntimeException(e);
    }
    return freshLock;
}

@Test
void getRepoConnectionWithValidRepo() {
TripleStore mock = mock(TripleStore.class);
Expand All @@ -36,8 +62,9 @@ void getRepoConnectionWithInvalidRepo() {
}

@Test
void getRepositoryNamesHandlesIOException() throws IOException {
void getRepositoryNamesHandlesIOException() throws IOException, IllegalAccessException {
TripleStore mock = mock(TripleStore.class, CALLS_REAL_METHODS);
ReentrantReadWriteLock repoNamesCacheLock = initRepoNamesCacheLock(mock);
CloseableHttpClient httpClientMock = mock(CloseableHttpClient.class);

when(httpClientMock.execute(any(HttpUriRequest.class))).thenThrow(new IOException());
Expand All @@ -46,11 +73,14 @@ void getRepositoryNamesHandlesIOException() throws IOException {
mockedStatic.when(HttpClients::createDefault).thenReturn(httpClientMock);
assertNull(mock.getRepositoryNames());
}
assertEquals(0, repoNamesCacheLock.getReadLockCount());
assertEquals(0, repoNamesCacheLock.getWriteHoldCount());
}

@Test
void getRepositoryNamesReturnsNullForNonValidResponse() throws IOException {
TripleStore mock = mock(TripleStore.class, CALLS_REAL_METHODS);
ReentrantReadWriteLock repoNamesCacheLock = initRepoNamesCacheLock(mock);
CloseableHttpClient httpClientMock = mock(CloseableHttpClient.class);
CloseableHttpResponse responseMock = mock(CloseableHttpResponse.class);

Expand All @@ -64,11 +94,14 @@ void getRepositoryNamesReturnsNullForNonValidResponse() throws IOException {
mockedStatic.when(HttpClients::createDefault).thenReturn(httpClientMock);
assertNull(mock.getRepositoryNames());
}
assertEquals(0, repoNamesCacheLock.getReadLockCount());
assertEquals(0, repoNamesCacheLock.getWriteHoldCount());
}

@Test
void getRepositoryNamesReturnsSetOfRepositoryNames() throws IOException {
TripleStore mock = mock(TripleStore.class, CALLS_REAL_METHODS);
ReentrantReadWriteLock repoNamesCacheLock = initRepoNamesCacheLock(mock);
CloseableHttpClient httpClientMock = mock(CloseableHttpClient.class);
CloseableHttpResponse responseMock = mock(CloseableHttpResponse.class);

Expand All @@ -85,6 +118,34 @@ void getRepositoryNamesReturnsSetOfRepositoryNames() throws IOException {
Set<String> result = mock.getRepositoryNames();
assertEquals(Set.of("repo1", "repo2"), result);
}
assertEquals(0, repoNamesCacheLock.getReadLockCount());
assertEquals(0, repoNamesCacheLock.getWriteHoldCount());
}

@Test
void getRepositoryNamesCachesResult() throws IOException {
    // Store under test: a mock that runs the real methods, with the cache lock installed by hand.
    TripleStore store = mock(TripleStore.class, CALLS_REAL_METHODS);
    ReentrantReadWriteLock cacheLock = initRepoNamesCacheLock(store);

    // A 200 response whose body lists two repositories after the CSV header.
    String csvBody = "id,name\n1,repo1\n2,repo2\n";
    HttpEntity entityMock = mock(HttpEntity.class);
    when(entityMock.getContent()).thenReturn(new ByteArrayInputStream(csvBody.getBytes()));
    StatusLine statusLineMock = mock(StatusLine.class);
    when(statusLineMock.getStatusCode()).thenReturn(200);

    CloseableHttpResponse responseMock = mock(CloseableHttpResponse.class);
    when(responseMock.getEntity()).thenReturn(entityMock);
    when(responseMock.getStatusLine()).thenReturn(statusLineMock);

    CloseableHttpClient httpClientMock = mock(CloseableHttpClient.class);
    when(httpClientMock.execute(any(HttpUriRequest.class))).thenReturn(responseMock);

    try (var httpClientsStatic = mockStatic(HttpClients.class)) {
        httpClientsStatic.when(HttpClients::createDefault).thenReturn(httpClientMock);

        Set<String> firstCallResult = store.getRepositoryNames();
        Set<String> secondCallResult = store.getRepositoryNames();

        assertEquals(Set.of("repo1", "repo2"), firstCallResult);
        assertEquals(firstCallResult, secondCallResult);
        // The second call must be answered from the cache: only one request reaches the client.
        verify(httpClientMock, times(1)).execute(any(HttpUriRequest.class));
    }

    // No lock may be left held once the calls have returned.
    assertEquals(0, cacheLock.getReadLockCount());
    assertEquals(0, cacheLock.getWriteHoldCount());
}
}