Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala                 7
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala    5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala       38
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala  55
4 files changed, 100 insertions(+), 5 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index fb99cb27b8..cff0efa979 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -66,6 +66,8 @@ trait CatalystConf {
/** The maximum number of joined nodes allowed in the dynamic programming algorithm. */
def joinReorderDPThreshold: Int
+
+ override def clone(): CatalystConf = throw new CloneNotSupportedException()
}
@@ -85,4 +87,7 @@ case class SimpleCatalystConf(
joinReorderDPThreshold: Int = 12,
warehousePath: String = "/user/hive/warehouse",
sessionLocalTimeZone: String = TimeZone.getDefault().getID)
- extends CatalystConf
+ extends CatalystConf {
+
+ override def clone(): SimpleCatalystConf = this.copy()
+}
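
The change above follows a clone-via-copy pattern: the trait supplies a clone() that fails fast, and the concrete case class overrides it by delegating to copy(). A minimal, self-contained sketch of the same pattern, assuming hypothetical stand-in names (ConfLike and SimpleConf are not Spark classes):

// Sketch of the clone-via-copy pattern, under assumed names.
trait ConfLike {
  def caseSensitiveAnalysis: Boolean
  // Subclasses that support cloning must override this; the default
  // fails loudly rather than falling back to Object.clone's shallow copy.
  override def clone(): ConfLike = throw new CloneNotSupportedException()
}

case class SimpleConf(caseSensitiveAnalysis: Boolean = true) extends ConfLike {
  // A case class's copy() already yields an independent instance,
  // so clone() can simply delegate to it.
  override def clone(): SimpleConf = this.copy()
}

With this in place, (new SimpleConf).clone() returns a fresh, independent instance, while any ConfLike subclass that forgets to override clone() fails at the call site instead of silently sharing state.
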
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 556fa99017..0dcb44081f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -64,6 +64,8 @@ trait FunctionRegistry {
/** Clear all registered functions. */
def clear(): Unit
+ /** Create a copy of this registry with identical functions. */
+ override def clone(): FunctionRegistry = throw new CloneNotSupportedException()
}
class SimpleFunctionRegistry extends FunctionRegistry {
@@ -107,7 +109,7 @@ class SimpleFunctionRegistry extends FunctionRegistry {
functionBuilders.clear()
}
- def copy(): SimpleFunctionRegistry = synchronized {
+ override def clone(): SimpleFunctionRegistry = synchronized {
val registry = new SimpleFunctionRegistry
functionBuilders.iterator.foreach { case (name, (info, builder)) =>
registry.registerFunction(name, info, builder)
@@ -150,6 +152,7 @@ object EmptyFunctionRegistry extends FunctionRegistry {
throw new UnsupportedOperationException
}
+ override def clone(): FunctionRegistry = this
}
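
FunctionRegistry applies the same convention in three flavors: the trait refuses to clone, SimpleFunctionRegistry deep-copies its builder map under synchronization, and the stateless EmptyFunctionRegistry returns itself. A self-contained sketch of the mutable case, using a hypothetical Registry class (not Spark's SimpleFunctionRegistry) with a simplified builder type:

import scala.collection.mutable

// Hypothetical stand-in: a mutable name -> builder map whose clone()
// copies every entry, so clone and original then diverge independently.
class Registry {
  private val builders = mutable.Map.empty[String, () => String]

  def register(name: String, builder: () => String): Unit = synchronized {
    builders.put(name, builder)
  }

  def lookup(name: String): Option[() => String] = synchronized {
    builders.get(name)
  }

  override def clone(): Registry = synchronized {
    val copy = new Registry
    builders.foreach { case (name, builder) => copy.register(name, builder) }
    copy
  }
}

Registering a function in the clone after cloning leaves the original untouched, which is the same independence property the SessionCatalogSuite tests below assert for temp views and the current database.
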
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 831e37aac1..6cfc4a4321 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -50,7 +50,6 @@ object SessionCatalog {
class SessionCatalog(
externalCatalog: ExternalCatalog,
globalTempViewManager: GlobalTempViewManager,
- functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
conf: CatalystConf,
hadoopConf: Configuration,
@@ -66,16 +65,19 @@ class SessionCatalog(
this(
externalCatalog,
new GlobalTempViewManager("global_temp"),
- DummyFunctionResourceLoader,
functionRegistry,
conf,
new Configuration(),
CatalystSqlParser)
+ functionResourceLoader = DummyFunctionResourceLoader
}
// For testing only.
def this(externalCatalog: ExternalCatalog) {
- this(externalCatalog, new SimpleFunctionRegistry, new SimpleCatalystConf(true))
+ this(
+ externalCatalog,
+ new SimpleFunctionRegistry,
+ SimpleCatalystConf(caseSensitiveAnalysis = true))
}
/** List of temporary tables, mapping from table name to their logical plan. */
@@ -89,6 +91,8 @@ class SessionCatalog(
@GuardedBy("this")
protected var currentDb = formatDatabaseName(DEFAULT_DATABASE)
+ @volatile var functionResourceLoader: FunctionResourceLoader = _
+
/**
* Checks if the given name conforms to the Hive standard ("[a-zA-Z_0-9]+"),
* i.e. if this name only contains letters, numbers, and _.
@@ -987,6 +991,9 @@ class SessionCatalog(
* by a tuple (resource type, resource uri).
*/
def loadFunctionResources(resources: Seq[FunctionResource]): Unit = {
+ if (functionResourceLoader == null) {
+ throw new IllegalStateException("functionResourceLoader has not yet been initialized")
+ }
resources.foreach(functionResourceLoader.loadResource)
}
@@ -1182,4 +1189,29 @@ class SessionCatalog(
}
}
+ /**
+ * Create a new [[SessionCatalog]] with the provided parameters. `externalCatalog` and
+ * `globalTempViewManager` are inherited (shared), while `currentDb` and `tempTables` are copied.
+ */
+ def newSessionCatalogWith(
+ conf: CatalystConf,
+ hadoopConf: Configuration,
+ functionRegistry: FunctionRegistry,
+ parser: ParserInterface): SessionCatalog = {
+ val catalog = new SessionCatalog(
+ externalCatalog,
+ globalTempViewManager,
+ functionRegistry,
+ conf,
+ hadoopConf,
+ parser)
+
+ synchronized {
+ catalog.currentDb = currentDb
+ // copy over temporary tables
+ tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2))
+ }
+
+ catalog
+ }
}
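
Taken together, these pieces let one session fork its catalog for another. A sketch of the intended call pattern, using only the signature introduced above; `original` is assumed to be an existing SessionCatalog, and the argument values are illustrative:

// Fork a SessionCatalog: externalCatalog and globalTempViewManager stay
// shared, while currentDb and tempTables are copied into the new catalog.
val forked: SessionCatalog = original.newSessionCatalogWith(
  SimpleCatalystConf(caseSensitiveAnalysis = true), // fresh per-session conf
  new Configuration(),                              // fresh Hadoop conf
  new SimpleFunctionRegistry,                       // independent registry
  CatalystSqlParser)                                // stateless parser

// functionResourceLoader is now a mutable field rather than a constructor
// argument, so it must be assigned before loading resources; otherwise
// loadFunctionResources throws IllegalStateException.
forked.functionResourceLoader = original.functionResourceLoader

Making the loader a @volatile var breaks the construction-order cycle between the catalog and its session, at the cost of the explicit null check in loadFunctionResources.
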
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 328a16c4bf..7e74dcdef0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.catalog
+import org.apache.hadoop.conf.Configuration
+
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SimpleCatalystConf, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
@@ -1197,6 +1199,59 @@ class SessionCatalogSuite extends PlanTest {
}
}
+ test("clone SessionCatalog - temp views") {
+ val externalCatalog = newEmptyCatalog()
+ val original = new SessionCatalog(externalCatalog)
+ val tempTable1 = Range(1, 10, 1, 10)
+ original.createTempView("copytest1", tempTable1, overrideIfExists = false)
+
+ // check if tables copied over
+ val clone = original.newSessionCatalogWith(
+ SimpleCatalystConf(caseSensitiveAnalysis = true),
+ new Configuration(),
+ new SimpleFunctionRegistry,
+ CatalystSqlParser)
+ assert(original ne clone)
+ assert(clone.getTempView("copytest1") == Some(tempTable1))
+
+ // check if clone and original independent
+ clone.dropTable(TableIdentifier("copytest1"), ignoreIfNotExists = false, purge = false)
+ assert(original.getTempView("copytest1") == Some(tempTable1))
+
+ val tempTable2 = Range(1, 20, 2, 10)
+ original.createTempView("copytest2", tempTable2, overrideIfExists = false)
+ assert(clone.getTempView("copytest2").isEmpty)
+ }
+
+ test("clone SessionCatalog - current db") {
+ val externalCatalog = newEmptyCatalog()
+ val db1 = "db1"
+ val db2 = "db2"
+ val db3 = "db3"
+
+ externalCatalog.createDatabase(newDb(db1), ignoreIfExists = true)
+ externalCatalog.createDatabase(newDb(db2), ignoreIfExists = true)
+ externalCatalog.createDatabase(newDb(db3), ignoreIfExists = true)
+
+ val original = new SessionCatalog(externalCatalog)
+ original.setCurrentDatabase(db1)
+
+ // check if current db copied over
+ val clone = original.newSessionCatalogWith(
+ SimpleCatalystConf(caseSensitiveAnalysis = true),
+ new Configuration(),
+ new SimpleFunctionRegistry,
+ CatalystSqlParser)
+ assert(original ne clone)
+ assert(clone.getCurrentDatabase == db1)
+
+ // check if clone and original independent
+ clone.setCurrentDatabase(db2)
+ assert(original.getCurrentDatabase == db1)
+ original.setCurrentDatabase(db3)
+ assert(clone.getCurrentDatabase == db2)
+ }
+
test("SPARK-19737: detect undefined functions without triggering relation resolution") {
import org.apache.spark.sql.catalyst.dsl.plans._