Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
87f654d
log: Won't be only temporary swap
castelao Aug 15, 2025
6e46ff7
refact: Renaming calculate to compute()
castelao Aug 18, 2025
2f86a50
refact: Renaming Dataset::open() to new()
castelao Aug 18, 2025
549a554
refact: Moving LazyChunk to its own sub-module
castelao Aug 18, 2025
ab550f0
refact: Renaming array to cost to avoid confusion
castelao Aug 18, 2025
956b3bd
Use expect instead of unwrap
castelao Aug 18, 2025
6cc883d
feat: Creating empty delta
castelao Aug 18, 2025
74a14c3
feat: Adding dims in the Dataset struct
castelao Aug 18, 2025
1c7292a
test: Check during test that input isn't zero dimension
castelao Aug 18, 2025
39af81c
Loading ArraySubset
castelao Aug 18, 2025
1289b6b
fix: Missing CHUNK_SHAPE
castelao Aug 18, 2025
61481e9
feat: LazyArray
castelao Aug 20, 2025
c27aea9
refact: renaming
castelao Aug 20, 2025
e03c7c5
feat: Minimalist Display for LazyDataset
castelao Aug 20, 2025
e721507
refact: cost.compute now uses LazyDataset
castelao Aug 20, 2025
45817ea
log: Updating, LazyDataset shows subset instead of chunk
castelao Aug 20, 2025
7b2e0ba
log: LazyDataset loading new variable
castelao Aug 20, 2025
b28f136
clean:
castelao Aug 20, 2025
1fc97f5
clean:
castelao Aug 20, 2025
a06f1bb
test, fix: It was failing with an empty subset
castelao Aug 20, 2025
08261aa
clean, log: Redundant log
castelao Aug 20, 2025
8eac026
fix: LazyDataset pub for crate
castelao Aug 20, 2025
f0b6805
style:
castelao Sep 6, 2025
c5042aa
lazy_chunk won't be used anymore in favor to LazySubset
castelao Sep 6, 2025
46d1d5b
Removing deprecated lazy_dataset, in favor to LazySubset
castelao Sep 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/revrt/src/cost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl CostFunction {
/// features.
pub(crate) fn compute(
&self,
// For now let's restrict to f32, but we are ready to move to generic types.
mut features: LazySubset<f32>,
) -> ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>> {
debug!("Calculating cost for ({})", features.subset());
Expand Down Expand Up @@ -117,7 +118,6 @@ impl CostFunction {
let views: Vec<_> = cost.iter().map(|a| a.view()).collect();
let stack = stack(Axis(0), &views).unwrap();
//let cost = stack![Axis(3), &cost];
trace!("Stack shape: {:?}", stack.shape());
let cost = stack.sum_axis(Axis(0));
trace!("Stack shape: {:?}", stack.shape());

Expand Down
184 changes: 67 additions & 117 deletions crates/revrt/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,24 @@ use std::sync::RwLock;

use tracing::{debug, trace, warn};
use zarrs::array::ArrayChunkCacheExt;
use zarrs::array_subset::ArraySubset;
use zarrs::storage::{
ListableStorageTraits, ReadableListableStorage, ReadableWritableListableStorage,
};

use crate::ArrayIndex;
use crate::cost::CostFunction;
use crate::error::Result;
use crate::ArrayIndex;
pub(crate) use lazy_chunk::LazyChunk;
pub(crate) use lazy_subset::LazySubset;

/// Chunk extent along the first two dimensions.
///
/// Used as the leading two entries of the chunk shape of the `/delta` array
/// created in the swap storage.
const CHUNK_SHAPE: [u64; 2] = [1_000, 1_000];

/// Manages the features datasets and calculated total cost
pub(super) struct Dataset {
/// Zarr storage holding the features
source: ReadableListableStorage,
dims: Vec<u64>,
// Silly way to keep the tmp path alive
#[allow(dead_code)]
cost_path: tempfile::TempDir,
Expand All @@ -45,18 +50,33 @@ impl Dataset {
path: P,
cost_function: CostFunction,
cache_size: u64,
) -> Result<Self> {
tracing::warn!("Deprecated: use `Dataset::new` instead");
Self::new(path, cost_function, cache_size)
}

pub(super) fn new<P: AsRef<std::path::Path>>(
path: P,
cost_function: CostFunction,
cache_size: u64,
) -> Result<Self> {
debug!("Opening dataset: {:?}", path.as_ref());
let filesystem =
zarrs::filesystem::FilesystemStore::new(path).expect("could not open filesystem store");
let source = std::sync::Arc::new(filesystem);

// ==== Temporary solution to specify dimensions ====
// Assume all variables have the same shape and chunk shape.
// Find the name of the first variable and use it.
let varname = source.list().unwrap()[0].to_string();
let varname = varname.split("/").collect::<Vec<_>>()[0];
let tmp = zarrs::array::Array::open(source.clone(), &format!("/{varname}")).unwrap();
let dims = tmp.shape().to_vec();
debug_assert!(!dims.contains(&0));

// ==== Create the swap dataset ====
let tmp_path = tempfile::TempDir::new().unwrap();
debug!(
"Initializing a temporary swap dataset at {:?}",
tmp_path.path()
);
debug!("Initializing a swap dataset at {:?}", tmp_path.path());
let swap: ReadableWritableListableStorage = std::sync::Arc::new(
zarrs::filesystem::FilesystemStore::new(tmp_path.path())
.expect("could not open filesystem store"),
Expand All @@ -80,24 +100,48 @@ impl Dataset {
// ----

trace!("Creating an empty cost array");
let array = zarrs::array::ArrayBuilder::new(
let cost = zarrs::array::ArrayBuilder::new(
cost_shape.into(),
zarrs::array::DataType::Float32,
chunk_shape,
zarrs::array::FillValue::from(zarrs::array::ZARR_NAN_F32),
)
.build(swap.clone(), "/cost")
.unwrap();
trace!("Cost shape: {:?}", array.shape().to_vec());
trace!("Cost chunk shape: {:?}", array.chunk_grid());
array.store_metadata().unwrap();
.expect("Failed to create cost array");
trace!("Cost shape: {:?}", cost.shape().to_vec());
trace!("Cost chunk shape: {:?}", cost.chunk_grid());
cost.store_metadata().unwrap();

debug!(
"Cost variable created: {:?}, shape: {:?} [{:?}]",
cost.path(),
cost.shape(),
cost.chunk_grid()
);

// ==== Create the delta cost array ====
trace!("Creating an empty delta cost array");

let delta = zarrs::array::ArrayBuilder::new(
[cost_shape, &[8]].concat(),
zarrs::array::DataType::Float32,
vec![CHUNK_SHAPE[0], CHUNK_SHAPE[1], 8].try_into().unwrap(),
// CHUNK_SHAPE .iter() .chain(&[8]) .cloned() .collect::<Vec<_>>() .try_into() .unwrap(),
zarrs::array::FillValue::from(zarrs::array::ZARR_NAN_F32),
)
.build(swap.clone(), "/delta")
.expect("Failed to create delta array");
trace!("Delta shape: {:?}", delta.shape().to_vec());
trace!("Delta chunk shape: {:?}", delta.chunk_grid());
delta.store_metadata().unwrap();

// ====
trace!("Cost dataset contents: {:?}", swap.list().unwrap());

let cost_chunk_idx = ndarray::Array2::from_elem(
(
array.chunk_grid_shape().unwrap()[0] as usize,
array.chunk_grid_shape().unwrap()[1] as usize,
cost.chunk_grid_shape().unwrap()[0] as usize,
cost.chunk_grid_shape().unwrap()[1] as usize,
),
false,
)
Expand All @@ -112,6 +156,7 @@ impl Dataset {
trace!("Dataset opened successfully");
Ok(Self {
source,
dims,
cost_path: tmp_path,
swap,
cost_chunk_idx,
Expand All @@ -128,25 +173,14 @@ impl Dataset {
// Get the subset according to cost's chunk
let subset = variable.chunk_subset(&[ci, cj]).unwrap();
let data = LazySubset::<f32>::new(self.source.clone(), subset);
let output = self.cost_function.compute(data);

trace!("Cost function: {:?}", self.cost_function);
let output = self.cost_function.compute(data);

/*
trace!("Getting '/A' variable");
let array = zarrs::array::Array::open(self.source.clone(), "/A").unwrap();
let value = array.retrieve_chunk_ndarray::<f32>(&[i, j]).unwrap();
trace!("Value: {:?}", value);
trace!("Calculating cost for chunk ({}, {})", i, j);
let output = value * 10.0;
*/

let cost = zarrs::array::Array::open(self.swap.clone(), "/cost").unwrap();
cost.store_metadata().unwrap();
let chunk_indices: Vec<u64> = vec![ci, cj];
trace!("Storing chunk at {:?}", chunk_indices);
let chunk_subset =
&zarrs::array_subset::ArraySubset::new_with_ranges(&[ci..(ci + 1), cj..(cj + 1)]);
let chunk_subset = &ArraySubset::new_with_ranges(&[ci..(ci + 1), cj..(cj + 1)]);
trace!("Target chunk subset: {:?}", chunk_subset);
cost.store_chunks_ndarray(chunk_subset, output).unwrap();
}
Expand Down Expand Up @@ -310,16 +344,15 @@ mod tests {
let cost_function =
CostFunction::from_json(r#"{"cost_layers": [{"layer_name": "A"}]}"#).unwrap();
let dataset =
Dataset::open(path, cost_function, 250_000_000).expect("Error opening dataset");
Dataset::new(path, cost_function, 250_000_000).expect("Error opening dataset");

let test_points = [ArrayIndex { i: 3, j: 1 }, ArrayIndex { i: 2, j: 2 }];
let array = zarrs::array::Array::open(dataset.source.clone(), "/A").unwrap();
for point in test_points {
let results = dataset.get_3x3(&point);

for (ArrayIndex { i, j }, val) in results {
let subset =
zarrs::array_subset::ArraySubset::new_with_ranges(&[i..(i + 1), j..(j + 1)]);
let subset = ArraySubset::new_with_ranges(&[i..(i + 1), j..(j + 1)]);
let subset_elements: Vec<f32> = array
.retrieve_array_subset_elements(&subset)
.expect("Error reading zarr data");
Expand All @@ -334,7 +367,7 @@ mod tests {
let path = samples::multi_variable_zarr();
let cost_function = crate::cost::sample::cost_function();
let dataset =
Dataset::open(path, cost_function, 250_000_000).expect("Error opening dataset");
Dataset::new(path, cost_function, 250_000_000).expect("Error opening dataset");

let test_points = [ArrayIndex { i: 3, j: 1 }, ArrayIndex { i: 2, j: 2 }];
let array_a = zarrs::array::Array::open(dataset.source.clone(), "/A").unwrap();
Expand All @@ -344,8 +377,7 @@ mod tests {
let results = dataset.get_3x3(&point);

for (ArrayIndex { i, j }, val) in results {
let subset =
zarrs::array_subset::ArraySubset::new_with_ranges(&[i..(i + 1), j..(j + 1)]);
let subset = ArraySubset::new_with_ranges(&[i..(i + 1), j..(j + 1)]);
let subset_elements_a: Vec<f32> = array_a
.retrieve_array_subset_elements(&subset)
.expect("Error reading zarr data");
Expand Down Expand Up @@ -378,7 +410,7 @@ mod tests {
let cost_function =
CostFunction::from_json(r#"{"cost_layers": [{"layer_name": "cost"}]}"#).unwrap();
let dataset =
Dataset::open(path, cost_function, 250_000_000).expect("Error opening dataset");
Dataset::new(path, cost_function, 250_000_000).expect("Error opening dataset");

let results = dataset.get_3x3(&ArrayIndex { i: 0, j: 0 });

Expand All @@ -394,7 +426,7 @@ mod tests {
let cost_function =
CostFunction::from_json(r#"{"cost_layers": [{"layer_name": "cost"}]}"#).unwrap();
let dataset =
Dataset::open(path, cost_function, 250_000_000).expect("Error opening dataset");
Dataset::new(path, cost_function, 250_000_000).expect("Error opening dataset");

let results = dataset.get_3x3(&ArrayIndex { i: si, j: sj });

Expand Down Expand Up @@ -424,7 +456,7 @@ mod tests {
let cost_function =
CostFunction::from_json(r#"{"cost_layers": [{"layer_name": "cost"}]}"#).unwrap();
let dataset =
Dataset::open(path, cost_function, 250_000_000).expect("Error opening dataset");
Dataset::new(path, cost_function, 250_000_000).expect("Error opening dataset");

let results = dataset.get_3x3(&ArrayIndex { i: si, j: sj });

Expand Down Expand Up @@ -457,7 +489,7 @@ mod tests {
let cost_function =
CostFunction::from_json(r#"{"cost_layers": [{"layer_name": "cost"}]}"#).unwrap();
let dataset =
Dataset::open(path, cost_function, 250_000_000).expect("Error opening dataset");
Dataset::new(path, cost_function, 250_000_000).expect("Error opening dataset");

let results = dataset.get_3x3(&ArrayIndex { i: si, j: sj });

Expand All @@ -470,85 +502,3 @@ mod tests {
);
}
}

/// One chunk of a Zarr dataset whose variables are read on demand.
///
/// Variables are fetched from `source` the first time they are requested and
/// kept in `data` for subsequent requests.
pub(crate) struct LazyChunk {
    /// Zarr storage the variables are read from
    source: ReadableListableStorage,
    /// Index of this chunk along the 1st dimension
    ci: u64,
    /// Index of this chunk along the 2nd dimension
    cj: u64,
    /// Variables loaded so far, keyed by variable name
    // The values are expected to be 2D arrays of f32, so this definition could
    // be simplified and made stricter (e.g. `ndarray::Array2<f32>`).
    data: std::collections::HashMap<String, ndarray::ArrayD<f32>>,
}

#[allow(dead_code)]
impl LazyChunk {
    /// Index of this chunk along the 1st dimension.
    pub(super) fn ci(&self) -> u64 {
        self.ci
    }

    /// Index of this chunk along the 2nd dimension.
    pub(super) fn cj(&self) -> u64 {
        self.cj
    }

    /// Returns the values of `variable` for this chunk.
    ///
    /// On the first request the array `/{variable}` is opened from the source
    /// storage, the chunk `(ci, cj)` is retrieved, and a copy is stored in the
    /// internal map. Later requests for the same variable return a clone of
    /// the stored array without touching the storage.
    ///
    /// # Panics
    ///
    /// Panics if the array cannot be opened or the chunk cannot be retrieved.
    pub(crate) fn get(
        &mut self,
        variable: &str,
    ) -> Result<ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>> {
        trace!("Getting chunk data for variable: {}", variable);

        // Fast path: the variable was already loaded for this chunk.
        if let Some(cached) = self.data.get(variable) {
            trace!("Chunk data for variable {} already loaded", variable);
            return Ok(cached.clone());
        }

        trace!("Loading chunk data for variable: {}", variable);
        let array_path = format!("/{variable}");
        let array = zarrs::array::Array::open(self.source.clone(), &array_path).unwrap();

        // A subset of the chunk grid that selects exactly one chunk: (ci, cj).
        let (ci, cj) = (self.ci, self.cj);
        let chunk_subset =
            zarrs::array_subset::ArraySubset::new_with_ranges(&[ci..(ci + 1), cj..(cj + 1)]);

        trace!("Storing chunk data for variable: {}", variable);
        let values = array.retrieve_chunks_ndarray::<f32>(&chunk_subset).unwrap();

        // Keep one copy in the map and hand the other one to the caller.
        self.data.insert(variable.to_string(), values.clone());
        Ok(values)
    }
}

#[cfg(test)]
mod chunk_tests {
    use super::*;

    /// Smoke test: a hand-built `LazyChunk` for chunk (0, 0) keeps its
    /// indices and can load variable "A" from the multi-variable sample.
    #[test]
    fn dev() {
        let zarr_path = samples::multi_variable_zarr();
        let filesystem = zarrs::filesystem::FilesystemStore::new(&zarr_path).unwrap();
        let source: zarrs::storage::ReadableListableStorage = std::sync::Arc::new(filesystem);

        let mut lazy = LazyChunk {
            source,
            ci: 0,
            cj: 0,
            data: std::collections::HashMap::new(),
        };

        assert_eq!(lazy.ci, 0);
        assert_eq!(lazy.cj, 0);

        // Only checks that loading succeeds; the values are not inspected.
        let _values = lazy.get("A").unwrap();
    }
}
Loading