diff options
author | freeman <the.freeman.lab@gmail.com> | 2014-10-31 22:30:12 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2014-10-31 22:30:12 -0700 |
commit | 98c556ebbca6a815813daaefd292d2e46fb16cc2 (patch) | |
tree | 99433d41db58d784b1a5bba9b76c777a70494fa3 /docs | |
parent | 8602195510f5821b37746bb7fa24902f43a1bd93 (diff) | |
download | spark-98c556ebbca6a815813daaefd292d2e46fb16cc2.tar.gz spark-98c556ebbca6a815813daaefd292d2e46fb16cc2.tar.bz2 spark-98c556ebbca6a815813daaefd292d2e46fb16cc2.zip |
Streaming KMeans [MLLIB][SPARK-3254]
This adds a Streaming KMeans algorithm to MLlib. It uses an update rule that generalizes the mini-batch KMeans update to incorporate a decay factor, which allows past data to be forgotten. The decay factor can be specified explicitly, or via a more intuitive "fractional decay" setting, in units of either data points or batches.
The PR includes:
- StreamingKMeans algorithm with decay factor settings
- Usage example
- Additions to documentation clustering page
- Unit tests of basic behavior and decay behaviors
tdas mengxr rezazadeh
Author: freeman <the.freeman.lab@gmail.com>
Author: Jeremy Freeman <the.freeman.lab@gmail.com>
Author: Xiangrui Meng <meng@databricks.com>
Closes #2942 from freeman-lab/streaming-kmeans and squashes the following commits:
b2e5b4a [freeman] Fixes to docs / examples
078617c [Jeremy Freeman] Merge pull request #1 from mengxr/SPARK-3254
2e682c0 [Xiangrui Meng] take discount on previous weights; use BLAS; detect dying clusters
0411bf5 [freeman] Change decay parameterization
9f7aea9 [freeman] Style fixes
374a706 [freeman] Formatting
ad9bdc2 [freeman] Use labeled points and predictOnValues in examples
77dbd3f [freeman] Make initialization check an assertion
9cfc301 [freeman] Make random seed an argument
44050a9 [freeman] Simpler constructor
c7050d5 [freeman] Fix spacing
2899623 [freeman] Use pattern matching for clarity
a4a316b [freeman] Use collect
1472ec5 [freeman] Doc formatting
ea22ec8 [freeman] Fix imports
2086bdc [freeman] Log cluster center updates
ea9877c [freeman] More documentation
9facbe3 [freeman] Bug fix
5db7074 [freeman] Example usage for StreamingKMeans
f33684b [freeman] Add explanation and example to docs
b5b5f8d [freeman] Add better documentation
a0fd790 [freeman] Merge remote-tracking branch 'upstream/master' into streaming-kmeans
9fd9c15 [freeman] Merge remote-tracking branch 'upstream/master' into streaming-kmeans
b93350f [freeman] Streaming KMeans with decay
Diffstat (limited to 'docs')
-rw-r--r-- | docs/mllib-clustering.md | 96 |
1 files changed, 95 insertions, 1 deletions
diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index 7978e934fb..c696ae9c8e 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -34,7 +34,7 @@ a given dataset, the algorithm returns the best clustering result). * *initializationSteps* determines the number of steps in the k-means\|\| algorithm. * *epsilon* determines the distance threshold within which we consider k-means to have converged. -## Examples +### Examples <div class="codetabs"> <div data-lang="scala" markdown="1"> @@ -153,3 +153,97 @@ provided in the [Self-Contained Applications](quick-start.html#self-contained-ap section of the Spark Quick Start guide. Be sure to also include *spark-mllib* to your build file as a dependency. + +## Streaming clustering + +When data arrive in a stream, we may want to estimate clusters dynamically, +updating them as new data arrive. MLlib provides support for streaming k-means clustering, +with parameters to control the decay (or "forgetfulness") of the estimates. The algorithm +uses a generalization of the mini-batch k-means update rule. For each batch of data, we assign +all points to their nearest cluster, compute new cluster centers, then update each cluster using: + +`\begin{equation} + c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t} +\end{equation}` +`\begin{equation} + n_{t+1} = n_t + m_t +\end{equation}` + +Where `$c_t$` is the previous center for the cluster, `$n_t$` is the number of points assigned +to the cluster thus far, `$x_t$` is the new cluster center from the current batch, and `$m_t$` +is the number of points added to the cluster in the current batch. The decay factor `$\alpha$` +can be used to ignore the past: with `$\alpha$=1` all data will be used from the beginning; +with `$\alpha$=0` only the most recent data will be used. This is analogous to an +exponentially-weighted moving average. + +The decay can be specified using a `halfLife` parameter, which determines the +correct decay factor `a` such that, for data acquired +at time `t`, its contribution by time `t + halfLife` will have dropped to 0.5. +The unit of time can be specified either as `batches` or `points` and the update rule +will be adjusted accordingly. + +### Examples + +This example shows how to estimate clusters on streaming data. + +<div class="codetabs"> + +<div data-lang="scala" markdown="1"> + +First we import the neccessary classes. + +{% highlight scala %} + +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.clustering.StreamingKMeans + +{% endhighlight %} + +Then we make an input stream of vectors for training, as well as a stream of labeled data +points for testing. We assume a StreamingContext `ssc` has been created, see +[Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info. + +{% highlight scala %} + +val trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse) +val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse) + +{% endhighlight %} + +We create a model with random clusters and specify the number of clusters to find + +{% highlight scala %} + +val numDimensions = 3 +val numClusters = 2 +val model = new StreamingKMeans() + .setK(numClusters) + .setDecayFactor(1.0) + .setRandomCenters(numDimensions, 0.0) + +{% endhighlight %} + +Now register the streams for training and testing and start the job, printing +the predicted cluster assignments on new data points as they arrive. + +{% highlight scala %} + +model.trainOn(trainingData) +model.predictOnValues(testData).print() + +ssc.start() +ssc.awaitTermination() + +{% endhighlight %} + +As you add new text files with data the cluster centers will update. Each training +point should be formatted as `[x1, x2, x3]`, and each test data point +should be formatted as `(y, [x1, x2, x3])`, where `y` is some useful label or identifier +(e.g. a true category assignment). Anytime a text file is placed in `/training/data/dir` +the model will update. Anytime a text file is placed in `/testing/data/dir` +you will see predictions. With new data, the cluster centers will change! + +</div> + +</div> |