Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

- added: `autoRotateCouch` helper function.
- added: `chunkSize` and `startIndex` parameters to rolling database streaming methods.
- deprecated: `limit` parameter for `viewToStream`. Use `chunkSize` instead.

## 0.2.23 (2025-09-11)

- added: Accept a chunk size for rolling database streaming queries.
Expand Down
19 changes: 19 additions & 0 deletions src/couchdb/autoRotateCouch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import type { ServerScope } from 'nano'

export async function autoRotateCouch<T>(
connections: ServerScope[],
work: (connection: ServerScope) => Promise<T>,
opts: { startIndex?: number } = {}
): Promise<T> {
const { startIndex = Math.floor(Math.random() * 255) } = opts
let index = startIndex % connections.length

for (let i = 0; i < connections.length - 1; ++i) {
try {
return await work(connections[index])
} catch (error) {
index = (index + 1) % connections.length
}
}
return await work(connections[index])
}
53 changes: 34 additions & 19 deletions src/couchdb/rolling-database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import nano, {

import {
asCouchDoc,
autoRotateCouch,
connectCouch,
CouchDoc,
CouchPool,
Expand Down Expand Up @@ -69,6 +70,14 @@ export interface RollingViewParams extends DocumentViewParams {
partition?: string
}

export interface RollingStreamParams extends RollingViewParams {
/** The number of documents to retrieve in each iteration */
chunkSize?: number

/** The preferred node to use for auto-rotation */
startIndex?: number
}

/**
* Arguments to the rolling database reduce query.
*/
Expand All @@ -94,8 +103,8 @@ export interface RollingDatabase<T> {
) => Promise<Array<CouchDoc<T>>>

listAsStream: (
connection: ServerScope,
params: RollingViewParams
connection: ServerScope | ServerScope[],
params: RollingStreamParams
) => AsyncIterableIterator<CouchDoc<T>>

/**
Expand All @@ -118,10 +127,10 @@ export interface RollingDatabase<T> {
) => Promise<Array<CouchDoc<T>>>

viewAsStream: (
connection: ServerScope,
connection: ServerScope | ServerScope[],
design: string,
view: string,
params: RollingViewParams
params: RollingStreamParams
) => AsyncIterableIterator<CouchDoc<T>>

insert: (
Expand Down Expand Up @@ -225,18 +234,16 @@ export function makeRollingDatabase<T>(
}

async function* streamingQuery(
connection: ServerScope,
connections: ServerScope[],
callback: (
db: nano.DocumentScope<unknown>,
params: DocumentViewParams
) => Promise<Array<{ id: string; key: string; doc?: unknown }>>,
opts: { afterDate?: Date; chunkSize?: number } = {}
opts: { afterDate?: Date; chunkSize?: number; startIndex?: number } = {}
): AsyncIterableIterator<CouchDoc<T>> {
const { afterDate, chunkSize = 2048, ...rest } = opts
const { afterDate, chunkSize = 2048, startIndex, ...rest } = opts

for (const database of databases) {
const db = connection.use(database.name)

let lastRow: { id: string; key: string } | undefined
while (true) {
const params: DocumentViewParams = {
Expand All @@ -249,7 +256,12 @@ export function makeRollingDatabase<T>(
params.start_key = lastRow.key
params.start_key_doc_id = lastRow.id
}
const rows = await callback(db, params)
const rows = await autoRotateCouch(
connections,
async connection =>
await callback(connection.use(database.name), params),
{ startIndex }
)
for (const row of rows) yield asDoc(row.doc)

// Set up the next iteration:
Expand Down Expand Up @@ -319,26 +331,28 @@ export function makeRollingDatabase<T>(
}

function listAsStream(
connection: ServerScope,
opts: RollingViewParams
connection: ServerScope | ServerScope[],
opts: RollingStreamParams
): AsyncIterableIterator<CouchDoc<T>> {
const {
afterDate,
chunkSize,
partition,
startIndex,
// Native CouchDB options:
...rest
} = opts

return streamingQuery(
connection,
Array.isArray(connection) ? connection : [connection],
async (db, params) => {
const allParams = { ...rest, ...params }
const { rows } = await (partition == null
? db.list(allParams)
: db.partitionedList(partition, allParams))
return rows
},
{ afterDate }
{ afterDate, chunkSize, startIndex }
)
}

Expand Down Expand Up @@ -405,29 +419,30 @@ export function makeRollingDatabase<T>(
}

function viewAsStream(
connection: ServerScope,
connection: ServerScope | ServerScope[],
design: string,
view: string,
opts: RollingViewParams & { chunkSize?: number }
opts: RollingStreamParams
): AsyncIterableIterator<CouchDoc<T>> {
const {
afterDate,
partition,
chunkSize,
partition,
startIndex,
// Native CouchDB options:
...rest
} = opts

return streamingQuery(
connection,
Array.isArray(connection) ? connection : [connection],
async (db, params) => {
const allParams = { ...rest, ...params, reduce: false }
const { rows } = await (partition == null
? db.view(design, view, allParams)
: db.partitionedView(partition, design, view, allParams))
return rows
},
{ chunkSize, afterDate }
{ afterDate, chunkSize, startIndex }
)
}

Expand Down
12 changes: 8 additions & 4 deletions src/couchdb/view-to-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@ interface CommonViewResponse {
*/
export async function* viewToStream(
callback: (params: DocumentViewParams) => Promise<CommonViewResponse>,
opts: { limit?: number } = {}
opts: {
chunkSize?: number
/** @deprecated use chunkSize */
limit?: number
} = {}
): AsyncIterableIterator<unknown> {
const { limit = 2048 } = opts
const { limit = 2048, chunkSize = limit } = opts

let lastRow: { id: string; key: string } | undefined
while (true) {
const params: DocumentViewParams = {
include_docs: true,
limit
limit: chunkSize
}
if (lastRow != null) {
params.skip = 1
Expand All @@ -34,7 +38,7 @@ export async function* viewToStream(
for (const row of rows) yield row.doc

// Set up the next iteration:
if (rows.length < limit) break
if (rows.length < chunkSize) break
lastRow = rows[rows.length - 1]
}
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export {
connectCouch
} from './couchdb/couch-pool'

export * from './couchdb/autoRotateCouch'
export * from './couchdb/for-each-document'
export * from './couchdb/js-design-document'
export * from './couchdb/mango-design-document'
Expand Down
Loading