diff options
author | Cheng Lian <lian@databricks.com> | 2016-02-27 00:28:30 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2016-02-27 00:28:30 +0800 |
commit | 99dfcedbfd4c83c7b6a343456f03e8c6e29968c5 (patch) | |
tree | eb08a31c82b94e1582a0b6aa265038cb5fd44403 /examples/src/main | |
parent | 5c3912e5c90ce659146c3056430d100604378b71 (diff) | |
download | spark-99dfcedbfd4c83c7b6a343456f03e8c6e29968c5.tar.gz spark-99dfcedbfd4c83c7b6a343456f03e8c6e29968c5.tar.bz2 spark-99dfcedbfd4c83c7b6a343456f03e8c6e29968c5.zip |
[SPARK-13457][SQL] Removes DataFrame RDD operations
## What changes were proposed in this pull request?
This is another try of PR #11323.
This PR removes DataFrame RDD operations except for `foreach` and `foreachPartitions` (they are actions rather than transformations). Original calls are now replaced by calls to methods of `DataFrame.rdd`.
PR #11323 was reverted because it introduced a regression: both `DataFrame.foreach` and `DataFrame.foreachPartitions` wrap underlying RDD operations with `withNewExecutionId` to track Spark jobs. But they are removed in #11323.
## How was the this patch tested?
No extra tests are added. Existing tests should do the work.
Author: Cheng Lian <lian@databricks.com>
Closes #11388 from liancheng/remove-df-rdd-ops.
Diffstat (limited to 'examples/src/main')
6 files changed, 9 insertions, 8 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala index 0a477abae5..7e608a2812 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala @@ -79,7 +79,7 @@ object DataFrameExample { labelSummary.show() // Convert features column to an RDD of vectors. - val features = df.select("features").map { case Row(v: Vector) => v } + val features = df.select("features").rdd.map { case Row(v: Vector) => v } val featureSummary = features.aggregate(new MultivariateOnlineSummarizer())( (summary, feat) => summary.add(feat), (sum1, sum2) => sum1.merge(sum2)) diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala index a37d12aa63..d2560cc00b 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala @@ -310,8 +310,8 @@ object DecisionTreeExample { data: DataFrame, labelColName: String): Unit = { val fullPredictions = model.transform(data).cache() - val predictions = fullPredictions.select("prediction").map(_.getDouble(0)) - val labels = fullPredictions.select(labelColName).map(_.getDouble(0)) + val predictions = fullPredictions.select("prediction").rdd.map(_.getDouble(0)) + val labels = fullPredictions.select(labelColName).rdd.map(_.getDouble(0)) // Print number of classes for reference val numClasses = MetadataUtils.getNumClasses(fullPredictions.schema(labelColName)) match { case Some(n) => n @@ -335,8 +335,8 @@ object DecisionTreeExample { data: DataFrame, labelColName: String): Unit = { val fullPredictions = model.transform(data).cache() - val predictions = fullPredictions.select("prediction").map(_.getDouble(0)) - val labels = fullPredictions.select(labelColName).map(_.getDouble(0)) + val predictions = fullPredictions.select("prediction").rdd.map(_.getDouble(0)) + val labels = fullPredictions.select(labelColName).rdd.map(_.getDouble(0)) val RMSE = new RegressionMetrics(predictions.zip(labels)).rootMeanSquaredError println(s" Root mean squared error (RMSE): $RMSE") } diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala index ccee3b2aef..a0bb5dabf4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala @@ -155,7 +155,7 @@ object OneVsRestExample { // evaluate the model val predictionsAndLabels = predictions.select("prediction", "label") - .map(row => (row.getDouble(0), row.getDouble(1))) + .rdd.map(row => (row.getDouble(0), row.getDouble(1))) val metrics = new MulticlassMetrics(predictionsAndLabels) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala index d28323555b..038b2fe611 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala @@ -221,6 +221,7 @@ object LDAExample { val model = pipeline.fit(df) val documents = model.transform(df) .select("features") + .rdd .map { case Row(features: Vector) => features } .zipWithIndex() .map(_.swap) diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala index a2f0fcd0e4..620ff07631 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala @@ -52,7 +52,7 @@ object RDDRelation { val rddFromSql = sqlContext.sql("SELECT key, value FROM records WHERE key < 10") println("Result of RDD.map:") - rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println) + rddFromSql.rdd.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println) // Queries can also be written using a LINQ-like Scala DSL. df.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println) diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala index 4e427f54da..b654a2c8d4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala @@ -63,7 +63,7 @@ object HiveFromSpark { val rddFromSql = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key") println("Result of RDD.map:") - val rddAsStrings = rddFromSql.map { + val rddAsStrings = rddFromSql.rdd.map { case Row(key: Int, value: String) => s"Key: $key, Value: $value" } |