diff options
author | Xiangrui Meng <meng@databricks.com> | 2014-04-29 00:41:03 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-04-29 00:41:03 -0700 |
commit | 3f38334f441940ed0a5bbf5588ca7f22d3940359 (patch) | |
tree | f1b6e54f34e810b5efdc5a9fbdfa7737d3eb5d92 /examples/src/main | |
parent | 497be3ca2d8f0600e927f8f036177fcd3bb6e229 (diff) | |
download | spark-3f38334f441940ed0a5bbf5588ca7f22d3940359.tar.gz spark-3f38334f441940ed0a5bbf5588ca7f22d3940359.tar.bz2 spark-3f38334f441940ed0a5bbf5588ca7f22d3940359.zip |
[SPARK-1636][MLLIB] Move main methods to examples
* `NaiveBayes` -> `SparseNaiveBayes`
* `KMeans` -> `DenseKMeans`
* `SVMWithSGD` and `LogisticRegerssionWithSGD` -> `BinaryClassification`
* `ALS` -> `MovieLensALS`
* `LinearRegressionWithSGD`, `LassoWithSGD`, and `RidgeRegressionWithSGD` -> `LinearRegression`
* `DecisionTree` -> `DecisionTreeRunner`
`scopt` is used for parsing command-line parameters. `scopt` has MIT license and it only depends on `scala-library`.
Example help message:
~~~
BinaryClassification: an example app for binary classification.
Usage: BinaryClassification [options] <input>
--numIterations <value>
number of iterations
--stepSize <value>
initial step size, default: 1.0
--algorithm <value>
algorithm (SVM,LR), default: LR
--regType <value>
regularization type (L1,L2), default: L2
--regParam <value>
regularization parameter, default: 0.1
<input>
input paths to labeled examples in LIBSVM format
~~~
Author: Xiangrui Meng <meng@databricks.com>
Closes #584 from mengxr/mllib-main and squashes the following commits:
7b58c60 [Xiangrui Meng] minor
6e35d7e [Xiangrui Meng] make imports explicit and fix code style
c6178c9 [Xiangrui Meng] update TS PCA/SVD to use new spark-submit
6acff75 [Xiangrui Meng] use scopt for DecisionTreeRunner
be86069 [Xiangrui Meng] use main instead of extending App
b3edf68 [Xiangrui Meng] move DecisionTree's main method to examples
8bfaa5a [Xiangrui Meng] change NaiveBayesParams to Params
fe23dcb [Xiangrui Meng] remove main from KMeans and add DenseKMeans as an example
67f4448 [Xiangrui Meng] remove main methods from linear regression algorithms and add LinearRegression example
b066bbc [Xiangrui Meng] remove main from ALS and add MovieLensALS example
b040f3b [Xiangrui Meng] change BinaryClassificationParams to Params
577945b [Xiangrui Meng] remove unused imports from NB
3d299bc [Xiangrui Meng] remove main from LR/SVM and add an example app for binary classification
f70878e [Xiangrui Meng] remove main from NaiveBayes and add an example NaiveBayes app
01ec2cd [Xiangrui Meng] Merge branch 'master' into mllib-main
9420692 [Xiangrui Meng] add scopt to examples dependencies
Diffstat (limited to 'examples/src/main')
8 files changed, 781 insertions, 16 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala new file mode 100644 index 0000000000..ec9de022c1 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.log4j.{Level, Logger} +import scopt.OptionParser + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, SVMWithSGD} +import org.apache.spark.mllib.evaluation.binary.BinaryClassificationMetrics +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.mllib.optimization.{SquaredL2Updater, L1Updater} + +/** + * An example app for binary classification. Run with + * {{{ + * ./bin/run-example org.apache.spark.examples.mllib.BinaryClassification + * }}} + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ +object BinaryClassification { + + object Algorithm extends Enumeration { + type Algorithm = Value + val SVM, LR = Value + } + + object RegType extends Enumeration { + type RegType = Value + val L1, L2 = Value + } + + import Algorithm._ + import RegType._ + + case class Params( + input: String = null, + numIterations: Int = 100, + stepSize: Double = 1.0, + algorithm: Algorithm = LR, + regType: RegType = L2, + regParam: Double = 0.1) + + def main(args: Array[String]) { + val defaultParams = Params() + + val parser = new OptionParser[Params]("BinaryClassification") { + head("BinaryClassification: an example app for binary classification.") + opt[Int]("numIterations") + .text("number of iterations") + .action((x, c) => c.copy(numIterations = x)) + opt[Double]("stepSize") + .text(s"initial step size, default: ${defaultParams.stepSize}") + .action((x, c) => c.copy(stepSize = x)) + opt[String]("algorithm") + .text(s"algorithm (${Algorithm.values.mkString(",")}), " + + s"default: ${defaultParams.algorithm}") + .action((x, c) => c.copy(algorithm = Algorithm.withName(x))) + opt[String]("regType") + .text(s"regularization type (${RegType.values.mkString(",")}), " + + s"default: ${defaultParams.regType}") + .action((x, c) => c.copy(regType = RegType.withName(x))) + opt[Double]("regParam") + .text(s"regularization parameter, default: ${defaultParams.regParam}") + arg[String]("<input>") + .required() + .text("input paths to labeled examples in LIBSVM format") + .action((x, c) => c.copy(input = x)) + } + + parser.parse(args, defaultParams).map { params => + run(params) + } getOrElse { + sys.exit(1) + } + } + + def run(params: Params) { + val conf = new SparkConf().setAppName(s"BinaryClassification with $params") + val sc = new SparkContext(conf) + + Logger.getRootLogger.setLevel(Level.WARN) + + val examples = MLUtils.loadLibSVMData(sc, params.input).cache() + + val splits = examples.randomSplit(Array(0.8, 0.2)) + val training = splits(0).cache() + val test = splits(1).cache() + + val numTraining = training.count() + val numTest = test.count() + println(s"Training: $numTraining, test: $numTest.") + + examples.unpersist(blocking = false) + + val updater = params.regType match { + case L1 => new L1Updater() + case L2 => new SquaredL2Updater() + } + + val model = params.algorithm match { + case LR => + val algorithm = new LogisticRegressionWithSGD() + algorithm.optimizer + .setNumIterations(params.numIterations) + .setStepSize(params.stepSize) + .setUpdater(updater) + .setRegParam(params.regParam) + algorithm.run(training).clearThreshold() + case SVM => + val algorithm = new SVMWithSGD() + algorithm.optimizer + .setNumIterations(params.numIterations) + .setStepSize(params.stepSize) + .setUpdater(updater) + .setRegParam(params.regParam) + algorithm.run(training).clearThreshold() + } + + val prediction = model.predict(test.map(_.features)) + val predictionAndLabel = prediction.zip(test.map(_.label)) + + val metrics = new BinaryClassificationMetrics(predictionAndLabel) + + println(s"Test areaUnderPR = ${metrics.areaUnderPR()}.") + println(s"Test areaUnderROC = ${metrics.areaUnderROC()}.") + + sc.stop() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala new file mode 100644 index 0000000000..0bd847d7ba --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import scopt.OptionParser + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.tree.{DecisionTree, impurity} +import org.apache.spark.mllib.tree.configuration.{Algo, Strategy} +import org.apache.spark.mllib.tree.configuration.Algo._ +import org.apache.spark.mllib.tree.model.DecisionTreeModel +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.rdd.RDD + +/** + * An example runner for decision tree. Run with + * {{{ + * ./bin/spark-example org.apache.spark.examples.mllib.DecisionTreeRunner [options] + * }}} + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ +object DecisionTreeRunner { + + object ImpurityType extends Enumeration { + type ImpurityType = Value + val Gini, Entropy, Variance = Value + } + + import ImpurityType._ + + case class Params( + input: String = null, + algo: Algo = Classification, + maxDepth: Int = 5, + impurity: ImpurityType = Gini, + maxBins: Int = 20) + + def main(args: Array[String]) { + val defaultParams = Params() + + val parser = new OptionParser[Params]("DecisionTreeRunner") { + head("DecisionTreeRunner: an example decision tree app.") + opt[String]("algo") + .text(s"algorithm (${Algo.values.mkString(",")}), default: ${defaultParams.algo}") + .action((x, c) => c.copy(algo = Algo.withName(x))) + opt[String]("impurity") + .text(s"impurity type (${ImpurityType.values.mkString(",")}), " + + s"default: ${defaultParams.impurity}") + .action((x, c) => c.copy(impurity = ImpurityType.withName(x))) + opt[Int]("maxDepth") + .text(s"max depth of the tree, default: ${defaultParams.maxDepth}") + .action((x, c) => c.copy(maxDepth = x)) + opt[Int]("maxBins") + .text(s"max number of bins, default: ${defaultParams.maxBins}") + .action((x, c) => c.copy(maxBins = x)) + arg[String]("<input>") + .text("input paths to labeled examples in dense format (label,f0 f1 f2 ...)") + .required() + .action((x, c) => c.copy(input = x)) + checkConfig { params => + if (params.algo == Classification && + (params.impurity == Gini || params.impurity == Entropy)) { + success + } else if (params.algo == Regression && params.impurity == Variance) { + success + } else { + failure(s"Algo ${params.algo} is not compatible with impurity ${params.impurity}.") + } + } + } + + parser.parse(args, defaultParams).map { params => + run(params) + }.getOrElse { + sys.exit(1) + } + } + + def run(params: Params) { + val conf = new SparkConf().setAppName("DecisionTreeRunner") + val sc = new SparkContext(conf) + + // Load training data and cache it. + val examples = MLUtils.loadLabeledData(sc, params.input).cache() + + val splits = examples.randomSplit(Array(0.8, 0.2)) + val training = splits(0).cache() + val test = splits(1).cache() + + val numTraining = training.count() + val numTest = test.count() + + println(s"numTraining = $numTraining, numTest = $numTest.") + + examples.unpersist(blocking = false) + + val impurityCalculator = params.impurity match { + case Gini => impurity.Gini + case Entropy => impurity.Entropy + case Variance => impurity.Variance + } + + val strategy = new Strategy(params.algo, impurityCalculator, params.maxDepth, params.maxBins) + val model = DecisionTree.train(training, strategy) + + if (params.algo == Classification) { + val accuracy = accuracyScore(model, test) + println(s"Test accuracy = $accuracy.") + } + + if (params.algo == Regression) { + val mse = meanSquaredError(model, test) + println(s"Test mean squared error = $mse.") + } + + sc.stop() + } + + /** + * Calculates the classifier accuracy. + */ + private def accuracyScore( + model: DecisionTreeModel, + data: RDD[LabeledPoint], + threshold: Double = 0.5): Double = { + def predictedValue(features: Vector): Double = { + if (model.predict(features) < threshold) 0.0 else 1.0 + } + val correctCount = data.filter(y => predictedValue(y.features) == y.label).count() + val count = data.count() + correctCount.toDouble / count + } + + /** + * Calculates the mean squared error for regression. + */ + private def meanSquaredError(tree: DecisionTreeModel, data: RDD[LabeledPoint]): Double = { + data.map { y => + val err = tree.predict(y.features) - y.label + err * err + }.mean() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala new file mode 100644 index 0000000000..f96bc1bf00 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.log4j.{Level, Logger} +import scopt.OptionParser + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.mllib.clustering.KMeans +import org.apache.spark.mllib.linalg.Vectors + +/** + * An example k-means app. Run with + * {{{ + * ./bin/spark-example org.apache.spark.examples.mllib.DenseKMeans [options] <input> + * }}} + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ +object DenseKMeans { + + object InitializationMode extends Enumeration { + type InitializationMode = Value + val Random, Parallel = Value + } + + import InitializationMode._ + + case class Params( + input: String = null, + k: Int = -1, + numIterations: Int = 10, + initializationMode: InitializationMode = Parallel) + + def main(args: Array[String]) { + val defaultParams = Params() + + val parser = new OptionParser[Params]("DenseKMeans") { + head("DenseKMeans: an example k-means app for dense data.") + opt[Int]('k', "k") + .required() + .text(s"number of clusters, required") + .action((x, c) => c.copy(k = x)) + opt[Int]("numIterations") + .text(s"number of iterations, default; ${defaultParams.numIterations}") + .action((x, c) => c.copy(numIterations = x)) + opt[String]("initMode") + .text(s"initialization mode (${InitializationMode.values.mkString(",")}), " + + s"default: ${defaultParams.initializationMode}") + .action((x, c) => c.copy(initializationMode = InitializationMode.withName(x))) + arg[String]("<input>") + .text("input paths to examples") + .required() + .action((x, c) => c.copy(input = x)) + } + + parser.parse(args, defaultParams).map { params => + run(params) + }.getOrElse { + sys.exit(1) + } + } + + def run(params: Params) { + val conf = new SparkConf().setAppName(s"DenseKMeans with $params") + val sc = new SparkContext(conf) + + Logger.getRootLogger.setLevel(Level.WARN) + + val examples = sc.textFile(params.input).map { line => + Vectors.dense(line.split(' ').map(_.toDouble)) + }.cache() + + val numExamples = examples.count() + + println(s"numExamples = $numExamples.") + + val initMode = params.initializationMode match { + case Random => KMeans.RANDOM + case Parallel => KMeans.K_MEANS_PARALLEL + } + + val model = new KMeans() + .setInitializationMode(initMode) + .setK(params.k) + .setMaxIterations(params.numIterations) + .run(examples) + + val cost = model.computeCost(examples) + + println(s"Total cost = $cost.") + + sc.stop() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LinearRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LinearRegression.scala new file mode 100644 index 0000000000..1723ca6931 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LinearRegression.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.log4j.{Level, Logger} +import scopt.OptionParser + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.mllib.regression.LinearRegressionWithSGD +import org.apache.spark.mllib.util.{MulticlassLabelParser, MLUtils} +import org.apache.spark.mllib.optimization.{SimpleUpdater, SquaredL2Updater, L1Updater} + +/** + * An example app for linear regression. Run with + * {{{ + * ./bin/run-example org.apache.spark.examples.mllib.LinearRegression + * }}} + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ +object LinearRegression extends App { + + object RegType extends Enumeration { + type RegType = Value + val NONE, L1, L2 = Value + } + + import RegType._ + + case class Params( + input: String = null, + numIterations: Int = 100, + stepSize: Double = 1.0, + regType: RegType = L2, + regParam: Double = 0.1) + + val defaultParams = Params() + + val parser = new OptionParser[Params]("LinearRegression") { + head("LinearRegression: an example app for linear regression.") + opt[Int]("numIterations") + .text("number of iterations") + .action((x, c) => c.copy(numIterations = x)) + opt[Double]("stepSize") + .text(s"initial step size, default: ${defaultParams.stepSize}") + .action((x, c) => c.copy(stepSize = x)) + opt[String]("regType") + .text(s"regularization type (${RegType.values.mkString(",")}), " + + s"default: ${defaultParams.regType}") + .action((x, c) => c.copy(regType = RegType.withName(x))) + opt[Double]("regParam") + .text(s"regularization parameter, default: ${defaultParams.regParam}") + arg[String]("<input>") + .required() + .text("input paths to labeled examples in LIBSVM format") + .action((x, c) => c.copy(input = x)) + } + + parser.parse(args, defaultParams).map { params => + run(params) + } getOrElse { + sys.exit(1) + } + + def run(params: Params) { + val conf = new SparkConf().setAppName(s"LinearRegression with $params") + val sc = new SparkContext(conf) + + Logger.getRootLogger.setLevel(Level.WARN) + + val examples = MLUtils.loadLibSVMData(sc, params.input, MulticlassLabelParser).cache() + + val splits = examples.randomSplit(Array(0.8, 0.2)) + val training = splits(0).cache() + val test = splits(1).cache() + + val numTraining = training.count() + val numTest = test.count() + println(s"Training: $numTraining, test: $numTest.") + + examples.unpersist(blocking = false) + + val updater = params.regType match { + case NONE => new SimpleUpdater() + case L1 => new L1Updater() + case L2 => new SquaredL2Updater() + } + + val algorithm = new LinearRegressionWithSGD() + algorithm.optimizer + .setNumIterations(params.numIterations) + .setStepSize(params.stepSize) + .setUpdater(updater) + .setRegParam(params.regParam) + + val model = algorithm.run(training) + + val prediction = model.predict(test.map(_.features)) + val predictionAndLabel = prediction.zip(test.map(_.label)) + + val loss = predictionAndLabel.map { case (p, l) => + val err = p - l + err * err + }.reduce(_ + _) + val rmse = math.sqrt(loss / numTest) + + println(s"Test RMSE = $rmse.") + + sc.stop() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala new file mode 100644 index 0000000000..703f02255b --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import com.esotericsoftware.kryo.Kryo +import org.apache.log4j.{Level, Logger} +import scopt.OptionParser + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating} +import org.apache.spark.rdd.RDD +import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator} + +/** + * An example app for ALS on MovieLens data (http://grouplens.org/datasets/movielens/). + */ +object MovieLensALS { + + class ALSRegistrator extends KryoRegistrator { + override def registerClasses(kryo: Kryo) { + kryo.register(classOf[Rating]) + } + } + + case class Params( + input: String = null, + kryo: Boolean = false, + numIterations: Int = 20, + lambda: Double = 1.0, + rank: Int = 10) + + def main(args: Array[String]) { + val defaultParams = Params() + + val parser = new OptionParser[Params]("MovieLensALS") { + head("MovieLensALS: an example app for ALS on MovieLens data.") + opt[Int]("rank") + .text(s"rank, default: ${defaultParams.rank}}") + .action((x, c) => c.copy(rank = x)) + opt[Int]("numIterations") + .text(s"number of iterations, default: ${defaultParams.numIterations}") + .action((x, c) => c.copy(numIterations = x)) + opt[Double]("lambda") + .text(s"lambda (smoothing constant), default: ${defaultParams.lambda}") + .action((x, c) => c.copy(lambda = x)) + opt[Unit]("kryo") + .text(s"use Kryo serialization") + .action((_, c) => c.copy(kryo = true)) + arg[String]("<input>") + .required() + .text("input paths to a MovieLens dataset of ratings") + .action((x, c) => c.copy(input = x)) + } + + parser.parse(args, defaultParams).map { params => + run(params) + } getOrElse { + System.exit(1) + } + } + + def run(params: Params) { + val conf = new SparkConf().setAppName(s"MovieLensALS with $params") + if (params.kryo) { + conf.set("spark.serializer", classOf[KryoSerializer].getName) + .set("spark.kryo.registrator", classOf[ALSRegistrator].getName) + .set("spark.kryoserializer.buffer.mb", "8") + } + val sc = new SparkContext(conf) + + Logger.getRootLogger.setLevel(Level.WARN) + + val ratings = sc.textFile(params.input).map { line => + val fields = line.split("::") + Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble) + }.cache() + + val numRatings = ratings.count() + val numUsers = ratings.map(_.user).distinct().count() + val numMovies = ratings.map(_.product).distinct().count() + + println(s"Got $numRatings ratings from $numUsers users on $numMovies movies.") + + val splits = ratings.randomSplit(Array(0.8, 0.2)) + val training = splits(0).cache() + val test = splits(1).cache() + + val numTraining = training.count() + val numTest = test.count() + println(s"Training: $numTraining, test: $numTest.") + + ratings.unpersist(blocking = false) + + val model = new ALS() + .setRank(params.rank) + .setIterations(params.numIterations) + .setLambda(params.lambda) + .run(training) + + val rmse = computeRmse(model, test, numTest) + + println(s"Test RMSE = $rmse.") + + sc.stop() + } + + /** Compute RMSE (Root Mean Squared Error). */ + def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long) = { + val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product))) + val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating)) + .join(data.map(x => ((x.user, x.product), x.rating))) + .values + math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n) + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala new file mode 100644 index 0000000000..25b6768b8d --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.log4j.{Level, Logger} +import scopt.OptionParser + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.mllib.classification.NaiveBayes +import org.apache.spark.mllib.util.{MLUtils, MulticlassLabelParser} + +/** + * An example naive Bayes app. Run with + * {{{ + * ./bin/spark-example org.apache.spark.examples.mllib.SparseNaiveBayes [options] <input> + * }}} + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ +object SparseNaiveBayes { + + case class Params( + input: String = null, + minPartitions: Int = 0, + numFeatures: Int = -1, + lambda: Double = 1.0) + + def main(args: Array[String]) { + val defaultParams = Params() + + val parser = new OptionParser[Params]("SparseNaiveBayes") { + head("SparseNaiveBayes: an example naive Bayes app for LIBSVM data.") + opt[Int]("numPartitions") + .text("min number of partitions") + .action((x, c) => c.copy(minPartitions = x)) + opt[Int]("numFeatures") + .text("number of features") + .action((x, c) => c.copy(numFeatures = x)) + opt[Double]("lambda") + .text(s"lambda (smoothing constant), default: ${defaultParams.lambda}") + .action((x, c) => c.copy(lambda = x)) + arg[String]("<input>") + .text("input paths to labeled examples in LIBSVM format") + .required() + .action((x, c) => c.copy(input = x)) + } + + parser.parse(args, defaultParams).map { params => + run(params) + }.getOrElse { + sys.exit(1) + } + } + + def run(params: Params) { + val conf = new SparkConf().setAppName(s"SparseNaiveBayes with $params") + val sc = new SparkContext(conf) + + Logger.getRootLogger.setLevel(Level.WARN) + + val minPartitions = + if (params.minPartitions > 0) params.minPartitions else sc.defaultMinPartitions + + val examples = MLUtils.loadLibSVMData(sc, params.input, MulticlassLabelParser, + params.numFeatures, minPartitions) + // Cache examples because it will be used in both training and evaluation. + examples.cache() + + val splits = examples.randomSplit(Array(0.8, 0.2)) + val training = splits(0) + val test = splits(1) + + val numTraining = training.count() + val numTest = test.count() + + println(s"numTraining = $numTraining, numTest = $numTest.") + + val model = new NaiveBayes().setLambda(params.lambda).run(training) + + val prediction = model.predict(test.map(_.features)) + val predictionAndLabel = prediction.zip(test.map(_.label)) + val accuracy = predictionAndLabel.filter(x => x._1 == x._2).count().toDouble / numTest + + println(s"Test accuracy = $accuracy.") + + sc.stop() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala index 39e71cdab4..3cd9cb743e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala @@ -35,20 +35,16 @@ import org.apache.spark.mllib.linalg.Vectors */ object TallSkinnyPCA { def main(args: Array[String]) { - if (args.length != 2) { - System.err.println("Usage: TallSkinnyPCA <master> <file>") + if (args.length != 1) { + System.err.println("Usage: TallSkinnyPCA <input>") System.exit(1) } - val conf = new SparkConf() - .setMaster(args(0)) - .setAppName("TallSkinnyPCA") - .setSparkHome(System.getenv("SPARK_HOME")) - .setJars(SparkContext.jarOfClass(this.getClass).toSeq) + val conf = new SparkConf().setAppName("TallSkinnyPCA") val sc = new SparkContext(conf) // Load and parse the data file. - val rows = sc.textFile(args(1)).map { line => + val rows = sc.textFile(args(0)).map { line => val values = line.split(' ').map(_.toDouble) Vectors.dense(values) } diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala index 2b7de2acc6..4d66903186 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala @@ -35,20 +35,16 @@ import org.apache.spark.mllib.linalg.Vectors */ object TallSkinnySVD { def main(args: Array[String]) { - if (args.length != 2) { - System.err.println("Usage: TallSkinnySVD <master> <file>") + if (args.length != 1) { + System.err.println("Usage: TallSkinnySVD <input>") System.exit(1) } - val conf = new SparkConf() - .setMaster(args(0)) - .setAppName("TallSkinnySVD") - .setSparkHome(System.getenv("SPARK_HOME")) - .setJars(SparkContext.jarOfClass(this.getClass).toSeq) + val conf = new SparkConf().setAppName("TallSkinnySVD") val sc = new SparkContext(conf) // Load and parse the data file. - val rows = sc.textFile(args(1)).map { line => + val rows = sc.textFile(args(0)).map { line => val values = line.split(' ').map(_.toDouble) Vectors.dense(values) } |