Skip to content

Commit a7c4ff3

Browse files
committed
Adding JNI abstractions for logging in rust through RustLoggerBridge
Signed-off-by: Raghuvansh Raj <[email protected]>
1 parent 23e4717 commit a7c4ff3

File tree

4 files changed

+176
-85
lines changed

4 files changed

+176
-85
lines changed

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/RustBridge.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,13 @@ public class RustBridge {
2121

2222
static {
2323
NativeLibraryLoader.load("parquet_dataformat_jni");
24+
25+
initLogger();
2426
}
2527

28+
// Logger initialization method
29+
public static native void initLogger();
30+
2631
// Enhanced native methods that handle validation and provide better error reporting
2732
public static native void createWriter(String file, long schemaAddress) throws IOException;
2833
public static native void write(String file, long arrayAddress, long schemaAddress) throws IOException;
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package com.parquet.parquetdataformat.bridge;
10+
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
14+
public class RustLoggerBridge {
15+
16+
private static final Logger logger = LoggerFactory.getLogger(RustLoggerBridge.class);
17+
18+
// Instance methods for direct Java usage
19+
public static void logInfo(String message) {
20+
logger.info(message);
21+
}
22+
23+
public static void logWarn(String message) {
24+
logger.warn(message);
25+
}
26+
27+
public static void logError(String message) {
28+
logger.error(message);
29+
}
30+
31+
public static void logDebug(String message) {
32+
logger.debug(message);
33+
}
34+
}

modules/parquet-data-format/src/main/rust/src/lib.rs

Lines changed: 34 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
11
use jni::objects::{JClass, JString};
22
use jni::sys::{jint, jlong};
3-
use jni::JNIEnv;
3+
use jni::{JNIEnv, JavaVM};
44
use dashmap::DashMap;
55
use arrow::record_batch::RecordBatch;
66
use parquet::arrow::ArrowWriter;
77
use std::fs::File;
88
use std::sync::{Arc, Mutex};
99
use lazy_static::lazy_static;
1010
use arrow::ffi::{FFI_ArrowSchema, FFI_ArrowArray};
11-
use std::fs::OpenOptions;
12-
use std::io::Write;
13-
use chrono::Utc;
1411
use parquet::basic::{Compression, ZstdLevel};
1512
use parquet::file::properties::WriterProperties;
1613

14+
pub mod logger;
1715
pub mod parquet_merge;
1816
pub use parquet_merge::*;
1917

@@ -27,21 +25,15 @@ struct NativeParquetWriter;
2725
impl NativeParquetWriter {
2826

2927
fn create_writer(filename: String, schema_address: i64) -> Result<(), Box<dyn std::error::Error>> {
30-
let log_msg = format!("[RUST] create_writer called for file: {}, schema_address: {}\n", filename, schema_address);
31-
println!("{}", log_msg.trim());
32-
Self::log_to_file(&log_msg);
28+
logger::log_info(&format!("[RUST] create_writer called for file: {}, schema_address: {}", filename, schema_address));
3329

3430
let arrow_schema = unsafe { FFI_ArrowSchema::from_raw(schema_address as *mut _) };
3531
let schema = Arc::new(arrow::datatypes::Schema::try_from(&arrow_schema)?);
3632

37-
let schema_msg = format!("[RUST] Schema created with {} fields\n", schema.fields().len());
38-
println!("{}", schema_msg.trim());
39-
Self::log_to_file(&schema_msg);
33+
logger::log_info(&format!("[RUST] Schema created with {} fields", schema.fields().len()));
4034

4135
for (i, field) in schema.fields().iter().enumerate() {
42-
let field_msg = format!("[RUST] Field {}: {} ({})\n", i, field.name(), field.data_type());
43-
println!("{}", field_msg.trim());
44-
Self::log_to_file(&field_msg);
36+
logger::log_debug(&format!("[RUST] Field {}: {} ({})", i, field.name(), field.data_type()));
4537
}
4638

4739
let file = File::create(&filename)?;
@@ -56,29 +48,21 @@ impl NativeParquetWriter {
5648
}
5749

5850
fn write_data(filename: String, array_address: i64, schema_address: i64) -> Result<(), Box<dyn std::error::Error>> {
59-
let log_msg = format!("[RUST] write_data called for file: {}, array_address: {}, schema_address: {}\n", filename, array_address, schema_address);
60-
println!("{}", log_msg.trim());
61-
Self::log_to_file(&log_msg);
51+
logger::log_info(&format!("[RUST] write_data called for file: {}, array_address: {}, schema_address: {}", filename, array_address, schema_address));
6252

6353
unsafe {
6454
let arrow_schema = FFI_ArrowSchema::from_raw(schema_address as *mut _);
6555
let arrow_array = FFI_ArrowArray::from_raw(array_address as *mut _);
6656

6757
match arrow::ffi::from_ffi(arrow_array, &arrow_schema) {
6858
Ok(array_data) => {
69-
let data_msg = format!("[RUST] Successfully imported array_data, length: {}\n", array_data.len());
70-
println!("{}", data_msg.trim());
71-
Self::log_to_file(&data_msg);
59+
logger::log_debug(&format!("[RUST] Successfully imported array_data, length: {}", array_data.len()));
7260

7361
let array: Arc<dyn arrow::array::Array> = arrow::array::make_array(array_data);
74-
let array_msg = format!("[RUST] Array type: {:?}, length: {}\n", array.data_type(), array.len());
75-
println!("{}", array_msg.trim());
76-
Self::log_to_file(&array_msg);
62+
logger::log_debug(&format!("[RUST] Array type: {:?}, length: {}", array.data_type(), array.len()));
7763

7864
if let Some(struct_array) = array.as_any().downcast_ref::<arrow::array::StructArray>() {
79-
let struct_msg = format!("[RUST] Successfully cast to StructArray with {} columns\n", struct_array.num_columns());
80-
println!("{}", struct_msg.trim());
81-
Self::log_to_file(&struct_msg);
65+
logger::log_debug(&format!("[RUST] Successfully cast to StructArray with {} columns", struct_array.num_columns()));
8266

8367
let schema = Arc::new(arrow::datatypes::Schema::new(
8468
struct_array.fields().clone()
@@ -89,70 +73,50 @@ impl NativeParquetWriter {
8973
struct_array.columns().to_vec(),
9074
)?;
9175

92-
let batch_msg = format!("[RUST] Created RecordBatch with {} rows and {} columns\n", record_batch.num_rows(), record_batch.num_columns());
93-
println!("{}", batch_msg.trim());
94-
Self::log_to_file(&batch_msg);
76+
logger::log_info(&format!("[RUST] Created RecordBatch with {} rows and {} columns", record_batch.num_rows(), record_batch.num_columns()));
9577

9678
if let Some(writer_arc) = WRITER_MANAGER.get(&filename) {
97-
let write_msg = "[RUST] Writing RecordBatch to file\n";
98-
println!("{}", write_msg.trim());
99-
Self::log_to_file(write_msg);
79+
logger::log_debug("[RUST] Writing RecordBatch to file");
10080
let mut writer = writer_arc.lock().unwrap();
10181
writer.write(&record_batch)?;
102-
let success_msg = "[RUST] Successfully wrote RecordBatch\n";
103-
println!("{}", success_msg.trim());
104-
Self::log_to_file(success_msg);
82+
logger::log_info("[RUST] Successfully wrote RecordBatch");
10583
} else {
106-
let error_msg = format!("[RUST] ERROR: No writer found for file: {}\n", filename);
107-
println!("{}", error_msg.trim());
108-
Self::log_to_file(&error_msg);
84+
logger::log_error(&format!("[RUST] ERROR: No writer found for file: {}", filename));
10985
}
11086
Ok(())
11187
} else {
112-
let error_msg = format!("[RUST] ERROR: Array is not a StructArray, type: {:?}\n", array.data_type());
113-
println!("{}", error_msg.trim());
114-
Self::log_to_file(&error_msg);
88+
logger::log_error(&format!("[RUST] ERROR: Array is not a StructArray, type: {:?}", array.data_type()));
11589
Err("Expected struct array from VectorSchemaRoot".into())
11690
}
11791
}
11892
Err(e) => {
119-
let error_msg = format!("[RUST] ERROR: Failed to import from FFI: {:?}\n", e);
120-
println!("{}", error_msg.trim());
121-
Self::log_to_file(&error_msg);
93+
logger::log_error(&format!("[RUST] ERROR: Failed to import from FFI: {:?}", e));
12294
Err(e.into())
12395
}
12496
}
12597
}
12698
}
12799

128100
fn close_writer(filename: String) -> Result<(), Box<dyn std::error::Error>> {
129-
let log_msg = format!("[RUST] close_writer called for file: {}\n", filename);
130-
println!("{}", log_msg.trim());
131-
Self::log_to_file(&log_msg);
101+
logger::log_info(&format!("[RUST] close_writer called for file: {}", filename));
132102

133103
if let Some((_, writer_arc)) = WRITER_MANAGER.remove(&filename) {
134104
match Arc::try_unwrap(writer_arc) {
135105
Ok(mutex) => {
136106
let mut writer = mutex.into_inner().unwrap();
137107
match writer.close() {
138108
Ok(_) => {
139-
let success_msg = format!("[RUST] Successfully closed writer for file: {}\n", filename);
140-
println!("{}", success_msg.trim());
141-
Self::log_to_file(&success_msg);
109+
logger::log_info(&format!("[RUST] Successfully closed writer for file: {}", filename));
142110
Ok(())
143111
}
144112
Err(e) => {
145-
let error_msg = format!("[RUST] ERROR: Failed to close writer for file: {}\n", filename);
146-
println!("{}", error_msg.trim());
147-
Self::log_to_file(&error_msg);
113+
logger::log_error(&format!("[RUST] ERROR: Failed to close writer for file: {}", filename));
148114
Err(e.into())
149115
}
150116
}
151117
}
152118
Err(_) => {
153-
let error_msg = format!("[RUST] ERROR: Writer still in use for file: {}\n", filename);
154-
println!("{}", error_msg.trim());
155-
Self::log_to_file(&error_msg);
119+
logger::log_error(&format!("[RUST] ERROR: Writer still in use for file: {}", filename));
156120
Err("Writer still in use".into())
157121
}
158122
}
@@ -162,37 +126,27 @@ impl NativeParquetWriter {
162126
}
163127

164128
fn flush_to_disk(filename: String) -> Result<(), Box<dyn std::error::Error>> {
165-
let log_msg = format!("[RUST] fsync_file called for file: {}\n", filename);
166-
println!("{}", log_msg.trim());
167-
Self::log_to_file(&log_msg);
129+
logger::log_info(&format!("[RUST] fsync_file called for file: {}", filename));
168130

169131
if let Some(file) = FILE_MANAGER.get_mut(&filename) {
170132
match file.sync_all() {
171133
Ok(_) => {
172-
let success_msg = format!("[RUST] Successfully fsynced file: {}\n", filename);
173-
println!("{}", success_msg.trim());
174-
Self::log_to_file(&success_msg);
134+
logger::log_info(&format!("[RUST] Successfully fsynced file: {}", filename));
175135
Ok(())
176136
}
177137
Err(e) => {
178-
let error_msg = format!("[RUST] ERROR: Failed to fsync file: {}\n", filename);
179-
println!("{}", error_msg.trim());
180-
Self::log_to_file(&error_msg);
138+
logger::log_error(&format!("[RUST] ERROR: Failed to fsync file: {}", filename));
181139
Err(e.into())
182140
}
183141
}
184142
} else {
185-
let error_msg = format!("[RUST] ERROR: File not found for fsync: {}\n", filename);
186-
println!("{}", error_msg.trim());
187-
Self::log_to_file(&error_msg);
143+
logger::log_error(&format!("[RUST] ERROR: File not found for fsync: {}", filename));
188144
Err("File not found".into())
189145
}
190146
}
191147

192148
fn get_filtered_writer_memory_usage(path_prefix: String) -> Result<usize, Box<dyn std::error::Error>> {
193-
let _log_msg = format!("[RUST] get_filtered_writer_memory_usage called with prefix: {}\n", path_prefix);
194-
// println!("{}", _log_msg.trim());
195-
// Self::log_to_file(&_log_msg);
149+
logger::log_debug(&format!("[RUST] get_filtered_writer_memory_usage called with prefix: {}", path_prefix));
196150

197151
let mut total_memory = 0;
198152
let mut writer_count = 0;
@@ -208,29 +162,24 @@ impl NativeParquetWriter {
208162
total_memory += memory_usage;
209163
writer_count += 1;
210164

211-
let usage_msg = format!("[RUST] Filtered Writer {}: {} bytes\n", filename, memory_usage);
212-
println!("{}", usage_msg.trim());
213-
Self::log_to_file(&usage_msg);
165+
logger::log_debug(&format!("[RUST] Filtered Writer {}: {} bytes", filename, memory_usage));
214166
}
215167
}
216168
}
217169

218-
let _total_msg = format!("[RUST] Total memory usage across {} filtered ArrowWriters (prefix: {}): {} bytes\n", writer_count, path_prefix, total_memory);
219-
//println!("{}", _total_msg.trim());
220-
//Self::log_to_file(&_total_msg);
170+
logger::log_debug(&format!("[RUST] Total memory usage across {} filtered ArrowWriters (prefix: {}): {} bytes", writer_count, path_prefix, total_memory));
221171

222172
Ok(total_memory)
223173
}
174+
}
224175

225-
fn log_to_file(message: &str) {
226-
if let Ok(mut file) = OpenOptions::new()
227-
.create(true)
228-
.append(true)
229-
.open("/tmp/rust_parquet_debug.log") {
230-
let timestamp = Utc::now().format("%Y-%m-%d %H:%M:%S%.3f UTC");
231-
let timestamped_message = format!("[{}] {}", timestamp, message);
232-
let _ = file.write_all(timestamped_message.as_bytes());
233-
}
176+
#[unsafe(no_mangle)]
177+
pub extern "system" fn Java_com_parquet_parquetdataformat_bridge_RustBridge_initLogger(
178+
env: JNIEnv,
179+
_class: JClass,
180+
) {
181+
if let Ok(jvm) = env.get_java_vm() {
182+
logger::init_logger(jvm);
234183
}
235184
}
236185

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
use jni::{JNIEnv, JavaVM};
2+
use std::sync::OnceLock;
3+
4+
/// JVM handle shared by all logging functions in this module; empty until
/// `init_logger` stores one.
static JAVA_VM: OnceLock<JavaVM> = OnceLock::new();
5+
6+
/// Initialize the logger with the JVM instance
7+
pub fn init_logger(jvm: JavaVM) {
8+
JAVA_VM.set(jvm).ok();
9+
}
10+
11+
/// Log an info message through JNI callback to Java
12+
pub fn log_info(message: &str) {
13+
if let Some(jvm) = JAVA_VM.get() {
14+
if let Ok(mut env) = jvm.attach_current_thread() {
15+
call_java_logger(&mut env, "logInfo", message);
16+
}
17+
}
18+
}
19+
20+
/// Log a warning message through JNI callback to Java
21+
pub fn log_warn(message: &str) {
22+
if let Some(jvm) = JAVA_VM.get() {
23+
if let Ok(mut env) = jvm.attach_current_thread() {
24+
call_java_logger(&mut env, "logWarn", message);
25+
}
26+
}
27+
}
28+
29+
/// Log an error message through JNI callback to Java
30+
pub fn log_error(message: &str) {
31+
if let Some(jvm) = JAVA_VM.get() {
32+
if let Ok(mut env) = jvm.attach_current_thread() {
33+
call_java_logger(&mut env, "logError", message);
34+
}
35+
}
36+
}
37+
38+
/// Log a debug message through JNI callback to Java
39+
pub fn log_debug(message: &str) {
40+
if let Some(jvm) = JAVA_VM.get() {
41+
if let Ok(mut env) = jvm.attach_current_thread() {
42+
call_java_logger(&mut env, "logDebug", message);
43+
}
44+
}
45+
}
46+
47+
/// Internal function to call the Java logger method
48+
fn call_java_logger(env: &mut JNIEnv, method_name: &str, message: &str) {
49+
let result = (|| -> Result<(), Box<dyn std::error::Error>> {
50+
// Find the RustLoggerBridge class
51+
let class = env.find_class("com/parquet/parquetdataformat/bridge/RustLoggerBridge")?;
52+
53+
// Convert Rust string to Java string
54+
let java_message = env.new_string(message)?;
55+
56+
// Call the static method
57+
env.call_static_method(
58+
class,
59+
method_name,
60+
"(Ljava/lang/String;)V",
61+
&[(&java_message).into()],
62+
)?;
63+
64+
Ok(())
65+
})();
66+
67+
// If logging fails, fall back to println as last resort
68+
if result.is_err() {
69+
println!("[RUST_LOG_FALLBACK] {}: {}", method_name, message);
70+
}
71+
}
72+
73+
/// Formats its arguments like `format!` and logs the result at info level
/// through `logger::log_info`.
#[macro_export]
macro_rules! rust_log_info {
    ($($fmt_args:tt)*) => (
        $crate::logger::log_info(&format!($($fmt_args)*))
    );
}
80+
81+
/// Formats its arguments like `format!` and logs the result at warning level
/// through `logger::log_warn`.
#[macro_export]
macro_rules! rust_log_warn {
    ($($fmt_args:tt)*) => (
        $crate::logger::log_warn(&format!($($fmt_args)*))
    );
}
88+
89+
/// Formats its arguments like `format!` and logs the result at error level
/// through `logger::log_error`.
#[macro_export]
macro_rules! rust_log_error {
    ($($fmt_args:tt)*) => (
        $crate::logger::log_error(&format!($($fmt_args)*))
    );
}
96+
97+
/// Formats its arguments like `format!` and logs the result at debug level
/// through `logger::log_debug`.
#[macro_export]
macro_rules! rust_log_debug {
    ($($fmt_args:tt)*) => (
        $crate::logger::log_debug(&format!($($fmt_args)*))
    );
}

0 commit comments

Comments
 (0)