index: kafka: Add checkpoints
tontinton committed Jun 19, 2024
1 parent 089b58f commit b40ca57
Showing 8 changed files with 207 additions and 31 deletions.
5 changes: 5 additions & 0 deletions migrations/0001_base.down.sql
@@ -0,0 +1,5 @@
DROP INDEX IF EXISTS kafka_checkpoints_partition_idx;
DROP INDEX IF EXISTS kafka_checkpoints_source_id_idx;
DROP TABLE IF EXISTS kafka_checkpoints;
DROP TABLE IF EXISTS index_files;
DROP TABLE IF EXISTS indexes;
migrations/0001_base.up.sql
@@ -9,3 +9,11 @@ CREATE TABLE IF NOT EXISTS index_files(
file_name TEXT NOT NULL,
footer_len BIGINT NOT NULL
);

CREATE TABLE IF NOT EXISTS kafka_checkpoints(
source_id TEXT NOT NULL,
partition INT NOT NULL,
offset_value BIGINT NOT NULL,

PRIMARY KEY (source_id, partition)
);
2 changes: 0 additions & 2 deletions migrations/0001_indexes.down.sql

This file was deleted.

4 changes: 3 additions & 1 deletion src/commands/index.rs
@@ -125,6 +125,8 @@ async fn pipe_source_to_index(

write_unified_index(&id, &index, &index_dir, &config.name, &config.path, pool).await?;

source.on_index_created().await?;

Ok(did_timeout)
}

@@ -137,7 +139,7 @@ pub async fn run_index(args: IndexArgs, pool: &PgPool) -> Result<()> {
build_parsers_from_field_configs(&config.schema.fields, &mut schema_builder)?;
let schema = schema_builder.build();

-let mut source = connect_to_source(args.input.as_deref(), args.stream).await?;
let mut source = connect_to_source(args.input.as_deref(), args.stream, pool).await?;

while pipe_source_to_index(
&mut source,
4 changes: 4 additions & 0 deletions src/commands/sources/buf_source.rs
@@ -47,4 +47,8 @@ impl Source for BufSource {
self.line.clear();
Ok(SourceItem::Document(map))
}

async fn on_index_created(&mut self) -> Result<()> {
Ok(())
}
}
78 changes: 78 additions & 0 deletions src/commands/sources/kafka_checkpoint.rs
@@ -0,0 +1,78 @@
use std::collections::BTreeMap;

use color_eyre::Result;
use sqlx::PgPool;

#[derive(Debug, Clone)]
pub struct Checkpoint {
source_id: String,
pool: PgPool,
}

impl Checkpoint {
pub fn new(source_id: String, pool: PgPool) -> Self {
Self { source_id, pool }
}

pub async fn load(&self, partitions: &[i32]) -> Result<Vec<(i32, Option<i64>)>> {
if partitions.is_empty() {
return Ok(Vec::new());
}

let placeholders = (0..partitions.len())
.map(|i| format!("${}", i + 2))
.collect::<Vec<_>>()
.join(", ");

let sql = format!(
"SELECT partition, offset_value FROM kafka_checkpoints WHERE source_id = $1 AND partition IN ({})",
placeholders
);

let mut query = sqlx::query_as(&sql).bind(&self.source_id);
for partition in partitions {
query = query.bind(partition);
}

let partitions_and_offsets: Vec<(i32, i64)> = query.fetch_all(&self.pool).await?;
debug!("Loaded checkpoints: {partitions_and_offsets:?}");

let partitions_to_offsets = partitions_and_offsets
.into_iter()
.collect::<BTreeMap<i32, i64>>();

Ok(partitions
.iter()
.map(|partition| (*partition, partitions_to_offsets.get(partition).copied()))
.collect())
}

pub async fn save(&self, partitions_and_offsets: &[(i32, i64)]) -> Result<()> {
// Nothing to save; also avoids building an INSERT with an empty VALUES list.
if partitions_and_offsets.is_empty() {
return Ok(());
}

let items = partitions_and_offsets
.iter()
// Add 1 as we don't want to seek to the last record already read, but the next.
.map(|(p, o)| (&self.source_id, *p, *o + 1))
.collect::<Vec<_>>();

let mut sql = String::from(
"INSERT INTO kafka_checkpoints (source_id, partition, offset_value) VALUES ",
);

let params = (0..items.len())
.map(|i| format!("(${}, ${}, ${})", i * 3 + 1, i * 3 + 2, i * 3 + 3))
.collect::<Vec<_>>();
sql.push_str(&params.join(", "));
sql.push_str(" ON CONFLICT (source_id, partition) DO UPDATE SET offset_value = EXCLUDED.offset_value");

debug!("Saving checkpoints: {partitions_and_offsets:?}");

let mut query = sqlx::query(&sql);
for (source_id, partition, offset) in items {
query = query.bind(source_id).bind(partition).bind(offset);
}

query.execute(&self.pool).await?;

Ok(())
}
}
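
A minimal usage sketch of the Checkpoint type above (not part of this commit; the source id URL, partition numbers, and offsets are made-up values):

async fn checkpoint_roundtrip(pool: sqlx::PgPool) -> color_eyre::Result<()> {
    let checkpoint = Checkpoint::new("kafka://localhost:9092/topic".to_string(), pool);

    // Resume points per partition; None means no checkpoint was saved yet,
    // which KafkaSource maps to Offset::Beginning.
    let resume: Vec<(i32, Option<i64>)> = checkpoint.load(&[0, 1]).await?;
    println!("resume points: {resume:?}");

    // Offsets of the last records indexed per partition; save() persists offset + 1
    // so the next run starts at the record after the last one read.
    checkpoint.save(&[(0, 41), (1, 99)]).await?;
    Ok(())
}
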
120 changes: 94 additions & 26 deletions src/commands/sources/kafka_source.rs
@@ -1,4 +1,4 @@
-use std::time::Duration;
use std::{collections::BTreeMap, time::Duration};

use async_trait::async_trait;
use color_eyre::{
@@ -11,24 +11,29 @@ use rdkafka::{
error::KafkaError,
ClientConfig, ClientContext, Message, Offset,
};
use sqlx::PgPool;
use tokio::{
sync::{mpsc, oneshot},
task::spawn_blocking,
};

-use super::{Source, SourceItem};
use super::{kafka_checkpoint::Checkpoint, Source, SourceItem};

pub const KAFKA_PREFIX: &str = "kafka://";
const CONSUMER_THREAD_MESSAGES_CHANNEL_SIZE: usize = 10;
const POLL_DURATION: Duration = Duration::from_secs(1);

enum MessageFromConsumerThread {
-Payload(Vec<u8>),
Payload {
bytes: Vec<u8>,
partition: i32,
offset: i64,
},
Eof,
PreRebalance,
PostRebalance {
partitions: Vec<i32>,
-offsets_tx: oneshot::Sender<Vec<Offset>>,
checkpoint_tx: oneshot::Sender<Vec<(i32, Option<i64>)>>,
},
}

@@ -60,36 +65,50 @@ impl ConsumerContext for KafkaContext {
return;
}

-let partitions = tpl
-.elements()
-.iter()
-.map(|x| x.partition())
-.collect::<Vec<_>>();
// elements() panics when tpl is empty, so we check capacity.
let partitions = if tpl.capacity() > 0 {
tpl.elements()
.iter()
.map(|x| {
assert_eq!(x.topic(), self.topic);
x.partition()
})
.collect::<Vec<_>>()
} else {
Vec::new()
};

let (tx, rx) = oneshot::channel();
let msg = MessageFromConsumerThread::PostRebalance {
-partitions: partitions.clone(),
-offsets_tx: tx,
partitions,
checkpoint_tx: tx,
};
if let Err(e) = self.messages_tx.blocking_send(Ok(msg)) {
error!("Failed to send post-rebalance event: {e}");
return;
}

-let offsets_result = rx.blocking_recv();
-if let Err(e) = offsets_result {
let checkpoint_result = rx.blocking_recv();
if let Err(e) = checkpoint_result {
error!("Failed to recv post-rebalance offsets: {e}");
return;
}
-let offsets = offsets_result.unwrap();
let partitions_and_offsets = checkpoint_result.unwrap();

-for (id, offset) in partitions.into_iter().zip(offsets) {
for (id, offset) in partitions_and_offsets {
let Some(mut partition) = tpl.find_partition(&self.topic, id) else {
warn!("Partition id '{id}' not found?");
continue;
};
-if let Err(e) = partition.set_offset(offset) {
-warn!("Failed to set offset to '{offset:?}' for partition id '{id}': {e}");

let rdkafka_offset = if let Some(offset) = offset {
Offset::Offset(offset)
} else {
Offset::Beginning
};

if let Err(e) = partition.set_offset(rdkafka_offset) {
warn!("Failed to set offset to '{rdkafka_offset:?}' for partition id '{id}': {e}");
}
}
}
@@ -104,6 +123,8 @@ type KafkaConsumer = BaseConsumer<KafkaContext>;

pub struct KafkaSource {
messages_rx: mpsc::Receiver<Result<MessageFromConsumerThread>>,
checkpoint: Option<Checkpoint>,
partition_to_offset: BTreeMap<i32, i64>,
}

fn parse_url(url: &str) -> Result<(&str, &str)> {
@@ -156,7 +177,11 @@ fn run_consumer_thread(
}

let msg = msg
-.map(|x| MessageFromConsumerThread::Payload(x.payload().unwrap().to_vec()))
.map(|x| MessageFromConsumerThread::Payload {
bytes: x.payload().unwrap().to_vec(),
partition: x.partition(),
offset: x.offset(),
})
.map_err(|e| Report::new(e));

if let Err(e) = tx.blocking_send(msg) {
@@ -168,7 +193,7 @@ impl KafkaSource {
}

impl KafkaSource {
-pub fn from_url(url: &str, stream: bool) -> Result<Self> {
pub fn from_url(url: &str, stream: bool, pool: &PgPool) -> Result<Self> {
let (servers, topic) = parse_url(url)?;

let log_level = if cfg!(debug_assertions) {
@@ -213,7 +238,18 @@ impl KafkaSource {

run_consumer_thread(consumer, tx);

-Ok(Self { messages_rx: rx })
let checkpoint = if stream {
// Url is not a good identifier as a source id, but we'll live with it for now.
Some(Checkpoint::new(url.to_string(), pool.clone()))
} else {
None
};

Ok(Self {
messages_rx: rx,
checkpoint,
partition_to_offset: BTreeMap::new(),
})
}
}

@@ -226,7 +262,12 @@ impl Source for KafkaSource {
};

match msg? {
-MessageFromConsumerThread::Payload(bytes) => {
MessageFromConsumerThread::Payload {
bytes,
partition,
offset,
} => {
self.partition_to_offset.insert(partition, offset);
break SourceItem::Document(serde_json::from_slice(&bytes)?);
}
MessageFromConsumerThread::Eof => {
@@ -235,15 +276,42 @@
MessageFromConsumerThread::PreRebalance => {
break SourceItem::Restart;
}
-MessageFromConsumerThread::PostRebalance{partitions, offsets_tx} => {
-if offsets_tx
-.send(partitions.into_iter().map(|_| Offset::Stored).collect())
-.is_err()
-{
MessageFromConsumerThread::PostRebalance {
partitions,
checkpoint_tx,
} => {
let Some(ref checkpoint) = self.checkpoint else {
continue;
};

let partitions_and_offsets = checkpoint
.load(&partitions)
.await
.context("failed to load checkpoint")?;
if checkpoint_tx.send(partitions_and_offsets).is_err() {
bail!("failed to respond with partition offsets, kafka consumer thread probably closed")
}

self.partition_to_offset.clear();
}
}
})
}

async fn on_index_created(&mut self) -> Result<()> {
let Some(ref checkpoint) = self.checkpoint else {
return Ok(());
};

let flat = self
.partition_to_offset
.iter()
.map(|(p, o)| (*p, *o))
.collect::<Vec<_>>();
checkpoint.save(&flat).await?;

self.partition_to_offset.clear();

Ok(())
}
}
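
For context, a simplified sketch of how an indexing loop is expected to drive a Source with the new hook (this is not the real pipe_source_to_index; the index-building step is elided and only hinted at in comments):

async fn index_one_batch(source: &mut dyn Source) -> color_eyre::Result<()> {
    let mut docs = Vec::new();
    loop {
        match source.get_one().await? {
            SourceItem::Document(doc) => docs.push(doc),
            // Any other item (e.g. Restart on a pre-rebalance) ends the batch.
            _ => break,
        }
    }

    // ... build and upload the index file from `docs` (write_unified_index in the real code) ...

    // Only after the index file exists is the source told to persist its state;
    // for KafkaSource this saves the tracked partition offsets as a checkpoint.
    source.on_index_created().await?;
    Ok(())
}
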
17 changes: 15 additions & 2 deletions src/commands/sources/mod.rs
@@ -1,8 +1,10 @@
mod buf_source;
mod kafka_checkpoint;
mod kafka_source;

use async_trait::async_trait;
use color_eyre::{eyre::bail, Result};
use sqlx::PgPool;

use self::{
buf_source::BufSource,
@@ -24,12 +26,23 @@

#[async_trait]
pub trait Source {
/// Get a document from the source.
async fn get_one(&mut self) -> Result<SourceItem>;

/// Called to notify the source that an index file has just been created.
/// Useful for implementing checkpoints, for example.
async fn on_index_created(&mut self) -> Result<()>;
}

-pub async fn connect_to_source(input: Option<&str>, stream: bool) -> Result<Box<dyn Source>> {
pub async fn connect_to_source(
input: Option<&str>,
stream: bool,
pool: &PgPool,
) -> Result<Box<dyn Source>> {
Ok(match input {
-Some(url) if url.starts_with(KAFKA_PREFIX) => Box::new(KafkaSource::from_url(url, stream)?),
Some(url) if url.starts_with(KAFKA_PREFIX) => {
Box::new(KafkaSource::from_url(url, stream, pool)?)
}
Some(path) => {
if stream {
bail!("Streaming from a file is not currently supported.");