aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala10
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala42
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala99
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala12
4 files changed, 156 insertions, 7 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 47397c4be3..f62df9bdeb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -186,6 +186,16 @@ private[spark] object SQLConf {
import SQLConfEntry._
+  // Controls whether more than one root SQLContext/HiveContext may be constructed.
+  // SQLContext looks this key up in the SparkConf while a context is being constructed.
+  val ALLOW_MULTIPLE_CONTEXTS = booleanConf("spark.sql.allowMultipleContexts",
+    defaultValue = Some(true),
+    doc = "When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
+      "When set to false, only one SQLContext/HiveContext is allowed to be created " +
+      "through the constructor (new SQLContexts/HiveContexts created through newSession " +
+      "method is allowed). Please note that this conf needs to be set in Spark Conf. Once " +
+      "a SQLContext/HiveContext has been created, changing the value of this conf will not " +
+      "have effect.",
+    isPublic = true)
+
val COMPRESS_CACHED = booleanConf("spark.sql.inMemoryColumnarStorage.compressed",
defaultValue = Some(true),
doc = "When set to true Spark SQL will automatically select a compression codec for each " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 2bdfd82af0..1bd2913892 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -26,7 +26,7 @@ import scala.collection.immutable
import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkException, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
@@ -64,14 +64,37 @@ import org.apache.spark.util.Utils
*/
class SQLContext private[sql](
@transient val sparkContext: SparkContext,
- @transient protected[sql] val cacheManager: CacheManager)
+ @transient protected[sql] val cacheManager: CacheManager,
+ val isRootContext: Boolean)
extends org.apache.spark.Logging with Serializable {
self =>
- def this(sparkContext: SparkContext) = this(sparkContext, new CacheManager)
+ def this(sparkContext: SparkContext) = this(sparkContext, new CacheManager, true)
def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
+ // If spark.sql.allowMultipleContexts is false, we will throw an exception if a user
+ // wants to create a new root SQLContext (a SQLContext that is not created by newSession).
+ private val allowMultipleContexts =
+ sparkContext.conf.getBoolean(
+ SQLConf.ALLOW_MULTIPLE_CONTEXTS.key,
+ SQLConf.ALLOW_MULTIPLE_CONTEXTS.defaultValue.get)
+
+  // When multiple contexts are disallowed, refuse to construct another root SQLContext:
+  // a root context (one that does not come from newSession) may only be created while no
+  // SQLContext is recorded as the instantiated one.
+  if (!allowMultipleContexts && isRootContext &&
+      SQLContext.getInstantiatedContextOption().isDefined) {
+    val errMsg = "Only one SQLContext/HiveContext may be running in this JVM. " +
+      s"It is recommended to use SQLContext.getOrCreate to get the instantiated " +
+      s"SQLContext/HiveContext. To ignore this error, " +
+      s"set ${SQLConf.ALLOW_MULTIPLE_CONTEXTS.key} = true in SparkConf."
+    throw new SparkException(errMsg)
+  }
+
/**
* Returns a SQLContext as new session, with separated SQL configurations, temporary tables,
* registered functions, but sharing the same SparkContext and CacheManager.
@@ -79,7 +102,10 @@ class SQLContext private[sql](
* @since 1.6.0
*/
def newSession(): SQLContext = {
- new SQLContext(sparkContext, cacheManager)
+ new SQLContext(
+ sparkContext = sparkContext,
+ cacheManager = cacheManager,
+ isRootContext = false)
}
/**
@@ -1239,6 +1265,10 @@ object SQLContext {
instantiatedContext.compareAndSet(null, sqlContext)
}
+  /** Wraps `instantiatedContext.get()` in an Option; None when the reference holds null. */
+  private[sql] def getInstantiatedContextOption(): Option[SQLContext] =
+    Option(instantiatedContext.get())
+
/**
* Changes the SQLContext that will be returned in this thread and its children when
* SQLContext.getOrCreate() is called. This can be used to ensure that a given thread receives
@@ -1260,6 +1290,10 @@ object SQLContext {
activeContext.remove()
}
+  /** Wraps `activeContext.get()` in an Option; None when that call yields null. */
+  private[sql] def getActiveContextOption(): Option[SQLContext] =
+    Option(activeContext.get())
+
/**
* Converts an iterator of Java Beans to InternalRow using the provided
* bean info & schema. This is not related to the singleton, but is a static
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala
new file mode 100644
index 0000000000..0e8fcb6a85
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala
@@ -0,0 +1,99 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import org.apache.spark._
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Exercises the `spark.sql.allowMultipleContexts` flag: sessions obtained through newSession
+ * must always be creatable, while constructing a second root SQLContext must fail with a
+ * SparkException when the flag is false.
+ *
+ * The suite manipulates the global active/instantiated SQLContext references, so it saves
+ * them in beforeAll and puts them back in afterAll.
+ */
+class MultiSQLContextsSuite extends SparkFunSuite with BeforeAndAfterAll {
+
+  private var originalActiveSQLContext: Option[SQLContext] = _
+  private var originalInstantiatedSQLContext: Option[SQLContext] = _
+  private var sparkConf: SparkConf = _
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    // Remember the contexts that were registered before this suite, then clear them so the
+    // tests start without an active or instantiated SQLContext.
+    originalActiveSQLContext = SQLContext.getActiveContextOption()
+    originalInstantiatedSQLContext = SQLContext.getInstantiatedContextOption()
+
+    SQLContext.clearActive()
+    originalInstantiatedSQLContext.foreach(ctx => SQLContext.clearInstantiatedContext(ctx))
+    // spark.driver.allowMultipleContexts is enabled because the test creates a second
+    // SparkContext while the first one has not been stopped yet.
+    sparkConf =
+      new SparkConf(false)
+        .setMaster("local[*]")
+        .setAppName("test")
+        .set("spark.ui.enabled", "false")
+        .set("spark.driver.allowMultipleContexts", "true")
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      // Set these states back.
+      originalActiveSQLContext.foreach(ctx => SQLContext.setActive(ctx))
+      originalInstantiatedSQLContext.foreach(ctx => SQLContext.setInstantiatedContext(ctx))
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  /** Creates a new session from `rootSQLContext`; this must succeed for either flag value. */
+  def testNewSession(rootSQLContext: SQLContext): Unit = {
+    // Make sure we can successfully create new Session.
+    rootSQLContext.newSession()
+
+    // Reset the state. It is always safe to clear the active context.
+    SQLContext.clearActive()
+  }
+
+  /**
+   * Tries to construct another root SQLContext on a fresh SparkContext.
+   *
+   * @param allowsMultipleContexts value written to spark.sql.allowMultipleContexts; when
+   *                               false the constructor is expected to throw SparkException
+   */
+  def testCreatingNewSQLContext(allowsMultipleContexts: Boolean): Unit = {
+    val conf =
+      sparkConf
+        .clone
+        .set(SQLConf.ALLOW_MULTIPLE_CONTEXTS.key, allowsMultipleContexts.toString)
+    val sparkContext = new SparkContext(conf)
+
+    try {
+      if (allowsMultipleContexts) {
+        new SQLContext(sparkContext)
+        SQLContext.clearActive()
+      } else {
+        // If allowsMultipleContexts is false, make sure we can get the error.
+        val message = intercept[SparkException] {
+          new SQLContext(sparkContext)
+        }.getMessage
+        assert(message.contains("Only one SQLContext/HiveContext may be running"))
+      }
+    } finally {
+      sparkContext.stop()
+    }
+  }
+
+  test("test the flag to disallow creating multiple root SQLContext") {
+    Seq(false, true).foreach { allowMultipleSQLContexts =>
+      val conf =
+        sparkConf
+          .clone
+          .set(SQLConf.ALLOW_MULTIPLE_CONTEXTS.key, allowMultipleSQLContexts.toString)
+      val sc = new SparkContext(conf)
+      try {
+        val rootSQLContext = new SQLContext(sc)
+        try {
+          testNewSession(rootSQLContext)
+          testNewSession(rootSQLContext)
+          testCreatingNewSQLContext(allowMultipleSQLContexts)
+        } finally {
+          // Always unregister the root context, even when an assertion above fails, so a
+          // context bound to a stopped SparkContext is not left behind for the next
+          // iteration or for afterAll.
+          SQLContext.clearInstantiatedContext(rootSQLContext)
+        }
+      } finally {
+        sc.stop()
+      }
+    }
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index dad1e2347c..ddeadd3eb7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -89,10 +89,11 @@ class HiveContext private[hive](
sc: SparkContext,
cacheManager: CacheManager,
@transient execHive: ClientWrapper,
- @transient metaHive: ClientInterface) extends SQLContext(sc, cacheManager) with Logging {
+ @transient metaHive: ClientInterface,
+ isRootContext: Boolean) extends SQLContext(sc, cacheManager, isRootContext) with Logging {
self =>
- def this(sc: SparkContext) = this(sc, new CacheManager, null, null)
+ def this(sc: SparkContext) = this(sc, new CacheManager, null, null, true)
def this(sc: JavaSparkContext) = this(sc.sc)
import org.apache.spark.sql.hive.HiveContext._
@@ -105,7 +106,12 @@ class HiveContext private[hive](
* and Hive client (both of execution and metadata) with existing HiveContext.
*/
override def newSession(): HiveContext = {
- new HiveContext(sc, cacheManager, executionHive.newSession(), metadataHive.newSession())
+ new HiveContext(
+ sc = sc,
+ cacheManager = cacheManager,
+ execHive = executionHive.newSession(),
+ metaHive = metadataHive.newSession(),
+ isRootContext = false)
}
/**