aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2016-04-22 22:50:32 -0700
committerReynold Xin <rxin@databricks.com>2016-04-22 22:50:32 -0700
commit95faa731c15ce2e36373071a405207165818df97 (patch)
treef980422766b7bec11314378ee3c8c6748cfc4a47
parentbdde010edbc79e506e183e2b9a2b9b19f7b226fb (diff)
downloadspark-95faa731c15ce2e36373071a405207165818df97.tar.gz
spark-95faa731c15ce2e36373071a405207165818df97.tar.bz2
spark-95faa731c15ce2e36373071a405207165818df97.zip
[SPARK-14866][SQL] Break SQLQuerySuite out into smaller test suites
## What changes were proposed in this pull request? This patch breaks SQLQuerySuite out into smaller test suites. It was a little bit too large for debugging. ## How was this patch tested? This is a test only change. Author: Reynold Xin <rxin@databricks.com> Closes #12630 from rxin/SPARK-14866.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala6
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala509
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala199
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala370
4 files changed, 572 insertions, 512 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 182e459d8f..24558d5b8c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -407,7 +407,7 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
*/
object ResolveRelations extends Rule[LogicalPlan] {
- private def getTable(u: UnresolvedRelation): LogicalPlan = {
+ private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan = {
try {
catalog.lookupRelation(u.tableIdentifier, u.alias)
} catch {
@@ -418,10 +418,10 @@ class Analyzer(
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
- i.copy(table = EliminateSubqueryAliases(getTable(u)))
+ i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
case u: UnresolvedRelation =>
try {
- getTable(u)
+ lookupTableFromCatalog(u)
} catch {
case _: AnalysisException if u.tableIdentifier.database.isDefined =>
// delay the exception into CheckAnalysis, then it could be resolved as data source.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 80d54f0960..0ecc9807b2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -49,11 +49,6 @@ case class Order(
state: String,
month: Int)
-case class WindowData(
- month: Int,
- area: String,
- product: Int)
-
/**
* A collection of hive query tests where we generate the answers ourselves instead of depending on
* Hive to generate them (in contrast to HiveQuerySuite). Often this is because the query is
@@ -812,197 +807,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
""".stripMargin), (2 to 6).map(i => Row(i)))
}
- test("window function: udaf with aggregate expression") {
- val data = Seq(
- WindowData(1, "a", 5),
- WindowData(2, "a", 6),
- WindowData(3, "b", 7),
- WindowData(4, "b", 8),
- WindowData(5, "c", 9),
- WindowData(6, "c", 10)
- )
- sparkContext.parallelize(data).toDF().registerTempTable("windowData")
-
- checkAnswer(
- sql(
- """
- |select area, sum(product), sum(sum(product)) over (partition by area)
- |from windowData group by month, area
- """.stripMargin),
- Seq(
- ("a", 5, 11),
- ("a", 6, 11),
- ("b", 7, 15),
- ("b", 8, 15),
- ("c", 9, 19),
- ("c", 10, 19)
- ).map(i => Row(i._1, i._2, i._3)))
-
- checkAnswer(
- sql(
- """
- |select area, sum(product) - 1, sum(sum(product)) over (partition by area)
- |from windowData group by month, area
- """.stripMargin),
- Seq(
- ("a", 4, 11),
- ("a", 5, 11),
- ("b", 6, 15),
- ("b", 7, 15),
- ("c", 8, 19),
- ("c", 9, 19)
- ).map(i => Row(i._1, i._2, i._3)))
-
- checkAnswer(
- sql(
- """
- |select area, sum(product), sum(product) / sum(sum(product)) over (partition by area)
- |from windowData group by month, area
- """.stripMargin),
- Seq(
- ("a", 5, 5d/11),
- ("a", 6, 6d/11),
- ("b", 7, 7d/15),
- ("b", 8, 8d/15),
- ("c", 10, 10d/19),
- ("c", 9, 9d/19)
- ).map(i => Row(i._1, i._2, i._3)))
-
- checkAnswer(
- sql(
- """
- |select area, sum(product), sum(product) / sum(sum(product) - 1) over (partition by area)
- |from windowData group by month, area
- """.stripMargin),
- Seq(
- ("a", 5, 5d/9),
- ("a", 6, 6d/9),
- ("b", 7, 7d/13),
- ("b", 8, 8d/13),
- ("c", 10, 10d/17),
- ("c", 9, 9d/17)
- ).map(i => Row(i._1, i._2, i._3)))
- }
-
- test("window function: refer column in inner select block") {
- val data = Seq(
- WindowData(1, "a", 5),
- WindowData(2, "a", 6),
- WindowData(3, "b", 7),
- WindowData(4, "b", 8),
- WindowData(5, "c", 9),
- WindowData(6, "c", 10)
- )
- sparkContext.parallelize(data).toDF().registerTempTable("windowData")
-
- checkAnswer(
- sql(
- """
- |select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1
- |from (select month, area, product, 1 as tmp1 from windowData) tmp
- """.stripMargin),
- Seq(
- ("a", 2),
- ("a", 3),
- ("b", 2),
- ("b", 3),
- ("c", 2),
- ("c", 3)
- ).map(i => Row(i._1, i._2)))
- }
-
- test("window function: partition and order expressions") {
- val data = Seq(
- WindowData(1, "a", 5),
- WindowData(2, "a", 6),
- WindowData(3, "b", 7),
- WindowData(4, "b", 8),
- WindowData(5, "c", 9),
- WindowData(6, "c", 10)
- )
- sparkContext.parallelize(data).toDF().registerTempTable("windowData")
-
- checkAnswer(
- sql(
- """
- |select month, area, product, sum(product + 1) over (partition by 1 order by 2)
- |from windowData
- """.stripMargin),
- Seq(
- (1, "a", 5, 51),
- (2, "a", 6, 51),
- (3, "b", 7, 51),
- (4, "b", 8, 51),
- (5, "c", 9, 51),
- (6, "c", 10, 51)
- ).map(i => Row(i._1, i._2, i._3, i._4)))
-
- checkAnswer(
- sql(
- """
- |select month, area, product, sum(product)
- |over (partition by month % 2 order by 10 - product)
- |from windowData
- """.stripMargin),
- Seq(
- (1, "a", 5, 21),
- (2, "a", 6, 24),
- (3, "b", 7, 16),
- (4, "b", 8, 18),
- (5, "c", 9, 9),
- (6, "c", 10, 10)
- ).map(i => Row(i._1, i._2, i._3, i._4)))
- }
-
- test("window function: distinct should not be silently ignored") {
- val data = Seq(
- WindowData(1, "a", 5),
- WindowData(2, "a", 6),
- WindowData(3, "b", 7),
- WindowData(4, "b", 8),
- WindowData(5, "c", 9),
- WindowData(6, "c", 10)
- )
- sparkContext.parallelize(data).toDF().registerTempTable("windowData")
-
- val e = intercept[AnalysisException] {
- sql(
- """
- |select month, area, product, sum(distinct product + 1) over (partition by 1 order by 2)
- |from windowData
- """.stripMargin)
- }
- assert(e.getMessage.contains("Distinct window functions are not supported"))
- }
-
- test("window function: expressions in arguments of a window functions") {
- val data = Seq(
- WindowData(1, "a", 5),
- WindowData(2, "a", 6),
- WindowData(3, "b", 7),
- WindowData(4, "b", 8),
- WindowData(5, "c", 9),
- WindowData(6, "c", 10)
- )
- sparkContext.parallelize(data).toDF().registerTempTable("windowData")
-
- checkAnswer(
- sql(
- """
- |select month, area, month % 2,
- |lag(product, 1 + 1, product) over (partition by month % 2 order by area)
- |from windowData
- """.stripMargin),
- Seq(
- (1, "a", 1, 5),
- (2, "a", 0, 6),
- (3, "b", 1, 7),
- (4, "b", 0, 8),
- (5, "c", 1, 5),
- (6, "c", 0, 6)
- ).map(i => Row(i._1, i._2, i._3, i._4)))
- }
-
test("Sorting columns are not in Generate") {
withTempTable("data") {
sqlContext.range(1, 5)
@@ -1030,139 +834,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
- test("window function: Sorting columns are not in Project") {
- val data = Seq(
- WindowData(1, "d", 10),
- WindowData(2, "a", 6),
- WindowData(3, "b", 7),
- WindowData(4, "b", 8),
- WindowData(5, "c", 9),
- WindowData(6, "c", 11)
- )
- sparkContext.parallelize(data).toDF().registerTempTable("windowData")
-
- checkAnswer(
- sql("select month, product, sum(product + 1) over() from windowData order by area"),
- Seq(
- (2, 6, 57),
- (3, 7, 57),
- (4, 8, 57),
- (5, 9, 57),
- (6, 11, 57),
- (1, 10, 57)
- ).map(i => Row(i._1, i._2, i._3)))
-
- checkAnswer(
- sql(
- """
- |select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1
- |from (select month, area, product as p, 1 as tmp1 from windowData) tmp order by p
- """.stripMargin),
- Seq(
- ("a", 2),
- ("b", 2),
- ("b", 3),
- ("c", 2),
- ("d", 2),
- ("c", 3)
- ).map(i => Row(i._1, i._2)))
-
- checkAnswer(
- sql(
- """
- |select area, rank() over (partition by area order by month) as c1
- |from windowData group by product, area, month order by product, area
- """.stripMargin),
- Seq(
- ("a", 1),
- ("b", 1),
- ("b", 2),
- ("c", 1),
- ("d", 1),
- ("c", 2)
- ).map(i => Row(i._1, i._2)))
-
- checkAnswer(
- sql(
- """
- |select area, sum(product) / sum(sum(product)) over (partition by area) as c1
- |from windowData group by area, month order by month, c1
- """.stripMargin),
- Seq(
- ("d", 1.0),
- ("a", 1.0),
- ("b", 0.4666666666666667),
- ("b", 0.5333333333333333),
- ("c", 0.45),
- ("c", 0.55)
- ).map(i => Row(i._1, i._2)))
- }
-
- // todo: fix this test case by reimplementing the function ResolveAggregateFunctions
- ignore("window function: Pushing aggregate Expressions in Sort to Aggregate") {
- val data = Seq(
- WindowData(1, "d", 10),
- WindowData(2, "a", 6),
- WindowData(3, "b", 7),
- WindowData(4, "b", 8),
- WindowData(5, "c", 9),
- WindowData(6, "c", 11)
- )
- sparkContext.parallelize(data).toDF().registerTempTable("windowData")
-
- checkAnswer(
- sql(
- """
- |select area, sum(product) over () as c from windowData
- |where product > 3 group by area, product
- |having avg(month) > 0 order by avg(month), product
- """.stripMargin),
- Seq(
- ("a", 51),
- ("b", 51),
- ("b", 51),
- ("c", 51),
- ("c", 51),
- ("d", 51)
- ).map(i => Row(i._1, i._2)))
- }
-
- test("window function: multiple window expressions in a single expression") {
- val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y")
- nums.registerTempTable("nums")
-
- val expected =
- Row(1, 1, 1, 55, 1, 57) ::
- Row(0, 2, 3, 55, 2, 60) ::
- Row(1, 3, 6, 55, 4, 65) ::
- Row(0, 4, 10, 55, 6, 71) ::
- Row(1, 5, 15, 55, 9, 79) ::
- Row(0, 6, 21, 55, 12, 88) ::
- Row(1, 7, 28, 55, 16, 99) ::
- Row(0, 8, 36, 55, 20, 111) ::
- Row(1, 9, 45, 55, 25, 125) ::
- Row(0, 10, 55, 55, 30, 140) :: Nil
-
- val actual = sql(
- """
- |SELECT
- | y,
- | x,
- | sum(x) OVER w1 AS running_sum,
- | sum(x) OVER w2 AS total_sum,
- | sum(x) OVER w3 AS running_sum_per_y,
- | ((sum(x) OVER w1) + (sum(x) OVER w2) + (sum(x) OVER w3)) as combined2
- |FROM nums
- |WINDOW w1 AS (ORDER BY x ROWS BETWEEN UnBOUNDED PRECEDiNG AND CuRRENT RoW),
- | w2 AS (ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOuNDED FoLLOWING),
- | w3 AS (PARTITION BY y ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
- """.stripMargin)
-
- checkAnswer(actual, expected)
-
- dropTempTable("nums")
- }
-
test("test case key when") {
(1 to 5).map(i => (i, i.toString)).toDF("k", "v").registerTempTable("t")
checkAnswer(
@@ -1170,18 +841,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
Row(0, "1") :: Row(22, "2") :: Row(0, "3") :: Row(44, "4") :: Row(0, "5") :: Nil)
}
- test("SPARK-7595: Window will cause resolve failed with self join") {
- sql("SELECT * FROM src") // Force loading of src table.
-
- checkAnswer(sql(
- """
- |with
- | v1 as (select key, count(value) over (partition by key) cnt_val from src),
- | v2 as (select v1.key, v1_lag.cnt_val from v1, v1 v1_lag where v1.key = v1_lag.key)
- | select * from v2 order by key limit 1
- """.stripMargin), Row(0, 3))
- }
-
test("SPARK-7269 Check analysis failed in case in-sensitive") {
Seq(1, 2, 3).map { i =>
(i.toString, i.toString)
@@ -1485,174 +1144,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
})
}
- test("correctly parse CREATE VIEW statement") {
- withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
- withTable("jt") {
- val df = (1 until 10).map(i => i -> i).toDF("i", "j")
- df.write.format("json").saveAsTable("jt")
- sql(
- """CREATE VIEW IF NOT EXISTS
- |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
- |TBLPROPERTIES ('a' = 'b')
- |AS SELECT * FROM jt""".stripMargin)
- checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
- sql("DROP VIEW testView")
- }
- }
- }
-
- test("correctly handle CREATE VIEW IF NOT EXISTS") {
- withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
- withTable("jt", "jt2") {
- sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
- sql("CREATE VIEW testView AS SELECT id FROM jt")
-
- val df = (1 until 10).map(i => i -> i).toDF("i", "j")
- df.write.format("json").saveAsTable("jt2")
- sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2")
-
- // make sure our view doesn't change.
- checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
- sql("DROP VIEW testView")
- }
- }
- }
-
- Seq(true, false).foreach { enabled =>
- val prefix = (if (enabled) "With" else "Without") + " canonical native view: "
- test(s"$prefix correctly handle CREATE OR REPLACE VIEW") {
- withSQLConf(
- SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
- withTable("jt", "jt2") {
- sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
- sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
- checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
-
- val df = (1 until 10).map(i => i -> i).toDF("i", "j")
- df.write.format("json").saveAsTable("jt2")
- sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2")
- // make sure the view has been changed.
- checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
-
- sql("DROP VIEW testView")
-
- val e = intercept[AnalysisException] {
- sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")
- }
- assert(e.message.contains("not allowed to define a view"))
- }
- }
- }
-
- test(s"$prefix correctly handle ALTER VIEW") {
- withSQLConf(
- SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
- withTable("jt", "jt2") {
- withView("testView") {
- sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
- sql("CREATE VIEW testView AS SELECT id FROM jt")
-
- val df = (1 until 10).map(i => i -> i).toDF("i", "j")
- df.write.format("json").saveAsTable("jt2")
- sql("ALTER VIEW testView AS SELECT * FROM jt2")
- // make sure the view has been changed.
- checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
- }
- }
- }
- }
-
- test(s"$prefix create hive view for json table") {
- // json table is not hive-compatible, make sure the new flag fix it.
- withSQLConf(
- SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
- withTable("jt") {
- withView("testView") {
- sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
- sql("CREATE VIEW testView AS SELECT id FROM jt")
- checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
- }
- }
- }
- }
-
- test(s"$prefix create hive view for partitioned parquet table") {
- // partitioned parquet table is not hive-compatible, make sure the new flag fix it.
- withSQLConf(
- SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
- withTable("parTable") {
- withView("testView") {
- val df = Seq(1 -> "a").toDF("i", "j")
- df.write.format("parquet").partitionBy("i").saveAsTable("parTable")
- sql("CREATE VIEW testView AS SELECT i, j FROM parTable")
- checkAnswer(sql("SELECT * FROM testView"), Row(1, "a"))
- }
- }
- }
- }
- }
-
- test("CTE within view") {
- withSQLConf(
- SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
- withView("cte_view") {
- sql("CREATE VIEW cte_view AS WITH w AS (SELECT 1 AS n) SELECT n FROM w")
- checkAnswer(sql("SELECT * FROM cte_view"), Row(1))
- }
- }
- }
-
- test("Using view after switching current database") {
- withSQLConf(
- SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
- withView("v") {
- sql("CREATE VIEW v AS SELECT * FROM src")
- withTempDatabase { db =>
- activateDatabase(db) {
- // Should look up table `src` in database `default`.
- checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
-
- // The new `src` table shouldn't be scanned.
- sql("CREATE TABLE src(key INT, value STRING)")
- checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
- }
- }
- }
- }
- }
-
- test("Using view after adding more columns") {
- withSQLConf(
- SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
- withTable("add_col") {
- sqlContext.range(10).write.saveAsTable("add_col")
- withView("v") {
- sql("CREATE VIEW v AS SELECT * FROM add_col")
- sqlContext.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col")
- checkAnswer(sql("SELECT * FROM v"), sqlContext.range(10).toDF())
- }
- }
- }
- }
-
- test("create hive view for joined tables") {
- // make sure the new flag can handle some complex cases like join and schema change.
- withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
- withTable("jt1", "jt2") {
- sqlContext.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1")
- sqlContext.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2")
- sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2")
- checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
-
- val df = (1 until 10).map(i => i -> i).toDF("id1", "newCol")
- df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("jt1")
- checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
-
- sql("DROP VIEW testView")
- }
- }
- }
-
test("SPARK-8976 Wrong Result for Rollup #1") {
checkAnswer(sql(
"SELECT count(*) AS cnt, key % 5, grouping_id() FROM src GROUP BY key%5 WITH ROLLUP"),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
new file mode 100644
index 0000000000..cdd5cb31d9
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * A suite for testing view related functionality.
+ */
+class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+ import hiveContext.implicits._
+
+ test("correctly parse CREATE VIEW statement") {
+ withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
+ withTable("jt") {
+ val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+ df.write.format("json").saveAsTable("jt")
+ sql(
+ """CREATE VIEW IF NOT EXISTS
+ |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
+ |TBLPROPERTIES ('a' = 'b')
+ |AS SELECT * FROM jt""".stripMargin)
+ checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
+ sql("DROP VIEW testView")
+ }
+ }
+ }
+
+ test("correctly handle CREATE VIEW IF NOT EXISTS") {
+ withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
+ withTable("jt", "jt2") {
+ sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+ sql("CREATE VIEW testView AS SELECT id FROM jt")
+
+ val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+ df.write.format("json").saveAsTable("jt2")
+ sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2")
+
+ // make sure our view doesn't change.
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+ sql("DROP VIEW testView")
+ }
+ }
+ }
+
+ Seq(true, false).foreach { enabled =>
+ val prefix = (if (enabled) "With" else "Without") + " canonical native view: "
+ test(s"$prefix correctly handle CREATE OR REPLACE VIEW") {
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
+ withTable("jt", "jt2") {
+ sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+ sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+
+ val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+ df.write.format("json").saveAsTable("jt2")
+ sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2")
+ // make sure the view has been changed.
+ checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+
+ sql("DROP VIEW testView")
+
+ val e = intercept[AnalysisException] {
+ sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")
+ }
+ assert(e.message.contains("not allowed to define a view"))
+ }
+ }
+ }
+
+ test(s"$prefix correctly handle ALTER VIEW") {
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
+ withTable("jt", "jt2") {
+ withView("testView") {
+ sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+ sql("CREATE VIEW testView AS SELECT id FROM jt")
+
+ val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+ df.write.format("json").saveAsTable("jt2")
+ sql("ALTER VIEW testView AS SELECT * FROM jt2")
+ // make sure the view has been changed.
+ checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+ }
+ }
+ }
+ }
+
+ test(s"$prefix create hive view for json table") {
+ // json table is not hive-compatible, make sure the new flag fix it.
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
+ withTable("jt") {
+ withView("testView") {
+ sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+ sql("CREATE VIEW testView AS SELECT id FROM jt")
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+ }
+ }
+ }
+ }
+
+ test(s"$prefix create hive view for partitioned parquet table") {
+ // partitioned parquet table is not hive-compatible, make sure the new flag fix it.
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
+ withTable("parTable") {
+ withView("testView") {
+ val df = Seq(1 -> "a").toDF("i", "j")
+ df.write.format("parquet").partitionBy("i").saveAsTable("parTable")
+ sql("CREATE VIEW testView AS SELECT i, j FROM parTable")
+ checkAnswer(sql("SELECT * FROM testView"), Row(1, "a"))
+ }
+ }
+ }
+ }
+ }
+
+ test("CTE within view") {
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
+ withView("cte_view") {
+ sql("CREATE VIEW cte_view AS WITH w AS (SELECT 1 AS n) SELECT n FROM w")
+ checkAnswer(sql("SELECT * FROM cte_view"), Row(1))
+ }
+ }
+ }
+
+ test("Using view after switching current database") {
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
+ withView("v") {
+ sql("CREATE VIEW v AS SELECT * FROM src")
+ withTempDatabase { db =>
+ activateDatabase(db) {
+ // Should look up table `src` in database `default`.
+ checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
+
+ // The new `src` table shouldn't be scanned.
+ sql("CREATE TABLE src(key INT, value STRING)")
+ checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
+ }
+ }
+ }
+ }
+ }
+
+ test("Using view after adding more columns") {
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
+ withTable("add_col") {
+ sqlContext.range(10).write.saveAsTable("add_col")
+ withView("v") {
+ sql("CREATE VIEW v AS SELECT * FROM add_col")
+ sqlContext.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col")
+ checkAnswer(sql("SELECT * FROM v"), sqlContext.range(10).toDF())
+ }
+ }
+ }
+ }
+
+ test("create hive view for joined tables") {
+ // make sure the new flag can handle some complex cases like join and schema change.
+ withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
+ withTable("jt1", "jt2") {
+ sqlContext.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1")
+ sqlContext.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2")
+ sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2")
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
+
+ val df = (1 until 10).map(i => i -> i).toDF("id1", "newCol")
+ df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("jt1")
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
+
+ sql("DROP VIEW testView")
+ }
+ }
+ }
+
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala
new file mode 100644
index 0000000000..d0e7552c12
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+
+case class WindowData(month: Int, area: String, product: Int)
+
+
+/**
+ * Test suite for SQL window functions.
+ */
+class SQLWindowFunctionSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+ import hiveContext.implicits._
+
+ test("window function: udaf with aggregate expression") {
+ val data = Seq(
+ WindowData(1, "a", 5),
+ WindowData(2, "a", 6),
+ WindowData(3, "b", 7),
+ WindowData(4, "b", 8),
+ WindowData(5, "c", 9),
+ WindowData(6, "c", 10)
+ )
+ sparkContext.parallelize(data).toDF().registerTempTable("windowData")
+
+ checkAnswer(
+ sql(
+ """
+ |select area, sum(product), sum(sum(product)) over (partition by area)
+ |from windowData group by month, area
+ """.stripMargin),
+ Seq(
+ ("a", 5, 11),
+ ("a", 6, 11),
+ ("b", 7, 15),
+ ("b", 8, 15),
+ ("c", 9, 19),
+ ("c", 10, 19)
+ ).map(i => Row(i._1, i._2, i._3)))
+
+ checkAnswer(
+ sql(
+ """
+ |select area, sum(product) - 1, sum(sum(product)) over (partition by area)
+ |from windowData group by month, area
+ """.stripMargin),
+ Seq(
+ ("a", 4, 11),
+ ("a", 5, 11),
+ ("b", 6, 15),
+ ("b", 7, 15),
+ ("c", 8, 19),
+ ("c", 9, 19)
+ ).map(i => Row(i._1, i._2, i._3)))
+
+ checkAnswer(
+ sql(
+ """
+ |select area, sum(product), sum(product) / sum(sum(product)) over (partition by area)
+ |from windowData group by month, area
+ """.stripMargin),
+ Seq(
+ ("a", 5, 5d/11),
+ ("a", 6, 6d/11),
+ ("b", 7, 7d/15),
+ ("b", 8, 8d/15),
+ ("c", 10, 10d/19),
+ ("c", 9, 9d/19)
+ ).map(i => Row(i._1, i._2, i._3)))
+
+ checkAnswer(
+ sql(
+ """
+ |select area, sum(product), sum(product) / sum(sum(product) - 1) over (partition by area)
+ |from windowData group by month, area
+ """.stripMargin),
+ Seq(
+ ("a", 5, 5d/9),
+ ("a", 6, 6d/9),
+ ("b", 7, 7d/13),
+ ("b", 8, 8d/13),
+ ("c", 10, 10d/17),
+ ("c", 9, 9d/17)
+ ).map(i => Row(i._1, i._2, i._3)))
+ }
+
+ test("window function: refer column in inner select block") {
+ val data = Seq(
+ WindowData(1, "a", 5),
+ WindowData(2, "a", 6),
+ WindowData(3, "b", 7),
+ WindowData(4, "b", 8),
+ WindowData(5, "c", 9),
+ WindowData(6, "c", 10)
+ )
+ sparkContext.parallelize(data).toDF().registerTempTable("windowData")
+
+ checkAnswer(
+ sql(
+ """
+ |select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1
+ |from (select month, area, product, 1 as tmp1 from windowData) tmp
+ """.stripMargin),
+ Seq(
+ ("a", 2),
+ ("a", 3),
+ ("b", 2),
+ ("b", 3),
+ ("c", 2),
+ ("c", 3)
+ ).map(i => Row(i._1, i._2)))
+ }
+
+ test("window function: partition and order expressions") {
+ val data = Seq(
+ WindowData(1, "a", 5),
+ WindowData(2, "a", 6),
+ WindowData(3, "b", 7),
+ WindowData(4, "b", 8),
+ WindowData(5, "c", 9),
+ WindowData(6, "c", 10)
+ )
+ sparkContext.parallelize(data).toDF().registerTempTable("windowData")
+
+ checkAnswer(
+ sql(
+ """
+ |select month, area, product, sum(product + 1) over (partition by 1 order by 2)
+ |from windowData
+ """.stripMargin),
+ Seq(
+ (1, "a", 5, 51),
+ (2, "a", 6, 51),
+ (3, "b", 7, 51),
+ (4, "b", 8, 51),
+ (5, "c", 9, 51),
+ (6, "c", 10, 51)
+ ).map(i => Row(i._1, i._2, i._3, i._4)))
+
+ checkAnswer(
+ sql(
+ """
+ |select month, area, product, sum(product)
+ |over (partition by month % 2 order by 10 - product)
+ |from windowData
+ """.stripMargin),
+ Seq(
+ (1, "a", 5, 21),
+ (2, "a", 6, 24),
+ (3, "b", 7, 16),
+ (4, "b", 8, 18),
+ (5, "c", 9, 9),
+ (6, "c", 10, 10)
+ ).map(i => Row(i._1, i._2, i._3, i._4)))
+ }
+
+ test("window function: distinct should not be silently ignored") {
+ val data = Seq(
+ WindowData(1, "a", 5),
+ WindowData(2, "a", 6),
+ WindowData(3, "b", 7),
+ WindowData(4, "b", 8),
+ WindowData(5, "c", 9),
+ WindowData(6, "c", 10)
+ )
+ sparkContext.parallelize(data).toDF().registerTempTable("windowData")
+
+ val e = intercept[AnalysisException] {
+ sql(
+ """
+ |select month, area, product, sum(distinct product + 1) over (partition by 1 order by 2)
+ |from windowData
+ """.stripMargin)
+ }
+ assert(e.getMessage.contains("Distinct window functions are not supported"))
+ }
+
+ test("window function: expressions in arguments of a window functions") {
+ val data = Seq(
+ WindowData(1, "a", 5),
+ WindowData(2, "a", 6),
+ WindowData(3, "b", 7),
+ WindowData(4, "b", 8),
+ WindowData(5, "c", 9),
+ WindowData(6, "c", 10)
+ )
+ sparkContext.parallelize(data).toDF().registerTempTable("windowData")
+
+ checkAnswer(
+ sql(
+ """
+ |select month, area, month % 2,
+ |lag(product, 1 + 1, product) over (partition by month % 2 order by area)
+ |from windowData
+ """.stripMargin),
+ Seq(
+ (1, "a", 1, 5),
+ (2, "a", 0, 6),
+ (3, "b", 1, 7),
+ (4, "b", 0, 8),
+ (5, "c", 1, 5),
+ (6, "c", 0, 6)
+ ).map(i => Row(i._1, i._2, i._3, i._4)))
+ }
+
+
+ test("window function: Sorting columns are not in Project") {
+ val data = Seq(
+ WindowData(1, "d", 10),
+ WindowData(2, "a", 6),
+ WindowData(3, "b", 7),
+ WindowData(4, "b", 8),
+ WindowData(5, "c", 9),
+ WindowData(6, "c", 11)
+ )
+ sparkContext.parallelize(data).toDF().registerTempTable("windowData")
+
+ checkAnswer(
+ sql("select month, product, sum(product + 1) over() from windowData order by area"),
+ Seq(
+ (2, 6, 57),
+ (3, 7, 57),
+ (4, 8, 57),
+ (5, 9, 57),
+ (6, 11, 57),
+ (1, 10, 57)
+ ).map(i => Row(i._1, i._2, i._3)))
+
+ checkAnswer(
+ sql(
+ """
+ |select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1
+ |from (select month, area, product as p, 1 as tmp1 from windowData) tmp order by p
+ """.stripMargin),
+ Seq(
+ ("a", 2),
+ ("b", 2),
+ ("b", 3),
+ ("c", 2),
+ ("d", 2),
+ ("c", 3)
+ ).map(i => Row(i._1, i._2)))
+
+ checkAnswer(
+ sql(
+ """
+ |select area, rank() over (partition by area order by month) as c1
+ |from windowData group by product, area, month order by product, area
+ """.stripMargin),
+ Seq(
+ ("a", 1),
+ ("b", 1),
+ ("b", 2),
+ ("c", 1),
+ ("d", 1),
+ ("c", 2)
+ ).map(i => Row(i._1, i._2)))
+
+ checkAnswer(
+ sql(
+ """
+ |select area, sum(product) / sum(sum(product)) over (partition by area) as c1
+ |from windowData group by area, month order by month, c1
+ """.stripMargin),
+ Seq(
+ ("d", 1.0),
+ ("a", 1.0),
+ ("b", 0.4666666666666667),
+ ("b", 0.5333333333333333),
+ ("c", 0.45),
+ ("c", 0.55)
+ ).map(i => Row(i._1, i._2)))
+ }
+
+ // todo: fix this test case by reimplementing the function ResolveAggregateFunctions
+ ignore("window function: Pushing aggregate Expressions in Sort to Aggregate") {
+ val data = Seq(
+ WindowData(1, "d", 10),
+ WindowData(2, "a", 6),
+ WindowData(3, "b", 7),
+ WindowData(4, "b", 8),
+ WindowData(5, "c", 9),
+ WindowData(6, "c", 11)
+ )
+ sparkContext.parallelize(data).toDF().registerTempTable("windowData")
+
+ checkAnswer(
+ sql(
+ """
+ |select area, sum(product) over () as c from windowData
+ |where product > 3 group by area, product
+ |having avg(month) > 0 order by avg(month), product
+ """.stripMargin),
+ Seq(
+ ("a", 51),
+ ("b", 51),
+ ("b", 51),
+ ("c", 51),
+ ("c", 51),
+ ("d", 51)
+ ).map(i => Row(i._1, i._2)))
+ }
+
+ test("window function: multiple window expressions in a single expression") {
+ val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y")
+ nums.registerTempTable("nums")
+
+ val expected =
+ Row(1, 1, 1, 55, 1, 57) ::
+ Row(0, 2, 3, 55, 2, 60) ::
+ Row(1, 3, 6, 55, 4, 65) ::
+ Row(0, 4, 10, 55, 6, 71) ::
+ Row(1, 5, 15, 55, 9, 79) ::
+ Row(0, 6, 21, 55, 12, 88) ::
+ Row(1, 7, 28, 55, 16, 99) ::
+ Row(0, 8, 36, 55, 20, 111) ::
+ Row(1, 9, 45, 55, 25, 125) ::
+ Row(0, 10, 55, 55, 30, 140) :: Nil
+
+ val actual = sql(
+ """
+ |SELECT
+ | y,
+ | x,
+ | sum(x) OVER w1 AS running_sum,
+ | sum(x) OVER w2 AS total_sum,
+ | sum(x) OVER w3 AS running_sum_per_y,
+ | ((sum(x) OVER w1) + (sum(x) OVER w2) + (sum(x) OVER w3)) as combined2
+ |FROM nums
+ |WINDOW w1 AS (ORDER BY x ROWS BETWEEN UnBOUNDED PRECEDiNG AND CuRRENT RoW),
+ | w2 AS (ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOuNDED FoLLOWING),
+ | w3 AS (PARTITION BY y ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
+ """.stripMargin)
+
+ checkAnswer(actual, expected)
+
+ sqlContext.dropTempTable("nums")
+ }
+
+ test("SPARK-7595: Window will cause resolve failed with self join") {
+ sql("SELECT * FROM src") // Force loading of src table.
+
+ checkAnswer(sql(
+ """
+ |with
+ | v1 as (select key, count(value) over (partition by key) cnt_val from src),
+ | v2 as (select v1.key, v1_lag.cnt_val from v1, v1 v1_lag where v1.key = v1_lag.key)
+ | select * from v2 order by key limit 1
+ """.stripMargin), Row(0, 3))
+ }
+}