forked from ClickHouse/clickhouse-js
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlong_running_queries_timeouts.ts
297 lines (277 loc) · 10.5 KB
/
long_running_queries_timeouts.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
import { type ClickHouseClient, createClient } from '@clickhouse/client' // or '@clickhouse/client-web'
import * as crypto from 'crypto'
import type { SetIntervalAsyncTimer } from 'set-interval-async'
import { clearIntervalAsync, setIntervalAsync } from 'set-interval-async'
/**
* If you execute a long-running query without data coming in from the client,
* and your LB has idle connection timeout set to a value less than the query execution time,
* there is a workaround to trigger ClickHouse to send progress HTTP headers and make LB think that the connection is alive.
*
* This is the combination of `send_progress_in_http_headers` + `http_headers_progress_interval_ms` settings.
*
* One of the symptoms of such an LB timeout might be a "socket hang up" error when `request_timeout` runs out,
* even though `system.query_log` marks the query as completed, with an execution time shorter than `request_timeout`.
*
* An alternative, but more "hacky", approach is to rely on an HTTP interface "feature":
* a mutation query whose HTTP request has been cancelled still continues to execute on the server.
*
* This example covers both approaches.
*
* @see https://clickhouse.com/docs/en/operations/settings/settings#send_progress_in_http_headers
* @see https://clickhouse.com/docs/en/interfaces/http
*/
void (async () => {
  const tableName = 'insert_from_select'
  // ------------------------------------------------------------------------------------------------------------------
  // Example #1 - waiting for the entire time of the query execution.
  // This is susceptible to transient network errors.
  // See example #2 for a "safer", but more hacky approach.
  // ------------------------------------------------------------------------------------------------------------------
  await (async () => {
    console.info('Running example #1')
    const client = createClient({
      /* Here we assume that:
         --- We need to execute a long-running query that will not send any data from the client
             aside from the statement itself, and will not receive any data from the server during its execution.
             An example of such a statement is INSERT FROM SELECT; the client gets the response only when it's done;
         --- There is an LB with a 120s idle timeout; a safe value for `http_headers_progress_interval_ms` could be 110 or 115s;
         --- We estimate that the query will be completed in 300 to 350s at most,
             so we choose the safe value of `request_timeout` as 400s.
         Of course, the exact settings values will depend on your infrastructure configuration. */
      request_timeout: 400_000,
      clickhouse_settings: {
        // Ask ClickHouse to periodically send query execution progress in HTTP headers, creating some activity in the connection.
        // 1 here is a boolean value (true).
        send_progress_in_http_headers: 1,
        // The interval of sending these progress headers. Here it is less than 120s,
        // which in this example is assumed to be the LB idle connection timeout.
        // As it is UInt64 (UInt64 max value > Number.MAX_SAFE_INTEGER), it should be passed as a string.
        http_headers_progress_interval_ms: '110000',
      },
    })
    // Release the client's connections even if one of the statements below fails
    // (e.g. with the "socket hang up" error this example is about).
    try {
      await createTestTable(client, tableName)
      // Assuming that this is our long-running insert,
      // it should not fail because of LB and the client settings described above.
      await client.command({
        query: `
INSERT INTO ${tableName}
SELECT '42', 'foobar'
`,
      })
      const rows = await client.query({
        query: `SELECT * FROM ${tableName}`,
        format: 'JSONEachRow',
      })
      console.info('Example #1 - inserted data:', await rows.json())
      console.info('---------------------------------------------------')
    } finally {
      await client.close()
    }
  })()
  // ------------------------------------------------------------------------------------------------------------------
  // Example #2 - cancelling the outgoing HTTP request, keeping the query running. This is a more "hacky" approach.
  // Unlike TCP/Native, mutations sent over HTTP are NOT cancelled on the server when the connection is interrupted.
  //
  // While this is hacky, it is also less prone to network errors, as we only periodically poll the query status,
  // instead of waiting on the other side of the connection for the entire time.
  //
  // Inspired by https://github.com/ClickHouse/clickhouse-js/issues/244 and the discussion in this issue.
  // See also: https://github.com/ClickHouse/ClickHouse/issues/49683 - once implemented, we will not need this hack.
  // ------------------------------------------------------------------------------------------------------------------
  await (async () => {
    console.info('Running example #2')
    const client = createClient({
      // we don't need any extra settings here.
    })
    try {
      await createTestTable(client, tableName)
      // Used to cancel the outgoing HTTP request (but not the query itself!).
      // See more on cancelling the HTTP requests in examples/abort_request.ts.
      const abortController = new AbortController()
      // IMPORTANT: you HAVE to generate the known query_id on the client side to be able to cancel the query later.
      const queryId = crypto.randomUUID()
      // Assuming that this is our long-running insert.
      // IMPORTANT: do not wait for the promise to resolve yet.
      const commandPromise = longRunningInsert(client, {
        tableName,
        queryId,
        abortController,
      })
      // Waiting until the INSERT appears on the server in system.query_log.
      // Once it is there, we can safely cancel the outgoing HTTP request.
      // NOTE(review): rows reach system.query_log only when the server flushes its log buffer
      // (see `flush_interval_milliseconds` of the query_log server setting), so 50 x 100ms
      // may be too short on some servers - tune these values for your setup.
      const insertQueryExists = await pollOnInterval(
        'CheckQueryExists',
        () => checkQueryExists(client, queryId),
        {
          intervalMs: 100,
          maxPolls: 50,
        },
      )
      abortController.abort()
      await commandPromise
      if (!insertQueryExists) {
        // The query is still not received by the server after a reasonable amount of time.
        // We might assume that the query will not be executed. Handle this depending on your use case.
        console.error(
          'The query is not received by the server - assuming a failure.',
        )
        // process.exit terminates immediately (the `finally` below does not run), so close explicitly.
        await client.close()
        process.exit(1)
      }
      // Waiting until the query is completed on the server.
      const isCompleted = await pollOnInterval(
        'CheckCompletedQuery',
        () => checkCompletedQuery(client, queryId),
        {
          maxPolls: 400,
          intervalMs: 1000, // assuming that our query max execution time is 400s
        },
      )
      if (isCompleted) {
        console.info('The query is completed.')
      } else {
        // Handle this depending on your use case - you could wait a bit more, or cancel the query on the server.
        // See examples/cancel_query.ts for the latter option.
        console.error(
          'The query is not completed after a reasonable amount of time.',
        )
        await client.close()
        process.exit(1)
      }
      // Check the inserted data.
      const rows = await client.query({
        query: `SELECT * FROM ${tableName}`,
        format: 'JSONEachRow',
      })
      console.info('Example #2 - inserted data:', await rows.json())
    } finally {
      await client.close()
    }
    process.exit(0)
  })()
})().catch((err: unknown) => {
  // Without this handler a failure above would surface only as an unhandled promise rejection.
  console.error('Unexpected error while running the examples:', err)
  process.exit(1)
})
/**
 * Sends a deliberately slow INSERT FROM SELECT (`sleepEachRow(1)` over 3 rows) tagged with a
 * client-generated `query_id`, so that the caller can abort the outgoing HTTP request and keep
 * tracking the query on the server by that id.
 *
 * An aborted request is expected here and is only logged. Any other error is logged, the client
 * is closed, and the process exits with code 1.
 *
 * @param client - client used to send the INSERT.
 * @param options.abortController - its signal is passed as `abort_signal` to cancel the HTTP request.
 * @param options.queryId - value passed as `query_id` for the INSERT.
 * @param options.tableName - table to insert into.
 */
async function longRunningInsert(
  client: ClickHouseClient,
  {
    abortController,
    queryId,
    tableName,
  }: {
    tableName: string
    queryId: string
    abortController: AbortController
  },
): Promise<void> {
  try {
    await client.command({
      query: `
INSERT INTO ${tableName}
SELECT toString(42 + inner.number), concat('foobar_', inner.number, '_', sleepEachRow(1))
FROM (SELECT number FROM system.numbers LIMIT 3) AS inner
`,
      abort_signal: abortController.signal,
      query_id: queryId,
    })
  } catch (err) {
    // Prefer the signal state over the error text: the wording of abort errors may differ between
    // client flavours and runtimes (and may be capitalised), so a plain substring check is fragile.
    const wasAborted =
      abortController.signal.aborted ||
      (err instanceof Error &&
        (err.name === 'AbortError' || /abort/i.test(err.message)))
    if (wasAborted) {
      console.info(
        'The request was aborted, but the query is still running on the server.',
      )
    } else {
      console.error('Unexpected error:', err)
      await client.close()
      process.exit(1)
    }
  }
}
/**
 * Checks whether `system.query_log` already has at least one row for the given `query_id`.
 *
 * The id is bound through `query_params` instead of being interpolated into the SQL text,
 * so an arbitrary id string cannot change the statement.
 *
 * @param client - client used to run the lookup.
 * @param queryId - the `query_id` assigned to the query on the client side.
 * @returns `true` if at least one matching `system.query_log` row exists.
 */
async function checkQueryExists(
  client: ClickHouseClient,
  queryId: string,
): Promise<boolean> {
  const resultSet = await client.query({
    query: `
      SELECT COUNT(*) > 0 AS exists
      FROM system.query_log
      WHERE query_id = {queryId: String}
    `,
    query_params: { queryId },
    format: 'JSONEachRow',
  })
  const result = await resultSet.json<{ exists: 0 | 1 }>()
  console.log(`[Query ${queryId}] CheckQueryExists result:`, result)
  // COUNT(*) always yields one row, but guard anyway instead of indexing blindly.
  const [row] = result
  return row !== undefined && row.exists !== 0
}
/** A `system.query_log` row as selected by `checkCompletedQuery`. */
type QueryLogInfo = {
  type:
    | 'QueryStart'
    | 'QueryFinish'
    | 'ExceptionBeforeStart'
    | 'ExceptionWhileProcessing'
  // Exception message reported by the server (expected to be empty for successful queries).
  exception: string
}
/**
 * Checks whether the query with the given `query_id` has finished successfully on the server.
 *
 * The id is bound through `query_params` instead of being interpolated into the SQL text.
 *
 * @param client - client used to run the lookup.
 * @param queryId - the `query_id` assigned to the query on the client side.
 * @returns `true` if a `QueryFinish` row exists; `false` if there is no terminal row yet.
 * @throws Error if the query ended with an `Exception*` row, so that a poller stops waiting
 *   for a query that can no longer succeed.
 */
async function checkCompletedQuery(
  client: ClickHouseClient,
  queryId: string,
): Promise<boolean> {
  const resultSet = await client.query({
    query: `
      SELECT type, exception
      FROM system.query_log
      WHERE query_id = {queryId: String} AND type != 'QueryStart'
      LIMIT 1
    `,
    query_params: { queryId },
    format: 'JSONEachRow',
  })
  const result = await resultSet.json<QueryLogInfo>()
  console.log(`[Query ${queryId}] CheckCompletedQuery result:`, result)
  const [row] = result
  if (row === undefined) {
    // No terminal row (yet) - the caller should keep polling.
    return false
  }
  if (row.type === 'QueryFinish') {
    return true
  }
  // The query failed on the server; further polling cannot change the outcome.
  throw new Error(
    `Query ${queryId} failed on the server (${row.type}): ${row.exception}`,
  )
}
/**
 * Runs the async check `fn` on a timer created with `setIntervalAsync` until one of these happens:
 * - `fn` resolves to `true`;
 * - `fn` throws (the error is logged);
 * - `maxPolls` polls have returned `false`.
 * At least one poll is always made. The timer is cleared before returning.
 *
 * @param op - label prefixed to every log line.
 * @param fn - the check; resolving to `true` ends polling successfully.
 * @param options.intervalMs - timer interval in milliseconds.
 * @param options.maxPolls - how many unsuccessful polls are allowed before giving up.
 * @returns `true` if `fn` succeeded, otherwise `false`.
 */
async function pollOnInterval(
  op: string,
  fn: () => Promise<boolean>,
  {
    intervalMs,
    maxPolls,
  }: {
    intervalMs: number
    maxPolls: number
  },
): Promise<boolean> {
  let timer: SetIntervalAsyncTimer<[]> | undefined
  // 1-based number of the poll currently being evaluated (shown in the logs).
  let attempt = 1
  const outcome = await new Promise<boolean>((settle) => {
    const tick = async (): Promise<void> => {
      try {
        const succeeded = await fn()
        console.log(`[${op}] Poll #${attempt}: ${succeeded}`)
        if (succeeded) {
          settle(true)
          return
        }
        if (attempt < maxPolls) {
          // Not done yet and budget left: wait for the next tick.
          attempt += 1
          return
        }
        console.error(`[${op}] Max polls count reached!`)
        settle(false)
      } catch (err) {
        console.error(`[${op}] Error while polling:`, err)
        settle(false)
      }
    }
    // See https://www.npmjs.com/package/set-interval-async#when-should-i-use-setintervalasync
    timer = setIntervalAsync(tick, intervalMs)
  })
  if (timer !== undefined) {
    await clearIntervalAsync(timer)
  }
  return outcome
}
/**
 * (Re)creates the example table via `CREATE OR REPLACE TABLE`, replacing any existing table with that name.
 * If the DDL fails, the error is logged, the client is closed and the process exits with code 1.
 *
 * @param client - client used to run the DDL.
 * @param tableName - name of the table to (re)create.
 */
async function createTestTable(client: ClickHouseClient, tableName: string) {
  try {
    const ddl = `
CREATE OR REPLACE TABLE ${tableName}
(id String, data String)
ENGINE MergeTree()
ORDER BY (id)
`
    await client.command({ query: ddl })
  } catch (err) {
    console.error(`Error while creating the table ${tableName}:`, err)
    await client.close()
    process.exit(1)
  }
}