path: root/docs/mllib-clustering.md
author     freeman <the.freeman.lab@gmail.com>       2014-10-31 22:30:12 -0700
committer  Xiangrui Meng <meng@databricks.com>       2014-10-31 22:30:12 -0700
commit     98c556ebbca6a815813daaefd292d2e46fb16cc2 (patch)
tree       99433d41db58d784b1a5bba9b76c777a70494fa3 /docs/mllib-clustering.md
parent     8602195510f5821b37746bb7fa24902f43a1bd93 (diff)
Streaming KMeans [MLLIB][SPARK-3254]
This adds a Streaming KMeans algorithm to MLlib. It uses an update rule that generalizes the mini-batch KMeans update to incorporate a decay factor, which allows past data to be forgotten. The decay factor can be specified explicitly, or via a more intuitive "fractional decay" setting, in units of either data points or batches.

The PR includes:
- StreamingKMeans algorithm with decay factor settings
- Usage example
- Additions to documentation clustering page
- Unit tests of basic behavior and decay behaviors

tdas mengxr rezazadeh

Author: freeman <the.freeman.lab@gmail.com>
Author: Jeremy Freeman <the.freeman.lab@gmail.com>
Author: Xiangrui Meng <meng@databricks.com>

Closes #2942 from freeman-lab/streaming-kmeans and squashes the following commits:

b2e5b4a [freeman] Fixes to docs / examples
078617c [Jeremy Freeman] Merge pull request #1 from mengxr/SPARK-3254
2e682c0 [Xiangrui Meng] take discount on previous weights; use BLAS; detect dying clusters
0411bf5 [freeman] Change decay parameterization
9f7aea9 [freeman] Style fixes
374a706 [freeman] Formatting
ad9bdc2 [freeman] Use labeled points and predictOnValues in examples
77dbd3f [freeman] Make initialization check an assertion
9cfc301 [freeman] Make random seed an argument
44050a9 [freeman] Simpler constructor
c7050d5 [freeman] Fix spacing
2899623 [freeman] Use pattern matching for clarity
a4a316b [freeman] Use collect
1472ec5 [freeman] Doc formatting
ea22ec8 [freeman] Fix imports
2086bdc [freeman] Log cluster center updates
ea9877c [freeman] More documentation
9facbe3 [freeman] Bug fix
5db7074 [freeman] Example usage for StreamingKMeans
f33684b [freeman] Add explanation and example to docs
b5b5f8d [freeman] Add better documentation
a0fd790 [freeman] Merge remote-tracking branch 'upstream/master' into streaming-kmeans
9fd9c15 [freeman] Merge remote-tracking branch 'upstream/master' into streaming-kmeans
b93350f [freeman] Streaming KMeans with decay
Diffstat (limited to 'docs/mllib-clustering.md')
-rw-r--r--  docs/mllib-clustering.md  96
1 file changed, 95 insertions(+), 1 deletion(-)
diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md
index 7978e934fb..c696ae9c8e 100644
--- a/docs/mllib-clustering.md
+++ b/docs/mllib-clustering.md
@@ -34,7 +34,7 @@ a given dataset, the algorithm returns the best clustering result).
* *initializationSteps* determines the number of steps in the k-means\|\| algorithm.
* *epsilon* determines the distance threshold within which we consider k-means to have converged.
-## Examples
+### Examples
<div class="codetabs">
<div data-lang="scala" markdown="1">
@@ -153,3 +153,97 @@ provided in the [Self-Contained Applications](quick-start.html#self-contained-ap
section of the Spark
Quick Start guide. Be sure to also include *spark-mllib* to your build file as
a dependency.
+
+## Streaming clustering
+
+When data arrive in a stream, we may want to estimate clusters dynamically,
+updating them as new data arrive. MLlib provides support for streaming k-means clustering,
+with parameters to control the decay (or "forgetfulness") of the estimates. The algorithm
+uses a generalization of the mini-batch k-means update rule. For each batch of data, we assign
+all points to their nearest cluster, compute new cluster centers, then update each cluster using:
+
+`\begin{equation}
+ c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t}
+\end{equation}`
+`\begin{equation}
+ n_{t+1} = n_t + m_t
+\end{equation}`
+
+Where `$c_t$` is the previous center for the cluster, `$n_t$` is the number of points assigned
+to the cluster thus far, `$x_t$` is the new cluster center from the current batch, and `$m_t$`
+is the number of points added to the cluster in the current batch. The decay factor `$\alpha$`
+can be used to ignore the past: with `$\alpha$=1` all data will be used from the beginning;
+with `$\alpha$=0` only the most recent data will be used. This is analogous to an
+exponentially-weighted moving average.
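+
+To make the update concrete, here is a small Scala sketch of the rule above for a single
+cluster (an illustration only, not the MLlib implementation; the names simply mirror the
+symbols in the equations above):
+
+{% highlight scala %}
+
+// One decayed mini-batch update for a single cluster (illustration, not MLlib code).
+// cPrev = c_t, nPrev = n_t, xBatch = x_t (mean of this batch's points for the cluster),
+// mBatch = m_t (number of points assigned in this batch), alpha = decay factor.
+def updateCenter(
+    cPrev: Array[Double], nPrev: Double,
+    xBatch: Array[Double], mBatch: Double,
+    alpha: Double): (Array[Double], Double) = {
+  val denom = nPrev * alpha + mBatch
+  val cNext = cPrev.zip(xBatch).map { case (c, x) =>
+    (c * nPrev * alpha + x * mBatch) / denom
+  }
+  (cNext, nPrev + mBatch)
+}
+
+{% endhighlight %}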
+
+The decay can be specified using a `halfLife` parameter, which determines the
+correct decay factor `$\alpha$` such that, for data acquired
+at time `t`, its contribution by time `t + halfLife` will have dropped to 0.5.
+The unit of time can be specified either as `batches` or `points`, and the update rule
+will be adjusted accordingly.
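+
+Equivalently, since each batch (or each point, depending on the chosen time unit) scales the
+weight of past data by `$\alpha$`, the half life fixes the decay factor through
+
+`\begin{equation}
+  \alpha^{\text{halfLife}} = 0.5 \quad \Longrightarrow \quad \alpha = \exp\left(\frac{\ln(0.5)}{\text{halfLife}}\right)
+\end{equation}`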
+
+### Examples
+
+This example shows how to estimate clusters on streaming data.
+
+<div class="codetabs">
+
+<div data-lang="scala" markdown="1">
+
+First we import the necessary classes.
+
+{% highlight scala %}
+
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.clustering.StreamingKMeans
+
+{% endhighlight %}
+
+Then we make an input stream of vectors for training, as well as a stream of labeled data
+points for testing. We assume a StreamingContext `ssc` has been created; see the
+[Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info.
+
+{% highlight scala %}
+
+// training points are parsed as vectors in "[x1, x2, x3]" text format
+val trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
+// test points are parsed as labeled points in "(y, [x1, x2, x3])" text format
+val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)
+
+{% endhighlight %}
+
+We create a model with random initial clusters and specify the number of clusters to find.
+
+{% highlight scala %}
+
+val numDimensions = 3
+val numClusters = 2
+// start from random centers with zero weight; a decay factor of 1.0 uses all data from the beginning
+val model = new StreamingKMeans()
+  .setK(numClusters)
+  .setDecayFactor(1.0)
+  .setRandomCenters(numDimensions, 0.0)
+
+{% endhighlight %}
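+
+If the half-life parameterization described earlier is preferred, the decay could instead be
+configured from a half life. The sketch below assumes a `setHalfLife(halfLife, timeUnit)`
+setter that takes the time unit as a string; treat the exact name and signature as an
+assumption rather than a confirmed part of the API.
+
+{% highlight scala %}
+
+// Hypothetical alternative: forget half of the past data every 5 batches
+// (setter name and signature assumed; see the half-life discussion above)
+val decayingModel = new StreamingKMeans()
+  .setK(numClusters)
+  .setHalfLife(5.0, "batches")
+  .setRandomCenters(numDimensions, 0.0)
+
+{% endhighlight %}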
+
+Now register the streams for training and testing and start the job, printing
+the predicted cluster assignments on new data points as they arrive.
+
+{% highlight scala %}
+
+// update the model with each batch of training vectors
+model.trainOn(trainingData)
+// print the predicted cluster for each (label, vector) pair in the test stream
+model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
+
+ssc.start()
+ssc.awaitTermination()
+
+{% endhighlight %}
+
+As you add new text files with data, the cluster centers will update. Each training
+point should be formatted as `[x1, x2, x3]` (e.g. `[1.0, 2.0, 3.0]`), and each test data point
+should be formatted as `(y, [x1, x2, x3])` (e.g. `(1.0, [1.0, 2.0, 3.0])`), where `y` is some
+useful label or identifier (e.g. a true category assignment). Any time a text file is placed
+in `/training/data/dir` the model will update. Any time a text file is placed in
+`/testing/data/dir` you will see predictions. With new data, the cluster centers will change!
+
+</div>
+
+</div>