diff options

commit    5cefecc95a5b8418713516802c416cfde5a94a2d (patch)
author    Andrew Or <andrew@databricks.com>   2016-04-16 14:00:53 -0700
committer Yin Huai <yhuai@databricks.com>     2016-04-16 14:00:53 -0700
tree      0285112a763748ee531a6124c2d872f7cfaac00e /sql/core/src
parent    3f49afee937a66d458e0c194e46d6a9e380e054e (diff)
download  spark-5cefecc95a5b8418713516802c416cfde5a94a2d.tar.gz
          spark-5cefecc95a5b8418713516802c416cfde5a94a2d.tar.bz2
          spark-5cefecc95a5b8418713516802c416cfde5a94a2d.zip
[SPARK-14647][SQL] Group SQLContext/HiveContext state into SharedState
## What changes were proposed in this pull request?
This patch adds a SharedState that groups state shared across multiple SQLContexts. This is analogous to the SessionState added in SPARK-13526 that groups session-specific state. This cleanup makes the constructors of the contexts simpler and ultimately allows us to remove HiveContext in the near future.
## How was this patch tested?
Existing tests.
Closes #12405
Author: Andrew Or <andrew@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Closes #12447 from yhuai/sharedState.
Diffstat (limited to 'sql/core/src')
3 files changed, 61 insertions(+), 19 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 9259ff4062..781d699819 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.ShowTablesCommand
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
-import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.ExecutionListenerManager
@@ -63,17 +63,14 @@ import org.apache.spark.util.Utils
  * @since 1.0.0
  */
 class SQLContext private[sql](
-    @transient val sparkContext: SparkContext,
-    @transient protected[sql] val cacheManager: CacheManager,
-    @transient private[sql] val listener: SQLListener,
-    val isRootContext: Boolean,
-    @transient private[sql] val externalCatalog: ExternalCatalog)
+    @transient protected[sql] val sharedState: SharedState,
+    val isRootContext: Boolean)
   extends Logging with Serializable {

   self =>

   def this(sc: SparkContext) = {
-    this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), true, new InMemoryCatalog)
+    this(new SharedState(sc), true)
   }

   def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
@@ -100,20 +97,20 @@ class SQLContext private[sql](
     }
   }

+  def sparkContext: SparkContext = sharedState.sparkContext
+
+  protected[sql] def cacheManager: CacheManager = sharedState.cacheManager
+  protected[sql] def listener: SQLListener = sharedState.listener
+  protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog
+
   /**
-   * Returns a SQLContext as new session, with separated SQL configurations, temporary tables,
-   * registered functions, but sharing the same SparkContext, CacheManager, SQLListener and SQLTab.
+   * Returns a [[SQLContext]] as new session, with separated SQL configurations, temporary
+   * tables, registered functions, but sharing the same [[SparkContext]], cached data and
+   * other things.
    *
    * @since 1.6.0
    */
-  def newSession(): SQLContext = {
-    new SQLContext(
-      sparkContext = sparkContext,
-      cacheManager = cacheManager,
-      listener = listener,
-      isRootContext = false,
-      externalCatalog = externalCatalog)
-  }
+  def newSession(): SQLContext = new SQLContext(sharedState, isRootContext = false)

   /**
    * Per-session state, e.g. configuration, functions, temporary tables etc.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index c30f879ded..d404a7c0ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -22,10 +22,8 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource}
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.util.ExecutionListenerManager

 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
new file mode 100644
index 0000000000..9a30c7de1f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
+import org.apache.spark.sql.execution.CacheManager
+import org.apache.spark.sql.execution.ui.SQLListener
+
+
+/**
+ * A class that holds all state shared across sessions in a given [[SQLContext]].
+ */
+private[sql] class SharedState(val sparkContext: SparkContext) {
+
+  /**
+   * Class for caching query results reused in future executions.
+   */
+  val cacheManager: CacheManager = new CacheManager
+
+  /**
+   * A listener for SQL-specific [[org.apache.spark.scheduler.SparkListenerEvent]]s.
+   */
+  val listener: SQLListener = SQLContext.createListenerAndUI(sparkContext)
+
+  /**
+   * A catalog that interacts with external systems.
+   */
+  lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog
+
+}