From d91d7165701007a6e343e1150a13213a5f1e01e3 Mon Sep 17 00:00:00 2001 From: eryue0220 Date: Fri, 19 Sep 2025 14:29:11 +0800 Subject: [PATCH 1/4] feat: add example --- examples/README.md | 2 + .../insert_streaming_backpressure_simple.ts | 156 +++++++++ .../insert_streaming_with_backpressure.ts | 300 ++++++++++++++++++ 3 files changed, 458 insertions(+) create mode 100644 examples/node/insert_streaming_backpressure_simple.ts create mode 100644 examples/node/insert_streaming_with_backpressure.ts diff --git a/examples/README.md b/examples/README.md index 42adf0fa..02def739 100644 --- a/examples/README.md +++ b/examples/README.md @@ -45,6 +45,8 @@ If something is missing, or you found a mistake in one of these examples, please - [insert_file_stream_parquet.ts](node/insert_file_stream_parquet.ts) - (Node.js only) stream a Parquet file into ClickHouse. - [insert_arbitrary_format_stream.ts](node/insert_arbitrary_format_stream.ts) - (Node.js only) stream in arbitrary format into ClickHouse. In this case, the input format is [AVRO](https://clickhouse.com/docs/interfaces/formats/Avro), inserting the data from an Avro data file generated ad-hoc. - [stream_created_from_array_raw.ts](node/stream_created_from_array_raw.ts) - (Node.js only) converting the string input into a stream and sending it to ClickHouse; in this scenario, the base input is a CSV string. +- [insert_streaming_with_backpressure.ts](node/insert_streaming_with_backpressure.ts) - (Node.js only) advanced streaming INSERT example with proper backpressure handling, demonstrating how to handle high-throughput scenarios where your application pushes data to the stream. +- [insert_streaming_backpressure_simple.ts](node/insert_streaming_backpressure_simple.ts) - (Node.js only) simple streaming INSERT example with backpressure handling, showing the essential pattern for streaming data from your application to ClickHouse. - [insert_values_and_functions.ts](insert_values_and_functions.ts) - generating an `INSERT INTO ... VALUES` statement that uses a combination of values and function calls. - [insert_ephemeral_columns.ts](insert_ephemeral_columns.ts) - inserting data into a table that has [ephemeral columns](https://clickhouse.com/docs/en/sql-reference/statements/create/table#ephemeral). 
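Both new examples rest on the same Node.js stream contract: `Readable.push()` returns `false` once the internal buffer (bounded by `highWaterMark`) is full, and Node calls `_read()` again when there is room. A minimal sketch of that pattern — the `rows` generator and record shape are placeholders, not part of this PR:

```ts
import * as Stream from 'node:stream'

// Placeholder source; stands in for whatever feeds your application.
function* rows(max: number) {
  for (let id = 1; id <= max; id++) {
    yield { id, name: `Name_${id}`, value: Math.random() * 1000 }
  }
}

class BackpressureReadable extends Stream.Readable {
  #iter: Iterator<{ id: number; name: string; value: number }>

  constructor(max: number) {
    super({ objectMode: true, highWaterMark: 16 })
    this.#iter = rows(max)
  }

  // Node calls _read() only while the buffer has room; stopping as soon
  // as push() returns false is what makes the stream backpressure-aware.
  _read() {
    let next = this.#iter.next()
    while (!next.done && this.push(next.value)) {
      next = this.#iter.next()
    }
    if (next.done) {
      this.push(null) // signal end of stream
    }
  }
}
```

The examples below expand this skeleton in two directions: the advanced one pulls from an external event-driven source, and the simple one uses a timer-driven producer.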
diff --git a/examples/node/insert_streaming_backpressure_simple.ts b/examples/node/insert_streaming_backpressure_simple.ts new file mode 100644 index 00000000..73769143 --- /dev/null +++ b/examples/node/insert_streaming_backpressure_simple.ts @@ -0,0 +1,156 @@ +import { createClient } from '@clickhouse/client' +import * as Stream from 'stream' + +interface DataRow { + id: number + name: string + value: number +} + +class DataStreamWithBackpressure extends Stream.Readable { + #dataQueue: DataRow[] = [] + #isReading = false + #hasEnded = false + + constructor() { + super({ objectMode: true, highWaterMark: 10 }) + } + + addData(data: DataRow): boolean { + if (this.#hasEnded) { + return false + } + + // If we're currently reading, try to push immediately + if (this.#isReading && this.#dataQueue.length === 0) { + return this.#pushData(data) + } else { + // Otherwise, queue the data + this.#dataQueue.push(data) + return true + } + } + + endStream() { + this.#hasEnded = true + if (this.#dataQueue.length === 0) { + super.push(null) + } + } + + _read() { + this.#isReading = true + + while (this.#dataQueue.length > 0) { + const data = this.#dataQueue.shift() + + if (!data || !this.#pushData(data)) { + this.#isReading = false + return + } + } + + // If all data is processed and stream should end + if (this.#hasEnded && this.#dataQueue.length === 0) { + super.push(null) + } + + this.#isReading = false + } + + #pushData(data: DataRow): boolean { + // Convert to JSON format for ClickHouse + const jsonLine = JSON.stringify(data) + return super.push(jsonLine) + } +} + +void (async () => { + const client = createClient() + const tableName = 'simple_streaming_demo' + + // Setup table + await client.command({ + query: `DROP TABLE IF EXISTS ${tableName}`, + }) + + await client.command({ + query: ` + CREATE TABLE ${tableName} + ( + id UInt32, + name String, + value Float64 + ) + ENGINE = MergeTree() + ORDER BY id + `, + }) + + const dataStream = new DataStreamWithBackpressure() + const addDataToStream = () => { + const maxRecords = 10000 + let id = 1 + let addedCount = 0 + + const addBatch = () => { + // Add a batch of records + const batchSize = Math.min(100, maxRecords - addedCount) + + for (let i = 0; i < batchSize; i++) { + const success = dataStream.addData({ + id: id++, + name: `Name_${id}`, + value: Math.random() * 1000, + }) + + if (!success) { + console.log('Failed to add data - stream ended') + return + } + } + + addedCount += batchSize + console.log(`Added batch of ${batchSize} records (total: ${addedCount})`) + + if (addedCount < maxRecords) { + // Continue adding data with a small delay + setTimeout(addBatch, 10) + } else { + console.log('Finished adding data, ending stream') + dataStream.endStream() + } + } + + addBatch() + } + + try { + console.log('Starting streaming insert...') + + // Start adding data in the background + addDataToStream() + + // Perform the streaming insert + await client.insert({ + table: tableName, + values: dataStream, + format: 'JSONEachRow', + }) + + console.log('Insert completed successfully!') + + // Verify the results + const result = await client.query({ + query: `SELECT count() as total FROM ${tableName}`, + format: 'JSONEachRow', + }) + + const [{ total }] = await result.json<{ total: string }>() + console.log(`Total records inserted: ${total}`) + } catch (error) { + console.error('Insert failed:', error) + } finally { + await client.close() + } +})() diff --git a/examples/node/insert_streaming_with_backpressure.ts 
b/examples/node/insert_streaming_with_backpressure.ts new file mode 100644 index 00000000..4a5ee1b2 --- /dev/null +++ b/examples/node/insert_streaming_with_backpressure.ts @@ -0,0 +1,300 @@ +import { createClient } from '@clickhouse/client' +import type { Row } from '@clickhouse/client-common' +import * as Stream from 'stream' +import { EventEmitter } from 'events' + +interface DataRow { + id: number + timestamp: Date + message: string + value: number +} + +class BackpressureAwareDataProducer extends Stream.Readable { + #dataSource: EventEmitter + #streamPaused = false + #pendingData: DataRow[] = [] + #isDestroyed = false + #total = 0 + + constructor(dataSource: EventEmitter, options?: Stream.ReadableOptions) { + super({ + // Required for JSON* formats + objectMode: true, + // Limit buffering to prevent memory issues + highWaterMark: 16, + ...options, + }) + + this.#dataSource = dataSource + this.#setupDataSourceListeners() + } + + #setupDataSourceListeners() { + this.#dataSource.on('data', this.#handleIncomingData.bind(this)) + this.#dataSource.on('end', this.#handleDataSourceEnd.bind(this)) + this.#dataSource.on('error', this.#handleDataSourceError.bind(this)) + } + + #handleIncomingData(data: DataRow) { + if (this.#isDestroyed) { + return + } + + if (this.#streamPaused) { + this.#pendingData.push(data) + return + } + + // Try to push the data immediately + if (!this.#pushData(data)) { + // If push returns false, we're experiencing backpressure + // Pause the data source and buffer subsequent data + this.#streamPaused = true + } + } + + #pushData(data: DataRow): boolean { + if (this.#isDestroyed) { + return false + } + + // Convert data to JSON format for ClickHouse + const jsonLine = JSON.stringify({ + id: data.id, + timestamp: data.timestamp.toISOString(), + message: data.message, + value: data.value, + }) + + const pushed = super.push(jsonLine) + if (pushed) { + this.#total++ + if (this.#total % 1000 === 0) { + console.log(`Produced ${this.#total} rows`) + } + } + return pushed + } + + #handleDataSourceEnd() { + console.log(`Data source ended. 
Total produced: ${this.#total} rows`) + super.push(null) + } + + #handleDataSourceError(error: Error) { + console.error('Data source error:', error) + super.destroy(error) + } + + // Called when the stream is ready to accept more data (backpressure resolved) + _read() { + if (this.#streamPaused && this.#pendingData.length > 0) { + // Process buffered data when backpressure is resolved + this.#streamPaused = false + + // Push all pending data, but stop if we hit backpressure again + while (this.#pendingData.length > 0 && !this.#streamPaused) { + const data = this.#pendingData.shift() + if (!data || !this.#pushData(data)) { + this.#streamPaused = true + break + } + } + } + } + + _destroy(error: Error | null, callback: (error?: Error | null) => void) { + this.#isDestroyed = true + this.#dataSource.removeAllListeners('data') + this.#dataSource.removeAllListeners('end') + this.#dataSource.removeAllListeners('error') + callback(error) + } + + get total(): number { + return this.#total + } +} + +// Simulated data source that generates data at varying rates +class SimulatedDataSource extends EventEmitter { + #intervalHandle: NodeJS.Timeout | null = null + #isRunning = false + #total = 0 + #burstMode = false + + start() { + if (this.#isRunning) return + + this.#isRunning = true + this.#scheduleNextBatch() + + // Randomly switch between normal and burst modes + setInterval(() => { + this.#burstMode = !this.#burstMode + console.log(`Switched to ${this.#burstMode ? 'burst' : 'normal'} mode`) + }, 10000) + } + + #scheduleNextBatch() { + if (!this.#isRunning) return + + // Variable delay to simulate real-world conditions + const delay = this.#burstMode ? 10 : Math.random() * 100 + 50 + const batchSize = this.#burstMode + ? Math.floor(Math.random() * 100) + 50 + : Math.floor(Math.random() * 10) + 1 + + this.#intervalHandle = setTimeout(() => { + this.#generateBatch(batchSize) + this.#scheduleNextBatch() + }, delay) + } + + #generateBatch(size: number) { + for (let i = 0; i < size; i++) { + const data: DataRow = { + id: this.#total++, + timestamp: new Date(), + message: `Message ${this.#total} - ${Math.random().toString(36).substring(7)}`, + value: Math.random() * 1000, + } + + this.emit('data', data) + } + } + + stop() { + this.#isRunning = false + if (this.#intervalHandle) { + clearTimeout(this.#intervalHandle) + this.#intervalHandle = null + } + this.emit('end') + } + + get total(): number { + return this.#total + } +} + +void (async () => { + const tableName = 'streaming_backpressure_demo' + const client = createClient({ + // Configure client for high-throughput scenarios + max_open_connections: 5, + compression: { + request: true, + response: true, + }, + }) + + console.log('Setting up table...') + + await client.command({ + query: `DROP TABLE IF EXISTS ${tableName}`, + }) + + await client.command({ + query: ` + CREATE TABLE ${tableName} + ( + id UInt64, + timestamp DateTime, + message String, + value Float64 + ) + ENGINE = MergeTree() + ORDER BY (id, timestamp) + `, + }) + + // Create data source and producer + const dataSource = new SimulatedDataSource() + const dataProducer = new BackpressureAwareDataProducer(dataSource) + + // Start generating data + console.log('Starting data generation...') + dataSource.start() + + // Handle graceful shutdown + const cleanup = async () => { + console.log('\nShutting down gracefully...') + dataSource.stop() + + // Wait a bit for any remaining data to be processed + await new Promise((resolve) => setTimeout(resolve, 1000)) + + console.log(`Final stats:`) + 
console.log(`- Generated: ${dataSource.total} rows`) + console.log(`- Produced: ${dataProducer.total} rows`) + + await client.close() + process.exit(0) + } + + process.on('SIGINT', cleanup) + process.on('SIGTERM', cleanup) + + try { + console.log('Starting streaming insert...') + const startTime = Date.now() + + await client.insert({ + table: tableName, + values: dataProducer, + format: 'JSONEachRow', + clickhouse_settings: { + // Optimize for streaming inserts + async_insert: 1, + wait_for_async_insert: 1, + async_insert_max_data_size: '10485760', // 10MB + async_insert_busy_timeout_ms: 1000, + }, + }) + + const duration = Date.now() - startTime + + console.log(`\nInsert completed in ${duration}ms`) + console.log(`Inserted ${dataProducer.total} rows`) + console.log('\nVerifying inserted data...') + + const result = await client.query({ + query: ` + SELECT + count() as total_rows, + min(timestamp) as min_timestamp, + max(timestamp) as max_timestamp, + avg(value) as avg_value + FROM ${tableName} + `, + format: 'JSONEachRow', + }) + + const stats = await result.json<{ + total_rows: string + min_timestamp: string + max_timestamp: string + avg_value: string + }>() + + console.log('Verification results:', stats[0]) + + const sampleResult = await client.query({ + query: `SELECT * FROM ${tableName} ORDER BY id LIMIT 5`, + format: 'JSONEachRow', + }) + + console.log('\nSample data:') + for await (const rows of sampleResult.stream()) { + rows.forEach((row: Row) => { + console.log(row.json()) + }) + } + } catch (error) { + console.error('Insert failed:', error) + } finally { + await cleanup() + } +})() From 625e7f85c0b94927f53f907fd82b7c63d4c5926a Mon Sep 17 00:00:00 2001 From: eryue0220 Date: Fri, 19 Sep 2025 15:27:15 +0800 Subject: [PATCH 2/4] fix: example error --- .../insert_streaming_backpressure_simple.ts | 151 ++++++++---------- .../insert_streaming_with_backpressure.ts | 16 +- 2 files changed, 74 insertions(+), 93 deletions(-) diff --git a/examples/node/insert_streaming_backpressure_simple.ts b/examples/node/insert_streaming_backpressure_simple.ts index 73769143..7c813b89 100644 --- a/examples/node/insert_streaming_backpressure_simple.ts +++ b/examples/node/insert_streaming_backpressure_simple.ts @@ -1,5 +1,5 @@ import { createClient } from '@clickhouse/client' -import * as Stream from 'stream' +import * as Stream from 'node:stream' interface DataRow { id: number @@ -7,67 +7,75 @@ interface DataRow { value: number } -class DataStreamWithBackpressure extends Stream.Readable { - #dataQueue: DataRow[] = [] - #isReading = false - #hasEnded = false +class SimpleBackpressureStream extends Stream.Readable { + #currentId = 1 + #maxRecords = 0 + #intervalId: NodeJS.Timeout | null = null + #isPaused = false - constructor() { - super({ objectMode: true, highWaterMark: 10 }) + constructor(maxRecords: number) { + super({ objectMode: true, highWaterMark: 5 }) + this.#maxRecords = maxRecords } - addData(data: DataRow): boolean { - if (this.#hasEnded) { - return false - } - - // If we're currently reading, try to push immediately - if (this.#isReading && this.#dataQueue.length === 0) { - return this.#pushData(data) - } else { - // Otherwise, queue the data - this.#dataQueue.push(data) - return true + _read() { + if (this.#isPaused) { + console.log('Backpressure relieved - resuming data production') + this.#isPaused = false + this.#startProducing() } } - endStream() { - this.#hasEnded = true - if (this.#dataQueue.length === 0) { - super.push(null) + #startProducing() { + if (this.#intervalId || 
this.#currentId > this.#maxRecords) { + return } - } - - _read() { - this.#isReading = true - while (this.#dataQueue.length > 0) { - const data = this.#dataQueue.shift() - - if (!data || !this.#pushData(data)) { - this.#isReading = false + this.#intervalId = setInterval(() => { + if (this.#currentId > this.#maxRecords) { + console.log('All data produced, ending stream') + this.push(null) // End the stream + this.#stopProducing() return } - } - // If all data is processed and stream should end - if (this.#hasEnded && this.#dataQueue.length === 0) { - super.push(null) + const data: DataRow = { + id: this.#currentId++, + name: `Name_${this.#currentId - 1}`, + value: Math.random() * 1000, + } + const canContinue = this.push(data) + + if (!canContinue) { + // console.log('Backpressure detected - pausing data production') + this.#isPaused = true + this.#stopProducing() + } else if (this.#currentId % 500 === 0) { + console.log(`Produced ${this.#currentId - 1} records`) + } + }, 1) + } + + #stopProducing() { + if (this.#intervalId) { + clearInterval(this.#intervalId) + this.#intervalId = null } + } - this.#isReading = false + start() { + this.#startProducing() } - #pushData(data: DataRow): boolean { - // Convert to JSON format for ClickHouse - const jsonLine = JSON.stringify(data) - return super.push(jsonLine) + _destroy(error: Error | null, callback: (error?: Error | null) => void) { + this.#stopProducing() + callback(error) } } void (async () => { - const client = createClient() const tableName = 'simple_streaming_demo' + const client = createClient() // Setup table await client.command({ @@ -87,60 +95,33 @@ void (async () => { `, }) - const dataStream = new DataStreamWithBackpressure() - const addDataToStream = () => { - const maxRecords = 10000 - let id = 1 - let addedCount = 0 - - const addBatch = () => { - // Add a batch of records - const batchSize = Math.min(100, maxRecords - addedCount) - - for (let i = 0; i < batchSize; i++) { - const success = dataStream.addData({ - id: id++, - name: `Name_${id}`, - value: Math.random() * 1000, - }) - - if (!success) { - console.log('Failed to add data - stream ended') - return - } - } - - addedCount += batchSize - console.log(`Added batch of ${batchSize} records (total: ${addedCount})`) - - if (addedCount < maxRecords) { - // Continue adding data with a small delay - setTimeout(addBatch, 10) - } else { - console.log('Finished adding data, ending stream') - dataStream.endStream() - } - } + const maxRecords = 10000 + console.log('Creating backpressure-aware data stream...') - addBatch() - } + const dataStream = new SimpleBackpressureStream(maxRecords) try { - console.log('Starting streaming insert...') + console.log('Starting streaming insert with backpressure demonstration...') - // Start adding data in the background - addDataToStream() - - // Perform the streaming insert - await client.insert({ + const insertPromise = client.insert({ table: tableName, values: dataStream, format: 'JSONEachRow', + clickhouse_settings: { + // Use async inserts to handle streaming data more efficiently + async_insert: 1, + wait_for_async_insert: 1, + async_insert_max_data_size: '10485760', // 10MB + async_insert_busy_timeout_ms: 1000, + }, }) + setTimeout(() => dataStream.start(), 100) + + await insertPromise + console.log('Insert completed successfully!') - // Verify the results const result = await client.query({ query: `SELECT count() as total FROM ${tableName}`, format: 'JSONEachRow', diff --git a/examples/node/insert_streaming_with_backpressure.ts 
b/examples/node/insert_streaming_with_backpressure.ts index 4a5ee1b2..187fdb86 100644 --- a/examples/node/insert_streaming_with_backpressure.ts +++ b/examples/node/insert_streaming_with_backpressure.ts @@ -1,7 +1,7 @@ import { createClient } from '@clickhouse/client' import type { Row } from '@clickhouse/client-common' -import * as Stream from 'stream' -import { EventEmitter } from 'events' +import * as Stream from 'node:stream' +import { EventEmitter } from 'node:events' interface DataRow { id: number @@ -59,15 +59,15 @@ class BackpressureAwareDataProducer extends Stream.Readable { return false } - // Convert data to JSON format for ClickHouse - const jsonLine = JSON.stringify({ + // Convert data to JSON object for ClickHouse + const jsonData = { id: data.id, timestamp: data.timestamp.toISOString(), message: data.message, value: data.value, - }) + } - const pushed = super.push(jsonLine) + const pushed = this.push(jsonData) if (pushed) { this.#total++ if (this.#total % 1000 === 0) { @@ -79,12 +79,12 @@ class BackpressureAwareDataProducer extends Stream.Readable { #handleDataSourceEnd() { console.log(`Data source ended. Total produced: ${this.#total} rows`) - super.push(null) + this.push(null) } #handleDataSourceError(error: Error) { console.error('Data source error:', error) - super.destroy(error) + this.destroy(error) } // Called when the stream is ready to accept more data (backpressure resolved) From 050c29b78b551dce64cb84d49685a46e35859070 Mon Sep 17 00:00:00 2001 From: eryue0220 Date: Tue, 30 Sep 2025 14:40:51 +0800 Subject: [PATCH 3/4] cr --- examples/node/insert_streaming_with_backpressure.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/node/insert_streaming_with_backpressure.ts b/examples/node/insert_streaming_with_backpressure.ts index 187fdb86..0c382d0e 100644 --- a/examples/node/insert_streaming_with_backpressure.ts +++ b/examples/node/insert_streaming_with_backpressure.ts @@ -154,8 +154,9 @@ class SimulatedDataSource extends EventEmitter { #generateBatch(size: number) { for (let i = 0; i < size; i++) { + const id = this.#total++ const data: DataRow = { - id: this.#total++, + id, timestamp: new Date(), message: `Message ${this.#total} - ${Math.random().toString(36).substring(7)}`, value: Math.random() * 1000, } @@ -198,7 +199,7 @@ void (async () => { await client.command({ query: ` - CREATE TABLE ${tableName} + CREATE OR REPLACE TABLE ${tableName} ( id UInt64, timestamp DateTime, From e64b78d347d109785102733180bcae857b0a6f1d Mon Sep 17 00:00:00 2001 From: eryue0220 Date: Tue, 30 Sep 2025 15:00:44 +0800 Subject: [PATCH 4/4] cr --- examples/node/insert_streaming_backpressure_simple.ts | 2 +- examples/node/insert_streaming_with_backpressure.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/node/insert_streaming_backpressure_simple.ts b/examples/node/insert_streaming_backpressure_simple.ts index 7c813b89..92e75257 100644 --- a/examples/node/insert_streaming_backpressure_simple.ts +++ b/examples/node/insert_streaming_backpressure_simple.ts @@ -47,7 +47,7 @@ class SimpleBackpressureStream extends Stream.Readable { const canContinue = this.push(data) if (!canContinue) { - // console.log('Backpressure detected - pausing data production') + console.log('Backpressure detected - pausing data production') this.#isPaused = true this.#stopProducing() } else if (this.#currentId % 500 === 0) { console.log(`Produced ${this.#currentId - 1} records`) } diff --git a/examples/node/insert_streaming_with_backpressure.ts b/examples/node/insert_streaming_with_backpressure.ts index 0c382d0e..2d3cd649 
100644 --- a/examples/node/insert_streaming_with_backpressure.ts +++ b/examples/node/insert_streaming_with_backpressure.ts @@ -158,7 +158,7 @@ class SimulatedDataSource extends EventEmitter { const data: DataRow = { id, timestamp: new Date(), - message: `Message ${this.#total} - ${Math.random().toString(36).substring(7)}`, + message: `Message ${id} - ${Math.random().toString(36).substring(7)}`, value: Math.random() * 1000, }
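A side note for readers adapting these examples: when the producer can be written as an async generator, `Stream.Readable.from()` gives the same backpressure behavior without a custom `Readable` subclass, because Node suspends the generator while the stream's buffer is full. A sketch under that assumption, reusing the simple example's table name and row shape — not part of this PR:

```ts
import { createClient } from '@clickhouse/client'
import * as Stream from 'node:stream'

// Placeholder async source; stands in for a queue, socket, etc.
async function* generateRows(max: number) {
  for (let id = 1; id <= max; id++) {
    // `yield` suspends here whenever the consumer lags behind, so no
    // manual pause/resume bookkeeping is needed
    yield { id, name: `Name_${id}`, value: Math.random() * 1000 }
  }
}

void (async () => {
  const client = createClient()
  await client.insert({
    table: 'simple_streaming_demo',
    values: Stream.Readable.from(generateRows(10_000)),
    format: 'JSONEachRow',
  })
  await client.close()
})()
```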