diff --git a/examples/README.md b/examples/README.md
index 42adf0fa..02def739 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -45,6 +45,8 @@ If something is missing, or you found a mistake in one of these examples, please
 - [insert_file_stream_parquet.ts](node/insert_file_stream_parquet.ts) - (Node.js only) stream a Parquet file into ClickHouse.
 - [insert_arbitrary_format_stream.ts](node/insert_arbitrary_format_stream.ts) - (Node.js only) stream in arbitrary format into ClickHouse. In this case, the input format is [AVRO](https://clickhouse.com/docs/interfaces/formats/Avro), inserting the data from an Avro data file generated ad-hoc.
 - [stream_created_from_array_raw.ts](node/stream_created_from_array_raw.ts) - (Node.js only) converting the string input into a stream and sending it to ClickHouse; in this scenario, the base input is a CSV string.
+- [insert_streaming_with_backpressure.ts](node/insert_streaming_with_backpressure.ts) - (Node.js only) an advanced streaming INSERT example with proper backpressure handling, for high-throughput scenarios where the application produces data faster than the stream can consume it.
+- [insert_streaming_backpressure_simple.ts](node/insert_streaming_backpressure_simple.ts) - (Node.js only) a simpler streaming INSERT example with backpressure handling, showing the essential pattern for streaming data from an application into ClickHouse.
 - [insert_values_and_functions.ts](insert_values_and_functions.ts) - generating an `INSERT INTO ... VALUES` statement that uses a combination of values and function calls.
 - [insert_ephemeral_columns.ts](insert_ephemeral_columns.ts) - inserting data into a table that has [ephemeral columns](https://clickhouse.com/docs/en/sql-reference/statements/create/table#ephemeral).
diff --git a/examples/node/insert_streaming_backpressure_simple.ts b/examples/node/insert_streaming_backpressure_simple.ts
new file mode 100644
index 00000000..92e75257
--- /dev/null
+++ b/examples/node/insert_streaming_backpressure_simple.ts
@@ -0,0 +1,137 @@
+import { createClient } from '@clickhouse/client'
+import * as Stream from 'node:stream'
+
+interface DataRow {
+  id: number
+  name: string
+  value: number
+}
+
+class SimpleBackpressureStream extends Stream.Readable {
+  #currentId = 1
+  #maxRecords = 0
+  #intervalId: NodeJS.Timeout | null = null
+  #isPaused = false
+
+  constructor(maxRecords: number) {
+    super({ objectMode: true, highWaterMark: 5 })
+    this.#maxRecords = maxRecords
+  }
+
+  _read() {
+    if (this.#isPaused) {
+      console.log('Backpressure relieved - resuming data production')
+      this.#isPaused = false
+      this.#startProducing()
+    }
+  }
+
+  #startProducing() {
+    if (this.#intervalId || this.#currentId > this.#maxRecords) {
+      return
+    }
+
+    this.#intervalId = setInterval(() => {
+      if (this.#currentId > this.#maxRecords) {
+        console.log('All data produced, ending stream')
+        this.push(null) // End the stream
+        this.#stopProducing()
+        return
+      }
+
+      const id = this.#currentId++
+      const data: DataRow = {
+        id,
+        name: `Name_${id}`,
+        value: Math.random() * 1000,
+      }
+      const canContinue = this.push(data)
+
+      if (!canContinue) {
+        console.log('Backpressure detected - pausing data production')
+        this.#isPaused = true
+        this.#stopProducing()
+      } else if (id % 500 === 0) {
+        console.log(`Produced ${id} records`)
+      }
+    }, 1)
+  }
+
+  #stopProducing() {
+    if (this.#intervalId) {
+      clearInterval(this.#intervalId)
+      this.#intervalId = null
+    }
+  }
+
+  start() {
+    this.#startProducing()
+  }
+
+  _destroy(error: Error | null, callback: (error?: Error | null) => void) {
+    this.#stopProducing()
+    callback(error)
+  }
+}
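+
+// Note: `push()` returning false means the Readable's internal buffer reached
+// `highWaterMark` (5 objects here, since this stream is in object mode). Node
+// calls `_read()` again once the consumer has drained the buffer - that is
+// where production resumes. The row that received `false` was still buffered
+// and will be delivered; `false` only means "stop producing for now".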
+
+void (async () => {
+  const tableName = 'simple_streaming_demo'
+  const client = createClient()
+
+  // Set up the target table
+  await client.command({
+    query: `DROP TABLE IF EXISTS ${tableName}`,
+  })
+
+  await client.command({
+    query: `
+      CREATE TABLE ${tableName}
+      (
+        id UInt32,
+        name String,
+        value Float64
+      )
+      ENGINE = MergeTree()
+      ORDER BY id
+    `,
+  })
+
+  const maxRecords = 10000
+  console.log('Creating backpressure-aware data stream...')
+
+  const dataStream = new SimpleBackpressureStream(maxRecords)
+
+  try {
+    console.log('Starting streaming insert with backpressure demonstration...')
+
+    const insertPromise = client.insert({
+      table: tableName,
+      values: dataStream,
+      format: 'JSONEachRow',
+      clickhouse_settings: {
+        // Use async inserts to handle streaming data more efficiently
+        async_insert: 1,
+        // Acknowledge the insert only after the buffered data is flushed
+        wait_for_async_insert: 1,
+        async_insert_max_data_size: '10485760', // 10MB
+        async_insert_busy_timeout_ms: 1000,
+      },
+    })
+
+    // Start producing only after the insert stream is wired up
+    setTimeout(() => dataStream.start(), 100)
+
+    await insertPromise
+
+    console.log('Insert completed successfully!')
+
+    const result = await client.query({
+      query: `SELECT count() as total FROM ${tableName}`,
+      format: 'JSONEachRow',
+    })
+
+    const [{ total }] = await result.json<{ total: string }>()
+    console.log(`Total records inserted: ${total}`)
+  } catch (error) {
+    console.error('Insert failed:', error)
+  } finally {
+    await client.close()
+  }
+})()
diff --git a/examples/node/insert_streaming_with_backpressure.ts b/examples/node/insert_streaming_with_backpressure.ts
new file mode 100644
index 00000000..2d3cd649
--- /dev/null
+++ b/examples/node/insert_streaming_with_backpressure.ts
@@ -0,0 +1,301 @@
+import { createClient } from '@clickhouse/client'
+import type { Row } from '@clickhouse/client'
+import * as Stream from 'node:stream'
+import { EventEmitter } from 'node:events'
+
+interface DataRow {
+  id: number
+  timestamp: Date
+  message: string
+  value: number
+}
+
+class BackpressureAwareDataProducer extends Stream.Readable {
+  #dataSource: EventEmitter
+  #streamPaused = false
+  #pendingData: DataRow[] = []
+  #isDestroyed = false
+  #sourceEnded = false
+  #ended = false
+  #total = 0
+
+  constructor(dataSource: EventEmitter, options?: Stream.ReadableOptions) {
+    super({
+      // Required for JSON* formats
+      objectMode: true,
+      // Limit internal buffering to prevent memory issues
+      highWaterMark: 16,
+      ...options,
+    })
+
+    this.#dataSource = dataSource
+    this.#setupDataSourceListeners()
+  }
+
+  #setupDataSourceListeners() {
+    this.#dataSource.on('data', this.#handleIncomingData.bind(this))
+    this.#dataSource.on('end', this.#handleDataSourceEnd.bind(this))
+    this.#dataSource.on('error', this.#handleDataSourceError.bind(this))
+  }
+
+  #handleIncomingData(data: DataRow) {
+    if (this.#isDestroyed) {
+      return
+    }
+
+    if (this.#streamPaused) {
+      this.#pendingData.push(data)
+      return
+    }
+
+    // Try to push the data immediately. If push() returns false, we are
+    // experiencing backpressure: buffer subsequent data until _read() fires.
+    if (!this.#pushData(data)) {
+      this.#streamPaused = true
+    }
+  }
+
+  #pushData(data: DataRow): boolean {
+    if (this.#isDestroyed) {
+      return false
+    }
+
+    // Convert the row into a plain JSON object for ClickHouse
+    const jsonData = {
+      id: data.id,
+      timestamp: data.timestamp.toISOString(),
+      message: data.message,
+      value: data.value,
+    }
+
+    const pushed = this.push(jsonData)
+    // push() buffers the chunk even when it returns false, so the row always
+    // counts as produced; false only signals "stop producing for now"
+    this.#total++
+    if (this.#total % 1000 === 0) {
+      console.log(`Produced ${this.#total} rows`)
+    }
+    return pushed
+  }
+
+  #handleDataSourceEnd() {
+    this.#sourceEnded = true
+    this.#maybeEndStream()
+  }
+
+  #handleDataSourceError(error: Error) {
+    console.error('Data source error:', error)
+    this.destroy(error)
+  }
+
+  // End the stream only after the source is done AND every buffered row has
+  // been flushed; pushing null earlier would drop the rows in #pendingData
+  #maybeEndStream() {
+    if (this.#ended || !this.#sourceEnded || this.#pendingData.length > 0) {
+      return
+    }
+    this.#ended = true
+    console.log(`Data source ended. Total produced: ${this.#total} rows`)
+    this.push(null)
+  }
+
+  // Called when the stream is ready to accept more data (backpressure resolved)
+  _read() {
+    if (this.#streamPaused) {
+      this.#streamPaused = false
+
+      // Drain the buffered rows, but stop again if we hit backpressure
+      while (this.#pendingData.length > 0) {
+        const data = this.#pendingData.shift()!
+        if (!this.#pushData(data)) {
+          this.#streamPaused = true
+          break
+        }
+      }
+    }
+    this.#maybeEndStream()
+  }
+
+  _destroy(error: Error | null, callback: (error?: Error | null) => void) {
+    this.#isDestroyed = true
+    this.#dataSource.removeAllListeners('data')
+    this.#dataSource.removeAllListeners('end')
+    this.#dataSource.removeAllListeners('error')
+    callback(error)
+  }
+
+  get total(): number {
+    return this.#total
+  }
+}
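+
+// Hypothetical usage sketch (not executed in this example): any EventEmitter
+// that emits 'data' / 'end' / 'error' events can back the producer above -
+// e.g. a message-queue consumer (`queueConsumer` and `parseRow` are assumed):
+//
+//   const source = new EventEmitter()
+//   const producer = new BackpressureAwareDataProducer(source)
+//   queueConsumer.on('message', (msg) => source.emit('data', parseRow(msg)))
+//   queueConsumer.on('close', () => source.emit('end'))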
+
+// Simulated data source that generates data at varying rates
+class SimulatedDataSource extends EventEmitter {
+  #intervalHandle: NodeJS.Timeout | null = null
+  #modeSwitchHandle: NodeJS.Timeout | null = null
+  #isRunning = false
+  #total = 0
+  #burstMode = false
+
+  start() {
+    if (this.#isRunning) return
+
+    this.#isRunning = true
+    this.#scheduleNextBatch()
+
+    // Periodically switch between normal and burst modes
+    this.#modeSwitchHandle = setInterval(() => {
+      this.#burstMode = !this.#burstMode
+      console.log(`Switched to ${this.#burstMode ? 'burst' : 'normal'} mode`)
+    }, 10000)
+  }
+
+  #scheduleNextBatch() {
+    if (!this.#isRunning) return
+
+    // Variable delay and batch size to simulate real-world conditions
+    const delay = this.#burstMode ? 10 : Math.random() * 100 + 50
+    const batchSize = this.#burstMode
+      ? Math.floor(Math.random() * 100) + 50
+      : Math.floor(Math.random() * 10) + 1
+
+    this.#intervalHandle = setTimeout(() => {
+      this.#generateBatch(batchSize)
+      this.#scheduleNextBatch()
+    }, delay)
+  }
+
+  #generateBatch(size: number) {
+    for (let i = 0; i < size; i++) {
+      const id = this.#total++
+      const data: DataRow = {
+        id,
+        timestamp: new Date(),
+        message: `Message ${id} - ${Math.random().toString(36).substring(7)}`,
+        value: Math.random() * 1000,
+      }
+
+      this.emit('data', data)
+    }
+  }
+
+  stop() {
+    // Idempotent: stop() may be called more than once (e.g. repeated signals)
+    if (!this.#isRunning) return
+
+    this.#isRunning = false
+    if (this.#intervalHandle) {
+      clearTimeout(this.#intervalHandle)
+      this.#intervalHandle = null
+    }
+    if (this.#modeSwitchHandle) {
+      clearInterval(this.#modeSwitchHandle)
+      this.#modeSwitchHandle = null
+    }
+    this.emit('end')
+  }
+
+  get total(): number {
+    return this.#total
+  }
+}
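+
+// Tuning note: in object mode, `highWaterMark` counts objects rather than
+// bytes, so the producer above keeps at most ~16 rows in the Readable's
+// internal buffer; overflow during bursts accumulates in #pendingData, whose
+// size is bounded by how far the source can burst ahead of the consumer.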
+
+void (async () => {
+  const tableName = 'streaming_backpressure_demo'
+  const client = createClient({
+    // Configure the client for high-throughput scenarios
+    max_open_connections: 5,
+    compression: {
+      request: true,
+      response: true,
+    },
+  })
+
+  console.log('Setting up table...')
+
+  await client.command({
+    query: `DROP TABLE IF EXISTS ${tableName}`,
+  })
+
+  await client.command({
+    query: `
+      CREATE TABLE ${tableName}
+      (
+        id UInt64,
+        timestamp DateTime,
+        message String,
+        value Float64
+      )
+      ENGINE = MergeTree()
+      ORDER BY (id, timestamp)
+    `,
+  })
+
+  // Create the data source and producer
+  const dataSource = new SimulatedDataSource()
+  const dataProducer = new BackpressureAwareDataProducer(dataSource)
+
+  // Start generating data
+  console.log('Starting data generation... Press Ctrl+C to stop and finalize.')
+  dataSource.start()
+
+  // Graceful shutdown: stopping the source emits 'end', which lets the
+  // producer flush its buffer and end the stream, so the pending insert
+  // below resolves and the main flow continues to verification
+  const handleShutdown = () => {
+    console.log('\nShutting down gracefully...')
+    dataSource.stop()
+  }
+
+  process.on('SIGINT', handleShutdown)
+  process.on('SIGTERM', handleShutdown)
+
+  try {
+    console.log('Starting streaming insert...')
+    const startTime = Date.now()
+
+    await client.insert({
+      table: tableName,
+      values: dataProducer,
+      format: 'JSONEachRow',
+      clickhouse_settings: {
+        // Optimize for streaming inserts
+        async_insert: 1,
+        wait_for_async_insert: 1,
+        async_insert_max_data_size: '10485760', // 10MB
+        async_insert_busy_timeout_ms: 1000,
+        // The producer sends timestamps as ISO 8601 strings
+        date_time_input_format: 'best_effort',
+      },
+    })
+
+    const duration = Date.now() - startTime
+
+    console.log(`\nInsert completed in ${duration}ms`)
+    console.log(`Generated by the source: ${dataSource.total} rows`)
+    console.log(`Produced by the stream:  ${dataProducer.total} rows`)
+    console.log('\nVerifying inserted data...')
+
+    const result = await client.query({
+      query: `
+        SELECT
+          count() as total_rows,
+          min(timestamp) as min_timestamp,
+          max(timestamp) as max_timestamp,
+          avg(value) as avg_value
+        FROM ${tableName}
+      `,
+      format: 'JSONEachRow',
+    })
+
+    const stats = await result.json<{
+      total_rows: string
+      min_timestamp: string
+      max_timestamp: string
+      avg_value: number
+    }>()
+
+    console.log('Verification results:', stats[0])
+
+    const sampleResult = await client.query({
+      query: `SELECT * FROM ${tableName} ORDER BY id LIMIT 5`,
+      format: 'JSONEachRow',
+    })
+
+    console.log('\nSample data:')
+    for await (const rows of sampleResult.stream()) {
+      rows.forEach((row: Row) => {
+        console.log(row.json())
+      })
+    }
+  } catch (error) {
+    console.error('Insert failed:', error)
+  } finally {
+    await client.close()
+  }
+})()
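+
+// Note: with `wait_for_async_insert: 1` (set above), the server acknowledges
+// the INSERT only after the async-insert buffer has been flushed to the
+// table, which is why the verification queries can rely on seeing every row.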