# @ydbjs/topic

Read this in Russian: [README.ru.md](README.ru.md)

High‑level, type‑safe clients for YDB Topics (publish–subscribe message streams) in JavaScript/TypeScript. Provides efficient streaming reads and writes, partition session management, offset commits, compression, and transaction‑aware readers/writers.

## Features

- Streaming reader and writer with async iteration
- Partition session lifecycle hooks and offset commit
- Pluggable compression codecs (RAW, GZIP, ZSTD; custom via maps)
- Transaction‑aware read/write helpers
- First‑class TypeScript types

## Installation

```sh
npm install @ydbjs/topic
```

Requires Node.js >= 20.19.

## Getting Started

There are two ways to use the client:

- Top‑level client via `topic(driver)`
- Direct factory functions via subpath imports

### Using the top‑level client

```ts
import { Driver } from '@ydbjs/core'
import { topic } from '@ydbjs/topic'

const driver = new Driver(process.env['YDB_CONNECTION_STRING']!)
await driver.ready()

const t = topic(driver)

// Reader
await using reader = t.createReader({ topic: '/Root/my-topic', consumer: 'my-consumer' })
for await (const batch of reader.read()) {
  // Process messages
  for (const msg of batch) console.log(new TextDecoder().decode(msg.payload))
  // Commit processed offsets (see the performance note below)
  await reader.commit(batch)
}

// Writer
await using writer = t.createWriter({ topic: '/Root/my-topic', producer: 'my-producer' })
writer.write(new TextEncoder().encode('Hello, YDB!'))
await writer.flush()
```

### Using direct factories

```ts
import { Driver } from '@ydbjs/core'
import { createTopicReader } from '@ydbjs/topic/reader'
import { createTopicWriter } from '@ydbjs/topic/writer'

const driver = new Driver(process.env['YDB_CONNECTION_STRING']!)
await driver.ready()

await using reader = createTopicReader(driver, { topic: '/Root/my-topic', consumer: 'my-consumer' })
await using writer = createTopicWriter(driver, { topic: '/Root/my-topic', producer: 'my-producer' })
```

The transaction‑aware factories (`createTopicTxReader`, `createTopicTxWriter`) come from the same subpaths; see the Transactions section below.

## Reader

### Options

- `topic`: `string | TopicReaderSource | TopicReaderSource[]` — topic path or detailed sources
- `consumer`: `string` — consumer name
- `codecMap?`: `Map<Codec | number, CompressionCodec>` — custom codecs for decompression
- `maxBufferBytes?`: `bigint` — internal buffer cap (default ~4 MiB)
- `updateTokenIntervalMs?`: `number` — auth token refresh interval (default 60000)
- `onPartitionSessionStart?`: hook to adjust read/commit offsets per session (see the sketch below)
- `onPartitionSessionStop?`: hook on session stop (cleanup/commit)
- `onCommittedOffset?`: observe commit acknowledgments from the server
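
For example, the lifecycle hooks can log partition activity or override offsets when a session starts. A minimal sketch (the hook signatures shown are assumptions; verify them against the installed typings):

```ts
// Sketch: observing partition session lifecycle events
await using reader = createTopicReader(driver, {
  topic: '/Root/my-topic',
  consumer: 'svc-a',
  onPartitionSessionStart(session, committedOffset, partitionOffsets) {
    console.log('partition started', session.partitionId, 'committed at', committedOffset)
  },
  onPartitionSessionStop(session, committedOffset) {
    console.log('partition stopped', session.partitionId, 'committed at', committedOffset)
  },
})
```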

`TopicReaderSource` supports partition filters and time‑based selectors:

```ts
const source = {
  path: '/Root/my-topic',
  partitionIds: [0n, 1n],
  // Skip messages older than 5 minutes
  maxLag: '5m', // number (ms), ms‑string, or Duration
  // Start from a timestamp
  readFrom: new Date(Date.now() - 60_000),
}
```

### Reading and committing

```ts
const t = topic(driver)
await using reader = t.createReader({ topic: source, consumer: 'svc-a' })

for await (const batch of reader.read({ limit: 100, waitMs: 1000 })) {
  if (!batch.length) continue // periodic empty batches when no data

  // Handle messages
  for (const m of batch) doSomething(m)

  // Option A (simple): await commit for each batch
  await reader.commit(batch)

  // Option B (fast path): fire‑and‑forget commit
  // void reader.commit(batch)
}
```

Performance note: awaiting `commit()` in the hot path reduces throughput. For high load, prefer fire‑and‑forget commits plus the `onCommittedOffset` hook to observe confirmations asynchronously.
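
A sketch of that pattern (the exact `onCommittedOffset` signature is an assumption; verify it against the installed typings):

```ts
await using reader = t.createReader({
  topic: '/Root/my-topic',
  consumer: 'svc-a',
  // Called when the server confirms a commit
  onCommittedOffset(session, committedOffset) {
    console.log('commit confirmed', session.partitionId, committedOffset)
  },
})

for await (const batch of reader.read()) {
  for (const m of batch) doSomething(m)
  void reader.commit(batch) // fire-and-forget; confirmation arrives via the hook
}
```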

## Writer

### Options

- `topic`: `string`
- `tx?`: `TX` — transaction to write within
- `producer?`: `string` — producer id (auto‑generated if omitted)
- `codec?`: `CompressionCodec` — compression (default RAW; built‑ins: RAW, GZIP, ZSTD)
- `maxBufferBytes?`: `bigint` — writer buffer cap (default 256 MiB; see the tuning sketch below)
- `maxInflightCount?`: `number` — max messages in‑flight (default 1000)
- `flushIntervalMs?`: `number` — periodic flush tick (default 10 ms)
- `updateTokenIntervalMs?`: `number` — auth token refresh interval (default 60000)
- `retryConfig?(signal)`: tune the connection retry strategy
- `onAck?(seqNo, status)`: callback on message acknowledgment
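
For bulk loads, the buffering knobs can be raised to favor throughput. A sketch (the values are illustrative, not recommendations):

```ts
const t = topic(driver)
await using writer = t.createWriter({
  topic: '/Root/my-topic',
  producer: 'bulk-loader',
  maxBufferBytes: 512n * 1024n * 1024n, // larger buffer for batching
  maxInflightCount: 5000, // allow more unacknowledged messages
  flushIntervalMs: 50, // flush less often than the 10 ms default
})
```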

### Writing messages

```ts
const t = topic(driver)
await using writer = t.createWriter({
  topic: '/Root/my-topic',
  producer: 'json-producer',
  // RAW is used by default; provide your own codec implementation if needed.
  // See "Custom codecs" below for an example.
  onAck(seqNo, status) {
    console.log('ack', seqNo, status)
  },
})

const payload = new TextEncoder().encode(JSON.stringify({ foo: 'bar', ts: Date.now() }))
const seqNo = writer.write(payload) // returns the assigned sequence number
await writer.flush()
```

`write()` accepts `Uint8Array` only; encode your own objects and strings as needed, for example with the helper sketched below.
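
A small helper (hypothetical, not part of the package) keeps JSON writes tidy:

```ts
// Hypothetical convenience helper: serialize any value to Uint8Array
const json = (value: unknown) => new TextEncoder().encode(JSON.stringify(value))

writer.write(json({ event: 'signup', ts: Date.now() }))
await writer.flush()
```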

## Transactions

Run topic reads and writes inside an `@ydbjs/query` transaction handler and pass the `tx` object that it provides.

- Reader: `createTopicTxReader(driver, { topic, consumer, tx })` or `t.createTxReader({ ..., tx })`. The reader tracks read offsets and automatically issues `updateOffsetsInTransaction` on commit.
- Writer: `createTopicTxWriter(tx, driver, { topic, ... })` or `t.createTxWriter(tx, { ... })`. The writer ensures a flush before the transaction commits.

```ts
import { query } from '@ydbjs/query'
import { createTopicTxReader } from '@ydbjs/topic/reader'
import { createTopicTxWriter } from '@ydbjs/topic/writer'

const qc = query(driver)

await qc.transaction(async (tx, signal) => {
  // Tx-aware reader: offsets are bound to the transaction commit
  await using reader = createTopicTxReader(driver, { topic: '/Root/my-topic', consumer: 'svc-a', tx })
  for await (const batch of reader.read({ signal })) {
    // process batch...
  }

  // Tx-aware writer: flushes before commit
  await using writer = createTopicTxWriter(tx, driver, { topic: '/Root/my-topic', producer: 'p1' })
  writer.write(new TextEncoder().encode('message'))
})
```
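
Continuing the example above, the same flow works through the top‑level client, using the `createTxReader`/`createTxWriter` helpers named in the list (a sketch; argument shapes follow that list):

```ts
const t = topic(driver)

await qc.transaction(async (tx) => {
  await using reader = t.createTxReader({ topic: '/Root/my-topic', consumer: 'svc-a', tx })
  await using writer = t.createTxWriter(tx, { topic: '/Root/my-topic', producer: 'p1' })
  // read and write as in the example above
})
```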

Note: `tx` comes from the Query layer and exposes the required hooks; Topic clients integrate with them automatically.

## Custom codecs

The reader supports custom decompression through `codecMap`; the writer takes a `CompressionCodec` instance via its `codec` option.

```ts
import { Codec } from '@ydbjs/api/topic'
import * as zlib from 'node:zlib'

const MyGzip = {
  codec: Codec.GZIP,
  compress: (p: Uint8Array) => zlib.gzipSync(p),
  decompress: (p: Uint8Array) => zlib.gunzipSync(p),
}

await using reader = createTopicReader(driver, {
  topic: '/Root/custom',
  consumer: 'c1',
  codecMap: new Map([[Codec.GZIP, MyGzip]]),
})

await using writer = createTopicWriter(driver, {
  topic: '/Root/custom',
  producer: 'p1',
  codec: MyGzip,
})
```

## Exports

- `@ydbjs/topic`: `topic(driver)` and types
- `@ydbjs/topic/reader`: `createTopicReader`, `createTopicTxReader`, reader types
- `@ydbjs/topic/writer`: `createTopicWriter`, `createTopicTxWriter`, writer types
- `@ydbjs/topic/writer2`: experimental state‑machine writer (API subject to change)

## License