Move handling of cli commands to new commands dir
To reduce `main.rs` noise.
tontinton committed Jun 1, 2024
1 parent 12dec08 commit c228a6e
Showing 7 changed files with 541 additions and 474 deletions.
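The motivation is easiest to see from the calling side: after this move, main.rs only has to parse arguments and dispatch. Below is a minimal sketch of that dispatch, assuming clap-style argument parsing — the SubCommand enum, the derive, the pool setup, and the module paths are assumptions not shown in this diff; only the run_* entry points and their (args, PgPool) -> Result<()> signatures come from the files that follow.

// Hypothetical sketch of the slimmed-down main.rs dispatch after this commit.
// Everything except the run_* signatures is an assumption: the args module
// shape, the use of clap, and how the PgPool is built are not shown here.
use clap::Parser; // assumed; the diff only shows `crate::args::*Args` types
use color_eyre::Result;
use sqlx::PgPool;

use crate::args::{CreateArgs, DropArgs, IndexArgs, MergeArgs};
use crate::commands::{create::run_create, drop::run_drop, index::run_index, merge::run_merge};

#[derive(Parser)]
enum SubCommand {
    Create(CreateArgs),
    Drop(DropArgs),
    Index(IndexArgs),
    Merge(MergeArgs),
}

async fn dispatch(cmd: SubCommand, pool: PgPool) -> Result<()> {
    match cmd {
        SubCommand::Create(args) => run_create(args, pool).await,
        SubCommand::Drop(args) => run_drop(args, pool).await,
        SubCommand::Index(args) => run_index(args, pool).await,
        SubCommand::Merge(args) => run_merge(args, pool).await,
    }
}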
18 changes: 18 additions & 0 deletions src/commands/create.rs
@@ -0,0 +1,18 @@
use color_eyre::Result;
use sqlx::{query, PgPool};

use crate::{args::CreateArgs, index_config::IndexConfig};

pub async fn run_create(args: CreateArgs, pool: PgPool) -> Result<()> {
    let config = IndexConfig::from_path(&args.config_path).await?;

    query("INSERT INTO indexes (name, config) VALUES ($1, $2)")
        .bind(&config.name)
        .bind(&serde_json::to_value(&config)?)
        .execute(&pool)
        .await?;

    info!("Created index: {}", &config.name);

    Ok(())
}
42 changes: 42 additions & 0 deletions src/commands/drop.rs
@@ -0,0 +1,42 @@
use std::path::Path;

use color_eyre::{eyre::eyre, Result};
use sqlx::{query, query_as, PgPool};
use tokio::fs::remove_file;

use crate::args::DropArgs;

use super::get_index_path;

pub async fn run_drop(args: DropArgs, pool: PgPool) -> Result<()> {
    let base_path = get_index_path(&args.name, &pool).await?;

    let file_names: Vec<(String,)> =
        query_as("SELECT file_name FROM index_files WHERE index_name=$1")
            .bind(&args.name)
            .fetch_all(&pool)
            .await?;
    let file_names_len = file_names.len();

    for (file_name,) in file_names {
        // Best effort: a file that is already gone should not abort the drop.
        let _ = remove_file(
            Path::new(&base_path)
                .join(file_name)
                .to_str()
                .ok_or_else(|| eyre!("failed to build index file path"))?,
        )
        .await;
    }

    query("DELETE FROM indexes WHERE name=$1")
        .bind(&args.name)
        .execute(&pool)
        .await?;

    info!("Dropped index: {} ({} index files)", &args.name, file_names_len);

    Ok(())
}
129 changes: 129 additions & 0 deletions src/commands/index.rs
@@ -0,0 +1,129 @@
use color_eyre::{eyre::eyre, Result};
use sqlx::PgPool;
use tantivy::{
    directory::MmapDirectory,
    indexer::NoMergePolicy,
    schema::{Field, JsonObjectOptions, OwnedValue, Schema, STORED, STRING},
    DateTime, Index, IndexWriter, TantivyDocument,
};
use tokio::{
    fs::{create_dir_all, File},
    io::{AsyncBufReadExt, BufReader},
    task::spawn_blocking,
};

use crate::{args::IndexArgs, index_config::FieldType};

use super::{get_index_config, write_unified_index};

fn common_parse(value: serde_json::Value) -> Result<OwnedValue> {
    Ok(serde_json::from_value(value)?)
}

pub fn parse_timestamp(timestamp: i64) -> Result<DateTime> {
    // Minimum supported timestamp value in seconds (13 Apr 1972 23:59:55 GMT).
    const MIN_TIMESTAMP_SECONDS: i64 = 72_057_595;

    // Maximum supported timestamp value in seconds (16 Mar 2242 12:56:31 GMT).
    const MAX_TIMESTAMP_SECONDS: i64 = 8_589_934_591;

    const MIN_TIMESTAMP_MILLIS: i64 = MIN_TIMESTAMP_SECONDS * 1000;
    const MAX_TIMESTAMP_MILLIS: i64 = MAX_TIMESTAMP_SECONDS * 1000;
    const MIN_TIMESTAMP_MICROS: i64 = MIN_TIMESTAMP_SECONDS * 1_000_000;
    const MAX_TIMESTAMP_MICROS: i64 = MAX_TIMESTAMP_SECONDS * 1_000_000;
    const MIN_TIMESTAMP_NANOS: i64 = MIN_TIMESTAMP_SECONDS * 1_000_000_000;
    const MAX_TIMESTAMP_NANOS: i64 = MAX_TIMESTAMP_SECONDS * 1_000_000_000;

    // The magnitude of the value selects its unit: seconds first, then
    // millis, micros, and nanos. The ranges are disjoint by construction.
    match timestamp {
        MIN_TIMESTAMP_SECONDS..=MAX_TIMESTAMP_SECONDS => {
            Ok(DateTime::from_timestamp_secs(timestamp))
        }
        MIN_TIMESTAMP_MILLIS..=MAX_TIMESTAMP_MILLIS => {
            Ok(DateTime::from_timestamp_millis(timestamp))
        }
        MIN_TIMESTAMP_MICROS..=MAX_TIMESTAMP_MICROS => {
            Ok(DateTime::from_timestamp_micros(timestamp))
        }
        MIN_TIMESTAMP_NANOS..=MAX_TIMESTAMP_NANOS => Ok(DateTime::from_timestamp_nanos(timestamp)),
        _ => Err(eyre!(
            "failed to parse unix timestamp `{timestamp}`. Supported timestamp ranges \
             from `13 Apr 1972 23:59:55` to `16 Mar 2242 12:56:31`"
        )),
    }
}

pub async fn run_index(args: IndexArgs, pool: PgPool) -> Result<()> {
    let config = get_index_config(&args.name, &pool).await?;

    let mut schema_builder = Schema::builder();
    let dynamic_field = schema_builder.add_json_field(
        "_dynamic",
        JsonObjectOptions::from(STORED | STRING).set_expand_dots_enabled(),
    );

    let mut fields = Vec::<(String, Field, fn(serde_json::Value) -> Result<OwnedValue>)>::new();
    for (name, schema) in config.schema.mappings {
        match schema.type_ {
            FieldType::Text(options) => {
                let field = schema_builder.add_text_field(&name, options);
                fields.push((name, field, common_parse));
            }
            FieldType::Datetime(options) => {
                let field = schema_builder.add_date_field(&name, options);
                fields.push((name, field, |v| {
                    let timestamp: i64 = serde_json::from_value(v)?;
                    Ok(parse_timestamp(timestamp)?.into())
                }));
            }
        }
    }

    let schema = schema_builder.build();

    let _ = create_dir_all(&args.build_dir).await;
    let index = Index::open_or_create(MmapDirectory::open(&args.build_dir)?, schema)?;
    let mut index_writer: IndexWriter = index.writer(args.memory_budget)?;
    index_writer.set_merge_policy(Box::new(NoMergePolicy));

    let mut reader = BufReader::new(File::open(&args.input_path).await?);

    let mut line = String::new();
    let mut added = 0;

    loop {
        let len = reader.read_line(&mut line).await?;
        if len == 0 {
            break;
        }

        let mut doc = TantivyDocument::new();
        let mut json_obj: serde_json::Map<String, serde_json::Value> = serde_json::from_str(&line)?;

        // Mapped fields are parsed with their configured parser; whatever is
        // left over lands in the catch-all `_dynamic` JSON field.
        for (name, field, parse_fn) in &fields {
            if let Some(value) = json_obj.remove(name) {
                doc.add_field_value(*field, parse_fn(value)?);
            }
        }

        doc.add_field_value(dynamic_field, json_obj);
        index_writer.add_document(doc)?;
        added += 1;

        line.clear();
    }

    info!("Committing {added} documents");
    index_writer.prepare_commit()?.commit_future().await?;

    let segment_ids = index.searchable_segment_ids()?;
    if segment_ids.len() > 1 {
        info!("Merging {} segments", segment_ids.len());
        index_writer.merge(&segment_ids).await?;
    }

    spawn_blocking(move || index_writer.wait_merging_threads()).await??;

    write_unified_index(index, &args.build_dir, &config.name, &config.path, &pool).await?;

    Ok(())
}
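The unit inference in parse_timestamp deserves a concrete check: the same instant takes a different branch depending only on its magnitude. A test-style sketch that could sit in this file — the sample value is mine, not from the diff (1_717_200_000 seconds is 2024-06-01 00:00:00 UTC):

#[cfg(test)]
mod tests {
    use super::parse_timestamp;
    use tantivy::DateTime;

    #[test]
    fn magnitude_picks_unit() -> color_eyre::Result<()> {
        let secs = 1_717_200_000_i64;
        // One instant in three units; each lands in a different match arm.
        assert_eq!(parse_timestamp(secs)?, DateTime::from_timestamp_secs(secs));
        assert_eq!(
            parse_timestamp(secs * 1_000)?,
            DateTime::from_timestamp_millis(secs * 1_000)
        );
        assert_eq!(
            parse_timestamp(secs * 1_000_000)?,
            DateTime::from_timestamp_micros(secs * 1_000_000)
        );
        // Anything outside all four ranges is rejected, e.g. 1 (1970-01-01 00:00:01).
        assert!(parse_timestamp(1).is_err());
        Ok(())
    }
}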
68 changes: 68 additions & 0 deletions src/commands/merge.rs
@@ -0,0 +1,68 @@
use std::path::Path;

use color_eyre::Result;
use sqlx::{query, PgPool};
use tantivy::{
    directory::{DirectoryClone, MmapDirectory},
    indexer::NoMergePolicy,
    Index, IndexWriter,
};
use tokio::{
    fs::{create_dir, remove_file},
    task::spawn_blocking,
};

use crate::{args::MergeArgs, merge_directory::MergeDirectory};

use super::{get_index_config, open_unified_directories, write_unified_index};

pub async fn run_merge(args: MergeArgs, pool: PgPool) -> Result<()> {
    let config = get_index_config(&args.name, &pool).await?;

    let (ids, directories): (Vec<_>, Vec<_>) = open_unified_directories(&config.path, &pool)
        .await?
        .into_iter()
        .map(|(id, dir)| (id, dir.box_clone()))
        .unzip();

    if directories.len() <= 1 {
        info!("Need at least 2 index files to merge");
        return Ok(());
    }

    let _ = create_dir(&args.merge_dir).await;
    let output_dir = MmapDirectory::open(&args.merge_dir)?;

    let index = Index::open(MergeDirectory::new(directories, output_dir.box_clone())?)?;
    let mut index_writer: IndexWriter = index.writer_with_num_threads(1, 15_000_000)?;
    index_writer.set_merge_policy(Box::new(NoMergePolicy));

    let segment_ids = index.searchable_segment_ids()?;
    if segment_ids.len() > 1 {
        info!("Merging {} segments", segment_ids.len());
        index_writer.merge(&segment_ids).await?;
    }

    spawn_blocking(move || index_writer.wait_merging_threads()).await??;

    write_unified_index(index, &args.merge_dir, &config.name, &config.path, &pool).await?;

    // Drop the merged files' catalog rows first, then delete the files
    // themselves (best effort); any DB error only surfaces after cleanup.
    let delete_result = query("DELETE FROM index_files WHERE id = ANY($1)")
        .bind(&ids)
        .execute(&pool)
        .await;

    for id in ids {
        let _ = remove_file(
            Path::new(&config.path)
                .join(format!("{}.index", id))
                .to_str()
                .expect("failed to build index path"),
        )
        .await;
    }

    delete_result?;

    Ok(())
}
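The three remaining changed files, including src/commands/mod.rs, are collapsed above. Reconstructed from the call sites in the four files shown, the shared helpers imported via `use super::…` would have roughly these shapes — the names are real, but every signature, return type, and the module layout here are inferred guesses:

// Hypothetical skeleton of src/commands/mod.rs, reconstructed only from call
// sites in this diff; signatures, return types, and the id type are guesses.
use color_eyre::Result;
use sqlx::PgPool;
use tantivy::{directory::ManagedDirectory, Index};

use crate::index_config::IndexConfig;

pub mod create;
pub mod drop;
pub mod index;
pub mod merge;

// run_drop: `let base_path = get_index_path(&args.name, &pool).await?;`
async fn get_index_path(name: &str, pool: &PgPool) -> Result<String> {
    todo!()
}

// run_index / run_merge: loads the stored `config` row for an index name.
async fn get_index_config(name: &str, pool: &PgPool) -> Result<IndexConfig> {
    todo!()
}

// run_merge: yields (file id, directory) pairs, one per unified index file.
async fn open_unified_directories(
    path: &str,
    pool: &PgPool,
) -> Result<Vec<(String, ManagedDirectory)>> {
    todo!()
}

// run_index / run_merge: packs a built index into one `<id>.index` file
// and records it in the `index_files` table.
async fn write_unified_index(
    index: Index,
    input_dir: &str,
    index_name: &str,
    index_path: &str,
    pool: &PgPool,
) -> Result<()> {
    todo!()
}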