diff --git a/bin/cli.ts b/bin/cli.ts index 0f37489..d352bf3 100644 --- a/bin/cli.ts +++ b/bin/cli.ts @@ -1,11 +1,15 @@ #!/usr/bin/env node -import * as process from "process"; -import { Ordered, replicateLDES } from "../lib/client"; -import { intoConfig } from "../lib/config"; +import * as process from "process";; import { Command, Option } from "commander"; import { Writer } from "n3"; -import { enhanced_fetch, FetchConfig, processConditionFile } from "../lib/utils"; -import { getLoggerFor } from "../lib/utils/logUtil"; +import { replicateLDES } from "../lib/client"; +import { intoConfig } from "../lib/config" +import { enhanced_fetch } from "../lib/fetcher"; +import { processConditionFile } from "../lib/condition"; +import { getLoggerFor } from "../lib/utils"; + +import type { Ordered } from "../lib/strategy"; +import type { FetchConfig } from "../lib/fetcher"; const program = new Command(); let paramURL: string = ""; diff --git a/lib/client.ts b/lib/client.ts index 8adeee2..f990630 100644 --- a/lib/client.ts +++ b/lib/client.ts @@ -1,39 +1,38 @@ -import { Config, intoConfig } from "./config"; -import { Fragment, Member } from "./page"; import { RdfDereferencer, rdfDereferencer } from "rdf-dereference"; -import { FileStateFactory, NoStateFactory, StateFactory } from "./state"; +import { LDES, RDF, TREE } from "@treecg/types"; import { CBDShapeExtractor } from "extract-cbd-shape"; import { RdfStore } from "rdf-stores"; import { DataFactory } from "rdf-data-factory"; -import { Term } from "@rdfjs/types"; +import { intoConfig } from "./config"; +import { handleConditions } from "./condition"; +import { FileStateFactory, NoStateFactory } from "./state"; +import { OrderedStrategy, UnorderedStrategy } from "./strategy"; +import { + ModulatorFactory, + Fetcher, + longPromise, + resetPromise, + Manager, + maybeVersionMaterialize, +} from "./fetcher"; import { extractMainNodeShape, getObjects, - handleConditions, - maybeVersionMaterialize, - ModulatorFactory, - Notifier, streamToArray, + getLoggerFor, + handleExit } from "./utils"; -import { LDES, RDF, TREE } from "@treecg/types"; -import { FetchedPage, Fetcher, longPromise, resetPromise } from "./pageFetcher"; -import { Manager } from "./memberManager"; -import { OrderedStrategy, StrategyEvents, UnorderedStrategy } from "./strategy"; -import { getLoggerFor } from "./utils/logUtil"; -import { handleExit } from "./exitHandler"; - -export { intoConfig } from "./config"; -export { enhanced_fetch, extractMainNodeShape, retry_fetch } from "./utils"; -export * from "./condition/index"; -export type { Member, Page, Relation, Fragment } from "./page"; -export type { Config, ShapeConfig } from "./config"; + +import type { Term } from "@rdfjs/types"; +import type { Config } from "./config"; +import type { StateFactory } from "./state"; +import type { Ordered, StrategyEvents } from "./strategy"; +import type { LDESInfo, Notifier, FetchedPage, Fragment, Member } from "./fetcher"; const df = new DataFactory(); type Controller = ReadableStreamDefaultController; -export type Ordered = "ascending" | "descending" | "none"; - export function replicateLDES( config: Partial & { url: string }, ordered: Ordered = "none", @@ -43,13 +42,6 @@ export function replicateLDES( return new Client(intoConfig(config), ordered, dereferencer, streamId); } -export type LDESInfo = { - shape: Term; - extractor: CBDShapeExtractor; - timestampPath?: Term; - versionOfPath?: Term; -}; - async function getInfo( ldesId: Term, viewId: Term, @@ -251,7 +243,6 @@ export class Client { async init( emit: (member: Member) => void, close: () => void, - factory: ModulatorFactory, ): Promise { // Fetch the url const root = await fetchPage( @@ -410,7 +401,7 @@ export class Client { this.memberManager, this.fetcher, notifier, - factory, + this.modulatorFactory, this.ordered, this.config.polling, this.config.pollInterval, @@ -419,7 +410,7 @@ export class Client { this.memberManager, this.fetcher, notifier, - factory, + this.modulatorFactory, this.config.polling, this.config.pollInterval, ); @@ -455,7 +446,6 @@ export class Client { resetPromise(emitted); }, () => controller.close(), - this.modulatorFactory, ); }, diff --git a/lib/condition/condition.ts b/lib/condition/condition.ts index 020198d..779930c 100644 --- a/lib/condition/condition.ts +++ b/lib/condition/condition.ts @@ -1,13 +1,18 @@ -import { Quad, Term } from "@rdfjs/types"; import { NamedNode, Parser } from "n3"; -import { BasicLensM, Cont, extractShapes } from "rdf-lens"; +import { DataFactory } from "rdf-data-factory"; +import { BasicLensM, extractShapes, pred } from "rdf-lens"; import { RdfStore } from "rdf-stores"; -import { Member } from "../page"; import { TREE, XSD } from "@treecg/types"; +import { getLoggerFor, parseInBetweenRelation } from "../utils"; import { SHAPES } from "./shapes"; -import { cbdEquals, Path } from "./range"; -import { getLoggerFor } from "../utils/logUtil"; -import { parseInBetweenRelation } from "../utils/inBetween"; +import { cbdEquals } from "./range"; + +import type { Quad, Term } from "@rdfjs/types"; +import type { Cont } from "rdf-lens"; +import type { Member } from "../fetcher"; +import type { Path } from "./range"; + +const df = new DataFactory(); type RdfThing = { entry: Term; @@ -470,3 +475,104 @@ export class MaxCountCondition implements Condition { } } } + +export async function processConditionFile( + conditionFile?: string, +): Promise { + let condition: Condition = empty_condition(); + + /* eslint-disable @typescript-eslint/no-require-imports */ + const fs = + typeof require === "undefined" + ? await import("fs/promises") + : require("fs/promises"); + + if (conditionFile) { + try { + condition = parse_condition( + await fs.readFile(conditionFile, { encoding: "utf8" }), + conditionFile, + ); + } catch (ex) { + console.error(`Failed to read condition file: ${conditionFile}`); + throw ex; + } + } + + return condition; +} + +/** + * Function that handles any given condition, together with the "before" and "after" options, + * and builds the corresponding unified Condition. + */ +export function handleConditions( + condition: Condition, + defaultTimezone: string, + before?: Date, + after?: Date, + timestampPath?: Term, +): Condition { + // Check if before and after conditions are defined and build corresponding Condition object + let handledCondition: Condition = empty_condition(); + const toDateLiteral = (date: Date) => { + return df.literal(date.toISOString(), XSD.terms.dateTime); + }; + + if (before) { + if (!timestampPath) { + throw "Cannot apply 'before' or 'after' filters since the target LDES does not define a ldes:timestampPath predicate"; + } + + const predLens = pred(timestampPath); + const beforeCond = new LeafCondition({ + relationType: TREE.terms.LessThanRelation, + value: toDateLiteral(before), + compareType: "date", + path: predLens, + pathQuads: { entry: timestampPath, quads: [] }, + defaultTimezone, + }); + if (after) { + const afterCond = new LeafCondition({ + relationType: TREE.terms.GreaterThanRelation, + value: toDateLiteral(after), + compareType: "date", + path: predLens, + pathQuads: { entry: timestampPath, quads: [] }, + defaultTimezone, + }); + // Got bi-condition with before & after filters + handledCondition = new AndCondition({ + items: [beforeCond, afterCond], + }); + } else { + // Got condition with before filter only + handledCondition = beforeCond; + } + } else if (after) { + if (!timestampPath) { + throw "Cannot apply 'before' or 'after' filters since the target LDES does not define a ldes:timestampPath predicate"; + } + + const predLens = pred(timestampPath); + // Got condition with after filter only + handledCondition = new LeafCondition({ + relationType: TREE.terms.GreaterThanRelation, + value: toDateLiteral(after), + compareType: "date", + path: predLens, + pathQuads: { entry: timestampPath, quads: [] }, + defaultTimezone, + }); + } + + // See if condition file was defined too + if (!(condition instanceof EmptyCondition)) { + return new AndCondition({ + items: [condition, handledCondition], + }); + } else { + return handledCondition; + } +} \ No newline at end of file diff --git a/lib/condition/index.ts b/lib/condition/index.ts index bc3dd77..1a3c6f5 100644 --- a/lib/condition/index.ts +++ b/lib/condition/index.ts @@ -1 +1,3 @@ export * from "./condition"; +export * from "./range"; +export * from "./shapes"; diff --git a/lib/condition/range.ts b/lib/condition/range.ts index c88b9c5..3d49188 100644 --- a/lib/condition/range.ts +++ b/lib/condition/range.ts @@ -1,8 +1,10 @@ -import { Quad, Term } from "@rdfjs/types"; import { RdfStore } from "rdf-stores"; -import { getObjects } from "../utils"; import { RDF, TREE } from "@treecg/types"; -import { Condition, Range } from "./condition"; +import { getObjects } from "../utils"; +import { Range } from "./condition"; + +import type { Quad, Term } from "@rdfjs/types"; +import type { Condition } from "./condition"; export type Path = { store: RdfStore; diff --git a/lib/config.ts b/lib/config.ts index 859b1e0..716f5c1 100644 --- a/lib/config.ts +++ b/lib/config.ts @@ -1,5 +1,7 @@ -import { NamedNode, Quad } from "@rdfjs/types"; -import { Condition, empty_condition as emptyCondition } from "./condition"; +import { empty_condition as emptyCondition } from "./condition"; + +import type { NamedNode, Quad } from "@rdfjs/types"; +import type { Condition } from "./condition"; export interface ShapeConfig { quads: Quad[]; diff --git a/lib/fetcher/enhancedFetch.ts b/lib/fetcher/enhancedFetch.ts new file mode 100644 index 0000000..222dfc2 --- /dev/null +++ b/lib/fetcher/enhancedFetch.ts @@ -0,0 +1,170 @@ +import { urlToUrl, getLoggerFor } from "../utils"; + +const logger = getLoggerFor("EnhancedFetch"); + +export type AuthConfig = { + type: "basic"; + auth: string; + host: string; +}; + +export type RetryConfig = { + codes: number[]; + base: number; + maxRetries: number; +}; + +export type FetchConfig = { + auth?: AuthConfig; + concurrent?: number; + retry?: Partial; + safe?: boolean; +}; + +export function enhanced_fetch( + config: FetchConfig, + start?: typeof fetch, +): typeof fetch { + const start_f = start || fetch; + const safe_f = config.safe + ? ((async (a, b) => { + while (true) { + try { + return await start_f(a, b); + } catch (ex) { + logger.debug( + `This should not happen, it will not happen this is safe. ${JSON.stringify( + ex, + )}`, + ); + } + } + }) as typeof fetch) + : start_f; + + const fetch_f = config.auth + ? handle_basic_auth(safe_f, config.auth) + : safe_f; + + return limit_fetch_per_domain( + retry_fetch(fetch_f, config.retry || {}), + config.concurrent, + ); +} + +export function limit_fetch_per_domain( + fetch_f: typeof fetch, + concurrent: number = 10, +): typeof fetch { + const domain_dict: { [domain: string]: Array<(value: void) => void> } = {}; + + const out: typeof fetch = async (input, init) => { + const url: URL = urlToUrl(input); + const domain = url.origin; + + if (!(domain in domain_dict)) { + domain_dict[domain] = []; + } + + const requests = domain_dict[domain]; + await new Promise((res) => { + logger.debug( + `[limit] ${domain} capacity ${requests.length}/${concurrent}`, + ); + if (requests.length < concurrent) { + requests.push(res); + res({}); + } else { + requests.push(res); + } + }); + const resp = await fetch_f(input, init); + + requests.shift(); + for (let i = 0; i < concurrent; i++) { + if (requests[i]) { + requests[i](); + } + } + + return resp; + }; + + return out; +} + +export function handle_basic_auth( + fetch_f: typeof fetch, + config: AuthConfig, +): typeof fetch { + let authRequired = false; + + const basicAuthValue = `Basic ${Buffer.from(config.auth).toString("base64")}`; + const setHeader = (init?: RequestInit): RequestInit => { + const reqInit = init || {}; + const headers = new Headers(reqInit.headers); + headers.set("Authorization", basicAuthValue); + reqInit.headers = headers; + return reqInit; + }; + + const auth_f: typeof fetch = async (input, init) => { + const url: URL = urlToUrl(input); + if (authRequired && url.host === config.host) { + return await fetch_f(input, setHeader(init)); + } + + const resp = await fetch_f(input, init); + if (resp.status === 401) { + logger.debug("[auth] Unauthorized, adding basic auth"); + if (url.host === config.host) { + authRequired = true; + return await fetch_f(input, setHeader(init)); + } + } + + return resp; + }; + + return auth_f; +} + +export function retry_fetch( + fetch_f: typeof fetch, + partial_config: Partial, +): typeof fetch { + const config: RetryConfig = Object.assign( + { + codes: [408, 425, 429, 500, 502, 503, 504], + base: 500, + maxRetries: 5, + }, + partial_config, + ); + + const retry: typeof fetch = async (input, init) => { + let tryCount = 0; + let retryTime = config.maxRetries; + while (config.maxRetries == 0 || tryCount < config.maxRetries) { + const resp = await fetch_f(input, init); + if (!resp.ok) { + if (config.codes.some((x) => x == resp.status)) { + logger.debug( + `[retry_fetch] Retry ${input} ${tryCount}/${config.maxRetries}`, + ); + // Wait 500ms, 1 second, 2 seconds, 4 seconds, 8 seconds, fail + tryCount += 1; + await new Promise((res) => setTimeout(res, retryTime)); + retryTime *= 2; + continue; + } + return resp; + } + return resp; + } + + throw `Max retries exceeded (${config.maxRetries})`; + }; + + return retry; +} \ No newline at end of file diff --git a/lib/fetcher/index.ts b/lib/fetcher/index.ts new file mode 100644 index 0000000..0207284 --- /dev/null +++ b/lib/fetcher/index.ts @@ -0,0 +1,6 @@ +export * from "./modulator"; +export * from "./relation"; +export * from "./page"; +export * from "./pageFetcher"; +export * from "./enhancedFetch"; +export * from "./memberManager"; \ No newline at end of file diff --git a/lib/memberManager.ts b/lib/fetcher/memberManager.ts similarity index 62% rename from lib/memberManager.ts rename to lib/fetcher/memberManager.ts index 919b37f..641da80 100644 --- a/lib/memberManager.ts +++ b/lib/fetcher/memberManager.ts @@ -1,15 +1,22 @@ -import { Quad, Term } from "@rdfjs/types"; -import { Fragment, Member } from "./page"; -import { FetchedPage } from "./pageFetcher"; import { CBDShapeExtractor } from "extract-cbd-shape"; -import { DC, LDES, TREE } from "@treecg/types"; -import { LDESInfo } from "./client"; -import { getObjects, memberFromQuads, Notifier } from "./utils"; +import { RDF, DC, LDES, TREE } from "@treecg/types"; import { RdfStore } from "rdf-stores"; -import { getLoggerFor } from "./utils/logUtil"; -import { DataFactory } from "n3"; +import { DataFactory } from "rdf-data-factory"; +import { getObjects, getLoggerFor } from "../utils"; -const { namedNode } = DataFactory; +import type { Quad, Term, Quad_Subject, Quad_Predicate } from "@rdfjs/types"; +import type { Notifier } from "./modulator"; +import type { Fragment, Member } from "./page"; +import type { FetchedPage } from "./pageFetcher"; + +const { quad, namedNode, defaultGraph } = new DataFactory(); + +export type LDESInfo = { + shape: Term; + extractor: CBDShapeExtractor; + timestampPath?: Term; + versionOfPath?: Term; +}; export interface Options { ldesId?: Term; @@ -34,6 +41,98 @@ export type MemberEvents = { error: Error; }; +export function memberFromQuads( + member: Term, + quads: Quad[], + timestampPath: Term | undefined, + isVersionOfPath: Term | undefined, + created?: Date, +): Member { + // Get timestamp + let timestamp: string | Date | undefined; + if (timestampPath) { + const ts = quads.find( + (x) => + x.subject.equals(member) && x.predicate.equals(timestampPath), + )?.object.value; + if (ts) { + try { + timestamp = new Date(ts); + } catch (ex: unknown) { + timestamp = ts; + } + } + } + + // Get isVersionof + let isVersionOf: string | undefined; + if (isVersionOfPath) { + isVersionOf = quads.find( + (x) => + x.subject.equals(member) && x.predicate.equals(isVersionOfPath), + )?.object.value; + } + + // Get type + const type: Term | undefined = quads.find( + (x) => x.subject.equals(member) && x.predicate.value === RDF.type, + )?.object; + return { quads, id: member, isVersionOf, timestamp, type, created }; +} + +/** + * Version materialization function that sets the declared ldes:versionOfPath property value + * as the member's subject IRI + */ +export function maybeVersionMaterialize( + member: Member, + materialize: boolean, + ldesInfo: LDESInfo, +): Member { + if (materialize && ldesInfo.versionOfPath) { + // Create RDF store with member quads + const memberStore = RdfStore.createDefault(); + member.quads.forEach((q) => memberStore.addQuad(q)); + // Get materialized subject IRI + const newSubject = getObjects( + memberStore, + member.id, + ldesInfo.versionOfPath, + )[0]; + if (newSubject) { + // Remove version property + memberStore.removeQuad( + quad( + member.id, + ldesInfo.versionOfPath, + newSubject, + defaultGraph(), + ), + ); + // Updated all quads with materialized subject + for (const q of memberStore.getQuads(member.id)) { + const newQ = quad( + newSubject, + q.predicate, + q.object, + q.graph, + ); + memberStore.removeQuad(q); + memberStore.addQuad(newQ); + } + // Update member object + member.id = newSubject; + member.quads = memberStore.getQuads(); + } else { + console.error( + `No version property found in Member (${member.id}) as specified by ldes:isVersionOfPath ${ldesInfo.versionOfPath}`, + ); + } + } + + return member; +} + export class Manager { public queued: number = 0; diff --git a/lib/fetcher/modulator.ts b/lib/fetcher/modulator.ts new file mode 100644 index 0000000..2bd6afd --- /dev/null +++ b/lib/fetcher/modulator.ts @@ -0,0 +1,205 @@ +import { StateT } from "../state"; +import { getLoggerFor } from "../utils/"; + +import type { StateFactory } from "../state"; + +export type Notifier = { + [K in keyof Events]: (event: Events[K], state: S) => void; +}; + +/** + * Generic interface that represents a structure that ranks elements. + * Most common is a Priority Queue (heap like) that pops elements in order. + * An array is also a Ranker, without ordering. + */ +export interface Ranker { + push(item: T): void; + pop(): T | undefined; +} + +export type ModulartorEvents = { + ready: Indexed; +}; + +/** + * Factory that creates Modulator's + * This is a factory to keep track whether or not the Modulator should be paused or not. + */ +export class ModulatorFactory { + concurrent = 10; + paused: boolean = false; + + factory: StateFactory; + children: ModulatorInstance[] = []; + + constructor(stateFactory: StateFactory, concurrent?: number) { + this.factory = stateFactory; + if (concurrent) { + this.concurrent = concurrent; + } + } + + /** + * Note: `T` should be plain javascript objects (because that how state is saved) + */ + create( + name: string, + ranker: Ranker>, + notifier: Notifier, unknown>, + parse?: (item: unknown) => T, + ): Modulator { + const state = this.factory.build>( + name, + JSON.stringify, + JSON.parse, + () => ({ + todo: [], + inflight: [], + }), + ); + + if (parse) { + state.item.todo = state.item.todo.map(({ item, index }) => ({ + index, + item: parse(item), + })); + state.item.inflight = state.item.inflight.map( + ({ item, index }) => ({ + index, + item: parse(item), + }), + ); + } + + const modulator = new ModulatorInstance(state, ranker, notifier, this); + this.children.push(>modulator); + return modulator; + } + + pause() { + this.paused = true; + } + + unpause() { + this.paused = false; + this.children.forEach((x) => x.checkReady()); + } +} + +/** + * Modulator is a structure that only buffers elements and only handles elements + * when the factory is not paused and when not too many items are active at once. + */ +export interface Modulator { + push(item: T): void; + + finished(index: number): void; + + length(): number; +} + +type Indexed = { + item: T; + index: number; +}; + +type ModulatorInstanceState = { + todo: Indexed[]; + inflight: Indexed[]; +}; + +class ModulatorInstance implements Modulator { + at: number = 0; + index = 0; + + private state: StateT>; + + private ranker: Ranker>; + private notifier: Notifier, unknown>; + private factory: ModulatorFactory; + + private logger = getLoggerFor(this); + + constructor( + state: StateT>, + ranker: Ranker>, + notifier: Notifier, unknown>, + factory: ModulatorFactory, + ) { + this.state = state; + const readd = [...this.state.item.todo, ...this.state.item.inflight]; + this.state.item.todo.push(...this.state.item.inflight); + while (this.state.item.inflight.pop()) { + // pass + } + while (this.state.item.todo.pop()) { + // pass + } + this.ranker = ranker; + this.notifier = notifier; + this.factory = factory; + for (const item of readd) { + this.push(item.item); + } + } + + length(): number { + return this.state.item.todo.length; + } + + push(item: T) { + const indexed = { item, index: this.index }; + this.state.item.todo.push(indexed); + this.index += 1; + this.ranker.push(indexed); + this.checkReady(); + } + + finished(index: number) { + const removeIdx = this.state.item.inflight.findIndex( + (x) => x.index == index, + ); + if (removeIdx >= 0) { + this.state.item.inflight.splice(removeIdx, 1); + } else { + this.logger.error( + "[finished] Expected to be able to remove inflight item", + ); + } + + this.at -= 1; + this.checkReady(); + } + + checkReady() { + if (this.factory.paused) { + return; + } + + while (this.at < this.factory.concurrent) { + const item = this.ranker.pop(); + if (item) { + // This item is no longer todo + // I'm quite afraid to use filter for this + const removeIdx = this.state.item.todo.findIndex( + (x) => x.index == item.index, + ); + if (removeIdx >= 0) { + this.state.item.todo.splice(removeIdx, 1); + } else { + this.logger.error( + "[checkReady] Expected to be able to remove inflight item", + ); + } + + // This item is now inflight + this.state.item.inflight.push(item); + + this.at += 1; + this.notifier.ready(item, {}); + } else { + break; + } + } + } +} \ No newline at end of file diff --git a/lib/page.ts b/lib/fetcher/page.ts similarity index 87% rename from lib/page.ts rename to lib/fetcher/page.ts index e7f6c3e..9223733 100644 --- a/lib/page.ts +++ b/lib/fetcher/page.ts @@ -1,15 +1,16 @@ -import { Quad, Term } from "@rdfjs/types"; import { DC, LDES, RDF, TREE } from "@treecg/types"; import { CBDShapeExtractor } from "extract-cbd-shape"; -import { State } from "./state"; +import { DataFactory } from "rdf-data-factory"; import { RdfStore } from "rdf-stores"; -import { getObjects, memberFromQuads } from "./utils"; -import { Condition } from "./condition"; -import { RelationCondition } from "./condition/range"; -import { getLoggerFor } from "./utils/logUtil"; -import { DataFactory } from "n3"; +import { RelationCondition } from "../condition"; +import { getObjects, getLoggerFor } from "../utils"; +import { memberFromQuads } from "./memberManager"; -const { namedNode } = DataFactory; +import type { Quad, Term } from "@rdfjs/types"; +import type { State } from "../state"; +import type { Condition } from "../condition"; + +const df = new DataFactory(); export interface Member { id: Term; @@ -57,8 +58,8 @@ export function extractMembers( const members = getObjects(store, stream, TREE.terms.member, null); async function extractMember(member: Term) { - const quads = await extractor.extract(store, member, shapeId, [namedNode(LDES.custom("IngestionMetadata"))]); - const created = getObjects(store, member, DC.terms.custom("created"), namedNode(LDES.custom("IngestionMetadata")))[0]?.value; + const quads = await extractor.extract(store, member, shapeId, [df.namedNode(LDES.custom("IngestionMetadata"))]); + const created = getObjects(store, member, DC.terms.custom("created"), df.namedNode(LDES.custom("IngestionMetadata")))[0]?.value; cb(memberFromQuads(member, quads, timestampPath, isVersionOfPath, created ? new Date(created) : undefined)); } diff --git a/lib/pageFetcher.ts b/lib/fetcher/pageFetcher.ts similarity index 92% rename from lib/pageFetcher.ts rename to lib/fetcher/pageFetcher.ts index 0478c12..6033f17 100644 --- a/lib/pageFetcher.ts +++ b/lib/fetcher/pageFetcher.ts @@ -1,11 +1,15 @@ -import { IDereferenceOptions, RdfDereferencer } from "rdf-dereference"; -import { Notifier } from "./utils"; -import { extractRelations, Relation, Relations } from "./page"; -import { SimpleRelation } from "./relation"; + +import { RdfDereferencer } from "rdf-dereference"; import { RdfStore } from "rdf-stores"; import { DataFactory } from "rdf-data-factory"; -import { Condition } from "./condition"; -import { getLoggerFor } from "./utils/logUtil"; +import { getLoggerFor } from "../utils"; +import { extractRelations } from "./page"; + +import type { IDereferenceOptions } from "rdf-dereference"; +import type { Condition } from "../condition"; +import type { Notifier } from "./modulator"; +import type { SimpleRelation } from "./relation"; +import type { Relation, Relations } from "./page"; const { namedNode } = new DataFactory(); diff --git a/lib/relation.ts b/lib/fetcher/relation.ts similarity index 98% rename from lib/relation.ts rename to lib/fetcher/relation.ts index c484efb..8263ef6 100644 --- a/lib/relation.ts +++ b/lib/fetcher/relation.ts @@ -1,4 +1,4 @@ -import { Value } from "./condition"; +import type { Value } from "../condition"; export type SimpleRelation = { important: boolean; diff --git a/lib/rdfc-processor.ts b/lib/rdfc-processor.ts index 29fc002..7581cf6 100644 --- a/lib/rdfc-processor.ts +++ b/lib/rdfc-processor.ts @@ -1,13 +1,14 @@ -import { getLoggerFor } from "./utils/logUtil"; -import { replicateLDES } from "./client"; -import { enhanced_fetch, processConditionFile } from "./utils"; import { DataFactory } from "rdf-data-factory"; import { SDS } from "@treecg/types"; import { Writer as NWriter } from "n3"; +import { replicateLDES } from "./client"; +import { enhanced_fetch } from "./fetcher" +import { processConditionFile } from "./condition"; +import { getLoggerFor } from "./utils"; import type { Writer } from "@rdfc/js-runner"; import type { Quad_Object } from "@rdfjs/types"; -import type { Ordered } from "./client"; +import type { Ordered } from "./strategy"; const df = new DataFactory(); diff --git a/lib/state/index.ts b/lib/state/index.ts new file mode 100644 index 0000000..6ddf04c --- /dev/null +++ b/lib/state/index.ts @@ -0,0 +1,2 @@ +export * from "./state"; +export * from "./storage"; \ No newline at end of file diff --git a/lib/state.ts b/lib/state/state.ts similarity index 95% rename from lib/state.ts rename to lib/state/state.ts index d35e47b..3194b8b 100644 --- a/lib/state.ts +++ b/lib/state/state.ts @@ -1,10 +1,5 @@ import { storage } from "./storage"; -export interface ClientState { - root: string; // Used to acquire shape - inFlight: string[]; // fragments that are currently being checked -} - export interface State { init(): Promise; diff --git a/lib/storage.ts b/lib/state/storage.ts similarity index 98% rename from lib/storage.ts rename to lib/state/storage.ts index bb3bfa4..cb1cd9b 100644 --- a/lib/storage.ts +++ b/lib/state/storage.ts @@ -1,4 +1,3 @@ -// myLibrary.ts interface Storage { getItem(key: string): string; diff --git a/lib/strategy/index.ts b/lib/strategy/index.ts index c7e37c9..b5b6809 100644 --- a/lib/strategy/index.ts +++ b/lib/strategy/index.ts @@ -1,12 +1,13 @@ -import type { NamedNode } from "@rdfjs/types"; -import { Fragment, Member } from "../page"; -import { FetchedPage } from "../pageFetcher"; -import { RelationChain } from "../relation"; import { TREE } from "@treecg/types"; +import { Fragment, Member, FetchedPage, RelationChain } from "../fetcher"; + +import type { NamedNode } from "@rdfjs/types"; export { UnorderedStrategy } from "./unordered"; export { OrderedStrategy } from "./ordered"; +export type Ordered = "ascending" | "descending" | "none"; + /** * Predicates representing greater than relations */ diff --git a/lib/strategy/ordered.ts b/lib/strategy/ordered.ts index 4a94e6c..ed0a569 100644 --- a/lib/strategy/ordered.ts +++ b/lib/strategy/ordered.ts @@ -1,16 +1,22 @@ import { Heap } from "heap-js"; -import { Manager, MemberEvents } from "../memberManager"; -import { Fragment, Member, Relations } from "../page"; -import { FetchedPage, Fetcher, FetchEvent } from "../pageFetcher"; -import { Modulator, ModulatorFactory, Notifier } from "../utils"; -import { RelationChain, SimpleRelation } from "../relation"; - -import { Ordered } from "../client"; -import { GTRs, LTR, StrategyEvents } from "."; -import { getLoggerFor } from "../utils/logUtil"; -import { Value } from "../condition"; import { TREE } from "@treecg/types"; -import { parseInBetweenRelation } from "../utils/inBetween"; +import { Fetcher, ModulatorFactory, RelationChain, Manager } from "../fetcher"; +import { parseInBetweenRelation, getLoggerFor } from "../utils"; +import { GTRs, LTR } from "."; + +import type { Value } from "../condition"; +import type { + Fragment, + Member, + Relations, + FetchedPage, + FetchEvent, + Modulator, + Notifier, + SimpleRelation, + MemberEvents +} from "../fetcher"; +import type { StrategyEvents, Ordered } from "."; export type StateItem = { rel: RelationChain; diff --git a/lib/strategy/unordered.ts b/lib/strategy/unordered.ts index 4d00124..835a4b9 100644 --- a/lib/strategy/unordered.ts +++ b/lib/strategy/unordered.ts @@ -1,10 +1,16 @@ -import { Manager, MemberEvents } from "../memberManager"; -import { FetchedPage, Fetcher, FetchEvent, Node } from "../pageFetcher"; -import { Modulator, ModulatorFactory, Notifier } from "../utils"; - -import { StrategyEvents } from "."; -import { getLoggerFor } from "../utils/logUtil"; -import { Fragment } from "../page"; +import { Fetcher, ModulatorFactory, Manager } from "../fetcher"; +import { getLoggerFor } from "../utils"; + +import type { + FetchedPage, + FetchEvent, + Node, + Modulator, + Notifier, + Fragment, + MemberEvents +} from "../fetcher"; +import type { StrategyEvents } from "."; export class UnorderedStrategy { private manager: Manager; diff --git a/lib/utils.ts b/lib/utils.ts deleted file mode 100644 index 9b8ca53..0000000 --- a/lib/utils.ts +++ /dev/null @@ -1,713 +0,0 @@ -import { - NamedNode, - Quad, - Quad_Predicate, - Quad_Subject, - Quad_Object, - Stream, - Term, -} from "@rdfjs/types"; -import { BaseQuad } from "n3"; -import { StateFactory, StateT } from "./state"; -import { RdfStore } from "rdf-stores"; -import { DataFactory } from "rdf-data-factory"; -import { RDF, SHACL, TREE, XSD } from "@treecg/types"; -import { Member } from "./page"; -import { LDESInfo } from "./client"; -import { pred } from "rdf-lens"; -import { - AndCondition, - Condition, - empty_condition, - EmptyCondition, - LeafCondition, - parse_condition, -} from "./condition/index"; - -import { getLoggerFor } from "./utils/logUtil"; - -const logger = getLoggerFor("Utils"); - -const df = new DataFactory(); - -export type Notifier = { - [K in keyof Events]: (event: Events[K], state: S) => void; -}; - -export function getSubjects( - store: RdfStore, - predicate: Term | null, - object: Term | null, - graph?: Term | null, -): Quad_Subject[] { - return store.getQuads(null, predicate, object, graph).map((quad) => { - return quad.subject; - }); -} - -export function getObjects( - store: RdfStore, - subject: Term | null, - predicate: Term | null, - graph?: Term | null, -): Quad_Object[] { - return store.getQuads(subject, predicate, null, graph).map((quad) => { - return quad.object; - }); -} - -export function readableToArray(stream: ReadableStream): Promise { - const out: T[] = []; - const reader = stream.getReader(); - return new Promise((res, rej) => { - const next = () => { - reader - .read() - .catch(rej) - .then((obj) => { - if (obj) { - if (obj.done) { - res(out); - } else { - out.push(obj.value); - next(); - } - } else { - res(out); - } - }); - }; - next(); - }); -} - -/** - * Converts a stream to an array, pushing all elements to an array - * Resolving the promise with the 'end' event - */ -export function streamToArray( - stream: Stream, -): Promise { - const out: T[] = []; - return new Promise((res, rej) => { - stream.on("end", () => res(out)); - stream.on("data", (x) => { - out.push(x); - }); - stream.on("error", (ex) => { - logger.error("[streamToArray] Stream to Array failed"); - rej(ex); - }); - }); -} - -/** - * Find the main sh:NodeShape subject of a given Shape Graph. - * We determine this by assuming that the main node shape - * is not referenced by any other shape description. - * If more than one is found an exception is thrown. - */ -export function extractMainNodeShape(store: RdfStore): NamedNode { - const nodeShapes = getSubjects( - store, - RDF.terms.type, - SHACL.terms.NodeShape, - null, - ); - let mainNodeShape = null; - - if (nodeShapes && nodeShapes.length > 0) { - for (const ns of nodeShapes) { - const isNotReferenced = - getSubjects(store, null, ns, null).length === 0; - - if (isNotReferenced) { - if (!mainNodeShape) { - mainNodeShape = ns; - } else { - throw new Error( - "There are multiple main node shapes in a given shape graph. Unrelated shapes must be given as separate shape graphs", - ); - } - } - } - if (mainNodeShape) { - return mainNodeShape; - } else { - throw new Error( - "No main SHACL Node Shapes found in given shape graph", - ); - } - } else { - throw new Error("No SHACL Node Shapes found in given shape graph"); - } -} - -/** - * Generic interface that represents a structure that ranks elements. - * Most common is a Priority Queue (heap like) that pops elements in order. - * An array is also a Ranker, without ordering. - */ -export interface Ranker { - push(item: T): void; - pop(): T | undefined; -} - -export type ModulartorEvents = { - ready: Indexed; -}; - -/** - * Factory that creates Modulator's - * This is a factory to keep track whether or not the Modulator should be paused or not. - */ -export class ModulatorFactory { - concurrent = 10; - paused: boolean = false; - - factory: StateFactory; - children: ModulatorInstance[] = []; - - constructor(stateFactory: StateFactory, concurrent?: number) { - this.factory = stateFactory; - if (concurrent) { - this.concurrent = concurrent; - } - } - - /** - * Note: `T` should be plain javascript objects (because that how state is saved) - */ - create( - name: string, - ranker: Ranker>, - notifier: Notifier, unknown>, - parse?: (item: unknown) => T, - ): Modulator { - const state = this.factory.build>( - name, - JSON.stringify, - JSON.parse, - () => ({ - todo: [], - inflight: [], - }), - ); - - if (parse) { - state.item.todo = state.item.todo.map(({ item, index }) => ({ - index, - item: parse(item), - })); - state.item.inflight = state.item.inflight.map( - ({ item, index }) => ({ - index, - item: parse(item), - }), - ); - } - - const modulator = new ModulatorInstance(state, ranker, notifier, this); - this.children.push(>modulator); - return modulator; - } - - pause() { - this.paused = true; - } - - unpause() { - this.paused = false; - this.children.forEach((x) => x.checkReady()); - } -} - -/** - * Modulator is a structure that only buffers elements and only handles elements - * when the factory is not paused and when not too many items are active at once. - */ -export interface Modulator { - push(item: T): void; - - finished(index: number): void; - - length(): number; -} - -type Indexed = { - item: T; - index: number; -}; - -type ModulatorInstanceState = { - todo: Indexed[]; - inflight: Indexed[]; -}; - -class ModulatorInstance implements Modulator { - at: number = 0; - index = 0; - - private state: StateT>; - - private ranker: Ranker>; - private notifier: Notifier, unknown>; - private factory: ModulatorFactory; - - private logger = getLoggerFor(this); - - constructor( - state: StateT>, - ranker: Ranker>, - notifier: Notifier, unknown>, - factory: ModulatorFactory, - ) { - this.state = state; - const readd = [...this.state.item.todo, ...this.state.item.inflight]; - this.state.item.todo.push(...this.state.item.inflight); - while (this.state.item.inflight.pop()) { - // pass - } - while (this.state.item.todo.pop()) { - // pass - } - this.ranker = ranker; - this.notifier = notifier; - this.factory = factory; - for (const item of readd) { - this.push(item.item); - } - } - - length(): number { - return this.state.item.todo.length; - } - - push(item: T) { - const indexed = { item, index: this.index }; - this.state.item.todo.push(indexed); - this.index += 1; - this.ranker.push(indexed); - this.checkReady(); - } - - finished(index: number) { - const removeIdx = this.state.item.inflight.findIndex( - (x) => x.index == index, - ); - if (removeIdx >= 0) { - this.state.item.inflight.splice(removeIdx, 1); - } else { - this.logger.error( - "[finished] Expected to be able to remove inflight item", - ); - } - - this.at -= 1; - this.checkReady(); - } - - checkReady() { - if (this.factory.paused) { - return; - } - - while (this.at < this.factory.concurrent) { - const item = this.ranker.pop(); - if (item) { - // This item is no longer todo - // I'm quite afraid to use filter for this - const removeIdx = this.state.item.todo.findIndex( - (x) => x.index == item.index, - ); - if (removeIdx >= 0) { - this.state.item.todo.splice(removeIdx, 1); - } else { - this.logger.error( - "[checkReady] Expected to be able to remove inflight item", - ); - } - - // This item is now inflight - this.state.item.inflight.push(item); - - this.at += 1; - this.notifier.ready(item, {}); - } else { - break; - } - } - } -} - -function urlToUrl(input: Parameters[0]): URL { - if (typeof input === "string") { - return new URL(input); - } else if (input instanceof URL) { - return input; - } else if (input instanceof Request) { - return new URL(input.url); - } else { - throw "Not a real url"; - } -} - -export type AuthConfig = { - type: "basic"; - auth: string; - host: string; -}; - -export type RetryConfig = { - codes: number[]; - base: number; - maxRetries: number; -}; - -export type FetchConfig = { - auth?: AuthConfig; - concurrent?: number; - retry?: Partial; - safe?: boolean; -}; - -export function enhanced_fetch( - config: FetchConfig, - start?: typeof fetch, -): typeof fetch { - const start_f = start || fetch; - const safe_f = config.safe - ? ((async (a, b) => { - while (true) { - try { - return await start_f(a, b); - } catch (ex) { - logger.debug( - `This should not happen, it will not happen this is safe. ${JSON.stringify( - ex, - )}`, - ); - } - } - }) as typeof fetch) - : start_f; - - const fetch_f = config.auth - ? handle_basic_auth(safe_f, config.auth) - : safe_f; - - return limit_fetch_per_domain( - retry_fetch(fetch_f, config.retry || {}), - config.concurrent, - ); -} - -export function limit_fetch_per_domain( - fetch_f: typeof fetch, - concurrent: number = 10, -): typeof fetch { - const domain_dict: { [domain: string]: Array<(value: void) => void> } = {}; - - const out: typeof fetch = async (input, init) => { - const url: URL = urlToUrl(input); - const domain = url.origin; - - if (!(domain in domain_dict)) { - domain_dict[domain] = []; - } - - const requests = domain_dict[domain]; - await new Promise((res) => { - logger.debug( - `[limit] ${domain} capacity ${requests.length}/${concurrent}`, - ); - if (requests.length < concurrent) { - requests.push(res); - res({}); - } else { - requests.push(res); - } - }); - const resp = await fetch_f(input, init); - - requests.shift(); - for (let i = 0; i < concurrent; i++) { - if (requests[i]) { - requests[i](); - } - } - - return resp; - }; - - return out; -} - -export function handle_basic_auth( - fetch_f: typeof fetch, - config: AuthConfig, -): typeof fetch { - let authRequired = false; - - const basicAuthValue = `Basic ${Buffer.from(config.auth).toString("base64")}`; - const setHeader = (init?: RequestInit): RequestInit => { - const reqInit = init || {}; - const headers = new Headers(reqInit.headers); - headers.set("Authorization", basicAuthValue); - reqInit.headers = headers; - return reqInit; - }; - - const auth_f: typeof fetch = async (input, init) => { - const url: URL = urlToUrl(input); - if (authRequired && url.host === config.host) { - return await fetch_f(input, setHeader(init)); - } - - const resp = await fetch_f(input, init); - if (resp.status === 401) { - logger.debug("[auth] Unauthorized, adding basic auth"); - if (url.host === config.host) { - authRequired = true; - return await fetch_f(input, setHeader(init)); - } - } - - return resp; - }; - - return auth_f; -} - -export function retry_fetch( - fetch_f: typeof fetch, - partial_config: Partial, -): typeof fetch { - const config: RetryConfig = Object.assign( - { - codes: [408, 425, 429, 500, 502, 503, 504], - base: 500, - maxRetries: 5, - }, - partial_config, - ); - - const retry: typeof fetch = async (input, init) => { - let tryCount = 0; - let retryTime = config.maxRetries; - while (config.maxRetries == 0 || tryCount < config.maxRetries) { - const resp = await fetch_f(input, init); - if (!resp.ok) { - if (config.codes.some((x) => x == resp.status)) { - logger.debug( - `[retry_fetch] Retry ${input} ${tryCount}/${config.maxRetries}`, - ); - // Wait 500ms, 1 second, 2 seconds, 4 seconds, 8 seconds, fail - tryCount += 1; - await new Promise((res) => setTimeout(res, retryTime)); - retryTime *= 2; - continue; - } - return resp; - } - return resp; - } - - throw `Max retries exceeded (${config.maxRetries})`; - }; - - return retry; -} - -export function memberFromQuads( - member: Term, - quads: Quad[], - timestampPath: Term | undefined, - isVersionOfPath: Term | undefined, - created?: Date, -): Member { - // Get timestamp - let timestamp: string | Date | undefined; - if (timestampPath) { - const ts = quads.find( - (x) => - x.subject.equals(member) && x.predicate.equals(timestampPath), - )?.object.value; - if (ts) { - try { - timestamp = new Date(ts); - } catch (ex: unknown) { - timestamp = ts; - } - } - } - - // Get isVersionof - let isVersionOf: string | undefined; - if (isVersionOfPath) { - isVersionOf = quads.find( - (x) => - x.subject.equals(member) && x.predicate.equals(isVersionOfPath), - )?.object.value; - } - - // Get type - const type: Term | undefined = quads.find( - (x) => x.subject.equals(member) && x.predicate.value === RDF.type, - )?.object; - return { quads, id: member, isVersionOf, timestamp, type, created }; -} - -/** - * Version materialization function that sets the declared ldes:versionOfPath property value - * as the member's subject IRI - */ -export function maybeVersionMaterialize( - member: Member, - materialize: boolean, - ldesInfo: LDESInfo, -): Member { - if (materialize && ldesInfo.versionOfPath) { - // Create RDF store with member quads - const memberStore = RdfStore.createDefault(); - member.quads.forEach((q) => memberStore.addQuad(q)); - // Get materialized subject IRI - const newSubject = getObjects( - memberStore, - member.id, - ldesInfo.versionOfPath, - )[0]; - if (newSubject) { - // Remove version property - memberStore.removeQuad( - df.quad( - member.id, - ldesInfo.versionOfPath, - newSubject, - ), - ); - // Updated all quads with materialized subject - for (const q of memberStore.getQuads(member.id)) { - //q.subject = newSubject; - const newQ = df.quad( - newSubject, - q.predicate, - q.object, - q.graph, - ); - memberStore.removeQuad(q); - memberStore.addQuad(newQ); - } - // Update member object - member.id = newSubject; - member.quads = memberStore.getQuads(); - } else { - console.error( - `No version property found in Member (${member.id}) as specified by ldes:isVersionOfPath ${ldesInfo.versionOfPath}`, - ); - } - } - - return member; -} - -export async function processConditionFile( - conditionFile?: string, -): Promise { - let condition: Condition = empty_condition(); - - /* eslint-disable @typescript-eslint/no-require-imports */ - const fs = - typeof require === "undefined" - ? await import("fs/promises") - : require("fs/promises"); - - if (conditionFile) { - try { - condition = parse_condition( - await fs.readFile(conditionFile, { encoding: "utf8" }), - conditionFile, - ); - } catch (ex) { - console.error(`Failed to read condition file: ${conditionFile}`); - throw ex; - } - } - - return condition; -} - -/** - * Function that handles any given condition, together with the "before" and "after" options, - * and builds the corresponding unified Condition. - */ -export function handleConditions( - condition: Condition, - defaultTimezone: string, - before?: Date, - after?: Date, - timestampPath?: Term, -): Condition { - // Check if before and after conditions are defined and build corresponding Condition object - let handledCondition: Condition = empty_condition(); - const toDateLiteral = (date: Date) => { - return df.literal(date.toISOString(), XSD.terms.dateTime); - }; - - if (before) { - if (!timestampPath) { - throw "Cannot apply 'before' or 'after' filters since the target LDES does not define a ldes:timestampPath predicate"; - } - - const predLens = pred(timestampPath); - const beforeCond = new LeafCondition({ - relationType: TREE.terms.LessThanRelation, - value: toDateLiteral(before), - compareType: "date", - path: predLens, - pathQuads: { entry: timestampPath, quads: [] }, - defaultTimezone, - }); - if (after) { - const afterCond = new LeafCondition({ - relationType: TREE.terms.GreaterThanRelation, - value: toDateLiteral(after), - compareType: "date", - path: predLens, - pathQuads: { entry: timestampPath, quads: [] }, - defaultTimezone, - }); - // Got bi-condition with before & after filters - handledCondition = new AndCondition({ - items: [beforeCond, afterCond], - }); - } else { - // Got condition with before filter only - handledCondition = beforeCond; - } - } else if (after) { - if (!timestampPath) { - throw "Cannot apply 'before' or 'after' filters since the target LDES does not define a ldes:timestampPath predicate"; - } - - const predLens = pred(timestampPath); - // Got condition with after filter only - handledCondition = new LeafCondition({ - relationType: TREE.terms.GreaterThanRelation, - value: toDateLiteral(after), - compareType: "date", - path: predLens, - pathQuads: { entry: timestampPath, quads: [] }, - defaultTimezone, - }); - } - - // See if condition file was defined too - if (!(condition instanceof EmptyCondition)) { - return new AndCondition({ - items: [condition, handledCondition], - }); - } else { - return handledCondition; - } -} diff --git a/lib/exitHandler.ts b/lib/utils/exitHandler.ts similarity index 97% rename from lib/exitHandler.ts rename to lib/utils/exitHandler.ts index b83907f..4e229b6 100644 --- a/lib/exitHandler.ts +++ b/lib/utils/exitHandler.ts @@ -1,4 +1,4 @@ -import { getLoggerFor } from "./utils/logUtil"; +import { getLoggerFor } from "./logUtil"; const logger = getLoggerFor("ExitHandler"); diff --git a/lib/utils/general.ts b/lib/utils/general.ts new file mode 100644 index 0000000..e3223f6 --- /dev/null +++ b/lib/utils/general.ts @@ -0,0 +1,138 @@ +import { BaseQuad } from "n3"; +import { RdfStore } from "rdf-stores"; +import { DataFactory } from "rdf-data-factory"; +import { RDF, SHACL } from "@treecg/types"; +import { getLoggerFor } from "./logUtil"; + +import type { + NamedNode, + Quad_Subject, + Quad_Object, + Stream, + Term, +} from "@rdfjs/types"; + +const logger = getLoggerFor("Utils"); + +const df = new DataFactory(); + +export function getSubjects( + store: RdfStore, + predicate: Term | null, + object: Term | null, + graph?: Term | null, +): Quad_Subject[] { + return store.getQuads(null, predicate, object, graph).map((quad) => { + return quad.subject; + }); +} + +export function getObjects( + store: RdfStore, + subject: Term | null, + predicate: Term | null, + graph?: Term | null, +): Quad_Object[] { + return store.getQuads(subject, predicate, null, graph).map((quad) => { + return quad.object; + }); +} + +export function readableToArray(stream: ReadableStream): Promise { + const out: T[] = []; + const reader = stream.getReader(); + return new Promise((res, rej) => { + const next = () => { + reader + .read() + .catch(rej) + .then((obj) => { + if (obj) { + if (obj.done) { + res(out); + } else { + out.push(obj.value); + next(); + } + } else { + res(out); + } + }); + }; + next(); + }); +} + +/** + * Converts a stream to an array, pushing all elements to an array + * Resolving the promise with the 'end' event + */ +export function streamToArray( + stream: Stream, +): Promise { + const out: T[] = []; + return new Promise((res, rej) => { + stream.on("end", () => res(out)); + stream.on("data", (x) => { + out.push(x); + }); + stream.on("error", (ex) => { + logger.error("[streamToArray] Stream to Array failed"); + rej(ex); + }); + }); +} + +/** + * Find the main sh:NodeShape subject of a given Shape Graph. + * We determine this by assuming that the main node shape + * is not referenced by any other shape description. + * If more than one is found an exception is thrown. + */ +export function extractMainNodeShape(store: RdfStore): NamedNode { + const nodeShapes = getSubjects( + store, + RDF.terms.type, + SHACL.terms.NodeShape, + null, + ); + let mainNodeShape = null; + + if (nodeShapes && nodeShapes.length > 0) { + for (const ns of nodeShapes) { + const isNotReferenced = + getSubjects(store, null, ns, null).length === 0; + + if (isNotReferenced) { + if (!mainNodeShape) { + mainNodeShape = ns; + } else { + throw new Error( + "There are multiple main node shapes in a given shape graph. Unrelated shapes must be given as separate shape graphs", + ); + } + } + } + if (mainNodeShape) { + return mainNodeShape; + } else { + throw new Error( + "No main SHACL Node Shapes found in given shape graph", + ); + } + } else { + throw new Error("No SHACL Node Shapes found in given shape graph"); + } +} + +export function urlToUrl(input: Parameters[0]): URL { + if (typeof input === "string") { + return new URL(input); + } else if (input instanceof URL) { + return input; + } else if (input instanceof Request) { + return new URL(input.url); + } else { + throw "Not a real url"; + } +} \ No newline at end of file diff --git a/lib/utils/index.ts b/lib/utils/index.ts new file mode 100644 index 0000000..b54f737 --- /dev/null +++ b/lib/utils/index.ts @@ -0,0 +1,4 @@ +export * from "./exitHandler"; +export * from "./logUtil"; +export * from "./inBetween"; +export * from "./general"; diff --git a/tests/conditions.test.ts b/tests/conditions.test.ts index 4f99358..b7c2a4e 100644 --- a/tests/conditions.test.ts +++ b/tests/conditions.test.ts @@ -1,6 +1,6 @@ import { describe, expect, it } from "vitest"; -import { Range } from "../lib/condition"; import { TREE } from "@treecg/types"; +import { Range } from "../lib/condition"; const lessers = [ new Range(5, TREE.LessThanOrEqualToRelation, "AoE"), diff --git a/tests/helper.ts b/tests/helper.ts index aac98b3..fa72b57 100644 --- a/tests/helper.ts +++ b/tests/helper.ts @@ -1,6 +1,6 @@ import { Quad } from "@rdfjs/types"; import { Parser, Writer } from "n3"; -import { Member } from "../lib/page"; +import { Member } from "../lib/fetcher/page"; import { TREE } from "@treecg/types"; export type FragmentId = number; diff --git a/tests/relation.test.ts b/tests/relation.test.ts index 280a924..5eaac33 100644 --- a/tests/relation.test.ts +++ b/tests/relation.test.ts @@ -1,6 +1,8 @@ import { describe, expect, test } from "vitest"; -import { RelationChain, SimpleRelation } from "../lib/relation"; import Heap from "heap-js"; +import { RelationChain } from "../lib/fetcher"; + +import type { SimpleRelation } from "../lib/fetcher"; // This probably should not be here type Comparable = number | string | Date; diff --git a/tests/unordered.test.ts b/tests/unordered.test.ts index 6f0924e..909e656 100644 --- a/tests/unordered.test.ts +++ b/tests/unordered.test.ts @@ -1,11 +1,12 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import { afterEach, beforeEach, describe, expect, test } from "vitest"; -import { read, Tree } from "./helper"; - -import { MaxCountCondition, replicateLDES, retry_fetch } from "../lib/client"; import { Parser } from "n3"; import { TREE } from "@treecg/types"; import { rmSync } from "fs"; +import { read, Tree } from "./helper"; +import { MaxCountCondition } from "../lib/condition"; +import { retry_fetch } from "../lib/fetcher"; +import { replicateLDES } from "../lib/client"; const oldFetch = global.fetch; beforeEach(() => {