 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                    | 4 +---
 sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala | 2 +-
 sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala | 2 +-
 sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala             | 4 ++--
 4 files changed, 5 insertions(+), 7 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 8fe1f7e34c..caad2da80b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1469,14 +1469,12 @@ class DataFrame private[sql](
   lazy val rdd: RDD[Row] = {
     // use a local variable to make sure the map closure doesn't capture the whole DataFrame
     val schema = this.schema
-    internalRowRdd.mapPartitions { rows =>
+    queryExecution.toRdd.mapPartitions { rows =>
       val converter = CatalystTypeConverters.createToScalaConverter(schema)
       rows.map(converter(_).asInstanceOf[Row])
     }
   }
 
-  private[sql] def internalRowRdd = queryExecution.executedPlan.execute()
-
   /**
    * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s.
    * @group rdd
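Note: at this point in the code base, queryExecution.toRdd is a lazy val over executedPlan.execute(), i.e. exactly what the removed internalRowRdd helper returned, so callers can inline it. A minimal sketch of the distinction the public rdd field preserves, assuming a spark-shell-style sqlContext is in scope (queryExecution and toRdd are developer/internal APIs, so visibility may vary across versions):

  val df = sqlContext.range(0, 10)
  val internal = df.queryExecution.toRdd   // RDD[InternalRow]: the executed plan's output, unconverted
  val external = df.rdd                    // RDD[Row]: same data run through the Catalyst-to-Scala converter
  println(s"partitions: ${internal.partitions.length}, rows: ${external.count()}")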
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
index 3ebbf96090..4e2e2c210d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
@@ -90,7 +90,7 @@ private[sql] object FrequentItems extends Logging {
       (name, originalSchema.fields(index).dataType)
     }
 
-    val freqItems = df.select(cols.map(Column(_)) : _*).internalRowRdd.aggregate(countMaps)(
+    val freqItems = df.select(cols.map(Column(_)) : _*).queryExecution.toRdd.aggregate(countMaps)(
       seqOp = (counts, row) => {
         var i = 0
         while (i < numCols) {
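Note: only the source RDD changes here; the call keeps the RDD.aggregate shape of a zero value, a per-partition seqOp, and a cross-partition combOp. A minimal, self-contained sketch of that pattern with illustrative values (not the frequent-items count maps), assuming a SparkContext sc is in scope:

  val data = sc.parallelize(Seq(1, 2, 3, 4, 5))
  // zero value, per-partition fold, cross-partition merge: running (sum, count)
  val (sum, count) = data.aggregate((0, 0))(
    seqOp = (acc, x) => (acc._1 + x, acc._2 + 1),
    combOp = (a, b) => (a._1 + b._1, a._2 + b._2))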
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index b624ef7e8f..23ddfa9839 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -82,7 +82,7 @@ private[sql] object StatFunctions extends Logging {
s"with dataType ${data.get.dataType} not supported.")
}
val columns = cols.map(n => Column(Cast(Column(n).expr, DoubleType)))
- df.select(columns: _*).internalRowRdd.aggregate(new CovarianceCounter)(
+ df.select(columns: _*).queryExecution.toRdd.aggregate(new CovarianceCounter)(
seqOp = (counter, row) => {
counter.add(row.getDouble(0), row.getDouble(1))
},
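Note: after this change the aggregated rows are InternalRow rather than converted Row objects, but only getDouble(0) and getDouble(1) are read, which both row types provide, so the counter logic is unchanged. A minimal sketch of the same positional-read pattern over a two-column double projection, assuming a DataFrame df with numeric columns named "x" and "y" (hypothetical names) and a spark-shell-style sqlContext in scope:

  import org.apache.spark.sql.functions.col

  val pair = df.select(col("x").cast("double"), col("y").cast("double"))
  // zero, per-partition fold, merge: running (sum of x, sum of y)
  val (sumX, sumY) = pair.queryExecution.toRdd.aggregate((0.0, 0.0))(
    (acc, row) => (acc._1 + row.getDouble(0), acc._2 + row.getDouble(1)),
    (a, b) => (a._1 + b._1, a._2 + b._2))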
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 42b51caab5..7214eb0b41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -154,7 +154,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
     writerContainer.driverSideSetup()
 
     try {
-      df.sqlContext.sparkContext.runJob(df.internalRowRdd, writeRows _)
+      df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
       writerContainer.commitJob()
       relation.refresh()
     } catch { case cause: Throwable =>
@@ -220,7 +220,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
     writerContainer.driverSideSetup()
 
     try {
-      df.sqlContext.sparkContext.runJob(df.internalRowRdd, writeRows _)
+      df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
       writerContainer.commitJob()
       relation.refresh()
     } catch { case cause: Throwable =>
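Note: SparkContext.runJob(rdd, func) runs func once per partition on the executors and returns only when every task has finished, which is why the commit and abort calls can bracket it on the driver. A minimal sketch of that overload with a stand-in write function (writeRows below is hypothetical, not the method from commands.scala), assuming a SparkContext sc is in scope:

  import org.apache.spark.TaskContext

  // Stand-in for a per-partition writer: executed on the executors, one call per partition.
  def writeRows(context: TaskContext, rows: Iterator[String]): Unit = {
    println(s"partition ${context.partitionId()}: ${rows.size} rows")
  }

  val rdd = sc.parallelize(Seq("a", "b", "c", "d"), numSlices = 2)
  sc.runJob(rdd, writeRows _)   // blocks until all partitions have been written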