Skip to content

Commit 607ffbd

Browse files
authored
Fix Kinesis source panic on resharding (#5912)
1 parent 4bb6e2a commit 607ffbd

File tree

1 file changed

+69
-1
lines changed

1 file changed

+69
-1
lines changed

quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,14 @@ impl KinesisSource {
137137
shard_id: ShardId,
138138
checkpoint: &SourceCheckpoint,
139139
) {
140-
assert!(!self.state.shard_consumers.contains_key(&shard_id));
140+
if self.state.shard_consumers.contains_key(&shard_id) {
141+
info!(
142+
stream_name = %self.stream_name,
143+
shard_id = %shard_id,
144+
"Shard consumer already exists, skipping creation."
145+
);
146+
return;
147+
}
141148

142149
let partition_id = PartitionId::from(shard_id.as_str());
143150
let from_position = checkpoint
@@ -149,6 +156,12 @@ impl KinesisSource {
149156
Position::Offset(offset) => Some(offset.to_string()),
150157
Position::Eof(_) => panic!("position of a Kinesis shard should never be EOF"),
151158
};
159+
info!(
160+
stream_name = %self.stream_name,
161+
shard_id = %shard_id,
162+
start_position = ?from_position,
163+
"Spawning new shard consumer"
164+
);
152165
let shard_consumer = ShardConsumer::new(
153166
self.stream_name.clone(),
154167
shard_id.clone(),
@@ -384,6 +397,61 @@ mod tests {
384397
Ok(merged_batch)
385398
}
386399

400+
#[ignore]
401+
#[tokio::test]
402+
async fn test_kinesis_source_handles_resharding_with_split() {
403+
use crate::source::kinesis::api::tests::split_shard;
404+
use crate::source::kinesis::helpers::tests::wait_for_active_stream;
405+
406+
let universe = Universe::with_accelerated_time();
407+
let (doc_processor_mailbox, _doc_processor_inbox) = universe.create_test_mailbox();
408+
let (kinesis_client, stream_name) = setup("test-resharding-split", 1).await.unwrap();
409+
let index_id = "test-kinesis-resharding-index";
410+
let index_uid = IndexUid::new_with_random_ulid(index_id);
411+
412+
// Split the shard (1 -> 2 shards)
413+
let shard_id_0 = make_shard_id(0);
414+
split_shard(
415+
&kinesis_client,
416+
&stream_name,
417+
&shard_id_0,
418+
"85070591730234615865843651857942052864",
419+
)
420+
.await
421+
.unwrap();
422+
423+
// Wait for stream to be active after split
424+
let _ = wait_for_active_stream(&kinesis_client, &stream_name)
425+
.await
426+
.unwrap();
427+
428+
// Initialize source after split
429+
let kinesis_params = KinesisSourceParams {
430+
stream_name: stream_name.clone(),
431+
region_or_endpoint: Some(RegionOrEndpoint::Endpoint(
432+
"http://localhost:4566".to_string(),
433+
)),
434+
enable_backfill_mode: true,
435+
};
436+
let source_params = SourceParams::Kinesis(kinesis_params.clone());
437+
let source_config = SourceConfig::for_test("test-kinesis-resharding", source_params);
438+
let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build();
439+
440+
let kinesis_source = KinesisSource::try_new(source_runtime, kinesis_params)
441+
.await
442+
.unwrap();
443+
444+
let actor = SourceActor {
445+
source: Box::new(kinesis_source),
446+
doc_processor_mailbox: doc_processor_mailbox.clone(),
447+
};
448+
let (_mailbox, handle) = universe.spawn_builder().spawn(actor);
449+
let (exit_status, _exit_state) = handle.join().await;
450+
assert!(exit_status.is_success());
451+
452+
teardown(&kinesis_client, &stream_name).await;
453+
}
454+
387455
#[ignore]
388456
#[tokio::test]
389457
async fn test_kinesis_source() {

0 commit comments

Comments
 (0)