aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-10-10 15:48:57 +0800
committerWenchen Fan <wenchen@databricks.com>2016-10-10 15:48:57 +0800
commit23ddff4b2b2744c3dc84d928e144c541ad5df376 (patch)
treef61b64ea46adbd1eb424a0bbb8e8e383d1ee4e3b /sql
parent16590030c15b32e83b584283697b6f783cffe043 (diff)
downloadspark-23ddff4b2b2744c3dc84d928e144c541ad5df376.tar.gz
spark-23ddff4b2b2744c3dc84d928e144c541ad5df376.tar.bz2
spark-23ddff4b2b2744c3dc84d928e144c541ad5df376.zip
[SPARK-17338][SQL] add global temp view
## What changes were proposed in this pull request? Global temporary view is a cross-session temporary view, which means it's shared among all sessions. Its lifetime is the lifetime of the Spark application, i.e. it will be automatically dropped when the application terminates. It's tied to a system preserved database `global_temp`(configurable via SparkConf), and we must use the qualified name to refer a global temp view, e.g. SELECT * FROM global_temp.view1. changes for `SessionCatalog`: 1. add a new field `gloabalTempViews: GlobalTempViewManager`, to access the shared global temp views, and the global temp db name. 2. `createDatabase` will fail if users wanna create `global_temp`, which is system preserved. 3. `setCurrentDatabase` will fail if users wanna set `global_temp`, which is system preserved. 4. add `createGlobalTempView`, which is used in `CreateViewCommand` to create global temp views. 5. add `dropGlobalTempView`, which is used in `CatalogImpl` to drop global temp view. 6. add `alterTempViewDefinition`, which is used in `AlterViewAsCommand` to update the view definition for local/global temp views. 7. `renameTable`/`dropTable`/`isTemporaryTable`/`lookupRelation`/`getTempViewOrPermanentTableMetadata`/`refreshTable` will handle global temp views. changes for SQL commands: 1. `CreateViewCommand`/`AlterViewAsCommand` is updated to support global temp views 2. `ShowTablesCommand` outputs a new column `database`, which is used to distinguish global and local temp views. 3. other commands can also handle global temp views if they call `SessionCatalog` APIs which accepts global temp views, e.g. `DropTableCommand`, `AlterTableRenameCommand`, `ShowColumnsCommand`, etc. changes for other public API 1. add a new method `dropGlobalTempView` in `Catalog` 2. `Catalog.findTable` can find global temp view 3. add a new method `createGlobalTempView` in `Dataset` ## How was this patch tested? new tests in `SQLViewSuite` Author: Wenchen Fan <wenchen@databricks.com> Closes #14897 from cloud-fan/global-temp-view.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g48
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala10
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala121
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala189
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala48
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala20
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala19
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala25
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala11
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala150
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala20
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala26
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala1
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala75
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala11
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala168
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala10
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala4
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala1
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala4
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala8
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala10
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala6
25 files changed, 737 insertions, 218 deletions
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index a3bbaceca3..b599a88495 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -111,11 +111,12 @@ statement
| ALTER TABLE tableIdentifier RECOVER PARTITIONS #recoverPartitions
| DROP TABLE (IF EXISTS)? tableIdentifier PURGE? #dropTable
| DROP VIEW (IF EXISTS)? tableIdentifier #dropTable
- | CREATE (OR REPLACE)? TEMPORARY? VIEW (IF NOT EXISTS)? tableIdentifier
+ | CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
+ VIEW (IF NOT EXISTS)? tableIdentifier
identifierCommentList? (COMMENT STRING)?
(PARTITIONED ON identifierList)?
(TBLPROPERTIES tablePropertyList)? AS query #createView
- | CREATE (OR REPLACE)? TEMPORARY VIEW
+ | CREATE (OR REPLACE)? GLOBAL? TEMPORARY VIEW
tableIdentifier ('(' colTypeList ')')? tableProvider
(OPTIONS tablePropertyList)? #createTempViewUsing
| ALTER VIEW tableIdentifier AS? query #alterViewQuery
@@ -676,7 +677,7 @@ nonReserved
| MAP | ARRAY | STRUCT
| LATERAL | WINDOW | REDUCE | TRANSFORM | USING | SERDE | SERDEPROPERTIES | RECORDREADER
| DELIMITED | FIELDS | TERMINATED | COLLECTION | ITEMS | KEYS | ESCAPED | LINES | SEPARATED
- | EXTENDED | REFRESH | CLEAR | CACHE | UNCACHE | LAZY | TEMPORARY | OPTIONS
+ | EXTENDED | REFRESH | CLEAR | CACHE | UNCACHE | LAZY | GLOBAL | TEMPORARY | OPTIONS
| GROUPING | CUBE | ROLLUP
| EXPLAIN | FORMAT | LOGICAL | FORMATTED | CODEGEN
| TABLESAMPLE | USE | TO | BUCKET | PERCENTLIT | OUT | OF
@@ -864,6 +865,7 @@ CACHE: 'CACHE';
UNCACHE: 'UNCACHE';
LAZY: 'LAZY';
FORMATTED: 'FORMATTED';
+GLOBAL: 'GLOBAL';
TEMPORARY: 'TEMPORARY' | 'TEMP';
OPTIONS: 'OPTIONS';
UNSET: 'UNSET';
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ae8869ff25..536d38777f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -458,12 +458,12 @@ class Analyzer(
i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
case u: UnresolvedRelation =>
val table = u.tableIdentifier
- if (table.database.isDefined && conf.runSQLonFile &&
+ if (table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) &&
(!catalog.databaseExists(table.database.get) || !catalog.tableExists(table))) {
- // If the table does not exist, and the database part is specified, and we support
- // running SQL directly on files, then let's just return the original UnresolvedRelation.
- // It is possible we are matching a query like "select * from parquet.`/path/to/query`".
- // The plan will get resolved later.
+ // If the database part is specified, and we support running SQL directly on files, and
+ // it's not a temporary view, and the table does not exist, then let's just return the
+ // original UnresolvedRelation. It is possible we are matching a query like "select *
+ // from parquet.`/path/to/query`". The plan will get resolved later.
// Note that we are testing (!db_exists || !table_exists) because the catalog throws
// an exception from tableExists if the database does not exist.
u
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala
new file mode 100644
index 0000000000..6095ac0bc9
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util.StringUtils
+
+
+/**
+ * A thread-safe manager for global temporary views, providing atomic operations to manage them,
+ * e.g. create, update, remove, etc.
+ *
+ * Note that, the view name is always case-sensitive here, callers are responsible to format the
+ * view name w.r.t. case-sensitive config.
+ *
+ * @param database The system preserved virtual database that keeps all the global temporary views.
+ */
+class GlobalTempViewManager(val database: String) {
+
+ /** List of view definitions, mapping from view name to logical plan. */
+ @GuardedBy("this")
+ private val viewDefinitions = new mutable.HashMap[String, LogicalPlan]
+
+ /**
+ * Returns the global view definition which matches the given name, or None if not found.
+ */
+ def get(name: String): Option[LogicalPlan] = synchronized {
+ viewDefinitions.get(name)
+ }
+
+ /**
+ * Creates a global temp view, or issue an exception if the view already exists and
+ * `overrideIfExists` is false.
+ */
+ def create(
+ name: String,
+ viewDefinition: LogicalPlan,
+ overrideIfExists: Boolean): Unit = synchronized {
+ if (!overrideIfExists && viewDefinitions.contains(name)) {
+ throw new TempTableAlreadyExistsException(name)
+ }
+ viewDefinitions.put(name, viewDefinition)
+ }
+
+ /**
+ * Updates the global temp view if it exists, returns true if updated, false otherwise.
+ */
+ def update(
+ name: String,
+ viewDefinition: LogicalPlan): Boolean = synchronized {
+ if (viewDefinitions.contains(name)) {
+ viewDefinitions.put(name, viewDefinition)
+ true
+ } else {
+ false
+ }
+ }
+
+ /**
+ * Removes the global temp view if it exists, returns true if removed, false otherwise.
+ */
+ def remove(name: String): Boolean = synchronized {
+ viewDefinitions.remove(name).isDefined
+ }
+
+ /**
+ * Renames the global temp view if the source view exists and the destination view not exists, or
+ * issue an exception if the source view exists but the destination view already exists. Returns
+ * true if renamed, false otherwise.
+ */
+ def rename(oldName: String, newName: String): Boolean = synchronized {
+ if (viewDefinitions.contains(oldName)) {
+ if (viewDefinitions.contains(newName)) {
+ throw new AnalysisException(
+ s"rename temporary view from '$oldName' to '$newName': destination view already exists")
+ }
+
+ val viewDefinition = viewDefinitions(oldName)
+ viewDefinitions.remove(oldName)
+ viewDefinitions.put(newName, viewDefinition)
+ true
+ } else {
+ false
+ }
+ }
+
+ /**
+ * Lists the names of all global temporary views.
+ */
+ def listViewNames(pattern: String): Seq[String] = synchronized {
+ StringUtils.filterPattern(viewDefinitions.keys.toSeq, pattern)
+ }
+
+ /**
+ * Clears all the global temporary views.
+ */
+ def clear(): Unit = synchronized {
+ viewDefinitions.clear()
+ }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 8c01c7a3f2..e44e30ec64 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.GLOBAL_TEMP_DATABASE
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
@@ -47,6 +48,7 @@ object SessionCatalog {
*/
class SessionCatalog(
externalCatalog: ExternalCatalog,
+ globalTempViewManager: GlobalTempViewManager,
functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
conf: CatalystConf,
@@ -61,6 +63,7 @@ class SessionCatalog(
conf: CatalystConf) {
this(
externalCatalog,
+ new GlobalTempViewManager(GLOBAL_TEMP_DATABASE.defaultValueString),
DummyFunctionResourceLoader,
functionRegistry,
conf,
@@ -142,8 +145,13 @@ class SessionCatalog(
// ----------------------------------------------------------------------------
def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
- val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
val dbName = formatDatabaseName(dbDefinition.name)
+ if (dbName == globalTempViewManager.database) {
+ throw new AnalysisException(
+ s"${globalTempViewManager.database} is a system preserved database, " +
+ "you cannot create a database with this name.")
+ }
+ val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
externalCatalog.createDatabase(
dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
ignoreIfExists)
@@ -154,7 +162,7 @@ class SessionCatalog(
if (dbName == DEFAULT_DATABASE) {
throw new AnalysisException(s"Can not drop default database")
} else if (dbName == getCurrentDatabase) {
- throw new AnalysisException(s"Can not drop current database `${dbName}`")
+ throw new AnalysisException(s"Can not drop current database `$dbName`")
}
externalCatalog.dropDatabase(dbName, ignoreIfNotExists, cascade)
}
@@ -188,6 +196,13 @@ class SessionCatalog(
def setCurrentDatabase(db: String): Unit = {
val dbName = formatDatabaseName(db)
+ if (dbName == globalTempViewManager.database) {
+ throw new AnalysisException(
+ s"${globalTempViewManager.database} is a system preserved database, " +
+ "you cannot use it as current database. To access global temporary views, you should " +
+ "use qualified name with the GLOBAL_TEMP_DATABASE, e.g. SELECT * FROM " +
+ s"${globalTempViewManager.database}.viewName.")
+ }
requireDbExists(dbName)
synchronized { currentDb = dbName }
}
@@ -329,7 +344,7 @@ class SessionCatalog(
// ----------------------------------------------
/**
- * Create a temporary table.
+ * Create a local temporary view.
*/
def createTempView(
name: String,
@@ -343,19 +358,65 @@ class SessionCatalog(
}
/**
- * Return a temporary view exactly as it was stored.
+ * Create a global temporary view.
+ */
+ def createGlobalTempView(
+ name: String,
+ viewDefinition: LogicalPlan,
+ overrideIfExists: Boolean): Unit = {
+ globalTempViewManager.create(formatTableName(name), viewDefinition, overrideIfExists)
+ }
+
+ /**
+ * Alter the definition of a local/global temp view matching the given name, returns true if a
+ * temp view is matched and altered, false otherwise.
+ */
+ def alterTempViewDefinition(
+ name: TableIdentifier,
+ viewDefinition: LogicalPlan): Boolean = synchronized {
+ val viewName = formatTableName(name.table)
+ if (name.database.isEmpty) {
+ if (tempTables.contains(viewName)) {
+ createTempView(viewName, viewDefinition, overrideIfExists = true)
+ true
+ } else {
+ false
+ }
+ } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
+ globalTempViewManager.update(viewName, viewDefinition)
+ } else {
+ false
+ }
+ }
+
+ /**
+ * Return a local temporary view exactly as it was stored.
*/
def getTempView(name: String): Option[LogicalPlan] = synchronized {
tempTables.get(formatTableName(name))
}
/**
- * Drop a temporary view.
+ * Return a global temporary view exactly as it was stored.
+ */
+ def getGlobalTempView(name: String): Option[LogicalPlan] = {
+ globalTempViewManager.get(formatTableName(name))
+ }
+
+ /**
+ * Drop a local temporary view.
*/
def dropTempView(name: String): Unit = synchronized {
tempTables.remove(formatTableName(name))
}
+ /**
+ * Drop a global temporary view.
+ */
+ def dropGlobalTempView(name: String): Boolean = {
+ globalTempViewManager.remove(formatTableName(name))
+ }
+
// -------------------------------------------------------------
// | Methods that interact with temporary and metastore tables |
// -------------------------------------------------------------
@@ -371,9 +432,7 @@ class SessionCatalog(
*/
def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
val table = formatTableName(name.table)
- if (name.database.isDefined) {
- getTableMetadata(name)
- } else {
+ if (name.database.isEmpty) {
getTempView(table).map { plan =>
CatalogTable(
identifier = TableIdentifier(table),
@@ -381,6 +440,16 @@ class SessionCatalog(
storage = CatalogStorageFormat.empty,
schema = plan.output.toStructType)
}.getOrElse(getTableMetadata(name))
+ } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
+ globalTempViewManager.get(table).map { plan =>
+ CatalogTable(
+ identifier = TableIdentifier(table, Some(globalTempViewManager.database)),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = plan.output.toStructType)
+ }.getOrElse(throw new NoSuchTableException(globalTempViewManager.database, table))
+ } else {
+ getTableMetadata(name)
}
}
@@ -393,21 +462,25 @@ class SessionCatalog(
*/
def renameTable(oldName: TableIdentifier, newName: String): Unit = synchronized {
val db = formatDatabaseName(oldName.database.getOrElse(currentDb))
- requireDbExists(db)
val oldTableName = formatTableName(oldName.table)
val newTableName = formatTableName(newName)
- if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
- requireTableExists(TableIdentifier(oldTableName, Some(db)))
- requireTableNotExists(TableIdentifier(newTableName, Some(db)))
- externalCatalog.renameTable(db, oldTableName, newTableName)
+ if (db == globalTempViewManager.database) {
+ globalTempViewManager.rename(oldTableName, newTableName)
} else {
- if (tempTables.contains(newTableName)) {
- throw new AnalysisException(
- s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': destination table already exists")
+ requireDbExists(db)
+ if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
+ requireTableExists(TableIdentifier(oldTableName, Some(db)))
+ requireTableNotExists(TableIdentifier(newTableName, Some(db)))
+ externalCatalog.renameTable(db, oldTableName, newTableName)
+ } else {
+ if (tempTables.contains(newTableName)) {
+ throw new AnalysisException(s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': " +
+ "destination table already exists")
+ }
+ val table = tempTables(oldTableName)
+ tempTables.remove(oldTableName)
+ tempTables.put(newTableName, table)
}
- val table = tempTables(oldTableName)
- tempTables.remove(oldTableName)
- tempTables.put(newTableName, table)
}
}
@@ -424,17 +497,24 @@ class SessionCatalog(
purge: Boolean): Unit = synchronized {
val db = formatDatabaseName(name.database.getOrElse(currentDb))
val table = formatTableName(name.table)
- if (name.database.isDefined || !tempTables.contains(table)) {
- requireDbExists(db)
- // When ignoreIfNotExists is false, no exception is issued when the table does not exist.
- // Instead, log it as an error message.
- if (tableExists(TableIdentifier(table, Option(db)))) {
- externalCatalog.dropTable(db, table, ignoreIfNotExists = true, purge = purge)
- } else if (!ignoreIfNotExists) {
- throw new NoSuchTableException(db = db, table = table)
+ if (db == globalTempViewManager.database) {
+ val viewExists = globalTempViewManager.remove(table)
+ if (!viewExists && !ignoreIfNotExists) {
+ throw new NoSuchTableException(globalTempViewManager.database, table)
}
} else {
- tempTables.remove(table)
+ if (name.database.isDefined || !tempTables.contains(table)) {
+ requireDbExists(db)
+ // When ignoreIfNotExists is false, no exception is issued when the table does not exist.
+ // Instead, log it as an error message.
+ if (tableExists(TableIdentifier(table, Option(db)))) {
+ externalCatalog.dropTable(db, table, ignoreIfNotExists = true, purge = purge)
+ } else if (!ignoreIfNotExists) {
+ throw new NoSuchTableException(db = db, table = table)
+ }
+ } else {
+ tempTables.remove(table)
+ }
}
}
@@ -445,6 +525,9 @@ class SessionCatalog(
* If no database is specified, this will first attempt to return a temporary table/view with
* the same name, then, if that does not exist, return the table/view from the current database.
*
+ * Note that, the global temp view database is also valid here, this will return the global temp
+ * view matching the given name.
+ *
* If the relation is a view, the relation will be wrapped in a [[SubqueryAlias]] which will
* track the name of the view.
*/
@@ -453,7 +536,11 @@ class SessionCatalog(
val db = formatDatabaseName(name.database.getOrElse(currentDb))
val table = formatTableName(name.table)
val relationAlias = alias.getOrElse(table)
- if (name.database.isDefined || !tempTables.contains(table)) {
+ if (db == globalTempViewManager.database) {
+ globalTempViewManager.get(table).map { viewDef =>
+ SubqueryAlias(relationAlias, viewDef, Some(name))
+ }.getOrElse(throw new NoSuchTableException(db, table))
+ } else if (name.database.isDefined || !tempTables.contains(table)) {
val metadata = externalCatalog.getTable(db, table)
val view = Option(metadata.tableType).collect {
case CatalogTableType.VIEW => name
@@ -472,27 +559,48 @@ class SessionCatalog(
* explicitly specified.
*/
def isTemporaryTable(name: TableIdentifier): Boolean = synchronized {
- name.database.isEmpty && tempTables.contains(formatTableName(name.table))
+ val table = formatTableName(name.table)
+ if (name.database.isEmpty) {
+ tempTables.contains(table)
+ } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
+ globalTempViewManager.get(table).isDefined
+ } else {
+ false
+ }
}
/**
- * List all tables in the specified database, including temporary tables.
+ * List all tables in the specified database, including local temporary tables.
+ *
+ * Note that, if the specified database is global temporary view database, we will list global
+ * temporary views.
*/
def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*")
/**
- * List all matching tables in the specified database, including temporary tables.
+ * List all matching tables in the specified database, including local temporary tables.
+ *
+ * Note that, if the specified database is global temporary view database, we will list global
+ * temporary views.
*/
def listTables(db: String, pattern: String): Seq[TableIdentifier] = {
val dbName = formatDatabaseName(db)
- requireDbExists(dbName)
- val dbTables =
- externalCatalog.listTables(dbName, pattern).map { t => TableIdentifier(t, Some(dbName)) }
- synchronized {
- val _tempTables = StringUtils.filterPattern(tempTables.keys.toSeq, pattern)
- .map { t => TableIdentifier(t) }
- dbTables ++ _tempTables
+ val dbTables = if (dbName == globalTempViewManager.database) {
+ globalTempViewManager.listViewNames(pattern).map { name =>
+ TableIdentifier(name, Some(globalTempViewManager.database))
+ }
+ } else {
+ requireDbExists(dbName)
+ externalCatalog.listTables(dbName, pattern).map { name =>
+ TableIdentifier(name, Some(dbName))
+ }
+ }
+ val localTempViews = synchronized {
+ StringUtils.filterPattern(tempTables.keys.toSeq, pattern).map { name =>
+ TableIdentifier(name)
+ }
}
+ dbTables ++ localTempViews
}
/**
@@ -504,6 +612,8 @@ class SessionCatalog(
// If the database is not defined, there is a good chance this is a temp table.
if (name.database.isEmpty) {
tempTables.get(formatTableName(name.table)).foreach(_.refresh())
+ } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
+ globalTempViewManager.get(formatTableName(name.table)).foreach(_.refresh())
}
}
@@ -919,6 +1029,7 @@ class SessionCatalog(
}
}
tempTables.clear()
+ globalTempViewManager.clear()
functionRegistry.clear()
// restore built-in functions
FunctionRegistry.builtin.listFunction().foreach { f =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 9cfbdffd02..4b52508740 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
-import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand}
+import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
@@ -2433,9 +2433,13 @@ class Dataset[T] private[sql](
}
/**
- * Creates a temporary view using the given name. The lifetime of this
+ * Creates a local temporary view using the given name. The lifetime of this
* temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
*
+ * Local temporary view is session-scoped. Its lifetime is the lifetime of the session that
+ * created it, i.e. it will be automatically dropped when the session terminates. It's not
+ * tied to any databases, i.e. we can't use `db1.view1` to reference a local temporary view.
+ *
* @throws AnalysisException if the view name already exists
*
* @group basic
@@ -2443,21 +2447,51 @@ class Dataset[T] private[sql](
*/
@throws[AnalysisException]
def createTempView(viewName: String): Unit = withPlan {
- createViewCommand(viewName, replace = false)
+ createTempViewCommand(viewName, replace = false, global = false)
}
+
+
/**
- * Creates a temporary view using the given name. The lifetime of this
+ * Creates a local temporary view using the given name. The lifetime of this
* temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
*
* @group basic
* @since 2.0.0
*/
def createOrReplaceTempView(viewName: String): Unit = withPlan {
- createViewCommand(viewName, replace = true)
+ createTempViewCommand(viewName, replace = true, global = false)
}
- private def createViewCommand(viewName: String, replace: Boolean): CreateViewCommand = {
+ /**
+ * Creates a global temporary view using the given name. The lifetime of this
+ * temporary view is tied to this Spark application.
+ *
+ * Global temporary view is cross-session. Its lifetime is the lifetime of the Spark application,
+ * i.e. it will be automatically dropped when the application terminates. It's tied to a system
+ * preserved database `_global_temp`, and we must use the qualified name to refer a global temp
+ * view, e.g. `SELECT * FROM _global_temp.view1`.
+ *
+ * @throws TempTableAlreadyExistsException if the view name already exists
+ *
+ * @group basic
+ * @since 2.1.0
+ */
+ @throws[AnalysisException]
+ def createGlobalTempView(viewName: String): Unit = withPlan {
+ createTempViewCommand(viewName, replace = false, global = true)
+ }
+
+ private def createTempViewCommand(
+ viewName: String,
+ replace: Boolean,
+ global: Boolean): CreateViewCommand = {
+ val viewType = if (global) {
+ GlobalTempView
+ } else {
+ LocalTempView
+ }
+
CreateViewCommand(
name = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName),
userSpecifiedColumns = Nil,
@@ -2467,7 +2501,7 @@ class Dataset[T] private[sql](
child = logicalPlan,
allowExisting = false,
replace = replace,
- isTemporary = true)
+ viewType = viewType)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 7f2762c7da..717fb29190 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -262,15 +262,33 @@ abstract class Catalog {
options: Map[String, String]): DataFrame
/**
- * Drops the temporary view with the given view name in the catalog.
+ * Drops the local temporary view with the given view name in the catalog.
* If the view has been cached before, then it will also be uncached.
*
+ * Local temporary view is session-scoped. Its lifetime is the lifetime of the session that
+ * created it, i.e. it will be automatically dropped when the session terminates. It's not
+ * tied to any databases, i.e. we can't use `db1.view1` to reference a local temporary view.
+ *
* @param viewName the name of the view to be dropped.
* @since 2.0.0
*/
def dropTempView(viewName: String): Unit
/**
+ * Drops the global temporary view with the given view name in the catalog.
+ * If the view has been cached before, then it will also be uncached.
+ *
+ * Global temporary view is cross-session. Its lifetime is the lifetime of the Spark application,
+ * i.e. it will be automatically dropped when the application terminates. It's tied to a system
+ * preserved database `_global_temp`, and we must use the qualified name to refer a global temp
+ * view, e.g. `SELECT * FROM _global_temp.view1`.
+ *
+ * @param viewName the name of the view to be dropped.
+ * @since 2.1.0
+ */
+ def dropGlobalTempView(viewName: String): Boolean
+
+ /**
* Returns true if the table is currently cached in-memory.
*
* @since 2.0.0
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 383b3a233f..cb45a6d78b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -21,15 +21,14 @@ import java.nio.charset.StandardCharsets
import java.sql.Timestamp
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
import org.apache.spark.util.Utils
@@ -125,6 +124,9 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
.mkString("\t")
}
}
+ // SHOW TABLES in Hive only output table names, while ours outputs database, table name, isTemp.
+ case command: ExecutedCommandExec if command.cmd.isInstanceOf[ShowTablesCommand] =>
+ command.executeCollect().map(_.getString(1))
case command: ExecutedCommandExec =>
command.executeCollect().map(_.getString(0))
case other =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 5f87b71210..be2eddbb0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -29,9 +29,9 @@ import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{CreateTable, CreateTempViewUsing, _}
+import org.apache.spark.sql.execution.datasources.{CreateTable, _}
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.StructType
/**
* Concrete parser for Spark SQL statements.
@@ -385,7 +385,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " +
"CREATE TEMPORARY VIEW ... USING ... instead")
- CreateTempViewUsing(table, schema, replace = true, provider, options)
+ CreateTempViewUsing(table, schema, replace = true, global = false, provider, options)
} else {
CreateTable(tableDesc, mode, None)
}
@@ -401,6 +401,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
tableIdent = visitTableIdentifier(ctx.tableIdentifier()),
userSpecifiedSchema = Option(ctx.colTypeList()).map(createSchema),
replace = ctx.REPLACE != null,
+ global = ctx.GLOBAL != null,
provider = ctx.tableProvider.qualifiedName.getText,
options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
}
@@ -1269,7 +1270,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
*
* For example:
* {{{
- * CREATE [OR REPLACE] [TEMPORARY] VIEW [IF NOT EXISTS] [db_name.]view_name
+ * CREATE [OR REPLACE] [[GLOBAL] TEMPORARY] VIEW [IF NOT EXISTS] [db_name.]view_name
* [(column_name [COMMENT column_comment], ...) ]
* [COMMENT view_comment]
* [TBLPROPERTIES (property_name = property_value, ...)]
@@ -1286,6 +1287,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
}
+ val viewType = if (ctx.TEMPORARY == null) {
+ PersistedView
+ } else if (ctx.GLOBAL != null) {
+ GlobalTempView
+ } else {
+ LocalTempView
+ }
+
CreateViewCommand(
name = visitTableIdentifier(ctx.tableIdentifier),
userSpecifiedColumns = userSpecifiedColumns,
@@ -1295,7 +1304,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
child = plan(ctx.query),
allowExisting = ctx.EXISTS != null,
replace = ctx.REPLACE != null,
- isTemporary = ctx.TEMPORARY != null)
+ viewType = viewType)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 01ac89868d..45fa293e58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -183,17 +183,20 @@ case class DropTableCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
- // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
- // issue an exception.
- catalog.getTableMetadataOption(tableName).map(_.tableType match {
- case CatalogTableType.VIEW if !isView =>
- throw new AnalysisException(
- "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
- case o if o != CatalogTableType.VIEW && isView =>
- throw new AnalysisException(
- s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
- case _ =>
- })
+
+ if (!catalog.isTemporaryTable(tableName) && catalog.tableExists(tableName)) {
+ // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
+ // issue an exception.
+ catalog.getTableMetadata(tableName).tableType match {
+ case CatalogTableType.VIEW if !isView =>
+ throw new AnalysisException(
+ "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
+ case o if o != CatalogTableType.VIEW && isView =>
+ throw new AnalysisException(
+ s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
+ case _ =>
+ }
+ }
try {
sparkSession.sharedState.cacheManager.uncacheQuery(
sparkSession.table(tableName.quotedString))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 08de6cd424..424ef58d76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -579,9 +579,10 @@ case class ShowTablesCommand(
databaseName: Option[String],
tableIdentifierPattern: Option[String]) extends RunnableCommand {
- // The result of SHOW TABLES has two columns, tableName and isTemporary.
+ // The result of SHOW TABLES has three columns: database, tableName and isTemporary.
override val output: Seq[Attribute] = {
- AttributeReference("tableName", StringType, nullable = false)() ::
+ AttributeReference("database", StringType, nullable = false)() ::
+ AttributeReference("tableName", StringType, nullable = false)() ::
AttributeReference("isTemporary", BooleanType, nullable = false)() :: Nil
}
@@ -592,9 +593,9 @@ case class ShowTablesCommand(
val db = databaseName.getOrElse(catalog.getCurrentDatabase)
val tables =
tableIdentifierPattern.map(catalog.listTables(db, _)).getOrElse(catalog.listTables(db))
- tables.map { t =>
- val isTemp = t.database.isEmpty
- Row(t.table, isTemp)
+ tables.map { tableIdent =>
+ val isTemp = catalog.isTemporaryTable(tableIdent)
+ Row(tableIdent.database.getOrElse(""), tableIdent.table, isTemp)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 15340ee921..bbcd9c4ef5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -19,13 +19,46 @@ package org.apache.spark.sql.execution.command
import scala.util.control.NonFatal
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
+import org.apache.spark.sql.types.{MetadataBuilder, StructType}
+
+
+/**
+ * ViewType is used to specify the expected view type when we want to create or replace a view in
+ * [[CreateViewCommand]].
+ */
+sealed trait ViewType
+
+/**
+ * LocalTempView means session-scoped local temporary views. Its lifetime is the lifetime of the
+ * session that created it, i.e. it will be automatically dropped when the session terminates. It's
+ * not tied to any databases, i.e. we can't use `db1.view1` to reference a local temporary view.
+ */
+object LocalTempView extends ViewType
+
+/**
+ * GlobalTempView means cross-session global temporary views. Its lifetime is the lifetime of the
+ * Spark application, i.e. it will be automatically dropped when the application terminates. It's
+ * tied to a system preserved database `_global_temp`, and we must use the qualified name to refer a
+ * global temp view, e.g. SELECT * FROM _global_temp.view1.
+ */
+object GlobalTempView extends ViewType
+
+/**
+ * PersistedView means cross-session persisted views. Persisted views stay until they are
+ * explicitly dropped by user command. It's always tied to a database, default to the current
+ * database if not specified.
+ *
+ * Note that, Existing persisted view with the same name are not visible to the current session
+ * while the local temporary view exists, unless the view name is qualified by database.
+ */
+object PersistedView extends ViewType
/**
@@ -46,10 +79,7 @@ import org.apache.spark.sql.types.StructType
* already exists, throws analysis exception.
* @param replace if true, and if the view already exists, updates it; if false, and if the view
* already exists, throws analysis exception.
- * @param isTemporary if true, the view is created as a temporary view. Temporary views are dropped
- * at the end of current Spark session. Existing permanent relations with the same
- * name are not visible to the current session while the temporary view exists,
- * unless they are specified with full qualified table name with database prefix.
+ * @param viewType the expected view type to be created with this command.
*/
case class CreateViewCommand(
name: TableIdentifier,
@@ -60,20 +90,21 @@ case class CreateViewCommand(
child: LogicalPlan,
allowExisting: Boolean,
replace: Boolean,
- isTemporary: Boolean)
+ viewType: ViewType)
extends RunnableCommand {
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
- if (!isTemporary) {
- require(originalText.isDefined,
- "The table to created with CREATE VIEW must have 'originalText'.")
+ if (viewType == PersistedView) {
+ require(originalText.isDefined, "'originalText' must be provided to create permanent view")
}
if (allowExisting && replace) {
throw new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.")
}
+ private def isTemporary = viewType == LocalTempView || viewType == GlobalTempView
+
// Disallows 'CREATE TEMPORARY VIEW IF NOT EXISTS' to be consistent with 'CREATE TEMPORARY TABLE'
if (allowExisting && isTemporary) {
throw new AnalysisException(
@@ -99,72 +130,53 @@ case class CreateViewCommand(
s"(num: `${analyzedPlan.output.length}`) does not match the number of column names " +
s"specified by CREATE VIEW (num: `${userSpecifiedColumns.length}`).")
}
- val sessionState = sparkSession.sessionState
-
- if (isTemporary) {
- createTemporaryView(sparkSession, analyzedPlan)
- } else {
- // Adds default database for permanent table if it doesn't exist, so that tableExists()
- // only check permanent tables.
- val database = name.database.getOrElse(sessionState.catalog.getCurrentDatabase)
- val qualifiedName = name.copy(database = Option(database))
-
- if (sessionState.catalog.tableExists(qualifiedName)) {
- val tableMetadata = sessionState.catalog.getTableMetadata(qualifiedName)
- if (allowExisting) {
- // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
- // already exists.
- } else if (tableMetadata.tableType != CatalogTableType.VIEW) {
- throw new AnalysisException(s"$qualifiedName is not a view")
- } else if (replace) {
- // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
- sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
- } else {
- // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
- // exists.
- throw new AnalysisException(
- s"View $qualifiedName already exists. If you want to update the view definition, " +
- "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
- }
- } else {
- // Create the view if it doesn't exist.
- sessionState.catalog.createTable(
- prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
- }
- }
- Seq.empty[Row]
- }
-
- private def createTemporaryView(sparkSession: SparkSession, analyzedPlan: LogicalPlan): Unit = {
- val catalog = sparkSession.sessionState.catalog
- // Projects column names to alias names
- val logicalPlan = if (userSpecifiedColumns.isEmpty) {
+ val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
analyzedPlan
} else {
val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
- case (attr, (colName, _)) => Alias(attr, colName)()
+ case (attr, (colName, None)) => Alias(attr, colName)()
+ case (attr, (colName, Some(colComment))) =>
+ val meta = new MetadataBuilder().putString("comment", colComment).build()
+ Alias(attr, colName)(explicitMetadata = Some(meta))
}
sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
}
- catalog.createTempView(name.table, logicalPlan, replace)
+ val catalog = sparkSession.sessionState.catalog
+ if (viewType == LocalTempView) {
+ catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
+ } else if (viewType == GlobalTempView) {
+ catalog.createGlobalTempView(name.table, aliasedPlan, overrideIfExists = replace)
+ } else if (catalog.tableExists(name)) {
+ val tableMetadata = catalog.getTableMetadata(name)
+ if (allowExisting) {
+ // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
+ // already exists.
+ } else if (tableMetadata.tableType != CatalogTableType.VIEW) {
+ throw new AnalysisException(s"$name is not a view")
+ } else if (replace) {
+ // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
+ catalog.alterTable(prepareTable(sparkSession, aliasedPlan))
+ } else {
+ // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
+ // exists.
+ throw new AnalysisException(
+ s"View $name already exists. If you want to update the view definition, " +
+ "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
+ }
+ } else {
+ // Create the view if it doesn't exist.
+ catalog.createTable(prepareTable(sparkSession, aliasedPlan), ignoreIfExists = false)
+ }
+ Seq.empty[Row]
}
/**
* Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize
* SQL based on the analyzed plan, and also creates the proper schema for the view.
*/
- private def prepareTable(sparkSession: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
- val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
- analyzedPlan
- } else {
- val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
- case (attr, (colName, _)) => Alias(attr, colName)()
- }
- sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
- }
-
+ private def prepareTable(sparkSession: SparkSession, aliasedPlan: LogicalPlan): CatalogTable = {
val viewSQL: String = new SQLBuilder(aliasedPlan).toSQL
// Validate the view SQL - make sure we can parse it and analyze it.
@@ -176,19 +188,11 @@ case class CreateViewCommand(
throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
}
- val viewSchema = if (userSpecifiedColumns.isEmpty) {
- aliasedPlan.schema
- } else {
- StructType(aliasedPlan.schema.zip(userSpecifiedColumns).map {
- case (field, (_, comment)) => comment.map(field.withComment).getOrElse(field)
- })
- }
-
CatalogTable(
identifier = name,
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
- schema = viewSchema,
+ schema = aliasedPlan.schema,
properties = properties,
viewOriginalText = originalText,
viewText = Some(viewSQL),
@@ -222,8 +226,8 @@ case class AlterViewAsCommand(
qe.assertAnalyzed()
val analyzedPlan = qe.analyzed
- if (session.sessionState.catalog.isTemporaryTable(name)) {
- session.sessionState.catalog.createTempView(name.table, analyzedPlan, overrideIfExists = true)
+ if (session.sessionState.catalog.alterTempViewDefinition(name, analyzedPlan)) {
+ // a local/global temp view has been altered, we are done.
} else {
alterPermanentView(session, analyzedPlan)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index fa95af2648..59fb48ffea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -40,16 +40,20 @@ case class CreateTable(
override def innerChildren: Seq[QueryPlan[_]] = query.toSeq
}
+/**
+ * Create or replace a local/global temporary view with given data source.
+ */
case class CreateTempViewUsing(
tableIdent: TableIdentifier,
userSpecifiedSchema: Option[StructType],
replace: Boolean,
+ global: Boolean,
provider: String,
options: Map[String, String]) extends RunnableCommand {
if (tableIdent.database.isDefined) {
throw new AnalysisException(
- s"Temporary table '$tableIdent' should not have specified a database")
+ s"Temporary view '$tableIdent' should not have specified a database")
}
def run(sparkSession: SparkSession): Seq[Row] = {
@@ -58,10 +62,16 @@ case class CreateTempViewUsing(
userSpecifiedSchema = userSpecifiedSchema,
className = provider,
options = options)
- sparkSession.sessionState.catalog.createTempView(
- tableIdent.table,
- Dataset.ofRows(sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan,
- replace)
+
+ val catalog = sparkSession.sessionState.catalog
+ val viewDefinition = Dataset.ofRows(
+ sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan
+
+ if (global) {
+ catalog.createGlobalTempView(tableIdent.table, viewDefinition, replace)
+ } else {
+ catalog.createTempView(tableIdent.table, viewDefinition, replace)
+ }
Seq.empty[Row]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index e412e1b4b3..c05bda3f1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -94,20 +94,19 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
*/
@throws[AnalysisException]("database does not exist")
override def listTables(dbName: String): Dataset[Table] = {
- requireDatabaseExists(dbName)
val tables = sessionCatalog.listTables(dbName).map(makeTable)
CatalogImpl.makeDataset(tables, sparkSession)
}
private def makeTable(tableIdent: TableIdentifier): Table = {
val metadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent)
- val database = metadata.identifier.database
+ val isTemp = sessionCatalog.isTemporaryTable(tableIdent)
new Table(
name = tableIdent.table,
- database = database.orNull,
+ database = metadata.identifier.database.orNull,
description = metadata.comment.orNull,
- tableType = if (database.isEmpty) "TEMPORARY" else metadata.tableType.name,
- isTemporary = database.isEmpty)
+ tableType = if (isTemp) "TEMPORARY" else metadata.tableType.name,
+ isTemporary = isTemp)
}
/**
@@ -365,7 +364,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
}
/**
- * Drops the temporary view with the given view name in the catalog.
+ * Drops the local temporary view with the given view name in the catalog.
* If the view has been cached/persisted before, it's also unpersisted.
*
* @param viewName the name of the view to be dropped.
@@ -380,6 +379,21 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
}
/**
+ * Drops the global temporary view with the given view name in the catalog.
+ * If the view has been cached/persisted before, it's also unpersisted.
+ *
+ * @param viewName the name of the view to be dropped.
+ * @group ddl_ops
+ * @since 2.1.0
+ */
+ override def dropGlobalTempView(viewName: String): Boolean = {
+ sparkSession.sessionState.catalog.getGlobalTempView(viewName).exists { viewDef =>
+ sparkSession.sharedState.cacheManager.uncacheQuery(Dataset.ofRows(sparkSession, viewDef))
+ sessionCatalog.dropGlobalTempView(viewName)
+ }
+ }
+
+ /**
* Returns true if the table is currently cached in-memory.
*
* @group cachemgmt
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 9f7d0019c6..8759dfe39c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -95,6 +95,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
*/
lazy val catalog = new SessionCatalog(
sparkSession.sharedState.externalCatalog,
+ sparkSession.sharedState.globalTempViewManager,
functionResourceLoader,
functionRegistry,
conf,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 6387f01506..c555a43cd2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -22,11 +22,11 @@ import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.internal.config._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
+import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, GlobalTempViewManager, InMemoryCatalog}
import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
import org.apache.spark.util.{MutableURLClassLoader, Utils}
@@ -37,39 +37,14 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
*/
private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
- /**
- * Class for caching query results reused in future executions.
- */
- val cacheManager: CacheManager = new CacheManager
-
- /**
- * A listener for SQL-specific [[org.apache.spark.scheduler.SparkListenerEvent]]s.
- */
- val listener: SQLListener = createListenerAndUI(sparkContext)
-
+ // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
+ // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
{
val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
if (configFile != null) {
sparkContext.hadoopConfiguration.addResource(configFile)
}
- }
-
- /**
- * A catalog that interacts with external systems.
- */
- lazy val externalCatalog: ExternalCatalog =
- SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
- SharedState.externalCatalogClassName(sparkContext.conf),
- sparkContext.conf,
- sparkContext.hadoopConfiguration)
-
- /**
- * A classloader used to load all user-added jar.
- */
- val jarClassLoader = new NonClosableMutableURLClassLoader(
- org.apache.spark.util.Utils.getContextOrSparkClassLoader)
- {
// Set the Hive metastore warehouse path to the one we use
val tempConf = new SQLConf
sparkContext.conf.getAll.foreach { case (k, v) => tempConf.setConfString(k, v) }
@@ -94,6 +69,48 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
}
/**
+ * Class for caching query results reused in future executions.
+ */
+ val cacheManager: CacheManager = new CacheManager
+
+ /**
+ * A listener for SQL-specific [[org.apache.spark.scheduler.SparkListenerEvent]]s.
+ */
+ val listener: SQLListener = createListenerAndUI(sparkContext)
+
+ /**
+ * A catalog that interacts with external systems.
+ */
+ val externalCatalog: ExternalCatalog =
+ SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
+ SharedState.externalCatalogClassName(sparkContext.conf),
+ sparkContext.conf,
+ sparkContext.hadoopConfiguration)
+
+ /**
+ * A manager for global temporary views.
+ */
+ val globalTempViewManager = {
+ // System preserved database should not exists in metastore. However it's hard to guarantee it
+ // for every session, because case-sensitivity differs. Here we always lowercase it to make our
+ // life easier.
+ val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase
+ if (externalCatalog.databaseExists(globalTempDB)) {
+ throw new SparkException(
+ s"$globalTempDB is a system preserved database, please rename your existing database " +
+ "to resolve the name conflict, or set a different value for " +
+ s"${GLOBAL_TEMP_DATABASE.key}, and launch your Spark application again.")
+ }
+ new GlobalTempViewManager(globalTempDB)
+ }
+
+ /**
+ * A classloader used to load all user-added jar.
+ */
+ val jarClassLoader = new NonClosableMutableURLClassLoader(
+ org.apache.spark.util.Utils.getContextOrSparkClassLoader)
+
+ /**
* Create a SQLListener then add it into SparkContext, and create a SQLTab if there is SparkUI.
*/
private def createListenerAndUI(sc: SparkContext): SQLListener = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index 001c1a1d85..2b35db411e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -88,11 +88,11 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
df.createOrReplaceTempView("listtablessuitetable")
assert(
sqlContext.tables().filter("tableName = 'listtablessuitetable'").collect().toSeq ==
- Row("listtablessuitetable", true) :: Nil)
+ Row("", "listtablessuitetable", true) :: Nil)
assert(
sqlContext.sql("SHOW tables").filter("tableName = 'listtablessuitetable'").collect().toSeq ==
- Row("listtablessuitetable", true) :: Nil)
+ Row("", "listtablessuitetable", true) :: Nil)
sqlContext.sessionState.catalog.dropTable(
TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge = false)
@@ -105,11 +105,11 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
df.createOrReplaceTempView("listtablessuitetable")
assert(
sqlContext.tables("default").filter("tableName = 'listtablessuitetable'").collect().toSeq ==
- Row("listtablessuitetable", true) :: Nil)
+ Row("", "listtablessuitetable", true) :: Nil)
assert(
sqlContext.sql("show TABLES in default").filter("tableName = 'listtablessuitetable'")
- .collect().toSeq == Row("listtablessuitetable", true) :: Nil)
+ .collect().toSeq == Row("", "listtablessuitetable", true) :: Nil)
sqlContext.sessionState.catalog.dropTable(
TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge = false)
@@ -122,7 +122,8 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
df.createOrReplaceTempView("listtablessuitetable")
val expectedSchema = StructType(
- StructField("tableName", StringType, false) ::
+ StructField("database", StringType, false) ::
+ StructField("tableName", StringType, false) ::
StructField("isTemporary", BooleanType, false) :: Nil)
Seq(sqlContext.tables(), sqlContext.sql("SHOW TABLes")).foreach {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
new file mode 100644
index 0000000000..391bcb8b35
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalog.Table
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
+
+class GlobalTempViewSuite extends QueryTest with SharedSQLContext {
+ import testImplicits._
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ globalTempDB = spark.sharedState.globalTempViewManager.database
+ }
+
+ private var globalTempDB: String = _
+
+ test("basic semantic") {
+ sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 'a'")
+
+ // If there is no database in table name, we should try local temp view first, if not found,
+ // try table/view in current database, which is "default" in this case. So we expect
+ // NoSuchTableException here.
+ intercept[NoSuchTableException](spark.table("src"))
+
+ // Use qualified name to refer to the global temp view explicitly.
+ checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a"))
+
+ // Table name without database will never refer to a global temp view.
+ intercept[NoSuchTableException](sql("DROP VIEW src"))
+
+ sql(s"DROP VIEW $globalTempDB.src")
+ // The global temp view should be dropped successfully.
+ intercept[NoSuchTableException](spark.table(s"$globalTempDB.src"))
+
+ // We can also use Dataset API to create global temp view
+ Seq(1 -> "a").toDF("i", "j").createGlobalTempView("src")
+ checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a"))
+
+ // Use qualified name to rename a global temp view.
+ sql(s"ALTER VIEW $globalTempDB.src RENAME TO src2")
+ intercept[NoSuchTableException](spark.table(s"$globalTempDB.src"))
+ checkAnswer(spark.table(s"$globalTempDB.src2"), Row(1, "a"))
+
+ // Use qualified name to alter a global temp view.
+ sql(s"ALTER VIEW $globalTempDB.src2 AS SELECT 2, 'b'")
+ checkAnswer(spark.table(s"$globalTempDB.src2"), Row(2, "b"))
+
+ // We can also use Catalog API to drop global temp view
+ spark.catalog.dropGlobalTempView("src2")
+ intercept[NoSuchTableException](spark.table(s"$globalTempDB.src2"))
+ }
+
+ test("global temp view is shared among all sessions") {
+ try {
+ sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 2")
+ checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, 2))
+ val newSession = spark.newSession()
+ checkAnswer(newSession.table(s"$globalTempDB.src"), Row(1, 2))
+ } finally {
+ spark.catalog.dropGlobalTempView("src")
+ }
+ }
+
+ test("global temp view database should be preserved") {
+ val e = intercept[AnalysisException](sql(s"CREATE DATABASE $globalTempDB"))
+ assert(e.message.contains("system preserved database"))
+
+ val e2 = intercept[AnalysisException](sql(s"USE $globalTempDB"))
+ assert(e2.message.contains("system preserved database"))
+ }
+
+ test("CREATE GLOBAL TEMP VIEW USING") {
+ withTempPath { path =>
+ try {
+ Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath)
+ sql(s"CREATE GLOBAL TEMP VIEW src USING parquet OPTIONS (PATH '${path.getAbsolutePath}')")
+ checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a"))
+ sql(s"INSERT INTO $globalTempDB.src SELECT 2, 'b'")
+ checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a") :: Row(2, "b") :: Nil)
+ } finally {
+ spark.catalog.dropGlobalTempView("src")
+ }
+ }
+ }
+
+ test("CREATE TABLE LIKE should work for global temp view") {
+ try {
+ sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1 AS a, '2' AS b")
+ sql(s"CREATE TABLE cloned LIKE ${globalTempDB}.src")
+ val tableMeta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("cloned"))
+ assert(tableMeta.schema == new StructType().add("a", "int", false).add("b", "string", false))
+ } finally {
+ spark.catalog.dropGlobalTempView("src")
+ sql("DROP TABLE default.cloned")
+ }
+ }
+
+ test("list global temp views") {
+ try {
+ sql("CREATE GLOBAL TEMP VIEW v1 AS SELECT 3, 4")
+ sql("CREATE TEMP VIEW v2 AS SELECT 1, 2")
+
+ checkAnswer(sql(s"SHOW TABLES IN $globalTempDB"),
+ Row(globalTempDB, "v1", true) ::
+ Row("", "v2", true) :: Nil)
+
+ assert(spark.catalog.listTables(globalTempDB).collect().toSeq.map(_.name) == Seq("v1", "v2"))
+ } finally {
+ spark.catalog.dropTempView("v1")
+ spark.catalog.dropGlobalTempView("v2")
+ }
+ }
+
+ test("should lookup global temp view if and only if global temp db is specified") {
+ try {
+ sql("CREATE GLOBAL TEMP VIEW same_name AS SELECT 3, 4")
+ sql("CREATE TEMP VIEW same_name AS SELECT 1, 2")
+
+ checkAnswer(sql("SELECT * FROM same_name"), Row(1, 2))
+
+ // we never lookup global temp views if database is not specified in table name
+ spark.catalog.dropTempView("same_name")
+ intercept[AnalysisException](sql("SELECT * FROM same_name"))
+
+ // Use qualified name to lookup a global temp view.
+ checkAnswer(sql(s"SELECT * FROM $globalTempDB.same_name"), Row(3, 4))
+ } finally {
+ spark.catalog.dropTempView("same_name")
+ spark.catalog.dropGlobalTempView("same_name")
+ }
+ }
+
+ test("public Catalog should recognize global temp view") {
+ try {
+ sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 2")
+
+ assert(spark.catalog.tableExists(globalTempDB, "src"))
+ assert(spark.catalog.getTable(globalTempDB, "src").toString == new Table(
+ name = "src",
+ database = globalTempDB,
+ description = null,
+ tableType = "TEMPORARY",
+ isTemporary = true).toString)
+ } finally {
+ spark.catalog.dropGlobalTempView("src")
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 1bcb810a15..19885156cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -969,17 +969,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
""".stripMargin)
checkAnswer(
sql("SHOW TABLES IN default 'show1*'"),
- Row("show1a", true) :: Nil)
+ Row("", "show1a", true) :: Nil)
checkAnswer(
sql("SHOW TABLES IN default 'show1*|show2*'"),
- Row("show1a", true) ::
- Row("show2b", true) :: Nil)
+ Row("", "show1a", true) ::
+ Row("", "show2b", true) :: Nil)
checkAnswer(
sql("SHOW TABLES 'show1*|show2*'"),
- Row("show1a", true) ::
- Row("show2b", true) :: Nil)
+ Row("", "show1a", true) ::
+ Row("", "show2b", true) :: Nil)
assert(
sql("SHOW TABLES").count() >= 2)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 85c509847d..85ecf0ce70 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
@@ -41,6 +41,7 @@ import org.apache.spark.util.Utils
private[sql] class HiveSessionCatalog(
externalCatalog: HiveExternalCatalog,
+ globalTempViewManager: GlobalTempViewManager,
sparkSession: SparkSession,
functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
@@ -48,6 +49,7 @@ private[sql] class HiveSessionCatalog(
hadoopConf: Configuration)
extends SessionCatalog(
externalCatalog,
+ globalTempViewManager,
functionResourceLoader,
functionRegistry,
conf,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index eb10c11382..6d4fe1a941 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -45,6 +45,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
override lazy val catalog = {
new HiveSessionCatalog(
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
+ sparkSession.sharedState.globalTempViewManager,
sparkSession,
functionResourceLoader,
functionRegistry,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
index 57363b7259..939fd71b4f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
@@ -87,11 +87,11 @@ class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEac
assert(
hc.sql("SELECT * FROM moo_table order by name").collect().toSeq ==
df.collect().toSeq.sortBy(_.getString(0)))
- val tables = hc.sql("SHOW TABLES IN mee_db").collect().map(_.getString(0))
+ val tables = hc.sql("SHOW TABLES IN mee_db").select("tableName").collect().map(_.getString(0))
assert(tables.toSet == Set("moo_table", "mee_table"))
hc.sql("DROP TABLE moo_table")
hc.sql("DROP TABLE mee_table")
- val tables2 = hc.sql("SHOW TABLES IN mee_db").collect().map(_.getString(0))
+ val tables2 = hc.sql("SHOW TABLES IN mee_db").select("tableName").collect().map(_.getString(0))
assert(tables2.isEmpty)
hc.sql("USE default")
hc.sql("DROP DATABASE mee_db CASCADE")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
index 6eeb67510c..15ba61646d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
@@ -58,10 +58,10 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft
// We are using default DB.
checkAnswer(
allTables.filter("tableName = 'listtablessuitetable'"),
- Row("listtablessuitetable", true))
+ Row("", "listtablessuitetable", true))
checkAnswer(
allTables.filter("tableName = 'hivelisttablessuitetable'"),
- Row("hivelisttablessuitetable", false))
+ Row("default", "hivelisttablessuitetable", false))
assert(allTables.filter("tableName = 'hiveindblisttablessuitetable'").count() === 0)
}
}
@@ -71,11 +71,11 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft
case allTables =>
checkAnswer(
allTables.filter("tableName = 'listtablessuitetable'"),
- Row("listtablessuitetable", true))
+ Row("", "listtablessuitetable", true))
assert(allTables.filter("tableName = 'hivelisttablessuitetable'").count() === 0)
checkAnswer(
allTables.filter("tableName = 'hiveindblisttablessuitetable'"),
- Row("hiveindblisttablessuitetable", false))
+ Row("listtablessuitedb", "hiveindblisttablessuitetable", false))
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 8ae6868c98..51670649ad 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -984,7 +984,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
checkAnswer(
spark.sql("show TABLES in testdb8156").filter("tableName = 'ttt3'"),
- Row("ttt3", false))
+ Row("testdb8156", "ttt3", false))
spark.sql("""use default""")
spark.sql("""drop database if exists testdb8156 CASCADE""")
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index b2103b3bfc..2c772ce215 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -94,15 +94,15 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
sql("CREATE TABLE show2b(c2 int)")
checkAnswer(
sql("SHOW TABLES IN default 'show1*'"),
- Row("show1a", false) :: Nil)
+ Row("default", "show1a", false) :: Nil)
checkAnswer(
sql("SHOW TABLES IN default 'show1*|show2*'"),
- Row("show1a", false) ::
- Row("show2b", false) :: Nil)
+ Row("default", "show1a", false) ::
+ Row("default", "show2b", false) :: Nil)
checkAnswer(
sql("SHOW TABLES 'show1*|show2*'"),
- Row("show1a", false) ::
- Row("show2b", false) :: Nil)
+ Row("default", "show1a", false) ::
+ Row("default", "show2b", false) :: Nil)
assert(
sql("SHOW TABLES").count() >= 2)
assert(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index f5c605fe5e..2af935da68 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -62,15 +62,15 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
var e = intercept[AnalysisException] {
sql("CREATE OR REPLACE VIEW tab1 AS SELECT * FROM jt")
}.getMessage
- assert(e.contains("`default`.`tab1` is not a view"))
+ assert(e.contains("`tab1` is not a view"))
e = intercept[AnalysisException] {
sql("CREATE VIEW tab1 AS SELECT * FROM jt")
}.getMessage
- assert(e.contains("`default`.`tab1` is not a view"))
+ assert(e.contains("`tab1` is not a view"))
e = intercept[AnalysisException] {
sql("ALTER VIEW tab1 AS SELECT * FROM jt")
}.getMessage
- assert(e.contains("`default`.`tab1` is not a view"))
+ assert(e.contains("`tab1` is not a view"))
}
}