From a432a2b86081a18cebf4085cead702436960f6c7 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 4 May 2016 14:40:54 -0700
Subject: [SPARK-15116] In REPL we should create SparkSession first and get
 SparkContext from it

## What changes were proposed in this pull request?

See https://github.com/apache/spark/pull/12873#discussion_r61993910.

The problem is that if we create `SparkContext` first and then call
`SparkSession.builder.enableHiveSupport().getOrCreate()`, the existing
`SparkContext` is reused and the Hive flag won't be set. This patch therefore
creates the `SparkSession` first and obtains the `SparkContext` from it. A
minimal standalone sketch of the ordering issue follows the patch.

## How was this patch tested?

Verified locally.

Author: Wenchen Fan

Closes #12890 from cloud-fan/repl.
---
 .../scala/org/apache/spark/repl/SparkILoop.scala   | 20 +++++++---------
 .../org/apache/spark/repl/SparkILoopInit.scala     | 11 +++------
 .../main/scala/org/apache/spark/repl/Main.scala    | 27 ++++++++++------------
 .../scala/org/apache/spark/repl/SparkILoop.scala   | 11 +++------
 4 files changed, 26 insertions(+), 43 deletions(-)

(limited to 'repl')

diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index c4f64505a2..b1e95d8fdb 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -1003,7 +1003,7 @@ class SparkILoop(
 
   // NOTE: Must be public for visibility
   @DeveloperApi
-  def createSparkContext(): SparkContext = {
+  def createSparkSession(): SparkSession = {
     val execUri = System.getenv("SPARK_EXECUTOR_URI")
     val jars = SparkILoop.getAddedJars
     val conf = new SparkConf()
@@ -1019,22 +1019,18 @@ class SparkILoop(
     if (execUri != null) {
       conf.set("spark.executor.uri", execUri)
     }
-    sparkContext = new SparkContext(conf)
-    logInfo("Created spark context..")
-    Signaling.cancelOnInterrupt(sparkContext)
-    sparkContext
-  }
 
-  @DeveloperApi
-  // TODO: don't duplicate this code
-  def createSparkSession(): SparkSession = {
-    if (SparkSession.hiveClassesArePresent) {
+    val builder = SparkSession.builder.config(conf)
+    val sparkSession = if (SparkSession.hiveClassesArePresent) {
       logInfo("Creating Spark session with Hive support")
-      SparkSession.builder.enableHiveSupport().getOrCreate()
+      builder.enableHiveSupport().getOrCreate()
     } else {
       logInfo("Creating Spark session")
-      SparkSession.builder.getOrCreate()
+      builder.getOrCreate()
     }
+    sparkContext = sparkSession.sparkContext
+    Signaling.cancelOnInterrupt(sparkContext)
+    sparkSession
   }
 
   private def getMaster(): String = {
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
index f1febb9497..29f63de8a0 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
@@ -123,19 +123,14 @@ private[repl] trait SparkILoopInit {
   def initializeSpark() {
     intp.beQuietDuring {
       command("""
+         @transient val spark = org.apache.spark.repl.Main.interp.createSparkSession()
          @transient val sc = {
-           val _sc = org.apache.spark.repl.Main.interp.createSparkContext()
+           val _sc = spark.sparkContext
            _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
            println("Spark context available as 'sc' " +
              s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
-           _sc
-         }
-        """)
-      command("""
-         @transient val spark = {
-           val _session = org.apache.spark.repl.Main.interp.createSparkSession()
            println("Spark session available as 'spark'.")
-           _session
+           _sc
          }
         """)
       command("import org.apache.spark.SparkContext._")
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index a171759809..005edda2be 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -71,35 +71,32 @@ object Main extends Logging {
     }
   }
 
-  def createSparkContext(): SparkContext = {
+  def createSparkSession(): SparkSession = {
     val execUri = System.getenv("SPARK_EXECUTOR_URI")
     conf.setIfMissing("spark.app.name", "Spark shell")
-      // SparkContext will detect this configuration and register it with the RpcEnv's
-      // file server, setting spark.repl.class.uri to the actual URI for executors to
-      // use. This is sort of ugly but since executors are started as part of SparkContext
-      // initialization in certain cases, there's an initialization order issue that prevents
-      // this from being set after SparkContext is instantiated.
-      .set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
+    // SparkContext will detect this configuration and register it with the RpcEnv's
+    // file server, setting spark.repl.class.uri to the actual URI for executors to
+    // use. This is sort of ugly but since executors are started as part of SparkContext
+    // initialization in certain cases, there's an initialization order issue that prevents
+    // this from being set after SparkContext is instantiated.
+    conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
     if (execUri != null) {
       conf.set("spark.executor.uri", execUri)
     }
     if (System.getenv("SPARK_HOME") != null) {
       conf.setSparkHome(System.getenv("SPARK_HOME"))
     }
-    sparkContext = new SparkContext(conf)
-    logInfo("Created spark context..")
-    Signaling.cancelOnInterrupt(sparkContext)
-    sparkContext
-  }
 
-  def createSparkSession(): SparkSession = {
+    val builder = SparkSession.builder.config(conf)
     if (SparkSession.hiveClassesArePresent) {
-      sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()
+      sparkSession = builder.enableHiveSupport().getOrCreate()
       logInfo("Created Spark session with Hive support")
     } else {
-      sparkSession = SparkSession.builder.getOrCreate()
+      sparkSession = builder.getOrCreate()
       logInfo("Created Spark session")
     }
+    sparkContext = sparkSession.sparkContext
+    Signaling.cancelOnInterrupt(sparkContext)
     sparkSession
   }
 
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index d74b796531..bbdb992d8a 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -36,19 +36,14 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
   def initializeSpark() {
     intp.beQuietDuring {
       processLine("""
+        @transient val spark = org.apache.spark.repl.Main.createSparkSession()
         @transient val sc = {
-          val _sc = org.apache.spark.repl.Main.createSparkContext()
+          val _sc = spark.sparkContext
           _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
           println("Spark context available as 'sc' " +
             s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
-          _sc
-        }
-        """)
-      processLine("""
-        @transient val spark = {
-          val _session = org.apache.spark.repl.Main.createSparkSession()
           println("Spark session available as 'spark'.")
-          _session
+          _sc
         }
        """)
      processLine("import org.apache.spark.SparkContext._")
-- 
cgit v1.2.3
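Below is a minimal sketch (not part of the commit) of the ordering issue described in the commit message: a `SparkContext` built before the `SparkSession` builder is reused by `getOrCreate()`, so Hive support requested on the builder won't be set, whereas building the session first and deriving `sc` from it (as the patched `createSparkSession()` does) avoids the problem. The object name, master URL, and app name here are arbitrary placeholders.

```scala
// Standalone sketch of the session-first ordering; assumes spark-core and
// spark-sql (Spark 2.x) on the classpath. Names are illustrative only.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object SessionFirstSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("session-first-sketch")

    // Pre-patch order: a SparkContext created first is reused by the builder's
    // getOrCreate(), so enableHiveSupport() requested afterwards won't be set:
    //
    //   val sc = new SparkContext(conf)
    //   val spark = SparkSession.builder.enableHiveSupport().getOrCreate()  // reuses sc
    //
    // Patched order: build the SparkSession first, then take the SparkContext
    // from it, mirroring what the REPL's createSparkSession() now does.
    val spark: SparkSession = SparkSession.builder.config(conf).getOrCreate()
    val sc: SparkContext = spark.sparkContext

    println(s"Spark context available as 'sc' (master = ${sc.master}, app id = ${sc.applicationId}).")
    println("Spark session available as 'spark'.")
    spark.stop()
  }
}
```

In the patch itself the builder additionally calls `enableHiveSupport()` when `SparkSession.hiveClassesArePresent`, which is exactly the option that the session-first order preserves.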