path: root/sql/catalyst
author     Herman van Hovell <hvanhovell@databricks.com>  2017-03-28 10:07:24 +0800
committer  Wenchen Fan <wenchen@databricks.com>            2017-03-28 10:07:24 +0800
commit     ea361165e1ddce4d8aa0242ae3e878d7b39f1de2 (patch)
tree       f3014ba709d54b48172a399708074480a6ed9661 /sql/catalyst
parent     8a6f33f0483dcee81467e6374a796b5dbd53ea30 (diff)
[SPARK-20100][SQL] Refactor SessionState initialization
## What changes were proposed in this pull request?

The current SessionState initialization code path is quite complex. Part of the creation is done in the SessionState companion objects, part is done inside the SessionState class itself, and part is done by passing functions. This PR refactors this code path and consolidates SessionState initialization into a builder class. The SessionState no longer performs any initialization and simply becomes a placeholder for the various Spark SQL internals.

This also lays the groundwork for two future improvements:

1. It provides a starting point for removing `HiveSessionState`. Removing `HiveSessionState` would also require us to move resource loading into a separate class, and to (re)move metadata hive.
2. It makes it easier to customize the Spark Session. Currently you need to create a custom version of the builder; I have added hooks to facilitate this. A future step will be to create a semi-stable API on top of this.

## How was this patch tested?

Existing tests.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #17433 from hvanhovell/SPARK-20100.
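The builder class itself is added outside `sql/catalyst`, so it does not appear in the diff below. As a rough, self-contained sketch of the pattern the PR describes (all names below are illustrative stand-ins, not the actual Spark classes), the builder concentrates the construction logic while the state object stays a passive holder:

```scala
// Hypothetical sketch of the builder pattern this PR moves toward. All names
// (SketchSessionState, SketchBuilder, HiveLikeBuilder) are illustrative; the
// real builder lives outside sql/catalyst and is not part of this diff.
object SessionStateBuilderSketch {

  // The "state" is a plain holder: it performs no initialization of its own.
  final case class SketchSessionState(
      currentDatabase: String,
      extraOptimizerRules: Seq[String])

  // The builder owns all construction logic; subclasses customize the session
  // by overriding the protected hooks instead of patching the state afterwards.
  class SketchBuilder {
    protected def currentDatabase: String = "default"
    protected def extraOptimizerRules: Seq[String] = Nil

    def build(): SketchSessionState =
      SketchSessionState(currentDatabase, extraOptimizerRules)
  }

  // A customized session: override one hook, inherit everything else.
  class HiveLikeBuilder extends SketchBuilder {
    override protected def extraOptimizerRules: Seq[String] = Seq("MyExtraRule")
  }

  def main(args: Array[String]): Unit = {
    println(new HiveLikeBuilder().build())
    // SketchSessionState(default,List(MyExtraRule))
  }
}
```

The overridable hooks are what make customization possible without re-implementing the whole initialization path.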
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala       | 46
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala          | 16
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala  | 22
3 files changed, 31 insertions, 53 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index a469d12451..72ab075408 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -54,7 +54,8 @@ class SessionCatalog(
functionRegistry: FunctionRegistry,
conf: CatalystConf,
hadoopConf: Configuration,
- parser: ParserInterface) extends Logging {
+ parser: ParserInterface,
+ functionResourceLoader: FunctionResourceLoader) extends Logging {
import SessionCatalog._
import CatalogTypes.TablePartitionSpec
@@ -69,8 +70,8 @@ class SessionCatalog(
functionRegistry,
conf,
new Configuration(),
- CatalystSqlParser)
- functionResourceLoader = DummyFunctionResourceLoader
+ CatalystSqlParser,
+ DummyFunctionResourceLoader)
}
// For testing only.
@@ -90,9 +91,7 @@ class SessionCatalog(
// check whether the temporary table or function exists, then, if not, operate on
// the corresponding item in the current database.
@GuardedBy("this")
- protected var currentDb = formatDatabaseName(DEFAULT_DATABASE)
-
- @volatile var functionResourceLoader: FunctionResourceLoader = _
+ protected var currentDb: String = formatDatabaseName(DEFAULT_DATABASE)
/**
* Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"),
@@ -1059,9 +1058,6 @@ class SessionCatalog(
* by a tuple (resource type, resource uri).
*/
def loadFunctionResources(resources: Seq[FunctionResource]): Unit = {
- if (functionResourceLoader == null) {
- throw new IllegalStateException("functionResourceLoader has not yet been initialized")
- }
resources.foreach(functionResourceLoader.loadResource)
}
@@ -1259,28 +1255,16 @@ class SessionCatalog(
}
/**
- * Create a new [[SessionCatalog]] with the provided parameters. `externalCatalog` and
- * `globalTempViewManager` are `inherited`, while `currentDb` and `tempTables` are copied.
+ * Copy the current state of the catalog to another catalog.
+ *
+ * This function is synchronized on this [[SessionCatalog]] (the source) to make sure the copied
+ * state is consistent. The target [[SessionCatalog]] is not synchronized, and should not be
+ * because the target [[SessionCatalog]] should not be published at this point. The caller must
+ * synchronize on the target if this assumption does not hold.
*/
- def newSessionCatalogWith(
- conf: CatalystConf,
- hadoopConf: Configuration,
- functionRegistry: FunctionRegistry,
- parser: ParserInterface): SessionCatalog = {
- val catalog = new SessionCatalog(
- externalCatalog,
- globalTempViewManager,
- functionRegistry,
- conf,
- hadoopConf,
- parser)
-
- synchronized {
- catalog.currentDb = currentDb
- // copy over temporary tables
- tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2))
- }
-
- catalog
+ private[sql] def copyStateTo(target: SessionCatalog): Unit = synchronized {
+ target.currentDb = currentDb
+ // copy over temporary tables
+ tempTables.foreach(kv => target.tempTables.put(kv._1, kv._2))
}
}
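The new `copyStateTo` replaces `newSessionCatalogWith`: the caller now constructs the target catalog first (the updated tests below use the test-only single-argument constructor) and the source copies its mutable state across under its own lock. A small self-contained sketch of that pattern, using toy stand-in classes rather than actual Spark code:

```scala
import scala.collection.mutable

// Toy stand-in for SessionCatalog, illustrating only the copyStateTo pattern.
class ToyCatalog {
  protected var currentDb: String = "default"
  protected val tempTables = mutable.HashMap.empty[String, String]

  def setCurrentDatabase(db: String): Unit = synchronized { currentDb = db }

  def createTempView(name: String, plan: String): Unit = synchronized {
    tempTables.put(name, plan)
  }

  def getTempView(name: String): Option[String] = synchronized {
    tempTables.get(name)
  }

  // Synchronized on the source so the copied state is consistent; the target is
  // assumed not to be published to other threads yet, so it takes no lock here.
  def copyStateTo(target: ToyCatalog): Unit = synchronized {
    target.currentDb = currentDb
    tempTables.foreach { case (name, plan) => target.tempTables.put(name, plan) }
  }
}

object ToyCatalogDemo {
  def main(args: Array[String]): Unit = {
    val original = new ToyCatalog
    original.setCurrentDatabase("db1")
    original.createTempView("copytest1", "Range(1, 10)")

    val clone = new ToyCatalog
    original.copyStateTo(clone)             // clone now sees db1 and copytest1
    println(clone.getTempView("copytest1")) // Some(Range(1, 10))
  }
}
```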
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index dbe3ded4bb..dbf479d215 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -17,20 +17,14 @@
package org.apache.spark.sql.catalyst.optimizer
-import scala.annotation.tailrec
-import scala.collection.immutable.HashSet
import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.api.java.function.FilterFunction
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
-import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -79,7 +73,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
Batch("Aggregate", fixedPoint,
RemoveLiteralFromGroupExpressions,
RemoveRepetitionFromGroupExpressions) ::
- Batch("Operator Optimizations", fixedPoint,
+ Batch("Operator Optimizations", fixedPoint, Seq(
// Operator push down
PushProjectionThroughUnion,
ReorderJoin(conf),
@@ -117,7 +111,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
RemoveRedundantProject,
SimplifyCreateStructOps,
SimplifyCreateArrayOps,
- SimplifyCreateMapOps) ::
+ SimplifyCreateMapOps) ++
+ extendedOperatorOptimizationRules: _*) ::
Batch("Check Cartesian Products", Once,
CheckCartesianProducts(conf)) ::
Batch("Join Reorder", Once,
@@ -146,6 +141,11 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
s.withNewPlan(newPlan)
}
}
+
+ /**
+ * Override to provide additional rules for the operator optimization batch.
+ */
+ def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil
}
/**
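The new `extendedOperatorOptimizationRules` hook lets an `Optimizer` subclass append rules to the "Operator Optimizations" batch without redefining the whole batch list. A hedged sketch of such an override, assuming the constructor signature shown in the hunk above; `MyExtraRule` and `MyOptimizer` are illustrative names and not part of this change:

```scala
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Illustrative rule: a no-op placeholder standing in for a real rewrite.
object MyExtraRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Custom optimizer that contributes extra rules to the operator optimization
// batch via the new hook instead of overriding `batches` wholesale.
class MyOptimizer(catalog: SessionCatalog, conf: CatalystConf)
  extends Optimizer(catalog, conf) {

  override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
    Seq(MyExtraRule)
}
```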
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index ca4ce1c117..56bca73a88 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.catalyst.catalog
-import org.apache.hadoop.conf.Configuration
-
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SimpleCatalystConf, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
@@ -1331,17 +1329,15 @@ abstract class SessionCatalogSuite extends PlanTest {
}
}
- test("clone SessionCatalog - temp views") {
+ test("copy SessionCatalog state - temp views") {
withEmptyCatalog { original =>
val tempTable1 = Range(1, 10, 1, 10)
original.createTempView("copytest1", tempTable1, overrideIfExists = false)
// check if tables copied over
- val clone = original.newSessionCatalogWith(
- SimpleCatalystConf(caseSensitiveAnalysis = true),
- new Configuration(),
- new SimpleFunctionRegistry,
- CatalystSqlParser)
+ val clone = new SessionCatalog(original.externalCatalog)
+ original.copyStateTo(clone)
+
assert(original ne clone)
assert(clone.getTempView("copytest1") == Some(tempTable1))
@@ -1355,7 +1351,7 @@ abstract class SessionCatalogSuite extends PlanTest {
}
}
- test("clone SessionCatalog - current db") {
+ test("copy SessionCatalog state - current db") {
withEmptyCatalog { original =>
val db1 = "db1"
val db2 = "db2"
@@ -1368,11 +1364,9 @@ abstract class SessionCatalogSuite extends PlanTest {
original.setCurrentDatabase(db1)
// check if current db copied over
- val clone = original.newSessionCatalogWith(
- SimpleCatalystConf(caseSensitiveAnalysis = true),
- new Configuration(),
- new SimpleFunctionRegistry,
- CatalystSqlParser)
+ val clone = new SessionCatalog(original.externalCatalog)
+ original.copyStateTo(clone)
+
assert(original ne clone)
assert(clone.getCurrentDatabase == db1)