aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorXin Ren <iamshrek@126.com>2016-03-03 09:32:47 -0800
committerXiangrui Meng <meng@databricks.com>2016-03-03 09:32:47 -0800
commit70f6f9649bdb13b6745473b7edc4cd06b10f99d2 (patch)
tree72b9f2de8f67f5917a37f65b2cc805243b413fe2 /examples
parent645c3a85e2029928d37ec2de9ef5a2d884620b9b (diff)
downloadspark-70f6f9649bdb13b6745473b7edc4cd06b10f99d2.tar.gz
spark-70f6f9649bdb13b6745473b7edc4cd06b10f99d2.tar.bz2
spark-70f6f9649bdb13b6745473b7edc4cd06b10f99d2.zip
[SPARK-13013][DOCS] Replace example code in mllib-clustering.md using include_example
Replace example code in mllib-clustering.md using include_example https://issues.apache.org/jira/browse/SPARK-13013 The example code in the user guide is embedded in the markdown and hence it is not easy to test. It would be nice to automatically test them. This JIRA is to discuss options to automate example code testing and see what we can do in Spark 1.6. Goal is to move actual example code to spark/examples and test compilation in Jenkins builds. Then in the markdown, we can reference part of the code to show in the user guide. This requires adding a Jekyll tag that is similar to https://github.com/jekyll/jekyll/blob/master/lib/jekyll/tags/include.rb, e.g., called include_example. `{% include_example scala/org/apache/spark/examples/mllib/KMeansExample.scala %}` Jekyll will find `examples/src/main/scala/org/apache/spark/examples/mllib/KMeansExample.scala` and pick code blocks marked "example" and replace code block in `{% highlight %}` in the markdown. See more sub-tasks in parent ticket: https://issues.apache.org/jira/browse/SPARK-11337 Author: Xin Ren <iamshrek@126.com> Closes #11116 from keypointt/SPARK-13013.
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java72
-rw-r--r--examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java72
-rw-r--r--examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java93
-rw-r--r--examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java4
-rw-r--r--examples/src/main/python/mllib/gaussian_mixture_example.py51
-rw-r--r--examples/src/main/python/mllib/k_means_example.py55
-rw-r--r--examples/src/main/python/mllib/latent_dirichlet_allocation_example.py54
-rw-r--r--examples/src/main/python/mllib/power_iteration_clustering_example.py44
-rw-r--r--examples/src/main/python/mllib/streaming_k_means_example.py66
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/mllib/GaussianMixtureExample.scala57
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/mllib/KMeansExample.scala56
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/mllib/LatentDirichletAllocationExample.scala62
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala8
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala6
14 files changed, 697 insertions, 3 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java
new file mode 100644
index 0000000000..4d1c64aa3c
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+
+// $example on$
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.mllib.clustering.GaussianMixture;
+import org.apache.spark.mllib.clustering.GaussianMixtureModel;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.Vectors;
+// $example off$
+
+public class JavaGaussianMixtureExample {
+ public static void main(String[] args) {
+
+ SparkConf conf = new SparkConf().setAppName("JavaGaussianMixtureExample");
+ JavaSparkContext jsc = new JavaSparkContext(conf);
+
+ // $example on$
+ // Load and parse data
+ String path = "data/mllib/gmm_data.txt";
+ JavaRDD<String> data = jsc.textFile(path);
+ JavaRDD<Vector> parsedData = data.map(
+ new Function<String, Vector>() {
+ public Vector call(String s) {
+ String[] sarray = s.trim().split(" ");
+ double[] values = new double[sarray.length];
+ for (int i = 0; i < sarray.length; i++)
+ values[i] = Double.parseDouble(sarray[i]);
+ return Vectors.dense(values);
+ }
+ }
+ );
+ parsedData.cache();
+
+ // Cluster the data into two classes using GaussianMixture
+ GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());
+
+ // Save and load GaussianMixtureModel
+ gmm.save(jsc.sc(), "target/org/apache/spark/JavaGaussianMixtureExample/GaussianMixtureModel");
+ GaussianMixtureModel sameModel = GaussianMixtureModel.load(jsc.sc(),
+ "target/org.apache.spark.JavaGaussianMixtureExample/GaussianMixtureModel");
+
+ // Output the parameters of the mixture model
+ for (int j = 0; j < gmm.k(); j++) {
+ System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
+ gmm.weights()[j], gmm.gaussians()[j].mu(), gmm.gaussians()[j].sigma());
+ }
+ // $example off$
+
+ jsc.stop();
+ }
+}
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java
new file mode 100644
index 0000000000..a24606a2e9
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+
+// $example on$
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.mllib.clustering.KMeans;
+import org.apache.spark.mllib.clustering.KMeansModel;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.Vectors;
+// $example off$
+
+public class JavaKMeansExample {
+ public static void main(String[] args) {
+
+ SparkConf conf = new SparkConf().setAppName("JavaKMeansExample");
+ JavaSparkContext jsc = new JavaSparkContext(conf);
+
+ // $example on$
+ // Load and parse data
+ String path = "data/mllib/kmeans_data.txt";
+ JavaRDD<String> data = jsc.textFile(path);
+ JavaRDD<Vector> parsedData = data.map(
+ new Function<String, Vector>() {
+ public Vector call(String s) {
+ String[] sarray = s.split(" ");
+ double[] values = new double[sarray.length];
+ for (int i = 0; i < sarray.length; i++)
+ values[i] = Double.parseDouble(sarray[i]);
+ return Vectors.dense(values);
+ }
+ }
+ );
+ parsedData.cache();
+
+ // Cluster the data into two classes using KMeans
+ int numClusters = 2;
+ int numIterations = 20;
+ KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);
+
+ // Evaluate clustering by computing Within Set Sum of Squared Errors
+ double WSSSE = clusters.computeCost(parsedData.rdd());
+ System.out.println("Within Set Sum of Squared Errors = " + WSSSE);
+
+ // Save and load model
+ clusters.save(jsc.sc(), "target/org/apache/spark/JavaKMeansExample/KMeansModel");
+ KMeansModel sameModel = KMeansModel.load(jsc.sc(),
+ "target/org/apache/spark/JavaKMeansExample/KMeansModel");
+ // $example off$
+
+ jsc.stop();
+ }
+}
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java
new file mode 100644
index 0000000000..4d8b65c544
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+
+// $example on$
+import scala.Tuple2;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.mllib.clustering.DistributedLDAModel;
+import org.apache.spark.mllib.clustering.LDA;
+import org.apache.spark.mllib.clustering.LDAModel;
+import org.apache.spark.mllib.linalg.Matrix;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.Vectors;
+// $example off$
+
+public class JavaLatentDirichletAllocationExample {
+ public static void main(String[] args) {
+
+ SparkConf conf = new SparkConf().setAppName("JavaKLatentDirichletAllocationExample");
+ JavaSparkContext jsc = new JavaSparkContext(conf);
+
+ // $example on$
+ // Load and parse the data
+ String path = "data/mllib/sample_lda_data.txt";
+ JavaRDD<String> data = jsc.textFile(path);
+ JavaRDD<Vector> parsedData = data.map(
+ new Function<String, Vector>() {
+ public Vector call(String s) {
+ String[] sarray = s.trim().split(" ");
+ double[] values = new double[sarray.length];
+ for (int i = 0; i < sarray.length; i++)
+ values[i] = Double.parseDouble(sarray[i]);
+ return Vectors.dense(values);
+ }
+ }
+ );
+ // Index documents with unique IDs
+ JavaPairRDD<Long, Vector> corpus =
+ JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
+ new Function<Tuple2<Vector, Long>, Tuple2<Long, Vector>>() {
+ public Tuple2<Long, Vector> call(Tuple2<Vector, Long> doc_id) {
+ return doc_id.swap();
+ }
+ }
+ )
+ );
+ corpus.cache();
+
+ // Cluster the documents into three topics using LDA
+ LDAModel ldaModel = new LDA().setK(3).run(corpus);
+
+ // Output topics. Each is a distribution over words (matching word count vectors)
+ System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()
+ + " words):");
+ Matrix topics = ldaModel.topicsMatrix();
+ for (int topic = 0; topic < 3; topic++) {
+ System.out.print("Topic " + topic + ":");
+ for (int word = 0; word < ldaModel.vocabSize(); word++) {
+ System.out.print(" " + topics.apply(word, topic));
+ }
+ System.out.println();
+ }
+
+ ldaModel.save(jsc.sc(),
+ "target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
+ DistributedLDAModel sameModel = DistributedLDAModel.load(jsc.sc(),
+ "target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
+ // $example off$
+
+ jsc.stop();
+ }
+}
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java
index 6c6f9768f0..b62fa90c34 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java
@@ -24,8 +24,10 @@ import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+// $example on$
import org.apache.spark.mllib.clustering.PowerIterationClustering;
import org.apache.spark.mllib.clustering.PowerIterationClusteringModel;
+// $example off$
/**
* Java example for graph clustering using power iteration clustering (PIC).
@@ -36,6 +38,7 @@ public class JavaPowerIterationClusteringExample {
JavaSparkContext sc = new JavaSparkContext(sparkConf);
@SuppressWarnings("unchecked")
+ // $example on$
JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Lists.newArrayList(
new Tuple3<Long, Long, Double>(0L, 1L, 0.9),
new Tuple3<Long, Long, Double>(1L, 2L, 0.9),
@@ -51,6 +54,7 @@ public class JavaPowerIterationClusteringExample {
for (PowerIterationClustering.Assignment a: model.assignments().toJavaRDD().collect()) {
System.out.println(a.id() + " -> " + a.cluster());
}
+ // $example off$
sc.stop();
}
diff --git a/examples/src/main/python/mllib/gaussian_mixture_example.py b/examples/src/main/python/mllib/gaussian_mixture_example.py
new file mode 100644
index 0000000000..a60e799d62
--- /dev/null
+++ b/examples/src/main/python/mllib/gaussian_mixture_example.py
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import print_function
+
+# $example on$
+from numpy import array
+# $example off$
+
+from pyspark import SparkContext
+# $example on$
+from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel
+# $example off$
+
+if __name__ == "__main__":
+ sc = SparkContext(appName="GaussianMixtureExample") # SparkContext
+
+ # $example on$
+ # Load and parse the data
+ data = sc.textFile("data/mllib/gmm_data.txt")
+ parsedData = data.map(lambda line: array([float(x) for x in line.strip().split(' ')]))
+
+ # Build the model (cluster the data)
+ gmm = GaussianMixture.train(parsedData, 2)
+
+ # Save and load model
+ gmm.save(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
+ sameModel = GaussianMixtureModel\
+ .load(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
+
+ # output parameters of model
+ for i in range(2):
+ print("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu,
+ "sigma = ", gmm.gaussians[i].sigma.toArray())
+ # $example off$
+
+ sc.stop()
diff --git a/examples/src/main/python/mllib/k_means_example.py b/examples/src/main/python/mllib/k_means_example.py
new file mode 100644
index 0000000000..5c397e62ef
--- /dev/null
+++ b/examples/src/main/python/mllib/k_means_example.py
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import print_function
+
+# $example on$
+from numpy import array
+from math import sqrt
+# $example off$
+
+from pyspark import SparkContext
+# $example on$
+from pyspark.mllib.clustering import KMeans, KMeansModel
+# $example off$
+
+if __name__ == "__main__":
+ sc = SparkContext(appName="KMeansExample") # SparkContext
+
+ # $example on$
+ # Load and parse the data
+ data = sc.textFile("data/mllib/kmeans_data.txt")
+ parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
+
+ # Build the model (cluster the data)
+ clusters = KMeans.train(parsedData, 2, maxIterations=10,
+ runs=10, initializationMode="random")
+
+ # Evaluate clustering by computing Within Set Sum of Squared Errors
+ def error(point):
+ center = clusters.centers[clusters.predict(point)]
+ return sqrt(sum([x**2 for x in (point - center)]))
+
+ WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
+ print("Within Set Sum of Squared Error = " + str(WSSSE))
+
+ # Save and load model
+ clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
+ sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
+ # $example off$
+
+ sc.stop()
diff --git a/examples/src/main/python/mllib/latent_dirichlet_allocation_example.py b/examples/src/main/python/mllib/latent_dirichlet_allocation_example.py
new file mode 100644
index 0000000000..2a1bef5f20
--- /dev/null
+++ b/examples/src/main/python/mllib/latent_dirichlet_allocation_example.py
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import print_function
+
+from pyspark import SparkContext
+# $example on$
+from pyspark.mllib.clustering import LDA, LDAModel
+from pyspark.mllib.linalg import Vectors
+# $example off$
+
+if __name__ == "__main__":
+ sc = SparkContext(appName="LatentDirichletAllocationExample") # SparkContext
+
+ # $example on$
+ # Load and parse the data
+ data = sc.textFile("data/mllib/sample_lda_data.txt")
+ parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
+ # Index documents with unique IDs
+ corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
+
+ # Cluster the documents into three topics using LDA
+ ldaModel = LDA.train(corpus, k=3)
+
+ # Output topics. Each is a distribution over words (matching word count vectors)
+ print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())
+ + " words):")
+ topics = ldaModel.topicsMatrix()
+ for topic in range(3):
+ print("Topic " + str(topic) + ":")
+ for word in range(0, ldaModel.vocabSize()):
+ print(" " + str(topics[word][topic]))
+
+ # Save and load model
+ ldaModel.save(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
+ sameModel = LDAModel\
+ .load(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
+ # $example off$
+
+ sc.stop()
diff --git a/examples/src/main/python/mllib/power_iteration_clustering_example.py b/examples/src/main/python/mllib/power_iteration_clustering_example.py
new file mode 100644
index 0000000000..ca19c0ccb6
--- /dev/null
+++ b/examples/src/main/python/mllib/power_iteration_clustering_example.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import print_function
+
+from pyspark import SparkContext
+# $example on$
+from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel
+# $example off$
+
+if __name__ == "__main__":
+ sc = SparkContext(appName="PowerIterationClusteringExample") # SparkContext
+
+ # $example on$
+ # Load and parse the data
+ data = sc.textFile("data/mllib/pic_data.txt")
+ similarities = data.map(lambda line: tuple([float(x) for x in line.split(' ')]))
+
+ # Cluster the data into two classes using PowerIterationClustering
+ model = PowerIterationClustering.train(similarities, 2, 10)
+
+ model.assignments().foreach(lambda x: print(str(x.id) + " -> " + str(x.cluster)))
+
+ # Save and load model
+ model.save(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
+ sameModel = PowerIterationClusteringModel\
+ .load(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
+ # $example off$
+
+ sc.stop()
diff --git a/examples/src/main/python/mllib/streaming_k_means_example.py b/examples/src/main/python/mllib/streaming_k_means_example.py
new file mode 100644
index 0000000000..e82509ad3f
--- /dev/null
+++ b/examples/src/main/python/mllib/streaming_k_means_example.py
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import print_function
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+# $example on$
+from pyspark.mllib.linalg import Vectors
+from pyspark.mllib.regression import LabeledPoint
+from pyspark.mllib.clustering import StreamingKMeans
+# $example off$
+
+if __name__ == "__main__":
+ sc = SparkContext(appName="StreamingKMeansExample") # SparkContext
+ ssc = StreamingContext(sc, 1)
+
+ # $example on$
+ # we make an input stream of vectors for training,
+ # as well as a stream of vectors for testing
+ def parse(lp):
+ label = float(lp[lp.find('(') + 1: lp.find(')')])
+ vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
+
+ return LabeledPoint(label, vec)
+
+ trainingData = sc.textFile("data/mllib/kmeans_data.txt")\
+ .map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
+
+ testingData = sc.textFile("data/mllib/streaming_kmeans_data_test.txt").map(parse)
+
+ trainingQueue = [trainingData]
+ testingQueue = [testingData]
+
+ trainingStream = ssc.queueStream(trainingQueue)
+ testingStream = ssc.queueStream(testingQueue)
+
+ # We create a model with random clusters and specify the number of clusters to find
+ model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
+
+ # Now register the streams for training and testing and start the job,
+ # printing the predicted cluster assignments on new data points as they arrive.
+ model.trainOn(trainingStream)
+
+ result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features)))
+ result.pprint()
+
+ ssc.start()
+ ssc.stop(stopSparkContext=True, stopGraceFully=True)
+ # $example off$
+
+ print("Final centers: " + str(model.latestModel().centers))
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/GaussianMixtureExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/GaussianMixtureExample.scala
new file mode 100644
index 0000000000..b1b3a79d87
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/GaussianMixtureExample.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.mllib
+
+import org.apache.spark.{SparkConf, SparkContext}
+// $example on$
+import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel}
+import org.apache.spark.mllib.linalg.Vectors
+// $example off$
+
+object GaussianMixtureExample {
+
+ def main(args: Array[String]) {
+
+ val conf = new SparkConf().setAppName("GaussianMixtureExample")
+ val sc = new SparkContext(conf)
+
+ // $example on$
+ // Load and parse the data
+ val data = sc.textFile("data/mllib/gmm_data.txt")
+ val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()
+
+ // Cluster the data into two classes using GaussianMixture
+ val gmm = new GaussianMixture().setK(2).run(parsedData)
+
+ // Save and load model
+ gmm.save(sc, "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
+ val sameModel = GaussianMixtureModel.load(sc,
+ "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
+
+ // output parameters of max-likelihood model
+ for (i <- 0 until gmm.k) {
+ println("weight=%f\nmu=%s\nsigma=\n%s\n" format
+ (gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
+ }
+ // $example off$
+
+ sc.stop()
+ }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/KMeansExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/KMeansExample.scala
new file mode 100644
index 0000000000..c4d71d862f
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/KMeansExample.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.mllib
+
+import org.apache.spark.{SparkConf, SparkContext}
+// $example on$
+import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
+import org.apache.spark.mllib.linalg.Vectors
+// $example off$
+
+object KMeansExample {
+
+ def main(args: Array[String]) {
+
+ val conf = new SparkConf().setAppName("KMeansExample")
+ val sc = new SparkContext(conf)
+
+ // $example on$
+ // Load and parse the data
+ val data = sc.textFile("data/mllib/kmeans_data.txt")
+ val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
+
+ // Cluster the data into two classes using KMeans
+ val numClusters = 2
+ val numIterations = 20
+ val clusters = KMeans.train(parsedData, numClusters, numIterations)
+
+ // Evaluate clustering by computing Within Set Sum of Squared Errors
+ val WSSSE = clusters.computeCost(parsedData)
+ println("Within Set Sum of Squared Errors = " + WSSSE)
+
+ // Save and load model
+ clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
+ val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
+ // $example off$
+
+ sc.stop()
+ }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LatentDirichletAllocationExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LatentDirichletAllocationExample.scala
new file mode 100644
index 0000000000..f2c8ec0143
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LatentDirichletAllocationExample.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.mllib
+
+import org.apache.spark.{SparkConf, SparkContext}
+// $example on$
+import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
+import org.apache.spark.mllib.linalg.Vectors
+// $example off$
+
+object LatentDirichletAllocationExample {
+
+ def main(args: Array[String]) {
+
+ val conf = new SparkConf().setAppName("LatentDirichletAllocationExample")
+ val sc = new SparkContext(conf)
+
+ // $example on$
+ // Load and parse the data
+ val data = sc.textFile("data/mllib/sample_lda_data.txt")
+ val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
+ // Index documents with unique IDs
+ val corpus = parsedData.zipWithIndex.map(_.swap).cache()
+
+ // Cluster the documents into three topics using LDA
+ val ldaModel = new LDA().setK(3).run(corpus)
+
+ // Output topics. Each is a distribution over words (matching word count vectors)
+ println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):")
+ val topics = ldaModel.topicsMatrix
+ for (topic <- Range(0, 3)) {
+ print("Topic " + topic + ":")
+ for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic)); }
+ println()
+ }
+
+ // Save and load model.
+ ldaModel.save(sc, "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
+ val sameModel = DistributedLDAModel.load(sc,
+ "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
+ // $example off$
+
+ sc.stop()
+ }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
index bb9c1cbca9..a81c9b383d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
@@ -22,7 +22,9 @@ import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import org.apache.spark.{SparkConf, SparkContext}
+// $example on$
import org.apache.spark.mllib.clustering.PowerIterationClustering
+// $example off$
import org.apache.spark.rdd.RDD
/**
@@ -90,6 +92,7 @@ object PowerIterationClusteringExample {
Logger.getRootLogger.setLevel(Level.WARN)
+ // $example on$
val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
val model = new PowerIterationClustering()
.setK(params.k)
@@ -101,12 +104,13 @@ object PowerIterationClusteringExample {
val assignments = clusters.toList.sortBy { case (k, v) => v.length }
val assignmentsStr = assignments
.map { case (k, v) =>
- s"$k -> ${v.sorted.mkString("[", ",", "]")}"
- }.mkString(", ")
+ s"$k -> ${v.sorted.mkString("[", ",", "]")}"
+ }.mkString(", ")
val sizesStr = assignments.map {
_._2.length
}.sorted.mkString("(", ",", ")")
println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")
+ // $example off$
sc.stop()
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala
index af03724a8a..7888af79f8 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala
@@ -19,10 +19,12 @@
package org.apache.spark.examples.mllib
import org.apache.spark.SparkConf
+// $example on$
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.{Seconds, StreamingContext}
+// $example off$
/**
* Estimate clusters on one stream of data and make predictions
@@ -58,7 +60,8 @@ object StreamingKMeansExample {
System.exit(1)
}
- val conf = new SparkConf().setMaster("local").setAppName("StreamingKMeansExample")
+ // $example on$
+ val conf = new SparkConf().setAppName("StreamingKMeansExample")
val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse)
@@ -74,6 +77,7 @@ object StreamingKMeansExample {
ssc.start()
ssc.awaitTermination()
+ // $example off$
}
}
// scalastyle:on println