orsopy/fileio/__init__.py (2 changes: 1 addition & 1 deletion)
@@ -5,7 +5,7 @@
 from .base import (Column, ComplexValue, ErrorColumn, File, Header, Person, Value, ValueRange, ValueVector,
                    _read_header_data, _validate_header_data)
 from .data_source import DataSource, Experiment, InstrumentSettings, Measurement, Polarization, Sample
-from .orso import ORSO_VERSION, Orso, OrsoDataset, load_nexus, load_orso, save_nexus, save_orso
+from .orso import ORSO_VERSION, Orso, OrsoDataset, OrsoSpacer, load_nexus, load_orso, save_nexus, save_orso
 from .reduction import Reduction, Software

 __all__ = [s for s in dir() if not s.startswith("_")]
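
With this change, OrsoSpacer becomes importable from the package root alongside the existing I/O helpers; a minimal sketch:

```python
# OrsoSpacer is now re-exported from orsopy.fileio,
# next to load_orso/save_orso.
from orsopy.fileio import OrsoSpacer, load_orso, save_orso
```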
orsopy/fileio/orso.py (110 changes: 103 additions & 7 deletions)
@@ -3,7 +3,7 @@
"""

from dataclasses import dataclass
from typing import BinaryIO, List, Optional, Sequence, TextIO, Union
from typing import BinaryIO, List, Optional, Sequence, TextIO, Tuple, Union

import numpy as np
import yaml
@@ -39,6 +39,7 @@ class Orso(Header):
     reduction: Reduction
     columns: List[Union[Column, ErrorColumn]]
     data_set: Optional[Union[int, str]] = None
+    data_shape: Optional[Tuple[int, ...]] = None

     __repr__ = Header._staggered_repr

@@ -154,6 +155,13 @@ class OrsoDataset:
     data: Union[np.ndarray, Sequence[np.ndarray], Sequence[Sequence]]

     def __post_init__(self):
+        if not type(self.data) is np.ndarray:
+            # convert other sequence types to an array
+            self.data = np.array(self.data, dtype=float)
+        if len(self.data.shape) > 2:
+            # higher-dimensional dataset: flatten it and record the leading shape
+            self.info.data_shape = self.data.shape[:-1]
+            self.data = self.data.reshape(-1, self.data.shape[-1])
         if self.data.shape[1] != len(self.info.columns):
             raise ValueError("Data has to have the same number of columns as header")

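A minimal sketch of what this reshaping means for callers; the array shape is illustrative, and only the numpy bookkeeping from `__post_init__` is mirrored here:

```python
import numpy as np

# An input of shape (n_scans, n_points, n_columns) is flattened to
# (n_scans * n_points, n_columns); the leading shape is recorded in
# info.data_shape so the original layout can be recovered later.
raw = np.zeros((2, 5, 4))                # illustrative 3-D dataset
data_shape = raw.shape[:-1]              # (2, 5) -> info.data_shape
flat = raw.reshape(-1, raw.shape[-1])    # (10, 4) -> OrsoDataset.data
assert flat.shape == (10, 4) and data_shape == (2, 5)
```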
@@ -202,9 +210,89 @@ def save(self, fname: Union[TextIO, str]):
     def __eq__(self, other: "OrsoDataset"):
         return self.info == other.info and (self.data == other.data).all()

+    @property
+    def columns(self):
+        return list(self.info.columns)
+
+    def items(self):
+        """
+        Iterate over columns and their data, yielding each column together
+        with its data reshaped to ``data_shape`` if that was specified in
+        the header.
+        """
+        if self.info.data_shape is None:
+            shape = (self.data.shape[0],)
+        else:
+            shape = self.info.data_shape
+        for i, col in enumerate(self.info.columns):
+            yield col, self.data[:, i].reshape(*shape)
+
+    def __getitem__(self, item):
+        if self.info.data_shape is None:
+            shape = (self.data.shape[0],)
+        else:
+            shape = self.info.data_shape
+        if type(item) is int:
+            return self.data[:, item].reshape(*shape)
+        elif type(item) is str:
+            for i, col in enumerate(self.info.columns):
+                if col.name == item:
+                    return self[i]
+            raise KeyError(
+                f"'{item}' is not a column in this dataset, possible columns are: "
+                f"{[ci.name for ci in self.info.columns]}"
+            )
+        elif type(item) is slice:
+            output = []
+            for i in range(*item.indices(len(self))):
+                output.append(self[i])
+            return output
+
+    def __len__(self):
+        return len(self.info.columns)
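
Hypothetical usage of these accessors, assuming `ds` is an OrsoDataset whose header declares columns named Qz, R and sR and whose `info.data_shape` is (2, 5):

```python
qz = ds["Qz"]        # column by name, reshaped to (2, 5)
r = ds[1]            # the same kind of access by column index
both = ds[0:2]       # a slice yields a list of reshaped column arrays
for col, values in ds.items():
    print(col.name, values.shape)    # e.g. "Qz (2, 5)"
n_cols = len(ds)     # note: the number of columns, not rows
```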


+class OrsoSpacer:
+    """
+    A spacer between ORSO datasets when writing a text file.
+
+    It only affects spaces/comments in the text file and is ignored when
+    the file is loaded.
+    """
+
+    def __init__(self, spacer):
+        for li in spacer.splitlines():
+            if not (li.isspace() or li.startswith("# #")):
+                raise ValueError("spacer can only contain new lines, spaces, or '# #' comments")
+        self.spacer = spacer
+
+    def text(self):
+        return self.spacer

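A hypothetical spacer; only whitespace lines and lines starting with '# #' pass the validation:

```python
spacer = OrsoSpacer("# # ---- next sample ----\n")
# OrsoSpacer("arbitrary text") would raise ValueError.
```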

+def write_text_data(f, dsi: OrsoDataset, data_separator: str):
+    """
+    Write the data to the file, adding separators between blocks of
+    multi-dimensional data.
+    """
+    if dsi.info.data_shape and len(dsi.info.data_shape) > 1:
+        # write a data_separator after each block of the leading dimension
+        rows = dsi.info.data_shape[0]
+        row_length = dsi.data.shape[0] // rows
+        for row in range(rows):
+            sidx = row * row_length
+            eidx = sidx + row_length
+            usidx = tuple(int(i) for i in np.unravel_index(sidx, dsi.info.data_shape))
+            ueidx = tuple(int(i) for i in np.unravel_index(eidx - 1, dsi.info.data_shape))
+            f.write(f"# # stride: {usidx} - {ueidx}\n")
+            np.savetxt(f, dsi.data[sidx:eidx, :], fmt="%-22.16e")
+            f.write(data_separator)
+    else:
+        np.savetxt(f, dsi.data, fmt="%-22.16e")

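A sketch of the stride bookkeeping only, mirroring the loop above for an assumed `info.data_shape == (2, 5)`; each outer block of five points is prefixed with a '# # stride:' comment giving its first and last multi-index:

```python
import numpy as np

data_shape = (2, 5)                       # assumed leading shape
rows = data_shape[0]
row_length = int(np.prod(data_shape)) // rows
for row in range(rows):
    sidx, eidx = row * row_length, (row + 1) * row_length
    usidx = tuple(int(i) for i in np.unravel_index(sidx, data_shape))
    ueidx = tuple(int(i) for i in np.unravel_index(eidx - 1, data_shape))
    print(f"# # stride: {usidx} - {ueidx}")
# prints:
#   # # stride: (0, 0) - (0, 4)
#   # # stride: (1, 0) - (1, 4)
```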

 def save_orso(
-    datasets: List[OrsoDataset], fname: Union[TextIO, str], comment: Optional[str] = None, data_separator: str = ""
+    datasets: List[Union[OrsoDataset, OrsoSpacer]],
+    fname: Union[TextIO, str],
+    comment: Optional[str] = None,
+    data_separator: str = "",
 ) -> None:
"""
Saves an ORSO file. Each of the datasets must have a unique
Expand All @@ -224,14 +312,17 @@ def save_orso(
if data_separator != "" and not data_separator.isspace():
raise ValueError("data_separator can only contain new lines and spaces")

dsets = []
for idx, dataset in enumerate(datasets):
if isinstance(dataset, OrsoSpacer):
continue
info = dataset.info
data_set = info.data_set
if data_set is None or (isinstance(data_set, str) and len(data_set) == 0):
# it's not set, or is zero length string
info.data_set = idx
dsets.append(info.data_set)

dsets = [dataset.info.data_set for dataset in datasets]
if len(set(dsets)) != len(dsets):
raise ValueError("All `OrsoDataset.info.data_set` values must be unique")

Expand All @@ -242,13 +333,18 @@ def save_orso(

ds1 = datasets[0]
header += ds1.header()
np.savetxt(f, ds1.data, header=header, fmt="%-22.16e")
f.write("# " + header.strip().replace("\n", "\n# ") + "\n")
write_text_data(f, ds1, data_separator)

for dsi in datasets[1:]:
# write an optional spacer string between dataset e.g. \n
f.write(data_separator)
hi = ds1.diff_header(dsi)
np.savetxt(f, dsi.data, header=hi, fmt="%-22.16e")
if isinstance(dsi, OrsoDataset):
# write an optional spacer string between dataset e.g. \n
hi = ds1.diff_header(dsi)
f.write("# " + hi.strip().replace("\n", "\n# ") + "\n")
write_text_data(f, dsi, data_separator)
else:
f.write(dsi.text())

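A hypothetical call combining the pieces; `ds1` and `ds2` are assumed OrsoDataset instances with distinct `info.data_set` values, and the file name is illustrative:

```python
save_orso(
    [ds1, OrsoSpacer("# # ---- second dataset ----\n"), ds2],
    "multi.ort",
    data_separator="\n",
)
# The spacer is written verbatim between the two datasets and, per the
# OrsoSpacer docstring, ignored again when the file is loaded.
```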

 def load_orso(fname: Union[TextIO, str]) -> List[OrsoDataset]: