Skip to content

Commit

Permalink
index: Export field parsing login to new field_parser.rs file
Browse files Browse the repository at this point in the history
  • Loading branch information
tontinton committed Jun 2, 2024
1 parent 8177c80 commit e515b74
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 146 deletions.
143 changes: 143 additions & 0 deletions src/commands/field_parser.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
use color_eyre::{eyre::eyre, Result};
use tantivy::{
schema::{Field, OwnedValue, SchemaBuilder},
TantivyDocument,
};

use crate::config::{number::NumberFieldType, FieldConfig, FieldType};

type ParseFn = Box<dyn Fn(serde_json::Value) -> Result<OwnedValue>>;

pub struct FieldParser {
pub name: String,
field: Field,
parse_fn: ParseFn,
is_array: bool,
}

impl FieldParser {
/// Parses the `serde_json::Value` into a `tantivy::OwnedValue` and adds
/// the result into the tantivy document.
pub fn add_parsed_field_value(
&self,
doc: &mut TantivyDocument,
json_value: serde_json::Value,
) -> Result<()> {
if self.is_array {
let values: Vec<serde_json::Value> = serde_json::from_value(json_value)?;
for value in values {
doc.add_field_value(self.field, (self.parse_fn)(value)?);
}
} else {
let value = (self.parse_fn)(json_value)?;
doc.add_field_value(self.field, value);
}

Ok(())
}
}

fn common_parse(value: serde_json::Value) -> Result<OwnedValue> {
Ok(serde_json::from_value(value)?)
}

fn build_parser_from_field_config(
config: FieldConfig,
schema_builder: &mut SchemaBuilder,
) -> Result<FieldParser> {
let name = config.name;

let (field, parse_fn): (Field, ParseFn) = match config.type_ {
FieldType::Text(options) => {
let field = schema_builder.add_text_field(&name, options);
(field, Box::new(common_parse))
}
FieldType::Number(options) => {
let field_type = options.type_.clone();
let parse_string = options.parse_string;
let field = match field_type {
NumberFieldType::U64 => schema_builder.add_u64_field(&name, options),
NumberFieldType::I64 => schema_builder.add_i64_field(&name, options),
NumberFieldType::F64 => schema_builder.add_f64_field(&name, options),
};

(
field,
Box::new(move |value| {
if !parse_string {
return common_parse(value);
}

if let Ok(value_str) = serde_json::from_value::<String>(value.clone()) {
Ok(match field_type {
NumberFieldType::U64 => value_str.parse::<u64>()?.into(),
NumberFieldType::I64 => value_str.parse::<i64>()?.into(),
NumberFieldType::F64 => value_str.parse::<f64>()?.into(),
})
} else {
common_parse(value)
}
}),
)
}
FieldType::Boolean(options) => {
let parse_string = options.parse_string;
let field = schema_builder.add_bool_field(&name, options);
(
field,
Box::new(move |value| {
if !parse_string {
return common_parse(value);
}

if let Ok(value_str) = serde_json::from_value::<String>(value.clone()) {
let trimmed = value_str.trim();
if trimmed.len() < 4 || trimmed.len() > 5 {
return Err(eyre!("cannot parse '{}' as boolean", trimmed));
}
let value_str = trimmed.to_lowercase();
match value_str.as_str() {
"true" => Ok(true.into()),
"false" => Ok(false.into()),
_ => Err(eyre!("cannot parse '{}' as boolean", trimmed)),
}
} else {
common_parse(value)
}
}),
)
}
FieldType::Datetime(options) => {
let field = schema_builder.add_date_field(&name, options.clone());
(
field,
Box::new(move |value| options.formats.try_parse(value)),
)
}
FieldType::Ip(options) => {
let field = schema_builder.add_ip_addr_field(&name, options);
(field, Box::new(common_parse))
}
FieldType::DynamicObject(options) => {
let field = schema_builder.add_json_field(&name, options);
(field, Box::new(common_parse))
}
};

Ok(FieldParser {
name,
field,
parse_fn,
is_array: config.array,
})
}

pub fn build_parsers_from_fields_config(
fields: Vec<FieldConfig>,
schema_builder: &mut SchemaBuilder,
) -> Result<Vec<FieldParser>> {
fields
.into_iter()
.map(|field| build_parser_from_field_config(field, schema_builder))
.collect::<Result<Vec<_>>>()
}
156 changes: 10 additions & 146 deletions src/commands/index.rs
Original file line number Diff line number Diff line change
@@ -1,153 +1,26 @@
use color_eyre::{eyre::eyre, Result};
use color_eyre::Result;
use sqlx::PgPool;
use tantivy::{
directory::MmapDirectory,
indexer::NoMergePolicy,
schema::{Field, OwnedValue, Schema, SchemaBuilder},
Index, IndexWriter, TantivyDocument,
directory::MmapDirectory, indexer::NoMergePolicy, schema::Schema, Index, IndexWriter,
TantivyDocument,
};
use tokio::{
fs::{create_dir_all, File},
io::{AsyncBufReadExt, BufReader},
task::spawn_blocking,
};

use crate::{
args::IndexArgs,
config::{number::NumberFieldType, FieldConfig, FieldType},
};
use crate::{args::IndexArgs, commands::field_parser::build_parsers_from_fields_config};

use super::{dynamic_field_config, get_index_config, write_unified_index, DYNAMIC_FIELD_NAME};

enum ParsedValue {
Single(OwnedValue),
Multiple(Vec<OwnedValue>),
}

type SingleParseFn = Box<dyn Fn(serde_json::Value) -> Result<OwnedValue>>;
type ParseFn = Box<dyn Fn(serde_json::Value) -> Result<ParsedValue>>;

type FieldParser = (
String, // name
Field, // tantivy field in schema
ParseFn, // parse function
);

fn common_parse(value: serde_json::Value) -> Result<OwnedValue> {
Ok(serde_json::from_value(value)?)
}

fn get_field_parser(
config: FieldConfig,
schema_builder: &mut SchemaBuilder,
) -> Result<FieldParser> {
let name = config.name;

let (field, parse_single_fn): (Field, SingleParseFn) = match config.type_ {
FieldType::Text(options) => {
let field = schema_builder.add_text_field(&name, options);
(field, Box::new(common_parse))
}
FieldType::Number(options) => {
let field_type = options.type_.clone();
let parse_string = options.parse_string;
let field = match field_type {
NumberFieldType::U64 => schema_builder.add_u64_field(&name, options),
NumberFieldType::I64 => schema_builder.add_i64_field(&name, options),
NumberFieldType::F64 => schema_builder.add_f64_field(&name, options),
};

(
field,
Box::new(move |value| {
if !parse_string {
return common_parse(value);
}

if let Ok(value_str) = serde_json::from_value::<String>(value.clone()) {
Ok(match field_type {
NumberFieldType::U64 => value_str.parse::<u64>()?.into(),
NumberFieldType::I64 => value_str.parse::<i64>()?.into(),
NumberFieldType::F64 => value_str.parse::<f64>()?.into(),
})
} else {
common_parse(value)
}
}),
)
}
FieldType::Boolean(options) => {
let parse_string = options.parse_string;
let field = schema_builder.add_bool_field(&name, options);
(
field,
Box::new(move |value| {
if !parse_string {
return common_parse(value);
}

if let Ok(value_str) = serde_json::from_value::<String>(value.clone()) {
let trimmed = value_str.trim();
if trimmed.len() < 4 || trimmed.len() > 5 {
return Err(eyre!("cannot parse '{}' as boolean", trimmed));
}
let value_str = trimmed.to_lowercase();
match value_str.as_str() {
"true" => Ok(true.into()),
"false" => Ok(false.into()),
_ => Err(eyre!("cannot parse '{}' as boolean", trimmed)),
}
} else {
common_parse(value)
}
}),
)
}
FieldType::Datetime(options) => {
let field = schema_builder.add_date_field(&name, options.clone());
(
field,
Box::new(move |value| options.formats.try_parse(value)),
)
}
FieldType::Ip(options) => {
let field = schema_builder.add_ip_addr_field(&name, options);
(field, Box::new(common_parse))
}
FieldType::DynamicObject(options) => {
let field = schema_builder.add_json_field(&name, options);
(field, Box::new(common_parse))
}
};

let parse_fn: ParseFn = if config.array {
Box::new(move |value| {
let array: Vec<serde_json::Value> = serde_json::from_value(value)?;
Ok(ParsedValue::Multiple(
array
.into_iter()
.map(&parse_single_fn)
.collect::<Result<Vec<_>>>()?,
))
})
} else {
Box::new(move |value| Ok(ParsedValue::Single(parse_single_fn(value)?)))
};

Ok((name, field, parse_fn))
}

pub async fn run_index(args: IndexArgs, pool: PgPool) -> Result<()> {
let config = get_index_config(&args.name, &pool).await?;

let mut schema_builder = Schema::builder();
let dynamic_field = schema_builder.add_json_field(DYNAMIC_FIELD_NAME, dynamic_field_config());
let field_parsers = config
.schema
.fields
.into_iter()
.map(|field| get_field_parser(field, &mut schema_builder))
.collect::<Result<Vec<FieldParser>>>()?;
let field_parsers =
build_parsers_from_fields_config(config.schema.fields, &mut schema_builder)?;

let schema = schema_builder.build();

Expand All @@ -170,24 +43,15 @@ pub async fn run_index(args: IndexArgs, pool: PgPool) -> Result<()> {
let mut doc = TantivyDocument::new();
let mut json_obj: serde_json::Map<String, serde_json::Value> = serde_json::from_str(&line)?;

for (name, field, parse_fn) in &field_parsers {
for field_parser in &field_parsers {
let name = &field_parser.name;
let Some(json_value) = json_obj.remove(name) else {
debug!("field '{}' in schema but not found", &name);
continue;
};

match parse_fn(json_value) {
Ok(ParsedValue::Single(value)) => {
doc.add_field_value(*field, value);
}
Ok(ParsedValue::Multiple(values)) => {
for value in values {
doc.add_field_value(*field, value);
}
}
Err(e) => {
error!("failed to parse '{}': {}", &name, e);
}
if let Err(e) = field_parser.add_parsed_field_value(&mut doc, json_value) {
error!("failed to parse '{}': {}", &name, e);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod create;
pub mod drop;
pub mod field_parser;
pub mod index;
pub mod merge;
pub mod search;
Expand Down

0 comments on commit e515b74

Please sign in to comment.