author     Wenchen Fan <wenchen@databricks.com>    2016-05-04 14:40:54 -0700
committer  Andrew Or <andrew@databricks.com>       2016-05-04 14:40:54 -0700
commit     a432a2b86081a18cebf4085cead702436960f6c7 (patch)
tree       84f12ce725fe5e2bb8e338b3d1aec164e61d6dd8 /repl
parent     eb019af9a9cadb127eab1b6d30312169ed90f808 (diff)
[SPARK-15116] In REPL we should create SparkSession first and get SparkContext from it
## What changes were proposed in this pull request?

See https://github.com/apache/spark/pull/12873#discussion_r61993910. The problem is that if we create `SparkContext` first and then call `SparkSession.builder.enableHiveSupport().getOrCreate()`, we will reuse the existing `SparkContext` and the Hive flag won't be set.

## How was this patch tested?

Verified it locally.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12890 from cloud-fan/repl.
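To make the ordering issue concrete, below is a minimal sketch of the two orderings outside the REPL, using the Spark 2.0-era API this patch touches. The master, app name, and the assumption that the Hive classes (spark-hive) are on the classpath are illustrative only, and the two orderings are meant to be run in separate JVMs:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

val conf = new SparkConf().setMaster("local[*]").setAppName("repl-ordering-sketch")

// Ordering the REPL used before this patch: the context is created first,
// so getOrCreate() reuses it and, per the PR description, the Hive flag
// requested via enableHiveSupport() is not applied to the reused context.
val sc = new SparkContext(conf)
val spark = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
println(sc.getConf.get("spark.sql.catalogImplementation", "in-memory"))

// Ordering this patch switches to: build the SparkSession first and take the
// SparkContext from it, so the catalog flag is in place before the context exists.
// val spark = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
// val sc = spark.sparkContext
```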
Diffstat (limited to 'repl')
-rw-r--r--  repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala     | 20
-rw-r--r--  repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala | 11
-rw-r--r--  repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala           | 27
-rw-r--r--  repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala     | 11
4 files changed, 26 insertions(+), 43 deletions(-)
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index c4f64505a2..b1e95d8fdb 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -1003,7 +1003,7 @@ class SparkILoop(
// NOTE: Must be public for visibility
@DeveloperApi
- def createSparkContext(): SparkContext = {
+ def createSparkSession(): SparkSession = {
val execUri = System.getenv("SPARK_EXECUTOR_URI")
val jars = SparkILoop.getAddedJars
val conf = new SparkConf()
@@ -1019,22 +1019,18 @@ class SparkILoop(
if (execUri != null) {
conf.set("spark.executor.uri", execUri)
}
- sparkContext = new SparkContext(conf)
- logInfo("Created spark context..")
- Signaling.cancelOnInterrupt(sparkContext)
- sparkContext
- }
- @DeveloperApi
- // TODO: don't duplicate this code
- def createSparkSession(): SparkSession = {
- if (SparkSession.hiveClassesArePresent) {
+ val builder = SparkSession.builder.config(conf)
+ val sparkSession = if (SparkSession.hiveClassesArePresent) {
logInfo("Creating Spark session with Hive support")
- SparkSession.builder.enableHiveSupport().getOrCreate()
+ builder.enableHiveSupport().getOrCreate()
} else {
logInfo("Creating Spark session")
- SparkSession.builder.getOrCreate()
+ builder.getOrCreate()
}
+ sparkContext = sparkSession.sparkContext
+ Signaling.cancelOnInterrupt(sparkContext)
+ sparkSession
}
private def getMaster(): String = {
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
index f1febb9497..29f63de8a0 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
@@ -123,19 +123,14 @@ private[repl] trait SparkILoopInit {
def initializeSpark() {
intp.beQuietDuring {
command("""
+ @transient val spark = org.apache.spark.repl.Main.interp.createSparkSession()
@transient val sc = {
- val _sc = org.apache.spark.repl.Main.interp.createSparkContext()
+ val _sc = spark.sparkContext
_sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
println("Spark context available as 'sc' " +
s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
- _sc
- }
- """)
- command("""
- @transient val spark = {
- val _session = org.apache.spark.repl.Main.interp.createSparkSession()
println("Spark session available as 'spark'.")
- _session
+ _sc
}
""")
command("import org.apache.spark.SparkContext._")
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index a171759809..005edda2be 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -71,35 +71,32 @@ object Main extends Logging {
}
}
- def createSparkContext(): SparkContext = {
+ def createSparkSession(): SparkSession = {
val execUri = System.getenv("SPARK_EXECUTOR_URI")
conf.setIfMissing("spark.app.name", "Spark shell")
- // SparkContext will detect this configuration and register it with the RpcEnv's
- // file server, setting spark.repl.class.uri to the actual URI for executors to
- // use. This is sort of ugly but since executors are started as part of SparkContext
- // initialization in certain cases, there's an initialization order issue that prevents
- // this from being set after SparkContext is instantiated.
- .set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
+ // SparkContext will detect this configuration and register it with the RpcEnv's
+ // file server, setting spark.repl.class.uri to the actual URI for executors to
+ // use. This is sort of ugly but since executors are started as part of SparkContext
+ // initialization in certain cases, there's an initialization order issue that prevents
+ // this from being set after SparkContext is instantiated.
+ conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
if (execUri != null) {
conf.set("spark.executor.uri", execUri)
}
if (System.getenv("SPARK_HOME") != null) {
conf.setSparkHome(System.getenv("SPARK_HOME"))
}
- sparkContext = new SparkContext(conf)
- logInfo("Created spark context..")
- Signaling.cancelOnInterrupt(sparkContext)
- sparkContext
- }
- def createSparkSession(): SparkSession = {
+ val builder = SparkSession.builder.config(conf)
if (SparkSession.hiveClassesArePresent) {
- sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()
+ sparkSession = builder.enableHiveSupport().getOrCreate()
logInfo("Created Spark session with Hive support")
} else {
- sparkSession = SparkSession.builder.getOrCreate()
+ sparkSession = builder.getOrCreate()
logInfo("Created Spark session")
}
+ sparkContext = sparkSession.sparkContext
+ Signaling.cancelOnInterrupt(sparkContext)
sparkSession
}
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index d74b796531..bbdb992d8a 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -36,19 +36,14 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
def initializeSpark() {
intp.beQuietDuring {
processLine("""
+ @transient val spark = org.apache.spark.repl.Main.createSparkSession()
@transient val sc = {
- val _sc = org.apache.spark.repl.Main.createSparkContext()
+ val _sc = spark.sparkContext
_sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
println("Spark context available as 'sc' " +
s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
- _sc
- }
- """)
- processLine("""
- @transient val spark = {
- val _session = org.apache.spark.repl.Main.createSparkSession()
println("Spark session available as 'spark'.")
- _session
+ _sc
}
""")
processLine("import org.apache.spark.SparkContext._")