From f81d3483f1d494f8c8f6695f50a314fc636d4543 Mon Sep 17 00:00:00 2001 From: Vincent Weevers Date: Fri, 27 Dec 2024 14:41:03 +0100 Subject: [PATCH] Implement explicit snapshots (#93) See https://github.com/Level/community/issues/118. TLDR: ```js await db.put('example', 'before') const snapshot = db.snapshot() await db.put('example', 'after') await db.get('example', { snapshot })) // Returns 'before' await snapshot.close() ``` Category: addition --- README.md | 59 ++-- abstract-iterator.js | 12 + abstract-level.js | 80 +++++- abstract-snapshot.js | 78 ++++++ index.d.ts | 4 + index.js | 1 + lib/abstract-sublevel.js | 4 + package.json | 3 +- test/async-iterator-test.js | 2 +- test/index.js | 6 +- test/iterator-explicit-snapshot-test.js | 356 ++++++++++++++++++++++++ test/iterator-seek-test.js | 2 +- test/traits/closed.js | 4 +- test/traits/open.js | 12 +- test/util.js | 40 ++- types/abstract-level.d.ts | 12 + types/abstract-snapshot.d.ts | 23 ++ 17 files changed, 653 insertions(+), 45 deletions(-) create mode 100644 abstract-snapshot.js create mode 100644 test/iterator-explicit-snapshot-test.js create mode 100644 types/abstract-snapshot.d.ts diff --git a/README.md b/README.md index f4739e8..e563c4f 100644 --- a/README.md +++ b/README.md @@ -127,7 +127,7 @@ Get a value from the database by `key`. The optional `options` object may contai - `keyEncoding`: custom key encoding for this operation, used to encode the `key`. - `valueEncoding`: custom value encoding for this operation, used to decode the value. -- `snapshot`: explicit [snapshot](#snapshot--dbsnapshot) to read from. If no `snapshot` is provided and `db.supports.implicitSnapshots` is true, the database will create its own internal snapshot for this operation. +- `snapshot`: explicit [snapshot](#snapshot--dbsnapshotoptions) to read from. If no `snapshot` is provided and `db.supports.implicitSnapshots` is true, the database will create its own internal snapshot for this operation. Returns a promise for the value. If the `key` was not found then the value will be `undefined`. @@ -137,7 +137,7 @@ Get multiple values from the database by an array of `keys`. The optional `optio - `keyEncoding`: custom key encoding for this operation, used to encode the `keys`. - `valueEncoding`: custom value encoding for this operation, used to decode values. -- `snapshot`: explicit [snapshot](#snapshot--dbsnapshot) to read from. If no `snapshot` is provided and `db.supports.implicitSnapshots` is true, the database will create its own internal snapshot for this operation. +- `snapshot`: explicit [snapshot](#snapshot--dbsnapshotoptions) to read from. If no `snapshot` is provided and `db.supports.implicitSnapshots` is true, the database will create its own internal snapshot for this operation. Returns a promise for an array of values with the same order as `keys`. If a key was not found, the relevant value will be `undefined`. @@ -233,7 +233,7 @@ The `gte` and `lte` range options take precedence over `gt` and `lt` respectivel - `keyEncoding`: custom key encoding for this iterator, used to encode range options, to encode `seek()` targets and to decode keys. - `valueEncoding`: custom value encoding for this iterator, used to decode values. - `signal`: an [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) to [abort read operations on the iterator](#aborting-iterators). -- `snapshot`: explicit [snapshot](#snapshot--dbsnapshot) for the iterator to read from. If no `snapshot` is provided and `db.supports.implicitSnapshots` is true, the database will create its own internal snapshot before returning an iterator. +- `snapshot`: explicit [snapshot](#snapshot--dbsnapshotoptions) for the iterator to read from. If no `snapshot` is provided and `db.supports.implicitSnapshots` is true, the database will create its own internal snapshot before returning an iterator. Lastly, an implementation is free to add its own options. @@ -276,7 +276,7 @@ Delete all entries or a range. Not guaranteed to be atomic. Returns a promise. A - `reverse` (boolean, default: `false`): delete entries in reverse order. Only effective in combination with `limit`, to delete the last N entries. - `limit` (number, default: `Infinity`): limit the number of entries to be deleted. This number represents a _maximum_ number of entries and will not be reached if the end of the range is reached first. A value of `Infinity` or `-1` means there is no limit. When `reverse` is true the entries with the highest keys will be deleted instead of the lowest keys. - `keyEncoding`: custom key encoding for this operation, used to encode range options. -- `snapshot`: explicit [snapshot](#snapshot--dbsnapshot) to read from, such that entries not present in the snapshot will not be deleted. If no `snapshot` is provided and `db.supports.implicitSnapshots` is true, the database may create its own internal snapshot but (unlike on other methods) this is currently not a hard requirement for implementations. +- `snapshot`: explicit [snapshot](#snapshot--dbsnapshotoptions) to read from, such that entries not present in the snapshot will not be deleted. If no `snapshot` is provided and `db.supports.implicitSnapshots` is true, the database may create its own internal snapshot but (unlike on other methods) this is currently not a hard requirement for implementations. The `gte` and `lte` range options take precedence over `gt` and `lt` respectively. If no options are provided, all entries will be deleted. @@ -383,11 +383,15 @@ console.log(nested.prefixKey('a', 'utf8')) // '!example!!nested!a' console.log(nested.prefixKey('a', 'utf8', true)) // '!nested!a' ``` -### `snapshot = db.snapshot()` +### `snapshot = db.snapshot(options)` -**This is an experimental API and not widely supported at the time of writing ([Level/community#118](https://github.com/Level/community/issues/118)).** +**This is an experimental API ([Level/community#118](https://github.com/Level/community/issues/118)).** -Create an explicit [snapshot](#snapshot). Throws a [`LEVEL_NOT_SUPPORTED`](#level_not_supported) error if `db.supports.explicitSnapshots` is not true. For details, see [Reading From Snapshots](#reading-from-snapshots). +Create an explicit [snapshot](#snapshot). Throws a [`LEVEL_NOT_SUPPORTED`](#level_not_supported) error if `db.supports.explicitSnapshots` is false. For details, see [Reading From Snapshots](#reading-from-snapshots). + +There are currently no options but specific implementations may add their own. + +Don't forget to call `snapshot.close()` when done. ### `db.supports` @@ -705,15 +709,21 @@ console.log(foo.path(true)) // ['example', 'nested', 'foo'] ### `snapshot` -#### `snapshot.close()` +#### `snapshot.ref()` -Free up underlying resources. Be sure to call this when the snapshot is no longer needed, because snapshots may cause the database to temporarily pause internal storage optimizations. Returns a promise. Closing the snapshot is an idempotent operation, such that calling `snapshot.close()` more than once is allowed and makes no difference. +Increment reference count, to register work that should delay closing until `snapshot.unref()` is called an equal amount of times. The promise that will be returned by `snapshot.close()` will not resolve until the reference count returns to 0. This prevents prematurely closing underlying resources while the snapshot is in use. + +It is normally not necessary to call `snapshot.ref()` and `snapshot.unref()` because builtin database methods automatically do. -After `snapshot.close()` has been called, no further operations are allowed. For example, `db.get(key, { snapshot })` will yield an error with code [`LEVEL_SNAPSHOT_NOT_OPEN`](#level_snapshot_not_open). Any unclosed iterators (that use this snapshot) will be closed by `snapshot.close()` and can then no longer be used. +#### `snapshot.unref()` -#### `snapshot.db` +Decrement reference count, to indicate that the work has finished. + +#### `snapshot.close()` + +Free up underlying resources. Be sure to call this when the snapshot is no longer needed, because snapshots may cause the database to temporarily pause internal storage optimizations. Returns a promise. Closing the snapshot is an idempotent operation, such that calling `snapshot.close()` more than once is allowed and makes no difference. -A reference to the database that created this snapshot. +After `snapshot.close()` has been called, no further operations are allowed. For example, `db.get(key, { snapshot })` will throw an error with code [`LEVEL_SNAPSHOT_NOT_OPEN`](#level_snapshot_not_open). ### Encodings @@ -950,10 +960,10 @@ Removing this concern (if necessary) must be done on an application-level. For e ### Reading From Snapshots -A snapshot is a lightweight "token" that represents the version of a database at a particular point in time. This allows for reading data without seeing subsequent writes made on the database. It comes in two forms: +A snapshot is a lightweight "token" that represents a version of a database at a particular point in time. This allows for reading data without seeing subsequent writes made on the database. It comes in two forms: 1. Implicit snapshots: created internally by the database and not visible to the outside world. -2. Explicit snapshots: created with `snapshot = db.snapshot()`. Because it acts as a token, `snapshot` has no methods of its own besides `snapshot.close()`. Instead the snapshot is to be passed to database (or [sublevel](#sublevel)) methods like `db.iterator()`. +2. Explicit snapshots: created with `snapshot = db.snapshot()`. Because it acts as a token, `snapshot` has no read methods of its own. Instead the snapshot is to be passed to database methods like `db.get()` and `db.iterator()`. This also works on sublevels. Use explicit snapshots wisely, because their lifetime must be managed manually. Implicit snapshots are typically more convenient and possibly more performant because they can handled natively and have their lifetime limited by the surrounding operation. That said, explicit snapshots can be useful to make multiple read operations that require a shared, consistent view of the data. @@ -1004,6 +1014,7 @@ await db.put('example', 1) const snapshot = db.snapshot() db.put('example', 2) await db.get('example', { snapshot })) // Yields 1 (always) +await snapshot.close() ``` The main use case for explicit snapshots is retrieving data from an index. @@ -1625,19 +1636,19 @@ class ExampleSublevel extends AbstractSublevel { } ``` -### `snapshot = db._snapshot()` +### `snapshot = db._snapshot(options)` + +Create a snapshot. The `options` argument is guaranteed to be an object. There are currently no options but implementations may add their own. -The default `_snapshot()` throws a [`LEVEL_NOT_SUPPORTED`](#errors) error. To implement this method, extend `AbstractSnapshot`, return an instance of this class in an overridden `_snapshot()` method and set `manifest.explicitSnapshots` to `true`: +The default `_snapshot()` throws a [`LEVEL_NOT_SUPPORTED`](#level_not_supported) error. To implement this method, extend `AbstractSnapshot`, return an instance of this class in an overridden `_snapshot()` method and set `manifest.explicitSnapshots` to `true`: ```js const { AbstractSnapshot } = require('abstract-level') class ExampleSnapshot extends AbstractSnapshot { - constructor (db) { - super(db) + constructor (options) { + super(options) } - - // .. } class ExampleLevel extends AbstractLevel { @@ -1650,8 +1661,8 @@ class ExampleLevel extends AbstractLevel { super(manifest, options) } - _snapshot () { - return new ExampleSnapshot(this) + _snapshot (options) { + return new ExampleSnapshot(options) } } ``` @@ -1762,11 +1773,11 @@ The default `_close()` returns a resolved promise. Overriding is optional. ### `snapshot = new AbstractSnapshot(db)` -The first argument to this constructor must be an instance of the relevant `AbstractLevel` implementation. The constructor will set `snapshot.db` which ensures that `db` will not be garbage collected in case there are no other references to it. +The first argument to this constructor must be an instance of the relevant `AbstractLevel` implementation. #### `snapshot._close()` -Free up underlying resources. This method is guaranteed to only be called once. Must return a promise. +Free up underlying resources. This method is guaranteed to only be called once and will not be called while read operations like `db._get()` are inflight. Must return a promise. The default `_close()` returns a resolved promise. Overriding is optional. diff --git a/abstract-iterator.js b/abstract-iterator.js index 2eca323..fcac462 100644 --- a/abstract-iterator.js +++ b/abstract-iterator.js @@ -18,6 +18,7 @@ const kValues = Symbol('values') const kLimit = Symbol('limit') const kCount = Symbol('count') const kEnded = Symbol('ended') +const kSnapshot = Symbol('snapshot') // This class is an internal utility for common functionality between AbstractIterator, // AbstractKeyIterator and AbstractValueIterator. It's not exported. @@ -40,6 +41,7 @@ class CommonIterator { this[kLimit] = Number.isInteger(options.limit) && options.limit >= 0 ? options.limit : Infinity this[kCount] = 0 this[kSignal] = options.signal != null ? options.signal : null + this[kSnapshot] = options.snapshot != null ? options.snapshot : null // Ending means reaching the natural end of the data and (unlike closing) that can // be reset by seek(), unless the limit was reached. @@ -363,6 +365,11 @@ const startWork = function (iterator) { } iterator[kWorking] = true + + // Keep snapshot open during operation + if (iterator[kSnapshot] !== null) { + iterator[kSnapshot].ref() + } } const endWork = function (iterator) { @@ -371,6 +378,11 @@ const endWork = function (iterator) { if (iterator[kPendingClose] !== null) { iterator[kPendingClose]() } + + // Release snapshot + if (iterator[kSnapshot] !== null) { + iterator[kSnapshot].unref() + } } const privateClose = async function (iterator) { diff --git a/abstract-level.js b/abstract-level.js index 8703e5c..f219440 100644 --- a/abstract-level.js +++ b/abstract-level.js @@ -51,12 +51,16 @@ class AbstractLevel extends EventEmitter { this[kStatusChange] = null this[kStatusLocked] = false + // Aliased for backwards compatibility + const implicitSnapshots = manifest.snapshots !== false && + manifest.implicitSnapshots !== false + this.hooks = new DatabaseHooks() this.supports = supports(manifest, { deferredOpen: true, // TODO (next major): add seek - snapshots: manifest.snapshots !== false, + implicitSnapshots, permanence: manifest.permanence !== false, encodings: manifest.encodings || {}, @@ -107,6 +111,9 @@ class AbstractLevel extends EventEmitter { }), keyFormat: Object.freeze({ keyEncoding: this[kKeyEncoding].format + }), + owner: Object.freeze({ + owner: this }) } @@ -323,6 +330,7 @@ class AbstractLevel extends EventEmitter { const err = this._checkKey(key) if (err) throw err + const snapshot = options.snapshot != null ? options.snapshot : null const keyEncoding = this.keyEncoding(options.keyEncoding) const valueEncoding = this.valueEncoding(options.valueEncoding) const keyFormat = keyEncoding.format @@ -335,7 +343,23 @@ class AbstractLevel extends EventEmitter { } const encodedKey = keyEncoding.encode(key) - const value = await this._get(this.prefixKey(encodedKey, keyFormat, true), options) + const mappedKey = this.prefixKey(encodedKey, keyFormat, true) + + // Keep snapshot open during operation + if (snapshot !== null) { + snapshot.ref() + } + + let value + + try { + value = await this._get(mappedKey, options) + } finally { + // Release snapshot + if (snapshot !== null) { + snapshot.unref() + } + } try { return value === undefined ? value : valueEncoding.decode(value) @@ -368,6 +392,7 @@ class AbstractLevel extends EventEmitter { return [] } + const snapshot = options.snapshot != null ? options.snapshot : null const keyEncoding = this.keyEncoding(options.keyEncoding) const valueEncoding = this.valueEncoding(options.valueEncoding) const keyFormat = keyEncoding.format @@ -388,7 +413,21 @@ class AbstractLevel extends EventEmitter { mappedKeys[i] = this.prefixKey(keyEncoding.encode(key), keyFormat, true) } - const values = await this._getMany(mappedKeys, options) + // Keep snapshot open during operation + if (snapshot !== null) { + snapshot.ref() + } + + let values + + try { + values = await this._getMany(mappedKeys, options) + } finally { + // Release snapshot + if (snapshot !== null) { + snapshot.unref() + } + } try { for (let i = 0; i < values.length; i++) { @@ -716,12 +755,26 @@ class AbstractLevel extends EventEmitter { const original = options const keyEncoding = this.keyEncoding(options.keyEncoding) + const snapshot = options.snapshot != null ? options.snapshot : null options = rangeOptions(options, keyEncoding) options.keyEncoding = keyEncoding.format if (options.limit !== 0) { - await this._clear(options) + // Keep snapshot open during operation + if (snapshot !== null) { + snapshot.ref() + } + + try { + await this._clear(options) + } finally { + // Release snapshot + if (snapshot !== null) { + snapshot.unref() + } + } + this.emit('clear', original) } } @@ -809,6 +862,25 @@ class AbstractLevel extends EventEmitter { return new DefaultValueIterator(this, options) } + snapshot (options) { + assertOpen(this) + + // Owner is an undocumented option explained in AbstractSnapshot + if (typeof options !== 'object' || options === null) { + options = this[kDefaultOptions].owner + } else if (options.owner == null) { + options = { ...options, owner: this } + } + + return this._snapshot(options) + } + + _snapshot (options) { + throw new ModuleError('Database does not support explicit snapshots', { + code: 'LEVEL_NOT_SUPPORTED' + }) + } + defer (fn, options) { if (typeof fn !== 'function') { throw new TypeError('The first argument must be a function') diff --git a/abstract-snapshot.js b/abstract-snapshot.js new file mode 100644 index 0000000..2ca83bc --- /dev/null +++ b/abstract-snapshot.js @@ -0,0 +1,78 @@ +'use strict' + +const ModuleError = require('module-error') +const { noop } = require('./lib/common') + +class AbstractSnapshot { + #open = true + #referenceCount = 0 + #pendingClose = null + #closePromise = null + #owner + + constructor (options) { + // Defining this as an option gives sublevels the opportunity to create a snapshot + // via their parent database but still designate themselves as the "owner", which + // just means which database will close the snapshot upon db.close(). This ensures + // that the API of AbstractSublevel is symmetrical to AbstractLevel. + const owner = options.owner + + if (typeof owner !== 'object' || owner === null) { + const hint = owner === null ? 'null' : typeof owner + throw new TypeError(`Owner must be an abstract-level database, received ${hint}`) + } + + // Also ensures this db will not be garbage collected + this.#owner = owner + this.#owner.attachResource(this) + } + + ref () { + if (!this.#open) { + throw new ModuleError('Snapshot is not open: cannot use snapshot after close()', { + code: 'LEVEL_SNAPSHOT_NOT_OPEN' + }) + } + + this.#referenceCount++ + } + + unref () { + if (--this.#referenceCount === 0 && this.#pendingClose !== null) { + this.#pendingClose() + } + } + + async close () { + if (this.#closePromise !== null) { + // First caller of close() is responsible for error + return this.#closePromise.catch(noop) + } + + this.#open = false + + // Wrap to avoid race issues on recursive calls + this.#closePromise = new Promise((resolve, reject) => { + this.#pendingClose = () => { + this.#pendingClose = null + privateClose(this, this.#owner).then(resolve, reject) + } + }) + + // If working we'll delay closing, but still handle the close error (if any) here + if (this.#referenceCount === 0) { + this.#pendingClose() + } + + return this.#closePromise + } + + async _close () {} +} + +const privateClose = async function (snapshot, owner) { + await snapshot._close() + owner.detachResource(snapshot) +} + +exports.AbstractSnapshot = AbstractSnapshot diff --git a/index.d.ts b/index.d.ts index 4277657..1b308fb 100644 --- a/index.d.ts +++ b/index.d.ts @@ -38,4 +38,8 @@ export { AbstractSublevelOptions } from './types/abstract-sublevel' +export { + AbstractSnapshot +} from './types/abstract-snapshot' + export * as Transcoder from 'level-transcoder' diff --git a/index.js b/index.js index cf5dee4..753269b 100644 --- a/index.js +++ b/index.js @@ -6,3 +6,4 @@ exports.AbstractIterator = require('./abstract-iterator').AbstractIterator exports.AbstractKeyIterator = require('./abstract-iterator').AbstractKeyIterator exports.AbstractValueIterator = require('./abstract-iterator').AbstractValueIterator exports.AbstractChainedBatch = require('./abstract-chained-batch').AbstractChainedBatch +exports.AbstractSnapshot = require('./abstract-snapshot').AbstractSnapshot diff --git a/lib/abstract-sublevel.js b/lib/abstract-sublevel.js index fa75014..4b40893 100644 --- a/lib/abstract-sublevel.js +++ b/lib/abstract-sublevel.js @@ -182,6 +182,10 @@ module.exports = function ({ AbstractLevel }) { const iterator = this[kRoot].values(options) return new AbstractSublevelValueIterator(this, options, iterator) } + + _snapshot (options) { + return this[kRoot].snapshot(options) + } } return { AbstractSublevel } diff --git a/package.json b/package.json index bf070b8..e76d4e3 100644 --- a/package.json +++ b/package.json @@ -16,6 +16,7 @@ "abstract-chained-batch.js", "abstract-iterator.js", "abstract-level.js", + "abstract-snapshot.js", "index.js", "index.d.ts", "lib", @@ -27,7 +28,7 @@ "dependencies": { "buffer": "^6.0.3", "is-buffer": "^2.0.5", - "level-supports": "^6.0.0", + "level-supports": "^6.1.1", "level-transcoder": "^1.0.1", "maybe-combine-errors": "^1.0.0", "module-error": "^1.0.1" diff --git a/test/async-iterator-test.js b/test/async-iterator-test.js index 57bce72..66111b9 100644 --- a/test/async-iterator-test.js +++ b/test/async-iterator-test.js @@ -53,7 +53,7 @@ exports.asyncIterator = function (test, testCommon) { await db.close() }) - testCommon.supports.snapshots && test(`for await...of ${mode}() (deferred, with snapshot)`, async function (t) { + testCommon.supports.implicitSnapshots && test(`for await...of ${mode}() (deferred, with snapshot)`, async function (t) { t.plan(2) const db = testCommon.factory() diff --git a/test/index.js b/test/index.js index 9f6ac2b..f3256a2 100644 --- a/test/index.js +++ b/test/index.js @@ -43,12 +43,16 @@ function suite (options) { require('./iterator-seek-test').all(test, testCommon) } - if (testCommon.supports.snapshots) { + if (testCommon.supports.implicitSnapshots) { require('./iterator-snapshot-test').all(test, testCommon) } else { require('./iterator-no-snapshot-test').all(test, testCommon) } + if (testCommon.supports.explicitSnapshots) { + require('./iterator-explicit-snapshot-test').all(test, testCommon) + } + require('./clear-test').all(test, testCommon) require('./clear-range-test').all(test, testCommon) require('./sublevel-test').all(test, testCommon) diff --git a/test/iterator-explicit-snapshot-test.js b/test/iterator-explicit-snapshot-test.js new file mode 100644 index 0000000..d0f14f7 --- /dev/null +++ b/test/iterator-explicit-snapshot-test.js @@ -0,0 +1,356 @@ +'use strict' + +const traits = require('./traits') + +exports.traits = function (test, testCommon) { + // TODO: document (or fix...) that deferred open is not supported + traits.open('snapshot()', testCommon, { deferred: false }, async function (t, db) { + const snapshot = db.snapshot() + return snapshot.close() + }) + + traits.closed('snapshot()', testCommon, async function (t, db) { + db.snapshot() + }) +} + +exports.get = function (test, testCommon) { + const { testFresh, testClose } = testFactory(test, testCommon) + + testFresh('get() changed entry from snapshot', async function (t, db) { + t.plan(3) + + await db.put('abc', 'before') + const snapshot = db.snapshot() + await db.put('abc', 'after') + + t.is(await db.get('abc'), 'after') + t.is(await db.get('abc', { snapshot }), 'before') + t.is(await db.get('other', { snapshot }), undefined) + + return snapshot.close() + }) + + testFresh('get() deleted entry from snapshot', async function (t, db) { + t.plan(3) + + await db.put('abc', 'before') + const snapshot = db.snapshot() + await db.del('abc') + + t.is(await db.get('abc'), undefined) + t.is(await db.get('abc', { snapshot }), 'before') + t.is(await db.get('other', { snapshot }), undefined) + + return snapshot.close() + }) + + testFresh('get() non-existent entry from snapshot', async function (t, db) { + t.plan(2) + + const snapshot = db.snapshot() + await db.put('abc', 'after') + + t.is(await db.get('abc'), 'after') + t.is(await db.get('abc', { snapshot }), undefined) + + return snapshot.close() + }) + + testFresh('get() entries from multiple snapshots', async function (t, db) { + const snapshots = [] + const iterations = 100 + + t.plan(iterations) + + for (let i = 0; i < iterations; i++) { + await db.put('number', i.toString()) + snapshots.push(db.snapshot()) + } + + for (let i = 0; i < iterations; i++) { + const snapshot = snapshots[i] + const value = i.toString() + + t.is(await db.get('number', { snapshot }), value) + } + + return Promise.all(snapshots.map(x => x.close())) + }) + + testFresh('get() entries from snapshot after closing another', async function (t, db) { + await db.put('abc', 'before') + + const snapshot1 = db.snapshot() + const snapshot2 = db.snapshot() + + await db.put('abc', 'after') + await snapshot1.close() + + // Closing one snapshot should not affect the other + t.is(await db.get('abc', { snapshot: snapshot2 }), 'before') + + return snapshot2.close() + }) + + testClose('get()', async function (db, snapshot) { + return db.get('xyz', { snapshot }) + }) +} + +exports.getMany = function (test, testCommon) { + const { testFresh, testClose } = testFactory(test, testCommon) + + testFresh('getMany() entries from snapshot', async function (t, db) { + t.plan(3) + + await db.put('a', '1') + await db.put('b', '2') + await db.put('c', '3') + + const snapshot = db.snapshot() + + await db.put('a', 'abc') + await db.del('b') + await db.put('c', 'xyz') + + t.same(await db.getMany(['a', 'b', 'c']), ['abc', undefined, 'xyz']) + t.same(await db.getMany(['a', 'b', 'c'], { snapshot }), ['1', '2', '3']) + t.same(await db.getMany(['a', 'b', 'c']), ['abc', undefined, 'xyz'], 'no side effects') + + return snapshot.close() + }) + + testClose('getMany()', async function (db, snapshot) { + return db.getMany(['xyz'], { snapshot }) + }) +} + +exports.iterator = function (test, testCommon) { + const { testFresh, testClose } = testFactory(test, testCommon) + + testFresh('iterator(), keys(), values() with snapshot', async function (t, db) { + t.plan(10) + + await db.put('a', '1') + await db.put('b', '2') + await db.put('c', '3') + + const snapshot = db.snapshot() + + await db.put('a', 'after') + await db.del('b') + await db.put('c', 'after') + await db.put('d', 'after') + + t.same( + await db.iterator().all(), + [['a', 'after'], ['c', 'after'], ['d', 'after']], + 'data was written' + ) + + for (const fn of [all, nextv, next]) { + t.same(await fn(db.iterator({ snapshot })), [['a', '1'], ['b', '2'], ['c', '3']], 'iterator') + t.same(await fn(db.keys({ snapshot })), ['a', 'b', 'c'], 'keys') + t.same(await fn(db.values({ snapshot })), ['1', '2', '3'], 'values') + } + + async function all (iterator) { + return iterator.all() + } + + async function nextv (iterator) { + try { + return iterator.nextv(10) + } finally { + await iterator.close() + } + } + + async function next (iterator) { + try { + const entries = [] + let entry + + while ((entry = await iterator.next()) !== undefined) { + entries.push(entry) + } + + return entries + } finally { + await iterator.close() + } + } + + return snapshot.close() + }) + + // Test that every iterator type and read method checks snapshot state + for (const type of ['iterator', 'keys', 'values']) { + testClose(`${type}().all()`, async function (db, snapshot) { + return db[type]({ snapshot }).all() + }) + + testClose(`${type}().next()`, async function (db, snapshot) { + const iterator = db[type]({ snapshot }) + + try { + await iterator.next() + } finally { + iterator.close() + } + }) + + testClose(`${type}().nextv()`, async function (db, snapshot) { + const iterator = db[type]({ snapshot }) + + try { + await iterator.nextv(10) + } finally { + iterator.close() + } + }) + } +} + +exports.clear = function (test, testCommon) { + const { testFresh, testClose } = testFactory(test, testCommon) + + testFresh('clear() entries from snapshot', async function (t, db) { + t.plan(2) + + await db.put('a', 'xyz') + const snapshot = db.snapshot() + + await db.put('b', 'xyz') + await db.clear({ snapshot }) + + t.same(await db.keys().all(), ['b']) + t.same(await db.keys({ snapshot }).all(), ['a']) + + return snapshot.close() + }) + + testFresh('clear() entries from empty snapshot', async function (t, db) { + t.plan(2) + + const snapshot = db.snapshot() + + await db.put('a', 'xyz') + await db.clear({ snapshot }) + + t.same(await db.keys().all(), ['a']) + t.same(await db.keys({ snapshot }).all(), []) + + return snapshot.close() + }) + + testClose('clear()', async function (db, snapshot) { + return db.clear({ snapshot }) + }) +} + +exports.cleanup = function (test, testCommon) { + test('snapshot is closed on database close', async function (t) { + t.plan(1) + + const db = testCommon.factory() + await db.open() + const snapshot = db.snapshot() + const promise = db.close() + + try { + snapshot.ref() + } catch (err) { + t.is(err.code, 'LEVEL_SNAPSHOT_NOT_OPEN') + } + + return promise + }) + + test('snapshot is closed along with iterator', async function (t) { + t.plan(2) + + const db = testCommon.factory() + await db.open() + await db.put('beep', 'boop') + + // These resources have a potentially tricky relationship. If all is well, + // db.close() calls both snapshot.close() and iterator.close() in parallel, + // and snapshot.close() and iterator.close() wait on the read. Crucially, + // closing the snapshot only waits for individual operations on the iterator + // rather than for the entire iterator to be closed (which may never happen). + const snapshot = db.snapshot() + const iterator = db.iterator({ snapshot }) + const readPromise = iterator.all() + const closePromise = db.close() + + try { + snapshot.ref() + } catch (err) { + t.is(err.code, 'LEVEL_SNAPSHOT_NOT_OPEN', 'snapshot is closing') + } + + try { + await iterator.next() + } catch (err) { + // Effectively also asserts that the LEVEL_ITERATOR_NOT_OPEN error takes + // precedence over LEVEL_SNAPSHOT_NOT_OPEN. + t.is(err.code, 'LEVEL_ITERATOR_NOT_OPEN', 'iterator is closing') + } + + return Promise.all([readPromise, closePromise]) + }) +} + +exports.all = function (test, testCommon) { + exports.traits(test, testCommon) + exports.get(test, testCommon) + exports.getMany(test, testCommon) + exports.iterator(test, testCommon) + exports.clear(test, testCommon) + exports.cleanup(test, testCommon) +} + +function testFactory (test, testCommon) { + const testFresh = function (name, run) { + test(name, async function (t) { + const db = testCommon.factory() + await db.open() + await run(t, db) + return db.close() + }) + } + + const testClose = function (name, run) { + testFresh(`${name} after closing snapshot`, async function (t, db) { + t.plan(1) + + const snapshot = db.snapshot() + await snapshot.close() + + try { + await run(db, snapshot) + } catch (err) { + t.is(err.code, 'LEVEL_SNAPSHOT_NOT_OPEN') + } + }) + + testFresh(`${name} while closing snapshot`, async function (t, db) { + t.plan(1) + + const snapshot = db.snapshot() + const promise = snapshot.close() + + try { + await run(db, snapshot) + } catch (err) { + t.is(err.code, 'LEVEL_SNAPSHOT_NOT_OPEN') + } + + return promise + }) + } + + return { testFresh, testClose } +} diff --git a/test/iterator-seek-test.js b/test/iterator-seek-test.js index a7cc93d..2ee3b4b 100644 --- a/test/iterator-seek-test.js +++ b/test/iterator-seek-test.js @@ -170,7 +170,7 @@ exports.seek = function (test, testCommon) { return db.close() }) - if (testCommon.supports.snapshots) { + if (testCommon.supports.implicitSnapshots) { for (const reverse of [false, true]) { for (const deferred of [false, true]) { test(`${mode}().seek() respects snapshot (reverse: ${reverse}, deferred: ${deferred})`, async function (t) { diff --git a/test/traits/closed.js b/test/traits/closed.js index 9640153..a2fb7db 100644 --- a/test/traits/closed.js +++ b/test/traits/closed.js @@ -18,7 +18,7 @@ module.exports = function (name, testCommon, run) { error = err } - t.is(error.code, 'LEVEL_DATABASE_NOT_OPEN') + t.is(error && error.code, 'LEVEL_DATABASE_NOT_OPEN') }) test(`${name} on closing db fails (deferred open: ${deferred})`, async function (t) { @@ -36,7 +36,7 @@ module.exports = function (name, testCommon, run) { } await promise - t.is(error.code, 'LEVEL_DATABASE_NOT_OPEN') + t.is(error && error.code, 'LEVEL_DATABASE_NOT_OPEN') }) } } diff --git a/test/traits/open.js b/test/traits/open.js index 0b43e83..1534c8c 100644 --- a/test/traits/open.js +++ b/test/traits/open.js @@ -1,7 +1,13 @@ 'use strict' -module.exports = function (name, testCommon, run) { +module.exports = function (name, testCommon, options, run) { + if (typeof options === 'function') { + run = options + options = {} + } + const test = testCommon.test + const deferred = options.deferred !== false test(`${name} on open db`, async function (t) { const db = testCommon.factory() @@ -15,7 +21,7 @@ module.exports = function (name, testCommon, run) { return db.close() }) - test(`${name} on opening db`, async function (t) { + deferred && test(`${name} on opening db`, async function (t) { const db = testCommon.factory() t.is(db.status, 'opening') await run(t, db) @@ -38,7 +44,7 @@ module.exports = function (name, testCommon, run) { return db.close() }) - test(`${name} on reopening db`, async function (t) { + deferred && test(`${name} on reopening db`, async function (t) { const db = testCommon.factory() await db.close() diff --git a/test/util.js b/test/util.js index c144a53..d2df9e7 100644 --- a/test/util.js +++ b/test/util.js @@ -1,6 +1,6 @@ 'use strict' -const { AbstractLevel, AbstractChainedBatch } = require('..') +const { AbstractLevel, AbstractChainedBatch, AbstractSnapshot } = require('..') const { AbstractIterator, AbstractKeyIterator, AbstractValueIterator } = require('..') const noop = function () {} @@ -88,7 +88,12 @@ const kOptions = Symbol('options') */ class MinimalLevel extends AbstractLevel { constructor (options) { - super({ encodings: { utf8: true }, seek: true }, options) + super({ + encodings: { utf8: true }, + seek: true, + explicitSnapshots: true + }, options) + this[kEntries] = new Map() } @@ -97,12 +102,15 @@ class MinimalLevel extends AbstractLevel { } async _get (key, options) { + const entries = (options.snapshot || this)[kEntries] + // Is undefined if not found - return this[kEntries].get(key) + return entries.get(key) } async _getMany (keys, options) { - return keys.map(k => this[kEntries].get(k)) + const entries = (options.snapshot || this)[kEntries] + return keys.map(k => entries.get(k)) } async _del (key, options) { @@ -110,7 +118,9 @@ class MinimalLevel extends AbstractLevel { } async _clear (options) { - for (const [k] of sliceEntries(this[kEntries], options, true)) { + const entries = (options.snapshot || this)[kEntries] + + for (const [k] of sliceEntries(entries, options, true)) { this[kEntries].delete(k) } } @@ -137,12 +147,24 @@ class MinimalLevel extends AbstractLevel { _values (options) { return new MinimalValueIterator(this, options) } + + _snapshot (options) { + return new MinimalSnapshot(this, options) + } +} + +class MinimalSnapshot extends AbstractSnapshot { + constructor (db, options) { + super(options) + this[kEntries] = new Map(db[kEntries]) + } } class MinimalIterator extends AbstractIterator { constructor (db, options) { super(db, options) - this[kEntries] = sliceEntries(db[kEntries], options, false) + const entries = (options.snapshot || db)[kEntries] + this[kEntries] = sliceEntries(entries, options, false) this[kOptions] = options this[kPosition] = 0 } @@ -151,7 +173,8 @@ class MinimalIterator extends AbstractIterator { class MinimalKeyIterator extends AbstractKeyIterator { constructor (db, options) { super(db, options) - this[kEntries] = sliceEntries(db[kEntries], options, false) + const entries = (options.snapshot || db)[kEntries] + this[kEntries] = sliceEntries(entries, options, false) this[kOptions] = options this[kPosition] = 0 } @@ -160,7 +183,8 @@ class MinimalKeyIterator extends AbstractKeyIterator { class MinimalValueIterator extends AbstractValueIterator { constructor (db, options) { super(db, options) - this[kEntries] = sliceEntries(db[kEntries], options, false) + const entries = (options.snapshot || db)[kEntries] + this[kEntries] = sliceEntries(entries, options, false) this[kOptions] = options this[kPosition] = 0 } diff --git a/types/abstract-level.d.ts b/types/abstract-level.d.ts index 48a0603..d3232e5 100644 --- a/types/abstract-level.d.ts +++ b/types/abstract-level.d.ts @@ -3,6 +3,7 @@ import * as Transcoder from 'level-transcoder' import { EventEmitter } from 'events' import { AbstractChainedBatch } from './abstract-chained-batch' import { AbstractSublevel, AbstractSublevelOptions } from './abstract-sublevel' +import { AbstractSnapshot } from './abstract-snapshot' import { AbstractIterator, @@ -235,6 +236,17 @@ declare class AbstractLevel */ valueEncoding (): Transcoder.Encoding + /** + * Create an explicit snapshot. Throws a `LEVEL_NOT_SUPPORTED` error if + * `db.supports.explicitSnapshots` is false. + * + * Don't forget to call `snapshot.close()` when done. + * + * @param options There are currently no options but specific implementations + * may add their own. + */ + snapshot (options?: any | undefined): AbstractSnapshot + /** * Call the function {@link fn} at a later time when {@link status} changes to * `'open'` or `'closed'`. Known as a _deferred operation_. diff --git a/types/abstract-snapshot.d.ts b/types/abstract-snapshot.d.ts new file mode 100644 index 0000000..68f95b7 --- /dev/null +++ b/types/abstract-snapshot.d.ts @@ -0,0 +1,23 @@ +/** + * A lightweight token that represents a version of a database at a particular point in + * time. + */ +export class AbstractSnapshot { + /** + * Increment reference count, to register work that should delay closing until + * {@link unref} is called an equal amount of times. The promise that will be returned + * by {@link close} will not resolve until the reference count returns to 0. This + * prevents prematurely closing underlying resources while the snapshot is in use. + */ + ref (): void + + /** + * Decrement reference count, to indicate that the work has finished. + */ + unref (): void + + /** + * Close the snapshot. + */ + close (): Promise +}