author     Tejas Patil <tejasp@fb.com>    2016-09-10 09:27:22 +0800
committer  Wenchen Fan <wenchen@databricks.com>    2016-09-10 09:27:22 +0800
commit     335491704c526921da3b3c5035175677ba5b92de (patch)
tree       43194c82c849831ea54c96c012aef4f6f04d49ca /sql/hive/src
parent     f7d2143705c8c1baeed0bc62940f9dba636e705b (diff)
[SPARK-15453][SQL] FileSourceScanExec to extract `outputOrdering` information
## What changes were proposed in this pull request?

Jira: https://issues.apache.org/jira/browse/SPARK-15453

This extracts sort ordering information in `FileSourceScanExec` so that the planner can make use of it. My motivation for the change was to get Sort Merge Join on par with Hive's Sort-Merge-Bucket join when the source tables are bucketed + sorted.

Query:

```
val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table8")
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table9")
context.sql("SELECT * FROM table8 a JOIN table9 b ON a.j=b.j AND a.k=b.k").explain(true)
```

Before:

```
== Physical Plan ==
*SortMergeJoin [j#120, k#121], [j#123, k#124], Inner
:- *Sort [j#120 ASC, k#121 ASC], false, 0
:  +- *Project [i#119, j#120, k#121]
:     +- *Filter (isnotnull(k#121) && isnotnull(j#120))
:        +- *FileScan orc default.table8[i#119,j#120,k#121] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Sort [j#123 ASC, k#124 ASC], false, 0
   +- *Project [i#122, j#123, k#124]
      +- *Filter (isnotnull(k#124) && isnotnull(j#123))
         +- *FileScan orc default.table9[i#122,j#123,k#124] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
```

After (note that the `Sort` step is no longer there):

```
== Physical Plan ==
*SortMergeJoin [j#49, k#50], [j#52, k#53], Inner
:- *Project [i#48, j#49, k#50]
:  +- *Filter (isnotnull(k#50) && isnotnull(j#49))
:     +- *FileScan orc default.table8[i#48,j#49,k#50] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Project [i#51, j#52, k#53]
   +- *Filter (isnotnull(j#52) && isnotnull(k#53))
      +- *FileScan orc default.table9[i#51,j#52,k#53] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
```

## How was this patch tested?

Added a test case in `JoinSuite` and ran all other tests in `JoinSuite`.

Author: Tejas Patil <tejasp@fb.com>

Closes #14864 from tejasapatil/SPARK-15453_smb_optimization.
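The diffstat below is limited to `sql/hive/src`, so the core change to `FileSourceScanExec` itself is not shown on this page. As a minimal sketch of the idea only, not the committed patch: the scan can report the bucket spec's sort columns as its `outputOrdering`, but only when every selected bucket is backed by at most one file, since several locally sorted files concatenated within one bucket are not globally sorted. Names such as `selectedPartitions` and the helper `bucketIdOf` are assumptions for illustration.

```
// Sketch only (simplified, not the exact committed code): derive an output
// ordering for a bucketed + sorted relation inside a file scan node.
val sortOrder: Seq[SortOrder] = bucketSpec match {
  case Some(spec) if spec.sortColumnNames.nonEmpty =>
    // Resolve the sort column names against the scan's output attributes.
    val sortColumns = spec.sortColumnNames.flatMap(name => output.find(_.name == name))
    // Each file is sorted locally; a bucket spread over several files is not
    // globally sorted, so only claim an ordering with <= 1 file per bucket.
    val filesPerBucket =
      selectedPartitions.flatMap(_.files).groupBy(bucketIdOf) // bucketIdOf is hypothetical
    if (filesPerBucket.values.forall(_.length <= 1)) {
      sortColumns.map(attr => SortOrder(attr, Ascending))
    } else {
      Nil
    }
  case _ => Nil
}
```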
Diffstat (limited to 'sql/hive/src')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala | 63
1 file changed, 61 insertions(+), 2 deletions(-)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index ca2ec9f6a5..3ff85176de 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.{DataSourceScanExec, SortExec}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -237,7 +237,9 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
bucketSpecRight: Option[BucketSpec],
joinColumns: Seq[String],
shuffleLeft: Boolean,
- shuffleRight: Boolean): Unit = {
+ shuffleRight: Boolean,
+ sortLeft: Boolean = true,
+ sortRight: Boolean = true): Unit = {
withTable("bucketed_table1", "bucketed_table2") {
def withBucket(
writer: DataFrameWriter[Row],
@@ -247,6 +249,15 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
spec.numBuckets,
spec.bucketColumnNames.head,
spec.bucketColumnNames.tail: _*)
+
+ if (spec.sortColumnNames.nonEmpty) {
+ writer.sortBy(
+ spec.sortColumnNames.head,
+ spec.sortColumnNames.tail: _*
+ )
+ } else {
+ writer
+ }
}.getOrElse(writer)
}
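(For context outside the diff: the `DataFrameWriter` chain that `withBucket` assembles looks like the following when written directly; a minimal illustrative sketch, with `df` and the table name `t` assumed:)

```
// bucketBy/sortBy take (head, tail: _*) arguments, which is why the helper
// above splits bucketColumnNames and sortColumnNames into head and tail.
df.write
  .format("parquet")
  .bucketBy(8, "i", "j")  // hash rows into 8 buckets by (i, j)
  .sortBy("i", "j")       // sort rows within each bucket file by (i, j)
  .saveAsTable("t")
```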
@@ -267,12 +278,21 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoinExec])
val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoinExec]
+ // check existence of shuffle
assert(
joinOperator.left.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleLeft,
s"expected shuffle in plan to be $shuffleLeft but found\n${joinOperator.left}")
assert(
joinOperator.right.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleRight,
s"expected shuffle in plan to be $shuffleRight but found\n${joinOperator.right}")
+
+ // check existence of sort
+ assert(
+ joinOperator.left.find(_.isInstanceOf[SortExec]).isDefined == sortLeft,
+ s"expected sort in plan to be $shuffleLeft but found\n${joinOperator.left}")
+ assert(
+ joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight,
+ s"expected sort in plan to be $shuffleRight but found\n${joinOperator.right}")
}
}
}
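(The shuffle and sort checks above can also be run interactively against any joined DataFrame; a minimal sketch, assuming `joined` as in this suite:)

```
import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.SortMergeJoinExec

val smj = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoinExec]
// TreeNode.find returns the first node in the subtree matching the predicate.
val leftShuffled = smj.left.find(_.isInstanceOf[ShuffleExchange]).isDefined
val leftSorted   = smj.left.find(_.isInstanceOf[SortExec]).isDefined
```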
@@ -321,6 +341,45 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
}
}
+ test("avoid shuffle and sort when bucket and sort columns are join keys") {
+ val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+ testBucketing(
+ bucketSpec, bucketSpec, Seq("i", "j"),
+ shuffleLeft = false, shuffleRight = false,
+ sortLeft = false, sortRight = false
+ )
+ }
+
+ test("avoid shuffle and sort when sort columns are a super set of join keys") {
+ val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Seq("i", "j")))
+ val bucketSpec2 = Some(BucketSpec(8, Seq("i"), Seq("i", "k")))
+ testBucketing(
+ bucketSpec1, bucketSpec2, Seq("i"),
+ shuffleLeft = false, shuffleRight = false,
+ sortLeft = false, sortRight = false
+ )
+ }
+
+ test("only sort one side when sort columns are different") {
+ val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+ val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("k")))
+ testBucketing(
+ bucketSpec1, bucketSpec2, Seq("i", "j"),
+ shuffleLeft = false, shuffleRight = false,
+ sortLeft = false, sortRight = true
+ )
+ }
+
+ test("only sort one side when sort columns are same but their ordering is different") {
+ val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+ val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i")))
+ testBucketing(
+ bucketSpec1, bucketSpec2, Seq("i", "j"),
+ shuffleLeft = false, shuffleRight = false,
+ sortLeft = false, sortRight = true
+ )
+ }
+
test("avoid shuffle when grouping keys are equal to bucket keys") {
withTable("bucketed_table") {
df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("bucketed_table")