25 changes: 20 additions & 5 deletions src/commands/protocols/publish/publish-finalization-command.js
@@ -33,6 +33,8 @@
const { id, publishOperationId, merkleRoot, byteSize } = eventData;
const { blockchain, contractAddress } = event;
const operationId = this.operationIdService.generateId();
const ual = this.ualService.deriveUAL(blockchain, contractAddress, id);

this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.PUBLISH_FINALIZATION.PUBLISH_FINALIZATION_START,
operationId,
@@ -71,7 +73,9 @@
assertion = result.assertion;
publisherPeerId = result.remotePeerId;
} catch (error) {
this.logger.error(`Failed to read cached publish data: ${error.message}`); // TODO: Make this log more descriptive
this.logger.error(
`[Cache] Failed to read cached publish data for UAL ${ual} (publishOperationId: ${publishOperationId}, txHash: ${txHash}, operationId: ${operationId}): ${error.message}`,
);
this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.FAILED,
operationId,
@@ -81,8 +85,6 @@
return Command.empty();
}

const ual = this.ualService.deriveUAL(blockchain, contractAddress, id);

try {
await this.validatePublishData(merkleRoot, cachedMerkleRoot, byteSize, assertion, ual);
} catch (e) {
@@ -185,23 +187,36 @@

async readWithRetries(publishOperationId) {
let attempt = 0;
const datasetPath = this.fileService.getPendingStorageDocumentPath(publishOperationId);

while (attempt < MAX_RETRIES_READ_CACHED_PUBLISH_DATA) {
try {
const datasetPath =
this.fileService.getPendingStorageDocumentPath(publishOperationId);
// eslint-disable-next-line no-await-in-loop
const stats = await this.fileService.stat(datasetPath);

this.logger.debug(
`[Cache] Cache file present on attempt ${attempt + 1} (publishOperationId: ${publishOperationId}, path: ${datasetPath}, size: ${stats.size} bytes).`,
);

// eslint-disable-next-line no-await-in-loop
const cachedData = await this.fileService.readFile(datasetPath, true);
this.logger.debug(
`[Cache] Read cached publish data on attempt ${attempt + 1} (publishOperationId: ${publishOperationId}, path: ${datasetPath}).`,
);
return cachedData;
} catch (error) {
attempt += 1;
this.logger.warn(
`[Cache] Attempt ${attempt} to read cached publish data failed (publishOperationId: ${publishOperationId}, path: ${datasetPath}): ${error.message}`,
);

// eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => {
setTimeout(resolve, RETRY_DELAY_READ_CACHED_PUBLISH_DATA);
});
}
}
this.logger.error(
`[Cache] Exhausted retries reading cached publish data (publishOperationId: ${publishOperationId}, path: ${datasetPath}).`,
);
// TODO: Mark this operation as failed
throw new Error('Failed to read cached publish data');
}
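For reference, a minimal standalone sketch of the same read-with-retries pattern used above, outside the command class. The constant values and the readOnce/logger parameters are illustrative placeholders, not the node's actual MAX_RETRIES_READ_CACHED_PUBLISH_DATA / RETRY_DELAY_READ_CACHED_PUBLISH_DATA settings; one deliberate difference from the diff is that this sketch does not sleep after the final failed attempt.

// Illustrative sketch only; constants and parameter names are placeholders.
const MAX_RETRIES = 5;
const RETRY_DELAY_MS = 2000;

async function readWithRetries(readOnce, logger = console) {
    for (let attempt = 1; attempt <= MAX_RETRIES; attempt += 1) {
        try {
            // eslint-disable-next-line no-await-in-loop
            return await readOnce();
        } catch (error) {
            logger.warn(`Read attempt ${attempt}/${MAX_RETRIES} failed: ${error.message}`);
            if (attempt < MAX_RETRIES) {
                // Wait before the next attempt, but not after the last one.
                // eslint-disable-next-line no-await-in-loop
                await new Promise((resolve) => {
                    setTimeout(resolve, RETRY_DELAY_MS);
                });
            }
        }
    }
    throw new Error(`Failed to read cached publish data after ${MAX_RETRIES} attempts`);
}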
@@ -30,7 +30,8 @@ class OtBlazegraph extends OtTripleStore {
`com.bigdata.rdf.store.AbstractTripleStore.quads=true\n` +
`com.bigdata.namespace.${name}.lex.com.bigdata.btree.BTree.branchingFactor=400\n` +
`com.bigdata.rdf.store.AbstractTripleStore.geoSpatial=false\n` +
`com.bigdata.journal.Journal.groupCommit=false\n` +
`com.bigdata.journal.Journal.groupCommit=true\n` +
`com.bigdata.journal.Journal.groupCommitSleepTime=1\n` +
Collaborator review comment on the groupCommitSleepTime line:
What does this do?
`com.bigdata.rdf.sail.isolatableIndices=false\n` +
`com.bigdata.rdf.store.AbstractTripleStore.enableRawRecordsSupport=false\n` +
`com.bigdata.rdf.store.AbstractTripleStore.Options.inlineTextLiterals=true\n` +
@@ -177,12 +178,36 @@
}

async queryVoid(repository, query, timeout) {
return axios.post(this.repositories[repository].sparqlEndpoint, query, {
headers: {
'Content-Type': 'application/sparql-update; charset=UTF-8',
'X-BIGDATA-MAX-QUERY-MILLIS': timeout,
},
});
const snippet = query?.slice(0, 80)?.replace(/\s+/g, ' ') || '';
const label = `[OtBlazegraph.queryVoid] ${repository} ${snippet}`;
if (this.logger?.startTimer) this.logger.startTimer(label);
try {
this.logger.debug(`[OtBlazegraph.queryVoid] Sending update to ${repository}`);
const response = await axios.post(this.repositories[repository].sparqlEndpoint, query, {
headers: {
'Content-Type': 'application/sparql-update; charset=UTF-8',
'X-BIGDATA-MAX-QUERY-MILLIS': timeout,
},
});
this.logger.debug(
`[OtBlazegraph.queryVoid] Update succeeded for ${repository} (status: ${response.status})`,
);
return response;
} catch (error) {
const status = error?.response?.status;
const dataSnippet =
typeof error?.response?.data === 'string'
? error.response.data.slice(0, 200)
: '';
this.logger.error(
`[OtBlazegraph.queryVoid] Update failed for ${repository} (status: ${status}): ${error.message}${
dataSnippet ? ` | data: ${dataSnippet}` : ''
} | query: ${snippet || '<empty>'}`,
);
throw error;
} finally {
if (this.logger?.endTimer) this.logger.endTimer(label);
}
}

async deleteRepository(repository) {
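The guarded this.logger?.startTimer / this.logger?.endTimer calls in queryVoid above (and the unguarded ones in TripleStoreService below) assume a logger that can time labeled operations. The sketch here is only an assumption about the expected shape of such a logger; the node's real logger may implement these methods differently.

// Hypothetical timer-capable logger; the implementation is an assumption, not the node's actual logger.
class TimingLogger {
    constructor() {
        this.timers = new Map();
    }

    debug(message) {
        console.debug(message);
    }

    warn(message) {
        console.warn(message);
    }

    error(message) {
        console.error(message);
    }

    startTimer(label) {
        // Record a high-resolution start time for this label.
        this.timers.set(label, process.hrtime.bigint());
    }

    endTimer(label) {
        const start = this.timers.get(label);
        if (start === undefined) return;
        this.timers.delete(label);
        const elapsedMs = Number(process.hrtime.bigint() - start) / 1e6;
        console.debug(`${label} finished in ${elapsedMs.toFixed(1)} ms`);
    }
}

Because the label string doubles as the map key in this sketch, nested timers with distinct labels (for example TOTAL, ATTEMPT, QUERY) can run concurrently without clobbering each other.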
45 changes: 39 additions & 6 deletions src/service/triple-store-service.js
@@ -57,6 +57,12 @@ class TripleStoreService {
`to the Triple Store's ${repository} repository.`,
);

const totalInsertLabel = `[TripleStoreService.insertKnowledgeCollection TOTAL] ${repository} ${knowledgeCollectionUAL}`;
const attemptInsertLabel = (attempt) =>
`[TripleStoreService.insertKnowledgeCollection ATTEMPT ${attempt}] ${repository} ${knowledgeCollectionUAL}`;

this.logger.startTimer(totalInsertLabel);

const publicAssertion = triples.public ?? triples;

const filteredPublic = [];
@@ -272,13 +278,36 @@
let success = false;

while (attempts < retries && !success) {
const attemptTimerLabel = attemptInsertLabel(attempts + 1);
this.logger.startTimer(attemptTimerLabel);
try {
await this.tripleStoreModuleManager.queryVoid(
this.repositoryImplementations[repository],
repository,
insertQuery,
this.config.modules.tripleStore.timeout.insert,
);
const queryLabel = `[TripleStoreService.insertKnowledgeCollection QUERY] ${repository} ${knowledgeCollectionUAL}`;
this.logger.startTimer(queryLabel);
try {
await this.tripleStoreModuleManager.queryVoid(
this.repositoryImplementations[repository],
repository,
insertQuery,
this.config.modules.tripleStore.timeout.insert,
);
this.logger.debug(
`queryVoid succeeded for repository: ${repository}, UAL: ${knowledgeCollectionUAL}`,
);
} catch (queryError) {
const status = queryError?.response?.status;
const dataSnippet =
typeof queryError?.response?.data === 'string'
? queryError.response.data.slice(0, 200)
: '';
this.logger.error(
`queryVoid failed for repository: ${repository}, UAL: ${knowledgeCollectionUAL}, status: ${status}. ${queryError.message}${
dataSnippet ? ` | data: ${dataSnippet}` : ''
}`,
);
throw queryError;
} finally {
this.logger.endTimer(queryLabel);
}
if (paranetUAL) {
await this.tripleStoreModuleManager.createParanetKnoledgeCollectionConnection(
this.repositoryImplementations[repository],
@@ -336,14 +365,18 @@
),
]);

this.logger.endTimer(totalInsertLabel);
throw new Error(
`Failed to store Knowledge Collection with the UAL: ${knowledgeCollectionUAL} ` +
`to the Triple Store's ${repository} repository after maximum retries. Error ${error}`,
);
}
} finally {
this.logger.endTimer(attemptTimerLabel);
}
}

this.logger.endTimer(totalInsertLabel);
return totalNumberOfTriplesInserted;
}
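To make the label nesting in insertKnowledgeCollection above easier to follow, a small self-contained demo of the TOTAL / ATTEMPT / QUERY nesting is sketched here. console.time and console.timeEnd stand in for the logger's startTimer/endTimer, the label text is abbreviated, and the simulated query delay is a placeholder.

// Demo of the nested timing labels; console.time/timeEnd stand in for startTimer/endTimer.
async function demoInsertTiming() {
    console.time('insert TOTAL');
    const maxAttempts = 2;
    for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
        console.time(`insert ATTEMPT ${attempt}`);
        try {
            console.time('insert QUERY');
            // Placeholder for the SPARQL update round trip.
            // eslint-disable-next-line no-await-in-loop
            await new Promise((resolve) => {
                setTimeout(resolve, 10);
            });
            console.timeEnd('insert QUERY');
            break; // success: stop retrying
        } finally {
            console.timeEnd(`insert ATTEMPT ${attempt}`);
        }
    }
    console.timeEnd('insert TOTAL');
}

demoInsertTiming();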
