aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src
diff options
context:
space:
mode:
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala35
1 files changed, 30 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index b06f24bc48..a445a253ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -21,6 +21,7 @@ import java.io.File
import scala.collection.mutable
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
@@ -44,14 +45,21 @@ class SessionCatalog(
externalCatalog: ExternalCatalog,
functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
- conf: CatalystConf) extends Logging {
+ conf: CatalystConf,
+ hadoopConf: Configuration) extends Logging {
import CatalogTypes.TablePartitionSpec
+ // For testing only.
def this(
externalCatalog: ExternalCatalog,
functionRegistry: FunctionRegistry,
conf: CatalystConf) {
- this(externalCatalog, DummyFunctionResourceLoader, functionRegistry, conf)
+ this(
+ externalCatalog,
+ DummyFunctionResourceLoader,
+ functionRegistry,
+ conf,
+ new Configuration())
}
// For testing only.
@@ -68,7 +76,8 @@ class SessionCatalog(
// the corresponding item in the current database.
protected var currentDb = {
val defaultName = "default"
- val defaultDbDefinition = CatalogDatabase(defaultName, "default database", "", Map())
+ val defaultDbDefinition =
+ CatalogDatabase(defaultName, "default database", conf.warehousePath, Map())
// Initialize default database if it doesn't already exist
createDatabase(defaultDbDefinition, ignoreIfExists = true)
defaultName
@@ -81,6 +90,18 @@ class SessionCatalog(
if (conf.caseSensitiveAnalysis) name else name.toLowerCase
}
+ /**
+ * This method is used to make the given path qualified before we
+ * store this path in the underlying external catalog. So, when a path
+ * does not contain a scheme, this path will not be changed after the default
+ * FileSystem is changed.
+ */
+ private def makeQualifiedPath(path: String): Path = {
+ val hadoopPath = new Path(path)
+ val fs = hadoopPath.getFileSystem(hadoopConf)
+ fs.makeQualified(hadoopPath)
+ }
+
// ----------------------------------------------------------------------------
// Databases
// ----------------------------------------------------------------------------
@@ -88,7 +109,10 @@ class SessionCatalog(
// ----------------------------------------------------------------------------
def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
- externalCatalog.createDatabase(dbDefinition, ignoreIfExists)
+ val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
+ externalCatalog.createDatabase(
+ dbDefinition.copy(locationUri = qualifiedPath),
+ ignoreIfExists)
}
def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
@@ -125,7 +149,8 @@ class SessionCatalog(
}
def getDefaultDBPath(db: String): String = {
- new Path(new Path(conf.warehousePath), db + ".db").toString
+ val database = if (conf.caseSensitiveAnalysis) db else db.toLowerCase
+ new Path(new Path(conf.warehousePath), database + ".db").toString
}
// ----------------------------------------------------------------------------