Skip to content

Commit f6ccbfd

Browse files
committed
feat(query, topic): add TX lifecycle hooks and refactor tx-topic APIs
- Introduce TX.onCommit, TX.onRollback, and TX.onClose lifecycle hooks (replaces registerPrecommitHook). Hooks receive AbortSignal where relevant. - Execute commit hooks before commit, rollback hooks on errors, and close hooks in finally. - Refactor Topic TX APIs to be tx-first: - createTxReader(tx, options) and createTxWriter(tx, options); options no longer includes tx. - Topic TX Reader registers a commit hook to update read offsets in the transaction. - Update _update_offsets_in_transaction signature to (tx, driver, consumer, updates). - Adjust e2e tests to the new API and parameter order. BREAKING CHANGE: - TopicClient.createTxReader and createTxWriter now accept (tx, options) instead of options with tx. - TX.registerPrecommitHook is removed; use tx.onCommit, tx.onRollback, and tx.onClose instead. Signed-off-by: Vladislav Polyakov <[email protected]>
1 parent 3c6ef86 commit f6ccbfd

File tree

5 files changed

+52
-23
lines changed

5 files changed

+52
-23
lines changed

e2e/topic/read-write-tx.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,13 @@ test('writes and reads in tx', async () => {
6969

7070
// Begin a transaction
7171
await yql.begin({ idempotent: true }, async (tx) => {
72-
await using readerTx = createTopicTxReader(driver, {
72+
let readerTx = createTopicTxReader(driver, {
7373
tx,
7474
topic: testTopicName,
7575
consumer: testConsumerName,
7676
})
7777

78-
await using writerTx = createTopicTxWriter(driver, tx, {
78+
await using writerTx = createTopicTxWriter(tx, driver, {
7979
topic: testTopicName,
8080
producer: testProducerName,
8181
})
@@ -132,7 +132,7 @@ test('rollbacks reads', async () => {
132132

133133
await yql
134134
.begin({ idempotent: true }, async (tx) => {
135-
await using readerTx = createTopicTxReader(driver, {
135+
let readerTx = createTopicTxReader(driver, {
136136
tx,
137137
topic: testTopicName,
138138
consumer: testConsumerName,
@@ -186,7 +186,7 @@ test('rollbacks writes', async () => {
186186

187187
await yql
188188
.begin({ idempotent: true }, async (tx) => {
189-
await using writerTx = createTopicTxWriter(driver, tx, {
189+
await using writerTx = createTopicTxWriter(tx, driver, {
190190
topic: testTopicName,
191191
producer: testProducerName,
192192
})

packages/query/src/index.ts

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ export type TX = SQL & {
2424
nodeId: bigint
2525
sessionId: string
2626
transactionId: string
27-
registerPrecommitHook: RegisterPrecommitHook
27+
onRollback: (fn: (error: unknown, signal?: AbortSignal) => Promise<void> | void) => void
28+
onCommit: (fn: (signal?: AbortSignal) => Promise<void> | void) => void
29+
onClose: (fn: (committed: boolean, signal?: AbortSignal) => Promise<void> | void) => void
2830
}
2931

3032
interface SessionContextCallback<T> {
@@ -194,25 +196,35 @@ export function query(driver: Driver): QueryClient {
194196

195197
store.transactionId = beginTransactionResult.txMeta!.id
196198

199+
let сommitHooks: Array<(signal?: AbortSignal) => Promise<void> | void> = []
200+
let rollbackHooks: Array<(error: unknown, signal?: AbortSignal) => Promise<void> | void> = []
201+
let closeHooks: Array<(committed: boolean, signal?: AbortSignal) => Promise<void> | void> = []
202+
203+
let commited = false
197204
try {
198-
let precommitHooks: Array<() => Promise<void> | void> = []
199205
let tx = Object.assign(yqlQuery, {
200206
nodeId: store.nodeId,
201207
sessionId: store.sessionId,
202208
transactionId: store.transactionId,
203-
registerPrecommitHook: (fn: () => Promise<void> | void) => {
204-
precommitHooks.push(fn);
209+
onRollback: (fn: () => Promise<void> | void) => {
210+
rollbackHooks.push(fn);
211+
},
212+
onCommit: (fn: () => Promise<void> | void) => {
213+
сommitHooks.push(fn);
214+
},
215+
onClose: (fn: () => Promise<void> | void) => {
216+
closeHooks.push(fn);
205217
},
206218
}) as TX
207219

208220
dbg.log('executing transaction body')
209221
let result = await ctx.run(store, () => caller!(tx, signal))
210222

211-
dbg.log('executing %d precommit hooks', precommitHooks.length)
212-
await Promise.all(precommitHooks.map(async (hook, i) => {
213-
dbg.log('executing precommit hook #%d', i + 1)
214-
await hook()
215-
dbg.log('precommit hook #%d completed', i + 1)
223+
dbg.log('executing %d commit hooks', сommitHooks.length)
224+
await Promise.all(сommitHooks.map(async (hook, i) => {
225+
dbg.log('executing commit hook #%d', i + 1)
226+
await hook(signal)
227+
dbg.log('commit hook #%d completed', i + 1)
216228
}))
217229

218230
dbg.log('committing transaction')
@@ -222,10 +234,19 @@ export function query(driver: Driver): QueryClient {
222234
throw new CommitError("Transaction commit failed.", new YDBError(commitResult.status, commitResult.issues))
223235
}
224236

237+
commited = true
225238
dbg.log('transaction committed successfully')
226239
return result
227240
} catch (error) {
228241
dbg.log('transaction error: %O', error)
242+
243+
dbg.log('executing %d rollback hooks', сommitHooks.length)
244+
await Promise.all(rollbackHooks.map(async (hook, i) => {
245+
dbg.log('executing rollback hook #%d', i + 1)
246+
await hook(error, signal)
247+
dbg.log('rollback hook #%d completed', i + 1)
248+
}))
249+
229250
void client.rollbackTransaction({ sessionId: store.sessionId, txId: store.transactionId })
230251

231252
if (!isRetryableError(error, options.idempotent)) {
@@ -237,6 +258,13 @@ export function query(driver: Driver): QueryClient {
237258
} finally {
238259
dbg.log('deleting session %s', sessionResponse.sessionId)
239260
void client.deleteSession({ sessionId: sessionResponse.sessionId })
261+
262+
dbg.log('executing %d close hooks', сommitHooks.length)
263+
await Promise.all(closeHooks.map(async (hook, i) => {
264+
dbg.log('executing close hook #%d', i + 1)
265+
await hook(commited, signal)
266+
dbg.log('close hook #%d completed', i + 1)
267+
}))
240268
}
241269
})
242270
}

packages/topic/src/index.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import type { TX } from "./tx.js";
66

77
export interface TopicClient {
88
createReader(options: TopicReaderOptions): TopicReader;
9-
createTxReader(options: TopicTxReaderOptions): TopicTxReader;
9+
createTxReader(tx: TX, options: TopicTxReaderOptions): TopicTxReader;
1010
createWriter(options: TopicWriterOptions): TopicWriter;
1111
createTxWriter(tx: TX, options: TopicWriterOptions): TopicWriter;
1212
}
@@ -16,13 +16,13 @@ export function topic(driver: Driver): TopicClient {
1616
createReader(options) {
1717
return createTopicReader(driver, options);
1818
},
19-
createTxReader(options: TopicTxReaderOptions) {
20-
return createTopicTxReader(driver, options);
19+
createTxReader(tx: TX, options: Omit<TopicTxReaderOptions, 'tx'>) {
20+
return createTopicTxReader(tx, driver, options);
2121
},
2222
createWriter(options: TopicWriterOptions) {
2323
return createTopicWriter(driver, options);
2424
},
25-
createTxWriter(tx: TX, options: TopicWriterOptions) {
25+
createTxWriter(tx: TX, options: Omit<TopicWriterOptions, 'tx'>) {
2626
return createTopicTxWriter(tx, driver, options);
2727
},
2828
} as TopicClient

packages/topic/src/reader/_update_offsets_in_transaction.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ import type { TopicPartitionSession } from "../partition-session.js"
1616
let dbg = loggers.topic.extend('reader')
1717

1818
export let _update_offsets_in_transaction = async function updateOffsetsInTransaction(
19-
driver: Driver,
2019
tx: TX,
20+
driver: Driver,
2121
consumer: string,
2222
updates: Array<{
2323
partitionSession: TopicPartitionSession

packages/topic/src/reader/index.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { _read } from "./_read.js"
1414
import { _commit } from "./_commit.js"
1515
import { _update_offsets_in_transaction } from "./_update_offsets_in_transaction.js"
1616
import { _create_disposal_functions, _initialize_codecs, _start_background_token_refresher } from "./_shared.js"
17+
import type { TX } from "../tx.js"
1718

1819
let dbg = loggers.topic.extend('reader')
1920

@@ -164,12 +165,12 @@ export const createTopicReader = function createTopicReader(driver: Driver, opti
164165
// Re-export types for compatibility
165166
export type { TopicReaderOptions, TopicReader, TopicTxReaderOptions, TopicTxReader } from "./types.js"
166167

167-
export const createTopicTxReader = function createTopicTxReader(driver: Driver, options: TopicTxReaderOptions): TopicTxReader {
168+
export const createTopicTxReader = function createTopicTxReader(tx: TX, driver: Driver, options: Omit<TopicTxReaderOptions, 'tx'>): TopicTxReader {
168169
options.updateTokenIntervalMs ??= 60_000 // Default is 60 seconds.
169170

170171
let state: TopicTxReaderState = {
171172
driver,
172-
options,
173+
options: Object.assign(options, { tx }),
173174
topicsReadSettings: _parse_topics_read_settings(options.topic),
174175

175176
// Control
@@ -194,7 +195,7 @@ export const createTopicTxReader = function createTopicTxReader(driver: Driver,
194195
_initialize_codecs(state.codecs, options.codecMap)
195196

196197
// Register precommit hook to send updateOffsetsInTransaction
197-
options.tx.onCommit(async () => {
198+
tx.onCommit(async () => {
198199
// Send updateOffsetsInTransaction for all read offsets
199200
let updates = []
200201

@@ -212,8 +213,8 @@ export const createTopicTxReader = function createTopicTxReader(driver: Driver,
212213

213214
if (updates.length > 0) {
214215
await _update_offsets_in_transaction(
216+
tx,
215217
state.driver,
216-
options.tx,
217218
state.options.consumer,
218219
updates
219220
)
@@ -265,8 +266,8 @@ export const createTopicTxReader = function createTopicTxReader(driver: Driver,
265266
if (updates.length > 0) {
266267
try {
267268
await _update_offsets_in_transaction(
269+
tx,
268270
state.driver,
269-
options.tx,
270271
state.options.consumer,
271272
updates
272273
)

0 commit comments

Comments
 (0)