diff options
Diffstat (limited to 'sql/catalyst')
4 files changed, 100 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala index fb99cb27b8..cff0efa979 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala @@ -66,6 +66,8 @@ trait CatalystConf { /** The maximum number of joined nodes allowed in the dynamic programming algorithm. */ def joinReorderDPThreshold: Int + + override def clone(): CatalystConf = throw new CloneNotSupportedException() } @@ -85,4 +87,7 @@ case class SimpleCatalystConf( joinReorderDPThreshold: Int = 12, warehousePath: String = "/user/hive/warehouse", sessionLocalTimeZone: String = TimeZone.getDefault().getID) - extends CatalystConf + extends CatalystConf { + + override def clone(): SimpleCatalystConf = this.copy() +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 556fa99017..0dcb44081f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -64,6 +64,8 @@ trait FunctionRegistry { /** Clear all registered functions. */ def clear(): Unit + /** Create a copy of this registry with identical functions as this registry. */ + override def clone(): FunctionRegistry = throw new CloneNotSupportedException() } class SimpleFunctionRegistry extends FunctionRegistry { @@ -107,7 +109,7 @@ class SimpleFunctionRegistry extends FunctionRegistry { functionBuilders.clear() } - def copy(): SimpleFunctionRegistry = synchronized { + override def clone(): SimpleFunctionRegistry = synchronized { val registry = new SimpleFunctionRegistry functionBuilders.iterator.foreach { case (name, (info, builder)) => registry.registerFunction(name, info, builder) @@ -150,6 +152,7 @@ object EmptyFunctionRegistry extends FunctionRegistry { throw new UnsupportedOperationException } + override def clone(): FunctionRegistry = this } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 831e37aac1..6cfc4a4321 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -50,7 +50,6 @@ object SessionCatalog { class SessionCatalog( externalCatalog: ExternalCatalog, globalTempViewManager: GlobalTempViewManager, - functionResourceLoader: FunctionResourceLoader, functionRegistry: FunctionRegistry, conf: CatalystConf, hadoopConf: Configuration, @@ -66,16 +65,19 @@ class SessionCatalog( this( externalCatalog, new GlobalTempViewManager("global_temp"), - DummyFunctionResourceLoader, functionRegistry, conf, new Configuration(), CatalystSqlParser) + functionResourceLoader = DummyFunctionResourceLoader } // For testing only. def this(externalCatalog: ExternalCatalog) { - this(externalCatalog, new SimpleFunctionRegistry, new SimpleCatalystConf(true)) + this( + externalCatalog, + new SimpleFunctionRegistry, + SimpleCatalystConf(caseSensitiveAnalysis = true)) } /** List of temporary tables, mapping from table name to their logical plan. */ @@ -89,6 +91,8 @@ class SessionCatalog( @GuardedBy("this") protected var currentDb = formatDatabaseName(DEFAULT_DATABASE) + @volatile var functionResourceLoader: FunctionResourceLoader = _ + /** * Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"), * i.e. if this name only contains characters, numbers, and _. @@ -987,6 +991,9 @@ class SessionCatalog( * by a tuple (resource type, resource uri). */ def loadFunctionResources(resources: Seq[FunctionResource]): Unit = { + if (functionResourceLoader == null) { + throw new IllegalStateException("functionResourceLoader has not yet been initialized") + } resources.foreach(functionResourceLoader.loadResource) } @@ -1182,4 +1189,29 @@ class SessionCatalog( } } + /** + * Create a new [[SessionCatalog]] with the provided parameters. `externalCatalog` and + * `globalTempViewManager` are `inherited`, while `currentDb` and `tempTables` are copied. + */ + def newSessionCatalogWith( + conf: CatalystConf, + hadoopConf: Configuration, + functionRegistry: FunctionRegistry, + parser: ParserInterface): SessionCatalog = { + val catalog = new SessionCatalog( + externalCatalog, + globalTempViewManager, + functionRegistry, + conf, + hadoopConf, + parser) + + synchronized { + catalog.currentDb = currentDb + // copy over temporary tables + tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2)) + } + + catalog + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 328a16c4bf..7e74dcdef0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.catalog +import org.apache.hadoop.conf.Configuration + import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, SimpleCatalystConf, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ @@ -1197,6 +1199,59 @@ class SessionCatalogSuite extends PlanTest { } } + test("clone SessionCatalog - temp views") { + val externalCatalog = newEmptyCatalog() + val original = new SessionCatalog(externalCatalog) + val tempTable1 = Range(1, 10, 1, 10) + original.createTempView("copytest1", tempTable1, overrideIfExists = false) + + // check if tables copied over + val clone = original.newSessionCatalogWith( + SimpleCatalystConf(caseSensitiveAnalysis = true), + new Configuration(), + new SimpleFunctionRegistry, + CatalystSqlParser) + assert(original ne clone) + assert(clone.getTempView("copytest1") == Some(tempTable1)) + + // check if clone and original independent + clone.dropTable(TableIdentifier("copytest1"), ignoreIfNotExists = false, purge = false) + assert(original.getTempView("copytest1") == Some(tempTable1)) + + val tempTable2 = Range(1, 20, 2, 10) + original.createTempView("copytest2", tempTable2, overrideIfExists = false) + assert(clone.getTempView("copytest2").isEmpty) + } + + test("clone SessionCatalog - current db") { + val externalCatalog = newEmptyCatalog() + val db1 = "db1" + val db2 = "db2" + val db3 = "db3" + + externalCatalog.createDatabase(newDb(db1), ignoreIfExists = true) + externalCatalog.createDatabase(newDb(db2), ignoreIfExists = true) + externalCatalog.createDatabase(newDb(db3), ignoreIfExists = true) + + val original = new SessionCatalog(externalCatalog) + original.setCurrentDatabase(db1) + + // check if current db copied over + val clone = original.newSessionCatalogWith( + SimpleCatalystConf(caseSensitiveAnalysis = true), + new Configuration(), + new SimpleFunctionRegistry, + CatalystSqlParser) + assert(original ne clone) + assert(clone.getCurrentDatabase == db1) + + // check if clone and original independent + clone.setCurrentDatabase(db2) + assert(original.getCurrentDatabase == db1) + original.setCurrentDatabase(db3) + assert(clone.getCurrentDatabase == db2) + } + test("SPARK-19737: detect undefined functions without triggering relation resolution") { import org.apache.spark.sql.catalyst.dsl.plans._ |