path: root/docs/mllib-clustering.md
author    MechCoder <manojkumarsivaraj334@gmail.com>    2015-06-19 12:23:15 -0700
committer Xiangrui Meng <meng@databricks.com>           2015-06-19 12:23:15 -0700
commit    54976e55e36465108b71b40b8a431be9d6d703ce (patch)
tree      cacad3ffa4f48e89ce575684272456e7b3931937 /docs/mllib-clustering.md
parent    e41e2fd6c61076f870de03b85c5da6c12b8da038 (diff)
[SPARK-4118] [MLLIB] [PYSPARK] Python bindings for StreamingKMeans
Python bindings for StreamingKMeans. Will change status to MRG once docs, tests and examples are updated.

Author: MechCoder <manojkumarsivaraj334@gmail.com>

Closes #6499 from MechCoder/spark-4118 and squashes the following commits:

7722d16 [MechCoder] minor style fixes
51052d3 [MechCoder] Doc fixes
2061a76 [MechCoder] Add tests for simultaneous training and prediction. Minor style fixes
81482fd [MechCoder] minor
5d9fe61 [MechCoder] predictOn should take into account the latest model
8ab9e89 [MechCoder] Fix Python3 error
a9817df [MechCoder] Better tests and minor fixes
c80e451 [MechCoder] Add ignore_unicode_prefix
ee8ce16 [MechCoder] Update tests, doc and examples
4b1481f [MechCoder] Some changes and tests
d8b066a [MechCoder] [SPARK-4118] [MLlib] [PySpark] Python bindings for StreamingKMeans
Diffstat (limited to 'docs/mllib-clustering.md')
-rw-r--r--  docs/mllib-clustering.md  48
1 file changed, 44 insertions(+), 4 deletions(-)
diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md
index 1b088969dd..dcaa3784be 100644
--- a/docs/mllib-clustering.md
+++ b/docs/mllib-clustering.md
@@ -593,6 +593,50 @@ ssc.start()
ssc.awaitTermination()
{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+First we import the necessary classes.
+
+{% highlight python %}
+from pyspark.mllib.linalg import Vectors
+from pyspark.mllib.regression import LabeledPoint
+from pyspark.mllib.clustering import StreamingKMeans
+{% endhighlight %}
+
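+The snippets below assume a StreamingContext `ssc`. A minimal sketch of creating one
+(the local master string, app name, and 1-second batch interval are illustrative
+assumptions, not prescribed values) looks like this:
+
+{% highlight python %}
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+
+# Two local worker threads and a 1-second batch interval (illustrative values).
+sc = SparkContext("local[2]", "StreamingKMeansExample")
+ssc = StreamingContext(sc, 1)
+{% endhighlight %}
+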
+Then we make an input stream of vectors for training, as well as a stream of labeled data
+points for testing. We assume a StreamingContext `ssc` has been created as in the sketch
+above; see the [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info.
+
+{% highlight python %}
+def parse(lp):
+    # Each test line has the form "(y, [x1, x2, x3])": the label sits between
+    # '(' and the first ',', and the vector between '[' and ']'.
+    label = float(lp[lp.find('(') + 1: lp.find(',')])
+    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
+    return LabeledPoint(label, vec)
+
+# Training points are bare vectors ("[x1, x2, x3]"), so Vectors.parse handles them;
+# test points carry a label, so they go through parse above.
+trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
+testData = ssc.textFileStream("/testing/data/dir").map(parse)
+{% endhighlight %}
+
+We create a model with random clusters and specify the number of clusters to find.
+
+{% highlight python %}
+model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
+{% endhighlight %}
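+
+If you already have rough estimates of the centers, you can seed the model with them
+instead of random centers. A sketch, where the centers and weights below are made-up
+illustrative values:
+
+{% highlight python %}
+initCenters = [[0.0, 0.0, 0.0], [1.0, 1.0, 1.0]]  # hypothetical starting centers
+initWeights = [1.0, 1.0]                          # one weight per center
+model = StreamingKMeans(k=2, decayFactor=1.0).setInitialCenters(initCenters, initWeights)
+{% endhighlight %}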
+
+Now we register the streams for training and testing and start the job, printing
+the predicted cluster assignments on new data points as they arrive.
+
+{% highlight python %}
+model.trainOn(trainingData)
+result = model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features)))
+# pprint() prints the first elements of each batch of predictions; a bare print()
+# would only show the DStream object itself, not its contents.
+result.pprint()
+
+ssc.start()
+ssc.awaitTermination()
+{% endhighlight %}
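+
+To inspect the cluster centers as they evolve, you can also query the latest model
+between batches. A minimal sketch (where and how often you call this is up to your
+application; nothing here requires it):
+
+{% highlight python %}
+# latestModel() returns the current StreamingKMeansModel;
+# clusterCenters holds the centers learned so far.
+print(model.latestModel().clusterCenters)
+{% endhighlight %}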
+</div>
+
+</div>
As you add new text files with data, the cluster centers will update. Each training
point should be formatted as `[x1, x2, x3]`, and each test data point
@@ -600,7 +644,3 @@ should be formatted as `(y, [x1, x2, x3])`, where `y` is some useful label or id
(e.g. a true category assignment). Anytime a text file is placed in `/training/data/dir`
the model will update. Anytime a text file is placed in `/testing/data/dir`
you will see predictions. With new data, the cluster centers will change!
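
For instance, given a hypothetical test line (the values below are made up), `parse`
recovers the label and features:

{% highlight python %}
lp = parse("(1.0, [2.0, 3.0, 4.0])")  # -> LabeledPoint(1.0, [2.0, 3.0, 4.0])
print(lp.label, lp.features)
{% endhighlight %}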
-
-</div>
-
-</div>