Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ If something is missing, or you found a mistake in one of these examples, please
- [insert_file_stream_parquet.ts](node/insert_file_stream_parquet.ts) - (Node.js only) stream a Parquet file into ClickHouse.
- [insert_arbitrary_format_stream.ts](node/insert_arbitrary_format_stream.ts) - (Node.js only) stream in arbitrary format into ClickHouse. In this case, the input format is [AVRO](https://clickhouse.com/docs/interfaces/formats/Avro), inserting the data from an Avro data file generated ad-hoc.
- [stream_created_from_array_raw.ts](node/stream_created_from_array_raw.ts) - (Node.js only) converting the string input into a stream and sending it to ClickHouse; in this scenario, the base input is a CSV string.
- [insert_streaming_with_backpressure.ts](node/insert_streaming_with_backpressure.ts) - (Node.js only) advanced streaming INSERT example with proper backpressure handling, demonstrating how to handle high-throughput scenarios where your application pushes data to the stream.
- [insert_streaming_backpressure_simple.ts](node/insert_streaming_backpressure_simple.ts) - (Node.js only) simple streaming INSERT example with backpressure handling, showing the essential pattern for streaming data from your application to ClickHouse.
- [insert_values_and_functions.ts](insert_values_and_functions.ts) - generating an `INSERT INTO ... VALUES` statement that uses a combination of values and function calls.
- [insert_ephemeral_columns.ts](insert_ephemeral_columns.ts) - inserting data into a table that has [ephemeral columns](https://clickhouse.com/docs/en/sql-reference/statements/create/table#ephemeral).

Expand Down
137 changes: 137 additions & 0 deletions examples/node/insert_streaming_backpressure_simple.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import { createClient } from '@clickhouse/client'
import * as Stream from 'node:stream'

interface DataRow {
id: number
name: string
value: number
}

class SimpleBackpressureStream extends Stream.Readable {
#currentId = 1
#maxRecords = 0
#intervalId: NodeJS.Timeout | null = null
#isPaused = false

constructor(maxRecords: number) {
super({ objectMode: true, highWaterMark: 5 })
this.#maxRecords = maxRecords
}

_read() {
if (this.#isPaused) {
console.log('Backpressure relieved - resuming data production')
this.#isPaused = false
this.#startProducing()
}
}

#startProducing() {
if (this.#intervalId || this.#currentId > this.#maxRecords) {
return
}

this.#intervalId = setInterval(() => {
if (this.#currentId > this.#maxRecords) {
console.log('All data produced, ending stream')
this.push(null) // End the stream
this.#stopProducing()
return
}

const data: DataRow = {
id: this.#currentId++,
name: `Name_${this.#currentId - 1}`,
value: Math.random() * 1000,
}
const canContinue = this.push(data)

if (!canContinue) {
console.log('Backpressure detected - pausing data production')
this.#isPaused = true
this.#stopProducing()
} else if (this.#currentId % 500 === 0) {
console.log(`Produced ${this.#currentId - 1} records`)
}
}, 1)
}

#stopProducing() {
if (this.#intervalId) {
clearInterval(this.#intervalId)
this.#intervalId = null
}
}

start() {
this.#startProducing()
}

_destroy(error: Error | null, callback: (error?: Error | null) => void) {
this.#stopProducing()
callback(error)
}
}

void (async () => {
const tableName = 'simple_streaming_demo'
const client = createClient()

// Setup table
await client.command({
query: `DROP TABLE IF EXISTS ${tableName}`,
})

await client.command({
query: `
CREATE TABLE ${tableName}
(
id UInt32,
name String,
value Float64
)
ENGINE = MergeTree()
ORDER BY id
`,
})

const maxRecords = 10000
console.log('Creating backpressure-aware data stream...')

const dataStream = new SimpleBackpressureStream(maxRecords)

try {
console.log('Starting streaming insert with backpressure demonstration...')

const insertPromise = client.insert({
table: tableName,
values: dataStream,
format: 'JSONEachRow',
clickhouse_settings: {
// Use async inserts to handle streaming data more efficiently
async_insert: 1,
wait_for_async_insert: 1,
async_insert_max_data_size: '10485760', // 10MB
async_insert_busy_timeout_ms: 1000,
},
})

setTimeout(() => dataStream.start(), 100)

await insertPromise

console.log('Insert completed successfully!')

const result = await client.query({
query: `SELECT count() as total FROM ${tableName}`,
format: 'JSONEachRow',
})

const [{ total }] = await result.json<{ total: string }>()
console.log(`Total records inserted: ${total}`)
} catch (error) {
console.error('Insert failed:', error)
} finally {
await client.close()
}
})()
Loading