diff --git a/notebooks/03_pytorch.ipynb b/notebooks/03_pytorch.ipynb
index 7dc32b41..237c973d 100644
--- a/notebooks/03_pytorch.ipynb
+++ b/notebooks/03_pytorch.ipynb
@@ -478,7 +478,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 6,
+   "execution_count": null,
    "id": "8a6fdd873625b07b",
    "metadata": {
     "ExecuteTime": {
@@ -858,7 +858,7 @@
    ],
    "source": [
     "from capymoa.classifier import OnlineBagging\n",
-    "from capymoa.stream import TorchClassifyStream\n",
+    "from capymoa.stream import TorchStream\n",
     "from capymoa.evaluation import prequential_evaluation\n",
     "from capymoa.evaluation.visualization import plot_windowed_results\n",
     "\n",
@@ -868,7 +868,9 @@
    "pytorch_dataset = datasets.FashionMNIST(\n",
    "    root=\"data\", train=True, download=True, transform=ToTensor()\n",
    ")\n",
-   "pytorch_stream = TorchClassifyStream(dataset=pytorch_dataset, num_classes=10)\n",
+   "pytorch_stream = TorchStream.from_classification(\n",
+   "    dataset=pytorch_dataset, num_classes=10\n",
+   ")\n",
    "\n",
    "# Creating a learner\n",
    "ob_learner = OnlineBagging(schema=pytorch_stream.get_schema(), ensemble_size=5)\n",
diff --git a/src/capymoa/ocl/datasets.py b/src/capymoa/ocl/datasets.py
index 60cc0a0d..ec6451a9 100644
--- a/src/capymoa/ocl/datasets.py
+++ b/src/capymoa/ocl/datasets.py
@@ -65,7 +65,7 @@
     partition_by_schedule,
     class_schedule_to_task_mask,
 )
-from capymoa.stream import Stream, TorchClassifyStream
+from capymoa.stream import Stream, TorchStream
 from capymoa.stream._stream import Schema
 
 
@@ -227,7 +227,7 @@ def __init__(
         self.test_tasks = self._preload_datasets(self.test_tasks)
 
         # Create streams for training and testing
-        self.stream = TorchClassifyStream(
+        self.stream = TorchStream.from_classification(
             ConcatDataset(self.train_tasks),
             num_classes=self.num_classes,
             shuffle=False,
diff --git a/src/capymoa/stream/__init__.py b/src/capymoa/stream/__init__.py
index a7cbc3ed..ecd887a2 100644
--- a/src/capymoa/stream/__init__.py
+++ b/src/capymoa/stream/__init__.py
@@ -7,14 +7,14 @@
 )
 from ._csv_stream import CSVStream
 from ._stream_from_file import stream_from_file
-from .torch import TorchClassifyStream
+from .torch import TorchStream
 from . import drift, generator, preprocessing
 
 __all__ = [
     "Stream",
     "Schema",
     "ARFFStream",
-    "TorchClassifyStream",
+    "TorchStream",
     "CSVStream",
     "drift",
     "generator",
diff --git a/src/capymoa/stream/_stream_from_file.py b/src/capymoa/stream/_stream_from_file.py
index fda3ce07..a4ba8d8c 100644
--- a/src/capymoa/stream/_stream_from_file.py
+++ b/src/capymoa/stream/_stream_from_file.py
@@ -34,7 +34,7 @@ def stream_from_file(
     * :class:`~capymoa.stream.CSVStream`
     * :class:`~capymoa.stream.ARFFStream`
     * :class:`~capymoa.stream.NumpyStream`
-    * :class:`~capymoa.stream.TorchClassifyStream`
+    * :class:`~capymoa.stream.TorchStream`
     * :class:`~capymoa.stream.Stream`
 
     **CSV File Considerations:**
diff --git a/src/capymoa/stream/torch.py b/src/capymoa/stream/torch.py
index 25a4e632..da472d75 100644
--- a/src/capymoa/stream/torch.py
+++ b/src/capymoa/stream/torch.py
@@ -1,125 +1,167 @@
-import copy
 from typing import Optional, Sequence, Tuple
 
 import torch
 
 from capymoa.stream import Stream, Schema
-from capymoa.instance import (
-    LabeledInstance,
-)
+from capymoa.instance import LabeledInstance, RegressionInstance
 from torch.utils.data import Dataset
 
 
-class TorchClassifyStream(Stream[LabeledInstance]):
-    """TorchClassifyStream turns a PyTorch dataset into a classification stream.
+def _shuffle_dataset(dataset: Dataset, seed: Optional[int] = None) -> Dataset:
+    rng = torch.Generator()
+    if seed is not None:
+        rng.manual_seed(seed)
+    indices = torch.randperm(len(dataset), generator=rng)
+    return torch.utils.data.Subset(dataset, indices)
+
+
+class TorchStream(Stream):
+    """A stream adapter for PyTorch datasets.
+
+    This class converts PyTorch datasets into CapyMOA streams for both classification
+    and regression tasks.
+
+    Creating a classification stream from a PyTorch dataset:
 
-    >>> from capymoa.evaluation import ClassificationEvaluator
-    ...
     >>> from capymoa.datasets import get_download_dir
-    >>> from capymoa.stream import TorchClassifyStream
-    >>> from torchvision import datasets
-    >>> from torchvision.transforms import ToTensor
-    >>> print("Using PyTorch Dataset"); pytorchDataset = datasets.FashionMNIST( #doctest:+ELLIPSIS
+    >>> from capymoa.stream import TorchStream
+    >>> from torchvision import datasets, transforms
+    >>>
+    >>> dataset = datasets.FashionMNIST(
     ...     root=get_download_dir(),
     ...     train=True,
     ...     download=True,
-    ...     transform=ToTensor()
-    ... )
-    Using PyTorch Dataset...
-    >>> pytorch_stream = TorchClassifyStream(pytorchDataset, 10, class_names=pytorchDataset.classes)
-    >>> pytorch_stream.get_schema()
-    @relation PytorchDataset
-
-    @attribute 0 numeric
-    @attribute 1 numeric
-    ...
-    @attribute 783 numeric
-    @attribute class {T-shirt/top,Trouser,Pullover,Dress,Coat,Sandal,Shirt,Sneaker,Bag,'Ankle boot'}
-
-    @data
-    >>> pytorch_stream.next_instance()
-    LabeledInstance(
-        Schema(PytorchDataset),
-        x=[0. 0. 0. ... 0. 0. 0.],
-        y_index=9,
-        y_label='Ankle boot'
-    )
-
-    You can construct :class:`TorchClassifyStream` using a random sampler by passing a sampler
-    to the constructor:
+    ...     transform=transforms.ToTensor()
+    ... )  # doctest: +SKIP
+    >>> stream = TorchStream.from_classification(
+    ...     dataset, num_classes=10, class_names=dataset.classes
+    ... )  # doctest: +SKIP
+    >>> stream.next_instance()  # doctest: +SKIP
+    LabeledInstance(...)
+
+    Creating a shuffled classification stream:
 
     >>> import torch
-    >>> from torch.utils.data import RandomSampler, TensorDataset
+    >>> from torch.utils.data import TensorDataset
+    >>>
     >>> dataset = TensorDataset(
-    ...     torch.tensor([[1], [2], [3]]), torch.tensor([0, 1, 2])
+    ...     torch.tensor([[1.0], [2.0], [3.0]]),
+    ...     torch.tensor([0, 1, 2])
     ... )
-    >>> pytorch_stream = TorchClassifyStream(dataset=dataset, num_classes=3, shuffle=True)
-    >>> for instance in pytorch_stream:
-    ...     print(instance.x)
-    [3.]
-    [1.]
-    [2.]
-
-    Importantly you can restart the stream to iterate over the dataset in
-    the same order again:
-
-    >>> pytorch_stream.restart()
-    >>> for instance in pytorch_stream:
-    ...     print(instance.x)
-    [3.]
-    [1.]
-    [2.]
+    >>> stream = TorchStream.from_classification(
+    ...     dataset, num_classes=3, shuffle=True, shuffle_seed=0
+    ... )
+    >>> [float(inst.x[0]) for inst in stream]
+    [3.0, 1.0, 2.0]
+
+    Streams can be restarted to iterate again:
+
+    >>> stream.restart()
+    >>> [float(inst.x[0]) for inst in stream]
+    [3.0, 1.0, 2.0]
+
+    Creating a regression stream:
+
+    >>> dataset = TensorDataset(
+    ...     torch.tensor([[1.0], [2.0], [3.0]]),
+    ...     torch.tensor([0.5, 1.5, 2.5])
+    ... )
+    >>> stream = TorchStream.from_regression(
+    ...     dataset, shuffle=True, shuffle_seed=0
+    ... )
+    >>> [(float(inst.x[0]), float(inst.y_value)) for inst in stream]
+    [(3.0, 2.5), (1.0, 0.5), (2.0, 1.5)]
     """
 
-    def __init__(
-        self,
-        dataset: Dataset[Tuple[torch.Tensor, torch.LongTensor]],
-        num_classes: int,
+    @staticmethod
+    def from_regression(
+        dataset: Dataset[Tuple[torch.Tensor, torch.Tensor | float]],
+        dataset_name: str = "TorchStream",
         shuffle: bool = False,
-        shuffle_seed: int = 0,
-        class_names: Optional[Sequence[str]] = None,
-        dataset_name: str = "PytorchDataset",
-        shape: Optional[Sequence[int]] = None,
-    ):
-        """Create a stream from a PyTorch dataset.
-
-        :param dataset: A PyTorch dataset
-        :param num_classes: The number of classes in the dataset
-        :param shuffle: Randomly sample with replacement, defaults to False
-        :param shuffle_seed: Seed for shuffling, defaults to 0
-        :param class_names: The names of the classes, defaults to None
-        :param dataset_name: The name of the dataset, defaults to "PytorchDataset"
+        shuffle_seed: Optional[int] = None,
+    ) -> "TorchStream":
+        """Construct a stream for regression from a PyTorch Dataset.
+
+        :param dataset: A PyTorch Dataset that yields tuples of (features, target) for
+            regression tasks.
+        :param dataset_name: An optional name for the stream.
+        :param shuffle: Whether to shuffle the dataset.
+        :param shuffle_seed: An optional seed for shuffling the dataset.
+        :return: A TorchStream instance for regression.
         """
-        if not (class_names is None or len(class_names) == num_classes):
-            raise ValueError("Number of class labels must match the number of classes")
-        self.__init_args_kwargs__ = copy.copy(
-            locals()
-        )  # save init args for recreation. not a deep copy to avoid unnecessary use of memory
+        # Construct the schema based on the dataset and provided parameters
+        X, _ = dataset[0]
+        n_features = X.numel()
+        features = [str(f) for f in range(n_features)] + ["target"]
+        schema = Schema.from_custom(
+            features=features,
+            target="target",
+            name=dataset_name,
+        )
+
+        dataset = _shuffle_dataset(dataset, seed=shuffle_seed) if shuffle else dataset
+        return TorchStream(dataset, schema)
 
-        self._dataset = dataset
-        self._index = 0
-        self._permutation = torch.arange(len(dataset))
+    @staticmethod
+    def from_classification(
+        dataset: Dataset[Tuple[torch.Tensor, torch.Tensor | int]],
+        num_classes: int,
+        class_names: Optional[Sequence[str]] = None,
+        dataset_name: str = "TorchStream",
+        shape: Optional[Sequence[int]] = None,
+        shuffle: bool = False,
+        shuffle_seed: Optional[int] = None,
+    ) -> "TorchStream":
+        """Construct a stream for classification from a PyTorch Dataset.
+
+        :param dataset: A PyTorch Dataset that yields tuples of (features, target).
+        :param num_classes: The number of classes in the classification task.
+        :param class_names: An optional sequence of class names corresponding to the class indices.
+        :param dataset_name: An optional name for the stream.
+        :param shape: An optional shape for the features. If not provided, features will
+            be treated as flat vectors.
+        :param shuffle: Whether to shuffle the dataset.
+        :param shuffle_seed: An optional seed for shuffling the dataset.
+        :return: A TorchStream instance.
+        """
-        if shuffle:
-            self._permutation = torch.randperm(
-                len(dataset),
-                generator=torch.Generator().manual_seed(shuffle_seed),
-            )
+        if class_names is None:
+            class_names = [str(k) for k in range(num_classes)]
+        if len(class_names) != num_classes:
+            raise ValueError("Length of class_names must match num_classes.")
 
-        # Use the first instance to infer the number of attributes
-        X, _ = self._dataset[0]
+        # Construct the schema based on the dataset and provided parameters
+        X, _ = dataset[0]
         n_features = X.numel()
-
-        # Create a header describing the dataset for MOA
-        self._schema = Schema.from_custom(
-            features=[f"{f}" for f in range(n_features)] + ["class"],
+        features = [str(f) for f in range(n_features)] + ["class"]
+        schema = Schema.from_custom(
+            features=features,
             target="class",
-            categories={"class": class_names or [str(i) for i in range(num_classes)]},
+            categories={"class": class_names},
             name=dataset_name,
         )
-        if shape is not None:
-            self._schema.shape = shape
+        schema._shape = shape if shape is not None else (n_features,)
+
+        dataset = _shuffle_dataset(dataset, seed=shuffle_seed) if shuffle else dataset
+        return TorchStream(dataset, schema)
+
+    def __init__(
+        self,
+        dataset: Dataset,
+        schema: Schema,
+    ):
+        """Construct a TorchStream from a PyTorch Dataset and a Schema.
+
+        Usually you want :meth:`from_classification` or :meth:`from_regression`.
+
+        :param dataset: A PyTorch Dataset that yields tuples of (features, target).
+        :param schema: A Schema object that describes the structure of the data,
+            including feature names and target information.
+        """
+        self._dataset = dataset
+        self.schema = schema
+        self._index = 0
 
     def has_more_instances(self):
         return len(self._dataset) > self._index
@@ -128,19 +170,26 @@ def next_instance(self):
         if not self.has_more_instances():
             raise StopIteration()
 
-        X, y = self._dataset[self._permutation[self._index]]
+        X, y = self._dataset[self._index]
         self._index += 1  # increment counter for next call
 
-        # Tensors on the CPU and NumPy arrays share their underlying memory locations
-        # We should prefer numpy over tensors in instances to improve compatibility
-        # See: https://pytorch.org/tutorials/beginner/blitz/tensor_tutorial.html#bridge-to-np-label
-        X = X.view(-1).numpy()
-        if isinstance(y, torch.Tensor) and torch.isnan(y):
-            y = -1
-        return LabeledInstance.from_array(self._schema, X, int(y))
+        if self.schema.is_classification():
+            # Tensors on the CPU and NumPy arrays share their underlying memory locations
+            # We should prefer numpy over tensors in instances to improve compatibility
+            # See: https://pytorch.org/tutorials/beginner/blitz/tensor_tutorial.html#bridge-to-np-label
+            X = X.view(-1).numpy()
+            if isinstance(y, torch.Tensor) and torch.isnan(y):
+                y = -1
+            return LabeledInstance.from_array(self.schema, X, int(y))
+        elif self.schema.is_regression():
+            X = X.view(-1).numpy()
+            y = y.item()  # Convert single-value tensor to a Python scalar
+            return RegressionInstance.from_array(self.schema, X, y)
+        else:
+            raise ValueError("Schema must be either classification or regression.")
 
     def get_schema(self):
-        return self._schema
+        return self.schema
 
     def get_moa_stream(self):
         return None
diff --git a/tests/test_stream.py b/tests/test_stream.py
index c7d0c4c6..c4db4319 100644
--- a/tests/test_stream.py
+++ b/tests/test_stream.py
@@ -19,7 +19,7 @@
     CSVStream,
     NumpyStream,
     Stream,
-    TorchClassifyStream,
+    TorchStream,
     stream_from_file,
 )
 from pathlib import Path
@@ -100,6 +100,8 @@ def check_attributes(numeric_attributes, nominal_attributes, num_attributes, sch
 YC2 = DATA[:, 3]
 DATASET_C1 = TensorDataset(torch.tensor(XC1), torch.tensor(YC1))
 DATASET_C2 = TensorDataset(torch.tensor(XC2), torch.tensor(YC2))
+DATASET_N1 = TensorDataset(torch.tensor(XN1), torch.tensor(YN1))
+DATASET_N2 = TensorDataset(torch.tensor(XN2), torch.tensor(YN2))
 
 ARFF = RESOURCES / "stream_test.arff"
 CSV = RESOURCES / "stream_test.csv"
@@ -117,8 +119,8 @@ def check_attributes(numeric_attributes, nominal_attributes, num_attributes, sch
         (stream_from_file(CSV, class_index=3), "cat2", 5),
         (NumpyStream(XC1, YC1, target_type="categorical"), "cat1", 5),
         (NumpyStream(XC2, YC2, target_type="categorical"), "cat2", 5),
-        (TorchClassifyStream(DATASET_C1, 3), "cat1", 5),  # type: ignore
-        (TorchClassifyStream(DATASET_C2, 2), "cat2", 5),  # type: ignore
+        (TorchStream.from_classification(DATASET_C1, 3), "cat1", 5),  # type: ignore
+        (TorchStream.from_classification(DATASET_C2, 2), "cat2", 5),  # type: ignore
     ],
 )
 def test_stream_classification(
@@ -135,7 +137,7 @@ def test_stream_classification(
     num_attributes = len(numeric_attributes) + len(nominal_attributes)
 
     # NumpyStream and PyTorch streams do not have nominal labels by default.
-    if isinstance(stream, (NumpyStream, TorchClassifyStream)):
+    if isinstance(stream, (NumpyStream, TorchStream)):
         numeric_attributes = list(map(str, range(num_attributes)))
         nominal_attributes = {}
         label_values = [str(i) for i in label_indexes]
@@ -199,6 +201,8 @@ def test_stream_classification(
         (stream_from_file(CSV, class_index=1), "num2"),
         (NumpyStream(XN1, YN1, target_type="numeric"), "num1"),
         (NumpyStream(XN2, YN2, target_type="numeric"), "num2"),
+        (TorchStream.from_regression(DATASET_N1), "num1"),
+        (TorchStream.from_regression(DATASET_N2), "num2"),
     ],
 )
 def test_regression_stream(stream: Stream[RegressionInstance], target: str):
@@ -208,7 +212,7 @@ def test_regression_stream(stream: Stream[RegressionInstance], target: str):
     num_attributes = len(numeric_attributes) + len(nominal_attributes)
 
     # Stream treats nominal attributes as numeric
-    if isinstance(stream, NumpyStream):
+    if isinstance(stream, (NumpyStream, TorchStream)):
         numeric_attributes = list(map(str, range(num_attributes)))
         nominal_attributes = {}
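A minimal usage sketch of the `shape` keyword on `TorchStream.from_classification`, which none of the doctests or tests above exercise (a hypothetical call site reusing the FashionMNIST setup from the notebook cell; it only relies on behaviour visible in this patch):

    from torchvision import datasets
    from torchvision.transforms import ToTensor
    from capymoa.stream import TorchStream

    dataset = datasets.FashionMNIST(
        root="data", train=True, download=True, transform=ToTensor()
    )
    # Record the original (channels, height, width) layout on the schema;
    # next_instance() still flattens each image into a 784-feature vector.
    stream = TorchStream.from_classification(dataset, num_classes=10, shape=(1, 28, 28))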