Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions crates/revrt/src/cost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

use derive_builder::Builder;
use ndarray::{Axis, stack};
use tracing::{info, trace};
use tracing::{debug, trace};

use crate::dataset::LazyChunk;
use crate::dataset::LazySubset;
use crate::error::Result;

#[derive(Debug, serde::Deserialize)]
Expand Down Expand Up @@ -70,15 +70,11 @@ impl CostFunction {
/// # Returns
/// A 2D array containing the cost for the subset covered by the input
/// features.
pub(crate) fn calculate(
pub(crate) fn compute(
&self,
mut features: LazyChunk,
mut features: LazySubset<f32>,
) -> ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>> {
info!(
"Calculating cost for chunk ({}, {})",
features.ci(),
features.cj()
);
debug!("Calculating cost for ({})", features.subset());

let cost = self
.cost_layers
Expand Down
42 changes: 23 additions & 19 deletions crates/revrt/src/dataset.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod lazy_subset;
#[cfg(test)]
pub(crate) mod samples;

Expand All @@ -13,6 +14,7 @@ use zarrs::storage::{
use crate::ArrayIndex;
use crate::cost::CostFunction;
use crate::error::Result;
pub(crate) use lazy_subset::LazySubset;

/// Manages the features datasets and calculated total cost
pub(super) struct Dataset {
Expand All @@ -29,7 +31,7 @@ pub(super) struct Dataset {
/// cost is calculated from multiple variables.
// cost_variables: Vec<String>,
/// Storage for the calculated cost
cost: ReadableWritableListableStorage,
swap: ReadableWritableListableStorage,
/// Index of cost chunks already calculated
cost_chunk_idx: RwLock<ndarray::Array2<bool>>,
/// Custom cost function definition
Expand All @@ -49,20 +51,20 @@ impl Dataset {
zarrs::filesystem::FilesystemStore::new(path).expect("could not open filesystem store");
let source = std::sync::Arc::new(filesystem);

// ==== Create the cost dataset ====
// ==== Create the swap dataset ====
let tmp_path = tempfile::TempDir::new().unwrap();
debug!(
"Initializing a temporary cost dataset at {:?}",
"Initializing a temporary swap dataset at {:?}",
tmp_path.path()
);
let cost: ReadableWritableListableStorage = std::sync::Arc::new(
let swap: ReadableWritableListableStorage = std::sync::Arc::new(
zarrs::filesystem::FilesystemStore::new(tmp_path.path())
.expect("could not open filesystem store"),
);

trace!("Creating a new group for the cost dataset");
zarrs::group::GroupBuilder::new()
.build(cost.clone(), "/")
.build(swap.clone(), "/")
.unwrap()
.store_metadata()
.unwrap();
Expand All @@ -84,13 +86,13 @@ impl Dataset {
chunk_shape,
zarrs::array::FillValue::from(zarrs::array::ZARR_NAN_F32),
)
.build(cost.clone(), "/cost")
.build(swap.clone(), "/cost")
.unwrap();
trace!("Cost shape: {:?}", array.shape().to_vec());
trace!("Cost chunk shape: {:?}", array.chunk_grid());
array.store_metadata().unwrap();

trace!("Cost dataset contents: {:?}", cost.list().unwrap());
trace!("Cost dataset contents: {:?}", swap.list().unwrap());

let cost_chunk_idx = ndarray::Array2::from_elem(
(
Expand All @@ -111,7 +113,7 @@ impl Dataset {
Ok(Self {
source,
cost_path: tmp_path,
cost,
swap,
cost_chunk_idx,
cost_function,
cache,
Expand All @@ -120,13 +122,14 @@ impl Dataset {

fn calculate_chunk_cost(&self, ci: u64, cj: u64) {
trace!("Creating a LazyChunk for ({}, {})", ci, cj);
let chunk = LazyChunk {
source: self.source.clone(),
ci,
cj,
data: std::collections::HashMap::new(),
};
let output = self.cost_function.calculate(chunk);

// cost variable is stored in the swap dataset
let variable = zarrs::array::Array::open(self.swap.clone(), "/cost").unwrap();
// Get the subset according to cost's chunk
let subset = variable.chunk_subset(&[ci, cj]).unwrap();
let data = LazySubset::<f32>::new(self.source.clone(), subset);
let output = self.cost_function.compute(data);

trace!("Cost function: {:?}", self.cost_function);

/*
Expand All @@ -138,7 +141,7 @@ impl Dataset {
let output = value * 10.0;
*/

let cost = zarrs::array::Array::open(self.cost.clone(), "/cost").unwrap();
let cost = zarrs::array::Array::open(self.swap.clone(), "/cost").unwrap();
cost.store_metadata().unwrap();
let chunk_indices: Vec<u64> = vec![ci, cj];
trace!("Storing chunk at {:?}", chunk_indices);
Expand All @@ -153,11 +156,11 @@ impl Dataset {

trace!("Getting 3x3 neighborhood for (i={}, j={})", i, j);

trace!("Cost dataset contents: {:?}", self.cost.list().unwrap());
trace!("Cost dataset size: {:?}", self.cost.size().unwrap());
trace!("Cost dataset contents: {:?}", self.swap.list().unwrap());
trace!("Cost dataset size: {:?}", self.swap.size().unwrap());

trace!("Opening cost dataset");
let cost = zarrs::array::Array::open(self.cost.clone(), "/cost").unwrap();
let cost = zarrs::array::Array::open(self.swap.clone(), "/cost").unwrap();
trace!("Cost dataset with shape: {:?}", cost.shape());

// Cutting off the edges for now.
Expand Down Expand Up @@ -485,6 +488,7 @@ pub(crate) struct LazyChunk {
>,
}

#[allow(dead_code)]
impl LazyChunk {
pub(super) fn ci(&self) -> u64 {
self.ci
Expand Down
144 changes: 144 additions & 0 deletions crates/revrt/src/dataset/lazy_subset.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
//! Lazy load a subset of the source Dataset
//!
//! This was originally developed to support the cost calculation, where the
//! variables that will be used are not known until the cost is actually
//! computed, and the same variable may be used multiple times. Thus the goal
//! is to load each variable only once, don't load unecessary variables.
//!
//! The subset is fixed at the time of creation, so all variables are
//! consistent for the same domain.
//!
//! Before we used the LazyChunk, which assumed that the intended outcome
//! would match the source chunks. Here we replace that concept by a
//! `LazySubset`, which is tied to an `ArraySubset`, thus it has no assumtions
//! on the source's chunk. Therefore, the source can have variable chunk shapes,
//! one for each variable, and don't need to match the desired cost chunk shape.
//!
//! Note that we could have used Zarrs' intrinsic cache here, but a common
//! use for LazySubset is to load the features to compute cost for a chunk.
//! Therefore, those chunks of features are loaded only once and we don't
//! expect to use that anymore since we save the resulted cost. Using Zarrs'
//! cache would lead to unnecessary memory usage. Another problem is how
//! large should be that cache? It gets more difficult to estimate once we
//! consider the possibility of multiple threads working on different chunks.

use std::collections::HashMap;
use std::fmt;

use tracing::trace;
use zarrs::array::{Array, ElementOwned};
use zarrs::array_subset::ArraySubset;
use zarrs::storage::ReadableListableStorage;

use crate::error::Result;

/// Lazy loaded subset of a Zarr Dataset.
///
/// This struct is intended to work as a cache for a subset of a Zarr
/// Dataset.
pub(crate) struct LazySubset<T> {
/// Source Zarr storage
source: ReadableListableStorage,
/// Subset of the source to be lazily loaded
subset: ArraySubset,
/// Data
data: HashMap<
String,
ndarray::ArrayBase<ndarray::OwnedRepr<T>, ndarray::Dim<ndarray::IxDynImpl>>,
>,
}

impl<T> fmt::Display for LazySubset<T> {
/// Display a LazySubset.
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// Add information on the source and the data HashMap.
write!(f, "LazySubset {{ subset: {:?}, ... }}", self.subset,)
}
}
impl<T: ElementOwned> LazySubset<T> {
/// Create a new LazySubset for a given source and subset.
pub(super) fn new(source: ReadableListableStorage, subset: ArraySubset) -> Self {
trace!("Creating LazySubset for subset: {:?}", subset);

LazySubset {
source,
subset,
data: HashMap::new(),
}
}

/// Show the subset used by this LazySubset.
pub(crate) fn subset(&self) -> &ArraySubset {
&self.subset
}

/// Get a data for a specific variable.
pub(crate) fn get(
&mut self,
varname: &str,
) -> Result<ndarray::ArrayBase<ndarray::OwnedRepr<T>, ndarray::Dim<ndarray::IxDynImpl>>> {
trace!("Getting data subset for variable: {}", varname);

let data = match self.data.get(varname) {
Some(v) => {
trace!("Data for variable {} already loaded", varname);
v.clone()
}
None => {
trace!(
"Loading data subset ({:?}) for variable: {}",
self.subset, varname
);

let variable = Array::open(self.source.clone(), &format!("/{varname}"))
.expect("Failed to open variable");

let values = variable
.retrieve_array_subset_ndarray(&self.subset)
.expect("Failed to retrieve array subset");

self.data.insert(varname.to_string(), values.clone());

values
}
};

Ok(data)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::dataset::samples;
use std::sync::Arc;
// use zarrs::storage::store::MemoryStore;
use zarrs::storage::ReadableListableStorage;

#[test]
fn sample() {
let path = samples::multi_variable_zarr();
let store: ReadableListableStorage =
Arc::new(zarrs::filesystem::FilesystemStore::new(&path).unwrap());

let subset = ArraySubset::new_with_start_shape(vec![0, 0], vec![2, 2]).unwrap();
let mut dataset = LazySubset::<f32>::new(store, subset);
let tmp = dataset.get("A").unwrap();
assert_eq!(tmp.shape(), &[2, 2]);
}

/*
#[test]
fn test_lazy_dataset() {
let storage = MemoryStore::new();
let subset = ArraySubset::default();
let mut lazy_dataset = LazySubset::<f32>::new(Arc::new(storage), subset);

if let Some(data) = lazy_dataset.get("test_var") {
assert!(!data.is_empty());
} else {
panic!("Failed to retrieve data for 'test_var'");
}
}
*/
}
Loading