Skip to content

Commit e3d0b89

Browse files
committed
reuse owned pointer when constructing
Signed-off-by: Bugen Zhao <[email protected]>
1 parent 7f07f70 commit e3d0b89

File tree

2 files changed

+8
-4
lines changed

2 files changed

+8
-4
lines changed

src/connector/src/source/cdc/source/reader.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use risingwave_common::bail;
2323
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
2424
use risingwave_common::util::addr::HostAddr;
2525
use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, JVM};
26-
use risingwave_jni_core::{call_static_method, JniReceiverType};
26+
use risingwave_jni_core::{call_static_method, JniReceiverType, OwnedPointer};
2727
use risingwave_pb::connector_service::{GetEventStreamRequest, GetEventStreamResponse};
2828
use thiserror_ext::AsReport;
2929
use tokio::sync::mpsc;
@@ -130,15 +130,15 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
130130
};
131131

132132
// `runJniDbzSourceThread` will take ownership of `tx`, and release it later in
133-
// `Java_com_risingwave_java_binding_Binding_cdcSourceSenderClose` with `AutoClosable`.
134-
let tx = Box::into_raw(Box::new(tx));
133+
// `Java_com_risingwave_java_binding_Binding_cdcSourceSenderClose` via `AutoClosable`.
134+
let tx: OwnedPointer<_> = tx.into();
135135

136136
let result = call_static_method!(
137137
env,
138138
{com.risingwave.connector.source.core.JniDbzSourceHandler},
139139
{void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long channelPtr)},
140140
&get_event_stream_request_bytes,
141-
tx
141+
tx.pointer()
142142
);
143143

144144
match result {

src/jni_core/src/lib.rs

+4
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,10 @@ impl<T> From<T> for Pointer<'static, T> {
151151
}
152152

153153
impl<'a, T> Pointer<'a, T> {
154+
pub fn pointer(&self) -> jlong {
155+
self.pointer
156+
}
157+
154158
fn as_ref(&self) -> &'a T {
155159
debug_assert!(self.pointer != 0);
156160
unsafe { &*(self.pointer as *const T) }

0 commit comments

Comments
 (0)