aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorJoseph K. Bradley <joseph.kurata.bradley@gmail.com>2014-08-18 18:01:39 -0700
committerXiangrui Meng <meng@databricks.com>2014-08-18 18:01:39 -0700
commitc8b16ca0d86cc60fb960eebf0cb383f159a88b03 (patch)
tree27f6b16cc7bd14af681d1678fda53ea3051e2e36 /examples
parent115eeb30dd9c9dd10685a71f2c23ca23794d3142 (diff)
downloadspark-c8b16ca0d86cc60fb960eebf0cb383f159a88b03.tar.gz
spark-c8b16ca0d86cc60fb960eebf0cb383f159a88b03.tar.bz2
spark-c8b16ca0d86cc60fb960eebf0cb383f159a88b03.zip
[SPARK-2850] [SPARK-2626] [mllib] MLlib stats examples + small fixes
Added examples for statistical summarization: * Scala: StatisticalSummary.scala ** Tests: correlation, MultivariateOnlineSummarizer * python: statistical_summary.py ** Tests: correlation (since MultivariateOnlineSummarizer has no Python API) Added examples for random and sampled RDDs: * Scala: RandomAndSampledRDDs.scala * python: random_and_sampled_rdds.py * Both test: ** RandomRDDGenerators.normalRDD, normalVectorRDD ** RDD.sample, takeSample, sampleByKey Added sc.stop() to all examples. CorrelationSuite.scala * Added 1 test for RDDs with only 1 value RowMatrix.scala * numCols(): Added check for numRows = 0, with error message. * computeCovariance(): Added check for numRows <= 1, with error message. Python SparseVector (pyspark/mllib/linalg.py) * Added toDense() function python/run-tests script * Added stat.py (doc test) CC: mengxr dorx Main changes were examples to show usage across APIs. Author: Joseph K. Bradley <joseph.kurata.bradley@gmail.com> Closes #1878 from jkbradley/mllib-stats-api-check and squashes the following commits: ea5c047 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check dafebe2 [Joseph K. Bradley] Bug fixes for examples SampledRDDs.scala and sampled_rdds.py: Check for division by 0 and for missing key in maps. 8d1e555 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check 60c72d9 [Joseph K. Bradley] Fixed stat.py doc test to work for Python versions printing nan or NaN. b20d90a [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check 4e5d15e [Joseph K. Bradley] Changed pyspark/mllib/stat.py doc tests to use NaN instead of nan. 32173b7 [Joseph K. Bradley] Stats examples update. c8c20dc [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check cf70b07 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check 0b7cec3 [Joseph K. Bradley] Small updates based on code review. Renamed statistical_summary.py to correlations.py ab48f6e [Joseph K. Bradley] RowMatrix.scala * numCols(): Added check for numRows = 0, with error message. * computeCovariance(): Added check for numRows <= 1, with error message. 65e4ebc [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check 8195c78 [Joseph K. Bradley] Added examples for random and sampled RDDs: * Scala: RandomAndSampledRDDs.scala * python: random_and_sampled_rdds.py * Both test: ** RandomRDDGenerators.normalRDD, normalVectorRDD ** RDD.sample, takeSample, sampleByKey 064985b [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check ee918e9 [Joseph K. Bradley] Added examples for statistical summarization: * Scala: StatisticalSummary.scala ** Tests: correlation, MultivariateOnlineSummarizer * python: statistical_summary.py ** Tests: correlation (since MultivariateOnlineSummarizer has no Python API)
Diffstat (limited to 'examples')
-rwxr-xr-xexamples/src/main/python/als.py2
-rw-r--r--examples/src/main/python/cassandra_inputformat.py2
-rw-r--r--examples/src/main/python/cassandra_outputformat.py2
-rw-r--r--examples/src/main/python/hbase_inputformat.py2
-rw-r--r--examples/src/main/python/hbase_outputformat.py2
-rwxr-xr-xexamples/src/main/python/kmeans.py2
-rwxr-xr-xexamples/src/main/python/logistic_regression.py2
-rwxr-xr-xexamples/src/main/python/mllib/correlations.py60
-rwxr-xr-xexamples/src/main/python/mllib/decision_tree_runner.py5
-rwxr-xr-xexamples/src/main/python/mllib/kmeans.py1
-rwxr-xr-xexamples/src/main/python/mllib/logistic_regression.py1
-rwxr-xr-xexamples/src/main/python/mllib/random_rdd_generation.py55
-rwxr-xr-xexamples/src/main/python/mllib/sampled_rdds.py86
-rwxr-xr-xexamples/src/main/python/pagerank.py2
-rwxr-xr-xexamples/src/main/python/pi.py2
-rwxr-xr-xexamples/src/main/python/sort.py2
-rwxr-xr-xexamples/src/main/python/transitive_closure.py2
-rwxr-xr-xexamples/src/main/python/wordcount.py2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala92
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala98
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala60
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala126
22 files changed, 608 insertions, 0 deletions
diff --git a/examples/src/main/python/als.py b/examples/src/main/python/als.py
index c862650b0a..5b1fa4d997 100755
--- a/examples/src/main/python/als.py
+++ b/examples/src/main/python/als.py
@@ -97,3 +97,5 @@ if __name__ == "__main__":
error = rmse(R, ms, us)
print "Iteration %d:" % i
print "\nRMSE: %5.4f\n" % error
+
+ sc.stop()
diff --git a/examples/src/main/python/cassandra_inputformat.py b/examples/src/main/python/cassandra_inputformat.py
index 39fa6b0d22..e4a897f61e 100644
--- a/examples/src/main/python/cassandra_inputformat.py
+++ b/examples/src/main/python/cassandra_inputformat.py
@@ -77,3 +77,5 @@ if __name__ == "__main__":
output = cass_rdd.collect()
for (k, v) in output:
print (k, v)
+
+ sc.stop()
diff --git a/examples/src/main/python/cassandra_outputformat.py b/examples/src/main/python/cassandra_outputformat.py
index 1dfbf98604..836c35b5c6 100644
--- a/examples/src/main/python/cassandra_outputformat.py
+++ b/examples/src/main/python/cassandra_outputformat.py
@@ -81,3 +81,5 @@ if __name__ == "__main__":
conf=conf,
keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter",
valueConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLValueConverter")
+
+ sc.stop()
diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py
index c9fa8e171c..befacee0de 100644
--- a/examples/src/main/python/hbase_inputformat.py
+++ b/examples/src/main/python/hbase_inputformat.py
@@ -71,3 +71,5 @@ if __name__ == "__main__":
output = hbase_rdd.collect()
for (k, v) in output:
print (k, v)
+
+ sc.stop()
diff --git a/examples/src/main/python/hbase_outputformat.py b/examples/src/main/python/hbase_outputformat.py
index 5e11548fd1..49bbc5aebd 100644
--- a/examples/src/main/python/hbase_outputformat.py
+++ b/examples/src/main/python/hbase_outputformat.py
@@ -63,3 +63,5 @@ if __name__ == "__main__":
conf=conf,
keyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
valueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter")
+
+ sc.stop()
diff --git a/examples/src/main/python/kmeans.py b/examples/src/main/python/kmeans.py
index 036bdf4c4f..86ef6f32c8 100755
--- a/examples/src/main/python/kmeans.py
+++ b/examples/src/main/python/kmeans.py
@@ -77,3 +77,5 @@ if __name__ == "__main__":
kPoints[x] = y
print "Final centers: " + str(kPoints)
+
+ sc.stop()
diff --git a/examples/src/main/python/logistic_regression.py b/examples/src/main/python/logistic_regression.py
index 8456b272f9..3aa56b0528 100755
--- a/examples/src/main/python/logistic_regression.py
+++ b/examples/src/main/python/logistic_regression.py
@@ -80,3 +80,5 @@ if __name__ == "__main__":
w -= points.map(lambda m: gradient(m, w)).reduce(add)
print "Final w: " + str(w)
+
+ sc.stop()
diff --git a/examples/src/main/python/mllib/correlations.py b/examples/src/main/python/mllib/correlations.py
new file mode 100755
index 0000000000..6b16a56e44
--- /dev/null
+++ b/examples/src/main/python/mllib/correlations.py
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Correlations using MLlib.
+"""
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.mllib.regression import LabeledPoint
+from pyspark.mllib.stat import Statistics
+from pyspark.mllib.util import MLUtils
+
+
+if __name__ == "__main__":
+ if len(sys.argv) not in [1,2]:
+ print >> sys.stderr, "Usage: correlations (<file>)"
+ exit(-1)
+ sc = SparkContext(appName="PythonCorrelations")
+ if len(sys.argv) == 2:
+ filepath = sys.argv[1]
+ else:
+ filepath = 'data/mllib/sample_linear_regression_data.txt'
+ corrType = 'pearson'
+
+ points = MLUtils.loadLibSVMFile(sc, filepath)\
+ .map(lambda lp: LabeledPoint(lp.label, lp.features.toArray()))
+
+ print
+ print 'Summary of data file: ' + filepath
+ print '%d data points' % points.count()
+
+ # Statistics (correlations)
+ print
+ print 'Correlation (%s) between label and each feature' % corrType
+ print 'Feature\tCorrelation'
+ numFeatures = points.take(1)[0].features.size
+ labelRDD = points.map(lambda lp: lp.label)
+ for i in range(numFeatures):
+ featureRDD = points.map(lambda lp: lp.features[i])
+ corr = Statistics.corr(labelRDD, featureRDD, corrType)
+ print '%d\t%g' % (i, corr)
+ print
+
+ sc.stop()
diff --git a/examples/src/main/python/mllib/decision_tree_runner.py b/examples/src/main/python/mllib/decision_tree_runner.py
index db96a7cb37..6e4a4a0cb6 100755
--- a/examples/src/main/python/mllib/decision_tree_runner.py
+++ b/examples/src/main/python/mllib/decision_tree_runner.py
@@ -17,6 +17,8 @@
"""
Decision tree classification and regression using MLlib.
+
+This example requires NumPy (http://www.numpy.org/).
"""
import numpy, os, sys
@@ -117,6 +119,7 @@ if __name__ == "__main__":
if len(sys.argv) == 2:
dataPath = sys.argv[1]
if not os.path.isfile(dataPath):
+ sc.stop()
usage()
points = MLUtils.loadLibSVMFile(sc, dataPath)
@@ -133,3 +136,5 @@ if __name__ == "__main__":
print " Model depth: %d\n" % model.depth()
print " Training accuracy: %g\n" % getAccuracy(model, reindexedData)
print model
+
+ sc.stop()
diff --git a/examples/src/main/python/mllib/kmeans.py b/examples/src/main/python/mllib/kmeans.py
index b308132c9a..2eeb1abeeb 100755
--- a/examples/src/main/python/mllib/kmeans.py
+++ b/examples/src/main/python/mllib/kmeans.py
@@ -42,3 +42,4 @@ if __name__ == "__main__":
k = int(sys.argv[2])
model = KMeans.train(data, k)
print "Final centers: " + str(model.clusterCenters)
+ sc.stop()
diff --git a/examples/src/main/python/mllib/logistic_regression.py b/examples/src/main/python/mllib/logistic_regression.py
index 9d547ff77c..8cae27fc4a 100755
--- a/examples/src/main/python/mllib/logistic_regression.py
+++ b/examples/src/main/python/mllib/logistic_regression.py
@@ -50,3 +50,4 @@ if __name__ == "__main__":
model = LogisticRegressionWithSGD.train(points, iterations)
print "Final weights: " + str(model.weights)
print "Final intercept: " + str(model.intercept)
+ sc.stop()
diff --git a/examples/src/main/python/mllib/random_rdd_generation.py b/examples/src/main/python/mllib/random_rdd_generation.py
new file mode 100755
index 0000000000..b388d8d83f
--- /dev/null
+++ b/examples/src/main/python/mllib/random_rdd_generation.py
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Randomly generated RDDs.
+"""
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.mllib.random import RandomRDDs
+
+
+if __name__ == "__main__":
+ if len(sys.argv) not in [1, 2]:
+ print >> sys.stderr, "Usage: random_rdd_generation"
+ exit(-1)
+
+ sc = SparkContext(appName="PythonRandomRDDGeneration")
+
+ numExamples = 10000 # number of examples to generate
+ fraction = 0.1 # fraction of data to sample
+
+ # Example: RandomRDDs.normalRDD
+ normalRDD = RandomRDDs.normalRDD(sc, numExamples)
+ print 'Generated RDD of %d examples sampled from the standard normal distribution'\
+ % normalRDD.count()
+ print ' First 5 samples:'
+ for sample in normalRDD.take(5):
+ print ' ' + str(sample)
+ print
+
+ # Example: RandomRDDs.normalVectorRDD
+ normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows = numExamples, numCols = 2)
+ print 'Generated RDD of %d examples of length-2 vectors.' % normalVectorRDD.count()
+ print ' First 5 samples:'
+ for sample in normalVectorRDD.take(5):
+ print ' ' + str(sample)
+ print
+
+ sc.stop()
diff --git a/examples/src/main/python/mllib/sampled_rdds.py b/examples/src/main/python/mllib/sampled_rdds.py
new file mode 100755
index 0000000000..ec64a5978c
--- /dev/null
+++ b/examples/src/main/python/mllib/sampled_rdds.py
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Randomly sampled RDDs.
+"""
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.mllib.util import MLUtils
+
+
+if __name__ == "__main__":
+ if len(sys.argv) not in [1, 2]:
+ print >> sys.stderr, "Usage: sampled_rdds <libsvm data file>"
+ exit(-1)
+ if len(sys.argv) == 2:
+ datapath = sys.argv[1]
+ else:
+ datapath = 'data/mllib/sample_binary_classification_data.txt'
+
+ sc = SparkContext(appName="PythonSampledRDDs")
+
+ fraction = 0.1 # fraction of data to sample
+
+ examples = MLUtils.loadLibSVMFile(sc, datapath)
+ numExamples = examples.count()
+ if numExamples == 0:
+ print >> sys.stderr, "Error: Data file had no samples to load."
+ exit(1)
+ print 'Loaded data with %d examples from file: %s' % (numExamples, datapath)
+
+ # Example: RDD.sample() and RDD.takeSample()
+ expectedSampleSize = int(numExamples * fraction)
+ print 'Sampling RDD using fraction %g. Expected sample size = %d.' \
+ % (fraction, expectedSampleSize)
+ sampledRDD = examples.sample(withReplacement = True, fraction = fraction)
+ print ' RDD.sample(): sample has %d examples' % sampledRDD.count()
+ sampledArray = examples.takeSample(withReplacement = True, num = expectedSampleSize)
+ print ' RDD.takeSample(): sample has %d examples' % len(sampledArray)
+
+ print
+
+ # Example: RDD.sampleByKey()
+ keyedRDD = examples.map(lambda lp: (int(lp.label), lp.features))
+ print ' Keyed data using label (Int) as key ==> Orig'
+ # Count examples per label in original data.
+ keyCountsA = keyedRDD.countByKey()
+
+ # Subsample, and count examples per label in sampled data.
+ fractions = {}
+ for k in keyCountsA.keys():
+ fractions[k] = fraction
+ sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement = True, fractions = fractions)
+ keyCountsB = sampledByKeyRDD.countByKey()
+ sizeB = sum(keyCountsB.values())
+ print ' Sampled %d examples using approximate stratified sampling (by label). ==> Sample' \
+ % sizeB
+
+ # Compare samples
+ print ' \tFractions of examples with key'
+ print 'Key\tOrig\tSample'
+ for k in sorted(keyCountsA.keys()):
+ fracA = keyCountsA[k] / float(numExamples)
+ if sizeB != 0:
+ fracB = keyCountsB.get(k, 0) / float(sizeB)
+ else:
+ fracB = 0
+ print '%d\t%g\t%g' % (k, fracA, fracB)
+
+ sc.stop()
diff --git a/examples/src/main/python/pagerank.py b/examples/src/main/python/pagerank.py
index 0b96343158..b539c4128c 100755
--- a/examples/src/main/python/pagerank.py
+++ b/examples/src/main/python/pagerank.py
@@ -68,3 +68,5 @@ if __name__ == "__main__":
# Collects all URL ranks and dump them to console.
for (link, rank) in ranks.collect():
print "%s has rank: %s." % (link, rank)
+
+ sc.stop()
diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py
index 21d94a2cd4..fc37459dc7 100755
--- a/examples/src/main/python/pi.py
+++ b/examples/src/main/python/pi.py
@@ -37,3 +37,5 @@ if __name__ == "__main__":
count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add)
print "Pi is roughly %f" % (4.0 * count / n)
+
+ sc.stop()
diff --git a/examples/src/main/python/sort.py b/examples/src/main/python/sort.py
index 41d00c1b79..bb686f1751 100755
--- a/examples/src/main/python/sort.py
+++ b/examples/src/main/python/sort.py
@@ -34,3 +34,5 @@ if __name__ == "__main__":
output = sortedCount.collect()
for (num, unitcount) in output:
print num
+
+ sc.stop()
diff --git a/examples/src/main/python/transitive_closure.py b/examples/src/main/python/transitive_closure.py
index 8698369b13..bf331b542c 100755
--- a/examples/src/main/python/transitive_closure.py
+++ b/examples/src/main/python/transitive_closure.py
@@ -64,3 +64,5 @@ if __name__ == "__main__":
break
print "TC has %i edges" % tc.count()
+
+ sc.stop()
diff --git a/examples/src/main/python/wordcount.py b/examples/src/main/python/wordcount.py
index dcc095fdd0..ae6cd13b83 100755
--- a/examples/src/main/python/wordcount.py
+++ b/examples/src/main/python/wordcount.py
@@ -33,3 +33,5 @@ if __name__ == "__main__":
output = counts.collect()
for (word, count) in output:
print "%s: %i" % (word, count)
+
+ sc.stop()
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala
new file mode 100644
index 0000000000..d6b2fe430e
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import scopt.OptionParser
+
+import org.apache.spark.mllib.stat.Statistics
+import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.{SparkConf, SparkContext}
+
+
+/**
+ * An example app for summarizing multivariate data from a file. Run with
+ * {{{
+ * bin/run-example org.apache.spark.examples.mllib.Correlations
+ * }}}
+ * By default, this loads a synthetic dataset from `data/mllib/sample_linear_regression_data.txt`.
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
+object Correlations {
+
+ case class Params(input: String = "data/mllib/sample_linear_regression_data.txt")
+
+ def main(args: Array[String]) {
+
+ val defaultParams = Params()
+
+ val parser = new OptionParser[Params]("Correlations") {
+ head("Correlations: an example app for computing correlations")
+ opt[String]("input")
+ .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}")
+ .action((x, c) => c.copy(input = x))
+ note(
+ """
+ |For example, the following command runs this app on a synthetic dataset:
+ |
+ | bin/spark-submit --class org.apache.spark.examples.mllib.Correlations \
+ | examples/target/scala-*/spark-examples-*.jar \
+ | --input data/mllib/sample_linear_regression_data.txt
+ """.stripMargin)
+ }
+
+ parser.parse(args, defaultParams).map { params =>
+ run(params)
+ } getOrElse {
+ sys.exit(1)
+ }
+ }
+
+ def run(params: Params) {
+ val conf = new SparkConf().setAppName(s"Correlations with $params")
+ val sc = new SparkContext(conf)
+
+ val examples = MLUtils.loadLibSVMFile(sc, params.input).cache()
+
+ println(s"Summary of data file: ${params.input}")
+ println(s"${examples.count()} data points")
+
+ // Calculate label -- feature correlations
+ val labelRDD = examples.map(_.label)
+ val numFeatures = examples.take(1)(0).features.size
+ val corrType = "pearson"
+ println()
+ println(s"Correlation ($corrType) between label and each feature")
+ println(s"Feature\tCorrelation")
+ var feature = 0
+ while (feature < numFeatures) {
+ val featureRDD = examples.map(_.features(feature))
+ val corr = Statistics.corr(labelRDD, featureRDD)
+ println(s"$feature\t$corr")
+ feature += 1
+ }
+ println()
+
+ sc.stop()
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala
new file mode 100644
index 0000000000..4532512c01
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import scopt.OptionParser
+
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
+import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.{SparkConf, SparkContext}
+
+
+/**
+ * An example app for summarizing multivariate data from a file. Run with
+ * {{{
+ * bin/run-example org.apache.spark.examples.mllib.MultivariateSummarizer
+ * }}}
+ * By default, this loads a synthetic dataset from `data/mllib/sample_linear_regression_data.txt`.
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
+object MultivariateSummarizer {
+
+ case class Params(input: String = "data/mllib/sample_linear_regression_data.txt")
+
+ def main(args: Array[String]) {
+
+ val defaultParams = Params()
+
+ val parser = new OptionParser[Params]("MultivariateSummarizer") {
+ head("MultivariateSummarizer: an example app for MultivariateOnlineSummarizer")
+ opt[String]("input")
+ .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}")
+ .action((x, c) => c.copy(input = x))
+ note(
+ """
+ |For example, the following command runs this app on a synthetic dataset:
+ |
+ | bin/spark-submit --class org.apache.spark.examples.mllib.MultivariateSummarizer \
+ | examples/target/scala-*/spark-examples-*.jar \
+ | --input data/mllib/sample_linear_regression_data.txt
+ """.stripMargin)
+ }
+
+ parser.parse(args, defaultParams).map { params =>
+ run(params)
+ } getOrElse {
+ sys.exit(1)
+ }
+ }
+
+ def run(params: Params) {
+ val conf = new SparkConf().setAppName(s"MultivariateSummarizer with $params")
+ val sc = new SparkContext(conf)
+
+ val examples = MLUtils.loadLibSVMFile(sc, params.input).cache()
+
+ println(s"Summary of data file: ${params.input}")
+ println(s"${examples.count()} data points")
+
+ // Summarize labels
+ val labelSummary = examples.aggregate(new MultivariateOnlineSummarizer())(
+ (summary, lp) => summary.add(Vectors.dense(lp.label)),
+ (sum1, sum2) => sum1.merge(sum2))
+
+ // Summarize features
+ val featureSummary = examples.aggregate(new MultivariateOnlineSummarizer())(
+ (summary, lp) => summary.add(lp.features),
+ (sum1, sum2) => sum1.merge(sum2))
+
+ println()
+ println(s"Summary statistics")
+ println(s"\tLabel\tFeatures")
+ println(s"mean\t${labelSummary.mean(0)}\t${featureSummary.mean.toArray.mkString("\t")}")
+ println(s"var\t${labelSummary.variance(0)}\t${featureSummary.variance.toArray.mkString("\t")}")
+ println(
+ s"nnz\t${labelSummary.numNonzeros(0)}\t${featureSummary.numNonzeros.toArray.mkString("\t")}")
+ println(s"max\t${labelSummary.max(0)}\t${featureSummary.max.toArray.mkString("\t")}")
+ println(s"min\t${labelSummary.min(0)}\t${featureSummary.min.toArray.mkString("\t")}")
+ println()
+
+ sc.stop()
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala
new file mode 100644
index 0000000000..924b586e3a
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.spark.mllib.random.RandomRDDs
+import org.apache.spark.rdd.RDD
+
+import org.apache.spark.{SparkConf, SparkContext}
+
+/**
+ * An example app for randomly generated RDDs. Run with
+ * {{{
+ * bin/run-example org.apache.spark.examples.mllib.RandomRDDGeneration
+ * }}}
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
+object RandomRDDGeneration {
+
+ def main(args: Array[String]) {
+
+ val conf = new SparkConf().setAppName(s"RandomRDDGeneration")
+ val sc = new SparkContext(conf)
+
+ val numExamples = 10000 // number of examples to generate
+ val fraction = 0.1 // fraction of data to sample
+
+ // Example: RandomRDDs.normalRDD
+ val normalRDD: RDD[Double] = RandomRDDs.normalRDD(sc, numExamples)
+ println(s"Generated RDD of ${normalRDD.count()}" +
+ " examples sampled from the standard normal distribution")
+ println(" First 5 samples:")
+ normalRDD.take(5).foreach( x => println(s" $x") )
+
+ // Example: RandomRDDs.normalVectorRDD
+ val normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows = numExamples, numCols = 2)
+ println(s"Generated RDD of ${normalVectorRDD.count()} examples of length-2 vectors.")
+ println(" First 5 samples:")
+ normalVectorRDD.take(5).foreach( x => println(s" $x") )
+
+ println()
+
+ sc.stop()
+ }
+
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala
new file mode 100644
index 0000000000..f01b8266e3
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.spark.mllib.util.MLUtils
+import scopt.OptionParser
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkContext._
+
+/**
+ * An example app for randomly generated and sampled RDDs. Run with
+ * {{{
+ * bin/run-example org.apache.spark.examples.mllib.SampledRDDs
+ * }}}
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
+object SampledRDDs {
+
+ case class Params(input: String = "data/mllib/sample_binary_classification_data.txt")
+
+ def main(args: Array[String]) {
+ val defaultParams = Params()
+
+ val parser = new OptionParser[Params]("SampledRDDs") {
+ head("SampledRDDs: an example app for randomly generated and sampled RDDs.")
+ opt[String]("input")
+ .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}")
+ .action((x, c) => c.copy(input = x))
+ note(
+ """
+ |For example, the following command runs this app:
+ |
+ | bin/spark-submit --class org.apache.spark.examples.mllib.SampledRDDs \
+ | examples/target/scala-*/spark-examples-*.jar
+ """.stripMargin)
+ }
+
+ parser.parse(args, defaultParams).map { params =>
+ run(params)
+ } getOrElse {
+ sys.exit(1)
+ }
+ }
+
+ def run(params: Params) {
+ val conf = new SparkConf().setAppName(s"SampledRDDs with $params")
+ val sc = new SparkContext(conf)
+
+ val fraction = 0.1 // fraction of data to sample
+
+ val examples = MLUtils.loadLibSVMFile(sc, params.input)
+ val numExamples = examples.count()
+ if (numExamples == 0) {
+ throw new RuntimeException("Error: Data file had no samples to load.")
+ }
+ println(s"Loaded data with $numExamples examples from file: ${params.input}")
+
+ // Example: RDD.sample() and RDD.takeSample()
+ val expectedSampleSize = (numExamples * fraction).toInt
+ println(s"Sampling RDD using fraction $fraction. Expected sample size = $expectedSampleSize.")
+ val sampledRDD = examples.sample(withReplacement = true, fraction = fraction)
+ println(s" RDD.sample(): sample has ${sampledRDD.count()} examples")
+ val sampledArray = examples.takeSample(withReplacement = true, num = expectedSampleSize)
+ println(s" RDD.takeSample(): sample has ${sampledArray.size} examples")
+
+ println()
+
+ // Example: RDD.sampleByKey() and RDD.sampleByKeyExact()
+ val keyedRDD = examples.map { lp => (lp.label.toInt, lp.features) }
+ println(s" Keyed data using label (Int) as key ==> Orig")
+ // Count examples per label in original data.
+ val keyCounts = keyedRDD.countByKey()
+
+ // Subsample, and count examples per label in sampled data. (approximate)
+ val fractions = keyCounts.keys.map((_, fraction)).toMap
+ val sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement = true, fractions = fractions)
+ val keyCountsB = sampledByKeyRDD.countByKey()
+ val sizeB = keyCountsB.values.sum
+ println(s" Sampled $sizeB examples using approximate stratified sampling (by label)." +
+ " ==> Approx Sample")
+
+ // Subsample, and count examples per label in sampled data. (approximate)
+ val sampledByKeyRDDExact =
+ keyedRDD.sampleByKeyExact(withReplacement = true, fractions = fractions)
+ val keyCountsBExact = sampledByKeyRDDExact.countByKey()
+ val sizeBExact = keyCountsBExact.values.sum
+ println(s" Sampled $sizeBExact examples using exact stratified sampling (by label)." +
+ " ==> Exact Sample")
+
+ // Compare samples
+ println(s" \tFractions of examples with key")
+ println(s"Key\tOrig\tApprox Sample\tExact Sample")
+ keyCounts.keys.toSeq.sorted.foreach { key =>
+ val origFrac = keyCounts(key) / numExamples.toDouble
+ val approxFrac = if (sizeB != 0) {
+ keyCountsB.getOrElse(key, 0L) / sizeB.toDouble
+ } else {
+ 0
+ }
+ val exactFrac = if (sizeBExact != 0) {
+ keyCountsBExact.getOrElse(key, 0L) / sizeBExact.toDouble
+ } else {
+ 0
+ }
+ println(s"$key\t$origFrac\t$approxFrac\t$exactFrac")
+ }
+
+ sc.stop()
+ }
+}