author     Wenchen Fan <wenchen@databricks.com>    2016-11-17 17:31:12 -0800
committer  Yin Huai <yhuai@databricks.com>         2016-11-17 17:31:12 -0800
commit     ce13c2672318242748f7520ed4ce6bcfad4fb428 (patch)
tree       76d5ca2b6d5bd8b69dcf4bf97bb77d9c22d67b3c /sql/hive
parent     b0aa1aa1af6c513a6a881eaea96abdd2b480ef98 (diff)
[SPARK-18360][SQL] default table path of tables in default database should depend on the location of default database
## What changes were proposed in this pull request?

The current semantics of the warehouse config are:

1. It's a static config, which means you can't change it once your Spark application is launched.
2. Once a database is created, its location won't change even if the warehouse path config is changed.
3. The default database is a special case: although its location is fixed, the locations of tables created in it are not. If a Spark app starts with warehouse path B (while the location of the default database is A) and users create a table `tbl` in the default database, its location will be `B/tbl` instead of `A/tbl`. If users then change the warehouse path config to C and create another table `tbl2`, its location will still be `B/tbl2` instead of `C/tbl2`.

Rule 3 doesn't make sense, and I think we made it that way by mistake, not intentionally. Data source tables don't follow rule 3 and treat the default database like any other database. This PR fixes Hive serde tables to make them consistent with data source tables.

## How was this patch tested?

HiveSparkSubmitSuite

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15812 from cloud-fan/default-db.
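To make the intended semantics concrete, here is a minimal sketch (not part of the patch) of how a managed table's default path should be derived from the location of its database rather than from the current warehouse config. The helper below is a simplified stand-in for Spark's `defaultTablePath`, which actually takes a `TableIdentifier`; the paths are illustrative.

```scala
import org.apache.hadoop.fs.Path

// Simplified stand-in: a managed table in the default database should live
// under the database's own location, not under the current warehouse config.
def defaultTablePath(dbLocationUri: String, tableName: String): String =
  new Path(new Path(dbLocationUri), tableName).toString

// With the default database located at A = "file:/warehouse-A":
defaultTablePath("file:/warehouse-A", "tbl")   // "file:/warehouse-A/tbl"
// Changing the warehouse config to C later does not move the default database,
// so new tables still resolve under A:
defaultTablePath("file:/warehouse-A", "tbl2")  // "file:/warehouse-A/tbl2"
```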
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala  | 237
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala |  76
2 files changed, 190 insertions, 123 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 843305883a..cacffcf33c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -197,136 +197,151 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
if (tableDefinition.tableType == VIEW) {
client.createTable(tableDefinition, ignoreIfExists)
- } else if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) {
- // Here we follow data source tables and put table metadata like provider, schema, etc. in
- // table properties, so that we can work around the Hive metastore issue about not case
- // preserving and make Hive serde table support mixed-case column names.
- val tableWithDataSourceProps = tableDefinition.copy(
- properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition))
- client.createTable(tableWithDataSourceProps, ignoreIfExists)
} else {
- // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type
- // support, no column nullability, etc., we should do some extra works before saving table
- // metadata into Hive metastore:
- // 1. Put table metadata like provider, schema, etc. in table properties.
- // 2. Check if this table is hive compatible.
- // 2.1 If it's not hive compatible, set location URI, schema, partition columns and bucket
- // spec to empty and save table metadata to Hive.
- // 2.2 If it's hive compatible, set serde information in table metadata and try to save
- // it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
- val tableProperties = tableMetaToTableProps(tableDefinition)
-
// Ideally we should not create a managed table with location, but Hive serde table can
// specify location for managed table. And in [[CreateDataSourceTableAsSelectCommand]] we have
// to create the table directory and write out data before we create this table, to avoid
// exposing a partial written table.
val needDefaultTableLocation = tableDefinition.tableType == MANAGED &&
tableDefinition.storage.locationUri.isEmpty
+
val tableLocation = if (needDefaultTableLocation) {
Some(defaultTablePath(tableDefinition.identifier))
} else {
tableDefinition.storage.locationUri
}
- // Ideally we should also put `locationUri` in table properties like provider, schema, etc.
- // However, in older version of Spark we already store table location in storage properties
- // with key "path". Here we keep this behaviour for backward compatibility.
- val storagePropsWithLocation = tableDefinition.storage.properties ++
- tableLocation.map("path" -> _)
-
- // converts the table metadata to Spark SQL specific format, i.e. set data schema, names and
- // bucket specification to empty. Note that partition columns are retained, so that we can
- // call partition-related Hive API later.
- def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
- tableDefinition.copy(
- // Hive only allows directory paths as location URIs while Spark SQL data source tables
- // also allow file paths. For non-hive-compatible format, we should not set location URI
- // to avoid hive metastore to throw exception.
- storage = tableDefinition.storage.copy(
- locationUri = None,
- properties = storagePropsWithLocation),
- schema = tableDefinition.partitionSchema,
- bucketSpec = None,
- properties = tableDefinition.properties ++ tableProperties)
+
+ if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) {
+ val tableWithDataSourceProps = tableDefinition.copy(
+ // We can't leave `locationUri` empty and count on Hive metastore to set a default table
+ // location, because Hive metastore uses hive.metastore.warehouse.dir to generate default
+ // table location for tables in default database, while we expect to use the location of
+ // default database.
+ storage = tableDefinition.storage.copy(locationUri = tableLocation),
+ // Here we follow data source tables and put table metadata like provider, schema, etc. in
+ // table properties, so that we can work around the Hive metastore issue about not case
+ // preserving and make Hive serde table support mixed-case column names.
+ properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition))
+ client.createTable(tableWithDataSourceProps, ignoreIfExists)
+ } else {
+ createDataSourceTable(
+ tableDefinition.withNewStorage(locationUri = tableLocation),
+ ignoreIfExists)
}
+ }
+ }
- // converts the table metadata to Hive compatible format, i.e. set the serde information.
- def newHiveCompatibleMetastoreTable(serde: HiveSerDe): CatalogTable = {
- val location = if (tableDefinition.tableType == EXTERNAL) {
- // When we hit this branch, we are saving an external data source table with hive
- // compatible format, which means the data source is file-based and must have a `path`.
- require(tableDefinition.storage.locationUri.isDefined,
- "External file-based data source table must have a `path` entry in storage properties.")
- Some(new Path(tableDefinition.location).toUri.toString)
- } else {
- None
- }
+ private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
+ // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type
+ // support, no column nullability, etc., we should do some extra works before saving table
+ // metadata into Hive metastore:
+ // 1. Put table metadata like provider, schema, etc. in table properties.
+ // 2. Check if this table is hive compatible.
+ // 2.1 If it's not hive compatible, set location URI, schema, partition columns and bucket
+ // spec to empty and save table metadata to Hive.
+ // 2.2 If it's hive compatible, set serde information in table metadata and try to save
+ // it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
+ val tableProperties = tableMetaToTableProps(table)
+
+ // Ideally we should also put `locationUri` in table properties like provider, schema, etc.
+ // However, in older version of Spark we already store table location in storage properties
+ // with key "path". Here we keep this behaviour for backward compatibility.
+ val storagePropsWithLocation = table.storage.properties ++
+ table.storage.locationUri.map("path" -> _)
+
+ // converts the table metadata to Spark SQL specific format, i.e. set data schema, names and
+ // bucket specification to empty. Note that partition columns are retained, so that we can
+ // call partition-related Hive API later.
+ def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
+ table.copy(
+ // Hive only allows directory paths as location URIs while Spark SQL data source tables
+ // also allow file paths. For non-hive-compatible format, we should not set location URI
+ // to avoid hive metastore to throw exception.
+ storage = table.storage.copy(
+ locationUri = None,
+ properties = storagePropsWithLocation),
+ schema = table.partitionSchema,
+ bucketSpec = None,
+ properties = table.properties ++ tableProperties)
+ }
- tableDefinition.copy(
- storage = tableDefinition.storage.copy(
- locationUri = location,
- inputFormat = serde.inputFormat,
- outputFormat = serde.outputFormat,
- serde = serde.serde,
- properties = storagePropsWithLocation
- ),
- properties = tableDefinition.properties ++ tableProperties)
+ // converts the table metadata to Hive compatible format, i.e. set the serde information.
+ def newHiveCompatibleMetastoreTable(serde: HiveSerDe): CatalogTable = {
+ val location = if (table.tableType == EXTERNAL) {
+ // When we hit this branch, we are saving an external data source table with hive
+ // compatible format, which means the data source is file-based and must have a `path`.
+ require(table.storage.locationUri.isDefined,
+ "External file-based data source table must have a `path` entry in storage properties.")
+ Some(new Path(table.location).toUri.toString)
+ } else {
+ None
}
- val qualifiedTableName = tableDefinition.identifier.quotedString
- val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get)
- val skipHiveMetadata = tableDefinition.storage.properties
- .getOrElse("skipHiveMetadata", "false").toBoolean
-
- val (hiveCompatibleTable, logMessage) = maybeSerde match {
- case _ if skipHiveMetadata =>
- val message =
- s"Persisting data source table $qualifiedTableName into Hive metastore in" +
- "Spark SQL specific format, which is NOT compatible with Hive."
- (None, message)
-
- // our bucketing is un-compatible with hive(different hash function)
- case _ if tableDefinition.bucketSpec.nonEmpty =>
- val message =
- s"Persisting bucketed data source table $qualifiedTableName into " +
- "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. "
- (None, message)
-
- case Some(serde) =>
- val message =
- s"Persisting file based data source table $qualifiedTableName into " +
- s"Hive metastore in Hive compatible format."
- (Some(newHiveCompatibleMetastoreTable(serde)), message)
-
- case _ =>
- val provider = tableDefinition.provider.get
- val message =
- s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
- s"Persisting data source table $qualifiedTableName into Hive metastore in " +
- s"Spark SQL specific format, which is NOT compatible with Hive."
- (None, message)
- }
+ table.copy(
+ storage = table.storage.copy(
+ locationUri = location,
+ inputFormat = serde.inputFormat,
+ outputFormat = serde.outputFormat,
+ serde = serde.serde,
+ properties = storagePropsWithLocation
+ ),
+ properties = table.properties ++ tableProperties)
+ }
- (hiveCompatibleTable, logMessage) match {
- case (Some(table), message) =>
- // We first try to save the metadata of the table in a Hive compatible way.
- // If Hive throws an error, we fall back to save its metadata in the Spark SQL
- // specific way.
- try {
- logInfo(message)
- saveTableIntoHive(table, ignoreIfExists)
- } catch {
- case NonFatal(e) =>
- val warningMessage =
- s"Could not persist ${tableDefinition.identifier.quotedString} in a Hive " +
- "compatible way. Persisting it into Hive metastore in Spark SQL specific format."
- logWarning(warningMessage, e)
- saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
- }
+ val qualifiedTableName = table.identifier.quotedString
+ val maybeSerde = HiveSerDe.sourceToSerDe(table.provider.get)
+ val skipHiveMetadata = table.storage.properties
+ .getOrElse("skipHiveMetadata", "false").toBoolean
+
+ val (hiveCompatibleTable, logMessage) = maybeSerde match {
+ case _ if skipHiveMetadata =>
+ val message =
+ s"Persisting data source table $qualifiedTableName into Hive metastore in" +
+ "Spark SQL specific format, which is NOT compatible with Hive."
+ (None, message)
+
+ // our bucketing is un-compatible with hive(different hash function)
+ case _ if table.bucketSpec.nonEmpty =>
+ val message =
+ s"Persisting bucketed data source table $qualifiedTableName into " +
+ "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. "
+ (None, message)
+
+ case Some(serde) =>
+ val message =
+ s"Persisting file based data source table $qualifiedTableName into " +
+ s"Hive metastore in Hive compatible format."
+ (Some(newHiveCompatibleMetastoreTable(serde)), message)
+
+ case _ =>
+ val provider = table.provider.get
+ val message =
+ s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
+ s"Persisting data source table $qualifiedTableName into Hive metastore in " +
+ s"Spark SQL specific format, which is NOT compatible with Hive."
+ (None, message)
+ }
- case (None, message) =>
- logWarning(message)
- saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
- }
+ (hiveCompatibleTable, logMessage) match {
+ case (Some(table), message) =>
+ // We first try to save the metadata of the table in a Hive compatible way.
+ // If Hive throws an error, we fall back to save its metadata in the Spark SQL
+ // specific way.
+ try {
+ logInfo(message)
+ saveTableIntoHive(table, ignoreIfExists)
+ } catch {
+ case NonFatal(e) =>
+ val warningMessage =
+ s"Could not persist ${table.identifier.quotedString} in a Hive " +
+ "compatible way. Persisting it into Hive metastore in Spark SQL specific format."
+ logWarning(warningMessage, e)
+ saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
+ }
+
+ case (None, message) =>
+ logWarning(message)
+ saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index fbd705172c..a670560c59 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -24,6 +24,7 @@ import java.util.Date
import scala.collection.mutable.ArrayBuffer
import scala.tools.nsc.Properties
+import org.apache.hadoop.fs.Path
import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.scalatest.concurrent.Timeouts
import org.scalatest.exceptions.TestFailedDueToTimeoutException
@@ -33,11 +34,12 @@ import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{QueryTest, Row, SparkSession}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, JarResource}
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
-import org.apache.spark.sql.types.DecimalType
+import org.apache.spark.sql.types.{DecimalType, StructType}
import org.apache.spark.util.{ResetSystemProperties, Utils}
/**
@@ -295,6 +297,20 @@ class HiveSparkSubmitSuite
runSparkSubmit(args)
}
+ test("SPARK-18360: default table path of tables in default database should depend on the " +
+ "location of default database") {
+ val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+ val args = Seq(
+ "--class", SPARK_18360.getClass.getName.stripSuffix("$"),
+ "--name", "SPARK-18360",
+ "--master", "local-cluster[2,1,1024]",
+ "--conf", "spark.ui.enabled=false",
+ "--conf", "spark.master.rest.enabled=false",
+ "--driver-java-options", "-Dderby.system.durability=test",
+ unusedJar.toString)
+ runSparkSubmit(args)
+ }
+
// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
// This is copied from org.apache.spark.deploy.SparkSubmitSuite
private def runSparkSubmit(args: Seq[String]): Unit = {
@@ -397,11 +413,7 @@ object SetWarehouseLocationTest extends Logging {
def main(args: Array[String]): Unit = {
Utils.configTestLog4j("INFO")
- val sparkConf = new SparkConf(loadDefaults = true)
- val builder = SparkSession.builder()
- .config(sparkConf)
- .config("spark.ui.enabled", "false")
- .enableHiveSupport()
+ val sparkConf = new SparkConf(loadDefaults = true).set("spark.ui.enabled", "false")
val providedExpectedWarehouseLocation =
sparkConf.getOption("spark.sql.test.expectedWarehouseDir")
@@ -410,7 +422,7 @@ object SetWarehouseLocationTest extends Logging {
// If spark.sql.test.expectedWarehouseDir is set, the warehouse dir is set
// through spark-summit. So, neither spark.sql.warehouse.dir nor
// hive.metastore.warehouse.dir is set at here.
- (builder.getOrCreate(), warehouseDir)
+ (new TestHiveContext(new SparkContext(sparkConf)).sparkSession, warehouseDir)
case None =>
val warehouseLocation = Utils.createTempDir()
warehouseLocation.delete()
@@ -420,10 +432,10 @@ object SetWarehouseLocationTest extends Logging {
// spark.sql.warehouse.dir and hive.metastore.warehouse.dir.
// We are expecting that the value of spark.sql.warehouse.dir will override the
// value of hive.metastore.warehouse.dir.
- val session = builder
- .config("spark.sql.warehouse.dir", warehouseLocation.toString)
- .config("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString)
- .getOrCreate()
+ val session = new TestHiveContext(new SparkContext(sparkConf
+ .set("spark.sql.warehouse.dir", warehouseLocation.toString)
+ .set("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString)))
+ .sparkSession
(session, warehouseLocation.toString)
}
@@ -801,3 +813,43 @@ object SPARK_14244 extends QueryTest {
}
}
}
+
+object SPARK_18360 {
+ def main(args: Array[String]): Unit = {
+ val spark = SparkSession.builder()
+ .config("spark.ui.enabled", "false")
+ .enableHiveSupport().getOrCreate()
+
+ val defaultDbLocation = spark.catalog.getDatabase("default").locationUri
+ assert(new Path(defaultDbLocation) == new Path(spark.sharedState.warehousePath))
+
+ val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+
+ try {
+ val tableMeta = CatalogTable(
+ identifier = TableIdentifier("test_tbl", Some("default")),
+ tableType = CatalogTableType.MANAGED,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("i", "int"),
+ provider = Some(DDLUtils.HIVE_PROVIDER))
+
+ val newWarehousePath = Utils.createTempDir().getAbsolutePath
+ hiveClient.runSqlHive(s"SET hive.metastore.warehouse.dir=$newWarehousePath")
+ hiveClient.createTable(tableMeta, ignoreIfExists = false)
+ val rawTable = hiveClient.getTable("default", "test_tbl")
+ // Hive will use the value of `hive.metastore.warehouse.dir` to generate default table
+ // location for tables in default database.
+ assert(rawTable.storage.locationUri.get.contains(newWarehousePath))
+ hiveClient.dropTable("default", "test_tbl", ignoreIfNotExists = false, purge = false)
+
+ spark.sharedState.externalCatalog.createTable(tableMeta, ignoreIfExists = false)
+ val readBack = spark.sharedState.externalCatalog.getTable("default", "test_tbl")
+ // Spark SQL will use the location of default database to generate default table
+ // location for tables in default database.
+ assert(readBack.storage.locationUri.get.contains(defaultDbLocation))
+ } finally {
+ hiveClient.dropTable("default", "test_tbl", ignoreIfNotExists = true, purge = false)
+ hiveClient.runSqlHive(s"SET hive.metastore.warehouse.dir=$defaultDbLocation")
+ }
+ }
+}
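For reference, a rough interactive check of the behavior that SPARK_18360 verifies, assuming a Hive-enabled spark-shell session where `spark` is the active SparkSession; the table name is illustrative and this is a sketch, not part of the patch.

```scala
// Sketch only: after this change, a managed Hive serde table in the default
// database resolves its location from the default database's location, not
// from the current hive.metastore.warehouse.dir.
val defaultDbLocation = spark.catalog.getDatabase("default").locationUri

spark.sql("CREATE TABLE default.tbl_check (i INT) STORED AS TEXTFILE")
val tableLocation = spark.sharedState.externalCatalog
  .getTable("default", "tbl_check").storage.locationUri.get

// The table directory is expected to sit under the default database location.
assert(tableLocation.contains(defaultDbLocation))

spark.sql("DROP TABLE default.tbl_check")
```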