tests: kafka: Add checkpoint test
tontinton committed Jun 22, 2024
1 parent e7eb752 commit f671176
Showing 5 changed files with 151 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -12,6 +12,9 @@ lto = true
inherits = "release"
lto = "thin"

[features]
in-tests = []

[dependencies]
async-trait = "0.1.80"
bincode = "1.3.3"
@@ -43,3 +46,4 @@ rstest = "0.21.0"
tempfile = "3.10.1"
testcontainers = "0.17.0"
testcontainers-modules = { version = "0.5.0", features = ["postgres", "kafka"] }
toshokan = { path = ".", features = ["in-tests"] }
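
The dev-dependency of toshokan on itself, with the new in-tests feature enabled, is a standard Cargo trick: integration tests under tests/ link against a build of the library that carries extra test-only state, while normal and release builds are unaffected. A minimal sketch of the pattern, using illustrative names that are not part of this commit:

// Sketch of cfg-gated test-only state; `Counter` and `history` are
// illustrative names, not from this commit.
pub struct Counter {
    total: u64,

    // Compiled in only with `--features in-tests`, which the
    // self dev-dependency turns on for everything under tests/.
    #[cfg(feature = "in-tests")]
    pub history: Vec<u64>,
}

impl Counter {
    pub fn add(&mut self, n: u64) {
        self.total += n;
        #[cfg(feature = "in-tests")]
        self.history.push(n);
    }
}
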
11 changes: 9 additions & 2 deletions src/commands/index.rs
@@ -49,6 +49,15 @@ pub struct IndexRunner {

impl IndexRunner {
pub async fn new(args: IndexArgs, pool: PgPool) -> Result<Self> {
let source = connect_to_source(args.input.as_deref(), args.stream, &pool).await?;
IndexRunner::new_with_source(args, pool, source).await
}

pub async fn new_with_source(
args: IndexArgs,
pool: PgPool,
source: Box<dyn Source + Send + Sync>,
) -> Result<Self> {
let config = get_index_config(&args.name, &pool).await?;

let mut schema_builder = Schema::builder();
@@ -58,8 +67,6 @@ impl IndexRunner {
build_parsers_from_field_configs(&config.schema.fields, &mut schema_builder)?;
let schema = schema_builder.build();

let source = connect_to_source(args.input.as_deref(), args.stream, &pool).await?;

Ok(Self {
source,
schema,
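
Splitting construction this way decouples IndexRunner from connect_to_source, so a test can inject any Source implementation. A hedged sketch of an injectable stub, assuming get_one and on_index_created (the two methods visible in this diff) are the only required trait methods; the checkpoint test below injects a wrapped KafkaSource instead:

use async_trait::async_trait;
use color_eyre::{eyre::bail, Result};
use toshokan::commands::sources::{Source, SourceItem};

// A do-nothing stub source. get_one bails instead of producing a
// document, since SourceItem's variants are not shown in this diff.
struct StubSource;

#[async_trait]
impl Source for StubSource {
    async fn get_one(&mut self) -> Result<SourceItem> {
        bail!("StubSource produces no items")
    }

    async fn on_index_created(&mut self) -> Result<()> {
        Ok(())
    }
}
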
51 changes: 51 additions & 0 deletions src/commands/sources/kafka_source.rs
@@ -23,6 +23,34 @@ pub const KAFKA_PREFIX: &str = "kafka://";
const CONSUMER_THREAD_MESSAGES_CHANNEL_SIZE: usize = 10;
const POLL_DURATION: Duration = Duration::from_secs(1);

macro_rules! track_saved_checkpoint_impl {
( $self:ident, $partitions_and_offsets:ident ) => {
#[cfg(feature = "in-tests")]
{
$self.saved_partitions_and_offsets = $partitions_and_offsets.to_vec();
}

#[cfg(not(feature = "in-tests"))]
{
// No-op when not in tests.
}
};
}

macro_rules! track_loaded_checkpoint_impl {
( $self:ident, $partitions_and_offsets:ident ) => {
#[cfg(feature = "in-tests")]
{
$self.loaded_partitions_and_offsets = $partitions_and_offsets.to_vec();
}

#[cfg(not(feature = "in-tests"))]
{
// No-op when not in tests.
}
};
}

struct PostRebalance {
partitions: Vec<i32>,
checkpoint_tx: oneshot::Sender<Vec<(i32, Option<i64>)>>,
@@ -129,6 +157,11 @@ pub struct KafkaSource {
messages_rx: mpsc::Receiver<Result<MessageFromConsumerThread>>,
checkpoint: Option<Checkpoint>,
partition_to_offset: BTreeMap<i32, i64>,

#[cfg(feature = "in-tests")]
pub saved_partitions_and_offsets: Vec<(i32, i64)>,
#[cfg(feature = "in-tests")]
pub loaded_partitions_and_offsets: Vec<(i32, Option<i64>)>,
}

pub fn parse_url(url: &str) -> Result<(&str, &str)> {
@@ -253,6 +286,11 @@ impl KafkaSource {
messages_rx: rx,
checkpoint,
partition_to_offset: BTreeMap::new(),

#[cfg(feature = "in-tests")]
saved_partitions_and_offsets: Vec::new(),
#[cfg(feature = "in-tests")]
loaded_partitions_and_offsets: Vec::new(),
};

this.wait_for_assignment()
@@ -285,6 +323,9 @@ impl KafkaSource {
.load(&msg.partitions)
.await
.context("failed to load checkpoint")?;

self.track_loaded_checkpoint(&partitions_and_offsets);

if msg.checkpoint_tx.send(partitions_and_offsets).is_err() {
bail!("failed to respond with partition offsets, kafka consumer thread probably closed")
}
@@ -293,6 +334,14 @@

Ok(())
}

fn track_saved_checkpoint(&mut self, partitions_and_offsets: Vec<(i32, i64)>) {
track_saved_checkpoint_impl!(self, partitions_and_offsets);
}

fn track_loaded_checkpoint(&mut self, partitions_and_offsets: &[(i32, Option<i64>)]) {
track_loaded_checkpoint_impl!(self, partitions_and_offsets);
}
}

#[async_trait]
@@ -338,6 +387,8 @@ impl Source for KafkaSource {
.collect::<Vec<_>>();
checkpoint.save(&flat).await?;

self.track_saved_checkpoint(flat);

self.partition_to_offset.clear();

Ok(())
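
Taken together: with --features in-tests, every checkpoint save and load also records its arguments on the KafkaSource so tests can assert on them; without the feature, both hooks compile down to empty bodies. As a sketch, under the feature the save hook effectively expands to:

fn track_saved_checkpoint(&mut self, partitions_and_offsets: Vec<(i32, i64)>) {
    // `.to_vec()` on a Vec simply clones it.
    self.saved_partitions_and_offsets = partitions_and_offsets.to_vec();
}
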
92 changes: 86 additions & 6 deletions tests/kafka_indexing.rs
@@ -3,20 +3,19 @@ mod common;
use std::{
path::{Path, PathBuf},
str::FromStr,
sync::Arc,
time::Duration,
};

use async_trait::async_trait;
use clap::Parser;
use color_eyre::{
eyre::{bail, Context},
Result,
};
use ctor::ctor;
use log::debug;
use notify::{
event::{CreateKind, RemoveKind},
recommended_watcher, Event, EventKind, RecursiveMode, Watcher,
};
use notify::{event::CreateKind, recommended_watcher, Event, EventKind, RecursiveMode, Watcher};
use rdkafka::{
producer::{FutureProducer, FutureRecord, Producer},
ClientConfig,
@@ -27,15 +26,18 @@ use testcontainers_modules::kafka::{Kafka as KafkaContainer, KAFKA_PORT};
use tokio::{
fs::{create_dir_all, remove_dir_all},
join, select, spawn,
sync::{mpsc, oneshot},
sync::{mpsc, oneshot, Mutex},
task::JoinHandle,
};
use toshokan::{
args::IndexArgs,
commands::{
create::run_create_from_config,
index::{run_index, BatchResult, IndexRunner},
sources::kafka_source::parse_url,
sources::{
kafka_source::{parse_url, KafkaSource},
Source, SourceItem,
},
},
config::IndexConfig,
};
@@ -242,3 +244,81 @@ async fn test_kafka_index_stream_restart_on_rebalance() -> Result<()> {

Ok(())
}

struct ArcSource(Arc<Mutex<dyn Source + Send>>);

#[async_trait]
impl Source for ArcSource {
async fn get_one(&mut self) -> Result<SourceItem> {
self.0.clone().lock_owned().await.get_one().await
}

async fn on_index_created(&mut self) -> Result<()> {
self.0.clone().lock_owned().await.on_index_created().await
}
}
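
ArcSource exists because IndexRunner::new_with_source takes ownership of the source as a Box<dyn Source + Send + Sync>. Wrapping the KafkaSource in Arc<Mutex<...>> and handing the runner this forwarding wrapper lets the test keep a second handle to the same source, so it can read the in-tests tracking fields while the runner drives indexing, as the checkpoint test below does.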

#[tokio::test]
async fn test_kafka_index_stream_load_checkpoint() -> Result<()> {
let postgres = run_postgres().await?;

let kafka_container = KafkaContainer::default().start().await?;
let kafka_port = kafka_container.get_host_port_ipv4(KAFKA_PORT).await?;
let kafka_url = format!("kafka://127.0.0.1:{kafka_port}/test_topic");

let mut config = IndexConfig::from_str(include_str!("../example_config.yaml"))?;
config.path = "/tmp/toshokan_kafka_stream_load_checkpoint".to_string();

// Just in case this path already exists, remove it.
let _ = remove_dir_all(&config.path).await;
create_dir_all(&config.path).await?;

run_create_from_config(&config, &postgres.pool).await?;

produce_logs(
&kafka_url,
include_str!("test_files/hdfs-logs-multitenants-2.json"),
)
.await?;

// Save checkpoint.
{
let kafka_source = Arc::new(Mutex::new(
KafkaSource::from_url(&kafka_url, true, &postgres.pool).await?,
));

assert_eq!(
kafka_source.lock().await.loaded_partitions_and_offsets,
vec![(0, None)]
);

let mut runner = IndexRunner::new_with_source(
IndexArgs::parse_from([
"",
&config.name,
&kafka_url,
"--stream",
"--commit-interval",
"500ms",
]),
postgres.pool.clone(),
Box::new(ArcSource(kafka_source.clone())),
)
.await?;

while kafka_source.lock().await.saved_partitions_and_offsets != vec![(0, 2)] {
assert_eq!(runner.run_one_batch().await?, BatchResult::Timeout);
}
}

// Load checkpoint.
{
let kafka_source = KafkaSource::from_url(&kafka_url, true, &postgres.pool).await?;
assert_eq!(
kafka_source.loaded_partitions_and_offsets,
vec![(0, Some(2))]
);
}

Ok(())
}
