aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
author: Xiao Li <gatorsmile@gmail.com> 2017-02-20 09:04:22 -0800
committer: Xiao Li <gatorsmile@gmail.com> 2017-02-20 09:04:22 -0800
commit: ead4ba0eb5841e42e6a57c1a1865bf89564e8ff9 (patch)
tree: 79e128414a2cbb5aef2819f67b3d2ef298258664 /sql
parent: d0ecca6075d86bedebf8bc2278085a2cd6cb0a43 (diff)
download: spark-ead4ba0eb5841e42e6a57c1a1865bf89564e8ff9.tar.gz
spark-ead4ba0eb5841e42e6a57c1a1865bf89564e8ff9.tar.bz2
spark-ead4ba0eb5841e42e6a57c1a1865bf89564e8ff9.zip
[SPARK-15453][SQL][FOLLOW-UP] FileSourceScanExec to extract `outputOrdering` information
### What changes were proposed in this pull request?

`outputOrdering` also depends on whether a bucket has more than one file. The test cases fail when we try to move them to sql/core. This PR fixes the test cases introduced in https://github.com/apache/spark/pull/14864 and adds a test case to verify [the related logic](https://github.com/tejasapatil/spark/blob/070c24994747c0479fb2520774ede27ff1cf8cac/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L197-L206).

### How was this patch tested?

N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #16994 from gatorsmile/bucketingTS.
Diffstat (limited to 'sql')
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala | 229
1 files changed, 137 insertions, 92 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index d9ddcbd57c..4fc72b9e47 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -227,6 +227,12 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
private val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1")
private val df2 = (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2")
+ case class BucketedTableTestSpec(
+ bucketSpec: Option[BucketSpec],
+ numPartitions: Int = 10,
+ expectedShuffle: Boolean = true,
+ expectedSort: Boolean = true)
+
/**
* A helper method to test the bucket read functionality using join. It will save `df1` and `df2`
* to hive tables, bucketed or not, according to the given bucket specifics. Next we will join
@@ -234,14 +240,15 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
* exists as user expected according to the `shuffleLeft` and `shuffleRight`.
*/
private def testBucketing(
- bucketSpecLeft: Option[BucketSpec],
- bucketSpecRight: Option[BucketSpec],
+ bucketedTableTestSpecLeft: BucketedTableTestSpec,
+ bucketedTableTestSpecRight: BucketedTableTestSpec,
joinType: String = "inner",
- joinCondition: (DataFrame, DataFrame) => Column,
- shuffleLeft: Boolean,
- shuffleRight: Boolean,
- sortLeft: Boolean = true,
- sortRight: Boolean = true): Unit = {
+ joinCondition: (DataFrame, DataFrame) => Column): Unit = {
+ val BucketedTableTestSpec(bucketSpecLeft, numPartitionsLeft, shuffleLeft, sortLeft) =
+ bucketedTableTestSpecLeft
+ val BucketedTableTestSpec(bucketSpecRight, numPartitionsRight, shuffleRight, sortRight) =
+ bucketedTableTestSpecRight
+
withTable("bucketed_table1", "bucketed_table2") {
def withBucket(
writer: DataFrameWriter[Row],
@@ -263,8 +270,10 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
}.getOrElse(writer)
}
- withBucket(df1.write.format("parquet"), bucketSpecLeft).saveAsTable("bucketed_table1")
- withBucket(df2.write.format("parquet"), bucketSpecRight).saveAsTable("bucketed_table2")
+ withBucket(df1.repartition(numPartitionsLeft).write.format("parquet"), bucketSpecLeft)
+ .saveAsTable("bucketed_table1")
+ withBucket(df2.repartition(numPartitionsRight).write.format("parquet"), bucketSpecRight)
+ .saveAsTable("bucketed_table2")
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
@@ -291,10 +300,10 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
// check existence of sort
assert(
joinOperator.left.find(_.isInstanceOf[SortExec]).isDefined == sortLeft,
- s"expected sort in plan to be $shuffleLeft but found\n${joinOperator.left}")
+ s"expected sort in the left child to be $sortLeft but found\n${joinOperator.left}")
assert(
joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight,
- s"expected sort in plan to be $shuffleRight but found\n${joinOperator.right}")
+ s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
}
}
}
@@ -305,138 +314,174 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
test("avoid shuffle when join 2 bucketed tables") {
val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
+ val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
+ val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
testBucketing(
- bucketSpecLeft = bucketSpec,
- bucketSpecRight = bucketSpec,
- joinCondition = joinCondition(Seq("i", "j")),
- shuffleLeft = false,
- shuffleRight = false
+ bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+ bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+ joinCondition = joinCondition(Seq("i", "j"))
)
}
// Enable it after fix https://issues.apache.org/jira/browse/SPARK-12704
ignore("avoid shuffle when join keys are a super-set of bucket keys") {
val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil))
+ val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
+ val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
testBucketing(
- bucketSpecLeft = bucketSpec,
- bucketSpecRight = bucketSpec,
- joinCondition = joinCondition(Seq("i", "j")),
- shuffleLeft = false,
- shuffleRight = false
+ bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+ bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+ joinCondition = joinCondition(Seq("i", "j"))
)
}
test("only shuffle one side when join bucketed table and non-bucketed table") {
val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
+ val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
+ val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true)
testBucketing(
- bucketSpecLeft = bucketSpec,
- bucketSpecRight = None,
- joinCondition = joinCondition(Seq("i", "j")),
- shuffleLeft = false,
- shuffleRight = true
+ bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+ bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+ joinCondition = joinCondition(Seq("i", "j"))
)
}
test("only shuffle one side when 2 bucketed tables have different bucket number") {
- val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Nil))
- val bucketSpec2 = Some(BucketSpec(5, Seq("i", "j"), Nil))
+ val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Nil))
+ val bucketSpecRight = Some(BucketSpec(5, Seq("i", "j"), Nil))
+ val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpecLeft, expectedShuffle = false)
+ val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpecRight, expectedShuffle = true)
testBucketing(
- bucketSpecLeft = bucketSpec1,
- bucketSpecRight = bucketSpec2,
- joinCondition = joinCondition(Seq("i", "j")),
- shuffleLeft = false,
- shuffleRight = true
+ bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+ bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+ joinCondition = joinCondition(Seq("i", "j"))
)
}
test("only shuffle one side when 2 bucketed tables have different bucket keys") {
- val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Nil))
- val bucketSpec2 = Some(BucketSpec(8, Seq("j"), Nil))
+ val bucketSpecLeft = Some(BucketSpec(8, Seq("i"), Nil))
+ val bucketSpecRight = Some(BucketSpec(8, Seq("j"), Nil))
+ val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpecLeft, expectedShuffle = false)
+ val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpecRight, expectedShuffle = true)
testBucketing(
- bucketSpecLeft = bucketSpec1,
- bucketSpecRight = bucketSpec2,
- joinCondition = joinCondition(Seq("i")),
- shuffleLeft = false,
- shuffleRight = true
+ bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+ bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+ joinCondition = joinCondition(Seq("i"))
)
}
test("shuffle when join keys are not equal to bucket keys") {
val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil))
+ val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = true)
+ val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = true)
testBucketing(
- bucketSpecLeft = bucketSpec,
- bucketSpecRight = bucketSpec,
- joinCondition = joinCondition(Seq("j")),
- shuffleLeft = true,
- shuffleRight = true
+ bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+ bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+ joinCondition = joinCondition(Seq("j"))
)
}
test("shuffle when join 2 bucketed tables with bucketing disabled") {
val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
+ val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = true)
+ val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = true)
withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
testBucketing(
- bucketSpecLeft = bucketSpec,
- bucketSpecRight = bucketSpec,
- joinCondition = joinCondition(Seq("i", "j")),
- shuffleLeft = true,
- shuffleRight = true
+ bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+ bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+ joinCondition = joinCondition(Seq("i", "j"))
)
}
}
- test("avoid shuffle and sort when bucket and sort columns are join keys") {
+ test("check sort and shuffle when bucket and sort columns are join keys") {
+ // In case of bucketing, its possible to have multiple files belonging to the
+ // same bucket in a given relation. Each of these files are locally sorted
+ // but those files combined together are not globally sorted. Given that,
+ // the RDD partition will not be sorted even if the relation has sort columns set
+ // Therefore, we still need to keep the Sort in both sides.
val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+
+ val bucketedTableTestSpecLeft1 = BucketedTableTestSpec(
+ bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
+ val bucketedTableTestSpecRight1 = BucketedTableTestSpec(
+ bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
testBucketing(
- bucketSpecLeft = bucketSpec,
- bucketSpecRight = bucketSpec,
- joinCondition = joinCondition(Seq("i", "j")),
- shuffleLeft = false,
- shuffleRight = false,
- sortLeft = false,
- sortRight = false
+ bucketedTableTestSpecLeft = bucketedTableTestSpecLeft1,
+ bucketedTableTestSpecRight = bucketedTableTestSpecRight1,
+ joinCondition = joinCondition(Seq("i", "j"))
+ )
+
+ val bucketedTableTestSpecLeft2 = BucketedTableTestSpec(
+ bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+ val bucketedTableTestSpecRight2 = BucketedTableTestSpec(
+ bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
+ testBucketing(
+ bucketedTableTestSpecLeft = bucketedTableTestSpecLeft2,
+ bucketedTableTestSpecRight = bucketedTableTestSpecRight2,
+ joinCondition = joinCondition(Seq("i", "j"))
+ )
+
+ val bucketedTableTestSpecLeft3 = BucketedTableTestSpec(
+ bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
+ val bucketedTableTestSpecRight3 = BucketedTableTestSpec(
+ bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
+ testBucketing(
+ bucketedTableTestSpecLeft = bucketedTableTestSpecLeft3,
+ bucketedTableTestSpecRight = bucketedTableTestSpecRight3,
+ joinCondition = joinCondition(Seq("i", "j"))
+ )
+
+ val bucketedTableTestSpecLeft4 = BucketedTableTestSpec(
+ bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+ val bucketedTableTestSpecRight4 = BucketedTableTestSpec(
+ bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+ testBucketing(
+ bucketedTableTestSpecLeft = bucketedTableTestSpecLeft4,
+ bucketedTableTestSpecRight = bucketedTableTestSpecRight4,
+ joinCondition = joinCondition(Seq("i", "j"))
)
}
test("avoid shuffle and sort when sort columns are a super set of join keys") {
- val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Seq("i", "j")))
- val bucketSpec2 = Some(BucketSpec(8, Seq("i"), Seq("i", "k")))
+ val bucketSpecLeft = Some(BucketSpec(8, Seq("i"), Seq("i", "j")))
+ val bucketSpecRight = Some(BucketSpec(8, Seq("i"), Seq("i", "k")))
+ val bucketedTableTestSpecLeft = BucketedTableTestSpec(
+ bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+ val bucketedTableTestSpecRight = BucketedTableTestSpec(
+ bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = false)
testBucketing(
- bucketSpecLeft = bucketSpec1,
- bucketSpecRight = bucketSpec2,
- joinCondition = joinCondition(Seq("i")),
- shuffleLeft = false,
- shuffleRight = false,
- sortLeft = false,
- sortRight = false
+ bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+ bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+ joinCondition = joinCondition(Seq("i"))
)
}
test("only sort one side when sort columns are different") {
- val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
- val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("k")))
+ val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+ val bucketSpecRight = Some(BucketSpec(8, Seq("i", "j"), Seq("k")))
+ val bucketedTableTestSpecLeft = BucketedTableTestSpec(
+ bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+ val bucketedTableTestSpecRight = BucketedTableTestSpec(
+ bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = true)
testBucketing(
- bucketSpecLeft = bucketSpec1,
- bucketSpecRight = bucketSpec2,
- joinCondition = joinCondition(Seq("i", "j")),
- shuffleLeft = false,
- shuffleRight = false,
- sortLeft = false,
- sortRight = true
+ bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+ bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+ joinCondition = joinCondition(Seq("i", "j"))
)
}
test("only sort one side when sort columns are same but their ordering is different") {
- val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
- val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i")))
+ val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+ val bucketSpecRight = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i")))
+ val bucketedTableTestSpecLeft = BucketedTableTestSpec(
+ bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+ val bucketedTableTestSpecRight = BucketedTableTestSpec(
+ bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = true)
testBucketing(
- bucketSpecLeft = bucketSpec1,
- bucketSpecRight = bucketSpec2,
- joinCondition = joinCondition(Seq("i", "j")),
- shuffleLeft = false,
- shuffleRight = false,
- sortLeft = false,
- sortRight = true
+ bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+ bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+ joinCondition = joinCondition(Seq("i", "j"))
)
}
@@ -470,20 +515,20 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
test("SPARK-17698 Join predicates should not contain filter clauses") {
val bucketSpec = Some(BucketSpec(8, Seq("i"), Seq("i")))
+ val bucketedTableTestSpecLeft = BucketedTableTestSpec(
+ bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+ val bucketedTableTestSpecRight = BucketedTableTestSpec(
+ bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
testBucketing(
- bucketSpecLeft = bucketSpec,
- bucketSpecRight = bucketSpec,
+ bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+ bucketedTableTestSpecRight = bucketedTableTestSpecRight,
joinType = "fullouter",
joinCondition = (left: DataFrame, right: DataFrame) => {
val joinPredicates = Seq("i").map(col => left(col) === right(col)).reduce(_ && _)
val filterLeft = left("i") === Literal("1")
val filterRight = right("i") === Literal("1")
joinPredicates && filterLeft && filterRight
- },
- shuffleLeft = false,
- shuffleRight = false,
- sortLeft = false,
- sortRight = false
+ }
)
}