From e6d5fc54e96dc9fd2f1bb16d82af75db04405b6d Mon Sep 17 00:00:00 2001 From: ash Date: Thu, 7 Nov 2024 11:02:44 +0000 Subject: [PATCH] feat: query result create and archive (#2) Adds tooling to create and archive query results. --- package.json | 4 + src/api.ts | 17 ++-- src/index.js | 73 ++------------- src/query-result.js | 185 ++++++++++++++++++++++++++++++++++++++ test/query-result.spec.js | 32 +++++++ 5 files changed, 239 insertions(+), 72 deletions(-) create mode 100644 src/query-result.js create mode 100644 test/query-result.spec.js diff --git a/package.json b/package.json index c1773da..5ed4c4f 100644 --- a/package.json +++ b/package.json @@ -22,6 +22,10 @@ "./errors": { "import": "./src/errors.js", "types": "./dist/src/errors.d.ts" + }, + "./query-result": { + "import": "./src/query-result.js", + "types": "./dist/src/query-result.d.ts" } }, "dependencies": { diff --git a/src/api.ts b/src/api.ts index b148944..55576b2 100644 --- a/src/api.ts +++ b/src/api.ts @@ -1,10 +1,14 @@ import { MultihashDigest, Link } from 'multiformats' -import { Delegation, Failure, Result, Principal, IPLDView } from '@ucanto/interface' -import { DecodeFailure, ShardedDAGIndex, UnknownFormat } from '@storacha/blob-index/types' +import { Delegation, Failure, Result, Principal, IPLDView, IPLDBlock } from '@ucanto/interface' +import { DecodeFailure, ShardedDAGIndex, ShardedDAGIndexView, UnknownFormat } from '@storacha/blob-index/types' export { MultihashDigest, Link } -export { Delegation, Failure, Result } -export { DecodeFailure, ShardedDAGIndex, UnknownFormat } +export { Delegation, Failure, Result, Principal, IPLDView, IPLDBlock } +export { DecodeFailure, ShardedDAGIndex, ShardedDAGIndexView, UnknownFormat } + +export interface IndexingServiceClient { + queryClaims (q: Query): Promise> +} /** * Match narrows parameters for locating providers/claims for a set of multihashes. @@ -21,9 +25,12 @@ export interface Query { match?: Match } -export interface QueryOk extends IPLDView { +export interface QueryOk extends QueryResult {} + +export interface QueryResult extends IPLDView { claims: Map indexes: Map + archive (): Promise> } export type QueryError = diff --git a/src/index.js b/src/index.js index b1d0298..b1909ba 100644 --- a/src/index.js +++ b/src/index.js @@ -1,32 +1,14 @@ -import * as CBOR from '@ipld/dag-cbor' -import { CID } from 'multiformats/cid' import { base58btc } from 'multiformats/bases/base58' -import { z } from 'zod' -import { ok, error } from '@ucanto/core' -import * as CAR from '@ucanto/core/car' -import * as Delegation from '@ucanto/core/delegation' -import * as ShardedDAGIndex from '@storacha/blob-index/sharded-dag-index' -import { InvalidQueryError, NetworkError, UnknownFormatError, DecodeError } from './errors.js' +import { error } from '@ucanto/core' +import * as QueryResult from './query-result.js' +import { InvalidQueryError, NetworkError } from './errors.js' -/** @import { Result, Link, Query, QueryOk, QueryError } from './api.js' */ +/** @import { IndexingServiceClient, Result, Query, QueryOk, QueryError } from './api.js' */ const SERVICE_URL = 'https://indexing.storacha.network' const CLAIMS_PATH = '/claims' -const QueryResult = z - .object({ - 'index/query/result@0.1': z.object({ - claims: z - .array( - z.instanceof(CID).transform(cid => /** @type {Link} */ (cid)) - ), - indexes: z - .record(z.string(), z.instanceof(CID)) - .transform((record) => Object.values(record)), - }), - }) - .transform((object) => object['index/query/result@0.1']) - +/** @implements {IndexingServiceClient} */ export class Client { #fetch #serviceURL @@ -44,8 +26,6 @@ export class Client { /** * @param {Query} query - * @param {object} [options] - * @param {typeof fetch} options.fetch * @returns {Promise>} */ async queryClaims({ hashes = [], match = { subject: [] } }) { @@ -69,47 +49,6 @@ export class Client { return error(new NetworkError('missing response body')) } - const { roots, blocks } = CAR.decode(new Uint8Array(await response.arrayBuffer())) - if (roots.length !== 1) { - return error(new DecodeError('expected exactly one root')) - } - - let parsed - try { - parsed = QueryResult.parse(await CBOR.decode(roots[0].bytes)) - } catch (/** @type {any} */ err) { - return error(new UnknownFormatError(`parsing root block: ${err.message}`)) - } - - const claims = new Map() - for (const root of parsed.claims) { - let claim - try { - claim = Delegation.view({ root, blocks }) - } catch (/** @type {any} */ err) { - return error(new DecodeError(`decoding claim: ${root}: ${err.message}`)) - } - claims.set(root.toString(), claim) - } - - const indexes = new Map() - for (const link of parsed.indexes) { - const block = blocks.get(link.toString()) - if (!block) { - return error(new DecodeError(`missing index: ${link}`)) - } - const { ok: index, error: err } = ShardedDAGIndex.extract(block.bytes) - if (!index) { - return error(new DecodeError(`extracting index: ${link}: ${err.message}`)) - } - indexes.set(link.toString(), index) - } - - return ok({ - root: roots[0], - iterateIPLDBlocks: () => blocks.values(), - claims, - indexes - }) + return QueryResult.extract(new Uint8Array(await response.arrayBuffer())) } } diff --git a/src/query-result.js b/src/query-result.js new file mode 100644 index 0000000..e461f13 --- /dev/null +++ b/src/query-result.js @@ -0,0 +1,185 @@ +/** @import * as API from './api.js' */ +import * as CBOR from '@ipld/dag-cbor' +import { CID } from 'multiformats/cid' +import { create as createLink } from 'multiformats/link' +import { z } from 'zod' +import { ok, error } from '@ucanto/core' +import * as CAR from '@ucanto/core/car' +import * as Delegation from '@ucanto/core/delegation' +import * as ShardedDAGIndex from '@storacha/blob-index/sharded-dag-index' +import { UnknownFormatError, DecodeError } from './errors.js' +import { sha256 } from 'multiformats/hashes/sha2' + +const QueryResultSchema = z + .object({ + 'index/query/result@0.1': z.object({ + claims: z + .array( + z.instanceof(CID).transform(cid => /** @type {API.Link} */ (cid)) + ), + indexes: z + .record(z.string(), z.instanceof(CID)) + .transform((record) => Object.values(record)), + }), + }) + .transform((object) => object['index/query/result@0.1']) + +/** + * @param {{ root: API.Link, blocks: Map }} arg + * @returns {Promise>} + */ +export const create = async ({ root, blocks }) => { + const rootBlock = blocks.get(root.toString()) + if (!rootBlock) { + return error(new DecodeError(`missing root block: ${root}`)) + } + return view({ root: rootBlock, blocks }) +} + +/** + * @param {{ root: API.IPLDBlock, blocks: Map }} arg + * @returns {Promise>} + */ +export const view = async ({ root, blocks }) => { + let parsed + try { + parsed = QueryResultSchema.parse(CBOR.decode(root.bytes)) + } catch (/** @type {any} */ err) { + return error(new UnknownFormatError(`parsing root block: ${err.message}`)) + } + + const claims = new Map() + for (const root of parsed.claims) { + let claim + try { + claim = Delegation.view({ root, blocks }) + } catch (/** @type {any} */ err) { + return error(new DecodeError(`decoding claim: ${root}: ${err.message}`)) + } + claims.set(root.toString(), claim) + } + + const indexes = new Map() + for (const link of parsed.indexes) { + const block = blocks.get(link.toString()) + if (!block) { + return error(new DecodeError(`missing index: ${link}`)) + } + const { ok: index, error: err } = ShardedDAGIndex.extract(block.bytes) + if (!index) { + return error(new DecodeError(`extracting index: ${link}: ${err.message}`)) + } + indexes.set(link.toString(), index) + } + + return ok(new QueryResult({ root, blocks, data: { claims, indexes } })) +} + +/** + * @typedef {string} ContextID + * @param {{ claims: API.Delegation[], indexes: Map }} param + */ +export const from = async ({ claims, indexes }) => { + const blocks = new Map() + const rootData = { + 'index/query/result@0.1': { + claims: /** @type {API.Link[]} **/ ([]), + indexes: /** @type {Record} */ ({}) + } + } + const data = { claims: new Map(), indexes: new Map() } + + for (const claim of claims) { + rootData['index/query/result@0.1'].claims.push(claim.link()) + for (const block of claim.iterateIPLDBlocks()) { + blocks.set(block.cid.toString(), block) + } + data.claims.set(claim.link().toString(), claim) + } + + for (const [contextID, index] of indexes.entries()) { + const result = await index.archive() + if (!result.ok) { + return result + } + const digest = await sha256.digest(result.ok) + const link = createLink(0x0202, digest) + rootData['index/query/result@0.1'].indexes[contextID] = link + blocks.set(link.toString(), { cid: link, bytes: result.ok }) + data.indexes.set(link.toString(), index) + } + + const rootBytes = CBOR.encode(rootData) + const rootDigest = await sha256.digest(rootBytes) + const rootLink = createLink(CBOR.code, rootDigest) + const root = { cid: rootLink, bytes: rootBytes } + blocks.set(rootLink.toString(), root) + + return ok(new QueryResult({ root, blocks, data })) +} + +class QueryResult { + #root + #blocks + #data + + /** + * @param {{ + * root: API.IPLDBlock + * blocks: Map + * data: { + * claims: Map + * indexes: Map + * } + * }} param + */ + constructor ({ root, blocks, data }) { + this.#root = root + this.#blocks = blocks + this.#data = data + } + + get root () { + return this.#root + } + + iterateIPLDBlocks () { + return this.#blocks.values() + } + + get claims () { + return this.#data.claims + } + + get indexes () { + return this.#data.indexes + } + + archive () { + return archive(this) + } +} + +/** + * @param {API.QueryResult} result + * @returns {Promise>} + */ +export const archive = async (result) => { + const blocks = new Map() + for (const block of result.iterateIPLDBlocks()) { + blocks.set(block.cid.toString(), block) + } + return ok(CAR.encode({ roots: [result.root], blocks })) +} + +/** + * @param {Uint8Array} bytes + * @returns {Promise>} + */ +export const extract = async (bytes) => { + const { roots, blocks } = CAR.decode(bytes) + if (roots.length !== 1) { + return error(new DecodeError('expected exactly one root')) + } + return view({ root: roots[0], blocks }) +} diff --git a/test/query-result.spec.js b/test/query-result.spec.js new file mode 100644 index 0000000..e36d620 --- /dev/null +++ b/test/query-result.spec.js @@ -0,0 +1,32 @@ +import * as fs from 'node:fs' +import * as path from 'node:path' +import { describe, it } from 'mocha' +import { assert } from 'chai' +import * as QueryResult from '../src/query-result.js' + +describe('query result', () => { + it('round trip', async () => { + const digestString = 'zQmRm3SMS4EbiKYy7VeV3zqXqzyr76mq9b2zg3Tij3VhKUG' + const fixturePath = path.join(import.meta.dirname, 'fixtures', `${digestString}.queryresult.car`) + const carBytes = await fs.promises.readFile(fixturePath) + + const extract0 = await QueryResult.extract(carBytes) + assert(extract0.ok) + assert(!extract0.error) + + assert(extract0.ok.claims.size > 0) + assert(extract0.ok.indexes.size > 0) + + const archive = await extract0.ok.archive() + assert(archive.ok) + assert(!archive.error) + + const extract1 = await QueryResult.extract(archive.ok) + assert(extract1.ok) + assert(!extract1.error) + + assert.equal(extract0.ok.root.toString(), extract1.ok.root.toString()) + assert.equal(extract0.ok.claims.size, extract1.ok.claims.size) + assert.equal(extract0.ok.indexes.size, extract1.ok.indexes.size) + }) +})