dask_ml/model_selection/_incremental.py (20 changes: 14 additions & 6 deletions)
@@ -203,12 +203,13 @@ async def _fit(
     y_train = sorted(futures_of(y_train), key=lambda f: f.key)
     assert len(X_train) == len(y_train)

-    train_eg = await client.gather(client.map(len, y_train))
-    msg = "[CV%s] For training there are between %d and %d examples in each chunk"
-    logger.info(msg, prefix, min(train_eg), max(train_eg))
+    train_eg = await client.gather(client.map(len, X_train))

-    # Order by which we process training data futures
-    order = []
+    min_samples = min(train_eg) if len(train_eg) else len(X_train)
+    max_samples = max(train_eg) if len(train_eg) else len(X_train)
+
+    msg = "[CV%s] For training there are between %d and %d examples in each chunk"
+    logger.info(msg, prefix, min_samples, max_samples)

     def get_futures(partial_fit_calls):
         """Policy to get training data futures
@@ -218,6 +219,9 @@ def get_futures(partial_fit_calls):
         This function handles that policy internally, and also controls random
         access to training data.
         """
+        if dask.is_dask_collection(X_train):
+            return X_train, y_train
+
         # Shuffle blocks going forward to get uniform-but-random access
         while partial_fit_calls >= len(order):
             L = list(range(len(X_train)))
@@ -226,6 +230,9 @@ def get_futures(partial_fit_calls):
         j = order[partial_fit_calls]
         return X_train[j], y_train[j]

+    # Order by which we process training data futures
+    order = []
+
     # Submit initial partial_fit and score computations on first batch of data
     X_future, y_future = get_futures(0)
     X_future_2, y_future_2 = get_futures(1)
@@ -566,7 +573,8 @@ def _get_train_test_split(self, X, y, **kwargs):
         X, y : dask.array.Array
         """
         if self.test_size is None:
-            test_size = min(0.2, 1 / X.npartitions)
+            npartitions = getattr(X, 'npartitions', 1)
+            test_size = min(0.2, 1 / npartitions)
         else:
             test_size = self.test_size
         X_train, X_test, y_train, y_test = train_test_split(
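The change above hinges on two small runtime checks. A minimal sketch of how they behave; the objects below are illustrative and not taken from the PR itself:

```python
# Minimal sketch of the two checks used in the diff above; the objects
# here are illustrative, not part of the PR.
import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd

pdf = pd.DataFrame({"a": np.arange(8)})    # plain in-memory pandas object
ddf = dd.from_pandas(pdf, npartitions=2)   # lazy dask collection

# dask.is_dask_collection is True only for lazy dask objects, which is
# how get_futures can short-circuit and hand back the whole collection.
print(dask.is_dask_collection(ddf))  # True
print(dask.is_dask_collection(pdf))  # False

# The getattr fallback treats anything without .npartitions (such as a
# pandas DataFrame) as a single partition when picking test_size.
print(getattr(ddf, "npartitions", 1))  # 2
print(getattr(pdf, "npartitions", 1))  # 1
```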
tests/model_selection/test_hyperband.py (17 changes: 17 additions & 0 deletions)
@@ -14,6 +14,7 @@
     loop,
 )
 from sklearn.linear_model import SGDClassifier
+from sklearn.datasets import make_classification as sk_make_classification

 from dask_ml._compat import DISTRIBUTED_2_5_0
 from dask_ml.datasets import make_classification
@@ -478,3 +479,19 @@ async def test_dataframe_inputs(c, s, a, b):
     params = {"value": scipy.stats.uniform(0, 1)}
     alg = HyperbandSearchCV(model, params, max_iter=9, random_state=42)
     await alg.fit(X, y)
+
+
+@gen_cluster(client=True)
+def test_pandas(c, s, a, b):
+
+    X, y = sk_make_classification()
+    X, y = pd.DataFrame(X), pd.Series(y)
+
+    est = SGDClassifier(tol=1e-3)
+    param_dist = {'alpha': np.logspace(-4, 0, num=1000),
+                  'loss': ['hinge', 'log', 'modified_huber', 'squared_hinge'],
+                  'average': [True, False]}
+
+    search = HyperbandSearchCV(est, param_dist)
+    yield search.fit(X, y, classes=y.unique())
+    assert search.best_params_
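For reference, a standalone sketch of the workflow this new test exercises, run outside the `gen_cluster` harness. The in-process `Client` and the reduced parameter grid are assumptions for illustration, not part of the PR:

```python
# Standalone sketch mirroring test_pandas above; the in-process Client
# and the smaller parameter grid are illustrative assumptions.
import numpy as np
import pandas as pd
from distributed import Client
from sklearn.datasets import make_classification
from sklearn.linear_model import SGDClassifier

from dask_ml.model_selection import HyperbandSearchCV

if __name__ == "__main__":
    client = Client(processes=False)  # local scheduler for the sketch

    X, y = make_classification()
    X, y = pd.DataFrame(X), pd.Series(y)

    search = HyperbandSearchCV(
        SGDClassifier(tol=1e-3),
        {"alpha": np.logspace(-4, 0, num=10)},
    )
    search.fit(X, y, classes=y.unique())  # pandas inputs, as in the test
    print(search.best_params_)
```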