@flyingsilverfin flyingsilverfin commented Oct 2, 2025

Usage and product changes

We noticed that calling transaction.close() does not wait until the server has freed its resources. This makes quick sequences, such as tests where transactions are opened and then followed by database deletes, unreliable. Further investigation showed that workarounds using the existing on_close callbacks in Python and Java caused segfaults. We fix both:

  1. Transaction.close() in Python and Java now blocks for 1 round trip. In Rust, this now returns a promise/future. In Java/Python, we pick the most relevant default and resolve the promise from Java/Python.
  2. We fix segfaults that occur when the Rust driver calls into Python/Java once the user attaches .on_close callbacks to transactions.

We also fix nondeterministic errors:

  1. Adding on_close callbacks must return a promise, since the implementation injects the callback into our lowest-level listener loop, which may register the callback later. If the on_close() registration is not awaited and the transaction is closed immediately afterwards, execution of the callback is hit or miss.
  2. We add keepalive to the channel, without which messages sometimes get "stuck" on the client-side receiving end of responses from the server. No further clues were found as to why this happens. See comments for more detail.

We also add one major feature enhancement: configurable logging. All logging should now go through the tracing crate. We can configure logging levels for just the driver library with the TYPEDB_DRIVER_LOG or general RUST_LOG environment variables. By default we set it to info.
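As a rough illustration of the environment-variable configuration described above, the following Python sketch sets the driver log level before any driver is created. Only the variable names `TYPEDB_DRIVER_LOG` and `RUST_LOG` and the `info` default come from this description; the helper `configure_driver_logging`, the `debug` value, and the idea that the variable has to be set before the driver initializes its logging are assumptions.

```python
import os


def configure_driver_logging(level: str = "info") -> str:
    """Set the driver-specific log level unless the user already chose one.

    Assumption: the native driver reads the variable once, when logging is
    first initialized, so this should run before the first driver is created.
    """
    # TYPEDB_DRIVER_LOG targets only the driver library; RUST_LOG is the
    # general variable and is left untouched here.
    return os.environ.setdefault("TYPEDB_DRIVER_LOG", level)


effective = configure_driver_logging("debug")
print(f"TYPEDB_DRIVER_LOG={effective}")
```

Using `setdefault` keeps any value the user exported in their shell, so the helper only supplies a fallback.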

Implementation

  • Fix and enhance on_close callbacks:

    • on attaching a callback, we don't return until the callback is actually registered (previously it was submitted into an async channel, but not necessarily recorded yet)
      • this is also sped up by having the lowest-level registration listener listen in an async context instead of a polling context
    • we fix the segfault that occurred when invoking the callback from Rust, mostly by enabling threading in the SWIG .i layer!
  • Make close() a promise in Rust, which can be awaited, and a blocking operation in Java and Python, which awaits a signal from the server that the transaction is actually closed and the resources are freed up.

  • We add on_close callback integration tests for Python, Java, and Rust

  • add keepalive to the channel, which prevents some nondeterministic message delays/delivery failures.

Further notes

Mysterious lost responses

It appears that server responses (in particular, the transaction open response) sometimes never get delivered into our code. This is only reproducible in the localstack demo https://github.com/typedb-osi/typedb-localstack-demo, and even there only non-deterministically!

We see:

  • Driver: send open transaction request
  • Server: receive open txn request OpenTransaction.Req
  • Server: open txn, response with OpenTransaction.Res

These are confirmed with Wireshark.

The client side actually receives something. If we add logging into stub.rs:

    let stream = this
        .grpc
        .transaction(UnboundedReceiverStream::new(receiver))
        .map(...)
        .await;
    trace!("Received response to txn open request!");

This actually returns a usable grpc stream successfully -- however, the initial OpenTransaction.Res message doesn't arrive until "something else" happens, such as the stream closing, or a keepalive ping being sent.

It's very strange, but setting the keepalive ping to 3 seconds does force the message to arrive at some point...

type: string
steps:
- run: |
brew install [email protected]
Member Author

Fixes mac build issues

- deploy-snapshot-mac-x86_64:
filters:
branches:
only: [development, master, "3.0"]
Member Author

unused

/// Closes the transaction and frees the native rust object.
#[no_mangle]
- pub extern "C" fn transaction_close(txn: *mut Transaction) {
+ pub extern "C" fn transaction_submit_close(txn: *mut Transaction) {
Member Author

New paradigm I chose:
close() returns a promise and does not take ownership.
We also have a submit_close(), which is a fire-and-forget equivalent used by drop, since we can't block or resolve promises there.

Note: we still use force_close on driver for server connections. Didn't change it there!
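A toy model of the two close paths, written in Python purely for illustration. `ToyTransaction`, its background "server" thread, and the `resources_freed` flag are all hypothetical and not part of the driver. Unlike the Rust promise, the `Future` used here starts working as soon as `close()` is called, and the toy server handles only a single close request.

```python
import queue
import threading
from concurrent.futures import Future


class ToyTransaction:
    """Toy stand-in for a transaction; the 'server' is a background thread."""

    def __init__(self):
        self._requests = queue.Queue()
        self.resources_freed = threading.Event()
        threading.Thread(target=self._server_loop, daemon=True).start()

    def _server_loop(self):
        # Wait for one close request, free resources, then acknowledge it.
        ack = self._requests.get()
        self.resources_freed.set()
        if ack is not None:
            ack.set_result(None)

    def close(self) -> Future:
        """Request a close; the returned promise resolves on the acknowledgement."""
        ack = Future()
        self._requests.put(ack)
        return ack

    def submit_close(self) -> None:
        """Fire-and-forget: request a close without waiting for anything."""
        self._requests.put(None)


txn = ToyTransaction()
txn.close().result(timeout=5)        # blocks for one round trip
assert txn.resources_freed.is_set()  # a follow-up database delete would be safe here

dropped = ToyTransaction()
dropped.submit_close()               # returns immediately, as a drop handler must
```

The second path gives no guarantee about when the resources are released, which is why it suits `drop` but not tests that delete a database right after closing.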

Member Author

@dmitrii-ubskii @farost important change!

@flyingsilverfin flyingsilverfin changed the title from "Improve on close callback" to "Fix transaction on_close and Java and Python block on close()" on Oct 2, 2025
* under the License.
*/

%module(threads=1) typedb_driver
Member Author

Key point to make the python on_close callbacks work - segfaults are caused by multiple threads interacting. I think this makes SWIG interact with Python's GIL correctly

#include <iostream>
#include <mutex>
#include <unordered_map>
static std::unordered_map<size_t, TransactionCallbackDirector*> transactionOnCloseCallbacks {};
Member Author

This had a race condition if the driver calls a callback at the same time as a user adding one to a new transaction.
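The kind of guarded registry that replaces the bare static map can be sketched as follows. This is a Python analogy, not the C++ code from this PR: `CallbackRegistry` and its methods are hypothetical names, and in CPython individual dict operations are already protected by the GIL, so the explicit lock mainly mirrors what a C++ mutex would do.

```python
import threading


class CallbackRegistry:
    """Maps a transaction id to its on_close callback, guarded by one lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._callbacks = {}

    def register(self, txn_id, callback):
        with self._lock:
            self._callbacks[txn_id] = callback

    def fire(self, txn_id, error=None):
        # Remove the entry under the lock, but invoke it outside the lock so
        # a callback that registers another callback cannot deadlock.
        with self._lock:
            callback = self._callbacks.pop(txn_id, None)
        if callback is not None:
            callback(error)
        return callback is not None


registry = CallbackRegistry()
fired = []


def worker(i):
    # One thread plays the user (register) and the driver (fire) for txn i.
    registry.register(i, lambda error, i=i: fired.append(i))
    registry.fire(i)


threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Each callback runs exactly once, and firing an id that was never registered (or already fired) is a harmless no-op.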

ThreadSafeTransactionCallbacks(const ThreadSafeTransactionCallbacks&) = delete;
ThreadSafeTransactionCallbacks& operator=(const ThreadSafeTransactionCallbacks&) = delete;

// --- Core Operations ---
Member Author

yes this was AI generated

%inline %{
#include <atomic>
- void transaction_on_close_register(const Transaction* transaction, TransactionCallbackDirector* handler) {
+ VoidPromise* transaction_on_close_register(const Transaction* transaction, TransactionCallbackDirector* handler) {
Member Author

on_close callback registration also returns a promise, since we're not guaranteed that the on_close callback is registered when this returns.


void Transaction::close() {
if (transactionNative != nullptr) _native::transaction_close(transactionNative.release());
void Transaction::submitClose() {
Member Author

probably irrelevant to do this in C++ tbh

TransactionOnClose callback = new TransactionOnClose(function);
callbacks.add(callback);
- transaction_on_close(nativeObject, callback.released());
+ transaction_on_close(nativeObject, callback.released()).get();
Member Author

Java blocks and resolves the promises for both on_close() and for close() for ease of use/so users don't forget to do it. I think in Rust people are more familiar with awaiting futures when they need to & the tooling around it is better

)

typedb_java_test(
name = "test-driver",
Member Author

new place to test driver impl as an integration test

)

py_test(
name = "test_driver",
Member Author

Python integration test location

def on_close(self, function: callable):
-     transaction_on_close(self.native_object, _Transaction.TransactionOnClose(function).__disown__())
+     callback = _Transaction.TransactionOnClose(function)
+     void_promise_resolve(transaction_on_close(self.native_object, callback.__disown__()))
Member Author

Python also actively resolves on_close and close() for the user

Member Author

@dmitrii-ubskii @farost important to know in the new settings

Member Author

(Java's the same)

is_open: Arc<AtomicCell<bool>>,
error: Arc<RwLock<Option<Error>>>,
- on_close_register_sink: UnboundedSender<Box<dyn FnOnce(Option<Error>) + Send + Sync>>,
+ on_close_register_sink: UnboundedSender<(Box<dyn FnOnce(Option<Error>) + Send + Sync>, UnboundedSender<()>)>,
Member Author

Awaitable notifier to know when the on_close is guaranteed to be registered
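The pairing of a callback with its own notifier can be sketched in Python as follows. `ToyListener` and its fields are hypothetical, and a `threading.Event` stands in for the `UnboundedSender<()>` half of the tuple. The listener thread signals the event only after it has actually recorded the callback, so whoever waits on the event knows the registration has landed.

```python
import queue
import threading


class ToyListener:
    """Toy listener loop that acknowledges each on_close registration."""

    def __init__(self):
        # Carries (callback, registered_event) pairs to the listener thread.
        self._register_sink = queue.Queue()
        self.registered_callbacks = []
        threading.Thread(target=self._listen_loop, daemon=True).start()

    def _listen_loop(self):
        while True:
            callback, registered = self._register_sink.get()
            self.registered_callbacks.append(callback)
            registered.set()  # notify only after the callback is recorded

    def on_close(self, callback) -> threading.Event:
        """Submit a callback; the returned event is set once it is registered."""
        registered = threading.Event()
        self._register_sink.put((callback, registered))
        return registered


listener = ToyListener()
promise = listener.on_close(lambda error: print("closed", error))
promise.wait(timeout=5)  # "resolving the promise", as Java and Python do for the user
```

Without the wait, the caller could go on to close the transaction while the callback is still sitting in the queue, which is the hit-or-miss behaviour described in the PR description.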

impl Drop for TransactionTransmitter {
fn drop(&mut self) {
- self.force_close();
+ self.submit_close();
Member Author

Usage of new submit_close fire and forget drop


- pub(in crate::connection) fn force_close(&self) {
+ #[cfg(not(feature = "sync"))]
+ pub(in crate::connection) fn close(&self) -> impl Promise<'_, Result<()>> {
Member Author

close() now returns a promise. This is a bit ugly but probably ok?

The whole body needs to be inside the future, so that either all of it runs when awaited or none of it runs at all.
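The all-or-nothing property can be shown with a lazy promise modelled as a plain Python closure. `LazyCloseDemo` and the three step names in its log are illustrative assumptions, not the actual steps inside the Rust `close()`.

```python
class LazyCloseDemo:
    """Toy object whose close() returns a lazy promise (a zero-argument callable)."""

    def __init__(self):
        self.log = []

    def close(self):
        def resolve():
            # Every step lives inside the promise body, so an unresolved
            # promise leaves no partial side effects behind.
            self.log.append("register close notifier")
            self.log.append("send close request")
            self.log.append("wait for close notification")

        return resolve


demo = LazyCloseDemo()
promise = demo.close()
print(demo.log)  # → []
promise()
print(demo.log)
```

If the first step ran eagerly in `close()` and only the rest ran on resolution, dropping the promise would leave a notifier registered for a close request that was never sent.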

Comment on lines +125 to +126
let close_notifier_callback = Box::new(move |error| {
closed_sink.send(()).unwrap();
Member Author

This is kinda a hack but honestly might be ok

pub(in crate::connection) fn on_close(
&self,
callback: impl FnOnce(Option<Error>) + Send + Sync + 'static,
) -> impl Promise<'_, Result<()>> {
Member Author

on_close is also implemented as a promise now

- move || {
-     Self::dispatch_loop(queue_source, request_sink, collector, on_close_callback_source, shutdown_signal)
- }
+ move || Self::sync_dispatch_loop(queue_source, request_sink, collector, shutdown_signal)
Member Author

Since we just have to get the on_close_callback into the Collector in some way, we can choose whether we listen to the channel in the async listen loop or the sync dispatch loop. To minimize waits, we're going to let Tokio manage it in the async loop.

driver_options.tls_config().clone().expect("TLS config object must be set when TLS is enabled");
builder = builder.tls_config(tls_config)?;
}
builder = builder.http2_keep_alive_interval(Duration::from_secs(10)).keep_alive_while_idle(true);
Member Author

So this is a bit of a mystery:
without it (this was AI suggested), it appears that server responses (in particular, the transaction open response) never get delivered into our code.

I.e. we see:
Driver: send open transaction request
Server: receive open txn request OpenTransaction.Req
Server: open txn, response with OpenTransaction.Res

(confirmed the above with Wireshark).

The client side actually receives something (from stub.rs):

    this
        .grpc
        .transaction(UnboundedReceiverStream::new(receiver))

When awaited, this actually returns a stream successfully -- however, the OpenTransaction.Res message doesn't arrive until "something else" happens, such as the stream closing because of a transaction timeout...

It's very strange but since this solves it and it's sucked up a ton of time, I'm going to leave this in.

request = request_source.recv() => request,
_ = shutdown_signal.recv() => None,
} {
trace!("RPC dispatcher loop received request {:?}", request);
Member Author

If we want to see everything happening, on TRACE we'll basically see every message in and out

self.transaction_transmitter.on_close(callback)
}

pub(crate) fn close(&self) -> impl Promise<'_, Result<()>> {
Member Author

So here's a question: why is it ok for close() to be a true promise, while the variants below for commit and rollback are resolved and re-emitted as promises? We should probably pick 1 approach

cc @farost @dmitrii-ubskii

Member

We wait for answers from the server in the variants below

Member

And probably convert them

///
/// The logging is initialized only once using a static flag to prevent
/// multiple initializations in applications that create multiple drivers.
pub fn init_logging() {
Member Author

New feature: I added some basic but also easily extensible logging throughout. We can configure it at runtime with environment variables!

}))
.await;

transaction.close().await;
Member Author

verify: if we await the close() future, we are guaranteed that all the allocated callbacks have been executed.

@flyingsilverfin flyingsilverfin marked this pull request as ready for review October 2, 2025 22:54