author    Andrew Or <andrew@databricks.com>  2016-04-17 17:35:41 -0700
committer Andrew Or <andrew@databricks.com>  2016-04-17 17:35:41 -0700
commit    7de06a646dff7ede520d2e982ac0996d8c184650 (patch)
tree      119dd5028ba5c35fb2415fc26529c8dfe469f43c
parent    699a4dfd89dc598e79955cfd6f66c06b6bf74be6 (diff)
Revert "[SPARK-14647][SQL] Group SQLContext/HiveContext state into SharedState"
This reverts commit 5cefecc95a5b8418713516802c416cfde5a94a2d.
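For orientation before the diff: the reverted commit had grouped the cross-session pieces of SQLContext into a single SharedState holder, and this revert flattens them back into individual constructor parameters. A minimal toy sketch of the two shapes, using stand-in classes rather than the real Spark types:

```scala
// Toy sketch only: these stand-ins are not the real Spark classes.
class CacheManager
class SQLListener
class ExternalCatalog

// Shape being reverted: cross-session state grouped in one holder object.
class SharedStateSketch(
    val cacheManager: CacheManager,
    val listener: SQLListener,
    val externalCatalog: ExternalCatalog)

class GroupedContext(val sharedState: SharedStateSketch, val isRootContext: Boolean) {
  def newSession(): GroupedContext = new GroupedContext(sharedState, isRootContext = false)
}

// Shape being restored: each shared component is an explicit constructor parameter,
// so newSession() must thread every one of them through by hand.
class FlatContext(
    val cacheManager: CacheManager,
    val listener: SQLListener,
    val externalCatalog: ExternalCatalog,
    val isRootContext: Boolean) {
  def newSession(): FlatContext =
    new FlatContext(cacheManager, listener, externalCatalog, isRootContext = false)
}
```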
 sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                    | 31
 sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala         |  2
 sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala          | 47
 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala              | 51
 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala         | 15
 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala          | 53
 sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala            | 86
 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala | 12
 8 files changed, 122 insertions(+), 175 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 781d699819..9259ff4062 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.ShowTablesCommand
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
-import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ExecutionListenerManager
@@ -63,14 +63,17 @@ import org.apache.spark.util.Utils
* @since 1.0.0
*/
class SQLContext private[sql](
- @transient protected[sql] val sharedState: SharedState,
- val isRootContext: Boolean)
+ @transient val sparkContext: SparkContext,
+ @transient protected[sql] val cacheManager: CacheManager,
+ @transient private[sql] val listener: SQLListener,
+ val isRootContext: Boolean,
+ @transient private[sql] val externalCatalog: ExternalCatalog)
extends Logging with Serializable {
self =>
def this(sc: SparkContext) = {
- this(new SharedState(sc), true)
+ this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), true, new InMemoryCatalog)
}
def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
@@ -97,20 +100,20 @@ class SQLContext private[sql](
}
}
- def sparkContext: SparkContext = sharedState.sparkContext
-
- protected[sql] def cacheManager: CacheManager = sharedState.cacheManager
- protected[sql] def listener: SQLListener = sharedState.listener
- protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog
-
/**
- * Returns a [[SQLContext]] as new session, with separated SQL configurations, temporary
- * tables, registered functions, but sharing the same [[SparkContext]], cached data and
- * other things.
+ * Returns a SQLContext as new session, with separated SQL configurations, temporary tables,
+ * registered functions, but sharing the same SparkContext, CacheManager, SQLListener and SQLTab.
*
* @since 1.6.0
*/
- def newSession(): SQLContext = new SQLContext(sharedState, isRootContext = false)
+ def newSession(): SQLContext = {
+ new SQLContext(
+ sparkContext = sparkContext,
+ cacheManager = cacheManager,
+ listener = listener,
+ isRootContext = false,
+ externalCatalog = externalCatalog)
+ }
/**
* Per-session state, e.g. configuration, functions, temporary tables etc.
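A usage sketch of the restored newSession() contract, assuming a Spark build from this era (the 1.6/2.0 line) and a local master; SQLContext, newSession, and setConf are the real API, while the setup around them is illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Usage sketch only; run as an app or paste into spark-shell.
object SessionDemo extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("session-demo"))
  val root = new SQLContext(sc)
  val session = root.newSession()

  // The new session gets its own SQLConf and temporary tables, but shares
  // the SparkContext, CacheManager, and SQLListener with `root`.
  session.setConf("spark.sql.shuffle.partitions", "4") // leaves `root`'s conf untouched

  sc.stop()
}
```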
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index d404a7c0ae..c30f879ded 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -22,8 +22,10 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.util.ExecutionListenerManager
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
deleted file mode 100644
index 9a30c7de1f..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.internal
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
-import org.apache.spark.sql.execution.CacheManager
-import org.apache.spark.sql.execution.ui.SQLListener
-
-
-/**
- * A class that holds all state shared across sessions in a given [[SQLContext]].
- */
-private[sql] class SharedState(val sparkContext: SparkContext) {
-
- /**
- * Class for caching query results reused in future executions.
- */
- val cacheManager: CacheManager = new CacheManager
-
- /**
- * A listener for SQL-specific [[org.apache.spark.scheduler.SparkListenerEvent]]s.
- */
- val listener: SQLListener = SQLContext.createListenerAndUI(sparkContext)
-
- /**
- * A catalog that interacts with external systems.
- */
- lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog
-
-}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 71ef99a6a9..42cda0be16 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -45,10 +45,12 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
+import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.{AnalyzeTable, DescribeHiveTableCommand, HiveNativeCommand}
-import org.apache.spark.sql.internal.{SharedState, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -61,14 +63,32 @@ import org.apache.spark.util.Utils
* @since 1.0.0
*/
class HiveContext private[hive](
- @transient protected[hive] val hiveSharedState: HiveSharedState,
- override val isRootContext: Boolean)
- extends SQLContext(hiveSharedState, isRootContext) with Logging {
-
+ sc: SparkContext,
+ cacheManager: CacheManager,
+ listener: SQLListener,
+ @transient private[hive] val executionHive: HiveClientImpl,
+ @transient private[hive] val metadataHive: HiveClient,
+ isRootContext: Boolean,
+ @transient private[sql] val hiveCatalog: HiveExternalCatalog)
+ extends SQLContext(sc, cacheManager, listener, isRootContext, hiveCatalog) with Logging {
self =>
+ private def this(sc: SparkContext, execHive: HiveClientImpl, metaHive: HiveClient) {
+ this(
+ sc,
+ new CacheManager,
+ SQLContext.createListenerAndUI(sc),
+ execHive,
+ metaHive,
+ true,
+ new HiveExternalCatalog(metaHive))
+ }
+
def this(sc: SparkContext) = {
- this(new HiveSharedState(sc), true)
+ this(
+ sc,
+ HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration),
+ HiveContext.newClientForMetadata(sc.conf, sc.hadoopConfiguration))
}
def this(sc: JavaSparkContext) = this(sc.sc)
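The restored HiveContext leans on chained Scala auxiliary constructors (`def this(...)`), each of which must delegate to another constructor as its first statement; a standalone toy sketch of the idiom, not the real HiveContext:

```scala
// Toy illustration of auxiliary-constructor chaining.
class ChainedContext(val a: String, val b: String, val isRootContext: Boolean) {
  // Every auxiliary constructor must delegate to another constructor first.
  def this(a: String, b: String) = this(a, b, isRootContext = true)
  def this(a: String) = this(a, "derived-from-" + a)
}

object ChainDemo extends App {
  val ctx = new ChainedContext("exec-client")
  println((ctx.a, ctx.b, ctx.isRootContext)) // (exec-client,derived-from-exec-client,true)
}
```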
@@ -83,16 +103,19 @@ class HiveContext private[hive](
* and Hive client (both of execution and metadata) with existing HiveContext.
*/
override def newSession(): HiveContext = {
- new HiveContext(hiveSharedState, isRootContext = false)
+ new HiveContext(
+ sc = sc,
+ cacheManager = cacheManager,
+ listener = listener,
+ executionHive = executionHive.newSession(),
+ metadataHive = metadataHive.newSession(),
+ isRootContext = false,
+ hiveCatalog = hiveCatalog)
}
@transient
protected[sql] override lazy val sessionState = new HiveSessionState(self)
- protected[hive] def hiveCatalog: HiveExternalCatalog = hiveSharedState.externalCatalog
- protected[hive] def executionHive: HiveClientImpl = sessionState.executionHive
- protected[hive] def metadataHive: HiveClient = sessionState.metadataHive
-
/**
* When true, enables an experimental feature where metastore tables that use the parquet SerDe
* are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
@@ -136,7 +159,7 @@ class HiveContext private[hive](
protected[hive] def hiveThriftServerAsync: Boolean = getConf(HIVE_THRIFT_SERVER_ASYNC)
protected[hive] def hiveThriftServerSingleSession: Boolean =
- sparkContext.conf.getBoolean("spark.sql.hive.thriftServer.singleSession", defaultValue = false)
+ sc.conf.get("spark.sql.hive.thriftServer.singleSession", "false").toBoolean
@transient
protected[sql] lazy val substitutor = new VariableSubstitution()
@@ -504,9 +527,7 @@ private[hive] object HiveContext extends Logging {
* The version of the Hive client that is used here must match the metastore that is configured
* in the hive-site.xml file.
*/
- protected[hive] def newClientForMetadata(
- conf: SparkConf,
- hadoopConf: Configuration): HiveClient = {
+ private def newClientForMetadata(conf: SparkConf, hadoopConf: Configuration): HiveClient = {
val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
val configurations = hiveClientConfigurations(hiveConf)
newClientForMetadata(conf, hiveConf, hadoopConf, configurations)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index bc28b55d06..b992fda18c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -18,11 +18,10 @@
package org.apache.spark.sql.hive
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.execution.SparkPlanner
+import org.apache.spark.sql.execution.{python, SparkPlanner}
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
import org.apache.spark.sql.hive.execution.HiveSqlParser
import org.apache.spark.sql.internal.{SessionState, SQLConf}
@@ -32,16 +31,6 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf}
*/
private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) {
- /**
- * A Hive client used for execution.
- */
- val executionHive: HiveClientImpl = ctx.hiveSharedState.executionHive.newSession()
-
- /**
- * A Hive client used for interacting with the metastore.
- */
- val metadataHive: HiveClient = ctx.hiveSharedState.metadataHive.newSession()
-
override lazy val conf: SQLConf = new SQLConf {
override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
}
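The surviving conf definition above configures behavior through an anonymous SQLConf subclass; the same idiom in isolation, with a toy class standing in for Spark's SQLConf:

```scala
// Toy sketch of overriding behavior via an anonymous subclass.
class ConfSketch { def caseSensitiveAnalysis: Boolean = true }

object ConfDemo extends App {
  val hiveStyle: ConfSketch = new ConfSketch {
    override def caseSensitiveAnalysis: Boolean = false // Hive resolves identifiers case-insensitively
  }
  println(hiveStyle.caseSensitiveAnalysis) // false
}
```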
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
deleted file mode 100644
index 11097c33df..0000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
-import org.apache.spark.sql.internal.SharedState
-
-
-/**
- * A class that holds all state shared across sessions in a given [[HiveContext]].
- */
-private[hive] class HiveSharedState(override val sparkContext: SparkContext)
- extends SharedState(sparkContext) {
-
- // TODO: just share the IsolatedClientLoader instead of the client instances themselves
-
- /**
- * A Hive client used for execution.
- */
- val executionHive: HiveClientImpl = {
- HiveContext.newClientForExecution(sparkContext.conf, sparkContext.hadoopConfiguration)
- }
-
- /**
- * A Hive client used to interact with the metastore.
- */
- // This needs to be a lazy val at here because TestHiveSharedState is overriding it.
- lazy val metadataHive: HiveClient = {
- HiveContext.newClientForMetadata(sparkContext.conf, sparkContext.hadoopConfiguration)
- }
-
- /**
- * A catalog that interacts with the Hive metastore.
- */
- override lazy val externalCatalog = new HiveExternalCatalog(metadataHive)
-
-}
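The deleted comment ("This needs to be a lazy val at here because TestHiveSharedState is overriding it.") points at a Scala idiom worth spelling out: a subclass can override a lazy val so that, on first evaluation, the override is used instead of the parent's body. That is how TestHiveSharedState swapped in a test metastore client. A standalone sketch with stand-in types, not Spark's HiveClient:

```scala
// Standalone sketch of the lazy-val override pattern referenced above.
trait Client { def describe: String }

class SharedSketch {
  // Declared lazy so a subclass override replaces this body before first use.
  lazy val metadataClient: Client = new Client { def describe = "production metastore" }
}

class TestSharedSketch extends SharedSketch {
  override lazy val metadataClient: Client = new Client { def describe = "temp-dir test metastore" }
}

object LazyOverrideDemo extends App {
  println(new TestSharedSketch().metadataClient.describe) // temp-dir test metastore
}
```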
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index d56d36fe32..7f6ca21782 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -72,24 +72,63 @@ object TestHive
* test cases that rely on TestHive must be serialized.
*/
class TestHiveContext private[hive](
- testHiveSharedState: TestHiveSharedState,
+ sc: SparkContext,
+ cacheManager: CacheManager,
+ listener: SQLListener,
+ executionHive: HiveClientImpl,
+ metadataHive: HiveClient,
+ isRootContext: Boolean,
+ hiveCatalog: HiveExternalCatalog,
val warehousePath: File,
val scratchDirPath: File,
- metastoreTemporaryConf: Map[String, String],
- isRootContext: Boolean)
- extends HiveContext(testHiveSharedState, isRootContext) { self =>
+ metastoreTemporaryConf: Map[String, String])
+ extends HiveContext(
+ sc,
+ cacheManager,
+ listener,
+ executionHive,
+ metadataHive,
+ isRootContext,
+ hiveCatalog) { self =>
+
+ // Unfortunately, due to the complex interactions between the construction parameters
+ // and the limitations in scala constructors, we need many of these constructors to
+ // provide a shorthand to create a new TestHiveContext with only a SparkContext.
+ // This is not a great design pattern but it's necessary here.
private def this(
sc: SparkContext,
+ executionHive: HiveClientImpl,
+ metadataHive: HiveClient,
warehousePath: File,
scratchDirPath: File,
metastoreTemporaryConf: Map[String, String]) {
this(
- new TestHiveSharedState(sc, warehousePath, scratchDirPath, metastoreTemporaryConf),
+ sc,
+ new CacheManager,
+ SQLContext.createListenerAndUI(sc),
+ executionHive,
+ metadataHive,
+ true,
+ new HiveExternalCatalog(metadataHive),
warehousePath,
scratchDirPath,
- metastoreTemporaryConf,
- true)
+ metastoreTemporaryConf)
+ }
+
+ private def this(
+ sc: SparkContext,
+ warehousePath: File,
+ scratchDirPath: File,
+ metastoreTemporaryConf: Map[String, String]) {
+ this(
+ sc,
+ HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration),
+ TestHiveContext.newClientForMetadata(
+ sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath, metastoreTemporaryConf),
+ warehousePath,
+ scratchDirPath,
+ metastoreTemporaryConf)
}
def this(sc: SparkContext) {
@@ -102,11 +141,16 @@ class TestHiveContext private[hive](
override def newSession(): HiveContext = {
new TestHiveContext(
- testHiveSharedState,
- warehousePath,
- scratchDirPath,
- metastoreTemporaryConf,
- isRootContext = false)
+ sc = sc,
+ cacheManager = cacheManager,
+ listener = listener,
+ executionHive = executionHive.newSession(),
+ metadataHive = metadataHive.newSession(),
+ isRootContext = false,
+ hiveCatalog = hiveCatalog,
+ warehousePath = warehousePath,
+ scratchDirPath = scratchDirPath,
+ metastoreTemporaryConf = metastoreTemporaryConf)
}
// By clearing the port we force Spark to pick a new one. This allows us to rerun tests
@@ -505,22 +549,6 @@ private[hive] class TestHiveFunctionRegistry extends SimpleFunctionRegistry {
}
}
-
-private[hive] class TestHiveSharedState(
- sc: SparkContext,
- warehousePath: File,
- scratchDirPath: File,
- metastoreTemporaryConf: Map[String, String])
- extends HiveSharedState(sc) {
-
- override lazy val metadataHive: HiveClient = {
- TestHiveContext.newClientForMetadata(
- sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath, metastoreTemporaryConf)
- }
-
-}
-
-
private[hive] object TestHiveContext {
/**
@@ -535,7 +563,7 @@ private[hive] object TestHiveContext {
/**
* Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore.
*/
- def newClientForMetadata(
+ private def newClientForMetadata(
conf: SparkConf,
hadoopConf: Configuration,
warehousePath: File,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 84285b7f40..3334c16f0b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -18,10 +18,12 @@
package org.apache.spark.sql.hive
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.util.VersionInfo
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.hive.client.{HiveClient, IsolatedClientLoader}
+import org.apache.spark.util.Utils
/**
* Test suite for the [[HiveExternalCatalog]].
@@ -29,9 +31,11 @@ import org.apache.spark.sql.hive.client.HiveClient
class HiveExternalCatalogSuite extends CatalogTestCases {
private val client: HiveClient = {
- // We create a metastore at a temp location to avoid any potential
- // conflict of having multiple connections to a single derby instance.
- HiveContext.newClientForExecution(new SparkConf, new Configuration)
+ IsolatedClientLoader.forVersion(
+ hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
+ hadoopVersion = VersionInfo.getVersion,
+ sparkConf = new SparkConf(),
+ hadoopConf = new Configuration()).createClient()
}
protected override val utils: CatalogTestUtils = new CatalogTestUtils {