diff options
author | MechCoder <manojkumarsivaraj334@gmail.com> | 2015-06-19 12:23:15 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2015-06-19 12:23:15 -0700 |
commit | 54976e55e36465108b71b40b8a431be9d6d703ce (patch) | |
tree | cacad3ffa4f48e89ce575684272456e7b3931937 /docs/mllib-clustering.md | |
parent | e41e2fd6c61076f870de03b85c5da6c12b8da038 (diff) | |
download | spark-54976e55e36465108b71b40b8a431be9d6d703ce.tar.gz spark-54976e55e36465108b71b40b8a431be9d6d703ce.tar.bz2 spark-54976e55e36465108b71b40b8a431be9d6d703ce.zip |
[SPARK-4118] [MLLIB] [PYSPARK] Python bindings for StreamingKMeans
Python bindings for StreamingKMeans
Will change status to MRG once docs, tests and examples are updated.
Author: MechCoder <manojkumarsivaraj334@gmail.com>
Closes #6499 from MechCoder/spark-4118 and squashes the following commits:
7722d16 [MechCoder] minor style fixes
51052d3 [MechCoder] Doc fixes
2061a76 [MechCoder] Add tests for simultaneous training and prediction Minor style fixes
81482fd [MechCoder] minor
5d9fe61 [MechCoder] predictOn should take into account the latest model
8ab9e89 [MechCoder] Fix Python3 error
a9817df [MechCoder] Better tests and minor fixes
c80e451 [MechCoder] Add ignore_unicode_prefix
ee8ce16 [MechCoder] Update tests, doc and examples
4b1481f [MechCoder] Some changes and tests
d8b066a [MechCoder] [SPARK-4118] [MLlib] [PySpark] Python bindings for StreamingKMeans
Diffstat (limited to 'docs/mllib-clustering.md')
-rw-r--r-- | docs/mllib-clustering.md | 48 |
1 files changed, 44 insertions, 4 deletions
diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index 1b088969dd..dcaa3784be 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -593,6 +593,50 @@ ssc.start() ssc.awaitTermination() {% endhighlight %} +</div> + +<div data-lang="python" markdown="1"> +First we import the neccessary classes. + +{% highlight python %} +from pyspark.mllib.linalg import Vectors +from pyspark.mllib.regression import LabeledPoint +from pyspark.mllib.clustering import StreamingKMeans +{% endhighlight %} + +Then we make an input stream of vectors for training, as well as a stream of labeled data +points for testing. We assume a StreamingContext `ssc` has been created, see +[Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info. + +{% highlight python %} +def parse(lp): + label = float(lp[lp.find('(') + 1: lp.find(',')]) + vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(',')) + return LabeledPoint(label, vec) + +trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse) +testData = ssc.textFileStream("/testing/data/dir").map(parse) +{% endhighlight %} + +We create a model with random clusters and specify the number of clusters to find + +{% highlight python %} +model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0) +{% endhighlight %} + +Now register the streams for training and testing and start the job, printing +the predicted cluster assignments on new data points as they arrive. + +{% highlight python %} +model.trainOn(trainingData) +print(model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features)))) + +ssc.start() +ssc.awaitTermination() +{% endhighlight %} +</div> + +</div> As you add new text files with data the cluster centers will update. Each training point should be formatted as `[x1, x2, x3]`, and each test data point @@ -600,7 +644,3 @@ should be formatted as `(y, [x1, x2, x3])`, where `y` is some useful label or id (e.g. a true category assignment). Anytime a text file is placed in `/training/data/dir` the model will update. Anytime a text file is placed in `/testing/data/dir` you will see predictions. With new data, the cluster centers will change! - -</div> - -</div> |