Commit
Merge pull request #129 from Oxid15/develop
v0.8.0
Oxid15 authored Nov 15, 2022
2 parents fb92b6f + ef065b3 commit 18f42d3
Showing 41 changed files with 1,067 additions and 209 deletions.
2 changes: 1 addition & 1 deletion cascade/__init__.py
@@ -15,7 +15,7 @@
"""


__version__ = '0.7.3'
__version__ = '0.8.0'
__author__ = 'Ilia Moiseev'
__author_email__ = '[email protected]'

4 changes: 2 additions & 2 deletions cascade/base/meta_handler.py
@@ -93,7 +93,7 @@ def read(self, path: str) -> Union[Dict, List[Dict]]:
self._raise_io_error(path, e)
return meta

def write(self, path:str, obj: List[Dict], overwrite=True) -> None:
def write(self, path: str, obj: List[Dict], overwrite=True) -> None:
if not overwrite and os.path.exists(path):
return

@@ -181,7 +181,7 @@ def read(self, path: str) -> Union[Dict, List[Dict]]:
handler = self._get_handler(path)
return handler.read(path)

def write(self, path: str, obj, overwrite:bool = True) -> None:
def write(self, path: str, obj, overwrite: bool = True) -> None:
"""
Writes object to path.
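For context on the corrected `write` signatures, here is a minimal, hedged usage sketch of `MetaHandler`; it assumes a JSON file is among `supported_meta_formats`, and the file name and payload are illustrative:

from cascade.base import MetaHandler

mh = MetaHandler()
# write a list of dicts; by default an existing file is overwritten
mh.write('meta.json', [{'name': 'dataset', 'len': 5}])
# with overwrite=False the call returns early because the file already exists
mh.write('meta.json', [{'name': 'other'}], overwrite=False)
meta = mh.read('meta.json')  # -> [{'name': 'dataset', 'len': 5}]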
4 changes: 2 additions & 2 deletions cascade/base/traceable.py
@@ -7,15 +7,15 @@ class Traceable:
Base class for everything that has metadata in cascade.
Handles the logic of getting and updating internal meta prefix.
"""
def __init__(self, *args, meta_prefix:Union[Dict, str] = None, **kwargs) -> None:
def __init__(self, *args, meta_prefix: Union[Dict, str] = None, **kwargs) -> None:
"""
Parameters
----------
meta_prefix: Union[Dict, str], optional
The dictionary that is used to update object's meta in `get_meta` call.
Because `update` is called on the meta, the prefix can overwrite default values.
If a str is passed, it is assumed to be a path and is loaded using MetaHandler.
See also
--------
cascade.base.MetaHandler
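A minimal sketch of how `meta_prefix` might be used, assuming a `Traceable` subclass such as `cascade.data.Wrapper` forwards keyword arguments to `Traceable.__init__` (that forwarding is not shown in this diff):

from cascade import data as cdd

# dict prefix: merged into the object's meta during get_meta() via update(),
# so it can overwrite default values
ds = cdd.Wrapper([0, 1, 2], meta_prefix={'owner': 'data-team'})
meta = ds.get_meta()

# str prefix: treated as a path and loaded with MetaHandler (per the docstring above)
# ds = cdd.Wrapper([0, 1, 2], meta_prefix='prefix.json')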
2 changes: 2 additions & 0 deletions cascade/data/__init__.py
@@ -19,11 +19,13 @@
from .apply_modifier import ApplyModifier
from .bruteforce_cacher import BruteforceCacher
from .sequential_cacher import SequentialCacher
from .composer import Composer
from .concatenator import Concatenator
from .cyclic_sampler import CyclicSampler
from .random_sampler import RandomSampler
from .pickler import Pickler
from .folder_dataset import FolderDataset
from .range_sampler import RangeSampler
from .version_assigner import VersionAssigner

from .utils import split
4 changes: 0 additions & 4 deletions cascade/data/apply_modifier.py
@@ -48,7 +48,3 @@ def __init__(self, dataset: Dataset, func: Callable, *args, **kwargs) -> None:
def __getitem__(self, index: int) -> T:
item = self._dataset[index]
return self._func(item)

def __repr__(self) -> str:
rp = super().__repr__()
return f'{rp}, {repr(self._func)}'
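For reference, a hedged usage sketch of `ApplyModifier`, whose `__getitem__` (shown above) applies the stored function to each item of the wrapped dataset; the wrapped data and function are illustrative:

from cascade import data as cdd

ds = cdd.Wrapper([0, 1, 2, 3])
squared = cdd.ApplyModifier(ds, lambda x: x ** 2)
assert squared[2] == 4  # func is applied lazily on item access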
52 changes: 52 additions & 0 deletions cascade/data/composer.py
@@ -0,0 +1,52 @@
from typing import List, Iterable, Tuple, Dict
from . import T, Dataset


class Composer(Dataset):
"""
Unifies two or more datasets element-wise.
Example
-------
>>> from cascade import data as cdd
>>> items = cdd.Wrapper([0, 1, 2, 3, 4])
>>> labels = cdd.Wrapper([1, 0, 0, 1, 1])
>>> ds = cdd.Composer((items, labels))
>>> assert ds[0] == (0, 1)
"""
def __init__(self, datasets: Iterable[Dataset], *args, **kwargs) -> None:
"""
Parameters
----------
datasets: Iterable[Dataset]
Datasets of the same length to be unified
"""
super().__init__(*args, **kwargs)
self._validate_input(datasets)
self._datasets = datasets
# Since we checked the same length in all datasets, we can
# set the length of any dataset as the length of Composer
self._len = len(self._datasets[0])

def _validate_input(self, datasets):
lengths = [len(ds) for ds in datasets]
first = lengths[0]
if not all([ln == first for ln in lengths]):
raise ValueError(
f'The datasets passed should be of the same length\n'
f'Actual lengths: {lengths}'
)

def __getitem__(self, index: int) -> Tuple[T]:
return tuple(ds[index] for ds in self._datasets)

def __len__(self) -> int:
return self._len

def get_meta(self) -> List[Dict]:
"""
Composer calls `get_meta()` of all its datasets
"""
meta = super().get_meta()
meta[0]['data'] = [ds.get_meta() for ds in self._datasets]
return meta
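A hedged sketch of what the `get_meta()` override above produces; only the `'data'` key is guaranteed by this diff, any other keys come from the base classes:

from cascade import data as cdd

items = cdd.Wrapper([0, 1, 2, 3, 4])
labels = cdd.Wrapper([1, 0, 0, 1, 1])
ds = cdd.Composer((items, labels))

meta = ds.get_meta()
# meta is a list of dicts; meta[0]['data'] holds the meta of each wrapped dataset
assert isinstance(meta, list)
assert len(meta[0]['data']) == 2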
2 changes: 1 addition & 1 deletion cascade/data/dataset.py
@@ -54,7 +54,7 @@ def __repr__(self) -> str:
"""
Returns
-------
repr: str
repr: str
Representation of a Dataset. This repr is used as a name in the get_meta() method
and by default gives the class name from the basic repr.
2 changes: 1 addition & 1 deletion cascade/data/folder_dataset.py
@@ -19,7 +19,7 @@ def __init__(self, root: str, *args, **kwargs) -> None:
Parameters
----------
root: str
A path to the folder of files
A path to the folder of files
"""
super().__init__(*args, **kwargs)
self._root = os.path.abspath(root)
6 changes: 6 additions & 0 deletions cascade/data/pickler.py
@@ -59,3 +59,9 @@ def _dump(self) -> None:
def _load(self) -> None:
with open(self._path, 'rb') as f:
self._dataset = pickle.load(f)

def ds(self):
"""
Returns pickled dataset
"""
return self._dataset
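A short, hedged example of the new `ds()` accessor. The `Pickler` constructor arguments are an assumption here; only `_path`, `_dataset` and `ds()` appear in this diff:

from cascade import data as cdd

ds = cdd.Wrapper([0, 1, 2, 3, 4])
pickled = cdd.Pickler('ds.pkl', ds)  # assumed signature: (path, dataset)
inner = pickled.ds()                 # returns the wrapped (pickled) dataset
assert inner[0] == ds[0]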
13 changes: 7 additions & 6 deletions cascade/data/range_sampler.py
@@ -42,12 +42,13 @@ class RangeSampler(Sampler):
2
3
"""
def __init__(self,
dataset: Dataset,
start:int = None,
stop:int = None,
step:int = 1,
*args, **kwargs) -> None:
def __init__(
self,
dataset: Dataset,
start: int = None,
stop: int = None,
step: int = 1,
*args, **kwargs) -> None:
"""
Parameters
----------
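For context, a hedged usage sketch of `RangeSampler` with the reformatted (start, stop, step) signature; the call mirrors the one that appears in the `VersionAssigner` docstring later in this diff, and the expected output is an assumption:

from cascade import data as cdd

ds = cdd.Wrapper([0, 1, 2, 3, 4])
sampled = cdd.RangeSampler(ds, 0, len(ds), 2)
items = [sampled[i] for i in range(len(sampled))]  # expected: [0, 2, 4]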
8 changes: 4 additions & 4 deletions cascade/data/sequential_cacher.py
@@ -30,10 +30,10 @@ class SequentialCacher(Modifier):
BruteforceCacher
"""
def __init__(
self,
dataset: Dataset,
batch_size: int = 2,
*args, **kwargs) -> None:
self,
dataset: Dataset,
batch_size: int = 2,
*args, **kwargs) -> None:
"""
Parameters
----------
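A hedged sketch of `SequentialCacher` usage: per the See also reference to `BruteforceCacher`, it is assumed to cache the wrapped dataset in chunks of `batch_size` instead of all at once; the exact caching behaviour is not shown in this diff:

from cascade import data as cdd

ds = cdd.Wrapper([0, 1, 2, 3, 4])
cached = cdd.SequentialCacher(ds, batch_size=2)
assert [cached[i] for i in range(len(cached))] == [0, 1, 2, 3, 4]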
161 changes: 161 additions & 0 deletions cascade/data/version_assigner.py
@@ -0,0 +1,161 @@
import os
from hashlib import md5
from typing import Dict, List
from . import Dataset, Modifier
from ..base import MetaHandler, supported_meta_formats


class VersionAssigner(Modifier):
"""
Class for automatic data versioning using metadata.
`VersionAssigner` is a simple `Modifier` that tracks changes in metadata and assigns
the dataset a version based on those changes.
The version consists of two parts, major and minor, in the format `MAJOR.MINOR`, just
like in semantic versioning. The parts have the following meaning: the *major* number changes
when the structure of the pipeline changes, e.g. a dataset was added or removed; the
*minor* number changes on any other metadata change, e.g. new data arrived and changed
the lengths of the modifiers in the pipeline.
Example
-------
>>> # Set up the pipeline
>>> from cascade import data as cdd
>>> ds = cdd.Wrapper([0, 1, 2, 3, 4])
>>> ds = VersionAssigner(ds, 'data_log.yml') # can be any supported meta format
>>> print(ds.version)
... 0.0
>>> # Changes its structure - add new modifier
>>> ds = cdd.Wrapper([0, 1, 2, 3, 4])
>>> ds = cdd.RangeSampler(ds, 0, len(ds), 2)
>>> ds = VersionAssigner(ds, 'data_log.yml')
>>> print(ds.version)
... 1.0
>>> # Revert changes - version downgrades back
>>> ds = cdd.Wrapper([0, 1, 2, 3, 4])
>>> ds = VersionAssigner(ds, 'data_log.yml')
>>> print(ds.version)
... 0.0
>>> # Update input data - minor update
>>> ds = cdd.Wrapper([0, 1, 2, 3, 4, 5])
>>> ds = VersionAssigner(ds, 'data_log.yml')
>>> print(ds.version)
... 0.1
Note
----
Some limitations are present. If the metadata of some dataset contains
something random or run-dependent, for example the memory
address of an object or the time of creation, then the version will
bump on every run.
Important
---------
The current implementation counts only the highest-level pipeline changes.
For example, if the final part of a pipeline is a Concatenator, only it will be
counted and not the structure of the pipelines before it.
This applies only to major version changes and may be fixed in
following versions.
"""
def __init__(self, dataset: Dataset, path: str, verbose=False, *args, **kwargs) -> None:
"""
Parameters
----------
dataset: Dataset
a dataset to assign a version to
path: str
a path to the version log file of this dataset; can be of any supported
meta format
"""
super().__init__(dataset, *args, **kwargs)
self._mh = MetaHandler()
self._assign_path(path)
self._versions = {}

# get meta for info about pipeline
meta = self._dataset.get_meta()

# TODO: extract all names
# use whole meta and pipeline which is only first-level names
meta_str = str(meta)
pipeline = ' '.join([m['name'] for m in meta])

# identify pipeline
meta_hash = md5(str.encode(meta_str, 'utf-8')).hexdigest()
pipe_hash = md5(str.encode(pipeline, 'utf-8')).hexdigest()

if os.path.exists(self._root):
self._versions = self._mh.read(self._root)

if pipe_hash in self._versions:
if meta_hash in self._versions[pipe_hash]:
self.version = self._versions[pipe_hash][meta_hash]['version']
else:
last_ver = self._get_last_version_from_pipe(pipe_hash)
major, minor = self._split_ver(last_ver)
minor += 1
self.version = self._join_ver(major, minor)
self._versions[pipe_hash][meta_hash] = {
'version': self.version,
'meta': meta,
'pipeline': pipeline
}
else:
last_ver = self._get_last_version()
major, minor = self._split_ver(last_ver)
major += 1
self.version = self._join_ver(major, minor)
self._versions[pipe_hash] = {}
self._versions[pipe_hash][meta_hash] = {
'version': self.version,
'meta': meta,
'pipeline': pipeline
}

self._mh.write(self._root, self._versions)
else:
self.version = '0.0'
self._versions[pipe_hash] = {}
self._versions[pipe_hash][meta_hash] = {
'version': self.version,
'meta': meta,
'pipeline': pipeline
}
self._mh.write(self._root, self._versions)

if verbose:
print('Dataset version:', self.version)

def _assign_path(self, path):
_, ext = os.path.splitext(path)
if ext == '':
raise ValueError(f'Provided path {path} has no extension')

assert ext in supported_meta_formats, f'Only {supported_meta_formats} are supported formats'
self._root = path

def _split_ver(self, ver):
major, minor = ver.split('.')
return int(major), int(minor)

def _join_ver(self, major, minor):
return f'{major}.{minor}'

def _get_last_version_from_pipe(self, pipe_hash):
versions = [item['version'] for item in self._versions[pipe_hash].values()]
versions = sorted(versions)
return versions[-1]

def _get_last_version(self):
versions_flat = []
for pipe_hash in self._versions:
versions_flat += [item['version'] for item in self._versions[pipe_hash].values()]
versions = sorted(versions_flat)
return versions[-1]

def get_meta(self) -> List[Dict]:
meta = super().get_meta()
meta[0]['version'] = self.version
return meta
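A hedged sketch of the structure `VersionAssigner` persists to the version log, inferred from the reads and writes of `self._versions` above; the hashes and meta contents are illustrative, not real values:

# contents of data_log.yml (or any supported meta format), shown as a Python structure
versions = {
    '<pipe_hash>': {                  # md5 of the space-joined first-level dataset names
        '<meta_hash>': {              # md5 of the full stringified meta
            'version': '0.0',
            'meta': [{'name': 'Wrapper'}],   # illustrative
            'pipeline': 'Wrapper',
        },
    },
}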
9 changes: 9 additions & 0 deletions cascade/docs/source/cascade.data.rst
@@ -134,3 +134,12 @@ cascade.data
|
|

.. autoclass:: cascade.data.VersionAssigner

|
|
|
2 changes: 2 additions & 0 deletions cascade/meta/__init__.py
@@ -19,3 +19,5 @@
from .validator import DataValidationException, Validator, AggregateValidator, PredicateValidator
from .meta_validator import MetaValidator
from .history_viewer import HistoryViewer
from .dataleak_validator import DataleakValidator
from .hashes import numpy_md5