Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions notebooks/03_pytorch.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@
},
{
"cell_type": "code",
"execution_count": 6,
"execution_count": null,
"id": "8a6fdd873625b07b",
"metadata": {
"ExecuteTime": {
Expand Down Expand Up @@ -858,7 +858,7 @@
],
"source": [
"from capymoa.classifier import OnlineBagging\n",
"from capymoa.stream import TorchClassifyStream\n",
"from capymoa.stream import TorchStream\n",
"from capymoa.evaluation import prequential_evaluation\n",
"from capymoa.evaluation.visualization import plot_windowed_results\n",
"\n",
Expand All @@ -868,7 +868,9 @@
"pytorch_dataset = datasets.FashionMNIST(\n",
" root=\"data\", train=True, download=True, transform=ToTensor()\n",
")\n",
"pytorch_stream = TorchClassifyStream(dataset=pytorch_dataset, num_classes=10)\n",
"pytorch_stream = TorchStream.from_classification(\n",
" dataset=pytorch_dataset, num_classes=10\n",
")\n",
"\n",
"# Creating a learner\n",
"ob_learner = OnlineBagging(schema=pytorch_stream.get_schema(), ensemble_size=5)\n",
Expand Down
4 changes: 2 additions & 2 deletions src/capymoa/ocl/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
partition_by_schedule,
class_schedule_to_task_mask,
)
from capymoa.stream import Stream, TorchClassifyStream
from capymoa.stream import Stream, TorchStream
from capymoa.stream._stream import Schema


Expand Down Expand Up @@ -227,7 +227,7 @@ def __init__(
self.test_tasks = self._preload_datasets(self.test_tasks)

# Create streams for training and testing
self.stream = TorchClassifyStream(
self.stream = TorchStream.from_classification(
ConcatDataset(self.train_tasks),
num_classes=self.num_classes,
shuffle=False,
Expand Down
4 changes: 2 additions & 2 deletions src/capymoa/stream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
)
from ._csv_stream import CSVStream
from ._stream_from_file import stream_from_file
from .torch import TorchClassifyStream
from .torch import TorchStream
from . import drift, generator, preprocessing

__all__ = [
"Stream",
"Schema",
"ARFFStream",
"TorchClassifyStream",
"TorchStream",
"CSVStream",
"drift",
"generator",
Expand Down
2 changes: 1 addition & 1 deletion src/capymoa/stream/_stream_from_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def stream_from_file(
* :class:`~capymoa.stream.CSVStream`
* :class:`~capymoa.stream.ARFFStream`
* :class:`~capymoa.stream.NumpyStream`
* :class:`~capymoa.stream.TorchClassifyStream`
* :class:`~capymoa.stream.TorchStream`
* :class:`~capymoa.stream.Stream`

**CSV File Considerations:**
Expand Down
255 changes: 153 additions & 102 deletions src/capymoa/stream/torch.py
Original file line number Diff line number Diff line change
@@ -1,125 +1,169 @@
import copy
from typing import Optional, Sequence, Tuple

import torch

from capymoa.stream import Stream, Schema
from capymoa.instance import (
LabeledInstance,
)
from capymoa.instance import LabeledInstance, RegressionInstance
from torch.utils.data import Dataset


class TorchClassifyStream(Stream[LabeledInstance]):
"""TorchClassifyStream turns a PyTorch dataset into a classification stream.
def _shuffle_dataset(dataset: Dataset, seed: Optional[int] = None) -> Dataset:
rng = torch.Generator()
if seed is not None:
rng.manual_seed(seed)
indicies = torch.randperm(len(dataset), generator=rng)
return torch.utils.data.Subset(dataset, indicies)


class TorchStream(Stream):
"""A stream adapter for PyTorch datasets.

This class converts PyTorch datasets into CapyMOA streams for both classification
and regression tasks.

Creating a classification stream from a PyTorch dataset:

>>> from capymoa.evaluation import ClassificationEvaluator
...
>>> from capymoa.datasets import get_download_dir
>>> from capymoa.stream import TorchClassifyStream
>>> from torchvision import datasets
>>> from torchvision.transforms import ToTensor
>>> print("Using PyTorch Dataset"); pytorchDataset = datasets.FashionMNIST( #doctest:+ELLIPSIS
>>> from capymoa.stream import TorchStream
>>> from torchvision import datasets, transforms
>>>
>>> dataset = datasets.FashionMNIST(
... root=get_download_dir(),
... train=True,
... download=True,
... transform=ToTensor()
... )
Using PyTorch Dataset...
>>> pytorch_stream = TorchClassifyStream(pytorchDataset, 10, class_names=pytorchDataset.classes)
>>> pytorch_stream.get_schema()
@relation PytorchDataset
<BLANKLINE>
@attribute 0 numeric
@attribute 1 numeric
...
@attribute 783 numeric
@attribute class {T-shirt/top,Trouser,Pullover,Dress,Coat,Sandal,Shirt,Sneaker,Bag,'Ankle boot'}
<BLANKLINE>
@data
>>> pytorch_stream.next_instance()
LabeledInstance(
Schema(PytorchDataset),
x=[0. 0. 0. ... 0. 0. 0.],
y_index=9,
y_label='Ankle boot'
)

You can construct :class:`TorchClassifyStream` using a random sampler by passing a sampler
to the constructor:
... transform=transforms.ToTensor()
... ) # doctest: +SKIP
>>> stream = TorchStream.from_classification(
... dataset, num_classes=10, class_names=dataset.classes
... ) # doctest: +SKIP
>>> stream.next_instance() # doctest: +SKIP
LabeledInstance(...)

Creating a shuffled classification stream:

>>> import torch
>>> from torch.utils.data import RandomSampler, TensorDataset
>>> from torch.utils.data import TensorDataset
>>>
>>> dataset = TensorDataset(
... torch.tensor([[1], [2], [3]]), torch.tensor([0, 1, 2])
... torch.tensor([[1.0], [2.0], [3.0]]),
... torch.tensor([0, 1, 2])
... )
>>> pytorch_stream = TorchClassifyStream(dataset=dataset, num_classes=3, shuffle=True)
>>> for instance in pytorch_stream:
... print(instance.x)
[3.]
[1.]
[2.]

Importantly you can restart the stream to iterate over the dataset in
the same order again:

>>> pytorch_stream.restart()
>>> for instance in pytorch_stream:
... print(instance.x)
[3.]
[1.]
[2.]
>>> stream = TorchStream.from_classification(
... dataset, num_classes=3, shuffle=True, shuffle_seed=0
... )
>>> [float(inst.x[0]) for inst in stream]
[3.0, 1.0, 2.0]

Streams can be restarted to iterate again:

>>> stream.restart()
>>> [float(inst.x[0]) for inst in stream]
[3.0, 1.0, 2.0]

Creating a regression stream:

>>> dataset = TensorDataset(
... torch.tensor([[1.0], [2.0], [3.0]]),
... torch.tensor([0.5, 1.5, 2.5])
... )
>>> stream = TorchStream.from_regression(
... dataset, shuffle=True, shuffle_seed=0
... )
>>> [(float(inst.x[0]), float(inst.y_value)) for inst in stream]
[(3.0, 2.5), (1.0, 0.5), (2.0, 1.5)]
"""

def __init__(
self,
dataset: Dataset[Tuple[torch.Tensor, torch.LongTensor]],
num_classes: int,
@staticmethod
def from_regression(
dataset: Dataset[Tuple[torch.Tensor, torch.Tensor | float]],
dataset_name: str = "TorchStream",
shuffle: bool = False,
shuffle_seed: int = 0,
class_names: Optional[Sequence[str]] = None,
dataset_name: str = "PytorchDataset",
shape: Optional[Sequence[int]] = None,
):
"""Create a stream from a PyTorch dataset.

:param dataset: A PyTorch dataset
:param num_classes: The number of classes in the dataset
:param shuffle: Randomly sample with replacement, defaults to False
:param shuffle_seed: Seed for shuffling, defaults to 0
:param class_names: The names of the classes, defaults to None
:param dataset_name: The name of the dataset, defaults to "PytorchDataset"
shuffle_seed: Optional[int] = None,
) -> "TorchStream":
"""Construct a stream for regression from a PyTorch Dataset.

:param dataset: A PyTorch Dataset that yields tuples of (features, target) for
regression tasks.
:param dataset_name: An optional name for the stream.
:param shape: An optional shape for the features. If not provided, features will
be treated as flat vectors.
:param shuffle: Whether to shuffle the dataset.
:param shuffle_seed: An optional seed for shuffling the dataset.
:return: A TorchStream instance for regression.
"""
if not (class_names is None or len(class_names) == num_classes):
raise ValueError("Number of class labels must match the number of classes")

self.__init_args_kwargs__ = copy.copy(
locals()
) # save init args for recreation. not a deep copy to avoid unnecessary use of memory
# Construct the schema based on the dataset and provided parameters
X, _ = dataset[0]
n_features = X.numel()
features = [str(f) for f in range(n_features)] + ["target"]
schema = Schema.from_custom(
features=features,
target="target",
name=dataset_name,
)

dataset = _shuffle_dataset(dataset, seed=shuffle_seed) if shuffle else dataset
return TorchStream(dataset, schema)

self._dataset = dataset
self._index = 0
self._permutation = torch.arange(len(dataset))
@staticmethod
def from_classification(
dataset: Dataset[Tuple[torch.Tensor, torch.Tensor | int]],
num_classes: int,
class_names: Optional[Sequence[str]] = None,
dataset_name: str = "TorchStream",
shape: Optional[Sequence[int]] = None,
shuffle: bool = False,
shuffle_seed: Optional[int] = None,
) -> "TorchStream":
"""Construct a stream for classification from a PyTorch Dataset.

:param dataset: A PyTorch Dataset that yields tuples of (features, target).
:param num_classes: The number of classes in the classification task.
:param class_names: An optional sequence of class names corresponding to the class indices.
:param dataset_name: An optional name for the stream.
:param shape: An optional shape for the features. If not provided, features will
be treated as flat vectors.
:param shuffle: Whether to shuffle the dataset.
:param shuffle_seed: An optional seed for shuffling the dataset.
:return: A TorchStream instance.
"""

if shuffle:
self._permutation = torch.randperm(
len(dataset),
generator=torch.Generator().manual_seed(shuffle_seed),
)
if class_names is None:
class_names = [str(k) for k in range(num_classes)]
if len(class_names) != num_classes:
raise ValueError("Length of class_names must match num_classes.")

# Use the first instance to infer the number of attributes
X, _ = self._dataset[0]
# Construct the schema based on the dataset and provided parameters
X, _ = dataset[0]
n_features = X.numel()

# Create a header describing the dataset for MOA
self._schema = Schema.from_custom(
features=[f"{f}" for f in range(n_features)] + ["class"],
features = [str(f) for f in range(n_features)] + ["class"]
schema = Schema.from_custom(
features=features,
target="class",
categories={"class": class_names or [str(i) for i in range(num_classes)]},
categories={"class": class_names},
name=dataset_name,
)
if shape is not None:
self._schema.shape = shape
schema._shape = shape if shape is not None else (n_features,)

dataset = _shuffle_dataset(dataset, seed=shuffle_seed) if shuffle else dataset
return TorchStream(dataset, schema)

def __init__(
self,
dataset: Dataset,
schema: Schema,
):
"""Construct a TorchStream from a PyTorch Dataset and a Schema.

Usually you want :meth:`from_classification` or :meth:`from_regression`.

:param dataset: A PyTorch Dataset that yields tuples of (features, target).
:param schema: A Schema object that describes the structure of the data,
including feature names and target information.
"""
self._dataset = dataset
self.schema = schema
self._index = 0

def has_more_instances(self):
return len(self._dataset) > self._index
Expand All @@ -128,19 +172,26 @@ def next_instance(self):
if not self.has_more_instances():
raise StopIteration()

X, y = self._dataset[self._permutation[self._index]]
X, y = self._dataset[self._index]
self._index += 1 # increment counter for next call

# Tensors on the CPU and NumPy arrays share their underlying memory locations
# We should prefer numpy over tensors in instances to improve compatibility
# See: https://pytorch.org/tutorials/beginner/blitz/tensor_tutorial.html#bridge-to-np-label
X = X.view(-1).numpy()
if isinstance(y, torch.Tensor) and torch.isnan(y):
y = -1
return LabeledInstance.from_array(self._schema, X, int(y))
if self.schema.is_classification():
# Tensors on the CPU and NumPy arrays share their underlying memory locations
# We should prefer numpy over tensors in instances to improve compatibility
# See: https://pytorch.org/tutorials/beginner/blitz/tensor_tutorial.html#bridge-to-np-label
X = X.view(-1).numpy()
if isinstance(y, torch.Tensor) and torch.isnan(y):
y = -1
return LabeledInstance.from_array(self.schema, X, int(y))
elif self.schema.is_regression():
X = X.view(-1).numpy()
y = y.item() # Convert single-value tensor to a Python scalar
return RegressionInstance.from_array(self.schema, X, y)
else:
raise ValueError("Schema must be either classification or regression.")

def get_schema(self):
return self._schema
return self.schema

def get_moa_stream(self):
return None
Expand Down
Loading