From 95faa731c15ce2e36373071a405207165818df97 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 22 Apr 2016 22:50:32 -0700 Subject: [SPARK-14866][SQL] Break SQLQuerySuite out into smaller test suites ## What changes were proposed in this pull request? This patch breaks SQLQuerySuite out into smaller test suites. It was a little bit too large for debugging. ## How was this patch tested? This is a test only change. Author: Reynold Xin Closes #12630 from rxin/SPARK-14866. --- .../spark/sql/catalyst/analysis/Analyzer.scala | 6 +- .../spark/sql/hive/execution/SQLQuerySuite.scala | 509 --------------------- .../spark/sql/hive/execution/SQLViewSuite.scala | 199 ++++++++ .../hive/execution/SQLWindowFunctionSuite.scala | 370 +++++++++++++++ 4 files changed, 572 insertions(+), 512 deletions(-) create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 182e459d8f..24558d5b8c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -407,7 +407,7 @@ class Analyzer( * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog. */ object ResolveRelations extends Rule[LogicalPlan] { - private def getTable(u: UnresolvedRelation): LogicalPlan = { + private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan = { try { catalog.lookupRelation(u.tableIdentifier, u.alias) } catch { @@ -418,10 +418,10 @@ class Analyzer( def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) => - i.copy(table = EliminateSubqueryAliases(getTable(u))) + i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u))) case u: UnresolvedRelation => try { - getTable(u) + lookupTableFromCatalog(u) } catch { case _: AnalysisException if u.tableIdentifier.database.isDefined => // delay the exception into CheckAnalysis, then it could be resolved as data source. diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 80d54f0960..0ecc9807b2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -49,11 +49,6 @@ case class Order( state: String, month: Int) -case class WindowData( - month: Int, - area: String, - product: Int) - /** * A collection of hive query tests where we generate the answers ourselves instead of depending on * Hive to generate them (in contrast to HiveQuerySuite). Often this is because the query is @@ -812,197 +807,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { """.stripMargin), (2 to 6).map(i => Row(i))) } - test("window function: udaf with aggregate expression") { - val data = Seq( - WindowData(1, "a", 5), - WindowData(2, "a", 6), - WindowData(3, "b", 7), - WindowData(4, "b", 8), - WindowData(5, "c", 9), - WindowData(6, "c", 10) - ) - sparkContext.parallelize(data).toDF().registerTempTable("windowData") - - checkAnswer( - sql( - """ - |select area, sum(product), sum(sum(product)) over (partition by area) - |from windowData group by month, area - """.stripMargin), - Seq( - ("a", 5, 11), - ("a", 6, 11), - ("b", 7, 15), - ("b", 8, 15), - ("c", 9, 19), - ("c", 10, 19) - ).map(i => Row(i._1, i._2, i._3))) - - checkAnswer( - sql( - """ - |select area, sum(product) - 1, sum(sum(product)) over (partition by area) - |from windowData group by month, area - """.stripMargin), - Seq( - ("a", 4, 11), - ("a", 5, 11), - ("b", 6, 15), - ("b", 7, 15), - ("c", 8, 19), - ("c", 9, 19) - ).map(i => Row(i._1, i._2, i._3))) - - checkAnswer( - sql( - """ - |select area, sum(product), sum(product) / sum(sum(product)) over (partition by area) - |from windowData group by month, area - """.stripMargin), - Seq( - ("a", 5, 5d/11), - ("a", 6, 6d/11), - ("b", 7, 7d/15), - ("b", 8, 8d/15), - ("c", 10, 10d/19), - ("c", 9, 9d/19) - ).map(i => Row(i._1, i._2, i._3))) - - checkAnswer( - sql( - """ - |select area, sum(product), sum(product) / sum(sum(product) - 1) over (partition by area) - |from windowData group by month, area - """.stripMargin), - Seq( - ("a", 5, 5d/9), - ("a", 6, 6d/9), - ("b", 7, 7d/13), - ("b", 8, 8d/13), - ("c", 10, 10d/17), - ("c", 9, 9d/17) - ).map(i => Row(i._1, i._2, i._3))) - } - - test("window function: refer column in inner select block") { - val data = Seq( - WindowData(1, "a", 5), - WindowData(2, "a", 6), - WindowData(3, "b", 7), - WindowData(4, "b", 8), - WindowData(5, "c", 9), - WindowData(6, "c", 10) - ) - sparkContext.parallelize(data).toDF().registerTempTable("windowData") - - checkAnswer( - sql( - """ - |select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1 - |from (select month, area, product, 1 as tmp1 from windowData) tmp - """.stripMargin), - Seq( - ("a", 2), - ("a", 3), - ("b", 2), - ("b", 3), - ("c", 2), - ("c", 3) - ).map(i => Row(i._1, i._2))) - } - - test("window function: partition and order expressions") { - val data = Seq( - WindowData(1, "a", 5), - WindowData(2, "a", 6), - WindowData(3, "b", 7), - WindowData(4, "b", 8), - WindowData(5, "c", 9), - WindowData(6, "c", 10) - ) - sparkContext.parallelize(data).toDF().registerTempTable("windowData") - - checkAnswer( - sql( - """ - |select month, area, product, sum(product + 1) over (partition by 1 order by 2) - |from windowData - """.stripMargin), - Seq( - (1, "a", 5, 51), - (2, "a", 6, 51), - (3, "b", 7, 51), - (4, "b", 8, 51), - (5, "c", 9, 51), - (6, "c", 10, 51) - ).map(i => Row(i._1, i._2, i._3, i._4))) - - checkAnswer( - sql( - """ - |select month, area, product, sum(product) - |over (partition by month % 2 order by 10 - product) - |from windowData - """.stripMargin), - Seq( - (1, "a", 5, 21), - (2, "a", 6, 24), - (3, "b", 7, 16), - (4, "b", 8, 18), - (5, "c", 9, 9), - (6, "c", 10, 10) - ).map(i => Row(i._1, i._2, i._3, i._4))) - } - - test("window function: distinct should not be silently ignored") { - val data = Seq( - WindowData(1, "a", 5), - WindowData(2, "a", 6), - WindowData(3, "b", 7), - WindowData(4, "b", 8), - WindowData(5, "c", 9), - WindowData(6, "c", 10) - ) - sparkContext.parallelize(data).toDF().registerTempTable("windowData") - - val e = intercept[AnalysisException] { - sql( - """ - |select month, area, product, sum(distinct product + 1) over (partition by 1 order by 2) - |from windowData - """.stripMargin) - } - assert(e.getMessage.contains("Distinct window functions are not supported")) - } - - test("window function: expressions in arguments of a window functions") { - val data = Seq( - WindowData(1, "a", 5), - WindowData(2, "a", 6), - WindowData(3, "b", 7), - WindowData(4, "b", 8), - WindowData(5, "c", 9), - WindowData(6, "c", 10) - ) - sparkContext.parallelize(data).toDF().registerTempTable("windowData") - - checkAnswer( - sql( - """ - |select month, area, month % 2, - |lag(product, 1 + 1, product) over (partition by month % 2 order by area) - |from windowData - """.stripMargin), - Seq( - (1, "a", 1, 5), - (2, "a", 0, 6), - (3, "b", 1, 7), - (4, "b", 0, 8), - (5, "c", 1, 5), - (6, "c", 0, 6) - ).map(i => Row(i._1, i._2, i._3, i._4))) - } - test("Sorting columns are not in Generate") { withTempTable("data") { sqlContext.range(1, 5) @@ -1030,139 +834,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } - test("window function: Sorting columns are not in Project") { - val data = Seq( - WindowData(1, "d", 10), - WindowData(2, "a", 6), - WindowData(3, "b", 7), - WindowData(4, "b", 8), - WindowData(5, "c", 9), - WindowData(6, "c", 11) - ) - sparkContext.parallelize(data).toDF().registerTempTable("windowData") - - checkAnswer( - sql("select month, product, sum(product + 1) over() from windowData order by area"), - Seq( - (2, 6, 57), - (3, 7, 57), - (4, 8, 57), - (5, 9, 57), - (6, 11, 57), - (1, 10, 57) - ).map(i => Row(i._1, i._2, i._3))) - - checkAnswer( - sql( - """ - |select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1 - |from (select month, area, product as p, 1 as tmp1 from windowData) tmp order by p - """.stripMargin), - Seq( - ("a", 2), - ("b", 2), - ("b", 3), - ("c", 2), - ("d", 2), - ("c", 3) - ).map(i => Row(i._1, i._2))) - - checkAnswer( - sql( - """ - |select area, rank() over (partition by area order by month) as c1 - |from windowData group by product, area, month order by product, area - """.stripMargin), - Seq( - ("a", 1), - ("b", 1), - ("b", 2), - ("c", 1), - ("d", 1), - ("c", 2) - ).map(i => Row(i._1, i._2))) - - checkAnswer( - sql( - """ - |select area, sum(product) / sum(sum(product)) over (partition by area) as c1 - |from windowData group by area, month order by month, c1 - """.stripMargin), - Seq( - ("d", 1.0), - ("a", 1.0), - ("b", 0.4666666666666667), - ("b", 0.5333333333333333), - ("c", 0.45), - ("c", 0.55) - ).map(i => Row(i._1, i._2))) - } - - // todo: fix this test case by reimplementing the function ResolveAggregateFunctions - ignore("window function: Pushing aggregate Expressions in Sort to Aggregate") { - val data = Seq( - WindowData(1, "d", 10), - WindowData(2, "a", 6), - WindowData(3, "b", 7), - WindowData(4, "b", 8), - WindowData(5, "c", 9), - WindowData(6, "c", 11) - ) - sparkContext.parallelize(data).toDF().registerTempTable("windowData") - - checkAnswer( - sql( - """ - |select area, sum(product) over () as c from windowData - |where product > 3 group by area, product - |having avg(month) > 0 order by avg(month), product - """.stripMargin), - Seq( - ("a", 51), - ("b", 51), - ("b", 51), - ("c", 51), - ("c", 51), - ("d", 51) - ).map(i => Row(i._1, i._2))) - } - - test("window function: multiple window expressions in a single expression") { - val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y") - nums.registerTempTable("nums") - - val expected = - Row(1, 1, 1, 55, 1, 57) :: - Row(0, 2, 3, 55, 2, 60) :: - Row(1, 3, 6, 55, 4, 65) :: - Row(0, 4, 10, 55, 6, 71) :: - Row(1, 5, 15, 55, 9, 79) :: - Row(0, 6, 21, 55, 12, 88) :: - Row(1, 7, 28, 55, 16, 99) :: - Row(0, 8, 36, 55, 20, 111) :: - Row(1, 9, 45, 55, 25, 125) :: - Row(0, 10, 55, 55, 30, 140) :: Nil - - val actual = sql( - """ - |SELECT - | y, - | x, - | sum(x) OVER w1 AS running_sum, - | sum(x) OVER w2 AS total_sum, - | sum(x) OVER w3 AS running_sum_per_y, - | ((sum(x) OVER w1) + (sum(x) OVER w2) + (sum(x) OVER w3)) as combined2 - |FROM nums - |WINDOW w1 AS (ORDER BY x ROWS BETWEEN UnBOUNDED PRECEDiNG AND CuRRENT RoW), - | w2 AS (ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOuNDED FoLLOWING), - | w3 AS (PARTITION BY y ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) - """.stripMargin) - - checkAnswer(actual, expected) - - dropTempTable("nums") - } - test("test case key when") { (1 to 5).map(i => (i, i.toString)).toDF("k", "v").registerTempTable("t") checkAnswer( @@ -1170,18 +841,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { Row(0, "1") :: Row(22, "2") :: Row(0, "3") :: Row(44, "4") :: Row(0, "5") :: Nil) } - test("SPARK-7595: Window will cause resolve failed with self join") { - sql("SELECT * FROM src") // Force loading of src table. - - checkAnswer(sql( - """ - |with - | v1 as (select key, count(value) over (partition by key) cnt_val from src), - | v2 as (select v1.key, v1_lag.cnt_val from v1, v1 v1_lag where v1.key = v1_lag.key) - | select * from v2 order by key limit 1 - """.stripMargin), Row(0, 3)) - } - test("SPARK-7269 Check analysis failed in case in-sensitive") { Seq(1, 2, 3).map { i => (i.toString, i.toString) @@ -1485,174 +1144,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { }) } - test("correctly parse CREATE VIEW statement") { - withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") { - withTable("jt") { - val df = (1 until 10).map(i => i -> i).toDF("i", "j") - df.write.format("json").saveAsTable("jt") - sql( - """CREATE VIEW IF NOT EXISTS - |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla') - |TBLPROPERTIES ('a' = 'b') - |AS SELECT * FROM jt""".stripMargin) - checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i))) - sql("DROP VIEW testView") - } - } - } - - test("correctly handle CREATE VIEW IF NOT EXISTS") { - withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") { - withTable("jt", "jt2") { - sqlContext.range(1, 10).write.format("json").saveAsTable("jt") - sql("CREATE VIEW testView AS SELECT id FROM jt") - - val df = (1 until 10).map(i => i -> i).toDF("i", "j") - df.write.format("json").saveAsTable("jt2") - sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2") - - // make sure our view doesn't change. - checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i))) - sql("DROP VIEW testView") - } - } - } - - Seq(true, false).foreach { enabled => - val prefix = (if (enabled) "With" else "Without") + " canonical native view: " - test(s"$prefix correctly handle CREATE OR REPLACE VIEW") { - withSQLConf( - SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) { - withTable("jt", "jt2") { - sqlContext.range(1, 10).write.format("json").saveAsTable("jt") - sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt") - checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i))) - - val df = (1 until 10).map(i => i -> i).toDF("i", "j") - df.write.format("json").saveAsTable("jt2") - sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2") - // make sure the view has been changed. - checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i))) - - sql("DROP VIEW testView") - - val e = intercept[AnalysisException] { - sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt") - } - assert(e.message.contains("not allowed to define a view")) - } - } - } - - test(s"$prefix correctly handle ALTER VIEW") { - withSQLConf( - SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) { - withTable("jt", "jt2") { - withView("testView") { - sqlContext.range(1, 10).write.format("json").saveAsTable("jt") - sql("CREATE VIEW testView AS SELECT id FROM jt") - - val df = (1 until 10).map(i => i -> i).toDF("i", "j") - df.write.format("json").saveAsTable("jt2") - sql("ALTER VIEW testView AS SELECT * FROM jt2") - // make sure the view has been changed. - checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i))) - } - } - } - } - - test(s"$prefix create hive view for json table") { - // json table is not hive-compatible, make sure the new flag fix it. - withSQLConf( - SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) { - withTable("jt") { - withView("testView") { - sqlContext.range(1, 10).write.format("json").saveAsTable("jt") - sql("CREATE VIEW testView AS SELECT id FROM jt") - checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i))) - } - } - } - } - - test(s"$prefix create hive view for partitioned parquet table") { - // partitioned parquet table is not hive-compatible, make sure the new flag fix it. - withSQLConf( - SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) { - withTable("parTable") { - withView("testView") { - val df = Seq(1 -> "a").toDF("i", "j") - df.write.format("parquet").partitionBy("i").saveAsTable("parTable") - sql("CREATE VIEW testView AS SELECT i, j FROM parTable") - checkAnswer(sql("SELECT * FROM testView"), Row(1, "a")) - } - } - } - } - } - - test("CTE within view") { - withSQLConf( - SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") { - withView("cte_view") { - sql("CREATE VIEW cte_view AS WITH w AS (SELECT 1 AS n) SELECT n FROM w") - checkAnswer(sql("SELECT * FROM cte_view"), Row(1)) - } - } - } - - test("Using view after switching current database") { - withSQLConf( - SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") { - withView("v") { - sql("CREATE VIEW v AS SELECT * FROM src") - withTempDatabase { db => - activateDatabase(db) { - // Should look up table `src` in database `default`. - checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src")) - - // The new `src` table shouldn't be scanned. - sql("CREATE TABLE src(key INT, value STRING)") - checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src")) - } - } - } - } - } - - test("Using view after adding more columns") { - withSQLConf( - SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") { - withTable("add_col") { - sqlContext.range(10).write.saveAsTable("add_col") - withView("v") { - sql("CREATE VIEW v AS SELECT * FROM add_col") - sqlContext.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col") - checkAnswer(sql("SELECT * FROM v"), sqlContext.range(10).toDF()) - } - } - } - } - - test("create hive view for joined tables") { - // make sure the new flag can handle some complex cases like join and schema change. - withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") { - withTable("jt1", "jt2") { - sqlContext.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1") - sqlContext.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2") - sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2") - checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i))) - - val df = (1 until 10).map(i => i -> i).toDF("id1", "newCol") - df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("jt1") - checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i))) - - sql("DROP VIEW testView") - } - } - } - test("SPARK-8976 Wrong Result for Rollup #1") { checkAnswer(sql( "SELECT count(*) AS cnt, key % 5, grouping_id() FROM src GROUP BY key%5 WITH ROLLUP"), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala new file mode 100644 index 0000000000..cdd5cb31d9 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SQLTestUtils + +/** + * A suite for testing view related functionality. + */ +class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { + import hiveContext.implicits._ + + test("correctly parse CREATE VIEW statement") { + withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") { + withTable("jt") { + val df = (1 until 10).map(i => i -> i).toDF("i", "j") + df.write.format("json").saveAsTable("jt") + sql( + """CREATE VIEW IF NOT EXISTS + |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla') + |TBLPROPERTIES ('a' = 'b') + |AS SELECT * FROM jt""".stripMargin) + checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i))) + sql("DROP VIEW testView") + } + } + } + + test("correctly handle CREATE VIEW IF NOT EXISTS") { + withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") { + withTable("jt", "jt2") { + sqlContext.range(1, 10).write.format("json").saveAsTable("jt") + sql("CREATE VIEW testView AS SELECT id FROM jt") + + val df = (1 until 10).map(i => i -> i).toDF("i", "j") + df.write.format("json").saveAsTable("jt2") + sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2") + + // make sure our view doesn't change. + checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i))) + sql("DROP VIEW testView") + } + } + } + + Seq(true, false).foreach { enabled => + val prefix = (if (enabled) "With" else "Without") + " canonical native view: " + test(s"$prefix correctly handle CREATE OR REPLACE VIEW") { + withSQLConf( + SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) { + withTable("jt", "jt2") { + sqlContext.range(1, 10).write.format("json").saveAsTable("jt") + sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt") + checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i))) + + val df = (1 until 10).map(i => i -> i).toDF("i", "j") + df.write.format("json").saveAsTable("jt2") + sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2") + // make sure the view has been changed. + checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i))) + + sql("DROP VIEW testView") + + val e = intercept[AnalysisException] { + sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt") + } + assert(e.message.contains("not allowed to define a view")) + } + } + } + + test(s"$prefix correctly handle ALTER VIEW") { + withSQLConf( + SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) { + withTable("jt", "jt2") { + withView("testView") { + sqlContext.range(1, 10).write.format("json").saveAsTable("jt") + sql("CREATE VIEW testView AS SELECT id FROM jt") + + val df = (1 until 10).map(i => i -> i).toDF("i", "j") + df.write.format("json").saveAsTable("jt2") + sql("ALTER VIEW testView AS SELECT * FROM jt2") + // make sure the view has been changed. + checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i))) + } + } + } + } + + test(s"$prefix create hive view for json table") { + // json table is not hive-compatible, make sure the new flag fix it. + withSQLConf( + SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) { + withTable("jt") { + withView("testView") { + sqlContext.range(1, 10).write.format("json").saveAsTable("jt") + sql("CREATE VIEW testView AS SELECT id FROM jt") + checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i))) + } + } + } + } + + test(s"$prefix create hive view for partitioned parquet table") { + // partitioned parquet table is not hive-compatible, make sure the new flag fix it. + withSQLConf( + SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) { + withTable("parTable") { + withView("testView") { + val df = Seq(1 -> "a").toDF("i", "j") + df.write.format("parquet").partitionBy("i").saveAsTable("parTable") + sql("CREATE VIEW testView AS SELECT i, j FROM parTable") + checkAnswer(sql("SELECT * FROM testView"), Row(1, "a")) + } + } + } + } + } + + test("CTE within view") { + withSQLConf( + SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") { + withView("cte_view") { + sql("CREATE VIEW cte_view AS WITH w AS (SELECT 1 AS n) SELECT n FROM w") + checkAnswer(sql("SELECT * FROM cte_view"), Row(1)) + } + } + } + + test("Using view after switching current database") { + withSQLConf( + SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") { + withView("v") { + sql("CREATE VIEW v AS SELECT * FROM src") + withTempDatabase { db => + activateDatabase(db) { + // Should look up table `src` in database `default`. + checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src")) + + // The new `src` table shouldn't be scanned. + sql("CREATE TABLE src(key INT, value STRING)") + checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src")) + } + } + } + } + } + + test("Using view after adding more columns") { + withSQLConf( + SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") { + withTable("add_col") { + sqlContext.range(10).write.saveAsTable("add_col") + withView("v") { + sql("CREATE VIEW v AS SELECT * FROM add_col") + sqlContext.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col") + checkAnswer(sql("SELECT * FROM v"), sqlContext.range(10).toDF()) + } + } + } + } + + test("create hive view for joined tables") { + // make sure the new flag can handle some complex cases like join and schema change. + withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") { + withTable("jt1", "jt2") { + sqlContext.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1") + sqlContext.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2") + sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2") + checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i))) + + val df = (1 until 10).map(i => i -> i).toDF("id1", "newCol") + df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("jt1") + checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i))) + + sql("DROP VIEW testView") + } + } + } + +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala new file mode 100644 index 0000000000..d0e7552c12 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils + + +case class WindowData(month: Int, area: String, product: Int) + + +/** + * Test suite for SQL window functions. + */ +class SQLWindowFunctionSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { + import hiveContext.implicits._ + + test("window function: udaf with aggregate expression") { + val data = Seq( + WindowData(1, "a", 5), + WindowData(2, "a", 6), + WindowData(3, "b", 7), + WindowData(4, "b", 8), + WindowData(5, "c", 9), + WindowData(6, "c", 10) + ) + sparkContext.parallelize(data).toDF().registerTempTable("windowData") + + checkAnswer( + sql( + """ + |select area, sum(product), sum(sum(product)) over (partition by area) + |from windowData group by month, area + """.stripMargin), + Seq( + ("a", 5, 11), + ("a", 6, 11), + ("b", 7, 15), + ("b", 8, 15), + ("c", 9, 19), + ("c", 10, 19) + ).map(i => Row(i._1, i._2, i._3))) + + checkAnswer( + sql( + """ + |select area, sum(product) - 1, sum(sum(product)) over (partition by area) + |from windowData group by month, area + """.stripMargin), + Seq( + ("a", 4, 11), + ("a", 5, 11), + ("b", 6, 15), + ("b", 7, 15), + ("c", 8, 19), + ("c", 9, 19) + ).map(i => Row(i._1, i._2, i._3))) + + checkAnswer( + sql( + """ + |select area, sum(product), sum(product) / sum(sum(product)) over (partition by area) + |from windowData group by month, area + """.stripMargin), + Seq( + ("a", 5, 5d/11), + ("a", 6, 6d/11), + ("b", 7, 7d/15), + ("b", 8, 8d/15), + ("c", 10, 10d/19), + ("c", 9, 9d/19) + ).map(i => Row(i._1, i._2, i._3))) + + checkAnswer( + sql( + """ + |select area, sum(product), sum(product) / sum(sum(product) - 1) over (partition by area) + |from windowData group by month, area + """.stripMargin), + Seq( + ("a", 5, 5d/9), + ("a", 6, 6d/9), + ("b", 7, 7d/13), + ("b", 8, 8d/13), + ("c", 10, 10d/17), + ("c", 9, 9d/17) + ).map(i => Row(i._1, i._2, i._3))) + } + + test("window function: refer column in inner select block") { + val data = Seq( + WindowData(1, "a", 5), + WindowData(2, "a", 6), + WindowData(3, "b", 7), + WindowData(4, "b", 8), + WindowData(5, "c", 9), + WindowData(6, "c", 10) + ) + sparkContext.parallelize(data).toDF().registerTempTable("windowData") + + checkAnswer( + sql( + """ + |select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1 + |from (select month, area, product, 1 as tmp1 from windowData) tmp + """.stripMargin), + Seq( + ("a", 2), + ("a", 3), + ("b", 2), + ("b", 3), + ("c", 2), + ("c", 3) + ).map(i => Row(i._1, i._2))) + } + + test("window function: partition and order expressions") { + val data = Seq( + WindowData(1, "a", 5), + WindowData(2, "a", 6), + WindowData(3, "b", 7), + WindowData(4, "b", 8), + WindowData(5, "c", 9), + WindowData(6, "c", 10) + ) + sparkContext.parallelize(data).toDF().registerTempTable("windowData") + + checkAnswer( + sql( + """ + |select month, area, product, sum(product + 1) over (partition by 1 order by 2) + |from windowData + """.stripMargin), + Seq( + (1, "a", 5, 51), + (2, "a", 6, 51), + (3, "b", 7, 51), + (4, "b", 8, 51), + (5, "c", 9, 51), + (6, "c", 10, 51) + ).map(i => Row(i._1, i._2, i._3, i._4))) + + checkAnswer( + sql( + """ + |select month, area, product, sum(product) + |over (partition by month % 2 order by 10 - product) + |from windowData + """.stripMargin), + Seq( + (1, "a", 5, 21), + (2, "a", 6, 24), + (3, "b", 7, 16), + (4, "b", 8, 18), + (5, "c", 9, 9), + (6, "c", 10, 10) + ).map(i => Row(i._1, i._2, i._3, i._4))) + } + + test("window function: distinct should not be silently ignored") { + val data = Seq( + WindowData(1, "a", 5), + WindowData(2, "a", 6), + WindowData(3, "b", 7), + WindowData(4, "b", 8), + WindowData(5, "c", 9), + WindowData(6, "c", 10) + ) + sparkContext.parallelize(data).toDF().registerTempTable("windowData") + + val e = intercept[AnalysisException] { + sql( + """ + |select month, area, product, sum(distinct product + 1) over (partition by 1 order by 2) + |from windowData + """.stripMargin) + } + assert(e.getMessage.contains("Distinct window functions are not supported")) + } + + test("window function: expressions in arguments of a window functions") { + val data = Seq( + WindowData(1, "a", 5), + WindowData(2, "a", 6), + WindowData(3, "b", 7), + WindowData(4, "b", 8), + WindowData(5, "c", 9), + WindowData(6, "c", 10) + ) + sparkContext.parallelize(data).toDF().registerTempTable("windowData") + + checkAnswer( + sql( + """ + |select month, area, month % 2, + |lag(product, 1 + 1, product) over (partition by month % 2 order by area) + |from windowData + """.stripMargin), + Seq( + (1, "a", 1, 5), + (2, "a", 0, 6), + (3, "b", 1, 7), + (4, "b", 0, 8), + (5, "c", 1, 5), + (6, "c", 0, 6) + ).map(i => Row(i._1, i._2, i._3, i._4))) + } + + + test("window function: Sorting columns are not in Project") { + val data = Seq( + WindowData(1, "d", 10), + WindowData(2, "a", 6), + WindowData(3, "b", 7), + WindowData(4, "b", 8), + WindowData(5, "c", 9), + WindowData(6, "c", 11) + ) + sparkContext.parallelize(data).toDF().registerTempTable("windowData") + + checkAnswer( + sql("select month, product, sum(product + 1) over() from windowData order by area"), + Seq( + (2, 6, 57), + (3, 7, 57), + (4, 8, 57), + (5, 9, 57), + (6, 11, 57), + (1, 10, 57) + ).map(i => Row(i._1, i._2, i._3))) + + checkAnswer( + sql( + """ + |select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1 + |from (select month, area, product as p, 1 as tmp1 from windowData) tmp order by p + """.stripMargin), + Seq( + ("a", 2), + ("b", 2), + ("b", 3), + ("c", 2), + ("d", 2), + ("c", 3) + ).map(i => Row(i._1, i._2))) + + checkAnswer( + sql( + """ + |select area, rank() over (partition by area order by month) as c1 + |from windowData group by product, area, month order by product, area + """.stripMargin), + Seq( + ("a", 1), + ("b", 1), + ("b", 2), + ("c", 1), + ("d", 1), + ("c", 2) + ).map(i => Row(i._1, i._2))) + + checkAnswer( + sql( + """ + |select area, sum(product) / sum(sum(product)) over (partition by area) as c1 + |from windowData group by area, month order by month, c1 + """.stripMargin), + Seq( + ("d", 1.0), + ("a", 1.0), + ("b", 0.4666666666666667), + ("b", 0.5333333333333333), + ("c", 0.45), + ("c", 0.55) + ).map(i => Row(i._1, i._2))) + } + + // todo: fix this test case by reimplementing the function ResolveAggregateFunctions + ignore("window function: Pushing aggregate Expressions in Sort to Aggregate") { + val data = Seq( + WindowData(1, "d", 10), + WindowData(2, "a", 6), + WindowData(3, "b", 7), + WindowData(4, "b", 8), + WindowData(5, "c", 9), + WindowData(6, "c", 11) + ) + sparkContext.parallelize(data).toDF().registerTempTable("windowData") + + checkAnswer( + sql( + """ + |select area, sum(product) over () as c from windowData + |where product > 3 group by area, product + |having avg(month) > 0 order by avg(month), product + """.stripMargin), + Seq( + ("a", 51), + ("b", 51), + ("b", 51), + ("c", 51), + ("c", 51), + ("d", 51) + ).map(i => Row(i._1, i._2))) + } + + test("window function: multiple window expressions in a single expression") { + val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y") + nums.registerTempTable("nums") + + val expected = + Row(1, 1, 1, 55, 1, 57) :: + Row(0, 2, 3, 55, 2, 60) :: + Row(1, 3, 6, 55, 4, 65) :: + Row(0, 4, 10, 55, 6, 71) :: + Row(1, 5, 15, 55, 9, 79) :: + Row(0, 6, 21, 55, 12, 88) :: + Row(1, 7, 28, 55, 16, 99) :: + Row(0, 8, 36, 55, 20, 111) :: + Row(1, 9, 45, 55, 25, 125) :: + Row(0, 10, 55, 55, 30, 140) :: Nil + + val actual = sql( + """ + |SELECT + | y, + | x, + | sum(x) OVER w1 AS running_sum, + | sum(x) OVER w2 AS total_sum, + | sum(x) OVER w3 AS running_sum_per_y, + | ((sum(x) OVER w1) + (sum(x) OVER w2) + (sum(x) OVER w3)) as combined2 + |FROM nums + |WINDOW w1 AS (ORDER BY x ROWS BETWEEN UnBOUNDED PRECEDiNG AND CuRRENT RoW), + | w2 AS (ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOuNDED FoLLOWING), + | w3 AS (PARTITION BY y ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) + """.stripMargin) + + checkAnswer(actual, expected) + + sqlContext.dropTempTable("nums") + } + + test("SPARK-7595: Window will cause resolve failed with self join") { + sql("SELECT * FROM src") // Force loading of src table. + + checkAnswer(sql( + """ + |with + | v1 as (select key, count(value) over (partition by key) cnt_val from src), + | v2 as (select v1.key, v1_lag.cnt_val from v1, v1 v1_lag where v1.key = v1_lag.key) + | select * from v2 order by key limit 1 + """.stripMargin), Row(0, 3)) + } +} -- cgit v1.2.3