aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorDB Tsai <dbt@netflix.com>2015-06-02 19:12:08 -0700
committerJoseph K. Bradley <joseph@databricks.com>2015-06-02 19:12:08 -0700
commita86b3e9b9b75f5af4fdbba22e87769058f023204 (patch)
treec44cf27beb8bfc2e93efede89c9e2755b1789629 /examples
parentc3f4c3257194ba34ccd298d13ea1edcfc75f7552 (diff)
downloadspark-a86b3e9b9b75f5af4fdbba22e87769058f023204.tar.gz
spark-a86b3e9b9b75f5af4fdbba22e87769058f023204.tar.bz2
spark-a86b3e9b9b75f5af4fdbba22e87769058f023204.zip
[SPARK-7547] [ML] Scala Example code for ElasticNet
This is scala example code for both linear and logistic regression. Python and Java versions are to be added. Author: DB Tsai <dbt@netflix.com> Closes #6576 from dbtsai/elasticNetExample and squashes the following commits: e7ca406 [DB Tsai] fix test 6bb6d77 [DB Tsai] fix suite and remove duplicated setMaxIter 136e0dd [DB Tsai] address feedback 1ec29d4 [DB Tsai] fix style 9462f5f [DB Tsai] add example
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/ml/LinearRegressionExample.scala142
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/ml/LogisticRegressionExample.scala159
2 files changed, 301 insertions, 0 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/LinearRegressionExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/LinearRegressionExample.scala
new file mode 100644
index 0000000000..b54466fd48
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/LinearRegressionExample.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.ml
+
+import scala.collection.mutable
+import scala.language.reflectiveCalls
+
+import scopt.OptionParser
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.examples.mllib.AbstractParams
+import org.apache.spark.ml.{Pipeline, PipelineStage}
+import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
+import org.apache.spark.sql.DataFrame
+
+/**
+ * An example runner for linear regression with elastic-net (mixing L1/L2) regularization.
+ * Run with
+ * {{{
+ * bin/run-example ml.LinearRegressionExample [options]
+ * }}}
+ * A synthetic dataset can be found at `data/mllib/sample_linear_regression_data.txt` which can be
+ * trained by
+ * {{{
+ * bin/run-example ml.LinearRegressionExample --regParam 0.15 --elasticNetParam 1.0 \
+ * data/mllib/sample_linear_regression_data.txt
+ * }}}
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
+object LinearRegressionExample {
+
+ case class Params(
+ input: String = null,
+ testInput: String = "",
+ dataFormat: String = "libsvm",
+ regParam: Double = 0.0,
+ elasticNetParam: Double = 0.0,
+ maxIter: Int = 100,
+ tol: Double = 1E-6,
+ fracTest: Double = 0.2) extends AbstractParams[Params]
+
+ def main(args: Array[String]) {
+ val defaultParams = Params()
+
+ val parser = new OptionParser[Params]("LinearRegressionExample") {
+ head("LinearRegressionExample: an example Linear Regression with Elastic-Net app.")
+ opt[Double]("regParam")
+ .text(s"regularization parameter, default: ${defaultParams.regParam}")
+ .action((x, c) => c.copy(regParam = x))
+ opt[Double]("elasticNetParam")
+ .text(s"ElasticNet mixing parameter. For alpha = 0, the penalty is an L2 penalty. " +
+ s"For alpha = 1, it is an L1 penalty. For 0 < alpha < 1, the penalty is a combination of " +
+ s"L1 and L2, default: ${defaultParams.elasticNetParam}")
+ .action((x, c) => c.copy(elasticNetParam = x))
+ opt[Int]("maxIter")
+ .text(s"maximum number of iterations, default: ${defaultParams.maxIter}")
+ .action((x, c) => c.copy(maxIter = x))
+ opt[Double]("tol")
+ .text(s"the convergence tolerance of iterations, Smaller value will lead " +
+ s"to higher accuracy with the cost of more iterations, default: ${defaultParams.tol}")
+ .action((x, c) => c.copy(tol = x))
+ opt[Double]("fracTest")
+ .text(s"fraction of data to hold out for testing. If given option testInput, " +
+ s"this option is ignored. default: ${defaultParams.fracTest}")
+ .action((x, c) => c.copy(fracTest = x))
+ opt[String]("testInput")
+ .text(s"input path to test dataset. If given, option fracTest is ignored." +
+ s" default: ${defaultParams.testInput}")
+ .action((x, c) => c.copy(testInput = x))
+ opt[String]("dataFormat")
+ .text("data format: libsvm (default), dense (deprecated in Spark v1.1)")
+ .action((x, c) => c.copy(dataFormat = x))
+ arg[String]("<input>")
+ .text("input path to labeled examples")
+ .required()
+ .action((x, c) => c.copy(input = x))
+ checkConfig { params =>
+ if (params.fracTest < 0 || params.fracTest >= 1) {
+ failure(s"fracTest ${params.fracTest} value incorrect; should be in [0,1).")
+ } else {
+ success
+ }
+ }
+ }
+
+ parser.parse(args, defaultParams).map { params =>
+ run(params)
+ }.getOrElse {
+ sys.exit(1)
+ }
+ }
+
+ def run(params: Params) {
+ val conf = new SparkConf().setAppName(s"LinearRegressionExample with $params")
+ val sc = new SparkContext(conf)
+
+ println(s"LinearRegressionExample with parameters:\n$params")
+
+ // Load training and test data and cache it.
+ val (training: DataFrame, test: DataFrame) = DecisionTreeExample.loadDatasets(sc, params.input,
+ params.dataFormat, params.testInput, "regression", params.fracTest)
+
+ val lir = new LinearRegression()
+ .setFeaturesCol("features")
+ .setLabelCol("label")
+ .setRegParam(params.regParam)
+ .setElasticNetParam(params.elasticNetParam)
+ .setMaxIter(params.maxIter)
+ .setTol(params.tol)
+
+ // Train the model
+ val startTime = System.nanoTime()
+ val lirModel = lir.fit(training)
+ val elapsedTime = (System.nanoTime() - startTime) / 1e9
+ println(s"Training time: $elapsedTime seconds")
+
+ // Print the weights and intercept for linear regression.
+ println(s"Weights: ${lirModel.weights} Intercept: ${lirModel.intercept}")
+
+ println("Training data results:")
+ DecisionTreeExample.evaluateRegressionModel(lirModel, training, "label")
+ println("Test data results:")
+ DecisionTreeExample.evaluateRegressionModel(lirModel, test, "label")
+
+ sc.stop()
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/LogisticRegressionExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/LogisticRegressionExample.scala
new file mode 100644
index 0000000000..b12f833ce9
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/LogisticRegressionExample.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.ml
+
+import scala.collection.mutable
+import scala.language.reflectiveCalls
+
+import scopt.OptionParser
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.examples.mllib.AbstractParams
+import org.apache.spark.ml.{Pipeline, PipelineStage}
+import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
+import org.apache.spark.ml.feature.StringIndexer
+import org.apache.spark.sql.DataFrame
+
+/**
+ * An example runner for logistic regression with elastic-net (mixing L1/L2) regularization.
+ * Run with
+ * {{{
+ * bin/run-example ml.LogisticRegressionExample [options]
+ * }}}
+ * A synthetic dataset can be found at `data/mllib/sample_libsvm_data.txt` which can be
+ * trained by
+ * {{{
+ * bin/run-example ml.LogisticRegressionExample --regParam 0.3 --elasticNetParam 0.8 \
+ * data/mllib/sample_libsvm_data.txt
+ * }}}
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
+object LogisticRegressionExample {
+
+ case class Params(
+ input: String = null,
+ testInput: String = "",
+ dataFormat: String = "libsvm",
+ regParam: Double = 0.0,
+ elasticNetParam: Double = 0.0,
+ maxIter: Int = 100,
+ fitIntercept: Boolean = true,
+ tol: Double = 1E-6,
+ fracTest: Double = 0.2) extends AbstractParams[Params]
+
+ def main(args: Array[String]) {
+ val defaultParams = Params()
+
+ val parser = new OptionParser[Params]("LogisticRegressionExample") {
+ head("LogisticRegressionExample: an example Logistic Regression with Elastic-Net app.")
+ opt[Double]("regParam")
+ .text(s"regularization parameter, default: ${defaultParams.regParam}")
+ .action((x, c) => c.copy(regParam = x))
+ opt[Double]("elasticNetParam")
+ .text(s"ElasticNet mixing parameter. For alpha = 0, the penalty is an L2 penalty. " +
+ s"For alpha = 1, it is an L1 penalty. For 0 < alpha < 1, the penalty is a combination of " +
+ s"L1 and L2, default: ${defaultParams.elasticNetParam}")
+ .action((x, c) => c.copy(elasticNetParam = x))
+ opt[Int]("maxIter")
+ .text(s"maximum number of iterations, default: ${defaultParams.maxIter}")
+ .action((x, c) => c.copy(maxIter = x))
+ opt[Boolean]("fitIntercept")
+ .text(s"whether to fit an intercept term, default: ${defaultParams.fitIntercept}")
+ .action((x, c) => c.copy(fitIntercept = x))
+ opt[Double]("tol")
+ .text(s"the convergence tolerance of iterations, Smaller value will lead " +
+ s"to higher accuracy with the cost of more iterations, default: ${defaultParams.tol}")
+ .action((x, c) => c.copy(tol = x))
+ opt[Double]("fracTest")
+ .text(s"fraction of data to hold out for testing. If given option testInput, " +
+ s"this option is ignored. default: ${defaultParams.fracTest}")
+ .action((x, c) => c.copy(fracTest = x))
+ opt[String]("testInput")
+ .text(s"input path to test dataset. If given, option fracTest is ignored." +
+ s" default: ${defaultParams.testInput}")
+ .action((x, c) => c.copy(testInput = x))
+ opt[String]("dataFormat")
+ .text("data format: libsvm (default), dense (deprecated in Spark v1.1)")
+ .action((x, c) => c.copy(dataFormat = x))
+ arg[String]("<input>")
+ .text("input path to labeled examples")
+ .required()
+ .action((x, c) => c.copy(input = x))
+ checkConfig { params =>
+ if (params.fracTest < 0 || params.fracTest >= 1) {
+ failure(s"fracTest ${params.fracTest} value incorrect; should be in [0,1).")
+ } else {
+ success
+ }
+ }
+ }
+
+ parser.parse(args, defaultParams).map { params =>
+ run(params)
+ }.getOrElse {
+ sys.exit(1)
+ }
+ }
+
+ def run(params: Params) {
+ val conf = new SparkConf().setAppName(s"LogisticRegressionExample with $params")
+ val sc = new SparkContext(conf)
+
+ println(s"LogisticRegressionExample with parameters:\n$params")
+
+ // Load training and test data and cache it.
+ val (training: DataFrame, test: DataFrame) = DecisionTreeExample.loadDatasets(sc, params.input,
+ params.dataFormat, params.testInput, "classification", params.fracTest)
+
+ // Set up Pipeline
+ val stages = new mutable.ArrayBuffer[PipelineStage]()
+
+ val labelIndexer = new StringIndexer()
+ .setInputCol("labelString")
+ .setOutputCol("indexedLabel")
+ stages += labelIndexer
+
+ val lor = new LogisticRegression()
+ .setFeaturesCol("features")
+ .setLabelCol("indexedLabel")
+ .setRegParam(params.regParam)
+ .setElasticNetParam(params.elasticNetParam)
+ .setMaxIter(params.maxIter)
+ .setTol(params.tol)
+
+ stages += lor
+ val pipeline = new Pipeline().setStages(stages.toArray)
+
+ // Fit the Pipeline
+ val startTime = System.nanoTime()
+ val pipelineModel = pipeline.fit(training)
+ val elapsedTime = (System.nanoTime() - startTime) / 1e9
+ println(s"Training time: $elapsedTime seconds")
+
+ val lirModel = pipelineModel.stages.last.asInstanceOf[LogisticRegressionModel]
+ // Print the weights and intercept for logistic regression.
+ println(s"Weights: ${lirModel.weights} Intercept: ${lirModel.intercept}")
+
+ println("Training data results:")
+ DecisionTreeExample.evaluateClassificationModel(pipelineModel, training, "indexedLabel")
+ println("Test data results:")
+ DecisionTreeExample.evaluateClassificationModel(pipelineModel, test, "indexedLabel")
+
+ sc.stop()
+ }
+}