From e515b7428a5e03ca6278989018ab3e7d65099edd Mon Sep 17 00:00:00 2001 From: Tony Solomonik Date: Sun, 2 Jun 2024 20:03:03 +0300 Subject: [PATCH] index: Export field parsing login to new `field_parser.rs` file --- src/commands/field_parser.rs | 143 ++++++++++++++++++++++++++++++++ src/commands/index.rs | 156 +++-------------------------------- src/commands/mod.rs | 1 + 3 files changed, 154 insertions(+), 146 deletions(-) create mode 100644 src/commands/field_parser.rs diff --git a/src/commands/field_parser.rs b/src/commands/field_parser.rs new file mode 100644 index 0000000..151beaa --- /dev/null +++ b/src/commands/field_parser.rs @@ -0,0 +1,143 @@ +use color_eyre::{eyre::eyre, Result}; +use tantivy::{ + schema::{Field, OwnedValue, SchemaBuilder}, + TantivyDocument, +}; + +use crate::config::{number::NumberFieldType, FieldConfig, FieldType}; + +type ParseFn = Box Result>; + +pub struct FieldParser { + pub name: String, + field: Field, + parse_fn: ParseFn, + is_array: bool, +} + +impl FieldParser { + /// Parses the `serde_json::Value` into a `tantivy::OwnedValue` and adds + /// the result into the tantivy document. + pub fn add_parsed_field_value( + &self, + doc: &mut TantivyDocument, + json_value: serde_json::Value, + ) -> Result<()> { + if self.is_array { + let values: Vec = serde_json::from_value(json_value)?; + for value in values { + doc.add_field_value(self.field, (self.parse_fn)(value)?); + } + } else { + let value = (self.parse_fn)(json_value)?; + doc.add_field_value(self.field, value); + } + + Ok(()) + } +} + +fn common_parse(value: serde_json::Value) -> Result { + Ok(serde_json::from_value(value)?) +} + +fn build_parser_from_field_config( + config: FieldConfig, + schema_builder: &mut SchemaBuilder, +) -> Result { + let name = config.name; + + let (field, parse_fn): (Field, ParseFn) = match config.type_ { + FieldType::Text(options) => { + let field = schema_builder.add_text_field(&name, options); + (field, Box::new(common_parse)) + } + FieldType::Number(options) => { + let field_type = options.type_.clone(); + let parse_string = options.parse_string; + let field = match field_type { + NumberFieldType::U64 => schema_builder.add_u64_field(&name, options), + NumberFieldType::I64 => schema_builder.add_i64_field(&name, options), + NumberFieldType::F64 => schema_builder.add_f64_field(&name, options), + }; + + ( + field, + Box::new(move |value| { + if !parse_string { + return common_parse(value); + } + + if let Ok(value_str) = serde_json::from_value::(value.clone()) { + Ok(match field_type { + NumberFieldType::U64 => value_str.parse::()?.into(), + NumberFieldType::I64 => value_str.parse::()?.into(), + NumberFieldType::F64 => value_str.parse::()?.into(), + }) + } else { + common_parse(value) + } + }), + ) + } + FieldType::Boolean(options) => { + let parse_string = options.parse_string; + let field = schema_builder.add_bool_field(&name, options); + ( + field, + Box::new(move |value| { + if !parse_string { + return common_parse(value); + } + + if let Ok(value_str) = serde_json::from_value::(value.clone()) { + let trimmed = value_str.trim(); + if trimmed.len() < 4 || trimmed.len() > 5 { + return Err(eyre!("cannot parse '{}' as boolean", trimmed)); + } + let value_str = trimmed.to_lowercase(); + match value_str.as_str() { + "true" => Ok(true.into()), + "false" => Ok(false.into()), + _ => Err(eyre!("cannot parse '{}' as boolean", trimmed)), + } + } else { + common_parse(value) + } + }), + ) + } + FieldType::Datetime(options) => { + let field = schema_builder.add_date_field(&name, options.clone()); + ( + field, + Box::new(move |value| options.formats.try_parse(value)), + ) + } + FieldType::Ip(options) => { + let field = schema_builder.add_ip_addr_field(&name, options); + (field, Box::new(common_parse)) + } + FieldType::DynamicObject(options) => { + let field = schema_builder.add_json_field(&name, options); + (field, Box::new(common_parse)) + } + }; + + Ok(FieldParser { + name, + field, + parse_fn, + is_array: config.array, + }) +} + +pub fn build_parsers_from_fields_config( + fields: Vec, + schema_builder: &mut SchemaBuilder, +) -> Result> { + fields + .into_iter() + .map(|field| build_parser_from_field_config(field, schema_builder)) + .collect::>>() +} diff --git a/src/commands/index.rs b/src/commands/index.rs index 31912b8..6508a8c 100644 --- a/src/commands/index.rs +++ b/src/commands/index.rs @@ -1,10 +1,8 @@ -use color_eyre::{eyre::eyre, Result}; +use color_eyre::Result; use sqlx::PgPool; use tantivy::{ - directory::MmapDirectory, - indexer::NoMergePolicy, - schema::{Field, OwnedValue, Schema, SchemaBuilder}, - Index, IndexWriter, TantivyDocument, + directory::MmapDirectory, indexer::NoMergePolicy, schema::Schema, Index, IndexWriter, + TantivyDocument, }; use tokio::{ fs::{create_dir_all, File}, @@ -12,142 +10,17 @@ use tokio::{ task::spawn_blocking, }; -use crate::{ - args::IndexArgs, - config::{number::NumberFieldType, FieldConfig, FieldType}, -}; +use crate::{args::IndexArgs, commands::field_parser::build_parsers_from_fields_config}; use super::{dynamic_field_config, get_index_config, write_unified_index, DYNAMIC_FIELD_NAME}; -enum ParsedValue { - Single(OwnedValue), - Multiple(Vec), -} - -type SingleParseFn = Box Result>; -type ParseFn = Box Result>; - -type FieldParser = ( - String, // name - Field, // tantivy field in schema - ParseFn, // parse function -); - -fn common_parse(value: serde_json::Value) -> Result { - Ok(serde_json::from_value(value)?) -} - -fn get_field_parser( - config: FieldConfig, - schema_builder: &mut SchemaBuilder, -) -> Result { - let name = config.name; - - let (field, parse_single_fn): (Field, SingleParseFn) = match config.type_ { - FieldType::Text(options) => { - let field = schema_builder.add_text_field(&name, options); - (field, Box::new(common_parse)) - } - FieldType::Number(options) => { - let field_type = options.type_.clone(); - let parse_string = options.parse_string; - let field = match field_type { - NumberFieldType::U64 => schema_builder.add_u64_field(&name, options), - NumberFieldType::I64 => schema_builder.add_i64_field(&name, options), - NumberFieldType::F64 => schema_builder.add_f64_field(&name, options), - }; - - ( - field, - Box::new(move |value| { - if !parse_string { - return common_parse(value); - } - - if let Ok(value_str) = serde_json::from_value::(value.clone()) { - Ok(match field_type { - NumberFieldType::U64 => value_str.parse::()?.into(), - NumberFieldType::I64 => value_str.parse::()?.into(), - NumberFieldType::F64 => value_str.parse::()?.into(), - }) - } else { - common_parse(value) - } - }), - ) - } - FieldType::Boolean(options) => { - let parse_string = options.parse_string; - let field = schema_builder.add_bool_field(&name, options); - ( - field, - Box::new(move |value| { - if !parse_string { - return common_parse(value); - } - - if let Ok(value_str) = serde_json::from_value::(value.clone()) { - let trimmed = value_str.trim(); - if trimmed.len() < 4 || trimmed.len() > 5 { - return Err(eyre!("cannot parse '{}' as boolean", trimmed)); - } - let value_str = trimmed.to_lowercase(); - match value_str.as_str() { - "true" => Ok(true.into()), - "false" => Ok(false.into()), - _ => Err(eyre!("cannot parse '{}' as boolean", trimmed)), - } - } else { - common_parse(value) - } - }), - ) - } - FieldType::Datetime(options) => { - let field = schema_builder.add_date_field(&name, options.clone()); - ( - field, - Box::new(move |value| options.formats.try_parse(value)), - ) - } - FieldType::Ip(options) => { - let field = schema_builder.add_ip_addr_field(&name, options); - (field, Box::new(common_parse)) - } - FieldType::DynamicObject(options) => { - let field = schema_builder.add_json_field(&name, options); - (field, Box::new(common_parse)) - } - }; - - let parse_fn: ParseFn = if config.array { - Box::new(move |value| { - let array: Vec = serde_json::from_value(value)?; - Ok(ParsedValue::Multiple( - array - .into_iter() - .map(&parse_single_fn) - .collect::>>()?, - )) - }) - } else { - Box::new(move |value| Ok(ParsedValue::Single(parse_single_fn(value)?))) - }; - - Ok((name, field, parse_fn)) -} - pub async fn run_index(args: IndexArgs, pool: PgPool) -> Result<()> { let config = get_index_config(&args.name, &pool).await?; let mut schema_builder = Schema::builder(); let dynamic_field = schema_builder.add_json_field(DYNAMIC_FIELD_NAME, dynamic_field_config()); - let field_parsers = config - .schema - .fields - .into_iter() - .map(|field| get_field_parser(field, &mut schema_builder)) - .collect::>>()?; + let field_parsers = + build_parsers_from_fields_config(config.schema.fields, &mut schema_builder)?; let schema = schema_builder.build(); @@ -170,24 +43,15 @@ pub async fn run_index(args: IndexArgs, pool: PgPool) -> Result<()> { let mut doc = TantivyDocument::new(); let mut json_obj: serde_json::Map = serde_json::from_str(&line)?; - for (name, field, parse_fn) in &field_parsers { + for field_parser in &field_parsers { + let name = &field_parser.name; let Some(json_value) = json_obj.remove(name) else { debug!("field '{}' in schema but not found", &name); continue; }; - match parse_fn(json_value) { - Ok(ParsedValue::Single(value)) => { - doc.add_field_value(*field, value); - } - Ok(ParsedValue::Multiple(values)) => { - for value in values { - doc.add_field_value(*field, value); - } - } - Err(e) => { - error!("failed to parse '{}': {}", &name, e); - } + if let Err(e) = field_parser.add_parsed_field_value(&mut doc, json_value) { + error!("failed to parse '{}': {}", &name, e); } } diff --git a/src/commands/mod.rs b/src/commands/mod.rs index f2c33f9..94932a8 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -1,5 +1,6 @@ pub mod create; pub mod drop; +pub mod field_parser; pub mod index; pub mod merge; pub mod search;