CountVectorizer example #160

Open · wants to merge 7 commits into base: main
2 changes: 1 addition & 1 deletion binder/environment.yml
@@ -5,7 +5,7 @@ dependencies:
- bokeh=2.1.1
- dask=2.20.0
- dask-image=0.2.0
- dask-ml=1.5.0
- dask-ml=1.6.0
- dask-labextension=2.0.2
- jupyterlab=2.1
- nodejs=14
211 changes: 168 additions & 43 deletions machine-learning/text-vectorization.ipynb
@@ -4,16 +4,9 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"# Text Vectorization Pipeline\n",
"# Working with Text Data\n",
"\n",
"This example illustrates how Dask-ML can be used to classify large textual datasets in parallel.\n",
"It is adapted from [this scikit-learn example](https://scikit-learn.org/stable/auto_examples/applications/plot_out_of_core_classification.html#sphx-glr-auto-examples-applications-plot-out-of-core-classification-py).\n",
"\n",
"The primary differences are that\n",
"\n",
"* We fit the entire model, including text vectorization, as a pipeline.\n",
"* We use dask collections like [Dask Bag](https://docs.dask.org/en/latest/bag.html), [Dask Dataframe](https://docs.dask.org/en/latest/dataframe.html), and [Dask Array](https://docs.dask.org/en/latest/array.html)\n",
" rather than generators to work with larger than memory datasets."
"Dask-ML includes several ways to [process text data](https://ml.dask.org/modules/api.html#dask-ml-feature-extraction-text-feature-extraction). Typically these work with the [`Dask DataFrame`](https://docs.dask.org/en/latest/dataframe.html) or [`Bag`](https://docs.dask.org/en/latest/bag.html) collections, which can reference larger-than-memory datasets stored on disk or in distributed memory on a Dask Cluster."
]
},
{
@@ -22,19 +15,17 @@
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import Client, progress\n",
"from dask.distributed import Client\n",
"\n",
"client = Client(n_workers=2, threads_per_worker=2, memory_limit='2GB')\n",
"client = Client(n_workers=4, threads_per_worker=1, memory_limit='2GB')\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Fetch the data\n",
"\n",
"Scikit-Learn provides a utility to fetch the newsgroups dataset."
"In this example, we'll work with the 20 newsgroups dataset from scikit-learn. Each element in the dataset has a bit of metadata and the full text of a post."
]
},
{
@@ -45,20 +36,15 @@
"source": [
"import sklearn.datasets\n",
"\n",
"bunch = sklearn.datasets.fetch_20newsgroups()"
"news = sklearn.datasets.fetch_20newsgroups()\n",
"print(news.data[0][:500])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The data from scikit-learn isn't *too* large, so the data is just\n",
"returned in memory. Each document is a string. The target we're predicting\n",
"is an integer, which codes the topic of the post.\n",
"\n",
"We'll load the documents and targets directly into a dask DataFrame.\n",
"In practice, on a larger than memory dataset, you would likely load the\n",
"documents from disk or cloud storage using `dask.bag` or `dask.delayed`."
"This returns a list of documents (strings). We'll load the datset using `dask.bag.from_sequence`, but in practice you would want to load the data on the workers. See https://docs.dask.org/en/latest/best-practices.html#load-data-with-dask and https://docs.dask.org/en/latest/delayed-best-practices.html#don-t-call-dask-delayed-on-other-dask-collections."
]
},
{
@@ -67,20 +53,45 @@
"metadata": {},
"outputs": [],
"source": [
"import dask.dataframe as dd\n",
"import pandas as pd\n",
"import dask\n",
"import numpy as np\n",
"import dask.bag as db\n",
"\n",
"df = dd.from_pandas(pd.DataFrame({\"text\": bunch.data, \"target\": bunch.target}),\n",
" npartitions=25)\n",
"documents = db.from_sequence(news['data'], npartitions=10).persist()\n",
"documents"
]
},
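As noted above, for a genuinely large corpus you would load the documents on the workers rather than building the bag on the client. A minimal sketch using `dask.bag.read_text`, assuming newline-delimited documents stored as text files at a hypothetical path:

```python
import dask.bag as db

# Hypothetical layout: one document per line, stored as text files on S3.
# read_text builds the bag lazily, so each worker reads its own partitions.
documents = db.read_text("s3://my-bucket/newsgroups/*.txt", blocksize="16MiB")

# Optionally keep the raw text in distributed memory for reuse.
documents = documents.persist()
```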
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Feature Extraction\n",
"\n",
"df"
"Dask-ML's feature extractors turn Bags or DataFrames of raw documents (strings) into Dask Arrays backed by scipy.sparse matrices.\n",
"\n",
"If the limitations of `HashingVectorizer` (no inverse transform, no IDF weighting) are acceptable, then we strongly recommend using it over something like `CountVectorizer`. `HashingVectorizer` is completely stateless and so is much easier (and faster) to use in a distributed setting.\n",
"\n",
"Note that becuase `HashingVectorizer` is stateless, the calls to `fit` and `transform` are nearly instant."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask_ml.feature_extraction\n",
"\n",
"hashing_vectorizer = dask_ml.feature_extraction.text.HashingVectorizer()\n",
"%time hashing_vectorizer.fit(documents)\n",
"%time transformed = hashing_vectorizer.transform(documents)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Each row in the `text` column has a bit of metadata and the full text of a post."
"It's only when you `.compute()` the result that we load data and do the transformation."
]
},
{
@@ -89,23 +100,58 @@
"metadata": {},
"outputs": [],
"source": [
"print(df.head().loc[0, 'text'][:500])"
"%time transformed.compute()"
]
},
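The transformed output is a Dask Array with one `scipy.sparse` matrix per partition. To peek at the result without materializing everything, you can compute a single block (a short sketch; `transformed` is the lazy result from the cell above):

```python
# Each block of the Dask Array is a scipy.sparse matrix: one row per document,
# one column per hash bucket. Computing one block avoids loading everything.
first_block = transformed.blocks[0].compute()
print(type(first_block), first_block.shape)
```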
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Feature Hashing"
"`CountVectorizer` is not stateless unless you provide a `vocabulary` ahead of time. When no vocabulary is provided, `CountVectorizer.fit` or `CountVectorizer.fit_transform` will need to load data to discover the unique set of terms in the documents."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"vectorizer = dask_ml.feature_extraction.text.CountVectorizer()\n",
"%time result = vectorizer.fit_transform(documents)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now `.fit_transform` (and `fit` and `transform`) is much more expensive since all the documents must be loaded to determine the `vocabulary`.\n",
"\n",
"Thee result is again a Dask `Array` backed by `scipy.sparse.csr_matrix` objects. We can bring it back to the client with `.compute()`"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%time result.compute()"
]
},
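After fitting, the discovered vocabulary is available on the estimator, mirroring scikit-learn's `CountVectorizer` (the later cell relies on this via `vectorizer.vocabulary_`). Checking its size is a quick way to see how large the fitted state can get; a short sketch using the `vectorizer` fitted above:

```python
# vocabulary_ maps each term to a column index in the document-term matrix.
print(len(vectorizer.vocabulary_))        # number of distinct terms
print(list(vectorizer.vocabulary_)[:10])  # a few example terms
```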
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Notice that we persisted `documents` earlier. If possible, persisting the input documents is preferable to avoid making two passes over the data. One to discover the vocabulary and a second to transform. If the dataset is larger than (distributed) memory, then two passes will be necessary."
]
},
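To make that trade-off concrete, here is a hedged sketch contrasting the two situations. The path is hypothetical; the only point is where the raw text lives when `fit_transform` makes its two passes:

```python
import dask.bag as db
import dask_ml.feature_extraction.text

# Fits in distributed memory: persist once, then both passes
# (vocabulary discovery and transformation) reuse the cached partitions.
docs = db.read_text("s3://my-bucket/newsgroups/*.txt").persist()  # hypothetical path
X = dask_ml.feature_extraction.text.CountVectorizer().fit_transform(docs)

# Larger than memory: leave the bag lazy and accept that the raw text
# is read from storage twice, once per pass.
docs_lazy = db.read_text("s3://my-bucket/newsgroups/*.txt")
X_lazy = dask_ml.feature_extraction.text.CountVectorizer().fit_transform(docs_lazy)
```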
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Dask's [HashingVectorizer](https://ml.dask.org/modules/generated/dask_ml.feature_extraction.text.HashingVectorizer.html#dask_ml.feature_extraction.text.HashingVectorizer) provides a similar API to [scikit-learn's implementation](https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.HashingVectorizer.html). In fact, Dask-ML's implementation uses scikit-learn's, applying it to each partition of the input `dask.dataframe.Series` or `dask.bag.Bag`.\n",
"## A note on vocabularies\n",
"\n",
"Transformation, once we actually compute the result, happens in parallel and returns a dask Array."
"You can also provide a vocabulary ahead of time, which avoids the need for making two passes over the data. This makes operations like `vectorizer.transform` instantaneous, since no vocabulary needs to be discovered. However, vocabularies can become quite large. Consider persisting your data ahead of time to avoid bloating the size of the `CountVectorizer` object. Dask-ML's `CountVectorizer` works just fine when the `vocabulary` is a pointer to a piece of data on the cluster."
]
},
{
@@ -114,20 +160,61 @@
"metadata": {},
"outputs": [],
"source": [
"import dask_ml.feature_extraction.text\n",
"# reuse the vocabulary from the previously fitted estimator.\n",
"# In practice this would come from an external source.\n",
"vocabulary = vectorizer.vocabulary_\n",
"remote_vocabulary, = client.scatter([vocabulary], broadcast=True)\n",
"\n",
"vect = dask_ml.feature_extraction.text.HashingVectorizer()\n",
"X = vect.fit_transform(df['text'])\n",
"X"
"vectorizer2 = dask_ml.feature_extraction.text.CountVectorizer(\n",
" vocabulary=remote_vocabulary\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"`CountVectorizer.transform` doesn't need to do any real work now, so it's fast."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%time result = vectorizer2.transform(documents)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%time result.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The output array `X` has unknown chunk sizes becase the input dask Series or Bags don't know their own length.\n",
"See https://scikit-learn.org/stable/modules/feature_extraction.html#vectorizing-a-large-text-corpus-with-the-hashing-trick for more on problems with large vocabularies, which recommends \"feature hashing\" as a possible solution.\n",
"\n",
"## Feature Hashing\n",
"\n",
"Each block in `X` is a `scipy.sparse` matrix."
"Feature hashing transforms a DataFrame or Bag of inputs (mappings or strings) to a sparse array. It is completely stateless, and so doesn't suffer from the same issues as `CountVectorizer`. See https://scikit-learn.org/stable/modules/feature_extraction.html#feature-hashing for more."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"hasher = dask_ml.feature_extraction.text.FeatureHasher(input_type=\"string\")\n",
"result = hasher.transform(documents)\n",
"result"
]
},
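The cell above hashes raw strings. `FeatureHasher` can also consume mappings of feature names to values; a hedged sketch, assuming the Dask-ML wrapper accepts the same `input_type="dict"` mode as scikit-learn's `FeatureHasher` and that the input dicts shown here are purely illustrative:

```python
import dask.bag as db
import dask_ml.feature_extraction.text

# A tiny bag of feature dictionaries (token -> count), for illustration only.
records = db.from_sequence(
    [{"dog": 1, "cat": 2}, {"dog": 2, "run": 5}],
    npartitions=2,
)

dict_hasher = dask_ml.feature_extraction.text.FeatureHasher(input_type="dict")
hashed = dict_hasher.transform(records)
hashed.compute()
```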
{
@@ -136,14 +223,51 @@
"metadata": {},
"outputs": [],
"source": [
"X.blocks[0].compute()"
"%time result.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This is a document-term matrix. Each row is the hashed representation of the original post."
"## Text Vectorization Pipeline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The rest of this example is adapted from [this scikit-learn example](https://scikit-learn.org/stable/auto_examples/applications/plot_out_of_core_classification.html#sphx-glr-auto-examples-applications-plot-out-of-core-classification-py).\n",
"\n",
"The primary differences are that\n",
"\n",
"* We fit the entire model, including text vectorization, as a pipeline.\n",
"* We use dask collections like [Dask Bag](https://docs.dask.org/en/latest/bag.html), [Dask Dataframe](https://docs.dask.org/en/latest/dataframe.html), and [Dask Array](https://docs.dask.org/en/latest/array.html)\n",
" rather than generators to work with larger than memory datasets."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We'll load the documents and targets directly into a dask DataFrame.\n",
"In practice, on a larger than memory dataset, you would likely load the\n",
"documents from disk or cloud storage using `dask.bag` or `dask.delayed`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask.dataframe as dd\n",
"import pandas as pd\n",
"\n",
"df = dd.from_pandas(pd.DataFrame({\"text\": news.data, \"target\": news.target}),\n",
" npartitions=25)\n",
"\n",
"df"
]
},
{
@@ -164,7 +288,7 @@
"metadata": {},
"outputs": [],
"source": [
"bunch.target_names"
"news.target_names"
]
},
{
@@ -175,7 +299,7 @@
"source": [
"import numpy as np\n",
"\n",
"positive = np.arange(len(bunch.target_names))[['comp' in x for x in bunch.target_names]]\n",
"positive = np.arange(len(news.target_names))[['comp' in x for x in news.target_names]]\n",
"y = df['target'].isin(positive).astype(int)\n",
"y"
]
@@ -210,6 +334,7 @@
"sgd = sklearn.linear_model.SGDClassifier(\n",
" tol=1e-3\n",
")\n",
"vect = dask_ml.feature_extraction.text.HashingVectorizer()\n",
"clf = dask_ml.wrappers.Incremental(\n",
" sgd, scoring='accuracy', assume_equal_chunks=True\n",
")\n",
@@ -293,7 +418,7 @@
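The remainder of the fitting code is collapsed in this diff view. Based on the linked scikit-learn out-of-core example, a minimal sketch of how the vectorizer and the `Incremental`-wrapped classifier might be combined and fit as a single pipeline — the step name and the fit-parameter routing here are assumptions, not code taken from this diff:

```python
import sklearn.linear_model
import sklearn.pipeline
import dask_ml.feature_extraction.text
import dask_ml.wrappers

sgd = sklearn.linear_model.SGDClassifier(tol=1e-3)
vect = dask_ml.feature_extraction.text.HashingVectorizer()
clf = dask_ml.wrappers.Incremental(sgd, scoring="accuracy", assume_equal_chunks=True)

# Vectorization and incremental fitting as one pipeline. For a binary target,
# Incremental needs the full set of classes up front; with make_pipeline the
# wrapped step is named "incremental", so fit parameters are routed to it.
pipe = sklearn.pipeline.make_pipeline(vect, clf)
pipe.fit(df["text"], y, incremental__classes=[0, 1])
```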
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.4"
"version": "3.8.5"
}
},
"nbformat": 4,