diff options
author | Herman van Hovell <hvanhovell@databricks.com> | 2017-03-28 10:07:24 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2017-03-28 10:07:24 +0800 |
commit | ea361165e1ddce4d8aa0242ae3e878d7b39f1de2 (patch) | |
tree | f3014ba709d54b48172a399708074480a6ed9661 /sql/catalyst/src/main | |
parent | 8a6f33f0483dcee81467e6374a796b5dbd53ea30 (diff) | |
download | spark-ea361165e1ddce4d8aa0242ae3e878d7b39f1de2.tar.gz spark-ea361165e1ddce4d8aa0242ae3e878d7b39f1de2.tar.bz2 spark-ea361165e1ddce4d8aa0242ae3e878d7b39f1de2.zip |
[SPARK-20100][SQL] Refactor SessionState initialization
## What changes were proposed in this pull request?
The current SessionState initialization code path is quite complex. A part of the creation is done in the SessionState companion objects, a part of the creation is one inside the SessionState class, and a part is done by passing functions.
This PR refactors this code path, and consolidates SessionState initialization into a builder class. This SessionState will not do any initialization and just becomes a place holder for the various Spark SQL internals. This also lays the ground work for two future improvements:
1. This provides us with a start for removing the `HiveSessionState`. Removing the `HiveSessionState` would also require us to move resource loading into a separate class, and to (re)move metadata hive.
2. This makes it easier to customize the Spark Session. Currently you will need to create a custom version of the builder. I have added hooks to facilitate this. A future step will be to create a semi stable API on top of this.
## How was this patch tested?
Existing tests.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes #17433 from hvanhovell/SPARK-20100.
Diffstat (limited to 'sql/catalyst/src/main')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 46 | ||||
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 16 |
2 files changed, 23 insertions, 39 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index a469d12451..72ab075408 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -54,7 +54,8 @@ class SessionCatalog( functionRegistry: FunctionRegistry, conf: CatalystConf, hadoopConf: Configuration, - parser: ParserInterface) extends Logging { + parser: ParserInterface, + functionResourceLoader: FunctionResourceLoader) extends Logging { import SessionCatalog._ import CatalogTypes.TablePartitionSpec @@ -69,8 +70,8 @@ class SessionCatalog( functionRegistry, conf, new Configuration(), - CatalystSqlParser) - functionResourceLoader = DummyFunctionResourceLoader + CatalystSqlParser, + DummyFunctionResourceLoader) } // For testing only. @@ -90,9 +91,7 @@ class SessionCatalog( // check whether the temporary table or function exists, then, if not, operate on // the corresponding item in the current database. @GuardedBy("this") - protected var currentDb = formatDatabaseName(DEFAULT_DATABASE) - - @volatile var functionResourceLoader: FunctionResourceLoader = _ + protected var currentDb: String = formatDatabaseName(DEFAULT_DATABASE) /** * Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"), @@ -1059,9 +1058,6 @@ class SessionCatalog( * by a tuple (resource type, resource uri). */ def loadFunctionResources(resources: Seq[FunctionResource]): Unit = { - if (functionResourceLoader == null) { - throw new IllegalStateException("functionResourceLoader has not yet been initialized") - } resources.foreach(functionResourceLoader.loadResource) } @@ -1259,28 +1255,16 @@ class SessionCatalog( } /** - * Create a new [[SessionCatalog]] with the provided parameters. `externalCatalog` and - * `globalTempViewManager` are `inherited`, while `currentDb` and `tempTables` are copied. + * Copy the current state of the catalog to another catalog. + * + * This function is synchronized on this [[SessionCatalog]] (the source) to make sure the copied + * state is consistent. The target [[SessionCatalog]] is not synchronized, and should not be + * because the target [[SessionCatalog]] should not be published at this point. The caller must + * synchronize on the target if this assumption does not hold. */ - def newSessionCatalogWith( - conf: CatalystConf, - hadoopConf: Configuration, - functionRegistry: FunctionRegistry, - parser: ParserInterface): SessionCatalog = { - val catalog = new SessionCatalog( - externalCatalog, - globalTempViewManager, - functionRegistry, - conf, - hadoopConf, - parser) - - synchronized { - catalog.currentDb = currentDb - // copy over temporary tables - tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2)) - } - - catalog + private[sql] def copyStateTo(target: SessionCatalog): Unit = synchronized { + target.currentDb = currentDb + // copy over temporary tables + tempTables.foreach(kv => target.tempTables.put(kv._1, kv._2)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index dbe3ded4bb..dbf479d215 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -17,20 +17,14 @@ package org.apache.spark.sql.catalyst.optimizer -import scala.annotation.tailrec -import scala.collection.immutable.HashSet import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer -import org.apache.spark.api.java.function.FilterFunction import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} -import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ @@ -79,7 +73,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) Batch("Aggregate", fixedPoint, RemoveLiteralFromGroupExpressions, RemoveRepetitionFromGroupExpressions) :: - Batch("Operator Optimizations", fixedPoint, + Batch("Operator Optimizations", fixedPoint, Seq( // Operator push down PushProjectionThroughUnion, ReorderJoin(conf), @@ -117,7 +111,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) RemoveRedundantProject, SimplifyCreateStructOps, SimplifyCreateArrayOps, - SimplifyCreateMapOps) :: + SimplifyCreateMapOps) ++ + extendedOperatorOptimizationRules: _*) :: Batch("Check Cartesian Products", Once, CheckCartesianProducts(conf)) :: Batch("Join Reorder", Once, @@ -146,6 +141,11 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) s.withNewPlan(newPlan) } } + + /** + * Override to provide additional rules for the operator optimization batch. + */ + def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil } /** |