author    Jeff Zhang <zjffdu@apache.org>     2016-05-25 10:46:51 -0700
committer Andrew Or <andrew@databricks.com>  2016-05-25 10:46:51 -0700
commit    01e7b9c85bb84924e279021f9748774dce9702c8
tree      8c360803e35f42b64c9a98f3baa7eaad4a4f5eb1
parent    b120fba6ae26186b3fa0dfbb1637046f4e76c2b0
[SPARK-15345][SQL][PYSPARK] SparkSession's conf doesn't take effect when there is already an existing SparkContext
## What changes were proposed in this pull request?

Override the existing SparkContext if the provided SparkConf is different. The PySpark part hasn't been fixed yet; that will be done after the first round of review to ensure this is the correct approach.

## How was this patch tested?

Manually verified in spark-shell.

rxin Please help review it; I think this is a very critical issue for Spark 2.0.

Author: Jeff Zhang <zjffdu@apache.org>

Closes #13160 from zjffdu/SPARK-15345.
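To make the scenario concrete, here is a rough spark-shell-style sketch of the case this patch fixes. The keys `key1`/`key2` are arbitrary examples borrowed from the new test case below, and `getConf` is used because `SparkContext.conf` is package-private outside `org.apache.spark`:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

// A SparkContext already exists, created with its own SparkConf.
val conf = new SparkConf().setAppName("test").setMaster("local").set("key1", "value1")
val sc = new SparkContext(conf)

// Building a SparkSession with an extra option reuses that context.
// Before this patch "key2" never reached the context's SparkConf; with
// this patch the builder copies its options onto it.
val session = SparkSession.builder().config("key2", "value2").getOrCreate()
assert(session.sparkContext.getConf.get("key2") == "value2")
```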
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                     |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala             | 14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala | 14
3 files changed, 28 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 36aa3becb4..5018eb38d9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2254,6 +2254,9 @@ object SparkContext extends Logging {
       if (activeContext.get() == null) {
         setActiveContext(new SparkContext(config), allowMultipleContexts = false)
       }
+      if (config.getAll.nonEmpty) {
+        logWarning("Use an existing SparkContext, some configuration may not take effect.")
+      }
       activeContext.get()
     }
   }
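As a rough sketch of the behavior this hunk adds (the key `spark.custom.key` is an illustrative name, not a real Spark setting): `getOrCreate` now warns whenever the supplied conf is non-empty, because a conf passed after the context exists cannot change it.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// The first call creates the context.
val sc = SparkContext.getOrCreate(new SparkConf().setAppName("app").setMaster("local"))

// A later call returns the same context; its conf is largely ignored,
// and with this patch a non-empty conf now also logs:
//   "Use an existing SparkContext, some configuration may not take effect."
val same = SparkContext.getOrCreate(new SparkConf().set("spark.custom.key", "value"))
assert(sc eq same)
```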
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 5c87c84418..86c97b92df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -636,7 +636,7 @@ object SparkSession {
   /**
    * Builder for [[SparkSession]].
    */
-  class Builder {
+  class Builder extends Logging {
 
     private[this] val options = new scala.collection.mutable.HashMap[String, String]
@@ -753,6 +753,9 @@ object SparkSession {
       var session = activeThreadSession.get()
       if ((session ne null) && !session.sparkContext.isStopped) {
         options.foreach { case (k, v) => session.conf.set(k, v) }
+        if (options.nonEmpty) {
+          logWarning("Use an existing SparkSession, some configuration may not take effect.")
+        }
         return session
       }
@@ -762,6 +765,9 @@ object SparkSession {
         session = defaultSession.get()
         if ((session ne null) && !session.sparkContext.isStopped) {
           options.foreach { case (k, v) => session.conf.set(k, v) }
+          if (options.nonEmpty) {
+            logWarning("Use an existing SparkSession, some configuration may not take effect.")
+          }
           return session
         }
@@ -774,7 +780,11 @@ object SparkSession {
           val sparkConf = new SparkConf()
           options.foreach { case (k, v) => sparkConf.set(k, v) }
-          SparkContext.getOrCreate(sparkConf)
+          val sc = SparkContext.getOrCreate(sparkConf)
+          // This may be an existing SparkContext; update its SparkConf, which
+          // may be used by SparkSession.
+          options.foreach { case (k, v) => sc.conf.set(k, v) }
+          sc
         }
         session = new SparkSession(sparkContext)
         options.foreach { case (k, v) => session.conf.set(k, v) }
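The two hunks above this one cover the paths where a usable SparkSession already exists (the thread-local active session is checked first, then the global default session): in both, builder options are applied through `session.conf` and the new warning is logged. A rough sketch of that reuse path, where `key` is an illustrative option name:

```scala
import org.apache.spark.sql.SparkSession

// The first builder call creates a session; a later call picks it up.
val existing = SparkSession.builder().master("local").getOrCreate()
val reused = SparkSession.builder().config("key", "value").getOrCreate()

assert(existing eq reused)                 // the same session is returned
assert(reused.conf.get("key") == "value")  // option applied to its runtime conf
```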
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index ec6a2b3575..786956df8a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 
 /**
  * Test cases for the builder pattern of [[SparkSession]].
@@ -90,4 +90,16 @@ class SparkSessionBuilderSuite extends SparkFunSuite {
     assert(newSession != activeSession)
     newSession.stop()
   }
+
+  test("create SparkContext first then SparkSession") {
+    sparkContext.stop()
+    val conf = new SparkConf().setAppName("test").setMaster("local").set("key1", "value1")
+    val sparkContext2 = new SparkContext(conf)
+    val session = SparkSession.builder().config("key2", "value2").getOrCreate()
+    assert(session.conf.get("key1") == "value1")
+    assert(session.conf.get("key2") == "value2")
+    assert(session.sparkContext.conf.get("key1") == "value1")
+    assert(session.sparkContext.conf.get("key2") == "value2")
+    session.stop()
+  }
 }