diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-05-04 14:40:54 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-05-04 14:40:54 -0700 |
commit | a432a2b86081a18cebf4085cead702436960f6c7 (patch) | |
tree | 84f12ce725fe5e2bb8e338b3d1aec164e61d6dd8 /repl/scala-2.11/src | |
parent | eb019af9a9cadb127eab1b6d30312169ed90f808 (diff) | |
download | spark-a432a2b86081a18cebf4085cead702436960f6c7.tar.gz spark-a432a2b86081a18cebf4085cead702436960f6c7.tar.bz2 spark-a432a2b86081a18cebf4085cead702436960f6c7.zip |
[SPARK-15116] In REPL we should create SparkSession first and get SparkContext from it
## What changes were proposed in this pull request?
See https://github.com/apache/spark/pull/12873#discussion_r61993910. The problem is that if we create a `SparkContext` first and then call `SparkSession.builder.enableHiveSupport().getOrCreate()`, the builder reuses the existing `SparkContext`, so the Hive flag is never set. This patch makes the REPL create the `SparkSession` first and obtain the `SparkContext` from it.
## How was this patch tested?
Verified locally.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #12890 from cloud-fan/repl.
Diffstat (limited to 'repl/scala-2.11/src')
-rw-r--r-- | repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala | 27 | ||||
-rw-r--r-- | repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala | 11 |
2 files changed, 15 insertions, 23 deletions
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala index a171759809..005edda2be 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala @@ -71,35 +71,32 @@ object Main extends Logging { } } - def createSparkContext(): SparkContext = { + def createSparkSession(): SparkSession = { val execUri = System.getenv("SPARK_EXECUTOR_URI") conf.setIfMissing("spark.app.name", "Spark shell") - // SparkContext will detect this configuration and register it with the RpcEnv's - // file server, setting spark.repl.class.uri to the actual URI for executors to - // use. This is sort of ugly but since executors are started as part of SparkContext - // initialization in certain cases, there's an initialization order issue that prevents - // this from being set after SparkContext is instantiated. - .set("spark.repl.class.outputDir", outputDir.getAbsolutePath()) + // SparkContext will detect this configuration and register it with the RpcEnv's + // file server, setting spark.repl.class.uri to the actual URI for executors to + // use. This is sort of ugly but since executors are started as part of SparkContext + // initialization in certain cases, there's an initialization order issue that prevents + // this from being set after SparkContext is instantiated. 
+ conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath()) if (execUri != null) { conf.set("spark.executor.uri", execUri) } if (System.getenv("SPARK_HOME") != null) { conf.setSparkHome(System.getenv("SPARK_HOME")) } - sparkContext = new SparkContext(conf) - logInfo("Created spark context..") - Signaling.cancelOnInterrupt(sparkContext) - sparkContext - } - def createSparkSession(): SparkSession = { + val builder = SparkSession.builder.config(conf) if (SparkSession.hiveClassesArePresent) { - sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate() + sparkSession = builder.enableHiveSupport().getOrCreate() logInfo("Created Spark session with Hive support") } else { - sparkSession = SparkSession.builder.getOrCreate() + sparkSession = builder.getOrCreate() logInfo("Created Spark session") } + sparkContext = sparkSession.sparkContext + Signaling.cancelOnInterrupt(sparkContext) sparkSession } diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala index d74b796531..bbdb992d8a 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -36,19 +36,14 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) def initializeSpark() { intp.beQuietDuring { processLine(""" + @transient val spark = org.apache.spark.repl.Main.createSparkSession() @transient val sc = { - val _sc = org.apache.spark.repl.Main.createSparkContext() + val _sc = spark.sparkContext _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}")) println("Spark context available as 'sc' " + s"(master = ${_sc.master}, app id = ${_sc.applicationId}).") - _sc - } - """) - processLine(""" - @transient val spark = { - val _session = org.apache.spark.repl.Main.createSparkSession() println("Spark session available as 'spark'.") - _session + _sc } """) 
processLine("import org.apache.spark.SparkContext._") |