From 6798a09df84fb97e196c84d55cf3e21ad676871f Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Sun, 7 Apr 2013 17:47:38 +0530 Subject: Add support for building against hadoop2-yarn : adding new maven profile for it --- repl/pom.xml | 71 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) (limited to 'repl') diff --git a/repl/pom.xml b/repl/pom.xml index a3e4606edc..1f885673f4 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -187,5 +187,76 @@ + + hadoop2-yarn + + hadoop2-yarn + + + + org.spark-project + spark-core + ${project.version} + hadoop2-yarn + + + org.spark-project + spark-bagel + ${project.version} + hadoop2-yarn + runtime + + + org.spark-project + spark-examples + ${project.version} + hadoop2-yarn + runtime + + + org.spark-project + spark-streaming + ${project.version} + hadoop2-yarn + runtime + + + org.apache.hadoop + hadoop-client + provided + + + org.apache.hadoop + hadoop-yarn-api + provided + + + org.apache.hadoop + hadoop-yarn-common + provided + + + org.apache.avro + avro + provided + + + org.apache.avro + avro-ipc + provided + + + + + + org.apache.maven.plugins + maven-jar-plugin + + hadoop2-yarn + + + + + -- cgit v1.2.3 From afee9024430ef79cc0840a5e5788b60c8c53f9d2 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Sun, 28 Apr 2013 22:26:45 +0530 Subject: Attempt to fix streaming test failures after yarn branch merge --- bagel/src/test/scala/bagel/BagelSuite.scala | 1 + core/src/test/scala/spark/LocalSparkContext.scala | 3 ++- repl/src/test/scala/spark/repl/ReplSuite.scala | 1 + .../main/scala/spark/streaming/Checkpoint.scala | 30 +++++++++++++++++----- .../spark/streaming/util/MasterFailureTest.scala | 8 +++++- .../spark/streaming/BasicOperationsSuite.scala | 1 + .../scala/spark/streaming/CheckpointSuite.scala | 4 ++- .../test/scala/spark/streaming/FailureSuite.scala | 2 ++ .../scala/spark/streaming/InputStreamsSuite.scala | 1 + .../spark/streaming/WindowOperationsSuite.scala | 1 + 10 files changed, 42 insertions(+), 10 deletions(-) (limited to 'repl') diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala index 25db395c22..a09c978068 100644 --- a/bagel/src/test/scala/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/bagel/BagelSuite.scala @@ -23,6 +23,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo } // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } test("halting by voting") { diff --git a/core/src/test/scala/spark/LocalSparkContext.scala b/core/src/test/scala/spark/LocalSparkContext.scala index ff00dd05dd..76d5258b02 100644 --- a/core/src/test/scala/spark/LocalSparkContext.scala +++ b/core/src/test/scala/spark/LocalSparkContext.scala @@ -27,6 +27,7 @@ object LocalSparkContext { sc.stop() // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */ @@ -38,4 +39,4 @@ object LocalSparkContext { } } -} \ No newline at end of file +} diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala index 43559b96d3..1c64f9b98d 100644 --- a/repl/src/test/scala/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/spark/repl/ReplSuite.scala @@ -32,6 +32,7 @@ class ReplSuite extends FunSuite { interp.sparkContext.stop() // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") return out.toString } diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index e303e33e5e..7bd104b8d5 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -38,28 +38,43 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) private[streaming] class CheckpointWriter(checkpointDir: String) extends Logging { val file = new Path(checkpointDir, "graph") + // The file to which we actually write - and then "move" to file. + private val writeFile = new Path(file.getParent, file.getName + ".next") + private val bakFile = new Path(file.getParent, file.getName + ".bk") + + @volatile private var stopped = false + val conf = new Configuration() var fs = file.getFileSystem(conf) val maxAttempts = 3 val executor = Executors.newFixedThreadPool(1) + // Removed code which validates whether there is only one CheckpointWriter per path 'file' since + // I did not notice any errors - reintroduce it ? + class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable { def run() { var attempts = 0 val startTime = System.currentTimeMillis() while (attempts < maxAttempts) { + if (stopped) { + logInfo("Already stopped, ignore checkpoint attempt for " + file) + return + } attempts += 1 try { logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") - if (fs.exists(file)) { - val bkFile = new Path(file.getParent, file.getName + ".bk") - FileUtil.copy(fs, file, fs, bkFile, true, true, conf) - logDebug("Moved existing checkpoint file to " + bkFile) - } - val fos = fs.create(file) + // This is inherently thread unsafe .. so alleviating it by writing to '.new' and then doing moves : which should be pretty fast. + val fos = fs.create(writeFile) fos.write(bytes) fos.close() - fos.close() + if (fs.exists(file) && fs.rename(file, bakFile)) { + logDebug("Moved existing checkpoint file to " + bakFile) + } + // paranoia + fs.delete(file, false) + fs.rename(writeFile, file) + val finishTime = System.currentTimeMillis(); logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds") @@ -84,6 +99,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging { } def stop() { + stopped = true executor.shutdown() } } diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala index f673e5be15..e7a3f92bc0 100644 --- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala @@ -74,6 +74,7 @@ object MasterFailureTest extends Logging { val operation = (st: DStream[String]) => { val updateFunc = (values: Seq[Long], state: Option[Long]) => { + logInfo("UpdateFunc .. state = " + state.getOrElse(0L) + ", values = " + values) Some(values.foldLeft(0L)(_ + _) + state.getOrElse(0L)) } st.flatMap(_.split(" ")) @@ -159,6 +160,7 @@ object MasterFailureTest extends Logging { // Setup the streaming computation with the given operation System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map()) ssc.checkpoint(checkpointDir.toString) val inputStream = ssc.textFileStream(testDir.toString) @@ -205,6 +207,7 @@ object MasterFailureTest extends Logging { // (iii) Its not timed out yet System.clearProperty("spark.streaming.clock") System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") ssc.start() val startTime = System.currentTimeMillis() while (!killed && !isLastOutputGenerated && !isTimedOut) { @@ -357,13 +360,16 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) // Write the data to a local file and then move it to the target test directory val localFile = new File(localTestDir, (i+1).toString) val hadoopFile = new Path(testDir, (i+1).toString) + val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString) FileUtils.writeStringToFile(localFile, input(i).toString + "\n") var tries = 0 var done = false while (!done && tries < maxTries) { tries += 1 try { - fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) + // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) + fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile) + fs.rename(tempHadoopFile, hadoopFile) done = true } catch { case ioe: IOException => { diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index cf2ed8b1d4..e7352deb81 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -15,6 +15,7 @@ class BasicOperationsSuite extends TestSuiteBase { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } test("map") { diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index cac86deeaf..607dea77ec 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -31,6 +31,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } var ssc: StreamingContext = null @@ -325,6 +326,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { ) ssc = new StreamingContext(checkpointDir) System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") ssc.start() val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches) // the first element will be re-processed data of the last batch before restart @@ -350,4 +352,4 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] outputStream.output } -} \ No newline at end of file +} diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala index a5fa7ab92d..4529e774e9 100644 --- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala +++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala @@ -22,10 +22,12 @@ class FailureSuite extends FunSuite with BeforeAndAfter with Logging { val batchDuration = Milliseconds(1000) before { + logInfo("BEFORE ...") FileUtils.deleteDirectory(new File(directory)) } after { + logInfo("AFTER ...") FileUtils.deleteDirectory(new File(directory)) } diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 67dca2ac31..0acb6db6f2 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -41,6 +41,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala index 1b66f3bda2..80d827706f 100644 --- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala @@ -16,6 +16,7 @@ class WindowOperationsSuite extends TestSuiteBase { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } val largerSlideInput = Seq( -- cgit v1.2.3 From 207afe4088219a0c7350b3f80eb60e86c97e140f Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Thu, 18 Apr 2013 12:08:11 -0700 Subject: Remove spark-repl's extraneous dependency on spark-streaming --- project/SparkBuild.scala | 2 +- repl/pom.xml | 14 -------------- 2 files changed, 1 insertion(+), 15 deletions(-) (limited to 'repl') diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index f2410085d8..190d723435 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -29,7 +29,7 @@ object SparkBuild extends Build { lazy val core = Project("core", file("core"), settings = coreSettings) - lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core) dependsOn (streaming) + lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core) lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core) dependsOn (streaming) diff --git a/repl/pom.xml b/repl/pom.xml index 038da5d988..92a2020b48 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -96,13 +96,6 @@ hadoop1 runtime - - org.spark-project - spark-streaming - ${project.version} - hadoop1 - runtime - org.apache.hadoop hadoop-core @@ -147,13 +140,6 @@ hadoop2 runtime - - org.spark-project - spark-streaming - ${project.version} - hadoop2 - runtime - org.apache.hadoop hadoop-core -- cgit v1.2.3 From 7902baddc797f86f5bdbcc966f5cd60545638bf7 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Wed, 19 Jun 2013 13:34:30 +0200 Subject: Update ASM to version 4.0 --- core/pom.xml | 4 ++-- core/src/main/scala/spark/ClosureCleaner.scala | 11 +++++------ pom.xml | 6 +++--- project/SparkBuild.scala | 2 +- repl/src/main/scala/spark/repl/ExecutorClassLoader.scala | 3 +-- 5 files changed, 12 insertions(+), 14 deletions(-) (limited to 'repl') diff --git a/core/pom.xml b/core/pom.xml index d8687bf991..88f0ed70f3 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -32,8 +32,8 @@ compress-lzf - asm - asm-all + org.ow2.asm + asm com.google.protobuf diff --git a/core/src/main/scala/spark/ClosureCleaner.scala b/core/src/main/scala/spark/ClosureCleaner.scala index 50d6a1c5c9..d5e7132ff9 100644 --- a/core/src/main/scala/spark/ClosureCleaner.scala +++ b/core/src/main/scala/spark/ClosureCleaner.scala @@ -5,8 +5,7 @@ import java.lang.reflect.Field import scala.collection.mutable.Map import scala.collection.mutable.Set -import org.objectweb.asm.{ClassReader, MethodVisitor, Type} -import org.objectweb.asm.commons.EmptyVisitor +import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type} import org.objectweb.asm.Opcodes._ import java.io.{InputStream, IOException, ByteArrayOutputStream, ByteArrayInputStream, BufferedInputStream} @@ -162,10 +161,10 @@ private[spark] object ClosureCleaner extends Logging { } } -private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends EmptyVisitor { +private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) { override def visitMethod(access: Int, name: String, desc: String, sig: String, exceptions: Array[String]): MethodVisitor = { - return new EmptyVisitor { + return new MethodVisitor(ASM4) { override def visitFieldInsn(op: Int, owner: String, name: String, desc: String) { if (op == GETFIELD) { for (cl <- output.keys if cl.getName == owner.replace('/', '.')) { @@ -188,7 +187,7 @@ private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) exten } } -private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends EmptyVisitor { +private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM4) { var myName: String = null override def visit(version: Int, access: Int, name: String, sig: String, @@ -198,7 +197,7 @@ private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends EmptyVisi override def visitMethod(access: Int, name: String, desc: String, sig: String, exceptions: Array[String]): MethodVisitor = { - return new EmptyVisitor { + return new MethodVisitor(ASM4) { override def visitMethodInsn(op: Int, owner: String, name: String, desc: String) { val argTypes = Type.getArgumentTypes(desc) diff --git a/pom.xml b/pom.xml index c893ec755e..3bcb2a3f34 100644 --- a/pom.xml +++ b/pom.xml @@ -190,9 +190,9 @@ 0.8.4 - asm - asm-all - 3.3.1 + org.ow2.asm + asm + 4.0 com.google.protobuf diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 824af821f9..b1f3f9a2ea 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -148,7 +148,7 @@ object SparkBuild extends Build { "org.slf4j" % "slf4j-log4j12" % slf4jVersion, "commons-daemon" % "commons-daemon" % "1.0.10", "com.ning" % "compress-lzf" % "0.8.4", - "asm" % "asm-all" % "3.3.1", + "org.ow2.asm" % "asm" % "4.0", "com.google.protobuf" % "protobuf-java" % "2.4.1", "de.javakaffee" % "kryo-serializers" % "0.22", "com.typesafe.akka" % "akka-actor" % "2.0.3" excludeAll(excludeNetty), diff --git a/repl/src/main/scala/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/spark/repl/ExecutorClassLoader.scala index 13d81ec1cf..0e9aa863b5 100644 --- a/repl/src/main/scala/spark/repl/ExecutorClassLoader.scala +++ b/repl/src/main/scala/spark/repl/ExecutorClassLoader.scala @@ -8,7 +8,6 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.objectweb.asm._ -import org.objectweb.asm.commons.EmptyVisitor import org.objectweb.asm.Opcodes._ @@ -83,7 +82,7 @@ extends ClassLoader(parent) { } class ConstructorCleaner(className: String, cv: ClassVisitor) -extends ClassAdapter(cv) { +extends ClassVisitor(ASM4, cv) { override def visitMethod(access: Int, name: String, desc: String, sig: String, exceptions: Array[String]): MethodVisitor = { val mv = cv.visitMethod(access, name, desc, sig, exceptions) -- cgit v1.2.3 From b5df1cd668e45fd0cc22c1666136d05548cae3e9 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sat, 22 Jun 2013 17:12:39 -0700 Subject: ADD_JARS environment variable for spark-shell --- docs/scala-programming-guide.md | 10 ++++++++-- repl/src/main/scala/spark/repl/SparkILoop.scala | 9 +++++++-- 2 files changed, 15 insertions(+), 4 deletions(-) (limited to 'repl') diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md index b0da130fcb..e9cf9ef36f 100644 --- a/docs/scala-programming-guide.md +++ b/docs/scala-programming-guide.md @@ -43,12 +43,18 @@ new SparkContext(master, appName, [sparkHome], [jars]) The `master` parameter is a string specifying a [Spark or Mesos cluster URL](#master-urls) to connect to, or a special "local" string to run in local mode, as described below. `appName` is a name for your application, which will be shown in the cluster web UI. Finally, the last two parameters are needed to deploy your code to a cluster if running in distributed mode, as described later. -In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `MASTER` environment variable. For example, to run on four cores, use +In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `MASTER` environment variable, and you can add JARs to the classpath with the `ADD_JARS` variable. For example, to run `spark-shell` on four cores, use {% highlight bash %} $ MASTER=local[4] ./spark-shell {% endhighlight %} +Or, to also add `code.jar` to its classpath, use: + +{% highlight bash %} +$ MASTER=local[4] ADD_JARS=code.jar ./spark-shell +{% endhighlight %} + ### Master URLs The master URL passed to Spark can be in one of the following formats: @@ -78,7 +84,7 @@ If you want to run your job on a cluster, you will need to specify the two optio * `sparkHome`: The path at which Spark is installed on your worker machines (it should be the same on all of them). * `jars`: A list of JAR files on the local machine containing your job's code and any dependencies, which Spark will deploy to all the worker nodes. You'll need to package your job into a set of JARs using your build system. For example, if you're using SBT, the [sbt-assembly](https://github.com/sbt/sbt-assembly) plugin is a good way to make a single JAR with your code and dependencies. -If you run `spark-shell` on a cluster, any classes you define in the shell will automatically be distributed. +If you run `spark-shell` on a cluster, you can add JARs to it by specifying the `ADD_JARS` environment variable before you launch it. This variable should contain a comma-separated list of JARs. For example, `ADD_JARS=a.jar,b.jar ./spark-shell` will launch a shell with `a.jar` and `b.jar` on its classpath. In addition, any new classes you define in the shell will automatically be distributed. # Resilient Distributed Datasets (RDDs) diff --git a/repl/src/main/scala/spark/repl/SparkILoop.scala b/repl/src/main/scala/spark/repl/SparkILoop.scala index 23556dbc8f..86eed090d0 100644 --- a/repl/src/main/scala/spark/repl/SparkILoop.scala +++ b/repl/src/main/scala/spark/repl/SparkILoop.scala @@ -822,7 +822,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master: spark.repl.Main.interp.out.println("Spark context available as sc."); spark.repl.Main.interp.out.flush(); """) - command("import spark.SparkContext._"); + command("import spark.SparkContext._") } echo("Type in expressions to have them evaluated.") echo("Type :help for more information.") @@ -838,7 +838,8 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master: if (prop != null) prop else "local" } } - sparkContext = new SparkContext(master, "Spark shell") + val jars = Option(System.getenv("ADD_JARS")).map(_.split(',')).getOrElse(new Array[String](0)) + sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars) sparkContext } @@ -850,6 +851,10 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master: printWelcome() echo("Initializing interpreter...") + // Add JARS specified in Spark's ADD_JARS variable to classpath + val jars = Option(System.getenv("ADD_JARS")).map(_.split(',')).getOrElse(new Array[String](0)) + jars.foreach(settings.classpath.append(_)) + this.settings = settings createInterpreter() -- cgit v1.2.3 From 0e0f9d3069039f03bbf5eefe3b0637c89fddf0f1 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sat, 22 Jun 2013 17:44:04 -0700 Subject: Fix search path for REPL class loader to really find added JARs --- core/src/main/scala/spark/executor/Executor.scala | 38 +++++++++++++---------- repl/src/main/scala/spark/repl/SparkILoop.scala | 4 ++- 2 files changed, 25 insertions(+), 17 deletions(-) (limited to 'repl') diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 8bebfafce4..2bf55ea9a9 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -42,7 +42,8 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert // Create our ClassLoader and set it on this thread private val urlClassLoader = createClassLoader() - Thread.currentThread.setContextClassLoader(urlClassLoader) + private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) + Thread.currentThread.setContextClassLoader(replClassLoader) // Make any thread terminations due to uncaught exceptions kill the entire // executor process to avoid surprising stalls. @@ -88,7 +89,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert override def run() { val startTime = System.currentTimeMillis() SparkEnv.set(env) - Thread.currentThread.setContextClassLoader(urlClassLoader) + Thread.currentThread.setContextClassLoader(replClassLoader) val ser = SparkEnv.get.closureSerializer.newInstance() logInfo("Running task ID " + taskId) context.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) @@ -153,26 +154,31 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert val urls = currentJars.keySet.map { uri => new File(uri.split("/").last).toURI.toURL }.toArray - loader = new URLClassLoader(urls, loader) + new ExecutorURLClassLoader(urls, loader) + } - // If the REPL is in use, add another ClassLoader that will read - // new classes defined by the REPL as the user types code + /** + * If the REPL is in use, add another ClassLoader that will read + * new classes defined by the REPL as the user types code + */ + private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = { val classUri = System.getProperty("spark.repl.class.uri") if (classUri != null) { logInfo("Using REPL class URI: " + classUri) - loader = { - try { - val klass = Class.forName("spark.repl.ExecutorClassLoader") - .asInstanceOf[Class[_ <: ClassLoader]] - val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader]) - constructor.newInstance(classUri, loader) - } catch { - case _: ClassNotFoundException => loader - } + try { + val klass = Class.forName("spark.repl.ExecutorClassLoader") + .asInstanceOf[Class[_ <: ClassLoader]] + val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader]) + return constructor.newInstance(classUri, parent) + } catch { + case _: ClassNotFoundException => + logError("Could not find spark.repl.ExecutorClassLoader on classpath!") + System.exit(1) + null } + } else { + return parent } - - return new ExecutorURLClassLoader(Array(), loader) } /** diff --git a/repl/src/main/scala/spark/repl/SparkILoop.scala b/repl/src/main/scala/spark/repl/SparkILoop.scala index 86eed090d0..59f9d05683 100644 --- a/repl/src/main/scala/spark/repl/SparkILoop.scala +++ b/repl/src/main/scala/spark/repl/SparkILoop.scala @@ -838,7 +838,9 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master: if (prop != null) prop else "local" } } - val jars = Option(System.getenv("ADD_JARS")).map(_.split(',')).getOrElse(new Array[String](0)) + val jars = Option(System.getenv("ADD_JARS")).map(_.split(',')) + .getOrElse(new Array[String](0)) + .map(new java.io.File(_).getAbsolutePath) sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars) sparkContext } -- cgit v1.2.3 From f2263350eda780aba45f383b722e20702c310e6a Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Tue, 25 Jun 2013 18:35:35 -0400 Subject: Added a local-cluster mode test to ReplSuite --- repl/src/test/scala/spark/repl/ReplSuite.scala | 31 +++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) (limited to 'repl') diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala index 1c64f9b98d..72ed8aca5b 100644 --- a/repl/src/test/scala/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/spark/repl/ReplSuite.scala @@ -35,17 +35,17 @@ class ReplSuite extends FunSuite { System.clearProperty("spark.hostPort") return out.toString } - + def assertContains(message: String, output: String) { assert(output contains message, "Interpreter output did not contain '" + message + "':\n" + output) } - + def assertDoesNotContain(message: String, output: String) { assert(!(output contains message), "Interpreter output contained '" + message + "':\n" + output) } - + test ("simple foreach with accumulator") { val output = runInterpreter("local", """ val accum = sc.accumulator(0) @@ -56,7 +56,7 @@ class ReplSuite extends FunSuite { assertDoesNotContain("Exception", output) assertContains("res1: Int = 55", output) } - + test ("external vars") { val output = runInterpreter("local", """ var v = 7 @@ -105,7 +105,7 @@ class ReplSuite extends FunSuite { assertContains("res0: Int = 70", output) assertContains("res1: Int = 100", output) } - + test ("broadcast vars") { // Test that the value that a broadcast var had when it was created is used, // even if that variable is then modified in the driver program @@ -143,6 +143,27 @@ class ReplSuite extends FunSuite { assertContains("res2: Long = 3", output) } + test ("local-cluster mode") { + val output = runInterpreter("local-cluster[1,1,512]", """ + var v = 7 + def getV() = v + sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + v = 10 + sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + var array = new Array[Int](5) + val broadcastArray = sc.broadcast(array) + sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + array(0) = 5 + sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + """) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + assertContains("res0: Int = 70", output) + assertContains("res1: Int = 100", output) + assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output) + assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output) + } + if (System.getenv("MESOS_NATIVE_LIBRARY") != null) { test ("running on Mesos") { val output = runInterpreter("localquiet", """ -- cgit v1.2.3 From 2bd04c3513ffa6deabc290a3931be946b1c18713 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Tue, 25 Jun 2013 18:37:14 -0400 Subject: Formatting --- repl/src/test/scala/spark/repl/ReplSuite.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'repl') diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala index 72ed8aca5b..f46e6d8be4 100644 --- a/repl/src/test/scala/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/spark/repl/ReplSuite.scala @@ -28,8 +28,9 @@ class ReplSuite extends FunSuite { val separator = System.getProperty("path.separator") interp.process(Array("-classpath", paths.mkString(separator))) spark.repl.Main.interp = null - if (interp.sparkContext != null) + if (interp.sparkContext != null) { interp.sparkContext.stop() + } // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") System.clearProperty("spark.hostPort") @@ -37,12 +38,12 @@ class ReplSuite extends FunSuite { } def assertContains(message: String, output: String) { - assert(output contains message, + assert(output.contains(message), "Interpreter output did not contain '" + message + "':\n" + output) } def assertDoesNotContain(message: String, output: String) { - assert(!(output contains message), + assert(!output.contains(message), "Interpreter output contained '" + message + "':\n" + output) } -- cgit v1.2.3