aboutsummaryrefslogtreecommitdiff
path: root/docs/mllib-clustering.md
diff options
context:
space:
mode:
Diffstat (limited to 'docs/mllib-clustering.md')
-rw-r--r--docs/mllib-clustering.md48
1 files changed, 44 insertions, 4 deletions
diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md
index 1b088969dd..dcaa3784be 100644
--- a/docs/mllib-clustering.md
+++ b/docs/mllib-clustering.md
@@ -593,6 +593,50 @@ ssc.start()
ssc.awaitTermination()
{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+First we import the neccessary classes.
+
+{% highlight python %}
+from pyspark.mllib.linalg import Vectors
+from pyspark.mllib.regression import LabeledPoint
+from pyspark.mllib.clustering import StreamingKMeans
+{% endhighlight %}
+
+Then we make an input stream of vectors for training, as well as a stream of labeled data
+points for testing. We assume a StreamingContext `ssc` has been created, see
+[Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info.
+
+{% highlight python %}
+def parse(lp):
+ label = float(lp[lp.find('(') + 1: lp.find(',')])
+ vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
+ return LabeledPoint(label, vec)
+
+trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
+testData = ssc.textFileStream("/testing/data/dir").map(parse)
+{% endhighlight %}
+
+We create a model with random clusters and specify the number of clusters to find
+
+{% highlight python %}
+model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
+{% endhighlight %}
+
+Now register the streams for training and testing and start the job, printing
+the predicted cluster assignments on new data points as they arrive.
+
+{% highlight python %}
+model.trainOn(trainingData)
+print(model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))))
+
+ssc.start()
+ssc.awaitTermination()
+{% endhighlight %}
+</div>
+
+</div>
As you add new text files with data the cluster centers will update. Each training
point should be formatted as `[x1, x2, x3]`, and each test data point
@@ -600,7 +644,3 @@ should be formatted as `(y, [x1, x2, x3])`, where `y` is some useful label or id
(e.g. a true category assignment). Anytime a text file is placed in `/training/data/dir`
the model will update. Anytime a text file is placed in `/testing/data/dir`
you will see predictions. With new data, the cluster centers will change!
-
-</div>
-
-</div>