path: root/sql/catalyst
author    Kunal Khamar <kkhamar@outlook.com>    2017-03-08 13:06:22 -0800
committer Shixiong Zhu <shixiong@databricks.com>    2017-03-08 13:20:45 -0800
commit    6570cfd7abe349dc6d2151f2ac9dc662e7465a79 (patch)
tree      97b54a89a3d228c737203989d6b68db5ec75d8ef /sql/catalyst
parent    1bf9012380de2aa7bdf39220b55748defde8b700 (diff)
[SPARK-19540][SQL] Add ability to clone SparkSession wherein cloned session has an identical copy of the SessionState
Forking a new session via SparkSession.newSession() currently creates a SparkSession that does not retain the parent's SessionState (i.e. temporary tables, SQL config, registered functions, etc.). This change adds a cloneSession() method that creates a new SparkSession with a copy of the parent's SessionState.

The clone is independent after creation: subsequent changes to the base session are not propagated to it. For example, if a user registers a new UDF on the base session after the clone has been created, that UDF will not be available inside the clone. The same holds for configs and temporary tables.

Tested with unit tests.

Author: Kunal Khamar <kkhamar@outlook.com>
Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16826 from kunalkhamar/fork-sparksession.
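As a quick illustration of the intended semantics, here is a minimal sketch. It is hedged: cloneSession() is added in sql/core (outside this sql/catalyst diff) and its visibility there may restrict direct use, and the config key is only an example.

    // Sketch of the cloned-session semantics described above.
    val spark = SparkSession.builder().master("local").getOrCreate()
    spark.conf.set("spark.sql.shuffle.partitions", "10")

    val cloned = spark.cloneSession()  // snapshots SessionState at this point

    // Later changes to the base session do not propagate to the clone:
    spark.conf.set("spark.sql.shuffle.partitions", "100")
    assert(cloned.conf.get("spark.sql.shuffle.partitions") == "10")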
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala                |  7
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala   |  5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala      | 38
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala | 55
4 files changed, 100 insertions(+), 5 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index fb99cb27b8..cff0efa979 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -66,6 +66,8 @@ trait CatalystConf {
/** The maximum number of joined nodes allowed in the dynamic programming algorithm. */
def joinReorderDPThreshold: Int
+
+ override def clone(): CatalystConf = throw new CloneNotSupportedException()
}
@@ -85,4 +87,7 @@ case class SimpleCatalystConf(
joinReorderDPThreshold: Int = 12,
warehousePath: String = "/user/hive/warehouse",
sessionLocalTimeZone: String = TimeZone.getDefault().getID)
- extends CatalystConf
+ extends CatalystConf {
+
+ override def clone(): SimpleCatalystConf = this.copy()
+}
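The pattern above, a throwing default in the trait plus a one-line override in each concrete conf, can be sketched in isolation as follows (illustrative names, not Spark code):

    trait Conf {
      def caseSensitiveAnalysis: Boolean
      // Concrete confs that support cloning must override this; the default
      // mirrors java.lang.Object's contract for non-Cloneable classes.
      override def clone(): Conf = throw new CloneNotSupportedException()
    }

    case class SimpleConf(caseSensitiveAnalysis: Boolean = true) extends Conf {
      // A case class gets a structural copy() for free, so clone() is one line
      // and returns an independent instance with identical field values.
      override def clone(): SimpleConf = this.copy()
    }

Because SimpleCatalystConf is a case class with immutable fields, copy() yields a fully independent snapshot of the configuration.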
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 556fa99017..0dcb44081f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -64,6 +64,8 @@ trait FunctionRegistry {
/** Clear all registered functions. */
def clear(): Unit
+ /** Create a copy of this registry containing the same registered functions. */
+ override def clone(): FunctionRegistry = throw new CloneNotSupportedException()
}
class SimpleFunctionRegistry extends FunctionRegistry {
@@ -107,7 +109,7 @@ class SimpleFunctionRegistry extends FunctionRegistry {
functionBuilders.clear()
}
- def copy(): SimpleFunctionRegistry = synchronized {
+ override def clone(): SimpleFunctionRegistry = synchronized {
val registry = new SimpleFunctionRegistry
functionBuilders.iterator.foreach { case (name, (info, builder)) =>
registry.registerFunction(name, info, builder)
@@ -150,6 +152,7 @@ object EmptyFunctionRegistry extends FunctionRegistry {
throw new UnsupportedOperationException
}
+ override def clone(): FunctionRegistry = this
}
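A minimal sketch of the renamed method in use (assuming the registerFunction/lookupFunction signatures of this era of the codebase; the Literal builders are stand-ins):

    import org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry
    import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, Literal}

    val original = new SimpleFunctionRegistry
    original.registerFunction(
      "one", new ExpressionInfo("Literal", "one"), (_: Seq[Expression]) => Literal(1))

    val cloned = original.clone()  // copies every (name, info, builder) entry

    // Registrations after the clone stay local to the registry they target:
    original.registerFunction(
      "two", new ExpressionInfo("Literal", "two"), (_: Seq[Expression]) => Literal(2))
    assert(cloned.lookupFunction("one").isDefined)  // copied at clone time
    assert(cloned.lookupFunction("two").isEmpty)    // registered after the clone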
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 831e37aac1..6cfc4a4321 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -50,7 +50,6 @@ object SessionCatalog {
class SessionCatalog(
externalCatalog: ExternalCatalog,
globalTempViewManager: GlobalTempViewManager,
- functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
conf: CatalystConf,
hadoopConf: Configuration,
@@ -66,16 +65,19 @@ class SessionCatalog(
this(
externalCatalog,
new GlobalTempViewManager("global_temp"),
- DummyFunctionResourceLoader,
functionRegistry,
conf,
new Configuration(),
CatalystSqlParser)
+ functionResourceLoader = DummyFunctionResourceLoader
}
// For testing only.
def this(externalCatalog: ExternalCatalog) {
- this(externalCatalog, new SimpleFunctionRegistry, new SimpleCatalystConf(true))
+ this(
+ externalCatalog,
+ new SimpleFunctionRegistry,
+ SimpleCatalystConf(caseSensitiveAnalysis = true))
}
/** List of temporary tables, mapping from table name to their logical plan. */
@@ -89,6 +91,8 @@ class SessionCatalog(
@GuardedBy("this")
protected var currentDb = formatDatabaseName(DEFAULT_DATABASE)
+ @volatile var functionResourceLoader: FunctionResourceLoader = _
+
/**
* Checks if the given name conforms to the Hive standard ("[a-zA-Z_0-9]+"),
* i.e. if this name only contains letters, numbers, and _.
@@ -987,6 +991,9 @@ class SessionCatalog(
* by a tuple (resource type, resource uri).
*/
def loadFunctionResources(resources: Seq[FunctionResource]): Unit = {
+ if (functionResourceLoader == null) {
+ throw new IllegalStateException("functionResourceLoader has not yet been initialized")
+ }
resources.foreach(functionResourceLoader.loadResource)
}
@@ -1182,4 +1189,29 @@ class SessionCatalog(
}
}
+ /**
+ * Create a new [[SessionCatalog]] with the provided parameters. `externalCatalog` and
+ * `globalTempViewManager` are inherited (shared by reference), while `currentDb` and
+ * `tempTables` are copied into the new catalog.
+ */
+ def newSessionCatalogWith(
+ conf: CatalystConf,
+ hadoopConf: Configuration,
+ functionRegistry: FunctionRegistry,
+ parser: ParserInterface): SessionCatalog = {
+ val catalog = new SessionCatalog(
+ externalCatalog,
+ globalTempViewManager,
+ functionRegistry,
+ conf,
+ hadoopConf,
+ parser)
+
+ synchronized {
+ catalog.currentDb = currentDb
+ // copy over temporary tables
+ tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2))
+ }
+
+ catalog
+ }
}
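Note the two-phase initialization this introduces: because functionResourceLoader is now a @volatile var rather than a constructor parameter, a caller cloning a catalog is expected to inject the loader after construction, roughly as below (hypothetical caller-side wiring; only newSessionCatalogWith and functionResourceLoader come from this diff):

    // Hypothetical wiring for a session clone.
    val clonedCatalog = parentCatalog.newSessionCatalogWith(
      clonedConf, new Configuration(), clonedFunctionRegistry, CatalystSqlParser)
    // Without this step, loadFunctionResources throws IllegalStateException:
    clonedCatalog.functionResourceLoader = parentCatalog.functionResourceLoader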
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 328a16c4bf..7e74dcdef0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.catalog
+import org.apache.hadoop.conf.Configuration
+
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SimpleCatalystConf, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
@@ -1197,6 +1199,59 @@ class SessionCatalogSuite extends PlanTest {
}
}
+ test("clone SessionCatalog - temp views") {
+ val externalCatalog = newEmptyCatalog()
+ val original = new SessionCatalog(externalCatalog)
+ val tempTable1 = Range(1, 10, 1, 10)
+ original.createTempView("copytest1", tempTable1, overrideIfExists = false)
+
+ // check if tables copied over
+ val clone = original.newSessionCatalogWith(
+ SimpleCatalystConf(caseSensitiveAnalysis = true),
+ new Configuration(),
+ new SimpleFunctionRegistry,
+ CatalystSqlParser)
+ assert(original ne clone)
+ assert(clone.getTempView("copytest1") == Some(tempTable1))
+
+ // check if clone and original independent
+ clone.dropTable(TableIdentifier("copytest1"), ignoreIfNotExists = false, purge = false)
+ assert(original.getTempView("copytest1") == Some(tempTable1))
+
+ val tempTable2 = Range(1, 20, 2, 10)
+ original.createTempView("copytest2", tempTable2, overrideIfExists = false)
+ assert(clone.getTempView("copytest2").isEmpty)
+ }
+
+ test("clone SessionCatalog - current db") {
+ val externalCatalog = newEmptyCatalog()
+ val db1 = "db1"
+ val db2 = "db2"
+ val db3 = "db3"
+
+ externalCatalog.createDatabase(newDb(db1), ignoreIfExists = true)
+ externalCatalog.createDatabase(newDb(db2), ignoreIfExists = true)
+ externalCatalog.createDatabase(newDb(db3), ignoreIfExists = true)
+
+ val original = new SessionCatalog(externalCatalog)
+ original.setCurrentDatabase(db1)
+
+ // check if current db copied over
+ val clone = original.newSessionCatalogWith(
+ SimpleCatalystConf(caseSensitiveAnalysis = true),
+ new Configuration(),
+ new SimpleFunctionRegistry,
+ CatalystSqlParser)
+ assert(original ne clone)
+ assert(clone.getCurrentDatabase == db1)
+
+ // check if clone and original independent
+ clone.setCurrentDatabase(db2)
+ assert(original.getCurrentDatabase == db1)
+ original.setCurrentDatabase(db3)
+ assert(clone.getCurrentDatabase == db2)
+ }
+
test("SPARK-19737: detect undefined functions without triggering relation resolution") {
import org.apache.spark.sql.catalyst.dsl.plans._