-rw-r--r-- | core/src/main/scala/spark/SparkEnv.scala                     |  2
-rw-r--r-- | core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala |  3
-rw-r--r-- | core/src/test/scala/spark/DistributedSuite.scala             |  2
-rw-r--r-- | core/src/test/scala/spark/FileServerSuite.scala              | 24
-rw-r--r-- | project/SparkBuild.scala                                     |  8
-rwxr-xr-x | run                                                          |  2
-rw-r--r-- | run.cmd                                                      |  2
-rw-r--r-- | run2.cmd                                                     | 68
-rw-r--r-- | sbt/sbt.cmd                                                  |  5
9 files changed, 98 insertions, 18 deletions
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 7473b40aa3..6ffae8e85f 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -40,6 +40,8 @@ class SparkEnv (
     blockManager.stop()
     blockManager.master.stop()
     actorSystem.shutdown()
+    // Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit
+    Thread.sleep(100)
     actorSystem.awaitTermination()
     // Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit
     Thread.sleep(100)
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 7043361020..e2a9df275a 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -75,7 +75,8 @@ class ExecutorRunner(
 
   def buildCommandSeq(): Seq[String] = {
     val command = jobDesc.command
-    val runScript = new File(sparkHome, "run").getCanonicalPath
+    val script = if (System.getProperty("os.name").startsWith("Windows")) "run.cmd" else "run"
+    val runScript = new File(sparkHome, script).getCanonicalPath
     Seq(runScript, command.mainClass) ++ command.arguments.map(substituteVariables)
   }
 
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index b7b8a79327..93b876d205 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -18,7 +18,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
 
   val clusterUrl = "local-cluster[2,1,512]"
 
-  var sc: SparkContext = _
+  @transient var sc: SparkContext = _
 
   after {
     if (sc != null) {
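The @transient annotations above (and in FileServerSuite below) matter because closures defined inside these tests capture the enclosing suite via `this`; any non-transient field would be pulled into the serialized task, and SparkContext is not serializable. A minimal sketch of the mechanism, with hypothetical class and field names:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // Stand-in for a test suite whose closures capture `this`.
    class SuiteLike extends Serializable {
      // Skipped during serialization; without @transient, writeObject would
      // fail because the captured object (like SparkContext) is not Serializable.
      @transient var sc: Object = new Object

      val data = Seq(1, 2, 3)
      // Referencing `data` drags the whole SuiteLike instance into the closure.
      def task: Int => Int = x => x + data.size
    }

    object TransientDemo {
      def main(args: Array[String]): Unit = {
        val out = new ObjectOutputStream(new ByteArrayOutputStream())
        out.writeObject(new SuiteLike().task) // OK only because sc is @transient
        println("closure serialized")
      }
    }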
diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala
index 500af1eb90..fd7a7bd589 100644
--- a/core/src/test/scala/spark/FileServerSuite.scala
+++ b/core/src/test/scala/spark/FileServerSuite.scala
@@ -3,14 +3,14 @@ package spark
 import com.google.common.io.Files
 import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
-import java.io.{File, PrintWriter}
+import java.io.{File, PrintWriter, FileReader, BufferedReader}
 import SparkContext._
 
 class FileServerSuite extends FunSuite with BeforeAndAfter {
 
-  var sc: SparkContext = _
-  var tmpFile : File = _
-  var testJarFile : File = _
+  @transient var sc: SparkContext = _
+  @transient var tmpFile : File = _
+  @transient var testJarFile : File = _
 
   before {
     // Create a sample text file
@@ -38,7 +38,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
     sc.addFile(tmpFile.toString)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
     val result = sc.parallelize(testData).reduceByKey {
-      val in = new java.io.BufferedReader(new java.io.FileReader(tmpFile))
+      val in = new BufferedReader(new FileReader("FileServerSuite.txt"))
       val fileVal = in.readLine().toInt
       in.close()
       _ * fileVal + _ * fileVal
@@ -53,7 +53,9 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
     sc.addJar(sampleJarFile)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
     val result = sc.parallelize(testData).reduceByKey { (x,y) =>
-      val fac = Thread.currentThread.getContextClassLoader().loadClass("org.uncommons.maths.Maths").getDeclaredMethod("factorial", classOf[Int])
+      val fac = Thread.currentThread.getContextClassLoader()
+        .loadClass("org.uncommons.maths.Maths")
+        .getDeclaredMethod("factorial", classOf[Int])
       val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
       val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
       a + b
@@ -66,7 +68,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
     sc.addFile(tmpFile.toString)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
     val result = sc.parallelize(testData).reduceByKey {
-      val in = new java.io.BufferedReader(new java.io.FileReader(tmpFile))
+      val in = new BufferedReader(new FileReader("FileServerSuite.txt"))
       val fileVal = in.readLine().toInt
       in.close()
       _ * fileVal + _ * fileVal
@@ -75,19 +77,19 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
 
     assert(result.toSet === Set((1,200), (2,300), (3,500)))
   }
-
   test ("Dynamically adding JARS on a standalone cluster") {
     sc = new SparkContext("local-cluster[1,1,512]", "test")
     val sampleJarFile = getClass().getClassLoader().getResource("uncommons-maths-1.2.2.jar").getFile()
     sc.addJar(sampleJarFile)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
     val result = sc.parallelize(testData).reduceByKey { (x,y) =>
-      val fac = Thread.currentThread.getContextClassLoader().loadClass("org.uncommons.maths.Maths").getDeclaredMethod("factorial", classOf[Int])
+      val fac = Thread.currentThread.getContextClassLoader()
+        .loadClass("org.uncommons.maths.Maths")
+        .getDeclaredMethod("factorial", classOf[Int])
       val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
       val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
       a + b
     }.collect()
     assert(result.toSet === Set((1,2), (2,7), (3,121)))
   }
-
-}
\ No newline at end of file
+}
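The rewritten tests read the shipped file by its bare name ("FileServerSuite.txt") rather than the driver-local tmpFile path: addFile distributes the file to every node, and tasks find the copy in their own working directory. A sketch of the same pattern outside the suite (the /tmp path is hypothetical):

    import spark.SparkContext
    import java.io.{BufferedReader, FileReader}

    object AddFileDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local-cluster[1,1,512]", "demo")
        sc.addFile("/tmp/FileServerSuite.txt") // driver-local path (hypothetical)
        val sums = sc.parallelize(1 to 4).map { i =>
          // On the executor, only the base name is valid.
          val in = new BufferedReader(new FileReader("FileServerSuite.txt"))
          try i * in.readLine().toInt finally in.close()
        }.collect()
        println(sums.mkString(","))
        sc.stop()
      }
    }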
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 203001954a..0247b46de4 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -22,7 +22,7 @@ object SparkBuild extends Build {
   def sharedSettings = Defaults.defaultSettings ++ Seq(
     organization := "org.spark-project",
     version := "0.6.0-SNAPSHOT",
-    scalaVersion := "2.9.1",
+    scalaVersion := "2.9.2",
     scalacOptions := Seq(/*"-deprecation",*/ "-unchecked", "-optimize"), // -deprecation is too noisy due to usage of old Hadoop API, enable it once that's no longer an issue
     unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
     retrieveManaged := true,
@@ -61,9 +61,9 @@ object SparkBuild extends Build {
     "asm" % "asm-all" % "3.3.1",
     "com.google.protobuf" % "protobuf-java" % "2.4.1",
     "de.javakaffee" % "kryo-serializers" % "0.9",
-    "com.typesafe.akka" % "akka-actor" % "2.0.2",
-    "com.typesafe.akka" % "akka-remote" % "2.0.2",
-    "com.typesafe.akka" % "akka-slf4j" % "2.0.2",
+    "com.typesafe.akka" % "akka-actor" % "2.0.3",
+    "com.typesafe.akka" % "akka-remote" % "2.0.3",
+    "com.typesafe.akka" % "akka-slf4j" % "2.0.3",
     "it.unimi.dsi" % "fastutil" % "6.4.4",
     "colt" % "colt" % "1.2.0",
     "cc.spray" % "spray-can" % "1.0-M2.1",
diff --git a/run b/run
--- a/run
+++ b/run
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-SCALA_VERSION=2.9.1
+SCALA_VERSION=2.9.2
 
 # Figure out where the Scala framework is installed
 FWDIR="$(cd `dirname $0`; pwd)"
diff --git a/run.cmd b/run.cmd
new file mode 100644
index 0000000000..f78a4350e1
--- /dev/null
+++ b/run.cmd
@@ -0,0 +1,2 @@
+@echo off
+cmd /V /E /C call %~dp0run2.cmd %*
\ No newline at end of file
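The Scala version is now duplicated across SparkBuild.scala and the launch scripts, and the scripts build the classpath from target/scala-%SCALA_VERSION%, so a mismatch silently yields empty classpath entries rather than an error. A small defensive check one could run, with a hypothetical helper name:

    import java.io.File

    object ClasspathCheck {
      // Fail fast when the launcher's Scala version disagrees with what sbt built.
      def classesDir(sparkHome: String, scalaVersion: String): File = {
        val dir = new File(sparkHome, "core/target/scala-" + scalaVersion + "/classes")
        require(dir.isDirectory,
          "no compiled classes at " + dir + "; check SCALA_VERSION against SparkBuild.scala")
        dir
      }

      def main(args: Array[String]): Unit =
        println(classesDir(args(0), "2.9.2"))
    }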
diff --git a/run2.cmd b/run2.cmd
new file mode 100644
index 0000000000..9fc4d5054b
--- /dev/null
+++ b/run2.cmd
@@ -0,0 +1,68 @@
+@echo off
+
+set SCALA_VERSION=2.9.2
+
+rem Figure out where the Spark framework is installed
+set FWDIR=%~dp0
+
+rem Export this as SPARK_HOME
+set SPARK_HOME=%FWDIR%
+
+rem Load environment variables from conf\spark-env.cmd, if it exists
+if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
+
+rem Check that SCALA_HOME has been specified
+if not "x%SCALA_HOME%"=="x" goto scala_exists
+  echo "SCALA_HOME is not set"
+  goto exit
+:scala_exists
+
+rem If the user specifies a Mesos JAR, put it before our included one on the classpath
+set MESOS_CLASSPATH=
+if not "x%MESOS_JAR%"=="x" set MESOS_CLASSPATH=%MESOS_JAR%
+
+rem Figure out how much memory to use per executor and set it as an environment
+rem variable so that our process sees it and can report it to Mesos
+if "x%SPARK_MEM%"=="x" set SPARK_MEM=512m
+
+rem Set JAVA_OPTS to be able to load native libraries and to set heap size
+set JAVA_OPTS=%SPARK_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%SPARK_MEM% -Xmx%SPARK_MEM%
+rem Load extra JAVA_OPTS from conf/java-opts, if it exists
+if exist "%FWDIR%conf\java-opts.cmd" call "%FWDIR%conf\java-opts.cmd"
+
+set CORE_DIR=%FWDIR%core
+set REPL_DIR=%FWDIR%repl
+set EXAMPLES_DIR=%FWDIR%examples
+set BAGEL_DIR=%FWDIR%bagel
+
+rem Build up classpath
+set CLASSPATH=%SPARK_CLASSPATH%;%MESOS_CLASSPATH%;%FWDIR%conf;%CORE_DIR%\target\scala-%SCALA_VERSION%\classes
+set CLASSPATH=%CLASSPATH%;%CORE_DIR%\target\scala-%SCALA_VERSION%\test-classes;%CORE_DIR%\src\main\resources
+set CLASSPATH=%CLASSPATH%;%REPL_DIR%\target\scala-%SCALA_VERSION%\classes;%EXAMPLES_DIR%\target\scala-%SCALA_VERSION%\classes
+for /R "%CORE_DIR%\lib" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j
+for /R "%FWDIR%\lib_managed\jars" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j
+for /R "%FWDIR%\lib_managed\bundles" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j
+for /R "%REPL_DIR%\lib" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j
+set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes
+
+rem Figure out whether to run our class with java or with the scala launcher.
+rem In most cases, we'd prefer to execute our process with java because scala
+rem creates a shell script as the parent of its Java process, which makes it
+rem hard to kill the child with stuff like Process.destroy(). However, for
+rem the Spark shell, the wrapper is necessary to properly reset the terminal
+rem when we exit, so we allow it to set a variable to launch with scala.
+if not "%SPARK_LAUNCH_WITH_SCALA%"=="1" goto java_runner
+  set RUNNER=%SCALA_HOME%\bin\scala
+  rem Java options will be passed to scala as JAVA_OPTS
+  set EXTRA_ARGS=
+  goto run_spark
+:java_runner
+  set CLASSPATH=%CLASSPATH%;%SCALA_HOME%\lib\scala-library.jar;%SCALA_HOME%\lib\scala-compiler.jar;%SCALA_HOME%\lib\jline.jar
+  set RUNNER=java
+  if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java
+  rem The JVM doesn't read JAVA_OPTS by default so we need to pass it in
+  set EXTRA_ARGS=%JAVA_OPTS%
:run_spark
+
+%RUNNER% -cp "%CLASSPATH%" %EXTRA_ARGS% %*
+:exit
\ No newline at end of file
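Two details of the new launcher: run.cmd starts run2.cmd with `cmd /V /E /C` so that the !CLASSPATH! references inside the `for /R` loops use delayed expansion, and like the Unix `run` script it prefers bare `java` over the `scala` wrapper, since the wrapper's intermediate shell script makes the child process hard to kill with Process.destroy(). The runner choice, restated in Scala for reference (helper is hypothetical):

    object RunnerSelect {
      // Mirror of run2.cmd's java-vs-scala decision; env names match the script.
      def runner(env: String => Option[String]): String =
        if (env("SPARK_LAUNCH_WITH_SCALA") == Some("1"))
          // Shell mode: the scala wrapper restores the terminal on exit.
          env("SCALA_HOME").map(_ + "\\bin\\scala").getOrElse("scala")
        else
          // Normal mode: bare java keeps the child directly killable.
          env("JAVA_HOME").map(_ + "\\bin\\java").getOrElse("java")

      def main(args: Array[String]): Unit =
        println(runner(name => Option(System.getenv(name))))
    }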
diff --git a/sbt/sbt.cmd b/sbt/sbt.cmd
new file mode 100644
index 0000000000..6b289ab447
--- /dev/null
+++ b/sbt/sbt.cmd
@@ -0,0 +1,5 @@
+@echo off
+set EXTRA_ARGS=
+if not "%MESOS_HOME%x"=="x" set EXTRA_ARGS=-Djava.library.path=%MESOS_HOME%\lib\java
+set SPARK_HOME=%~dp0..
+java -Xmx1200M -XX:MaxPermSize=200m %EXTRA_ARGS% -jar %SPARK_HOME%\sbt\sbt-launch-*.jar "%*"