aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvan Chan <ev@ooyala.com>2013-06-28 17:18:35 -0700
committerEvan Chan <ev@ooyala.com>2013-06-28 17:18:35 -0700
commit1107b4d55b9d5845ebf49fb2eef8cde55095982a (patch)
tree81d0d0793862a2b78be199499d35c8fdf13e850d
parent8113c55df8f4b5f34140ddba5e58e132e3dc2d23 (diff)
parent50ca17635a904f9496ccf996cd2f90325168bb9b (diff)
downloadspark-1107b4d55b9d5845ebf49fb2eef8cde55095982a.tar.gz
spark-1107b4d55b9d5845ebf49fb2eef8cde55095982a.tar.bz2
spark-1107b4d55b9d5845ebf49fb2eef8cde55095982a.zip
Merge branch 'master' into 2013-06/assembly-jar-deploy
Conflicts: run Previous changes that I made to run and set-dev-classpath.sh instead have been folded into compute-classpath.sh
-rw-r--r--bin/compute-classpath.cmd52
-rwxr-xr-xbin/compute-classpath.sh102
-rw-r--r--core/src/main/scala/spark/Utils.scala96
-rw-r--r--core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala30
-rw-r--r--core/src/test/scala/spark/CheckpointSuite.scala10
-rw-r--r--core/src/test/scala/spark/PairRDDFunctionsSuite.scala287
-rw-r--r--core/src/test/scala/spark/PartitioningSuite.scala19
-rw-r--r--core/src/test/scala/spark/PipedRDDSuite.scala16
-rw-r--r--core/src/test/scala/spark/RDDSuite.scala87
-rw-r--r--core/src/test/scala/spark/SharedSparkContext.scala25
-rw-r--r--core/src/test/scala/spark/ShuffleSuite.scala298
-rw-r--r--core/src/test/scala/spark/SizeEstimatorSuite.scala72
-rw-r--r--core/src/test/scala/spark/SortingSuite.scala23
-rw-r--r--core/src/test/scala/spark/UnpersistSuite.scala30
-rw-r--r--core/src/test/scala/spark/UtilsSuite.scala53
-rw-r--r--core/src/test/scala/spark/ZippedPartitionsSuite.scala3
-rw-r--r--core/src/test/scala/spark/scheduler/JobLoggerSuite.scala1
-rw-r--r--docs/python-programming-guide.md7
-rw-r--r--project/SparkBuild.scala2
-rw-r--r--repl/src/test/scala/spark/repl/ReplSuite.scala38
-rwxr-xr-xrun104
-rw-r--r--run2.cmd46
-rw-r--r--set-dev-classpath.sh118
23 files changed, 837 insertions, 682 deletions
diff --git a/bin/compute-classpath.cmd b/bin/compute-classpath.cmd
new file mode 100644
index 0000000000..6e7efbd334
--- /dev/null
+++ b/bin/compute-classpath.cmd
@@ -0,0 +1,52 @@
+@echo off
+
+rem This script computes Spark's classpath and prints it to stdout; it's used by both the "run"
+rem script and the ExecutorRunner in standalone cluster mode.
+
+set SCALA_VERSION=2.9.3
+
+rem Figure out where the Spark framework is installed
+set FWDIR=%~dp0..\
+
+rem Load environment variables from conf\spark-env.cmd, if it exists
+if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
+
+set CORE_DIR=%FWDIR%core
+set REPL_DIR=%FWDIR%repl
+set EXAMPLES_DIR=%FWDIR%examples
+set BAGEL_DIR=%FWDIR%bagel
+set STREAMING_DIR=%FWDIR%streaming
+set PYSPARK_DIR=%FWDIR%python
+
+rem Build up classpath
+set CLASSPATH=%SPARK_CLASSPATH%;%MESOS_CLASSPATH%;%FWDIR%conf;%CORE_DIR%\target\scala-%SCALA_VERSION%\classes
+set CLASSPATH=%CLASSPATH%;%CORE_DIR%\target\scala-%SCALA_VERSION%\test-classes;%CORE_DIR%\src\main\resources
+set CLASSPATH=%CLASSPATH%;%STREAMING_DIR%\target\scala-%SCALA_VERSION%\classes;%STREAMING_DIR%\target\scala-%SCALA_VERSION%\test-classes
+set CLASSPATH=%CLASSPATH%;%STREAMING_DIR%\lib\org\apache\kafka\kafka\0.7.2-spark\*
+set CLASSPATH=%CLASSPATH%;%REPL_DIR%\target\scala-%SCALA_VERSION%\classes;%EXAMPLES_DIR%\target\scala-%SCALA_VERSION%\classes
+set CLASSPATH=%CLASSPATH%;%FWDIR%lib_managed\jars\*
+set CLASSPATH=%CLASSPATH%;%FWDIR%lib_managed\bundles\*
+set CLASSPATH=%CLASSPATH%;%FWDIR%repl\lib\*
+set CLASSPATH=%CLASSPATH%;%FWDIR%python\lib\*
+set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes
+
+rem Add hadoop conf dir - else FileSystem.*, etc fail
+rem Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts
+rem the configurtion files.
+if "x%HADOOP_CONF_DIR%"=="x" goto no_hadoop_conf_dir
+ set CLASSPATH=%CLASSPATH%;%HADOOP_CONF_DIR%
+:no_hadoop_conf_dir
+
+if "x%YARN_CONF_DIR%"=="x" goto no_yarn_conf_dir
+ set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR%
+:no_yarn_conf_dir
+
+rem Add Scala standard library
+set CLASSPATH=%CLASSPATH%;%SCALA_HOME%\lib\scala-library.jar;%SCALA_HOME%\lib\scala-compiler.jar;%SCALA_HOME%\lib\jline.jar
+
+rem A bit of a hack to allow calling this script within run2.cmd without seeing output
+if "%DONT_PRINT_CLASSPATH%"=="1" goto exit
+
+echo %CLASSPATH%
+
+:exit
diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
new file mode 100755
index 0000000000..47937a0322
--- /dev/null
+++ b/bin/compute-classpath.sh
@@ -0,0 +1,102 @@
+#!/bin/bash
+
+# This script computes Spark's classpath and prints it to stdout; it's used by both the "run"
+# script and the ExecutorRunner in standalone cluster mode.
+
+SCALA_VERSION=2.9.3
+
+# Figure out where Spark is installed
+FWDIR="$(cd `dirname $0`/..; pwd)"
+
+# Load environment variables from conf/spark-env.sh, if it exists
+if [ -e $FWDIR/conf/spark-env.sh ] ; then
+ . $FWDIR/conf/spark-env.sh
+fi
+
+CORE_DIR="$FWDIR/core"
+REPL_DIR="$FWDIR/repl"
+REPL_BIN_DIR="$FWDIR/repl-bin"
+EXAMPLES_DIR="$FWDIR/examples"
+BAGEL_DIR="$FWDIR/bagel"
+STREAMING_DIR="$FWDIR/streaming"
+PYSPARK_DIR="$FWDIR/python"
+
+# Build up classpath
+CLASSPATH="$SPARK_CLASSPATH"
+
+function dev_classpath {
+ CLASSPATH="$CLASSPATH:$FWDIR/conf"
+ CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/classes"
+ if [ -n "$SPARK_TESTING" ] ; then
+ CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/test-classes"
+ CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/test-classes"
+ fi
+ CLASSPATH="$CLASSPATH:$CORE_DIR/src/main/resources"
+ CLASSPATH="$CLASSPATH:$REPL_DIR/target/scala-$SCALA_VERSION/classes"
+ CLASSPATH="$CLASSPATH:$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
+ CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/classes"
+ CLASSPATH="$CLASSPATH:$STREAMING_DIR/lib/org/apache/kafka/kafka/0.7.2-spark/*" # <-- our in-project Kafka Jar
+ if [ -e "$FWDIR/lib_managed" ]; then
+ CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/jars/*"
+ CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/bundles/*"
+ fi
+ CLASSPATH="$CLASSPATH:$REPL_DIR/lib/*"
+ # Add the shaded JAR for Maven builds
+ if [ -e $REPL_BIN_DIR/target ]; then
+ for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
+ CLASSPATH="$CLASSPATH:$jar"
+ done
+ # The shaded JAR doesn't contain examples, so include those separately
+ EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar`
+ CLASSPATH+=":$EXAMPLES_JAR"
+ fi
+ CLASSPATH="$CLASSPATH:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
+ for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do
+ CLASSPATH="$CLASSPATH:$jar"
+ done
+
+ # Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
+ # to avoid the -sources and -doc packages that are built by publish-local.
+ if [ -e "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar ]; then
+ # Use the JAR from the SBT build
+ export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar`
+ fi
+ if [ -e "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar ]; then
+ # Use the JAR from the Maven build
+ export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar`
+ fi
+
+ # Add Scala standard library
+ if [ -z "$SCALA_LIBRARY_PATH" ]; then
+ if [ -z "$SCALA_HOME" ]; then
+ echo "SCALA_HOME is not set" >&2
+ exit 1
+ fi
+ SCALA_LIBRARY_PATH="$SCALA_HOME/lib"
+ fi
+ CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-library.jar"
+ CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-compiler.jar"
+ CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/jline.jar"
+}
+
+function release_classpath {
+ CLASSPATH="$CLASSPATH:$FWDIR/jars/*"
+}
+
+if [ -f "$FWDIR/RELEASE" ]; then
+ release_classpath
+else
+ dev_classpath
+fi
+
+# Add hadoop conf dir - else FileSystem.*, etc fail !
+# Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts
+# the configurtion files.
+if [ "x" != "x$HADOOP_CONF_DIR" ]; then
+ CLASSPATH="$CLASSPATH:$HADOOP_CONF_DIR"
+fi
+if [ "x" != "x$YARN_CONF_DIR" ]; then
+ CLASSPATH="$CLASSPATH:$YARN_CONF_DIR"
+fi
+
+echo "$CLASSPATH"
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index f3621c6bee..f41efa9d29 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -522,7 +522,38 @@ private object Utils extends Logging {
execute(command, new File("."))
}
- private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
+ /**
+ * Execute a command and get its output, throwing an exception if it yields a code other than 0.
+ */
+ def executeAndGetOutput(command: Seq[String], workingDir: File = new File(".")): String = {
+ val process = new ProcessBuilder(command: _*)
+ .directory(workingDir)
+ .start()
+ new Thread("read stderr for " + command(0)) {
+ override def run() {
+ for (line <- Source.fromInputStream(process.getErrorStream).getLines) {
+ System.err.println(line)
+ }
+ }
+ }.start()
+ val output = new StringBuffer
+ val stdoutThread = new Thread("read stdout for " + command(0)) {
+ override def run() {
+ for (line <- Source.fromInputStream(process.getInputStream).getLines) {
+ output.append(line)
+ }
+ }
+ }
+ stdoutThread.start()
+ val exitCode = process.waitFor()
+ stdoutThread.join() // Wait for it to finish reading output
+ if (exitCode != 0) {
+ throw new SparkException("Process " + command + " exited with code " + exitCode)
+ }
+ output.toString
+ }
+
+ private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
val firstUserLine: Int, val firstUserClass: String)
/**
* When called inside a class in the spark package, returns the name of the user code class
@@ -610,4 +641,67 @@ private object Utils extends Logging {
}
return false
}
+
+ def isSpace(c: Char): Boolean = {
+ " \t\r\n".indexOf(c) != -1
+ }
+
+ /**
+ * Split a string of potentially quoted arguments from the command line the way that a shell
+ * would do it to determine arguments to a command. For example, if the string is 'a "b c" d',
+ * then it would be parsed as three arguments: 'a', 'b c' and 'd'.
+ */
+ def splitCommandString(s: String): Seq[String] = {
+ val buf = new ArrayBuffer[String]
+ var inWord = false
+ var inSingleQuote = false
+ var inDoubleQuote = false
+ var curWord = new StringBuilder
+ def endWord() {
+ buf += curWord.toString
+ curWord.clear()
+ }
+ var i = 0
+ while (i < s.length) {
+ var nextChar = s.charAt(i)
+ if (inDoubleQuote) {
+ if (nextChar == '"') {
+ inDoubleQuote = false
+ } else if (nextChar == '\\') {
+ if (i < s.length - 1) {
+ // Append the next character directly, because only " and \ may be escaped in
+ // double quotes after the shell's own expansion
+ curWord.append(s.charAt(i + 1))
+ i += 1
+ }
+ } else {
+ curWord.append(nextChar)
+ }
+ } else if (inSingleQuote) {
+ if (nextChar == '\'') {
+ inSingleQuote = false
+ } else {
+ curWord.append(nextChar)
+ }
+ // Backslashes are not treated specially in single quotes
+ } else if (nextChar == '"') {
+ inWord = true
+ inDoubleQuote = true
+ } else if (nextChar == '\'') {
+ inWord = true
+ inSingleQuote = true
+ } else if (!isSpace(nextChar)) {
+ curWord.append(nextChar)
+ inWord = true
+ } else if (inWord && isSpace(nextChar)) {
+ endWord()
+ inWord = false
+ }
+ i += 1
+ }
+ if (inWord || inDoubleQuote || inSingleQuote) {
+ endWord()
+ }
+ return buf
+ }
}
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 04a774658e..d7f58b2cb1 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -1,6 +1,7 @@
package spark.deploy.worker
import java.io._
+import java.lang.System.getenv
import spark.deploy.{ExecutorState, ExecutorStateChanged, ApplicationDescription}
import akka.actor.ActorRef
import spark.{Utils, Logging}
@@ -40,7 +41,7 @@ private[spark] class ExecutorRunner(
workerThread.start()
// Shutdown hook that kills actors on shutdown.
- shutdownHook = new Thread() {
+ shutdownHook = new Thread() {
override def run() {
if (process != null) {
logInfo("Shutdown hook killing child process.")
@@ -77,9 +78,29 @@ private[spark] class ExecutorRunner(
def buildCommandSeq(): Seq[String] = {
val command = appDesc.command
- val script = if (System.getProperty("os.name").startsWith("Windows")) "run.cmd" else "run"
- val runScript = new File(sparkHome, script).getCanonicalPath
- Seq(runScript, command.mainClass) ++ (command.arguments ++ Seq(appId)).map(substituteVariables)
+ val runner = Option(getenv("JAVA_HOME")).map(_ + "/bin/java").getOrElse("java")
+ // SPARK-698: do not call the run.cmd script, as process.destroy()
+ // fails to kill a process tree on Windows
+ Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++
+ command.arguments.map(substituteVariables)
+ }
+
+ /**
+ * Attention: this must always be aligned with the environment variables in the run scripts and
+ * the way the JAVA_OPTS are assembled there.
+ */
+ def buildJavaOpts(): Seq[String] = {
+ val libraryOpts = Option(getenv("SPARK_LIBRARY_PATH"))
+ .map(p => List("-Djava.library.path=" + p))
+ .getOrElse(Nil)
+ val userOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString).getOrElse(Nil)
+ val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M")
+
+ // Figure out our classpath with the external compute-classpath script
+ val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
+ val classPath = Utils.executeAndGetOutput(Seq(sparkHome + "/bin/compute-classpath" + ext))
+
+ Seq("-cp", classPath) ++ libraryOpts ++ userOpts ++ memoryOpts
}
/** Spawn a thread that will redirect a given stream to a file */
@@ -115,7 +136,6 @@ private[spark] class ExecutorRunner(
for ((key, value) <- appDesc.command.environment) {
env.put(key, value)
}
- env.put("SPARK_MEM", memory.toString + "m")
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
env.put("SPARK_LAUNCH_WITH_SCALA", "0")
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index ca385972fb..28a7b21b92 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -27,6 +27,16 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
}
}
+ test("basic checkpointing") {
+ val parCollection = sc.makeRDD(1 to 4)
+ val flatMappedRDD = parCollection.flatMap(x => 1 to x)
+ flatMappedRDD.checkpoint()
+ assert(flatMappedRDD.dependencies.head.rdd == parCollection)
+ val result = flatMappedRDD.collect()
+ assert(flatMappedRDD.dependencies.head.rdd != parCollection)
+ assert(flatMappedRDD.collect() === result)
+ }
+
test("RDDs with one-to-one dependencies") {
testCheckpointing(_.map(x => x.toString))
testCheckpointing(_.flatMap(x => 1 to x))
diff --git a/core/src/test/scala/spark/PairRDDFunctionsSuite.scala b/core/src/test/scala/spark/PairRDDFunctionsSuite.scala
new file mode 100644
index 0000000000..682d2745bf
--- /dev/null
+++ b/core/src/test/scala/spark/PairRDDFunctionsSuite.scala
@@ -0,0 +1,287 @@
+package spark
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashSet
+
+import org.scalatest.FunSuite
+import org.scalatest.prop.Checkers
+import org.scalacheck.Arbitrary._
+import org.scalacheck.Gen
+import org.scalacheck.Prop._
+
+import com.google.common.io.Files
+
+import spark.rdd.ShuffledRDD
+import spark.SparkContext._
+
+class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
+ test("groupByKey") {
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
+ val groups = pairs.groupByKey().collect()
+ assert(groups.size === 2)
+ val valuesFor1 = groups.find(_._1 == 1).get._2
+ assert(valuesFor1.toList.sorted === List(1, 2, 3))
+ val valuesFor2 = groups.find(_._1 == 2).get._2
+ assert(valuesFor2.toList.sorted === List(1))
+ }
+
+ test("groupByKey with duplicates") {
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+ val groups = pairs.groupByKey().collect()
+ assert(groups.size === 2)
+ val valuesFor1 = groups.find(_._1 == 1).get._2
+ assert(valuesFor1.toList.sorted === List(1, 1, 2, 3))
+ val valuesFor2 = groups.find(_._1 == 2).get._2
+ assert(valuesFor2.toList.sorted === List(1))
+ }
+
+ test("groupByKey with negative key hash codes") {
+ val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1)))
+ val groups = pairs.groupByKey().collect()
+ assert(groups.size === 2)
+ val valuesForMinus1 = groups.find(_._1 == -1).get._2
+ assert(valuesForMinus1.toList.sorted === List(1, 2, 3))
+ val valuesFor2 = groups.find(_._1 == 2).get._2
+ assert(valuesFor2.toList.sorted === List(1))
+ }
+
+ test("groupByKey with many output partitions") {
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
+ val groups = pairs.groupByKey(10).collect()
+ assert(groups.size === 2)
+ val valuesFor1 = groups.find(_._1 == 1).get._2
+ assert(valuesFor1.toList.sorted === List(1, 2, 3))
+ val valuesFor2 = groups.find(_._1 == 2).get._2
+ assert(valuesFor2.toList.sorted === List(1))
+ }
+
+ test("reduceByKey") {
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+ val sums = pairs.reduceByKey(_+_).collect()
+ assert(sums.toSet === Set((1, 7), (2, 1)))
+ }
+
+ test("reduceByKey with collectAsMap") {
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+ val sums = pairs.reduceByKey(_+_).collectAsMap()
+ assert(sums.size === 2)
+ assert(sums(1) === 7)
+ assert(sums(2) === 1)
+ }
+
+ test("reduceByKey with many output partitons") {
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+ val sums = pairs.reduceByKey(_+_, 10).collect()
+ assert(sums.toSet === Set((1, 7), (2, 1)))
+ }
+
+ test("reduceByKey with partitioner") {
+ val p = new Partitioner() {
+ def numPartitions = 2
+ def getPartition(key: Any) = key.asInstanceOf[Int]
+ }
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).partitionBy(p)
+ val sums = pairs.reduceByKey(_+_)
+ assert(sums.collect().toSet === Set((1, 4), (0, 1)))
+ assert(sums.partitioner === Some(p))
+ // count the dependencies to make sure there is only 1 ShuffledRDD
+ val deps = new HashSet[RDD[_]]()
+ def visit(r: RDD[_]) {
+ for (dep <- r.dependencies) {
+ deps += dep.rdd
+ visit(dep.rdd)
+ }
+ }
+ visit(sums)
+ assert(deps.size === 2) // ShuffledRDD, ParallelCollection
+ }
+
+ test("join") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+ val joined = rdd1.join(rdd2).collect()
+ assert(joined.size === 4)
+ assert(joined.toSet === Set(
+ (1, (1, 'x')),
+ (1, (2, 'x')),
+ (2, (1, 'y')),
+ (2, (1, 'z'))
+ ))
+ }
+
+ test("join all-to-all") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3)))
+ val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y')))
+ val joined = rdd1.join(rdd2).collect()
+ assert(joined.size === 6)
+ assert(joined.toSet === Set(
+ (1, (1, 'x')),
+ (1, (1, 'y')),
+ (1, (2, 'x')),
+ (1, (2, 'y')),
+ (1, (3, 'x')),
+ (1, (3, 'y'))
+ ))
+ }
+
+ test("leftOuterJoin") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+ val joined = rdd1.leftOuterJoin(rdd2).collect()
+ assert(joined.size === 5)
+ assert(joined.toSet === Set(
+ (1, (1, Some('x'))),
+ (1, (2, Some('x'))),
+ (2, (1, Some('y'))),
+ (2, (1, Some('z'))),
+ (3, (1, None))
+ ))
+ }
+
+ test("rightOuterJoin") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+ val joined = rdd1.rightOuterJoin(rdd2).collect()
+ assert(joined.size === 5)
+ assert(joined.toSet === Set(
+ (1, (Some(1), 'x')),
+ (1, (Some(2), 'x')),
+ (2, (Some(1), 'y')),
+ (2, (Some(1), 'z')),
+ (4, (None, 'w'))
+ ))
+ }
+
+ test("join with no matches") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+ val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
+ val joined = rdd1.join(rdd2).collect()
+ assert(joined.size === 0)
+ }
+
+ test("join with many output partitions") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+ val joined = rdd1.join(rdd2, 10).collect()
+ assert(joined.size === 4)
+ assert(joined.toSet === Set(
+ (1, (1, 'x')),
+ (1, (2, 'x')),
+ (2, (1, 'y')),
+ (2, (1, 'z'))
+ ))
+ }
+
+ test("groupWith") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+ val joined = rdd1.groupWith(rdd2).collect()
+ assert(joined.size === 4)
+ assert(joined.toSet === Set(
+ (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))),
+ (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))),
+ (3, (ArrayBuffer(1), ArrayBuffer())),
+ (4, (ArrayBuffer(), ArrayBuffer('w')))
+ ))
+ }
+
+ test("zero-partition RDD") {
+ val emptyDir = Files.createTempDir()
+ val file = sc.textFile(emptyDir.getAbsolutePath)
+ assert(file.partitions.size == 0)
+ assert(file.collect().toList === Nil)
+ // Test that a shuffle on the file works, because this used to be a bug
+ assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
+ }
+
+ test("keys and values") {
+ val rdd = sc.parallelize(Array((1, "a"), (2, "b")))
+ assert(rdd.keys.collect().toList === List(1, 2))
+ assert(rdd.values.collect().toList === List("a", "b"))
+ }
+
+ test("default partitioner uses partition size") {
+ // specify 2000 partitions
+ val a = sc.makeRDD(Array(1, 2, 3, 4), 2000)
+ // do a map, which loses the partitioner
+ val b = a.map(a => (a, (a * 2).toString))
+ // then a group by, and see we didn't revert to 2 partitions
+ val c = b.groupByKey()
+ assert(c.partitions.size === 2000)
+ }
+
+ test("default partitioner uses largest partitioner") {
+ val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2)
+ val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000)
+ val c = a.join(b)
+ assert(c.partitions.size === 2000)
+ }
+
+ test("subtract") {
+ val a = sc.parallelize(Array(1, 2, 3), 2)
+ val b = sc.parallelize(Array(2, 3, 4), 4)
+ val c = a.subtract(b)
+ assert(c.collect().toSet === Set(1))
+ assert(c.partitions.size === a.partitions.size)
+ }
+
+ test("subtract with narrow dependency") {
+ // use a deterministic partitioner
+ val p = new Partitioner() {
+ def numPartitions = 5
+ def getPartition(key: Any) = key.asInstanceOf[Int]
+ }
+ // partitionBy so we have a narrow dependency
+ val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
+ // more partitions/no partitioner so a shuffle dependency
+ val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
+ val c = a.subtract(b)
+ assert(c.collect().toSet === Set((1, "a"), (3, "c")))
+ // Ideally we could keep the original partitioner...
+ assert(c.partitioner === None)
+ }
+
+ test("subtractByKey") {
+ val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2)
+ val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4)
+ val c = a.subtractByKey(b)
+ assert(c.collect().toSet === Set((1, "a"), (1, "a")))
+ assert(c.partitions.size === a.partitions.size)
+ }
+
+ test("subtractByKey with narrow dependency") {
+ // use a deterministic partitioner
+ val p = new Partitioner() {
+ def numPartitions = 5
+ def getPartition(key: Any) = key.asInstanceOf[Int]
+ }
+ // partitionBy so we have a narrow dependency
+ val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
+ // more partitions/no partitioner so a shuffle dependency
+ val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
+ val c = a.subtractByKey(b)
+ assert(c.collect().toSet === Set((1, "a"), (1, "a")))
+ assert(c.partitioner.get === p)
+ }
+
+ test("foldByKey") {
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+ val sums = pairs.foldByKey(0)(_+_).collect()
+ assert(sums.toSet === Set((1, 7), (2, 1)))
+ }
+
+ test("foldByKey with mutable result type") {
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+ val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache()
+ // Fold the values using in-place mutation
+ val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect()
+ assert(sums.toSet === Set((1, ArrayBuffer(1, 2, 3, 1)), (2, ArrayBuffer(1))))
+ // Check that the mutable objects in the original RDD were not changed
+ assert(bufs.collect().toSet === Set(
+ (1, ArrayBuffer(1)),
+ (1, ArrayBuffer(2)),
+ (1, ArrayBuffer(3)),
+ (1, ArrayBuffer(1)),
+ (2, ArrayBuffer(1))))
+ }
+}
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala
index 16f93e71a3..99e433e3bd 100644
--- a/core/src/test/scala/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/spark/PartitioningSuite.scala
@@ -6,8 +6,8 @@ import SparkContext._
import spark.util.StatCounter
import scala.math.abs
-class PartitioningSuite extends FunSuite with LocalSparkContext {
-
+class PartitioningSuite extends FunSuite with SharedSparkContext {
+
test("HashPartitioner equality") {
val p2 = new HashPartitioner(2)
val p4 = new HashPartitioner(4)
@@ -21,8 +21,6 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
}
test("RangePartitioner equality") {
- sc = new SparkContext("local", "test")
-
// Make an RDD where all the elements are the same so that the partition range bounds
// are deterministically all the same.
val rdd = sc.parallelize(Seq(1, 1, 1, 1)).map(x => (x, x))
@@ -50,7 +48,6 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
}
test("HashPartitioner not equal to RangePartitioner") {
- sc = new SparkContext("local", "test")
val rdd = sc.parallelize(1 to 10).map(x => (x, x))
val rangeP2 = new RangePartitioner(2, rdd)
val hashP2 = new HashPartitioner(2)
@@ -61,8 +58,6 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
}
test("partitioner preservation") {
- sc = new SparkContext("local", "test")
-
val rdd = sc.parallelize(1 to 10, 4).map(x => (x, x))
val grouped2 = rdd.groupByKey(2)
@@ -101,7 +96,6 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
}
test("partitioning Java arrays should fail") {
- sc = new SparkContext("local", "test")
val arrs: RDD[Array[Int]] = sc.parallelize(Array(1, 2, 3, 4), 2).map(x => Array(x))
val arrPairs: RDD[(Array[Int], Int)] =
sc.parallelize(Array(1, 2, 3, 4), 2).map(x => (Array(x), x))
@@ -120,21 +114,20 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
assert(intercept[SparkException]{ arrPairs.reduceByKeyLocally(_ + _) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.reduceByKey(_ + _) }.getMessage.contains("array"))
}
-
- test("Zero-length partitions should be correctly handled") {
+
+ test("zero-length partitions should be correctly handled") {
// Create RDD with some consecutive empty partitions (including the "first" one)
- sc = new SparkContext("local", "test")
val rdd: RDD[Double] = sc
.parallelize(Array(-1.0, -1.0, -1.0, -1.0, 2.0, 4.0, -1.0, -1.0), 8)
.filter(_ >= 0.0)
-
+
// Run the partitions, including the consecutive empty ones, through StatCounter
val stats: StatCounter = rdd.stats();
assert(abs(6.0 - stats.sum) < 0.01);
assert(abs(6.0/2 - rdd.mean) < 0.01);
assert(abs(1.0 - rdd.variance) < 0.01);
assert(abs(1.0 - rdd.stdev) < 0.01);
-
+
// Add other tests here for classes that should be able to handle empty partitions correctly
}
}
diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala
index ed075f93ec..1c9ca50811 100644
--- a/core/src/test/scala/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/spark/PipedRDDSuite.scala
@@ -3,10 +3,9 @@ package spark
import org.scalatest.FunSuite
import SparkContext._
-class PipedRDDSuite extends FunSuite with LocalSparkContext {
-
+class PipedRDDSuite extends FunSuite with SharedSparkContext {
+
test("basic pipe") {
- sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val piped = nums.pipe(Seq("cat"))
@@ -20,12 +19,11 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
}
test("advanced pipe") {
- sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val bl = sc.broadcast(List("0"))
- val piped = nums.pipe(Seq("cat"),
- Map[String, String](),
+ val piped = nums.pipe(Seq("cat"),
+ Map[String, String](),
(f: String => Unit) => {bl.value.map(f(_));f("\u0001")},
(i:Int, f: String=> Unit) => f(i + "_"))
@@ -43,8 +41,8 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
val d = nums1.groupBy(str=>str.split("\t")(0)).
- pipe(Seq("cat"),
- Map[String, String](),
+ pipe(Seq("cat"),
+ Map[String, String](),
(f: String => Unit) => {bl.value.map(f(_));f("\u0001")},
(i:Tuple2[String, Seq[String]], f: String=> Unit) => {for (e <- i._2){ f(e + "_")}}).collect()
assert(d.size === 8)
@@ -59,7 +57,6 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
}
test("pipe with env variable") {
- sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val piped = nums.pipe(Seq("printenv", "MY_TEST_ENV"), Map("MY_TEST_ENV" -> "LALALA"))
val c = piped.collect()
@@ -69,7 +66,6 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
}
test("pipe with non-zero exit status") {
- sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val piped = nums.pipe("cat nonexistent_file")
intercept[SparkException] {
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 67f3332d44..d8db69b1c9 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -7,10 +7,9 @@ import org.scalatest.time.{Span, Millis}
import spark.SparkContext._
import spark.rdd.{CoalescedRDD, CoGroupedRDD, EmptyRDD, PartitionPruningRDD, ShuffledRDD}
-class RDDSuite extends FunSuite with LocalSparkContext {
+class RDDSuite extends FunSuite with SharedSparkContext {
test("basic operations") {
- sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
assert(nums.collect().toList === List(1, 2, 3, 4))
val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
@@ -46,7 +45,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
}
test("SparkContext.union") {
- sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))
assert(sc.union(nums, nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
@@ -55,7 +53,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
}
test("aggregate") {
- sc = new SparkContext("local", "test")
val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
type StringMap = HashMap[String, Int]
val emptyMap = new StringMap {
@@ -75,57 +72,14 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(result.toSet === Set(("a", 6), ("b", 2), ("c", 5)))
}
- test("basic checkpointing") {
- import java.io.File
- val checkpointDir = File.createTempFile("temp", "")
- checkpointDir.delete()
-
- sc = new SparkContext("local", "test")
- sc.setCheckpointDir(checkpointDir.toString)
- val parCollection = sc.makeRDD(1 to 4)
- val flatMappedRDD = parCollection.flatMap(x => 1 to x)
- flatMappedRDD.checkpoint()
- assert(flatMappedRDD.dependencies.head.rdd == parCollection)
- val result = flatMappedRDD.collect()
- Thread.sleep(1000)
- assert(flatMappedRDD.dependencies.head.rdd != parCollection)
- assert(flatMappedRDD.collect() === result)
-
- checkpointDir.deleteOnExit()
- }
-
test("basic caching") {
- sc = new SparkContext("local", "test")
val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
assert(rdd.collect().toList === List(1, 2, 3, 4))
assert(rdd.collect().toList === List(1, 2, 3, 4))
assert(rdd.collect().toList === List(1, 2, 3, 4))
}
- test("unpersist RDD") {
- sc = new SparkContext("local", "test")
- val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
- rdd.count
- assert(sc.persistentRdds.isEmpty === false)
- rdd.unpersist()
- assert(sc.persistentRdds.isEmpty === true)
-
- failAfter(Span(3000, Millis)) {
- try {
- while (! sc.getRDDStorageInfo.isEmpty) {
- Thread.sleep(200)
- }
- } catch {
- case _ => { Thread.sleep(10) }
- // Do nothing. We might see exceptions because block manager
- // is racing this thread to remove entries from the driver.
- }
- }
- assert(sc.getRDDStorageInfo.isEmpty === true)
- }
-
test("caching with failures") {
- sc = new SparkContext("local", "test")
val onlySplit = new Partition { override def index: Int = 0 }
var shouldFail = true
val rdd = new RDD[Int](sc, Nil) {
@@ -148,7 +102,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
}
test("empty RDD") {
- sc = new SparkContext("local", "test")
val empty = new EmptyRDD[Int](sc)
assert(empty.count === 0)
assert(empty.collect().size === 0)
@@ -168,37 +121,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
}
test("cogrouped RDDs") {
- sc = new SparkContext("local", "test")
- val rdd1 = sc.makeRDD(Array((1, "one"), (1, "another one"), (2, "two"), (3, "three")), 2)
- val rdd2 = sc.makeRDD(Array((1, "one1"), (1, "another one1"), (2, "two1")), 2)
-
- // Use cogroup function
- val cogrouped = rdd1.cogroup(rdd2).collectAsMap()
- assert(cogrouped(1) === (Seq("one", "another one"), Seq("one1", "another one1")))
- assert(cogrouped(2) === (Seq("two"), Seq("two1")))
- assert(cogrouped(3) === (Seq("three"), Seq()))
-
- // Construct CoGroupedRDD directly, with map side combine enabled
- val cogrouped1 = new CoGroupedRDD[Int](
- Seq(rdd1.asInstanceOf[RDD[(Int, Any)]], rdd2.asInstanceOf[RDD[(Int, Any)]]),
- new HashPartitioner(3),
- true).collectAsMap()
- assert(cogrouped1(1).toSeq === Seq(Seq("one", "another one"), Seq("one1", "another one1")))
- assert(cogrouped1(2).toSeq === Seq(Seq("two"), Seq("two1")))
- assert(cogrouped1(3).toSeq === Seq(Seq("three"), Seq()))
-
- // Construct CoGroupedRDD directly, with map side combine disabled
- val cogrouped2 = new CoGroupedRDD[Int](
- Seq(rdd1.asInstanceOf[RDD[(Int, Any)]], rdd2.asInstanceOf[RDD[(Int, Any)]]),
- new HashPartitioner(3),
- false).collectAsMap()
- assert(cogrouped2(1).toSeq === Seq(Seq("one", "another one"), Seq("one1", "another one1")))
- assert(cogrouped2(2).toSeq === Seq(Seq("two"), Seq("two1")))
- assert(cogrouped2(3).toSeq === Seq(Seq("three"), Seq()))
- }
-
- test("coalesced RDDs") {
- sc = new SparkContext("local", "test")
val data = sc.parallelize(1 to 10, 10)
val coalesced1 = data.coalesce(2)
@@ -236,7 +158,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
}
test("zipped RDDs") {
- sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val zipped = nums.zip(nums.map(_ + 1.0))
assert(zipped.glom().map(_.toList).collect().toList ===
@@ -248,7 +169,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
}
test("partition pruning") {
- sc = new SparkContext("local", "test")
val data = sc.parallelize(1 to 10, 10)
// Note that split number starts from 0, so > 8 means only 10th partition left.
val prunedRdd = new PartitionPruningRDD(data, splitNum => splitNum > 8)
@@ -260,7 +180,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
test("mapWith") {
import java.util.Random
- sc = new SparkContext("local", "test")
val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
val randoms = ones.mapWith(
(index: Int) => new Random(index + 42))
@@ -279,7 +198,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
test("flatMapWith") {
import java.util.Random
- sc = new SparkContext("local", "test")
val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
val randoms = ones.flatMapWith(
(index: Int) => new Random(index + 42))
@@ -301,7 +219,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
test("filterWith") {
import java.util.Random
- sc = new SparkContext("local", "test")
val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
val sample = ints.filterWith(
(index: Int) => new Random(index + 42))
@@ -319,7 +236,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
}
test("top with predefined ordering") {
- sc = new SparkContext("local", "test")
val nums = Array.range(1, 100000)
val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2)
val topK = ints.top(5)
@@ -328,7 +244,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
}
test("top with custom ordering") {
- sc = new SparkContext("local", "test")
val words = Vector("a", "b", "c", "d")
implicit val ord = implicitly[Ordering[String]].reverse
val rdd = sc.makeRDD(words, 2)
diff --git a/core/src/test/scala/spark/SharedSparkContext.scala b/core/src/test/scala/spark/SharedSparkContext.scala
new file mode 100644
index 0000000000..1da79f9824
--- /dev/null
+++ b/core/src/test/scala/spark/SharedSparkContext.scala
@@ -0,0 +1,25 @@
+package spark
+
+import org.scalatest.Suite
+import org.scalatest.BeforeAndAfterAll
+
+/** Shares a local `SparkContext` between all tests in a suite and closes it at the end */
+trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
+
+ @transient private var _sc: SparkContext = _
+
+ def sc: SparkContext = _sc
+
+ override def beforeAll() {
+ _sc = new SparkContext("local", "test")
+ super.beforeAll()
+ }
+
+ override def afterAll() {
+ if (_sc != null) {
+ LocalSparkContext.stop(_sc)
+ _sc = null
+ }
+ super.afterAll()
+ }
+}
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 0c1ec29f96..950218fa28 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -16,54 +16,9 @@ import spark.rdd.ShuffledRDD
import spark.SparkContext._
class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
-
- test("groupByKey") {
- sc = new SparkContext("local", "test")
- val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
- val groups = pairs.groupByKey().collect()
- assert(groups.size === 2)
- val valuesFor1 = groups.find(_._1 == 1).get._2
- assert(valuesFor1.toList.sorted === List(1, 2, 3))
- val valuesFor2 = groups.find(_._1 == 2).get._2
- assert(valuesFor2.toList.sorted === List(1))
- }
-
- test("groupByKey with duplicates") {
- sc = new SparkContext("local", "test")
- val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
- val groups = pairs.groupByKey().collect()
- assert(groups.size === 2)
- val valuesFor1 = groups.find(_._1 == 1).get._2
- assert(valuesFor1.toList.sorted === List(1, 1, 2, 3))
- val valuesFor2 = groups.find(_._1 == 2).get._2
- assert(valuesFor2.toList.sorted === List(1))
- }
-
- test("groupByKey with negative key hash codes") {
- sc = new SparkContext("local", "test")
- val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1)))
- val groups = pairs.groupByKey().collect()
- assert(groups.size === 2)
- val valuesForMinus1 = groups.find(_._1 == -1).get._2
- assert(valuesForMinus1.toList.sorted === List(1, 2, 3))
- val valuesFor2 = groups.find(_._1 == 2).get._2
- assert(valuesFor2.toList.sorted === List(1))
- }
-
- test("groupByKey with many output partitions") {
- sc = new SparkContext("local", "test")
- val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
- val groups = pairs.groupByKey(10).collect()
- assert(groups.size === 2)
- val valuesFor1 = groups.find(_._1 == 1).get._2
- assert(valuesFor1.toList.sorted === List(1, 2, 3))
- val valuesFor2 = groups.find(_._1 == 2).get._2
- assert(valuesFor2.toList.sorted === List(1))
- }
-
test("groupByKey with compression") {
try {
- System.setProperty("spark.blockManager.compress", "true")
+ System.setProperty("spark.shuffle.compress", "true")
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
val groups = pairs.groupByKey(4).collect()
@@ -77,234 +32,6 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
}
}
- test("reduceByKey") {
- sc = new SparkContext("local", "test")
- val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
- val sums = pairs.reduceByKey(_+_).collect()
- assert(sums.toSet === Set((1, 7), (2, 1)))
- }
-
- test("reduceByKey with collectAsMap") {
- sc = new SparkContext("local", "test")
- val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
- val sums = pairs.reduceByKey(_+_).collectAsMap()
- assert(sums.size === 2)
- assert(sums(1) === 7)
- assert(sums(2) === 1)
- }
-
- test("reduceByKey with many output partitons") {
- sc = new SparkContext("local", "test")
- val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
- val sums = pairs.reduceByKey(_+_, 10).collect()
- assert(sums.toSet === Set((1, 7), (2, 1)))
- }
-
- test("reduceByKey with partitioner") {
- sc = new SparkContext("local", "test")
- val p = new Partitioner() {
- def numPartitions = 2
- def getPartition(key: Any) = key.asInstanceOf[Int]
- }
- val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).partitionBy(p)
- val sums = pairs.reduceByKey(_+_)
- assert(sums.collect().toSet === Set((1, 4), (0, 1)))
- assert(sums.partitioner === Some(p))
- // count the dependencies to make sure there is only 1 ShuffledRDD
- val deps = new HashSet[RDD[_]]()
- def visit(r: RDD[_]) {
- for (dep <- r.dependencies) {
- deps += dep.rdd
- visit(dep.rdd)
- }
- }
- visit(sums)
- assert(deps.size === 2) // ShuffledRDD, ParallelCollection
- }
-
- test("join") {
- sc = new SparkContext("local", "test")
- val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
- val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
- val joined = rdd1.join(rdd2).collect()
- assert(joined.size === 4)
- assert(joined.toSet === Set(
- (1, (1, 'x')),
- (1, (2, 'x')),
- (2, (1, 'y')),
- (2, (1, 'z'))
- ))
- }
-
- test("join all-to-all") {
- sc = new SparkContext("local", "test")
- val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3)))
- val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y')))
- val joined = rdd1.join(rdd2).collect()
- assert(joined.size === 6)
- assert(joined.toSet === Set(
- (1, (1, 'x')),
- (1, (1, 'y')),
- (1, (2, 'x')),
- (1, (2, 'y')),
- (1, (3, 'x')),
- (1, (3, 'y'))
- ))
- }
-
- test("leftOuterJoin") {
- sc = new SparkContext("local", "test")
- val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
- val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
- val joined = rdd1.leftOuterJoin(rdd2).collect()
- assert(joined.size === 5)
- assert(joined.toSet === Set(
- (1, (1, Some('x'))),
- (1, (2, Some('x'))),
- (2, (1, Some('y'))),
- (2, (1, Some('z'))),
- (3, (1, None))
- ))
- }
-
- test("rightOuterJoin") {
- sc = new SparkContext("local", "test")
- val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
- val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
- val joined = rdd1.rightOuterJoin(rdd2).collect()
- assert(joined.size === 5)
- assert(joined.toSet === Set(
- (1, (Some(1), 'x')),
- (1, (Some(2), 'x')),
- (2, (Some(1), 'y')),
- (2, (Some(1), 'z')),
- (4, (None, 'w'))
- ))
- }
-
- test("join with no matches") {
- sc = new SparkContext("local", "test")
- val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
- val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
- val joined = rdd1.join(rdd2).collect()
- assert(joined.size === 0)
- }
-
- test("join with many output partitions") {
- sc = new SparkContext("local", "test")
- val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
- val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
- val joined = rdd1.join(rdd2, 10).collect()
- assert(joined.size === 4)
- assert(joined.toSet === Set(
- (1, (1, 'x')),
- (1, (2, 'x')),
- (2, (1, 'y')),
- (2, (1, 'z'))
- ))
- }
-
- test("groupWith") {
- sc = new SparkContext("local", "test")
- val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
- val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
- val joined = rdd1.groupWith(rdd2).collect()
- assert(joined.size === 4)
- assert(joined.toSet === Set(
- (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))),
- (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))),
- (3, (ArrayBuffer(1), ArrayBuffer())),
- (4, (ArrayBuffer(), ArrayBuffer('w')))
- ))
- }
-
- test("zero-partition RDD") {
- sc = new SparkContext("local", "test")
- val emptyDir = Files.createTempDir()
- val file = sc.textFile(emptyDir.getAbsolutePath)
- assert(file.partitions.size == 0)
- assert(file.collect().toList === Nil)
- // Test that a shuffle on the file works, because this used to be a bug
- assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
- }
-
- test("keys and values") {
- sc = new SparkContext("local", "test")
- val rdd = sc.parallelize(Array((1, "a"), (2, "b")))
- assert(rdd.keys.collect().toList === List(1, 2))
- assert(rdd.values.collect().toList === List("a", "b"))
- }
-
- test("default partitioner uses partition size") {
- sc = new SparkContext("local", "test")
- // specify 2000 partitions
- val a = sc.makeRDD(Array(1, 2, 3, 4), 2000)
- // do a map, which loses the partitioner
- val b = a.map(a => (a, (a * 2).toString))
- // then a group by, and see we didn't revert to 2 partitions
- val c = b.groupByKey()
- assert(c.partitions.size === 2000)
- }
-
- test("default partitioner uses largest partitioner") {
- sc = new SparkContext("local", "test")
- val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2)
- val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000)
- val c = a.join(b)
- assert(c.partitions.size === 2000)
- }
-
- test("subtract") {
- sc = new SparkContext("local", "test")
- val a = sc.parallelize(Array(1, 2, 3), 2)
- val b = sc.parallelize(Array(2, 3, 4), 4)
- val c = a.subtract(b)
- assert(c.collect().toSet === Set(1))
- assert(c.partitions.size === a.partitions.size)
- }
-
- test("subtract with narrow dependency") {
- sc = new SparkContext("local", "test")
- // use a deterministic partitioner
- val p = new Partitioner() {
- def numPartitions = 5
- def getPartition(key: Any) = key.asInstanceOf[Int]
- }
- // partitionBy so we have a narrow dependency
- val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
- // more partitions/no partitioner so a shuffle dependency
- val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
- val c = a.subtract(b)
- assert(c.collect().toSet === Set((1, "a"), (3, "c")))
- // Ideally we could keep the original partitioner...
- assert(c.partitioner === None)
- }
-
- test("subtractByKey") {
- sc = new SparkContext("local", "test")
- val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2)
- val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4)
- val c = a.subtractByKey(b)
- assert(c.collect().toSet === Set((1, "a"), (1, "a")))
- assert(c.partitions.size === a.partitions.size)
- }
-
- test("subtractByKey with narrow dependency") {
- sc = new SparkContext("local", "test")
- // use a deterministic partitioner
- val p = new Partitioner() {
- def numPartitions = 5
- def getPartition(key: Any) = key.asInstanceOf[Int]
- }
- // partitionBy so we have a narrow dependency
- val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
- // more partitions/no partitioner so a shuffle dependency
- val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
- val c = a.subtractByKey(b)
- assert(c.collect().toSet === Set((1, "a"), (1, "a")))
- assert(c.partitioner.get === p)
- }
-
test("shuffle non-zero block size") {
sc = new SparkContext("local-cluster[2,1,512]", "test")
val NUM_BLOCKS = 3
@@ -391,29 +118,6 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
// We should have at most 4 non-zero sized partitions
assert(nonEmptyBlocks.size <= 4)
}
-
- test("foldByKey") {
- sc = new SparkContext("local", "test")
- val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
- val sums = pairs.foldByKey(0)(_+_).collect()
- assert(sums.toSet === Set((1, 7), (2, 1)))
- }
-
- test("foldByKey with mutable result type") {
- sc = new SparkContext("local", "test")
- val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
- val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache()
- // Fold the values using in-place mutation
- val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect()
- assert(sums.toSet === Set((1, ArrayBuffer(1, 2, 3, 1)), (2, ArrayBuffer(1))))
- // Check that the mutable objects in the original RDD were not changed
- assert(bufs.collect().toSet === Set(
- (1, ArrayBuffer(1)),
- (1, ArrayBuffer(2)),
- (1, ArrayBuffer(3)),
- (1, ArrayBuffer(1)),
- (2, ArrayBuffer(1))))
- }
}
object ShuffleSuite {
diff --git a/core/src/test/scala/spark/SizeEstimatorSuite.scala b/core/src/test/scala/spark/SizeEstimatorSuite.scala
index e235ef2f67..b5c8525f91 100644
--- a/core/src/test/scala/spark/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/spark/SizeEstimatorSuite.scala
@@ -35,7 +35,7 @@ class SizeEstimatorSuite
var oldOops: String = _
override def beforeAll() {
- // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
+ // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
oldArch = System.setProperty("os.arch", "amd64")
oldOops = System.setProperty("spark.test.useCompressedOops", "true")
}
@@ -46,54 +46,54 @@ class SizeEstimatorSuite
}
test("simple classes") {
- expect(16)(SizeEstimator.estimate(new DummyClass1))
- expect(16)(SizeEstimator.estimate(new DummyClass2))
- expect(24)(SizeEstimator.estimate(new DummyClass3))
- expect(24)(SizeEstimator.estimate(new DummyClass4(null)))
- expect(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
+ assert(SizeEstimator.estimate(new DummyClass1) === 16)
+ assert(SizeEstimator.estimate(new DummyClass2) === 16)
+ assert(SizeEstimator.estimate(new DummyClass3) === 24)
+ assert(SizeEstimator.estimate(new DummyClass4(null)) === 24)
+ assert(SizeEstimator.estimate(new DummyClass4(new DummyClass3)) === 48)
}
// NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors
// (Sun vs IBM). Use a DummyString class to make tests deterministic.
test("strings") {
- expect(40)(SizeEstimator.estimate(DummyString("")))
- expect(48)(SizeEstimator.estimate(DummyString("a")))
- expect(48)(SizeEstimator.estimate(DummyString("ab")))
- expect(56)(SizeEstimator.estimate(DummyString("abcdefgh")))
+ assert(SizeEstimator.estimate(DummyString("")) === 40)
+ assert(SizeEstimator.estimate(DummyString("a")) === 48)
+ assert(SizeEstimator.estimate(DummyString("ab")) === 48)
+ assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 56)
}
test("primitive arrays") {
- expect(32)(SizeEstimator.estimate(new Array[Byte](10)))
- expect(40)(SizeEstimator.estimate(new Array[Char](10)))
- expect(40)(SizeEstimator.estimate(new Array[Short](10)))
- expect(56)(SizeEstimator.estimate(new Array[Int](10)))
- expect(96)(SizeEstimator.estimate(new Array[Long](10)))
- expect(56)(SizeEstimator.estimate(new Array[Float](10)))
- expect(96)(SizeEstimator.estimate(new Array[Double](10)))
- expect(4016)(SizeEstimator.estimate(new Array[Int](1000)))
- expect(8016)(SizeEstimator.estimate(new Array[Long](1000)))
+ assert(SizeEstimator.estimate(new Array[Byte](10)) === 32)
+ assert(SizeEstimator.estimate(new Array[Char](10)) === 40)
+ assert(SizeEstimator.estimate(new Array[Short](10)) === 40)
+ assert(SizeEstimator.estimate(new Array[Int](10)) === 56)
+ assert(SizeEstimator.estimate(new Array[Long](10)) === 96)
+ assert(SizeEstimator.estimate(new Array[Float](10)) === 56)
+ assert(SizeEstimator.estimate(new Array[Double](10)) === 96)
+ assert(SizeEstimator.estimate(new Array[Int](1000)) === 4016)
+ assert(SizeEstimator.estimate(new Array[Long](1000)) === 8016)
}
test("object arrays") {
// Arrays containing nulls should just have one pointer per element
- expect(56)(SizeEstimator.estimate(new Array[String](10)))
- expect(56)(SizeEstimator.estimate(new Array[AnyRef](10)))
+ assert(SizeEstimator.estimate(new Array[String](10)) === 56)
+ assert(SizeEstimator.estimate(new Array[AnyRef](10)) === 56)
// For object arrays with non-null elements, each object should take one pointer plus
// however many bytes that class takes. (Note that Array.fill calls the code in its
// second parameter separately for each object, so we get distinct objects.)
- expect(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)))
- expect(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)))
- expect(296)(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)))
- expect(56)(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)))
+ assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)) === 216)
+ assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)) === 216)
+ assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)) === 296)
+ assert(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)) === 56)
// Past size 100, our samples 100 elements, but we should still get the right size.
- expect(28016)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)))
+ assert(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)) === 28016)
// If an array contains the *same* element many times, we should only count it once.
val d1 = new DummyClass1
- expect(72)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus 8-byte object
- expect(432)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus 8-byte object
+ assert(SizeEstimator.estimate(Array.fill(10)(d1)) === 72) // 10 pointers plus 8-byte object
+ assert(SizeEstimator.estimate(Array.fill(100)(d1)) === 432) // 100 pointers plus 8-byte object
// Same thing with huge array containing the same element many times. Note that this won't
// return exactly 4032 because it can't tell that *all* the elements will equal the first
@@ -111,10 +111,10 @@ class SizeEstimatorSuite
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
- expect(40)(SizeEstimator.estimate(DummyString("")))
- expect(48)(SizeEstimator.estimate(DummyString("a")))
- expect(48)(SizeEstimator.estimate(DummyString("ab")))
- expect(56)(SizeEstimator.estimate(DummyString("abcdefgh")))
+ assert(SizeEstimator.estimate(DummyString("")) === 40)
+ assert(SizeEstimator.estimate(DummyString("a")) === 48)
+ assert(SizeEstimator.estimate(DummyString("ab")) === 48)
+ assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 56)
resetOrClear("os.arch", arch)
}
@@ -128,10 +128,10 @@ class SizeEstimatorSuite
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
- expect(56)(SizeEstimator.estimate(DummyString("")))
- expect(64)(SizeEstimator.estimate(DummyString("a")))
- expect(64)(SizeEstimator.estimate(DummyString("ab")))
- expect(72)(SizeEstimator.estimate(DummyString("abcdefgh")))
+ assert(SizeEstimator.estimate(DummyString("")) === 56)
+ assert(SizeEstimator.estimate(DummyString("a")) === 64)
+ assert(SizeEstimator.estimate(DummyString("ab")) === 64)
+ assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 72)
resetOrClear("os.arch", arch)
resetOrClear("spark.test.useCompressedOops", oops)
diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/spark/SortingSuite.scala
index 495f957e53..f7bf207c68 100644
--- a/core/src/test/scala/spark/SortingSuite.scala
+++ b/core/src/test/scala/spark/SortingSuite.scala
@@ -5,16 +5,14 @@ import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.ShouldMatchers
import SparkContext._
-class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers with Logging {
-
+class SortingSuite extends FunSuite with SharedSparkContext with ShouldMatchers with Logging {
+
test("sortByKey") {
- sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)), 2)
- assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0)))
+ assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0)))
}
test("large array") {
- sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr, 2)
@@ -24,7 +22,6 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
}
test("large array with one split") {
- sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr, 2)
@@ -32,9 +29,8 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
assert(sorted.partitions.size === 1)
assert(sorted.collect() === pairArr.sortBy(_._1))
}
-
+
test("large array with many partitions") {
- sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr, 2)
@@ -42,9 +38,8 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
assert(sorted.partitions.size === 20)
assert(sorted.collect() === pairArr.sortBy(_._1))
}
-
+
test("sort descending") {
- sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr, 2)
@@ -52,15 +47,13 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
}
test("sort descending with one split") {
- sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr, 1)
assert(pairs.sortByKey(false, 1).collect() === pairArr.sortWith((x, y) => x._1 > y._1))
}
-
+
test("sort descending with many partitions") {
- sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr, 2)
@@ -68,7 +61,6 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
}
test("more partitions than elements") {
- sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(10) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr, 30)
@@ -76,14 +68,12 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
}
test("empty RDD") {
- sc = new SparkContext("local", "test")
val pairArr = new Array[(Int, Int)](0)
val pairs = sc.parallelize(pairArr, 2)
assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))
}
test("partition balancing") {
- sc = new SparkContext("local", "test")
val pairArr = (1 to 1000).map(x => (x, x)).toArray
val sorted = sc.parallelize(pairArr, 4).sortByKey()
assert(sorted.collect() === pairArr.sortBy(_._1))
@@ -99,7 +89,6 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
}
test("partition balancing for descending sort") {
- sc = new SparkContext("local", "test")
val pairArr = (1 to 1000).map(x => (x, x)).toArray
val sorted = sc.parallelize(pairArr, 4).sortByKey(false)
assert(sorted.collect() === pairArr.sortBy(_._1).reverse)
diff --git a/core/src/test/scala/spark/UnpersistSuite.scala b/core/src/test/scala/spark/UnpersistSuite.scala
new file mode 100644
index 0000000000..94776e7572
--- /dev/null
+++ b/core/src/test/scala/spark/UnpersistSuite.scala
@@ -0,0 +1,30 @@
+package spark
+
+import org.scalatest.FunSuite
+import org.scalatest.concurrent.Timeouts._
+import org.scalatest.time.{Span, Millis}
+import spark.SparkContext._
+
+class UnpersistSuite extends FunSuite with LocalSparkContext {
+ test("unpersist RDD") {
+ sc = new SparkContext("local", "test")
+ val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
+ rdd.count
+ assert(sc.persistentRdds.isEmpty === false)
+ rdd.unpersist()
+ assert(sc.persistentRdds.isEmpty === true)
+
+ failAfter(Span(3000, Millis)) {
+ try {
+ while (! sc.getRDDStorageInfo.isEmpty) {
+ Thread.sleep(200)
+ }
+ } catch {
+ case _ => { Thread.sleep(10) }
+ // Do nothing. We might see exceptions because block manager
+ // is racing this thread to remove entries from the driver.
+ }
+ }
+ assert(sc.getRDDStorageInfo.isEmpty === true)
+ }
+}
diff --git a/core/src/test/scala/spark/UtilsSuite.scala b/core/src/test/scala/spark/UtilsSuite.scala
index ed4701574f..4a113e16bf 100644
--- a/core/src/test/scala/spark/UtilsSuite.scala
+++ b/core/src/test/scala/spark/UtilsSuite.scala
@@ -27,24 +27,49 @@ class UtilsSuite extends FunSuite {
assert(os.toByteArray.toList.equals(bytes.toList))
}
- test("memoryStringToMb"){
- assert(Utils.memoryStringToMb("1") == 0)
- assert(Utils.memoryStringToMb("1048575") == 0)
- assert(Utils.memoryStringToMb("3145728") == 3)
+ test("memoryStringToMb") {
+ assert(Utils.memoryStringToMb("1") === 0)
+ assert(Utils.memoryStringToMb("1048575") === 0)
+ assert(Utils.memoryStringToMb("3145728") === 3)
- assert(Utils.memoryStringToMb("1024k") == 1)
- assert(Utils.memoryStringToMb("5000k") == 4)
- assert(Utils.memoryStringToMb("4024k") == Utils.memoryStringToMb("4024K"))
+ assert(Utils.memoryStringToMb("1024k") === 1)
+ assert(Utils.memoryStringToMb("5000k") === 4)
+ assert(Utils.memoryStringToMb("4024k") === Utils.memoryStringToMb("4024K"))
- assert(Utils.memoryStringToMb("1024m") == 1024)
- assert(Utils.memoryStringToMb("5000m") == 5000)
- assert(Utils.memoryStringToMb("4024m") == Utils.memoryStringToMb("4024M"))
+ assert(Utils.memoryStringToMb("1024m") === 1024)
+ assert(Utils.memoryStringToMb("5000m") === 5000)
+ assert(Utils.memoryStringToMb("4024m") === Utils.memoryStringToMb("4024M"))
- assert(Utils.memoryStringToMb("2g") == 2048)
- assert(Utils.memoryStringToMb("3g") == Utils.memoryStringToMb("3G"))
+ assert(Utils.memoryStringToMb("2g") === 2048)
+ assert(Utils.memoryStringToMb("3g") === Utils.memoryStringToMb("3G"))
- assert(Utils.memoryStringToMb("2t") == 2097152)
- assert(Utils.memoryStringToMb("3t") == Utils.memoryStringToMb("3T"))
+ assert(Utils.memoryStringToMb("2t") === 2097152)
+ assert(Utils.memoryStringToMb("3t") === Utils.memoryStringToMb("3T"))
+ }
+
+ test("splitCommandString") {
+ assert(Utils.splitCommandString("") === Seq())
+ assert(Utils.splitCommandString("a") === Seq("a"))
+ assert(Utils.splitCommandString("aaa") === Seq("aaa"))
+ assert(Utils.splitCommandString("a b c") === Seq("a", "b", "c"))
+ assert(Utils.splitCommandString(" a b\t c ") === Seq("a", "b", "c"))
+ assert(Utils.splitCommandString("a 'b c'") === Seq("a", "b c"))
+ assert(Utils.splitCommandString("a 'b c' d") === Seq("a", "b c", "d"))
+ assert(Utils.splitCommandString("'b c'") === Seq("b c"))
+ assert(Utils.splitCommandString("a \"b c\"") === Seq("a", "b c"))
+ assert(Utils.splitCommandString("a \"b c\" d") === Seq("a", "b c", "d"))
+ assert(Utils.splitCommandString("\"b c\"") === Seq("b c"))
+ assert(Utils.splitCommandString("a 'b\" c' \"d' e\"") === Seq("a", "b\" c", "d' e"))
+ assert(Utils.splitCommandString("a\t'b\nc'\nd") === Seq("a", "b\nc", "d"))
+ assert(Utils.splitCommandString("a \"b\\\\c\"") === Seq("a", "b\\c"))
+ assert(Utils.splitCommandString("a \"b\\\"c\"") === Seq("a", "b\"c"))
+ assert(Utils.splitCommandString("a 'b\\\"c'") === Seq("a", "b\\\"c"))
+ assert(Utils.splitCommandString("'a'b") === Seq("ab"))
+ assert(Utils.splitCommandString("'a''b'") === Seq("ab"))
+ assert(Utils.splitCommandString("\"a\"b") === Seq("ab"))
+ assert(Utils.splitCommandString("\"a\"\"b\"") === Seq("ab"))
+ assert(Utils.splitCommandString("''") === Seq(""))
+ assert(Utils.splitCommandString("\"\"") === Seq(""))
}
}
diff --git a/core/src/test/scala/spark/ZippedPartitionsSuite.scala b/core/src/test/scala/spark/ZippedPartitionsSuite.scala
index 5f60aa75d7..96cb295f45 100644
--- a/core/src/test/scala/spark/ZippedPartitionsSuite.scala
+++ b/core/src/test/scala/spark/ZippedPartitionsSuite.scala
@@ -17,9 +17,8 @@ object ZippedPartitionsSuite {
}
}
-class ZippedPartitionsSuite extends FunSuite with LocalSparkContext {
+class ZippedPartitionsSuite extends FunSuite with SharedSparkContext {
test("print sizes") {
- sc = new SparkContext("local", "test")
val data1 = sc.makeRDD(Array(1, 2, 3, 4), 2)
val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
val data3 = sc.makeRDD(Array(1.0, 2.0), 2)
diff --git a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
index 4000c4d520..699901f1a1 100644
--- a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
@@ -41,7 +41,6 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID)
joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4))
- joblogger.getEventQueue.size should be (1)
joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
parentRdd.setName("MyRDD")
joblogger.getRddNameTest(parentRdd) should be ("MyRDD")
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index 3a7a8db4a6..7f1e7cf93d 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -27,14 +27,14 @@ Short functions can be passed to RDD methods using Python's [`lambda`](http://ww
{% highlight python %}
logData = sc.textFile(logFile).cache()
-errors = logData.filter(lambda s: 'ERROR' in s.split())
+errors = logData.filter(lambda line: "ERROR" in line)
{% endhighlight %}
You can also pass functions that are defined using the `def` keyword; this is useful for more complicated functions that cannot be expressed using `lambda`:
{% highlight python %}
def is_error(line):
- return 'ERROR' in line.split()
+ return "ERROR" in line
errors = logData.filter(is_error)
{% endhighlight %}
@@ -43,8 +43,7 @@ Functions can access objects in enclosing scopes, although modifications to thos
{% highlight python %}
error_keywords = ["Exception", "Error"]
def is_error(line):
- words = line.split()
- return any(keyword in words for keyword in error_keywords)
+ return any(keyword in line for keyword in error_keywords)
errors = logData.filter(is_error)
{% endhighlight %}
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 481c06ef54..6058fb185e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -236,7 +236,7 @@ object SparkBuild extends Build {
def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq(
mergeStrategy in assembly := {
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
- case m if m.toLowerCase.matches("meta-inf/.*\\.sf$") => MergeStrategy.discard
+ case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
case "reference.conf" => MergeStrategy.concat
case _ => MergeStrategy.first
}
diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala
index 1c64f9b98d..f46e6d8be4 100644
--- a/repl/src/test/scala/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/spark/repl/ReplSuite.scala
@@ -28,24 +28,25 @@ class ReplSuite extends FunSuite {
val separator = System.getProperty("path.separator")
interp.process(Array("-classpath", paths.mkString(separator)))
spark.repl.Main.interp = null
- if (interp.sparkContext != null)
+ if (interp.sparkContext != null) {
interp.sparkContext.stop()
+ }
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
System.clearProperty("spark.hostPort")
return out.toString
}
-
+
def assertContains(message: String, output: String) {
- assert(output contains message,
+ assert(output.contains(message),
"Interpreter output did not contain '" + message + "':\n" + output)
}
-
+
def assertDoesNotContain(message: String, output: String) {
- assert(!(output contains message),
+ assert(!output.contains(message),
"Interpreter output contained '" + message + "':\n" + output)
}
-
+
test ("simple foreach with accumulator") {
val output = runInterpreter("local", """
val accum = sc.accumulator(0)
@@ -56,7 +57,7 @@ class ReplSuite extends FunSuite {
assertDoesNotContain("Exception", output)
assertContains("res1: Int = 55", output)
}
-
+
test ("external vars") {
val output = runInterpreter("local", """
var v = 7
@@ -105,7 +106,7 @@ class ReplSuite extends FunSuite {
assertContains("res0: Int = 70", output)
assertContains("res1: Int = 100", output)
}
-
+
test ("broadcast vars") {
// Test that the value that a broadcast var had when it was created is used,
// even if that variable is then modified in the driver program
@@ -143,6 +144,27 @@ class ReplSuite extends FunSuite {
assertContains("res2: Long = 3", output)
}
+ test ("local-cluster mode") {
+ val output = runInterpreter("local-cluster[1,1,512]", """
+ var v = 7
+ def getV() = v
+ sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ v = 10
+ sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ var array = new Array[Int](5)
+ val broadcastArray = sc.broadcast(array)
+ sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ array(0) = 5
+ sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ """)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains("res0: Int = 70", output)
+ assertContains("res1: Int = 100", output)
+ assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+ assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+ }
+
if (System.getenv("MESOS_NATIVE_LIBRARY") != null) {
test ("running on Mesos") {
val output = runInterpreter("localquiet", """
diff --git a/run b/run
index 646d12c1eb..6b5bc01a51 100755
--- a/run
+++ b/run
@@ -21,29 +21,69 @@ fi
if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then
SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m}
SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"
- SPARK_JAVA_OPTS=$SPARK_DAEMON_JAVA_OPTS # Empty by default
+ # Do not overwrite SPARK_JAVA_OPTS environment variable in this script
+ OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS" # Empty by default
+else
+ OUR_JAVA_OPTS="$SPARK_JAVA_OPTS"
fi
# Add java opts for master, worker, executor. The opts maybe null
case "$1" in
'spark.deploy.master.Master')
- SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_MASTER_OPTS"
+ OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_MASTER_OPTS"
;;
'spark.deploy.worker.Worker')
- SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_WORKER_OPTS"
+ OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_WORKER_OPTS"
;;
'spark.executor.StandaloneExecutorBackend')
- SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
+ OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
;;
'spark.executor.MesosExecutorBackend')
- SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
+ OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
;;
'spark.repl.Main')
- SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_REPL_OPTS"
+ OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_REPL_OPTS"
;;
esac
+# Figure out whether to run our class with java or with the scala launcher.
+# In most cases, we'd prefer to execute our process with java because scala
+# creates a shell script as the parent of its Java process, which makes it
+# hard to kill the child with stuff like Process.destroy(). However, for
+# the Spark shell, the wrapper is necessary to properly reset the terminal
+# when we exit, so we allow it to set a variable to launch with scala.
+if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
+ if [ "$SCALA_HOME" ]; then
+ RUNNER="${SCALA_HOME}/bin/scala"
+ else
+ if [ `command -v scala` ]; then
+ RUNNER="scala"
+ else
+ echo "SCALA_HOME is not set and scala is not in PATH" >&2
+ exit 1
+ fi
+ fi
+else
+ if [ -n "${JAVA_HOME}" ]; then
+ RUNNER="${JAVA_HOME}/bin/java"
+ else
+ if [ `command -v java` ]; then
+ RUNNER="java"
+ else
+ echo "JAVA_HOME is not set" >&2
+ exit 1
+ fi
+ fi
+ if [[ ! -f "$FWDIR/RELEASE" && -z "$SCALA_LIBRARY_PATH" ]]; then
+ if [ -z "$SCALA_HOME" ]; then
+ echo "SCALA_HOME is not set" >&2
+ exit 1
+ fi
+ SCALA_LIBRARY_PATH="$SCALA_HOME/lib"
+ fi
+fi
+
# Figure out how much memory to use per executor and set it as an environment
# variable so that our process sees it and can report it to Mesos
if [ -z "$SPARK_MEM" ] ; then
@@ -52,7 +92,7 @@ fi
export SPARK_MEM
# Set JAVA_OPTS to be able to load native libraries and to set heap size
-JAVA_OPTS="$SPARK_JAVA_OPTS"
+JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"
# Load extra JAVA_OPTS from conf/java-opts, if it exists
@@ -60,37 +100,35 @@ if [ -e $FWDIR/conf/java-opts ] ; then
JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
fi
export JAVA_OPTS
+# Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in ExecutorRunner.scala!
-# Check if this is a binary distribution or source distribution
-# and build up the classpath appropriately
-if [ -f "$FWDIR/RELEASE" ]; then
- if [ `command -v java` ]; then
- RUNNER="java"
- else
- if [ -z "$JAVA_HOME" ]; then
- echo "JAVA_HOME is not set" >&2
- exit 1
- fi
- RUNNER="${JAVA_HOME}/bin/java"
+if [ ! -f "$FWDIR/RELEASE" ]; then
+ CORE_DIR="$FWDIR/core"
+ EXAMPLES_DIR="$FWDIR/examples"
+ REPL_DIR="$FWDIR/repl"
+
+ # Exit if the user hasn't compiled Spark
+ if [ ! -e "$CORE_DIR/target" ]; then
+ echo "Failed to find Spark classes in $CORE_DIR/target" >&2
+ echo "You need to compile Spark before running this program" >&2
+ exit 1
+ fi
+
+ if [[ "$@" = *repl* && ! -e "$REPL_DIR/target" ]]; then
+ echo "Failed to find Spark classes in $REPL_DIR/target" >&2
+ echo "You need to compile Spark repl module before running this program" >&2
+ exit 1
fi
+fi
- CLASSPATH="$SPARK_CLASSPATH:$FWDIR/jars/*"
+# Compute classpath using external script
+CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
+export CLASSPATH
+if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
+ EXTRA_ARGS="" # Java options will be passed to scala as JAVA_OPTS
+else
# The JVM doesn't read JAVA_OPTS by default so we need to pass it in
EXTRA_ARGS="$JAVA_OPTS"
-else
- . "$FWDIR/set-dev-classpath.sh"
fi
-
-# Add hadoop conf dir - else FileSystem.*, etc fail !
-# Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts
-# the configurtion files.
-if [ "x" != "x$HADOOP_CONF_DIR" ]; then
- CLASSPATH="$CLASSPATH:$HADOOP_CONF_DIR"
-fi
-if [ "x" != "x$YARN_CONF_DIR" ]; then
- CLASSPATH="$CLASSPATH:$YARN_CONF_DIR"
-fi
-
-export CLASSPATH # Needed for spark-shell
exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@"
diff --git a/run2.cmd b/run2.cmd
index c6f43dde5b..a9c4df180f 100644
--- a/run2.cmd
+++ b/run2.cmd
@@ -23,7 +23,9 @@ if "%1"=="spark.deploy.worker.Worker" set RUNNING_DAEMON=1
if "x%SPARK_DAEMON_MEMORY%" == "x" set SPARK_DAEMON_MEMORY=512m
set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true
if "%RUNNING_DAEMON%"=="1" set SPARK_MEM=%SPARK_DAEMON_MEMORY%
-if "%RUNNING_DAEMON%"=="1" set SPARK_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS%
+rem Do not overwrite SPARK_JAVA_OPTS environment variable in this script
+if "%RUNNING_DAEMON%"=="0" set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS%
+if "%RUNNING_DAEMON%"=="1" set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS%
rem Check that SCALA_HOME has been specified
if not "x%SCALA_HOME%"=="x" goto scala_exists
@@ -31,50 +33,22 @@ if not "x%SCALA_HOME%"=="x" goto scala_exists
goto exit
:scala_exists
-rem If the user specifies a Mesos JAR, put it before our included one on the classpath
-set MESOS_CLASSPATH=
-if not "x%MESOS_JAR%"=="x" set MESOS_CLASSPATH=%MESOS_JAR%
-
rem Figure out how much memory to use per executor and set it as an environment
rem variable so that our process sees it and can report it to Mesos
if "x%SPARK_MEM%"=="x" set SPARK_MEM=512m
rem Set JAVA_OPTS to be able to load native libraries and to set heap size
-set JAVA_OPTS=%SPARK_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%SPARK_MEM% -Xmx%SPARK_MEM%
-rem Load extra JAVA_OPTS from conf/java-opts, if it exists
-if exist "%FWDIR%conf\java-opts.cmd" call "%FWDIR%conf\java-opts.cmd"
+set JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%SPARK_MEM% -Xmx%SPARK_MEM%
+rem Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in ExecutorRunner.scala!
set CORE_DIR=%FWDIR%core
-set REPL_DIR=%FWDIR%repl
set EXAMPLES_DIR=%FWDIR%examples
-set BAGEL_DIR=%FWDIR%bagel
-set STREAMING_DIR=%FWDIR%streaming
-set PYSPARK_DIR=%FWDIR%python
-
-rem Build up classpath
-set CLASSPATH=%SPARK_CLASSPATH%;%MESOS_CLASSPATH%;%FWDIR%conf;%CORE_DIR%\target\scala-%SCALA_VERSION%\classes
-set CLASSPATH=%CLASSPATH%;%CORE_DIR%\target\scala-%SCALA_VERSION%\test-classes;%CORE_DIR%\src\main\resources
-set CLASSPATH=%CLASSPATH%;%STREAMING_DIR%\target\scala-%SCALA_VERSION%\classes;%STREAMING_DIR%\target\scala-%SCALA_VERSION%\test-classes
-set CLASSPATH=%CLASSPATH%;%STREAMING_DIR%\lib\org\apache\kafka\kafka\0.7.2-spark\*
-set CLASSPATH=%CLASSPATH%;%REPL_DIR%\target\scala-%SCALA_VERSION%\classes;%EXAMPLES_DIR%\target\scala-%SCALA_VERSION%\classes
-set CLASSPATH=%CLASSPATH%;%FWDIR%lib_managed\jars\*
-set CLASSPATH=%CLASSPATH%;%FWDIR%lib_managed\bundles\*
-set CLASSPATH=%CLASSPATH%;%FWDIR%repl\lib\*
-set CLASSPATH=%CLASSPATH%;%FWDIR%python\lib\*
-set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes
-
-rem Add hadoop conf dir - else FileSystem.*, etc fail
-rem Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts
-rem the configurtion files.
-if "x%HADOOP_CONF_DIR%"=="x" goto no_hadoop_conf_dir
- set CLASSPATH=%CLASSPATH%;%HADOOP_CONF_DIR%
-:no_hadoop_conf_dir
-
-if "x%YARN_CONF_DIR%"=="x" goto no_yarn_conf_dir
- set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR%
-:no_yarn_conf_dir
-
+set REPL_DIR=%FWDIR%repl
+rem Compute classpath using external script
+set DONT_PRINT_CLASSPATH=1
+call "%FWDIR%bin\compute-classpath.cmd"
+set DONT_PRINT_CLASSPATH=0
rem Figure out the JAR file that our examples were packaged into.
rem First search in the build path from SBT:
diff --git a/set-dev-classpath.sh b/set-dev-classpath.sh
deleted file mode 100644
index d031c56baf..0000000000
--- a/set-dev-classpath.sh
+++ /dev/null
@@ -1,118 +0,0 @@
-# A BASH script to set the classpath for running Spark out of the developer/github tree
-
-SCALA_VERSION=2.9.3
-
-# Figure out where the Scala framework is installed
-FWDIR="$(cd `dirname $0`; pwd)"
-
-if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
- if [ "$SCALA_HOME" ]; then
- RUNNER="${SCALA_HOME}/bin/scala"
- else
- if [ `command -v scala` ]; then
- RUNNER="scala"
- else
- echo "SCALA_HOME is not set and scala is not in PATH" >&2
- exit 1
- fi
- fi
-else
- if [ `command -v java` ]; then
- RUNNER="java"
- else
- if [ -z "$JAVA_HOME" ]; then
- echo "JAVA_HOME is not set" >&2
- exit 1
- fi
- RUNNER="${JAVA_HOME}/bin/java"
- fi
- if [ -z "$SCALA_LIBRARY_PATH" ]; then
- if [ -z "$SCALA_HOME" ]; then
- echo "SCALA_HOME is not set" >&2
- exit 1
- fi
- SCALA_LIBRARY_PATH="$SCALA_HOME/lib"
- fi
-fi
-
-CORE_DIR="$FWDIR/core"
-REPL_DIR="$FWDIR/repl"
-REPL_BIN_DIR="$FWDIR/repl-bin"
-EXAMPLES_DIR="$FWDIR/examples"
-BAGEL_DIR="$FWDIR/bagel"
-STREAMING_DIR="$FWDIR/streaming"
-PYSPARK_DIR="$FWDIR/python"
-
-# Exit if the user hasn't compiled Spark
-if [ ! -e "$CORE_DIR/target" ]; then
- echo "Failed to find Spark classes in $CORE_DIR/target" >&2
- echo "You need to compile Spark before running this program" >&2
- exit 1
-fi
-
-if [[ "$@" = *repl* && ! -e "$REPL_DIR/target" ]]; then
- echo "Failed to find Spark classes in $REPL_DIR/target" >&2
- echo "You need to compile Spark repl module before running this program" >&2
- exit 1
-fi
-
-# Build up classpath
-CLASSPATH="$SPARK_CLASSPATH"
-CLASSPATH="$CLASSPATH:$FWDIR/conf"
-CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/classes"
-if [ -n "$SPARK_TESTING" ] ; then
- CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/test-classes"
- CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/test-classes"
-fi
-CLASSPATH="$CLASSPATH:$CORE_DIR/src/main/resources"
-CLASSPATH="$CLASSPATH:$REPL_DIR/target/scala-$SCALA_VERSION/classes"
-CLASSPATH="$CLASSPATH:$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
-CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/classes"
-CLASSPATH="$CLASSPATH:$STREAMING_DIR/lib/org/apache/kafka/kafka/0.7.2-spark/*" # <-- our in-project Kafka Jar
-if [ -e "$FWDIR/lib_managed" ]; then
- CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/jars/*"
- CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/bundles/*"
-fi
-CLASSPATH="$CLASSPATH:$REPL_DIR/lib/*"
-
-# Add the shaded JAR for Maven builds
-if [ -e $REPL_BIN_DIR/target ]; then
- for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
- CLASSPATH="$CLASSPATH:$jar"
- done
- # The shaded JAR doesn't contain examples, so include those separately
- EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar`
- CLASSPATH+=":$EXAMPLES_JAR"
-fi
-CLASSPATH="$CLASSPATH:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
-for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do
- CLASSPATH="$CLASSPATH:$jar"
-done
-
-# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
-# to avoid the -sources and -doc packages that are built by publish-local.
-if [ -e "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar ]; then
- # Use the JAR from the SBT build
- export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar`
-fi
-if [ -e "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar ]; then
- # Use the JAR from the Maven build
- export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar`
-fi
-
-
-# Figure out whether to run our class with java or with the scala launcher.
-# In most cases, we'd prefer to execute our process with java because scala
-# creates a shell script as the parent of its Java process, which makes it
-# hard to kill the child with stuff like Process.destroy(). However, for
-# the Spark shell, the wrapper is necessary to properly reset the terminal
-# when we exit, so we allow it to set a variable to launch with scala.
-if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
- EXTRA_ARGS="" # Java options will be passed to scala as JAVA_OPTS
-else
- CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-library.jar"
- CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-compiler.jar"
- CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/jline.jar"
- # The JVM doesn't read JAVA_OPTS by default so we need to pass it in
- EXTRA_ARGS="$JAVA_OPTS"
-fi