Skip to content

Commit abb55ec

Browse files
authored
Implement Cluster Scan (#113)
Signed-off-by: currantw <[email protected]>
1 parent 114e076 commit abb55ec

File tree

15 files changed

+798
-66
lines changed

15 files changed

+798
-66
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,11 @@ $RECYCLE.BIN/
143143
# Mac desktop service store files
144144
.DS_Store
145145

146-
# IDE generaged files
146+
# IDE generated files
147147
.vs
148148
.vscode
149149
.kiro/
150+
.amazonq/
150151

151152
_NCrunch*
152153

rust/src/ffi.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ pub(crate) unsafe fn create_route(
317317
/// * `data`, `data_len` and also each pointer stored in `data` must be able to be safely casted to a valid to a slice of the corresponding type via [`from_raw_parts`].
318318
/// See the safety documentation of [`from_raw_parts`].
319319
/// * The caller is responsible of freeing the allocated memory.
320-
pub(crate) unsafe fn convert_double_pointer_to_vec<'a>(
320+
pub(crate) unsafe fn convert_string_pointer_array_to_vector<'a>(
321321
data: *const *const u8,
322322
len: usize,
323323
data_len: *const usize,
@@ -530,11 +530,11 @@ pub struct BatchOptionsInfo {
530530
/// * `cmd_ptr` must be able to be safely casted to a valid [`CmdInfo`]
531531
/// * `args` and `args_len` in a referred [`CmdInfo`] structure must not be `null`.
532532
/// * `data` in a referred [`CmdInfo`] structure must point to `arg_count` consecutive string pointers.
533-
/// * `args_len` in a referred [`CmdInfo`] structure must point to `arg_count` consecutive string lengths. See the safety documentation of [`convert_double_pointer_to_vec`].
533+
/// * `args_len` in a referred [`CmdInfo`] structure must point to `arg_count` consecutive string lengths. See the safety documentation of [`convert_string_pointer_array_to_vector`].
534534
pub(crate) unsafe fn create_cmd(ptr: *const CmdInfo) -> Result<Cmd, String> {
535535
let info = unsafe { *ptr };
536536
let arg_vec =
537-
unsafe { convert_double_pointer_to_vec(info.args, info.arg_count, info.args_len) };
537+
unsafe { convert_string_pointer_array_to_vector(info.args, info.arg_count, info.args_len) };
538538

539539
let Some(mut cmd) = info.request_type.get_command() else {
540540
return Err("Couldn't fetch command type".into());

rust/src/lib.rs

Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use glide_core::{
1111
};
1212
use std::{
1313
ffi::{CStr, CString, c_char, c_void},
14+
slice::from_raw_parts,
1415
sync::Arc,
1516
};
1617
use tokio::runtime::{Builder, Runtime};
@@ -438,3 +439,305 @@ pub unsafe extern "C" fn init(level: Option<Level>, file_name: *const c_char) ->
438439
let logger_level = logger_core::init(level.map(|level| level.into()), file_name_as_str);
439440
logger_level.into()
440441
}
442+
443+
/// Execute a cluster scan request.
444+
///
445+
/// # Safety
446+
/// * `client_ptr` must be a valid Client pointer from create_client
447+
/// * `cursor` must be "0" for initial scan or a valid cursor ID from previous scan
448+
/// * `args` and `arg_lengths` must be valid arrays of length `arg_count`
449+
/// * `args` format: [b"MATCH", pattern_arg, b"COUNT", count, b"TYPE", type] (all optional)
450+
#[unsafe(no_mangle)]
451+
pub unsafe extern "C-unwind" fn request_cluster_scan(
452+
client_ptr: *const c_void,
453+
callback_index: usize,
454+
cursor: *const c_char,
455+
arg_count: u64,
456+
args: *const usize,
457+
arg_lengths: *const u64,
458+
) {
459+
// Build client and add panic guard.
460+
let client = unsafe {
461+
Arc::increment_strong_count(client_ptr);
462+
Arc::from_raw(client_ptr as *mut Client)
463+
};
464+
let core = client.core.clone();
465+
466+
let mut panic_guard = PanicGuard {
467+
panicked: true,
468+
failure_callback: core.failure_callback,
469+
callback_index,
470+
};
471+
472+
// Get the cluster scan state.
473+
let cursor_id = unsafe { CStr::from_ptr(cursor) }
474+
.to_str()
475+
.unwrap_or("0")
476+
.to_owned();
477+
478+
let scan_state_cursor = if cursor_id == "0" {
479+
redis::ScanStateRC::new()
480+
} else {
481+
match glide_core::cluster_scan_container::get_cluster_scan_cursor(cursor_id.clone()) {
482+
Ok(existing_cursor) => existing_cursor,
483+
Err(_error) => {
484+
unsafe {
485+
(core.failure_callback)(
486+
callback_index,
487+
format!("Invalid cursor ID: {}", cursor_id).as_ptr() as *const c_char,
488+
RequestErrorType::Unspecified,
489+
);
490+
}
491+
return;
492+
}
493+
}
494+
};
495+
496+
// Build cluster scan arguments.
497+
let cluster_scan_args = match unsafe {
498+
build_cluster_scan_args(
499+
arg_count,
500+
args,
501+
arg_lengths,
502+
core.failure_callback,
503+
callback_index,
504+
)
505+
} {
506+
Some(args) => args,
507+
None => return,
508+
};
509+
510+
// Run cluster scan.
511+
client.runtime.spawn(async move {
512+
let mut async_panic_guard = PanicGuard {
513+
panicked: true,
514+
failure_callback: core.failure_callback,
515+
callback_index,
516+
};
517+
518+
let result = core
519+
.client
520+
.clone()
521+
.cluster_scan(&scan_state_cursor, cluster_scan_args)
522+
.await;
523+
match result {
524+
Ok(value) => {
525+
let ptr = Box::into_raw(Box::new(ResponseValue::from_value(value)));
526+
unsafe { (core.success_callback)(callback_index, ptr) };
527+
}
528+
Err(err) => unsafe {
529+
report_error(
530+
core.failure_callback,
531+
callback_index,
532+
glide_core::errors::error_message(&err),
533+
glide_core::errors::error_type(&err),
534+
);
535+
},
536+
};
537+
538+
async_panic_guard.panicked = false;
539+
});
540+
541+
panic_guard.panicked = false;
542+
}
543+
544+
/// Remove a cluster scan cursor from the Rust core container.
545+
///
546+
/// This should be called when the C# ClusterScanCursor is disposed or finalized
547+
/// to clean up resources allocated by the Rust core for cluster scan operations.
548+
///
549+
/// # Safety
550+
/// * `cursor_id` must be a valid C string or null
551+
#[unsafe(no_mangle)]
552+
pub unsafe extern "C" fn remove_cluster_scan_cursor(cursor_id: *const c_char) {
553+
if cursor_id.is_null() {
554+
return;
555+
}
556+
557+
if let Ok(cursor_str) = unsafe { CStr::from_ptr(cursor_id).to_str() } {
558+
glide_core::cluster_scan_container::remove_scan_state_cursor(cursor_str.to_string());
559+
}
560+
}
561+
562+
/// Build cluster scan arguments from C-style arrays.
563+
///
564+
/// # Arguments
565+
///
566+
/// * `arg_count` - The number of arguments in the arrays
567+
/// * `args` - Pointer to an array of pointers to argument data
568+
/// * `arg_lengths` - Pointer to an array of argument lengths
569+
/// * `failure_callback` - Callback function to invoke on error
570+
/// * `callback_index` - Index to pass to the callback function
571+
///
572+
/// # Safety
573+
/// * `args` and `arg_lengths` must be valid arrays of length `arg_count`
574+
/// * Each pointer in `args` must point to valid memory of the corresponding length
575+
unsafe fn build_cluster_scan_args(
576+
arg_count: u64,
577+
args: *const usize,
578+
arg_lengths: *const u64,
579+
failure_callback: FailureCallback,
580+
callback_index: usize,
581+
) -> Option<redis::ClusterScanArgs> {
582+
if arg_count == 0 {
583+
return Some(redis::ClusterScanArgs::builder().build());
584+
}
585+
586+
let arg_vec = unsafe { convert_string_pointer_array_to_vector(args, arg_count, arg_lengths) };
587+
588+
// Parse arguments from vector.
589+
let mut pattern_arg: &[u8] = &[];
590+
let mut type_arg: &[u8] = &[];
591+
let mut count_arg: &[u8] = &[];
592+
593+
let mut iter = arg_vec.iter().peekable();
594+
while let Some(arg) = iter.next() {
595+
match *arg {
596+
b"MATCH" => match iter.next() {
597+
Some(p) => pattern_arg = p,
598+
None => {
599+
unsafe {
600+
report_error(
601+
failure_callback,
602+
callback_index,
603+
"No argument following MATCH.".into(),
604+
RequestErrorType::Unspecified,
605+
);
606+
}
607+
return None;
608+
}
609+
},
610+
b"TYPE" => match iter.next() {
611+
Some(t) => type_arg = t,
612+
None => {
613+
unsafe {
614+
report_error(
615+
failure_callback,
616+
callback_index,
617+
"No argument following TYPE.".into(),
618+
RequestErrorType::Unspecified,
619+
);
620+
}
621+
return None;
622+
}
623+
},
624+
b"COUNT" => match iter.next() {
625+
Some(c) => count_arg = c,
626+
None => {
627+
unsafe {
628+
report_error(
629+
failure_callback,
630+
callback_index,
631+
"No argument following COUNT.".into(),
632+
RequestErrorType::Unspecified,
633+
);
634+
}
635+
return None;
636+
}
637+
},
638+
_ => {
639+
unsafe {
640+
report_error(
641+
failure_callback,
642+
callback_index,
643+
"Unknown cluster scan argument".into(),
644+
RequestErrorType::Unspecified,
645+
);
646+
}
647+
return None;
648+
}
649+
}
650+
}
651+
652+
// Build cluster scan arguments.
653+
let mut cluster_scan_args_builder = redis::ClusterScanArgs::builder();
654+
655+
if !pattern_arg.is_empty() {
656+
cluster_scan_args_builder = cluster_scan_args_builder.with_match_pattern(pattern_arg);
657+
}
658+
659+
if !type_arg.is_empty() {
660+
let converted_type = match std::str::from_utf8(type_arg) {
661+
Ok(t) => redis::ObjectType::from(t.to_string()),
662+
Err(_) => {
663+
unsafe {
664+
report_error(
665+
failure_callback,
666+
callback_index,
667+
"Invalid UTF-8 in TYPE argument".into(),
668+
RequestErrorType::Unspecified,
669+
);
670+
}
671+
return None;
672+
}
673+
};
674+
675+
cluster_scan_args_builder = cluster_scan_args_builder.with_object_type(converted_type);
676+
}
677+
678+
if !count_arg.is_empty() {
679+
let count_str = match std::str::from_utf8(count_arg) {
680+
Ok(c) => c,
681+
Err(_) => {
682+
unsafe {
683+
report_error(
684+
failure_callback,
685+
callback_index,
686+
"Invalid UTF-8 in COUNT argument".into(),
687+
RequestErrorType::Unspecified,
688+
);
689+
}
690+
return None;
691+
}
692+
};
693+
694+
let converted_count = match count_str.parse::<u32>() {
695+
Ok(c) => c,
696+
Err(_) => {
697+
unsafe {
698+
report_error(
699+
failure_callback,
700+
callback_index,
701+
"Invalid COUNT value".into(),
702+
RequestErrorType::Unspecified,
703+
);
704+
}
705+
return None;
706+
}
707+
};
708+
709+
cluster_scan_args_builder = cluster_scan_args_builder.with_count(converted_count);
710+
}
711+
712+
Some(cluster_scan_args_builder.build())
713+
}
714+
715+
/// Converts an array of pointers to strings to a vector of strings.
716+
///
717+
/// # Arguments
718+
///
719+
/// * `data` - Pointer to an array of pointers to string data
720+
/// * `len` - The number of strings in the array
721+
/// * `data_len` - Pointer to an array of string lengths
722+
///
723+
/// # Safety
724+
///
725+
/// `convert_string_pointer_array_to_vector` returns a `Vec` of u8 slice which holds pointers of C
726+
/// strings. The returned `Vec<&'a [u8]>` is meant to be copied into Rust code. Storing them
727+
/// for later use will cause the program to crash as the pointers will be freed by the caller.
728+
unsafe fn convert_string_pointer_array_to_vector<'a>(
729+
data: *const usize,
730+
len: u64,
731+
data_len: *const u64,
732+
) -> Vec<&'a [u8]> {
733+
let string_ptrs = unsafe { from_raw_parts(data, len as usize) };
734+
let string_lengths = unsafe { from_raw_parts(data_len, len as usize) };
735+
736+
let mut result = Vec::<&[u8]>::with_capacity(string_ptrs.len());
737+
for (i, &str_ptr) in string_ptrs.iter().enumerate() {
738+
let slice = unsafe { from_raw_parts(str_ptr as *const u8, string_lengths[i] as usize) };
739+
result.push(slice);
740+
}
741+
742+
result
743+
}

0 commit comments

Comments
 (0)