path: root/docs/mllib-clustering.md
author     Xin Ren <iamshrek@126.com>             2016-03-03 09:32:47 -0800
committer  Xiangrui Meng <meng@databricks.com>    2016-03-03 09:32:47 -0800
commit     70f6f9649bdb13b6745473b7edc4cd06b10f99d2 (patch)
tree       72b9f2de8f67f5917a37f65b2cc805243b413fe2 /docs/mllib-clustering.md
parent     645c3a85e2029928d37ec2de9ef5a2d884620b9b (diff)
[SPARK-13013][DOCS] Replace example code in mllib-clustering.md using include_example
Replace example code in mllib-clustering.md using include_example (https://issues.apache.org/jira/browse/SPARK-13013).

The example code in the user guide is embedded in the markdown and hence is not easy to test. It would be nice to test it automatically. This JIRA is to discuss options to automate example code testing and see what we can do in Spark 1.6. The goal is to move the actual example code to spark/examples and test compilation in Jenkins builds. Then, in the markdown, we can reference part of the code to show in the user guide. This requires adding a Jekyll tag similar to https://github.com/jekyll/jekyll/blob/master/lib/jekyll/tags/include.rb, e.g. called include_example:

`{% include_example scala/org/apache/spark/examples/mllib/KMeansExample.scala %}`

Jekyll will find `examples/src/main/scala/org/apache/spark/examples/mllib/KMeansExample.scala`, pick out the code blocks marked "example", and substitute them for the `{% highlight %}` code blocks in the markdown.

See more sub-tasks in the parent ticket: https://issues.apache.org/jira/browse/SPARK-11337

Author: Xin Ren <iamshrek@126.com>

Closes #11116 from keypointt/SPARK-13013.
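For context, here is a minimal sketch of what such a relocated example file is expected to look like, assembled from the Scala snippet removed in the diff below. It assumes the include_example plugin delimits the region to embed with `// $example on$` / `// $example off$` comment markers; the exact marker syntax, file header, and scaffolding are defined by the plugin and the files under spark/examples, not by this commit.

{% highlight scala %}
// Hypothetical layout of
// examples/src/main/scala/org/apache/spark/examples/mllib/KMeansExample.scala,
// reconstructed from the snippet removed below. The $example on$ / $example off$
// markers (an assumption about the include_example plugin) bound the region that
// gets embedded into the user guide.
package org.apache.spark.examples.mllib

import org.apache.spark.{SparkConf, SparkContext}
// $example on$
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
// $example off$

object KMeansExample {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("KMeansExample")
    val sc = new SparkContext(conf)

    // $example on$
    // Load and parse the data
    val data = sc.textFile("data/mllib/kmeans_data.txt")
    val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()

    // Cluster the data into two classes using KMeans
    val numClusters = 2
    val numIterations = 20
    val clusters = KMeans.train(parsedData, numClusters, numIterations)

    // Evaluate clustering by computing Within Set Sum of Squared Errors
    val WSSSE = clusters.computeCost(parsedData)
    println("Within Set Sum of Squared Errors = " + WSSSE)

    // Save and load the model
    clusters.save(sc, "myModelPath")
    val sameModel = KMeansModel.load(sc, "myModelPath")
    // $example off$

    sc.stop()
  }
}
{% endhighlight %}

With the code living in spark/examples, the markdown side only carries the `{% include_example ... %}` tag shown in the hunks below, so the guide and the compiled example can no longer drift apart.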
Diffstat (limited to 'docs/mllib-clustering.md')
-rw-r--r--  docs/mllib-clustering.md | 460
1 file changed, 16 insertions, 444 deletions
diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md
index 8e724fbf06..44720147be 100644
--- a/docs/mllib-clustering.md
+++ b/docs/mllib-clustering.md
@@ -49,27 +49,7 @@ optimal *k* is usually one where there is an "elbow" in the WSSSE graph.
Refer to the [`KMeans` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.KMeans) and [`KMeansModel` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.KMeansModel) for details on the API.
-{% highlight scala %}
-import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
-import org.apache.spark.mllib.linalg.Vectors
-
-// Load and parse the data
-val data = sc.textFile("data/mllib/kmeans_data.txt")
-val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
-
-// Cluster the data into two classes using KMeans
-val numClusters = 2
-val numIterations = 20
-val clusters = KMeans.train(parsedData, numClusters, numIterations)
-
-// Evaluate clustering by computing Within Set Sum of Squared Errors
-val WSSSE = clusters.computeCost(parsedData)
-println("Within Set Sum of Squared Errors = " + WSSSE)
-
-// Save and load model
-clusters.save(sc, "myModelPath")
-val sameModel = KMeansModel.load(sc, "myModelPath")
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/mllib/KMeansExample.scala %}
</div>
<div data-lang="java" markdown="1">
@@ -81,51 +61,7 @@ that is equivalent to the provided example in Scala is given below:
Refer to the [`KMeans` Java docs](api/java/org/apache/spark/mllib/clustering/KMeans.html) and [`KMeansModel` Java docs](api/java/org/apache/spark/mllib/clustering/KMeansModel.html) for details on the API.
-{% highlight java %}
-import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.mllib.clustering.KMeans;
-import org.apache.spark.mllib.clustering.KMeansModel;
-import org.apache.spark.mllib.linalg.Vector;
-import org.apache.spark.mllib.linalg.Vectors;
-import org.apache.spark.SparkConf;
-
-public class KMeansExample {
- public static void main(String[] args) {
- SparkConf conf = new SparkConf().setAppName("K-means Example");
- JavaSparkContext sc = new JavaSparkContext(conf);
-
- // Load and parse data
- String path = "data/mllib/kmeans_data.txt";
- JavaRDD<String> data = sc.textFile(path);
- JavaRDD<Vector> parsedData = data.map(
- new Function<String, Vector>() {
- public Vector call(String s) {
- String[] sarray = s.split(" ");
- double[] values = new double[sarray.length];
- for (int i = 0; i < sarray.length; i++)
- values[i] = Double.parseDouble(sarray[i]);
- return Vectors.dense(values);
- }
- }
- );
- parsedData.cache();
-
- // Cluster the data into two classes using KMeans
- int numClusters = 2;
- int numIterations = 20;
- KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);
-
- // Evaluate clustering by computing Within Set Sum of Squared Errors
- double WSSSE = clusters.computeCost(parsedData.rdd());
- System.out.println("Within Set Sum of Squared Errors = " + WSSSE);
-
- // Save and load model
- clusters.save(sc.sc(), "myModelPath");
- KMeansModel sameModel = KMeansModel.load(sc.sc(), "myModelPath");
- }
-}
-{% endhighlight %}
+{% include_example java/org/apache/spark/examples/mllib/JavaKMeansExample.java %}
</div>
<div data-lang="python" markdown="1">
@@ -138,27 +74,7 @@ fact the optimal *k* is usually one where there is an "elbow" in the WSSSE graph
Refer to the [`KMeans` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.KMeans) and [`KMeansModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.KMeansModel) for more details on the API.
-{% highlight python %}
-from pyspark.mllib.clustering import KMeans, KMeansModel
-from numpy import array
-from math import sqrt
-
-# Load and parse the data
-data = sc.textFile("data/mllib/kmeans_data.txt")
-parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
-
-# Build the model (cluster the data)
-clusters = KMeans.train(parsedData, 2, maxIterations=10,
- runs=10, initializationMode="random")
-
-# Evaluate clustering by computing Within Set Sum of Squared Errors
-WSSSE = clusters.computeCost(parsedData)
-print("Within Set Sum of Squared Error = " + str(WSSSE))
-
-# Save and load model
-clusters.save(sc, "myModelPath")
-sameModel = KMeansModel.load(sc, "myModelPath")
-{% endhighlight %}
+{% include_example python/mllib/k_means_example.py %}
</div>
</div>
@@ -188,29 +104,7 @@ to the algorithm. We then output the parameters of the mixture model.
Refer to the [`GaussianMixture` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.GaussianMixture) and [`GaussianMixtureModel` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.GaussianMixtureModel) for details on the API.
-{% highlight scala %}
-import org.apache.spark.mllib.clustering.GaussianMixture
-import org.apache.spark.mllib.clustering.GaussianMixtureModel
-import org.apache.spark.mllib.linalg.Vectors
-
-// Load and parse the data
-val data = sc.textFile("data/mllib/gmm_data.txt")
-val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()
-
-// Cluster the data into two classes using GaussianMixture
-val gmm = new GaussianMixture().setK(2).run(parsedData)
-
-// Save and load model
-gmm.save(sc, "myGMMModel")
-val sameModel = GaussianMixtureModel.load(sc, "myGMMModel")
-
-// output parameters of max-likelihood model
-for (i <- 0 until gmm.k) {
- println("weight=%f\nmu=%s\nsigma=\n%s\n" format
- (gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
-}
-
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/mllib/GaussianMixtureExample.scala %}
</div>
<div data-lang="java" markdown="1">
@@ -222,50 +116,7 @@ that is equivalent to the provided example in Scala is given below:
Refer to the [`GaussianMixture` Java docs](api/java/org/apache/spark/mllib/clustering/GaussianMixture.html) and [`GaussianMixtureModel` Java docs](api/java/org/apache/spark/mllib/clustering/GaussianMixtureModel.html) for details on the API.
-{% highlight java %}
-import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.mllib.clustering.GaussianMixture;
-import org.apache.spark.mllib.clustering.GaussianMixtureModel;
-import org.apache.spark.mllib.linalg.Vector;
-import org.apache.spark.mllib.linalg.Vectors;
-import org.apache.spark.SparkConf;
-
-public class GaussianMixtureExample {
- public static void main(String[] args) {
- SparkConf conf = new SparkConf().setAppName("GaussianMixture Example");
- JavaSparkContext sc = new JavaSparkContext(conf);
-
- // Load and parse data
- String path = "data/mllib/gmm_data.txt";
- JavaRDD<String> data = sc.textFile(path);
- JavaRDD<Vector> parsedData = data.map(
- new Function<String, Vector>() {
- public Vector call(String s) {
- String[] sarray = s.trim().split(" ");
- double[] values = new double[sarray.length];
- for (int i = 0; i < sarray.length; i++)
- values[i] = Double.parseDouble(sarray[i]);
- return Vectors.dense(values);
- }
- }
- );
- parsedData.cache();
-
- // Cluster the data into two classes using GaussianMixture
- GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());
-
- // Save and load GaussianMixtureModel
- gmm.save(sc.sc(), "myGMMModel");
- GaussianMixtureModel sameModel = GaussianMixtureModel.load(sc.sc(), "myGMMModel");
- // Output the parameters of the mixture model
- for(int j=0; j<gmm.k(); j++) {
- System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
- gmm.weights()[j], gmm.gaussians()[j].mu(), gmm.gaussians()[j].sigma());
- }
- }
-}
-{% endhighlight %}
+{% include_example java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java %}
</div>
<div data-lang="python" markdown="1">
@@ -276,23 +127,7 @@ to the algorithm. We then output the parameters of the mixture model.
Refer to the [`GaussianMixture` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.GaussianMixture) and [`GaussianMixtureModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.GaussianMixtureModel) for more details on the API.
-{% highlight python %}
-from pyspark.mllib.clustering import GaussianMixture
-from numpy import array
-
-# Load and parse the data
-data = sc.textFile("data/mllib/gmm_data.txt")
-parsedData = data.map(lambda line: array([float(x) for x in line.strip().split(' ')]))
-
-# Build the model (cluster the data)
-gmm = GaussianMixture.train(parsedData, 2)
-
-# output parameters of model
-for i in range(2):
- print ("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu,
- "sigma = ", gmm.gaussians[i].sigma.toArray())
-
-{% endhighlight %}
+{% include_example python/mllib/gaussian_mixture_example.py %}
</div>
</div>
@@ -334,31 +169,7 @@ which contains the computed clustering assignments.
Refer to the [`PowerIterationClustering` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.PowerIterationClustering) and [`PowerIterationClusteringModel` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.PowerIterationClusteringModel) for details on the API.
-{% highlight scala %}
-import org.apache.spark.mllib.clustering.{PowerIterationClustering, PowerIterationClusteringModel}
-import org.apache.spark.mllib.linalg.Vectors
-
-// Load and parse the data
-val data = sc.textFile("data/mllib/pic_data.txt")
-val similarities = data.map { line =>
- val parts = line.split(' ')
- (parts(0).toLong, parts(1).toLong, parts(2).toDouble)
-}
-
-// Cluster the data into two classes using PowerIterationClustering
-val pic = new PowerIterationClustering()
- .setK(2)
- .setMaxIterations(10)
-val model = pic.run(similarities)
-
-model.assignments.foreach { a =>
- println(s"${a.id} -> ${a.cluster}")
-}
-
-// Save and load model
-model.save(sc, "myModelPath")
-val sameModel = PowerIterationClusteringModel.load(sc, "myModelPath")
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala %}
A full example that produces the experiment described in the PIC paper can be found under
[`examples/`](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala).
@@ -377,40 +188,7 @@ which contains the computed clustering assignments.
Refer to the [`PowerIterationClustering` Java docs](api/java/org/apache/spark/mllib/clustering/PowerIterationClustering.html) and [`PowerIterationClusteringModel` Java docs](api/java/org/apache/spark/mllib/clustering/PowerIterationClusteringModel.html) for details on the API.
-{% highlight java %}
-import scala.Tuple2;
-import scala.Tuple3;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.mllib.clustering.PowerIterationClustering;
-import org.apache.spark.mllib.clustering.PowerIterationClusteringModel;
-
-// Load and parse the data
-JavaRDD<String> data = sc.textFile("data/mllib/pic_data.txt");
-JavaRDD<Tuple3<Long, Long, Double>> similarities = data.map(
- new Function<String, Tuple3<Long, Long, Double>>() {
- public Tuple3<Long, Long, Double> call(String line) {
- String[] parts = line.split(" ");
- return new Tuple3<>(new Long(parts[0]), new Long(parts[1]), new Double(parts[2]));
- }
- }
-);
-
-// Cluster the data into two classes using PowerIterationClustering
-PowerIterationClustering pic = new PowerIterationClustering()
- .setK(2)
- .setMaxIterations(10);
-PowerIterationClusteringModel model = pic.run(similarities);
-
-for (PowerIterationClustering.Assignment a: model.assignments().toJavaRDD().collect()) {
- System.out.println(a.id() + " -> " + a.cluster());
-}
-
-// Save and load model
-model.save(sc.sc(), "myModelPath");
-PowerIterationClusteringModel sameModel = PowerIterationClusteringModel.load(sc.sc(), "myModelPath");
-{% endhighlight %}
+{% include_example java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java %}
</div>
<div data-lang="python" markdown="1">
@@ -425,23 +203,7 @@ which contains the computed clustering assignments.
Refer to the [`PowerIterationClustering` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.PowerIterationClustering) and [`PowerIterationClusteringModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.PowerIterationClusteringModel) for more details on the API.
-{% highlight python %}
-from __future__ import print_function
-from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel
-
-# Load and parse the data
-data = sc.textFile("data/mllib/pic_data.txt")
-similarities = data.map(lambda line: tuple([float(x) for x in line.split(' ')]))
-
-# Cluster the data into two classes using PowerIterationClustering
-model = PowerIterationClustering.train(similarities, 2, 10)
-
-model.assignments().foreach(lambda x: print(str(x.id) + " -> " + str(x.cluster)))
-
-# Save and load model
-model.save(sc, "myModelPath")
-sameModel = PowerIterationClusteringModel.load(sc, "myModelPath")
-{% endhighlight %}
+{% include_example python/mllib/power_iteration_clustering_example.py %}
</div>
</div>
@@ -587,129 +349,19 @@ to the algorithm. We then output the topics, represented as probability distribu
<div data-lang="scala" markdown="1">
Refer to the [`LDA` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.LDA) and [`DistributedLDAModel` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.DistributedLDAModel) for details on the API.
-{% highlight scala %}
-import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel}
-import org.apache.spark.mllib.linalg.Vectors
-
-// Load and parse the data
-val data = sc.textFile("data/mllib/sample_lda_data.txt")
-val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
-// Index documents with unique IDs
-val corpus = parsedData.zipWithIndex.map(_.swap).cache()
-
-// Cluster the documents into three topics using LDA
-val ldaModel = new LDA().setK(3).run(corpus)
-
-// Output topics. Each is a distribution over words (matching word count vectors)
-println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):")
-val topics = ldaModel.topicsMatrix
-for (topic <- Range(0, 3)) {
- print("Topic " + topic + ":")
- for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic)); }
- println()
-}
-
-// Save and load model.
-ldaModel.save(sc, "myLDAModel")
-val sameModel = DistributedLDAModel.load(sc, "myLDAModel")
-
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/mllib/LatentDirichletAllocationExample.scala %}
</div>
<div data-lang="java" markdown="1">
Refer to the [`LDA` Java docs](api/java/org/apache/spark/mllib/clustering/LDA.html) and [`DistributedLDAModel` Java docs](api/java/org/apache/spark/mllib/clustering/DistributedLDAModel.html) for details on the API.
-{% highlight java %}
-import scala.Tuple2;
-
-import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.mllib.clustering.DistributedLDAModel;
-import org.apache.spark.mllib.clustering.LDA;
-import org.apache.spark.mllib.linalg.Matrix;
-import org.apache.spark.mllib.linalg.Vector;
-import org.apache.spark.mllib.linalg.Vectors;
-import org.apache.spark.SparkConf;
-
-public class JavaLDAExample {
- public static void main(String[] args) {
- SparkConf conf = new SparkConf().setAppName("LDA Example");
- JavaSparkContext sc = new JavaSparkContext(conf);
-
- // Load and parse the data
- String path = "data/mllib/sample_lda_data.txt";
- JavaRDD<String> data = sc.textFile(path);
- JavaRDD<Vector> parsedData = data.map(
- new Function<String, Vector>() {
- public Vector call(String s) {
- String[] sarray = s.trim().split(" ");
- double[] values = new double[sarray.length];
- for (int i = 0; i < sarray.length; i++)
- values[i] = Double.parseDouble(sarray[i]);
- return Vectors.dense(values);
- }
- }
- );
- // Index documents with unique IDs
- JavaPairRDD<Long, Vector> corpus = JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
- new Function<Tuple2<Vector, Long>, Tuple2<Long, Vector>>() {
- public Tuple2<Long, Vector> call(Tuple2<Vector, Long> doc_id) {
- return doc_id.swap();
- }
- }
- ));
- corpus.cache();
-
- // Cluster the documents into three topics using LDA
- DistributedLDAModel ldaModel = new LDA().setK(3).run(corpus);
-
- // Output topics. Each is a distribution over words (matching word count vectors)
- System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()
- + " words):");
- Matrix topics = ldaModel.topicsMatrix();
- for (int topic = 0; topic < 3; topic++) {
- System.out.print("Topic " + topic + ":");
- for (int word = 0; word < ldaModel.vocabSize(); word++) {
- System.out.print(" " + topics.apply(word, topic));
- }
- System.out.println();
- }
-
- ldaModel.save(sc.sc(), "myLDAModel");
- DistributedLDAModel sameModel = DistributedLDAModel.load(sc.sc(), "myLDAModel");
- }
-}
-{% endhighlight %}
+{% include_example java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java %}
</div>
<div data-lang="python" markdown="1">
Refer to the [`LDA` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.LDA) and [`LDAModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.LDAModel) for more details on the API.
-{% highlight python %}
-from pyspark.mllib.clustering import LDA, LDAModel
-from pyspark.mllib.linalg import Vectors
-
-# Load and parse the data
-data = sc.textFile("data/mllib/sample_lda_data.txt")
-parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
-# Index documents with unique IDs
-corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
-
-# Cluster the documents into three topics using LDA
-ldaModel = LDA.train(corpus, k=3)
-
-# Output topics. Each is a distribution over words (matching word count vectors)
-print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize()) + " words):")
-topics = ldaModel.topicsMatrix()
-for topic in range(3):
- print("Topic " + str(topic) + ":")
- for word in range(0, ldaModel.vocabSize()):
- print(" " + str(topics[word][topic]))
-
-# Save and load model
-model.save(sc, "myModelPath")
-sameModel = LDAModel.load(sc, "myModelPath")
-{% endhighlight %}
+{% include_example python/mllib/latent_dirichlet_allocation_example.py %}
</div>
</div>
@@ -785,96 +437,16 @@ This example shows how to estimate clusters on streaming data.
<div data-lang="scala" markdown="1">
Refer to the [`StreamingKMeans` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.StreamingKMeans) for details on the API.
+Also refer to the [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for details on StreamingContext.
-First we import the necessary classes.
-
-{% highlight scala %}
-
-import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.mllib.clustering.StreamingKMeans
-
-{% endhighlight %}
-
-Then we make an input stream of vectors for training, as well as a stream of labeled data
-points for testing. We assume a StreamingContext `ssc` has been created, see
-[Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info.
-
-{% highlight scala %}
-
-val trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
-val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)
-
-{% endhighlight %}
-
-We create a model with random clusters and specify the number of clusters to find
-
-{% highlight scala %}
-
-val numDimensions = 3
-val numClusters = 2
-val model = new StreamingKMeans()
- .setK(numClusters)
- .setDecayFactor(1.0)
- .setRandomCenters(numDimensions, 0.0)
-
-{% endhighlight %}
-
-Now register the streams for training and testing and start the job, printing
-the predicted cluster assignments on new data points as they arrive.
-
-{% highlight scala %}
-
-model.trainOn(trainingData)
-model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
-
-ssc.start()
-ssc.awaitTermination()
-
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala %}
</div>
<div data-lang="python" markdown="1">
Refer to the [`StreamingKMeans` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.StreamingKMeans) for more details on the API.
+Also refer to the [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for details on StreamingContext.
-First we import the necessary classes.
-
-{% highlight python %}
-from pyspark.mllib.linalg import Vectors
-from pyspark.mllib.regression import LabeledPoint
-from pyspark.mllib.clustering import StreamingKMeans
-{% endhighlight %}
-
-Then we make an input stream of vectors for training, as well as a stream of labeled data
-points for testing. We assume a StreamingContext `ssc` has been created, see
-[Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info.
-
-{% highlight python %}
-def parse(lp):
- label = float(lp[lp.find('(') + 1: lp.find(',')])
- vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
- return LabeledPoint(label, vec)
-
-trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
-testData = ssc.textFileStream("/testing/data/dir").map(parse)
-{% endhighlight %}
-
-We create a model with random clusters and specify the number of clusters to find
-
-{% highlight python %}
-model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
-{% endhighlight %}
-
-Now register the streams for training and testing and start the job, printing
-the predicted cluster assignments on new data points as they arrive.
-
-{% highlight python %}
-model.trainOn(trainingData)
-print(model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))))
-
-ssc.start()
-ssc.awaitTermination()
-{% endhighlight %}
+{% include_example python/mllib/streaming_k_means_example.py %}
</div>
</div>