aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src
diff options
context:
space:
mode:
authorYin Huai <yhuai@databricks.com>2016-04-30 18:04:36 -0700
committerReynold Xin <rxin@databricks.com>2016-04-30 18:04:42 -0700
commit0182d9599d15f70eeb6288bf9294fa677004bd14 (patch)
tree322c2997d575cd5fa367eed0ef6b9cf2a69ba68b /sql/catalyst/src
parent19a6d192d53ce6dffe998ce110adab1f2efcb23e (diff)
downloadspark-0182d9599d15f70eeb6288bf9294fa677004bd14.tar.gz
spark-0182d9599d15f70eeb6288bf9294fa677004bd14.tar.bz2
spark-0182d9599d15f70eeb6288bf9294fa677004bd14.zip
[SPARK-15034][SPARK-15035][SPARK-15036][SQL] Use spark.sql.warehouse.dir as the warehouse location
This PR contains three changes: 1. We will use spark.sql.warehouse.dir to set the warehouse location. We will not use hive.metastore.warehouse.dir. 2. SessionCatalog needs to set the location to default db. Otherwise, when creating a table in SparkSession without hive support, the default db's path will be an empty string. 3. When we create a database, we need to make the path qualified. Existing tests and new tests Author: Yin Huai <yhuai@databricks.com> Closes #12812 from yhuai/warehouse.
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala35
1 file changed, 30 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index b06f24bc48..a445a253ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -21,6 +21,7 @@ import java.io.File
import scala.collection.mutable
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
@@ -44,14 +45,21 @@ class SessionCatalog(
externalCatalog: ExternalCatalog,
functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
- conf: CatalystConf) extends Logging {
+ conf: CatalystConf,
+ hadoopConf: Configuration) extends Logging {
import CatalogTypes.TablePartitionSpec
+ // For testing only.
def this(
externalCatalog: ExternalCatalog,
functionRegistry: FunctionRegistry,
conf: CatalystConf) {
- this(externalCatalog, DummyFunctionResourceLoader, functionRegistry, conf)
+ this(
+ externalCatalog,
+ DummyFunctionResourceLoader,
+ functionRegistry,
+ conf,
+ new Configuration())
}
// For testing only.
@@ -68,7 +76,8 @@ class SessionCatalog(
// the corresponding item in the current database.
protected var currentDb = {
val defaultName = "default"
- val defaultDbDefinition = CatalogDatabase(defaultName, "default database", "", Map())
+ val defaultDbDefinition =
+ CatalogDatabase(defaultName, "default database", conf.warehousePath, Map())
// Initialize default database if it doesn't already exist
createDatabase(defaultDbDefinition, ignoreIfExists = true)
defaultName
@@ -81,6 +90,18 @@ class SessionCatalog(
if (conf.caseSensitiveAnalysis) name else name.toLowerCase
}
+ /**
+ * This method is used to make the given path qualified before we
+ * store this path in the underlying external catalog. So, when a path
+ * does not contain a scheme, this path will not be changed after the default
+ * FileSystem is changed.
+ */
+ private def makeQualifiedPath(path: String): Path = {
+ val hadoopPath = new Path(path)
+ val fs = hadoopPath.getFileSystem(hadoopConf)
+ fs.makeQualified(hadoopPath)
+ }
+
// ----------------------------------------------------------------------------
// Databases
// ----------------------------------------------------------------------------
@@ -88,7 +109,10 @@ class SessionCatalog(
// ----------------------------------------------------------------------------
def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
- externalCatalog.createDatabase(dbDefinition, ignoreIfExists)
+ val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
+ externalCatalog.createDatabase(
+ dbDefinition.copy(locationUri = qualifiedPath),
+ ignoreIfExists)
}
def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
@@ -125,7 +149,8 @@ class SessionCatalog(
}
def getDefaultDBPath(db: String): String = {
- new Path(new Path(conf.warehousePath), db + ".db").toString
+ val database = if (conf.caseSensitiveAnalysis) db else db.toLowerCase
+ new Path(new Path(conf.warehousePath), database + ".db").toString
}
// ----------------------------------------------------------------------------