From e4f06b5a84404f12f8ad78a4f839d5d57f3d1ba9 Mon Sep 17 00:00:00 2001
From: Tony Solomonik
Date: Tue, 18 Jun 2024 23:30:45 +0300
Subject: [PATCH] index: kafka: Add checkpoints

---
 migrations/0001_base.down.sql                 |   5 +
 .../{0001_indexes.up.sql => 0001_base.up.sql} |   8 ++
 migrations/0001_indexes.down.sql              |   2 -
 src/commands/index.rs                         |   4 +-
 src/commands/sources/buf_source.rs            |   4 +
 src/commands/sources/kafka_checkpoint.rs      |  78 ++++++++++++
 src/commands/sources/kafka_source.rs          | 120 ++++++++++++++----
 src/commands/sources/mod.rs                   |  17 ++-
 8 files changed, 207 insertions(+), 31 deletions(-)
 create mode 100644 migrations/0001_base.down.sql
 rename migrations/{0001_indexes.up.sql => 0001_base.up.sql} (62%)
 delete mode 100644 migrations/0001_indexes.down.sql
 create mode 100644 src/commands/sources/kafka_checkpoint.rs

diff --git a/migrations/0001_base.down.sql b/migrations/0001_base.down.sql
new file mode 100644
index 0000000..94a4ac8
--- /dev/null
+++ b/migrations/0001_base.down.sql
@@ -0,0 +1,5 @@
+DROP INDEX IF EXISTS kafka_checkpoints_partition_idx;
+DROP INDEX IF EXISTS kafka_checkpoints_source_id_idx;
+DROP TABLE IF EXISTS kafka_checkpoints;
+DROP TABLE IF EXISTS index_files;
+DROP TABLE IF EXISTS indexes;
diff --git a/migrations/0001_indexes.up.sql b/migrations/0001_base.up.sql
similarity index 62%
rename from migrations/0001_indexes.up.sql
rename to migrations/0001_base.up.sql
index d50e41a..1aff917 100644
--- a/migrations/0001_indexes.up.sql
+++ b/migrations/0001_base.up.sql
@@ -9,3 +9,11 @@ CREATE TABLE IF NOT EXISTS index_files(
     file_name TEXT NOT NULL,
     footer_len BIGINT NOT NULL
 );
+
+CREATE TABLE IF NOT EXISTS kafka_checkpoints(
+    source_id TEXT NOT NULL,
+    partition INT NOT NULL,
+    offset_value BIGINT NOT NULL,
+
+    PRIMARY KEY (source_id, partition)
+);
diff --git a/migrations/0001_indexes.down.sql b/migrations/0001_indexes.down.sql
deleted file mode 100644
index b91e5cc..0000000
--- a/migrations/0001_indexes.down.sql
+++ /dev/null
@@ -1,2 +0,0 @@
-DROP TABLE index_files;
-DROP TABLE indexes;
diff --git a/src/commands/index.rs b/src/commands/index.rs
index d676401..0a1bea0 100644
--- a/src/commands/index.rs
+++ b/src/commands/index.rs
@@ -125,6 +125,8 @@ async fn pipe_source_to_index(
 
     write_unified_index(&id, &index, &index_dir, &config.name, &config.path, pool).await?;
 
+    source.on_index_created().await?;
+
     Ok(did_timeout)
 }
 
@@ -137,7 +139,7 @@ pub async fn run_index(args: IndexArgs, pool: &PgPool) -> Result<()> {
     build_parsers_from_field_configs(&config.schema.fields, &mut schema_builder)?;
     let schema = schema_builder.build();
 
-    let mut source = connect_to_source(args.input.as_deref(), args.stream).await?;
+    let mut source = connect_to_source(args.input.as_deref(), args.stream, pool).await?;
 
     while pipe_source_to_index(
         &mut source,
diff --git a/src/commands/sources/buf_source.rs b/src/commands/sources/buf_source.rs
index cfcb871..9c37e6d 100644
--- a/src/commands/sources/buf_source.rs
+++ b/src/commands/sources/buf_source.rs
@@ -47,4 +47,8 @@ impl Source for BufSource {
         self.line.clear();
         Ok(SourceItem::Document(map))
     }
+
+    async fn on_index_created(&mut self) -> Result<()> {
+        Ok(())
+    }
 }
diff --git a/src/commands/sources/kafka_checkpoint.rs b/src/commands/sources/kafka_checkpoint.rs
new file mode 100644
index 0000000..55d1645
--- /dev/null
+++ b/src/commands/sources/kafka_checkpoint.rs
@@ -0,0 +1,78 @@
+use std::collections::BTreeMap;
+
+use color_eyre::Result;
+use sqlx::PgPool;
+
+#[derive(Debug, Clone)]
+pub struct Checkpoint {
+    source_id: String,
+    pool: PgPool,
+}
+
+impl Checkpoint {
+    pub fn new(source_id: String, pool: PgPool) -> Self {
+        Self { source_id, pool }
+    }
+
+    pub async fn load(&self, partitions: &[i32]) -> Result<Vec<(i32, Option<i64>)>> {
+        if partitions.is_empty() {
+            return Ok(Vec::new());
+        }
+
+        let placeholders = (0..partitions.len())
+            .map(|i| format!("${}", i + 2))
+            .collect::<Vec<_>>()
+            .join(", ");
+
+        let sql = format!(
+            "SELECT partition, offset_value FROM kafka_checkpoints WHERE source_id = $1 AND partition IN ({})",
+            placeholders
+        );
+
+        let mut query = sqlx::query_as(&sql).bind(&self.source_id);
+        for partition in partitions {
+            query = query.bind(partition);
+        }
+
+        let partitions_and_offsets: Vec<(i32, i64)> = query.fetch_all(&self.pool).await?;
+        debug!("Loaded checkpoints: {partitions_and_offsets:?}");
+
+        let partitions_to_offsets = partitions_and_offsets
+            .into_iter()
+            .collect::<BTreeMap<_, _>>();
+
+        Ok(partitions
+            .iter()
+            .map(|partition| (*partition, partitions_to_offsets.get(partition).copied()))
+            .collect())
+    }
+
+    pub async fn save(&self, partitions_and_offsets: &[(i32, i64)]) -> Result<()> {
+        let items = partitions_and_offsets
+            .iter()
+            // Add 1 so we seek to the next record, not to the last record already read.
+            .map(|(p, o)| (&self.source_id, *p, *o + 1))
+            .collect::<Vec<_>>();
+
+        let mut sql = String::from(
+            "INSERT INTO kafka_checkpoints (source_id, partition, offset_value) VALUES ",
+        );
+
+        let params = (0..items.len())
+            .map(|i| format!("(${}, ${}, ${})", i * 3 + 1, i * 3 + 2, i * 3 + 3))
+            .collect::<Vec<_>>();
+        sql.push_str(&params.join(", "));
+        sql.push_str(" ON CONFLICT (source_id, partition) DO UPDATE SET offset_value = EXCLUDED.offset_value");
+
+        debug!("Saving checkpoints: {partitions_and_offsets:?}");
+
+        let mut query = sqlx::query(&sql);
+        for (source_id, partition, offset) in items {
+            query = query.bind(source_id).bind(partition).bind(offset);
+        }
+
+        query.execute(&self.pool).await?;
+
+        Ok(())
+    }
+}
diff --git a/src/commands/sources/kafka_source.rs b/src/commands/sources/kafka_source.rs
index aa7c74b..79bec98 100644
--- a/src/commands/sources/kafka_source.rs
+++ b/src/commands/sources/kafka_source.rs
@@ -1,4 +1,4 @@
-use std::time::Duration;
+use std::{collections::BTreeMap, time::Duration};
 
 use async_trait::async_trait;
 use color_eyre::{
@@ -11,24 +11,29 @@ use rdkafka::{
     error::KafkaError,
     ClientConfig, ClientContext, Message, Offset,
 };
+use sqlx::PgPool;
 use tokio::{
     sync::{mpsc, oneshot},
     task::spawn_blocking,
 };
 
-use super::{Source, SourceItem};
+use super::{kafka_checkpoint::Checkpoint, Source, SourceItem};
 
 pub const KAFKA_PREFIX: &str = "kafka://";
 const CONSUMER_THREAD_MESSAGES_CHANNEL_SIZE: usize = 10;
 const POLL_DURATION: Duration = Duration::from_secs(1);
 
 enum MessageFromConsumerThread {
-    Payload(Vec<u8>),
+    Payload {
+        bytes: Vec<u8>,
+        partition: i32,
+        offset: i64,
+    },
     Eof,
     PreRebalance,
     PostRebalance {
         partitions: Vec<i32>,
-        offsets_tx: oneshot::Sender<Vec<Offset>>,
+        checkpoint_tx: oneshot::Sender<Vec<(i32, Option<i64>)>>,
     },
 }
 
@@ -60,36 +65,50 @@ impl ConsumerContext for KafkaContext {
            return;
        }
 
-        let partitions = tpl
-            .elements()
-            .iter()
-            .map(|x| x.partition())
-            .collect::<Vec<_>>();
+        // elements() panics when tpl is empty, so we check capacity.
+        let partitions = if tpl.capacity() > 0 {
+            tpl.elements()
+                .iter()
+                .map(|x| {
+                    assert_eq!(x.topic(), self.topic);
+                    x.partition()
+                })
+                .collect::<Vec<_>>()
+        } else {
+            Vec::new()
+        };
 
         let (tx, rx) = oneshot::channel();
         let msg = MessageFromConsumerThread::PostRebalance {
-            partitions: partitions.clone(),
-            offsets_tx: tx,
+            partitions,
+            checkpoint_tx: tx,
         };
         if let Err(e) = self.messages_tx.blocking_send(Ok(msg)) {
            error!("Failed to send post-rebalance event: {e}");
            return;
        }
 
-        let offsets_result = rx.blocking_recv();
-        if let Err(e) = offsets_result {
+        let checkpoint_result = rx.blocking_recv();
+        if let Err(e) = checkpoint_result {
            error!("Failed to recv post-rebalance offsets: {e}");
            return;
        }
-        let offsets = offsets_result.unwrap();
+        let partitions_and_offsets = checkpoint_result.unwrap();
 
-        for (id, offset) in partitions.into_iter().zip(offsets) {
+        for (id, offset) in partitions_and_offsets {
            let Some(mut partition) = tpl.find_partition(&self.topic, id) else {
                warn!("Partition id '{id}' not found?");
                continue;
            };
-            if let Err(e) = partition.set_offset(offset) {
-                warn!("Failed to set offset to '{offset:?}' for partition id '{id}': {e}");
+
+            let rdkafka_offset = if let Some(offset) = offset {
+                Offset::Offset(offset)
+            } else {
+                Offset::Beginning
+            };
+
+            if let Err(e) = partition.set_offset(rdkafka_offset) {
+                warn!("Failed to set offset to '{rdkafka_offset:?}' for partition id '{id}': {e}");
            }
        }
    }
@@ -104,6 +123,8 @@ type KafkaConsumer = BaseConsumer<KafkaContext>;
 
 pub struct KafkaSource {
     messages_rx: mpsc::Receiver<Result<MessageFromConsumerThread>>,
+    checkpoint: Option<Checkpoint>,
+    partition_to_offset: BTreeMap<i32, i64>,
 }
 
 fn parse_url(url: &str) -> Result<(&str, &str)> {
@@ -156,7 +177,11 @@ fn run_consumer_thread(
        }
 
        let msg = msg
-            .map(|x| MessageFromConsumerThread::Payload(x.payload().unwrap().to_vec()))
+            .map(|x| MessageFromConsumerThread::Payload {
+                bytes: x.payload().unwrap().to_vec(),
+                partition: x.partition(),
+                offset: x.offset(),
+            })
            .map_err(|e| Report::new(e));
 
        if let Err(e) = tx.blocking_send(msg) {
@@ -168,7 +193,7 @@ impl KafkaSource {
-    pub fn from_url(url: &str, stream: bool) -> Result<Self> {
+    pub fn from_url(url: &str, stream: bool, pool: &PgPool) -> Result<Self> {
         let (servers, topic) = parse_url(url)?;
 
         let log_level = if cfg!(debug_assertions) {
@@ -213,7 +238,18 @@ impl KafkaSource {
 
         run_consumer_thread(consumer, tx);
 
-        Ok(Self { messages_rx: rx })
+        let checkpoint = if stream {
+            // The URL is not a good identifier for a source id, but we'll live with it for now.
+            Some(Checkpoint::new(url.to_string(), pool.clone()))
+        } else {
+            None
+        };
+
+        Ok(Self {
+            messages_rx: rx,
+            checkpoint,
+            partition_to_offset: BTreeMap::new(),
+        })
     }
 }
 
@@ -226,7 +262,12 @@ impl Source for KafkaSource {
            };
 
            match msg? {
-                MessageFromConsumerThread::Payload(bytes) => {
+                MessageFromConsumerThread::Payload {
+                    bytes,
+                    partition,
+                    offset,
+                } => {
+                    self.partition_to_offset.insert(partition, offset);
                    break SourceItem::Document(serde_json::from_slice(&bytes)?);
                }
                MessageFromConsumerThread::Eof => {
@@ -235,15 +276,42 @@ impl Source for KafkaSource {
                MessageFromConsumerThread::PreRebalance => {
                    break SourceItem::Restart;
                }
-                MessageFromConsumerThread::PostRebalance{partitions, offsets_tx} => {
-                    if offsets_tx
-                        .send(partitions.into_iter().map(|_| Offset::Stored).collect())
-                        .is_err()
-                    {
+                MessageFromConsumerThread::PostRebalance {
+                    partitions,
+                    checkpoint_tx,
+                } => {
+                    let Some(ref checkpoint) = self.checkpoint else {
+                        continue;
+                    };
+
+                    let partitions_and_offsets = checkpoint
+                        .load(&partitions)
+                        .await
+                        .context("failed to load checkpoint")?;
+                    if checkpoint_tx.send(partitions_and_offsets).is_err() {
                        bail!("failed to respond with partition offsets, kafka consumer thread probably closed")
                    }
+
+                    self.partition_to_offset.clear();
                }
            }
        })
    }
+
+    async fn on_index_created(&mut self) -> Result<()> {
+        let Some(ref checkpoint) = self.checkpoint else {
+            return Ok(());
+        };
+
+        let flat = self
+            .partition_to_offset
+            .iter()
+            .map(|(p, o)| (*p, *o))
+            .collect::<Vec<_>>();
+        checkpoint.save(&flat).await?;
+
+        self.partition_to_offset.clear();
+
+        Ok(())
+    }
 }
diff --git a/src/commands/sources/mod.rs b/src/commands/sources/mod.rs
index 70f4b38..9d2dc1f 100644
--- a/src/commands/sources/mod.rs
+++ b/src/commands/sources/mod.rs
@@ -1,8 +1,10 @@
 mod buf_source;
+mod kafka_checkpoint;
 mod kafka_source;
 
 use async_trait::async_trait;
 use color_eyre::{eyre::bail, Result};
+use sqlx::PgPool;
 
 use self::{
     buf_source::BufSource,
@@ -24,12 +26,23 @@ pub enum SourceItem {
 
 #[async_trait]
 pub trait Source {
+    /// Get a document from the source.
     async fn get_one(&mut self) -> Result<SourceItem>;
+
+    /// Called to notify the source that an index file was just created.
+    /// Useful for implementing checkpoints, for example.
+    async fn on_index_created(&mut self) -> Result<()>;
 }
 
-pub async fn connect_to_source(input: Option<&str>, stream: bool) -> Result<Box<dyn Source>> {
+pub async fn connect_to_source(
+    input: Option<&str>,
+    stream: bool,
+    pool: &PgPool,
+) -> Result<Box<dyn Source>> {
     Ok(match input {
-        Some(url) if url.starts_with(KAFKA_PREFIX) => Box::new(KafkaSource::from_url(url, stream)?),
+        Some(url) if url.starts_with(KAFKA_PREFIX) => {
+            Box::new(KafkaSource::from_url(url, stream, pool)?)
+        }
         Some(path) => {
            if stream {
                bail!("Streaming from a file is not currently supported.");
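
Note, as an illustration only and not part of the commit: the checkpoint flow added above boils down to two behaviors. load() returns None for partitions that were never saved, which the post-rebalance handler maps to Offset::Beginning, and save() persists offset + 1 so the next run resumes at the first unread record. Below is a minimal sketch of that round trip; it assumes Checkpoint is in scope, the PgPool points at a database containing the kafka_checkpoints table from 0001_base.up.sql, and the kafka:// URL is only a placeholder source id.

// Illustrative sketch; checkpoint_round_trip is a hypothetical helper, not code from the patch.
async fn checkpoint_round_trip(pool: sqlx::PgPool) -> color_eyre::Result<()> {
    let checkpoint = Checkpoint::new("kafka://localhost:9092/topic".to_string(), pool);

    // Nothing persisted yet: every partition maps to None, which the
    // post-rebalance handler translates into Offset::Beginning.
    assert_eq!(checkpoint.load(&[0, 1]).await?, vec![(0, None), (1, None)]);

    // After an index file is written, on_index_created() calls save() with the
    // last read offsets; save() stores offset + 1 so indexing resumes at the
    // next unread record.
    checkpoint.save(&[(0, 41), (1, 7)]).await?;
    assert_eq!(
        checkpoint.load(&[0, 1]).await?,
        vec![(0, Some(42)), (1, Some(8))]
    );

    Ok(())
}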