 sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala    |  57 +++++-
 sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala  |   4 +-
 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 226 +++++++++++++++-
 3 files changed, 273 insertions(+), 14 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index b4a15b8b28..67b2329eff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -29,17 +29,23 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
/**
- * A command to create a table with the same definition of the given existing table.
+ * A command to create a MANAGED table with the same definition as the given existing table.
+ * In the target table definition, the table comment is always empty but the column comments
+ * are identical to the ones defined in the source table.
+ *
+ * The CatalogTable attributes copied from the source table are storage (inputFormat, outputFormat,
+ * serde, compressed, properties), schema, provider, partitionColumnNames, and bucketSpec.
*
* The syntax of using this command in SQL is:
* {{{
@@ -58,18 +64,45 @@ case class CreateTableLikeCommand(
throw new AnalysisException(
s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'")
}
- if (catalog.isTemporaryTable(sourceTable)) {
- throw new AnalysisException(
- s"Source table in CREATE TABLE LIKE cannot be temporary: '$sourceTable'")
+
+ val sourceTableDesc = catalog.getTableMetadata(sourceTable)
+
+ // Storage format
+ val newStorage =
+ if (sourceTableDesc.tableType == CatalogTableType.VIEW) {
+ val newPath = catalog.defaultTablePath(targetTable)
+ CatalogStorageFormat.empty.copy(properties = Map("path" -> newPath))
+ } else if (DDLUtils.isDatasourceTable(sourceTableDesc)) {
+ val newPath = catalog.defaultTablePath(targetTable)
+ val newSerdeProp =
+ sourceTableDesc.storage.properties.filterKeys(_.toLowerCase != "path") ++
+ Map("path" -> newPath)
+ sourceTableDesc.storage.copy(
+ locationUri = None,
+ properties = newSerdeProp)
+ } else {
+ sourceTableDesc.storage.copy(
+ locationUri = None,
+ properties = sourceTableDesc.storage.properties)
+ }
+
+ val newProvider = if (sourceTableDesc.tableType == CatalogTableType.VIEW) {
+ Some(sparkSession.sessionState.conf.defaultDataSourceName)
+ } else {
+ sourceTableDesc.provider
}
- val tableToCreate = catalog.getTableMetadata(sourceTable).copy(
- identifier = targetTable,
- tableType = CatalogTableType.MANAGED,
- createTime = System.currentTimeMillis,
- lastAccessTime = -1).withNewStorage(locationUri = None)
+ val newTableDesc =
+ CatalogTable(
+ identifier = targetTable,
+ tableType = CatalogTableType.MANAGED,
+ storage = newStorage,
+ schema = sourceTableDesc.schema,
+ provider = newProvider,
+ partitionColumnNames = sourceTableDesc.partitionColumnNames,
+ bucketSpec = sourceTableDesc.bucketSpec)
- catalog.createTable(tableToCreate, ifNotExists)
+ catalog.createTable(newTableDesc, ifNotExists)
Seq.empty[Row]
}
}
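For context, a hedged end-to-end sketch of the three source kinds the new newStorage/newProvider logic distinguishes (all table and view names below are illustrative, not part of the patch):

    // Assumes a running SparkSession `spark`; every name here is hypothetical.
    spark.range(5).createTempView("src_view")
    spark.sql("CREATE TABLE t1 LIKE src_view")    // VIEW source: fresh default path, default provider

    spark.range(5).write.format("json").saveAsTable("src_ds")
    spark.sql("CREATE TABLE t2 LIKE src_ds")      // data source table: serde properties copied,
                                                  // "path" rewritten to the target's default path

    spark.sql("CREATE TABLE src_hive AS SELECT 1 AS key")
    spark.sql("CREATE TABLE t3 LIKE src_hive")    // Hive serde table: storage format copied,
                                                  // locationUri cleared (target is MANAGED)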
@@ -517,7 +550,7 @@ case class ShowTablesCommand(
/**
- * A command for users to list the properties for a table If propertyKey is specified, the value
+ * A command for users to list the properties for a table. If propertyKey is specified, the value
* for the propertyKey is returned. If propertyKey is not specified, all the keys and their
* corresponding values are returned.
* The syntax of using this command in SQL is:
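The doc-comment fix above belongs to SHOW TBLPROPERTIES; a brief hedged illustration of the two modes it describes (the table and key names are made up):

    // Assumes `spark` is a SparkSession and tab1 exists with a property prop1.
    spark.sql("SHOW TBLPROPERTIES tab1").show()            // all keys and their values
    spark.sql("SHOW TBLPROPERTIES tab1 ('prop1')").show()  // only the value of prop1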
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index dd982192a3..54ec61abed 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -404,7 +404,9 @@ private[hive] class HiveClientImpl(
properties = Option(h.getTTable.getSd.getSerdeInfo.getParameters)
.map(_.asScala.toMap).orNull
),
- properties = properties.filter(kv => kv._1 != "comment"),
+ // For EXTERNAL_TABLE, the table properties have a particular field "EXTERNAL", which is
+ // added in the function toHiveTable.
+ properties = properties.filter(kv => kv._1 != "comment" && kv._1 != "EXTERNAL"),
comment = properties.get("comment"),
viewOriginalText = Option(h.getViewOriginalText),
viewText = Option(h.getViewExpandedText),
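The reason "EXTERNAL" is filtered on read: the Hive metastore records external tables with a plain table property EXTERNAL=TRUE, while Spark models the same fact as CatalogTableType.EXTERNAL and re-adds the property when converting back in toHiveTable; keeping it in properties would duplicate the flag and let CREATE TABLE LIKE copy it into MANAGED targets. A minimal self-contained sketch of the filtering idea (plain Scala, not Spark's API):

    // Raw properties as they might come back from the metastore (hypothetical values).
    val rawProps = Map("EXTERNAL" -> "TRUE", "comment" -> "demo table", "prop1" -> "value1")
    // Drop the fields Spark models elsewhere: the table comment and the EXTERNAL marker.
    val cleaned = rawProps.filter { case (k, _) => k != "comment" && k != "EXTERNAL" }
    assert(cleaned == Map("prop1" -> "value1"))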
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 58c43ebcae..7f3d96de85 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -25,8 +25,10 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -660,6 +662,228 @@ class HiveDDLSuite
}
}
+ test("CREATE TABLE LIKE a temporary view") {
+ val sourceViewName = "tab1"
+ val targetTabName = "tab2"
+ withTempView(sourceViewName) {
+ withTable(targetTabName) {
+ spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
+ .createTempView(sourceViewName)
+ sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")
+
+ val sourceTable = spark.sessionState.catalog.getTableMetadata(
+ TableIdentifier(sourceViewName, None))
+ val targetTable = spark.sessionState.catalog.getTableMetadata(
+ TableIdentifier(targetTabName, Some("default")))
+
+ checkCreateTableLike(sourceTable, targetTable)
+ }
+ }
+ }
+
+ test("CREATE TABLE LIKE a data source table") {
+ val sourceTabName = "tab1"
+ val targetTabName = "tab2"
+ withTable(sourceTabName, targetTabName) {
+ spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
+ .write.format("json").saveAsTable(sourceTabName)
+ sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
+
+ val sourceTable =
+ spark.sessionState.catalog.getTableMetadata(TableIdentifier(sourceTabName, Some("default")))
+ val targetTable =
+ spark.sessionState.catalog.getTableMetadata(TableIdentifier(targetTabName, Some("default")))
+ // The source table should be a managed data source table
+ assert(DDLUtils.isDatasourceTable(sourceTable))
+ assert(sourceTable.tableType == CatalogTableType.MANAGED)
+
+ checkCreateTableLike(sourceTable, targetTable)
+ }
+ }
+
+ test("CREATE TABLE LIKE an external data source table") {
+ val sourceTabName = "tab1"
+ val targetTabName = "tab2"
+ withTable(sourceTabName, targetTabName) {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
+ .write.format("parquet").save(path)
+ sql(s"CREATE TABLE $sourceTabName USING parquet OPTIONS (PATH '$path')")
+ sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
+
+ val sourceTable = spark.sessionState.catalog.getTableMetadata(
+ TableIdentifier(sourceTabName, Some("default")))
+ val targetTable = spark.sessionState.catalog.getTableMetadata(
+ TableIdentifier(targetTabName, Some("default")))
+ // The source table should be an external data source table
+ assert(DDLUtils.isDatasourceTable(sourceTable))
+ assert(sourceTable.tableType == CatalogTableType.EXTERNAL)
+
+ checkCreateTableLike(sourceTable, targetTable)
+ }
+ }
+ }
+
+ test("CREATE TABLE LIKE a managed Hive serde table") {
+ val catalog = spark.sessionState.catalog
+ val sourceTabName = "tab1"
+ val targetTabName = "tab2"
+ withTable(sourceTabName, targetTabName) {
+ sql(s"CREATE TABLE $sourceTabName TBLPROPERTIES('prop1'='value1') AS SELECT 1 key, 'a'")
+ sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
+
+ val sourceTable = catalog.getTableMetadata(TableIdentifier(sourceTabName, Some("default")))
+ assert(sourceTable.tableType == CatalogTableType.MANAGED)
+ assert(sourceTable.properties.get("prop1").nonEmpty)
+ val targetTable = catalog.getTableMetadata(TableIdentifier(targetTabName, Some("default")))
+
+ checkCreateTableLike(sourceTable, targetTable)
+ }
+ }
+
+ test("CREATE TABLE LIKE an external Hive serde table") {
+ val catalog = spark.sessionState.catalog
+ withTempDir { tmpDir =>
+ val basePath = tmpDir.getCanonicalPath
+ val sourceTabName = "tab1"
+ val targetTabName = "tab2"
+ withTable(sourceTabName, targetTabName) {
+ assert(tmpDir.listFiles.isEmpty)
+ sql(
+ s"""
+ |CREATE EXTERNAL TABLE $sourceTabName (key INT comment 'test', value STRING)
+ |COMMENT 'Apache Spark'
+ |PARTITIONED BY (ds STRING, hr STRING)
+ |LOCATION '$basePath'
+ """.stripMargin)
+ for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE $sourceTabName
+ |partition (ds='$ds',hr='$hr')
+ |SELECT 1, 'a'
+ """.stripMargin)
+ }
+ sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
+
+ val sourceTable = catalog.getTableMetadata(TableIdentifier(sourceTabName, Some("default")))
+ assert(sourceTable.tableType == CatalogTableType.EXTERNAL)
+ assert(sourceTable.comment == Option("Apache Spark"))
+ val targetTable = catalog.getTableMetadata(TableIdentifier(targetTabName, Some("default")))
+
+ checkCreateTableLike(sourceTable, targetTable)
+ }
+ }
+ }
+
+ test("CREATE TABLE LIKE a view") {
+ val sourceTabName = "tab1"
+ val sourceViewName = "view"
+ val targetTabName = "tab2"
+ withTable(sourceTabName, targetTabName) {
+ withView(sourceViewName) {
+ spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
+ .write.format("json").saveAsTable(sourceTabName)
+ sql(s"CREATE VIEW $sourceViewName AS SELECT * FROM $sourceTabName")
+ sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")
+
+ val sourceView = spark.sessionState.catalog.getTableMetadata(
+ TableIdentifier(sourceViewName, Some("default")))
+ // The source should be a VIEW whose view text and original text are non-empty
+ assert(sourceView.tableType == CatalogTableType.VIEW)
+ assert(sourceView.viewText.nonEmpty && sourceView.viewOriginalText.nonEmpty)
+ val targetTable = spark.sessionState.catalog.getTableMetadata(
+ TableIdentifier(targetTabName, Some("default")))
+
+ checkCreateTableLike(sourceView, targetTable)
+ }
+ }
+ }
+
+ private def getTablePath(table: CatalogTable): Option[String] = {
+ if (DDLUtils.isDatasourceTable(table)) {
+ new CaseInsensitiveMap(table.storage.properties).get("path")
+ } else {
+ table.storage.locationUri
+ }
+ }
+
+ private def checkCreateTableLike(sourceTable: CatalogTable, targetTable: CatalogTable): Unit = {
+ // The created table should be a MANAGED table with empty view text and original text.
+ assert(targetTable.tableType == CatalogTableType.MANAGED,
+ "the created table must be a Hive managed table")
+ assert(targetTable.viewText.isEmpty && targetTable.viewOriginalText.isEmpty,
+ "the view text and original text in the created table must be empty")
+ assert(targetTable.comment.isEmpty,
+ "the comment in the created table must be empty")
+ assert(targetTable.unsupportedFeatures.isEmpty,
+ "the unsupportedFeatures in the create table must be empty")
+
+ val metastoreGeneratedProperties = Seq(
+ "CreateTime",
+ "transient_lastDdlTime",
+ "grantTime",
+ "lastUpdateTime",
+ "last_modified_by",
+ "last_modified_time",
+ "Owner:",
+ "COLUMN_STATS_ACCURATE",
+ "numFiles",
+ "numRows",
+ "rawDataSize",
+ "totalSize",
+ "totalNumberFiles",
+ "maxFileSize",
+ "minFileSize"
+ )
+ assert(targetTable.properties.filterKeys(!metastoreGeneratedProperties.contains(_)).isEmpty,
+ "the table properties of source tables should not be copied in the created table")
+
+ if (DDLUtils.isDatasourceTable(sourceTable) ||
+ sourceTable.tableType == CatalogTableType.VIEW) {
+ assert(DDLUtils.isDatasourceTable(targetTable),
+ "the target table should be a data source table")
+ } else {
+ assert(!DDLUtils.isDatasourceTable(targetTable),
+ "the target table should be a Hive serde table")
+ }
+
+ if (sourceTable.tableType == CatalogTableType.VIEW) {
+ // The source table is a temporary/permanent view, which does not have a provider. The
+ // created target table uses the default data source format.
+ assert(targetTable.provider == Option(spark.sessionState.conf.defaultDataSourceName))
+ } else {
+ assert(targetTable.provider == sourceTable.provider)
+ }
+
+ val sourceTablePath = getTablePath(sourceTable)
+ val targetTablePath = getTablePath(targetTable)
+ assert(targetTablePath.nonEmpty, "target table path should not be empty")
+ assert(sourceTablePath != targetTablePath,
+ "source table/view path should be different from target table path")
+
+ // The source table contents should not be seen in the target table.
+ assert(spark.table(sourceTable.identifier).count() != 0, "the source table should be nonempty")
+ assert(spark.table(targetTable.identifier).count() == 0, "the target table should be empty")
+
+ // Their schemas should be identical
+ checkAnswer(
+ sql(s"DESC ${sourceTable.identifier}"),
+ sql(s"DESC ${targetTable.identifier}"))
+
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ // Check whether data from the source table can be inserted into the target table
+ sql(s"INSERT INTO TABLE ${targetTable.identifier} SELECT * FROM ${sourceTable.identifier}")
+ }
+
+ // After insertion, the data should be identical
+ checkAnswer(
+ sql(s"SELECT * FROM ${sourceTable.identifier}"),
+ sql(s"SELECT * FROM ${targetTable.identifier}"))
+ }
+
test("desc table for data source table") {
withTable("tab1") {
val tabName = "tab1"