author     Wenchen Fan <cloud0fan@outlook.com>    2015-07-01 01:02:33 -0700
committer  Michael Armbrust <michael@databricks.com>    2015-07-01 01:02:33 -0700
commit  0eee0615894cda8ae1b2c8e61b8bda0ff648a219 (patch)
tree    c4f744f9bac701ef99e9494f1f91f8ad1b187990 /sql/core
parent  fc3a6fe67f5aeda2443958c31f097daeba8549e5 (diff)
[SQL] [MINOR] remove internalRowRDD in DataFrame
Developers are already familiar with `queryExecution.toRdd` as the internal row RDD, so we should not introduce a new concept for the same thing.

Author: Wenchen Fan <cloud0fan@outlook.com>

Closes #7116 from cloud-fan/internal-rdd and squashes the following commits:

24756ca [Wenchen Fan] remove internalRowRDD
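For context, a minimal sketch of the two row RDDs involved, assuming code that lives in the org.apache.spark.sql package (as the patched call sites do) and an existing DataFrame named df; this helper is hypothetical and not part of the patch:

    package org.apache.spark.sql

    import org.apache.spark.rdd.RDD

    // Hypothetical helper, not in this commit: contrasts the internal and
    // external row RDDs a DataFrame exposes once internalRowRdd is removed.
    object RowRddSketch {
      def rowRdds(df: DataFrame): (RDD[_], RDD[Row]) = {
        // queryExecution.toRdd yields the internal (Catalyst) row RDD; the
        // removed internalRowRdd helper was effectively an alias for it
        // (both boil down to queryExecution.executedPlan.execute()).
        val internalRows = df.queryExecution.toRdd
        // df.rdd is the public RDD[Row]; as the first hunk below shows, it maps
        // each internal row through CatalystTypeConverters.createToScalaConverter.
        val externalRows: RDD[Row] = df.rdd
        (internalRows, externalRows)
      }
    }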
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                    | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala             | 4
4 files changed, 5 insertions, 7 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 8fe1f7e34c..caad2da80b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1469,14 +1469,12 @@ class DataFrame private[sql](
   lazy val rdd: RDD[Row] = {
     // use a local variable to make sure the map closure doesn't capture the whole DataFrame
     val schema = this.schema
-    internalRowRdd.mapPartitions { rows =>
+    queryExecution.toRdd.mapPartitions { rows =>
       val converter = CatalystTypeConverters.createToScalaConverter(schema)
       rows.map(converter(_).asInstanceOf[Row])
     }
   }
 
-  private[sql] def internalRowRdd = queryExecution.executedPlan.execute()
-
   /**
    * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s.
    * @group rdd
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
index 3ebbf96090..4e2e2c210d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
@@ -90,7 +90,7 @@ private[sql] object FrequentItems extends Logging {
       (name, originalSchema.fields(index).dataType)
     }
 
-    val freqItems = df.select(cols.map(Column(_)) : _*).internalRowRdd.aggregate(countMaps)(
+    val freqItems = df.select(cols.map(Column(_)) : _*).queryExecution.toRdd.aggregate(countMaps)(
       seqOp = (counts, row) => {
         var i = 0
         while (i < numCols) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index b624ef7e8f..23ddfa9839 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -82,7 +82,7 @@ private[sql] object StatFunctions extends Logging {
         s"with dataType ${data.get.dataType} not supported.")
     }
     val columns = cols.map(n => Column(Cast(Column(n).expr, DoubleType)))
-    df.select(columns: _*).internalRowRdd.aggregate(new CovarianceCounter)(
+    df.select(columns: _*).queryExecution.toRdd.aggregate(new CovarianceCounter)(
       seqOp = (counter, row) => {
         counter.add(row.getDouble(0), row.getDouble(1))
       },
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 42b51caab5..7214eb0b41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -154,7 +154,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
     writerContainer.driverSideSetup()
 
     try {
-      df.sqlContext.sparkContext.runJob(df.internalRowRdd, writeRows _)
+      df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
       writerContainer.commitJob()
       relation.refresh()
     } catch { case cause: Throwable =>
@@ -220,7 +220,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
     writerContainer.driverSideSetup()
 
     try {
-      df.sqlContext.sparkContext.runJob(df.internalRowRdd, writeRows _)
+      df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
       writerContainer.commitJob()
       relation.refresh()
     } catch { case cause: Throwable =>