
RayDistributor for using Ray to distribute the calculations in tsfresh #1030

Open · wants to merge 2 commits into base: main
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Collaborator commented:

@TheaperDeng - I have updated the pre-commit file on the main branch to use the newest versions. Can you please merge in the newest changes and resolve the merge conflicts?

Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
repos:
- repo: https://github.com/psf/black
rev: 19.10b0
rev: 22.3.0
hooks:
- id: black
language_version: python3
exclude: "^docs/.*$"
- repo: https://github.com/pycqa/isort
rev: 5.7.0
rev: 5.12.0
hooks:
- id: isort
args:
50 changes: 50 additions & 0 deletions docs/text/tsfresh_on_a_cluster.rst
@@ -200,6 +200,56 @@ of 3 workers:
column_sort='time',
distributor=Distributor)

Using Ray to distribute the calculations
'''''''''''''''''''''''''''''''''''''''''
We provide a Distributor for the `Ray framework <https://docs.ray.io/en/latest/index.html>`_, where
*"Ray is an open-source unified compute framework that makes it easy to scale AI and Python workloads."*

.. NOTE::
    Ray is an optional dependency; users who need Ray to distribute the calculations should first install
    it with ``pip install ray``.

Ray is an easy-to-use framework for distributed computing workloads. Users can run it on a single node or scale
Collaborator commented:

Could you add a few words on why a user would choose the Ray distributor and not any of the other distributors? I am totally fine with having it in the code-base, I just want to make sure users are not confused about what to choose. What are the benefits compared to e.g. Dask?
From what I understood, using Ray allows parallelizing the computation but does not help with out-of-memory data. And it is of course useful if you already run a Ray cluster.

out to a cluster. To connect to an existing cluster, users only need its address (e.g., "ray://123.45.67.89:10001").

.. code:: python

from tsfresh.examples.robot_execution_failures import \
download_robot_execution_failures, \
load_robot_execution_failures
from tsfresh.feature_extraction import extract_features
from tsfresh.utilities.distribution import RayDistributor

download_robot_execution_failures()
df, y = load_robot_execution_failures()
Collaborator commented:

So this means the data is loaded on the main (driver) node and then sent to the workers, is this correct? So it helps to speed up the computation but does not help when the data is larger than memory, right?
That is fine, I just want to understand.
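The point in question can be illustrated without Ray at all. In this sketch (hypothetical stand-in functions, not the tsfresh or Ray API), the full dataset is materialized on the driver and only chunks are shipped out, so the computation is parallelized but peak driver memory is unchanged:

```python
from concurrent.futures import ThreadPoolExecutor

def partition(data, chunk_size):
    # the driver already holds all of `data`; chunking only slices it
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

def work(chunk):
    # stand-in for the feature-calculation function sent to each worker
    return sum(chunk)

data = list(range(10))               # materialized entirely in driver memory
chunks = partition(data, 4)          # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(work, chunks))
print(results)                       # [6, 22, 17]
```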


Distributor = RayDistributor(address="ray://123.45.67.89:10001")
Collaborator commented:

If the number of workers is not retrieved automatically, don't you need to pass it here, because the default is 1?

Collaborator commented:

Nit: can you use a lowercase distributor? The object is not a class but an instance.


X = extract_features(timeseries_container=df,
column_id='id',
column_sort='time',
distributor=Distributor)

If users would like a local cluster with multiple workers, they can simply leave ``address`` unset.

.. code:: python

from tsfresh.examples.robot_execution_failures import \
download_robot_execution_failures, \
load_robot_execution_failures
from tsfresh.feature_extraction import extract_features
from tsfresh.utilities.distribution import RayDistributor

download_robot_execution_failures()
df, y = load_robot_execution_failures()

Distributor = RayDistributor(n_workers=3)
Collaborator commented:

As far as I understood, this will not automatically start a ray cluster with 3 workers, or?


X = extract_features(timeseries_container=df,
column_id='id',
column_sort='time',
distributor=Distributor)
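
The ``rayinit_config`` argument is forwarded as keyword arguments to ``ray.init``, so any ``ray.init`` option can be tuned through it. A minimal sketch (the exact keys shown here are illustrative; check the Ray documentation for your version):

.. code:: python

    from tsfresh.utilities.distribution import RayDistributor

    # limit the local Ray runtime to 2 CPUs and skip the dashboard
    distributor = RayDistributor(
        n_workers=2,
        rayinit_config={"num_cpus": 2, "include_dashboard": False},
    )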

Writing your own distributor
''''''''''''''''''''''''''''

2 changes: 2 additions & 0 deletions test-requirements.txt
Collaborator commented:

The structure of the test requirements changed on main to have a more "modern" or typical repository structure. Those changes will go into the setup.cfg file once you merge in the newest main.

@@ -8,4 +8,6 @@ seaborn>=0.7.1
ipython>=5.3.0
notebook>=4.4.1
pandas-datareader>=0.5.0
ray>=2.5.0
protobuf<=3.20.3
Collaborator commented:

Why is that needed? Could you maybe add a comment? I do not see protobuf being used directly.

pre-commit
68 changes: 68 additions & 0 deletions tests/units/utilities/test_distribution.py
@@ -15,6 +15,7 @@
ClusterDaskDistributor,
LocalDaskDistributor,
MultiprocessingDistributor,
RayDistributor,
)


@@ -189,3 +190,70 @@ def test_dask_cluster_extraction_two_workers(self):
self.assertTrue(np.all(extracted_features.b__mean == np.array([37.85, 34.75])))
self.assertTrue(np.all(extracted_features.b__median == np.array([39.5, 28.0])))
cluster.close()


# RayDistributor
class LocalRayDistributorTestCase(DataTestCase):
def test_ray_cluster_extraction_one_worker(self):

Distributor = RayDistributor(n_workers=1)
Collaborator commented:

Same nit as before, can you use lowercase?


df = self.create_test_data_sample()
extracted_features = extract_features(
df,
column_id="id",
column_sort="sort",
column_kind="kind",
column_value="val",
distributor=Distributor,
)

self.assertIsInstance(extracted_features, pd.DataFrame)
self.assertTrue(np.all(extracted_features.a__maximum == np.array([71, 77])))
self.assertTrue(
np.all(extracted_features.a__sum_values == np.array([691, 1017]))
)
self.assertTrue(
np.all(extracted_features.a__abs_energy == np.array([32211, 63167]))
)
self.assertTrue(
np.all(extracted_features.b__sum_values == np.array([757, 695]))
)
self.assertTrue(np.all(extracted_features.b__minimum == np.array([3, 1])))
self.assertTrue(
np.all(extracted_features.b__abs_energy == np.array([36619, 35483]))
)
self.assertTrue(np.all(extracted_features.b__mean == np.array([37.85, 34.75])))
self.assertTrue(np.all(extracted_features.b__median == np.array([39.5, 28.0])))

def test_ray_cluster_extraction_two_worker(self):

Distributor = RayDistributor(n_workers=2)
Collaborator commented:

Again the question: this is not creating a ray cluster with two workers, or? It is technically the same cluster as without this option - you just change the chunking.
Not sure if this is expected.

Collaborator commented:

Is it possible to actually start a 2-worker local ray cluster?


df = self.create_test_data_sample()
extracted_features = extract_features(
df,
column_id="id",
column_sort="sort",
column_kind="kind",
column_value="val",
distributor=Distributor,
)

self.assertIsInstance(extracted_features, pd.DataFrame)
self.assertTrue(np.all(extracted_features.a__maximum == np.array([71, 77])))
self.assertTrue(
np.all(extracted_features.a__sum_values == np.array([691, 1017]))
)
self.assertTrue(
np.all(extracted_features.a__abs_energy == np.array([32211, 63167]))
)
self.assertTrue(
np.all(extracted_features.b__sum_values == np.array([757, 695]))
)
self.assertTrue(np.all(extracted_features.b__minimum == np.array([3, 1])))
self.assertTrue(
np.all(extracted_features.b__abs_energy == np.array([36619, 35483]))
)
self.assertTrue(np.all(extracted_features.b__mean == np.array([37.85, 34.75])))
self.assertTrue(np.all(extracted_features.b__median == np.array([39.5, 28.0])))
78 changes: 78 additions & 0 deletions tsfresh/utilities/distribution.py
@@ -494,6 +494,84 @@ def close(self):
self.pool.join()


class RayDistributor(IterableDistributorBaseClass):
def __init__(
self,
address=None,
rayinit_config={},
n_workers=1,
disable_progressbar=False,
progressbar_title="Feature Extraction",
):
"""
Creates a new RayDistributor instance

:param address: the IP address and port number of the Ray cluster
:type address: str
:param rayinit_config: external config for the ray.init calling
:type rayinit_config: dict
:param n_workers: how many workers the Ray cluster has (used to derive the chunk size)
:type n_workers: int
:param disable_progressbar: whether to show a progressbar or not.
:type disable_progressbar: bool
:param progressbar_title: the title of the progressbar
:type progressbar_title: basestring
"""
import ray

ray.init(address=address, **rayinit_config)
self.n_workers = n_workers
Collaborator commented:

The number of workers is defined by the ray cluster size when starting the cluster and can not be controlled by the user at this point, or? So the user needs to make sure to always pass in the correct number of workers according to the cluster. Can this also be retrieved from ray? We do something similar for dask.

Collaborator commented:

If possible, I would recommend the following: as the n_workers property currently does not change the cluster deployment, I would prefer if it is filled automatically. If this is not possible, we should remove the default value of 1 and maybe rename the parameter to make sure users know they need to set it to the number of cluster workers.
Now, it might look to users as if they can control the number of workers in their cluster using this variable (which I think they can not).

self.cpu_per_worker = max(
Collaborator commented:

I am not an expert in ray, but this assumes that you have a homogeneous cluster where each machine has the same number of CPUs, or? Why is this setting needed at all? Does ray not use all CPUs of a machine by default (again, not an expert in ray)?

1, int(ray.available_resources()["CPU"]) // self.n_workers
)
self.disable_progressbar = disable_progressbar
self.progressbar_title = progressbar_title

def calculate_best_chunk_size(self, data_length):
"""
Uses the number of ray workers in the cluster (during execution time, meaning when you start the extraction)
to find the optimal chunk_size.

:param data_length: A length which defines how many calculations there need to be.
:type data_length: int
"""
chunk_size, extra = divmod(data_length, self.n_workers * 5)
if extra:
chunk_size += 1
return chunk_size

def distribute(self, func, partitioned_chunks, kwargs):
"""
Create a remote function in Ray and calculate the features in a parallel fashion
by distributing the data chunks to the remote function.

:param func: the function to send to each worker.
:type func: callable
:param partitioned_chunks: The list of data chunks - each element is again
a list of chunks - and should be processed by one worker.
:type partitioned_chunks: iterable
:param kwargs: parameters for the map function
:type kwargs: dict of string to parameter

:return: The result of the calculation as a generator -
each item should be the result of the application of func to a single element.
"""
import ray

remote_func = ray.remote(func).options(num_cpus=self.cpu_per_worker)
results = [remote_func.remote(chunk, **kwargs) for chunk in partitioned_chunks]
Collaborator commented:

Just out of curiosity, how will ray distribute the work on the cluster? How will it make the scheduling decision? (just for my own education)

for result in results:
yield ray.get(result)

def close(self):
"""
Disconnect the worker, and terminate processes.
"""
import ray

ray.shutdown()


class ApplyDistributor(DistributorBaseClass):
def __init__(self, meta):
self.meta = meta
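For reference, the chunking heuristic from `calculate_best_chunk_size` in this diff can be restated standalone: it aims for roughly five chunks per worker and rounds up on a remainder so no calculation is dropped (plain-Python restatement for illustration, not part of the PR):

```python
def calculate_best_chunk_size(data_length, n_workers):
    # target ~5 chunks per worker; round up when the division leaves a remainder
    chunk_size, extra = divmod(data_length, n_workers * 5)
    if extra:
        chunk_size += 1
    return chunk_size

# 100 calculations across 2 workers -> divmod(100, 10) = (10, 0) -> chunks of 10
print(calculate_best_chunk_size(100, 2))  # 10
# 88 calculations across 2 workers -> divmod(88, 10) = (8, 8) -> rounded up to 9
print(calculate_best_chunk_size(88, 2))   # 9
```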