diff options
Diffstat (limited to 'sql/hive')
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala | 124 |
1 files changed, 101 insertions, 23 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 3ff85176de..9ed454e578 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -235,7 +235,8 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet private def testBucketing( bucketSpecLeft: Option[BucketSpec], bucketSpecRight: Option[BucketSpec], - joinColumns: Seq[String], + joinType: String = "inner", + joinCondition: (DataFrame, DataFrame) => Column, shuffleLeft: Boolean, shuffleRight: Boolean, sortLeft: Boolean = true, @@ -268,12 +269,12 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { val t1 = spark.table("bucketed_table1") val t2 = spark.table("bucketed_table2") - val joined = t1.join(t2, joinCondition(t1, t2, joinColumns)) + val joined = t1.join(t2, joinCondition(t1, t2), joinType) // First check the result is corrected. checkAnswer( joined.sort("bucketed_table1.k", "bucketed_table2.k"), - df1.join(df2, joinCondition(df1, df2, joinColumns)).sort("df1.k", "df2.k")) + df1.join(df2, joinCondition(df1, df2), joinType).sort("df1.k", "df2.k")) assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoinExec]) val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoinExec] @@ -297,56 +298,102 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet } } - private def joinCondition(left: DataFrame, right: DataFrame, joinCols: Seq[String]): Column = { + private def joinCondition(joinCols: Seq[String]) (left: DataFrame, right: DataFrame): Column = { joinCols.map(col => left(col) === right(col)).reduce(_ && _) } test("avoid shuffle when join 2 bucketed tables") { val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) - testBucketing(bucketSpec, bucketSpec, Seq("i", "j"), shuffleLeft = false, shuffleRight = false) + testBucketing( + bucketSpecLeft = bucketSpec, + bucketSpecRight = bucketSpec, + joinCondition = joinCondition(Seq("i", "j")), + shuffleLeft = false, + shuffleRight = false + ) } // Enable it after fix https://issues.apache.org/jira/browse/SPARK-12704 ignore("avoid shuffle when join keys are a super-set of bucket keys") { val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil)) - testBucketing(bucketSpec, bucketSpec, Seq("i", "j"), shuffleLeft = false, shuffleRight = false) + testBucketing( + bucketSpecLeft = bucketSpec, + bucketSpecRight = bucketSpec, + joinCondition = joinCondition(Seq("i", "j")), + shuffleLeft = false, + shuffleRight = false + ) } test("only shuffle one side when join bucketed table and non-bucketed table") { val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) - testBucketing(bucketSpec, None, Seq("i", "j"), shuffleLeft = false, shuffleRight = true) + testBucketing( + bucketSpecLeft = bucketSpec, + bucketSpecRight = None, + joinCondition = joinCondition(Seq("i", "j")), + shuffleLeft = false, + shuffleRight = true + ) } test("only shuffle one side when 2 bucketed tables have different bucket number") { val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Nil)) val bucketSpec2 = Some(BucketSpec(5, Seq("i", "j"), Nil)) - testBucketing(bucketSpec1, bucketSpec2, Seq("i", "j"), shuffleLeft = false, shuffleRight = true) + testBucketing( + bucketSpecLeft = bucketSpec1, + bucketSpecRight = bucketSpec2, + joinCondition = joinCondition(Seq("i", "j")), + shuffleLeft = false, + shuffleRight = true + ) } test("only shuffle one side when 2 bucketed tables have different bucket keys") { val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Nil)) val bucketSpec2 = Some(BucketSpec(8, Seq("j"), Nil)) - testBucketing(bucketSpec1, bucketSpec2, Seq("i"), shuffleLeft = false, shuffleRight = true) + testBucketing( + bucketSpecLeft = bucketSpec1, + bucketSpecRight = bucketSpec2, + joinCondition = joinCondition(Seq("i")), + shuffleLeft = false, + shuffleRight = true + ) } test("shuffle when join keys are not equal to bucket keys") { val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil)) - testBucketing(bucketSpec, bucketSpec, Seq("j"), shuffleLeft = true, shuffleRight = true) + testBucketing( + bucketSpecLeft = bucketSpec, + bucketSpecRight = bucketSpec, + joinCondition = joinCondition(Seq("j")), + shuffleLeft = true, + shuffleRight = true + ) } test("shuffle when join 2 bucketed tables with bucketing disabled") { val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") { - testBucketing(bucketSpec, bucketSpec, Seq("i", "j"), shuffleLeft = true, shuffleRight = true) + testBucketing( + bucketSpecLeft = bucketSpec, + bucketSpecRight = bucketSpec, + joinCondition = joinCondition(Seq("i", "j")), + shuffleLeft = true, + shuffleRight = true + ) } } test("avoid shuffle and sort when bucket and sort columns are join keys") { val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) testBucketing( - bucketSpec, bucketSpec, Seq("i", "j"), - shuffleLeft = false, shuffleRight = false, - sortLeft = false, sortRight = false + bucketSpecLeft = bucketSpec, + bucketSpecRight = bucketSpec, + joinCondition = joinCondition(Seq("i", "j")), + shuffleLeft = false, + shuffleRight = false, + sortLeft = false, + sortRight = false ) } @@ -354,9 +401,13 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Seq("i", "j"))) val bucketSpec2 = Some(BucketSpec(8, Seq("i"), Seq("i", "k"))) testBucketing( - bucketSpec1, bucketSpec2, Seq("i"), - shuffleLeft = false, shuffleRight = false, - sortLeft = false, sortRight = false + bucketSpecLeft = bucketSpec1, + bucketSpecRight = bucketSpec2, + joinCondition = joinCondition(Seq("i")), + shuffleLeft = false, + shuffleRight = false, + sortLeft = false, + sortRight = false ) } @@ -364,9 +415,13 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("k"))) testBucketing( - bucketSpec1, bucketSpec2, Seq("i", "j"), - shuffleLeft = false, shuffleRight = false, - sortLeft = false, sortRight = true + bucketSpecLeft = bucketSpec1, + bucketSpecRight = bucketSpec2, + joinCondition = joinCondition(Seq("i", "j")), + shuffleLeft = false, + shuffleRight = false, + sortLeft = false, + sortRight = true ) } @@ -374,9 +429,13 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i"))) testBucketing( - bucketSpec1, bucketSpec2, Seq("i", "j"), - shuffleLeft = false, shuffleRight = false, - sortLeft = false, sortRight = true + bucketSpecLeft = bucketSpec1, + bucketSpecRight = bucketSpec2, + joinCondition = joinCondition(Seq("i", "j")), + shuffleLeft = false, + shuffleRight = false, + sortLeft = false, + sortRight = true ) } @@ -408,6 +467,25 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet } } + test("SPARK-17698 Join predicates should not contain filter clauses") { + val bucketSpec = Some(BucketSpec(8, Seq("i"), Seq("i"))) + testBucketing( + bucketSpecLeft = bucketSpec, + bucketSpecRight = bucketSpec, + joinType = "fullouter", + joinCondition = (left: DataFrame, right: DataFrame) => { + val joinPredicates = Seq("i").map(col => left(col) === right(col)).reduce(_ && _) + val filterLeft = left("i") === Literal("1") + val filterRight = right("i") === Literal("1") + joinPredicates && filterLeft && filterRight + }, + shuffleLeft = false, + shuffleRight = false, + sortLeft = false, + sortRight = false + ) + } + test("error if there exists any malformed bucket files") { withTable("bucketed_table") { df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table") |