diff options
author | Yin Huai <yhuai@databricks.com> | 2016-04-30 18:04:36 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-04-30 18:04:42 -0700 |
commit | 0182d9599d15f70eeb6288bf9294fa677004bd14 (patch) | |
tree | 322c2997d575cd5fa367eed0ef6b9cf2a69ba68b /sql/catalyst | |
parent | 19a6d192d53ce6dffe998ce110adab1f2efcb23e (diff) | |
download | spark-0182d9599d15f70eeb6288bf9294fa677004bd14.tar.gz spark-0182d9599d15f70eeb6288bf9294fa677004bd14.tar.bz2 spark-0182d9599d15f70eeb6288bf9294fa677004bd14.zip |
[SPARK-15034][SPARK-15035][SPARK-15036][SQL] Use spark.sql.warehouse.dir as the warehouse location
This PR contains three changes:
1. We will use spark.sql.warehouse.dir to set the warehouse location. We will not use hive.metastore.warehouse.dir.
2. SessionCatalog needs to set the location to default db. Otherwise, when creating a table in SparkSession without hive support, the default db's path will be an empty string.
3. When we create a database, we need to make the path qualified.
How was this patch tested? With existing tests and new tests.
Author: Yin Huai <yhuai@databricks.com>
Closes #12812 from yhuai/warehouse.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 35 |
1 file changed, 30 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index b06f24bc48..a445a253ff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -21,6 +21,7 @@ import java.io.File import scala.collection.mutable +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging @@ -44,14 +45,21 @@ class SessionCatalog( externalCatalog: ExternalCatalog, functionResourceLoader: FunctionResourceLoader, functionRegistry: FunctionRegistry, - conf: CatalystConf) extends Logging { + conf: CatalystConf, + hadoopConf: Configuration) extends Logging { import CatalogTypes.TablePartitionSpec + // For testing only. def this( externalCatalog: ExternalCatalog, functionRegistry: FunctionRegistry, conf: CatalystConf) { - this(externalCatalog, DummyFunctionResourceLoader, functionRegistry, conf) + this( + externalCatalog, + DummyFunctionResourceLoader, + functionRegistry, + conf, + new Configuration()) } // For testing only. @@ -68,7 +76,8 @@ class SessionCatalog( // the corresponding item in the current database. protected var currentDb = { val defaultName = "default" - val defaultDbDefinition = CatalogDatabase(defaultName, "default database", "", Map()) + val defaultDbDefinition = + CatalogDatabase(defaultName, "default database", conf.warehousePath, Map()) // Initialize default database if it doesn't already exist createDatabase(defaultDbDefinition, ignoreIfExists = true) defaultName @@ -81,6 +90,18 @@ class SessionCatalog( if (conf.caseSensitiveAnalysis) name else name.toLowerCase } + /** + * This method is used to make the given path qualified before we + * store this path in the underlying external catalog. 
So, when a path + * does not contain a scheme, this path will not be changed after the default + * FileSystem is changed. + */ + private def makeQualifiedPath(path: String): Path = { + val hadoopPath = new Path(path) + val fs = hadoopPath.getFileSystem(hadoopConf) + fs.makeQualified(hadoopPath) + } + // ---------------------------------------------------------------------------- // Databases // ---------------------------------------------------------------------------- @@ -88,7 +109,10 @@ class SessionCatalog( // ---------------------------------------------------------------------------- def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = { - externalCatalog.createDatabase(dbDefinition, ignoreIfExists) + val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString + externalCatalog.createDatabase( + dbDefinition.copy(locationUri = qualifiedPath), + ignoreIfExists) } def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = { @@ -125,7 +149,8 @@ class SessionCatalog( } def getDefaultDBPath(db: String): String = { - new Path(new Path(conf.warehousePath), db + ".db").toString + val database = if (conf.caseSensitiveAnalysis) db else db.toLowerCase + new Path(new Path(conf.warehousePath), database + ".db").toString } // ---------------------------------------------------------------------------- |