index: Create schema dynamically from schema stored in metadata
tontinton committed May 31, 2024 · 1 parent cf7b62f · commit 4f14e2b
Showing 3 changed files with 87 additions and 33 deletions.
3 changes: 3 additions & 0 deletions example_config.yaml
@@ -8,3 +8,6 @@ schema:
       type: !datetime
         formats:
         - timestamp
+        indexed: true
+        stored: true
+        fast: true
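
For reference, the schema block these three keys land in presumably looks like the following once assembled; everything outside the three context lines above is an assumption reconstructed from the mapping structs in src/index_config.rs:

```yaml
# Assumed full shape (only the last six lines appear in the hunk above).
schema:
  mappings:
    timestamp:
      type: !datetime
        formats:
        - timestamp
        indexed: true
        stored: true
        fast: true
```

The `!datetime` tag selects the `MappingFieldType::Datetime` variant, so `indexed`/`stored`/`fast` are fields of `DateTimeMappingConfig`, not of the mapping entry itself.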
20 changes: 10 additions & 10 deletions src/index_config.rs
@@ -38,7 +38,7 @@ impl From<FastTextFieldType> for Option<&str> {
 }

 #[derive(Debug, Clone, Serialize, Deserialize, Default)]
-struct TextMappingConfig {
+pub struct TextMappingConfig {
     #[serde(default = "default_true")]
     pub stored: bool,

@@ -79,15 +79,15 @@ impl From<DateTimeFastPrecisionType> for Option<DateTimePrecision> {

 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(rename_all = "snake_case")]
-enum DateTimeFormatType {
+pub enum DateTimeFormatType {
     Iso8601,
     Rfc2822,
     Rfc3339,
     Timestamp,
 }

 #[derive(Clone, Debug, Serialize, Deserialize)]
-struct DateTimeFormats(Vec<DateTimeFormatType>);
+pub struct DateTimeFormats(Vec<DateTimeFormatType>);

 impl Default for DateTimeFormats {
     fn default() -> Self {
@@ -107,7 +107,7 @@ impl Deref for DateTimeFormats {
 }

 #[derive(Debug, Clone, Serialize, Deserialize, Default)]
-struct DateTimeMappingConfig {
+pub struct DateTimeMappingConfig {
     #[serde(default = "default_true")]
     pub stored: bool,

@@ -123,21 +123,21 @@ struct DateTimeMappingConfig {

 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(rename_all = "snake_case")]
-enum MappingFieldType {
+pub enum MappingFieldType {
     Text(TextMappingConfig),
     Datetime(DateTimeMappingConfig),
 }

 #[derive(Debug, Clone, Serialize, Deserialize)]
-struct MappingConfig {
+pub struct MappingConfig {
     #[serde(rename = "type")]
-    type_: MappingFieldType,
+    pub type_: MappingFieldType,
 }

 #[derive(Debug, Clone, Serialize, Deserialize, Default)]
-struct IndexSchema {
+pub struct IndexSchema {
     #[serde(default)]
-    mappings: HashMap<String, MappingConfig>,
+    pub mappings: HashMap<String, MappingConfig>,

     #[serde(default)]
     #[serde(skip_serializing_if = "Option::is_none")]
@@ -153,7 +153,7 @@ pub(crate) struct IndexConfig {
     version: u32,

     #[serde(default)]
-    schema: IndexSchema,
+    pub schema: IndexSchema,
 }

 impl IndexConfig {
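
With the mapping types now pub, code outside index_config.rs (like run_index below) can walk a deserialized schema directly. A minimal sketch of what that enables — serde_yaml and the boxed error type are assumptions here, not the crate's actual plumbing:

```rust
use crate::index_config::{IndexSchema, MappingFieldType};

// Sketch: deserialize a schema and report each mapping's kind.
fn describe_schema(yaml: &str) -> Result<(), Box<dyn std::error::Error>> {
    let schema: IndexSchema = serde_yaml::from_str(yaml)?;
    for (name, mapping) in &schema.mappings {
        match &mapping.type_ {
            MappingFieldType::Text(_) => println!("{name}: text"),
            MappingFieldType::Datetime(_) => println!("{name}: datetime"),
        }
    }
    Ok(())
}
```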
97 changes: 74 additions & 23 deletions src/main.rs
@@ -24,9 +24,7 @@ use tantivy::{
     directory::{DirectoryClone, FileSlice, MmapDirectory},
     indexer::NoMergePolicy,
     query::QueryParser,
-    schema::{
-        DateOptions, DateTimePrecision, JsonObjectOptions, Schema, FAST, INDEXED, STORED, STRING,
-    },
+    schema::{Field, JsonObjectOptions, OwnedValue, Schema, STORED, STRING},
     DateTime, Document, Index, IndexWriter, ReloadPolicy, TantivyDocument,
 };
 use tokio::{
@@ -42,6 +40,7 @@ use unified_index::unified_directory::UnifiedDirectory;

 use crate::{
     args::{parse_args, SubCommand},
+    index_config::MappingFieldType,
     merge_directory::MergeDirectory,
     opendal_file_handle::OpenDalFileHandle,
     unified_index::{file_cache::build_file_cache, writer::UnifiedIndexWriter},
@@ -120,7 +119,8 @@ async fn open_unified_directories(
 async fn write_unified_index(
     index: Index,
     input_dir: &str,
-    config: &IndexConfig,
+    index_name: &str,
+    index_dir: &str,
     pool: &PgPool,
 ) -> Result<()> {
     let cloned_input_dir = PathBuf::from(input_dir);
@@ -133,7 +133,7 @@ async fn write_unified_index(
         .await?;

     let mut builder = opendal::services::Fs::default();
-    builder.root(&config.path);
+    builder.root(index_dir);

     let op = Operator::new(builder)?
         .layer(LoggingLayer::default())
@@ -158,7 +158,7 @@ async fn write_unified_index(
         "INSERT INTO index_files (id, index_name, file_name, footer_len) VALUES ($1, $2, $3, $4)",
     )
     .bind(&id.to_string())
-    .bind(&config.name)
+    .bind(index_name)
     .bind(&file_name)
     .bind(footer_len as i64)
     .execute(pool)
@@ -208,21 +208,70 @@ async fn run_drop(args: DropArgs, pool: PgPool, base_path: &str) -> Result<()> {
     Ok(())
 }

-async fn run_index(args: IndexArgs, pool: PgPool, config: &IndexConfig) -> Result<()> {
+fn common_parse(value: serde_json::Value) -> Result<OwnedValue> {
+    Ok(serde_json::from_value(value)?)
+}
+
+pub fn parse_timestamp(timestamp: i64) -> Result<DateTime> {
+    // Minimum supported timestamp value in seconds (13 Apr 1972 23:59:55 GMT).
+    const MIN_TIMESTAMP_SECONDS: i64 = 72_057_595;
+
+    // Maximum supported timestamp value in seconds (16 Mar 2242 12:56:31 GMT).
+    const MAX_TIMESTAMP_SECONDS: i64 = 8_589_934_591;
+
+    const MIN_TIMESTAMP_MILLIS: i64 = MIN_TIMESTAMP_SECONDS * 1000;
+    const MAX_TIMESTAMP_MILLIS: i64 = MAX_TIMESTAMP_SECONDS * 1000;
+    const MIN_TIMESTAMP_MICROS: i64 = MIN_TIMESTAMP_SECONDS * 1_000_000;
+    const MAX_TIMESTAMP_MICROS: i64 = MAX_TIMESTAMP_SECONDS * 1_000_000;
+    const MIN_TIMESTAMP_NANOS: i64 = MIN_TIMESTAMP_SECONDS * 1_000_000_000;
+    const MAX_TIMESTAMP_NANOS: i64 = MAX_TIMESTAMP_SECONDS * 1_000_000_000;
+
+    match timestamp {
+        MIN_TIMESTAMP_SECONDS..=MAX_TIMESTAMP_SECONDS => {
+            Ok(DateTime::from_timestamp_secs(timestamp))
+        }
+        MIN_TIMESTAMP_MILLIS..=MAX_TIMESTAMP_MILLIS => {
+            Ok(DateTime::from_timestamp_millis(timestamp))
+        }
+        MIN_TIMESTAMP_MICROS..=MAX_TIMESTAMP_MICROS => {
+            Ok(DateTime::from_timestamp_micros(timestamp))
+        }
+        MIN_TIMESTAMP_NANOS..=MAX_TIMESTAMP_NANOS => Ok(DateTime::from_timestamp_nanos(timestamp)),
+        _ => Err(eyre!(
+            "failed to parse unix timestamp `{timestamp}`. Supported timestamp ranges \
+             from `13 Apr 1972 23:59:55` to `16 Mar 2242 12:56:31`"
+        )),
+    }
+}
+
+async fn run_index(args: IndexArgs, pool: PgPool, config: IndexConfig) -> Result<()> {
     let mut schema_builder = Schema::builder();
     let dynamic_field = schema_builder.add_json_field(
         "_dynamic",
         JsonObjectOptions::from(STORED | STRING).set_expand_dots_enabled(),
     );
-    let timestamp_field = schema_builder.add_date_field(
-        "timestamp",
-        DateOptions::from(INDEXED | STORED | FAST).set_precision(DateTimePrecision::Seconds),
-    );

+    let mut fields = Vec::<(String, Field, fn(serde_json::Value) -> Result<OwnedValue>)>::new();
+    for (name, schema) in config.schema.mappings {
+        match schema.type_ {
+            MappingFieldType::Text(options) => {
+                let field = schema_builder.add_text_field(&name, options);
+                fields.push((name, field, common_parse));
+            }
+            MappingFieldType::Datetime(options) => {
+                let field = schema_builder.add_date_field(&name, options);
+                fields.push((name, field, |v| {
+                    let timestamp: i64 = serde_json::from_value(v)?;
+                    Ok(parse_timestamp(timestamp)?.into())
+                }));
+            }
+        }
+    }
+
     let schema = schema_builder.build();

     let _ = create_dir_all(&args.build_dir).await;
-    let index = Index::open_or_create(MmapDirectory::open(&args.build_dir)?, schema.clone())?;
+    let index = Index::open_or_create(MmapDirectory::open(&args.build_dir)?, schema)?;
     let mut index_writer: IndexWriter = index.writer(args.memory_budget)?;
     index_writer.set_merge_policy(Box::new(NoMergePolicy));

@@ -239,15 +288,17 @@ async fn run_index(args: IndexArgs, pool: PgPool, config: &IndexConfig) -> Result<()> {

         let mut doc = TantivyDocument::new();
         let mut json_obj: serde_json::Map<String, serde_json::Value> = serde_json::from_str(&line)?;
-        if let Some(timestamp_val) = json_obj.remove("timestamp") {
-            if let Some(timestamp) = timestamp_val.as_i64() {
-                doc.add_field_value(timestamp_field, DateTime::from_timestamp_secs(timestamp));
-                doc.add_field_value(dynamic_field, json_obj);
-                index_writer.add_document(doc)?;
-                added += 1;
-            }
-        }
+
+        for (name, field, parse_fn) in &fields {
+            if let Some(value) = json_obj.remove(name) {
+                doc.add_field_value(*field, parse_fn(value)?);
+            }
+        }
+
+        doc.add_field_value(dynamic_field, json_obj);
+        index_writer.add_document(doc)?;
+        added += 1;

         line.clear();
     }

@@ -262,12 +313,12 @@ async fn run_index(args: IndexArgs, pool: PgPool, config: &IndexConfig) -> Result<()> {

     spawn_blocking(move || index_writer.wait_merging_threads()).await??;

-    write_unified_index(index, &args.build_dir, config, &pool).await?;
+    write_unified_index(index, &args.build_dir, &config.name, &config.path, &pool).await?;

     Ok(())
 }

-async fn run_merge(args: MergeArgs, pool: PgPool, config: &IndexConfig) -> Result<()> {
+async fn run_merge(args: MergeArgs, pool: PgPool, config: IndexConfig) -> Result<()> {
     let (ids, directories): (Vec<_>, Vec<_>) = open_unified_directories(&config.path, &pool)
         .await?
         .into_iter()
@@ -294,7 +345,7 @@ async fn run_merge(args: MergeArgs, pool: PgPool, config: &IndexConfig) -> Result<()> {

     spawn_blocking(move || index_writer.wait_merging_threads()).await??;

-    write_unified_index(index, &args.merge_dir, config, &pool).await?;
+    write_unified_index(index, &args.merge_dir, &config.name, &config.path, &pool).await?;

     let delete_result = query("DELETE FROM index_files WHERE id = ANY($1)")
         .bind(&ids)
@@ -426,11 +477,11 @@ async fn async_main() -> Result<()> {
         }
         SubCommand::Index(index_args) => {
             let config = get_index_config(&index_args.name, &pool).await?;
-            run_index(index_args, pool, &config).await?;
+            run_index(index_args, pool, config).await?;
         }
         SubCommand::Merge(merge_args) => {
             let config = get_index_config(&merge_args.name, &pool).await?;
-            run_merge(merge_args, pool, &config).await?;
+            run_merge(merge_args, pool, config).await?;
         }
         SubCommand::Search(search_args) => {
             let path = get_index_path(&search_args.name, &pool).await?;
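A note on parse_timestamp above: the seconds, millis, micros, and nanos ranges are disjoint (the seconds maximum 8_589_934_591 sits below the millis minimum 72_057_595_000, and so on up the scale), so the match can infer the unit from magnitude alone. A small sanity-check sketch, assuming the function is in scope:

```rust
#[test]
fn timestamp_unit_is_inferred_from_magnitude() {
    // ~2024 in seconds: inside 72_057_595..=8_589_934_591.
    assert!(parse_timestamp(1_717_200_000).is_ok());
    // Same instant in milliseconds: past the seconds range, inside millis.
    assert!(parse_timestamp(1_717_200_000_000).is_ok());
    // Below the 13 Apr 1972 cutoff: no range matches, so this errors.
    assert!(parse_timestamp(42).is_err());
}
```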

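End to end, run_index now builds its tantivy schema from config.schema.mappings instead of a hard-coded timestamp field: each configured key is pulled out of the incoming JSON object and indexed through its parse function, and whatever remains still lands in _dynamic. As an illustration (input invented for this example), with the timestamp mapping from example_config.yaml, an NDJSON line such as

```json
{"timestamp": 1717200000, "message": "hello", "level": "info"}
```

would index timestamp as a date field via parse_timestamp, while message and level remain searchable only through the _dynamic JSON field.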