From a0ecd7f5e52d3815ced4379e58434cc1e36b324f Mon Sep 17 00:00:00 2001 From: William Swanson Date: Fri, 17 Oct 2025 13:45:51 -0700 Subject: [PATCH 1/3] Add an `autoRotateCouch` helper function --- CHANGELOG.md | 2 ++ src/couchdb/autoRotateCouch.ts | 19 +++++++++++++++++++ src/index.ts | 1 + 3 files changed, 22 insertions(+) create mode 100644 src/couchdb/autoRotateCouch.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 34c1d05..46a2d1f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +- added: `autoRotateCouch` helper function. + ## 0.2.23 (2025-09-11) - added: Accept a chunk size for rolling database streaming queries. diff --git a/src/couchdb/autoRotateCouch.ts b/src/couchdb/autoRotateCouch.ts new file mode 100644 index 0000000..6e0bf94 --- /dev/null +++ b/src/couchdb/autoRotateCouch.ts @@ -0,0 +1,19 @@ +import type { ServerScope } from 'nano' + +export async function autoRotateCouch( + connections: ServerScope[], + work: (connection: ServerScope) => Promise, + opts: { startIndex?: number } = {} +): Promise { + const { startIndex = Math.floor(Math.random() * 255) } = opts + let index = startIndex % connections.length + + for (let i = 0; i < connections.length - 1; ++i) { + try { + return await work(connections[index]) + } catch (error) { + index = (index + 1) % connections.length + } + } + return await work(connections[index]) +} diff --git a/src/index.ts b/src/index.ts index 18a0a3e..096efad 100644 --- a/src/index.ts +++ b/src/index.ts @@ -13,6 +13,7 @@ export { connectCouch } from './couchdb/couch-pool' +export * from './couchdb/autoRotateCouch' export * from './couchdb/for-each-document' export * from './couchdb/js-design-document' export * from './couchdb/mango-design-document' From 027935ae394a02d6577b73aeb52e4750355e05a6 Mon Sep 17 00:00:00 2001 From: William Swanson Date: Fri, 17 Oct 2025 13:49:15 -0700 Subject: [PATCH 2/3] Integrate auto-rotation into streaming queries --- CHANGELOG.md | 1 + src/couchdb/rolling-database.ts | 53 +++++++++++++++++++++------------ 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 46a2d1f..d9bbe67 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Unreleased - added: `autoRotateCouch` helper function. +- added: `chunkSize` and `startIndex` parameters to rolling database streaming methods. ## 0.2.23 (2025-09-11) diff --git a/src/couchdb/rolling-database.ts b/src/couchdb/rolling-database.ts index 751c59b..6b15085 100644 --- a/src/couchdb/rolling-database.ts +++ b/src/couchdb/rolling-database.ts @@ -17,6 +17,7 @@ import nano, { import { asCouchDoc, + autoRotateCouch, connectCouch, CouchDoc, CouchPool, @@ -69,6 +70,14 @@ export interface RollingViewParams extends DocumentViewParams { partition?: string } +export interface RollingStreamParams extends RollingViewParams { + /** The number of documents to retrieve in each iteration */ + chunkSize?: number + + /** The preferred node to use for auto-rotation */ + startIndex?: number +} + /** * Arguments to the rolling database reduce query. */ @@ -94,8 +103,8 @@ export interface RollingDatabase { ) => Promise>> listAsStream: ( - connection: ServerScope, - params: RollingViewParams + connection: ServerScope | ServerScope[], + params: RollingStreamParams ) => AsyncIterableIterator> /** @@ -118,10 +127,10 @@ export interface RollingDatabase { ) => Promise>> viewAsStream: ( - connection: ServerScope, + connection: ServerScope | ServerScope[], design: string, view: string, - params: RollingViewParams + params: RollingStreamParams ) => AsyncIterableIterator> insert: ( @@ -225,18 +234,16 @@ export function makeRollingDatabase( } async function* streamingQuery( - connection: ServerScope, + connections: ServerScope[], callback: ( db: nano.DocumentScope, params: DocumentViewParams ) => Promise>, - opts: { afterDate?: Date; chunkSize?: number } = {} + opts: { afterDate?: Date; chunkSize?: number; startIndex?: number } = {} ): AsyncIterableIterator> { - const { afterDate, chunkSize = 2048, ...rest } = opts + const { afterDate, chunkSize = 2048, startIndex, ...rest } = opts for (const database of databases) { - const db = connection.use(database.name) - let lastRow: { id: string; key: string } | undefined while (true) { const params: DocumentViewParams = { @@ -249,7 +256,12 @@ export function makeRollingDatabase( params.start_key = lastRow.key params.start_key_doc_id = lastRow.id } - const rows = await callback(db, params) + const rows = await autoRotateCouch( + connections, + async connection => + await callback(connection.use(database.name), params), + { startIndex } + ) for (const row of rows) yield asDoc(row.doc) // Set up the next iteration: @@ -319,18 +331,20 @@ export function makeRollingDatabase( } function listAsStream( - connection: ServerScope, - opts: RollingViewParams + connection: ServerScope | ServerScope[], + opts: RollingStreamParams ): AsyncIterableIterator> { const { afterDate, + chunkSize, partition, + startIndex, // Native CouchDB options: ...rest } = opts return streamingQuery( - connection, + Array.isArray(connection) ? connection : [connection], async (db, params) => { const allParams = { ...rest, ...params } const { rows } = await (partition == null @@ -338,7 +352,7 @@ export function makeRollingDatabase( : db.partitionedList(partition, allParams)) return rows }, - { afterDate } + { afterDate, chunkSize, startIndex } ) } @@ -405,21 +419,22 @@ export function makeRollingDatabase( } function viewAsStream( - connection: ServerScope, + connection: ServerScope | ServerScope[], design: string, view: string, - opts: RollingViewParams & { chunkSize?: number } + opts: RollingStreamParams ): AsyncIterableIterator> { const { afterDate, - partition, chunkSize, + partition, + startIndex, // Native CouchDB options: ...rest } = opts return streamingQuery( - connection, + Array.isArray(connection) ? connection : [connection], async (db, params) => { const allParams = { ...rest, ...params, reduce: false } const { rows } = await (partition == null @@ -427,7 +442,7 @@ export function makeRollingDatabase( : db.partitionedView(partition, design, view, allParams)) return rows }, - { chunkSize, afterDate } + { afterDate, chunkSize, startIndex } ) } From d35c7dac03596c271ae68948045b8ec40efb40bf Mon Sep 17 00:00:00 2001 From: William Swanson Date: Fri, 17 Oct 2025 13:52:51 -0700 Subject: [PATCH 3/3] Align viewToStream parameter names --- CHANGELOG.md | 1 + src/couchdb/view-to-stream.ts | 12 ++++++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d9bbe67..1398ac9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - added: `autoRotateCouch` helper function. - added: `chunkSize` and `startIndex` parameters to rolling database streaming methods. +- deprecated: `limit` parameter for `viewToStream`. Use `chunkSize` instead. ## 0.2.23 (2025-09-11) diff --git a/src/couchdb/view-to-stream.ts b/src/couchdb/view-to-stream.ts index 3282744..a032118 100644 --- a/src/couchdb/view-to-stream.ts +++ b/src/couchdb/view-to-stream.ts @@ -15,15 +15,19 @@ interface CommonViewResponse { */ export async function* viewToStream( callback: (params: DocumentViewParams) => Promise, - opts: { limit?: number } = {} + opts: { + chunkSize?: number + /** @deprecated use chunkSize */ + limit?: number + } = {} ): AsyncIterableIterator { - const { limit = 2048 } = opts + const { limit = 2048, chunkSize = limit } = opts let lastRow: { id: string; key: string } | undefined while (true) { const params: DocumentViewParams = { include_docs: true, - limit + limit: chunkSize } if (lastRow != null) { params.skip = 1 @@ -34,7 +38,7 @@ export async function* viewToStream( for (const row of rows) yield row.doc // Set up the next iteration: - if (rows.length < limit) break + if (rows.length < chunkSize) break lastRow = rows[rows.length - 1] } }