author     Prashant Sharma <prashant.s@imaginea.com>    2013-07-15 11:15:55 +0530
committer  Prashant Sharma <prashant.s@imaginea.com>    2013-07-15 11:15:55 +0530
commit     a3494d405d4c54b672711dfbee0c7d5d688e96d8 (patch)
tree       2f17e0c477786732dfef85e36b1e059196b21b15
parent     a220e11a07eb56fade8d0573503b58ce659fda54 (diff)
parent     4698a0d6886905ef21cbd52e108d0dcab3df12df (diff)
Merge branch 'master' of github.com:mesos/spark into scala-2.10
Conflicts:
    core/src/main/scala/spark/Utils.scala
    core/src/test/scala/spark/ui/UISuite.scala
    project/SparkBuild.scala
    run
-rwxr-xr-x  bin/compute-classpath.sh                                                 | 111
-rwxr-xr-x  bin/start-slave.sh                                                       |   3
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala    |  58
-rw-r--r--  core/src/main/scala/spark/KryoSerializer.scala                           |   4
-rw-r--r--  core/src/main/scala/spark/RDD.scala                                      |  12
-rw-r--r--  core/src/main/scala/spark/Utils.scala                                    |  31
-rw-r--r--  core/src/main/scala/spark/api/java/JavaRDDLike.scala                     |  25
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala                     |   2
-rw-r--r--  core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala         |   4
-rw-r--r--  core/src/main/scala/spark/deploy/master/ui/IndexPage.scala               |   4
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala             |   4
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala                     |   2
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala               |  11
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala             | 103
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala  |  10
-rw-r--r--  core/src/main/scala/spark/storage/StorageUtils.scala                     |  11
-rw-r--r--  core/src/main/scala/spark/ui/Page.scala                                  |   2
-rw-r--r--  core/src/main/scala/spark/ui/SparkUI.scala                               |   7
-rw-r--r--  core/src/main/scala/spark/ui/UIUtils.scala                               |   5
-rw-r--r--  core/src/main/scala/spark/ui/env/EnvironmentUI.scala                     |  69
-rw-r--r--  core/src/main/scala/spark/ui/jobs/IndexPage.scala                        |   8
-rw-r--r--  core/src/main/scala/spark/ui/storage/RDDPage.scala                       |  17
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala                                 |  20
-rw-r--r--  core/src/test/scala/spark/UtilsSuite.scala                               |  49
-rw-r--r--  core/src/test/scala/spark/ui/UISuite.scala                               |   2
-rw-r--r--  docs/configuration.md                                                    |  12
-rw-r--r--  docs/streaming-programming-guide.md                                      |   4
-rwxr-xr-x  make-distribution.sh                                                     |  39
-rw-r--r--  mllib/src/main/scala/spark/mllib/recommendation/ALS.scala                |  70
-rw-r--r--  project/SparkBuild.scala                                                 |   5
-rwxr-xr-x  run                                                                      |  48
-rwxr-xr-x  spark-shell                                                              |  67
32 files changed, 644 insertions(+), 175 deletions(-)
diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
index 2caefe98c9..c7874dcd8c 100755
--- a/bin/compute-classpath.sh
+++ b/bin/compute-classpath.sh
@@ -24,46 +24,71 @@ PYSPARK_DIR="$FWDIR/python"
# Build up classpath
CLASSPATH="$SPARK_CLASSPATH"
-CLASSPATH="$CLASSPATH:$FWDIR/conf"
-CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/classes"
-if [ -n "$SPARK_TESTING" ] ; then
- CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/test-classes"
- CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/test-classes"
-fi
-CLASSPATH="$CLASSPATH:$CORE_DIR/src/main/resources"
-CLASSPATH="$CLASSPATH:$REPL_DIR/target/scala-$SCALA_VERSION/classes"
-CLASSPATH="$CLASSPATH:$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
-CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/classes"
-CLASSPATH="$CLASSPATH:$STREAMING_DIR/lib/org/apache/kafka/kafka/0.7.2-spark/*" # <-- our in-project Kafka Jar
-if [ -e "$FWDIR/lib_managed" ]; then
- CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/jars/*"
- CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/bundles/*"
-fi
-CLASSPATH="$CLASSPATH:$REPL_DIR/lib/*"
-# Add the shaded JAR for Maven builds
-if [ -e $REPL_BIN_DIR/target ]; then
- for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
+
+function dev_classpath {
+ CLASSPATH="$CLASSPATH:$FWDIR/conf"
+ CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/classes"
+ if [ -n "$SPARK_TESTING" ] ; then
+ CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/test-classes"
+ CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/test-classes"
+ fi
+ CLASSPATH="$CLASSPATH:$CORE_DIR/src/main/resources"
+ CLASSPATH="$CLASSPATH:$REPL_DIR/target/scala-$SCALA_VERSION/classes"
+ CLASSPATH="$CLASSPATH:$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
+ CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/classes"
+ CLASSPATH="$CLASSPATH:$STREAMING_DIR/lib/org/apache/kafka/kafka/0.7.2-spark/*" # <-- our in-project Kafka Jar
+ if [ -e "$FWDIR/lib_managed" ]; then
+ CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/jars/*"
+ CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/bundles/*"
+ fi
+ CLASSPATH="$CLASSPATH:$REPL_DIR/lib/*"
+ # Add the shaded JAR for Maven builds
+ if [ -e $REPL_BIN_DIR/target ]; then
+ for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
+ CLASSPATH="$CLASSPATH:$jar"
+ done
+ # The shaded JAR doesn't contain examples, so include those separately
+ EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar`
+ CLASSPATH+=":$EXAMPLES_JAR"
+ fi
+ CLASSPATH="$CLASSPATH:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
+ CLASSPATH="$CLASSPATH:$MLLIB_DIR/target/scala-$SCALA_VERSION/classes"
+ for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do
CLASSPATH="$CLASSPATH:$jar"
done
- # The shaded JAR doesn't contain examples, so include those separately
- EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar`
- CLASSPATH+=":$EXAMPLES_JAR"
-fi
-CLASSPATH="$CLASSPATH:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
-CLASSPATH="$CLASSPATH:$MLLIB_DIR/target/scala-$SCALA_VERSION/classes"
-for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do
- CLASSPATH="$CLASSPATH:$jar"
-done
-# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
-# to avoid the -sources and -doc packages that are built by publish-local.
-if [ -e "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar ]; then
- # Use the JAR from the SBT build
- export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar`
-fi
-if [ -e "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar ]; then
- # Use the JAR from the Maven build
- export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar`
+ # Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
+ # to avoid the -sources and -doc packages that are built by publish-local.
+ if [ -e "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar ]; then
+ # Use the JAR from the SBT build
+ export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar`
+ fi
+ if [ -e "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar ]; then
+ # Use the JAR from the Maven build
+ export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar`
+ fi
+
+ # Add Scala standard library
+ if [ -z "$SCALA_LIBRARY_PATH" ]; then
+ if [ -z "$SCALA_HOME" ]; then
+ echo "SCALA_HOME is not set" >&2
+ exit 1
+ fi
+ SCALA_LIBRARY_PATH="$SCALA_HOME/lib"
+ fi
+ CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-library.jar"
+ CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-compiler.jar"
+ CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/jline.jar"
+}
+
+function release_classpath {
+ CLASSPATH="$CLASSPATH:$FWDIR/jars/*"
+}
+
+if [ -f "$FWDIR/RELEASE" ]; then
+ release_classpath
+else
+ dev_classpath
fi
# Add hadoop conf dir - else FileSystem.*, etc fail !
@@ -76,16 +101,4 @@ if [ "x" != "x$YARN_CONF_DIR" ]; then
CLASSPATH="$CLASSPATH:$YARN_CONF_DIR"
fi
-# Add Scala standard library
-if [ -z "$SCALA_LIBRARY_PATH" ]; then
- if [ -z "$SCALA_HOME" ]; then
- echo "SCALA_HOME is not set" >&2
- exit 1
- fi
- SCALA_LIBRARY_PATH="$SCALA_HOME/lib"
-fi
-CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-library.jar"
-CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-compiler.jar"
-CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/jline.jar"
-
echo "$CLASSPATH"
diff --git a/bin/start-slave.sh b/bin/start-slave.sh
index 26b5b9d462..1082c09eb1 100755
--- a/bin/start-slave.sh
+++ b/bin/start-slave.sh
@@ -1,4 +1,7 @@
#!/usr/bin/env bash
+#
+# Usage: start-slave.sh <worker#> <master-spark-URL>
+# where <master-spark-URL> is like "spark://localhost:7077"
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
index f19648ec68..6a0617cc06 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -27,6 +27,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private var yarnAllocator: YarnAllocationHandler = null
+ private var isFinished:Boolean = false
def run() {
@@ -68,10 +69,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Wait for the user class to Finish
userThread.join()
-
- // Finish the ApplicationMaster
- finishApplicationMaster()
- // TODO: Exit based on success/failure
+
System.exit(0)
}
@@ -124,17 +122,30 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
}
}
-
+
private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader)
.getMethod("main", classOf[Array[String]])
val t = new Thread {
override def run() {
- // Copy
- var mainArgs: Array[String] = new Array[String](args.userArgs.size())
- args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
- mainMethod.invoke(null, mainArgs)
+ var successed = false
+ try {
+ // Copy
+ var mainArgs: Array[String] = new Array[String](args.userArgs.size())
+ args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
+ mainMethod.invoke(null, mainArgs)
+ // Some user programs (for example SparkPi and SparkLR) call System.exit(0) at the end;
+ // userThread will stop here unless an uncaught exception is thrown,
+ // so a shutdown hook is needed to set SUCCEEDED in that case.
+ successed = true
+ } finally {
+ if (successed) {
+ ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+ } else {
+ ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
+ }
+ }
}
}
t.start()
@@ -179,7 +190,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
logInfo("All workers have launched.")
// Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
- if (userThread.isAlive){
+ if (userThread.isAlive) {
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
@@ -197,7 +208,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
val t = new Thread {
override def run() {
- while (userThread.isAlive){
+ while (userThread.isAlive) {
val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
if (missingWorkerCount > 0) {
logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
@@ -235,14 +246,23 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
}
*/
-
- def finishApplicationMaster() {
+
+ def finishApplicationMaster(status: FinalApplicationStatus) {
+
+ synchronized {
+ if (isFinished) {
+ return
+ }
+ isFinished = true
+ }
+
+ logInfo("finishApplicationMaster with " + status)
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
- // TODO: Check if the application has failed or succeeded
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED)
+ finishReq.setFinishApplicationStatus(status)
resourceManager.finishApplicationMaster(finishReq)
+
}
}
@@ -256,7 +276,7 @@ object ApplicationMaster {
private val ALLOCATOR_LOOP_WAIT_COUNT = 30
def incrementAllocatorLoop(by: Int) {
val count = yarnAllocatorLoop.getAndAdd(by)
- if (count >= ALLOCATOR_LOOP_WAIT_COUNT){
+ if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
yarnAllocatorLoop.synchronized {
// to wake threads off wait ...
yarnAllocatorLoop.notifyAll()
@@ -291,14 +311,16 @@ object ApplicationMaster {
logInfo("Invoking sc stop from shutdown hook")
sc.stop()
// best case ...
- for (master <- applicationMasters) master.finishApplicationMaster
+ for (master <- applicationMasters) {
+ master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+ }
}
} )
}
// Wait for initialization to complete and atleast 'some' nodes can get allocated
yarnAllocatorLoop.synchronized {
- while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT){
+ while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
yarnAllocatorLoop.wait(1000L)
}
}
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
index d723ab7b1e..c7dbcc6fbc 100644
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ b/core/src/main/scala/spark/KryoSerializer.scala
@@ -210,6 +210,10 @@ class KryoSerializer extends spark.serializer.Serializer with Logging {
val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
reg.registerClasses(kryo)
}
+
+ // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops
+ kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)
+
kryo
}
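
As context for the new flag, a driver that opts out of reference tracking might look roughly like the minimal sketch below. The application name and workload are hypothetical; the sketch assumes only the spark.kryo.referenceTracking property read above and the standard spark.serializer property, and disabling tracking is only safe when the serialized object graphs contain no cycles.

import spark.SparkContext

object KryoNoRefTrackingExample {
  def main(args: Array[String]) {
    // Use Kryo, and disable reference tracking (default "true"); this is safe
    // only when the objects being serialized contain no cyclic references.
    System.setProperty("spark.serializer", "spark.KryoSerializer")
    System.setProperty("spark.kryo.referenceTracking", "false")
    val sc = new SparkContext("local", "KryoNoRefTrackingExample")
    println(sc.parallelize(1 to 1000, 4).map(_ * 2).count())
    sc.stop()
  }
}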
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 8dde5391d3..3a454df10d 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -781,10 +781,20 @@ abstract class RDD[T: ClassTag](
}.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
- }.toArray
+ }.toArray.sorted(ord.reverse)
}
/**
+ * Returns the first K elements from this RDD as defined by
+ * the specified implicit Ordering[T] and maintains the
+ * ordering.
+ * @param num the number of top elements to return
+ * @param ord the implicit ordering for T
+ * @return an array of top elements
+ */
+ def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
+
+ /**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String) {
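
A rough usage sketch of the new method (the local master and sample data are illustrative): top(k) returns the k largest elements in descending order, while takeOrdered(k) returns the k smallest in ascending order, since it simply delegates to top with the reversed ordering.

import spark.SparkContext

object TakeOrderedExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "TakeOrderedExample")
    val rdd = sc.makeRDD(Seq(8, 3, 10, 1, 7, 5), 2)
    println(rdd.top(3).mkString(", "))          // 10, 8, 7  (largest, descending)
    println(rdd.takeOrdered(3).mkString(", "))  // 1, 3, 5   (smallest, ascending)
    sc.stop()
  }
}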
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index f90b2ccaa1..9817272b0e 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -549,10 +549,15 @@ private object Utils extends Logging {
/**
* Execute a command and get its output, throwing an exception if it yields a code other than 0.
*/
- def executeAndGetOutput(command: Seq[String], workingDir: File = new File(".")): String = {
- val process = new ProcessBuilder(command: _*)
+ def executeAndGetOutput(command: Seq[String], workingDir: File = new File("."),
+ extraEnvironment: Map[String, String] = Map.empty): String = {
+ val builder = new ProcessBuilder(command: _*)
.directory(workingDir)
- .start()
+ val environment = builder.environment()
+ for ((key, value) <- extraEnvironment) {
+ environment.put(key, value)
+ }
+ val process = builder.start()
new Thread("read stderr for " + command(0)) {
override def run() {
for (line <- Source.fromInputStream(process.getErrorStream).getLines) {
@@ -577,8 +582,15 @@ private object Utils extends Logging {
output.toString
}
+ /**
+ * A regular expression to match classes of the "core" Spark API that we want to skip when
+ * finding the call site of a method.
+ */
+ private val SPARK_CLASS_REGEX = """^spark(\.api\.java)?(\.rdd)?\.[A-Z]""".r
+
private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
val firstUserLine: Int, val firstUserClass: String)
+
/**
* When called inside a class in the spark package, returns the name of the user code class
* (outside the spark package) that called into Spark, as well as which Spark method they called.
@@ -600,7 +612,7 @@ private object Utils extends Logging {
for (el <- trace) {
if (!finished) {
- if (el.getClassName.startsWith("spark.") && !el.getClassName.startsWith("spark.examples.")) {
+ if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName) != None) {
lastSparkMethod = if (el.getMethodName == "<init>") {
// Spark method is a constructor; get its class name
el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
@@ -625,15 +637,16 @@ private object Utils extends Logging {
callSiteInfo.firstUserLine)
}
- /** Return a string containing the last `n` bytes of a file. */
- def lastNBytes(path: String, n: Int): String = {
+ /** Return a string containing part of a file from byte 'start' to 'end'. */
+ def offsetBytes(path: String, start: Long, end: Long): String = {
val file = new File(path)
val length = file.length()
- val buff = new Array[Byte](math.min(n, length.toInt))
- val skip = math.max(0, length - n)
+ val effectiveEnd = math.min(length, end)
+ val effectiveStart = math.max(0, start)
+ val buff = new Array[Byte]((effectiveEnd-effectiveStart).toInt)
val stream = new FileInputStream(file)
- stream.skip(skip)
+ stream.skip(effectiveStart)
stream.read(buff)
stream.close()
Source.fromBytes(buff).mkString
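
The new extraEnvironment parameter is what ExecutorRunner (below) uses to forward an application's environment to the compute-classpath script. A hedged sketch of a call site follows; Utils is package-private, so this assumes code living in the spark package, and the shell command is purely illustrative.

package spark

import java.io.File

object ExecuteWithEnvExample {
  def main(args: Array[String]) {
    // Run a child process with an extra environment variable and capture its stdout.
    val output = Utils.executeAndGetOutput(
      Seq("/bin/sh", "-c", "echo $GREETING"),
      workingDir = new File("."),
      extraEnvironment = Map("GREETING" -> "hello from the child process"))
    print(output)
  }
}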
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 3fe2011f4c..b0d094d564 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -385,4 +385,29 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
top(num, comp)
}
+
+ /**
+ * Returns the first K elements from this RDD as defined by
+ * the specified Comparator[T] and maintains the order.
+ * @param num the number of top elements to return
+ * @param comp the comparator that defines the order
+ * @return an array of top elements
+ */
+ def takeOrdered(num: Int, comp: Comparator[T]): JList[T] = {
+ import scala.collection.JavaConversions._
+ val topElems = rdd.takeOrdered(num)(Ordering.comparatorToOrdering(comp))
+ val arr: java.util.Collection[T] = topElems.toSeq
+ new java.util.ArrayList(arr)
+ }
+
+ /**
+ * Returns the first K elements from this RDD using the
+ * natural ordering for T while maintaining the order.
+ * @param num the number of top elements to return
+ * @return an array of top elements
+ */
+ def takeOrdered(num: Int): JList[T] = {
+ val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
+ takeOrdered(num, comp)
+ }
}
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index d31e6735b7..315a00f244 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -72,7 +72,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
} else {
addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
context.watch(sender) // This doesn't work with remote actors but helps for testing
- sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort)
+ sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
schedule()
}
}
diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
index 939f8c9587..88a3db223a 100644
--- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
@@ -91,9 +91,9 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
<td>{executor.memory}</td>
<td>{executor.state}</td>
<td>
- <a href={"%s/log?appId=%s&executorId=%s&logType=stdout"
+ <a href={"%s/logPage?appId=%s&executorId=%s&logType=stdout"
.format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
- <a href={"%s/log?appId=%s&executorId=%s&logType=stderr"
+ <a href={"%s/logPage?appId=%s&executorId=%s&logType=stderr"
.format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a>
</td>
</tr>
diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
index 73e378f988..617e4021d5 100644
--- a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
@@ -38,8 +38,8 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<div class="row">
<div class="span12">
<ul class="unstyled">
- <li><strong>URL:</strong>{state.uri}</li>
- <li><strong>Workers:</strong>{state.workers.size}</li>
+ <li><strong>URL:</strong> {state.uri}</li>
+ <li><strong>Workers:</strong> {state.workers.size}</li>
<li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total,
{state.workers.map(_.coresUsed).sum} Used</li>
<li><strong>Memory:</strong>
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index d7f58b2cb1..5d3d54c65e 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -98,7 +98,9 @@ private[spark] class ExecutorRunner(
// Figure out our classpath with the external compute-classpath script
val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
- val classPath = Utils.executeAndGetOutput(Seq(sparkHome + "/bin/compute-classpath" + ext))
+ val classPath = Utils.executeAndGetOutput(
+ Seq(sparkHome + "/bin/compute-classpath" + ext),
+ extraEnvironment=appDesc.command.environment)
Seq("-cp", classPath) ++ libraryOpts ++ userOpts ++ memoryOpts
}
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 7c1871e047..3a65ccad27 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -77,7 +77,7 @@ private[spark] class Worker(
sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
logInfo("Spark home: " + sparkHome)
createWorkDir()
- webUi = new WorkerWebUI(self, workDir, Some(webUiPort))
+ webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
webUi.start()
connectToMaster()
}
diff --git a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
index 8cb74632aa..6247dd4065 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
@@ -17,17 +17,18 @@ import spark.Utils
import spark.ui.UIUtils
private[spark] class IndexPage(parent: WorkerWebUI) {
+ val workerActor = parent.worker.self
val worker = parent.worker
val timeout = parent.timeout
def renderJson(request: HttpServletRequest): JValue = {
- val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState]
+ val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerState]
val workerState = Await.result(stateFuture, 30 seconds)
JsonProtocol.writeWorkerState(workerState)
}
def render(request: HttpServletRequest): Seq[Node] = {
- val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState]
+ val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerState]
val workerState = Await.result(stateFuture, 30 seconds)
val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs")
@@ -89,11 +90,11 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
</ul>
</td>
<td>
- <a href={"log?appId=%s&executorId=%s&logType=stdout"
+ <a href={"logPage?appId=%s&executorId=%s&logType=stdout"
.format(executor.appId, executor.execId)}>stdout</a>
- <a href={"log?appId=%s&executorId=%s&logType=stderr"
+ <a href={"logPage?appId=%s&executorId=%s&logType=stderr"
.format(executor.appId, executor.execId)}>stderr</a>
- </td>
+ </td>
</tr>
}
diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
index b1336dd1af..9d0a39bb1e 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -11,15 +11,17 @@ import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
+import spark.deploy.worker.Worker
import spark.{Utils, Logging}
import spark.ui.JettyUtils
import spark.ui.JettyUtils._
+import spark.ui.UIUtils
/**
* Web UI server for the standalone worker.
*/
private[spark]
-class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option[Int] = None)
+class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
extends Logging {
implicit val timeout = Timeout(
Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
@@ -35,6 +37,7 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option
val handlers = Array[(String, Handler)](
("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
("/log", (request: HttpServletRequest) => log(request)),
+ ("/logPage", (request: HttpServletRequest) => logPage(request)),
("/json", (request: HttpServletRequest) => indexPage.renderJson(request)),
("*", (request: HttpServletRequest) => indexPage.render(request))
)
@@ -53,18 +56,104 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option
}
def log(request: HttpServletRequest): String = {
+ val defaultBytes = 100 * 1024
val appId = request.getParameter("appId")
val executorId = request.getParameter("executorId")
val logType = request.getParameter("logType")
+ val offset = Option(request.getParameter("offset")).map(_.toLong)
+ val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
+ val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
- val maxBytes = 1024 * 1024 // Guard against OOM
- val defaultBytes = 100 * 1024
- val numBytes = Option(request.getParameter("numBytes"))
- .flatMap(s => Some(s.toInt)).getOrElse(defaultBytes)
+ val (startByte, endByte) = getByteRange(path, offset, byteLength)
+ val file = new File(path)
+ val logLength = file.length
+
+ val pre = "==== Bytes %s-%s of %s of %s/%s/%s ====\n"
+ .format(startByte, endByte, logLength, appId, executorId, logType)
+ pre + Utils.offsetBytes(path, startByte, endByte)
+ }
+ def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = {
+ val defaultBytes = 100 * 1024
+ val appId = request.getParameter("appId")
+ val executorId = request.getParameter("executorId")
+ val logType = request.getParameter("logType")
+ val offset = Option(request.getParameter("offset")).map(_.toLong)
+ val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
- val pre = "==== Last %s bytes of %s/%s/%s ====\n".format(numBytes, appId, executorId, logType)
- pre + Utils.lastNBytes(path, math.min(numBytes, maxBytes))
+
+ val (startByte, endByte) = getByteRange(path, offset, byteLength)
+ val file = new File(path)
+ val logLength = file.length
+
+ val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
+
+ val linkToMaster = <p><a href={worker.masterWebUiUrl}>Back to Master</a></p>
+
+ val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
+
+ val backButton =
+ if (startByte > 0) {
+ <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s"
+ .format(appId, executorId, logType, math.max(startByte-byteLength, 0),
+ byteLength)}>
+ <button>Previous {Utils.memoryBytesToString(math.min(byteLength, startByte))}</button>
+ </a>
+ }
+ else {
+ <button disabled="disabled">Previous 0 B</button>
+ }
+
+ val nextButton =
+ if (endByte < logLength) {
+ <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s".
+ format(appId, executorId, logType, endByte, byteLength)}>
+ <button>Next {Utils.memoryBytesToString(math.min(byteLength, logLength-endByte))}</button>
+ </a>
+ }
+ else {
+ <button disabled="disabled">Next 0 B</button>
+ }
+
+ val content =
+ <html>
+ <body>
+ {linkToMaster}
+ <hr />
+ <div>
+ <div style="float:left;width:40%">{backButton}</div>
+ <div style="float:left;">{range}</div>
+ <div style="float:right;">{nextButton}</div>
+ </div>
+ <br />
+ <div style="height:500px;overflow:auto;padding:5px;">
+ <pre>{logText}</pre>
+ </div>
+ </body>
+ </html>
+ UIUtils.basicSparkPage(content, logType + " log page for " + appId)
+ }
+
+ /** Determine the byte range for a log or log page. */
+ def getByteRange(path: String, offset: Option[Long], byteLength: Int)
+ : (Long, Long) = {
+ val defaultBytes = 100 * 1024
+ val maxBytes = 1024 * 1024
+
+ val file = new File(path)
+ val logLength = file.length()
+ val getOffset = offset.getOrElse(logLength-defaultBytes)
+
+ val startByte =
+ if (getOffset < 0) 0L
+ else if (getOffset > logLength) logLength
+ else getOffset
+
+ val logPageLength = math.min(byteLength, maxBytes)
+
+ val endByte = math.min(startByte+logPageLength, logLength)
+
+ (startByte, endByte)
}
def stop() {
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index fe6420a522..327d6797ae 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -454,10 +454,10 @@ private[spark] class ClusterTaskSetManager(
val taskId = sched.newTaskId()
// Figure out whether this should count as a preferred launch
val taskLocality =
- if (isProcessLocalLocation(task, hostPort)) TaskLocality.PROCESS_LOCAL else
- if (isHostLocalLocation(task, hostPort)) TaskLocality.NODE_LOCAL else
- if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL else
- TaskLocality.ANY
+ if (isProcessLocalLocation(task, hostPort)) TaskLocality.PROCESS_LOCAL
+ else if (isHostLocalLocation(task, hostPort)) TaskLocality.NODE_LOCAL
+ else if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL
+ else TaskLocality.ANY
val prefStr = taskLocality.toString
logInfo("Starting task %s:%d as TID %s on slave %s: %s (%s)".format(
taskSet.id, index, taskId, execId, hostPort, prefStr))
@@ -467,7 +467,7 @@ private[spark] class ClusterTaskSetManager(
val info = new TaskInfo(taskId, index, time, execId, hostPort, taskLocality)
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
- if (TaskLocality.NODE_LOCAL == taskLocality) {
+ if (taskLocality == TaskLocality.PROCESS_LOCAL || taskLocality == TaskLocality.NODE_LOCAL) {
lastPreferredLaunchTime = time
}
// Serialize and return the task
diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala
index 950c0cdf35..aca16f533a 100644
--- a/core/src/main/scala/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/spark/storage/StorageUtils.scala
@@ -39,12 +39,19 @@ case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
private[spark]
object StorageUtils {
- /* Given the current storage status of the BlockManager, returns information for each RDD */
- def rddInfoFromStorageStatus(storageStatusList: Array[StorageStatus],
+ /* Returns RDD-level information, compiled from a list of StorageStatus objects */
+ def rddInfoFromStorageStatus(storageStatusList: Seq[StorageStatus],
sc: SparkContext) : Array[RDDInfo] = {
rddInfoFromBlockStatusList(storageStatusList.flatMap(_.blocks).toMap, sc)
}
+ /* Returns a map of blocks to their locations, compiled from a list of StorageStatus objects */
+ def blockLocationsFromStorageStatus(storageStatusList: Seq[StorageStatus]) = {
+ val blockLocationPairs = storageStatusList
+ .flatMap(s => s.blocks.map(b => (b._1, s.blockManagerId.hostPort)))
+ blockLocationPairs.groupBy(_._1).map{case (k, v) => (k, v.unzip._2)}.toMap
+ }
+
/* Given a list of BlockStatus objets, returns information for each RDD */
def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus],
sc: SparkContext) : Array[RDDInfo] = {
diff --git a/core/src/main/scala/spark/ui/Page.scala b/core/src/main/scala/spark/ui/Page.scala
index c853b44b76..ed8f91842c 100644
--- a/core/src/main/scala/spark/ui/Page.scala
+++ b/core/src/main/scala/spark/ui/Page.scala
@@ -1,3 +1,3 @@
package spark.ui
-private[spark] object Page extends Enumeration { val Storage, Jobs = Value }
\ No newline at end of file
+private[spark] object Page extends Enumeration { val Storage, Jobs, Environment = Value }
\ No newline at end of file
diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala
index b3bdc2c490..874e5ba8ec 100644
--- a/core/src/main/scala/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/spark/ui/SparkUI.scala
@@ -5,15 +5,13 @@ import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
import spark.{Logging, SparkContext, Utils}
+import spark.ui.env.EnvironmentUI
import spark.ui.storage.BlockManagerUI
import spark.ui.jobs.JobProgressUI
-import spark.ui.UIUtils._
import spark.ui.JettyUtils._
/** Top level user interface for Spark */
private[spark] class SparkUI(sc: SparkContext) extends Logging {
- // TODO(pwendell): It would be nice to add a view that prints out environment information
-
val host = Utils.localHostName()
val port = Option(System.getProperty("spark.ui.port")).getOrElse(SparkUI.DEFAULT_PORT).toInt
var boundPort: Option[Int] = None
@@ -25,7 +23,8 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
)
val storage = new BlockManagerUI(sc)
val jobs = new JobProgressUI(sc)
- val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ handlers
+ val env = new EnvironmentUI(sc)
+ val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++ handlers
/** Bind the HTTP server which backs this web interface */
def bind() {
diff --git a/core/src/main/scala/spark/ui/UIUtils.scala b/core/src/main/scala/spark/ui/UIUtils.scala
index 7b79290d1b..36d9c47245 100644
--- a/core/src/main/scala/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/spark/ui/UIUtils.scala
@@ -19,6 +19,10 @@ private[spark] object UIUtils {
case Jobs => <li class="active"><a href="/stages">Jobs</a></li>
case _ => <li><a href="/stages">Jobs</a></li>
}
+ val environment = page match {
+ case Environment => <li class="active"><a href="/environment">Environment</a></li>
+ case _ => <li><a href="/environment">Environment</a></li>
+ }
<html>
<head>
@@ -44,6 +48,7 @@ private[spark] object UIUtils {
<ul class="nav">
{storage}
{jobs}
+ {environment}
</ul>
<ul id="infolist">
<li>Application: <strong>{sc.appName}</strong></li>
diff --git a/core/src/main/scala/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/spark/ui/env/EnvironmentUI.scala
new file mode 100644
index 0000000000..6b8b9f05bb
--- /dev/null
+++ b/core/src/main/scala/spark/ui/env/EnvironmentUI.scala
@@ -0,0 +1,69 @@
+package spark.ui.env
+
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.Handler
+
+import scala.collection.JavaConversions._
+import scala.util.Properties
+
+import spark.ui.JettyUtils._
+import spark.ui.UIUtils.headerSparkPage
+import spark.ui.Page.Environment
+import spark.SparkContext
+import spark.ui.UIUtils
+
+import scala.xml.Node
+
+private[spark] class EnvironmentUI(sc: SparkContext) {
+
+ def getHandlers = Seq[(String, Handler)](
+ ("/environment", (request: HttpServletRequest) => envDetails(request))
+ )
+
+ def envDetails(request: HttpServletRequest): Seq[Node] = {
+ val jvmInformation = Seq(
+ ("Java Version", "%s (%s)".format(Properties.javaVersion, Properties.javaVendor)),
+ ("Java Home", Properties.javaHome),
+ ("Scala Version", Properties.versionString),
+ ("Scala Home", Properties.scalaHome)
+ )
+ def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
+ def jvmTable = UIUtils.listingTable(Seq("Name", "Value"), jvmRow, jvmInformation)
+
+ val properties = System.getProperties.iterator.toSeq
+ val classPathProperty = properties
+ .filter{case (k, v) => k.contains("java.class.path")}
+ .headOption
+ .getOrElse("", "")
+ val sparkProperties = properties.filter(_._1.startsWith("spark"))
+ val otherProperties = properties.diff(sparkProperties :+ classPathProperty)
+
+ val propertyHeaders = Seq("Name", "Value")
+ def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
+ val sparkPropertyTable = UIUtils.listingTable(propertyHeaders, propertyRow, sparkProperties)
+ val otherPropertyTable = UIUtils.listingTable(propertyHeaders, propertyRow, otherProperties)
+
+ val classPathEntries = classPathProperty._2
+ .split(System.getProperty("path.separator", ":"))
+ .filterNot(e => e.isEmpty)
+ .map(e => (e, "System Classpath"))
+ val addedJars = sc.addedJars.iterator.toSeq.map{case (path, time) => (path, "Added By User")}
+ val addedFiles = sc.addedFiles.iterator.toSeq.map{case (path, time) => (path, "Added By User")}
+ val classPath = addedJars ++ addedFiles ++ classPathEntries
+
+ val classPathHeaders = Seq("Resource", "Source")
+ def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
+ val classPathTable = UIUtils.listingTable(classPathHeaders, classPathRow, classPath)
+
+ val content =
+ <span>
+ <h2>Runtime Information</h2> {jvmTable}
+ <h2>Spark Properties</h2> {sparkPropertyTable}
+ <h2>System Properties</h2> {otherPropertyTable}
+ <h2>Classpath Entries</h2> {classPathTable}
+ </span>
+
+ headerSparkPage(content, sc, "Environment", Environment)
+ }
+}
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index 1e675ab2cb..7907ab3bc7 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -29,10 +29,10 @@ private[spark] class IndexPage(parent: JobProgressUI) {
<th>Stage Id</th>
<th>Origin</th>
<th>Submitted</th>
- <td>Duration</td>
- <td colspan="2">Tasks: Complete/Total</td>
- <td>Shuffle Activity</td>
- <td>Stored RDD</td>
+ <th>Duration</th>
+ <th colspan="2">Tasks: Complete/Total</th>
+ <th>Shuffle Activity</th>
+ <th>Stored RDD</th>
</thead>
<tbody>
{rows.map(r => makeRow(r))}
diff --git a/core/src/main/scala/spark/ui/storage/RDDPage.scala b/core/src/main/scala/spark/ui/storage/RDDPage.scala
index 0cb1e47ea5..428db6fa95 100644
--- a/core/src/main/scala/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/spark/ui/storage/RDDPage.scala
@@ -26,8 +26,14 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
val workers = filteredStorageStatusList.map((prefix, _))
val workerTable = listingTable(workerHeaders, workerRow, workers)
- val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk")
- val blocks = filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1)
+ val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk",
+ "Locations")
+
+ val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1)
+ val blockLocations = StorageUtils.blockLocationsFromStorageStatus(filteredStorageStatusList)
+ val blocks = blockStatuses.map {
+ case(id, status) => (id, status, blockLocations.get(id).getOrElse(Seq("UNKNOWN")))
+ }
val blockTable = listingTable(blockHeaders, blockRow, blocks)
val content =
@@ -74,8 +80,8 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
headerSparkPage(content, parent.sc, "RDD Info: " + rddInfo.name, Jobs)
}
- def blockRow(blk: (String, BlockStatus)): Seq[Node] = {
- val (id, block) = blk
+ def blockRow(row: (String, BlockStatus, Seq[String])): Seq[Node] = {
+ val (id, block, locations) = row
<tr>
<td>{id}</td>
<td>
@@ -87,6 +93,9 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
<td sorttable_customkey={block.diskSize.toString}>
{Utils.memoryBytesToString(block.diskSize)}
</td>
+ <td>
+ {locations.map(l => <span>{l}<br/></span>)}
+ </td>
</tr>
}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index e41ae385c0..aa3ee5f5ee 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -240,7 +240,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2)
val topK = ints.top(5)
assert(topK.size === 5)
- assert(topK.sorted === nums.sorted.takeRight(5))
+ assert(topK === nums.reverse.take(5))
}
test("top with custom ordering") {
@@ -252,6 +252,24 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(topK.sorted === Array("b", "a"))
}
+ test("takeOrdered with predefined ordering") {
+ val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+ val rdd = sc.makeRDD(nums, 2)
+ val sortedLowerK = rdd.takeOrdered(5)
+ assert(sortedLowerK.size === 5)
+ assert(sortedLowerK === Array(1, 2, 3, 4, 5))
+ }
+
+ test("takeOrdered with custom ordering") {
+ val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+ implicit val ord = implicitly[Ordering[Int]].reverse
+ val rdd = sc.makeRDD(nums, 2)
+ val sortedTopK = rdd.takeOrdered(5)
+ assert(sortedTopK.size === 5)
+ assert(sortedTopK === Array(10, 9, 8, 7, 6))
+ assert(sortedTopK === nums.sorted(ord).take(5))
+ }
+
test("takeSample") {
val data = sc.parallelize(1 to 100, 2)
for (seed <- 1 to 5) {
diff --git a/core/src/test/scala/spark/UtilsSuite.scala b/core/src/test/scala/spark/UtilsSuite.scala
index 4a113e16bf..1e1260f606 100644
--- a/core/src/test/scala/spark/UtilsSuite.scala
+++ b/core/src/test/scala/spark/UtilsSuite.scala
@@ -1,7 +1,10 @@
package spark
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream, File}
import org.scalatest.FunSuite
-import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
+import org.apache.commons.io.FileUtils
import scala.util.Random
class UtilsSuite extends FunSuite {
@@ -71,5 +74,49 @@ class UtilsSuite extends FunSuite {
assert(Utils.splitCommandString("''") === Seq(""))
assert(Utils.splitCommandString("\"\"") === Seq(""))
}
+
+ test("string formatting of time durations") {
+ val second = 1000
+ val minute = second * 60
+ val hour = minute * 60
+ def str = Utils.msDurationToString(_)
+
+ assert(str(123) === "123 ms")
+ assert(str(second) === "1.0 s")
+ assert(str(second + 462) === "1.5 s")
+ assert(str(hour) === "1.00 h")
+ assert(str(minute) === "1.0 m")
+ assert(str(minute + 4 * second + 34) === "1.1 m")
+ assert(str(10 * hour + minute + 4 * second) === "10.02 h")
+ assert(str(10 * hour + 59 * minute + 59 * second + 999) === "11.00 h")
+ }
+
+ test("reading offset bytes of a file") {
+ val tmpDir2 = Files.createTempDir()
+ val f1Path = tmpDir2 + "/f1"
+ val f1 = new FileOutputStream(f1Path)
+ f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(Charsets.UTF_8))
+ f1.close()
+
+ // Read first few bytes
+ assert(Utils.offsetBytes(f1Path, 0, 5) === "1\n2\n3")
+
+ // Read some middle bytes
+ assert(Utils.offsetBytes(f1Path, 4, 11) === "3\n4\n5\n6")
+
+ // Read last few bytes
+ assert(Utils.offsetBytes(f1Path, 12, 18) === "7\n8\n9\n")
+
+ // Read some nonexistent bytes in the beginning
+ assert(Utils.offsetBytes(f1Path, -5, 5) === "1\n2\n3")
+
+ // Read some nonexistent bytes at the end
+ assert(Utils.offsetBytes(f1Path, 12, 22) === "7\n8\n9\n")
+
+ // Read some nonexistent bytes on both ends
+ assert(Utils.offsetBytes(f1Path, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")
+
+ FileUtils.deleteDirectory(tmpDir2)
+ }
}
diff --git a/core/src/test/scala/spark/ui/UISuite.scala b/core/src/test/scala/spark/ui/UISuite.scala
index b40c29bede..60bb0240a5 100644
--- a/core/src/test/scala/spark/ui/UISuite.scala
+++ b/core/src/test/scala/spark/ui/UISuite.scala
@@ -1,5 +1,7 @@
package spark.ui
+import scala.util.{Failure, Success, Try}
+import java.net.ServerSocket
import org.scalatest.FunSuite
import org.eclipse.jetty.server.Server
import java.net.ServerSocket
diff --git a/docs/configuration.md b/docs/configuration.md
index 5a80510959..5c06897cae 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -198,8 +198,18 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td>spark.kryo.referenceTracking</td>
+ <td>true</td>
+ <td>
+ Whether to track references to the same object when serializing data with Kryo, which is
+ necessary if your object graphs have loops and useful for efficiency if they contain multiple
+ copies of the same object. Can be disabled to improve performance if you know this is not the
+ case.
+ </td>
+</tr>
+<tr>
<td>spark.kryoserializer.buffer.mb</td>
- <td>32</td>
+ <td>2</td>
<td>
Maximum object size to allow within Kryo (the library needs to create a buffer at least as
large as the largest single object you'll serialize). Increase this if you get a "buffer limit
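
With the smaller default, a job serializing large objects may hit Kryo's buffer limit; the property can be raised from the driver before the SparkContext is created. A minimal sketch, with the 32 MB value chosen only as an example:

import spark.SparkContext

object KryoBufferExample {
  def main(args: Array[String]) {
    System.setProperty("spark.serializer", "spark.KryoSerializer")
    // Raise the maximum serialized object size from the 2 MB default to 32 MB.
    System.setProperty("spark.kryoserializer.buffer.mb", "32")
    val sc = new SparkContext("local", "KryoBufferExample")
    // ... run jobs whose individual serialized objects exceed 2 MB ...
    sc.stop()
  }
}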
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index f5788dc467..8cd1b0cd66 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -7,7 +7,7 @@ title: Spark Streaming Programming Guide
{:toc}
# Overview
-A Spark Streaming application is very similar to a Spark application; it consists of a *driver program* that runs the user's `main` function and continuous executes various *parallel operations* on input streams of data. The main abstraction Spark Streaming provides is a *discretized stream* (DStream), which is a continuous sequence of RDDs (distributed collections of elements) representing a continuous stream of data. DStreams can be created from live incoming data (such as data from a socket, Kafka, etc.) or can be generated by transformong existing DStreams using parallel operators like `map`, `reduce`, and `window`. The basic processing model is as follows:
+A Spark Streaming application is very similar to a Spark application; it consists of a *driver program* that runs the user's `main` function and continuous executes various *parallel operations* on input streams of data. The main abstraction Spark Streaming provides is a *discretized stream* (DStream), which is a continuous sequence of RDDs (distributed collections of elements) representing a continuous stream of data. DStreams can be created from live incoming data (such as data from a socket, Kafka, etc.) or can be generated by transforming existing DStreams using parallel operators like `map`, `reduce`, and `window`. The basic processing model is as follows:
(i) While a Spark Streaming driver program is running, the system receives data from various sources and and divides it into batches. Each batch of data is treated as an RDD, that is, an immutable parallel collection of data. These input RDDs are saved in memory and replicated to two nodes for fault-tolerance. This sequence of RDDs is collectively called an InputDStream.
(ii) Data received by InputDStreams are processed using DStream operations. Since all data is represented as RDDs and all DStream operations as RDD operations, data is automatically recovered in the event of node failures.
@@ -20,7 +20,7 @@ The first thing a Spark Streaming program must do is create a `StreamingContext`
new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])
{% endhighlight %}
-The `master` parameter is a standard [Spark cluster URL](scala-programming-guide.html#master-urls) and can be "local" for local testing. The `appName` is a name of your program, which will be shown on your cluster's web UI. The `batchDuration` is the size of the batches (as explained earlier). This must be set carefully such the cluster can keep up with the processing of the data streams. Start with something conservative like 5 seconds. See the [Performance Tuning](#setting-the-right-batch-size) section for a detailed discussion. Finally, `sparkHome` and `jars` are necessary when running on a cluster to specify the location of your code, as described in the [Spark programming guide](scala-programming-guide.html#deploying-code-on-a-cluster).
+The `master` parameter is a standard [Spark cluster URL](scala-programming-guide.html#master-urls) and can be "local" for local testing. The `appName` is a name of your program, which will be shown on your cluster's web UI. The `batchDuration` is the size of the batches (as explained earlier). This must be set carefully such that the cluster can keep up with the processing of the data streams. Start with something conservative like 5 seconds. See the [Performance Tuning](#setting-the-right-batch-size) section for a detailed discussion. Finally, `sparkHome` and `jars` are necessary when running on a cluster to specify the location of your code, as described in the [Spark programming guide](scala-programming-guide.html#deploying-code-on-a-cluster).
This constructor creates a SparkContext for your job as well, which can be accessed with `streamingContext.sparkContext`.
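
A small self-contained sketch of the constructor described above, assuming the spark.streaming API of this release; the socket source, port, and word count are illustrative and not part of the guide's text.

import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._

object StreamingSetupExample {
  def main(args: Array[String]) {
    // "local" master for testing and conservative 5 second batches; sparkHome and
    // jars would be supplied when deploying to a cluster.
    val ssc = new StreamingContext("local", "StreamingSetupExample", Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    counts.print()
    ssc.start()
  }
}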
diff --git a/make-distribution.sh b/make-distribution.sh
new file mode 100755
index 0000000000..feb13d52f9
--- /dev/null
+++ b/make-distribution.sh
@@ -0,0 +1,39 @@
+#!/bin/bash
+#
+# Script to create a binary distribution for easy deploys of Spark.
+# The distribution directory defaults to dist/ but can be overridden below.
+# The distribution contains fat (assembly) jars that include the Scala library,
+# so it is completely self contained.
+# It does not contain source or *.class files.
+#
+# Recommended deploy/testing procedure (standalone mode):
+# 1) Rsync / deploy the dist/ dir to one host
+# 2) cd to deploy dir; ./bin/start-master.sh
+# 3) Verify master is up by visiting web page, ie http://master-ip:8080. Note the spark:// URL.
+# 4) ./bin/start-slave.sh 1 <<spark:// URL>>
+# 5) MASTER="spark://my-master-ip:7077" ./spark-shell
+
+# Figure out where the Spark framework is installed
+FWDIR="$(cd `dirname $0`; pwd)"
+DISTDIR="$FWDIR/dist"
+
+# Get version from SBT
+export TERM=dumb # Prevents color codes in SBT output
+VERSION=$($FWDIR/sbt/sbt "show version" | tail -1 | cut -f 2)
+echo "Making distribution for Spark $VERSION in $DISTDIR..."
+
+# Build fat JAR
+$FWDIR/sbt/sbt "repl/assembly"
+
+# Make directories
+rm -rf "$DISTDIR"
+mkdir -p "$DISTDIR/jars"
+echo "$VERSION" >$DISTDIR/RELEASE
+
+# Copy jars
+cp $FWDIR/repl/target/*.jar "$DISTDIR/jars/"
+
+# Copy other things
+cp -r "$FWDIR/bin" "$DISTDIR"
+cp -r "$FWDIR/conf" "$DISTDIR"
+cp "$FWDIR/run" "$FWDIR/spark-shell" "$DISTDIR"
diff --git a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
index 6c9fb2359c..4c18cbdc6b 100644
--- a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
@@ -2,11 +2,14 @@ package spark.mllib.recommendation
import scala.collection.mutable.{ArrayBuffer, BitSet}
import scala.util.Random
+import scala.util.Sorting
import spark.{HashPartitioner, Partitioner, SparkContext, RDD}
import spark.storage.StorageLevel
+import spark.KryoRegistrator
import spark.SparkContext._
+import com.esotericsoftware.kryo.Kryo
import org.jblas.{DoubleMatrix, SimpleBlas, Solve}
@@ -34,6 +37,12 @@ private[recommendation] case class InLinkBlock(
/**
+ * A more compact class to represent a rating than Tuple3[Int, Int, Double].
+ */
+private[recommendation] case class Rating(user: Int, product: Int, rating: Double)
+
+
+/**
* Alternating Least Squares matrix factorization.
*
* This is a blocked implementation of the ALS factorization algorithm that groups the two sets
@@ -84,15 +93,15 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
*/
def train(ratings: RDD[(Int, Int, Double)]): MatrixFactorizationModel = {
val numBlocks = if (this.numBlocks == -1) {
- math.max(ratings.context.defaultParallelism, ratings.partitions.size)
+ math.max(ratings.context.defaultParallelism, ratings.partitions.size / 2)
} else {
this.numBlocks
}
val partitioner = new HashPartitioner(numBlocks)
- val ratingsByUserBlock = ratings.map{ case (u, p, r) => (u % numBlocks, (u, p, r)) }
- val ratingsByProductBlock = ratings.map{ case (u, p, r) => (p % numBlocks, (p, u, r)) }
+ val ratingsByUserBlock = ratings.map{ case (u, p, r) => (u % numBlocks, Rating(u, p, r)) }
+ val ratingsByProductBlock = ratings.map{ case (u, p, r) => (p % numBlocks, Rating(p, u, r)) }
val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock)
val (productInLinks, productOutLinks) = makeLinkRDDs(numBlocks, ratingsByProductBlock)
@@ -126,13 +135,13 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
* Make the out-links table for a block of the users (or products) dataset given the list of
* (user, product, rating) values for the users in that block (or the opposite for products).
*/
- private def makeOutLinkBlock(numBlocks: Int, ratings: Array[(Int, Int, Double)]): OutLinkBlock = {
- val userIds = ratings.map(_._1).distinct.sorted
+ private def makeOutLinkBlock(numBlocks: Int, ratings: Array[Rating]): OutLinkBlock = {
+ val userIds = ratings.map(_.user).distinct.sorted
val numUsers = userIds.length
val userIdToPos = userIds.zipWithIndex.toMap
val shouldSend = Array.fill(numUsers)(new BitSet(numBlocks))
- for ((u, p, r) <- ratings) {
- shouldSend(userIdToPos(u))(p % numBlocks) = true
+ for (r <- ratings) {
+ shouldSend(userIdToPos(r.user))(r.product % numBlocks) = true
}
OutLinkBlock(userIds, shouldSend)
}
@@ -141,18 +150,28 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
* Make the in-links table for a block of the users (or products) dataset given a list of
* (user, product, rating) values for the users in that block (or the opposite for products).
*/
- private def makeInLinkBlock(numBlocks: Int, ratings: Array[(Int, Int, Double)]): InLinkBlock = {
- val userIds = ratings.map(_._1).distinct.sorted
+ private def makeInLinkBlock(numBlocks: Int, ratings: Array[Rating]): InLinkBlock = {
+ val userIds = ratings.map(_.user).distinct.sorted
val numUsers = userIds.length
val userIdToPos = userIds.zipWithIndex.toMap
+ // Split out our ratings by product block
+ val blockRatings = Array.fill(numBlocks)(new ArrayBuffer[Rating])
+ for (r <- ratings) {
+ blockRatings(r.product % numBlocks) += r
+ }
val ratingsForBlock = new Array[Array[(Array[Int], Array[Double])]](numBlocks)
for (productBlock <- 0 until numBlocks) {
- val ratingsInBlock = ratings.filter(t => t._2 % numBlocks == productBlock)
- val ratingsByProduct = ratingsInBlock.groupBy(_._2) // (p, Seq[(u, p, r)])
- .toArray
- .sortBy(_._1)
- .map{case (p, rs) => (rs.map(t => userIdToPos(t._1)), rs.map(_._3))}
- ratingsForBlock(productBlock) = ratingsByProduct
+ // Create an array of (product, Seq(Rating)) ratings
+ val groupedRatings = blockRatings(productBlock).groupBy(_.product).toArray
+ // Sort them by product ID
+ val ordering = new Ordering[(Int, ArrayBuffer[Rating])] {
+ def compare(a: (Int, ArrayBuffer[Rating]), b: (Int, ArrayBuffer[Rating])): Int = a._1 - b._1
+ }
+ Sorting.quickSort(groupedRatings)(ordering)
+ // Translate the user IDs to indices based on userIdToPos
+ ratingsForBlock(productBlock) = groupedRatings.map { case (p, rs) =>
+ (rs.view.map(r => userIdToPos(r.user)).toArray, rs.view.map(_.rating).toArray)
+ }
}
InLinkBlock(userIds, ratingsForBlock)
}
@@ -162,12 +181,12 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
* the users (or (blockId, (p, u, r)) for the products). We create these simultaneously to avoid
* having to shuffle the (blockId, (u, p, r)) RDD twice, or to cache it.
*/
- private def makeLinkRDDs(numBlocks: Int, ratings: RDD[(Int, (Int, Int, Double))])
+ private def makeLinkRDDs(numBlocks: Int, ratings: RDD[(Int, Rating)])
: (RDD[(Int, InLinkBlock)], RDD[(Int, OutLinkBlock)]) =
{
val grouped = ratings.partitionBy(new HashPartitioner(numBlocks))
val links = grouped.mapPartitionsWithIndex((blockId, elements) => {
- val ratings = elements.map(_._2).toArray
+ val ratings = elements.map{_._2}.toArray
val inLinkBlock = makeInLinkBlock(numBlocks, ratings)
val outLinkBlock = makeOutLinkBlock(numBlocks, ratings)
Iterator.single((blockId, (inLinkBlock, outLinkBlock)))
@@ -366,19 +385,30 @@ object ALS {
train(ratings, rank, iterations, 0.01, -1)
}
+ private class ALSRegistrator extends KryoRegistrator {
+ override def registerClasses(kryo: Kryo) {
+ kryo.register(classOf[Rating])
+ }
+ }
+
def main(args: Array[String]) {
- if (args.length != 5) {
- println("Usage: ALS <master> <ratings_file> <rank> <iterations> <output_dir>")
+ if (args.length != 5 && args.length != 6) {
+ println("Usage: ALS <master> <ratings_file> <rank> <iterations> <output_dir> [<blocks>]")
System.exit(1)
}
val (master, ratingsFile, rank, iters, outputDir) =
(args(0), args(1), args(2).toInt, args(3).toInt, args(4))
+ val blocks = if (args.length == 6) args(5).toInt else -1
+ System.setProperty("spark.serializer", "spark.KryoSerializer")
+ System.setProperty("spark.kryo.registrator", classOf[ALSRegistrator].getName)
+ System.setProperty("spark.kryo.referenceTracking", "false")
+ System.setProperty("spark.locality.wait", "10000")
val sc = new SparkContext(master, "ALS")
val ratings = sc.textFile(ratingsFile).map { line =>
val fields = line.split(',')
(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
}
- val model = ALS.train(ratings, rank, iters)
+ val model = ALS.train(ratings, rank, iters, 0.01, blocks)
model.userFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") }
.saveAsTextFile(outputDir + "/userFeatures")
model.productFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") }
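
For callers using the library directly rather than ALS.main, the five-argument train overload used above takes an explicit lambda and block count. A hedged sketch follows; the input path, rank, iteration count, and lambda are placeholders, and blocks = -1 lets ALS pick a block count automatically.

import spark.SparkContext
import spark.mllib.recommendation.ALS

object ALSDriverExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "ALSDriverExample")
    // Each input line is "userId,productId,rating".
    val ratings = sc.textFile("ratings.csv").map { line =>
      val fields = line.split(',')
      (fields(0).toInt, fields(1).toInt, fields(2).toDouble)
    }
    val model = ALS.train(ratings, 10, 10, 0.01, -1)  // rank, iterations, lambda, blocks
    println("trained " + model.userFeatures.count() + " user feature vectors")
    sc.stop()
  }
}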
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index bb2a97837c..d7cd337fa4 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -143,13 +143,12 @@ object SparkBuild extends Build {
libraryDependencies ++= Seq(
"com.google.guava" % "guava" % "14.0.1",
- "com.google.code.findbugs" % "jsr305" % "1.3.+",
+ "com.google.code.findbugs" % "jsr305" % "1.3.9",
"log4j" % "log4j" % "1.2.16",
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.slf4j" % "slf4j-log4j12" % slf4jVersion,
"com.ning" % "compress-lzf" % "0.8.4",
"commons-daemon" % "commons-daemon" % "1.0.10",
- "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeNetty, excludeJackson),
"org.ow2.asm" % "asm" % "4.0",
"com.google.protobuf" % "protobuf-java" % "2.4.1",
"de.javakaffee" % "kryo-serializers" % "0.22",
@@ -197,7 +196,7 @@ object SparkBuild extends Build {
def replSettings = sharedSettings ++ Seq(
name := "spark-repl",
libraryDependencies ++= Seq("org.scala-lang" % "scala-compiler" % "2.10.1")
- )
+ ) ++ assemblySettings ++ extraAssemblySettings
def examplesSettings = sharedSettings ++ Seq(
name := "spark-examples",
diff --git a/run b/run
index 91665f8934..46d9c61ba1 100755
--- a/run
+++ b/run
@@ -1,8 +1,5 @@
#!/bin/bash
-
-SCALA_VERSION=2.10
-
# Figure out where the Scala framework is installed
FWDIR="$(cd `dirname $0`; pwd)"
@@ -78,7 +75,7 @@ else
exit 1
fi
fi
- if [ -z "$SCALA_LIBRARY_PATH" ]; then
+ if [[ ! -f "$FWDIR/RELEASE" && -z "$SCALA_LIBRARY_PATH" ]]; then
if [ -z "$SCALA_HOME" ]; then
echo "SCALA_HOME is not set" >&2
exit 1
@@ -105,43 +102,38 @@ fi
export JAVA_OPTS
# Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in ExecutorRunner.scala!
-CORE_DIR="$FWDIR/core"
-EXAMPLES_DIR="$FWDIR/examples"
-REPL_DIR="$FWDIR/repl"
+if [ ! -f "$FWDIR/RELEASE" ]; then
+ CORE_DIR="$FWDIR/core"
+ EXAMPLES_DIR="$FWDIR/examples"
+ REPL_DIR="$FWDIR/repl"
-# Exit if the user hasn't compiled Spark
-if [ ! -e "$CORE_DIR/target" ]; then
- echo "Failed to find Spark classes in $CORE_DIR/target" >&2
- echo "You need to compile Spark before running this program" >&2
- exit 1
-fi
+ # Exit if the user hasn't compiled Spark
+ if [ ! -e "$CORE_DIR/target" ]; then
+ echo "Failed to find Spark classes in $CORE_DIR/target" >&2
+ echo "You need to compile Spark before running this program" >&2
+ exit 1
+ fi
-if [[ "$@" = *repl* && ! -e "$REPL_DIR/target" ]]; then
- echo "Failed to find Spark classes in $REPL_DIR/target" >&2
- echo "You need to compile Spark repl module before running this program" >&2
- exit 1
+ if [[ "$@" = *repl* && ! -e "$REPL_DIR/target" ]]; then
+ echo "Failed to find Spark classes in $REPL_DIR/target" >&2
+ echo "You need to compile Spark repl module before running this program" >&2
+ exit 1
+ fi
fi
# Compute classpath using external script
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
export CLASSPATH
-# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
-# to avoid the -sources and -doc packages that are built by publish-local.
-if [ -e "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar ]; then
- # Use the JAR from the SBT build
- export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar`
-fi
-if [ -e "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar ]; then
- # Use the JAR from the Maven build
- export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar`
-fi
-
if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
EXTRA_ARGS="" # Java options will be passed to scala as JAVA_OPTS
else
# The JVM doesn't read JAVA_OPTS by default so we need to pass it in
EXTRA_ARGS="$JAVA_OPTS"
fi
exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@"
diff --git a/spark-shell b/spark-shell
index 574ae2104d..a8e72143fb 100755
--- a/spark-shell
+++ b/spark-shell
@@ -1,4 +1,65 @@
-#!/bin/sh
+#!/bin/bash --posix
+#
+# Shell script for starting the Spark Shell REPL
+# Note that it will set MASTER to spark://${SPARK_MASTER_IP}:${SPARK_MASTER_PORT}
+# if those two env vars are set in spark-env.sh but MASTER is not.
+# Options:
+# -c <cores> Set the number of cores for REPL to use
+#
FWDIR="`dirname $0`"
-export SPARK_LAUNCH_WITH_SCALA=1
-exec $FWDIR/run spark.repl.Main "$@"
+
+for o in "$@"; do
+ if [ "$1" = "-c" -o "$1" = "--cores" ]; then
+ shift
+ if [ -n "$1" ]; then
+ OPTIONS="-Dspark.cores.max=$1"
+ shift
+ fi
+ fi
+done
+
+# Set MASTER from spark-env if possible
+if [ -z "$MASTER" ]; then
+ if [ -e "$FWDIR/conf/spark-env.sh" ]; then
+ . "$FWDIR/conf/spark-env.sh"
+ fi
+ if [[ "x" != "x$SPARK_MASTER_IP" && "y" != "y$SPARK_MASTER_PORT" ]]; then
+ MASTER="spark://${SPARK_MASTER_IP}:${SPARK_MASTER_PORT}"
+ export MASTER
+ fi
+fi
+
+# Copy restore-TTY-on-exit functions from Scala script so spark-shell exits properly even in
+# binary distribution of Spark where Scala is not installed
+exit_status=127
+saved_stty=""
+
+# restore stty settings (echo in particular)
+function restoreSttySettings() {
+ stty $saved_stty
+ saved_stty=""
+}
+
+function onExit() {
+ if [[ "$saved_stty" != "" ]]; then
+ restoreSttySettings
+ fi
+ exit $exit_status
+}
+
+# to reenable echo if we are interrupted before completing.
+trap onExit INT
+
+# save terminal settings
+saved_stty=$(stty -g 2>/dev/null)
+# clear on error so we don't later try to restore them
+if [[ ! $? ]]; then
+ saved_stty=""
+fi
+
+$FWDIR/run $OPTIONS spark.repl.Main "$@"
+
+# record the exit status lest it be overwritten:
+# then reenable echo and propagate the code.
+exit_status=$?
+onExit