aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-03-12 00:48:36 -0800
committerDavies Liu <davies.liu@gmail.com>2016-03-12 00:48:36 -0800
commitba8c86d06f5968c1af4db8dd9a458005bc5f214c (patch)
treefa6a7479cef0ba8c2f6b4574b0bbd180502bed85 /sql/hive
parent2ef4c5963bff3574fe17e669d703b25ddd064e5d (diff)
downloadspark-ba8c86d06f5968c1af4db8dd9a458005bc5f214c.tar.gz
spark-ba8c86d06f5968c1af4db8dd9a458005bc5f214c.tar.bz2
spark-ba8c86d06f5968c1af4db8dd9a458005bc5f214c.zip
[SPARK-13671] [SPARK-13311] [SQL] Use different physical plans for RDD and data sources
## What changes were proposed in this pull request? This PR split the PhysicalRDD into two classes, PhysicalRDD and PhysicalScan. PhysicalRDD is used for DataFrames that is created from existing RDD. PhysicalScan is used for DataFrame that is created from data sources. This enable use to apply different optimization on both of them. Also fix the problem for sameResult() on two DataSourceScan. Also fix the equality check to toString for `In`. It's better to use Seq there, but we can't break this public API (sad). ## How was this patch tested? Existing tests. Manually tested with TPCDS query Q59 and Q64, all those duplicated exchanges can be re-used now, also saw there are 40+% performance improvement (saving half of the scan). Author: Davies Liu <davies@databricks.com> Closes #11514 from davies/existing_rdd.
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala4
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala4
2 files changed, 4 insertions, 4 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index a0f09d6c4a..8fdbbd94c8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
import java.io.File
import org.apache.spark.sql._
-import org.apache.spark.sql.execution.PhysicalRDD
+import org.apache.spark.sql.execution.DataSourceScan
import org.apache.spark.sql.execution.command.ExecutedCommand
import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.hive.execution.HiveTableScan
@@ -196,7 +196,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
}.isEmpty)
assert(
sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect {
- case _: PhysicalRDD => true
+ case _: DataSourceScan => true
}.nonEmpty)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 35573f62dc..a0be55cfba 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -22,7 +22,7 @@ import java.io.File
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.PhysicalRDD
+import org.apache.spark.sql.execution.DataSourceScan
import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSourceStrategy}
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.SortMergeJoin
@@ -93,7 +93,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
// Filter could hide the bug in bucket pruning. Thus, skipping all the filters
val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
- val rdd = plan.find(_.isInstanceOf[PhysicalRDD])
+ val rdd = plan.find(_.isInstanceOf[DataSourceScan])
assert(rdd.isDefined, plan)
val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>