6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -13,6 +13,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added a new numerical integrator based on Picard-Chebyshev integration. This
integrator has only been added to the Rust backend at this point, until more
testing can be done and it can be made available on the frontend.
- Saving `SimultaneousStates` to parquet files can now optionally include a column
containing the TDB JD of when the state information was last updated. This allows
users to selectively update state vectors only when necessary.

### Changed

@@ -25,6 +28,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- Fixed epoch times in PCK Type 2 frames that were not being converted correctly.
- Fixed a bug in the non-gravitational model constructor for dust, where the default
value for converting from diameter to beta values was missing a factor of 1e-3. This
only affected dust models constructed using the diameter input.


## [v2.1.5]
49 changes: 45 additions & 4 deletions src/kete/rust/simult_states.rs
@@ -3,10 +3,13 @@ use kete_core::errors::Error;
use kete_core::io::FileIO;
use kete_core::simult_states::SimultaneousStates;
use kete_core::spice::LOADED_SPK;
use kete_core::time::TDB;
use kete_core::time::Time;
use pyo3::exceptions;
use pyo3::prelude::*;
use pyo3::{PyResult, pyclass, pymethods};

use crate::maybe_vec::MaybeVec;
use crate::time::PyTime;
use crate::vector::PyVector;
use crate::{fovs::AllowedFOV, state::PyState};
@@ -116,29 +119,67 @@ impl PySimultaneousStates {
}

/// Save states as a parquet file.
pub fn save_parquet(&self, filename: String) -> PyResult<()> {
///
/// Optionally save the times when the states were last updated.
/// If a single value is provided, then all states are assumed to have been updated
/// at the same time, otherwise the number of provided times must match the number
/// of states.
#[pyo3(signature = (filename, last_updated=None))]
pub fn save_parquet(
&self,
filename: String,
last_updated: Option<MaybeVec<PyTime>>,
) -> PyResult<()> {
if self.0.fov.is_some() {
Err(Error::IOError(
"Cannot save a SimultaneousStates object which has a FOV as parquet. \
Parquet can only support a basic table format and saving metadata such \
Parquet can only support a basic table format. Saving metadata such \
as a field of view is not feasible. Consider using the binary saving \
method `SimultaneousStates.save`."
.into(),
))?;
}
kete_core::io::parquet::write_states_parquet(&self.0.states, &filename)?;
let last_updated: Option<(Vec<_>, bool)> = last_updated.map(|v| v.into());

if let Some((update, was_vec)) = &last_updated
&& *was_vec
&& update.len() != self.0.states.len()
{
Err(Error::ValueError(
"The number of updated times provided does not match the number of \
states."
.into(),
))?;
};

let last_updated: Option<Vec<Time<TDB>>> = last_updated.map(|(v, was_vec)| {
if was_vec {
v.into_iter().map(|t| t.into()).collect()
} else {
vec![v.first().unwrap().0; self.0.states.len()]
}
});
kete_core::io::parquet::write_states_parquet(&self.0.states, &filename, last_updated)?;
Ok(())
}
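As a rough illustration of the broadcast rule described in the docstring above (one shared update time vs. one time per state), the following standalone Rust sketch mirrors the normalization that `save_parquet` performs. The function `broadcast_updated` is hypothetical and not part of this PR; it only assumes that `Time<TDB>` is cloneable, which the code above already relies on.

```rust
use kete_core::time::{TDB, Time};

/// Hypothetical sketch of the broadcast rule used by `save_parquet` above:
/// a single time is repeated once per state, while a provided list is only
/// accepted when its length matches the number of states.
fn broadcast_updated(
    times: Vec<Time<TDB>>,
    was_vec: bool,
    n_states: usize,
) -> Option<Vec<Time<TDB>>> {
    if was_vec {
        // A full list was given; valid only if the lengths agree.
        (times.len() == n_states).then_some(times)
    } else {
        // A single value was given; repeat it for every state (assumes
        // `times` is non-empty, as the single-value input guarantees above).
        Some(vec![times[0].clone(); n_states])
    }
}
```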

/// Load states from a parquet file.
#[staticmethod]
pub fn load_parquet(filename: String) -> PyResult<Self> {
let states = kete_core::io::parquet::read_states_parquet(&filename)?;
let states = kete_core::io::parquet::read_states_parquet(&filename)?.0;

Ok(PySimultaneousStates(Box::new(
SimultaneousStates::new_exact(states, None)?,
)))
}

/// Load the times when the states were last updated, as saved in a parquet file.
#[staticmethod]
pub fn load_parquet_update_times(filename: String) -> PyResult<Vec<Option<PyTime>>> {
let update_times = kete_core::io::parquet::read_update_times_parquet(&filename)?;
Ok(update_times.into_iter().map(|x| x.map(PyTime)).collect())
}

/// Length of states
pub fn __len__(&self) -> usize {
self.0.states.len()
124 changes: 118 additions & 6 deletions src/kete_core/src/io/parquet.rs
@@ -36,14 +36,31 @@ use std::fs::File;
use crate::errors::{Error, KeteResult};
use crate::frames::Equatorial;
use crate::state::State;
use crate::time::{TDB, Time};

use polars::prelude::*;

/// Write a collection of states to a parquet table.
///
/// If ``last_updated`` is not provided, the associated column will not be saved.
///
/// The parquet file will contain the following columns:
/// * desig - String designation
/// * center - Center ID
/// * jd - TDB Scaled JD
/// * x, y, z - Position in Equatorial frame in units of AU from the center ID
/// * vx, vy, vz - Velocity in Equatorial frame in units of AU/Day from the center ID
/// * updated - Optional TDB Scaled JD time when the state was last updated. This
/// column is not required, but allows for record keeping of when each state was
/// last updated.
///
/// # Errors
/// Saving is fallible due to filesystem calls.
pub fn write_states_parquet(states: &[State<Equatorial>], filename: &str) -> KeteResult<()> {
pub fn write_states_parquet(
states: &[State<Equatorial>],
filename: &str,
last_updated: Option<Vec<Time<TDB>>>,
) -> KeteResult<()> {
let desigs = Column::new(
"desig".into(),
states
@@ -83,8 +100,34 @@ pub fn write_states_parquet(states: &[State<Equatorial>], filename: &str) -> Ket
"center".into(),
states.iter().map(|state| state.center_id).collect_vec(),
);
let mut df = DataFrame::new(vec![desigs, jd, x, y, z, vx, vy, vz, center])
.map_err(|_| Error::ValueError("Failed to construct dataframe".into()))?;

if let Some(updated) = &last_updated
&& updated.len() != states.len()
{
return Err(Error::ValueError(
"Length of provided last_updated values does not match the number of states".into(),
));
}

// The updated column is only saved in the file if it is provided.
let mut df = match last_updated {
None => DataFrame::new(vec![desigs, jd, x, y, z, vx, vy, vz, center]),
Some(updated) => {
// If a vec of updates is provided, we will save the updated column as well
if updated.len() != states.len() {
return Err(Error::ValueError(
"Length of provided last_updated values does not match the number of states"
.into(),
));
}
let updated: Vec<Option<f64>> = updated.into_iter().map(|t| Some(t.jd)).collect();
let updated = Column::new("updated".into(), updated);

DataFrame::new(vec![desigs, jd, x, y, z, vx, vy, vz, center, updated])
}
}
.map_err(|_| Error::ValueError("Failed to construct dataframe".into()))?;

let file = File::create(filename)?;
let _ = ParquetWriter::new(file)
.finish(&mut df)
@@ -94,14 +137,27 @@ pub fn write_states_parquet(states: &[State<Equatorial>], filename: &str) -> Ket

/// Read a collection of states from a parquet table.
///
/// The parquet table is expected to contain the following columns:
/// * desig - String designation
/// * center - Center ID
/// * jd - TDB Scaled JD
/// * x, y, z - Position in Equatorial frame in units of AU from the center ID
/// * vx, vy, vz - Velocity in Equatorial frame in units of AU/Day from the center ID
/// * updated - Optional TDB Scaled JD time when the state was last updated. This
/// column is not required, but allows for record keeping of when each state was
/// last updated.
///
///
/// # Errors
/// Reading files can fail for numerous reasons: incorrectly formatted files,
/// inconsistent contents, etc.
///
/// # Panics
/// There are a number of unwraps in this function, but it is structured such that they
/// should not be possible to reach.
pub fn read_states_parquet(filename: &str) -> KeteResult<Vec<State<Equatorial>>> {
pub fn read_states_parquet(
filename: &str,
) -> KeteResult<(Vec<State<Equatorial>>, Vec<Option<Time<TDB>>>)> {
// this reads the parquet table, then creates iterators over the contents, making
// states by going through the iterators one at a time.
let r = File::open(filename)?;
@@ -139,7 +195,23 @@ pub fn read_states_parquet(filename: &str) -> KeteResult<Vec<State<Equatorial>>>
})
.collect::<KeteResult<Vec<_>>>()?;

Ok((0..dataframe.height())
// Fail on incorrect format, but None for missing data.
let updated_times = match dataframe.column("updated") {
Ok(column) => {
// if data is found it must be None or a number, no exceptions
let col = column.f64().map_err(|_| {
Error::ValueError("update column information is not all floats.".into())
})?;

col.iter()
.map(|maybe_jd| maybe_jd.map(Time::<TDB>::new))
.collect()
}
// if column not found, then None
Err(_) => vec![None; dataframe.height()],
};

let states = (0..dataframe.height())
.map(|_| {
let desig = desig_iter
.next()
@@ -169,5 +241,45 @@ pub fn read_states_parquet(filename: &str) -> KeteResult<Vec<State<Equatorial>>>
center_id,
)
})
.collect())
.collect();

Ok((states, updated_times))
}
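Since `read_states_parquet` now returns the per-row update times alongside the states, a caller can use them to decide which states are stale. A minimal sketch: the helper name `states_needing_refresh` and the TDB JD cutoff are illustrative assumptions, not part of this PR.

```rust
use kete_core::errors::KeteResult;
use kete_core::frames::Equatorial;
use kete_core::io::parquet::read_states_parquet;
use kete_core::state::State;

/// Hypothetical helper: keep only the states whose `updated` entry is
/// missing or older than a caller-chosen TDB JD cutoff.
fn states_needing_refresh(
    filename: &str,
    cutoff_jd: f64,
) -> KeteResult<Vec<State<Equatorial>>> {
    // The reader now returns the states and a per-row Option<Time<TDB>>.
    let (states, updated) = read_states_parquet(filename)?;

    Ok(states
        .into_iter()
        .zip(updated)
        .filter_map(|(state, time)| match time {
            // Recently updated: nothing to do for this state.
            Some(t) if t.jd >= cutoff_jd => None,
            // Missing or stale update time: this state needs refreshing.
            _ => Some(state),
        })
        .collect())
}
```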

/// Read only the updated time column if present in the file.
///
/// # Errors
/// Reading files can fail for numerous reasons: incorrectly formatted files,
/// inconsistent contents, etc.
///
pub fn read_update_times_parquet(filename: &str) -> KeteResult<Vec<Option<Time<TDB>>>> {
let r = File::open(filename)?;
let reader = ParquetReader::new(r);

let mut dataframe = reader
.with_columns(Some(vec!["updated".into()]))
.finish()
.map_err(|_| {
Error::IOError("Failed to read contents of file as a parquet table.".into())
})?;

let dataframe = dataframe.as_single_chunk_par();

// Fail on incorrect format, but None for missing data.
let updated_times = match dataframe.column("updated") {
Ok(column) => {
// if data is found it must be None or a number, no exceptions
let col = column.f64().map_err(|_| {
Error::ValueError("update column information is not all floats.".into())
})?;

col.iter()
.map(|maybe_jd| maybe_jd.map(Time::<TDB>::new))
.collect()
}
// if column not found, then None
Err(_) => vec![None; dataframe.height()],
};

Ok(updated_times)
}
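Putting the pieces together, a short end-to-end sketch of the new round trip: write states with a uniform `updated` column, then read only that column back. The helper name `save_and_check` and the file path are illustrative assumptions; the `kete_core` functions and `Time::<TDB>::new` are taken from the diff above.

```rust
use kete_core::errors::KeteResult;
use kete_core::frames::Equatorial;
use kete_core::io::parquet::{read_update_times_parquet, write_states_parquet};
use kete_core::state::State;
use kete_core::time::{TDB, Time};

/// Hypothetical end-to-end example: every state shares one update time.
fn save_and_check(states: &[State<Equatorial>], jd_updated: f64) -> KeteResult<()> {
    // One Time<TDB> per state; here they are all the same TDB-scaled JD.
    let times = vec![Time::<TDB>::new(jd_updated); states.len()];
    write_states_parquet(states, "states.parquet", Some(times))?;

    // Read back only the `updated` column; each row is an Option<Time<TDB>>.
    let read_back = read_update_times_parquet("states.parquet")?;
    assert_eq!(read_back.len(), states.len());
    assert!(read_back.iter().all(|t| t.is_some()));
    Ok(())
}
```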