author    Andrew Or <andrew@databricks.com>        2016-04-18 13:15:23 -0700
committer Yin Huai <yhuai@databricks.com>          2016-04-18 13:15:23 -0700
commit    28ee15702d9efd52a26a065c6e544b5345a8f65d (patch)
tree      f097fa7ac83c4540e262da71d302aebc4b746daf /sql/core/src
parent    e4ae974294fc61f03b235f82d1618f29cad8feee (diff)
[SPARK-14647][SQL] Group SQLContext/HiveContext state into SharedState
## What changes were proposed in this pull request?

This patch adds a SharedState that groups state shared across multiple SQLContexts. This is analogous to the SessionState added in SPARK-13526 that groups session-specific state. This cleanup makes the constructors of the contexts simpler and ultimately allows us to remove HiveContext in the near future.

## How was this patch tested?

Existing tests.

Author: Yin Huai <yhuai@databricks.com>

Closes #12463 from yhuai/sharedState.
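To make the effect of the change concrete, below is a hypothetical usage sketch (not part of the patch): a session obtained via `newSession()` reuses the `SparkContext`, `CacheManager`, `SQLListener` and `ExternalCatalog` held by the single `SharedState`, while SQL configurations stay in per-session SessionState. The app name, master setting and object name are illustrative only.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SharedStateSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("shared-state-sketch").setMaster("local[*]"))

    // The root context builds the SharedState (isRootContext = true).
    val rootCtx = new SQLContext(sc)

    // A child session reuses the same SharedState (isRootContext = false).
    val childCtx = rootCtx.newSession()

    // Shared: both sessions reach the same SparkContext through SharedState.
    assert(rootCtx.sparkContext eq childCtx.sparkContext)

    // Not shared: SQL configurations live in per-session SessionState.
    rootCtx.setConf("spark.sql.shuffle.partitions", "4")
    childCtx.setConf("spark.sql.shuffle.partitions", "16")
    assert(rootCtx.getConf("spark.sql.shuffle.partitions") == "4")

    sc.stop()
  }
}
```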
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala            | 31
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala  | 47
3 files changed, 61 insertions(+), 19 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 9259ff4062..781d699819 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.ShowTablesCommand
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
-import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ExecutionListenerManager
@@ -63,17 +63,14 @@ import org.apache.spark.util.Utils
* @since 1.0.0
*/
class SQLContext private[sql](
- @transient val sparkContext: SparkContext,
- @transient protected[sql] val cacheManager: CacheManager,
- @transient private[sql] val listener: SQLListener,
- val isRootContext: Boolean,
- @transient private[sql] val externalCatalog: ExternalCatalog)
+ @transient protected[sql] val sharedState: SharedState,
+ val isRootContext: Boolean)
extends Logging with Serializable {
self =>
def this(sc: SparkContext) = {
- this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), true, new InMemoryCatalog)
+ this(new SharedState(sc), true)
}
def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
@@ -100,20 +97,20 @@ class SQLContext private[sql](
}
}
+ def sparkContext: SparkContext = sharedState.sparkContext
+
+ protected[sql] def cacheManager: CacheManager = sharedState.cacheManager
+ protected[sql] def listener: SQLListener = sharedState.listener
+ protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog
+
/**
- * Returns a SQLContext as new session, with separated SQL configurations, temporary tables,
- * registered functions, but sharing the same SparkContext, CacheManager, SQLListener and SQLTab.
+ * Returns a [[SQLContext]] as new session, with separated SQL configurations, temporary
+ * tables, registered functions, but sharing the same [[SparkContext]], cached data and
+ * other things.
*
* @since 1.6.0
*/
- def newSession(): SQLContext = {
- new SQLContext(
- sparkContext = sparkContext,
- cacheManager = cacheManager,
- listener = listener,
- isRootContext = false,
- externalCatalog = externalCatalog)
- }
+ def newSession(): SQLContext = new SQLContext(sharedState, isRootContext = false)
/**
* Per-session state, e.g. configuration, functions, temporary tables etc.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index c30f879ded..d404a7c0ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -22,10 +22,8 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource}
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.util.ExecutionListenerManager
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
new file mode 100644
index 0000000000..9a30c7de1f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
+import org.apache.spark.sql.execution.CacheManager
+import org.apache.spark.sql.execution.ui.SQLListener
+
+
+/**
+ * A class that holds all state shared across sessions in a given [[SQLContext]].
+ */
+private[sql] class SharedState(val sparkContext: SparkContext) {
+
+ /**
+ * Class for caching query results reused in future executions.
+ */
+ val cacheManager: CacheManager = new CacheManager
+
+ /**
+ * A listener for SQL-specific [[org.apache.spark.scheduler.SparkListenerEvent]]s.
+ */
+ val listener: SQLListener = SQLContext.createListenerAndUI(sparkContext)
+
+ /**
+ * A catalog that interacts with external systems.
+ */
+ lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog
+
+}
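A note on the design: `externalCatalog` is declared as a `lazy val`, which leaves room for a subclass of `SharedState` to substitute a different catalog while reusing the rest of the shared state, in line with the stated goal of eventually folding HiveContext into this structure. The sketch below is hypothetical; the class name, package and catalog implementation are placeholders, not part of this patch.

```scala
// Hypothetical sketch only: a SharedState subclass that swaps in a different
// external catalog. Names are placeholders, not part of this patch.
package org.apache.spark.sql.hive

import org.apache.spark.SparkContext
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog
import org.apache.spark.sql.internal.SharedState

private[hive] class HiveSharedStateSketch(sparkContext: SparkContext)
  extends SharedState(sparkContext) {

  // Override the lazy val so the catalog is backed by an external metastore
  // instead of the default InMemoryCatalog.
  override lazy val externalCatalog: ExternalCatalog =
    ???  // e.g. a Hive-backed ExternalCatalog implementation
}
```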