From 1b3a9b966a7813e2406dfb020e83605af22f9ef3 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Sun, 19 Jun 2016 20:12:00 +0100 Subject: [SPARK-15942][REPL] Unblock `:reset` command in REPL. ## What changes were proposed in this pull (Paste from JIRA issue.) As a follow up for SPARK-15697, I have following semantics for `:reset` command. On `:reset` we forget all that user has done but not the initialization of spark. To avoid confusion or make it more clear, we show the message `spark` and `sc` are not erased, infact they are in same state as they were left by previous operations done by the user. While doing above, somewhere I felt that this is not usually what reset means. But an accidental shutdown of a cluster can be very costly, so may be in that sense this is less surprising and still useful. ## How was this patch tested? Manually, by calling `:reset` command, by both altering the state of SparkContext and creating some local variables. Author: Prashant Sharma Author: Prashant Sharma Closes #13661 from ScrapCodes/repl-reset-command. --- .../main/scala/org/apache/spark/repl/SparkILoop.scala | 16 ++++++++++++++-- .../src/test/scala/org/apache/spark/repl/ReplSuite.scala | 3 ++- 2 files changed, 16 insertions(+), 3 deletions(-) (limited to 'repl/scala-2.11/src') diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala index dcf3209ae7..2707b0847a 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -36,7 +36,11 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) def initializeSpark() { intp.beQuietDuring { processLine(""" - @transient val spark = org.apache.spark.repl.Main.createSparkSession() + @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) { + org.apache.spark.repl.Main.sparkSession + } else { + org.apache.spark.repl.Main.createSparkSession() + } @transient val sc = { val _sc = spark.sparkContext _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}")) @@ -50,6 +54,7 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) processLine("import spark.implicits._") processLine("import spark.sql") processLine("import org.apache.spark.sql.functions._") + replayCommandStack = Nil // remove above commands from session history. } } @@ -70,7 +75,8 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) echo("Type :help for more information.") } - private val blockedCommands = Set[String]("reset") + /** Add repl commands that needs to be blocked. e.g. reset */ + private val blockedCommands = Set[String]() /** Standard commands */ lazy val sparkStandardCommands: List[SparkILoop.this.LoopCommand] = @@ -88,6 +94,12 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) initializeSpark() super.loadFiles(settings) } + + override def resetCommand(line: String): Unit = { + super.resetCommand(line) + initializeSpark() + echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.") + } } object SparkILoop { diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 2444e93d9a..c10db947bc 100644 --- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -49,7 +49,8 @@ class ReplSuite extends SparkFunSuite { val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH) System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath) - + Main.sparkContext = null + Main.sparkSession = null // causes recreation of SparkContext for each test. Main.conf.set("spark.master", master) Main.doMain(Array("-classpath", classpath), new SparkILoop(in, new PrintWriter(out))) -- cgit v1.2.3