author     Xiangrui Meng <meng@databricks.com>    2014-04-29 00:41:03 -0700
committer  Reynold Xin <rxin@apache.org>          2014-04-29 00:41:03 -0700
commit     3f38334f441940ed0a5bbf5588ca7f22d3940359 (patch)
tree       f1b6e54f34e810b5efdc5a9fbdfa7737d3eb5d92 /examples
parent     497be3ca2d8f0600e927f8f036177fcd3bb6e229 (diff)
[SPARK-1636][MLLIB] Move main methods to examples
* `NaiveBayes` -> `SparseNaiveBayes`
* `KMeans` -> `DenseKMeans`
* `SVMWithSGD` and `LogisticRegressionWithSGD` -> `BinaryClassification`
* `ALS` -> `MovieLensALS`
* `LinearRegressionWithSGD`, `LassoWithSGD`, and `RidgeRegressionWithSGD` -> `LinearRegression`
* `DecisionTree` -> `DecisionTreeRunner`

`scopt` is used for parsing command-line parameters. `scopt` is MIT-licensed and depends only on `scala-library`.

Example help message:

~~~
BinaryClassification: an example app for binary classification.
Usage: BinaryClassification [options] <input>

  --numIterations <value>
        number of iterations
  --stepSize <value>
        initial step size, default: 1.0
  --algorithm <value>
        algorithm (SVM,LR), default: LR
  --regType <value>
        regularization type (L1,L2), default: L2
  --regParam <value>
        regularization parameter, default: 0.1
  <input>
        input paths to labeled examples in LIBSVM format
~~~

Author: Xiangrui Meng <meng@databricks.com>

Closes #584 from mengxr/mllib-main and squashes the following commits:

7b58c60 [Xiangrui Meng] minor
6e35d7e [Xiangrui Meng] make imports explicit and fix code style
c6178c9 [Xiangrui Meng] update TS PCA/SVD to use new spark-submit
6acff75 [Xiangrui Meng] use scopt for DecisionTreeRunner
be86069 [Xiangrui Meng] use main instead of extending App
b3edf68 [Xiangrui Meng] move DecisionTree's main method to examples
8bfaa5a [Xiangrui Meng] change NaiveBayesParams to Params
fe23dcb [Xiangrui Meng] remove main from KMeans and add DenseKMeans as an example
67f4448 [Xiangrui Meng] remove main methods from linear regression algorithms and add LinearRegression example
b066bbc [Xiangrui Meng] remove main from ALS and add MovieLensALS example
b040f3b [Xiangrui Meng] change BinaryClassificationParams to Params
577945b [Xiangrui Meng] remove unused imports from NB
3d299bc [Xiangrui Meng] remove main from LR/SVM and add an example app for binary classification
f70878e [Xiangrui Meng] remove main from NaiveBayes and add an example NaiveBayes app
01ec2cd [Xiangrui Meng] Merge branch 'master' into mllib-main
9420692 [Xiangrui Meng] add scopt to examples dependencies
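For readers using these apps as templates, here is a minimal, self-contained sketch of the scopt 3.x pattern all of the new examples share: an immutable `Params` case class, an `OptionParser[Params]` built from `opt`/`arg` with `.text` and `.action`, and `parse` returning `Option[Params]`. The object `ScoptSketch` and its fields are illustrative, not part of this commit:

~~~
import scopt.OptionParser

object ScoptSketch {
  // Hypothetical parameters, mirroring the Params pattern in the new example apps.
  case class Params(input: String = null, numIterations: Int = 100)

  def main(args: Array[String]) {
    val defaultParams = Params()

    val parser = new OptionParser[Params]("ScoptSketch") {
      head("ScoptSketch: a minimal scopt 3.x sketch.")
      opt[Int]("numIterations")
        .text(s"number of iterations, default: ${defaultParams.numIterations}")
        .action((x, c) => c.copy(numIterations = x))
      arg[String]("<input>")
        .required()
        .text("input path")
        .action((x, c) => c.copy(input = x))
    }

    // parse returns Some(params) on success and None on bad arguments,
    // in which case scopt has already printed the usage text.
    parser.parse(args, defaultParams).map { params =>
      println(s"running with $params")
    }.getOrElse {
      sys.exit(1)
    }
  }
}
~~~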
Diffstat (limited to 'examples')
-rw-r--r--  examples/pom.xml                                                                    |   5
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala | 146
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala   | 161
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala          | 109
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/LinearRegression.scala     | 126
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala         | 131
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala     | 102
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala        |  12
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala        |  12
9 files changed, 788 insertions(+), 16 deletions(-)
diff --git a/examples/pom.xml b/examples/pom.xml
index a2d1b19736..e1fc149d87 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -166,6 +166,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.github.scopt</groupId>
+ <artifactId>scopt_${scala.binary.version}</artifactId>
+ <version>3.2.0</version>
+ </dependency>
</dependencies>
<build>
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala
new file mode 100644
index 0000000000..ec9de022c1
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.log4j.{Level, Logger}
+import scopt.OptionParser
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, SVMWithSGD}
+import org.apache.spark.mllib.evaluation.binary.BinaryClassificationMetrics
+import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.mllib.optimization.{SquaredL2Updater, L1Updater}
+
+/**
+ * An example app for binary classification. Run with
+ * {{{
+ * ./bin/run-example org.apache.spark.examples.mllib.BinaryClassification
+ * }}}
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
+object BinaryClassification {
+
+ object Algorithm extends Enumeration {
+ type Algorithm = Value
+ val SVM, LR = Value
+ }
+
+ object RegType extends Enumeration {
+ type RegType = Value
+ val L1, L2 = Value
+ }
+
+ import Algorithm._
+ import RegType._
+
+ case class Params(
+ input: String = null,
+ numIterations: Int = 100,
+ stepSize: Double = 1.0,
+ algorithm: Algorithm = LR,
+ regType: RegType = L2,
+ regParam: Double = 0.1)
+
+ def main(args: Array[String]) {
+ val defaultParams = Params()
+
+ val parser = new OptionParser[Params]("BinaryClassification") {
+ head("BinaryClassification: an example app for binary classification.")
+ opt[Int]("numIterations")
+ .text("number of iterations")
+ .action((x, c) => c.copy(numIterations = x))
+ opt[Double]("stepSize")
+ .text(s"initial step size, default: ${defaultParams.stepSize}")
+ .action((x, c) => c.copy(stepSize = x))
+ opt[String]("algorithm")
+ .text(s"algorithm (${Algorithm.values.mkString(",")}), " +
+ s"default: ${defaultParams.algorithm}")
+ .action((x, c) => c.copy(algorithm = Algorithm.withName(x)))
+ opt[String]("regType")
+ .text(s"regularization type (${RegType.values.mkString(",")}), " +
+ s"default: ${defaultParams.regType}")
+ .action((x, c) => c.copy(regType = RegType.withName(x)))
+ opt[Double]("regParam")
+ .text(s"regularization parameter, default: ${defaultParams.regParam}")
+ arg[String]("<input>")
+ .required()
+ .text("input paths to labeled examples in LIBSVM format")
+ .action((x, c) => c.copy(input = x))
+ }
+
+ parser.parse(args, defaultParams).map { params =>
+ run(params)
+ } getOrElse {
+ sys.exit(1)
+ }
+ }
+
+ def run(params: Params) {
+ val conf = new SparkConf().setAppName(s"BinaryClassification with $params")
+ val sc = new SparkContext(conf)
+
+ Logger.getRootLogger.setLevel(Level.WARN)
+
+ val examples = MLUtils.loadLibSVMData(sc, params.input).cache()
+
+ val splits = examples.randomSplit(Array(0.8, 0.2))
+ val training = splits(0).cache()
+ val test = splits(1).cache()
+
+ val numTraining = training.count()
+ val numTest = test.count()
+ println(s"Training: $numTraining, test: $numTest.")
+
+ examples.unpersist(blocking = false)
+
+ val updater = params.regType match {
+ case L1 => new L1Updater()
+ case L2 => new SquaredL2Updater()
+ }
+
+ val model = params.algorithm match {
+ case LR =>
+ val algorithm = new LogisticRegressionWithSGD()
+ algorithm.optimizer
+ .setNumIterations(params.numIterations)
+ .setStepSize(params.stepSize)
+ .setUpdater(updater)
+ .setRegParam(params.regParam)
+ algorithm.run(training).clearThreshold()
+ case SVM =>
+ val algorithm = new SVMWithSGD()
+ algorithm.optimizer
+ .setNumIterations(params.numIterations)
+ .setStepSize(params.stepSize)
+ .setUpdater(updater)
+ .setRegParam(params.regParam)
+ algorithm.run(training).clearThreshold()
+ }
+
+ val prediction = model.predict(test.map(_.features))
+ val predictionAndLabel = prediction.zip(test.map(_.label))
+
+ val metrics = new BinaryClassificationMetrics(predictionAndLabel)
+
+ println(s"Test areaUnderPR = ${metrics.areaUnderPR()}.")
+ println(s"Test areaUnderROC = ${metrics.areaUnderROC()}.")
+
+ sc.stop()
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
new file mode 100644
index 0000000000..0bd847d7ba
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import scopt.OptionParser
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.tree.{DecisionTree, impurity}
+import org.apache.spark.mllib.tree.configuration.{Algo, Strategy}
+import org.apache.spark.mllib.tree.configuration.Algo._
+import org.apache.spark.mllib.tree.model.DecisionTreeModel
+import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.rdd.RDD
+
+/**
+ * An example runner for decision tree. Run with
+ * {{{
+ * ./bin/run-example org.apache.spark.examples.mllib.DecisionTreeRunner [options]
+ * }}}
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
+object DecisionTreeRunner {
+
+ object ImpurityType extends Enumeration {
+ type ImpurityType = Value
+ val Gini, Entropy, Variance = Value
+ }
+
+ import ImpurityType._
+
+ case class Params(
+ input: String = null,
+ algo: Algo = Classification,
+ maxDepth: Int = 5,
+ impurity: ImpurityType = Gini,
+ maxBins: Int = 20)
+
+ def main(args: Array[String]) {
+ val defaultParams = Params()
+
+ val parser = new OptionParser[Params]("DecisionTreeRunner") {
+ head("DecisionTreeRunner: an example decision tree app.")
+ opt[String]("algo")
+ .text(s"algorithm (${Algo.values.mkString(",")}), default: ${defaultParams.algo}")
+ .action((x, c) => c.copy(algo = Algo.withName(x)))
+ opt[String]("impurity")
+ .text(s"impurity type (${ImpurityType.values.mkString(",")}), " +
+ s"default: ${defaultParams.impurity}")
+ .action((x, c) => c.copy(impurity = ImpurityType.withName(x)))
+ opt[Int]("maxDepth")
+ .text(s"max depth of the tree, default: ${defaultParams.maxDepth}")
+ .action((x, c) => c.copy(maxDepth = x))
+ opt[Int]("maxBins")
+ .text(s"max number of bins, default: ${defaultParams.maxBins}")
+ .action((x, c) => c.copy(maxBins = x))
+ arg[String]("<input>")
+ .text("input paths to labeled examples in dense format (label,f0 f1 f2 ...)")
+ .required()
+ .action((x, c) => c.copy(input = x))
+ checkConfig { params =>
+ if (params.algo == Classification &&
+ (params.impurity == Gini || params.impurity == Entropy)) {
+ success
+ } else if (params.algo == Regression && params.impurity == Variance) {
+ success
+ } else {
+ failure(s"Algo ${params.algo} is not compatible with impurity ${params.impurity}.")
+ }
+ }
+ }
+
+ parser.parse(args, defaultParams).map { params =>
+ run(params)
+ }.getOrElse {
+ sys.exit(1)
+ }
+ }
+
+ def run(params: Params) {
+ val conf = new SparkConf().setAppName("DecisionTreeRunner")
+ val sc = new SparkContext(conf)
+
+ // Load training data and cache it.
+ val examples = MLUtils.loadLabeledData(sc, params.input).cache()
+
+ val splits = examples.randomSplit(Array(0.8, 0.2))
+ val training = splits(0).cache()
+ val test = splits(1).cache()
+
+ val numTraining = training.count()
+ val numTest = test.count()
+
+ println(s"numTraining = $numTraining, numTest = $numTest.")
+
+ examples.unpersist(blocking = false)
+
+ val impurityCalculator = params.impurity match {
+ case Gini => impurity.Gini
+ case Entropy => impurity.Entropy
+ case Variance => impurity.Variance
+ }
+
+ val strategy = new Strategy(params.algo, impurityCalculator, params.maxDepth, params.maxBins)
+ val model = DecisionTree.train(training, strategy)
+
+ if (params.algo == Classification) {
+ val accuracy = accuracyScore(model, test)
+ println(s"Test accuracy = $accuracy.")
+ }
+
+ if (params.algo == Regression) {
+ val mse = meanSquaredError(model, test)
+ println(s"Test mean squared error = $mse.")
+ }
+
+ sc.stop()
+ }
+
+ /**
+ * Calculates the classifier accuracy.
+ */
+ private def accuracyScore(
+ model: DecisionTreeModel,
+ data: RDD[LabeledPoint],
+ threshold: Double = 0.5): Double = {
+ def predictedValue(features: Vector): Double = {
+ if (model.predict(features) < threshold) 0.0 else 1.0
+ }
+ val correctCount = data.filter(y => predictedValue(y.features) == y.label).count()
+ val count = data.count()
+ correctCount.toDouble / count
+ }
+
+ /**
+ * Calculates the mean squared error for regression.
+ */
+ private def meanSquaredError(tree: DecisionTreeModel, data: RDD[LabeledPoint]): Double = {
+ data.map { y =>
+ val err = tree.predict(y.features) - y.label
+ err * err
+ }.mean()
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala
new file mode 100644
index 0000000000..f96bc1bf00
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.log4j.{Level, Logger}
+import scopt.OptionParser
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.mllib.clustering.KMeans
+import org.apache.spark.mllib.linalg.Vectors
+
+/**
+ * An example k-means app. Run with
+ * {{{
+ * ./bin/run-example org.apache.spark.examples.mllib.DenseKMeans [options] <input>
+ * }}}
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
+object DenseKMeans {
+
+ object InitializationMode extends Enumeration {
+ type InitializationMode = Value
+ val Random, Parallel = Value
+ }
+
+ import InitializationMode._
+
+ case class Params(
+ input: String = null,
+ k: Int = -1,
+ numIterations: Int = 10,
+ initializationMode: InitializationMode = Parallel)
+
+ def main(args: Array[String]) {
+ val defaultParams = Params()
+
+ val parser = new OptionParser[Params]("DenseKMeans") {
+ head("DenseKMeans: an example k-means app for dense data.")
+ opt[Int]('k', "k")
+ .required()
+ .text(s"number of clusters, required")
+ .action((x, c) => c.copy(k = x))
+ opt[Int]("numIterations")
+ .text(s"number of iterations, default; ${defaultParams.numIterations}")
+ .action((x, c) => c.copy(numIterations = x))
+ opt[String]("initMode")
+ .text(s"initialization mode (${InitializationMode.values.mkString(",")}), " +
+ s"default: ${defaultParams.initializationMode}")
+ .action((x, c) => c.copy(initializationMode = InitializationMode.withName(x)))
+ arg[String]("<input>")
+ .text("input paths to examples")
+ .required()
+ .action((x, c) => c.copy(input = x))
+ }
+
+ parser.parse(args, defaultParams).map { params =>
+ run(params)
+ }.getOrElse {
+ sys.exit(1)
+ }
+ }
+
+ def run(params: Params) {
+ val conf = new SparkConf().setAppName(s"DenseKMeans with $params")
+ val sc = new SparkContext(conf)
+
+ Logger.getRootLogger.setLevel(Level.WARN)
+
+ val examples = sc.textFile(params.input).map { line =>
+ Vectors.dense(line.split(' ').map(_.toDouble))
+ }.cache()
+
+ val numExamples = examples.count()
+
+ println(s"numExamples = $numExamples.")
+
+ val initMode = params.initializationMode match {
+ case Random => KMeans.RANDOM
+ case Parallel => KMeans.K_MEANS_PARALLEL
+ }
+
+ val model = new KMeans()
+ .setInitializationMode(initMode)
+ .setK(params.k)
+ .setMaxIterations(params.numIterations)
+ .run(examples)
+
+ val cost = model.computeCost(examples)
+
+ println(s"Total cost = $cost.")
+
+ sc.stop()
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LinearRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LinearRegression.scala
new file mode 100644
index 0000000000..1723ca6931
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LinearRegression.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.log4j.{Level, Logger}
+import scopt.OptionParser
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.mllib.regression.LinearRegressionWithSGD
+import org.apache.spark.mllib.util.{MulticlassLabelParser, MLUtils}
+import org.apache.spark.mllib.optimization.{SimpleUpdater, SquaredL2Updater, L1Updater}
+
+/**
+ * An example app for linear regression. Run with
+ * {{{
+ * ./bin/run-example org.apache.spark.examples.mllib.LinearRegression
+ * }}}
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
+object LinearRegression extends App {
+
+ object RegType extends Enumeration {
+ type RegType = Value
+ val NONE, L1, L2 = Value
+ }
+
+ import RegType._
+
+ case class Params(
+ input: String = null,
+ numIterations: Int = 100,
+ stepSize: Double = 1.0,
+ regType: RegType = L2,
+ regParam: Double = 0.1)
+
+ val defaultParams = Params()
+
+ val parser = new OptionParser[Params]("LinearRegression") {
+ head("LinearRegression: an example app for linear regression.")
+ opt[Int]("numIterations")
+ .text("number of iterations")
+ .action((x, c) => c.copy(numIterations = x))
+ opt[Double]("stepSize")
+ .text(s"initial step size, default: ${defaultParams.stepSize}")
+ .action((x, c) => c.copy(stepSize = x))
+ opt[String]("regType")
+ .text(s"regularization type (${RegType.values.mkString(",")}), " +
+ s"default: ${defaultParams.regType}")
+ .action((x, c) => c.copy(regType = RegType.withName(x)))
+ opt[Double]("regParam")
+ .text(s"regularization parameter, default: ${defaultParams.regParam}")
+ arg[String]("<input>")
+ .required()
+ .text("input paths to labeled examples in LIBSVM format")
+ .action((x, c) => c.copy(input = x))
+ }
+
+ parser.parse(args, defaultParams).map { params =>
+ run(params)
+ } getOrElse {
+ sys.exit(1)
+ }
+
+ def run(params: Params) {
+ val conf = new SparkConf().setAppName(s"LinearRegression with $params")
+ val sc = new SparkContext(conf)
+
+ Logger.getRootLogger.setLevel(Level.WARN)
+
+ val examples = MLUtils.loadLibSVMData(sc, params.input, MulticlassLabelParser).cache()
+
+ val splits = examples.randomSplit(Array(0.8, 0.2))
+ val training = splits(0).cache()
+ val test = splits(1).cache()
+
+ val numTraining = training.count()
+ val numTest = test.count()
+ println(s"Training: $numTraining, test: $numTest.")
+
+ examples.unpersist(blocking = false)
+
+ val updater = params.regType match {
+ case NONE => new SimpleUpdater()
+ case L1 => new L1Updater()
+ case L2 => new SquaredL2Updater()
+ }
+
+ val algorithm = new LinearRegressionWithSGD()
+ algorithm.optimizer
+ .setNumIterations(params.numIterations)
+ .setStepSize(params.stepSize)
+ .setUpdater(updater)
+ .setRegParam(params.regParam)
+
+ val model = algorithm.run(training)
+
+ val prediction = model.predict(test.map(_.features))
+ val predictionAndLabel = prediction.zip(test.map(_.label))
+
+ val loss = predictionAndLabel.map { case (p, l) =>
+ val err = p - l
+ err * err
+ }.reduce(_ + _)
+ val rmse = math.sqrt(loss / numTest)
+
+ println(s"Test RMSE = $rmse.")
+
+ sc.stop()
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
new file mode 100644
index 0000000000..703f02255b
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import com.esotericsoftware.kryo.Kryo
+import org.apache.log4j.{Level, Logger}
+import scopt.OptionParser
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator}
+
+/**
+ * An example app for ALS on MovieLens data (http://grouplens.org/datasets/movielens/).
+ */
+object MovieLensALS {
+
+ class ALSRegistrator extends KryoRegistrator {
+ override def registerClasses(kryo: Kryo) {
+ kryo.register(classOf[Rating])
+ }
+ }
+
+ case class Params(
+ input: String = null,
+ kryo: Boolean = false,
+ numIterations: Int = 20,
+ lambda: Double = 1.0,
+ rank: Int = 10)
+
+ def main(args: Array[String]) {
+ val defaultParams = Params()
+
+ val parser = new OptionParser[Params]("MovieLensALS") {
+ head("MovieLensALS: an example app for ALS on MovieLens data.")
+ opt[Int]("rank")
+ .text(s"rank, default: ${defaultParams.rank}}")
+ .action((x, c) => c.copy(rank = x))
+ opt[Int]("numIterations")
+ .text(s"number of iterations, default: ${defaultParams.numIterations}")
+ .action((x, c) => c.copy(numIterations = x))
+ opt[Double]("lambda")
+ .text(s"lambda (smoothing constant), default: ${defaultParams.lambda}")
+ .action((x, c) => c.copy(lambda = x))
+ opt[Unit]("kryo")
+ .text(s"use Kryo serialization")
+ .action((_, c) => c.copy(kryo = true))
+ arg[String]("<input>")
+ .required()
+ .text("input paths to a MovieLens dataset of ratings")
+ .action((x, c) => c.copy(input = x))
+ }
+
+ parser.parse(args, defaultParams).map { params =>
+ run(params)
+ } getOrElse {
+ sys.exit(1)
+ }
+ }
+
+ def run(params: Params) {
+ val conf = new SparkConf().setAppName(s"MovieLensALS with $params")
+ if (params.kryo) {
+ conf.set("spark.serializer", classOf[KryoSerializer].getName)
+ .set("spark.kryo.registrator", classOf[ALSRegistrator].getName)
+ .set("spark.kryoserializer.buffer.mb", "8")
+ }
+ val sc = new SparkContext(conf)
+
+ Logger.getRootLogger.setLevel(Level.WARN)
+
+ val ratings = sc.textFile(params.input).map { line =>
+ val fields = line.split("::")
+ Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
+ }.cache()
+
+ val numRatings = ratings.count()
+ val numUsers = ratings.map(_.user).distinct().count()
+ val numMovies = ratings.map(_.product).distinct().count()
+
+ println(s"Got $numRatings ratings from $numUsers users on $numMovies movies.")
+
+ val splits = ratings.randomSplit(Array(0.8, 0.2))
+ val training = splits(0).cache()
+ val test = splits(1).cache()
+
+ val numTraining = training.count()
+ val numTest = test.count()
+ println(s"Training: $numTraining, test: $numTest.")
+
+ ratings.unpersist(blocking = false)
+
+ val model = new ALS()
+ .setRank(params.rank)
+ .setIterations(params.numIterations)
+ .setLambda(params.lambda)
+ .run(training)
+
+ val rmse = computeRmse(model, test, numTest)
+
+ println(s"Test RMSE = $rmse.")
+
+ sc.stop()
+ }
+
+ /** Compute RMSE (Root Mean Squared Error). */
+ def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = {
+ val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
+ val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating))
+ .join(data.map(x => ((x.user, x.product), x.rating)))
+ .values
+ math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n)
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala
new file mode 100644
index 0000000000..25b6768b8d
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.log4j.{Level, Logger}
+import scopt.OptionParser
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.mllib.classification.NaiveBayes
+import org.apache.spark.mllib.util.{MLUtils, MulticlassLabelParser}
+
+/**
+ * An example naive Bayes app. Run with
+ * {{{
+ * ./bin/run-example org.apache.spark.examples.mllib.SparseNaiveBayes [options] <input>
+ * }}}
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
+object SparseNaiveBayes {
+
+ case class Params(
+ input: String = null,
+ minPartitions: Int = 0,
+ numFeatures: Int = -1,
+ lambda: Double = 1.0)
+
+ def main(args: Array[String]) {
+ val defaultParams = Params()
+
+ val parser = new OptionParser[Params]("SparseNaiveBayes") {
+ head("SparseNaiveBayes: an example naive Bayes app for LIBSVM data.")
+ opt[Int]("numPartitions")
+ .text("min number of partitions")
+ .action((x, c) => c.copy(minPartitions = x))
+ opt[Int]("numFeatures")
+ .text("number of features")
+ .action((x, c) => c.copy(numFeatures = x))
+ opt[Double]("lambda")
+ .text(s"lambda (smoothing constant), default: ${defaultParams.lambda}")
+ .action((x, c) => c.copy(lambda = x))
+ arg[String]("<input>")
+ .text("input paths to labeled examples in LIBSVM format")
+ .required()
+ .action((x, c) => c.copy(input = x))
+ }
+
+ parser.parse(args, defaultParams).map { params =>
+ run(params)
+ }.getOrElse {
+ sys.exit(1)
+ }
+ }
+
+ def run(params: Params) {
+ val conf = new SparkConf().setAppName(s"SparseNaiveBayes with $params")
+ val sc = new SparkContext(conf)
+
+ Logger.getRootLogger.setLevel(Level.WARN)
+
+ val minPartitions =
+ if (params.minPartitions > 0) params.minPartitions else sc.defaultMinPartitions
+
+ val examples = MLUtils.loadLibSVMData(sc, params.input, MulticlassLabelParser,
+ params.numFeatures, minPartitions)
+ // Cache examples because it will be used in both training and evaluation.
+ examples.cache()
+
+ val splits = examples.randomSplit(Array(0.8, 0.2))
+ val training = splits(0)
+ val test = splits(1)
+
+ val numTraining = training.count()
+ val numTest = test.count()
+
+ println(s"numTraining = $numTraining, numTest = $numTest.")
+
+ val model = new NaiveBayes().setLambda(params.lambda).run(training)
+
+ val prediction = model.predict(test.map(_.features))
+ val predictionAndLabel = prediction.zip(test.map(_.label))
+ val accuracy = predictionAndLabel.filter(x => x._1 == x._2).count().toDouble / numTest
+
+ println(s"Test accuracy = $accuracy.")
+
+ sc.stop()
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala
index 39e71cdab4..3cd9cb743e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala
@@ -35,20 +35,16 @@ import org.apache.spark.mllib.linalg.Vectors
*/
object TallSkinnyPCA {
def main(args: Array[String]) {
- if (args.length != 2) {
- System.err.println("Usage: TallSkinnyPCA <master> <file>")
+ if (args.length != 1) {
+ System.err.println("Usage: TallSkinnyPCA <input>")
System.exit(1)
}
- val conf = new SparkConf()
- .setMaster(args(0))
- .setAppName("TallSkinnyPCA")
- .setSparkHome(System.getenv("SPARK_HOME"))
- .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
+ val conf = new SparkConf().setAppName("TallSkinnyPCA")
val sc = new SparkContext(conf)
// Load and parse the data file.
- val rows = sc.textFile(args(1)).map { line =>
+ val rows = sc.textFile(args(0)).map { line =>
val values = line.split(' ').map(_.toDouble)
Vectors.dense(values)
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala
index 2b7de2acc6..4d66903186 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala
@@ -35,20 +35,16 @@ import org.apache.spark.mllib.linalg.Vectors
*/
object TallSkinnySVD {
def main(args: Array[String]) {
- if (args.length != 2) {
- System.err.println("Usage: TallSkinnySVD <master> <file>")
+ if (args.length != 1) {
+ System.err.println("Usage: TallSkinnySVD <input>")
System.exit(1)
}
- val conf = new SparkConf()
- .setMaster(args(0))
- .setAppName("TallSkinnySVD")
- .setSparkHome(System.getenv("SPARK_HOME"))
- .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
+ val conf = new SparkConf().setAppName("TallSkinnySVD")
val sc = new SparkContext(conf)
// Load and parse the data file.
- val rows = sc.textFile(args(1)).map { line =>
+ val rows = sc.textFile(args(0)).map { line =>
val values = line.split(' ').map(_.toDouble)
Vectors.dense(values)
}