diff options
3 files changed, 28 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 36aa3becb4..5018eb38d9 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2254,6 +2254,9 @@ object SparkContext extends Logging { if (activeContext.get() == null) { setActiveContext(new SparkContext(config), allowMultipleContexts = false) } + if (config.getAll.nonEmpty) { + logWarning("Use an existing SparkContext, some configuration may not take effect.") + } activeContext.get() } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 5c87c84418..86c97b92df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -636,7 +636,7 @@ object SparkSession { /** * Builder for [[SparkSession]]. */ - class Builder { + class Builder extends Logging { private[this] val options = new scala.collection.mutable.HashMap[String, String] @@ -753,6 +753,9 @@ object SparkSession { var session = activeThreadSession.get() if ((session ne null) && !session.sparkContext.isStopped) { options.foreach { case (k, v) => session.conf.set(k, v) } + if (options.nonEmpty) { + logWarning("Use an existing SparkSession, some configuration may not take effect.") + } return session } @@ -762,6 +765,9 @@ object SparkSession { session = defaultSession.get() if ((session ne null) && !session.sparkContext.isStopped) { options.foreach { case (k, v) => session.conf.set(k, v) } + if (options.nonEmpty) { + logWarning("Use an existing SparkSession, some configuration may not take effect.") + } return session } @@ -774,7 +780,11 @@ object SparkSession { val sparkConf = new SparkConf() options.foreach { case (k, v) => sparkConf.set(k, v) } - SparkContext.getOrCreate(sparkConf) + val sc = SparkContext.getOrCreate(sparkConf) + // maybe this is an existing SparkContext, update its SparkConf which maybe used + // by SparkSession + options.foreach { case (k, v) => sc.conf.set(k, v) } + sc } session = new SparkSession(sparkContext) options.foreach { case (k, v) => session.conf.set(k, v) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index ec6a2b3575..786956df8a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} /** * Test cases for the builder pattern of [[SparkSession]]. @@ -90,4 +90,16 @@ class SparkSessionBuilderSuite extends SparkFunSuite { assert(newSession != activeSession) newSession.stop() } + + test("create SparkContext first then SparkSession") { + sparkContext.stop() + val conf = new SparkConf().setAppName("test").setMaster("local").set("key1", "value1") + val sparkContext2 = new SparkContext(conf) + val session = SparkSession.builder().config("key2", "value2").getOrCreate() + assert(session.conf.get("key1") == "value1") + assert(session.conf.get("key2") == "value2") + assert(session.sparkContext.conf.get("key1") == "value1") + assert(session.sparkContext.conf.get("key2") == "value2") + session.stop() + } } |