diff options
author | Jeff Zhang <zjffdu@apache.org> | 2016-05-25 10:46:51 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-05-25 10:46:51 -0700 |
commit | 01e7b9c85bb84924e279021f9748774dce9702c8 (patch) | |
tree | 8c360803e35f42b64c9a98f3baa7eaad4a4f5eb1 | |
parent | b120fba6ae26186b3fa0dfbb1637046f4e76c2b0 (diff) | |
download | spark-01e7b9c85bb84924e279021f9748774dce9702c8.tar.gz spark-01e7b9c85bb84924e279021f9748774dce9702c8.tar.bz2 spark-01e7b9c85bb84924e279021f9748774dce9702c8.zip |
[SPARK-15345][SQL][PYSPARK] SparkSession's conf doesn't take effect when this already an existing SparkContext
## What changes were proposed in this pull request?
Override the existing SparkContext is the provided SparkConf is different. PySpark part hasn't been fixed yet, will do that after the first round of review to ensure this is the correct approach.
## How was this patch tested?
Manually verify it in spark-shell.
rxin Please help review it, I think this is a very critical issue for spark 2.0
Author: Jeff Zhang <zjffdu@apache.org>
Closes #13160 from zjffdu/SPARK-15345.
3 files changed, 28 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 36aa3becb4..5018eb38d9 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2254,6 +2254,9 @@ object SparkContext extends Logging { if (activeContext.get() == null) { setActiveContext(new SparkContext(config), allowMultipleContexts = false) } + if (config.getAll.nonEmpty) { + logWarning("Use an existing SparkContext, some configuration may not take effect.") + } activeContext.get() } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 5c87c84418..86c97b92df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -636,7 +636,7 @@ object SparkSession { /** * Builder for [[SparkSession]]. */ - class Builder { + class Builder extends Logging { private[this] val options = new scala.collection.mutable.HashMap[String, String] @@ -753,6 +753,9 @@ object SparkSession { var session = activeThreadSession.get() if ((session ne null) && !session.sparkContext.isStopped) { options.foreach { case (k, v) => session.conf.set(k, v) } + if (options.nonEmpty) { + logWarning("Use an existing SparkSession, some configuration may not take effect.") + } return session } @@ -762,6 +765,9 @@ object SparkSession { session = defaultSession.get() if ((session ne null) && !session.sparkContext.isStopped) { options.foreach { case (k, v) => session.conf.set(k, v) } + if (options.nonEmpty) { + logWarning("Use an existing SparkSession, some configuration may not take effect.") + } return session } @@ -774,7 +780,11 @@ object SparkSession { val sparkConf = new SparkConf() options.foreach { case (k, v) => sparkConf.set(k, v) } - SparkContext.getOrCreate(sparkConf) + val sc = SparkContext.getOrCreate(sparkConf) + // maybe this is an existing SparkContext, update its SparkConf which maybe used + // by SparkSession + options.foreach { case (k, v) => sc.conf.set(k, v) } + sc } session = new SparkSession(sparkContext) options.foreach { case (k, v) => session.conf.set(k, v) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index ec6a2b3575..786956df8a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} /** * Test cases for the builder pattern of [[SparkSession]]. @@ -90,4 +90,16 @@ class SparkSessionBuilderSuite extends SparkFunSuite { assert(newSession != activeSession) newSession.stop() } + + test("create SparkContext first then SparkSession") { + sparkContext.stop() + val conf = new SparkConf().setAppName("test").setMaster("local").set("key1", "value1") + val sparkContext2 = new SparkContext(conf) + val session = SparkSession.builder().config("key2", "value2").getOrCreate() + assert(session.conf.get("key1") == "value1") + assert(session.conf.get("key2") == "value2") + assert(session.sparkContext.conf.get("key1") == "value1") + assert(session.sparkContext.conf.get("key2") == "value2") + session.stop() + } } |