From ce13c2672318242748f7520ed4ce6bcfad4fb428 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 17 Nov 2016 17:31:12 -0800 Subject: [SPARK-18360][SQL] default table path of tables in default database should depend on the location of default database ## What changes were proposed in this pull request? The current semantic of the warehouse config: 1. it's a static config, which means you can't change it once your spark application is launched. 2. Once a database is created, its location won't change even the warehouse path config is changed. 3. default database is a special case, although its location is fixed, but the locations of tables created in it are not. If a Spark app starts with warehouse path B(while the location of default database is A), then users create a table `tbl` in default database, its location will be `B/tbl` instead of `A/tbl`. If uses change the warehouse path config to C, and create another table `tbl2`, its location will still be `B/tbl2` instead of `C/tbl2`. rule 3 doesn't make sense and I think we made it by mistake, not intentionally. Data source tables don't follow rule 3 and treat default database like normal ones. This PR fixes hive serde tables to make it consistent with data source tables. ## How was this patch tested? HiveSparkSubmitSuite Author: Wenchen Fan Closes #15812 from cloud-fan/default-db. --- .../spark/sql/hive/HiveExternalCatalog.scala | 237 +++++++++++---------- .../spark/sql/hive/HiveSparkSubmitSuite.scala | 76 +++++-- 2 files changed, 190 insertions(+), 123 deletions(-) (limited to 'sql/hive/src') diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 843305883a..cacffcf33c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -197,136 +197,151 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat if (tableDefinition.tableType == VIEW) { client.createTable(tableDefinition, ignoreIfExists) - } else if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) { - // Here we follow data source tables and put table metadata like provider, schema, etc. in - // table properties, so that we can work around the Hive metastore issue about not case - // preserving and make Hive serde table support mixed-case column names. - val tableWithDataSourceProps = tableDefinition.copy( - properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition)) - client.createTable(tableWithDataSourceProps, ignoreIfExists) } else { - // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type - // support, no column nullability, etc., we should do some extra works before saving table - // metadata into Hive metastore: - // 1. Put table metadata like provider, schema, etc. in table properties. - // 2. Check if this table is hive compatible. - // 2.1 If it's not hive compatible, set location URI, schema, partition columns and bucket - // spec to empty and save table metadata to Hive. - // 2.2 If it's hive compatible, set serde information in table metadata and try to save - // it to Hive. If it fails, treat it as not hive compatible and go back to 2.1 - val tableProperties = tableMetaToTableProps(tableDefinition) - // Ideally we should not create a managed table with location, but Hive serde table can // specify location for managed table. And in [[CreateDataSourceTableAsSelectCommand]] we have // to create the table directory and write out data before we create this table, to avoid // exposing a partial written table. val needDefaultTableLocation = tableDefinition.tableType == MANAGED && tableDefinition.storage.locationUri.isEmpty + val tableLocation = if (needDefaultTableLocation) { Some(defaultTablePath(tableDefinition.identifier)) } else { tableDefinition.storage.locationUri } - // Ideally we should also put `locationUri` in table properties like provider, schema, etc. - // However, in older version of Spark we already store table location in storage properties - // with key "path". Here we keep this behaviour for backward compatibility. - val storagePropsWithLocation = tableDefinition.storage.properties ++ - tableLocation.map("path" -> _) - - // converts the table metadata to Spark SQL specific format, i.e. set data schema, names and - // bucket specification to empty. Note that partition columns are retained, so that we can - // call partition-related Hive API later. - def newSparkSQLSpecificMetastoreTable(): CatalogTable = { - tableDefinition.copy( - // Hive only allows directory paths as location URIs while Spark SQL data source tables - // also allow file paths. For non-hive-compatible format, we should not set location URI - // to avoid hive metastore to throw exception. - storage = tableDefinition.storage.copy( - locationUri = None, - properties = storagePropsWithLocation), - schema = tableDefinition.partitionSchema, - bucketSpec = None, - properties = tableDefinition.properties ++ tableProperties) + + if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) { + val tableWithDataSourceProps = tableDefinition.copy( + // We can't leave `locationUri` empty and count on Hive metastore to set a default table + // location, because Hive metastore uses hive.metastore.warehouse.dir to generate default + // table location for tables in default database, while we expect to use the location of + // default database. + storage = tableDefinition.storage.copy(locationUri = tableLocation), + // Here we follow data source tables and put table metadata like provider, schema, etc. in + // table properties, so that we can work around the Hive metastore issue about not case + // preserving and make Hive serde table support mixed-case column names. + properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition)) + client.createTable(tableWithDataSourceProps, ignoreIfExists) + } else { + createDataSourceTable( + tableDefinition.withNewStorage(locationUri = tableLocation), + ignoreIfExists) } + } + } - // converts the table metadata to Hive compatible format, i.e. set the serde information. - def newHiveCompatibleMetastoreTable(serde: HiveSerDe): CatalogTable = { - val location = if (tableDefinition.tableType == EXTERNAL) { - // When we hit this branch, we are saving an external data source table with hive - // compatible format, which means the data source is file-based and must have a `path`. - require(tableDefinition.storage.locationUri.isDefined, - "External file-based data source table must have a `path` entry in storage properties.") - Some(new Path(tableDefinition.location).toUri.toString) - } else { - None - } + private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = { + // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type + // support, no column nullability, etc., we should do some extra works before saving table + // metadata into Hive metastore: + // 1. Put table metadata like provider, schema, etc. in table properties. + // 2. Check if this table is hive compatible. + // 2.1 If it's not hive compatible, set location URI, schema, partition columns and bucket + // spec to empty and save table metadata to Hive. + // 2.2 If it's hive compatible, set serde information in table metadata and try to save + // it to Hive. If it fails, treat it as not hive compatible and go back to 2.1 + val tableProperties = tableMetaToTableProps(table) + + // Ideally we should also put `locationUri` in table properties like provider, schema, etc. + // However, in older version of Spark we already store table location in storage properties + // with key "path". Here we keep this behaviour for backward compatibility. + val storagePropsWithLocation = table.storage.properties ++ + table.storage.locationUri.map("path" -> _) + + // converts the table metadata to Spark SQL specific format, i.e. set data schema, names and + // bucket specification to empty. Note that partition columns are retained, so that we can + // call partition-related Hive API later. + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { + table.copy( + // Hive only allows directory paths as location URIs while Spark SQL data source tables + // also allow file paths. For non-hive-compatible format, we should not set location URI + // to avoid hive metastore to throw exception. + storage = table.storage.copy( + locationUri = None, + properties = storagePropsWithLocation), + schema = table.partitionSchema, + bucketSpec = None, + properties = table.properties ++ tableProperties) + } - tableDefinition.copy( - storage = tableDefinition.storage.copy( - locationUri = location, - inputFormat = serde.inputFormat, - outputFormat = serde.outputFormat, - serde = serde.serde, - properties = storagePropsWithLocation - ), - properties = tableDefinition.properties ++ tableProperties) + // converts the table metadata to Hive compatible format, i.e. set the serde information. + def newHiveCompatibleMetastoreTable(serde: HiveSerDe): CatalogTable = { + val location = if (table.tableType == EXTERNAL) { + // When we hit this branch, we are saving an external data source table with hive + // compatible format, which means the data source is file-based and must have a `path`. + require(table.storage.locationUri.isDefined, + "External file-based data source table must have a `path` entry in storage properties.") + Some(new Path(table.location).toUri.toString) + } else { + None } - val qualifiedTableName = tableDefinition.identifier.quotedString - val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get) - val skipHiveMetadata = tableDefinition.storage.properties - .getOrElse("skipHiveMetadata", "false").toBoolean - - val (hiveCompatibleTable, logMessage) = maybeSerde match { - case _ if skipHiveMetadata => - val message = - s"Persisting data source table $qualifiedTableName into Hive metastore in" + - "Spark SQL specific format, which is NOT compatible with Hive." - (None, message) - - // our bucketing is un-compatible with hive(different hash function) - case _ if tableDefinition.bucketSpec.nonEmpty => - val message = - s"Persisting bucketed data source table $qualifiedTableName into " + - "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " - (None, message) - - case Some(serde) => - val message = - s"Persisting file based data source table $qualifiedTableName into " + - s"Hive metastore in Hive compatible format." - (Some(newHiveCompatibleMetastoreTable(serde)), message) - - case _ => - val provider = tableDefinition.provider.get - val message = - s"Couldn't find corresponding Hive SerDe for data source provider $provider. " + - s"Persisting data source table $qualifiedTableName into Hive metastore in " + - s"Spark SQL specific format, which is NOT compatible with Hive." - (None, message) - } + table.copy( + storage = table.storage.copy( + locationUri = location, + inputFormat = serde.inputFormat, + outputFormat = serde.outputFormat, + serde = serde.serde, + properties = storagePropsWithLocation + ), + properties = table.properties ++ tableProperties) + } - (hiveCompatibleTable, logMessage) match { - case (Some(table), message) => - // We first try to save the metadata of the table in a Hive compatible way. - // If Hive throws an error, we fall back to save its metadata in the Spark SQL - // specific way. - try { - logInfo(message) - saveTableIntoHive(table, ignoreIfExists) - } catch { - case NonFatal(e) => - val warningMessage = - s"Could not persist ${tableDefinition.identifier.quotedString} in a Hive " + - "compatible way. Persisting it into Hive metastore in Spark SQL specific format." - logWarning(warningMessage, e) - saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists) - } + val qualifiedTableName = table.identifier.quotedString + val maybeSerde = HiveSerDe.sourceToSerDe(table.provider.get) + val skipHiveMetadata = table.storage.properties + .getOrElse("skipHiveMetadata", "false").toBoolean + + val (hiveCompatibleTable, logMessage) = maybeSerde match { + case _ if skipHiveMetadata => + val message = + s"Persisting data source table $qualifiedTableName into Hive metastore in" + + "Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + + // our bucketing is un-compatible with hive(different hash function) + case _ if table.bucketSpec.nonEmpty => + val message = + s"Persisting bucketed data source table $qualifiedTableName into " + + "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " + (None, message) + + case Some(serde) => + val message = + s"Persisting file based data source table $qualifiedTableName into " + + s"Hive metastore in Hive compatible format." + (Some(newHiveCompatibleMetastoreTable(serde)), message) + + case _ => + val provider = table.provider.get + val message = + s"Couldn't find corresponding Hive SerDe for data source provider $provider. " + + s"Persisting data source table $qualifiedTableName into Hive metastore in " + + s"Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + } - case (None, message) => - logWarning(message) - saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists) - } + (hiveCompatibleTable, logMessage) match { + case (Some(table), message) => + // We first try to save the metadata of the table in a Hive compatible way. + // If Hive throws an error, we fall back to save its metadata in the Spark SQL + // specific way. + try { + logInfo(message) + saveTableIntoHive(table, ignoreIfExists) + } catch { + case NonFatal(e) => + val warningMessage = + s"Could not persist ${table.identifier.quotedString} in a Hive " + + "compatible way. Persisting it into Hive metastore in Spark SQL specific format." + logWarning(warningMessage, e) + saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists) + } + + case (None, message) => + logWarning(message) + saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index fbd705172c..a670560c59 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -24,6 +24,7 @@ import java.util.Date import scala.collection.mutable.ArrayBuffer import scala.tools.nsc.Properties +import org.apache.hadoop.fs.Path import org.scalatest.{BeforeAndAfterEach, Matchers} import org.scalatest.concurrent.Timeouts import org.scalatest.exceptions.TestFailedDueToTimeoutException @@ -33,11 +34,12 @@ import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.sql.{QueryTest, Row, SparkSession} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, JarResource} +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext} import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer -import org.apache.spark.sql.types.DecimalType +import org.apache.spark.sql.types.{DecimalType, StructType} import org.apache.spark.util.{ResetSystemProperties, Utils} /** @@ -295,6 +297,20 @@ class HiveSparkSubmitSuite runSparkSubmit(args) } + test("SPARK-18360: default table path of tables in default database should depend on the " + + "location of default database") { + val unusedJar = TestUtils.createJarWithClasses(Seq.empty) + val args = Seq( + "--class", SPARK_18360.getClass.getName.stripSuffix("$"), + "--name", "SPARK-18360", + "--master", "local-cluster[2,1,1024]", + "--conf", "spark.ui.enabled=false", + "--conf", "spark.master.rest.enabled=false", + "--driver-java-options", "-Dderby.system.durability=test", + unusedJar.toString) + runSparkSubmit(args) + } + // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. // This is copied from org.apache.spark.deploy.SparkSubmitSuite private def runSparkSubmit(args: Seq[String]): Unit = { @@ -397,11 +413,7 @@ object SetWarehouseLocationTest extends Logging { def main(args: Array[String]): Unit = { Utils.configTestLog4j("INFO") - val sparkConf = new SparkConf(loadDefaults = true) - val builder = SparkSession.builder() - .config(sparkConf) - .config("spark.ui.enabled", "false") - .enableHiveSupport() + val sparkConf = new SparkConf(loadDefaults = true).set("spark.ui.enabled", "false") val providedExpectedWarehouseLocation = sparkConf.getOption("spark.sql.test.expectedWarehouseDir") @@ -410,7 +422,7 @@ object SetWarehouseLocationTest extends Logging { // If spark.sql.test.expectedWarehouseDir is set, the warehouse dir is set // through spark-summit. So, neither spark.sql.warehouse.dir nor // hive.metastore.warehouse.dir is set at here. - (builder.getOrCreate(), warehouseDir) + (new TestHiveContext(new SparkContext(sparkConf)).sparkSession, warehouseDir) case None => val warehouseLocation = Utils.createTempDir() warehouseLocation.delete() @@ -420,10 +432,10 @@ object SetWarehouseLocationTest extends Logging { // spark.sql.warehouse.dir and hive.metastore.warehouse.dir. // We are expecting that the value of spark.sql.warehouse.dir will override the // value of hive.metastore.warehouse.dir. - val session = builder - .config("spark.sql.warehouse.dir", warehouseLocation.toString) - .config("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString) - .getOrCreate() + val session = new TestHiveContext(new SparkContext(sparkConf + .set("spark.sql.warehouse.dir", warehouseLocation.toString) + .set("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString))) + .sparkSession (session, warehouseLocation.toString) } @@ -801,3 +813,43 @@ object SPARK_14244 extends QueryTest { } } } + +object SPARK_18360 { + def main(args: Array[String]): Unit = { + val spark = SparkSession.builder() + .config("spark.ui.enabled", "false") + .enableHiveSupport().getOrCreate() + + val defaultDbLocation = spark.catalog.getDatabase("default").locationUri + assert(new Path(defaultDbLocation) == new Path(spark.sharedState.warehousePath)) + + val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client + + try { + val tableMeta = CatalogTable( + identifier = TableIdentifier("test_tbl", Some("default")), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = new StructType().add("i", "int"), + provider = Some(DDLUtils.HIVE_PROVIDER)) + + val newWarehousePath = Utils.createTempDir().getAbsolutePath + hiveClient.runSqlHive(s"SET hive.metastore.warehouse.dir=$newWarehousePath") + hiveClient.createTable(tableMeta, ignoreIfExists = false) + val rawTable = hiveClient.getTable("default", "test_tbl") + // Hive will use the value of `hive.metastore.warehouse.dir` to generate default table + // location for tables in default database. + assert(rawTable.storage.locationUri.get.contains(newWarehousePath)) + hiveClient.dropTable("default", "test_tbl", ignoreIfNotExists = false, purge = false) + + spark.sharedState.externalCatalog.createTable(tableMeta, ignoreIfExists = false) + val readBack = spark.sharedState.externalCatalog.getTable("default", "test_tbl") + // Spark SQL will use the location of default database to generate default table + // location for tables in default database. + assert(readBack.storage.locationUri.get.contains(defaultDbLocation)) + } finally { + hiveClient.dropTable("default", "test_tbl", ignoreIfNotExists = true, purge = false) + hiveClient.runSqlHive(s"SET hive.metastore.warehouse.dir=$defaultDbLocation") + } + } +} -- cgit v1.2.3