RayDistributor for using Ray to distribute the calculations in tsfresh #1030
base: main
@@ -200,6 +200,56 @@ of 3 workers:
                         column_sort='time',
                         distributor=Distributor)

Using Ray to distribute the calculations
'''''''''''''''''''''''''''''''''''''''''

We provide a Distributor for the `Ray framework <https://docs.ray.io/en/latest/index.html>`_, where
*"Ray is an open-source unified compute framework that makes it easy to scale AI and Python workloads."*

.. NOTE::
    Ray is an optional dependency and users who need to distribute the calculations with Ray should install
    it first via `pip install ray`.

Ray is an easy-to-use framework for developing distributed computing workloads. Users can run it on a single
node or scale out to a cluster; they only need the address of a Ray cluster to connect to
(e.g., "ray://123.45.67.89:10001").

Review comment: Could you add a few words on why a user would choose the ray distributor and not use any other distributors? I am totally fine with having it in the code-base, I just want to make sure users are not confused on what to choose. What are benefits compared to e.g. dask?
.. code:: python

    from tsfresh.examples.robot_execution_failures import \
        download_robot_execution_failures, \
        load_robot_execution_failures
    from tsfresh.feature_extraction import extract_features
    from tsfresh.utilities.distribution import RayDistributor

    download_robot_execution_failures()
    df, y = load_robot_execution_failures()

Review comment: So this means the data is loaded on the main (driver) node and then sent to the workers, is this correct? So it helps to speed up the computation but does not help when the data is larger than memory, right?

    Distributor = RayDistributor(address="ray://123.45.67.89:10001")

Review comment: If the number of workers is not retrieved automatically, don't you need to pass it here because the default is 1?

Review comment: Nit: can you use a lowercase

    X = extract_features(timeseries_container=df,
                         column_id='id',
                         column_sort='time',
                         distributor=Distributor)
If users would like to have a local cluster with multiple workers, they can simply leave ``address`` unset.

.. code:: python

    from tsfresh.examples.robot_execution_failures import \
        download_robot_execution_failures, \
        load_robot_execution_failures
    from tsfresh.feature_extraction import extract_features
    from tsfresh.utilities.distribution import RayDistributor

    download_robot_execution_failures()
    df, y = load_robot_execution_failures()

    Distributor = RayDistributor(n_workers=3)

Review comment: As far as I understood, this will not automatically start a ray cluster with 3 workers, or?

    X = extract_features(timeseries_container=df,
                         column_id='id',
                         column_sort='time',
                         distributor=Distributor)
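A minimal sketch (not part of this diff), assuming Ray's documented ``num_cpus`` argument to ``ray.init``: since ``rayinit_config`` is forwarded to the ``ray.init`` call, the local Ray instance started by the distributor could be capped to a fixed CPU budget like this.

.. code:: python

    from tsfresh.examples.robot_execution_failures import \
        download_robot_execution_failures, \
        load_robot_execution_failures
    from tsfresh.feature_extraction import extract_features
    from tsfresh.utilities.distribution import RayDistributor

    download_robot_execution_failures()
    df, y = load_robot_execution_failures()

    # num_cpus is passed through rayinit_config to ray.init() and limits
    # how many CPUs the local Ray instance will use.
    distributor = RayDistributor(n_workers=3, rayinit_config={"num_cpus": 3})

    X = extract_features(timeseries_container=df,
                         column_id='id',
                         column_sort='time',
                         distributor=distributor)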
Writing your own distributor
''''''''''''''''''''''''''''

Review comment: The structure of the test requirements changed on main to have a more "modern" or typical repository structure. Those changes will go into the setup.cfg file once you merge in newest main.
@@ -8,4 +8,6 @@ seaborn>=0.7.1
ipython>=5.3.0
notebook>=4.4.1
pandas-datareader>=0.5.0
ray>=2.5.0
protobuf<=3.20.3

Review comment: Why is that needed? Could you maybe add a comment because I do not see protobuf being used directly

pre-commit
@@ -15,6 +15,7 @@
    ClusterDaskDistributor,
    LocalDaskDistributor,
    MultiprocessingDistributor,
    RayDistributor,
)
@@ -189,3 +190,70 @@ def test_dask_cluster_extraction_two_workers(self):
        self.assertTrue(np.all(extracted_features.b__mean == np.array([37.85, 34.75])))
        self.assertTrue(np.all(extracted_features.b__median == np.array([39.5, 28.0])))
        cluster.close()


# RayDistributor
class LocalRayDistributorTestCase(DataTestCase):
    def test_ray_cluster_extraction_one_worker(self):

        Distributor = RayDistributor(n_workers=1)

Review comment: Same nit as before, can you use lowercase?

        df = self.create_test_data_sample()
        extracted_features = extract_features(
            df,
            column_id="id",
            column_sort="sort",
            column_kind="kind",
            column_value="val",
            distributor=Distributor,
        )

        self.assertIsInstance(extracted_features, pd.DataFrame)
        self.assertTrue(np.all(extracted_features.a__maximum == np.array([71, 77])))
        self.assertTrue(
            np.all(extracted_features.a__sum_values == np.array([691, 1017]))
        )
        self.assertTrue(
            np.all(extracted_features.a__abs_energy == np.array([32211, 63167]))
        )
        self.assertTrue(
            np.all(extracted_features.b__sum_values == np.array([757, 695]))
        )
        self.assertTrue(np.all(extracted_features.b__minimum == np.array([3, 1])))
        self.assertTrue(
            np.all(extracted_features.b__abs_energy == np.array([36619, 35483]))
        )
        self.assertTrue(np.all(extracted_features.b__mean == np.array([37.85, 34.75])))
        self.assertTrue(np.all(extracted_features.b__median == np.array([39.5, 28.0])))
    def test_ray_cluster_extraction_two_worker(self):

        Distributor = RayDistributor(n_workers=2)

Review comment: Again the question: this is not creating a ray cluster with two workers, or? It is technically the same cluster as without this option - you just change the chunking.

Review comment: Is it possible to actually start a 2-worker local ray cluster?

        df = self.create_test_data_sample()
        extracted_features = extract_features(
            df,
            column_id="id",
            column_sort="sort",
            column_kind="kind",
            column_value="val",
            distributor=Distributor,
        )

        self.assertIsInstance(extracted_features, pd.DataFrame)
        self.assertTrue(np.all(extracted_features.a__maximum == np.array([71, 77])))
        self.assertTrue(
            np.all(extracted_features.a__sum_values == np.array([691, 1017]))
        )
        self.assertTrue(
            np.all(extracted_features.a__abs_energy == np.array([32211, 63167]))
        )
        self.assertTrue(
            np.all(extracted_features.b__sum_values == np.array([757, 695]))
        )
        self.assertTrue(np.all(extracted_features.b__minimum == np.array([3, 1])))
        self.assertTrue(
            np.all(extracted_features.b__abs_energy == np.array([36619, 35483]))
        )
        self.assertTrue(np.all(extracted_features.b__mean == np.array([37.85, 34.75])))
        self.assertTrue(np.all(extracted_features.b__median == np.array([39.5, 28.0])))
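Assuming the tests above live in tsfresh's regular test suite (the file path is not shown in this excerpt), the new test case can be run in isolation with pytest's name filter:

    pytest -k LocalRayDistributorTestCase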
@@ -494,6 +494,84 @@ def close(self):
        self.pool.join()


class RayDistributor(IterableDistributorBaseClass):
    def __init__(
        self,
        address=None,
        rayinit_config={},
        n_workers=1,
        disable_progressbar=False,
        progressbar_title="Feature Extraction",
    ):
        """
        Creates a new RayDistributor instance

        :param address: the ip address and port number of the Ray cluster
        :type address: str
        :param rayinit_config: additional configuration passed on to the ray.init call
        :type rayinit_config: dict
        :param n_workers: how many Ray workers the calculation should be distributed over
        :type n_workers: int
        :param disable_progressbar: whether to show a progressbar or not.
        :type disable_progressbar: bool
        :param progressbar_title: the title of the progressbar
        :type progressbar_title: basestring
        """
        import ray

        ray.init(address=address, **rayinit_config)
        self.n_workers = n_workers

Review comment: The number of workers is defined by the ray cluster size when starting the cluster and can not be controlled by the user at this point, or? So the user needs to make sure to always pass in the correct number of workers according to the cluster. Can this also be retrieved from ray? We do something similar for dask.

Review comment: If possible, I would recommend the following: as the number of worker property currently is not changing the cluster deployment, I would prefer if it is filled automatically. If this is not possible, we should remove the default value of 1 and maybe rename the parameter to make sure users know they need to set it to the number of cluster workers.

        self.cpu_per_worker = max(
            1, int(ray.available_resources()["CPU"]) // self.n_workers
        )

Review comment: I am not an expert in ray, but this assumes that you have a homogeneous cluster where each machine has the same number of CPUs, or? Why is this setting needed at all? Does ray not use all CPUs of a machine by default (again, not an expert in ray!)

        self.disable_progressbar = disable_progressbar
        self.progressbar_title = progressbar_title
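One possible direction for the worker-count question above, sketched under the assumption that the standard Ray APIs ``ray.nodes()`` and ``ray.cluster_resources()`` are available (this is not part of the diff): the worker count could be derived from the connected Ray runtime instead of relying on a user-supplied default.

    import ray

    ray.init(address=None)  # or ray.init(address="ray://...") for a remote cluster

    # One "worker" per alive node in the cluster:
    n_workers = sum(1 for node in ray.nodes() if node["Alive"])

    # Alternatively, size by the total CPU count Ray reports for the cluster:
    total_cpus = int(ray.cluster_resources().get("CPU", 1))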
    def calculate_best_chunk_size(self, data_length):
        """
        Uses the number of ray workers in the cluster (during execution time, meaning when you start the extraction)
        to find the optimal chunk_size.

        :param data_length: A length which defines how many calculations there need to be.
        :type data_length: int
        """
        chunk_size, extra = divmod(data_length, self.n_workers * 5)
        if extra:
            chunk_size += 1
        return chunk_size
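A quick worked check of the chunk sizing above (illustration only, with made-up numbers):

    # With data_length=100 and n_workers=3:
    chunk_size, extra = divmod(100, 3 * 5)   # -> (6, 10)
    # extra is non-zero, so chunk_size becomes 7,
    # i.e. 100 calculations split into 15 chunks of at most 7 items -- roughly n_workers * 5 tasks.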
    def distribute(self, func, partitioned_chunks, kwargs):
        """
        Create a remote function in ray and calculate the features in a parallel fashion
        by distributing the data chunks to the remote function.

        :param func: the function to send to each worker.
        :type func: callable
        :param partitioned_chunks: The list of data chunks - each element is again
            a list of chunks - and should be processed by one worker.
        :type partitioned_chunks: iterable
        :param kwargs: parameters for the map function
        :type kwargs: dict of string to parameter

        :return: The result of the calculation as a generator -
            each item should be the result of the application of func to a single element.
        """
        import ray

        remote_func = ray.remote(func).options(num_cpus=self.cpu_per_worker)
        results = [remote_func.remote(chunk, **kwargs) for chunk in partitioned_chunks]

Review comment: Just out of curiosity, how will ray distribute the work on the cluster? How will it do the scheduling decision? (just for my own education)

        for result in results:
            yield ray.get(result)
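A small sketch (an assumption, not what this PR implements), using Ray's documented ``ray.wait`` API: results could also be yielded in completion order rather than submission order, which can reduce the impact of straggler chunks.

    import ray

    def iter_results_as_completed(object_refs):
        # Yield each chunk's result as soon as it finishes, regardless of submission order.
        pending = list(object_refs)
        while pending:
            ready, pending = ray.wait(pending, num_returns=1)
            yield ray.get(ready[0])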
    def close(self):
        """
        Disconnect from the Ray runtime and terminate the processes.
        """
        import ray

        ray.shutdown()


class ApplyDistributor(DistributorBaseClass):
    def __init__(self, meta):
        self.meta = meta

Review comment: @TheaperDeng - I have updated the pre-commit file on the main branch to use the newest versions. Can you please merge in the newest changes and resolve the merge conflicts?