author     Andrew Or <andrew@databricks.com>    2016-02-27 19:51:28 -0800
committer  Reynold Xin <rxin@databricks.com>    2016-02-27 19:51:28 -0800
commit     cca79fad66c4315b0ed6de59fd87700a540e6646 (patch)
tree       ca305c4e716f0138e1130bc7d6ab3a5bcf3928cf /sql/core/src
parent     4c5e968db23db41c0bea802819ebd75fad63bc2b (diff)
[SPARK-13526][SQL] Move SQLContext per-session states to new class
## What changes were proposed in this pull request?

This creates a `SessionState`, which groups a few fields that existed in `SQLContext`. Because `HiveContext` extends `SQLContext`, we also need to make changes there. This is mainly a cleanup task that will soon pave the way for merging the two contexts.

## How was this patch tested?

Existing unit tests; this patch introduces no change in behavior.

Author: Andrew Or <andrew@databricks.com>

Closes #11405 from andrewor14/refactor-session.
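The core of the change is a single `sessionState` member plus forwarding defs on the context. As a rough illustration of that delegation pattern (a minimal sketch with hypothetical, simplified class names `ContextSketch` and `SessionStateSketch`, not the actual Spark API):

```scala
// Minimal sketch of the pattern this patch introduces: per-session fields live
// in one holder object, and the context exposes them through forwarding defs.
// Class names and fields here are hypothetical, simplified stand-ins.
class ContextSketch { self =>
  // The single per-session bundle; subclasses (e.g. a Hive or test variant)
  // override just this one member to customize all session state at once.
  protected lazy val sessionState: SessionStateSketch = new SessionStateSketch(self)

  // Forwarding defs keep existing call sites such as `ctx.conf` compiling unchanged.
  protected def conf: Map[String, String] = sessionState.conf
  protected def functionNames: Set[String] = sessionState.functionNames
}

class SessionStateSketch(ctx: ContextSketch) {
  // Lazy vals so that overrides in subclasses do not hit initialization-order
  // problems, mirroring the note in the real SessionState in the diff below.
  lazy val conf: Map[String, String] = Map.empty
  lazy val functionNames: Set[String] = Set("count", "sum")
}
```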
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala             88
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala         5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala  111
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala     22
4 files changed, 151 insertions, 75 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 1c24d9e4ae..cb4a6397b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -25,13 +25,12 @@ import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.reflect.runtime.universe.TypeTag
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkException}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql.{execution => sparkexecution}
-import org.apache.spark.sql.catalyst.{InternalRow, _}
+import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions._
@@ -40,9 +39,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan,
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.exchange.EnsureRequirements
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types._
@@ -68,7 +66,7 @@ class SQLContext private[sql](
@transient protected[sql] val cacheManager: CacheManager,
@transient private[sql] val listener: SQLListener,
val isRootContext: Boolean)
- extends org.apache.spark.Logging with Serializable {
+ extends Logging with Serializable {
self =>
@@ -115,9 +113,27 @@ class SQLContext private[sql](
}
/**
- * @return Spark SQL configuration
+ * Per-session state, e.g. configuration, functions, temporary tables etc.
*/
- protected[sql] lazy val conf = new SQLConf
+ @transient
+ protected[sql] lazy val sessionState: SessionState = new SessionState(self)
+ protected[sql] def conf: SQLConf = sessionState.conf
+ protected[sql] def catalog: Catalog = sessionState.catalog
+ protected[sql] def functionRegistry: FunctionRegistry = sessionState.functionRegistry
+ protected[sql] def analyzer: Analyzer = sessionState.analyzer
+ protected[sql] def optimizer: Optimizer = sessionState.optimizer
+ protected[sql] def sqlParser: ParserInterface = sessionState.sqlParser
+ protected[sql] def planner: SparkPlanner = sessionState.planner
+ protected[sql] def continuousQueryManager = sessionState.continuousQueryManager
+ protected[sql] def prepareForExecution: RuleExecutor[SparkPlan] =
+ sessionState.prepareForExecution
+
+ /**
+ * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
+ * that listen for execution metrics.
+ */
+ @Experimental
+ def listenerManager: ExecutionListenerManager = sessionState.listenerManager
/**
* Set Spark SQL configuration properties.
@@ -179,43 +195,11 @@ class SQLContext private[sql](
*/
def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
- @transient
- lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager
-
- protected[sql] lazy val continuousQueryManager = new ContinuousQueryManager(this)
-
- @transient
- protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf)
-
- @transient
- protected[sql] lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()
-
- @transient
- protected[sql] lazy val analyzer: Analyzer =
- new Analyzer(catalog, functionRegistry, conf) {
- override val extendedResolutionRules =
- python.ExtractPythonUDFs ::
- PreInsertCastAndRename ::
- (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)
-
- override val extendedCheckRules = Seq(
- datasources.PreWriteCheck(catalog)
- )
- }
-
- @transient
- protected[sql] lazy val optimizer: Optimizer = new SparkOptimizer(this)
-
- @transient
- protected[sql] val sqlParser: ParserInterface = new SparkQl(conf)
-
protected[sql] def parseSql(sql: String): LogicalPlan = sqlParser.parsePlan(sql)
- protected[sql] def executeSql(sql: String):
- org.apache.spark.sql.execution.QueryExecution = executePlan(parseSql(sql))
+ protected[sql] def executeSql(sql: String): QueryExecution = executePlan(parseSql(sql))
- protected[sql] def executePlan(plan: LogicalPlan) =
- new sparkexecution.QueryExecution(this, plan)
+ protected[sql] def executePlan(plan: LogicalPlan) = new QueryExecution(this, plan)
/**
* Add a jar to SQLContext
@@ -299,10 +283,8 @@ class SQLContext private[sql](
*
* @group basic
* @since 1.3.0
- * TODO move to SQLSession?
*/
- @transient
- val udf: UDFRegistration = new UDFRegistration(this)
+ def udf: UDFRegistration = sessionState.udf
/**
* Returns true if the table is currently cached in-memory.
@@ -873,25 +855,9 @@ class SQLContext private[sql](
}
@transient
- protected[sql] val planner: sparkexecution.SparkPlanner = new sparkexecution.SparkPlanner(this)
-
- @transient
protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[InternalRow], 1)
/**
- * Prepares a planned SparkPlan for execution by inserting shuffle operations and internal
- * row format conversions as needed.
- */
- @transient
- protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
- val batches = Seq(
- Batch("Subquery", Once, PlanSubqueries(self)),
- Batch("Add exchange", Once, EnsureRequirements(self)),
- Batch("Whole stage codegen", Once, CollapseCodegenStages(self))
- )
- }
-
- /**
* Parses the data type in our internal string representation. The data type string should
* have the same format as the one generated by `toString` in scala.
* It is only used by PySpark.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index de01cbcb0e..d894825632 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -22,6 +22,7 @@ import scala.util.Try
import org.apache.spark.Logging
import org.apache.spark.sql.api.java._
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUDF}
import org.apache.spark.sql.execution.aggregate.ScalaUDAF
@@ -34,9 +35,7 @@ import org.apache.spark.sql.types.DataType
*
* @since 1.3.0
*/
-class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
-
- private val functionRegistry = sqlContext.functionRegistry
+class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends Logging {
protected[sql] def registerPython(name: String, udf: UserDefinedPythonFunction): Unit = {
log.debug(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
new file mode 100644
index 0000000000..f93a405f77
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.{ContinuousQueryManager, SQLContext, UDFRegistration}
+import org.apache.spark.sql.catalyst.ParserInterface
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, Catalog, FunctionRegistry, SimpleCatalog}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.datasources.{PreInsertCastAndRename, ResolveDataSource}
+import org.apache.spark.sql.execution.exchange.EnsureRequirements
+import org.apache.spark.sql.util.ExecutionListenerManager
+
+
+/**
+ * A class that holds all session-specific state in a given [[SQLContext]].
+ */
+private[sql] class SessionState(ctx: SQLContext) {
+
+ // Note: These are all lazy vals because they depend on each other (e.g. conf) and we
+ // want subclasses to override some of the fields. Otherwise, we would get a lot of NPEs.
+
+ /**
+ * SQL-specific key-value configurations.
+ */
+ lazy val conf = new SQLConf
+
+ /**
+ * Internal catalog for managing table and database states.
+ */
+ lazy val catalog: Catalog = new SimpleCatalog(conf)
+
+ /**
+ * Internal catalog for managing functions registered by the user.
+ */
+ lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()
+
+ /**
+ * Interface exposed to the user for registering user-defined functions.
+ */
+ lazy val udf: UDFRegistration = new UDFRegistration(functionRegistry)
+
+ /**
+ * Logical query plan analyzer for resolving unresolved attributes and relations.
+ */
+ lazy val analyzer: Analyzer = {
+ new Analyzer(catalog, functionRegistry, conf) {
+ override val extendedResolutionRules =
+ python.ExtractPythonUDFs ::
+ PreInsertCastAndRename ::
+ (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
+
+ override val extendedCheckRules = Seq(datasources.PreWriteCheck(catalog))
+ }
+ }
+
+ /**
+ * Logical query plan optimizer.
+ */
+ lazy val optimizer: Optimizer = new SparkOptimizer(ctx)
+
+ /**
+ * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
+ */
+ lazy val sqlParser: ParserInterface = new SparkQl(conf)
+
+ /**
+ * Planner that converts optimized logical plans to physical plans.
+ */
+ lazy val planner: SparkPlanner = new SparkPlanner(ctx)
+
+ /**
+ * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
+ * row format conversions as needed.
+ */
+ lazy val prepareForExecution = new RuleExecutor[SparkPlan] {
+ override val batches: Seq[Batch] = Seq(
+ Batch("Subquery", Once, PlanSubqueries(ctx)),
+ Batch("Add exchange", Once, EnsureRequirements(ctx)),
+ Batch("Whole stage codegen", Once, CollapseCodegenStages(ctx))
+ )
+ }
+
+ /**
+ * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
+ * that listen for execution metrics.
+ */
+ lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager
+
+ /**
+ * Interface to start and stop [[org.apache.spark.sql.ContinuousQuery]]s.
+ */
+ lazy val continuousQueryManager: ContinuousQueryManager = new ContinuousQueryManager(ctx)
+
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 28ad7ae64a..b3e146fba8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.test
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
/**
* A special [[SQLContext]] prepared for testing.
@@ -31,16 +31,16 @@ private[sql] class TestSQLContext(sc: SparkContext) extends SQLContext(sc) { sel
new SparkConf().set("spark.sql.testkey", "true")))
}
- protected[sql] override lazy val conf: SQLConf = new SQLConf {
-
- clear()
-
- override def clear(): Unit = {
- super.clear()
-
- // Make sure we start with the default test configs even after clear
- TestSQLContext.overrideConfs.foreach {
- case (key, value) => setConfString(key, value)
+ @transient
+ protected[sql] override lazy val sessionState: SessionState = new SessionState(self) {
+ override lazy val conf: SQLConf = {
+ new SQLConf {
+ clear()
+ override def clear(): Unit = {
+ super.clear()
+ // Make sure we start with the default test configs even after clear
+ TestSQLContext.overrideConfs.foreach { case (key, value) => setConfString(key, value) }
+ }
}
}
}
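For reference, a hedged usage sketch of how a subclass can swap out the whole session-state bundle, in the same shape as the `TestSQLContext` change above. It continues the simplified, hypothetical classes from the sketch near the top and is not the real Spark API:

```scala
// Hypothetical continuation of the sketch classes above: a test-only context
// replaces the entire session-state bundle to pre-seed configuration, which is
// the same move TestSQLContext makes with its overridden `sessionState`.
class TestContextSketch extends ContextSketch { self =>
  protected override lazy val sessionState: SessionStateSketch =
    new SessionStateSketch(self) {
      // Seed a default test config; in the real patch this happens inside the
      // overridden SQLConf.clear() so the defaults survive resets.
      override lazy val conf: Map[String, String] = Map("spark.sql.testkey" -> "true")
    }
}
```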