author     Prashant Sharma <prashant@apache.org>    2016-06-19 20:12:00 +0100
committer  Sean Owen <sowen@cloudera.com>           2016-06-19 20:12:00 +0100
commit     1b3a9b966a7813e2406dfb020e83605af22f9ef3 (patch)
tree       f0f27fa3095a5cb3208f882ad141070fb80e3ae7 /repl
parent     001a58960311b07fe80e2f01e473f4987948d06e (diff)
[SPARK-15942][REPL] Unblock `:reset` command in REPL.
## What changes were proposed in this pull request?

(Paste from JIRA issue.) As a follow-up to SPARK-15697, the `:reset` command gets the following semantics: on `:reset` we forget everything the user has done, but not the initialization of Spark. To make this clear and avoid confusion, we show a message that `spark` and `sc` are not erased; in fact they are in the same state that the user's previous operations left them in. While doing the above, it did feel that this is not what reset usually means. But an accidental shutdown of a cluster can be very costly, so in that sense this behavior is less surprising and still useful.

## How was this patch tested?

Manually, by calling the `:reset` command after both altering the state of the SparkContext and creating some local variables.

Author: Prashant Sharma <prashant@apache.org>
Author: Prashant Sharma <prashsh1@in.ibm.com>

Closes #13661 from ScrapCodes/repl-reset-command.
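For illustration only (not part of the commit), here is a hypothetical `spark-shell` session showing these semantics. The console output is approximated; only the post-`:reset` note corresponds to the message added in the patch below:

```
scala> val doubled = sc.parallelize(1 to 3).map(_ * 2).collect()
doubled: Array[Int] = Array(2, 4, 6)

scala> :reset
Note that after :reset, state of SparkSession and SparkContext is unchanged.

scala> doubled // user definitions are forgotten by :reset
<console>:24: error: not found: value doubled

scala> sc.parallelize(1 to 3).count() // but sc is still the same, live SparkContext
res0: Long = 3
```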
Diffstat (limited to 'repl')
-rw-r--r--  repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala | 16
-rw-r--r--  repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala  |  3
2 files changed, 16 insertions(+), 3 deletions(-)
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index dcf3209ae7..2707b0847a 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -36,7 +36,11 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
   def initializeSpark() {
     intp.beQuietDuring {
       processLine("""
-        @transient val spark = org.apache.spark.repl.Main.createSparkSession()
+        @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
+            org.apache.spark.repl.Main.sparkSession
+          } else {
+            org.apache.spark.repl.Main.createSparkSession()
+          }
         @transient val sc = {
           val _sc = spark.sparkContext
           _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
@@ -50,6 +54,7 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
processLine("import spark.implicits._")
processLine("import spark.sql")
processLine("import org.apache.spark.sql.functions._")
+ replayCommandStack = Nil // remove above commands from session history.
}
}
@@ -70,7 +75,8 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
echo("Type :help for more information.")
}
- private val blockedCommands = Set[String]("reset")
+ /** Add repl commands that needs to be blocked. e.g. reset */
+ private val blockedCommands = Set[String]()
/** Standard commands */
lazy val sparkStandardCommands: List[SparkILoop.this.LoopCommand] =
@@ -88,6 +94,12 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
     initializeSpark()
     super.loadFiles(settings)
   }
+
+  override def resetCommand(line: String): Unit = {
+    super.resetCommand(line)
+    initializeSpark()
+    echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.")
+  }
 }
 
 object SparkILoop {
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 2444e93d9a..c10db947bc 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -49,7 +49,8 @@ class ReplSuite extends SparkFunSuite {
     val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH)
     System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath)
-
+    Main.sparkContext = null
+    Main.sparkSession = null // causes recreation of SparkContext for each test.
     Main.conf.set("spark.master", master)
     Main.doMain(Array("-classpath", classpath), new SparkILoop(in, new PrintWriter(out)))