From a432a2b86081a18cebf4085cead702436960f6c7 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 4 May 2016 14:40:54 -0700
Subject: [SPARK-15116] In REPL we should create SparkSession first and get
 SparkContext from it

## What changes were proposed in this pull request?

See https://github.com/apache/spark/pull/12873#discussion_r61993910.

The problem is that if we create the `SparkContext` first and then call
`SparkSession.builder.enableHiveSupport().getOrCreate()`, the existing
`SparkContext` is reused and the Hive flag is never applied. Creating the
`SparkSession` first lets the builder construct the underlying `SparkContext`
with the right options, and the REPL can then obtain the `SparkContext` from it.

## How was this patch tested?

Verified locally.

Author: Wenchen Fan

Closes #12890 from cloud-fan/repl.
---
 .../scala/org/apache/spark/repl/SparkILoop.scala     | 20 ++++++++------------
 .../scala/org/apache/spark/repl/SparkILoopInit.scala | 11 +++--------
 2 files changed, 11 insertions(+), 20 deletions(-)

(limited to 'repl/scala-2.10')

diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index c4f64505a2..b1e95d8fdb 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -1003,7 +1003,7 @@ class SparkILoop(
 
   // NOTE: Must be public for visibility
   @DeveloperApi
-  def createSparkContext(): SparkContext = {
+  def createSparkSession(): SparkSession = {
     val execUri = System.getenv("SPARK_EXECUTOR_URI")
     val jars = SparkILoop.getAddedJars
     val conf = new SparkConf()
@@ -1019,22 +1019,18 @@ class SparkILoop(
     if (execUri != null) {
       conf.set("spark.executor.uri", execUri)
     }
-    sparkContext = new SparkContext(conf)
-    logInfo("Created spark context..")
-    Signaling.cancelOnInterrupt(sparkContext)
-    sparkContext
-  }
 
-  @DeveloperApi
-  // TODO: don't duplicate this code
-  def createSparkSession(): SparkSession = {
-    if (SparkSession.hiveClassesArePresent) {
+    val builder = SparkSession.builder.config(conf)
+    val sparkSession = if (SparkSession.hiveClassesArePresent) {
       logInfo("Creating Spark session with Hive support")
-      SparkSession.builder.enableHiveSupport().getOrCreate()
+      builder.enableHiveSupport().getOrCreate()
     } else {
       logInfo("Creating Spark session")
-      SparkSession.builder.getOrCreate()
+      builder.getOrCreate()
     }
+    sparkContext = sparkSession.sparkContext
+    Signaling.cancelOnInterrupt(sparkContext)
+    sparkSession
   }
 
   private def getMaster(): String = {

diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
index f1febb9497..29f63de8a0 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
@@ -123,19 +123,14 @@ private[repl] trait SparkILoopInit {
   def initializeSpark() {
     intp.beQuietDuring {
       command("""
+         @transient val spark = org.apache.spark.repl.Main.interp.createSparkSession()
          @transient val sc = {
-           val _sc = org.apache.spark.repl.Main.interp.createSparkContext()
+           val _sc = spark.sparkContext
            _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
            println("Spark context available as 'sc' " +
              s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
-           _sc
-         }
-        """)
-      command("""
-        @transient val spark = {
-          val _session = org.apache.spark.repl.Main.interp.createSparkSession()
            println("Spark session available as 'spark'.")
-          _session
+           _sc
          }
         """)
       command("import org.apache.spark.SparkContext._")
--
cgit v1.2.3
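
For readers skimming the diff, the ordering issue the commit message describes can be sketched outside the REPL as well. The snippet below is a minimal illustration and not part of the patch: it assumes a Spark 2.x classpath, and the helper name `buildReplSession`, the `local[*]` master, and the app name are made up for the example. It shows the pattern the patch adopts: build the `SparkSession` first so that `getOrCreate()` constructs the underlying `SparkContext` from the builder's configuration, then derive `sc` from the session.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object ReplSessionSketch {

  // Sketch of the ordering the patch enforces (hypothetical helper, not a Spark API):
  // create the SparkSession BEFORE any SparkContext exists, so builder options such
  // as Hive support are honored. If a SparkContext were already running,
  // getOrCreate() would reuse it and those options could be silently dropped.
  def buildReplSession(withHive: Boolean): (SparkSession, SparkContext) = {
    val conf = new SparkConf()
      .setMaster("local[*]")              // assumption: local master for the sketch
      .setAppName("Spark shell (sketch)") // assumption: illustrative app name

    val builder = SparkSession.builder.config(conf)
    val session =
      if (withHive) builder.enableHiveSupport().getOrCreate()
      else builder.getOrCreate()

    // Derive the SparkContext from the session instead of constructing it up front,
    // mirroring `sparkContext = sparkSession.sparkContext` in the patch.
    (session, session.sparkContext)
  }
}
```

If some other code had already started a `SparkContext` before this helper ran, `getOrCreate()` would attach to that pre-existing context and `enableHiveSupport()` might not take effect, which is exactly the failure mode the commit message describes and the reason the REPL now creates the session first.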