Skip to content

Commit

Permalink
index: Export json readers to json_reader.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
tontinton committed Jun 12, 2024
1 parent 745b9d8 commit b882d42
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 63 deletions.
67 changes: 5 additions & 62 deletions src/commands/index.rs
Original file line number Diff line number Diff line change
@@ -1,75 +1,18 @@
use async_trait::async_trait;
use color_eyre::Result;
use sqlx::PgPool;
use tantivy::{
directory::MmapDirectory, indexer::NoMergePolicy, schema::Schema, Index, IndexWriter,
TantivyDocument,
};
use tokio::{
fs::{create_dir_all, File},
io::{stdin, AsyncBufReadExt, AsyncRead, BufReader},
task::spawn_blocking,
};
use tokio::{fs::create_dir_all, task::spawn_blocking};

use crate::{args::IndexArgs, commands::field_parser::build_parsers_from_field_configs};
use crate::{
args::IndexArgs,
commands::{field_parser::build_parsers_from_field_configs, json_reader::build_json_reader},
};

use super::{dynamic_field_config, get_index_config, write_unified_index, DYNAMIC_FIELD_NAME};

type JsonMap = serde_json::Map<String, serde_json::Value>;
type AsyncBufReader = BufReader<Box<dyn AsyncRead + Send + Sync + Unpin>>;

#[async_trait]
trait JsonReader {
async fn next(&mut self) -> Result<Option<JsonMap>>;
}

struct BufJsonReader {
reader: AsyncBufReader,
line: String,
}

impl BufJsonReader {
async fn from_path(path: &str) -> std::io::Result<Self> {
debug!("Reading from '{}'", path);
Ok(Self::from_buf_reader(BufReader::new(Box::new(
File::open(&path).await?,
))))
}

fn from_stdin() -> Self {
debug!("Reading from stdin");
Self::from_buf_reader(BufReader::new(Box::new(stdin())))
}

fn from_buf_reader(reader: AsyncBufReader) -> Self {
Self {
reader,
line: String::new(),
}
}
}

#[async_trait]
impl JsonReader for BufJsonReader {
async fn next(&mut self) -> Result<Option<JsonMap>> {
let len = self.reader.read_line(&mut self.line).await?;
if len == 0 {
return Ok(None);
}

let map = serde_json::from_str(&self.line)?;
self.line.clear();
return Ok(map);
}
}

async fn build_json_reader(input: Option<&str>) -> Result<Box<dyn JsonReader + Unpin>> {
Ok(match input {
Some(path) => Box::new(BufJsonReader::from_path(path).await?),
None => Box::new(BufJsonReader::from_stdin()),
})
}

pub async fn run_index(args: IndexArgs, pool: &PgPool) -> Result<()> {
let config = get_index_config(&args.name, pool).await?;

Expand Down
61 changes: 61 additions & 0 deletions src/commands/json_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use async_trait::async_trait;
use color_eyre::Result;
use tokio::{
fs::File,
io::{stdin, AsyncBufReadExt, AsyncRead, BufReader},
};

type JsonMap = serde_json::Map<String, serde_json::Value>;
type AsyncBufReader = BufReader<Box<dyn AsyncRead + Send + Sync + Unpin>>;

#[async_trait]
pub trait JsonReader {
async fn next(&mut self) -> Result<Option<JsonMap>>;
}

struct BufJsonReader {
reader: AsyncBufReader,
line: String,
}

impl BufJsonReader {
async fn from_path(path: &str) -> std::io::Result<Self> {
debug!("Reading from '{}'", path);
Ok(Self::from_buf_reader(BufReader::new(Box::new(
File::open(&path).await?,
))))
}

fn from_stdin() -> Self {
debug!("Reading from stdin");
Self::from_buf_reader(BufReader::new(Box::new(stdin())))
}

fn from_buf_reader(reader: AsyncBufReader) -> Self {
Self {
reader,
line: String::new(),
}
}
}

#[async_trait]
impl JsonReader for BufJsonReader {
async fn next(&mut self) -> Result<Option<JsonMap>> {
let len = self.reader.read_line(&mut self.line).await?;
if len == 0 {
return Ok(None);
}

let map = serde_json::from_str(&self.line)?;
self.line.clear();
return Ok(map);
}
}

pub async fn build_json_reader(input: Option<&str>) -> Result<Box<dyn JsonReader + Unpin>> {
Ok(match input {
Some(path) => Box::new(BufJsonReader::from_path(path).await?),
None => Box::new(BufJsonReader::from_stdin()),
})
}
3 changes: 2 additions & 1 deletion src/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
pub mod create;
pub mod drop;
pub mod field_parser;
mod field_parser;
pub mod index;
mod json_reader;
pub mod merge;
pub mod search;

Expand Down

0 comments on commit b882d42

Please sign in to comment.