Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala     |  39
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala            |  23
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala       |  16
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala | 206
4 files changed, 106 insertions(+), 178 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 088f684365..007fa46943 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -149,37 +149,18 @@ case class CreateViewCommand(
* SQL based on the analyzed plan, and also creates the proper schema for the view.
*/
private def prepareTable(sparkSession: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
- val viewSQL: String =
- if (sparkSession.sessionState.conf.canonicalView) {
- val logicalPlan =
- if (tableDesc.schema.isEmpty) {
- analyzedPlan
- } else {
- val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
- case (attr, col) => Alias(attr, col.name)()
- }
- sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
- }
- new SQLBuilder(logicalPlan).toSQL
- } else {
- // When user specified column names for view, we should create a project to do the renaming.
- // When no column name specified, we still need to create a project to declare the columns
- // we need, to make us more robust to top level `*`s.
- val viewOutput = {
- val columnNames = analyzedPlan.output.map(f => quote(f.name))
- if (tableDesc.schema.isEmpty) {
- columnNames.mkString(", ")
- } else {
- columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
- case (name, alias) => s"$name AS $alias"
- }.mkString(", ")
+ val viewSQL: String = {
+ val logicalPlan =
+ if (tableDesc.schema.isEmpty) {
+ analyzedPlan
+ } else {
+ val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
+ case (attr, col) => Alias(attr, col.name)()
}
+ sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
}
-
- val viewText = tableDesc.viewText.get
- val viewName = quote(tableDesc.identifier.table)
- s"SELECT $viewOutput FROM ($viewText) $viewName"
- }
+ new SQLBuilder(logicalPlan).toSQL
+ }
// Validate the view SQL - make sure we can parse it and analyze it.
// If we cannot analyze the generated query, there is probably a bug in SQL generation.
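With the fallback gone, prepareTable always regenerates the stored view definition from the analyzed plan via SQLBuilder. Below is a minimal, self-contained sketch of that code path in isolation, assuming a Spark 2.x build where the internal org.apache.spark.sql.catalyst.SQLBuilder class is on the classpath; the table name is illustrative, and SQLBuilder may still throw for plan shapes it cannot convert (which the command now surfaces as an error rather than falling back to the raw view text).

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.SQLBuilder

object ViewSqlSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("view-sql").getOrCreate()
    import spark.implicits._

    // A persistent source table: SQLBuilder needs catalog-resolvable relations.
    Seq(1, 2, 3).toDF("id").write.saveAsTable("t")

    // Analyze the defining query, then regenerate canonical SQL from the
    // analyzed plan. The top-level `*` is expanded into explicit column
    // references, which is what makes the stored view text robust to later
    // schema changes on `t`.
    val analyzed = spark.sql("SELECT * FROM t").queryExecution.analyzed
    println(new SQLBuilder(analyzed).toSQL)

    spark.stop()
  }
}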
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1a9bb6a0b5..5ab0c1d4c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -258,25 +258,6 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
- val NATIVE_VIEW = SQLConfigBuilder("spark.sql.nativeView")
- .internal()
- .doc("When true, CREATE VIEW will be handled by Spark SQL instead of Hive native commands. " +
- "Note that this function is experimental and should ony be used when you are using " +
- "non-hive-compatible tables written by Spark SQL. The SQL string used to create " +
- "view should be fully qualified, i.e. use `tbl1`.`col1` instead of `*` whenever " +
- "possible, or you may get wrong result.")
- .booleanConf
- .createWithDefault(true)
-
- val CANONICAL_NATIVE_VIEW = SQLConfigBuilder("spark.sql.nativeView.canonical")
- .internal()
- .doc("When this option and spark.sql.nativeView are both true, Spark SQL tries to handle " +
- "CREATE VIEW statement using SQL query string generated from view definition logical " +
- "plan. If the logical plan doesn't have a SQL representation, we fallback to the " +
- "original native view implementation.")
- .booleanConf
- .createWithDefault(true)
-
val COLUMN_NAME_OF_CORRUPT_RECORD = SQLConfigBuilder("spark.sql.columnNameOfCorruptRecord")
.doc("The name of internal column for storing raw/un-parsed JSON records that fail to parse.")
.stringConf
@@ -613,8 +594,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
- def nativeView: Boolean = getConf(NATIVE_VIEW)
-
def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
@@ -625,8 +604,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
- def canonicalView: Boolean = getConf(CANONICAL_NATIVE_VIEW)
-
def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
def subexpressionEliminationEnabled: Boolean =
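For reference, the two deleted entries followed SQLConf's usual builder pattern. A sketch of that pattern with a made-up key, mirroring the removed definitions (SQLConfigBuilder, internal(), booleanConf, and createWithDefault are the builder calls used throughout SQLConf.scala):

// Inside object SQLConf; `spark.sql.example.flag` is a hypothetical key.
val EXAMPLE_FLAG = SQLConfigBuilder("spark.sql.example.flag")
  .internal()                      // excluded from user-facing documentation
  .doc("Illustrative internal boolean flag.")
  .booleanConf
  .createWithDefault(true)

// Typed accessor inside class SQLConf, like the removed `nativeView` and
// `canonicalView` defs:
def exampleFlag: Boolean = getConf(EXAMPLE_FLAG)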
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 2cd3f475b6..4454cad7bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -126,17 +126,17 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
test("reset - internal conf") {
spark.sessionState.conf.clear()
- val original = spark.conf.get(SQLConf.NATIVE_VIEW)
+ val original = spark.conf.get(SQLConf.OPTIMIZER_MAX_ITERATIONS)
try {
- assert(spark.conf.get(SQLConf.NATIVE_VIEW) === true)
- sql(s"set ${SQLConf.NATIVE_VIEW.key}=false")
- assert(spark.conf.get(SQLConf.NATIVE_VIEW) === false)
- assert(sql(s"set").where(s"key = '${SQLConf.NATIVE_VIEW.key}'").count() == 1)
+ assert(spark.conf.get(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 100)
+ sql(s"set ${SQLConf.OPTIMIZER_MAX_ITERATIONS.key}=10")
+ assert(spark.conf.get(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 10)
+ assert(sql(s"set").where(s"key = '${SQLConf.OPTIMIZER_MAX_ITERATIONS.key}'").count() == 1)
sql(s"reset")
- assert(spark.conf.get(SQLConf.NATIVE_VIEW) === true)
- assert(sql(s"set").where(s"key = '${SQLConf.NATIVE_VIEW.key}'").count() == 0)
+ assert(spark.conf.get(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 100)
+ assert(sql(s"set").where(s"key = '${SQLConf.OPTIMIZER_MAX_ITERATIONS.key}'").count() == 0)
} finally {
- sql(s"set ${SQLConf.NATIVE_VIEW}=$original")
+ sql(s"set ${SQLConf.OPTIMIZER_MAX_ITERATIONS}=$original")
}
}
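The rewritten test pins the same RESET semantics to a conf that still exists. A condensed round trip of what it asserts, assuming a local SparkSession named `spark` (spark.sql.optimizer.maxIterations is the key behind OPTIMIZER_MAX_ITERATIONS; its default is 100):

spark.sql("SET spark.sql.optimizer.maxIterations=10")
assert(spark.conf.get("spark.sql.optimizer.maxIterations") == "10")
spark.sql("RESET")  // drops session-level overrides, restoring defaults
assert(spark.conf.get("spark.sql.optimizer.maxIterations") == "100")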
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index 39846f145c..82dc64a457 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -95,15 +95,13 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
test("correctly parse CREATE VIEW statement") {
- withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
- sql(
- """CREATE VIEW IF NOT EXISTS
- |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
- |TBLPROPERTIES ('a' = 'b')
- |AS SELECT * FROM jt""".stripMargin)
- checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
- sql("DROP VIEW testView")
- }
+ sql(
+ """CREATE VIEW IF NOT EXISTS
+ |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
+ |TBLPROPERTIES ('a' = 'b')
+ |AS SELECT * FROM jt""".stripMargin)
+ checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
+ sql("DROP VIEW testView")
}
test("correctly parse CREATE TEMPORARY VIEW statement") {
@@ -145,18 +143,16 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
test("correctly handle CREATE VIEW IF NOT EXISTS") {
- withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
- withTable("jt2") {
- withView("testView") {
- sql("CREATE VIEW testView AS SELECT id FROM jt")
+ withTable("jt2") {
+ withView("testView") {
+ sql("CREATE VIEW testView AS SELECT id FROM jt")
- val df = (1 until 10).map(i => i -> i).toDF("i", "j")
- df.write.format("json").saveAsTable("jt2")
- sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2")
+ val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+ df.write.format("json").saveAsTable("jt2")
+ sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2")
- // make sure our view doesn't change.
- checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
- }
+ // make sure our view doesn't change.
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
}
}
}
@@ -174,134 +170,108 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
- Seq(true, false).foreach { enabled =>
- val prefix = (if (enabled) "With" else "Without") + " canonical native view: "
- test(s"$prefix correctly handle CREATE OR REPLACE VIEW") {
- withSQLConf(
- SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
- withTable("jt2") {
- sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
- checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
-
- val df = (1 until 10).map(i => i -> i).toDF("i", "j")
- df.write.format("json").saveAsTable("jt2")
- sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2")
- // make sure the view has been changed.
- checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
-
- sql("DROP VIEW testView")
-
- val e = intercept[AnalysisException] {
- sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")
- }
- assert(e.message.contains(
- "CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed"))
- }
+ test("correctly handle CREATE OR REPLACE VIEW") {
+ withTable("jt2") {
+ sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+
+ val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+ df.write.format("json").saveAsTable("jt2")
+ sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2")
+ // make sure the view has been changed.
+ checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+
+ sql("DROP VIEW testView")
+
+ val e = intercept[AnalysisException] {
+ sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")
}
+ assert(e.message.contains(
+ "CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed"))
}
+ }
- test(s"$prefix correctly handle ALTER VIEW") {
- withSQLConf(
- SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
- withTable("jt2") {
- withView("testView") {
- sql("CREATE VIEW testView AS SELECT id FROM jt")
-
- val df = (1 until 10).map(i => i -> i).toDF("i", "j")
- df.write.format("json").saveAsTable("jt2")
- sql("ALTER VIEW testView AS SELECT * FROM jt2")
- // make sure the view has been changed.
- checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
- }
- }
+ test("correctly handle ALTER VIEW") {
+ withTable("jt2") {
+ withView("testView") {
+ sql("CREATE VIEW testView AS SELECT id FROM jt")
+
+ val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+ df.write.format("json").saveAsTable("jt2")
+ sql("ALTER VIEW testView AS SELECT * FROM jt2")
+ // make sure the view has been changed.
+ checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
}
}
+ }
- test(s"$prefix create hive view for json table") {
- // json table is not hive-compatible, make sure the new flag fix it.
- withSQLConf(
- SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
- withView("testView") {
- sql("CREATE VIEW testView AS SELECT id FROM jt")
- checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
- }
- }
+ test("create hive view for json table") {
+ // json table is not hive-compatible, make sure the native view implementation handles it.
+ withView("testView") {
+ sql("CREATE VIEW testView AS SELECT id FROM jt")
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
}
+ }
- test(s"$prefix create hive view for partitioned parquet table") {
- // partitioned parquet table is not hive-compatible, make sure the new flag fix it.
- withSQLConf(
- SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
- withTable("parTable") {
- withView("testView") {
- val df = Seq(1 -> "a").toDF("i", "j")
- df.write.format("parquet").partitionBy("i").saveAsTable("parTable")
- sql("CREATE VIEW testView AS SELECT i, j FROM parTable")
- checkAnswer(sql("SELECT * FROM testView"), Row(1, "a"))
- }
- }
+ test("create hive view for partitioned parquet table") {
+ // partitioned parquet table is not hive-compatible, make sure the native view implementation handles it.
+ withTable("parTable") {
+ withView("testView") {
+ val df = Seq(1 -> "a").toDF("i", "j")
+ df.write.format("parquet").partitionBy("i").saveAsTable("parTable")
+ sql("CREATE VIEW testView AS SELECT i, j FROM parTable")
+ checkAnswer(sql("SELECT * FROM testView"), Row(1, "a"))
}
}
}
test("CTE within view") {
- withSQLConf(
- SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
- withView("cte_view") {
- sql("CREATE VIEW cte_view AS WITH w AS (SELECT 1 AS n) SELECT n FROM w")
- checkAnswer(sql("SELECT * FROM cte_view"), Row(1))
- }
+ withView("cte_view") {
+ sql("CREATE VIEW cte_view AS WITH w AS (SELECT 1 AS n) SELECT n FROM w")
+ checkAnswer(sql("SELECT * FROM cte_view"), Row(1))
}
}
test("Using view after switching current database") {
- withSQLConf(
- SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
- withView("v") {
- sql("CREATE VIEW v AS SELECT * FROM src")
- withTempDatabase { db =>
- activateDatabase(db) {
- // Should look up table `src` in database `default`.
- checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
-
- // The new `src` table shouldn't be scanned.
- sql("CREATE TABLE src(key INT, value STRING)")
- checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
- }
+ withView("v") {
+ sql("CREATE VIEW v AS SELECT * FROM src")
+ withTempDatabase { db =>
+ activateDatabase(db) {
+ // Should look up table `src` in database `default`.
+ checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
+
+ // The new `src` table shouldn't be scanned.
+ sql("CREATE TABLE src(key INT, value STRING)")
+ checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
}
}
}
}
test("Using view after adding more columns") {
- withSQLConf(
- SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
- withTable("add_col") {
- spark.range(10).write.saveAsTable("add_col")
- withView("v") {
- sql("CREATE VIEW v AS SELECT * FROM add_col")
- spark.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col")
- checkAnswer(sql("SELECT * FROM v"), spark.range(10).toDF())
- }
+ withTable("add_col") {
+ spark.range(10).write.saveAsTable("add_col")
+ withView("v") {
+ sql("CREATE VIEW v AS SELECT * FROM add_col")
+ spark.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col")
+ checkAnswer(sql("SELECT * FROM v"), spark.range(10).toDF())
}
}
}
test("create hive view for joined tables") {
// make sure the new flag can handle some complex cases like join and schema change.
- withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
- withTable("jt1", "jt2") {
- spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1")
- spark.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2")
- sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2")
- checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
-
- val df = (1 until 10).map(i => i -> i).toDF("id1", "newCol")
- df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("jt1")
- checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
-
- sql("DROP VIEW testView")
- }
+ withTable("jt1", "jt2") {
+ spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1")
+ spark.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2")
+ sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2")
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
+
+ val df = (1 until 10).map(i => i -> i).toDF("id1", "newCol")
+ df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("jt1")
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
+
+ sql("DROP VIEW testView")
}
}
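Taken together, the suite now exercises the canonical view path unconditionally. A compact end-to-end sketch of the default behavior these tests lock in, assuming a Hive-enabled local session (table and view names are illustrative):

spark.range(1, 10).toDF("id").write.format("json").saveAsTable("jt")
spark.sql("CREATE VIEW v AS SELECT id FROM jt")  // json table is not Hive-compatible; no flag needed now
spark.sql("SELECT * FROM v ORDER BY id").show()  // rows 1 through 9
spark.sql("DROP VIEW v")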