aboutsummaryrefslogtreecommitdiff
path: root/repl
diff options
context:
space:
mode:
authorJoseph E. Gonzalez <joseph.e.gonzalez@gmail.com>2013-11-04 20:10:15 -0800
committerJoseph E. Gonzalez <joseph.e.gonzalez@gmail.com>2013-11-04 20:10:15 -0800
commit3c37928fab0801fa1e2662d873dac4b4f93c547d (patch)
treebb0aed55504e5e049110858a3c2a23ffc8ff297d /repl
parent99bfcc91e010ba29852ec7dd0b4270805b7b2377 (diff)
downloadspark-3c37928fab0801fa1e2662d873dac4b4f93c547d.tar.gz
spark-3c37928fab0801fa1e2662d873dac4b4f93c547d.tar.bz2
spark-3c37928fab0801fa1e2662d873dac4b4f93c547d.zip
This commit adds a new graphx-shell which is essentially the same as
the spark shell but with GraphX packages automatically imported and with Kryo serialization enabled for GraphX types. In addition the graphx-shell has a nifty new logo. To make these changes minimally invasive in the SparkILoop.scala I added some additional environment variables: SPARK_BANNER_TEXT: If set this string is displayed instead of the spark logo SPARK_SHELL_INIT_BLOCK: if set this expression is evaluated in the spark shell after the spark context is created.
Diffstat (limited to 'repl')
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala174
1 files changed, 91 insertions, 83 deletions
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 0ced284da6..efdd90c47f 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -45,7 +45,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
def this(in0: BufferedReader, out: PrintWriter, master: String) = this(Some(in0), out, Some(master))
def this(in0: BufferedReader, out: PrintWriter) = this(Some(in0), out, None)
def this() = this(None, new PrintWriter(Console.out, true), None)
-
+
var in: InteractiveReader = _ // the input stream from which commands come
var settings: Settings = _
var intp: SparkIMain = _
@@ -56,16 +56,16 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
Power[g.type](this, g)
}
*/
-
+
// TODO
// object opt extends AestheticSettings
- //
+ //
@deprecated("Use `intp` instead.", "2.9.0")
def interpreter = intp
-
+
@deprecated("Use `intp` instead.", "2.9.0")
def interpreter_= (i: SparkIMain): Unit = intp = i
-
+
def history = in.history
/** The context class loader at the time this object was created */
@@ -75,7 +75,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
private val signallable =
/*if (isReplDebug) Signallable("Dump repl state.")(dumpCommand())
else*/ null
-
+
// classpath entries added via :cp
var addedClasspath: String = ""
@@ -87,10 +87,10 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
/** Record a command for replay should the user request a :replay */
def addReplay(cmd: String) = replayCommandStack ::= cmd
-
+
/** Try to install sigint handler: ignore failure. Signal handler
* will interrupt current line execution if any is in progress.
- *
+ *
* Attempting to protect the repl from accidental exit, we only honor
* a single ctrl-C if the current buffer is empty: otherwise we look
* for a second one within a short time.
@@ -124,7 +124,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
Thread.currentThread.setContextClassLoader(originalClassLoader)
}
}
-
+
class SparkILoopInterpreter extends SparkIMain(settings, out) {
override lazy val formatting = new Formatting {
def prompt = SparkILoop.this.prompt
@@ -135,7 +135,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
|// She's gone rogue, captain! Have to take her out!
|// Calling Thread.stop on runaway %s with offending code:
|// scala> %s""".stripMargin
-
+
echo(template.format(line.thread, line.code))
// XXX no way to suppress the deprecation warning
line.thread.stop()
@@ -151,7 +151,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
def createInterpreter() {
if (addedClasspath != "")
settings.classpath append addedClasspath
-
+
intp = new SparkILoopInterpreter
intp.setContextClassLoader()
installSigIntHandler()
@@ -168,10 +168,10 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
private def helpSummary() = {
val usageWidth = commands map (_.usageMsg.length) max
val formatStr = "%-" + usageWidth + "s %s %s"
-
+
echo("All commands can be abbreviated, e.g. :he instead of :help.")
echo("Those marked with a * have more detailed help, e.g. :help imports.\n")
-
+
commands foreach { cmd =>
val star = if (cmd.hasLongHelp) "*" else " "
echo(formatStr.format(cmd.usageMsg, star, cmd.help))
@@ -182,7 +182,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
case Nil => echo(cmd + ": no such command. Type :help for help.")
case xs => echo(cmd + " is ambiguous: did you mean " + xs.map(":" + _.name).mkString(" or ") + "?")
}
- Result(true, None)
+ Result(true, None)
}
private def matchingCommands(cmd: String) = commands filter (_.name startsWith cmd)
private def uniqueCommand(cmd: String): Option[LoopCommand] = {
@@ -193,31 +193,35 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
case xs => xs find (_.name == cmd)
}
}
-
+
/** Print a welcome message */
def printWelcome() {
- echo("""Welcome to
- ____ __
+ val prop = System.getenv("SPARK_BANNER_TEXT")
+ val bannerText =
+ if (prop != null) prop else
+ """Welcome to
+ ____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 0.9.0-SNAPSHOT
- /_/
-""")
+ /_/
+ """
+ echo(bannerText)
import Properties._
val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
- versionString, javaVmName, javaVersion)
+ versionString, javaVmName, javaVersion)
echo(welcomeMsg)
}
-
+
/** Show the history */
lazy val historyCommand = new LoopCommand("history", "show the history (optional num is commands to show)") {
override def usage = "[num]"
def defaultLines = 20
-
+
def apply(line: String): Result = {
if (history eq NoHistory)
return "No history available."
-
+
val xs = words(line)
val current = history.index
val count = try xs.head.toInt catch { case _: Exception => defaultLines }
@@ -237,21 +241,21 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
out print msg
out.flush()
}
-
+
/** Search the history */
def searchHistory(_cmdline: String) {
val cmdline = _cmdline.toLowerCase
val offset = history.index - history.size + 1
-
+
for ((line, index) <- history.asStrings.zipWithIndex ; if line.toLowerCase contains cmdline)
echo("%d %s".format(index + offset, line))
}
-
+
private var currentPrompt = Properties.shellPromptString
def setPrompt(prompt: String) = currentPrompt = prompt
/** Prompt to print when awaiting input */
def prompt = currentPrompt
-
+
import LoopCommand.{ cmd, nullary }
/** Standard commands **/
@@ -273,7 +277,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
nullary("silent", "disable/enable automatic printing of results", verbosity),
cmd("type", "<expr>", "display the type of an expression without evaluating it", typeCommand)
)
-
+
/** Power user commands */
lazy val powerCommands: List[LoopCommand] = List(
//nullary("dump", "displays a view of the interpreter's internal state", dumpCommand),
@@ -298,10 +302,10 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
|An argument of clear will remove the wrapper if any is active.
|Note that wrappers do not compose (a new one replaces the old
|one) and also that the :phase command uses the same machinery,
- |so setting :wrap will clear any :phase setting.
+ |so setting :wrap will clear any :phase setting.
""".stripMargin.trim)
)
-
+
/*
private def dumpCommand(): Result = {
echo("" + power)
@@ -309,7 +313,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
in.redrawLine()
}
*/
-
+
private val typeTransforms = List(
"scala.collection.immutable." -> "immutable.",
"scala.collection.mutable." -> "mutable.",
@@ -317,7 +321,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
"java.lang." -> "jl.",
"scala.runtime." -> "runtime."
)
-
+
private def importsCommand(line: String): Result = {
val tokens = words(line)
val handlers = intp.languageWildcardHandlers ++ intp.importHandlers
@@ -333,7 +337,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
val implicitMsg = if (imps.isEmpty) "" else imps.size + " are implicit"
val foundMsg = if (found.isEmpty) "" else found.mkString(" // imports: ", ", ", "")
val statsMsg = List(typeMsg, termMsg, implicitMsg) filterNot (_ == "") mkString ("(", ", ", ")")
-
+
intp.reporter.printMessage("%2d) %-30s %s%s".format(
idx + 1,
handler.importString,
@@ -342,12 +346,12 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
))
}
}
-
+
private def implicitsCommand(line: String): Result = {
val intp = SparkILoop.this.intp
import intp._
import global.Symbol
-
+
def p(x: Any) = intp.reporter.printMessage("" + x)
// If an argument is given, only show a source with that
@@ -360,14 +364,14 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
else (args exists (source.name.toString contains _))
}
}
-
+
if (filtered.isEmpty)
return "No implicits have been imported other than those in Predef."
-
+
filtered foreach {
case (source, syms) =>
p("/* " + syms.size + " implicit members imported from " + source.fullName + " */")
-
+
// This groups the members by where the symbol is defined
val byOwner = syms groupBy (_.owner)
val sortedOwners = byOwner.toList sortBy { case (owner, _) => intp.afterTyper(source.info.baseClasses indexOf owner) }
@@ -388,10 +392,10 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
xss map (xs => xs sortBy (_.name.toString))
}
-
- val ownerMessage = if (owner == source) " defined in " else " inherited from "
+
+ val ownerMessage = if (owner == source) " defined in " else " inherited from "
p(" /* " + members.size + ownerMessage + owner.fullName + " */")
-
+
memberGroups foreach { group =>
group foreach (s => p(" " + intp.symbolDefString(s)))
p("")
@@ -400,7 +404,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
p("")
}
}
-
+
protected def newJavap() = new Javap(intp.classLoader, new SparkIMain.ReplStrippingWriter(intp)) {
override def tryClass(path: String): Array[Byte] = {
// Look for Foo first, then Foo$, but if Foo$ is given explicitly,
@@ -417,20 +421,20 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
private lazy val javap =
try newJavap()
catch { case _: Exception => null }
-
+
private def typeCommand(line: String): Result = {
intp.typeOfExpression(line) match {
case Some(tp) => tp.toString
case _ => "Failed to determine type."
}
}
-
+
private def javapCommand(line: String): Result = {
if (javap == null)
return ":javap unavailable on this platform."
if (line == "")
return ":javap [-lcsvp] [path1 path2 ...]"
-
+
javap(words(line)) foreach { res =>
if (res.isError) return "Failed: " + res.value
else res.show()
@@ -504,25 +508,25 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
}
else {
val what = phased.parse(name)
- if (what.isEmpty || !phased.set(what))
+ if (what.isEmpty || !phased.set(what))
"'" + name + "' does not appear to represent a valid phase."
else {
intp.setExecutionWrapper(pathToPhaseWrapper)
val activeMessage =
if (what.toString.length == name.length) "" + what
else "%s (%s)".format(what, name)
-
+
"Active phase is now: " + activeMessage
}
}
}
*/
-
+
/** Available commands */
def commands: List[LoopCommand] = standardCommands /* ++ (
if (isReplPower) powerCommands else Nil
)*/
-
+
val replayQuestionMessage =
"""|The repl compiler has crashed spectacularly. Shall I replay your
|session? I can re-run all lines except the last one.
@@ -579,10 +583,10 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
}
/** interpret all lines from a specified file */
- def interpretAllFrom(file: File) {
+ def interpretAllFrom(file: File) {
val oldIn = in
val oldReplay = replayCommandStack
-
+
try file applyReader { reader =>
in = SimpleReader(reader, out, false)
echo("Loading " + file + "...")
@@ -604,26 +608,26 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
echo("")
}
}
-
+
/** fork a shell and run a command */
lazy val shCommand = new LoopCommand("sh", "run a shell command (result is implicitly => List[String])") {
override def usage = "<command line>"
def apply(line: String): Result = line match {
case "" => showUsage()
- case _ =>
+ case _ =>
val toRun = classOf[ProcessResult].getName + "(" + string2codeQuoted(line) + ")"
intp interpret toRun
()
}
}
-
+
def withFile(filename: String)(action: File => Unit) {
val f = File(filename)
-
+
if (f.exists) action(f)
else echo("That file does not exist")
}
-
+
def loadCommand(arg: String) = {
var shouldReplay: Option[String] = None
withFile(arg)(f => {
@@ -657,7 +661,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
}
else echo("The path '" + f + "' doesn't seem to exist.")
}
-
+
def powerCmd(): Result = {
if (isReplPower) "Already in power mode."
else enablePowerMode()
@@ -667,13 +671,13 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
//power.unleash()
//echo(power.banner)
}
-
+
def verbosity() = {
val old = intp.printResults
intp.printResults = !old
echo("Switched " + (if (old) "off" else "on") + " result printing.")
}
-
+
/** Run one command submitted by the user. Two values are returned:
* (1) whether to keep running, (2) the line to record for replay,
* if any. */
@@ -688,11 +692,11 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
else if (intp.global == null) Result(false, None) // Notice failure to create compiler
else Result(true, interpretStartingWith(line))
}
-
+
private def readWhile(cond: String => Boolean) = {
Iterator continually in.readLine("") takeWhile (x => x != null && cond(x))
}
-
+
def pasteCommand(): Result = {
echo("// Entering paste mode (ctrl-D to finish)\n")
val code = readWhile(_ => true) mkString "\n"
@@ -700,17 +704,17 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
intp interpret code
()
}
-
+
private object paste extends Pasted {
val ContinueString = " | "
val PromptString = "scala> "
-
+
def interpret(line: String): Unit = {
echo(line.trim)
intp interpret line
echo("")
}
-
+
def transcript(start: String) = {
// Printing this message doesn't work very well because it's buried in the
// transcript they just pasted. Todo: a short timer goes off when
@@ -731,7 +735,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
def interpretStartingWith(code: String): Option[String] = {
// signal completion non-completion input has been received
in.completion.resetVerbosity()
-
+
def reallyInterpret = {
val reallyResult = intp.interpret(code)
(reallyResult, reallyResult match {
@@ -741,7 +745,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
if (in.interactive && code.endsWith("\n\n")) {
echo("You typed two blank lines. Starting a new command.")
None
- }
+ }
else in.readLine(ContinueString) match {
case null =>
// we know compilation is going to fail since we're at EOF and the
@@ -755,10 +759,10 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
}
})
}
-
+
/** Here we place ourselves between the user and the interpreter and examine
* the input they are ostensibly submitting. We intervene in several cases:
- *
+ *
* 1) If the line starts with "scala> " it is assumed to be an interpreter paste.
* 2) If the line starts with "." (but not ".." or "./") it is treated as an invocation
* on the previous result.
@@ -787,7 +791,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
val (code, result) = reallyInterpret
//if (power != null && code == IR.Error)
// runCompletion
-
+
result
}
else runCompletion match {
@@ -808,7 +812,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
}
case _ =>
}
-
+
/** Tries to create a JLineReader, falling back to SimpleReader:
* unless settings or properties are such that it should start
* with SimpleReader.
@@ -837,6 +841,10 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
org.apache.spark.repl.Main.interp.out.flush();
""")
command("import org.apache.spark.SparkContext._")
+ val prop = System.getenv("SPARK_SHELL_INIT_BLOCK")
+ if (prop != null) {
+ command(prop)
+ }
}
echo("Type in expressions to have them evaluated.")
echo("Type :help for more information.")
@@ -884,7 +892,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
this.settings = settings
createInterpreter()
-
+
// sets in to some kind of reader depending on environmental cues
in = in0 match {
case Some(reader) => SimpleReader(reader, out, true)
@@ -895,10 +903,10 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
// it is broken on startup; go ahead and exit
if (intp.reporter.hasErrors)
return false
-
- try {
+
+ try {
// this is about the illusion of snappiness. We call initialize()
- // which spins off a separate thread, then print the prompt and try
+ // which spins off a separate thread, then print the prompt and try
// our best to look ready. Ideally the user will spend a
// couple seconds saying "wow, it starts so fast!" and by the time
// they type a command the compiler is ready to roll.
@@ -920,19 +928,19 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
def neededHelp(): String =
(if (command.settings.help.value) command.usageMsg + "\n" else "") +
(if (command.settings.Xhelp.value) command.xusageMsg + "\n" else "")
-
+
// if they asked for no help and command is valid, we call the real main
neededHelp() match {
case "" => command.ok && process(command.settings)
case help => echoNoNL(help) ; true
}
}
-
+
@deprecated("Use `process` instead", "2.9.0")
def main(args: Array[String]): Unit = {
if (isReplDebug)
System.out.println(new java.util.Date)
-
+
process(args)
}
@deprecated("Use `process` instead", "2.9.0")
@@ -948,7 +956,7 @@ object SparkILoop {
// like if you'd just typed it into the repl.
def runForTranscript(code: String, settings: Settings): String = {
import java.io.{ BufferedReader, StringReader, OutputStreamWriter }
-
+
stringFromStream { ostream =>
Console.withOut(ostream) {
val output = new PrintWriter(new OutputStreamWriter(ostream), true) {
@@ -977,19 +985,19 @@ object SparkILoop {
}
}
}
-
+
/** Creates an interpreter loop with default settings and feeds
* the given code to it as input.
*/
def run(code: String, sets: Settings = new Settings): String = {
import java.io.{ BufferedReader, StringReader, OutputStreamWriter }
-
+
stringFromStream { ostream =>
Console.withOut(ostream) {
val input = new BufferedReader(new StringReader(code))
val output = new PrintWriter(new OutputStreamWriter(ostream), true)
val repl = new SparkILoop(input, output)
-
+
if (sets.classpath.isDefault)
sets.classpath.value = sys.props("java.class.path")
@@ -1017,7 +1025,7 @@ object SparkILoop {
repl.settings.embeddedDefaults[T]
repl.createInterpreter()
repl.in = SparkJLineReader(repl)
-
+
// rebind exit so people don't accidentally call sys.exit by way of predef
repl.quietRun("""def exit = println("Type :quit to resume program execution.")""")
args foreach (p => repl.bind(p.name, p.tpe, p.value))
@@ -1025,5 +1033,5 @@ object SparkILoop {
echo("\nDebug repl exiting.")
repl.closeInterpreter()
- }
+ }
}