diff --git a/crates/revrt/src/cost.rs b/crates/revrt/src/cost.rs index 650d7a9c..69770bbf 100644 --- a/crates/revrt/src/cost.rs +++ b/crates/revrt/src/cost.rs @@ -2,9 +2,9 @@ use derive_builder::Builder; use ndarray::{Axis, stack}; -use tracing::{info, trace}; +use tracing::{debug, trace}; -use crate::dataset::LazyChunk; +use crate::dataset::LazySubset; use crate::error::Result; #[derive(Debug, serde::Deserialize)] @@ -70,15 +70,11 @@ impl CostFunction { /// # Returns /// A 2D array containing the cost for the subset covered by the input /// features. - pub(crate) fn calculate( + pub(crate) fn compute( &self, - mut features: LazyChunk, + mut features: LazySubset, ) -> ndarray::ArrayBase, ndarray::Dim> { - info!( - "Calculating cost for chunk ({}, {})", - features.ci(), - features.cj() - ); + debug!("Calculating cost for ({})", features.subset()); let cost = self .cost_layers diff --git a/crates/revrt/src/dataset.rs b/crates/revrt/src/dataset.rs index 936be767..114f078d 100644 --- a/crates/revrt/src/dataset.rs +++ b/crates/revrt/src/dataset.rs @@ -1,3 +1,4 @@ +mod lazy_subset; #[cfg(test)] pub(crate) mod samples; @@ -13,6 +14,7 @@ use zarrs::storage::{ use crate::ArrayIndex; use crate::cost::CostFunction; use crate::error::Result; +pub(crate) use lazy_subset::LazySubset; /// Manages the features datasets and calculated total cost pub(super) struct Dataset { @@ -29,7 +31,7 @@ pub(super) struct Dataset { /// cost is calculated from multiple variables. 
// cost_variables: Vec, /// Storage for the calculated cost - cost: ReadableWritableListableStorage, + swap: ReadableWritableListableStorage, /// Index of cost chunks already calculated cost_chunk_idx: RwLock>, /// Custom cost function definition @@ -49,20 +51,20 @@ impl Dataset { zarrs::filesystem::FilesystemStore::new(path).expect("could not open filesystem store"); let source = std::sync::Arc::new(filesystem); - // ==== Create the cost dataset ==== + // ==== Create the swap dataset ==== let tmp_path = tempfile::TempDir::new().unwrap(); debug!( - "Initializing a temporary cost dataset at {:?}", + "Initializing a temporary swap dataset at {:?}", tmp_path.path() ); - let cost: ReadableWritableListableStorage = std::sync::Arc::new( + let swap: ReadableWritableListableStorage = std::sync::Arc::new( zarrs::filesystem::FilesystemStore::new(tmp_path.path()) .expect("could not open filesystem store"), ); trace!("Creating a new group for the cost dataset"); zarrs::group::GroupBuilder::new() - .build(cost.clone(), "/") + .build(swap.clone(), "/") .unwrap() .store_metadata() .unwrap(); @@ -84,13 +86,13 @@ impl Dataset { chunk_shape, zarrs::array::FillValue::from(zarrs::array::ZARR_NAN_F32), ) - .build(cost.clone(), "/cost") + .build(swap.clone(), "/cost") .unwrap(); trace!("Cost shape: {:?}", array.shape().to_vec()); trace!("Cost chunk shape: {:?}", array.chunk_grid()); array.store_metadata().unwrap(); - trace!("Cost dataset contents: {:?}", cost.list().unwrap()); + trace!("Cost dataset contents: {:?}", swap.list().unwrap()); let cost_chunk_idx = ndarray::Array2::from_elem( ( @@ -111,7 +113,7 @@ impl Dataset { Ok(Self { source, cost_path: tmp_path, - cost, + swap, cost_chunk_idx, cost_function, cache, @@ -120,13 +122,14 @@ impl Dataset { fn calculate_chunk_cost(&self, ci: u64, cj: u64) { trace!("Creating a LazyChunk for ({}, {})", ci, cj); - let chunk = LazyChunk { - source: self.source.clone(), - ci, - cj, - data: std::collections::HashMap::new(), - }; - let output = 
self.cost_function.calculate(chunk); + + // cost variable is stored in the swap dataset + let variable = zarrs::array::Array::open(self.swap.clone(), "/cost").unwrap(); + // Get the subset according to cost's chunk + let subset = variable.chunk_subset(&[ci, cj]).unwrap(); + let data = LazySubset::::new(self.source.clone(), subset); + let output = self.cost_function.compute(data); + trace!("Cost function: {:?}", self.cost_function); /* @@ -138,7 +141,7 @@ impl Dataset { let output = value * 10.0; */ - let cost = zarrs::array::Array::open(self.cost.clone(), "/cost").unwrap(); + let cost = zarrs::array::Array::open(self.swap.clone(), "/cost").unwrap(); cost.store_metadata().unwrap(); let chunk_indices: Vec = vec![ci, cj]; trace!("Storing chunk at {:?}", chunk_indices); @@ -153,11 +156,11 @@ impl Dataset { trace!("Getting 3x3 neighborhood for (i={}, j={})", i, j); - trace!("Cost dataset contents: {:?}", self.cost.list().unwrap()); - trace!("Cost dataset size: {:?}", self.cost.size().unwrap()); + trace!("Cost dataset contents: {:?}", self.swap.list().unwrap()); + trace!("Cost dataset size: {:?}", self.swap.size().unwrap()); trace!("Opening cost dataset"); - let cost = zarrs::array::Array::open(self.cost.clone(), "/cost").unwrap(); + let cost = zarrs::array::Array::open(self.swap.clone(), "/cost").unwrap(); trace!("Cost dataset with shape: {:?}", cost.shape()); // Cutting off the edges for now. @@ -485,6 +488,7 @@ pub(crate) struct LazyChunk { >, } +#[allow(dead_code)] impl LazyChunk { pub(super) fn ci(&self) -> u64 { self.ci diff --git a/crates/revrt/src/dataset/lazy_subset.rs b/crates/revrt/src/dataset/lazy_subset.rs new file mode 100644 index 00000000..39eff4d2 --- /dev/null +++ b/crates/revrt/src/dataset/lazy_subset.rs @@ -0,0 +1,144 @@ +//! Lazy load a subset of the source Dataset +//! +//! This was originally developed to support the cost calculation, where the +//! variables that will be used are not known until the cost is actually +//! 
computed, and the same variable may be used multiple times. Thus the goal +//! is to load each variable only once and never load unnecessary variables. +//! +//! The subset is fixed at the time of creation, so all variables are +//! consistent for the same domain. +//! +//! Before, we used the LazyChunk, which assumed that the intended outcome +//! would match the source chunks. Here we replace that concept by a +//! `LazySubset`, which is tied to an `ArraySubset`, thus it makes no assumptions +//! on the source's chunks. Therefore, the source can have variable chunk shapes, +//! one for each variable, and they don't need to match the desired cost chunk shape. +//! +//! Note that we could have used Zarrs' intrinsic cache here, but a common +//! use for LazySubset is to load the features to compute cost for a chunk. +//! Therefore, those chunks of features are loaded only once and we don't +//! expect to use them again since we save the resulting cost. Using Zarrs' +//! cache would lead to unnecessary memory usage. Another problem is how +//! large that cache should be. It gets more difficult to estimate once we +//! consider the possibility of multiple threads working on different chunks. + +use std::collections::HashMap; +use std::fmt; + +use tracing::trace; +use zarrs::array::{Array, ElementOwned}; +use zarrs::array_subset::ArraySubset; +use zarrs::storage::ReadableListableStorage; + +use crate::error::Result; + +/// Lazy loaded subset of a Zarr Dataset. +/// +/// This struct is intended to work as a cache for a subset of a Zarr +/// Dataset. +pub(crate) struct LazySubset { + /// Source Zarr storage + source: ReadableListableStorage, + /// Subset of the source to be lazily loaded + subset: ArraySubset, + /// Data + data: HashMap< + String, + ndarray::ArrayBase, ndarray::Dim>, + >, +} + +impl fmt::Display for LazySubset { + /// Display a LazySubset. + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + // Add information on the source and the data HashMap. 
+ write!(f, "LazySubset {{ subset: {:?}, ... }}", self.subset,) + } +} +impl LazySubset { + /// Create a new LazySubset for a given source and subset. + pub(super) fn new(source: ReadableListableStorage, subset: ArraySubset) -> Self { + trace!("Creating LazySubset for subset: {:?}", subset); + + LazySubset { + source, + subset, + data: HashMap::new(), + } + } + + /// Show the subset used by this LazySubset. + pub(crate) fn subset(&self) -> &ArraySubset { + &self.subset + } + + /// Get a data for a specific variable. + pub(crate) fn get( + &mut self, + varname: &str, + ) -> Result, ndarray::Dim>> { + trace!("Getting data subset for variable: {}", varname); + + let data = match self.data.get(varname) { + Some(v) => { + trace!("Data for variable {} already loaded", varname); + v.clone() + } + None => { + trace!( + "Loading data subset ({:?}) for variable: {}", + self.subset, varname + ); + + let variable = Array::open(self.source.clone(), &format!("/{varname}")) + .expect("Failed to open variable"); + + let values = variable + .retrieve_array_subset_ndarray(&self.subset) + .expect("Failed to retrieve array subset"); + + self.data.insert(varname.to_string(), values.clone()); + + values + } + }; + + Ok(data) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::dataset::samples; + use std::sync::Arc; + // use zarrs::storage::store::MemoryStore; + use zarrs::storage::ReadableListableStorage; + + #[test] + fn sample() { + let path = samples::multi_variable_zarr(); + let store: ReadableListableStorage = + Arc::new(zarrs::filesystem::FilesystemStore::new(&path).unwrap()); + + let subset = ArraySubset::new_with_start_shape(vec![0, 0], vec![2, 2]).unwrap(); + let mut dataset = LazySubset::::new(store, subset); + let tmp = dataset.get("A").unwrap(); + assert_eq!(tmp.shape(), &[2, 2]); + } + + /* + #[test] + fn test_lazy_dataset() { + let storage = MemoryStore::new(); + let subset = ArraySubset::default(); + let mut lazy_dataset = LazySubset::::new(Arc::new(storage), 
subset); + + if let Some(data) = lazy_dataset.get("test_var") { + assert!(!data.is_empty()); + } else { + panic!("Failed to retrieve data for 'test_var'"); + } + } + */ +}