diff options
author | Joseph K. Bradley <joseph.kurata.bradley@gmail.com> | 2014-08-18 18:01:39 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2014-08-18 18:01:39 -0700 |
commit | c8b16ca0d86cc60fb960eebf0cb383f159a88b03 (patch) | |
tree | 27f6b16cc7bd14af681d1678fda53ea3051e2e36 /examples/src/main/scala | |
parent | 115eeb30dd9c9dd10685a71f2c23ca23794d3142 (diff) | |
download | spark-c8b16ca0d86cc60fb960eebf0cb383f159a88b03.tar.gz spark-c8b16ca0d86cc60fb960eebf0cb383f159a88b03.tar.bz2 spark-c8b16ca0d86cc60fb960eebf0cb383f159a88b03.zip |
[SPARK-2850] [SPARK-2626] [mllib] MLlib stats examples + small fixes
Added examples for statistical summarization:
* Scala: StatisticalSummary.scala
** Tests: correlation, MultivariateOnlineSummarizer
* python: statistical_summary.py
** Tests: correlation (since MultivariateOnlineSummarizer has no Python API)
Added examples for random and sampled RDDs:
* Scala: RandomAndSampledRDDs.scala
* python: random_and_sampled_rdds.py
* Both test:
** RandomRDDGenerators.normalRDD, normalVectorRDD
** RDD.sample, takeSample, sampleByKey
Added sc.stop() to all examples.
CorrelationSuite.scala
* Added 1 test for RDDs with only 1 value
RowMatrix.scala
* numCols(): Added check for numRows = 0, with error message.
* computeCovariance(): Added check for numRows <= 1, with error message.
Python SparseVector (pyspark/mllib/linalg.py)
* Added toDense() function
python/run-tests script
* Added stat.py (doc test)
CC: mengxr dorx Main changes were examples to show usage across APIs.
Author: Joseph K. Bradley <joseph.kurata.bradley@gmail.com>
Closes #1878 from jkbradley/mllib-stats-api-check and squashes the following commits:
ea5c047 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
dafebe2 [Joseph K. Bradley] Bug fixes for examples SampledRDDs.scala and sampled_rdds.py: Check for division by 0 and for missing key in maps.
8d1e555 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
60c72d9 [Joseph K. Bradley] Fixed stat.py doc test to work for Python versions printing nan or NaN.
b20d90a [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
4e5d15e [Joseph K. Bradley] Changed pyspark/mllib/stat.py doc tests to use NaN instead of nan.
32173b7 [Joseph K. Bradley] Stats examples update.
c8c20dc [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
cf70b07 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
0b7cec3 [Joseph K. Bradley] Small updates based on code review. Renamed statistical_summary.py to correlations.py
ab48f6e [Joseph K. Bradley] RowMatrix.scala * numCols(): Added check for numRows = 0, with error message. * computeCovariance(): Added check for numRows <= 1, with error message.
65e4ebc [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
8195c78 [Joseph K. Bradley] Added examples for random and sampled RDDs: * Scala: RandomAndSampledRDDs.scala * python: random_and_sampled_rdds.py * Both test: ** RandomRDDGenerators.normalRDD, normalVectorRDD ** RDD.sample, takeSample, sampleByKey
064985b [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
ee918e9 [Joseph K. Bradley] Added examples for statistical summarization: * Scala: StatisticalSummary.scala ** Tests: correlation, MultivariateOnlineSummarizer * python: statistical_summary.py ** Tests: correlation (since MultivariateOnlineSummarizer has no Python API)
Diffstat (limited to 'examples/src/main/scala')
4 files changed, 376 insertions, 0 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala new file mode 100644 index 0000000000..d6b2fe430e --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import scopt.OptionParser + +import org.apache.spark.mllib.stat.Statistics +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.{SparkConf, SparkContext} + + +/** + * An example app for summarizing multivariate data from a file. Run with + * {{{ + * bin/run-example org.apache.spark.examples.mllib.Correlations + * }}} + * By default, this loads a synthetic dataset from `data/mllib/sample_linear_regression_data.txt`. + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ +object Correlations { + + case class Params(input: String = "data/mllib/sample_linear_regression_data.txt") + + def main(args: Array[String]) { + + val defaultParams = Params() + + val parser = new OptionParser[Params]("Correlations") { + head("Correlations: an example app for computing correlations") + opt[String]("input") + .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}") + .action((x, c) => c.copy(input = x)) + note( + """ + |For example, the following command runs this app on a synthetic dataset: + | + | bin/spark-submit --class org.apache.spark.examples.mllib.Correlations \ + | examples/target/scala-*/spark-examples-*.jar \ + | --input data/mllib/sample_linear_regression_data.txt + """.stripMargin) + } + + parser.parse(args, defaultParams).map { params => + run(params) + } getOrElse { + sys.exit(1) + } + } + + def run(params: Params) { + val conf = new SparkConf().setAppName(s"Correlations with $params") + val sc = new SparkContext(conf) + + val examples = MLUtils.loadLibSVMFile(sc, params.input).cache() + + println(s"Summary of data file: ${params.input}") + println(s"${examples.count()} data points") + + // Calculate label -- feature correlations + val labelRDD = examples.map(_.label) + val numFeatures = examples.take(1)(0).features.size + val corrType = "pearson" + println() + println(s"Correlation ($corrType) between label and each feature") + println(s"Feature\tCorrelation") + var feature = 0 + while (feature < numFeatures) { + val featureRDD = examples.map(_.features(feature)) + val corr = Statistics.corr(labelRDD, featureRDD) + println(s"$feature\t$corr") + feature += 1 + } + println() + + sc.stop() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala new file mode 100644 index 0000000000..4532512c01 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import scopt.OptionParser + +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.{SparkConf, SparkContext} + + +/** + * An example app for summarizing multivariate data from a file. Run with + * {{{ + * bin/run-example org.apache.spark.examples.mllib.MultivariateSummarizer + * }}} + * By default, this loads a synthetic dataset from `data/mllib/sample_linear_regression_data.txt`. + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ +object MultivariateSummarizer { + + case class Params(input: String = "data/mllib/sample_linear_regression_data.txt") + + def main(args: Array[String]) { + + val defaultParams = Params() + + val parser = new OptionParser[Params]("MultivariateSummarizer") { + head("MultivariateSummarizer: an example app for MultivariateOnlineSummarizer") + opt[String]("input") + .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}") + .action((x, c) => c.copy(input = x)) + note( + """ + |For example, the following command runs this app on a synthetic dataset: + | + | bin/spark-submit --class org.apache.spark.examples.mllib.MultivariateSummarizer \ + | examples/target/scala-*/spark-examples-*.jar \ + | --input data/mllib/sample_linear_regression_data.txt + """.stripMargin) + } + + parser.parse(args, defaultParams).map { params => + run(params) + } getOrElse { + sys.exit(1) + } + } + + def run(params: Params) { + val conf = new SparkConf().setAppName(s"MultivariateSummarizer with $params") + val sc = new SparkContext(conf) + + val examples = MLUtils.loadLibSVMFile(sc, params.input).cache() + + println(s"Summary of data file: ${params.input}") + println(s"${examples.count()} data points") + + // Summarize labels + val labelSummary = examples.aggregate(new MultivariateOnlineSummarizer())( + (summary, lp) => summary.add(Vectors.dense(lp.label)), + (sum1, sum2) => sum1.merge(sum2)) + + // Summarize features + val featureSummary = examples.aggregate(new MultivariateOnlineSummarizer())( + (summary, lp) => summary.add(lp.features), + (sum1, sum2) => sum1.merge(sum2)) + + println() + println(s"Summary statistics") + println(s"\tLabel\tFeatures") + println(s"mean\t${labelSummary.mean(0)}\t${featureSummary.mean.toArray.mkString("\t")}") + println(s"var\t${labelSummary.variance(0)}\t${featureSummary.variance.toArray.mkString("\t")}") + println( + s"nnz\t${labelSummary.numNonzeros(0)}\t${featureSummary.numNonzeros.toArray.mkString("\t")}") + println(s"max\t${labelSummary.max(0)}\t${featureSummary.max.toArray.mkString("\t")}") + println(s"min\t${labelSummary.min(0)}\t${featureSummary.min.toArray.mkString("\t")}") + println() + + sc.stop() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala new file mode 100644 index 0000000000..924b586e3a --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.spark.mllib.random.RandomRDDs +import org.apache.spark.rdd.RDD + +import org.apache.spark.{SparkConf, SparkContext} + +/** + * An example app for randomly generated RDDs. Run with + * {{{ + * bin/run-example org.apache.spark.examples.mllib.RandomRDDGeneration + * }}} + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ +object RandomRDDGeneration { + + def main(args: Array[String]) { + + val conf = new SparkConf().setAppName(s"RandomRDDGeneration") + val sc = new SparkContext(conf) + + val numExamples = 10000 // number of examples to generate + val fraction = 0.1 // fraction of data to sample + + // Example: RandomRDDs.normalRDD + val normalRDD: RDD[Double] = RandomRDDs.normalRDD(sc, numExamples) + println(s"Generated RDD of ${normalRDD.count()}" + + " examples sampled from the standard normal distribution") + println(" First 5 samples:") + normalRDD.take(5).foreach( x => println(s" $x") ) + + // Example: RandomRDDs.normalVectorRDD + val normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows = numExamples, numCols = 2) + println(s"Generated RDD of ${normalVectorRDD.count()} examples of length-2 vectors.") + println(" First 5 samples:") + normalVectorRDD.take(5).foreach( x => println(s" $x") ) + + println() + + sc.stop() + } + +} diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala new file mode 100644 index 0000000000..f01b8266e3 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.spark.mllib.util.MLUtils +import scopt.OptionParser + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.SparkContext._ + +/** + * An example app for randomly generated and sampled RDDs. Run with + * {{{ + * bin/run-example org.apache.spark.examples.mllib.SampledRDDs + * }}} + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ +object SampledRDDs { + + case class Params(input: String = "data/mllib/sample_binary_classification_data.txt") + + def main(args: Array[String]) { + val defaultParams = Params() + + val parser = new OptionParser[Params]("SampledRDDs") { + head("SampledRDDs: an example app for randomly generated and sampled RDDs.") + opt[String]("input") + .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}") + .action((x, c) => c.copy(input = x)) + note( + """ + |For example, the following command runs this app: + | + | bin/spark-submit --class org.apache.spark.examples.mllib.SampledRDDs \ + | examples/target/scala-*/spark-examples-*.jar + """.stripMargin) + } + + parser.parse(args, defaultParams).map { params => + run(params) + } getOrElse { + sys.exit(1) + } + } + + def run(params: Params) { + val conf = new SparkConf().setAppName(s"SampledRDDs with $params") + val sc = new SparkContext(conf) + + val fraction = 0.1 // fraction of data to sample + + val examples = MLUtils.loadLibSVMFile(sc, params.input) + val numExamples = examples.count() + if (numExamples == 0) { + throw new RuntimeException("Error: Data file had no samples to load.") + } + println(s"Loaded data with $numExamples examples from file: ${params.input}") + + // Example: RDD.sample() and RDD.takeSample() + val expectedSampleSize = (numExamples * fraction).toInt + println(s"Sampling RDD using fraction $fraction. Expected sample size = $expectedSampleSize.") + val sampledRDD = examples.sample(withReplacement = true, fraction = fraction) + println(s" RDD.sample(): sample has ${sampledRDD.count()} examples") + val sampledArray = examples.takeSample(withReplacement = true, num = expectedSampleSize) + println(s" RDD.takeSample(): sample has ${sampledArray.size} examples") + + println() + + // Example: RDD.sampleByKey() and RDD.sampleByKeyExact() + val keyedRDD = examples.map { lp => (lp.label.toInt, lp.features) } + println(s" Keyed data using label (Int) as key ==> Orig") + // Count examples per label in original data. + val keyCounts = keyedRDD.countByKey() + + // Subsample, and count examples per label in sampled data. (approximate) + val fractions = keyCounts.keys.map((_, fraction)).toMap + val sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement = true, fractions = fractions) + val keyCountsB = sampledByKeyRDD.countByKey() + val sizeB = keyCountsB.values.sum + println(s" Sampled $sizeB examples using approximate stratified sampling (by label)." + + " ==> Approx Sample") + + // Subsample, and count examples per label in sampled data. (approximate) + val sampledByKeyRDDExact = + keyedRDD.sampleByKeyExact(withReplacement = true, fractions = fractions) + val keyCountsBExact = sampledByKeyRDDExact.countByKey() + val sizeBExact = keyCountsBExact.values.sum + println(s" Sampled $sizeBExact examples using exact stratified sampling (by label)." + + " ==> Exact Sample") + + // Compare samples + println(s" \tFractions of examples with key") + println(s"Key\tOrig\tApprox Sample\tExact Sample") + keyCounts.keys.toSeq.sorted.foreach { key => + val origFrac = keyCounts(key) / numExamples.toDouble + val approxFrac = if (sizeB != 0) { + keyCountsB.getOrElse(key, 0L) / sizeB.toDouble + } else { + 0 + } + val exactFrac = if (sizeBExact != 0) { + keyCountsBExact.getOrElse(key, 0L) / sizeBExact.toDouble + } else { + 0 + } + println(s"$key\t$origFrac\t$approxFrac\t$exactFrac") + } + + sc.stop() + } +} |