author     Cheng Lian <lian@databricks.com>  2016-02-27 00:28:30 +0800
committer  Cheng Lian <lian@databricks.com>  2016-02-27 00:28:30 +0800
commit     99dfcedbfd4c83c7b6a343456f03e8c6e29968c5 (patch)
tree       eb08a31c82b94e1582a0b6aa265038cb5fd44403 /examples
parent     5c3912e5c90ce659146c3056430d100604378b71 (diff)
[SPARK-13457][SQL] Removes DataFrame RDD operations
## What changes were proposed in this pull request?

This is another try of PR #11323. This PR removes DataFrame RDD operations except for `foreach` and `foreachPartitions` (they are actions rather than transformations). Original calls are now replaced by calls to methods of `DataFrame.rdd`.

PR #11323 was reverted because it introduced a regression: both `DataFrame.foreach` and `DataFrame.foreachPartitions` wrap the underlying RDD operations with `withNewExecutionId` to track Spark jobs, but that wrapping was dropped in #11323.

## How was this patch tested?

No extra tests are added. Existing tests should do the work.

Author: Cheng Lian <lian@databricks.com>

Closes #11388 from liancheng/remove-df-rdd-ops.
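For context, the migration pattern applied throughout these examples is shown in the minimal sketch below. The `sqlContext` and the toy DataFrame are hypothetical stand-ins, not code from this patch; the point is that RDD transformations such as `map` now go through `DataFrame.rdd`, while `foreach` stays available on the DataFrame itself.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical illustration of the migration performed by this patch.
def rddMigrationSketch(sqlContext: SQLContext): Unit = {
  import sqlContext.implicits._

  // Toy DataFrame standing in for the examples' real data.
  val df: DataFrame = Seq((1, "a"), (2, "b")).toDF("key", "value")

  // Before: RDD transformations were called directly on the DataFrame.
  //   val keys = df.select("key").map(_.getInt(0))      // no longer compiles
  // After: go through DataFrame.rdd explicitly.
  val keys = df.select("key").rdd.map(_.getInt(0))

  // foreach / foreachPartition remain on DataFrame: they are actions and wrap
  // the underlying RDD call with withNewExecutionId for job tracking.
  df.foreach(row => println(row))

  println(keys.collect().mkString(", "))
}
```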
Diffstat (limited to 'examples')
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala    | 2 +-
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala  | 8 ++++----
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala     | 2 +-
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala        | 1 +
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala         | 2 +-
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala  | 2 +-
6 files changed, 9 insertions(+), 8 deletions(-)
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala
index 0a477abae5..7e608a2812 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala
@@ -79,7 +79,7 @@ object DataFrameExample {
     labelSummary.show()
     // Convert features column to an RDD of vectors.
-    val features = df.select("features").map { case Row(v: Vector) => v }
+    val features = df.select("features").rdd.map { case Row(v: Vector) => v }
     val featureSummary = features.aggregate(new MultivariateOnlineSummarizer())(
       (summary, feat) => summary.add(feat),
       (sum1, sum2) => sum1.merge(sum2))
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala
index a37d12aa63..d2560cc00b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala
@@ -310,8 +310,8 @@ object DecisionTreeExample {
       data: DataFrame,
       labelColName: String): Unit = {
     val fullPredictions = model.transform(data).cache()
-    val predictions = fullPredictions.select("prediction").map(_.getDouble(0))
-    val labels = fullPredictions.select(labelColName).map(_.getDouble(0))
+    val predictions = fullPredictions.select("prediction").rdd.map(_.getDouble(0))
+    val labels = fullPredictions.select(labelColName).rdd.map(_.getDouble(0))
     // Print number of classes for reference
     val numClasses = MetadataUtils.getNumClasses(fullPredictions.schema(labelColName)) match {
       case Some(n) => n
@@ -335,8 +335,8 @@ object DecisionTreeExample {
       data: DataFrame,
       labelColName: String): Unit = {
     val fullPredictions = model.transform(data).cache()
-    val predictions = fullPredictions.select("prediction").map(_.getDouble(0))
-    val labels = fullPredictions.select(labelColName).map(_.getDouble(0))
+    val predictions = fullPredictions.select("prediction").rdd.map(_.getDouble(0))
+    val labels = fullPredictions.select(labelColName).rdd.map(_.getDouble(0))
     val RMSE = new RegressionMetrics(predictions.zip(labels)).rootMeanSquaredError
     println(s" Root mean squared error (RMSE): $RMSE")
   }
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala
index ccee3b2aef..a0bb5dabf4 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala
@@ -155,7 +155,7 @@ object OneVsRestExample {
     // evaluate the model
     val predictionsAndLabels = predictions.select("prediction", "label")
-      .map(row => (row.getDouble(0), row.getDouble(1)))
+      .rdd.map(row => (row.getDouble(0), row.getDouble(1)))
     val metrics = new MulticlassMetrics(predictionsAndLabels)
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
index d28323555b..038b2fe611 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
@@ -221,6 +221,7 @@ object LDAExample {
     val model = pipeline.fit(df)
     val documents = model.transform(df)
       .select("features")
+      .rdd
       .map { case Row(features: Vector) => features }
       .zipWithIndex()
       .map(_.swap)
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
index a2f0fcd0e4..620ff07631 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
@@ -52,7 +52,7 @@ object RDDRelation {
     val rddFromSql = sqlContext.sql("SELECT key, value FROM records WHERE key < 10")
     println("Result of RDD.map:")
-    rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println)
+    rddFromSql.rdd.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println)
     // Queries can also be written using a LINQ-like Scala DSL.
     df.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println)
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
index 4e427f54da..b654a2c8d4 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
@@ -63,7 +63,7 @@ object HiveFromSpark {
     val rddFromSql = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
     println("Result of RDD.map:")
-    val rddAsStrings = rddFromSql.map {
+    val rddAsStrings = rddFromSql.rdd.map {
       case Row(key: Int, value: String) => s"Key: $key, Value: $value"
     }