diff options
author | Kunal Khamar <kkhamar@outlook.com> | 2017-03-08 13:06:22 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2017-03-08 13:20:45 -0800 |
commit | 6570cfd7abe349dc6d2151f2ac9dc662e7465a79 (patch) | |
tree | 97b54a89a3d228c737203989d6b68db5ec75d8ef /sql/catalyst | |
parent | 1bf9012380de2aa7bdf39220b55748defde8b700 (diff) | |
download | spark-6570cfd7abe349dc6d2151f2ac9dc662e7465a79.tar.gz spark-6570cfd7abe349dc6d2151f2ac9dc662e7465a79.tar.bz2 spark-6570cfd7abe349dc6d2151f2ac9dc662e7465a79.zip |
[SPARK-19540][SQL] Add ability to clone SparkSession wherein cloned session has an identical copy of the SessionState
Forking a newSession() from SparkSession currently makes a new SparkSession that does not retain SessionState (i.e. temporary tables, SQL config, registered functions, etc.). This change adds a cloneSession() method that creates a new SparkSession with a copy of the parent's SessionState.
Subsequent changes to the base session are not propagated to the cloned session; the clone is independent after creation.
If the base is changed after the clone has been created — say, the user registers a new UDF — then the new UDF will not be available inside the clone. The same applies to configs and temp tables.
Tested via unit tests.
Author: Kunal Khamar <kkhamar@outlook.com>
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16826 from kunalkhamar/fork-sparksession.
Diffstat (limited to 'sql/catalyst')
4 files changed, 100 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala index fb99cb27b8..cff0efa979 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala @@ -66,6 +66,8 @@ trait CatalystConf { /** The maximum number of joined nodes allowed in the dynamic programming algorithm. */ def joinReorderDPThreshold: Int + + override def clone(): CatalystConf = throw new CloneNotSupportedException() } @@ -85,4 +87,7 @@ case class SimpleCatalystConf( joinReorderDPThreshold: Int = 12, warehousePath: String = "/user/hive/warehouse", sessionLocalTimeZone: String = TimeZone.getDefault().getID) - extends CatalystConf + extends CatalystConf { + + override def clone(): SimpleCatalystConf = this.copy() +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 556fa99017..0dcb44081f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -64,6 +64,8 @@ trait FunctionRegistry { /** Clear all registered functions. */ def clear(): Unit + /** Create a copy of this registry with identical functions as this registry. 
*/ + override def clone(): FunctionRegistry = throw new CloneNotSupportedException() } class SimpleFunctionRegistry extends FunctionRegistry { @@ -107,7 +109,7 @@ class SimpleFunctionRegistry extends FunctionRegistry { functionBuilders.clear() } - def copy(): SimpleFunctionRegistry = synchronized { + override def clone(): SimpleFunctionRegistry = synchronized { val registry = new SimpleFunctionRegistry functionBuilders.iterator.foreach { case (name, (info, builder)) => registry.registerFunction(name, info, builder) @@ -150,6 +152,7 @@ object EmptyFunctionRegistry extends FunctionRegistry { throw new UnsupportedOperationException } + override def clone(): FunctionRegistry = this } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 831e37aac1..6cfc4a4321 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -50,7 +50,6 @@ object SessionCatalog { class SessionCatalog( externalCatalog: ExternalCatalog, globalTempViewManager: GlobalTempViewManager, - functionResourceLoader: FunctionResourceLoader, functionRegistry: FunctionRegistry, conf: CatalystConf, hadoopConf: Configuration, @@ -66,16 +65,19 @@ class SessionCatalog( this( externalCatalog, new GlobalTempViewManager("global_temp"), - DummyFunctionResourceLoader, functionRegistry, conf, new Configuration(), CatalystSqlParser) + functionResourceLoader = DummyFunctionResourceLoader } // For testing only. def this(externalCatalog: ExternalCatalog) { - this(externalCatalog, new SimpleFunctionRegistry, new SimpleCatalystConf(true)) + this( + externalCatalog, + new SimpleFunctionRegistry, + SimpleCatalystConf(caseSensitiveAnalysis = true)) } /** List of temporary tables, mapping from table name to their logical plan. 
*/ @@ -89,6 +91,8 @@ class SessionCatalog( @GuardedBy("this") protected var currentDb = formatDatabaseName(DEFAULT_DATABASE) + @volatile var functionResourceLoader: FunctionResourceLoader = _ + /** * Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"), * i.e. if this name only contains characters, numbers, and _. @@ -987,6 +991,9 @@ class SessionCatalog( * by a tuple (resource type, resource uri). */ def loadFunctionResources(resources: Seq[FunctionResource]): Unit = { + if (functionResourceLoader == null) { + throw new IllegalStateException("functionResourceLoader has not yet been initialized") + } resources.foreach(functionResourceLoader.loadResource) } @@ -1182,4 +1189,29 @@ class SessionCatalog( } } + /** + * Create a new [[SessionCatalog]] with the provided parameters. `externalCatalog` and + * `globalTempViewManager` are `inherited`, while `currentDb` and `tempTables` are copied. + */ + def newSessionCatalogWith( + conf: CatalystConf, + hadoopConf: Configuration, + functionRegistry: FunctionRegistry, + parser: ParserInterface): SessionCatalog = { + val catalog = new SessionCatalog( + externalCatalog, + globalTempViewManager, + functionRegistry, + conf, + hadoopConf, + parser) + + synchronized { + catalog.currentDb = currentDb + // copy over temporary tables + tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2)) + } + + catalog + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 328a16c4bf..7e74dcdef0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.catalog +import org.apache.hadoop.conf.Configuration + import org.apache.spark.sql.AnalysisException import 
org.apache.spark.sql.catalyst.{FunctionIdentifier, SimpleCatalystConf, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ @@ -1197,6 +1199,59 @@ class SessionCatalogSuite extends PlanTest { } } + test("clone SessionCatalog - temp views") { + val externalCatalog = newEmptyCatalog() + val original = new SessionCatalog(externalCatalog) + val tempTable1 = Range(1, 10, 1, 10) + original.createTempView("copytest1", tempTable1, overrideIfExists = false) + + // check if tables copied over + val clone = original.newSessionCatalogWith( + SimpleCatalystConf(caseSensitiveAnalysis = true), + new Configuration(), + new SimpleFunctionRegistry, + CatalystSqlParser) + assert(original ne clone) + assert(clone.getTempView("copytest1") == Some(tempTable1)) + + // check if clone and original independent + clone.dropTable(TableIdentifier("copytest1"), ignoreIfNotExists = false, purge = false) + assert(original.getTempView("copytest1") == Some(tempTable1)) + + val tempTable2 = Range(1, 20, 2, 10) + original.createTempView("copytest2", tempTable2, overrideIfExists = false) + assert(clone.getTempView("copytest2").isEmpty) + } + + test("clone SessionCatalog - current db") { + val externalCatalog = newEmptyCatalog() + val db1 = "db1" + val db2 = "db2" + val db3 = "db3" + + externalCatalog.createDatabase(newDb(db1), ignoreIfExists = true) + externalCatalog.createDatabase(newDb(db2), ignoreIfExists = true) + externalCatalog.createDatabase(newDb(db3), ignoreIfExists = true) + + val original = new SessionCatalog(externalCatalog) + original.setCurrentDatabase(db1) + + // check if current db copied over + val clone = original.newSessionCatalogWith( + SimpleCatalystConf(caseSensitiveAnalysis = true), + new Configuration(), + new SimpleFunctionRegistry, + CatalystSqlParser) + assert(original ne clone) + assert(clone.getCurrentDatabase == db1) + + // check if clone and original independent + clone.setCurrentDatabase(db2) + assert(original.getCurrentDatabase == db1) + 
original.setCurrentDatabase(db3) + assert(clone.getCurrentDatabase == db2) + } + test("SPARK-19737: detect undefined functions without triggering relation resolution") { import org.apache.spark.sql.catalyst.dsl.plans._ |