author     gatorsmile <gatorsmile@gmail.com>        2016-09-01 16:36:14 +0800
committer  Wenchen Fan <wenchen@databricks.com>     2016-09-01 16:36:14 +0800
commit     1f06a5b6a0584d0c9656f58eaf54e54e2383c82b (patch)
tree       9ed5ed2249d690e4752b61e0da3f7e0c852a155b /sql
parent     dd859f95c0aaa0b7c8fbff0a5f108cf3c9bf520a (diff)
[SPARK-17353][SPARK-16943][SPARK-16942][SQL] Fix multiple bugs in CREATE TABLE LIKE command
### What changes were proposed in this pull request?

The existing `CREATE TABLE LIKE` command has multiple issues:

- The generated table is non-empty when the source table is a data source table. The major reason is that a data source table stores the location of its contents in the table property `path`, which we currently keep unchanged, so the new table points at the same location and shows the same data.
- The table type of the generated table is `EXTERNAL` when the source table is an external Hive serde table. We currently set it explicitly to `MANAGED`, but Hive checks the table property `EXTERNAL` to decide whether a table is external (see https://github.com/apache/hive/blob/master/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1407-L1408), so the created table remains `EXTERNAL`.
- When the source table is a `VIEW`, the metadata of the generated table contains the view text and view original text. So far this does not break anything, but it could cause problems in Hive (for example, https://github.com/apache/hive/blob/master/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1405-L1406).
- The table `comment` is handled incorrectly. To follow what Hive does, the table comment should be cleared, but the column comments should be kept.
- `INDEX` tables are not supported, so we should throw an exception in this case.
- `owner` should not be retained. `toHiveTable` sets it [here](https://github.com/apache/spark/blob/e679bc3c1cd418ef0025d2ecbc547c9660cac433/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala#L793) no matter which value we set in `CatalogTable`, so we set it to an empty string to avoid confusing output in EXPLAIN.
- Add support for temporary tables as the source.
- Like Hive, we should not copy the table properties from the source table to the created table, especially the statistics-related properties, which would be wrong for the created table.
- `unsupportedFeatures` should not be copied from the source table; the created table does not have these unsupported features.
- When the source table is a view, the target table should use the default data source format, `spark.sql.sources.default`.

This PR fixes the above issues; a minimal sketch of the resulting behavior appears right after this message.

### How was this patch tested?

Improved the test coverage by adding more test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14531 from gatorsmile/createTableLike.
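For illustration, a minimal sketch of the post-fix semantics, assuming a running `SparkSession` named `spark`; the table names `src` and `t1` are hypothetical:

```scala
// Hypothetical walk-through of the fixed CREATE TABLE LIKE behavior.
spark.range(10).write.format("json").saveAsTable("src") // a data source table

// Before this fix, the target inherited src's `path` table property and
// appeared to contain src's rows; after the fix it is a MANAGED table with
// a fresh default location under the warehouse directory.
spark.sql("CREATE TABLE t1 LIKE src")
assert(spark.table("t1").count() == 0) // empty: no data is copied

// Schema, provider, partition columns and bucketing carry over; table
// properties, comment, owner and view text do not.
```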
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala    |  57
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala  |   4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 226
3 files changed, 273 insertions(+), 14 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index b4a15b8b28..67b2329eff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -29,17 +29,23 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
/**
- * A command to create a table with the same definition of the given existing table.
+ * A command to create a MANAGED table with the same definition of the given existing table.
+ * In the target table definition, the table comment is always empty but the column comments
+ * are identical to the ones defined in the source table.
+ *
+ * The CatalogTable attributes copied from the source table are storage(inputFormat, outputFormat,
+ * serde, compressed, properties), schema, provider, partitionColumnNames, bucketSpec.
*
* The syntax of using this command in SQL is:
* {{{
@@ -58,18 +64,45 @@ case class CreateTableLikeCommand(
throw new AnalysisException(
s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'")
}
- if (catalog.isTemporaryTable(sourceTable)) {
- throw new AnalysisException(
- s"Source table in CREATE TABLE LIKE cannot be temporary: '$sourceTable'")
+
+ val sourceTableDesc = catalog.getTableMetadata(sourceTable)
+
+ // Storage format
+ val newStorage =
+ if (sourceTableDesc.tableType == CatalogTableType.VIEW) {
+ val newPath = catalog.defaultTablePath(targetTable)
+ CatalogStorageFormat.empty.copy(properties = Map("path" -> newPath))
+ } else if (DDLUtils.isDatasourceTable(sourceTableDesc)) {
+ val newPath = catalog.defaultTablePath(targetTable)
+ val newSerdeProp =
+ sourceTableDesc.storage.properties.filterKeys(_.toLowerCase != "path") ++
+ Map("path" -> newPath)
+ sourceTableDesc.storage.copy(
+ locationUri = None,
+ properties = newSerdeProp)
+ } else {
+ sourceTableDesc.storage.copy(
+ locationUri = None,
+ properties = sourceTableDesc.storage.properties)
+ }
+
+ val newProvider = if (sourceTableDesc.tableType == CatalogTableType.VIEW) {
+ Some(sparkSession.sessionState.conf.defaultDataSourceName)
+ } else {
+ sourceTableDesc.provider
}
- val tableToCreate = catalog.getTableMetadata(sourceTable).copy(
- identifier = targetTable,
- tableType = CatalogTableType.MANAGED,
- createTime = System.currentTimeMillis,
- lastAccessTime = -1).withNewStorage(locationUri = None)
+ val newTableDesc =
+ CatalogTable(
+ identifier = targetTable,
+ tableType = CatalogTableType.MANAGED,
+ storage = newStorage,
+ schema = sourceTableDesc.schema,
+ provider = newProvider,
+ partitionColumnNames = sourceTableDesc.partitionColumnNames,
+ bucketSpec = sourceTableDesc.bucketSpec)
- catalog.createTable(tableToCreate, ifNotExists)
+ catalog.createTable(newTableDesc, ifNotExists)
Seq.empty[Row]
}
}
@@ -517,7 +550,7 @@ case class ShowTablesCommand(
/**
- * A command for users to list the properties for a table If propertyKey is specified, the value
+ * A command for users to list the properties for a table. If propertyKey is specified, the value
* for the propertyKey is returned. If propertyKey is not specified, all the keys and their
* corresponding values are returned.
* The syntax of using this command in SQL is:
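The heart of the change above is the three-way storage decision in `CreateTableLikeCommand.run`. As a standalone sketch (the case classes below are simplified stand-ins for Spark's internal types, not the real API), the selection logic reduces to:

```scala
// Simplified model of the storage-format decision in CreateTableLikeCommand.
case class StorageFormat(locationUri: Option[String], properties: Map[String, String])

sealed trait SourceKind
case object View extends SourceKind
case object DataSourceTable extends SourceKind
case object HiveSerdeTable extends SourceKind

def newStorage(kind: SourceKind, source: StorageFormat, defaultPath: String): StorageFormat =
  kind match {
    case View =>
      // A view has no storage of its own: start from scratch with a fresh location.
      StorageFormat(None, Map("path" -> defaultPath))
    case DataSourceTable =>
      // Replace the old `path` property (matched case-insensitively) with a new
      // one, so the target table does not read the source table's data files.
      val kept = source.properties.filterKeys(_.toLowerCase != "path").toMap
      source.copy(locationUri = None, properties = kept + ("path" -> defaultPath))
    case HiveSerdeTable =>
      // Keep the input/output format and serde properties but clear the location;
      // the metastore assigns a managed location to the new table.
      source.copy(locationUri = None)
  }
```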
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index dd982192a3..54ec61abed 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -404,7 +404,9 @@ private[hive] class HiveClientImpl(
properties = Option(h.getTTable.getSd.getSerdeInfo.getParameters)
.map(_.asScala.toMap).orNull
),
- properties = properties.filter(kv => kv._1 != "comment"),
+ // For EXTERNAL_TABLE, the table properties contain a special field "EXTERNAL". It is added
+ // in the function toHiveTable.
+ properties = properties.filter(kv => kv._1 != "comment" && kv._1 != "EXTERNAL"),
comment = properties.get("comment"),
viewOriginalText = Option(h.getViewOriginalText),
viewText = Option(h.getViewExpandedText),
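The `HiveClientImpl` change above completes the round trip: `toHiveTable` always re-adds the `EXTERNAL` flag, so it is filtered out (together with `comment`) when converting a Hive table back into a `CatalogTable`. A minimal sketch of the same filtering on a plain property map (names are illustrative, not Spark's API):

```scala
// Illustrative only: strip keys that Hive manages itself, so they are not
// duplicated into CatalogTable.properties when reading from the metastore.
val hiveManagedKeys = Set("comment", "EXTERNAL")

def stripHiveManaged(props: Map[String, String]): Map[String, String] =
  props.filter { case (key, _) => !hiveManagedKeys.contains(key) }

// stripHiveManaged(Map("EXTERNAL" -> "TRUE", "prop1" -> "value1"))
//   == Map("prop1" -> "value1")
```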
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 58c43ebcae..7f3d96de85 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -25,8 +25,10 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -660,6 +662,228 @@ class HiveDDLSuite
}
}
+ test("CREATE TABLE LIKE a temporary view") {
+ val sourceViewName = "tab1"
+ val targetTabName = "tab2"
+ withTempView(sourceViewName) {
+ withTable(targetTabName) {
+ spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
+ .createTempView(sourceViewName)
+ sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")
+
+ val sourceTable = spark.sessionState.catalog.getTableMetadata(
+ TableIdentifier(sourceViewName, None))
+ val targetTable = spark.sessionState.catalog.getTableMetadata(
+ TableIdentifier(targetTabName, Some("default")))
+
+ checkCreateTableLike(sourceTable, targetTable)
+ }
+ }
+ }
+
+ test("CREATE TABLE LIKE a data source table") {
+ val sourceTabName = "tab1"
+ val targetTabName = "tab2"
+ withTable(sourceTabName, targetTabName) {
+ spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
+ .write.format("json").saveAsTable(sourceTabName)
+ sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
+
+ val sourceTable =
+ spark.sessionState.catalog.getTableMetadata(TableIdentifier(sourceTabName, Some("default")))
+ val targetTable =
+ spark.sessionState.catalog.getTableMetadata(TableIdentifier(targetTabName, Some("default")))
+ // The table type of the source table should be a Hive-managed data source table
+ assert(DDLUtils.isDatasourceTable(sourceTable))
+ assert(sourceTable.tableType == CatalogTableType.MANAGED)
+
+ checkCreateTableLike(sourceTable, targetTable)
+ }
+ }
+
+ test("CREATE TABLE LIKE an external data source table") {
+ val sourceTabName = "tab1"
+ val targetTabName = "tab2"
+ withTable(sourceTabName, targetTabName) {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
+ .write.format("parquet").save(path)
+ sql(s"CREATE TABLE $sourceTabName USING parquet OPTIONS (PATH '$path')")
+ sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
+
+ // The source table should be an external data source table
+ val sourceTable = spark.sessionState.catalog.getTableMetadata(
+ TableIdentifier(sourceTabName, Some("default")))
+ val targetTable = spark.sessionState.catalog.getTableMetadata(
+ TableIdentifier(targetTabName, Some("default")))
+ // The table type of the source table should be an external data source table
+ assert(DDLUtils.isDatasourceTable(sourceTable))
+ assert(sourceTable.tableType == CatalogTableType.EXTERNAL)
+
+ checkCreateTableLike(sourceTable, targetTable)
+ }
+ }
+ }
+
+ test("CREATE TABLE LIKE a managed Hive serde table") {
+ val catalog = spark.sessionState.catalog
+ val sourceTabName = "tab1"
+ val targetTabName = "tab2"
+ withTable(sourceTabName, targetTabName) {
+ sql(s"CREATE TABLE $sourceTabName TBLPROPERTIES('prop1'='value1') AS SELECT 1 key, 'a'")
+ sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
+
+ val sourceTable = catalog.getTableMetadata(TableIdentifier(sourceTabName, Some("default")))
+ assert(sourceTable.tableType == CatalogTableType.MANAGED)
+ assert(sourceTable.properties.get("prop1").nonEmpty)
+ val targetTable = catalog.getTableMetadata(TableIdentifier(targetTabName, Some("default")))
+
+ checkCreateTableLike(sourceTable, targetTable)
+ }
+ }
+
+ test("CREATE TABLE LIKE an external Hive serde table") {
+ val catalog = spark.sessionState.catalog
+ withTempDir { tmpDir =>
+ val basePath = tmpDir.getCanonicalPath
+ val sourceTabName = "tab1"
+ val targetTabName = "tab2"
+ withTable(sourceTabName, targetTabName) {
+ assert(tmpDir.listFiles.isEmpty)
+ sql(
+ s"""
+ |CREATE EXTERNAL TABLE $sourceTabName (key INT comment 'test', value STRING)
+ |COMMENT 'Apache Spark'
+ |PARTITIONED BY (ds STRING, hr STRING)
+ |LOCATION '$basePath'
+ """.stripMargin)
+ for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE $sourceTabName
+ |partition (ds='$ds',hr='$hr')
+ |SELECT 1, 'a'
+ """.stripMargin)
+ }
+ sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
+
+ val sourceTable = catalog.getTableMetadata(TableIdentifier(sourceTabName, Some("default")))
+ assert(sourceTable.tableType == CatalogTableType.EXTERNAL)
+ assert(sourceTable.comment == Option("Apache Spark"))
+ val targetTable = catalog.getTableMetadata(TableIdentifier(targetTabName, Some("default")))
+
+ checkCreateTableLike(sourceTable, targetTable)
+ }
+ }
+ }
+
+ test("CREATE TABLE LIKE a view") {
+ val sourceTabName = "tab1"
+ val sourceViewName = "view"
+ val targetTabName = "tab2"
+ withTable(sourceTabName, targetTabName) {
+ withView(sourceViewName) {
+ spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
+ .write.format("json").saveAsTable(sourceTabName)
+ sql(s"CREATE VIEW $sourceViewName AS SELECT * FROM $sourceTabName")
+ sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")
+
+ val sourceView = spark.sessionState.catalog.getTableMetadata(
+ TableIdentifier(sourceViewName, Some("default")))
+ // The original source should be a VIEW with an empty path
+ assert(sourceView.tableType == CatalogTableType.VIEW)
+ assert(sourceView.viewText.nonEmpty && sourceView.viewOriginalText.nonEmpty)
+ val targetTable = spark.sessionState.catalog.getTableMetadata(
+ TableIdentifier(targetTabName, Some("default")))
+
+ checkCreateTableLike(sourceView, targetTable)
+ }
+ }
+ }
+
+ private def getTablePath(table: CatalogTable): Option[String] = {
+ if (DDLUtils.isDatasourceTable(table)) {
+ new CaseInsensitiveMap(table.storage.properties).get("path")
+ } else {
+ table.storage.locationUri
+ }
+ }
+
+ private def checkCreateTableLike(sourceTable: CatalogTable, targetTable: CatalogTable): Unit = {
+ // The created table should be a MANAGED table with empty view text and original text.
+ assert(targetTable.tableType == CatalogTableType.MANAGED,
+ "the created table must be a Hive managed table")
+ assert(targetTable.viewText.isEmpty && targetTable.viewOriginalText.isEmpty,
+ "the view text and original text in the created table must be empty")
+ assert(targetTable.comment.isEmpty,
+ "the comment in the created table must be empty")
+ assert(targetTable.unsupportedFeatures.isEmpty,
+ "the unsupportedFeatures in the create table must be empty")
+
+ val metastoreGeneratedProperties = Seq(
+ "CreateTime",
+ "transient_lastDdlTime",
+ "grantTime",
+ "lastUpdateTime",
+ "last_modified_by",
+ "last_modified_time",
+ "Owner:",
+ "COLUMN_STATS_ACCURATE",
+ "numFiles",
+ "numRows",
+ "rawDataSize",
+ "totalSize",
+ "totalNumberFiles",
+ "maxFileSize",
+ "minFileSize"
+ )
+ assert(targetTable.properties.filterKeys(!metastoreGeneratedProperties.contains(_)).isEmpty,
+ "the table properties of source tables should not be copied in the created table")
+
+ if (DDLUtils.isDatasourceTable(sourceTable) ||
+ sourceTable.tableType == CatalogTableType.VIEW) {
+ assert(DDLUtils.isDatasourceTable(targetTable),
+ "the target table should be a data source table")
+ } else {
+ assert(!DDLUtils.isDatasourceTable(targetTable),
+ "the target table should be a Hive serde table")
+ }
+
+ if (sourceTable.tableType == CatalogTableType.VIEW) {
+ // Source table is a temporary/permanent view, which does not have a provider. The created
+ // target table uses the default data source format
+ assert(targetTable.provider == Option(spark.sessionState.conf.defaultDataSourceName))
+ } else {
+ assert(targetTable.provider == sourceTable.provider)
+ }
+
+ val sourceTablePath = getTablePath(sourceTable)
+ val targetTablePath = getTablePath(targetTable)
+ assert(targetTablePath.nonEmpty, "target table path should not be empty")
+ assert(sourceTablePath != targetTablePath,
+ "source table/view path should be different from target table path")
+
+ // The source table contents should not be seen in the target table.
+ assert(spark.table(sourceTable.identifier).count() != 0, "the source table should be nonempty")
+ assert(spark.table(targetTable.identifier).count() == 0, "the target table should be empty")
+
+ // Their schema should be identical
+ checkAnswer(
+ sql(s"DESC ${sourceTable.identifier}"),
+ sql(s"DESC ${targetTable.identifier}"))
+
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ // Check whether the new table can be inserted using the data from the original table
+ sql(s"INSERT INTO TABLE ${targetTable.identifier} SELECT * FROM ${sourceTable.identifier}")
+ }
+
+ // After insertion, the data should be identical
+ checkAnswer(
+ sql(s"SELECT * FROM ${sourceTable.identifier}"),
+ sql(s"SELECT * FROM ${targetTable.identifier}"))
+ }
+
test("desc table for data source table") {
withTable("tab1") {
val tabName = "tab1"