diff --git a/README.md b/README.md index f6b404ac0..c3d1d33ba 100644 --- a/README.md +++ b/README.md @@ -147,7 +147,8 @@ Your node is now running. To start additional nodes, repeat these steps in a new - [API Endpoints](docs/API.md) - [Environmental Variables](docs/env.md) - [Database Guide](docs/database.md) -- [Storage Types](docs/Storage.md) +- [Asset Storage Types](docs/Storage.md) +- [Persistent storage for c2d jobs](docs/persistentStorage.md) - [Testing Guide](docs/testing.md) - [Network Configuration](docs/networking.md) - [Logging & accessing logs](docs/networking.md) diff --git a/docs/API.md b/docs/API.md index b141a61d9..b1b08174c 100644 --- a/docs/API.md +++ b/docs/API.md @@ -1586,3 +1586,154 @@ returns job result #### Response File content + +--- + +## Persistent Storage + +### `HTTP` POST /api/services/persistentStorage/buckets + +#### Description + +Create a new persistent storage bucket. Bucket ownership is set to the request `consumerAddress`. + +#### Request Headers + +| name | type | required | description | +| --------------- | ------ | -------- | ----------- | +| Authorization | string | | auth token (optional; depends on node auth configuration) | + +#### Request Body + +```json +{ + "consumerAddress": "0x...", + "signature": "0x...", + "nonce": "123", + "accessLists": [] +} +``` + +#### Response (200) + +```json +{ + "bucketId": "uuid", + "owner": "0x...", + "accessList": [] +} +``` + +--- + +### `HTTP` GET /api/services/persistentStorage/buckets + +#### Description + +List buckets for a given `owner`. Results are filtered by bucket access lists for the calling consumer. + +#### Query Parameters + +| name | type | required | description | +| --------------- | ------ | -------- | ----------- | +| consumerAddress | string | v | consumer address | +| signature | string | v | signed message (consumerAddress + nonce + command) | +| nonce | string | v | request nonce | +| chainId | number | v | chain id (used by auth/signature checks) | +| owner | string | v | bucket owner to filter by | + +#### Response (200) + +```json +[ + { + "bucketId": "uuid", + "owner": "0x...", + "createdAt": 1710000000, + "accessLists": [] + } +] +``` + +--- + +### `HTTP` GET /api/services/persistentStorage/buckets/:bucketId/files + +#### Description + +List files in a bucket. + +#### Query Parameters + +| name | type | required | description | +| --------------- | ------ | -------- | ----------- | +| consumerAddress | string | v | consumer address | +| signature | string | v | signed message (consumerAddress + nonce + command) | +| nonce | string | v | request nonce | + +#### Response (200) + +```json +[ + { + "bucketId": "uuid", + "name": "hello.txt", + "size": 123, + "lastModified": 1710000000 + } +] +``` + +--- + +### `HTTP` POST /api/services/persistentStorage/buckets/:bucketId/files/:fileName + +#### Description + +Upload a file to a bucket. The request body is treated as raw bytes. + +#### Query Parameters + +| name | type | required | description | +| --------------- | ------ | -------- | ----------- | +| consumerAddress | string | v | consumer address | +| signature | string | v | signed message (consumerAddress + nonce + command) | +| nonce | string | v | request nonce | + +#### Request Body + +Raw bytes (any content-type). 
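+
+#### Request Example
+
+A minimal sketch in TypeScript, assuming a node at `http://localhost:8000` and illustrative values for `consumerAddress`, `signature` and `nonce` (the exact signing scheme depends on the node's auth configuration; see the query parameter table above):
+
+```ts
+import { readFile } from 'fs/promises'
+
+// Placeholder values — replace with a real bucket id and signed request parameters.
+const node = 'http://localhost:8000'
+const bucketId = '00000000-0000-0000-0000-000000000000'
+const fileName = 'hello.txt'
+const query = new URLSearchParams({
+  consumerAddress: '0x...',
+  signature: '0x...', // signed message (consumerAddress + nonce + command)
+  nonce: Date.now().toString()
+})
+
+// The body is forwarded as raw bytes; no multipart encoding is used.
+const body = await readFile('./hello.txt')
+const res = await fetch(
+  `${node}/api/services/persistentStorage/buckets/${bucketId}/files/${fileName}?${query}`,
+  { method: 'POST', headers: { 'content-type': 'application/octet-stream' }, body }
+)
+console.log(await res.json()) // { bucketId, name, size, lastModified }
+```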
+ +#### Response (200) + +```json +{ + "bucketId": "uuid", + "name": "hello.txt", + "size": 123, + "lastModified": 1710000000 +} +``` + +--- + +### `HTTP` DELETE /api/services/persistentStorage/buckets/:bucketId/files/:fileName + +#### Description + +Delete a file from a bucket. + +#### Query Parameters + +| name | type | required | description | +| --------------- | ------ | -------- | ----------- | +| consumerAddress | string | v | consumer address | +| signature | string | v | signed message (consumerAddress + nonce + command) | +| nonce | string | v | request nonce | +| chainId | number | v | chain id (used by auth/signature checks) | + +#### Response (200) + +```json +{ "success": true } +``` diff --git a/docs/persistentStorage.md b/docs/persistentStorage.md new file mode 100644 index 000000000..db1396c6b --- /dev/null +++ b/docs/persistentStorage.md @@ -0,0 +1,172 @@ +# Persistent Storage + +This document describes Ocean Node **Persistent Storage** at a high level: what it is, how it is structured, how access control works, and how to use it via **P2P commands** and **HTTP endpoints**. + +--- + +## What it is + +Persistent Storage is a simple bucket + file store intended for **long-lived artifacts** that Ocean Node needs to keep across requests (and potentially across restarts), and to reference later (e.g. as file objects for compute). + +Key primitives: +- **Bucket**: a logical container for files. +- **File**: binary content stored inside a bucket. +- **Bucket registry**: a local SQLite table that stores bucket metadata (owner, access lists, createdAt). + +--- + +## Architecture (high level) + +### Components + +- **Handlers (protocol layer)**: `src/components/core/handler/persistentStorage.ts` + - Implements protocol commands such as create bucket, list files, upload, delete, and get buckets. + - Validates auth (token or signature) and applies high-level authorization checks. + +- **Persistent storage backends (storage layer)**: `src/components/persistentStorage/*` + - `PersistentStorageFactory`: shared functionality (SQLite bucket registry, access list checks). + - `PersistentStorageLocalFS`: local filesystem backend. + - `PersistentStorageS3`: stub for future S3-compatible backend. + +- **HTTP routes (HTTP interface)**: `src/components/httpRoutes/persistentStorage.ts` + - Exposes REST-ish endpoints under `/api/services/persistentStorage/...` that call the same handlers. + +### Data storage + +Persistent Storage uses two stores: + +1) **Bucket registry (SQLite)** +- File: `databases/persistentStorage.sqlite` +- Table: `persistent_storage_buckets` +- Columns: + - `bucketId` (primary key) + - `owner` (address, stored as a string) + - `accessListJson` (JSON-encoded access list array) + - `createdAt` (unix timestamp) + +2) **Backend data** +- `localfs`: writes file bytes to the configured folder under `buckets//`. +- `s3`: not implemented yet. + +--- + +## Ownership and access control + +### Ownership + +Every bucket has a single **owner** address, stored in the bucket registry. + +- When a bucket is created, the node sets: + - `owner = consumerAddress` (normalized via `ethers.getAddress`) + +### Bucket access list + +Each bucket stores an **AccessList[]** (per-chain list(s) of access list contract addresses): + +```ts +export interface AccessList { + [chainId: string]: string[] +} +``` + +This access list is used to decide whether a given `consumerAddress` is allowed to interact with a bucket. 
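+
+For illustration, a bucket access list with one access-list contract on a development chain and two on another chain could look like this (chain ids and contract addresses below are placeholders, not real deployments):
+
+```ts
+// Shape from src/@types/AccessList.ts: chainId -> access-list contract addresses
+const bucketAccessLists: AccessList[] = [
+  { '8996': ['0x...'] },
+  { '1': ['0x...', '0x...'] }
+]
+```
+
+A consumer passes the check if it is present on any of the listed access-list contracts, or if it is the bucket owner.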
+ +### Where checks happen + +Access checks happen at two levels: + +1) **Backend enforcement** (required) +- Backend operations `listFiles`, `uploadFile`, `deleteFile`, and `getFileObject` all require `consumerAddress`. +- The base class helper `assertConsumerAllowedForBucket(consumerAddress, bucketId)` loads the bucket ACL and throws `PersistentStorageAccessDeniedError` if the consumer is not allowed. + +2) **Handler enforcement** (command-specific) +- `createBucket`: additionally checks the node-level allow list `config.persistentStorage.accessLists` (who can create buckets at all). +- `getBuckets`: queries registry rows filtered by `owner` and then: + - if `consumerAddress === owner`: returns all buckets for that owner + - else: filters buckets by the bucket ACL + +### Error behavior + +- Backends throw `PersistentStorageAccessDeniedError` when forbidden. +- Handlers translate that into **HTTP 403** / `status.httpStatus = 403`. + +--- + +## Features + +### Supported today + +- **Create bucket** + - Creates a bucket id (UUID), persists it in SQLite with `owner` and `accessListJson`, and creates a local directory (localfs). + +- **List buckets (by owner)** + - Returns buckets from the registry filtered by `owner` (mandatory arg). + - Applies ACL filtering for non-owners. + +- **Upload file** + - Writes a stream to the backend. + - Enforces bucket ACL. + +- **List files** + - Returns file metadata (`name`, `size`, `lastModified`) for a bucket. + - Enforces bucket ACL. + +- **Delete file** + - Deletes the named file from the bucket. + - Enforces bucket ACL. + +### Not implemented yet + +- **S3 backend** + - `PersistentStorageS3` exists as a placeholder and currently throws “not implemented”. + +--- + +## Configuration + +Persistent storage is controlled by `persistentStorage` in node config. + +Key fields: +- `enabled`: boolean +- `type`: `"localfs"` or `"s3"` +- `accessLists`: AccessList[] — node-level allow list to create buckets +- `options`: + - localfs: `{ "folder": "/path/to/storage" }` + - s3: `{ endpoint, objectKey, accessKeyId, secretAccessKey, ... }` (future) + +--- + +## Usage + +### P2P commands + +All persistent storage operations are implemented as protocol commands in the handler: +- `persistentStorageCreateBucket` +- `persistentStorageGetBuckets` +- `persistentStorageListFiles` +- `persistentStorageUploadFile` +- `persistentStorageDeleteFile` + +Each command requires authentication (token or signature) based on Ocean Node’s auth configuration. + +### HTTP endpoints + +HTTP routes are available under `/api/services/persistentStorage/...` and call the same handlers. See `docs/API.md` for the full parameter lists and examples. + +At a glance: +- `POST /api/services/persistentStorage/buckets` +- `GET /api/services/persistentStorage/buckets` +- `GET /api/services/persistentStorage/buckets/:bucketId/files` +- `POST /api/services/persistentStorage/buckets/:bucketId/files/:fileName` +- `DELETE /api/services/persistentStorage/buckets/:bucketId/files/:fileName` + +Upload uses the raw request body as bytes and forwards it to the handler as a stream. + +--- + +## Limitations and notes + +- The bucket registry is local to the node (SQLite file). If you run multiple nodes, each node’s registry is independent unless you externalize/replicate it. +- `listBuckets(owner)` requires `owner` and only returns buckets that were created with that owner recorded. +- Filenames in `localfs` are constrained (no path separators) to avoid path traversal. 
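+
+---
+
+## Example: end-to-end usage over HTTP
+
+A minimal sketch, assuming a local node with `persistentStorage` enabled (localfs backend), a consumer key that is allowed to create buckets, and a signature built by signing `consumerAddress + nonce + command` as described in `docs/API.md` (verify both the signing scheme and the command strings against your node's auth configuration). Endpoint paths come from the list above; everything else (URL, key, file content) is illustrative.
+
+```ts
+import { Wallet } from 'ethers'
+
+const node = 'http://localhost:8000'
+const wallet = Wallet.createRandom() // replace with a consumer key allowed by the node config
+
+// Build signed parameters for a given command name.
+// The command strings below are assumed to match the protocol command names listed earlier.
+async function signedParams(command: string): Promise<Record<string, string>> {
+  const nonce = Date.now().toString()
+  const consumerAddress = await wallet.getAddress()
+  const signature = await wallet.signMessage(consumerAddress + nonce + command)
+  return { consumerAddress, signature, nonce }
+}
+
+// 1. Create a bucket (subject to the node-level accessLists allow list)
+const createRes = await fetch(`${node}/api/services/persistentStorage/buckets`, {
+  method: 'POST',
+  headers: { 'content-type': 'application/json' },
+  body: JSON.stringify({
+    ...(await signedParams('persistentStorageCreateBucket')),
+    accessLists: []
+  })
+})
+const { bucketId } = await createRes.json()
+
+// 2. Upload a file — the request body is raw bytes
+const uploadQuery = new URLSearchParams(await signedParams('persistentStorageUploadFile'))
+await fetch(
+  `${node}/api/services/persistentStorage/buckets/${bucketId}/files/hello.txt?${uploadQuery}`,
+  { method: 'POST', body: Buffer.from('hello world') }
+)
+
+// 3. List the files in the bucket
+const listQuery = new URLSearchParams(await signedParams('persistentStorageListFiles'))
+const files = await fetch(
+  `${node}/api/services/persistentStorage/buckets/${bucketId}/files?${listQuery}`
+).then((r) => r.json())
+console.log(files) // [{ bucketId, name: 'hello.txt', size, lastModified }]
+```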
+ diff --git a/src/@types/AccessList.ts b/src/@types/AccessList.ts new file mode 100644 index 000000000..242b991d1 --- /dev/null +++ b/src/@types/AccessList.ts @@ -0,0 +1,6 @@ +/** + * Mapping of `chainId` -> list of smart contract addresses on that chain. + */ +export interface AccessList { + [chainId: string]: string[] +} diff --git a/src/@types/C2D/C2D.ts b/src/@types/C2D/C2D.ts index feb2f64b1..5db7afbd6 100644 --- a/src/@types/C2D/C2D.ts +++ b/src/@types/C2D/C2D.ts @@ -1,5 +1,6 @@ import { MetadataAlgorithm, ConsumerParameter } from '@oceanprotocol/ddo-js' import type { BaseFileObject, StorageObject, EncryptMethod } from '../fileObject.js' +import type { AccessList } from '../AccessList.js' export enum C2DClusterType { // eslint-disable-next-line no-unused-vars OPF_K8 = 0, @@ -95,7 +96,7 @@ export interface RunningPlatform { export interface ComputeAccessList { addresses: string[] - accessLists: { [chainId: string]: string[] }[] | null + accessLists: AccessList[] | null } export interface ComputeEnvironmentFreeOptions { diff --git a/src/@types/OceanNode.ts b/src/@types/OceanNode.ts index d007a7f64..c2dff8f43 100644 --- a/src/@types/OceanNode.ts +++ b/src/@types/OceanNode.ts @@ -4,6 +4,7 @@ import { C2DClusterInfo, C2DDockerConfig } from './C2D/C2D' import { FeeStrategy } from './Fees' import { Schema } from '../components/database' import { KeyProviderType } from './KeyManager' +import type { PersistentStorageConfig } from './PersistentStorage.js' export interface OceanNodeDBConfig { url: string | null @@ -139,6 +140,7 @@ export interface OceanNodeConfig { jwtSecret?: string httpCertPath?: string httpKeyPath?: string + persistentStorage?: PersistentStorageConfig } export interface P2PStatusResponse { diff --git a/src/@types/PersistentStorage.ts b/src/@types/PersistentStorage.ts new file mode 100644 index 000000000..52b751691 --- /dev/null +++ b/src/@types/PersistentStorage.ts @@ -0,0 +1,24 @@ +import type { AccessList } from './AccessList' + +export type PersistentStorageType = 'localfs' | 's3' + +export interface PersistentStorageLocalFSOptions { + folder: string +} + +export interface PersistentStorageS3Options { + endpoint: string + region?: string + objectKey: string + accessKeyId: string + secretAccessKey: string + /** If true, use path-style addressing (e.g. endpoint/bucket/key). Required for some S3-compatible services (e.g. MinIO). Default false (virtual-host style, e.g. bucket.endpoint/key). */ + forcePathStyle?: boolean +} + +export interface PersistentStorageConfig { + enabled: boolean + type: PersistentStorageType + accessLists: AccessList[] + options: PersistentStorageLocalFSOptions | PersistentStorageS3Options +} diff --git a/src/@types/commands.ts b/src/@types/commands.ts index ba35502b9..58d81fbea 100644 --- a/src/@types/commands.ts +++ b/src/@types/commands.ts @@ -1,3 +1,4 @@ +import { Readable } from 'stream' import { ValidateParams } from '../components/httpRoutes/validateCommands.js' import { P2PCommandResponse } from './OceanNode' import { DDO } from '@oceanprotocol/ddo-js' @@ -8,12 +9,13 @@ import type { DBComputeJobMetadata } from './C2D/C2D.js' import { FileObjectType, StorageObject, EncryptMethod } from './fileObject' - +import type { AccessList } from './AccessList.js' export interface Command { command: string // command name node?: string // if not present it means current node authorization?: string caller?: string | string[] // added by our node for rate limiting + stream?: Readable | null // commands may have an extra stream, after body. 
IE: Encrypt file } export interface GetP2PPeerCommand extends Command { @@ -314,3 +316,43 @@ export interface GetJobsCommand extends Command { consumerAddrs?: string[] runningJobs?: boolean } + +export interface PersistentStorageCreateBucketCommand extends Command { + consumerAddress: string + signature: string + nonce: string + accessLists: AccessList[] +} + +export interface PersistentStorageGetBucketsCommand extends Command { + consumerAddress: string + signature: string + nonce: string + chainId: number + /** Ethereum address; only buckets with this stored owner are returned (then filtered by ACL). */ + owner: string +} + +export interface PersistentStorageListFilesCommand extends Command { + consumerAddress: string + signature: string + nonce: string + bucketId: string +} + +export interface PersistentStorageUploadFileCommand extends Command { + consumerAddress: string + signature: string + nonce: string + bucketId: string + fileName: string +} + +export interface PersistentStorageDeleteFileCommand extends Command { + consumerAddress: string + signature: string + nonce: string + chainId: number + bucketId: string + fileName: string +} diff --git a/src/OceanNode.ts b/src/OceanNode.ts index f32cf7d87..a8c9c0aac 100644 --- a/src/OceanNode.ts +++ b/src/OceanNode.ts @@ -13,6 +13,8 @@ import { Auth } from './components/Auth/index.js' import { KeyManager } from './components/KeyManager/index.js' import { BlockchainRegistry } from './components/BlockchainRegistry/index.js' import { Blockchain } from './utils/blockchain.js' +import { createPersistentStorage } from './components/persistentStorage/createPersistentStorage.js' +import { PersistentStorageFactory } from './components/persistentStorage/PersistentStorageFactory.js' export interface RequestLimiter { requester: string | string[] // IP address or peer ID @@ -37,6 +39,7 @@ export class OceanNode { private remoteCaller: string | string[] private requestMap: Map private auth: Auth + private persistentStorage: PersistentStorageFactory // eslint-disable-next-line no-useless-constructor private constructor( @@ -73,6 +76,15 @@ export class OceanNode { this.config.claimDurationTimeout, this.blockchainRegistry ) + if (this.config.persistentStorage?.enabled) { + OCEAN_NODE_LOGGER.info( + `Starting PersistenStorage with type ${this.config.persistentStorage.type}` + ) + this.persistentStorage = createPersistentStorage(this) + } else { + OCEAN_NODE_LOGGER.info(`Starting without PersistenStorage`) + this.persistentStorage = null + } } } @@ -181,6 +193,10 @@ export class OceanNode { return this.blockchainRegistry } + public getPersistentStorage(): PersistentStorageFactory | null { + return this.persistentStorage + } + /** * Get a Blockchain instance for the given chainId. * Delegates to BlockchainRegistry. 
@@ -200,6 +216,10 @@ export class OceanNode { } } + public getConfig(): OceanNodeConfig { + return this.config + } + /** * v3: Direct protocol command handler - no P2P, just call handler directly * Returns {status, stream} without buffering diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 0750dec33..f94f6a2f5 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -123,7 +123,39 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect return } - P2P_LOGGER.logMessage('Performing P2P task: ' + JSON.stringify(task), true) + const taskRecord = task as unknown as Record + if (taskRecord.p2pStreamBody === true) { + delete taskRecord.p2pStreamBody + + // True streaming: expose an async Readable that reads LP frames lazily + // as the handler consumes it. Frames are terminated by an empty chunk. + taskRecord.stream = Readable.from( + (async function* () { + while (true) { + const frame = await lp.read({ signal: handshakeSignal() }) + const buf = Buffer.from( + (frame as unknown as { subarray: () => Uint8Array }).subarray() + ) + + if (buf.length === 0) { + break + } + + yield buf + } + })() + ) + } + + const logPayload = { ...taskRecord } + // Avoid JSON-stringifying the request stream itself. + if (logPayload.stream) { + logPayload.stream = '[request stream]' + } + if (Buffer.isBuffer(logPayload.rawData)) { + logPayload.rawData = `[${logPayload.rawData.length} bytes]` + } + P2P_LOGGER.logMessage('Performing P2P task: ' + JSON.stringify(logPayload), true) // Get and execute handler const handler: BaseHandler = this.getCoreHandlers().getHandler(task.command) @@ -152,11 +184,15 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect await stream.close() } catch (err) { P2P_LOGGER.logMessageWithEmoji( - 'handleProtocolCommands Error: ' + err.message, + 'handleProtocolCommands Error: ' + + (err instanceof Error ? err.message : String(err)), true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_ERROR ) - await sendErrorAndClose(500, err.message) + const httpStatus = + typeof (err as any)?.status === 'number' ? (err as any).status : 500 + const msg = err instanceof Error ? err.message : String(err) + await sendErrorAndClose(httpStatus, msg) } } diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index deca9308e..d4a4a5d9a 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -33,7 +33,7 @@ import { } from '@libp2p/kad-dht' import { EVENTS, cidFromRawString } from '../../utils/index.js' -import { Transform } from 'stream' +import { Transform, Readable } from 'stream' import { Database } from '../database' import { OceanNodeConfig, @@ -70,6 +70,35 @@ type DDOCache = { let index = 0 +/** Optional request payload sent as LP frames after the command JSON; ends with an empty LP frame. 
*/ +export type P2PRequestBodyStream = AsyncIterable | Readable + +function toUint8ArrayChunk(chunk: unknown): Uint8Array { + if (chunk instanceof Uint8Array) return chunk + if (Buffer.isBuffer(chunk)) return new Uint8Array(chunk) + if (typeof chunk === 'string') return uint8ArrayFromString(chunk) + if ( + chunk && + typeof chunk === 'object' && + ArrayBuffer.isView(chunk as ArrayBufferView) + ) { + const v = chunk as ArrayBufferView + return new Uint8Array(v.buffer, v.byteOffset, v.byteLength) + } + throw new Error('Unsupported chunk type for P2P request body') +} + +async function writeP2pRequestBodyLp( + lp: LengthPrefixedStream, + body: P2PRequestBodyStream, + signal: AbortSignal +): Promise { + for await (const chunk of body as AsyncIterable) { + await lp.write(toUint8ArrayChunk(chunk), { signal }) + } + await lp.write(new Uint8Array(0), { signal }) +} + export class OceanP2P extends EventEmitter { _libp2p: Libp2p _topic: string @@ -733,9 +762,19 @@ export class OceanP2P extends EventEmitter { async send( lp: LengthPrefixedStream, message: string, - options: { signal: AbortSignal } + options: { signal: AbortSignal }, + requestBody?: P2PRequestBodyStream ) { - await lp.write(uint8ArrayFromString(message), { signal: options.signal }) + let outbound = message + if (requestBody) { + const cmd = JSON.parse(message) as Record + cmd.p2pStreamBody = true + outbound = JSON.stringify(cmd) + } + await lp.write(uint8ArrayFromString(outbound), { signal: options.signal }) + if (requestBody) { + await writeP2pRequestBodyLp(lp, requestBody, options.signal) + } const statusBytes = await lp.read({ signal: options.signal }) return { status: JSON.parse(uint8ArrayToString(statusBytes.subarray())), @@ -755,7 +794,8 @@ export class OceanP2P extends EventEmitter { async sendTo( peerName: string, message: string, - multiAddrs?: string[] + multiAddrs?: string[], + requestBody?: P2PRequestBodyStream ): Promise<{ status: any; stream?: AsyncIterable }> { const options = { signal: AbortSignal.timeout(10_000), @@ -807,7 +847,7 @@ export class OceanP2P extends EventEmitter { let streamErr: Error | null = null try { - return await this.send(lpStream(stream), message, options) + return await this.send(lpStream(stream), message, options, requestBody) } catch (err) { try { stream.abort(err as Error) @@ -831,7 +871,7 @@ export class OceanP2P extends EventEmitter { stream = await connection.newStream(this._protocol, options) try { - return await this.send(lpStream(stream), message, options) + return await this.send(lpStream(stream), message, options, requestBody) } catch (retryErr) { try { stream.abort(retryErr as Error) diff --git a/src/components/core/compute/startCompute.ts b/src/components/core/compute/startCompute.ts index e43c668fa..0d42608d6 100644 --- a/src/components/core/compute/startCompute.ts +++ b/src/components/core/compute/startCompute.ts @@ -1025,41 +1025,5 @@ async function validateAccess( if (access.addresses.includes(consumerAddress)) { return true } - - const config = await getConfiguration() - const { supportedNetworks } = config - for (const accessListMap of access.accessLists) { - if (!accessListMap) continue - for (const chain of Object.keys(accessListMap)) { - const { chainId } = supportedNetworks[chain] - try { - const blockchain = oceanNode.getBlockchain(chainId) - if (!blockchain) { - CORE_LOGGER.logMessage( - `Blockchain instance not available for chain ${chainId}, skipping access list check`, - true - ) - continue - } - const signer = await blockchain.getSigner() - for (const 
accessListAddress of accessListMap[chain]) { - const hasAccess = await checkAddressOnAccessList( - accessListAddress, - consumerAddress, - signer - ) - if (hasAccess) { - return true - } - } - } catch (error) { - CORE_LOGGER.logMessage( - `Failed to check access lists on chain ${chain}: ${error.message}`, - true - ) - } - } - } - - return false + return await checkAddressOnAccessList(consumerAddress, access.accessLists, oceanNode) } diff --git a/src/components/core/handler/coreHandlersRegistry.ts b/src/components/core/handler/coreHandlersRegistry.ts index de3464580..985282192 100644 --- a/src/components/core/handler/coreHandlersRegistry.ts +++ b/src/components/core/handler/coreHandlersRegistry.ts @@ -47,6 +47,13 @@ import { } from './p2p.js' import { CreateAuthTokenHandler, InvalidateAuthTokenHandler } from './authHandler.js' import { GetJobsHandler } from './getJobs.js' +import { + PersistentStorageCreateBucketHandler, + PersistentStorageDeleteFileHandler, + PersistentStorageGetBucketsHandler, + PersistentStorageListFilesHandler, + PersistentStorageUploadFileHandler +} from './persistentStorage.js' export type HandlerRegistry = { handlerName: string // name of the handler @@ -167,6 +174,26 @@ export class CoreHandlersRegistry { this.registerCoreHandler(PROTOCOL_COMMANDS.PUSH_CONFIG, new PushConfigHandler(node)) this.registerCoreHandler(PROTOCOL_COMMANDS.GET_LOGS, new GetLogsHandler(node)) this.registerCoreHandler(PROTOCOL_COMMANDS.JOBS, new GetJobsHandler(node)) + this.registerCoreHandler( + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET, + new PersistentStorageCreateBucketHandler(node) + ) + this.registerCoreHandler( + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_BUCKETS, + new PersistentStorageGetBucketsHandler(node) + ) + this.registerCoreHandler( + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES, + new PersistentStorageListFilesHandler(node) + ) + this.registerCoreHandler( + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE, + new PersistentStorageUploadFileHandler(node) + ) + this.registerCoreHandler( + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE, + new PersistentStorageDeleteFileHandler(node) + ) } public static getInstance( diff --git a/src/components/core/handler/persistentStorage.ts b/src/components/core/handler/persistentStorage.ts new file mode 100644 index 000000000..b94ca529a --- /dev/null +++ b/src/components/core/handler/persistentStorage.ts @@ -0,0 +1,327 @@ +import { Readable } from 'stream' +import type { + PersistentStorageCreateBucketCommand, + PersistentStorageDeleteFileCommand, + PersistentStorageGetBucketsCommand, + PersistentStorageListFilesCommand, + PersistentStorageUploadFileCommand +} from '../../../@types/commands.js' +import { + PersistentStorageAccessDeniedError, + type PersistentStorageFactory +} from '../../persistentStorage/PersistentStorageFactory.js' +import type { P2PCommandResponse } from '../../../@types/OceanNode.js' +import { getAddress } from 'ethers' +import { checkAddressOnAccessList } from '../../../utils/accessList.js' + +import { CORE_LOGGER } from '../../../utils/logging/common.js' +import { + buildInvalidRequestMessage, + validateCommandParameters, + type ValidateParams +} from '../../httpRoutes/validateCommands.js' +import { CommandHandler } from './handler.js' + +function requirePersistentStorage( + handler: CommandHandler +): Promise { + const node = handler.getOceanNode() as any + if (!node.getPersistentStorage) { + throw new Error('Persistent storage is not available on this node') + } + const storage = node.getPersistentStorage() 
+ if (!storage) { + throw new Error('Persistent storage is not configured or disabled') + } + return storage +} + +export class PersistentStorageCreateBucketHandler extends CommandHandler { + validate(command: PersistentStorageCreateBucketCommand): ValidateParams { + const base = validateCommandParameters(command, [ + 'consumerAddress', + 'signature', + 'nonce', + 'accessLists' + ]) + if (!base.valid) return base + if (!Array.isArray(command.accessLists)) { + return buildInvalidRequestMessage( + 'Invalid parameter: "accessLists" must be an array of objects' + ) + } + return { valid: true } + } + + async handle(task: PersistentStorageCreateBucketCommand): Promise { + const validationResponse = await this.verifyParamsAndRateLimits(task) + if (this.shouldDenyTaskHandling(validationResponse)) return validationResponse + + const isAuthRequestValid = await this.validateTokenOrSignature( + task.authorization, + task.consumerAddress, + task.nonce, + task.signature, + task.command + ) + if (isAuthRequestValid.status.httpStatus !== 200) return isAuthRequestValid + + try { + const storage = await requirePersistentStorage(this) + const node = this.getOceanNode() + const config = node.getConfig() + const isAllowedCreate = await checkAddressOnAccessList( + task.consumerAddress, + config.persistentStorage?.accessLists, + node + ) + if (!isAllowedCreate) { + return { + stream: null, + status: { httpStatus: 403, error: 'You are not allowed to create new buckets' } + } + } + + let ownerNormalized: string + try { + ownerNormalized = getAddress(task.consumerAddress) + } catch { + return { + stream: null, + status: { httpStatus: 400, error: 'Invalid parameter: "consumerAddress"' } + } + } + + const result = await storage.createNewBucket(task.accessLists, ownerNormalized) + return { + stream: Readable.from(JSON.stringify(result)), + status: { httpStatus: 200, error: null } + } + } catch (e) { + const message = e instanceof Error ? 
e.message : String(e) + CORE_LOGGER.error(`PersistentStorageCreateBucketHandler error: ${message}`) + return { stream: null, status: { httpStatus: 500, error: message } } + } + } +} + +export class PersistentStorageGetBucketsHandler extends CommandHandler { + validate(command: PersistentStorageGetBucketsCommand): ValidateParams { + const base = validateCommandParameters(command, [ + 'consumerAddress', + 'signature', + 'nonce', + 'chainId', + 'owner' + ]) + if (!base.valid) return base + if (typeof command.chainId !== 'number') { + return buildInvalidRequestMessage('Invalid parameter: "chainId" must be a number') + } + if (!command.owner || typeof command.owner !== 'string') { + return buildInvalidRequestMessage( + 'Invalid parameter: "owner" must be a non-empty string' + ) + } + return { valid: true } + } + + async handle(task: PersistentStorageGetBucketsCommand): Promise { + const validationResponse = await this.verifyParamsAndRateLimits(task) + if (this.shouldDenyTaskHandling(validationResponse)) return validationResponse + + const isAuthRequestValid = await this.validateTokenOrSignature( + task.authorization, + task.consumerAddress, + task.nonce, + task.signature, + task.command + ) + if (isAuthRequestValid.status.httpStatus !== 200) return isAuthRequestValid + + let ownerNormalized: string + // let consumerNormalized: string + try { + ownerNormalized = getAddress(task.owner) + // consumerNormalized = getAddress(task.consumerAddress) + } catch { + return { + stream: null, + status: { + httpStatus: 400, + error: 'Invalid parameter: "owner" or "consumerAddress"' + } + } + } + + try { + const storage = await requirePersistentStorage(this) + // const node = this.getOceanNode() + const rows = await storage.listBuckets(ownerNormalized) + + return { + stream: Readable.from(JSON.stringify(rows)), + status: { httpStatus: 200, error: null } + } + } catch (e) { + const message = e instanceof Error ? e.message : String(e) + CORE_LOGGER.error(`PersistentStorageGetBucketsHandler error: ${message}`) + return { stream: null, status: { httpStatus: 500, error: message } } + } + } +} + +export class PersistentStorageListFilesHandler extends CommandHandler { + validate(command: PersistentStorageListFilesCommand): ValidateParams { + const base = validateCommandParameters(command, [ + 'consumerAddress', + 'signature', + 'nonce', + 'bucketId' + ]) + if (!base.valid) return base + if (!command.bucketId || typeof command.bucketId !== 'string') { + return buildInvalidRequestMessage('Invalid parameter: "bucketId" must be a string') + } + return { valid: true } + } + + async handle(task: PersistentStorageListFilesCommand): Promise { + const validationResponse = await this.verifyParamsAndRateLimits(task) + if (this.shouldDenyTaskHandling(validationResponse)) return validationResponse + + const isAuthRequestValid = await this.validateTokenOrSignature( + task.authorization, + task.consumerAddress, + task.nonce, + task.signature, + task.command + ) + if (isAuthRequestValid.status.httpStatus !== 200) return isAuthRequestValid + + try { + const storage = await requirePersistentStorage(this) + const result = await storage.listFiles(task.bucketId, task.consumerAddress) + return { + stream: Readable.from(JSON.stringify(result)), + status: { httpStatus: 200, error: null } + } + } catch (e) { + if (e instanceof PersistentStorageAccessDeniedError) { + return { + stream: null, + status: { httpStatus: 403, error: e.message } + } + } + const message = e instanceof Error ? 
e.message : String(e) + CORE_LOGGER.error(`PersistentStorageListFilesHandler error: ${message}`) + return { stream: null, status: { httpStatus: 500, error: message } } + } + } +} + +export class PersistentStorageUploadFileHandler extends CommandHandler { + validate(command: PersistentStorageUploadFileCommand): ValidateParams { + const base = validateCommandParameters(command, [ + 'consumerAddress', + 'signature', + 'nonce', + 'bucketId', + 'fileName' + ]) + if (!base.valid) return base + return { valid: true } + } + + async handle(task: PersistentStorageUploadFileCommand): Promise { + const validationResponse = await this.verifyParamsAndRateLimits(task) + if (this.shouldDenyTaskHandling(validationResponse)) return validationResponse + + const isAuthRequestValid = await this.validateTokenOrSignature( + task.authorization, + task.consumerAddress, + task.nonce, + task.signature, + task.command + ) + if (isAuthRequestValid.status.httpStatus !== 200) return isAuthRequestValid + + try { + const storage = await requirePersistentStorage(this) + if (!task.stream) { + return { + stream: null, + status: { httpStatus: 403, error: 'Upload stream error' } + } + } + const result = await storage.uploadFile( + task.bucketId, + task.fileName, + task.stream, + task.consumerAddress + ) + return { + stream: Readable.from(JSON.stringify(result)), + status: { httpStatus: 200, error: null } + } + } catch (e) { + if (e instanceof PersistentStorageAccessDeniedError) { + return { + stream: null, + status: { httpStatus: 403, error: e.message } + } + } + const message = e instanceof Error ? e.message : String(e) + CORE_LOGGER.error(`PersistentStorageUploadFileHandler error: ${message}`) + return { stream: null, status: { httpStatus: 500, error: message } } + } + } +} + +export class PersistentStorageDeleteFileHandler extends CommandHandler { + validate(command: PersistentStorageDeleteFileCommand): ValidateParams { + const base = validateCommandParameters(command, [ + 'consumerAddress', + 'signature', + 'nonce', + 'chainId', + 'bucketId', + 'fileName' + ]) + if (!base.valid) return base + return { valid: true } + } + + async handle(task: PersistentStorageDeleteFileCommand): Promise { + const validationResponse = await this.verifyParamsAndRateLimits(task) + if (this.shouldDenyTaskHandling(validationResponse)) return validationResponse + + const isAuthRequestValid = await this.validateTokenOrSignature( + task.authorization, + task.consumerAddress, + task.nonce, + task.signature, + task.command + ) + if (isAuthRequestValid.status.httpStatus !== 200) return isAuthRequestValid + + try { + const storage = await requirePersistentStorage(this) + await storage.deleteFile(task.bucketId, task.fileName, task.consumerAddress) + return { + stream: Readable.from(JSON.stringify({ success: true })), + status: { httpStatus: 200, error: null } + } + } catch (e) { + if (e instanceof PersistentStorageAccessDeniedError) { + return { + stream: null, + status: { httpStatus: 403, error: e.message } + } + } + const message = e instanceof Error ? 
e.message : String(e) + CORE_LOGGER.error(`PersistentStorageDeleteFileHandler error: ${message}`) + return { stream: null, status: { httpStatus: 500, error: message } } + } + } +} diff --git a/src/components/httpRoutes/index.ts b/src/components/httpRoutes/index.ts index cf5530c5f..184608f80 100644 --- a/src/components/httpRoutes/index.ts +++ b/src/components/httpRoutes/index.ts @@ -15,6 +15,7 @@ import { addMapping, allRoutesMapping, findPathName } from './routeUtils.js' import { PolicyServerPassthroughRoute } from './policyServer.js' import { authRoutes } from './auth.js' import { adminConfigRoutes } from './adminConfig.js' +import { persistentStorageRoutes } from './persistentStorage.js' export * from './getOceanPeers.js' export * from './auth.js' @@ -62,6 +63,8 @@ httpRoutes.use(PolicyServerPassthroughRoute) httpRoutes.use(authRoutes) // admin config routes httpRoutes.use(adminConfigRoutes) +// persistent storage routes +httpRoutes.use(persistentStorageRoutes) export function getAllServiceEndpoints() { httpRoutes.stack.forEach(addMapping.bind(null, [])) diff --git a/src/components/httpRoutes/persistentStorage.ts b/src/components/httpRoutes/persistentStorage.ts new file mode 100644 index 000000000..db7e977d9 --- /dev/null +++ b/src/components/httpRoutes/persistentStorage.ts @@ -0,0 +1,179 @@ +import express from 'express' +import { Readable } from 'stream' + +import { SERVICES_API_BASE_PATH, PROTOCOL_COMMANDS } from '../../utils/constants.js' +import { HTTP_LOGGER } from '../../utils/logging/common.js' +import { streamToObject, streamToString } from '../../utils/util.js' + +import { + PersistentStorageCreateBucketHandler, + PersistentStorageDeleteFileHandler, + PersistentStorageGetBucketsHandler, + PersistentStorageListFilesHandler, + PersistentStorageUploadFileHandler +} from '../core/handler/persistentStorage.js' + +export const persistentStorageRoutes = express.Router() + +function readRawBody(req: any): Promise { + return new Promise((resolve, reject) => { + const chunks: Buffer[] = [] + req.on('data', (chunk: any) => chunks.push(Buffer.from(chunk))) + req.on('end', () => resolve(Buffer.concat(chunks))) + req.on('error', reject) + }) +} + +// Create bucket +persistentStorageRoutes.post( + `${SERVICES_API_BASE_PATH}/persistentStorage/buckets`, + express.json(), + async (req, res) => { + try { + const response = await new PersistentStorageCreateBucketHandler( + req.oceanNode + ).handle({ + ...req.body, + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET, + authorization: req.headers?.authorization, + caller: req.caller + }) + if (!response.stream) { + res.status(response.status.httpStatus).send(response.status.error) + return + } + const payload = await streamToObject(response.stream as Readable) + res.status(200).json(payload) + } catch (error) { + HTTP_LOGGER.error(`PersistentStorage create bucket error: ${error}`) + res.status(500).send('Internal Server Error') + } + } +) + +// List buckets for an owner (then filtered by ACL in handler) +persistentStorageRoutes.get( + `${SERVICES_API_BASE_PATH}/persistentStorage/buckets`, + async (req, res) => { + try { + const response = await new PersistentStorageGetBucketsHandler(req.oceanNode).handle( + { + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_BUCKETS, + consumerAddress: req.query.consumerAddress as string, + signature: req.query.signature as string, + nonce: req.query.nonce as string, + chainId: parseInt(req.query.chainId as string) || null, + owner: req.query.owner as string, + authorization: req.headers?.authorization, 
+ caller: req.caller + } as any + ) + if (!response.stream) { + res.status(response.status.httpStatus).send(response.status.error) + return + } + const payload = await streamToObject(response.stream as Readable) + res.status(200).json(payload) + } catch (error) { + HTTP_LOGGER.error(`PersistentStorage get buckets error: ${error}`) + res.status(500).send('Internal Server Error') + } + } +) + +// List files in bucket +persistentStorageRoutes.get( + `${SERVICES_API_BASE_PATH}/persistentStorage/buckets/:bucketId/files`, + async (req, res) => { + try { + const response = await new PersistentStorageListFilesHandler(req.oceanNode).handle({ + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES, + consumerAddress: req.query.consumerAddress as string, + signature: req.query.signature as string, + nonce: req.query.nonce as string, + bucketId: req.params.bucketId, + authorization: req.headers?.authorization, + caller: req.caller + } as any) + if (!response.stream) { + res.status(response.status.httpStatus).send(response.status.error) + return + } + const payload = await streamToObject(response.stream as Readable) + res.status(200).json(payload) + } catch (error) { + HTTP_LOGGER.error(`PersistentStorage list files error: ${error}`) + res.status(500).send('Internal Server Error') + } + } +) + +// Upload file to bucket. Body is treated as raw bytes. +persistentStorageRoutes.post( + `${SERVICES_API_BASE_PATH}/persistentStorage/buckets/:bucketId/files/:fileName`, + async (req, res) => { + try { + const raw = await readRawBody(req) + const response = await new PersistentStorageUploadFileHandler(req.oceanNode).handle( + { + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE, + consumerAddress: req.query.consumerAddress as string, + signature: req.query.signature as string, + nonce: req.query.nonce as string, + bucketId: req.params.bucketId, + fileName: req.params.fileName, + stream: Readable.from(raw), + authorization: req.headers?.authorization, + caller: req.caller + } as any + ) + if (!response.stream) { + res.status(response.status.httpStatus).send(response.status.error) + return + } + const payload = await streamToObject(response.stream as Readable) + res.status(200).json(payload) + } catch (error) { + HTTP_LOGGER.error(`PersistentStorage upload error: ${error}`) + res.status(500).send('Internal Server Error') + } + } +) + +// Delete file from bucket +persistentStorageRoutes.delete( + `${SERVICES_API_BASE_PATH}/persistentStorage/buckets/:bucketId/files/:fileName`, + async (req, res) => { + try { + const response = await new PersistentStorageDeleteFileHandler(req.oceanNode).handle( + { + command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE, + consumerAddress: req.query.consumerAddress as string, + signature: req.query.signature as string, + nonce: req.query.nonce as string, + chainId: parseInt(req.query.chainId as string) || null, + bucketId: req.params.bucketId, + fileName: req.params.fileName, + authorization: req.headers?.authorization, + caller: req.caller + } as any + ) + + if (response.status.httpStatus !== 200) { + res.status(response.status.httpStatus).send(response.status.error) + return + } + + if (!response.stream) { + res.status(200).json({ success: true }) + return + } + + const payload = JSON.parse(await streamToString(response.stream as Readable)) + res.status(200).json(payload) + } catch (error) { + HTTP_LOGGER.error(`PersistentStorage delete error: ${error}`) + res.status(500).send('Internal Server Error') + } + } +) diff --git 
a/src/components/httpRoutes/validateCommands.ts b/src/components/httpRoutes/validateCommands.ts index 49850f496..f8bb365cb 100644 --- a/src/components/httpRoutes/validateCommands.ts +++ b/src/components/httpRoutes/validateCommands.ts @@ -32,8 +32,23 @@ export function validateCommandParameters( return buildInvalidRequestMessage(`Invalid or unrecognized command: "${commandStr}"`) } - // deep copy - const logCommandData = structuredClone(commandData) + // deep copy for logging (must not throw for non-cloneable payloads like streams) + let logCommandData: any + try { + // For some commands, the task contains non-cloneable fields (e.g. Node streams). + // We redact those before cloning to avoid DataCloneError. + const sanitized = { ...(commandData ?? {}) } + if ('stream' in sanitized) { + sanitized.stream = '[STREAM]' + } + logCommandData = structuredClone(sanitized) + } catch { + // Last resort: shallow clone; avoid crashing validation because of logging. + logCommandData = { ...(commandData ?? {}) } + if ('stream' in logCommandData) { + logCommandData.stream = '[STREAM]' + } + } if (commandStr === PROTOCOL_COMMANDS.ENCRYPT) { logCommandData.files = [] // hide files data (sensitive) + rawData (long buffer) from logging diff --git a/src/components/persistentStorage/PersistentStorageFactory.ts b/src/components/persistentStorage/PersistentStorageFactory.ts new file mode 100644 index 000000000..6ceee168d --- /dev/null +++ b/src/components/persistentStorage/PersistentStorageFactory.ts @@ -0,0 +1,232 @@ +import type { AccessList } from '../../@types/AccessList.js' +import type { BaseFileObject } from '../../@types/fileObject.js' +import sqlite3, { RunResult } from 'sqlite3' +import path from 'path' +import fs from 'fs' +import { getAddress } from 'ethers' +import { OceanNode } from '../../OceanNode.js' +import { checkAddressOnAccessList } from '../../utils/accessList.js' + +export class PersistentStorageAccessDeniedError extends Error { + constructor(message = 'You are not allowed to access this bucket') { + super(message) + this.name = 'PersistentStorageAccessDeniedError' + } +} + +function normalizeWeb3Address(addr: string): string { + try { + return getAddress(addr) + } catch { + return (addr ?? '').toLowerCase() + } +} + +function parseBucketAccessListsJson(accessListJson: string): AccessList[] { + try { + const parsed = JSON.parse(accessListJson || '[]') + return Array.isArray(parsed) ? (parsed as AccessList[]) : [] + } catch { + return [] + } +} + +export type BucketRow = { + bucketId: string + owner: string + accessListJson: string + createdAt: number +} + +export interface PersistentStorageFileInfo { + bucketId: string + name: string + size: number + lastModified: number +} + +export type CreateBucketResult = { + bucketId: string + owner: string + accessList: AccessList[] +} + +/** Bucket metadata from registry (list APIs and internal filtering). 
*/ +export type PersistentStorageBucketRecord = { + bucketId: string + owner: string + createdAt: number + accessLists: AccessList[] +} + +export abstract class PersistentStorageFactory { + private db: sqlite3.Database + private node: OceanNode + + constructor(node: OceanNode) { + this.node = node + const dbDir = path.dirname('databases/') + if (!fs.existsSync(dbDir)) { + fs.mkdirSync(dbDir, { recursive: true }) + } + this.db = new sqlite3.Database(dbDir + 'persistentStorage.sqlite') + } + + public abstract createNewBucket( + accessList: AccessList[], + owner: string + ): Promise + + public abstract listFiles( + bucketId: string, + consumerAddress: string + ): Promise + + public abstract uploadFile( + bucketId: string, + fileName: string, + content: NodeJS.ReadableStream, + consumerAddress: string + ): Promise + + public abstract deleteFile( + bucketId: string, + fileName: string, + consumerAddress: string + ): Promise + + /** + * Returns a file object that can be attached to compute jobs. + * The concrete shape depends on the backend implementation. + */ + public abstract getFileObject( + bucketId: string, + fileName: string, + consumerAddress: string + ): Promise + + // common functions + async getBucketAccessList(bucketId: string): Promise { + await this.dbCreateTables() + try { + const row = await this.getBucket(bucketId) + if (!row) { + return [] + } + return parseBucketAccessListsJson(row.accessListJson) + } catch { + return [] + } + } + + async getBucket(bucketId: string): Promise { + await this.dbCreateTables() + try { + const row = await this.dbGetBucket(bucketId) + return row + } catch { + return null + } + } + + /** + * Lists buckets for a given owner from the SQLite registry (metadata only). + * `owner` must already be normalized (e.g. checksummed `getAddress`). + * Backends that need setup (e.g. localfs init) should override and call `super.listBuckets(owner)`. + */ + async listBuckets(owner: string): Promise { + await this.dbCreateTables() + const rows = await this.dbListBucketsByOwner(owner) + return rows.map((row) => ({ + bucketId: row.bucketId, + owner: row.owner, + createdAt: row.createdAt, + accessLists: parseBucketAccessListsJson(row.accessListJson) + })) + } + + dbCreateTables(): Promise { + const createBucketsSQL = ` + CREATE TABLE IF NOT EXISTS persistent_storage_buckets ( + bucketId TEXT PRIMARY KEY, + owner TEXT NOT NULL, + accessListJson TEXT NOT NULL, + createdAt INTEGER NOT NULL + ); + ` + return new Promise((resolve, reject) => { + this.db.run(createBucketsSQL, (err) => { + if (err) reject(err) + else resolve() + }) + }) + } + + dbUpsertBucket( + bucketId: string, + owner: string, + accessListJson: string, + createdAt: number + ): Promise { + const sql = ` + INSERT INTO persistent_storage_buckets (bucketId, owner, accessListJson, createdAt) + VALUES (?, ?, ?, ?) + ON CONFLICT(bucketId) DO UPDATE SET accessListJson=excluded.accessListJson; + ` + return new Promise((resolve, reject) => { + this.db.run(sql, [bucketId, owner, accessListJson, createdAt], (err) => { + if (err) reject(err) + else resolve() + }) + }) + } + + dbGetBucket(bucketId: string): Promise { + const sql = `SELECT bucketId, owner, accessListJson, createdAt FROM persistent_storage_buckets WHERE bucketId = ?` + return new Promise((resolve, reject) => { + this.db.get(sql, [bucketId], (err, row: BucketRow | undefined) => { + if (err) reject(err) + else resolve(row ?? 
null) + }) + }) + } + + dbListBucketsByOwner(owner: string): Promise { + const sql = `SELECT bucketId, owner, accessListJson, createdAt FROM persistent_storage_buckets WHERE owner = ? ORDER BY createdAt ASC` + return new Promise((resolve, reject) => { + this.db.all(sql, [owner], (err, rows: BucketRow[]) => { + if (err) reject(err) + else resolve(rows ?? []) + }) + }) + } + + dbDeleteBucket(bucketId: string): Promise { + const sql = `DELETE FROM persistent_storage_buckets WHERE bucketId = ?` + return new Promise((resolve, reject) => { + this.db.run(sql, [bucketId], function (this: RunResult, err) { + if (err) reject(err) + else resolve(this.changes === 1) + }) + }) + } + + isAllowed(consumerAddress: string, accessLists: AccessList[]): Promise { + return checkAddressOnAccessList(consumerAddress, accessLists, this.node) + } + + /** Throws {@link PersistentStorageAccessDeniedError} if the consumer is not on the bucket access list. */ + protected async assertConsumerAllowedForBucket( + consumerAddress: string, + bucketId: string + ): Promise { + const bucket = await this.getBucket(bucketId) + const accessLists = parseBucketAccessListsJson(bucket.accessListJson) + if (normalizeWeb3Address(consumerAddress) === normalizeWeb3Address(bucket.owner)) { + return + } + if (!(await this.isAllowed(consumerAddress, accessLists))) { + throw new PersistentStorageAccessDeniedError() + } + } +} diff --git a/src/components/persistentStorage/PersistentStorageLocalFS.ts b/src/components/persistentStorage/PersistentStorageLocalFS.ts new file mode 100644 index 000000000..fa02c34f7 --- /dev/null +++ b/src/components/persistentStorage/PersistentStorageLocalFS.ts @@ -0,0 +1,162 @@ +import fs from 'fs' +import fsp from 'fs/promises' +import path from 'path' +import { pipeline } from 'stream/promises' +import { randomUUID } from 'crypto' + +import type { AccessList } from '../../@types/AccessList.js' +import type { PersistentStorageLocalFSOptions } from '../../@types/PersistentStorage.js' +import type { BaseFileObject } from '../../@types/fileObject.js' + +import { + CreateBucketResult, + PersistentStorageBucketRecord, + PersistentStorageFactory, + PersistentStorageFileInfo +} from './PersistentStorageFactory.js' +import { OceanNode } from '../../OceanNode.js' + +type LocalFileObject = BaseFileObject & { + type: 'localfs' + bucketId: string + fileName: string +} + +export class PersistentStorageLocalFS extends PersistentStorageFactory { + private baseFolder: string + + constructor(node: OceanNode) { + super(node) + const options = node.getConfig().persistentStorage + .options as PersistentStorageLocalFSOptions + + this.baseFolder = options.folder + } + + async init(): Promise { + await fsp.mkdir(this.baseFolder, { recursive: true }) + await super.dbCreateTables() + } + + private bucketPath(bucketId: string): string { + return path.join(this.baseFolder, 'buckets', bucketId) + } + + private async ensureBucketExists(bucketId: string): Promise { + const row = await this.dbGetBucket(bucketId) + if (!row) { + throw new Error(`Bucket not found: ${bucketId}`) + } + } + + async listBuckets(owner: string): Promise { + await this.init() + return super.listBuckets(owner) + } + + async createNewBucket( + accessList: AccessList[], + owner: string + ): Promise { + await this.init() + + const bucketId = randomUUID() + const createdAt = Math.floor(Date.now() / 1000) + await fsp.mkdir(this.bucketPath(bucketId), { recursive: true }) + await super.dbUpsertBucket( + bucketId, + owner, + JSON.stringify(accessList ?? 
[]), + createdAt + ) + + return { bucketId, owner, accessList } + } + + async listFiles( + bucketId: string, + consumerAddress: string + ): Promise { + await this.init() + await this.ensureBucketExists(bucketId) + await this.assertConsumerAllowedForBucket(consumerAddress, bucketId) + + const dir = this.bucketPath(bucketId) + const entries = await fsp.readdir(dir, { withFileTypes: true }) + const out: PersistentStorageFileInfo[] = [] + + for (const ent of entries) { + if (!ent.isFile()) continue + const filePath = path.join(dir, ent.name) + const st = await fsp.stat(filePath) + out.push({ + bucketId, + name: ent.name, + size: st.size, + lastModified: Math.floor(st.mtimeMs) + }) + } + + return out + } + + async uploadFile( + bucketId: string, + fileName: string, + content: NodeJS.ReadableStream, + consumerAddress: string + ): Promise { + await this.init() + await this.ensureBucketExists(bucketId) + await this.assertConsumerAllowedForBucket(consumerAddress, bucketId) + + if (!fileName || fileName.includes('/') || fileName.includes('\\')) { + throw new Error('Invalid fileName') + } + + const targetDir = this.bucketPath(bucketId) + await fsp.mkdir(targetDir, { recursive: true }) + const targetPath = path.join(targetDir, fileName) + + await pipeline(content, fs.createWriteStream(targetPath)) + + const st = await fsp.stat(targetPath) + return { + bucketId, + name: fileName, + size: st.size, + lastModified: Math.floor(st.mtimeMs) + } + } + + async deleteFile( + bucketId: string, + fileName: string, + consumerAddress: string + ): Promise { + await this.init() + await this.ensureBucketExists(bucketId) + await this.assertConsumerAllowedForBucket(consumerAddress, bucketId) + + const targetPath = path.join(this.bucketPath(bucketId), fileName) + await fsp.rm(targetPath, { force: true }) + } + + async getFileObject( + bucketId: string, + fileName: string, + consumerAddress: string + ): Promise { + await this.init() + await this.ensureBucketExists(bucketId) + await this.assertConsumerAllowedForBucket(consumerAddress, bucketId) + + // This is intentionally not a downloadable URL; compute backends can interpret this object. 
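+    // Assumption: the C2D layer (or other callers) resolves { bucketId, fileName } against the configured localfs folder when the actual bytes are needed.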
+ const obj: LocalFileObject = { + type: 'localfs', + bucketId, + fileName + } + return obj + } +} diff --git a/src/components/persistentStorage/PersistentStorageS3.ts b/src/components/persistentStorage/PersistentStorageS3.ts new file mode 100644 index 000000000..4be68c9dd --- /dev/null +++ b/src/components/persistentStorage/PersistentStorageS3.ts @@ -0,0 +1,74 @@ +import { + CreateBucketResult, + PersistentStorageBucketRecord, + PersistentStorageFactory, + PersistentStorageFileInfo +} from './PersistentStorageFactory.js' + +import type { AccessList } from '../../@types/AccessList.js' +import type { PersistentStorageS3Options } from '../../@types/PersistentStorage.js' +import type { BaseFileObject } from '../../@types/fileObject.js' +import { OceanNode } from '../../OceanNode.js' + +export class PersistentStorageS3 extends PersistentStorageFactory { + private options: PersistentStorageS3Options + constructor(node: OceanNode) { + super(node) + this.options = node.getConfig().persistentStorage + .options as PersistentStorageS3Options + } + + // eslint-disable-next-line require-await + async init(): Promise { + throw new Error('PersistentStorageS3 is not implemented yet') + } + + async listBuckets(owner: string): Promise { + await this.init() + return super.listBuckets(owner) + } + + // eslint-disable-next-line require-await + async createNewBucket( + accessList: AccessList[], + _owner: string + ): Promise { + throw new Error('PersistentStorageS3 is not implemented yet') + } + + // eslint-disable-next-line require-await + async listFiles( + _bucketId: string, + _consumerAddress: string + ): Promise { + throw new Error('PersistentStorageS3 is not implemented yet') + } + + // eslint-disable-next-line require-await + async uploadFile( + _bucketId: string, + _fileName: string, + _content: Buffer | NodeJS.ReadableStream, + _consumerAddress: string + ): Promise { + throw new Error('PersistentStorageS3 is not implemented yet') + } + + // eslint-disable-next-line require-await + async deleteFile( + _bucketId: string, + _fileName: string, + _consumerAddress: string + ): Promise { + throw new Error('PersistentStorageS3 is not implemented yet') + } + + // eslint-disable-next-line require-await + async getFileObject( + _bucketId: string, + _fileName: string, + _consumerAddress: string + ): Promise { + throw new Error('PersistentStorageS3 is not implemented yet') + } +} diff --git a/src/components/persistentStorage/createPersistentStorage.ts b/src/components/persistentStorage/createPersistentStorage.ts new file mode 100644 index 000000000..c8588cde4 --- /dev/null +++ b/src/components/persistentStorage/createPersistentStorage.ts @@ -0,0 +1,23 @@ +import { OceanNode } from '../../OceanNode.js' + +import type { PersistentStorageFactory } from './PersistentStorageFactory.js' +import { PersistentStorageLocalFS } from './PersistentStorageLocalFS.js' +import { PersistentStorageS3 } from './PersistentStorageS3.js' + +export function createPersistentStorage(node: OceanNode): PersistentStorageFactory { + const config = node.getConfig().persistentStorage + if (!config?.enabled) { + throw new Error('Persistent storage is disabled') + } + + switch (config.type) { + case 'localfs': + return new PersistentStorageLocalFS(node) + case 's3': + return new PersistentStorageS3(node) + default: + throw new Error( + `Unsupported persistent storage type: ${(config as { type?: string })?.type}` + ) + } +} diff --git a/src/components/persistentStorage/index.ts b/src/components/persistentStorage/index.ts new file mode 100644 index 
diff --git a/src/test/integration/accessLists.test.ts b/src/test/integration/accessLists.test.ts
index fe901f65e..18e62fd7d 100644
--- a/src/test/integration/accessLists.test.ts
+++ b/src/test/integration/accessLists.test.ts
@@ -17,7 +17,7 @@ import { AccessListContract, OceanNodeConfig } from '../../@types/OceanNode.js'
 import { homedir } from 'os'
 import { getConfiguration } from '../../utils/config.js'
 import { assert, expect } from 'chai'
-import { checkAddressOnAccessList } from '../../utils/accessList.js'
+import { checkAddressOnAccessListWithSigner } from '../../utils/accessList.js'
 import { KeyManager } from '../../components/KeyManager/index.js'

 describe('Should deploy some accessLists before all other tests.', () => {
@@ -174,7 +174,11 @@
     for (let i = 0; i < wallets.length; i++) {
       const account = await wallets[i].getAddress()
       expect(
-        (await checkAddressOnAccessList(accessListAddress, account, owner)) === true,
+        (await checkAddressOnAccessListWithSigner(
+          accessListAddress,
+          account,
+          owner
+        )) === true,
         `Address ${account} has no balance on Access List ${accessListAddress}, so its not Authorized`
       )
     }
@@ -187,7 +191,11 @@
     for (let i = wallets.length; i < 4; i++) {
       const account = await (await provider.getSigner(i)).getAddress()
       expect(
-        (await checkAddressOnAccessList(accessListAddress, account, owner)) === false,
+        (await checkAddressOnAccessListWithSigner(
+          accessListAddress,
+          account,
+          owner
+        )) === false,
         `Address ${account} should not be part Access List ${accessListAddress}, therefore its not Authorized`
       )
     }
diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts
index bdfd844d6..0f9c66d06 100644
--- a/src/test/integration/compute.test.ts
+++ b/src/test/integration/compute.test.ts
@@ -58,7 +58,8 @@ import {
   buildEnvOverrideConfig,
   getMockSupportedNetworks,
   setupEnvironment,
-  tearDownEnvironment
+  tearDownEnvironment,
+  sleep
 } from '../utils/utils.js'

 import { ProviderFees, ProviderComputeInitializeResults } from '../../@types/Fees.js'
@@ -83,8 +84,6 @@ import { C2DEngineDocker } from '../../components/c2d/compute_engine_docker.js'
 import { createHashForSignature, safeSign } from '../utils/signature.js'
 import { create256Hash } from '../../utils/crypt.js'

-const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
-
 /**
  * Polls getComputeEnvironments until every environment's resources (and free.resources)
  * have inUse === 0. Use with the same pattern as the compute tests: pass a callback that
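The JSDoc above is cut off in this excerpt, but it describes a helper that repeatedly calls `getComputeEnvironments` until resources are idle. A generic version of that polling pattern might look like the sketch below; the name, arguments and timings are illustrative and are not the repository's actual helper.

```ts
// Illustrative polling helper in the spirit of the comment above.
export async function pollUntil(
  check: () => Promise<boolean>, // callback returning true once the condition holds
  timeoutMs = 60000,
  intervalMs = 2000
): Promise<void> {
  const deadline = Date.now() + timeoutMs
  while (Date.now() < deadline) {
    if (await check()) return
    await new Promise((resolve) => setTimeout(resolve, intervalMs))
  }
  throw new Error('pollUntil: condition not met before timeout')
}
```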
diff --git a/src/test/integration/persistentStorage.test.ts b/src/test/integration/persistentStorage.test.ts
new file mode 100644
index 000000000..f8ddf72a0
--- /dev/null
+++ b/src/test/integration/persistentStorage.test.ts
@@ -0,0 +1,456 @@
+import { expect } from 'chai'
+import fsp from 'fs/promises'
+import os from 'os'
+import path from 'path'
+import { Readable } from 'stream'
+import { getAddress, JsonRpcProvider, Signer } from 'ethers'
+
+import { Database } from '../../components/database/index.js'
+import {
+  PersistentStorageCreateBucketHandler,
+  PersistentStorageDeleteFileHandler,
+  PersistentStorageGetBucketsHandler,
+  PersistentStorageListFilesHandler,
+  PersistentStorageUploadFileHandler
+} from '../../components/core/handler/persistentStorage.js'
+import { OceanNode } from '../../OceanNode.js'
+import type { AccessList } from '../../@types/AccessList.js'
+import { ENVIRONMENT_VARIABLES, PROTOCOL_COMMANDS } from '../../utils/constants.js'
+import { getConfiguration } from '../../utils/config.js'
+import { streamToObject } from '../../utils/util.js'
+import {
+  DEFAULT_TEST_TIMEOUT,
+  OverrideEnvConfig,
+  TEST_ENV_CONFIG_FILE,
+  buildEnvOverrideConfig,
+  setupEnvironment,
+  tearDownEnvironment,
+  sleep
+} from '../utils/utils.js'
+import { createHashForSignature, safeSign } from '../utils/signature.js'
+
+import { BlockchainRegistry } from '../../components/BlockchainRegistry/index.js'
+import { Blockchain } from '../../utils/blockchain.js'
+import { RPCS, SupportedNetwork } from '../../@types/blockchain.js'
+import { DEVELOPMENT_CHAIN_ID } from '../../utils/address.js'
+import { deployAndGetAccessListConfig } from '../utils/contracts.js'
+import { OceanNodeConfig } from '../../@types/OceanNode.js'
+import { KeyManager } from '../../components/KeyManager/index.js'
+
+describe('Persistent storage handlers (integration)', function () {
+  this.timeout(DEFAULT_TEST_TIMEOUT)
+
+  let previousConfiguration: OverrideEnvConfig[]
+  let config: OceanNodeConfig
+  let database: Database
+  let oceanNode: OceanNode
+  let consumer: Signer
+  let psRoot: string
+
+  let provider: JsonRpcProvider
+  let blockchain: Blockchain
+  let owner: Signer
+  let wallets: Signer[] = []
+  let forbiddenConsumer: Signer
+  let bucketAllowList: any
+
+  before(async () => {
+    provider = new JsonRpcProvider('http://127.0.0.1:8545')
+    config = await getConfiguration() // Force reload the configuration
+
+    wallets = [
+      (await provider.getSigner(0)) as Signer,
+      (await provider.getSigner(1)) as Signer,
+      (await provider.getSigner(2)) as Signer,
+      (await provider.getSigner(3)) as Signer
+    ]
+    forbiddenConsumer = (await provider.getSigner(4)) as Signer
+
+    const rpcs: RPCS = config.supportedNetworks
+    const chain: SupportedNetwork = rpcs[String(DEVELOPMENT_CHAIN_ID)]
+    const keyManager = new KeyManager(config)
+    const blockchains = new BlockchainRegistry(keyManager, config)
+    blockchain = blockchains.getBlockchain(chain.chainId)
+
+    owner = await blockchain.getSigner()
+
+    // ENVIRONMENT_VARIABLES.AUTHORIZED_PUBLISHERS_LIST
+    const accessListPublishers = await deployAndGetAccessListConfig(
+      owner,
+      provider,
+      wallets
+    )
+    bucketAllowList = accessListPublishers
+    previousConfiguration = await setupEnvironment(
+      TEST_ENV_CONFIG_FILE,
+      buildEnvOverrideConfig(
+        [ENVIRONMENT_VARIABLES.PRIVATE_KEY],
+        ['0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58']
+      )
+    )
+
+    config = await getConfiguration(true)
+    psRoot = await fsp.mkdtemp(path.join(os.tmpdir(), 'ocean-ps-it-'))
+    config.persistentStorage = {
+      enabled: true,
+      type: 'localfs',
+      accessLists: [bucketAllowList],
+      options: { folder: psRoot }
+    }
+
+    database = await Database.init(config.dbConfig)
+    oceanNode = await OceanNode.getInstance(
+      config,
+      database,
+      undefined,
+      undefined,
+      undefined,
+      undefined,
+      undefined,
+      true
+    )
+
+    consumer = (await provider.getSigner(1)) as Signer
+  })
+
+  after(async () => {
+    await tearDownEnvironment(previousConfiguration)
+    // await fsp.rm(psRoot, { recursive: true, force: true })
+  })
+
+  it('create bucket → upload → list → delete (happy path)', async () => {
+    const consumerAddress = await consumer.getAddress()
+    let nonce = Date.now().toString()
+    let messageHashBytes = createHashForSignature(
+      consumerAddress,
+      nonce,
+      PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET
+    )
+    let signature = await safeSign(consumer, messageHashBytes)
+
+    const createRes = await new PersistentStorageCreateBucketHandler(oceanNode).handle({
+      command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET,
+      consumerAddress,
+      signature,
+      nonce,
+      accessLists: [],
+      authorization: undefined
+    } as any)
+
+    expect(createRes.status.httpStatus).to.equal(200)
+    expect(createRes.stream).to.be.instanceOf(Readable)
+    const created = await streamToObject(createRes.stream as Readable)
+    expect(created.bucketId).to.be.a('string')
+    expect(getAddress(created.owner)).to.equal(getAddress(consumerAddress))
+    const bucketId = created.bucketId as string
+
+    const fileName = 'hello.txt'
+    const body = Buffer.from('persistent-storage-it')
+
+    nonce = Date.now().toString()
+    messageHashBytes = createHashForSignature(
+      consumerAddress,
+      nonce,
+      PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE
+    )
+    signature = await safeSign(consumer, messageHashBytes)
+    const uploadRes = await new PersistentStorageUploadFileHandler(oceanNode).handle({
+      command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE,
+      consumerAddress,
+      signature,
+      nonce,
+      bucketId,
+      fileName,
+      stream: Readable.from(body)
+    } as any)
+    expect(uploadRes.status.httpStatus).to.equal(200)
+    const uploaded = await streamToObject(uploadRes.stream as Readable)
+    expect(uploaded.name).to.equal(fileName)
+    expect(uploaded.size).to.equal(body.length)
+    await sleep(1000)
+    nonce = Date.now().toString()
+    messageHashBytes = createHashForSignature(
+      consumerAddress,
+      nonce,
+      PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES
+    )
+    signature = await safeSign(consumer, messageHashBytes)
+    const listRes = await new PersistentStorageListFilesHandler(oceanNode).handle({
+      command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES,
+      consumerAddress,
+      signature,
+      nonce,
+      bucketId,
+      authorization: undefined
+    } as any)
+
+    expect(listRes.status.httpStatus).to.equal(200)
+    const listed = await streamToObject(listRes.stream as Readable)
+    expect(listed).to.be.an('array')
+    expect(listed.some((f: { name: string }) => f.name === fileName)).to.equal(true)
+    await sleep(1000)
+    nonce = Date.now().toString()
+    messageHashBytes = createHashForSignature(
+      consumerAddress,
+      nonce,
+      PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE
+    )
+    signature = await safeSign(consumer, messageHashBytes)
+    const delRes = await new PersistentStorageDeleteFileHandler(oceanNode).handle({
+      command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE,
+      consumerAddress,
+      signature,
+      nonce,
+      chainId: 8996,
+      bucketId,
+      fileName,
+      authorization: undefined
+    } as any)
+
+    expect(delRes.status.httpStatus).to.equal(200)
+    await sleep(1000)
+    nonce = Date.now().toString()
+    messageHashBytes = createHashForSignature(
+      consumerAddress,
+      nonce,
+      PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES
+    )
+    signature = await safeSign(consumer, messageHashBytes)
+    const listAfterDel = await new PersistentStorageListFilesHandler(oceanNode).handle({
+      command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES,
+      consumerAddress,
+      signature,
+      nonce,
+      bucketId,
+      authorization: undefined
+    } as any)
+    expect(listAfterDel.status.httpStatus).to.equal(200)
+    const listedAfter = await streamToObject(listAfterDel.stream as Readable)
+    expect(listedAfter.some((f: { name: string }) => f.name === fileName)).to.equal(false)
+  })
+
+  it('should not create bucket when consumer is not on allow list', async () => {
+    const forbiddenConsumerAddress = await forbiddenConsumer.getAddress()
+    const nonce = Date.now().toString()
+    const messageHashBytes = createHashForSignature(
+      forbiddenConsumerAddress,
+      nonce,
+      PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET
+    )
+    const signature = await safeSign(forbiddenConsumer, messageHashBytes)
+
+    const res = await new PersistentStorageCreateBucketHandler(oceanNode).handle({
+      command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET,
+      consumerAddress: forbiddenConsumerAddress,
+      signature,
+      nonce,
+      accessLists: [],
+      authorization: undefined
+    } as any)
+
+    expect(res.status.httpStatus).to.equal(403)
+    expect(res.status.error).to.contain('not allowed')
+  })
+
+  it('should deny forbiddenConsumer for bucket operations when bucket has accessList', async () => {
+    // Create a bucket whose ACL allows only wallets[0..3]
+    const consumerAddress = await consumer.getAddress()
+    let nonce = Date.now().toString()
+    let messageHashBytes = createHashForSignature(
+      consumerAddress,
+      nonce,
+      PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET
+    )
+    let signature = await safeSign(consumer, messageHashBytes)
+
+    const createRes = await new PersistentStorageCreateBucketHandler(oceanNode).handle({
+      command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET,
+      consumerAddress,
+      signature,
+      nonce,
+      accessLists: [bucketAllowList],
+      authorization: undefined
+    } as any)
+
+    expect(createRes.status.httpStatus).to.equal(200)
+    const created = await streamToObject(createRes.stream as Readable)
+    const bucketId = created.bucketId as string
+
+    // Forbidden consumer tries to list files -> should fail
+    const forbiddenConsumerAddress = await forbiddenConsumer.getAddress()
+    nonce = Date.now().toString()
+    messageHashBytes = createHashForSignature(
+      forbiddenConsumerAddress,
+      nonce,
+      PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES
+    )
+    signature = await safeSign(forbiddenConsumer, messageHashBytes)
+    const listRes = await new PersistentStorageListFilesHandler(oceanNode).handle({
+      command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES,
+      consumerAddress: forbiddenConsumerAddress,
+      signature,
+      nonce,
+      bucketId,
+      authorization: undefined
+    } as any)
+    expect(listRes.status.httpStatus).to.equal(403)
+    expect(listRes.status.error).to.contain('not allowed')
+
+    // Forbidden consumer tries to upload -> should fail
+    nonce = Date.now().toString()
+    messageHashBytes = createHashForSignature(
+      forbiddenConsumerAddress,
+      nonce,
+      PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE
+    )
+    signature = await safeSign(forbiddenConsumer, messageHashBytes)
+    const uploadRes = await new PersistentStorageUploadFileHandler(oceanNode).handle({
+      command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE,
+      consumerAddress: forbiddenConsumerAddress,
+      signature,
+      nonce,
+      bucketId,
+      fileName: 'forbidden.txt',
+      stream: Readable.from(Buffer.from('nope')),
+      authorization: undefined
+    } as any)
+    expect(uploadRes.status.httpStatus).to.equal(403)
+    expect(uploadRes.status.error).to.contain('not allowed')
+  })
+
+  it('getBuckets returns buckets the consumer can access', async () => {
+    const consumerAddress = await consumer.getAddress()
+    await sleep(1000)
+    let nonce = Date.now()
+    let messageHashBytes = createHashForSignature(
+      consumerAddress,
+      nonce,
+      PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_BUCKETS
+    )
+    let signature = await safeSign(consumer, messageHashBytes)
+    const beforeCreate = await new PersistentStorageGetBucketsHandler(oceanNode).handle({
+      command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_BUCKETS,
+      consumerAddress,
+      signature,
+      nonce,
+      chainId: 8996,
+      owner: consumerAddress,
+      authorization: undefined
+    } as any)
+    expect(beforeCreate.status.httpStatus).to.equal(200)
+    const beforeList = await streamToObject(beforeCreate.stream as Readable)
+    expect(beforeList).to.be.an('array')
+    await sleep(1000)
+    nonce = Date.now()
+    messageHashBytes = createHashForSignature(
+      consumerAddress,
+      nonce,
+      PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET
+    )
+    signature = await safeSign(consumer, messageHashBytes)
+    const createRes = await new PersistentStorageCreateBucketHandler(oceanNode).handle({
+      command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET,
+      consumerAddress,
+      signature,
+      nonce,
+      accessLists: [],
+      authorization: undefined
+    } as any)
+    expect(createRes.status.httpStatus).to.equal(200)
+    const created = await streamToObject(createRes.stream as Readable)
+    const newBucketId = created.bucketId as string
+    await sleep(1000)
+    nonce = Date.now()
+    messageHashBytes = createHashForSignature(
+      consumerAddress,
+      nonce,
+      PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_BUCKETS
+    )
+    signature = await safeSign(consumer, messageHashBytes)
+    const afterCreate = await new PersistentStorageGetBucketsHandler(oceanNode).handle({
+      command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_BUCKETS,
+      consumerAddress,
+      signature,
+      nonce,
+      chainId: 8996,
+      owner: consumerAddress,
+      authorization: undefined
+    } as any)
+    expect(afterCreate.status.httpStatus).to.equal(200)
+    const afterList = await streamToObject(afterCreate.stream as Readable)
+    expect(afterList).to.be.an('array')
+    const found = afterList.find((b: { bucketId: string }) => b.bucketId === newBucketId)
+    expect(found).to.be.an('object')
+    expect(found.createdAt).to.be.a('number')
+    expect(getAddress(found.owner)).to.equal(getAddress(consumerAddress))
+    expect(found.accessLists).to.be.an('array')
+    expect(afterList.length).to.be.at.least(beforeList.length + 1)
+  })
+
+  it('create bucket validate fails when accessLists is missing', async () => {
+    const consumerAddress = await consumer.getAddress()
+    await sleep(1000)
+    const nonce = Date.now().toString()
+    const messageHashBytes = createHashForSignature(
+      consumerAddress,
+      nonce,
+      PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET
+    )
+    const signature = await safeSign(consumer, messageHashBytes)
+    const validation = await new PersistentStorageCreateBucketHandler(oceanNode).validate(
+      {
+        command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET,
+        consumerAddress,
+        signature,
+        nonce
+      } as any
+    )
+
+    expect(validation.valid).to.equal(false)
+    expect(validation.reason).to.contain('accessLists')
+  })
+
+  it('returns error when persistent storage is disabled', async () => {
+    const disabledConfig = {
+      ...config,
+      persistentStorage: {
+        enabled: false,
+        type: 'localfs' as const,
+        accessLists: [] as AccessList[],
+        options: { folder: psRoot }
+      }
+    }
+    const nodeDisabled = await OceanNode.getInstance(
+      disabledConfig,
+      database,
+      undefined,
+      undefined,
+      undefined,
+      undefined,
+      undefined,
+      true
+    )
+
+    const consumerAddress = await consumer.getAddress()
+    await sleep(1000)
+    const nonce = Date.now().toString()
+    const messageHashBytes = createHashForSignature(
+      consumerAddress,
+      nonce,
+      PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET
+    )
+    const signature = await safeSign(consumer, messageHashBytes)
+
+    const res = await new PersistentStorageCreateBucketHandler(nodeDisabled).handle({
+      command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET,
+      consumerAddress,
+      signature,
+      nonce,
+      accessLists: [],
+      authorization: undefined
+    } as any)
+
+    expect(res.status.httpStatus).to.equal(500)
+    expect(res.status.error).to.match(/not configured|disabled/i)
+  })
+})
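The integration test above exercises the signed-request pattern every persistent storage command uses: a fresh nonce, a hash over `(consumerAddress, nonce, command)` and a consumer signature. The sketch below extracts that pattern into a small payload builder; the relative import paths are written as they would appear from the test directory and are illustrative, not part of the PR.

```ts
// Sketch of the authorisation pattern used by the tests above.
import { Signer } from 'ethers'
import { PROTOCOL_COMMANDS } from '../../utils/constants.js'
import { createHashForSignature, safeSign } from '../utils/signature.js'

export async function buildCreateBucketPayload(consumer: Signer) {
  const consumerAddress = await consumer.getAddress()
  const nonce = Date.now().toString()
  // Hash binds the consumer address, the nonce and the command being invoked
  const hash = createHashForSignature(
    consumerAddress,
    nonce,
    PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET
  )
  const signature = await safeSign(consumer, hash)
  return {
    command: PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET,
    consumerAddress,
    signature,
    nonce,
    accessLists: [] // per-chain access list addresses; empty means no extra ACL on the bucket
  }
}
```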
diff --git a/src/test/utils/contracts.ts b/src/test/utils/contracts.ts
index ba1c4112b..4dc7bbfa9 100644
--- a/src/test/utils/contracts.ts
+++ b/src/test/utils/contracts.ts
@@ -55,7 +55,6 @@ export async function deployAccessListContract(
   if (!nameAccessList || !symbolAccessList) {
     throw new Error(`Access list symbol and name are required`)
   }
-
   const contract = getContract(contractFactoryAddress, contractFactoryAbi, signer)

   try {
@@ -125,7 +124,12 @@ export async function deployAndGetAccessListConfig(
       await wallets[2].getAddress(),
       await wallets[3].getAddress()
     ],
-    ['https://oceanprotocol.com/nft/']
+    [
+      'https://oceanprotocol.com/nft/',
+      'https://oceanprotocol.com/nft/',
+      'https://oceanprotocol.com/nft/',
+      'https://oceanprotocol.com/nft/'
+    ]
   )

   if (!txAddress) {
diff --git a/src/test/utils/utils.ts b/src/test/utils/utils.ts
index 8376d6ae1..b3bf0917b 100644
--- a/src/test/utils/utils.ts
+++ b/src/test/utils/utils.ts
@@ -4,7 +4,7 @@ import { fileURLToPath } from 'url'
 import { DB_TYPES, ENVIRONMENT_VARIABLES, EnvVariable } from '../../utils/constants.js'
 import { CONFIG_LOGGER } from '../../utils/logging/common.js'
 import { RPCS } from '../../@types/blockchain.js'
-import { getConfiguration } from '../../utils/config.js'
+import { getConfiguration } from '../../utils/config/builder.js'

 export const DEFAULT_TEST_TIMEOUT = 20000 // 20 secs MAX
 // __dirname and __filename are not defined in ES module scope
@@ -164,3 +164,5 @@ export function isRunningContinousIntegrationEnv(): boolean {
 export const SELECTED_RUN_DATABASE =
   new Date().getTime() % 2 === 0 ? DB_TYPES.ELASTIC_SEARCH : DB_TYPES.TYPESENSE
 CONFIG_LOGGER.debug(`SELECTED_RUN_DATABASE: ${SELECTED_RUN_DATABASE}`)
+
+export const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
diff --git a/src/utils/accessList.ts b/src/utils/accessList.ts
index fbecbe462..21d7671f3 100644
--- a/src/utils/accessList.ts
+++ b/src/utils/accessList.ts
@@ -1,6 +1,8 @@
 import AccessListJson from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessList.sol/AccessList.json' with { type: 'json' }
 import { ethers, Signer } from 'ethers'
 import { CORE_LOGGER } from './logging/common.js'
+import { AccessList } from '../@types/AccessList.js'
+import { OceanNode } from '../OceanNode.js'

 /**
  * @param accessList the access list contract address
  * @param chainId the chain id to check
@@ -9,7 +11,7 @@ import { CORE_LOGGER } from './logging/common.js'
  * @param signer signer for the contract part
  * @returns true if the account has balanceOf > 0 OR if the accessList is empty OR does not contain info for this chain, false otherwise
  */
-export async function checkAddressOnAccessList(
+export async function checkAddressOnAccessListWithSigner(
   accessListContractAddress: string,
   addressToCheck: string,
   signer: Signer
@@ -40,3 +42,49 @@
     return false
   }
 }
+
+export async function checkAddressOnAccessList(
+  consumerAddress: string,
+  access: AccessList[],
+  oceanNode: OceanNode
+): Promise<boolean> {
+  if (!access || access.length === 0) {
+    return false
+  }
+  const config = oceanNode.getConfig()
+  const { supportedNetworks } = config
+  for (const accessListMap of access) {
+    if (!accessListMap) continue
+    for (const chain of Object.keys(accessListMap)) {
+      const { chainId } = supportedNetworks[chain]
+      try {
+        const blockchain = oceanNode.getBlockchain(chainId)
+        if (!blockchain) {
+          CORE_LOGGER.logMessage(
+            `Blockchain instance not available for chain ${chainId}, skipping access list check`,
+            true
+          )
+          continue
+        }
+        const signer = await blockchain.getSigner()
+        for (const accessListAddress of accessListMap[chain]) {
+          const hasAccess = await checkAddressOnAccessListWithSigner(
+            accessListAddress,
+            consumerAddress,
+            signer
+          )
+          if (hasAccess) {
+            return true
+          }
+        }
+      } catch (error) {
+        CORE_LOGGER.logMessage(
+          `Failed to check access lists on chain ${chain}: ${error.message}`,
+          true
+        )
+      }
+    }
+  }
+
+  return false
+}
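The renamed `checkAddressOnAccessListWithSigner` keeps the original per-contract check, while the new node-level `checkAddressOnAccessList` walks an `AccessList[]` (chain id mapped to contract addresses) and resolves a signer per chain from the node's blockchain registry. A minimal caller-side sketch, assuming a handler that already holds the node instance and a bucket's stored access lists:

```ts
// Sketch: invoking the node-level access list check added above.
// Example list shape: [{ '8996': ['0xAccessListContract...'] }].
// Note: with an empty list the helper returns false, so callers typically
// also allow the bucket owner through a separate ownership check.
import { checkAddressOnAccessList } from '../utils/accessList.js'
import type { AccessList } from '../@types/AccessList.js'
import type { OceanNode } from '../OceanNode.js'

export async function isConsumerAllowed(
  node: OceanNode,
  consumerAddress: string,
  bucketAccessLists: AccessList[]
): Promise<boolean> {
  return checkAddressOnAccessList(consumerAddress, bucketAccessLists, node)
}
```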
diff --git a/src/utils/address.ts b/src/utils/address.ts
index 9760ed3b5..624438f28 100644
--- a/src/utils/address.ts
+++ b/src/utils/address.ts
@@ -1,7 +1,7 @@
 import fs from 'fs'
 import addresses from '@oceanprotocol/contracts/addresses/address.json' with { type: 'json' }
 import { CORE_LOGGER } from './logging/common.js'
-import { isDefined } from './index.js'
+import { isDefined } from './util.js'

 /**
  * Get the artifacts address from the address.json file
diff --git a/src/utils/config/builder.ts b/src/utils/config/builder.ts
index 7a14e8cee..f55906e81 100644
--- a/src/utils/config/builder.ts
+++ b/src/utils/config/builder.ts
@@ -9,8 +9,7 @@ import { C2DClusterType } from '../../@types/C2D/C2D.js'
 import fs from 'fs'
 import os from 'os'
 import path from 'path'
-// import { hexStringToByteArray, computeCodebaseHash } from '../index.js'
-import { computeCodebaseHash } from '../index.js'
+import { computeCodebaseHash } from '../attestation.js'

 import {
   getOceanArtifactsAdresses,
diff --git a/src/utils/config/schemas.ts b/src/utils/config/schemas.ts
index c60703ce2..cd23d5905 100644
--- a/src/utils/config/schemas.ts
+++ b/src/utils/config/schemas.ts
@@ -84,6 +84,64 @@
 export const OceanNodeDBConfigSchema = z.object({
   dbType: z.string().nullable()
 })
+export const PersistentStorageConfigSchema = z
+  .object({
+    enabled: z.boolean().optional().default(false),
+    type: z.enum(['localfs', 's3']).optional().default('localfs'),
+    accessLists: jsonFromString(z.array(z.record(z.string(), z.array(z.string()))))
+      .optional()
+      .default([]),
+    options: z.any().optional()
+  })
+  .superRefine((data, ctx) => {
+    if (!data.enabled) return
+
+    if (data.type === 'localfs') {
+      if (!data.options || typeof data.options !== 'object') {
+        ctx.addIssue({
+          code: z.ZodIssueCode.custom,
+          message: 'persistentStorage.options must be an object for localfs',
+          path: ['options']
+        })
+        return
+      }
+      if (
+        typeof (data.options as any).folder !== 'string' ||
+        !(data.options as any).folder
+      ) {
+        ctx.addIssue({
+          code: z.ZodIssueCode.custom,
+          message: 'persistentStorage.options.folder is required for localfs',
+          path: ['options', 'folder']
+        })
+      }
+    }
+
+    if (data.type === 's3') {
+      if (!data.options || typeof data.options !== 'object') {
+        ctx.addIssue({
+          code: z.ZodIssueCode.custom,
+          message: 'persistentStorage.options must be an object for s3',
+          path: ['options']
+        })
+        return
+      }
+      const required = ['endpoint', 'objectKey', 'accessKeyId', 'secretAccessKey']
+      for (const key of required) {
+        if (
+          typeof (data.options as any)[key] !== 'string' ||
+          !(data.options as any)[key]
+        ) {
+          ctx.addIssue({
+            code: z.ZodIssueCode.custom,
+            message: `persistentStorage.options.${key} is required for s3`,
+            path: ['options', key]
+          })
+        }
+      }
+    }
+  })
+
 export const DockerRegistryAuthSchema = z
   .object({
     username: z.string().optional(),
@@ -332,6 +390,7 @@
   DB_PASSWORD: z.string().optional(),
   DB_TYPE: z.string().optional(),
   dbConfig: OceanNodeDBConfigSchema.optional(),
+  persistentStorage: PersistentStorageConfigSchema.optional(),
   FEE_AMOUNT: z.string().optional(),
   FEE_TOKENS: z.string().optional(),
diff --git a/src/utils/constants.ts b/src/utils/constants.ts
index 756e11b1e..63fb34a4c 100644
--- a/src/utils/constants.ts
+++ b/src/utils/constants.ts
@@ -38,7 +38,12 @@ export const PROTOCOL_COMMANDS = {
   FETCH_CONFIG: 'fetchConfig',
   PUSH_CONFIG: 'pushConfig',
   GET_LOGS: 'getLogs',
-  JOBS: 'jobs'
+  JOBS: 'jobs',
+  PERSISTENT_STORAGE_CREATE_BUCKET: 'persistentStorageCreateBucket',
+  PERSISTENT_STORAGE_GET_BUCKETS: 'persistentStorageGetBuckets',
+  PERSISTENT_STORAGE_LIST_FILES: 'persistentStorageListFiles',
+  PERSISTENT_STORAGE_UPLOAD_FILE: 'persistentStorageUploadFile',
+  PERSISTENT_STORAGE_DELETE_FILE: 'persistentStorageDeleteFile'
 }
 // more visible, keep then close to make sure we always update both
 export const SUPPORTED_PROTOCOL_COMMANDS: string[] = [
@@ -78,7 +83,12 @@
   PROTOCOL_COMMANDS.FETCH_CONFIG,
   PROTOCOL_COMMANDS.PUSH_CONFIG,
   PROTOCOL_COMMANDS.GET_LOGS,
-  PROTOCOL_COMMANDS.JOBS
+  PROTOCOL_COMMANDS.JOBS,
+  PROTOCOL_COMMANDS.PERSISTENT_STORAGE_CREATE_BUCKET,
+  PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_BUCKETS,
+  PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES,
+  PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE,
+  PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE
 ]

 export const MetadataStates = {
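`PersistentStorageConfigSchema` only validates a shape; how that object reaches the schema (environment variable name or config file key) is not shown in this excerpt. The sketch below gives values that should satisfy the schema's `superRefine` checks for each backend; the concrete paths and credentials are placeholders.

```ts
// Sketch: objects that should validate against PersistentStorageConfigSchema above.
const localFsConfig = {
  enabled: true,
  type: 'localfs' as const,
  // chain id (as a string) mapped to access list contract addresses
  accessLists: [{ '8996': ['0x0000000000000000000000000000000000000001'] }],
  options: { folder: '/var/lib/ocean-node/persistent-storage' }
}

const s3Config = {
  enabled: true,
  type: 's3' as const,
  accessLists: [],
  // endpoint, objectKey, accessKeyId and secretAccessKey are all required for s3
  options: {
    endpoint: 'https://s3.example.com',
    objectKey: 'ocean-node',
    accessKeyId: 'AKIA...',
    secretAccessKey: '...'
  }
}

export { localFsConfig, s3Config }
```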
diff --git a/src/utils/credentials.ts b/src/utils/credentials.ts
index d54692b4f..b21c25f21 100644
--- a/src/utils/credentials.ts
+++ b/src/utils/credentials.ts
@@ -3,7 +3,7 @@ import { AccessListContract } from '../@types/OceanNode.js'
 import { CORE_LOGGER } from './logging/common.js'
 import { Credential, Credentials, MATCH_RULES } from '@oceanprotocol/ddo-js'
 import { CREDENTIALS_TYPES } from '../@types/DDO/Credentials.js'
-import { checkAddressOnAccessList } from './accessList.js'
+import { checkAddressOnAccessListWithSigner } from './accessList.js'
 import { isDefined } from './util.js'

 /**
@@ -203,7 +203,7 @@

   try {
     // Check if the consumer address has tokens in the access list contract
-    const hasAccess = await checkAddressOnAccessList(
+    const hasAccess = await checkAddressOnAccessListWithSigner(
       accessListCredential.accessList,
       consumerAddress,
       signer
@@ -252,7 +252,7 @@ export async function checkCredentialOnAccessList(
     if (chainsListed.length > 0 && chainsListed.includes(chainId)) {
       let isAuthorized = false
       for (const accessListAddress of accessList[chainId]) {
-        const result = await checkAddressOnAccessList(
+        const result = await checkAddressOnAccessListWithSigner(
           accessListAddress,
           addressToCheck,
           signer