diff --git a/dask_ml/model_selection/_incremental.py b/dask_ml/model_selection/_incremental.py index 4697cbec6..4056f35ec 100644 --- a/dask_ml/model_selection/_incremental.py +++ b/dask_ml/model_selection/_incremental.py @@ -203,12 +203,13 @@ async def _fit( y_train = sorted(futures_of(y_train), key=lambda f: f.key) assert len(X_train) == len(y_train) - train_eg = await client.gather(client.map(len, y_train)) - msg = "[CV%s] For training there are between %d and %d examples in each chunk" - logger.info(msg, prefix, min(train_eg), max(train_eg)) + train_eg = await client.gather(client.map(len, X_train)) - # Order by which we process training data futures - order = [] + min_samples = min(train_eg) if len(train_eg) else len(X_train) + max_samples = max(train_eg) if len(train_eg) else len(X_train) + + msg = "[CV%s] For training there are between %d and %d examples in each chunk" + logger.info(msg, prefix, min_samples, max_samples) def get_futures(partial_fit_calls): """Policy to get training data futures @@ -218,6 +219,9 @@ def get_futures(partial_fit_calls): This function handles that policy internally, and also controls random access to training data. """ + if dask.is_dask_collection(X_train): + return X_train, y_train + # Shuffle blocks going forward to get uniform-but-random access while partial_fit_calls >= len(order): L = list(range(len(X_train))) @@ -226,6 +230,9 @@ def get_futures(partial_fit_calls): j = order[partial_fit_calls] return X_train[j], y_train[j] + # Order by which we process training data futures + order = [] + # Submit initial partial_fit and score computations on first batch of data X_future, y_future = get_futures(0) X_future_2, y_future_2 = get_futures(1) @@ -566,7 +573,8 @@ def _get_train_test_split(self, X, y, **kwargs): X, y : dask.array.Array """ if self.test_size is None: - test_size = min(0.2, 1 / X.npartitions) + npartitions = getattr(X, 'npartitions', 1) + test_size = min(0.2, 1 / npartitions) else: test_size = self.test_size X_train, X_test, y_train, y_test = train_test_split( diff --git a/tests/model_selection/test_hyperband.py b/tests/model_selection/test_hyperband.py index 9bcf131aa..6e31fddc4 100644 --- a/tests/model_selection/test_hyperband.py +++ b/tests/model_selection/test_hyperband.py @@ -14,6 +14,7 @@ loop, ) from sklearn.linear_model import SGDClassifier +from sklearn.datasets import make_classification as sk_make_classification from dask_ml._compat import DISTRIBUTED_2_5_0 from dask_ml.datasets import make_classification @@ -478,3 +479,19 @@ async def test_dataframe_inputs(c, s, a, b): params = {"value": scipy.stats.uniform(0, 1)} alg = HyperbandSearchCV(model, params, max_iter=9, random_state=42) await alg.fit(X, y) + + +@gen_cluster(client=True) +def test_pandas(c, s, a, b): + + X, y = sk_make_classification() + X, y = pd.DataFrame(X), pd.Series(y) + + est = SGDClassifier(tol=1e-3) + param_dist = {'alpha': np.logspace(-4, 0, num=1000), + 'loss': ['hinge', 'log', 'modified_huber', 'squared_hinge'], + 'average': [True, False]} + + search = HyperbandSearchCV(est, param_dist) + yield search.fit(X, y, classes=y.unique()) + assert search.best_params_